faust-streaming/faust

Filtering happens after group_by repartitioning

dolshevsk opened this issue

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

# the filter callable must accept the value as its argument
async for v in stream.filter(lambda v: v > 1000).group_by(...):
    ...  # do something

When .filter() is combined with .group_by(), every event appears to be sent to the repartition topic, including events that do not pass the filter. The filter is applied only after repartitioning is done, so extra work is spent on events that are discarded anyway.

Expected behavior

According to the docs, the .filter() method is useful for filtering events before repartitioning a stream, so events that are filtered out should never be sent to the repartition topic.

Actual behavior

Filtering happens after repartitioning because of the chain method called inside group_by: it creates a new stream and moves the .filter() step to self._next, which runs after the repartition step.
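To make the cost concrete, here is a simplified, pure-Python model of the two orderings. It does not use Faust and is not Faust's implementation. `run_pipeline`, `before_repartition`, `after_repartition` and `repartition_topic` are hypothetical names. The split into two processor lists is an assumption that mirrors the description above: the filter either runs on the source stream, or on the stream that group_by chains via self._next.

```python
from typing import Callable, Iterable, List, Tuple

# Hypothetical toy model of the ordering reported in this issue.
# NOT Faust code: all names here are illustrative only.
Processor = Callable[[int], bool]


def run_pipeline(
    events: Iterable[int],
    before_repartition: List[Processor],
    after_repartition: List[Processor],
) -> Tuple[List[int], List[int]]:
    """Return (events written to the repartition topic, events delivered to the agent)."""
    repartition_topic: List[int] = []
    for event in events:
        # Processors on the source stream run before the event is forwarded.
        if all(p(event) for p in before_repartition):
            repartition_topic.append(event)
    # Processors moved to the chained stream only see events that were
    # already written to the repartition topic.
    delivered = [
        event for event in repartition_topic
        if all(p(event) for p in after_repartition)
    ]
    return repartition_topic, delivered


def keep_large(v: int) -> bool:
    return v > 1000


events = [10, 500, 1500, 2500]

# Expected: the filter runs first, so only matching events are repartitioned.
expected_topic, expected_out = run_pipeline(events, [keep_large], [])
# Reported: the filter ends up on the chained stream and runs afterwards.
actual_topic, actual_out = run_pipeline(events, [], [keep_large])

print(len(expected_topic), expected_out)  # → 2 [1500, 2500]
print(len(actual_topic), actual_out)      # → 4 [1500, 2500]
```

Both orderings deliver the same events to the agent body, which is why the problem is easy to miss; the difference is that in the reported ordering every event, not only the matching ones, is written to the repartition topic first.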

Versions

  • Python version: Python 3.10.6
  • Faust version: 0.10.14
  • Operating system: Ubuntu 22
  • Kafka version: 3.4
  • RocksDB version (if applicable)