
Consumer: FailedPayloadsError when stopping consumer

When the consumer is stopped, a traceback with a FailedPayloadsError is printed to stdout. This happens when running the consumer_example script.

To replicate this issue I followed this procedure:

  • set up and activated a Python 2.7 virtual environment,
  • downloaded consumer_example, changed its mode to 755, updated the topic to wutang (a topic that only has 5 messages), and changed the Consumer to start consuming from OFFSET_EARLIEST,
  • installed afkak from the tip of this repo,
  • executed the consumer_example script.

The following is the output of the last two steps:

(py27) » ./consumer_example
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=0, message=<Message v0 key='HkQCjJda' value='ueYRT'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=1, message=<Message v0 key='HkQCjJda' value='nAjxY'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=2, message=<Message v0 key='HkQCjJda' value='VHOoH'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=3, message=<Message v0 key='HkQCjJda' value='afkyo'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=4, message=<Message v0 key='HkQCjJda' value='sWcZb'>)
consumer_example INFO

consumer_example INFO Time is up, stopping consumers...
consumer_example ERROR Consumer failed: [Failure instance: Traceback: <class 'afkak.common.FailedPayloadsError'>: ([], [(FetchRequest(topic=u'wutang', partition=0, offset=5, max_bytes=131072), <twisted.python.failure.Failure twisted.internet.defer.CancelledError: >)])
--- <exception caught here> ---
afkak.brokerclient INFO _KafkaBrokerClient<node_id=1001 connected>: Connection closed: <twisted.python.failure.Failure twisted.internet.error.ConnectionDone: Connection was closed cleanly.>

This problem also happens in a Python 3 environment (which requires fixing consumer_example to replicate).
This problem does not happen with afkak 3.0.0.

The consumer should probably ignore the CancelledError when stopping. I think it used to do that?

@rthille it did not in Afkak <= 3.0.0.
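
For illustration, here is a rough sketch of what "ignore the CancelledError when stopping" could look like. It is not Afkak's actual code. The classes below are stand-ins so the snippet runs without Twisted or Afkak installed, and the (responses, failed_payloads) argument layout is only inferred from the traceback above. The helper names is_only_cancellation and on_consumer_failure are hypothetical.

```python
class CancelledError(Exception):
    """Stand-in for twisted.internet.defer.CancelledError."""


class FakeFailure(object):
    """Stand-in for twisted.python.failure.Failure (exception kept in .value)."""

    def __init__(self, value):
        self.value = value

    def check(self, *error_types):
        # Like Failure.check: return the first matching type, else None.
        for error_type in error_types:
            if isinstance(self.value, error_type):
                return error_type
        return None


class FailedPayloadsError(Exception):
    """Stand-in for afkak.common.FailedPayloadsError.

    Assumed layout, taken from the log above: args = (responses, failed_payloads),
    where failed_payloads is a list of (request, failure) pairs.
    """


def is_only_cancellation(exc):
    """Return True if every failed payload in exc failed with CancelledError."""
    if not isinstance(exc, FailedPayloadsError) or len(exc.args) != 2:
        return False
    _responses, failed_payloads = exc.args
    if not failed_payloads:
        return False
    return all(
        failure.check(CancelledError) is not None
        for _request, failure in failed_payloads
    )


def on_consumer_failure(exc, stopping):
    """Hypothetical handler: swallow cancellation noise only during shutdown."""
    if stopping and is_only_cancellation(exc):
        return "ignored"
    return "reported"
```

The check is deliberately narrow: an error is suppressed only when the consumer is stopping and every failed request was cancelled, so a real fetch failure that happens to coincide with shutdown would still be reported.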