KStream update operations with Spring Cloud Stream Kafka Streams binder

Here is the use case. Update events arrive on a Kafka topic as Avro records with a key (for simplicity, we use Integer as the key type). Each update event carries two key pieces of information: a reason code (I for increment, D for decrement, X for delete) and a delta amount to be applied. The processor examines this data and aggregates it based on the reason code and the delta.

Here is a concrete example.

A record comes in with ID 100 and an update event. The update event contains the reasonCode I and a delta amount of $100. Since this is the first time the processor sees this ID, it performs a simple initial aggregation with this data, i.e. at this point there is a current total of $100 for ID 100. Another record now comes in with the same ID 100 and a reasonCode of I, but this time the delta amount is $75. The aggregation in the processor looks at the existing total (from the underlying state store) and adds $75 on top of it. After this, the total is $175. Let’s say another record now comes in with the same ID, but this time the reasonCode is D (for decrement) and the delta amount is $100. After the aggregation, the total for ID 100 is $75. Now another request comes in for the same ID, this time with the reasonCode X for delete. The processor writes out a null value for that key so that downstream processors can take further action (e.g. delete the record for this ID from a database).
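
The aggregation rules above can be sketched in plain Java. This is not the sample's actual processor code: the class and method names are hypothetical, a HashMap stands in for the Kafka Streams state store, and treating an unseen ID as a total of 0 is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a HashMap stands in for the Kafka Streams state store.
final class UpdateAggregationSketch {

    // Folds one update event into the running total for an ID.
    // currentTotal is null when the ID has not been seen before (assumed to mean 0).
    // Returns null for reason code X so the caller can write a null value downstream.
    static Double apply(Double currentTotal, String reasonCode, double delta) {
        double base = (currentTotal == null) ? 0.0 : currentTotal;
        switch (reasonCode) {
            case "I":
                return base + delta;
            case "D":
                return base - delta;
            case "X":
                return null;
            default:
                throw new IllegalArgumentException("Unknown reason code: " + reasonCode);
        }
    }

    // Applies the event to the store and returns the value that would be written out.
    static Double process(Map<Integer, Double> store, int id, String reasonCode, double delta) {
        Double updated = apply(store.get(id), reasonCode, delta);
        if (updated == null) {
            store.remove(id);
        } else {
            store.put(id, updated);
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<Integer, Double> store = new HashMap<>();
        System.out.println(process(store, 100, "I", 100)); // 100.0
        System.out.println(process(store, 100, "I", 75));  // 175.0
        System.out.println(process(store, 100, "D", 100)); // 75.0
        System.out.println(process(store, 100, "X", 0));   // null
    }
}
```

The main method replays the four events for ID 100 described above and prints the total after each one.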

Another requirement, since we are using Avro, is that the application needs to store the Avro schema in Confluent Schema Registry.

Running the application

Start the Confluent Platform:

git clone https://github.com/confluentinc/cp-docker-images
cd cp-docker-images
git checkout 5.3.1-post
cd examples/cp-all-in-one/
docker-compose up -d --build

To run this sample, you need to set the compatibility level to NONE on the Confluent Schema Registry server.

curl -X PUT http://127.0.0.1:8081/config -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"

Start the application StreamTableUpdateApplication in an IDE (or from the CLI).

Verifying the application behavior

To help with testing, the application comes with a handy producer, which is a REST controller. When the REST endpoint is invoked, the producer sends an update event downstream, which is picked up by the Kafka Streams processor.

Let’s run it.

We send a POST request with ID 20, reasonCode I, and an amount of $100:

curl -X POST http://localhost:9001/updateEvent/20/I/100

If you are running Schema Registry from the Docker container above, you can view the output with the following command.

docker exec -it schema-registry kafka-avro-console-consumer --bootstrap-server broker:29092 --topic bohSummary --from-beginning --key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer --property print.key=true

You should see an output like this:

20	{"delta":100.0,"field1":0,"field2":0,"field3":0}

Now let’s run a few more curl commands.

curl -X POST http://localhost:9001/updateEvent/20/I/50
curl -X POST http://localhost:9001/updateEvent/20/I/20
curl -X POST http://localhost:9001/updateEvent/20/I/10

You should see the corresponding output on the Avro console consumer.

20	{"delta":150.0,"field1":0,"field2":0,"field3":0}
20	{"delta":170.0,"field1":0,"field2":0,"field3":0}
20	{"delta":180.0,"field1":0,"field2":0,"field3":0}
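
The same requests could also be issued from Java instead of curl. The following is a minimal sketch using the JDK's java.net.http API (Java 11 or later). The class and method names are hypothetical, the path layout /updateEvent/{id}/{reasonCode}/{amount} is inferred from the curl commands in this walkthrough, and whole-dollar amounts are assumed.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative sketch only: builds the POST request that the curl commands send.
final class UpdateEventRequests {

    // Host and port taken from the curl commands in this walkthrough.
    static final String BASE_URL = "http://localhost:9001";

    // Builds (but does not send) a POST to /updateEvent/{id}/{reasonCode}/{amount}.
    static HttpRequest updateEvent(int id, String reasonCode, int amount) {
        URI uri = URI.create(BASE_URL + "/updateEvent/" + id + "/" + reasonCode + "/" + amount);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }
}
```

To actually send a request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) while the application is running.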

Now do some decrements.

curl -X POST http://localhost:9001/updateEvent/20/D/50
curl -X POST http://localhost:9001/updateEvent/20/D/40
curl -X POST http://localhost:9001/updateEvent/20/D/30

This is the output from the Avro console consumer:

20	{"delta":130.0,"field1":0,"field2":0,"field3":0}
20	{"delta":90.0,"field1":0,"field2":0,"field3":0}
20	{"delta":60.0,"field1":0,"field2":0,"field3":0}

Now let’s delete this key:

curl -X POST http://localhost:9001/updateEvent/20/X/1

Output now shows:

20	null
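
A downstream consumer can treat such a null value as a delete marker. The following plain-Java sketch shows one way this could look. It is not part of the sample: the class is hypothetical, and a HashMap stands in for the database mentioned in the use case.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a HashMap stands in for a downstream database table.
final class TombstoneSinkSketch {

    private final Map<Integer, Double> table = new HashMap<>();

    // Upserts the row on a non-null value; deletes the row on a null value.
    void accept(int key, Double value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Returns true while a row exists for the key.
    boolean contains(int key) {
        return table.containsKey(key);
    }

    // Returns the stored total, or null if there is no row for the key.
    Double get(int key) {
        return table.get(key);
    }
}
```

Feeding it the records for key 20 from the console output above would leave no row for that key after the final null record.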

There is also an interactive query provided to query the state store. In this application, it returns information that we already saw on the Avro console consumer, so it is redundant. However, it is provided for illustration purposes.

Here is the endpoint to do that. Let’s try it for ID 20, which we just deleted.

curl -X POST http://localhost:9001/delta/20
{"timestamp":"2019-09-20T20:26:22.499+0000","status":500,"error":"Internal Server Error","message":"No data found!","path":"/delta/20"}

As expected, the state store removed that key.

Now let’s add another key and try again.

curl -X POST http://localhost:9001/updateEvent/22/I/2000

curl -X POST http://localhost:9001/delta/22
2000.0
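
The lookup behavior seen in these two queries can be sketched in plain Java. This is not the sample's actual interactive query code: the class and method names are hypothetical, a Map stands in for the queryable state store, and the choice of exception type is an assumption. Only the "No data found!" message is taken from the response above.

```java
import java.util.Map;
import java.util.NoSuchElementException;

// Illustrative sketch only: a Map stands in for the queryable state store.
final class DeltaQuerySketch {

    // Returns the current total for the ID, or fails if the store has no entry for it.
    static double delta(Map<Integer, Double> store, int id) {
        Double total = store.get(id);
        if (total == null) {
            throw new NoSuchElementException("No data found!");
        }
        return total;
    }
}
```

With a store that holds only key 22 at 2000.0, delta(store, 22) returns 2000.0 and delta(store, 20) throws.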