dotnet/orleans

Race condition in PersistentStreamPullingAgent can cause events to get dropped when using SimpleQueueCache

Jamie-ML opened this issue · 5 comments

Overview

There appears to be a race condition when registering an implicit stream producer that has multiple Consumers negotiating stream setup at different speeds. A Consumer can process an entire batch of events and become Inactive before the other Consumers finish the initial stream handshake. Subsequent batches that arrive in the interim are effectively dropped for the faster Consumer. The issue is masked by cache implementations that support rewind and time-based eviction, like MemoryPooledCache, but shows up dangerously with the SimpleQueueCache, which is designed to purge cache items as soon as they have been handled by all Consumers.

The Bug

When PersistentStreamPullingAgent registers itself as the producer of an implicit stream for the first time, it statically resolves the Consumers from the mappings in the ImplicitStreamPubSub.

It starts stream cursors for each of these Consumers and waits for all of them to finish (i.e. perform the initial stream handshake) before setting the StreamConsumerCollection.StreamRegistered state flag.

The bug occurs when both these things happen:

  1. There are multiple Consumers and the time it takes them to initialize and perform the stream handshake differs.
  2. Additional batches of messages are pulled off the queue while this happens (i.e. while the producer is still registering).

This is the sequence:

  1. First batch pulled off the cache gets processed

  2. No stream data exists in the pubSubCache so the agent registers itself as the producer of the stream and starts the initial cursors

    else
    {
        RegisterStream(streamId, startToken, now).Ignore(); // if this is a new stream register as producer of stream in pub sub system
    }
    
  3. First Consumer (ConsumerFast) performs the initial handshake and processes the first batch to completion, making the cursor Inactive again. (e.g. StreamSequenceToken=1, Events=1,2,3)

  4. Second Consumer (ConsumerSlow) is still activating and doing the handshake

  5. Second batch (e.g. StreamSequenceToken=2, Events=4,5,6) is pulled off the queue, but because stream registration has started but not completed, and ConsumerFast's cursor is Inactive, ReadFromQueue effectively does nothing.

    else
    {
        if (this.logger.IsEnabled(LogLevel.Debug))
            this.logger.LogDebug(
                $"Pulled new messages in stream {streamId} from the queue, but pulling agent haven't succeeded in" +
                $"RegisterStream yet, will start deliver on this stream after RegisterStream succeeded");
    }
    
  6. ConsumerSlow finishes handshake, the stream is registered, StreamConsumerCollection.StreamRegistered is set to True.

  7. Third batch (StreamSequenceToken=3, Events=7,8) is pulled off the queue, inactive cursors are woken up since the stream is now registered.

    if (streamData.StreamRegistered)
    {
        StartInactiveCursors(streamData,
            startToken); // if this is an existing stream, start any inactive cursors
    }
    
  8. When activating ConsumerFast's cursor, the agent first refreshes the cursor with the latest batch token (e.g. StreamSequenceToken=3 from the third batch).

    consumerData.Cursor?.Refresh(startToken);
    if (consumerData.State == StreamConsumerDataState.Inactive)
    {
        // wake up inactive consumers
        RunConsumerCursor(consumerData).Ignore();
    }
    
  9. The SimpleQueueCacheCursor implementation will set the token internally, because the token was unset after the initial batch (StreamSequenceToken=1) was processed to completion.

    // SimpleQueueCacheCursor.cs
    if (!IsSet)
    {
        cache.RefreshCursor(this, sequenceToken);
    }
    
  10. The cursor is then run from event 7. Events 4-6 have been skipped.
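The skip in steps 8-10 can be condensed into a small stand-alone model (a hypothetical class: the names mirror SimpleQueueCacheCursor, but this is not the Orleans source, and an int stands in for StreamSequenceToken):

    // Minimal model of the cursor behaviour described above.
    class CursorModel
    {
        private int? token; // null == unset, i.e. the cursor finished its last batch

        public bool IsSet => token.HasValue;

        // Mirrors step 9: only an unset cursor adopts the incoming token.
        public void Refresh(int latestBatchToken)
        {
            if (!IsSet)
                token = latestBatchToken;
        }

        public void CompleteBatch() => token = null;
    }

Walking the timeline through this model: the cursor completes batch 1 (token becomes unset), batch 2 arrives but the cursor is never refreshed while registration is in flight, then batch 3 arrives and Refresh(3) lands the cursor on token 3, so batch 2 is skipped.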

Repro

I've written a code sample https://github.com/mindlink/OrleansPersistentStream-DroppedEvents that shows events getting dropped.

It rips off the implementation of the MemoryStream stack, plugging a SimpleQueueCache rather than the MemoryPooledCache into the TestStreamAdapterFactory. It then uses three grains to re-create the race condition described above:

  1. ProducerGrain
  2. ConsumerGrain
  3. SlowConsumerGrain

The ConsumerGrain and SlowConsumerGrain have an Implicit subscription on ProducerGrain.

ProducerGrain is told to emit events 1-10 in order, ConsumerGrain writes out to the console as it sees each event and warns of any breaks in the event sequence.
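The gap check in ConsumerGrain looks roughly like this (a sketch with a simplified OnNextAsync signature; the actual code is in the linked repository):

    private int lastObserved;

    public Task OnNextAsync(int evt)
    {
        // Warn on any break in the event sequence, then record the event.
        if (lastObserved != 0 && evt != lastObserved + 1)
            Console.WriteLine($"Skipped events detected. Last observed event is {lastObserved} but received event is {evt}.");
        Console.WriteLine($"Updated last observed event to: {evt}.");
        lastObserved = evt;
        return Task.CompletedTask;
    }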

SlowConsumerGrain introduces a delay when activating to prolong the stream registration in the PersistentStreamPullingAgent.cs.

NOTE: The race is obviously highly dependent on timings so an element of randomness has been added to the emit-events loop in the ProducerGrain and the SlowConsumerGrain delayed activation to try and capture the race on other systems as well. The repro is run 20 times to account for environment/random-delay variability, but on my machine these conditions produce dropped events in about 80% of the runs.

Expected Output (x20):

Emitting events...
Updated last observed event to: 1.
Updated last observed event to: 2.
Updated last observed event to: 3.
Updated last observed event to: 4.
Updated last observed event to: 5.
Updated last observed event to: 6.
Updated last observed event to: 7.
Updated last observed event to: 8.
Updated last observed event to: 9.
Updated last observed event to: 10.

Actual Output:

Emitting events...
Updated last observed event to: 1.
Updated last observed event to: 2.
Updated last observed event to: 3.
Skipped events detected. Last observed event is 3 but received event is 7.
Updated last observed event to: 7.
Updated last observed event to: 8.
Updated last observed event to: 9.
Updated last observed event to: 10.

Solution

Initially I thought we could fix the problem in the SimpleQueueCache by changing the way it unsets the stream token after becoming inactive, but after poking around the cache implementation more I don't think that's actually where the problem lies.

It seems to me the PersistentStreamPullingAgent is at fault, because it's letting batches slip through the cracks and is also breaking the fundamental principle of independent consumer cursor progression. Just because the stream isn't fully registered doesn't mean there aren't some cursors ready to process more batches.

As a potential solution, you could add a third StreamConsumerDataState, called Registering or similar, and initialize StreamConsumerData in that state rather than Inactive. Then, when a batch is pulled off the queue while the stream is still registering, try to start the inactive cursors anyway instead of doing nothing. Cursors still in the Registering state won't be disturbed mid-handshake, but those that have already completed the handshake and become Inactive will be allowed to process further batches while stream registration is in progress, meaning we won't skip batches on fast Consumers:

if (pubSubCache.TryGetValue(streamId, out streamData))
{
    streamData.RefreshActivity(now);

    StartInactiveCursors(streamData, startToken); // if this is an existing stream, start any inactive cursors

}
else
{
    RegisterStream(streamId, startToken, now).Ignore(); // if this is a new stream register as producer of stream in pub sub system
}
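Sketched out, the proposal amounts to a third enum value plus a guard when waking cursors (names and shape are hypothetical, not the current Orleans source):

    internal enum StreamConsumerDataState
    {
        Registering, // handshake still in flight: do not disturb this cursor
        Active,      // cursor is currently delivering a batch
        Inactive     // handshake complete and last batch delivered: safe to wake
    }

    // StartInactiveCursors would then only wake cursors past the handshake,
    // so a batch arriving mid-registration reaches fast Consumers without
    // touching slow ones:
    if (consumerData.State == StreamConsumerDataState.Inactive)
    {
        consumerData.Cursor?.Refresh(startToken);
        RunConsumerCursor(consumerData).Ignore();
    }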

Just a thought though, haven't followed all the details through or tried it out - interested to get opinions/alternatives!

Great find and great investigation!

I think you found two issues here:

  • the SimpleQueueCache's current implementation isn't robust enough to avoid skipping events like this. I am pretty sure other race conditions could provoke the same issue
  • the race condition in the PersistentStreamPullingAgent: it's not limited to the providers using SimpleQueueCache. With another cache implementation, if the third batch event never arrived, the subscriber would never wake up.

I like your proposed solution, but it will be a breaking change, and it really seems to be a local issue to the pulling agent.

I think adding a local, non-serialized boolean flag to StreamConsumerData, set to true once the handshake is done, could be a solution. Then check this flag to decide whether to start the inactive cursors while the stream isn't fully registered... and be careful about race conditions when updating this flag.
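One possible shape for that flag (the field name is hypothetical; the real class carries more state):

    internal class StreamConsumerData
    {
        // Set once, on the pulling agent's scheduler, after the initial
        // handshake completes. Local to the agent, never serialized.
        internal bool HandshakeCompleted;

        // ... existing fields (Cursor, State, etc.) ...
    }

    // While RegisterStream is still in flight, the agent would wake only
    // cursors with HandshakeCompleted == true instead of skipping the batch.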

We should maybe also decouple this handshake process out of AddSubscriber_Impl.

Hey @benjaminpetit thanks for the quick response and kind words!

Interesting you feel there could be other instances where the SimpleQueueCache can race and drop events. Is there any more information you could provide around this? Would be a bit of a blocker for us as well...

Aha yes, that's certainly true for the second issue. With any queue implementation there's a risk that, if we publish Event-1 and Event-2, Event-2 is never heard by the consumer unless Event-3 arrives in a timely fashion to push it through.

Breaking change, yup gotcha - separate non-serialized IsRegistered state flag makes sense!

I might have been too dramatic about the SimpleQueueCache: after re-reading it, I don't think there is any race condition.

But you have to be aware that you might lose data when using a non-rewindable stream in case of silo shutdown or pulling agent rebalancing.

Understood, thanks. We've built a "reliable event publishing" layer on top of Orleans Streams that guarantees delivery in failure/shutdown/re-distribution scenarios like you describe.

What we rely on for that system to work is that:

  1. Orleans serves events in sequence (and doesn't skip any!)
  2. IQueueAdapterReceiver.MessagesDeliveredAsync is only called when all Consumers have successfully handled the event.
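For context, guarantee 2 keys off the delivery acknowledgement hook on the receiver interface, whose signature is roughly (paraphrased from memory of the Orleans source, so treat the exact shape as approximate):

    public interface IQueueAdapterReceiver
    {
        // ...
        // Called by the pulling agent once messages have been handled;
        // our reliable-publishing layer treats this as the delivery ack.
        Task MessagesDeliveredAsync(IList<IBatchContainer> messages);
    }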