Question: avro plus Schema Registry?
MarkRBM opened this issue · 9 comments
I notice from this PR (#59) that you have an example of working with Avro, specifically with Avro4s.
I am wondering if there is any expectation that this library will or won't work when trying to integrate with the Schema Registry, as demonstrated here: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/SpecificAvroScalaIntegrationTest.scala#L69
I can add the required dependencies that give me AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG and SpecificAvroSerde and SpecificAvroRecord.
So the first question is: do either of these things matter? Should I be able to ignore them and expect the Schema Registry to just work after adding its URL to the stream config? I am guessing not, but asking just in case I am overthinking things.
Assuming that it will not just work, I think there might be two obstacles to this approach with your library. The first is how to get an instance of SpecificAvroRecord that you can pass into SpecificAvroSerde, and the second is whether the SpecificAvroSerde will be acceptable as the implicit key and value serde.
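To make the "just add its URL to the stream config" part of the question concrete, here is a minimal sketch of that configuration. The constant `AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` resolves to the literal string `"schema.registry.url"`; the application id, broker, and registry URLs below are placeholders. Note this sketch only shows the config side, which (as discussed below) is not enough on its own without a registry-aware serde.

```scala
import java.util.Properties

// Minimal Kafka Streams config sketch with the Schema Registry URL set.
// "schema.registry.url" is what AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
// resolves to; all values below are placeholders.
val streamsConfig = new Properties()
streamsConfig.put("application.id", "avro-registry-example")      // hypothetical app id
streamsConfig.put("bootstrap.servers", "localhost:9092")          // placeholder broker
streamsConfig.put("schema.registry.url", "http://localhost:8081") // placeholder registry URL

println(streamsConfig.getProperty("schema.registry.url"))
```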
Is the schema-registry a use case you guys have talked about at all? Am I missing something straightforward?
Thanks
Edit: in the example I linked, I think they get an instance of SpecificAvroRecord because they use the Avro Maven code generation plugin to generate WikiFeed, but I am not positive.
Edit 2: hmmm, maybe I should be looking at the GenericAvroRecord example instead and using avro4s's RecordFormat object to convert from case classes to GenericAvroRecord. That seems to mean I would still need to use Avro to convert back into the case class I want at each step of the stream, which seems unnecessary. I wonder why avro4s does not offer a SpecificRecordFormat.
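The conversion described in "Edit 2" could be sketched as below, assuming avro4s is on the classpath. `RecordFormat` and its `to`/`from` methods are avro4s's public API; the case class and field names are made up for illustration.

```scala
// Sketch of the avro4s round trip, assuming a com.sksamuel.avro4s dependency.
import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord

case class WikiFeed(user: String, isNew: Boolean, content: String) // hypothetical case class

val format = RecordFormat[WikiFeed]

// case class -> GenericRecord, suitable for a topic backed by GenericAvroSerde
val record: GenericRecord = format.to(WikiFeed("alice", isNew = true, "hello"))

// GenericRecord -> case class: the extra conversion step the comment laments
// having to repeat at each stage of the topology
val roundTripped: WikiFeed = format.from(record)
```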
In kafka-streams-query we had the Schema Registry working (https://github.com/lightbend/kafka-streams-query/blob/develop/examples/example-dsl/src/main/scala/com/lightbend/kafka/scala/iq/example/WeblogProcessing.scala#L92-L94). In fact, we had the option of switching it on through configuration, and it worked. But we did not try with case classes there. We need to do some sanity checks, but I think it should not be difficult.
Thanks for the reply. Looking at the example, I see the configuration being passed in, obviously, but I notice that that project does not depend on io.confluent:kafka-streams-avro-serde, which as far as I can tell is where the 'schema-registry aware' serdes live. So I wonder if that example would actually end up hitting the Schema Registry if there were one?
That is the configuration property, but I think that property gets ignored unless you use one of these: https://github.com/confluentinc/schema-registry/tree/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro, which get included when your project depends on io.confluent:kafka-streams-avro-serde. I could be wrong, but that's how I got it working.
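For reference, pulling in that dependency looks roughly like the following build.sbt fragment. The version number is an assumption; check Confluent's documentation for the coordinates matching your Kafka version.

```scala
// build.sbt fragment (a sketch; the version is an assumption)
// Confluent artifacts are published to Confluent's own Maven repository:
resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % "5.0.0"
```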
I have got a messy version of this project's StreamToTableJoinScalaIntegrationTestImplicitSerdes set up and hitting my Schema Registry (working to an extent, but I think I am close), using GenericAvroSerde throughout and avro4s to convert back and forth between GenericRecord and case classes. The only thing stopping me from using SpecificAvroSerde is generating an instance of SpecificRecord for my case classes, which avro4s does not seem to support.
If it would help at all to see what I have done, I could clean it up and push it to a branch here.
Since I was trying to abstract over the optionality of the Schema Registry, I kept the serializers here: https://github.com/lightbend/kafka-streams-query/tree/develop/examples/example-dsl/src/main/scala/com/lightbend/kafka/scala/iq/example/serializers, and use AppSerializers as the mixin. The implementation is a bit old and could probably be cleaned up. HTH.
ahhhhh thank-you, let me have a look
- feel free to tell me this isn't the place for this convo, don't want to clutter up your GitHub notifications
- in that last link you have this line https://github.com/lightbend/kafka-streams-query/blob/develop/examples/example-dsl/src/main/scala/com/lightbend/kafka/scala/iq/example/serializers/AppSerializers.scala#L25 — am I right in saying you can do that because LogRecordAvro was created by the Avro code generation tool and therefore meets the `T <: org.apache.avro.specific.SpecificRecord` bound on the `SpecificAvroSerializerWithSchemaRegistry`?
That's the bit I can't get my head around: I am not using code generation, I just have a plain old case class that I want to create a SpecificAvroSerdeWithSchemaRegistry for. Maybe bijection is the answer, from a brief look at their repo.
I think we can discuss here.
Regarding (2), you are correct. I was using code generation via the sbt plugin for Avro (which keeps it nicely decoupled from the code base). If you are not using code generation, maybe bijection is the answer.
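For anyone following along, wiring up sbt-based Avro code generation is roughly a one-line plugin addition. The exact coordinates below are an assumption (sbt-avro has changed organisations over time; check its README for current ones); the point is only that the generated `SpecificRecord` classes come from the build, not from hand-written code.

```scala
// project/plugins.sbt sketch -- plugin coordinates are an assumption,
// verify against the sbt-avro README for your sbt version.
addSbtPlugin("com.cavorite" % "sbt-avro-1-8" % "1.1.9")

// With the plugin enabled, .avsc schemas placed under src/main/avro are
// compiled into Java classes extending org.apache.avro.specific.SpecificRecord,
// which satisfy the T <: SpecificRecord bound discussed above.
```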
I am trending towards using code generation now. It's a POC I am working on currently and I would need to go back and re-evaluate the CI process, but it will probably end up being for the best. Thank you for all the help.