DeduplicationTransformer example only supports 1 partition
jomach opened this issue · 2 comments
As a developer I would like to have an example of an Event Deduplication pipeline that is robust.
The current example EventDeduplicationLambdaIntegrationTest relies on a simple Transformer that stores its state locally.
This will not work if we have multiple partitions. How should we proceed here? Several people have run into this issue, but there is no proper way of solving it. Can someone change this example?
You can only do event-deduplication if you ensure that duplicate events are written into the same partition. In our example, we assume that the eventId
is used to partition the data to meet this property. Thus, the example also works with multiple partitions.
If your original input data is not partitioned by an event ID and you cannot change the upstream application, you can repartition the data based on the event ID using Kafka Streams: stream.selectKey((k,v) -> v.eventId()).repartition().transform(...).
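
A minimal sketch of this repartition-then-deduplicate pattern, assuming a StreamsBuilder named builder, an input KStream named events whose value type Event exposes an eventId() accessor, a matching eventSerde, and a state store named "event-ids" (all of these names are hypothetical, not from the thread):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Register a store that remembers the event IDs each partition has seen.
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("event-ids"),   // hypothetical store name
        Serdes.String(), Serdes.Long()));

KStream<String, Event> deduplicated = events
        // Re-key by event ID so all duplicates of an event land in the same partition...
        .selectKey((k, v) -> v.eventId())
        .repartition(Repartitioned.with(Serdes.String(), eventSerde))
        // ...then drop every ID the local store has already seen.
        .transform(() -> new Transformer<String, Event, KeyValue<String, Event>>() {
            private ProcessorContext context;
            private KeyValueStore<String, Long> seen;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                this.context = context;
                this.seen = (KeyValueStore<String, Long>) context.getStateStore("event-ids");
            }

            @Override
            public KeyValue<String, Event> transform(String key, Event value) {
                if (seen.get(key) != null) {
                    return null;                       // duplicate: emit nothing
                }
                seen.put(key, context.timestamp());    // remember first occurrence
                return KeyValue.pair(key, value);
            }

            @Override
            public void close() {}
        }, "event-ids");
```

Returning null from transform() drops the record entirely. This sketch keeps every seen ID forever; a production version would bound the store, for example with a windowed store and a retention period, so it does not grow without limit.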
Closing this question. Feel free to follow up with more questions if necessary.
Just for completeness, this is what I did:
var compositionWithKey = compositions
        // Re-key by material ID so related records share a partition
        .map((k, v) -> new KeyValue<>(v.getEDMATERIALID(), v))
        .repartition(Repartitioned.with(new Serdes.LongSerde(), materialCompositionSerde))
        .toTable();
var materialWithKey = material
        .map((k, v) -> new KeyValue<>(v.getEDMATERIALID(), v))
        .repartition(Repartitioned.with(new Serdes.LongSerde(), materialSerde));
return materialWithKey
        // Stream-table join on the shared material-ID key
        .join(compositionWithKey, valueJoiner, Joined.with(keySerde, materialSerde, materialCompositionSerde))
        .peek((k, v) -> System.out.println("After join: " + k))
        // Deduplicate against the state store backing MATERIAL_STORE_NAME
        .transformValues(transformerSupplier, MATERIAL_STORE_NAME);
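
For completeness, a hypothetical sketch of what the transformerSupplier referenced above might look like: a ValueTransformerWithKey that remembers the last value per key in MATERIAL_STORE_NAME and returns null for values it has already seen. JoinedMaterial stands in for the actual joined value type, which the snippet does not show:

```java
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

ValueTransformerWithKeySupplier<Long, JoinedMaterial, JoinedMaterial> transformerSupplier =
        () -> new ValueTransformerWithKey<Long, JoinedMaterial, JoinedMaterial>() {
            private KeyValueStore<Long, JoinedMaterial> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                store = (KeyValueStore<Long, JoinedMaterial>) context.getStateStore(MATERIAL_STORE_NAME);
            }

            @Override
            public JoinedMaterial transform(Long key, JoinedMaterial value) {
                if (value.equals(store.get(key))) {
                    return null;          // unchanged duplicate
                }
                store.put(key, value);    // remember the latest value per key
                return value;
            }

            @Override
            public void close() {}
        };
```

Unlike transform(), transformValues() forwards a record with a null value rather than dropping it, so a downstream .filter((k, v) -> v != null) is needed to actually discard the duplicates.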