Kafka test hangs on "await subscriber.get()"
rafalp opened this issue · 2 comments
We now have tests suite courtesy of #11, but test for Kafka backend hands indefinitely on await subscriber.get()
, causing tests suite to timeout:
broadcaster/tests/test_broadcast.py
Line 43 in 435c35e
Sadly, I'm out of my depth here, and I'm unable to diagnose if this is problem with Kafka backend or test setup.
Help is welcome.
Maybe unrelated, but there's something strange about the subscribe/unsubscribe implementations in the Kafka backend:
https://github.com/encode/broadcaster/blob/master/broadcaster/_backends/kafka.py
Essentially when we subscribe to a new channel, we add it to a set of channels, then get the Kafka consumer to subscribe to all those channels (topics). And when we unsubscribe from a specific channel, we unsubscribe, period. I wonder if this can cause issues when subscribing from two or more channels independently. Probably not related to the test though since we only subscribe to a si single "chatroom" channel there.
Anyway, one way to narrow it down would be to look if it hangs on getting an item from the internal subscriber queue, or perhaps more specifically waiting for the "getone()" operation on the Kafka consumer to return an item. In the latter case it might be that the topic hasn't actually received the message, ie that the publish() operation didn't work correctly. If I remember correctly Kafka producers typically have an internal buffer that they don't necessarily flush right away (Kafka was designed for high volume streaming use cases). Perhaps we need to force-flush that, or something?
Some experimentation result mentioned in #44 (comment)
seems the issue is with test.