faust-streaming/faust

Storing commits on a database

jamesellis1999 opened this issue · 1 comments

Hi,

This is more of a general question than an issue to reproduce, so please pardon me not following the template.

Is it possible to store the offset of a consumer in an external database? Ideally, I would like to commit my offsets with the other data I have in a single database transaction. How would I go about doing this in Faust?

Thanks!

Not sure if this helps, but you can get the offset as each event comes into your stream processor, then use that while doing something with the other data too:

@app.agent(some_topic)
async def process_event_with_offset(stream):
  async for event in stream.events():
    # use event.value for your data
    # event.message.offset for the offset of the event
    ....