Farfetch/kafkaflow

[Bug Report]: AddSingleTypeDeserializer tries to run for all types of messages

mhkolk opened this issue · 5 comments

Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

With the following configuration, KafkaFlow tries the first registered deserializer for every Kafka message it receives, and it also runs all deserializers even when only the first one should be hit.

					.AddSingleTypeDeserializer<Acknowledgement_MarketDocument, KafkaXmlDeserializer>()
						.AddTypedHandlers(handlers => handlers
							.AddHandler<Acknowledgement_MarketDocumentMessageHandler>()
								.WithHandlerLifetime(InstanceLifetime.Scoped)
							.WhenNoHandlerFound(context =>
								Log.Warning("Message not handled > Partition: {0} | Offset: {1} | Context: {2}",
									context.ConsumerContext.Partition,
									context.ConsumerContext.Offset,
									context.ConsumerContext.ToString()))
							)
					.AddSingleTypeDeserializer<ReserveAllocation_MarketDocument, KafkaXmlDeserializer>()
						.AddTypedHandlers(handlers => handlers
							.AddHandler<ReserveAllocation_MarketDocumentMessageHandler>()
								.WithHandlerLifetime(InstanceLifetime.Scoped)
							.WhenNoHandlerFound(context =>
								Log.Warning("Message not handled > Partition: {0} | Offset: {1} | Context: {2}",
									context.ConsumerContext.Partition,
									context.ConsumerContext.Offset,
									context.ConsumerContext.ToString()))
							)

If an Acknowledgement_MarketDocument is received, the first deserializer runs successfully, but the second one is then run as well and fails with the following exception:

KafkaFlow: Error processing message | Data: ... } | Exception: {"Type":"System.InvalidOperationException","Message":"Message must be a byte array to be deserialized and it is \u0027KafkaFlow.Message\u0027","StackTrace":"   at KafkaFlow.Middlewares.Serializer.DeserializerConsumerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\n   at KafkaFlow.Middlewares.TypedHandler.TypedHandlerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\n   at KafkaFlow.Middlewares.Serializer.DeserializerConsumerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\n   at KafkaFlow.Consumers.ConsumerWorker.ProcessMessageAsync(IMessageContext context, CancellationToken cancellationToken)"}

If, on the other hand, a ReserveAllocation_MarketDocument is received, it too goes through the first deserializer, which fails immediately, so the second one never runs. The exception is:

KafkaFlow: Error processing message | Data: {"Message": ... | Exception: {"Type":"System.InvalidOperationException","Message":"There is an error in XML document (2, 2).","StackTrace":"   at System.Xml.Serialization.XmlSerializer.Deserialize(XmlReader xmlReader, String encodingStyle, XmlDeserializationEvents events)\n   at System.Xml.Serialization.XmlSerializer.Deserialize(Stream stream)\n   at Setup.Fms.Common.Kafka.KafkaXmlDeserializer.DeserializeAsync(Stream input, Type type, ISerializerContext context) in C:\\Git\\Setup.Fms\\Setup.Fms.Common\\Kafka\\KafkaXmlSerializer.cs:line 47\n   at KafkaFlow.Middlewares.Serializer.DeserializerConsumerMiddleware.Invoke(IMessageContext context, MiddlewareDelegate next)\n   at KafkaFlow.Consumers.ConsumerWorker.ProcessMessageAsync(IMessageContext context, CancellationToken cancellationToken)"}

There is currently no way to configure this the way I want it (one deserializer per type, each with its own specific typed handler). I also tried the other overloads (with factories); they all produce the same exceptions. Or am I missing something?

Steps to reproduce

Try to use multiple deserializers at the same time for different types and with different custom type handlers.

I also tried using a single (generic) deserializer, but since there is no generic ISerializer available, I was out of luck.

Even if I introduce the following class, which is the most I can do in terms of generics, it wouldn't work:

	public class KafkaXmlDeserializer<T> : IDeserializer
	{
		public Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
		{
			var serializer = new XmlSerializer(typeof(T));
			var obj = serializer.Deserialize(input);
			return Task.FromResult(obj!);
		}
	}

Expected behavior

  1. No exceptions, types should be handled with the deserializer and type handler that was registered for them
  2. Easier (more intuitive) registration of (de)serializer and typed handlers.

Actual behavior

See above for exceptions and thorough explanation

KafkaFlow version

3.0.3

What I find unintuitive is not having generic (de)serializers and having to provide type resolvers for things that the deserializer could do itself.

For example, if we have a generic deserializer and supply the type through its generic parameter at registration, then the deserializer's ability to deserialize the message to that type already acts as a type resolver. This holds for both JSON and XML.

Doing so also gives us the message content as an instance of the correct strong type.

My use case requires custom type resolvers because incoming messages lack the Message-Type header, so the default type resolver cannot resolve them. I therefore have to run custom resolvers on all messages, and I resolve the type by deserializing the raw content: if that fails, it is not the correct type, and I move on.
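
The "try to deserialize, move on if it fails" approach described above could look roughly like the following. This is only a sketch using the .NET base class library, not KafkaFlow code; the `XmlTypeProbe` class and `DeserializeFirstMatch` method are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

// Hypothetical helper, not part of KafkaFlow.
public static class XmlTypeProbe
{
    // Tries each candidate type in order and returns the first successful result,
    // or null when no candidate can read the payload.
    public static object? DeserializeFirstMatch(byte[] payload, IEnumerable<Type> candidates)
    {
        foreach (var type in candidates)
        {
            try
            {
                // Fresh stream per attempt so every serializer starts at position 0.
                using var stream = new MemoryStream(payload);
                var result = new XmlSerializer(type).Deserialize(stream);
                if (result is not null) return result;
            }
            catch (InvalidOperationException)
            {
                // XmlSerializer reports malformed XML and unexpected root elements as
                // InvalidOperationException; treat that as "wrong type" and move on.
            }
        }

        return null;
    }
}
```

With the types from this report, the candidates would be `typeof(Acknowledgement_MarketDocument)` and `typeof(ReserveAllocation_MarketDocument)`. The cost is one failed parse per wrong candidate, which is why doing this inside a chain of per-type deserializers feels like the natural place for it.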

Hopefully someone will understand my use case.

Hi @mhkolk,
We think this is a pertinent improvement to be made, but we'll need some time to analyze how to implement such changes.

Hi,
Having analyzed your issue: there is no need to register a deserializer for each type, so you can just use AddSerializer<>() with the two needed handlers. If the messages aren't produced by KafkaFlow and the message type isn't defined in the headers, you can register an IMessageTypeResolver to get the message type and use AddSerializer<JsonMessageSerializer, YourTypeResolver>(). Please let us know if this works for you.
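
A resolver along the lines suggested above might be sketched as follows for the XML case in this report. Several things here are assumptions rather than confirmed API: the `OnConsumeAsync`/`OnProduceAsync` member shapes and the `KafkaFlow.Middlewares.Serializer.Resolvers` namespace are what KafkaFlow 3.x appears to use, the message value is assumed to still be the raw `byte[]` when the resolver runs, and each XML root element is assumed to be named after its class. `XmlRootTypeResolver` is a hypothetical name, and the two document types are the reporter's own classes.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Xml;
using KafkaFlow;
using KafkaFlow.Middlewares.Serializer.Resolvers; // assumed namespace for 3.x

// Hypothetical resolver: picks the message type from the XML root element name
// instead of the Message-Type header.
public class XmlRootTypeResolver : IMessageTypeResolver
{
    // Assumption: each root element is named after its class.
    private static readonly Dictionary<string, Type> KnownRoots = new()
    {
        [nameof(Acknowledgement_MarketDocument)] = typeof(Acknowledgement_MarketDocument),
        [nameof(ReserveAllocation_MarketDocument)] = typeof(ReserveAllocation_MarketDocument),
    };

    public ValueTask<Type> OnConsumeAsync(IMessageContext context)
    {
        Type? resolved = null;

        // Assumption: before deserialization the value is the raw payload.
        if (context.Message.Value is byte[] payload)
        {
            try
            {
                using var stream = new MemoryStream(payload);
                using var reader = XmlReader.Create(stream);

                // MoveToContent skips the XML declaration, comments and whitespace.
                if (reader.MoveToContent() == XmlNodeType.Element)
                {
                    KnownRoots.TryGetValue(reader.LocalName, out resolved);
                }
            }
            catch (XmlException)
            {
                // Not well-formed XML: leave the type unresolved.
            }
        }

        // How the middleware treats an unresolved (null) type is not covered in
        // this thread and should be verified against the library.
        return new ValueTask<Type>(resolved!);
    }

    // Nothing to stamp on produced messages in this consume-only sketch.
    public ValueTask OnProduceAsync(IMessageContext context) => default;
}
```

The resolver would then be passed as the second type argument at registration, next to a single deserializer that builds its `XmlSerializer` from the `type` argument it receives, with both handlers added under one `AddTypedHandlers` call. On 3.x the consumer-side registration method may be named `AddDeserializer` rather than `AddSerializer`, so check the overloads available in your version.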

Hi @mhkolk,

If you're still experiencing this issue or have any additional information to share, please feel free to let us know.

However, if we don't receive any updates or feedback from you within the next 5 business days, we may need to consider closing this issue. Please understand that this is not a final decision, and you can always reopen the issue or create a new one in the future.

We appreciate your contribution to our project and look forward to hearing from you soon. If you have any questions or need further assistance, don't hesitate to reach out.

Hi @mhkolk,

We will be closing this issue since the question has been answered. Please feel free to respond to this message or reopen the issue if you'd like to continue the discussion or if you've encountered any new developments related to it.

Thank you for your contribution.