obecto/perper

There is no way to detect that a given stream has finished.

IoanStoianov opened this issue · 1 comments

[FunctionName("Generator")]
public static async IAsyncEnumerable<dynamic> Generator([PerperTrigger] dynamic parameters, ILogger logger)
{
    for (var i = 0; i <= parameters.Count; i++)
    {
        yield return new { Num = i };
    }
}

// Caller: start the stream function and enumerate its items.
var generator = await context.StreamFunctionAsync<dynamic>("Generator", new { Count = count });

await foreach (var element in generator)
{
    ...
}

With Count = 10, the foreach iterates through the numbers from 0 to 10 and then hangs instead of completing.

This should be possible since 884a80a, as Streams now have an underlying Execution which is marked as finished whenever they finish. However, care must be taken when implementing this, as we still want to read the last few items before stopping the enumeration.

A few possible implementations:
  1. On the stream side, we store the final key in the Result field of the Execution. On the listener side, we wait for the execution to finish, and once it does, we continue enumerating until we reach that last key, or exit immediately if we already read it.
    This is the simplest convention to implement. However, we do not have a last key to store when the stream has no items, and finding the last key to store after a "crash -> stream restart -> stream exits before writing any more items" scenario will require either storing state or a query over all keys.

    1. On the listener side, we signal Fabric when the execution is over; after that signal, Fabric cancels all ContinuousQueries, or stops enumerating instead of starting a ContinuousQuery.
      This should work; unfortunately, signalling a gRPC stream to "start finishing" is only possible either by restarting it (not cool, would require a re-query for non-packed streams) or by using a duplex streaming call, which isn't cool either.

    2. We couple Fabric to the Stream/Execution convention; instead of signalling the GRPC stream from the client, we monitor the execution from within the GRPC stream items handler.
      This goes against the current idea of keeping the Fabric core relatively lean and devoid of criss-crossing connections.

  2. On the stream side, we store an extra placeholder item at long.MaxValue. On the listener side, we stop iterating when we see that key, without reading the item. In Fabric, we implement an additional check before launching a ContinuousQuery for packed streams.
    Ideally, conventions such as this one would stay out of Fabric; however, considering that we already use long.MinValue as a magic value for Ephemeral streams, it should be fine.
    Note that the placeholder item could be reused to store an exception in case we want to completely decouple stream listening from executions.
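
To make the second convention (placeholder item at long.MaxValue) more concrete, here is a minimal sketch of the writer and listener sides. It is not Perper or Fabric code: an in-memory Channel stands in for the stream's storage, and SentinelSketch, EndOfStreamKey, WriteAsync, ReadAsync and RunAsync are hypothetical names made up for illustration. The channel is deliberately never completed, so the listener can only stop by recognising the placeholder key.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class SentinelSketch
{
    // Assumption: the key reserved for the end-of-stream placeholder.
    public const long EndOfStreamKey = long.MaxValue;

    // Stream side: write items under increasing keys, then the placeholder.
    public static async Task WriteAsync(ChannelWriter<(long Key, int Value)> writer, int count)
    {
        for (long key = 0; key <= count; key++)
        {
            await writer.WriteAsync((key, (int)key));
        }

        // The placeholder's value is never read by listeners.
        await writer.WriteAsync((EndOfStreamKey, 0));
    }

    // Listener side: yield items until the placeholder key shows up.
    public static async IAsyncEnumerable<int> ReadAsync(ChannelReader<(long Key, int Value)> reader)
    {
        await foreach (var entry in reader.ReadAllAsync())
        {
            if (entry.Key == EndOfStreamKey)
            {
                yield break; // stop without reading the placeholder item
            }

            yield return entry.Value;
        }
    }

    // Usage: write 0..count, then collect everything the listener sees.
    public static async Task<List<int>> RunAsync(int count)
    {
        var channel = Channel.CreateUnbounded<(long Key, int Value)>();
        await WriteAsync(channel.Writer, count);

        var seen = new List<int>();
        await foreach (var value in ReadAsync(channel.Reader))
        {
            seen.Add(value);
        }

        return seen;
    }
}
```

Because all real items are written before the placeholder, the listener still reads the last few items before the enumeration stops, which is the property asked for above. The sketch does not cover the extra Fabric-side check for packed streams or storing an exception in the placeholder.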