This project is a simple example of how to handle calls to external systems, such as long-running SQL queries or HTTP APIs, using asynchronous processing and the pause/resume pattern on consumers.
Resilience4j is used to implement the retry strategy.
First, you need to create two handlers:
- a record handler implementing Function<ConsumerRecord<?, ?>, Void>;
- an exception handler implementing BiConsumer<ConsumerRecord<?, ?>, Exception>.
Put your business logic in the record handler.
Map your retriable exceptions to RetriableException.
In this simple example, the retry strategy retries indefinitely whenever a RetriableException is thrown.
The exception handler is only called for non-retriable exceptions.
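The contract between the two handlers can be sketched in plain Java. This is a minimal illustration, not the project's implementation: FakeRecord stands in for ConsumerRecord<?, ?>, the local RetriableException class stands in for the one mentioned above, and the hand-written loop in process replaces the Resilience4j retry that the project actually uses. All of these names are hypothetical.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

class HandlerSketch {

    /** Hypothetical stand-in for ConsumerRecord<?, ?>. */
    static final class FakeRecord {
        final String key;
        final String value;
        FakeRecord(String key, String value) { this.key = key; this.value = value; }
    }

    /** Hypothetical stand-in for the RetriableException used by the project. */
    static class RetriableException extends RuntimeException {
        RetriableException(String message, Throwable cause) { super(message, cause); }
    }

    /** Retries forever on RetriableException; any other exception goes to the exception handler. */
    static <R> void process(R record,
                            Function<R, Void> recordHandler,
                            BiConsumer<R, Exception> exceptionHandler,
                            long retryIntervalMs) {
        while (true) {
            try {
                recordHandler.apply(record);
                return;
            } catch (RetriableException e) {
                try {
                    Thread.sleep(retryIntervalMs); // wait before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // stop retrying if interrupted
                    return;
                }
            } catch (Exception e) {
                exceptionHandler.accept(record, e); // non-retriable: hand over and move on
                return;
            }
        }
    }

    /** One record fails twice with a transient error, another fails permanently. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        int[] attempts = {0};

        Function<FakeRecord, Void> recordHandler = rec -> {
            if (rec.value.equals("bad")) {
                throw new IllegalArgumentException("malformed payload");
            }
            attempts[0]++;
            if (attempts[0] < 3) {
                // Map a transient failure (for example a timeout) to RetriableException.
                throw new RetriableException("transient failure", new IOException("timeout"));
            }
            log.add("processed " + rec.key + " after " + attempts[0] + " attempts");
            return null;
        };

        BiConsumer<FakeRecord, Exception> exceptionHandler =
                (rec, e) -> log.add("failed " + rec.key + ": " + e.getMessage());

        process(new FakeRecord("k1", "ok"), recordHandler, exceptionHandler, 10);
        process(new FakeRecord("k2", "bad"), recordHandler, exceptionHandler, 10);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The record handler decides what is retriable by choosing which exception type to throw; everything else reaches the exception handler exactly once.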
Then, you just need to build and start a ConsumerRunner instance:
    ConsumerRunner runner = ConsumerRunner.builder()
            .recordHandler(recordHandler)
            .exceptionHandler(exceptionHandler)
            .threadCount(1)
            .consumerProperties(properties)
            .retryInterval(1_000)
            .build()
            .start();
Deserializers are wrapped in an error-proof decorator.
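As an illustration of what such a decorator might look like, the sketch below wraps a deserializing function so that failures are captured in a result object instead of being thrown. It is an assumption about the general technique, not the project's code: Result, errorProof, parseInt and describe are hypothetical names, and a plain Function<byte[], T> stands in for a real deserializer interface.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class SafeDeserializerSketch {

    /** Holds either a deserialized value or the exception that prevented deserialization. */
    static final class Result<T> {
        final T value;
        final Exception error;
        Result(T value, Exception error) { this.value = value; this.error = error; }
        boolean isError() { return error != null; }
    }

    /** Decorates a deserializer so that failures are captured instead of thrown. */
    static <T> Function<byte[], Result<T>> errorProof(Function<byte[], T> delegate) {
        return bytes -> {
            try {
                return new Result<>(delegate.apply(bytes), null);
            } catch (Exception e) {
                return new Result<>(null, e);
            }
        };
    }

    /** Example delegate: parses UTF-8 text as an integer, throwing on bad input. */
    static Integer parseInt(byte[] bytes) {
        return Integer.valueOf(new String(bytes, StandardCharsets.UTF_8));
    }

    /** Runs the decorated deserializer and summarizes the outcome as text. */
    static String describe(byte[] bytes) {
        Function<byte[], Result<Integer>> safe = errorProof(SafeDeserializerSketch::parseInt);
        Result<Integer> result = safe.apply(bytes);
        return result.isError()
                ? "error=" + result.error.getClass().getSimpleName()
                : "value=" + result.value;
    }

    public static void main(String[] args) {
        System.out.println(describe("42".getBytes(StandardCharsets.UTF_8)));
        System.out.println(describe("oops".getBytes(StandardCharsets.UTF_8)));
    }
}
```

With this shape, a malformed payload becomes an ordinary value that downstream code can inspect and route, rather than an exception raised while polling.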
Testcontainers is used for testing. On macOS, you may need to set these two environment variables to make it work with Colima:
DOCKER_HOST=unix:///Users/<your_user_account>/.colima/default/docker.sock
TESTCONTAINERS_RYUK_DISABLED=true