spring-cloud/spring-cloud-function

Make imperative `Consumer` composable with a reactive `Supplier<Flux<?>>`

artembilan opened this issue · 0 comments

With Spring Cloud Function, we can easily define a composition such as `supplier|consumer`. The composition is created even when the supplier is a `Supplier<Flux<?>>`, and we can look it up and run it like this:

Runnable composedFunction = functionCatalog.lookup(null);
composedFunction.run();

This runs without errors, but the consumer never actually consumes the `Flux` returned from the supplier.
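One plausible explanation, offered as an assumption rather than a confirmed diagnosis of the composition code: a `Flux` is lazy, so calling `get()` on the supplier only assembles the pipeline, and no element is emitted until something subscribes. A minimal sketch of that behavior, where `LazyFluxDemo` and `countingSupplier` are hypothetical names used only for illustration:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;

class LazyFluxDemo {

    // Hypothetical supplier: increments the counter for every element that
    // actually flows through the pipeline.
    static Supplier<Flux<String>> countingSupplier(AtomicInteger counter) {
        return () -> Flux.just("a", "b", "c")
                .doOnNext(item -> counter.incrementAndGet());
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();

        // Calling get() only builds the Flux; nothing is emitted yet.
        Flux<String> flux = countingSupplier(counter).get();
        System.out.println("before subscribe: " + counter.get());

        // Elements flow only once a subscriber is attached.
        flux.subscribe();
        System.out.println("after subscribe: " + counter.get());
    }
}
```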

To make it work, we need a special wrapper around the imperative consumer:

@Bean
Consumer<Flux<Message<?>>> fluxWrapperConsumer(@Qualifier("myConsumer") Consumer<Message<?>> myConsumer) {
	return messageFlux -> messageFlux.doOnNext(myConsumer).subscribe();
}

We then compose the reactive supplier with this wrapper instead of the original consumer.

This request is for Spring Cloud Function to apply such wrapping automatically whenever it composes a reactive supplier with an imperative consumer.
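As a rough sketch of what such automatic wrapping might look like, the manual wrapper above can be generalized into a helper that adapts any imperative `Consumer<T>` to a `Consumer<Flux<T>>`. The names `ReactiveConsumerAdapter`, `toReactive`, and `compose` are hypothetical and are not part of the Spring Cloud Function API; this only illustrates the idea with plain Reactor and `java.util.function` types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;

class ReactiveConsumerAdapter {

    // Hypothetical helper: adapts an imperative consumer so it can accept a Flux.
    // It subscribes to the Flux and hands each element to the delegate.
    static <T> Consumer<Flux<T>> toReactive(Consumer<? super T> consumer) {
        return flux -> flux.doOnNext(consumer).subscribe();
    }

    // Hypothetical composition of a reactive supplier with an imperative
    // consumer into a Runnable, analogous to supplier|consumer.
    static <T> Runnable compose(Supplier<Flux<T>> supplier, Consumer<? super T> consumer) {
        Consumer<Flux<T>> reactiveConsumer = toReactive(consumer);
        return () -> reactiveConsumer.accept(supplier.get());
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Supplier<Flux<String>> supplier = () -> Flux.just("a", "b", "c");
        Consumer<String> consumer = received::add;

        Runnable composed = compose(supplier, consumer);
        composed.run();

        System.out.println(received); // prints [a, b, c]
    }
}
```

Note that `subscribe()` without an error handler only drops or logs errors, so a real implementation would probably need to decide how failures in the consumer are reported.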