To run the applications start here
- python3
- Kafka topic created
Create python virtual environment to run
virtualenv virtual-environment-name
source ./virtual-environment-name/bin/activate
Install libraries dependencies
pip3 install -r requirements.txt
With the topic created in Kafka, point in the following variable to specify the desired topic
# topic name
topic = 'topic-name'
To specify the desired quantity of produced messages, in the "for i range(3)" of the "main" function. In this case range(3) will be the messages produced
# Create data messages
def main ():
for i in range (3 ):
# Generate Fake Data
data = {
'id' : dataFake .random_int (min = 20000 , max = 100000 ),
'name' : dataFake .name (),
'address' : dataFake .street_address () + ' | ' + dataFake .city () + ' | ' + dataFake .country_code (),
'platform' : random .choice (['phone' , 'laptop' , 'Tablet' ]),
'date' : str (dataFake .date_time_this_month ())
You can check the producer's execution log through the log file that will be generated
If necessary, convert kafka.client.truststore.jks to cacert.pem and certificate.pem for Kafka authentication
Convert kafka.client.truststore.jks to cacert.pem and certificate.pem
1 - Export the cacert from the kafka.client.truststore.jks file
keytool -list -rfc -keystore kafka.client.truststore.jks -storepass pass-kafka.client.truststore | awk ' /BEGIN CERTIFICATE/,/END CERTIFICATE/ {print $0}' > cacert.pem
2 - List kafka.client.truststore.jks to query the "Alias name" to generate the certificate.pem
keytool -list -rfc -keystore kafka.client.truststore.jks
3 - Use the "Alias name" listed above and then export the certificate certificate.pem
keytool -exportcert -alias root-users.pem -keystore kafka.client.truststore.jks \
-rfc -file certificate.pem
After generate .pem files reference the files on producer and consumer
# Connection Kafka Brokers
producer = KafkaProducer (
bootstrap_servers = 'server:port,server:port' ,
security_protocol = 'SSL' ,
ssl_check_hostname = False ,
ssl_cafile = './cacert.pem' ,
ssl_certfile = './certificate.pem' ,
# ssl_keyfile='key.pem'
# Connection Kafka Brokers
consumer = KafkaConsumer (
topic ,
bootstrap_servers = 'server :port ,server :port ,
security_protocol = 'SSL' ,
ssl_check_hostname = False ,
ssl_cafile = './cacert.pem' ,
ssl_certfile = './certificate.pem' ,
auto_offset_reset = 'earliest' ,
enable_auto_commit = True ,
group_id = consumer_group