spring-projects/spring-integration-aws

Integration support for glueSchemaRegistry

jaychapani opened this issue · 19 comments

Hi there,

I'm trying to integrate glueSchemaRegistry with Kinesis using KPL/KCL.

While debugging the code I came across these lines and found that we have no way to pass the schema along with the UserRecord when KinesisProducer.addUserRecord() is called. The following piece of code is never executed because the schema object is null.

if (schema != null && data != null) {
  if (schema.getSchemaDefinition() == null || schema.getDataFormat() == null) {
      throw new IllegalArgumentException(
          String.format(
              "Schema specification is not valid. SchemaDefinition or DataFormat cannot be null. SchemaDefinition: %s, DataFormat: %s",
              schema.getSchemaDefinition(),
              schema.getDataFormat()
          )
      );
  }
  GlueSchemaRegistrySerializer serializer = glueSchemaRegistrySerializerInstance.get(config);
  byte[] encodedBytes = serializer.encode(stream, schema, data.array());
  data = ByteBuffer.wrap(encodedBytes);
}

There is logic like this in the component:

else if (message.getPayload() instanceof UserRecord) {
	return handleUserRecord(message, buildPutRecordRequest(message), (UserRecord) message.getPayload());
}

So, you can build that UserRecord together with the provided Schema upstream and pass it as the payload of the message to produce.

Does it make sense?

The message that we receive is of type GenericMessage

That's not related.
I suggest you have a simple POJO (or lambda) transformer upfront which would create a UserRecord with the desired Schema for you.
Why you are talking about the Message type is not clear.

Please, elaborate.

Sorry - the Message payload is of type byte[]. I'm not following your suggestion to transform the payload into a UserRecord. It would be nice if you could help me with pointers or an example.

Since you point to the code in the KplMessageHandler like this:

				UserRecord userRecord = new UserRecord();
				userRecord.setExplicitHashKey(putRecordRequest.getExplicitHashKey());
				userRecord.setData(putRecordRequest.getData());
				userRecord.setPartitionKey(putRecordRequest.getPartitionKey());
				userRecord.setStreamName(putRecordRequest.getStreamName());
				return handleUserRecord(message, putRecordRequest, userRecord);

It is just a fact that you need to do something similar: create a UserRecord and populate a Schema into it in some of your service (or transformer) code upfront, before sending it to this KplMessageHandler.

See docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/message-transformation.html#transformer-annotation.
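For illustration only, a rough sketch of such a transformer, assuming a KPL version whose UserRecord carries a Glue Schema (the channel names, stream name, partition key, and schema values below are placeholders):

@Transformer(inputChannel = "avroBytesChannel", outputChannel = "toKplChannel")
public UserRecord toUserRecord(Message<byte[]> message) {
    UserRecord userRecord = new UserRecord();
    userRecord.setStreamName("my-stream");
    userRecord.setPartitionKey("my-partition-key");
    // the payload is assumed to be already Avro-serialized bytes
    userRecord.setData(ByteBuffer.wrap(message.getPayload()));
    // assumption: this KPL UserRecord exposes a Glue Schema property;
    // Schema here is com.amazonaws.services.schemaregistry.common.Schema
    userRecord.setSchema(new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema"));
    return userRecord;
}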

Your story sounds more like a StackOverflow question.
Please consider starting there in the future: we use GitHub for bugs and feature requests.

Anyway: what is your code? How do you send that byte[] to this KplMessageHandler, and why can't you do the same but with a UserRecord instead, populated with the requested Schema?

I am using Spring Cloud Stream with spring-cloud-stream-binder-aws-kinesis and trying to integrate the AWS Glue Schema Registry. This piece of code is converting the object to byte[], which is then passed to the KplMessageHandler in the code flow, resulting in the above issue. Going forward I will post such things on StackOverflow.

OK. That's much better and thank you for more info.

Does that convert your data correctly using its Avro approach?
I wonder if you can modify your function to return a UserRecord converted manually instead...

Maybe there is a chance that you can share with us a simple project revealing the problem?

@artembilan - Thanks for your help with this.
Yes, data conversion is working fine. I'm having trouble integrating with the Glue Schema Registry so that the payload has the schema information injected, which will help with serialization and deserialization. Here is the test repo that I'm using.

I see in your code this.customerQueue.offer(customer);.
How about calling the MessageConverter before that, creating a UserRecord, and populating the desired Schema into it before posting to the queue?

Side question: is it really correct to convert via Apache Avro first and then re-convert with that Glue Schema?
Although it is not clear, given that UserRecord contract, where the byte[] for the Glue Schema would come from in the end...

How about using an AWSKafkaAvro(De)Serializer instead: https://stackoverflow.com/q/72420787/2756547 ?

Forget my previous question: it is not related since it talks about Apache Kafka.

So, here is a doc: https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds.

And it indeed shows that we have to use org.apache.avro.Schema.Parser to serialize the data into byte[]. And I guess that our AvroSchemaMessageConverter does that trick.
Then their sample goes on to producer.addUserRecord(config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema), indicating that the data is already serialized by Avro, but the schema has to be propagated anyway for the proper Glue logic.
Therefore my suggestion to have your addCustomer() produce a UserRecord, manually serialized and supplied with a schema, is the way to go for now as a workaround.
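Something along these lines for the manual serialization part (just a rough sketch with standard Avro APIs; SCHEMA_DEFINITION is your Avro schema JSON and the "demoSchema" name is a placeholder taken from the AWS sample):

// uses org.apache.avro.generic.* and org.apache.avro.io.*
private byte[] serialize(GenericRecord record, String schemaDefinition) throws IOException {
    org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(schemaDefinition);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(avroSchema).write(record, encoder);
    encoder.flush();
    return out.toByteArray();
}

// and the Glue schema to set on that UserRecord alongside those bytes:
com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
        new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");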

I guess as the fix we can add an option to inject a schema via a SpEL expression, as we do for many other UserRecord options.

Let me double check the code how proposed workaround can work!

So, if you produce a UserRecord (and of course with that schema option) from your Supplier, then you need to set spring.cloud.stream.default.producer.useNativeEncoding.
This way Spring Cloud Stream won't perform the conversion itself: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties.
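For example, in application.properties (assuming you want native encoding as the default for all producer bindings, per that default-scoped property name):

spring.cloud.stream.default.producer.useNativeEncoding=true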
The OutboundContentTypeConvertingInterceptor cannot be used in this case:

	/**
	 * Unlike INBOUND where the target type is known and conversion is typically done by
	 * argument resolvers of {@link InvocableHandlerMethod} for the OUTBOUND case it is
	 * not known so we simply rely on provided MessageConverters that will use the
	 * provided 'contentType' and convert messages to a type dictated by the Binders
	 * (i.e., byte[]).
	 */
	private final class OutboundContentTypeConvertingInterceptor

It always expects the conversion to produce a byte[].

I'm able to supply a UserRecord to KplMessageHandler.handleMessageToAws. But now the problem is with the following code, where it tries to generate a PutRecordRequest using buildPutRecordRequest(). How can I serialize the UserRecord?

else if (message.getPayload() instanceof UserRecord) {
      return handleUserRecord(message, buildPutRecordRequest(message), (UserRecord) message.getPayload());
}

I see your point.
That buildPutRecordRequest() must not do any serialization since we already have the data in the UserRecord.
All your feedback is great and I guess we have to fix this accordingly.

Meanwhile, as another workaround, in addition to what we have worked out so far about UserRecord propagation, here is what you can do for that serialization step, which happens anyway:

		@Bean
		ProducerMessageHandlerCustomizer<KplMessageHandler> kplMessageHandlerCustomizer() {
			return (handler, destinationName) -> handler.setMessageConverter(new MessageConverter() {

				@Override
				public Object fromMessage(Message<?> message, Class<?> targetClass) {
					// The payload is already a UserRecord carrying serialized data:
					// hand its bytes back so the handler does not convert again.
					return ((UserRecord) message.getPayload()).getData().array();
				}

				@Override
				public Message<?> toMessage(Object payload, MessageHeaders headers) {
					return null; // not used on this outbound path
				}

			});
		}
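With that in place, the handler should just take the bytes already carried by the UserRecord via fromMessage(); the toMessage() direction is not exercised on this outbound path, so returning null there is fine.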

Thanks, @artembilan - the suggestion that you provided above worked. Is there a way to get the schema and partition key the way we get them in buildPutRecordRequest()?

Sorry, I'm not sure I follow your question. I assumed that, since you have managed to create a UserRecord in your own code, you know how to provide a partitionKey and schema for that instance from your code.

Please, share what you have so far and how you provide the partitionKey for a Spring Cloud Stream producer binding.
By default it is like this:

		FunctionExpression<Message<?>> partitionKeyExpression =
				new FunctionExpression<>((m) ->
						m.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER)
								? m.getHeaders().get(BinderHeaders.PARTITION_HEADER)
								: m.getPayload().hashCode());

I'm not sure where to take that schema for Glue from, though.

The AWS official doc shows only this for KPL sample:

com.amazonaws.services.schemaregistry.common.Schema gsrSchema = 
    new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");

They also have this sample:

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

The doc is still the one I shared before: https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds

I have pushed my latest code to the sample repo here. Currently, to validate the changes, I have hardcoded the partitionKey and stream. Sorry, I mentioned schema in the question above by mistake. I need to get both values dynamically, similar to buildPutRecordRequest().

I'm not sure what you want to hear from me, since there is nothing the framework can help with here, and indeed there cannot be.
You call the REST controller and you end up here:

 public void addCustomer(Customer customer) throws IOException {

So, we only have the Customer info at this point.

Perhaps you can inject a BindingServiceProperties and take the specific binding properties from there for your producer logic:

ConsumerProperties consumerProperties = this.bindingServiceProperties.getConsumerProperties(inputName);

And that one may give you access to the stream and partitionKey (if configured).
You can probably also have an extra configuration property for the schema as well, if that can be configured in some way.
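For example, something like this (a rough sketch; the binding name is a placeholder, and the partition key expression is only there if you configured one on the binding):

String streamName = this.bindingServiceProperties.getBindingDestination("customer-out-0");
ProducerProperties producerProperties =
		this.bindingServiceProperties.getProducerProperties("customer-out-0");
// only populated if a partition-key-expression was configured for the binding
Expression partitionKeyExpression = producerProperties.getPartitionKeyExpression();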

As the fix for this issue, I am definitely going to expose a schema evaluation property on the KplMessageHandler and make it configurable from the Kinesis binder perspective.