ciena/afkak

Consumer: FailedPayloadError when stopping consumer

Closed this issue · 2 comments

When the consumer is stopped, a Traceback with the FailedPayloadError is printed to stdout. This problem happens when running the consumer_example script.

To replicate this issue I followed this procedure

  • setup and activated a Python 2.7 virtual environment,
  • downloaded consumer_example , changed its mode to 755, updated the topic to be wutang (a topic that only has 5 messages) and changed the Consumer to start consuming from OFFSET_EARLIEST
  • installed afkak from the tip at this repo
  • executed the consumer_example script.

The following is the output of the last two steps:

(py27) » pip install https://github.com/ciena/afkak/archive/master.zip
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Please upgrade your Python as Python 2.7 won't be maintained after that date. A future version of pip will drop support for Python 2.7. More details about Python 2 support in pip, can be found at https://pip.pypa.io/en/latest/development/release-process/#python-2-support
Collecting https://github.com/ciena/afkak/archive/master.zip
  Downloading https://github.com/ciena/afkak/archive/master.zip
     - 1.3MB 3.7MB/s
Collecting six (from afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/73/fb/00a976f728d0d1fecfe898238ce23f502a721c0ac0ecfedb80e0d88c64e9/six-1.12.0-py2.py3-none-any.whl
Collecting Twisted>=18.7.0 (from afkak==19.6.0a1)
Collecting PyHamcrest>=1.9.0 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/9a/d5/d37fd731b7d0e91afcc84577edeccf4638b4f9b82f5ffe2f8b62e2ddc609/PyHamcrest-1.9.0-py2.py3-none-any.whl
Collecting zope.interface>=4.4.2 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/a2/a2/e68c37eb2ef9bf942e0ace19f4cf6fe3e7c650932fb587bfde3c608f7d77/zope.interface-4.6.0-cp27-cp27mu-manylinux1_x86_64.whl
Collecting Automat>=0.3.0 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/a3/86/14c16bb98a5a3542ed8fed5d74fb064a902de3bdd98d6584b34553353c45/Automat-0.7.0-py2.py3-none-any.whl
Collecting hyperlink>=17.1.1 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/7f/91/e916ca10a2de1cb7101a9b24da546fb90ee14629e23160086cf3361c4fb8/hyperlink-19.0.0-py2.py3-none-any.whl
Collecting attrs>=17.4.0 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/23/96/d828354fa2dbdf216eaa7b7de0db692f12c234f7ef888cc14980ef40d1d2/attrs-19.1.0-py2.py3-none-any.whl
Collecting incremental>=16.10.1 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/f5/1d/c98a587dc06e107115cf4a58b49de20b19222c83d75335a192052af4c4b7/incremental-17.5.0-py2.py3-none-any.whl
Collecting constantly>=15.1 (from Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/b9/65/48c1909d0c0aeae6c10213340ce682db01b48ea900a7d9fce7a7910ff318/constantly-15.1.0-py2.py3-none-any.whl
Requirement already satisfied: setuptools in ./py27/lib/python2.7/site-packages (from PyHamcrest>=1.9.0->Twisted>=18.7.0->afkak==19.6.0a1) (41.0.1)
Collecting idna>=2.5 (from hyperlink>=17.1.1->Twisted>=18.7.0->afkak==19.6.0a1)
  Using cached https://files.pythonhosted.org/packages/14/2c/cd551d81dbe15200be1cf41cd03869a46fe7226e7450af7a6545bfc474c9/idna-2.8-py2.py3-none-any.whl
Building wheels for collected packages: afkak
  Building wheel for afkak (setup.py) ... done
  Created wheel for afkak: filename=afkak-19.6.0a1-py2.py3-none-any.whl size=171403 sha256=88515ebc3f110662546223648e5a775f327a1750e41f5fbed76e1bb0651f2efa
  Stored in directory: /tmp/pip-ephem-wheel-cache-Se8Sm2/wheels/82/f3/6a/8e09678abee87d881ef9a51df33b29e6d73a1852cf38c3d60f
Successfully built afkak
Installing collected packages: six, PyHamcrest, zope.interface, attrs, Automat, idna, hyperlink, incremental, constantly, Twisted, afkak
Successfully installed Automat-0.7.0 PyHamcrest-1.9.0 Twisted-19.2.1 afkak-19.6.0a1 attrs-19.1.0 constantly-15.1.0 hyperlink-19.0.0 idna-2.8 incremental-17.5.0 six-1.12.0 zope.interface-4.6.0

(py27) » ./consumer_example 172.16.1.13:9092
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=0, message=<Message v0 key='HkQCjJda' value='ueYRT'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=1, message=<Message v0 key='HkQCjJda' value='nAjxY'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=2, message=<Message v0 key='HkQCjJda' value='VHOoH'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=3, message=<Message v0 key='HkQCjJda' value='afkyo'>)
consumer_example INFO Got message SourcedMessage(topic=u'wutang', partition=0, offset=4, message=<Message v0 key='HkQCjJda' value='sWcZb'>)
consumer_example INFO

consumer_example INFO Time is up, stopping consumers...
consumer_example ERROR Consumer failed: [Failure instance: Traceback: <class 'afkak.common.FailedPayloadsError'>: ([], [(FetchRequest(topic=u'wutang', partition=0, offset=5, max_bytes=131072), <twisted.python.failure.Failure twisted.internet.defer.CancelledError: >)])
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:460:callback
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:568:_startRunCallbacks
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:654:_runCallbacks
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:1475:gotResult
--- <exception caught here> ---
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/afkak/client.py:694:send_fetch_request
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/twisted/internet/defer.py:1418:_inlineCallbacks
/home/ubuntu/sandbox/afkak/py27/local/lib/python2.7/site-packages/afkak/client.py:1216:_send_broker_aware_request
]
afkak.brokerclient INFO _KafkaBrokerClient<node_id=1001 172.16.1.12:9092 connected>: Connection closed: <twisted.python.failure.Failure twisted.internet.error.ConnectionDone: Connection was closed cleanly.>

This problem also happens in a Python3 environment (which requires fixing consumer_example to replicate)
This problems does not happen with afkak 3.0.0

The consumer should probably ignore the CancelledError when stopping. I think it used to do that?

@rthille it did not in Afkak <= 3.0.0.