testcontainers/testcontainers-python

Question: Deploying Kafka Container

ewaldmikolaj opened this issue · 2 comments

What are you trying to do?

Hello,
I am trying to deploy Kafka Container, connect to it and create some test topic, which then can I use in integration tests. I am struggling with connection to Container. It works fine, when I use docker compose prepared by Confluent and described here. When creating a container using library, I can't authenticate.

The error message that I am getting is:

%6|1718198715.195|FAIL|rdkafka#producer-1| [thrd:localhost:32882/bootstrap]: localhost:32882/1: Disconnected (after 15052ms in state UP)
%6|1718198715.195|FAIL|rdkafka#producer-1| [thrd:localhost:32882/bootstrap]: localhost:32882/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
%3|1718198715.434|FAIL|rdkafka#producer-1| [thrd:localhost:32882/bootstrap]: localhost:32882/1: Connect to ipv4#127.0.0.1:32882 failed: Connection refused (after 0ms in state CONNECT)

Should I expose some ports and pass env variables to make it work?

Where are you trying to do it?

import os
import pytest
import time

from testcontainers.kafka import KafkaContainer

from confluent_kafka.admin import AdminClient, NewTopic


@pytest.fixture(scope="module", autouse=True)
def kafka(request):
    kafka = KafkaContainer()
    kafka.start()

    def teardown():
        kafka.stop()

    request.addfinalizer(teardown)
    os.environ["BOOTSTRAP"] = kafka.get_bootstrap_server()


def test():
    admin_client = AdminClient({"bootstrap.servers": os.environ["BOOTSTRAP"]})
    topics = admin_client.create_topics([NewTopic("test-topic", 1, 1)])
    time.sleep(15)

    assert admin_client.list_topics().topics == []

The last assert is just to test connection, and check what's returned by list_topics() function.

Runtime environment

Provide a summary of your runtime environment. Which operating system, python version, and docker version are you using? What is the version of testcontainers-python you are using? You can run the following commands to get the relevant information.

# Get the python version.
$ python --version
Python 3.11.9
$ docker info
Client: Docker Engine - Community
 Version:    26.1.4
 Context:    default
 Debug Mode: false
 Plugins:
  buildx: Docker Buildx (Docker Inc.)
    Version:  v0.14.1
    Path:     /usr/libexec/docker/cli-plugins/docker-buildx
  compose: Docker Compose (Docker Inc.)
    Version:  v2.27.1
    Path:     /usr/libexec/docker/cli-plugins/docker-compose
# Get all python packages.
$ pip freeze
testcontainers==4.5.1
confluent-kafka==2.4.0

this works for me:

mkdir issues-605
cd issues-605
python -m venv .venv
. $_/bin/activate
pip install testcontainers==4.5.1 confluent-kafka==2.4.0 pytest==8.2.2
vim test.py
import os
import pytest
import time

from testcontainers.kafka import KafkaContainer

from confluent_kafka.admin import AdminClient, NewTopic


@pytest.fixture(scope="module", autouse=True)
def kafka(request):
    kafka = KafkaContainer()
    kafka.start()

    def teardown():
        kafka.stop()

    request.addfinalizer(teardown)
    os.environ["BOOTSTRAP"] = kafka.get_bootstrap_server()


def test():
    admin_client = AdminClient({"bootstrap.servers": os.environ["BOOTSTRAP"]})
    topics = admin_client.create_topics([NewTopic("test-topic", 1, 1)])
    time.sleep(15)

    assert admin_client.list_topics().topics != []  # changed this line

I created a new python env and it also worked. Thanks for feedback. I will close this question.