monix/monix-kafka

ConsumerObservable doesn't propagate the cancellation

cakper opened this issue · 0 comments

During the upgrade to 1.0.0-RC5 we noticed that the consumer keeps polling messages after it has been terminated. A minimal reproduction case was added to this PR:
https://github.com/monix/monix-kafka/compare/master...cakper:cancelable-repro?expand=1

I've noticed that if doAfterSubscribe is applied before bufferTimedWithPressure, the consumer is not terminated properly, but if I move it after bufferTimedWithPressure, it is.

Upon investigation, @Avasil found the following in the Observer implementation:

https://github.com/monix/monix-kafka/blob/v1.0.0-RC5/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableAutoCommit.scala#L77

Observer.feed always returns Continue if the iterator is empty, so downstream doesn't get the chance to propagate cancellation, and the consumer keeps polling until something becomes available in the topic.
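
To make the behaviour above concrete, here is a small library-free model of the loop. It is only a sketch: `FeedModel`, `feed` and `pollsUntilStop` are hypothetical names, and the `Ack` type here is a local stand-in, not Monix's. It assumes the only way the source learns about a downstream stop is through the value returned from `onNext`.

```scala
object FeedModel {
  // Local stand-ins for the acknowledgement values (not the Monix types).
  sealed trait Ack
  case object Continue extends Ack
  case object Stop extends Ack

  // Pushes a batch downstream. If the batch is empty, onNext is never
  // called, so the result is always Continue, as described in the issue.
  def feed[A](onNext: A => Ack, batch: Iterator[A]): Ack = {
    var ack: Ack = Continue
    while (ack == Continue && batch.hasNext) ack = onNext(batch.next())
    ack
  }

  // Simulates the poll loop over a fixed list of poll results and
  // returns how many polls were made before the loop stopped.
  def pollsUntilStop[A](polls: List[List[A]], onNext: A => Ack): Int = {
    var count = 0
    var ack: Ack = Continue
    val it = polls.iterator
    while (ack == Continue && it.hasNext) {
      count += 1
      ack = feed(onNext, it.next().iterator)
    }
    count
  }

  def main(args: Array[String]): Unit = {
    // Downstream is already cancelled and would answer Stop to any element.
    val alreadyCancelled: Int => Ack = _ => Stop
    // Three empty polls, then a record finally shows up in the topic.
    val polls = List(Nil, Nil, Nil, List(1), List(2))
    println(pollsUntilStop(polls, alreadyCancelled)) // prints 4
  }
}
```

In this model the cancelled downstream is only consulted on the fourth poll, the first one that returns a record, which matches the "keeps polling until something is available" symptom.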

I'm not sure about the fix, though. On the user side you could probably use takeUntil, or takeUntilEval + Deferred, which will cancel the subscription (along with the Task), or just cancel the observableTask itself.
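
A rough sketch of what the user-side workaround might look like, assuming Monix 3.x and a broker on `localhost:9092`. The topic name, group id, timings and the `ConsumerWorkaround` object are made up for illustration, and I haven't verified that this fully avoids the problem described above:

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}
import monix.reactive.Observable
import scala.concurrent.duration._

object ConsumerWorkaround {
  // Hypothetical settings, adjust to your environment.
  val config: KafkaConsumerConfig = KafkaConsumerConfig.default.copy(
    bootstrapServers = List("localhost:9092"),
    groupId = "cancel-repro"
  )

  // Emits once after 30 seconds; takeUntil stops the stream
  // and cancels the upstream subscription when it fires.
  val stopSignal: Observable[Unit] = Observable.evalDelayed(30.seconds, ())

  val consumerTask: Task[Unit] =
    KafkaConsumerObservable[String, String](config, List("my-topic"))
      .takeUntil(stopSignal)
      .bufferTimedWithPressure(1.second, 100)
      .foreachL(batch => println(s"received ${batch.size} records"))

  def main(args: Array[String]): Unit = {
    val running = consumerTask.runToFuture
    // Alternatively, cancel the running Task directly:
    // running.cancel()
    scala.concurrent.Await.result(running, 1.minute)
  }
}
```

Note that takeUntil is placed directly on the consumer observable, before any buffering, so the stop signal does not depend on a record arriving.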