zarusz/SlimMessageBus

[Host.Kafka] Kafka Headers

correa-juan opened this issue · 5 comments

Hi!

We are sending an event with some headers but the headers are being sent with extra quotes.

[image: screenshot of the message headers showing the extra quotes]

Do you know how to change that behavior?

Thanks a lot!

Awesome project!

Hello, which version of SMB Kafka are you using? Which version of the serializer plugin? Do you set up a custom Kafka header serializer in SMB?

Hello @zarusz! We are using:

[image: screenshot of the package versions in use]

This is our config:

[image: screenshot of the message bus configuration]

The underlying client (the Confluent.Kafka library) requires header values to be supplied as byte[].

Since you're using the JsonMessageSerializer here, header values are serialized to JSON. In your example the primitive value is a string, so it is serialized to "string here", quotes included, which is the expected JSON output. It does look odd in the tool you used to view the headers in Kafka (which tool are you using?).
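
To make the difference concrete, here is a minimal sketch comparing the two encodings. It uses System.Text.Json purely for illustration (the serializer plugin you have installed may be built on a different JSON library, but any JSON encoder wraps strings in double quotes). The class name HeaderEncodingDemo and its methods are hypothetical and not part of SMB.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical helper, for illustration only (not part of SMB).
public static class HeaderEncodingDemo
{
    // JSON encoding of a string keeps the surrounding double quotes in the bytes.
    public static byte[] AsJson(string value) => JsonSerializer.SerializeToUtf8Bytes(value);

    // Plain UTF-8 encoding writes only the characters of the string itself.
    public static byte[] AsPlainText(string value) => Encoding.UTF8.GetBytes(value);

    // Decodes header bytes as UTF-8 text, the way a typical viewer tool would.
    public static string Show(byte[] bytes) => Encoding.UTF8.GetString(bytes);
}
```

With this sketch, Show(AsJson("abc")) returns the five characters "abc" including the quotes, while Show(AsPlainText("abc")) returns just abc.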

If you need to send the plain string representation instead, the following custom serializer should do the trick:

using System;
using System.Text;

// Header serializer that writes values as plain text instead of JSON.
public class StringMessageSerializer : IMessageSerializer
{
    private readonly Encoding _encoding;

    public StringMessageSerializer(Encoding encoding = null)
    {
        // Default to UTF-8 when no encoding is supplied.
        _encoding = encoding ?? Encoding.UTF8;
    }

    #region Implementation of IMessageSerializer

    public byte[] Serialize(Type t, object message)
    {
        if (message == null) return null;
        // ToString() yields the raw value, without the quotes that JSON adds around strings.
        return _encoding.GetBytes(message.ToString());
    }

    public object Deserialize(Type t, byte[] payload)
    {
        if (payload == null) return null;
        var str = _encoding.GetString(payload);
        // ToDo: maybe int.TryParse if you need to detect the primitive type
        // ToDo: maybe Guid.TryParse
        // ToDo: maybe bool.TryParse
        return str;
    }

    #endregion
}

mbb
   .WithProviderKafka(new KafkaMessageBusSettings(kafkaBrokers)
   {
      HeaderSerializer = new StringMessageSerializer()
   });
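
The ToDo comments in Deserialize hint at detecting primitive types on the way back. A minimal sketch of that idea follows; HeaderValueParser is a hypothetical helper and not part of SMB. Note that this approach is lossy: a header that was originally the string "123" would come back as an int, so use it only if your consumers expect that.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, for illustration only (not part of SMB).
public static class HeaderValueParser
{
    // Tries bool, then int, then Guid; falls back to the original string.
    public static object Parse(string text)
    {
        if (text == null) return null;
        if (bool.TryParse(text, out var b)) return b;
        // Invariant culture keeps parsing independent of the machine's locale.
        if (int.TryParse(text, NumberStyles.Integer, CultureInfo.InvariantCulture, out var i)) return i;
        if (Guid.TryParse(text, out var g)) return g;
        return text;
    }
}
```

In the serializer above, Deserialize could then end with return HeaderValueParser.Parse(str); instead of return str;.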

Let me know if this works for you.

Also, why do you create the bus with MessageBusBuilder.Create() instead of using the Microsoft Dependency Injection integration via .AddSlimMessageBus()? The DI integration sets up various aspects of SMB for you.

It works!

Thanks a lot!

I might make this the default header serializer for Kafka in an upcoming release.

Also, could you take a look at the questions in my previous reply? I am curious about your setup.