Farfetch/kafkaflow

[Bug Report]:Unable to produce tombstone records

adimoraret opened this issue · 7 comments

Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

I am unable to produce tombstone records. It looks like the message value is sent as byte[0] instead of null.

Steps to reproduce

services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { kafkaBrokers })
        .AddProducer("test", producer => producer
            .AddMiddlewares(middlewares => middlewares
                .AddSerializer<JsonCoreSerializer>(_ => new JsonCoreSerializer(options))
            )
            .WithAcks(Acks.All)
        )
    )
);

await producer.ProduceAsync("test", Guid.NewGuid().ToString(), null);

Expected behavior

Message value should be sent as null

Actual behavior

Message value is sent as byte[0]

KafkaFlow version

3.0.3

Do you have any error while producing the message?

No, there is no error. But it won't produce tombstone records when consumed by a Kafka upsert source: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/ .

As we discussed on the KafkaFlow Slack, you can work around this limitation with a custom serializer that checks whether the message is null before serializing the message value.
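A minimal sketch of that workaround, assuming KafkaFlow's `ISerializer` interface and wrapping the built-in `JsonCoreSerializer` (the wrapper name is hypothetical):

```csharp
using System.IO;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Serializer;

// Hypothetical wrapper: when the message is null, write nothing to the
// output stream so the produced value ends up empty instead of the JSON
// text "null".
public class NullAwareJsonSerializer : ISerializer
{
    private readonly JsonCoreSerializer inner = new JsonCoreSerializer();

    public Task SerializeAsync(object message, Stream output, ISerializerContext context)
    {
        if (message is null)
        {
            // Skip serialization entirely; the value is left empty.
            return Task.CompletedTask;
        }

        return this.inner.SerializeAsync(message, output, context);
    }
}
```

Register it in place of `JsonCoreSerializer` via `.AddSerializer<NullAwareJsonSerializer>()` in the producer middleware configuration.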

This should produce a message whose value is an empty byte[], which acts as the tombstone.

Meanwhile, we will address this issue by creating a pull request to fix the JsonCoreSerializer.

Regarding the Kafka upsert source, Flink should not expect different behavior from the Kafka client, as they both produce a null record.

I have tried that approach end to end, using a Kafka upsert source and an Iceberg table destination in AWS. It does indeed send an empty byte array, but that did not translate into a tombstone record.
If I read https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/ correctly, it would only work if we send null for the message value.
Maybe I missed something, but I'm happy to test your PR once it's ready.

After fixing the issue in JsonCoreSerializer, producing a message with a null value results in a tombstone record where the message value is an empty byte[].

"A null payload is a payload with 0 bytes"
reference here

public Task SerializeAsync(object? message, Stream output, ISerializerContext context)
{
    if (message == null)
    {
        // Write nothing: the message value is produced as an empty byte[].
        return Task.CompletedTask;
    }

    return SerializeNonNullMessageAsync(message, output);
}

As an alternative, you can always use the native Kafka client to confirm that it also produces a tombstone record with an empty byte[], like KafkaFlow does.
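For comparison, a small sketch using the Confluent.Kafka .NET client directly (broker address and topic name are placeholders); producing a `Message<string, byte[]>` with a null `Value` is the native way to send a tombstone:

```csharp
using System.Threading.Tasks;
using Confluent.Kafka;

public static class TombstoneExample
{
    public static async Task ProduceTombstoneAsync()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using var producer = new ProducerBuilder<string, byte[]>(config).Build();

        // A null value (not an empty array) is what the broker and
        // compaction treat as a tombstone for this key.
        await producer.ProduceAsync("test", new Message<string, byte[]>
        {
            Key = "some-key",
            Value = null,
        });
    }
}
```

Inspecting the delivered record (for example with a console consumer) shows whether the payload arrives as null or as zero bytes, which is exactly the distinction this issue hinges on.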

If it is fixed in the serializer, it has to be changed in every serializer. Sending null for tombstoning should not be part of serialization, imho.

great library by the way. :)


I resolved it the same way for the ProtobufSerializer. Is there a plan to fix it in general?
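One general approach, sketched here purely as an illustration (the decorator name is hypothetical, assuming KafkaFlow's `ISerializer` interface), is to apply the null guard once in a decorator so that no individual serializer needs its own fix:

```csharp
using System.IO;
using System.Threading.Tasks;
using KafkaFlow;

// Hypothetical decorator: wraps any ISerializer and short-circuits on
// null messages, so JSON, Protobuf, and other serializers all get
// tombstone handling from one place.
public class TombstoneAwareSerializer : ISerializer
{
    private readonly ISerializer inner;

    public TombstoneAwareSerializer(ISerializer inner) => this.inner = inner;

    public Task SerializeAsync(object message, Stream output, ISerializerContext context)
        => message is null
            ? Task.CompletedTask               // produce an empty value
            : this.inner.SerializeAsync(message, output, context);
}
```

This keeps tombstone semantics out of each concrete serializer, which lines up with the comment above that sending null should not be part of serialization.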