Kafka hello-world.
This repo provides instructions to:
- Deploy a Kafka broker
- Use the command line tool Kafkacat:
  - to publish messages to Kafka
  - to subscribe to topics and receive messages from Kafka
- Use Python:
  - to publish messages to the broker
  - to subscribe to topics and receive messages from Kafka
The file docker-compose.yml uses the Docker images wurstmeister/zookeeper and wurstmeister/kafka.
Install Docker and Docker Compose.
Edit the file docker-compose.yml and update KAFKA_ADVERTISED_HOST_NAME with your host IP.
Run this command to create and start the containers:
$ docker-compose -f docker-compose.yml up -d
Run these commands to verify:
$ docker images | grep wurstmeister
wurstmeister/kafka latest 988f4a6ca13c 4 months ago 421MB
wurstmeister/zookeeper latest 3f43f72cb283 10 months ago 510MB
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
45b13d484728 wurstmeister/kafka "start-kafka.sh" 9 hours ago Up 9 hours 0.0.0.0:9092->9092/tcp kafka
0957d9af0d62 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 9 hours ago Up 9 hours 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp zookeeper
$ nc -vz 100.123.35.0 9092
Connection to 100.123.35.0 9092 port [tcp/*] succeeded!
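If nc is not available, the same reachability check can be done from Python. A minimal sketch (not part of this repo), using only the standard library:

import socket

# TCP-connect to the broker, equivalent to nc -vz 100.123.35.0 9092.
with socket.create_connection(("100.123.35.0", 9092), timeout=5) as s:
    print("Connection to %s:%d succeeded" % s.getpeername())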
Kafkacat is a command line tool to produce and consume messages.
On Ubuntu, run this command to install kafkacat:
$ apt-get install kafkacat
Alternatively, install Docker and use the Docker image edenhill/kafkacat
In producer mode, Kafkacat reads messages from stdin and sends them to the broker.
In consumer mode, Kafkacat fetches messages from the broker and writes them to stdout.
With the command below, Kafkacat runs in producer mode; the broker is 100.123.35.0:9092 and the topic is Topic1.
$ kafkacat -P -b 100.123.35.0:9092 -t Topic1
first message
second message
third message
With the command below, Kafkacat runs in consumer mode.
$ kafkacat -C -b 100.123.35.0:9092 -t Topic1
first message
second message
third message
With the command below, Kafkacat consumes two messages and exits.
$ kafkacat -C -b 100.123.35.0:9092 -t Topic1 -c 2 -e
first message
second message
$
With the command below, Kafkacat consumes the last two messages and exits (-o -2 starts reading two messages before the end of the partition).
$ kafkacat -C -b 100.123.35.0:9092 -t Topic1 -o -2 -e
second message
third message
% Reached end of topic Topic1 [0] at offset 3: exiting
$
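The same "read the last two messages" behaviour can be approximated with the kafka-python library (introduced later in this repo). A sketch, assuming a single-partition topic as created here:

from kafka import KafkaConsumer, TopicPartition

# Stop iterating when no new message arrives for 5 seconds.
consumer = KafkaConsumer(bootstrap_servers="100.123.35.0:9092", consumer_timeout_ms=5000)
tp = TopicPartition("Topic1", 0)   # the topics in this repo have a single partition
consumer.assign([tp])

consumer.seek_to_end(tp)
end = consumer.position(tp)        # offset of the next message to be produced
consumer.seek(tp, max(end - 2, 0)) # step back two messages, like kafkacat -o -2

for message in consumer:
    print(message.value.decode())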
With the command below, Kafkacat consumes the messages, changes the delimiter that separates messages on stdout (default is \n), and exits.
$ kafkacat -C -b 100.123.35.0:9092 -t Topic1 -D "\n####\n" -e
first message
####
second message
####
third message
####
% Reached end of topic Topic1 [0] at offset 3: exiting
$
On the producer side, you can also change the delimiter (default is \n) that splits the input (stdin) into messages.
Example with a new topic:
$ kafkacat -P -b 100.123.35.0:9092 -t Topic2 -D "##"
message 1##message 2##message 3##
$ kafkacat -C -b 100.123.35.0:9092 -t Topic2 -e
message 1
message 2
message 3
% Reached end of topic Topic2 [0] at offset 3: exiting
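kafka-python has no delimiter option; the equivalent is to split the input yourself before producing. A sketch, assuming the same ## delimiter and topic Topic2:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="100.123.35.0:9092")

# Split the input on the custom delimiter, as kafkacat -D "##" does with stdin.
data = "message 1##message 2##message 3##"
for part in data.split("##"):
    if part:  # skip the empty string produced by the trailing delimiter
        producer.send("Topic2", value=part.encode())

producer.flush()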
You can also produce messages from files: Kafkacat reads each file and sends its entire content as one single message. The producer exits after sending the messages.
$ more message1
The content of the file message1 is sent as one single message.
$ more message2
The content of the file message2
is sent
as one single message.
$ kafkacat -P -b 100.123.35.0:9092 -t Topic3 message1 message2
$
$ kafkacat -C -b 100.123.35.0:9092 -t Topic3 -e
The content of the file message1 is sent as one single message.
The content of the file message2
is sent
as one single message.
% Reached end of topic Topic3 [0] at offset 2: exiting
$
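The same thing can be done with kafka-python by reading each file and sending its whole content as one message. A sketch, assuming the files message1 and message2 from above:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="100.123.35.0:9092")

# Send each file's entire content as a single message, as kafkacat -P does with file arguments.
for path in ("message1", "message2"):
    with open(path, "rb") as f:
        producer.send("Topic3", value=f.read())

producer.flush()  # make sure the messages are delivered before exiting
producer.close()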
In the example below, the producer uses the delimiter : (option -K) to split each input line into a key and a message.
$ kafkacat -P -b 100.123.35.0:9092 -t Topic4 -K:
key1:message1
Key2:message2
Key3:message3
In the example below, the consumer uses the delimiter ::: to separate keys from messages on output.
$ kafkacat -C -b 100.123.35.0:9092 -t Topic4 -K::: -e
key1:::message1
Key2:::message2
Key3:::message3
$
In the example below, the consumer gets the messages without their keys.
$ kafkacat -C -b 100.123.35.0:9092 -t Topic4 -e
message1
message2
message3
$
In the example below, the consumer changes the output format: in the -f format string, %k expands to the message key and %s to the message payload.
$ kafkacat -C -b 100.123.35.0:9092 -t Topic4 -f "\nKey: %k\nValue: %s\n" -e
Key: key1
Value: message1
Key: Key2
Value: message2
Key: Key3
Value: message3
% Reached end of topic Topic4 [0] at offset 3: exiting
$
With -L, Kafkacat lists the cluster metadata (brokers and topics):
$ kafkacat -L -b 100.123.35.0:9092
Metadata for all topics (from broker -1: 100.123.35.0:9092/bootstrap):
1 brokers:
broker 1001 at 100.123.35.0:9092
6 topics:
topic "Topic4" with 1 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
topic "Topic1" with 1 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
topic "Topic2" with 1 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
topic "Topic3" with 1 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
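The same metadata can be read with kafka-python. A minimal sketch, using consumer.topics() and partitions_for_topic():

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="100.123.35.0:9092")

# List every topic and its partitions, similar to kafkacat -L.
for topic in sorted(consumer.topics()):
    partitions = sorted(consumer.partitions_for_topic(topic))
    print('topic "%s" with %d partitions: %s' % (topic, len(partitions), partitions))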
Instead of installing kafkacat, you can install Docker and use the Docker image edenhill/kafkacat:
$ docker run --rm -it edenhill/kafkacat:1.5.0 -C -b 100.123.35.0:9092 -t Topic1 -e
first message
second message
third message
% Reached end of topic Topic1 [0] at offset 3: exiting
$
$ docker images edenhill/kafkacat
REPOSITORY TAG IMAGE ID CREATED SIZE
edenhill/kafkacat 1.5.0 d3dc4f492999 2 months ago 22.4MB
On Ubuntu, run this command to install the kafka-python library:
$ pip install kafka-python
This Python interactive session produces one single message (key key1, payload value1) to the topic topic6 on the broker 100.123.35.0:9092. Note that kafka-python expects keys and values as bytes unless serializers are configured:
>>> from kafka import KafkaProducer
>>>
>>> topic = "topic6"
>>> key = b"key1"
>>> value = b"value1"
>>> kafka = '100.123.35.0:9092'
>>>
>>> producer = KafkaProducer(bootstrap_servers=[kafka], security_protocol="PLAINTEXT", retries=2, compression_type=None, request_timeout_ms=10000)
>>> producer.bootstrap_connected()
True
>>> message = producer.send(topic, key=key, value=value)
>>> producer.flush()
>>> exit()
This Python interactive session consumes the message from the topic topic6:
>>> from kafka import KafkaConsumer
>>>
>>> kafka = '100.123.35.0:9092'
>>> topic = "topic6"
>>>
>>> consumer1 = KafkaConsumer(topic, bootstrap_servers=kafka, security_protocol="PLAINTEXT", auto_offset_reset='earliest')
>>>
>>> consumer1.bootstrap_connected()
True
>>>
>>> consumer1.topics()
{'Topic1', 'Topic2', 'Topic3', 'Topic4', 'topic6'}
>>>
>>> for message in consumer1:
...     print("topic=%s offset=%d key=%s value=%s" % (message.topic, message.offset, message.key.decode(), message.value.decode()))
...
topic=topic6 offset=0 key=key1 value=value1
Use the Python files producer.py and consumer.py to produce and consume several messages:
$ python producer.py
$ python consumer.py
topic=topic5 offset=0 key=key1 value=value1
topic=topic5 offset=1 key=key2 value=value2
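The repo's producer.py and consumer.py are not reproduced here; the following is a minimal sketch of what such scripts can look like, matching the output above (two keyed messages on topic5):

# producer.py -- send a few keyed messages to topic5
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="100.123.35.0:9092")
for key, value in ((b"key1", b"value1"), (b"key2", b"value2")):
    producer.send("topic5", key=key, value=value)
producer.flush()
producer.close()

# consumer.py -- print every message on topic5 from the beginning
from kafka import KafkaConsumer

consumer = KafkaConsumer("topic5",
                         bootstrap_servers="100.123.35.0:9092",
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)  # stop when no new message arrives
for message in consumer:
    print("topic=%s offset=%d key=%s value=%s"
          % (message.topic, message.offset, message.key.decode(), message.value.decode()))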
Run this command to stop the containers:
$ docker-compose stop
Stopping kafka ... done
Stopping zookeeper ... done
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
$ docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
45b13d484728 wurstmeister/kafka "start-kafka.sh" 9 hours ago Exited (143) 36 seconds ago kafka
0957d9af0d62 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 9 hours ago Exited (137) 29 seconds ago zookeeper
Run this command to stop and remove the containers:
$ docker-compose down
Stopping kafka ... done
Stopping zookeeper ... done
Removing kafka ... done
Removing zookeeper ... done
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
$ docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES