asyncapi/bindings

Specify how Avro schema ID is serialized on Kafka

lbroudoux opened this issue · 16 comments

Reason/Context

AsyncAPI lets you reference the Avro schema used for serializing and deserializing messages on a topic, and the parser has a sample showing how to express that when the schema is already in a registry. Nice!

When it comes to serializing Avro data to a Kafka topic, you usually have two options:

  • The « old-fashioned one », which puts the raw Avro binary representation of the message payload on the topic,
  • The « modern one », which puts the Schema ID + the Avro binary representation of the message payload on the topic (see Schema Registry: A quick introduction).

As you may have guessed, the choice between these options affects consumers! But as of today, a consumer has no means of knowing which serialization mode is used.

Moreover, it appears that different SerDes libraries (Confluent, Apicurio and IBM, for example) serialize the Schema ID in different ways. Confluent encodes the schema ID in the first bytes of the payload, whereas the Apicurio and IBM SerDes can also put the ID into headers.
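
To make the difference concrete, here is a minimal Python sketch of the two places a consumer might have to look. The payload case follows the Confluent framing (one magic byte with value 0, then a 4-byte big-endian schema ID, then the Avro body). The header case is only an assumption for illustration: the header name `schema-id` and its big-endian integer encoding are hypothetical and not taken from any particular SerDes library.

```python
import struct
from typing import Mapping, Optional, Tuple

CONFLUENT_MAGIC_BYTE = 0  # first byte of a Confluent-framed message


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_body) for a payload-framed message."""
    if len(message) < 5:
        raise ValueError("message too short to carry a schema ID prefix")
    # ">BI" = big-endian, 1 unsigned byte (magic) + 4-byte unsigned int (ID)
    magic, schema_id = struct.unpack(">BI", message[:5])
    if magic != CONFLUENT_MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[5:]


def schema_id_from_headers(
    headers: Mapping[str, bytes],
    header_name: str = "schema-id",  # hypothetical header name
) -> Optional[int]:
    """Return the schema ID carried in a header, or None if it is absent."""
    raw = headers.get(header_name)
    if raw is None:
        return None
    # Assumption: the ID is stored as a big-endian integer
    return int.from_bytes(raw, "big")


framed = b"\x00" + (42).to_bytes(4, "big") + b"avro-bytes"
print(split_confluent_frame(framed))  # (42, b'avro-bytes')
```

A consumer that guesses the wrong mode either feeds the 5-byte prefix to the Avro decoder or strips 5 bytes of real data, which is why the location needs to be declared somewhere.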

After I raised this issue on the AsyncAPI Slack, concerns also came up about how to retrieve the Schema Registry endpoint URL. Since AsyncAPI already carries Server information with protocol-specific bindings, it seems legitimate, when a Schema Registry is needed to deserialize messages, to also allow specifying the Schema Registry URL associated with a Server.

Description

I propose adding a new Kafka-specific binding attribute, for example at the Channel level, to specify how the Schema ID is encoded. We could call it schemaIDLocation, with possible values header or payload. If the binding attribute is missing, it simply means that Schema info is not provided. Depending on the location value, you may switch to a different SerDes implementation for consumer generation or configuration.

I also propose adding a new Kafka-specific binding attribute at the Server level to specify the Schema Registry URL, simply named schemaRegistryUrl.
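
As a rough illustration of how a generator or consumer could act on these two attributes, here is a small Python sketch. It assumes the bindings have already been parsed into plain dictionaries; the function name `describe_consumer_setup` and the returned strings are hypothetical, and only `schemaIDLocation` and `schemaRegistryUrl` come from the proposal above.

```python
from typing import Any, Mapping

VALID_LOCATIONS = ("header", "payload")


def describe_consumer_setup(
    server_binding: Mapping[str, Any],
    channel_binding: Mapping[str, Any],
) -> str:
    """Summarize how a consumer should look up the schema for a channel."""
    location = channel_binding.get("schemaIDLocation")
    if location is None:
        # Missing attribute: schema info is not provided with the message
        return "no schema ID is provided with the message"
    if location not in VALID_LOCATIONS:
        raise ValueError(f"unsupported schemaIDLocation: {location!r}")
    registry_url = server_binding.get(
        "schemaRegistryUrl", "<registry URL not given>"
    )
    return f"read schema ID from {location}, resolve via {registry_url}"


print(describe_consumer_setup(
    {"schemaRegistryUrl": "https://registry.example.com"},
    {"schemaIDLocation": "header"},
))
```

The point of the sketch is only that the channel-level attribute selects the decoding path while the server-level attribute says where to resolve the ID.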

I had a go at illustrating what this would look like. It got a bit long, so instead of putting it in a super-long comment here, I've put it in a stand-alone gist at https://gist.github.com/dalelane/3931c17b14c51fa4a1cf25496237d188

I believe - binary/json can be inferred from the message binding content type? application/json or application/octet-stream?

@nictownsend That's a great idea - much simpler! I've updated the gist with that.

@lbroudoux I've updated the gist to reflect your remaining comments in Slack. It's slightly different from what you suggested (I put useRegistry alongside the schema registry URL in the server binding, because it felt like a better fit to make it part of describing server info), but I think I've addressed the gaps you spotted.

https://gist.github.com/dalelane/3931c17b14c51fa4a1cf25496237d188#how-this-could-be-described-in-asyncapi-3 - I don't think schemaIdLocation: "payload" is enough to infer the sample code byte skipping, we need to know the schemaVendor too.
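
A short Python sketch of why the location alone is not enough: the number of bytes to skip depends on how the vendor frames the ID. The 5-byte Confluent prefix (1 magic byte + 4-byte ID) is the only real entry here; `example-long-id-vendor` and its 9-byte prefix are hypothetical, added purely to show that another framing would need a different skip. The function and table names are also illustrative.

```python
from typing import Optional

# Prefix length in bytes, keyed by schemaVendor
SCHEMA_ID_PREFIX_BYTES = {
    "confluent": 5,  # 1 magic byte + 4-byte schema ID
    "example-long-id-vendor": 9,  # hypothetical: 1 magic byte + 8-byte ID
}


def strip_schema_id_prefix(
    message: bytes,
    schema_id_location: str,
    schema_vendor: Optional[str],
) -> bytes:
    """Return the Avro body, skipping a vendor-specific schema ID prefix."""
    if schema_id_location != "payload":
        # ID is elsewhere (e.g. in headers), so the payload is untouched
        return message
    if schema_vendor not in SCHEMA_ID_PREFIX_BYTES:
        raise ValueError(
            "cannot infer how many bytes to skip without a known schemaVendor"
        )
    prefix_length = SCHEMA_ID_PREFIX_BYTES[schema_vendor]
    if len(message) < prefix_length:
        raise ValueError("message shorter than the expected prefix")
    return message[prefix_length:]


framed = b"\x00\x00\x00\x00\x07" + b"avro"
print(strip_schema_id_prefix(framed, "payload", "confluent"))  # b'avro'
```

With schemaIdLocation: "payload" but no vendor, the function can only fail, which is the gap being pointed out.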

Also, I think I agree with @lbroudoux over useRegistry - it seems more explicit to say "this channel/operation uses a registry" vs "this server uses a registry", as there may be use cases where the channel contents don't need a schema?

Hi @dalelane and @nictownsend,
Thanks for continuing the discussion; I have been in quite a rush these last few days.

I agree with @nictownsend on the useRegistry flag - for me it's also more explicit to have it at the channel/operation level than having to browse the servers for that.

Also, to me it seems that the schemaRegistryAvailable flag you introduced serves a different purpose: it tells consumers "we've used a registry you will not be able to access" and "you'll have to skip the first 5 bytes".

To me, useRegistry is much more useful to message producers: it tells them that they'll have to pick a suitable SerDes library and later integrate with a registry.

What do you think?

I'll also review PR #55.

This issue has been automatically marked as stale because it has not had recent activity 😴
It will be closed in 60 days if no further activity occurs. To unstale this issue, add a comment with detailed explanation.
Thank you for your contributions ❤️

Hi folks, this one is getting stale, do we plan to push it forward?

hi @derberg

yeah, sorry - my to-do list has gotten a bit out of control recently so I've not had time to put into this one for a while

If someone wants to take this and run with it, I certainly won't complain. But if no-one does, I really do want to come back to this and finish it.

Hi!

I think the whole intent of this issue is now captured in #55, where the conversation is ongoing. I'm reviewing it today and will share thoughts and questions on #55, to avoid scattering information.

Hey folks, stale bot killed #55 and killed this issue too. I on my own can't really proceed with this as you are the ones using Avro in production and know the best direction we should take here. Feel free to reopen, or just comment whenever you want to continue with these

Sorry about that - I've been a bit distracted the last couple months between new job, holiday, etc.

Let me pick this up again this week, and I'll re-open when I have something worth sharing.

Can anyone please help me understand why I am not able to run the generator for the remote schema registry using
ag ./asyncapi.yaml @asyncapi/java-spring-template -o output -p user="username" -p password="password"

I have seen some examples where the schema registry URL is provided under Url --> Binding.

Here is what my asyncapi.yaml looks like:

asyncapi: 2.4.0
info:
  title: Sample Service
  version: 1.0.0
  description: This service reads the schema from remote confluent schema registry
servers:
  development:
    url: "url:port"
    description: Development server
    protocol: kafka
    protocolVersion: 1.0.0
    bindings:
      kafka:
        schemaRegistryUrl: "schemaRegistryUrl"
        schemaRegistryVendor: "confluent"
        schemaRegistryAvailable: true
channels:
  Sample-output-topic:
    publish:
      bindings:
        kafka:
      message:
        name: TempratureReading
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        contentType: 'application/octet-stream'
        payload:
          $ref: 'Schema url'
        title: ConsumerRecord
  Sample-output-topic::
    subscribe:
      bindings:
        kafka: {}
      message:
        name: TempratureReading
        schemaFormat: 'application/vnd.apache.avro;version=1.9.0'
        contentType: 'application/octet-stream'
        payload:
          $ref: 'Schema url'
        title: Customer

@tanujazz3 Hello!

Your question doesn't look like it is related to this closed issue - would you mind opening a new issue so this can be investigated, please?

I think https://github.com/asyncapi/java-spring-template/issues would be the best place to do that. And please can you include an example of the output you get when you run the command.