ConsumerObservable doesn't propagate the cancellation
cakper opened this issue · 0 comments
During the upgrade to 1.0.0-RC5 we noticed that the consumer keeps polling for messages after it has been terminated. A minimal reproduction case was added to this PR:
https://github.com/monix/monix-kafka/compare/master...cakper:cancelable-repro?expand=1
I've noticed that if doAfterSubscribe is set before bufferTimedWithPressure, the consumer is not terminated properly, but if I move it after, it is.
Upon investigation, @Avasil found the following in the Observer implementation:
https://github.com/monix/monix-kafka/blob/v1.0.0-RC5/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala#L77
Observer.feed always returns Continue if the iterator is empty, so downstream doesn't get the chance to propagate cancellation, and the consumer keeps polling until something is available in the topic.
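To make the mechanism concrete, here is a minimal model of the behaviour described above. It is a sketch only: `Ack`, `FeedModel.feed` and `FeedModel.pollLoop` are hypothetical stand-ins written for illustration, not the Monix or monix-kafka classes, and `maxPolls` exists just so the demo terminates.

```scala
// Simplified stand-in types; these are NOT the Monix classes.
sealed trait Ack
case object Continue extends Ack
case object Stop extends Ack

object FeedModel {
  // Models the reported behaviour: an empty batch yields Continue
  // without the downstream callback ever being consulted.
  def feed[A](onNext: A => Ack, batch: Iterator[A]): Ack = {
    var ack: Ack = Continue
    while (ack == Continue && batch.hasNext) ack = onNext(batch.next())
    ack
  }

  // Hypothetical poll loop: keeps polling while feed returns Continue.
  // Returns the number of polls performed (capped by maxPolls).
  def pollLoop[A](poll: () => Iterator[A], onNext: A => Ack, maxPolls: Int): Int = {
    var polls = 0
    var ack: Ack = Continue
    while (ack == Continue && polls < maxPolls) {
      polls += 1
      ack = feed(onNext, poll())
    }
    polls
  }
}

object FeedModelDemo {
  def main(args: Array[String]): Unit = {
    // A downstream that wants to stop as soon as it is asked.
    val alwaysStop: Int => Ack = _ => Stop

    // Empty topic: downstream is never asked, so the loop runs to the cap.
    val emptyPolls = FeedModel.pollLoop[Int](() => Iterator.empty, alwaysStop, maxPolls = 5)
    // Non-empty topic: the first element lets downstream answer Stop.
    val nonEmptyPolls = FeedModel.pollLoop[Int](() => Iterator(1), alwaysStop, maxPolls = 5)

    println(s"empty topic polls: $emptyPolls, non-empty topic polls: $nonEmptyPolls")
  }
}
```

In this model the downstream can only signal Stop when at least one record arrives, which matches the observation that polling continues until the topic has something available.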
I'm not sure about the fix, though. On the user side you could probably use takeUntil or takeUntilEval + Deferred, which will cancel the subscription (along with the Task), or just cancel the observableTask itself.
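A rough sketch of the takeUntil variant of that workaround, assuming Monix 3.x on the classpath. The interval-based `source` is only a stand-in for the Kafka consumer observable, and a plain Scala `Promise` is used as the stop signal instead of Deferred; whether this fully works around the issue in monix-kafka has not been verified here.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object TakeUntilSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the Kafka consumer observable (assumption for illustration).
    val source: Observable[Long] = Observable.intervalAtFixedRate(100.millis)

    // Completing this promise acts as the external "stop" signal.
    val stop = Promise[Unit]()

    val done = source
      .takeUntil(Observable.fromFuture(stop.future)) // completes once the trigger emits
      .foreachL(n => println(s"got $n"))
      .runToFuture

    Thread.sleep(350)  // let a few elements through
    stop.success(())   // request termination
    Await.result(done, 5.seconds)
  }
}
```

Because the stop signal is delivered through a separate trigger rather than through the Ack returned by downstream, termination here does not depend on a record arriving from the source.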