Integration support for glueSchemaRegistry
jaychapani opened this issue · 19 comments
Hi there,
I'm trying to integrate glueSchemaRegistry with Kinesis using KPL/KCL.
While debugging, I came across the following lines in the code path that calls KinesisProducer.addUserRecord(), and found that there is no way to pass the schema along with the UserRecord. The following piece of code is never executed because the schema object is null:
if (schema != null && data != null) {
    if (schema.getSchemaDefinition() == null || schema.getDataFormat() == null) {
        throw new IllegalArgumentException(
            String.format(
                "Schema specification is not valid. SchemaDefinition or DataFormat cannot be null. SchemaDefinition: %s, DataFormat: %s",
                schema.getSchemaDefinition(),
                schema.getDataFormat()
            )
        );
    }
    GlueSchemaRegistrySerializer serializer = glueSchemaRegistrySerializerInstance.get(config);
    byte[] encodedBytes = serializer.encode(stream, schema, data.array());
    data = ByteBuffer.wrap(encodedBytes);
}
The component contains logic like this:
else if (message.getPayload() instanceof UserRecord) {
    return handleUserRecord(message, buildPutRecordRequest(message), (UserRecord) message.getPayload());
}
So, you can build that UserRecord together with the provided Schema upstream and pass it as the payload of the message to produce. Does that make sense?
The message that we receive is of type GenericMessage
That's not related.
I suggest you have a simple POJO (or lambda) transformer upfront which would create a UserRecord with the desired Schema.
Why you bring up Message is not clear.
Please elaborate.
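For illustration, such a transformer could look roughly like this (an untested sketch; the stream name, partition key, and schema definition are placeholders, and it assumes your KPL version exposes UserRecord.setSchema()):

```java
import com.amazonaws.services.kinesis.producer.UserRecord;
import com.amazonaws.services.schemaregistry.common.Schema;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.nio.ByteBuffer;

public class UserRecordTransformer {

    // Placeholder; in a real application this would be the Avro schema
    // definition for the payload type.
    private static final String SCHEMA_DEFINITION = "{\"type\":\"record\", ...}";

    // Wraps an already-serialized payload into a UserRecord carrying the
    // Glue Schema, so KplMessageHandler can pass it through as-is.
    public UserRecord toUserRecord(byte[] payload) {
        UserRecord userRecord = new UserRecord();
        userRecord.setStreamName("my-stream");          // placeholder
        userRecord.setPartitionKey("my-partition-key"); // placeholder
        userRecord.setData(ByteBuffer.wrap(payload));
        userRecord.setSchema(new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "mySchema"));
        return userRecord;
    }
}
```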
Sorry - the Message payload is of type byte[]. I'm not following your suggestion to transform the payload to a UserRecord. It would be nice if you could help me with pointers or an example.
Since you point to the code in the KplMessageHandler, like this:
UserRecord userRecord = new UserRecord();
userRecord.setExplicitHashKey(putRecordRequest.getExplicitHashKey());
userRecord.setData(putRecordRequest.getData());
userRecord.setPartitionKey(putRecordRequest.getPartitionKey());
userRecord.setStreamName(putRecordRequest.getStreamName());
return handleUserRecord(message, putRecordRequest, userRecord);
The point is that you need to do something similar in your own service (or transformer) code: create a UserRecord and populate a Schema into it before sending it to this KplMessageHandler.
See docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/message-transformation.html#transformer-annotation.
Your story sounds more like a StackOverflow question.
Please consider starting there in the future: we use GitHub for bugs and feature requests.
Anyway: what is your code? How do you send that byte[] to this KplMessageHandler, and why can't you do the same but with a UserRecord (carrying the requested Schema) instead?
I am using Spring Cloud Stream with spring-cloud-stream-binder-aws-kinesis and trying to integrate the AWS Glue Schema Registry. This piece of code converts the object to a byte[], which is then passed to KplMessageHandler in the code flow, resulting in the above issue. Going forward I will post such things on StackOverflow.
OK. That's much better, thank you for the extra info.
Does that convert your data correctly using its Avro approach?
I wonder if you can modify your function to return a manually converted UserRecord instead...
Maybe there is a chance you can share with us a simple project revealing the problem?
@artembilan - Thanks for your help through this.
Yes, data conversion is working fine. I'm having trouble integrating with the Glue Schema Registry so that the payload has the schema value injected, which is needed for serialization and deserialization. Here is the test repo that I'm using.
I see this in your code: this.customerQueue.offer(customer);. How about calling the MessageConverter before that, creating a UserRecord, and populating the desired Schema into it before posting to the queue?
Side question: is it really correct to convert via Apache Avro first and then re-convert with that Glue Schema? Although it is not clear where we would then take the byte[] for the Glue Schema from, given that UserRecord contract...
How about using an AWSKafkaAvro(De)Serializer instead: https://stackoverflow.com/q/72420787/2756547 ?
Forget my previous question: it is not relevant since it is about Apache Kafka.
So, here is a doc: https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds.
And it indeed shows that we have to use org.apache.avro.Schema.Parser to serialize the data into a byte[]. I guess our AvroSchemaMessageConverter does that trick.
Then their sample goes on to producer.addUserRecord(config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);, indicating that data is already serialized by Avro, but the schema has to be propagated anyway for the proper Glue logic.
Therefore, my suggestion to have your addCustomer() produce a UserRecord, manually serialized and supplied with a schema, is the way to go for now as a workaround.
I guess that as the fix we can add an option to inject a schema via a SpEL expression, as we do for many other UserRecord options.
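Putting the AWS samples above together, the manual serialization plus Schema pairing could be sketched like this (my class and method names; an untested sketch based on the doc linked earlier):

```java
import com.amazonaws.services.schemaregistry.common.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import software.amazon.awssdk.services.glue.model.DataFormat;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AvroGlueSupport {

    // Serializes an Avro GenericRecord to a byte[] the way the AWS
    // KPL/Glue sample does.
    public static byte[] serialize(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Builds the Glue Schema object that has to travel with the
    // UserRecord alongside the serialized data.
    public static Schema glueSchema(GenericRecord record, String schemaName) {
        return new Schema(record.getSchema().toString(), DataFormat.AVRO.toString(), schemaName);
    }
}
```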
Let me double check the code how proposed workaround can work!
So, if you produce a UserRecord (of course with that schema option populated) from your Supplier, then you need to set spring.cloud.stream.default.producer.useNativeEncoding to true.
This way Spring Cloud Stream won't perform the conversion itself: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties.
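For example, in application.properties:

```properties
# Tell Spring Cloud Stream to skip its own payload conversion,
# so the UserRecord reaches KplMessageHandler untouched.
spring.cloud.stream.default.producer.useNativeEncoding=true
```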
The OutboundContentTypeConvertingInterceptor cannot be used in this case:
/**
* Unlike INBOUND where the target type is known and conversion is typically done by
* argument resolvers of {@link InvocableHandlerMethod} for the OUTBOUND case it is
* not known so we simply rely on provided MessageConverters that will use the
* provided 'contentType' and convert messages to a type dictated by the Binders
* (i.e., byte[]).
*/
private final class OutboundContentTypeConvertingInterceptor
It always expects the conversion to produce a byte[].
I'm able to supply a UserRecord to KplMessageHandler.handleMessageToAws. But now the problem is with the following code, where it tries to generate a PutRecordRequest using buildPutRecordRequest. How can I serialize the UserRecord?
else if (message.getPayload() instanceof UserRecord) {
    return handleUserRecord(message, buildPutRecordRequest(message), (UserRecord) message.getPayload());
}
I see your point.
That buildPutRecordRequest() must not do any serialization, since we already have the data in the UserRecord.
All your feedback is great, and I guess we have to fix this accordingly.
Meanwhile, as another workaround in addition to the UserRecord propagation we discussed, here is what you can do for that serialization which happens anyway:
@Bean
ProducerMessageHandlerCustomizer<KplMessageHandler> kplMessageHandlerCustomizer() {
    return (handler, destinationName) -> handler.setMessageConverter(new MessageConverter() {

        @Override
        public Object fromMessage(Message<?> message, Class<?> targetClass) {
            return ((UserRecord) message.getPayload()).getData().array();
        }

        @Override
        public Message<?> toMessage(Object payload, MessageHeaders headers) {
            return null;
        }

    });
}
Thanks, @artembilan - the suggestion you provided above worked. Is there a way to get the schema and partition key the way we get them in buildPutRecordRequest()?
Sorry, I'm not sure about your question. I assumed that since you have managed to create a UserRecord in your own code, you know how to provide a partitionKey and schema for that instance.
Please share what you have so far and how you provide the partitionKey for a Spring Cloud Stream producer binding.
By default it is like this:
FunctionExpression<Message<?>> partitionKeyExpression =
        new FunctionExpression<>((m) ->
                m.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER)
                        ? m.getHeaders().get(BinderHeaders.PARTITION_HEADER)
                        : m.getPayload().hashCode());
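In plain Java, that fallback amounts to the following (a simplified restatement for illustration; "scst_partition" is the value behind BinderHeaders.PARTITION_HEADER):

```java
import java.util.Map;

public class PartitionKeyFallback {

    // BinderHeaders.PARTITION_HEADER resolves to "scst_partition".
    static final String PARTITION_HEADER = "scst_partition";

    // Same decision as the FunctionExpression above: prefer the binder's
    // partition header if present, otherwise fall back to payload.hashCode().
    static Object partitionKey(Map<String, Object> headers, Object payload) {
        return headers.containsKey(PARTITION_HEADER)
                ? headers.get(PARTITION_HEADER)
                : payload.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(partitionKey(Map.of(PARTITION_HEADER, "key-1"), "payload")); // key-1
        System.out.println(partitionKey(Map.of(), "payload")); // hashCode of "payload"
    }
}
```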
I'm not sure where to take that schema for Glue from, though.
The AWS official doc shows only this for KPL sample:
com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");
They also have this sample:
Schema gsrSchema =
new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");
The doc is still the one I shared before: https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds
I have pushed my latest code to the sample repo here. Currently, to validate the changes, I have hardcoded the partitionKey and stream. Sorry, I mentioned schema in the question above by mistake. I need to get both values dynamically, similar to buildPutRecordRequest().
Not sure what you want to hear from me, since there is nothing the framework can help with here.
You call the REST controller and you end up here:
public void addCustomer(Customer customer) throws IOException {
So, we have only the Customer info at this point.
Perhaps you can inject a BindingServiceProperties and take the specific binding properties from there for this producer logic of yours:
ConsumerProperties consumerProperties = this.bindingServiceProperties.getConsumerProperties(inputName);
That may give you access to the stream and partitionKey (if configured).
You could probably also have an extra configuration property for the schema as well, if it can be configured in some way.
As the fix for this issue, I am definitely going to expose a schema evaluation property on the KplMessageHandler and make it configurable from the Kinesis binder perspective.
Thank you @artembilan