
Experiment on the use of confluent-kafka for Python

Primary language: Python. License: GNU General Public License v3.0 (GPL-3.0).

#Example of confluent-kafka use

Requires Docker.

###Get the repo

Clone the repository into an example folder:

git clone https://github.com/kyrsideris/confluent-kafka-python-example.git example

###Set up the services

Bring up the environment:

docker-compose -f test.yml up 2>&1 | tee docker.log &

Check the state of the docker containers:

docker-compose -f test.yml ps


          Name                         Command               State                      Ports
example_consumer_1          /bin/sh -c python consumer ...   Up
example_kafka_1             start-kafka.sh                   Up>9092/tcp
example_librdkafka-base_1   python2                          Exit 0
example_producer_1          /bin/sh -c python producer.py    Up
example_zookeeper_1         /docker-entrypoint.sh zkSe ...   Up>2181/tcp, 2888/tcp, 3888/tcp

###Scale up kafka

Scale the kafka service up to 3 containers:

docker-compose -f test.yml scale kafka=3

Check the state again:

docker-compose -f test.yml ps

          Name                         Command               State                      Ports
example_consumer_1          /bin/sh -c python consumer ...   Up
example_kafka_1             start-kafka.sh                   Up>9092/tcp
example_kafka_2             start-kafka.sh                   Up>9092/tcp
example_kafka_3             start-kafka.sh                   Up>9092/tcp
example_librdkafka-base_1   python2                          Exit 0
example_producer_1          /bin/sh -c python producer.py    Up
example_zookeeper_1         /docker-entrypoint.sh zkSe ...   Up>2181/tcp, 2888/tcp, 3888/tcp

###Gotcha

The Python kafka client will not immediately get the updated list of brokers while the kafka brokers are scaling up. How quickly the broker list is updated depends on the metadata refresh settings metadata.max.age.ms and topic.metadata.refresh.interval.ms. So if we kill the master kafka container (the one instantiated at the beginning) before one metadata refresh interval has elapsed since the clients were instantiated, the producer and consumer will fail, because the only broker they know about is gone.

Check the configuration properties: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
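One way to shrink the window in which the clients only know the first broker is to lower the refresh interval in the client configuration. The following is a minimal sketch, not the repository's actual producer or consumer code: the helper names, the `kafka:9092` address, the `example-group` group id and the 10 second interval are all assumptions chosen for illustration. Only `bootstrap.servers`, `group.id` and `topic.metadata.refresh.interval.ms` are real librdkafka properties.

```python
def build_client_config(bootstrap_servers, refresh_interval_ms=10000, group_id=None):
    """Build a librdkafka-style config dict (hypothetical helper)."""
    config = {
        # Comma-separated list of brokers used for the initial connection.
        "bootstrap.servers": ",".join(bootstrap_servers),
        # Refresh broker/topic metadata more often than the library default.
        "topic.metadata.refresh.interval.ms": refresh_interval_ms,
    }
    if group_id is not None:
        # Consumers need a group id; producers do not.
        config["group.id"] = group_id
    return config


def make_producer(config):
    # Imported lazily so the config helper works without the client installed.
    from confluent_kafka import Producer
    return Producer(config)


def make_consumer(config):
    from confluent_kafka import Consumer
    return Consumer(config)


# Assumed address: a compose service named "kafka" listening on port 9092.
producer_config = build_client_config(["kafka:9092"])
consumer_config = build_client_config(["kafka:9092"], group_id="example-group")
print(producer_config)
print(consumer_config)
```

A shorter interval means more metadata requests, so the value is a trade-off between how fast new brokers are discovered and the extra traffic to the cluster.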

Kill the kafka master:

docker stop example_kafka_1

Check state:

docker-compose -f test.yml ps
example_kafka_1             start-kafka.sh                   Exit 0

In the log (docker.log), the producer and consumer complain that they can no longer resolve the stopped broker:

producer_1         | %3|1484908517.312|FAIL|rdkafka#producer-1| 4c71e483a7fc:9092/1001: Failed to resolve '4c71e483a7fc:9092': Name or service not known
producer_1         | %3|1484908517.312|ERROR|rdkafka#producer-1| 4c71e483a7fc:9092/1001: Failed to resolve '4c71e483a7fc:9092': Name or service not known
consumer_1         | %3|1484908517.313|FAIL|rdkafka#consumer-1| 4c71e483a7fc:9092/1001: Failed to resolve '4c71e483a7fc:9092': Name or service not known
consumer_1         | %3|1484908517.313|ERROR|rdkafka#consumer-1| 4c71e483a7fc:9092/1001: Failed to resolve '4c71e483a7fc:9092': Name or service not known
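The lines above follow a pipe-separated layout: the docker-compose service prefix, then a severity digit, a Unix timestamp, a facility such as FAIL or ERROR, the client name, and the message. As a rough sketch for filtering docker.log, the layout could be parsed like this. The regular expression and the `parse_log_line` helper are assumptions based only on the sample lines shown here, not an official log format specification.

```python
import re
from datetime import datetime, timezone

LOG_LINE = re.compile(
    r"^(?P<service>\S+)\s*\|\s*"   # docker-compose prefix, e.g. "producer_1"
    r"%(?P<level>\d)\|"            # severity digit, e.g. 3
    r"(?P<timestamp>\d+\.\d+)\|"   # Unix time in seconds with fraction
    r"(?P<facility>[^|]+)\|"       # e.g. FAIL or ERROR
    r"(?P<client>[^|]+)\|\s*"      # e.g. rdkafka#producer-1
    r"(?P<message>.*)$"            # rest of the line
)


def parse_log_line(line):
    """Return a dict of fields, or None if the line does not match."""
    match = LOG_LINE.match(line)
    if match is None:
        return None
    record = match.groupdict()
    record["level"] = int(record["level"])
    # Replace the raw timestamp string with a timezone-aware datetime.
    record["time"] = datetime.fromtimestamp(
        float(record.pop("timestamp")), tz=timezone.utc
    )
    return record


sample = (
    "producer_1         | %3|1484908517.312|FAIL|rdkafka#producer-1| "
    "4c71e483a7fc:9092/1001: Failed to resolve '4c71e483a7fc:9092': "
    "Name or service not known"
)
record = parse_log_line(sample)
print(record["service"], record["facility"], record["client"])
print(record["message"])
```

Lines that do not match, such as ordinary application output, come back as `None` and can be skipped.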

According to the librdkafka configuration documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

metadata.max.age.ms: Metadata cache max age. Defaults to metadata.refresh.interval.ms * 3. Type: integer

topic.metadata.refresh.interval.ms: Topic metadata refresh interval in milliseconds. The metadata is automatically refreshed on error and connect. Use -1 to disable the intervalled refresh. Type: integer
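To make the timing concrete, here is a small arithmetic sketch of the two settings. It uses a deliberately simplified model in which periodic refreshes happen at exact multiples of the interval after the client starts, and it ignores the refreshes triggered on error and connect. The function names are hypothetical, and the 300000 ms default is my understanding of the librdkafka default rather than something stated in this README.

```python
# Assumed library default for topic.metadata.refresh.interval.ms (5 minutes).
DEFAULT_REFRESH_INTERVAL_MS = 300000


def default_metadata_max_age_ms(refresh_interval_ms):
    """Max age defaults to three times the refresh interval."""
    return refresh_interval_ms * 3


def first_refresh_after_ms(scaled_at_ms, refresh_interval_ms):
    """Time (ms since client start) of the first periodic refresh that
    happens at or after the brokers were scaled up.

    Returns None when periodic refresh is disabled (interval <= 0, e.g. -1).
    """
    if refresh_interval_ms <= 0:
        return None
    # Ceiling division on integers, with at least one full period.
    periods = max(1, -(-scaled_at_ms // refresh_interval_ms))
    return periods * refresh_interval_ms


print(default_metadata_max_age_ms(DEFAULT_REFRESH_INTERVAL_MS))
# Brokers scaled up 60 s after the clients started:
print(first_refresh_after_ms(60000, DEFAULT_REFRESH_INTERVAL_MS))
print(first_refresh_after_ms(60000, 10000))
```

Under this model, stopping the first broker before the returned time leaves the clients with a broker list that contains only the stopped container.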

This issue was discussed here: confluentinc/confluent-kafka-python#111

###Stop the services

Stop and remove the docker containers:

docker-compose -f test.yml down