faust-streaming/faust

Faust agents die after some days

alihoseiny opened this issue · 8 comments

Actual behavior

After the application has been running for some days, the agents gradually die, and eventually no events are consumed from the topics. Unless I restart the Docker container of the Faust app, all agents stop consuming within a few days.

Full traceback

* m_consumer.m_agent ----->
============================================================
['Stack for <coroutine object movie_updated_agent at 0x7f303428bce0> (most recent call last):\n  File "/project/m_consumer/movie_updated_consumer.py", line 22, in movie_updated_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n']



* z_consumer.retry_send_product_agent ----->
============================================================
['Stack for <coroutine object retry_send_product_agent at 0x7f30341506b0> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f303428bf00> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f3034150af0> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f30341508d0> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object retry_send_product_agent at 0x7f3034150490> (most recent call last):\n  File "/project/z_consumer/retry_consumer.py", line 29, in retry_send_product_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n']



* o_consumer.o_agent ----->
============================================================
['Stack for <coroutine object order_agent at 0x7f303419a750> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341b3450> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034175d50> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341b2150> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034199450> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341b0e50> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f30341c4850> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in 
stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034198150> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f303419ba50> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object order_agent at 0x7f3034174b50> (most recent call last):\n  File "/project/o_consumer/consumer.py", line 23, in order_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n']



* z.z_consumer.send_user_agent ----->
============================================================
['Stack for <coroutine object send_user_agent at 0x7f30341f8710> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8dd0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9010> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8830> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8ef0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f95b0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8950> (most recent call last):\n  File "/project/z_consumer/consumer.py", 
line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9130> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f96d0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f84d0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8a70> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9250> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f85f0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", 
line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f97f0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8b90> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9370> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f303427e7b0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8170> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f8cb0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n', 'Stack for <coroutine object send_user_agent at 0x7f30341f9490> (most recent call 
last):\n  File "/project/z_consumer/consumer.py", line 50, in send_user_agent\n    async for records in stream.take(faust_config.max_stream_take, 1):\n  File "async_generator_asend", line -1, in [rest of traceback truncated]\n']




* zb_core.transport_layer.zb_product_sent_consumer.consumer.send_product_agent ----->
============================================================
['Stack for <coroutine object send_product_agent at 0x7f3034108dd0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108050> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108710> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109250> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, 
in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108ef0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108170> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108830> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 
34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109370> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109010> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108950> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108290> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341fb770> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108b90> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", 
line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341083b0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108a70> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341fbe30> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034109130> (most recent call last):\n  File 
"/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341084d0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f3034108cb0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n', 'Stack for <coroutine object send_product_agent at 0x7f30341085f0> (most recent call last):\n  File "/project/z_consumer/consumer.py", line 54, in send_product_agent\n    await _wrap_send_product_to_zb(record=record, zb_interface=interface)\n  
File "/project/z_consumer/consumer.py", line 34, in _wrap_send_product_to_zb\n    await asyncio.sleep(0)      # Skipping current event loop run for giving execution chance to other tasks.\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 630, in sleep\n    await __sleep0()\n  File "/usr/local/lib/python3.11/asyncio/tasks.py", line 624, in __sleep0\n    yield\n']

Sample code

Sample code of one of the consumers:

import asyncio

@app.agent(event_sent_topic, concurrency=20)
async def send_event_agent(stream):
    """Consume events in batches and send each record through the interface."""
    task_name = asyncio.current_task().get_name()
    interface = Interface()

    # Take up to 10 events, or whatever has arrived within 1 second.
    async for records in stream.take(10, 1):
        for record in records:
            send_event_tasks_queue[record.user_id].append(task_name)
            await _wrap_send_event(record=record, interface=interface)

        # Yield to the event loop once so that other tasks get a chance to run.
        await asyncio.sleep(0)
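
For reference, the effect of the `asyncio.sleep(0)` call in the sample above can be shown in isolation. The following is a minimal sketch in plain asyncio, without Faust; the names `worker` and `run_interleaved` are hypothetical and exist only for this illustration. It shows that `sleep(0)` suspends the current task for one pass of the event loop, which lets other ready tasks run, and that it does not wait for any real time.

```python
import asyncio


async def worker(name: str, steps: int, log: list) -> None:
    """Append to a shared log, yielding to the event loop after each step."""
    for i in range(steps):
        log.append(f"{name}-{i}")
        # sleep(0) suspends this task for one loop iteration so that other
        # ready tasks get a turn; it does not wait for any wall-clock time.
        await asyncio.sleep(0)


async def run_interleaved() -> list:
    """Run two hypothetical workers concurrently and return the shared log."""
    log: list = []
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


order = asyncio.run(run_interleaved())
print(order)
```

Because each worker yields after every step, the two tasks alternate rather than one running to completion first. This only helps when the work between yields is itself short: a long blocking call inside a task still holds the event loop until it returns.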

Versions

  • Python version 3.11.3
  • Faust version 0.10.13
  • Operating system Docker on CentOS (python:3.11.3-slim-buster)
  • Kafka version

For extra context, we also have the following warning logs:

[2023-06-15 08:38:54,210] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.297 seconds
[2023-06-15 08:38:54,462] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.336312010884285 runtime=4.301220178604126e-05 sleeptime=1.336312010884285
[2023-06-15 08:38:54,838] [1] [WARNING] Executing <Task pending name='Task-190' coro=<Agent._execute_actor() running at /usr/local/lib/python3.11/site-packages/faust/agents/agent.py:674> cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/site-packages/faust/agents/agent.py:664> took 0.102 seconds
[2023-06-15 08:38:55,500] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.281 seconds
[2023-06-15 08:38:55,768] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.30657482892274857 runtime=4.564225673675537e-05 sleeptime=1.3065748289227486
[2023-06-15 08:38:56,789] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.280 seconds
[2023-06-15 08:38:58,079] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.283 seconds
[2023-06-15 08:38:59,384] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.296 seconds
[2023-06-15 08:38:59,629] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.3187522292137146 runtime=4.420429468154907e-05 sleeptime=1.3187522292137146
[2023-06-15 08:39:00,682] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.290 seconds
[2023-06-15 08:39:00,933] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.30384987592697144 runtime=5.805492401123047e-05 sleeptime=1.3038498759269714
[2023-06-15 08:39:01,969] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.277 seconds
[2023-06-15 08:39:03,264] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.289 seconds
[2023-06-15 08:39:04,560] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.287 seconds
[2023-06-15 08:39:05,851] [1] [WARNING] Executing <Task pending name='<coroutine object MethodQueueWorker._method_queue_do_work at 0x7f30346f8160>' coro=<Service._execute_task() running at /usr/local/lib/python3.11/site-packages/mode/services.py:843> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[Service._on_future_done()] created at /usr/local/lib/python3.11/asyncio/tasks.py:670> took 0.283 seconds
[2023-06-15 08:39:06,858] [1] [WARNING] [^--Consumer]: wait_empty: Waiting for tasks


[2023-06-15 08:55:59,973] [1] [INFO] Timer Monitor.sampler woke up too late, with a drift of +0.32391348481178284 runtime=4.1179358959198e-05 sleeptime=1.3239134848117828
[2023-06-15 08:56:01,013] [1] [WARNING] Executing <Task pending name='Task-763' coro=<AIOKafkaConnection._read() running at /usr/local/lib/python3.11/site-packages/aiokafka/conn.py:525> wait_for=<Future pending cb=[Task.task_wakeup()] created at /usr/local/lib/python3.11/asyncio/base_events.py:427> cb=[AIOKafkaConnection._on_read_task_error(<weakref at 0...x7f30230ac330>)()] created at /usr/local/lib/python3.11/site-packages/aiokafka/util.py:26> took 0.291 seconds
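
For reference, warnings of the form `Executing <Task ...> took N seconds` are what asyncio's debug mode logs when a single step of a task or callback holds the event loop longer than `loop.slow_callback_duration` (0.1 seconds by default). The sketch below reproduces such a message in plain asyncio, without Faust; `ListHandler` and `blocking_step` are hypothetical names used only for this illustration, and the 0.2 second blocking call is an arbitrary stand-in for whatever is occupying the loop in the real application.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Hypothetical helper: collect formatted log messages in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocking_step() -> None:
    # time.sleep blocks the whole event loop, unlike `await asyncio.sleep`.
    time.sleep(0.2)


handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
asyncio_logger.setLevel(logging.WARNING)

# debug=True enables asyncio debug mode, including slow-callback reporting.
asyncio.run(blocking_step(), debug=True)

slow_warnings = [m for m in handler.messages if "took" in m]
for message in slow_warnings:
    print(message)
```

The sketch only shows where this kind of message comes from. It does not establish which code path is blocking the loop in the application above, or whether that blocking is related to the agents stopping.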

Likely a duplicate of #175 😦. If I figure out a solution, I'll immediately have a fix deployed.