Running with isolated_partitions and more than 1 worker crashes
ostetsenko opened this issue · 1 comments
ostetsenko commented
First mention of the same problem: robinhood/faust#181
Checklist
- I have included information about relevant versions
- I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
- Run a worker instance with "isolated_partitions=True" set when declaring the agent. The agent works fine.
- Run a second worker instance with "isolated_partitions=True" set when declaring the agent.
Expected behavior
Both workers run normally with no crash, and the partitions are divided between the workers after rebalancing.
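To make "divided between workers" concrete, here is a minimal sketch of the property I would expect to hold after each rebalance: every partition is owned by exactly one worker. The helper name is_valid_division is hypothetical and not part of Faust. The sample data is the generation 13 assignment of test.faust.bugfix.partitions as reported in the worker logs in this issue.

```python
def is_valid_division(all_partitions, assignments):
    """Return True if the workers' partition sets are disjoint and
    together cover every partition of the topic."""
    seen = set()
    for parts in assignments.values():
        if seen & parts:
            # Two workers own the same partition.
            return False
        seen |= parts
    return seen == set(all_partitions)


# Generation 13 assignment taken from the worker logs in this issue.
generation_13 = {"worker1": {1, 2}, "worker2": {0}}

print(is_valid_division(range(3), generation_13))
# An overlapping assignment would be rejected.
print(is_valid_division(range(3), {"worker1": {0, 1, 2}, "worker2": {0}}))
```

By this check the generation 13 assignment itself looks fine, which suggests the crash happens while worker 1 applies the new assignment rather than in the assignment step.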
Actual behavior
After rebalancing, the first worker crashes with an AssertionError.
The second worker then gets all partitions and starts processing.
Logs of Worker1
[2023-06-29 15:11:21,275] [37364] [INFO] [^Worker]: Starting...
[2023-06-29 15:11:21,277] [37364] [INFO] [^-App]: Starting...
[2023-06-29 15:11:21,277] [37364] [INFO] [^--Monitor]: Starting...
[2023-06-29 15:11:21,277] [37364] [INFO] [^--Producer]: Starting...
[2023-06-29 15:11:21,278] [37364] [INFO] [^---ProducerBuffer]: Starting...
[2023-06-29 15:11:21,300] [37364] [INFO] [^--Consumer]: Starting...
[2023-06-29 15:11:21,300] [37364] [INFO] [^---AIOKafkaConsumerThread]: Starting...
[2023-06-29 15:11:21,313] [37364] [INFO] [^--LeaderAssignor]: Starting...
[2023-06-29 15:11:21,314] [37364] [INFO] [^--ReplyConsumer]: Starting...
[2023-06-29 15:11:21,316] [37364] [INFO] [^--AgentManager]: Starting...
[2023-06-29 15:11:21,317] [37364] [INFO] [^---Agent: __main__.read_agent]: Starting...
[2023-06-29 15:11:21,318] [37364] [INFO] [^----OneForOneSupervisor: (1@0x11373ea70)]: Starting...
[2023-06-29 15:11:21,318] [37364] [INFO] [^---Conductor]: Starting...
[2023-06-29 15:11:21,319] [37364] [INFO] [^--TableManager]: Starting...
[2023-06-29 15:11:21,319] [37364] [INFO] [^---Conductor]: Waiting for agents to start...
[2023-06-29 15:11:21,320] [37364] [INFO] [^---Conductor]: Waiting for tables to be registered...
[2023-06-29 15:11:22,320] [37364] [INFO] [^---Recovery]: Starting...
[2023-06-29 15:11:22,337] [37364] [INFO] Discovered coordinator 1 for group consumer
[2023-06-29 15:11:22,338] [37364] [INFO] Revoking previously assigned partitions set() for group consumer
[2023-06-29 15:11:22,339] [37364] [INFO] (Re-)joining group consumer
[2023-06-29 15:11:25,342] [37364] [INFO] Joined group 'consumer' (generation 12) with member_id faust-0.10.14-a26c1fa8-221b-45da-9879-6fee8d2576da
[2023-06-29 15:11:25,343] [37364] [INFO] Elected group leader -- performing partition assignments using faust
[2023-06-29 15:11:25,350] [37364] [INFO] Successfully synced group consumer with generation 12
[2023-06-29 15:11:25,350] [37364] [INFO] Setting newly assigned partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ consumer-__assignor-__leader │ {0} │
│ test.faust.bugfix.partitions │ {0-2} │
└──────────────────────────────┴────────────┘ for group consumer
[2023-06-29 15:11:25,351] [37364] [INFO] Executing _on_partitions_assigned
[2023-06-29 15:11:25,352] [37364] [INFO] [^---Agent: __main__.read_agent]: Starting actor for partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {1} │
└──────────────────────────────┴────────────┘
[2023-06-29 15:11:25,353] [37364] [INFO] [^---Agent: __main__.read_agent]: Starting actor for partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {2} │
└──────────────────────────────┴────────────┘
[2023-06-29 15:11:25,355] [37364] [INFO] generation id 12 app consumers id 12
[2023-06-29 15:11:25,356] [37364] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
[2023-06-29 15:11:25,368] [37364] [INFO] [^---Recovery]: Resuming flow...
[2023-06-29 15:11:25,369] [37364] [INFO] [^---Fetcher]: Starting...
[2023-06-29 15:11:25,369] [37364] [INFO] [^---Recovery]: Worker ready
[2023-06-29 15:11:25,370] [37364] [INFO] [^Worker]: Ready
[2023-06-29 15:11:32,809] [37364] [INFO] stream:4621335280:partition:0:offset:0={'user': '46daec69-c5b4-452f-8c74-99b13e4b19a2', 'partition': 0, 'event_id': 0, 'time': '2023-06-29T15:11:32.691308'}
[2023-06-29 15:11:32,812] [37364] [INFO] stream:4621660032:partition:1:offset:0={'user': '2dc65358-e5c4-4ca5-a838-be82af753279', 'partition': 1, 'event_id': 0, 'time': '2023-06-29T15:11:32.805750'}
[2023-06-29 15:11:32,819] [37364] [INFO] stream:4621663824:partition:2:offset:0={'user': 'bc19eb14-39ce-44c7-84f6-1fc99cd0f9aa', 'partition': 2, 'event_id': 0, 'time': '2023-06-29T15:11:32.810987'}
[2023-06-29 15:11:46,367] [37364] [WARNING] Heartbeat failed for group consumer because it is rebalancing
[2023-06-29 15:11:46,368] [37364] [INFO] Revoking previously assigned partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ consumer-__assignor-__leader │ {0} │
│ test.faust.bugfix.partitions │ {0-2} │
└──────────────────────────────┴────────────┘ for group consumer
[2023-06-29 15:11:46,369] [37364] [INFO] (Re-)joining group consumer
[2023-06-29 15:11:46,376] [37364] [INFO] Joined group 'consumer' (generation 13) with member_id faust-0.10.14-a26c1fa8-221b-45da-9879-6fee8d2576da
[2023-06-29 15:11:46,376] [37364] [INFO] Elected group leader -- performing partition assignments using faust
[2023-06-29 15:11:46,385] [37364] [INFO] Successfully synced group consumer with generation 13
[2023-06-29 15:11:46,386] [37364] [INFO] Setting newly assigned partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ consumer-__assignor-__leader │ {0} │
│ test.faust.bugfix.partitions │ {1-2} │
└──────────────────────────────┴────────────┘ for group consumer
[2023-06-29 15:11:46,387] [37364] [INFO] Executing _on_partitions_assigned
[2023-06-29 15:11:46,388] [37364] [INFO] [^----Agent*: __main__.read_agent isolated={0}]: Stopping actor for revoked partition TP(topic='test.faust.bugfix.partitions', partition=0)...
[2023-06-29 15:11:46,389] [37364] [INFO] [^---Agent: __main__.read_agent]: Restarting on rebalance
[2023-06-29 15:11:46,389] [37364] [ERROR] [^----Agent*: __main__.read_agent isolated={0}]: Crashed reason=CancelledError()
Traceback (most recent call last):
File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/agents/agent.py", line 674, in _execute_actor
await coro
File "/Users/o.stetsenko/forks/examples/consumer.py", line 26, in read_agent
async for events in stream.noack().noack_take(1, 1):
File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/streams.py", line 667, in noack_take
await self.wait_for_stopped(buffer_full, timeout=timeout)
File "/Users/user/venv/examples/lib/python3.10/site-packages/mode/services.py", line 704, in wait_for_stopped
return (await self.wait(*coros, timeout=timeout)).stopped
File "/Users/user/venv/examples/lib/python3.10/site-packages/mode/services.py", line 711, in wait
return await self._wait_one(coros[0], timeout=timeout)
File "/Users/user/venv/examples/lib/python3.10/site-packages/mode/services.py", line 779, in _wait_one
results = await self.wait_first(coro, timeout=timeout)
File "/Users/user/venv/examples/lib/python3.10/site-packages/mode/services.py", line 752, in wait_first
done, pending = await asyncio.wait(
File "/Users/o.stetsenko/.pyenv/versions/3.10.6/lib/python3.10/asyncio/tasks.py", line 384, in wait
return await _wait(fs, timeout, return_when, loop)
File "/Users/o.stetsenko/.pyenv/versions/3.10.6/lib/python3.10/asyncio/tasks.py", line 491, in _wait
await waiter
asyncio.exceptions.CancelledError
[2023-06-29 15:11:46,427] [37364] [INFO] [^----OneForOneSupervisor: (3@0x11373ea70)]: Restarting dead <Agent*: __main__.read_agent isolated={0}>! Last crash reason: CancelledError()
NoneType: None
[2023-06-29 15:11:46,429] [37364] [ERROR] [^-App]: Crashed reason=AssertionError()
Traceback (most recent call last):
File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/app/base.py", line 1767, in _on_partitions_assigned
await T(self.topics.on_partitions_assigned)(assigned)
File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/utils/tracing.py", line 133, in corowrapped
await_ret = await ret
File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/transport/conductor.py", line 347, in on_partitions_assigned
T(self._update_tp_index)(assigned)
File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/utils/tracing.py", line 92, in _inner
return call_with_trace(child, fun, on_exit, *args, **kwargs)
File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/utils/tracing.py", line 122, in call_with_trace
ret = fun(*args, **kwargs)
File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/transport/conductor.py", line 366, in _update_tp_index
assert topic.active_partitions.issubset(assigned)
AssertionError
[2023-06-29 15:11:46,445] [37364] [INFO] [^Worker]: Stopping...
[2023-06-29 15:11:46,445] [37364] [INFO] [^--Consumer]: Consumer shutting down for user cancel.
[2023-06-29 15:11:46,445] [37364] [INFO] [^-App]: Stopping...
[2023-06-29 15:11:46,446] [37364] [INFO] [^---Fetcher]: Stopping...
[2023-06-29 15:11:46,446] [37364] [INFO] [^-App]: Wait for streams...
[2023-06-29 15:11:46,446] [37364] [INFO] [^--TableManager]: Stopping...
[2023-06-29 15:11:46,446] [37364] [INFO] [^---Fetcher]: Stopping...
[2023-06-29 15:11:46,447] [37364] [INFO] [^---Recovery]: Stopping...
[2023-06-29 15:11:46,448] [37364] [INFO] [^-App]: Flush producer buffer...
[2023-06-29 15:11:46,449] [37364] [INFO] [^---Conductor]: Stopping...
[2023-06-29 15:11:46,450] [37364] [INFO] [^--AgentManager]: Stopping...
[2023-06-29 15:11:46,451] [37364] [INFO] [^---Agent: __main__.read_agent]: Stopping...
[2023-06-29 15:11:46,451] [37364] [INFO] [^----OneForOneSupervisor: (3@0x11373ea70)]: Stopping...
[2023-06-29 15:11:46,453] [37364] [INFO] LeaveGroup request succeeded
[2023-06-29 15:11:46,453] [37364] [INFO] [^--ReplyConsumer]: Stopping...
[2023-06-29 15:11:46,454] [37364] [INFO] [^--LeaderAssignor]: Stopping...
[2023-06-29 15:11:46,455] [37364] [INFO] [^--Consumer]: Stopping...
[2023-06-29 15:11:46,455] [37364] [INFO] [^---AIOKafkaConsumerThread]: Stopping...
[2023-06-29 15:11:46,457] [37364] [INFO] [^--Producer]: Stopping...
[2023-06-29 15:11:46,457] [37364] [INFO] [^---ProducerBuffer]: Stopping...
[2023-06-29 15:11:46,458] [37364] [INFO] [^--Monitor]: Stopping...
[2023-06-29 15:11:46,459] [37364] [INFO] [^Worker]: Gathering service tasks...
[2023-06-29 15:11:46,459] [37364] [INFO] [^Worker]: Gathering all futures...
[2023-06-29 15:11:47,460] [37364] [INFO] [^Worker]: Closing event loop
[2023-06-29 15:11:47,460] [37364] [CRITICAL] [^Worker]: We experienced a crash! Reraising original exception...
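The assertion at the end of the traceback above (conductor.py, line 366, in _update_tp_index) can be illustrated in isolation. This is a minimal sketch, not Faust's code: TP is a local stand-in for Faust's topic-partition tuple, and check_tp_index and crashes are hypothetical helpers. It assumes that the isolated actor restarted by the supervisor still has partition 0 as its active partition, which the log suggests but does not confirm.

```python
from collections import namedtuple

# Local stand-in for Faust's TP (topic, partition) tuple, so the sketch
# runs without Faust installed.
TP = namedtuple("TP", ["topic", "partition"])

TOPIC = "test.faust.bugfix.partitions"
LEADER = "consumer-__assignor-__leader"

# Worker 1 assignment after the generation 13 rebalance (from the log above).
assigned = {TP(LEADER, 0), TP(TOPIC, 1), TP(TOPIC, 2)}

# ASSUMPTION: the restarted isolated actor is still pinned to the revoked
# partition 0. This is inferred from the log, not verified in Faust itself.
stale_active_partitions = {TP(TOPIC, 0)}


def check_tp_index(active_partitions, assigned):
    """Same shape as the failing check: active partitions must be assigned."""
    assert active_partitions.issubset(assigned)


def crashes(active_partitions, assigned):
    """Return True if the check raises AssertionError."""
    try:
        check_tp_index(active_partitions, assigned)
    except AssertionError:
        return True
    return False


print(crashes(stale_active_partitions, assigned))  # True
print(crashes({TP(TOPIC, 1)}, assigned))           # False
```

If this reading is right, the actor for the revoked partition has to be removed, or its active partitions cleared, before the topic index is rebuilt.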
Logs of Worker2
[2023-06-29 15:11:44,203] [37402] [INFO] [^Worker]: Starting...
[2023-06-29 15:11:44,205] [37402] [INFO] [^-App]: Starting...
[2023-06-29 15:11:44,205] [37402] [INFO] [^--Monitor]: Starting...
[2023-06-29 15:11:44,206] [37402] [INFO] [^--Producer]: Starting...
[2023-06-29 15:11:44,206] [37402] [INFO] [^---ProducerBuffer]: Starting...
[2023-06-29 15:11:44,227] [37402] [INFO] [^--Consumer]: Starting...
[2023-06-29 15:11:44,228] [37402] [INFO] [^---AIOKafkaConsumerThread]: Starting...
[2023-06-29 15:11:44,243] [37402] [INFO] [^--LeaderAssignor]: Starting...
[2023-06-29 15:11:44,244] [37402] [INFO] [^--ReplyConsumer]: Starting...
[2023-06-29 15:11:44,244] [37402] [INFO] [^--AgentManager]: Starting...
[2023-06-29 15:11:44,245] [37402] [INFO] [^---Agent: __main__.read_agent]: Starting...
[2023-06-29 15:11:44,246] [37402] [INFO] [^----OneForOneSupervisor: (1@0x112caea10)]: Starting...
[2023-06-29 15:11:44,246] [37402] [INFO] [^---Conductor]: Starting...
[2023-06-29 15:11:44,246] [37402] [INFO] [^--TableManager]: Starting...
[2023-06-29 15:11:44,247] [37402] [INFO] [^---Conductor]: Waiting for agents to start...
[2023-06-29 15:11:44,247] [37402] [INFO] [^---Conductor]: Waiting for tables to be registered...
[2023-06-29 15:11:45,247] [37402] [INFO] [^---Recovery]: Starting...
[2023-06-29 15:11:45,252] [37402] [INFO] Updating subscribed topics to:
┌Requested Subscription────────┐
│ topic name │
├──────────────────────────────┤
│ consumer-__assignor-__leader │
│ test.faust.bugfix.partitions │
└──────────────────────────────┘
[2023-06-29 15:11:45,253] [37402] [INFO] Subscribed to topic(s):
┌Final Subscription────────────┐
│ topic name │
├──────────────────────────────┤
│ consumer-__assignor-__leader │
│ test.faust.bugfix.partitions │
└──────────────────────────────┘
[2023-06-29 15:11:45,266] [37402] [INFO] Discovered coordinator 1 for group consumer
[2023-06-29 15:11:45,267] [37402] [INFO] Revoking previously assigned partitions set() for group consumer
[2023-06-29 15:11:45,268] [37402] [INFO] (Re-)joining group consumer
[2023-06-29 15:11:46,375] [37402] [INFO] Joined group 'consumer' (generation 13) with member_id faust-0.10.14-2c255bcd-6190-4d4a-850a-2076f4067061
[2023-06-29 15:11:46,383] [37402] [INFO] Successfully synced group consumer with generation 13
[2023-06-29 15:11:46,384] [37402] [INFO] Setting newly assigned partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {0} │
└──────────────────────────────┴────────────┘ for group consumer
[2023-06-29 15:11:46,385] [37402] [INFO] Executing _on_partitions_assigned
[2023-06-29 15:11:46,387] [37402] [INFO] generation id 13 app consumers id 13
[2023-06-29 15:11:46,388] [37402] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
[2023-06-29 15:11:46,392] [37402] [INFO] [^---Recovery]: Resuming flow...
[2023-06-29 15:11:46,392] [37402] [INFO] [^---Fetcher]: Starting...
[2023-06-29 15:11:46,393] [37402] [INFO] [^---Recovery]: Worker ready
[2023-06-29 15:11:46,393] [37402] [INFO] [^Worker]: Ready
[2023-06-29 15:11:49,390] [37402] [WARNING] Heartbeat failed for group consumer because it is rebalancing
[2023-06-29 15:11:49,391] [37402] [INFO] Revoking previously assigned partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {0} │
└──────────────────────────────┴────────────┘ for group consumer
[2023-06-29 15:11:49,392] [37402] [INFO] (Re-)joining group consumer
[2023-06-29 15:11:49,400] [37402] [INFO] Joined group 'consumer' (generation 14) with member_id faust-0.10.14-2c255bcd-6190-4d4a-850a-2076f4067061
[2023-06-29 15:11:49,401] [37402] [INFO] Elected group leader -- performing partition assignments using faust
[2023-06-29 15:11:49,412] [37402] [INFO] Successfully synced group consumer with generation 14
[2023-06-29 15:11:49,413] [37402] [INFO] Setting newly assigned partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ consumer-__assignor-__leader │ {0} │
│ test.faust.bugfix.partitions │ {0-2} │
└──────────────────────────────┴────────────┘ for group consumer
[2023-06-29 15:11:49,413] [37402] [INFO] Executing _on_partitions_assigned
[2023-06-29 15:11:49,414] [37402] [INFO] [^---Agent: __main__.read_agent]: Starting actor for partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {1} │
└──────────────────────────────┴────────────┘
[2023-06-29 15:11:49,420] [37402] [INFO] [^---Agent: __main__.read_agent]: Starting actor for partitions
┌Topic Partition Set───────────┬────────────┐
│ topic │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {2} │
└──────────────────────────────┴────────────┘
[2023-06-29 15:11:49,423] [37402] [INFO] generation id 14 app consumers id 14
[2023-06-29 15:11:49,424] [37402] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
[2023-06-29 15:11:49,441] [37402] [INFO] [^---Recovery]: Resuming flow...
[2023-06-29 15:11:49,441] [37402] [INFO] [^---Recovery]: Worker ready
[2023-06-29 15:11:55,172] [37402] [INFO] stream:4610259600:partition:0:offset:1={'user': '01b790cb-b645-44ac-a313-59c39a91c2e5', 'partition': 0, 'event_id': 0, 'time': '2023-06-29T15:11:55.052464'}
[2023-06-29 15:11:55,178] [37402] [INFO] stream:4610522176:partition:1:offset:1={'user': 'fa5a91a7-33ae-4c28-9d5c-c83575add8d2', 'partition': 1, 'event_id': 0, 'time': '2023-06-29T15:11:55.167866'}
[2023-06-29 15:11:55,185] [37402] [INFO] stream:4611049456:partition:2:offset:1={'user': '439180a6-f624-4423-92c4-09256b9af09e', 'partition': 2, 'event_id': 0, 'time': '2023-06-29T15:11:55.175868'}
Versions
- Python version: 3.10.6
- Faust version: 0.10.14
- Operating system: macOS 13.4.1
Reproduce
- Run services:
  $ docker-compose up
- Create the topic: go to http://127.0.0.1:8090/ and create the topic test.faust.bugfix.partitions with 3 partitions.
- Run the consumer (worker 1, in separate terminal 1):
  $ make consume
- Run the producer (in separate terminal 2):
  $ make produce
- Run the consumer (worker 2, in separate terminal 3):
  $ make consume
Worker 1 crashed.
ostetsenko commented
@wbarnha could you check the PR?