japila-books/spark-structured-streaming-internals

Using Structured Streaming with Kafka Avro messages

amoussoubaruch opened this issue · 19 comments

Hello,

I want to use Spark Structured Streaming to process data fetched from Kafka messages and then store it as rows in a Cassandra database.
I read your book on Structured Streaming. I need one clarification.
The messages in Kafka are serialized in Avro format.
How can I deserialize the messages to JSON for processing before storing them into Cassandra?
Is it possible with Spark 2.1.0 or 2.1.1?

Any advice or help would be appreciated.
Thanks in advance

Sorry for being late on this. Somehow I haven't noticed your question.

Avro format is not directly supported in Spark (up to 2.2), so the use case is not possible without some coding. The use case pops up quite often, however, and I think it's time for me to have a solution for it. I'm going to work on it this week.

Let me know if you've sorted it out in the meantime.

Hi @jaceklaskowski
We are having the same problem. Could you please advise?
What is the best way to decode an Avro-encoded message from Kafka using Spark Structured Streaming? Is there any option introduced in Spark 2.3?
We are currently using Spark 2.2 and tried the sample code below without success:
val reader = new GenericDatumReader[GenericRecord](schema) // schema: the Avro writer schema
val decoder = DecoderFactory.get().binaryDecoder(bytes, null) // bytes: the Avro-encoded record value (Array[Byte])
val rec = reader.read(null, decoder)
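
For reference, a complete version of this approach inside a streaming query might look roughly like the sketch below. It assumes plain Avro payloads (no schema registry) with the writer schema known up front; the broker address, topic name, and schema file path are placeholders.

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import spark.implicits._

// Assumption: the writer schema is available as a JSON string (e.g. a local .avsc file).
val schemaJson = scala.io.Source.fromFile("/path/to/message.avsc").mkString

val decoded = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "TESTAVRO")
  .load()
  .select($"value".as[Array[Byte]])
  .map { bytes =>
    // Parse the schema and build the reader on the executor (per record here, for simplicity)
    // to avoid shipping non-serializable Avro objects in the task closure.
    val schema = new Schema.Parser().parse(schemaJson)
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder).toString // a JSON-like rendering of the record
  }

decoded.writeStream.format("console").option("truncate", false).start()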

What is the best way to decode an Avro-encoded message from Kafka using Spark Structured Streaming? Is there any option introduced in Spark 2.3?

No built-in support for Avro in Spark 2.3.0. You could give spark-avro a try. See also https://docs.databricks.com/spark/latest/data-sources/read-avro.html.

I've got no experience with Avro, but it looks like I should change that soon. Do you have a sample message to send to a Kafka topic that I could use for testing?

Below is the sample message that we are trying to read from the Kafka topic through Spark Structured Streaming.
We are able to decode the message in Spark when using JSON with Kafka, but when using Avro we are not able to decode it at the Spark end.

Below is what we tried,
Message in Kafka:
{"SEQ_NUM":{"long":393},"TIMESTAMP":{"string":"2018-03-08 09:26:18.0"},"ID":{"int":144},"REF":{"string":"XX0822227"},"NAME":{"string":"xxx"},"ADRS":{"string":"xxx"}}

Spark code when using JSON:
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "TESTJSON").load()

val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("console").option("truncate","false").start()

With Avro, we couldn't decode it at the Spark end.

Thanks

Hi @jaceklaskowski , Did you get a chance to look into this?
Thanks

Had no time for this. Sorry.

While we're at it, how do you produce an Avro message? How do you consume it? My recommendation would be to use the code that consumes Avro messages and write a custom streaming source.

Thanks for the response @jaceklaskowski

We are using Confluent Kafka, and its Schema Registry is used to put the messages into the topic.
We use Confluent's avro-console-producer to produce messages into the Kafka topic. Sure, I will try a custom streaming source too.

And on the consumer side, in Spark Structured Streaming, we tried using KafkaAvroDecoder from Confluent:

def my_decoder: KafkaAvroDecoder = {
  val props2 = new Properties()
  props2.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props2.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-integration-test")
  props2.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  props2.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer])
  props2.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer])
  props2.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
  props2.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  val vProps2 = new kafka.utils.VerifiableProperties(props2)
  new KafkaAvroDecoder(vProps2)
}

val kafkaParams = Map[String, String](
  "kafka.bootstrap.servers" -> brokers,
  "key.deserializer" -> "KafkaAvroDeserializer",
  "value.deserializer" -> "KafkaAvroDeserializer",
  "group.id" -> "structured-kafka",
  "auto.offset.reset" -> "latest",
  "failOnDataLoss" -> "false",
  "schema.registry.url" -> schemaRegistryURL
)

val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueName)
val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)

val df = spark.readStream.format("kafka").option("subscribe", topic).options(kafkaParams).load().map { x => my_decoder.fromBytes(x.getAs[Array[Byte]]("value"), messageSchema).asInstanceOf[String] }

And we get the below exception,
Caused by: java.io.NotSerializableException: io.confluent.kafka.schemaregistry.client.rest.entities.Schema
Serialization stack:
- object not serializable (class: io.confluent.kafka.schemaregistry.client.rest.entities.Schema, value: {subjec.........

Note: We were able to decode the Avro messages using KafkaAvroDeserializer in a Spark Streaming job (DStreams).

@jaceklaskowski
Hi,
I have the same use case as the others. I'm new to Structured Streaming and I'm using it on the Databricks cloud platform with Python. I am collecting data from Kafka, which sends Avro messages; I selected the "value" column and cast it to a string. However, I still get output like "?0s???LR????(}????4e'd???", so I'm unable to deserialize it. Is there any way I can deserialize those values according to my existing Avro schema?
I appreciate your help.
Thanks

I have a working example I adapted from a bunch of Stack Overflow answers, but I don't like the map at the end; I'd prefer a UDF. If anyone can show me how to use a UDF instead, that would be awesome. Also, I'm working on the serializing side at this point.

import java.util.Properties
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDecoder, KafkaAvroDeserializerConfig}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import spark.implicits._

val brokers = "kafka01:9092"
val zookeeper = "zk01:2181"
val schemaRegistryURL = "http://sr:8081"
val topic = "mytest"
val subjectValueName = topic + "-value"

val kafkaParams = Map[String, String](
  "kafka.bootstrap.servers" -> brokers,
  "key.deserializer" -> "KafkaAvroDeserializer",
  "value.deserializer" -> "KafkaAvroDeserializer",
  "group.id" -> "structured-kafka",
  "auto.offset.reset" -> "latest",
  "failOnDataLoss" -> "false",
  "schema.registry.url" -> schemaRegistryURL
)

// Wrapping the decoder and schema in an object defers their creation to the executors
// and avoids serializing them with the task closure (the NotSerializableException above).
object MyDeserializerWrapper {
  val props = new Properties()
  props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
  props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
  val vProps = new kafka.utils.VerifiableProperties(props)
  val deser = new KafkaAvroDecoder(vProps)
  val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueName)
  val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
}

case class DeserializedFromKafkaRecord(value: String)

val df = spark
  .readStream
  .format("kafka")
  .option("subscribe", topic)
  .options(kafkaParams)
  .load()
  .map { x =>
    DeserializedFromKafkaRecord(
      MyDeserializerWrapper.deser
        .fromBytes(x.getAs[Array[Byte]]("value"), MyDeserializerWrapper.messageSchema)
        .asInstanceOf[GenericData.Record]
        .toString)
  }

df
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
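
For comparison, a UDF-based variant of the same decoding might look roughly like this (a sketch, untested; it reuses the MyDeserializerWrapper object above and is applied to the raw Kafka DataFrame instead of the map):

import org.apache.spark.sql.functions.{col, udf}

// Sketch only: decode the Confluent Avro payload inside a UDF; the decoder is still
// created lazily on the executors via the MyDeserializerWrapper object.
val decodeAvro = udf { bytes: Array[Byte] =>
  MyDeserializerWrapper.deser
    .fromBytes(bytes, MyDeserializerWrapper.messageSchema)
    .asInstanceOf[GenericData.Record]
    .toString
}

val decoded = spark.readStream
  .format("kafka")
  .option("subscribe", topic)
  .options(kafkaParams)
  .load()
  .select(decodeAvro(col("value")) as "value")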

@bclipp Could you please provide the full code snippet?

The above code works; the hard part is the schema, which is not inferred in streaming, so you have to pull it from a file, e.g.:

val jsonDF = stringDF.select(
  from_json(col("value").cast("string"),
    spark.read.json("...").schema
  ).alias("match")
)

@bclipp @DhineshVijayakumar What does the schema registry URL mean (val schemaRegistryURL = "http://sr:8081")? What should I provide here if I am running in production?

@bclipp No idea what it is. Where will it usually be? On the Kafka broker?

Also, can you please share the sbt dependencies for the above code?
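
For reference, dependencies roughly along these lines are typically needed for the snippet above (the group/artifact names are the usual ones; the versions are illustrative assumptions, not the ones used in this thread):

resolvers += "confluent" at "http://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"             % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10"  % "2.2.0",
  // Confluent's Avro decoder and schema registry client (KafkaAvroDecoder, RestService, ...)
  "io.confluent"      % "kafka-avro-serializer" % "3.3.2",
  // kafka.utils.VerifiableProperties comes from the Kafka core artifact
  "org.apache.kafka" %% "kafka"                 % "0.11.0.2",
  "org.apache.avro"   % "avro"                  % "1.8.2"
)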

It's available since Spark 2.4 so closing it. See https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-avro.html for some information.
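
For reference, a minimal sketch with the Spark 2.4 built-in (assuming the external spark-avro module is on the classpath and the payload is plain Avro binary; the broker, topic, and schema path are placeholders):

import org.apache.spark.sql.avro.from_avro
import spark.implicits._

// e.g. started with --packages org.apache.spark:spark-avro_2.11:2.4.0
// jsonFormatSchema is the Avro writer schema as a JSON string.
val jsonFormatSchema = scala.io.Source.fromFile("/path/to/message.avsc").mkString

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "TESTAVRO")
  .load()
  .select(from_avro($"value", jsonFormatSchema) as "record")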

Well, no, it's not really. There is a big difference if you're using the schema registry.

spark-sql-avro's from_avro expects the value to be plain Avro binary, with the schema supplied separately as a JSON string.

It doesn't recognize Confluent's wire format, which prepends a magic byte and a 4-byte schema-registry id to every payload.

And SPARK-26314 was closed saying they won't support it.
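
A common workaround (not from this thread) is to strip Confluent's 5-byte header, one magic byte plus a 4-byte schema id, before calling from_avro. A sketch, assuming df is the Kafka source DataFrame and jsonFormatSchema is the writer schema as above; it only works if every message was written with that single, known schema:

import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.expr

// Drop the Confluent framing (1 magic byte + 4 bytes of schema id) to get plain Avro binary.
val avroPayload = expr("substring(value, 6, length(value) - 5)")
val parsed = df.select(from_avro(avroPayload, jsonFormatSchema) as "record")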

spark-avro doesn't support Avro schemas with repeated field names.