/kafka-producer-consumer

producer and consumer python send and read data of topic Kafka

Primary LanguagePython

To run the applications start here

Prerequisites

- python3
- Kafka topic created

Create python virtual environment to run

virtualenv virtual-environment-name
source ./virtual-environment-name/bin/activate

Install libraries dependencies

pip3 install -r requirements.txt

With the topic created in Kafka, point in the following variable to specify the desired topic

# topic name
topic = 'topic-name'

To specify the desired quantity of produced messages, in the "for i range(3)" of the "main" function. In this case range(3) will be the messages produced

# Create data messages
def main():
      for i in range (3):
          # Generate Fake Data
          data = {
              'id': dataFake.random_int(min=20000, max=100000),
              'name': dataFake.name(),
              'address': dataFake.street_address() + ' | ' + dataFake.city() + ' | ' + dataFake.country_code(),
              'platform': random.choice(['phone', 'laptop', 'Tablet']),
              'date': str(dataFake.date_time_this_month())
          }

Start the producer

python3 kafka-producer.py

You can check the producer's execution log through the log file that will be generated

producer.log

Start the consumer

python3 kafka-consumer.py

If necessary, convert kafka.client.truststore.jks to cacert.pem and certificate.pem for Kafka authentication

Convert kafka.client.truststore.jks to cacert.pem and certificate.pem

1 - Export the cacert from the kafka.client.truststore.jks file

keytool -list -rfc -keystore kafka.client.truststore.jks -storepass pass-kafka.client.truststore | awk '/BEGIN CERTIFICATE/,/END CERTIFICATE/ {print $0}' > cacert.pem

2 - List kafka.client.truststore.jks to query the "Alias name" to generate the certificate.pem

keytool -list -rfc -keystore kafka.client.truststore.jks

3 - Use the "Alias name" listed above and then export the certificate certificate.pem

keytool -exportcert -alias root-users.pem -keystore kafka.client.truststore.jks \
        -rfc -file certificate.pem

After generate .pem files reference the files on producer and consumer

kafka-producer.py

# Connection Kafka Brokers
producer = KafkaProducer(
    bootstrap_servers='server:port,server:port',
    security_protocol='SSL',
    ssl_check_hostname=False,
    ssl_cafile='./cacert.pem',
    ssl_certfile='./certificate.pem',
    # ssl_keyfile='key.pem'
)

kafka-consumer.py

# Connection Kafka Brokers
consumer = KafkaConsumer(
    topic,
    bootstrap_servers='server:port,server:port,
    security_protocol='SSL',
    ssl_check_hostname=False,
    ssl_cafile='./cacert.pem',
    ssl_certfile='./certificate.pem',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id=consumer_group
)