Filtering happens after group_by repartitioning
dolshevsk opened this issue · 0 comments
Checklist
- I have included information about relevant versions
- I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
async for v in stream.filter(lambda v: v > 1000).group_by(...):
    ...  # do something
When .filter() is chained with .group_by(), it appears that every event is sent to the repartition topic, even events that do not pass the filter. The filter is applied only after repartitioning is done, so events that will be discarded are still written to the repartition topic, which is unnecessary work.
Expected behavior
According to the docs, the .filter() method should be useful for filtering events before repartitioning a stream, so events that are filtered out should not be sent to the repartition topic.
Actual behavior
Filtering happens after repartitioning because of the chain method called inside group_by, which creates a new stream and moves the .filter() processing to self._next, which runs after the repartition step.
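To make the ordering concrete, here is a toy model in plain Python. It does not import or reproduce Faust; ToyStream, group_by_reported, and group_by_expected are hypothetical names, and the repartitioned list only stands in for the repartition topic. It is a sketch of the behavior described above, not of the actual implementation.

```python
# Toy model only: these names are NOT part of Faust. The class mimics the
# ordering described in this issue so the two behaviors can be compared.
from typing import Callable, Iterable, List


class ToyStream:
    def __init__(self, source: Iterable[int]) -> None:
        self.source = list(source)
        self.predicates: List[Callable[[int], bool]] = []
        # Stands in for the repartition topic (assumption for illustration).
        self.repartitioned: List[int] = []

    def filter(self, predicate: Callable[[int], bool]) -> "ToyStream":
        # Register the predicate; it is applied later by a group_by variant.
        self.predicates.append(predicate)
        return self

    def _passes(self, value: int) -> bool:
        return all(predicate(value) for predicate in self.predicates)

    def group_by_reported(self) -> List[int]:
        # Reported ordering: every event is repartitioned first,
        # and the filter runs only on the downstream side.
        for value in self.source:
            self.repartitioned.append(value)
        return [v for v in self.repartitioned if self._passes(v)]

    def group_by_expected(self) -> List[int]:
        # Expected ordering: the filter runs first, so rejected
        # events never reach the repartition topic.
        for value in self.source:
            if self._passes(value):
                self.repartitioned.append(value)
        return list(self.repartitioned)


events = [5, 2000, 10, 3000]

reported = ToyStream(events).filter(lambda v: v > 1000)
reported_out = reported.group_by_reported()

expected = ToyStream(events).filter(lambda v: v > 1000)
expected_out = expected.group_by_expected()

print("reported repartition traffic:", reported.repartitioned)
print("expected repartition traffic:", expected.repartitioned)
print("consumer sees the same events:", reported_out == expected_out)
```

In both cases the consumer ends up with the same events; the difference is only in how many events pass through the repartition topic, which is the extra work this issue is about.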
Versions
- Python version: Python 3.10.6
- Faust version: 0.10.14
- Operating system: Ubuntu 22
- Kafka version: 3.4
- RocksDB version (if applicable)