
Confluent Spark Avro

Spark UDFs to deserialize Avro messages with schemas stored in Schema Registry. More details about Schema Registry are available on the official Confluent website.

Usage

The UDFs are intended to be used together with the native Spark Kafka reader.

val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .load()

val utils = ConfluentSparkAvroUtils("http://schema-registry.my-company.com:8081")
val keyDeserializer = utils.deserializerForSubject("topic1-key")
val valueDeserializer = utils.deserializerForSubject("topic1-value")

df.select(
    keyDeserializer(col("key")).alias("key"),
    valueDeserializer(col("value")).alias("value")
).show(10)
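For context on what these deserializers undo: Confluent-serialized Kafka messages carry a small wire-format header, namely a magic byte (0x00) followed by a 4-byte big-endian schema ID, before the Avro-encoded payload. A deserializer reads the ID, fetches the matching schema from Schema Registry, and decodes the rest. A minimal sketch of parsing that header in plain Scala (no Spark required; the helper name `parseWireFormat` is hypothetical, not part of this library):

```scala
import java.nio.ByteBuffer

// Splits a Confluent-framed message into its schema ID and Avro payload.
// Wire format: [magic byte 0x00][4-byte big-endian schema ID][Avro bytes].
def parseWireFormat(bytes: Array[Byte]): (Int, Array[Byte]) = {
  val buf = ByteBuffer.wrap(bytes)
  val magic = buf.get()
  require(magic == 0, s"Unknown magic byte: $magic")
  val schemaId = buf.getInt()               // ByteBuffer reads big-endian by default
  val payload = new Array[Byte](buf.remaining())
  buf.get(payload)
  (schemaId, payload)
}
```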

Build

The tool is designed to be used with Spark >= 2.0.2.

sbt assembly
ls -l target/scala-2.11/confluent-spark-avro-assembly-1.0.jar

Testing

We haven't added unit tests, but you can test the UDFs with the following command:

sbt "project confluent-spark-avro" "run kafka.host:9092 http://schema-registry.host:8081 kafka.topic"

TODO

[ ] Spark UDFs to serialize messages.
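A serializer UDF would have to produce the same Confluent wire format the deserializers consume: the Avro payload prefixed with a magic byte (0x00) and the 4-byte big-endian schema ID registered in Schema Registry. A minimal header-writing sketch in plain Scala (the helper name `prependWireFormat` is hypothetical, and registering the schema to obtain the ID is out of scope here):

```scala
import java.nio.ByteBuffer

// Prepends the Confluent wire-format header to an Avro-encoded payload.
// The schema ID must already be registered in Schema Registry.
def prependWireFormat(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] = {
  val buf = ByteBuffer.allocate(1 + 4 + avroPayload.length)
  buf.put(0.toByte)       // magic byte
  buf.putInt(schemaId)    // 4-byte big-endian schema ID
  buf.put(avroPayload)
  buf.array()
}
```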

License

The project is licensed under the Apache 2 license.