
Auto-commit in the processing thread?

Steps to reproduce

I have created a simple agent that blocks for 10 s on each message. I run it using the VS Code debugger.

import faust
import time

app = faust.App("test", broker="localhost:9093")

topic = app.topic("new-topic", value_type=str)

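@app.agent(topic)
async def processor(stream):
    async for message in stream:
        print("going to sleep")
        time.sleep(10)  # blocking call: stalls the event loop for 10 s

if __name__ == "__main__":
    app.main()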

Expected behavior

Commits to occur in the background thread, not the processing thread.

The documentation states that auto-commits should occur in a background thread:
Also seen in faust/transport/consumer.py:34:

      + The Consumer has a background thread that periodically commits the

Actual behavior

I am seeing commits occur in the main processing thread.

Blocking calls in the processor stall the event loop, so the auto-commit timer drifts and commits become infrequent. In this example, a commit occurs only after 3 messages have been processed. One workaround is to run the blocking function in a thread executor:

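import asyncio

@app.agent(topic)
async def processor(stream):
    loop = asyncio.get_event_loop()
    async for message in stream:
        print("going to sleep")
        # Run the blocking call in the default thread pool so the event loop stays free.
        await loop.run_in_executor(None, time.sleep, 10)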

If the commit happened in a different thread, like the consumer heartbeat does, this would not be an issue.
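
To make the difference concrete without a broker, here is a minimal sketch that uses only the standard library. It does not touch Faust: `count_ticks` and its `ticker` are hypothetical stand-ins for an agent processing one message and for a periodic timer such as the commit timer, and the durations are shortened so it runs quickly.

```python
import asyncio
import time


async def count_ticks(offload: bool, work_s: float = 0.3, tick_s: float = 0.05) -> int:
    """Return how many timer ticks fire while one simulated message is processed."""
    ticks = 0

    async def ticker() -> None:
        # Stand-in for a periodic timer running on the same event loop.
        nonlocal ticks
        while True:
            await asyncio.sleep(tick_s)
            ticks += 1

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # yield once so the ticker starts its first sleep

    if offload:
        # Workaround: run the blocking call in the default thread pool.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, time.sleep, work_s)
    else:
        # Blocking call directly in the coroutine: the event loop cannot run.
        time.sleep(work_s)

    observed = ticks  # read before yielding back to the loop
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return observed


if __name__ == "__main__":
    print("blocking: ", asyncio.run(count_ticks(offload=False)))
    print("offloaded:", asyncio.run(count_ticks(offload=True)))
```

In the blocking case the ticker never gets a chance to run while the message is being processed, so the count is 0. With the executor, the ticker keeps firing throughout the work.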

Full traceback

[2023-03-23 12:44:39,232] [65292] [INFO] Timer Monitor.sampler woke up too late, with a drift of +19.581969500000014 runtime=0.00015591600003972417 sleeptime=20.581969500000014 
[2023-03-23 12:44:39,239] [65292] [INFO] Timer Recovery.stats woke up too late, with a drift of +16.54871479099984 runtime=4.708400047093164e-05 sleeptime=21.54871479099984 
[2023-03-23 12:44:49,253] [65292] [INFO] Timer commit woke up too late, with a drift of +18.804260166999665 runtime=10.017661875000158 sleeptime=21.604260166999666 
[2023-03-23 12:44:49,255] [65292] [WARNING] Timer commit is overlapping (interval=2.8 runtime=10.017661875000158) 
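
For reading the drift values in these log lines, the following is a rough sketch of how a timer on a blocked event loop wakes up late. It is an illustration only, not the actual timer implementation used by Faust or mode. `measure_drift` is a hypothetical helper, and "drift" here simply means the actual wake-up time minus the scheduled wake-up time.

```python
import asyncio
import time


async def measure_drift(interval_s: float = 0.05, block_s: float = 0.2) -> float:
    """Return how late a sleep of interval_s wakes up when the loop is blocked."""
    loop = asyncio.get_running_loop()

    async def blocker() -> None:
        # Simulates a processor that blocks the event loop.
        time.sleep(block_s)

    expected_wakeup = loop.time() + interval_s
    task = asyncio.create_task(blocker())  # runs as soon as this coroutine yields
    await asyncio.sleep(interval_s)        # the loop is blocked for block_s first
    drift = loop.time() - expected_wakeup
    await task
    return drift


if __name__ == "__main__":
    print(f"drift: {asyncio.run(measure_drift()):+.3f} s")
```

With the values above, the timer wakes up roughly `block_s - interval_s` seconds late, which is the same pattern as a 10 s blocking call delaying a timer with a 2.8 s interval.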


Versions

  • Python version: 3.10
  • Faust version: 0.9.5
  • Operating system: macOS 13.1 (22C65)
  • Kafka version: 2.8.1

In situations where processing one message takes only 3-5 seconds, but the agent receives a burst of messages large enough to exceed the leave_group timer, the Kafka-side offset is incremented but the Faust auto-commit is not triggered, and Faust voluntarily leaves the group. The processor/service must then be restarted to rejoin the group and resume work on the burst of messages.

You may be interested in these settings:


I adapted the word_count example to include your use case:

#!/usr/bin/env python
import asyncio
import time

import faust

WORDS = ['the', 'quick', 'brown', 'fox']

app = faust.App(
    ...  # arguments elided
)
app.conf.broker_commit_interval = 15.0
app.conf.broker_heartbeat_interval = 20.0

posts_topic = app.topic('posts', value_type=str)
word_counts = app.Table('word_counts', default=int,
                        help='Keep count of words (str to int).')

@app.agent(posts_topic)
async def shuffle_words(posts):
    async for post in posts:
        for word in post.split():
            await count_words.send(key=word, value=word)

last_count = {w: 0 for w in WORDS}

@app.agent(value_type=str)
async def count_words(words):
    """Count words from blog post article body."""
    async for word in words:
        word_counts[word] += 1
        last_count[word] = word_counts[word]

@app.table_route(table=word_counts, match_info='word')
async def get_count(web, request, word):
    return web.json({
        word: word_counts[word],
    })

@app.topic_route(topic=posts_topic, match_info='word')
async def get_last(web, request, word):
    return web.json({
        word: last_count,
    })
@app.task
async def sender():
    await posts_topic.maybe_declare()

    for word in WORDS:
        for _ in range(1000):
            # time.sleep(10)
            await shuffle_words.send(value=word)

    await asyncio.sleep(5.0)
    print(word_counts.as_ansitable(
        title='$$ TALLY $$',
    ))

@app.on_rebalance_complete.connect
async def on_rebalance_complete(sender, **kwargs):
    print(word_counts.as_ansitable(
        title='$$ TALLY - after rebalance $$',
    ))

async def processor(stream):
    async for message in stream:
        ...  # loop body elided

if __name__ == '__main__':
    app.main()

It definitely reduces the frequency of the "Timer commit is overlapping" warnings.

I should also note that I don't think a Faust stream is meant to spend 10 seconds processing a single message, given the default values of the many intervals I've seen in faust-streaming and mode-streaming.

There is still the issue that the documentation appears to be wrong.