testcontainers/testcontainers-python

Make the kafka package dependency optional to be able to use confluent_kafka

jankatins opened this issue · 5 comments

Describe the bug

We use confluent_kafka in our project, so it seems odd to need a second Kafka SDK (kafka-python) just to check that the container is running.

To Reproduce

Install testcontainers-kafka -> observe that kafka-python gets installed as well

Yes, let's explore. Cf. #336 for the same idea for the postgres container. We'd have to figure out what the default behavior should be, e.g.,

  • Which client library is used out-of-the-box?
  • Do we always require users to also install their own client library?
  • How are convenience methods to get a client connection implemented (although admittedly no such method exists for the KafkaContainer)?

For Kafka, the client library is only needed for the connection check. So one idea would be to move the checks out of the class and pick one conditionally. That would need two extras, one depending on confluent-kafka and one on kafka-python:

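from testcontainers.core.waiting_utils import wait_container_is_ready

# Record which Kafka client library is installed, preferring confluent_kafka.
KAFKA_PACKAGE: str
try:
    import confluent_kafka
    KAFKA_PACKAGE = "confluent_kafka"
except ImportError:
    try:
        from kafka import KafkaConsumer
        from kafka.errors import KafkaError, UnrecognizedBrokerVersion, NoBrokersAvailable
        KAFKA_PACKAGE = "kafka_python"
    except ImportError:
        KAFKA_PACKAGE = "MISSING"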

# Define each check only if its library was imported; otherwise the decorator
# arguments would raise NameError when this module is loaded.
if KAFKA_PACKAGE == "confluent_kafka":

    @wait_container_is_ready(confluent_kafka.KafkaError, confluent_kafka.KafkaException)
    def _connect_confluent_kafka(self) -> None:
        """Condition: try to connect to the broker."""
        consumer = confluent_kafka.Consumer(
            {
                "bootstrap.servers": self.get_bootstrap_server(),
                "group.id": "test",
            }
        )
        if not consumer.list_topics(timeout=0.1):
            raise confluent_kafka.KafkaException("Unable to connect with kafka container!")

elif KAFKA_PACKAGE == "kafka_python":

    @wait_container_is_ready(UnrecognizedBrokerVersion, NoBrokersAvailable, KafkaError, ValueError)
    def _connect_kafka_python(self) -> None:
        """Condition: try to connect to the broker."""
        bootstrap_server = self.get_bootstrap_server()
        consumer = KafkaConsumer(group_id="test", bootstrap_servers=[bootstrap_server])
        if not consumer.bootstrap_connected():
            raise KafkaError("Unable to connect with kafka container!")


class KafkaContainer:
    ...
    def _connect(self):
        # Dispatch to the check that matches the installed client library.
        if KAFKA_PACKAGE == "confluent_kafka":
            _connect_confluent_kafka(self)
        elif KAFKA_PACKAGE == "kafka_python":
            _connect_kafka_python(self)
        else:
            raise NotImplementedError("No kafka sdk found, install either confluent-kafka or kafka-python")

(Written into the github window, not tested...)
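
As a variation on the detection step, the installed client could be found without importing it, using the standard library's importlib.util.find_spec. This is only a sketch: the helper name detect_kafka_client and the DEFAULT_CANDIDATES tuple are made up for illustration and are not part of testcontainers.

```python
import importlib.util
from typing import Optional, Sequence

# Top-level import names of the two client libraries discussed here
# (confluent-kafka installs "confluent_kafka", kafka-python installs "kafka").
DEFAULT_CANDIDATES = ("confluent_kafka", "kafka")


def detect_kafka_client(candidates: Sequence[str] = DEFAULT_CANDIDATES) -> Optional[str]:
    """Return the first importable top-level module name, or None if none is installed."""
    for name in candidates:
        # find_spec returns None for a missing top-level module instead of raising.
        if importlib.util.find_spec(name) is not None:
            return name
    return None
```

The order of the candidates decides which library wins when both are installed, which is one way to answer the "out-of-the-box client" question.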

Do you think it would be feasible to use wait_for_logs as discussed in #340 for postgres? Then we wouldn't have to install any additional packages.

The Java testcontainers implementation seems to think so: https://github.com/testcontainers/testcontainers-java/blob/main/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java#L136-L143

    protected void configure() {
        if (this.kraftEnabled) {
            waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
            configureKraft();
        } else {
            waitingFor(Wait.forLogMessage(".*\\[KafkaServer id=\\d+\\] started.*", 1));
            configureZookeeper();
        }
    }

Great, let's do that. Would be nice to bring the implementations closer, too.
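
For illustration, here is a rough, library-independent sketch of what a log-based readiness check could look like in Python, reusing the two patterns from the Java snippet. The function wait_for_log_pattern and the get_logs callable are hypothetical stand-ins; in testcontainers-python the existing wait_for_logs helper from testcontainers.core.waiting_utils would presumably take this role. Untested against a real broker.

```python
import re
import time
from typing import Callable

# Patterns taken from the Java implementation (Java string escaping removed).
KRAFT_READY = r".*Transitioning from RECOVERY to RUNNING.*"
ZOOKEEPER_READY = r".*\[KafkaServer id=\d+\] started.*"


def wait_for_log_pattern(
    get_logs: Callable[[], str],  # hypothetical: returns the container log so far
    pattern: str,
    timeout: float = 30.0,
    interval: float = 0.5,
) -> float:
    """Poll get_logs() until some line matches pattern; return the seconds waited."""
    regex = re.compile(pattern)
    start = time.monotonic()
    while True:
        if any(regex.search(line) for line in get_logs().splitlines()):
            return time.monotonic() - start
        if time.monotonic() - start > timeout:
            raise TimeoutError(f"Log pattern {pattern!r} not seen within {timeout} seconds")
        time.sleep(interval)
```

With this approach neither confluent-kafka nor kafka-python is needed for the readiness check, since only the broker's own log output is inspected.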