How to set up Confluent Kafka with PySpark
Create a conda environment
conda create -p venv python==3.7 -y
Activate the conda environment
conda activate venv
To use Confluent Kafka we need the following details from the Confluent dashboard:
confluentClusterName = ""
confluentBootstrapServers = ""
confluentTopicName = ""
confluentApiKey = ""
confluentSecret = ""
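These values can be hard-coded for a quick test, but a safer pattern is to read them from environment variables so credentials stay out of the script. A minimal sketch (the environment variable names are illustrative):
import os

confluentClusterName = os.environ.get("CONFLUENT_CLUSTER_NAME", "")
confluentBootstrapServers = os.environ.get("CONFLUENT_BOOTSTRAP_SERVERS", "")
confluentTopicName = os.environ.get("CONFLUENT_TOPIC_NAME", "")
confluentApiKey = os.environ.get("CONFLUENT_API_KEY", "")
confluentSecret = os.environ.get("CONFLUENT_SECRET", "")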
Add the libraries below to requirements.txt:
confluent-kafka[avro,json,protobuf]
pyspark==3.2.1
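Install the dependencies with pip:
pip install -r requirements.txt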
Import the necessary packages
from pyspark.sql import SparkSession
Create a SparkSession object using the snippet below.
spark_session = (SparkSession.builder
    .master("local[*]")
    .appName("Confluent")
    .getOrCreate())
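Alternatively, the Kafka connector package can be requested in the builder itself instead of on the spark-submit command line (see the note at the end). A sketch, assuming Spark 3.2.1 built for Scala 2.12:
spark_session = (SparkSession.builder
    .master("local[*]")
    .appName("Confluent")
    # pulls in the Kafka connector when the session starts
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
    .getOrCreate())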
Read data from the Kafka topic
df = (spark_session
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", confluentBootstrapServers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret))
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", confluentTopicName)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load())
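The Kafka source always yields the same fixed set of columns regardless of what is on the topic; you can confirm this with printSchema():
# Columns returned by the Kafka source:
#   key (binary), value (binary), topic (string), partition (int),
#   offset (long), timestamp (timestamp), timestampType (int)
df.printSchema()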
Process the data read from the Kafka topic by casting the binary key and value columns to strings:
df = (df
    .withColumn('key_str', df['key'].cast('string'))
    .withColumn('value_str', df['value'].cast('string'))
    .drop('key'))
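If the messages on the topic are JSON, value_str can be unpacked into real columns with from_json. A sketch assuming a hypothetical two-field payload (replace the schema with the actual structure of your messages):
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical payload schema -- adjust to your messages
payload_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
])

parsed_df = (df
    .withColumn("payload", from_json(col("value_str"), payload_schema))
    .select("payload.*"))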
Write the data to JSON files
query = (df.selectExpr("value_str")
    .writeStream
    .format("json")
    .trigger(processingTime="5 seconds")
    .option("checkpointLocation", "json_checkpoint")
    .option("path", "json")
    .outputMode("append")
    .start())
query.awaitTermination()
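While developing, the console sink is a quick way to confirm that records are flowing before wiring up file outputs:
debug_query = (df.selectExpr("value_str")
    .writeStream
    .format("console")
    .outputMode("append")
    .start())
debug_query.awaitTermination()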
Write the data to CSV files
# select only the string columns -- the CSV sink cannot serialize the binary value column
query = (df.selectExpr("key_str", "value_str")
    .writeStream
    .format("csv")
    .trigger(processingTime="5 seconds")
    .option("checkpointLocation", "csv_checkpoint")
    .option("path", "csv")
    .outputMode("append")
    .start())
query.awaitTermination()
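Once files start landing, the output can be verified with an ordinary batch read in a separate script or notebook cell:
check_df = spark_session.read.csv("csv")
# columns come back as _c0, _c1 since the stream wrote no header row
check_df.show(truncate=False)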
Write the data to a Kafka topic
# the Kafka sink expects a string or binary column named "value"
query = (df.selectExpr("value_str AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", confluentBootstrapServers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret))
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("checkpointLocation", "kafka_checkpoint")
    .option("topic", confluentTopicName)
    .start())
query.awaitTermination()
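awaitTermination() blocks the script until the stream stops or fails; from another thread or notebook cell you can inspect or stop the query instead:
print(query.status)        # current state of the streaming query
print(query.lastProgress)  # metrics for the most recent micro-batch
query.stop()               # gracefully stop the stream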
Note: don't run your Python script with the plain python command; use the command below so Spark pulls in the Kafka connector package.
To run the Python script:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 <script_name.py>
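The _2.12 suffix in the package coordinate is the Scala build of Spark, and 3.2.1 is the Spark version; keep both in sync with the installed PySpark (here pyspark==3.2.1), otherwise the connector classes may fail to load.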