Duplicate messages despite exactly_once configuration
bruno-villa opened this issue · 1 comment
Checklist
- I have included information about relevant versions
- I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
This is reproducible across different Kafka clusters and with different Python and Faust versions. A simple 1:1 pass-through app is enough to trigger the error.
- Create a Faust app with processing_guarantee: exactly_once.
- Run Faust and monitor the sink topic. Messages are passed through as expected.
- Kill the Faust process. Keep a consumer on the sink topic running and note the offset at which the process was killed (e.g. with Kafkacat).
- Restart Faust. Run a console consumer with --isolation-level read_committed --max-messages 20 --offset X-5, where X is the offset at which Faust was killed. Starting 5 messages earlier is just an example, so that those messages are consumed again.
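The re-read step can be scripted. The sketch below only assembles the argument list for Kafka's bundled kafka-console-consumer.sh; the helper name `reread_command`, the broker address and the topic name are illustrative assumptions. Note that the console consumer only accepts --offset together with an explicit --partition.

```python
from typing import List


def reread_command(bootstrap: str, topic: str, partition: int,
                   kill_offset: int, lookback: int = 5,
                   max_messages: int = 20) -> List[str]:
    """Hypothetical helper: build a kafka-console-consumer.sh call that re-reads
    the sink topic starting `lookback` messages before the kill offset X."""
    start = max(kill_offset - lookback, 0)  # X-5, never below offset 0
    return [
        "kafka-console-consumer.sh",
        "--bootstrap-server", bootstrap,
        "--topic", topic,
        "--partition", str(partition),          # --offset needs an explicit partition
        "--offset", str(start),
        "--isolation-level", "read_committed",  # hide aborted and still-open transactions
        "--max-messages", str(max_messages),
    ]
```

For example, `subprocess.run(reread_command("localhost:9092", "sink-topic", 0, kill_offset=1234), check=True)` would re-read 20 messages starting at offset 1229, with the broker and topic being placeholders for your own setup.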
Expected behavior
A consumer using isolation level read_committed should not see messages from uncommitted transactions. Therefore there should not be any duplicate messages.
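To make the expectation concrete, here is a toy, in-memory model of what a read_committed consumer should deliver. It is not Kafka's implementation: the record layout (a `pid` per transactional producer plus "commit"/"abort" control markers) and the function name are simplifying assumptions. The two rules it encodes do follow Kafka's documented behavior: records from aborted transactions are filtered out, and the consumer never reads past the last stable offset (the first offset of a transaction that is still open).

```python
from typing import Dict, List, NamedTuple, Optional


class Record(NamedTuple):
    offset: int
    pid: Optional[int]  # transactional producer id; None = non-transactional write
    kind: str           # "data", "commit" or "abort" (control marker)
    value: str = ""


def read_committed(log: List[Record]) -> List[str]:
    """Toy model: values a read_committed consumer would deliver from `log`."""
    open_txns: Dict[int, int] = {}  # pid -> first offset of its open transaction
    outcome: Dict[int, str] = {}    # transaction start offset -> "commit"/"abort"
    txn_of: Dict[int, int] = {}     # data offset -> start offset of its transaction
    for rec in log:
        if rec.pid is None:
            continue
        if rec.kind == "data":
            txn_of[rec.offset] = open_txns.setdefault(rec.pid, rec.offset)
        else:  # a control marker closes the producer's open transaction
            start = open_txns.pop(rec.pid, None)
            if start is not None:
                outcome[start] = rec.kind
    # Last stable offset: start of the earliest transaction that is still open.
    lso = min(open_txns.values(), default=None)
    visible = []
    for rec in log:
        if lso is not None and rec.offset >= lso:
            break  # never read past the last stable offset
        if rec.kind != "data":
            continue  # control markers are not delivered to the application
        if rec.pid is not None and outcome.get(txn_of[rec.offset]) == "abort":
            continue  # data from an aborted transaction is filtered out
        visible.append(rec.value)
    return visible
```

In the crash scenario from the steps above, the output written before the kill belongs to a transaction that ends up aborted, and the reprocessed output is committed in a new one, so `read_committed` should return each value exactly once.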
Actual behavior
Instead, duplicate messages appear regardless of the isolation-level setting. This suggests that transactional commits are not working correctly in Faust.
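Why duplicates would show up at all can be illustrated with Kafka's consume-transform-produce pattern, where exactly-once relies on committing the output records and the input offset in one transaction. The simulation below is a hypothetical toy (the function `run` and its parameters are invented for illustration; one partition, one crash, no real Kafka or Faust). It contrasts that pattern with an at-least-once processor whose output is visible immediately and whose input offset is committed every `commit_every` records.

```python
from typing import List


def run(inputs: List[str], crash_at: int, transactional: bool,
        commit_every: int = 3) -> List[str]:
    """Simulate a 1:1 pass-through processor that crashes once at input offset
    `crash_at` and restarts from the last committed input offset. Returns what a
    read_committed consumer of the sink topic would see (toy model)."""
    visible: List[str] = []  # sink records visible under read_committed
    committed = 0            # committed input offset = resume point after restart
    crashed = False
    while committed < len(inputs):
        pending: List[str] = []  # outputs held in the open transaction
        end = min(committed + commit_every, len(inputs))
        for off in range(committed, end):
            if transactional:
                pending.append(inputs[off])
            else:
                visible.append(inputs[off])  # at-least-once: visible right away
            if not crashed and off == crash_at:
                crashed = True
                break  # crash before commit: pending output is dropped (aborted)
        else:
            # Commit the outputs and the input offset together.
            visible.extend(pending)
            committed = end
    return visible
```

With a crash at input offset 4, `run(list("abcdef"), crash_at=4, transactional=False)` returns `['a', 'b', 'c', 'd', 'e', 'd', 'e', 'f']`, while the transactional run returns each input once. The symptom reported here, duplicates visible even under read_committed, resembles the at-least-once run; that is consistent with, but does not prove, the suspicion that Faust's transactional commits are not taking effect.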
Additionally, the logs show the following error:
[ERROR] [^---AIOKafkaConsumerThread]: Has not committed TP(topic='mytopic', partition=0) at all since worker start (started 55.79 minutes ago).
There are multiple possible explanations for this:
1) The processing of a single event in the stream is taking too long.
The timeout for this is defined by the broker_commit_livelock_soft_timeout setting, currently set to 300.0. If you expect the time required to process an event, to be greater than this then please increase the timeout.
2) The commit handler background thread has stopped working (report as bug).
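The log message describes a watchdog: a partition that has not been committed within broker_commit_livelock_soft_timeout seconds (measured from worker start if it was never committed) is reported. The function below is a hypothetical re-creation of that rule for illustration only, not Faust's actual code; the name `livelocked` and its arguments are assumptions. Here 55.79 minutes (about 3347 s) far exceeds the 300.0 s timeout, and for a 1:1 pass-through app a single event taking that long seems unlikely.

```python
from typing import Dict, List, Optional, Tuple

TP = Tuple[str, int]  # (topic, partition)


def livelocked(last_commit: Dict[TP, Optional[float]], worker_started: float,
               now: float, soft_timeout: float = 300.0) -> List[TP]:
    """Hypothetical sketch: return partitions whose last commit (or the worker
    start time, if never committed) is more than `soft_timeout` seconds old."""
    stale = []
    for tp, ts in last_commit.items():
        reference = worker_started if ts is None else ts
        if now - reference > soft_timeout:
            stale.append(tp)
    return stale
```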
Comments
This is potentially explained by #177, but that ticket is two years old. There is also a bit of code suggesting that the transaction manager is deprecated.
Without working exactly_once semantics, Faust is a hard pass for certain use cases.
Versions
- Python version 3.8 and 3.11
- Faust version 0.9.1 and 0.10.13
- Operating system Ubuntu
- Kafka version Unknown
- RocksDB version (if applicable) None
Good catch on the TransactionManager being deprecated. I'll see what we can do about a suitable replacement.