/rx-cancel-issue

Reproducible vertx Rx issue

Primary LanguageJava

Rx-cancel-issue

vert.x 4.3.3 purple

This is a reproducible case for issue eclipse-vertx/vert.x#4479

Build & test

Start the http server:

mvn clean compile exec:java

It will start an http server on port 80.

Make a request:

curl -D- http://localhost

The response should look like the following:

curl -D- http://localhost
HTTP/1.1 200 OK
transfer-encoding: chunked

Chunk 0
Chunk 1
Chunk 2
Chunk 3
Chunk 4
...

And you should see the following log on the server log console:

2022-09-11 11:44:05.879 [INFO ] io.gravitee.vertx.rx_cancel_issue.MainVerticle: HTTP listener ready to accept requests on port 80
2022-09-11 11:44:11.618 [INFO ] io.gravitee.vertx.rx_cancel_issue.MainVerticle: Start generating chunks

Type CTRL-C to interrupt the curl command.

See the stack trace on the server logs:

2022-09-11 11:44:17.665 [ERROR] io.vertx.core.impl.ContextBase: Unhandled exception
java.lang.IllegalStateException: Response has already been written
at io.vertx.core.http.impl.Http1xServerResponse.checkValid(Http1xServerResponse.java:682)
at io.vertx.core.http.impl.Http1xServerResponse.writeQueueFull(Http1xServerResponse.java:274)
at io.vertx.core.streams.impl.PipeImpl.lambda$to$1(PipeImpl.java:82)
at io.vertx.reactivex.impl.ReadStreamSubscriber.checkStatus(ReadStreamSubscriber.java:166)
at io.vertx.reactivex.impl.ReadStreamSubscriber.onNext(ReadStreamSubscriber.java:220)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
at io.reactivex.internal.schedulers.ImmediateThinScheduler$ImmediateThinWorker.schedule(ImmediateThinScheduler.java:89)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.trySchedule(FlowableObserveOn.java:166)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:117)
at io.reactivex.subscribers.SerializedSubscriber.onNext(SerializedSubscriber.java:100)
at io.reactivex.internal.operators.flowable.FlowableDelay$DelaySubscriber$OnNext.run(FlowableDelay.java:114)
at io.vertx.reactivex.ContextScheduler$ContextWorker$TimedAction.run(ContextScheduler.java:184)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)

It appears that cancellation is not propagated to the flowable that producing buffer:

private Flowable<Buffer> generateMessageFlow() {
    return Flowable
      .<Buffer, Long>generate(
        () -> 0L,
        (state, emitter) -> {
          emitter.onNext(Buffer.buffer("Chunk " + state + "\n"));
          return state + 1;
        }
      )
      .delay(1000, TimeUnit.MILLISECONDS)
      .rebatchRequests(1)
      .doOnCancel(() -> log.info("Cancelled. Stop generating chunks")) // <-- Never executed.
      .doOnSubscribe(subscription -> log.info("Start generating chunks"));
  }