Auto-commit in the processing thread?
jamesellis1999 opened this issue · 3 comments
Checklist
- I have included information about relevant versions
- I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
I have created a simple agent function that blocks for 10 s on each message. I run it using the VS Code debugger.
import faust
import time

app = faust.App("test", broker="localhost:9093")
topic = app.topic("new-topic", value_type=str)

@app.agent(topic)
async def processor(stream):
    async for message in stream:
        # Deliberately block the event loop for 10 s per message.
        time.sleep(10)

if __name__ == "__main__":
    app.main()
Expected behavior
Commits should occur in a background thread, not in the processing thread.
The documentation states that auto-commit happens in a background thread:
https://faust-streaming.github.io/faust/userguide/streams.html#message-life-cycle
The same claim appears in faust/transport/consumer.py, line 34:
+ The Consumer has a background thread that periodically commits the offset.
Actual behavior
I am seeing commits occur in the main processing thread.
Blocking calls in the processor cause the auto-commit timer to fall behind schedule, leading to infrequent commits. In this example, a commit only occurs after 3 messages have been processed. One way to work around this is to run the blocking function in a thread executor:
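import asyncio
import time

@app.agent(topic)
async def processor(stream):
    async for message in stream:
        loop = asyncio.get_running_loop()
        print("going to sleep")
        # Run the blocking call in the default thread pool so the event loop
        # (and the commit timer running on it) can keep going.
        await loop.run_in_executor(None, time.sleep, 10)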
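To illustrate why the workaround helps, here is a small pure-asyncio sketch with no Kafka or Faust involved. A periodic coroutine stands in for the commit timer, and the intervals are scaled down; `periodic_timer`, `run`, `PERIOD` and `BLOCK` are hypothetical names. It assumes the commit timer is an ordinary coroutine on the same event loop as the agent, which is what the "Timer commit woke up too late" log lines suggest. The sketch measures the largest gap between timer ticks when the "processing" uses `time.sleep` versus `loop.run_in_executor`.

```python
import asyncio
import time

PERIOD = 0.1  # stand-in for broker_commit_interval (scaled down)
BLOCK = 0.35  # stand-in for the 10 s of blocking work per message


async def periodic_timer(ticks: list, stop: asyncio.Event) -> None:
    """Record the time of each tick, the way a periodic commit timer would fire."""
    while not stop.is_set():
        await asyncio.sleep(PERIOD)
        ticks.append(time.monotonic())


async def run(blocking: bool) -> float:
    """Process two fake messages and return the largest gap between timer ticks."""
    ticks: list = []
    stop = asyncio.Event()
    timer = asyncio.create_task(periodic_timer(ticks, stop))
    await asyncio.sleep(PERIOD * 2)  # let the timer tick at least once
    loop = asyncio.get_running_loop()
    for _ in range(2):
        if blocking:
            time.sleep(BLOCK)  # blocks the whole event loop; the timer cannot run
        else:
            await loop.run_in_executor(None, time.sleep, BLOCK)  # loop stays free
    stop.set()
    await timer  # the timer records one last tick, sees `stop`, and exits
    return max(b - a for a, b in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print(f"time.sleep:      largest gap {asyncio.run(run(blocking=True)):.2f} s")
    print(f"run_in_executor: largest gap {asyncio.run(run(blocking=False)):.2f} s")
```

With `time.sleep`, the gap grows to roughly the total blocking time; with the executor it stays close to `PERIOD`.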
If the commit happened in a separate thread, as the consumer heartbeat does, this would not be an issue.
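For comparison, here is a minimal sketch of the design the documentation describes: a timer on its own thread that keeps calling a commit callback even while the main thread is blocked. `BackgroundCommitter` and the `commit` callback are hypothetical names, not Faust API. A real implementation would also need the commit call itself to be safe to invoke from another thread, and this sketch ignores that.

```python
import threading
import time


class BackgroundCommitter:
    """Hypothetical helper: calls `commit` every `interval` seconds from a
    daemon thread, independently of what the main thread is doing."""

    def __init__(self, commit, interval: float) -> None:
        self._commit = commit
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Event.wait returns False on timeout (time to commit) and True once
        # stop() has been called, which ends the loop.
        while not self._stop.wait(self._interval):
            self._commit()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()


if __name__ == "__main__":
    commits = []
    committer = BackgroundCommitter(lambda: commits.append(time.monotonic()), 0.1)
    committer.start()
    time.sleep(0.55)  # the "processing" thread blocks; commits keep happening
    committer.stop()
    print(f"{len(commits)} commits while the main thread was blocked")
```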
Full traceback
[2023-03-23 12:44:39,232] [65292] [INFO] Timer Monitor.sampler woke up too late, with a drift of +19.581969500000014 runtime=0.00015591600003972417 sleeptime=20.581969500000014
[2023-03-23 12:44:39,239] [65292] [INFO] Timer Recovery.stats woke up too late, with a drift of +16.54871479099984 runtime=4.708400047093164e-05 sleeptime=21.54871479099984
[2023-03-23 12:44:49,253] [65292] [INFO] Timer commit woke up too late, with a drift of +18.804260166999665 runtime=10.017661875000158 sleeptime=21.604260166999666
[2023-03-23 12:44:49,255] [65292] [WARNING] Timer commit is overlapping (interval=2.8 runtime=10.017661875000158)
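The numbers in these log lines appear to satisfy sleeptime − drift = scheduled interval: about 1.0 s for Monitor.sampler, 5.0 s for Recovery.stats, and 2.8 s for commit, which matches the interval=2.8 in the overlapping warning (2.8 s is, as far as I know, the default broker_commit_interval). The commit timer's runtime of about 10.02 s suggests the blocking time.sleep(10) ran while a commit was in progress. The sketch below checks this arithmetic by parsing the lines; the regex and `timer_stats` are illustrative assumptions, not a documented log format.

```python
import re

LOG = """\
[2023-03-23 12:44:39,232] [65292] [INFO] Timer Monitor.sampler woke up too late, with a drift of +19.581969500000014 runtime=0.00015591600003972417 sleeptime=20.581969500000014
[2023-03-23 12:44:39,239] [65292] [INFO] Timer Recovery.stats woke up too late, with a drift of +16.54871479099984 runtime=4.708400047093164e-05 sleeptime=21.54871479099984
[2023-03-23 12:44:49,253] [65292] [INFO] Timer commit woke up too late, with a drift of +18.804260166999665 runtime=10.017661875000158 sleeptime=21.604260166999666
"""

PATTERN = re.compile(
    r"Timer (?P<name>\S+) woke up too late, with a drift of \+(?P<drift>[\d.]+)"
    r" runtime=(?P<runtime>[\d.e+-]+) sleeptime=(?P<sleeptime>[\d.]+)"
)


def timer_stats(log: str) -> dict:
    """Map timer name -> (implied interval, runtime), both in seconds."""
    stats = {}
    for m in PATTERN.finditer(log):
        interval = float(m["sleeptime"]) - float(m["drift"])
        stats[m["name"]] = (round(interval, 3), float(m["runtime"]))
    return stats


for name, (interval, runtime) in timer_stats(LOG).items():
    print(f"{name}: interval={interval} runtime={runtime:.3f}")
```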
Versions
- Python version: 3.10
- Faust version: 0.9.5
- Operating system: macOS 13.1 (22C65)
- Kafka version: 2.8.1
- RocksDB version (if applicable)
+1 for similar issue
In situations where processing one message takes only 3-5 seconds, but the agent receives a burst of messages large enough to exceed the leave_group timer, the Kafka-side offset is incremented but Faust's auto-commit is not triggered, and Faust voluntarily leaves the group. The processor/service must then be restarted to rejoin the group and resume work on the burst of messages.
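As a rough back-of-the-envelope check, assume the messages in a burst are processed strictly one after another and nothing else runs while each one blocks. Under that assumption, the burst size that fits inside a timeout is the timeout divided by the per-message time. `max_burst_before_timeout` is a hypothetical helper, and the 300 s timeout is a placeholder value, not the actual leave_group timer.

```python
import math


def max_burst_before_timeout(per_message_s: float, timeout_s: float) -> int:
    """Largest number of back-to-back messages whose combined processing
    time stays within timeout_s (hypothetical helper, serial processing)."""
    return math.floor(timeout_s / per_message_s)


TIMEOUT_S = 300.0  # hypothetical timeout, for illustration only

for per_message in (3.0, 5.0):
    print(f"{per_message} s/message: "
          f"{max_burst_before_timeout(per_message, TIMEOUT_S)} messages fit")
```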
You may be interested in these settings:
https://faust-streaming.github.io/faust/userguide/settings.html#std-setting-broker_commit_interval
https://faust-streaming.github.io/faust/userguide/settings.html#std-setting-broker_heartbeat_interval
I adapted the word_count example to include your use case:
#!/usr/bin/env python
import asyncio
import time

import faust

WORDS = ['the', 'quick', 'brown', 'fox']

app = faust.App(
    'word-counts',
    broker='kafka://localhost:9092',
    store='rocksdb://',
    version=1,
    topic_partitions=8,
)
app.conf.broker_commit_interval = 15.0
app.conf.broker_heartbeat_interval = 20.0

posts_topic = app.topic('posts', value_type=str)
word_counts = app.Table('word_counts', default=int,
                        help='Keep count of words (str to int).')

@app.agent(posts_topic)
async def shuffle_words(posts):
    async for post in posts:
        for word in post.split():
            await count_words.send(key=word, value=word)

last_count = {w: 0 for w in WORDS}

@app.agent(value_type=str)
async def count_words(words):
    """Count words from blog post article body."""
    async for word in words:
        word_counts[word] += 1
        last_count[word] = word_counts[word]

@app.page('/count/{word}/')
@app.table_route(table=word_counts, match_info='word')
async def get_count(web, request, word):
    return web.json({
        word: word_counts[word],
    })

@app.page('/last/{word}/')
@app.topic_route(topic=posts_topic, match_info='word')
async def get_last(web, request, word):
    return web.json({
        word: last_count,
    })

@app.task
async def sender():
    await posts_topic.maybe_declare()
    for word in WORDS:
        for _ in range(1000):
            # time.sleep(10)
            await shuffle_words.send(value=word)
    await asyncio.sleep(5.0)
    print(word_counts.as_ansitable(
        key='word',
        value='count',
        title='$$ TALLY $$',
        sort=True,
    ))

@app.on_rebalance_complete.connect
async def on_rebalance_complete(sender, **kwargs):
    print(word_counts.as_ansitable(
        key='word',
        value='count',
        title='$$ TALLY - after rebalance $$',
        sort=True,
    ))

@app.agent(posts_topic)
async def processor(stream):
    async for message in stream:
        time.sleep(10)
        print(message)

if __name__ == '__main__':
    app.main()
It definitely reduces the frequency of the "Timer commit is overlapping" warnings.
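The overlapping warning in the original log reads interval=2.8 runtime=10.017661875000158, which suggests it fires when a timer's runtime exceeds its interval. Assuming that condition (the real check lives in mode-streaming and may differ), a 15.0 s commit interval keeps a single 10 s block below the threshold, which fits the reduced number of warnings. `would_overlap` is a hypothetical helper.

```python
def would_overlap(interval_s: float, runtime_s: float) -> bool:
    """Assumed condition for the 'Timer ... is overlapping' warning."""
    return runtime_s > interval_s


RUNTIME = 10.017661875000158  # commit runtime taken from the log in the issue

for interval in (2.8, 15.0):  # interval from the log vs. the value set above
    print(f"interval={interval}: overlapping={would_overlap(interval, RUNTIME)}")
```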
That said, I don't think a Faust stream is meant to spend 10 seconds processing a single message, given the default values of many of the intervals in faust-streaming and mode-streaming.
There is still the issue that the documentation appears to be wrong.