see other repo:
functional streaming using spring cloud (reactive) streams, schema registry, avro
see streaming-reactive-processor and streaming-reactive-consumer projects
in our messaging pipeline we can use Mono / Flux from Project Reactor, which makes developing cloud-native streaming applications more exciting!
@Output(Processor.OUTPUT)
@StreamListener(Processor.INPUT)
public Flux<Message<String>> correctMessagesReceiver(final Flux<Map<String, String>> payload) {
    return payload.map(Map::entrySet)
                  .map(Set::iterator)
                  .filter(Iterator::hasNext) // skip empty maps
                  .map(Iterator::next)       // take the first entry of each map
                  .map(this::doSomeTransformationLogic)
                  .window(3) // pagination; could also be a time-interval collector
                  // join the items of each window into a single "|a|b|c|" string
                  .flatMap(window -> window.reduce("|", (s1, s2) -> s1 + s2 + "|"))
                  .map(body -> MessageBuilder.withPayload(body).build());
}
right now there is only one limitation: spring-cloud-stream doesn't support spring-webflux yet, but I hope that will change soon (see also streaming-reactive-*)
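To make the window-and-reduce step of the pipeline above easier to follow, here is a minimal plain-Java sketch of the same grouping logic. It deliberately does not use Reactor: `WindowSketch` and `joinWindows` are hypothetical names introduced only for illustration, and the sketch works on an in-memory list instead of a `Flux`. It assumes the transformation step has already produced plain strings.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowSketch {

    // Hypothetical helper (not part of this project): splits items into
    // consecutive windows of `size` elements and folds each window the same
    // way as reduce("|", (s1, s2) -> s1 + s2 + "|") does in the pipeline.
    static List<String> joinWindows(List<String> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<String> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            // The last window may hold fewer than `size` items.
            int end = Math.min(start + size, items.size());
            String joined = "|"; // same seed value as in the reduce call
            for (String item : items.subList(start, end)) {
                joined = joined + item + "|";
            }
            result.add(joined);
        }
        return result;
    }

    public static void main(String[] args) {
        // Five items with a window size of 3 give one full and one partial window.
        System.out.println(joinWindows(List.of("a", "b", "c", "d", "e"), 3));
        // prints [|a|b|c|, |d|e|]
    }
}
```

Each outgoing message body therefore carries up to three transformed items separated by `|`.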
build, run, clean
- bootstrap your message broker (docker-compose required)
- start apps
- cleanup
gradle composeUp
gradle bootRun-producer
gradle bootRun-processor
gradle bootRun-aggregator
gradle bootRun-consumer
gradle bootRun-reactive-producer
gradle bootRun-reactive-aggregator
gradle bootRun-reactive-processor
gradle bootRun-reactive-consumer
gradle composeDown
gradle --stop
run integration tests
gradle integrationTests
other repositories: