/functional-streaming

4del: This is going to be removed... or refactored if not so useless as I'm currently thinking... spring-cloud-stream, spring-cloud-reactive-streams, kafka, docker, gradle composed builds, streaming using reactor Mono/Flux

Primary LanguageJava

functional-streaming Build Status

functional streaming using spring cloud (reactive) streams, schema registry, avro

in our messaging pipeline we can use Mono / Flux from reactor project, which makes developing streaming cloud native application more exciting!

  @Output(Processor.OUTPU)
  @StreamListener(Processor.INPUT)
  public Flux<Message<String>> correctMessagesReceiver(final Flux<Map<String, String>> payload) {

    return payload.map(Map::entrySet)
                  .map(Set::iterator)
                  .filter(Iterator::hasNext)
                  .map(Iterator::next)
                  .map(this::doSomeTransformationLogic)
                  .window(3) // pagination or could be also time interval collector
                  .flatMap(window -> window.reduce("|", (s1, s2) -> s1 + s2 + "|"))
                  .map(body -> MessageBuilder.withPayload(body).build());
  }

right now is only one limitation: spring-cloud-stream doesn’t suppotr spring-webflux yet, but I hope It will be changed soon (plus see streaming-reactive-*)

build, run, clean

  1. bootstrap your message broker (docker-compose required)

  2. start apps

  3. cleanup

gradle composeUp

gradle bootRun-producer
gradle bootRun-processor
gradle bootRun-aggregator
gradle bootRun-consumer
gradle bootRun-reactive-producer
gradle bootRun-reactive-aggregator
gradle bootRun-reactive-processor
gradle bootRun-reactive-consumer

gradle composeDown
gradle --stop
run integration tests
gradle integrationTests

other repositories: