faust-streaming/faust

Running with isolated_partitions and more than 1 worker crashes

ostetsenko opened this issue · 1 comments

First mention of the same problem: robinhood/faust#181

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

  • Run a worker instance whose agent is declared with "isolated_partitions=True".
    The agent works as expected.
  • Start a second worker instance with the same agent declaration.

Expected behavior

Both workers run without crashing, and partitions are divided between them after rebalancing.

Actual behavior

After the rebalance triggered by the second worker joining, the first worker crashes with an AssertionError.
The second worker then gets all partitions and starts processing.

Logs of Worker1

[2023-06-29 15:11:21,275] [37364] [INFO] [^Worker]: Starting...
[2023-06-29 15:11:21,277] [37364] [INFO] [^-App]: Starting...
[2023-06-29 15:11:21,277] [37364] [INFO] [^--Monitor]: Starting...
[2023-06-29 15:11:21,277] [37364] [INFO] [^--Producer]: Starting...
[2023-06-29 15:11:21,278] [37364] [INFO] [^---ProducerBuffer]: Starting...
[2023-06-29 15:11:21,300] [37364] [INFO] [^--Consumer]: Starting...
[2023-06-29 15:11:21,300] [37364] [INFO] [^---AIOKafkaConsumerThread]: Starting...
[2023-06-29 15:11:21,313] [37364] [INFO] [^--LeaderAssignor]: Starting...
[2023-06-29 15:11:21,314] [37364] [INFO] [^--ReplyConsumer]: Starting...
[2023-06-29 15:11:21,316] [37364] [INFO] [^--AgentManager]: Starting...
[2023-06-29 15:11:21,317] [37364] [INFO] [^---Agent: __main__.read_agent]: Starting...
[2023-06-29 15:11:21,318] [37364] [INFO] [^----OneForOneSupervisor: (1@0x11373ea70)]: Starting...
[2023-06-29 15:11:21,318] [37364] [INFO] [^---Conductor]: Starting...
[2023-06-29 15:11:21,319] [37364] [INFO] [^--TableManager]: Starting...
[2023-06-29 15:11:21,319] [37364] [INFO] [^---Conductor]: Waiting for agents to start...
[2023-06-29 15:11:21,320] [37364] [INFO] [^---Conductor]: Waiting for tables to be registered...
[2023-06-29 15:11:22,320] [37364] [INFO] [^---Recovery]: Starting...
[2023-06-29 15:11:22,337] [37364] [INFO] Discovered coordinator 1 for group consumer
[2023-06-29 15:11:22,338] [37364] [INFO] Revoking previously assigned partitions set() for group consumer
[2023-06-29 15:11:22,339] [37364] [INFO] (Re-)joining group consumer
[2023-06-29 15:11:25,342] [37364] [INFO] Joined group 'consumer' (generation 12) with member_id faust-0.10.14-a26c1fa8-221b-45da-9879-6fee8d2576da
[2023-06-29 15:11:25,343] [37364] [INFO] Elected group leader -- performing partition assignments using faust
[2023-06-29 15:11:25,350] [37364] [INFO] Successfully synced group consumer with generation 12
[2023-06-29 15:11:25,350] [37364] [INFO] Setting newly assigned partitions
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ consumer-__assignor-__leader │ {0}        │
│ test.faust.bugfix.partitions │ {0-2}      │
└──────────────────────────────┴────────────┘ for group consumer
[2023-06-29 15:11:25,351] [37364] [INFO] Executing _on_partitions_assigned
[2023-06-29 15:11:25,352] [37364] [INFO] [^---Agent: __main__.read_agent]: Starting actor for partitions
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {1}        │
└──────────────────────────────┴────────────┘
[2023-06-29 15:11:25,353] [37364] [INFO] [^---Agent: __main__.read_agent]: Starting actor for partitions
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {2}        │
└──────────────────────────────┴────────────┘
[2023-06-29 15:11:25,355] [37364] [INFO] generation id 12 app consumers id 12
[2023-06-29 15:11:25,356] [37364] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
[2023-06-29 15:11:25,368] [37364] [INFO] [^---Recovery]: Resuming flow...
[2023-06-29 15:11:25,369] [37364] [INFO] [^---Fetcher]: Starting...
[2023-06-29 15:11:25,369] [37364] [INFO] [^---Recovery]: Worker ready
[2023-06-29 15:11:25,370] [37364] [INFO] [^Worker]: Ready
[2023-06-29 15:11:32,809] [37364] [INFO] stream:4621335280:partition:0:offset:0={'user': '46daec69-c5b4-452f-8c74-99b13e4b19a2', 'partition': 0, 'event_id': 0, 'time': '2023-06-29T15:11:32.691308'}
[2023-06-29 15:11:32,812] [37364] [INFO] stream:4621660032:partition:1:offset:0={'user': '2dc65358-e5c4-4ca5-a838-be82af753279', 'partition': 1, 'event_id': 0, 'time': '2023-06-29T15:11:32.805750'}
[2023-06-29 15:11:32,819] [37364] [INFO] stream:4621663824:partition:2:offset:0={'user': 'bc19eb14-39ce-44c7-84f6-1fc99cd0f9aa', 'partition': 2, 'event_id': 0, 'time': '2023-06-29T15:11:32.810987'}
[2023-06-29 15:11:46,367] [37364] [WARNING] Heartbeat failed for group consumer because it is rebalancing
[2023-06-29 15:11:46,368] [37364] [INFO] Revoking previously assigned partitions
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ consumer-__assignor-__leader │ {0}        │
│ test.faust.bugfix.partitions │ {0-2}      │
└──────────────────────────────┴────────────┘ for group consumer
[2023-06-29 15:11:46,369] [37364] [INFO] (Re-)joining group consumer
[2023-06-29 15:11:46,376] [37364] [INFO] Joined group 'consumer' (generation 13) with member_id faust-0.10.14-a26c1fa8-221b-45da-9879-6fee8d2576da
[2023-06-29 15:11:46,376] [37364] [INFO] Elected group leader -- performing partition assignments using faust
[2023-06-29 15:11:46,385] [37364] [INFO] Successfully synced group consumer with generation 13
[2023-06-29 15:11:46,386] [37364] [INFO] Setting newly assigned partitions
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ consumer-__assignor-__leader │ {0}        │
│ test.faust.bugfix.partitions │ {1-2}      │
└──────────────────────────────┴────────────┘ for group consumer
[2023-06-29 15:11:46,387] [37364] [INFO] Executing _on_partitions_assigned
[2023-06-29 15:11:46,388] [37364] [INFO] [^----Agent*: __main__.read_agent isolated={0}]: Stopping actor for revoked partition TP(topic='test.faust.bugfix.partitions', partition=0)...
[2023-06-29 15:11:46,389] [37364] [INFO] [^---Agent: __main__.read_agent]: Restarting on rebalance
[2023-06-29 15:11:46,389] [37364] [ERROR] [^----Agent*: __main__.read_agent isolated={0}]: Crashed reason=CancelledError()
Traceback (most recent call last):
  File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/agents/agent.py", line 674, in _execute_actor
    await coro
  File "/Users/o.stetsenko/forks/examples/consumer.py", line 26, in read_agent
    async for events in stream.noack().noack_take(1, 1):
  File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/streams.py", line 667, in noack_take
    await self.wait_for_stopped(buffer_full, timeout=timeout)
  File "/Users/user/venv/examples/lib/python3.10/site-packages/mode/services.py", line 704, in wait_for_stopped
    return (await self.wait(*coros, timeout=timeout)).stopped
  File "/Users/user/venv/examples/lib/python3.10/site-packages/mode/services.py", line 711, in wait
    return await self._wait_one(coros[0], timeout=timeout)
  File "/Users/user/venv/examples/lib/python3.10/site-packages/mode/services.py", line 779, in _wait_one
    results = await self.wait_first(coro, timeout=timeout)
  File "/Users/user/venv/examples/lib/python3.10/site-packages/mode/services.py", line 752, in wait_first
    done, pending = await asyncio.wait(
  File "/Users/o.stetsenko/.pyenv/versions/3.10.6/lib/python3.10/asyncio/tasks.py", line 384, in wait
    return await _wait(fs, timeout, return_when, loop)
  File "/Users/o.stetsenko/.pyenv/versions/3.10.6/lib/python3.10/asyncio/tasks.py", line 491, in _wait
    await waiter
asyncio.exceptions.CancelledError
[2023-06-29 15:11:46,427] [37364] [INFO] [^----OneForOneSupervisor: (3@0x11373ea70)]: Restarting dead <Agent*: __main__.read_agent isolated={0}>! Last crash reason: CancelledError()
NoneType: None
[2023-06-29 15:11:46,429] [37364] [ERROR] [^-App]: Crashed reason=AssertionError()
Traceback (most recent call last):
  File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/app/base.py", line 1767, in _on_partitions_assigned
    await T(self.topics.on_partitions_assigned)(assigned)
  File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/utils/tracing.py", line 133, in corowrapped
    await_ret = await ret
  File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/transport/conductor.py", line 347, in on_partitions_assigned
    T(self._update_tp_index)(assigned)
  File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/utils/tracing.py", line 92, in _inner
    return call_with_trace(child, fun, on_exit, *args, **kwargs)
  File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/utils/tracing.py", line 122, in call_with_trace
    ret = fun(*args, **kwargs)
  File "/Users/user/venv/examples/lib/python3.10/site-packages/faust/transport/conductor.py", line 366, in _update_tp_index
    assert topic.active_partitions.issubset(assigned)
AssertionError
[2023-06-29 15:11:46,445] [37364] [INFO] [^Worker]: Stopping...
[2023-06-29 15:11:46,445] [37364] [INFO] [^--Consumer]: Consumer shutting down for user cancel.
[2023-06-29 15:11:46,445] [37364] [INFO] [^-App]: Stopping...
[2023-06-29 15:11:46,446] [37364] [INFO] [^---Fetcher]: Stopping...
[2023-06-29 15:11:46,446] [37364] [INFO] [^-App]: Wait for streams...
[2023-06-29 15:11:46,446] [37364] [INFO] [^--TableManager]: Stopping...
[2023-06-29 15:11:46,446] [37364] [INFO] [^---Fetcher]: Stopping...
[2023-06-29 15:11:46,447] [37364] [INFO] [^---Recovery]: Stopping...
[2023-06-29 15:11:46,448] [37364] [INFO] [^-App]: Flush producer buffer...
[2023-06-29 15:11:46,449] [37364] [INFO] [^---Conductor]: Stopping...
[2023-06-29 15:11:46,450] [37364] [INFO] [^--AgentManager]: Stopping...
[2023-06-29 15:11:46,451] [37364] [INFO] [^---Agent: __main__.read_agent]: Stopping...
[2023-06-29 15:11:46,451] [37364] [INFO] [^----OneForOneSupervisor: (3@0x11373ea70)]: Stopping...
[2023-06-29 15:11:46,453] [37364] [INFO] LeaveGroup request succeeded
[2023-06-29 15:11:46,453] [37364] [INFO] [^--ReplyConsumer]: Stopping...
[2023-06-29 15:11:46,454] [37364] [INFO] [^--LeaderAssignor]: Stopping...
[2023-06-29 15:11:46,455] [37364] [INFO] [^--Consumer]: Stopping...
[2023-06-29 15:11:46,455] [37364] [INFO] [^---AIOKafkaConsumerThread]: Stopping...
[2023-06-29 15:11:46,457] [37364] [INFO] [^--Producer]: Stopping...
[2023-06-29 15:11:46,457] [37364] [INFO] [^---ProducerBuffer]: Stopping...
[2023-06-29 15:11:46,458] [37364] [INFO] [^--Monitor]: Stopping...
[2023-06-29 15:11:46,459] [37364] [INFO] [^Worker]: Gathering service tasks...
[2023-06-29 15:11:46,459] [37364] [INFO] [^Worker]: Gathering all futures...
[2023-06-29 15:11:47,460] [37364] [INFO] [^Worker]: Closing event loop
[2023-06-29 15:11:47,460] [37364] [CRITICAL] [^Worker]: We experienced a crash! Reraising original exception...
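
One possible reading of the Worker1 log above, sketched in plain Python without Faust: the failing check is `topic.active_partitions.issubset(assigned)`, and it fails if some topic still pins a partition that the new assignment no longer contains. The `TP` tuple and the `missing_from_assignment` helper are hypothetical stand-ins, and the idea that the restarted `isolated={0}` actor keeps partition 0 active is an assumption drawn from the log order, not something confirmed in the Faust source.

```python
from typing import NamedTuple, Set


class TP(NamedTuple):
    """Stand-in for a (topic, partition) pair; not Faust's own class."""
    topic: str
    partition: int


TOPIC = "test.faust.bugfix.partitions"
LEADER = "consumer-__assignor-__leader"

# Generation 13 assignment for Worker1, copied from the log above.
assigned: Set[TP] = {TP(LEADER, 0), TP(TOPIC, 1), TP(TOPIC, 2)}

# Assumption: the restarted isolated actor still pins partition 0.
stale_active: Set[TP] = {TP(TOPIC, 0)}


def missing_from_assignment(active: Set[TP], assigned: Set[TP]) -> Set[TP]:
    """Return active partitions that the new assignment does not contain."""
    return active - assigned


leftover = missing_from_assignment(stale_active, assigned)
print(leftover)                         # partitions that would trip the check
print(stale_active.issubset(assigned))  # the condition the assert evaluates
```

Under that assumption the subset check is false for partition 0, which would match the AssertionError raised from `_update_tp_index`.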

Logs of Worker2

[2023-06-29 15:11:44,203] [37402] [INFO] [^Worker]: Starting... 
[2023-06-29 15:11:44,205] [37402] [INFO] [^-App]: Starting... 
[2023-06-29 15:11:44,205] [37402] [INFO] [^--Monitor]: Starting... 
[2023-06-29 15:11:44,206] [37402] [INFO] [^--Producer]: Starting... 
[2023-06-29 15:11:44,206] [37402] [INFO] [^---ProducerBuffer]: Starting... 
[2023-06-29 15:11:44,227] [37402] [INFO] [^--Consumer]: Starting... 
[2023-06-29 15:11:44,228] [37402] [INFO] [^---AIOKafkaConsumerThread]: Starting... 
[2023-06-29 15:11:44,243] [37402] [INFO] [^--LeaderAssignor]: Starting... 
[2023-06-29 15:11:44,244] [37402] [INFO] [^--ReplyConsumer]: Starting... 
[2023-06-29 15:11:44,244] [37402] [INFO] [^--AgentManager]: Starting... 
[2023-06-29 15:11:44,245] [37402] [INFO] [^---Agent: __main__.read_agent]: Starting... 
[2023-06-29 15:11:44,246] [37402] [INFO] [^----OneForOneSupervisor: (1@0x112caea10)]: Starting... 
[2023-06-29 15:11:44,246] [37402] [INFO] [^---Conductor]: Starting... 
[2023-06-29 15:11:44,246] [37402] [INFO] [^--TableManager]: Starting... 
[2023-06-29 15:11:44,247] [37402] [INFO] [^---Conductor]: Waiting for agents to start... 
[2023-06-29 15:11:44,247] [37402] [INFO] [^---Conductor]: Waiting for tables to be registered... 
[2023-06-29 15:11:45,247] [37402] [INFO] [^---Recovery]: Starting... 
[2023-06-29 15:11:45,252] [37402] [INFO] Updating subscribed topics to: 
┌Requested Subscription────────┐
│ topic name                   │
├──────────────────────────────┤
│ consumer-__assignor-__leader │
│ test.faust.bugfix.partitions │
└──────────────────────────────┘ 
[2023-06-29 15:11:45,253] [37402] [INFO] Subscribed to topic(s): 
┌Final Subscription────────────┐
│ topic name                   │
├──────────────────────────────┤
│ consumer-__assignor-__leader │
│ test.faust.bugfix.partitions │
└──────────────────────────────┘ 
[2023-06-29 15:11:45,266] [37402] [INFO] Discovered coordinator 1 for group consumer 
[2023-06-29 15:11:45,267] [37402] [INFO] Revoking previously assigned partitions set() for group consumer 
[2023-06-29 15:11:45,268] [37402] [INFO] (Re-)joining group consumer 
[2023-06-29 15:11:46,375] [37402] [INFO] Joined group 'consumer' (generation 13) with member_id faust-0.10.14-2c255bcd-6190-4d4a-850a-2076f4067061 
[2023-06-29 15:11:46,383] [37402] [INFO] Successfully synced group consumer with generation 13 
[2023-06-29 15:11:46,384] [37402] [INFO] Setting newly assigned partitions 
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {0}        │
└──────────────────────────────┴────────────┘ for group consumer 
[2023-06-29 15:11:46,385] [37402] [INFO] Executing _on_partitions_assigned 
[2023-06-29 15:11:46,387] [37402] [INFO] generation id 13 app consumers id 13 
[2023-06-29 15:11:46,388] [37402] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2023-06-29 15:11:46,392] [37402] [INFO] [^---Recovery]: Resuming flow... 
[2023-06-29 15:11:46,392] [37402] [INFO] [^---Fetcher]: Starting... 
[2023-06-29 15:11:46,393] [37402] [INFO] [^---Recovery]: Worker ready 
[2023-06-29 15:11:46,393] [37402] [INFO] [^Worker]: Ready 
[2023-06-29 15:11:49,390] [37402] [WARNING] Heartbeat failed for group consumer because it is rebalancing 
[2023-06-29 15:11:49,391] [37402] [INFO] Revoking previously assigned partitions 
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {0}        │
└──────────────────────────────┴────────────┘ for group consumer 
[2023-06-29 15:11:49,392] [37402] [INFO] (Re-)joining group consumer 
[2023-06-29 15:11:49,400] [37402] [INFO] Joined group 'consumer' (generation 14) with member_id faust-0.10.14-2c255bcd-6190-4d4a-850a-2076f4067061 
[2023-06-29 15:11:49,401] [37402] [INFO] Elected group leader -- performing partition assignments using faust 
[2023-06-29 15:11:49,412] [37402] [INFO] Successfully synced group consumer with generation 14 
[2023-06-29 15:11:49,413] [37402] [INFO] Setting newly assigned partitions 
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ consumer-__assignor-__leader │ {0}        │
│ test.faust.bugfix.partitions │ {0-2}      │
└──────────────────────────────┴────────────┘ for group consumer 
[2023-06-29 15:11:49,413] [37402] [INFO] Executing _on_partitions_assigned 
[2023-06-29 15:11:49,414] [37402] [INFO] [^---Agent: __main__.read_agent]: Starting actor for partitions 
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {1}        │
└──────────────────────────────┴────────────┘ 
[2023-06-29 15:11:49,420] [37402] [INFO] [^---Agent: __main__.read_agent]: Starting actor for partitions 
┌Topic Partition Set───────────┬────────────┐
│ topic                        │ partitions │
├──────────────────────────────┼────────────┤
│ test.faust.bugfix.partitions │ {2}        │
└──────────────────────────────┴────────────┘ 
[2023-06-29 15:11:49,423] [37402] [INFO] generation id 14 app consumers id 14 
[2023-06-29 15:11:49,424] [37402] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. 
[2023-06-29 15:11:49,441] [37402] [INFO] [^---Recovery]: Resuming flow... 
[2023-06-29 15:11:49,441] [37402] [INFO] [^---Recovery]: Worker ready 
[2023-06-29 15:11:55,172] [37402] [INFO] stream:4610259600:partition:0:offset:1={'user': '01b790cb-b645-44ac-a313-59c39a91c2e5', 'partition': 0, 'event_id': 0, 'time': '2023-06-29T15:11:55.052464'} 
[2023-06-29 15:11:55,178] [37402] [INFO] stream:4610522176:partition:1:offset:1={'user': 'fa5a91a7-33ae-4c28-9d5c-c83575add8d2', 'partition': 1, 'event_id': 0, 'time': '2023-06-29T15:11:55.167866'} 
[2023-06-29 15:11:55,185] [37402] [INFO] stream:4611049456:partition:2:offset:1={'user': '439180a6-f624-4423-92c4-09256b9af09e', 'partition': 2, 'event_id': 0, 'time': '2023-06-29T15:11:55.175868'} 
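
To compare the two workers' assignments without reading the tables by eye, the "Topic Partition Set" rows can be parsed with a small helper. This is only a sketch for reading these logs: `expand` and `parse_assignment` are hypothetical names, and the handling of comma-separated specs is an assumption, since the logs here only show forms like `{0}` and `{0-2}`.

```python
import re
from typing import Dict, Iterable, Set

# Matches data rows such as "│ test.faust.bugfix.partitions │ {0-2}      │".
ROW = re.compile(r"^│\s*(\S+)\s*│\s*\{([^}]*)\}\s*│")


def expand(spec: str) -> Set[int]:
    """Expand a spec like "0-2" or "0" into a set of partition numbers."""
    partitions: Set[int] = set()
    for chunk in spec.split(","):
        chunk = chunk.strip()
        if not chunk:
            continue
        if "-" in chunk:
            low, high = chunk.split("-", 1)
            partitions.update(range(int(low), int(high) + 1))
        else:
            partitions.add(int(chunk))
    return partitions


def parse_assignment(lines: Iterable[str]) -> Dict[str, Set[int]]:
    """Map each topic in a 'Topic Partition Set' table to its partitions."""
    result: Dict[str, Set[int]] = {}
    for line in lines:
        match = ROW.match(line)
        if match:  # header and border rows do not match and are skipped
            result[match.group(1)] = expand(match.group(2))
    return result


TOPIC = "test.faust.bugfix.partitions"

# Generation 13 tables, copied from the Worker1 and Worker2 logs.
worker1_gen13 = parse_assignment([
    "│ topic                        │ partitions │",
    "│ consumer-__assignor-__leader │ {0}        │",
    "│ test.faust.bugfix.partitions │ {1-2}      │",
])
worker2_gen13 = parse_assignment([
    "│ topic                        │ partitions │",
    "│ test.faust.bugfix.partitions │ {0}        │",
])

overlap = worker1_gen13[TOPIC] & worker2_gen13[TOPIC]
covered = worker1_gen13[TOPIC] | worker2_gen13[TOPIC]
print(overlap, covered)
```

For generation 13 the two assignments do not overlap and together cover partitions 0 to 2, so the assignment itself looks consistent; the crash appears to come from state local to Worker1.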

Versions

  • Python version: 3.10.6
  • Faust version: 0.10.14
  • Operating system: macOS 13.4.1

Reproduce

  1. Run services: $ docker-compose up
  2. Create topic: go to http://127.0.0.1:8090/ and create the topic test.faust.bugfix.partitions with 3 partitions
  3. Run consumer: $ make consume (worker 1) (in separate terminal 1)
  4. Run producer: $ make produce (in separate terminal 2)
  5. Run consumer: $ make consume (worker 2) (in separate terminal 3)

Worker 1 crashed.

reproduce.zip

@wbarnha could you check the PR?