River.Streaming is a library for writing hybrid dataflow applications. Concurrency is achieved using actors which consume and produce messages.
An actor is the unit of concurrency.
interface IActor
{
Task Completion { get; }
void Start();
Task CancelAsync(CancellationToken cancellationToken = default);
}
Much like a real actor on stage, an Actor can be started (Action!), cancelled (Cut!) and completed (That's a wrap!). Actors prepare, execute their role (behaviour) and communicate with other actors.
Producers produce messages. The actual implementation is based on channels.
public interface IProducer<T> : IDisposable
{
bool TryWrite(T item);
ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default);
bool TryComplete(Exception? error = null);
ValueTask WriteAsync(T item, CancellationToken cancellationToken = default);
}
Producer<T>
decorates a ChannelWriter<T>
with reference counting and lazy assignment. Use Dispose
to properly close the channel. TryComplete
will be called on the underlying ChannelWriter<T>
when the reference count hits zero.
LinkTo is the basic operation that connects a producer with a consumer.
class Hello
{
public string Name { get; set; }
}
class Source : AbstractActor
{
public IProducer<Hello> Outbox { get; } = new Producer<Hello>();
protected async Task ExecuteAsync(CancellationToken cancellationToken)
{
using (Outbox)
{
await Outbox.WriteAsync(new Hello { Name = "World" }, cancellationToken);
}
}
}
class Sink : AbstractActor
{
public IConsumer<Hello> Inbox { get; } = new Consumer<Hello>();
protected async Task ExecuteAsync(CancellationToken cancellationToken)
{
using (Inbox)
{
await foreach (var hello in Inbox.ReadAllAsync(cancellationToken))
Console.WriteLine($"Hello, {hello.Name}!");
}
}
}
public static void Main()
{
using var source = new Source();
using var sink = new Sink();
source.Outbox.LinkTo(sink.Inbox);
await Task.WhenAll(source, sink);
}
There are three ways for a producer to communicate with a consumer.
Unicast is the simplest form of communication. A single producer links to a single consumer.
producer.LinkTo(consumer);
Anycast connects a single producer to multiple consumers. The communication will act as a worker queue. The first consumer that reads a message gets it.
foreach (Var producer in producers)
producer.LinkTo(consumer);
Multicast copies the same message to a set of consumers.
TBD
TBD
Most operations on producers accept an optional ChannelOptions that defines the behaviour of the communication.
For instance, to skip messages when the consumer is slower than the producer, you can use DropOldest
var options = new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.DropOldest };
producer.LinkTo(consumer, options);