akkadotnet/Akka.Streams.Kafka

Consumer actor does not perform proper incoming requests cleanup

IgorFedchenko opened this issue · 0 comments

While working on #56, it turned out that the consumer actor does not clean up incoming requests once they have been served. This means the actor keeps polling records from every partition that has been requested since it started. This may lead to message loss: messages are consumed, but no stage is waiting for them.

We need to add a test for this (one that initially fails) and then fix it.

Note: The tricky part is that the Alpakka consumer can fetch multiple messages from multiple partitions at once (or at least multiple messages from one partition), whereas the Confluent driver only allows consuming one message at a time. This difference affects how we should ask the consumer actor for new messages and how we keep track of the responses it has given. We cannot simply ask "give me all messages from this list of partitions" and wait for all of them at once, which is how it is implemented right now.
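
To make the intended bookkeeping concrete, here is a minimal, library-agnostic sketch in Python. It is not the Akka.Streams.Kafka code and does not use the Confluent API; `RequestTracker`, `add`, `serve` and `wanted_partitions` are hypothetical names chosen for illustration. It assumes each request wants a single message from any of its partitions, and that a request is forgotten as soon as it is served, so partitions nobody is waiting on are no longer polled.

```python
class RequestTracker:
    """Hypothetical per-request bookkeeping for a consumer actor (illustration only)."""

    def __init__(self):
        # request_id -> partitions that request is interested in (oldest request first)
        self._requests = {}

    def add(self, request_id, partitions):
        """Register a stage's request for one message from any of `partitions`."""
        self._requests[request_id] = frozenset(partitions)

    def wanted_partitions(self):
        """Partitions that still have at least one outstanding request."""
        wanted = set()
        for partitions in self._requests.values():
            wanted |= partitions
        return wanted

    def serve(self, partition):
        """Pick the oldest request waiting on `partition` and forget it.

        Returns the request id, or None if no stage is waiting, in which
        case the message should not have been consumed at all.
        """
        match = next(
            (rid for rid, parts in self._requests.items() if partition in parts),
            None,
        )
        if match is not None:
            del self._requests[match]  # the cleanup step this issue is about
        return match


tracker = RequestTracker()
tracker.add("stage-a", [0, 1])
tracker.add("stage-b", [1])
first = tracker.serve(1)            # oldest request covering partition 1
remaining = tracker.wanted_partitions()
```

With this kind of cleanup in place, the actor would only poll the partitions returned by `wanted_partitions()` before each single-message consume, instead of everything requested since startup.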