Adding Merge and Split for System.Threading.Channels
kdcllc opened this issue · 0 comments
kdcllc commented
https://deniskyashif.com/2019/12/08/csharp-channels-part-1/
static ChannelReader<T> Merge<T>(params ChannelReader<T>[] inputs)
{
var output = Channel.CreateUnbounded<T>();
Task.Run(async () =>
{
async Task Redirect(ChannelReader<T> input)
{
await foreach (var item in input.ReadAllAsync())
await output.Writer.WriteAsync(item);
}
await Task.WhenAll(inputs.Select(i => Redirect(i)).ToArray());
output.Writer.Complete();
});
return output;
}
static IList<ChannelReader<T>> Split<T>(ChannelReader<T> ch, int n)
{
var outputs = new Channel<T>[n];
for (int i = 0; i < n; i++)
outputs[i] = Channel.CreateUnbounded<T>();
Task.Run(async () =>
{
var index = 0;
await foreach (var item in ch.ReadAllAsync())
{
await outputs[index].Writer.WriteAsync(item);
index = (index + 1) % n;
}
foreach (var ch in outputs)
ch.Writer.Complete();
});
return outputs.Select(ch => ch.Reader).ToArray();
}
https://btburnett.com/csharp/2019/12/01/iasyncenumerable-is-your-friend.html