obecto/perper

API: Reduce boilerplate when building stream graph

branimirangelov opened this issue · 1 comment

Azure Functions Extensions collects all of the functions' metadata (function.json / attributes), which could be passed to the stream context to reduce boilerplate when building the stream graph.

As of 0.7 we no longer use Azure Functions, so the approach in the OP won't work. However, some boilerplate still remains in 0.8: the caller is currently responsible for configuring the stream to match the callee's output.

Example (from samples/dotnet/BasicSample):

// Init.cs
var generator = await PerperContext.Stream("Generator").Packed().Persistent().StartAsync(messageCount).ConfigureAwait(false);
var processor = await PerperContext.Stream("Processor").StartAsync(generator.Replay(), batchCount).ConfigureAwait(false);

// Generator.cs
public async IAsyncEnumerable<(long, string)> RunAsync(int count) { ... }

Ideally, the caller (Init.cs) should not have to configure Packed, since the callee (Generator.cs) cannot be used as a non-Packed stream: it writes items out of order, and forgetting Packed would cause listeners to skip items.
Likewise, we would rather not have to configure Persistent and Replay at the call site.
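To make the skipping concrete, here is a toy model. It rests on our own assumption about listener behavior and is not Perper's implementation: a non-packed listener is assumed to remember the highest key it has delivered and ignore anything at or below it, while a packed listener treats keys as dense indices 0, 1, 2, ... and buffers items that arrive early. The names `PackedToyModel`, `CursorListener`, and `PackedListener` are hypothetical.

```csharp
using System.Collections.Generic;

// Toy model only, NOT Perper's implementation. Each element of writeOrder is
// the key of an item as the writer emits it; keys may arrive out of order.
public static class PackedToyModel
{
    // Assumed "non-packed" listener: after each write it delivers only keys
    // greater than the last key it delivered.
    public static List<long> CursorListener(IEnumerable<long> writeOrder)
    {
        var delivered = new List<long>();
        long last = long.MinValue;
        foreach (var key in writeOrder) // one poll after each write
        {
            if (key > last)
            {
                delivered.Add(key);
                last = key;
            }
            // A key <= last arrived late and is silently skipped.
        }
        return delivered;
    }

    // Assumed "packed" listener: keys are dense indices, so it waits for the
    // next expected index and buffers items that arrive ahead of it.
    public static List<long> PackedListener(IEnumerable<long> writeOrder)
    {
        var delivered = new List<long>();
        var pending = new HashSet<long>();
        long next = 0;
        foreach (var key in writeOrder)
        {
            pending.Add(key);
            // Flush every contiguous index that is now available.
            while (pending.Remove(next))
            {
                delivered.Add(next);
                next++;
            }
        }
        return delivered;
    }
}
```

With the write order 0, 2, 1, 3, the cursor-style listener delivers 0, 2, 3 (key 1 arrives after key 2 and is dropped), while the packed listener delivers 0, 1, 2, 3.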

My proposal is to let streams (e.g. Generator) be accessed directly as a Call that returns a PerperStream, with issues such as execution and cancellation managed behind the scenes. Unfortunately, this would require either attributes or an additional method for configuring the stream itself, and it would not work with cyclic streams.

API Proposals
// Usage site:

var generator = await PerperContext.CallAsync<PerperStream>("Generator", 10);
var processor = await PerperContext.CallAsync<PerperStream>("Processor", generator);

// 1. Definition site w/ Attributes:
class Generator {
    [PerperPersistentStream] [PerperReplayStream] [PerperIndexStream(typeof(X))]
    public IAsyncEnumerable<(long, X)> RunAsync(int x) { ... }
}
class GeneratorAgent {
    [PerperPersistentStream] [PerperReplayStream] [PerperIndexStream(typeof(X))]
    public IAsyncEnumerable<(long, X)> GeneratorAsync(int x) { ... }
}
// 2. Definition site w/ Additional method or property:
class Generator {
    public PerperStreamOptions StreamOptions => new PerperStreamOptions().Persistent().Index(typeof(X)).Replay();
    public IAsyncEnumerable<(long, X)> RunAsync(int x) { ... }
}
class GeneratorAgent {
    public PerperStreamOptions GeneratorStreamOptions => new PerperStreamOptions().Persistent().Index(typeof(X)).Replay();
    public IAsyncEnumerable<(long, X)> GeneratorAsync(int x) { ... }
}
// 3. Definition site w/ Additional utility method:
class Generator {
    public async Task<PerperStream> RunAsync(int x) =>
        (await new StreamBuilder().Persistent().Index(typeof(X)).MakeStream(StreamAsync(x))).Replay();
    private IAsyncEnumerable<(long, X)> StreamAsync(int x) { ... }
}
class GeneratorAgent {
    public async Task<PerperStream> GeneratorAsync(int x) =>
        (await new StreamBuilder().Persistent().Index(typeof(X)).MakeStream(StreamAsync(x))).Replay();
    private IAsyncEnumerable<(long, X)> StreamAsync(int x) { ... }
}
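For proposal (1), the runtime could recover the definition-site options with ordinary .NET reflection, which is essentially the metadata collection the OP describes. The sketch below is hypothetical: the attribute classes follow the names in (1), while `ResolvedStreamOptions`, `StreamOptionsResolver.Resolve`, the placeholder `X`, and the `Generator.RunAsync` body are illustrative only and not Perper API. It relies only on standard calls (`Type.GetMethod`, `CustomAttributeExtensions.GetCustomAttribute<T>`).

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading.Tasks;

// Hypothetical attributes named after proposal (1); not part of Perper.
[AttributeUsage(AttributeTargets.Method)]
public sealed class PerperPersistentStreamAttribute : Attribute { }

[AttributeUsage(AttributeTargets.Method)]
public sealed class PerperReplayStreamAttribute : Attribute { }

[AttributeUsage(AttributeTargets.Method)]
public sealed class PerperIndexStreamAttribute : Attribute
{
    public PerperIndexStreamAttribute(Type indexType) => IndexType = indexType;
    public Type IndexType { get; }
}

// Hypothetical: the options a runtime would apply on the callee's behalf.
public sealed record ResolvedStreamOptions(bool Persistent, bool Replay, Type? IndexType);

public static class StreamOptionsResolver
{
    // Reads the attributes on the stream method so the caller does not
    // have to repeat Persistent/Replay/Index at the usage site.
    public static ResolvedStreamOptions Resolve(Type streamType, string methodName)
    {
        MethodInfo method = streamType.GetMethod(methodName)
            ?? throw new MissingMethodException(streamType.Name, methodName);

        return new ResolvedStreamOptions(
            Persistent: method.GetCustomAttribute<PerperPersistentStreamAttribute>() != null,
            Replay: method.GetCustomAttribute<PerperReplayStreamAttribute>() != null,
            IndexType: method.GetCustomAttribute<PerperIndexStreamAttribute>()?.IndexType);
    }
}

// Placeholder item type standing in for X in the proposal.
public sealed class X { }

// Definition site in the style of proposal (1); the body is illustrative.
public class Generator
{
    [PerperPersistentStream, PerperReplayStream, PerperIndexStream(typeof(X))]
    public async IAsyncEnumerable<(long, X)> RunAsync(int x)
    {
        for (long i = 0; i < x; i++)
        {
            await Task.Yield();
            yield return (i, new X());
        }
    }
}
```

A runtime could call `StreamOptionsResolver.Resolve(typeof(Generator), "RunAsync")` while handling `CallAsync<PerperStream>("Generator", 10)` and apply the result before starting the stream. Because the options come solely from the definition site, this approach by itself gives the caller no way to override them.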

Additional considerations:

  • Replaying the stream is a lesser concern, but it would still be nice to be able to do so.
  • In (1), and in (2) when a property is used, there is no way for the caller to specify whether the resulting stream should be persisted, indexed, etc., even if the definition site cooperates.
  • In (3), care must be taken if the utility method is not idempotent, since we need to be able to restart execution of the stream after a crash.

Retagging to 0.8, though this may have to wait for 0.9 if no suitable API proposal emerges.