Support out of order message processing and committing
fpacifici opened this issue · 2 comments
Most of how arroyo is designed quietly requires messages to be processed and committed in the same order they are received. The only exception is the parallel transform, which may transform messages out of order but re-orders them before sending them to the next step, so the problem does not apply there.
There was a potential use case in replays recently, and I suspect we will encounter more as we expand to use cases that need throughput but do not have strong consistency guarantees.
In that scenario most messages would be quick to process, while some would take orders of magnitude longer. In a case like this we would write a consumer with two independent queues inside, processing messages at different paces. One way to build this is to have a router and two separate consumers for the separate classes of traffic, but at times the right queue is not known upfront.
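To illustrate the shape of such a consumer, here is a minimal sketch under assumed names (`TwoQueueStep`, `is_slow`, `on_done` are made up for illustration, not existing arroyo strategies): messages are routed to a fast or a slow internal queue after inspecting the payload, so completion order no longer matches arrival order.

```python
import queue
import threading


class TwoQueueStep:
    """Hypothetical sketch: route each message to a fast or a slow internal
    queue and drain them on separate worker threads, so messages finish at
    different paces and become committable out of their original order."""

    def __init__(self, is_slow, process, on_done):
        self.is_slow = is_slow    # predicate deciding which queue a message goes to
        self.process = process    # the actual per-message work
        self.on_done = on_done    # called once a message has been fully processed
        self.fast_queue = queue.Queue()
        self.slow_queue = queue.Queue()
        for q in (self.fast_queue, self.slow_queue):
            threading.Thread(target=self._worker, args=(q,), daemon=True).start()

    def submit(self, message):
        # The right queue is only known after inspecting the payload.
        target = self.slow_queue if self.is_slow(message) else self.fast_queue
        target.put(message)

    def _worker(self, q):
        while True:
            message = q.get()
            self.process(message)
            # Completion order across the two queues is arbitrary, so whoever
            # commits cannot simply use the latest offset it receives here.
            self.on_done(message)
```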
In scenarios like the one above, messages would reach the last step in the pipeline out of order, which means we cannot simply trust the committable property as the high watermark of what to commit. We would need to keep track of all the offsets we have seen and only commit an offset once all the previous offsets have been processed.
The first question is: should we support this at all? Out of order processing can be very useful for throughput, but the question is whether the consumer owner should take care of understanding that and reordering the offsets before committing. It seems to me this would go slightly against the idea of making committing easy and as transparent as possible.
The second question, if we say yes to the first, is how to reorder the offsets behind the scenes before committing. One way would be to record all the offsets we see, mark them as "committable" when the application code asks to commit, and only ever actually commit the highest contiguous watermark.
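As a sketch of that bookkeeping (not existing arroyo code; `OffsetReorderer` and its method names are made up for illustration): record every offset handed to the pipeline, mark it when the application asks to commit it, and only ever report the highest offset below which everything has been marked.

```python
class OffsetReorderer:
    """Hypothetical sketch: per-partition bookkeeping that turns out-of-order
    "this offset is done" signals into an in-order commit watermark."""

    def __init__(self):
        self.seen = {}       # partition -> offsets handed out for processing
        self.done = {}       # partition -> offsets the application asked to commit
        self.watermark = {}  # partition -> highest offset that is safe to commit

    def record(self, partition, offset):
        """Call when an offset enters the pipeline."""
        self.seen.setdefault(partition, set()).add(offset)

    def mark_committable(self, partition, offset):
        """Call when the application asks to commit this offset."""
        self.done.setdefault(partition, set()).add(offset)

    def committable(self, partition):
        """Advance the watermark while the lowest outstanding offset is done."""
        seen = self.seen.setdefault(partition, set())
        done = self.done.setdefault(partition, set())
        watermark = self.watermark.get(partition)
        for offset in sorted(seen):
            if offset not in done:
                break
            watermark = offset
            # Forget fully committed offsets so the sets do not grow forever.
            seen.discard(offset)
            done.discard(offset)
        self.watermark[partition] = watermark
        return watermark
```

The commit step would periodically call `committable()` for each partition and commit whatever watermark comes back (it stays `None` until the first contiguous offset has been processed).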
I think this idea of separate pipelines for different types of messages is a valid use case, but my feeling is that it is quite an exotic one.
Firstly, I think we should rule out the idea of a processing step needing to see every offset, especially one that might appear at the end of a pipeline. It would go against arroyo's design, where strategies only process individual messages and know nothing about what happened beforehand or the surrounding context. It would be fragile and present problems in many scenarios, such as:
- What happens if a filter step or some batching happened beforehand? All strategies are supposed to compose with all others; if a specific sequence of strategies breaks, that is a problem.
- What happens on startup? A strategy doesn't know the offset the consumer picked up from. As far as a strategy later in the pipeline is concerned, there is no difference between an offset that is stuck in some earlier processing step and an offset that will never arrive because the consumer had already consumed and committed it a long time ago.
Now, I do think there is another way to maintain a watermark across multiple pipelines that you may have been hinting at in your description. If we have two alternative processing pipelines I can imagine something along the lines of:
synchronize_step = SynchronizeOffsets(inputs=["a", "b"], next_step=CommitOffsets(commit_func))
# A message goes through either ProcessA or ProcessB and always ends up at the synchronize step
strategy = Router(router_function, [ProcessA(synchronize_step), ProcessB(synchronize_step)])
# ProcessA and ProcessB would internally call the synchronize step with their identifier, e.g.
next_step.submit(("a", payload))
The key to this is that although the messages received by SynchronizeOffsets are not ordered overall, the messages coming from each input are still ordered, so SynchronizeOffsets would know the minimum offset that had been reached from each of the inputs and was thus safe to commit.
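A minimal sketch of that idea, under assumptions: `SynchronizeOffsets`, the `commit` callable, and the `("a", committable)` tagging are not existing arroyo APIs, and partitions/offsets are simplified to plain values rather than arroyo's real types.

```python
from typing import Callable, Mapping, Tuple

# partition name -> latest offset that input has fully processed
Committable = Mapping[str, int]


class SynchronizeOffsets:
    """Hypothetical sketch: messages arrive tagged with the name of the
    pipeline they came through ("a" or "b"). Offsets are ordered within each
    input, so the minimum watermark across all inputs is safe to commit."""

    def __init__(self, inputs: list, commit: Callable[[Committable], None]) -> None:
        self.inputs = inputs
        self.commit = commit
        # partition -> {input name -> highest offset seen from that input}
        self.watermarks: dict = {}

    def submit(self, message: Tuple[str, Committable]) -> None:
        input_name, committable = message
        safe: dict = {}
        for partition, offset in committable.items():
            per_input = self.watermarks.setdefault(partition, {})
            per_input[input_name] = max(per_input.get(input_name, offset), offset)
            # Only once every input has reported on this partition do we know
            # the minimum, i.e. the offset both pipelines have passed.
            if all(name in per_input for name in self.inputs):
                safe[partition] = min(per_input.values())
        if safe:
            self.commit(safe)
```

ProcessA would then call something like `synchronize_step.submit(("a", committable))` once it finishes a message, and ProcessB the same with `"b"`, along the lines of the tagging shown above.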
But there are still some gotchas:
- This only works if a reasonable amount of data on all partitions flows through both of the parallel pipelines. The moment one path isn't used as much, everything gets stuck there and our watermark can't advance.
- Batch payloads are not guaranteed to have a valid total order. It is conceivable that a batch in one pipeline contains offsets that are ahead on one partition and behind on another compared to a batch in the other pipeline. Which one comes first? There is no right answer in that case. The same applies to empty messages or messages with no offsets to commit. For some use cases this might be resolved by having the synchronizer rewrite the committable offsets in line with the watermarks it has seen, but the right decision here depends a lot on the use case.
My personal opinion is that while this might be a valid use case, something like this is for experts only and not advisable for most people, since there are many things to be wary of and it makes your consumer significantly more complex. We haven't really had any real use cases for it yet; though it was floated for replays, there is probably a lot more low-hanging fruit that can be tackled there before resorting to something like this.
This kind of strategy is already possible today with Arroyo, though it has to be hand-rolled. I don't think we need to do more to make this "easy", as I think it can be quite dangerous.
Closing since I don't think we should do this. @fpacifici feel free to reopen if you want to kick off the discussion again.