Read Me First

The following was discovered as part of building this project:

  • The JVM level was changed from '11' to '17'; review the JDK Version Range on the wiki for more details.

Getting Started

Mono Stream Behaviour

Plain Mono Stream

A Mono holds at most one element: onNext is invoked once, then onComplete ends the stream.

    // requires: import reactor.core.publisher.Mono;
    public void MonoTest() {
        Mono<?> stringMono = Mono
                .just("Phil")
//                .then(Mono.error(new RuntimeException("Intent to throw an exception")))
                .log();

        stringMono.subscribe(System.out::println, (error) -> System.out.println(error.getMessage()));
    }

Output is as below

21:37:48.753 [Test worker] DEBUG reactor.util.Loggers -- Using Slf4j logging framework
21:37:48.759 [Test worker] INFO reactor.Mono.Just.1 -- | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
21:37:48.760 [Test worker] INFO reactor.Mono.Just.1 -- | request(unbounded)
21:37:48.760 [Test worker] INFO reactor.Mono.Just.1 -- | onNext(Phil)
21:37:48.760 [Test worker] INFO reactor.Mono.Just.1 -- | onComplete()
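Reactor implements the Reactive Streams contract, which the JDK mirrors in java.util.concurrent.Flow. The sketch below uses only the JDK to model the signal order seen in the log above. The names SingleValueSignals, just and demo are hypothetical, and this is not how Reactor implements Mono.just.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class SingleValueSignals {

    /** Hypothetical stand-in for a one-element source: emits one value, then completes. */
    static Flow.Publisher<String> just(String value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                // Simplification: non-positive demand is ignored rather than reported as an error.
                if (done || n <= 0) {
                    return;
                }
                done = true;
                subscriber.onNext(value);   // the single element
                subscriber.onComplete();    // terminal signal, nothing follows
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with unbounded demand and returns the signals in arrival order. */
    static List<String> demo() {
        List<String> signals = new ArrayList<>();
        just("Phil").subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                signals.add("request(unbounded)");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable error) {
                signals.add("onError(" + error.getMessage() + ")");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete()");
            }
        });
        return signals;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main prints onSubscribe, request(unbounded), onNext(Phil) and onComplete(), in the same order as the Reactor log.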

Mono Stream with error

An error terminates the Mono stream. Here `then` discards the source element and switches to the failing Mono, so the subscriber receives onError and never sees the element.

    // requires: import reactor.core.publisher.Mono;
    public void MonoTest() {
        Mono<?> stringMono = Mono
                .just("Phil")
                .then(Mono.error(new RuntimeException("Intent to throw an exception")))
                .log();

        stringMono.subscribe(System.out::println, (error) -> System.out.println(error.getMessage()));
    }

Output is as below

21:40:41.217 [Test worker] DEBUG reactor.util.Loggers -- Using Slf4j logging framework
21:40:41.223 [Test worker] INFO reactor.Mono.IgnoreThen.1 -- onSubscribe(MonoIgnoreThen.ThenIgnoreMain)
21:40:41.223 [Test worker] INFO reactor.Mono.IgnoreThen.1 -- request(unbounded)
21:40:41.224 [Test worker] ERROR reactor.Mono.IgnoreThen.1 -- onError(java.lang.RuntimeException: Intent to throw an exception)
21:40:41.224 [Test worker] ERROR reactor.Mono.IgnoreThen.1 -- 
java.lang.RuntimeException: Intent to throw an exception
	at com.geekchow.webflux.MonoFluxTest.MonoTest(
Intent to throw an exception
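
The log category reactor.Mono.IgnoreThen hints at why the element never arrives: `then` ignores whatever the source emits and, once the source completes, replays the signals of the other Mono. The JDK-only sketch below models that behaviour with java.util.concurrent.Flow. IgnoreThenSketch and its helpers are hypothetical names, and this is a simplified illustration, not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class IgnoreThenSketch {

    /** Subscription that does nothing; handed out when a signal needs no demand. */
    static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    /** Hypothetical source: emits one value, then completes. */
    static Flow.Publisher<String> just(String value) {
        return s -> s.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done || n <= 0) {
                    return;
                }
                done = true;
                s.onNext(value);
                s.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Hypothetical source: fails as soon as it is subscribed to. */
    static Flow.Publisher<String> error(Throwable failure) {
        return s -> {
            s.onSubscribe(NOOP);
            s.onError(failure);
        };
    }

    /** Drops every value from first; once first completes, hands the subscriber to next. */
    static Flow.Publisher<String> ignoreThen(Flow.Publisher<String> first, Flow.Publisher<String> next) {
        return downstream -> first.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String ignored) { /* value is discarded */ }
            @Override public void onError(Throwable t) { downstream.onSubscribe(NOOP); downstream.onError(t); }
            @Override public void onComplete() { next.subscribe(downstream); }
        });
    }

    /** Returns what a subscriber to just("Phil") followed by an error actually receives. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        ignoreThen(just("Phil"), error(new RuntimeException("Intent to throw an exception")))
                .subscribe(new Flow.Subscriber<String>() {
                    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                    @Override public void onNext(String item) { received.add("onNext(" + item + ")"); }
                    @Override public void onError(Throwable t) { received.add("onError(" + t.getMessage() + ")"); }
                    @Override public void onComplete() { received.add("onComplete()"); }
                });
        return received;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The subscriber records a single onError signal and no onNext(Phil), which matches the log above.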

Flux Stream Behaviour

Plain Flux Stream

All elements in the Flux stream are delivered to the subscriber one by one, followed by a final onComplete event.

    // requires: import reactor.core.publisher.Flux;
    public void FluxTest() {
        Flux<?> stringFlux = Flux
                .just("Phil", "Zhou", "Full", "Stack", "Programmer")
                .concatWithValues("AWS", "DevOps")
                .log();

        stringFlux.subscribe(System.out::println, (error) -> System.out.println(error.getMessage()));
    }

Output of plain flux stream.

21:43:59.734 [Test worker] DEBUG reactor.util.Loggers -- Using Slf4j logging framework
21:43:59.741 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onSubscribe(FluxConcatArray.ConcatArraySubscriber)
21:43:59.742 [Test worker] INFO reactor.Flux.ConcatArray.1 -- request(unbounded)
21:43:59.742 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Phil)
21:43:59.742 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Zhou)
21:43:59.742 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Full)
21:43:59.742 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Stack)
21:43:59.742 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Programmer)
21:43:59.743 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(AWS)
21:43:59.743 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(DevOps)
21:43:59.743 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onComplete()

Flux stream with error element

An error terminates the Flux stream: elements emitted before the error are still delivered, but those after it (TypeScript and Python here) are never emitted.

    // requires: import reactor.core.publisher.Flux;
    public void FluxTest() {
        Flux<?> stringFlux = Flux
                .just("Phil", "Zhou", "Full", "Stack", "Programmer")
                .concatWithValues("AWS", "DevOps")
                .concatWith(Flux.error(new RuntimeException("Intent to throw an exception within Flux Stream")))
                .concatWithValues("TypeScript", "Python")
                .log();

        stringFlux.subscribe(System.out::println, (error) -> System.out.println(error.getMessage()));
    }

Output of flux stream with error element.

21:48:14.628 [Test worker] DEBUG reactor.util.Loggers -- Using Slf4j logging framework
21:48:14.635 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onSubscribe(FluxConcatArray.ConcatArraySubscriber)
21:48:14.636 [Test worker] INFO reactor.Flux.ConcatArray.1 -- request(unbounded)
21:48:14.636 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Phil)
21:48:14.637 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Zhou)
21:48:14.637 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Full)
21:48:14.637 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Stack)
21:48:14.637 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(Programmer)
21:48:14.637 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(AWS)
21:48:14.637 [Test worker] INFO reactor.Flux.ConcatArray.1 -- onNext(DevOps)
21:48:14.638 [Test worker] ERROR reactor.Flux.ConcatArray.1 -- onError(java.lang.RuntimeException: Intent to throw an exception within Flux Stream)
21:48:14.638 [Test worker] ERROR reactor.Flux.ConcatArray.1 -- 
java.lang.RuntimeException: Intent to throw an exception within Flux Stream
	at com.geekchow.webflux.MonoFluxTest.FluxTest(
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Intent to throw an exception within Flux Stream
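
The rule behind this output is that onError is a terminal signal: nothing may be emitted after it. The JDK-only sketch below models that rule with java.util.concurrent.Flow by replaying a list in which a Throwable entry stands for the error. ErrorTerminatesStream, replay and demo are hypothetical names, the item list is shortened, and this is not Reactor's concat implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ErrorTerminatesStream {

    /** Hypothetical publisher: replays items in order; a Throwable entry becomes onError. */
    static Flow.Publisher<String> replay(List<Object> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean started;

            @Override
            public void request(long n) {
                // Simplification: the first positive request drains everything (no backpressure).
                if (started || n <= 0) {
                    return;
                }
                started = true;
                for (Object item : items) {
                    if (item instanceof Throwable) {
                        subscriber.onError((Throwable) item);
                        return; // terminal signal: later entries are never emitted
                    }
                    subscriber.onNext((String) item);
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                started = true;
            }
        });
    }

    /** Returns the signals a subscriber receives when an error sits mid-stream. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        List<Object> items = List.of(
                "Phil", "Zhou", "AWS", "DevOps",
                new RuntimeException("Intent to throw an exception within Flux Stream"),
                "TypeScript", "Python");
        replay(items).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { received.add("onError(" + t.getMessage() + ")"); }
            @Override public void onComplete() { received.add("onComplete()"); }
        });
        return received;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The recorded signals end with onError; neither TypeScript, Python nor onComplete() appears, just as in the Reactor log.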
