This is a reproducible case for issue eclipse-vertx/vert.x#4479
Start the http server:
mvn clean compile exec:java
It will start an http server on port 80.
Make a request:
curl -D- http://localhost
The response should look like the following:
curl -D- http://localhost
HTTP/1.1 200 OK
transfer-encoding: chunked
Chunk 0
Chunk 1
Chunk 2
Chunk 3
Chunk 4
...
And you should see the following log on the server log console:
2022-09-11 11:44:05.879 [INFO ] io.gravitee.vertx.rx_cancel_issue.MainVerticle: HTTP listener ready to accept requests on port 80
2022-09-11 11:44:11.618 [INFO ] io.gravitee.vertx.rx_cancel_issue.MainVerticle: Start generating chunks
Type CTRL-C to interrupt the curl command.
See the stack trace on the server logs:
2022-09-11 11:44:17.665 [ERROR] io.vertx.core.impl.ContextBase: Unhandled exception
java.lang.IllegalStateException: Response has already been written
at io.vertx.core.http.impl.Http1xServerResponse.checkValid(Http1xServerResponse.java:682)
at io.vertx.core.http.impl.Http1xServerResponse.writeQueueFull(Http1xServerResponse.java:274)
at io.vertx.core.streams.impl.PipeImpl.lambda$to$1(PipeImpl.java:82)
at io.vertx.reactivex.impl.ReadStreamSubscriber.checkStatus(ReadStreamSubscriber.java:166)
at io.vertx.reactivex.impl.ReadStreamSubscriber.onNext(ReadStreamSubscriber.java:220)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:407)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
at io.reactivex.internal.schedulers.ImmediateThinScheduler$ImmediateThinWorker.schedule(ImmediateThinScheduler.java:89)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.trySchedule(FlowableObserveOn.java:166)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:117)
at io.reactivex.subscribers.SerializedSubscriber.onNext(SerializedSubscriber.java:100)
at io.reactivex.internal.operators.flowable.FlowableDelay$DelaySubscriber$OnNext.run(FlowableDelay.java:114)
at io.vertx.reactivex.ContextScheduler$ContextWorker$TimedAction.run(ContextScheduler.java:184)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
It appears that cancellation is not propagated to the flowable that producing buffer:
private Flowable<Buffer> generateMessageFlow() {
return Flowable
.<Buffer, Long>generate(
() -> 0L,
(state, emitter) -> {
emitter.onNext(Buffer.buffer("Chunk " + state + "\n"));
return state + 1;
}
)
.delay(1000, TimeUnit.MILLISECONDS)
.rebatchRequests(1)
.doOnCancel(() -> log.info("Cancelled. Stop generating chunks")) // <-- Never executed.
.doOnSubscribe(subscription -> log.info("Start generating chunks"));
}