Introduces IAsyncEnumerable, IAsyncEnumerator, ForEachAsync(), and ParallelForEachAsync().
Helps you create (a) an element provider, where producing an element can take a long time because it depends on other asynchronous events (e.g. wait handles, network streams), and (b) a consumer that processes those elements as soon as they are ready without blocking the thread (the processing is scheduled on a worker thread instead).
using System;
using System.Collections.Async;
// Needed for IEnumerable<T> in the synchronous comparison. On targets where this namespace
// also defines IAsyncEnumerable<T>, qualify the type names to avoid ambiguity.
using System.Collections.Generic;
using System.Threading.Tasks;

static IAsyncEnumerable<int> ProduceAsyncNumbers(int start, int end)
{
    return new AsyncEnumerable<int>(async yield => {
        // Just to show that ReturnAsync can be used multiple times
        await yield.ReturnAsync(start);

        for (int number = start + 1; number <= end; number++)
            await yield.ReturnAsync(number);

        // You can break the enumeration loop with the following call:
        yield.Break();

        // This won't be executed due to the loop break above
        await yield.ReturnAsync(12345);
    });
}

// Just to compare with the synchronous version of the enumerator
static IEnumerable<int> ProduceNumbers(int start, int end)
{
    yield return start;

    for (int number = start + 1; number <= end; number++)
        yield return number;

    yield break;

    // Unreachable, mirrors the async version above
    yield return 12345;
}

static async Task ConsumeNumbersAsync()
{
    var asyncEnumerableCollection = ProduceAsyncNumbers(start: 1, end: 10);
    await asyncEnumerableCollection.ForEachAsync(async number => {
        await Console.Out.WriteLineAsync($"{number}");
    });
}

// Just to compare with the synchronous version of the enumeration
static void ConsumeNumbers()
{
    var enumerableCollection = ProduceNumbers(start: 1, end: 10);
    foreach (var number in enumerableCollection) {
        Console.Out.WriteLine($"{number}");
    }
}
using System.Collections.Async;

IAsyncEnumerable<Bar> ConvertGoodFoosToBars(IAsyncEnumerable<Foo> items)
{
    return items
        .Where(foo => foo.IsGood)
        .Select(foo => Bar.FromFoo(foo));
}
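using System;
using System.Collections.Async;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

async Task<IReadOnlyCollection<string>> GetStringsAsync(IEnumerable<Uri> uris, HttpClient httpClient, CancellationToken cancellationToken)
{
    // ConcurrentBag is safe to fill from several downloads running in parallel
    var result = new ConcurrentBag<string>();

    await uris.ParallelForEachAsync(
        async uri =>
        {
            var str = await httpClient.GetStringAsync(uri, cancellationToken);
            result.Add(str);
        },
        maxDegreeOfParallelism: 5,
        cancellationToken: cancellationToken);

    return result;
}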
Does making everything async speed up your app? No and yes. Just making everything async
makes your app a tiny bit slower, because it adds overhead in the form of state machines and tasks.
However, it helps you utilize worker threads better, because you no longer block them while
waiting for the next element to be produced; in general, your app behaves better when several
such enumerations run in parallel. The best fit for IAsyncEnumerable is reading elements from
a network stream, like HTTP + XML (as shown above; SOAP), or a database client implementation
where the result of a query is a set of rows.
GitHub: https://github.com/tyrotoxin/AsyncEnumerable
NuGet.org: https://www.nuget.org/packages/AsyncEnumerator/
License: https://opensource.org/licenses/MIT
1: How to use this library?
See examples above. The core code is in the System.Collections.Async namespace. You can also find
useful extension methods in the System.Collections and System.Collections.Generic namespaces
for the IEnumerable and IEnumerator interfaces.
2: Using CancellationToken
- Do not pass a CancellationToken to a method that returns IAsyncEnumerable, because that method is not async itself; it is just a factory
- Use yield.CancellationToken in your enumeration lambda function; it is the same token that gets passed to IAsyncEnumerator.MoveNextAsync()
IAsyncEnumerable<int> ProduceNumbers()
{
    return new AsyncEnumerable<int>(async yield => {

        // This cancellation token is the same token that
        // is passed to the very first call of MoveNextAsync().
        var cancellationToken1 = yield.CancellationToken;
        await yield.ReturnAsync(1);

        // This cancellation token can be different, because
        // we are inside the second MoveNextAsync() call.
        var cancellationToken2 = yield.CancellationToken;
        await yield.ReturnAsync(2);

        // As a rule of thumb, always use yield.CancellationToken
        // when calling underlying async methods to be able to
        // cancel the MoveNextAsync() method.
        // (FooAsync stands for any cancelable async operation.)
        await FooAsync(yield.CancellationToken);
    });
}
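On the consumer side, the token has to come from somewhere. The following is a minimal sketch, not the library's reference usage: ProduceTicks and ConsumeTicksAsync are hypothetical names, and the sketch assumes that ForEachAsync has an overload accepting a CancellationToken which it forwards to MoveNextAsync().

```csharp
using System;
using System.Collections.Async;
using System.Threading;
using System.Threading.Tasks;

static class CancellationDemo
{
    // Hypothetical producer: emits an ever-increasing counter roughly once per second.
    static IAsyncEnumerable<int> ProduceTicks()
    {
        return new AsyncEnumerable<int>(async yield =>
        {
            for (int tick = 0; ; tick++)
            {
                // Task.Delay observes the token supplied by the consumer.
                await Task.Delay(1000, yield.CancellationToken).ConfigureAwait(false);
                await yield.ReturnAsync(tick).ConfigureAwait(false);
            }
        });
    }

    static async Task ConsumeTicksAsync()
    {
        // The source cancels itself after five seconds.
        using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
        {
            try
            {
                // Assumption: this overload passes cts.Token on to MoveNextAsync().
                await ProduceTicks().ForEachAsync(
                    async tick => await Console.Out.WriteLineAsync($"tick {tick}").ConfigureAwait(false),
                    cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                await Console.Out.WriteLineAsync("Enumeration canceled.").ConfigureAwait(false);
            }
        }
    }
}
```

The producer never receives a token as a method argument; it only reads yield.CancellationToken, which keeps the factory method synchronous as recommended above.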
3: Always remember about ConfigureAwait(false)
To avoid performance degradation and possible deadlocks in ASP.NET or WPF applications (or any
SynchronizationContext-dependent environment), you should always put ConfigureAwait(false)
in your await statements:
IAsyncEnumerable<int> GetValues()
{
    return new AsyncEnumerable<int>(async yield =>
    {
        await FooAsync().ConfigureAwait(false);

        // Yes, it's even needed for 'yield.ReturnAsync'
        await yield.ReturnAsync(123).ConfigureAwait(false);
    });
}
4: Clean-up on incomplete enumeration
Consider the following situation:
IAsyncEnumerable<int> ReadValuesFromQueue()
{
    return new AsyncEnumerable<int>(async yield =>
    {
        using (var queueClient = CreateQueueClient())
        {
            while (true)
            {
                // The dequeue call is asynchronous, so it must be awaited
                var message = await queueClient.DequeueMessageAsync();
                if (message == null)
                    break;

                await yield.ReturnAsync(message.Value);
            }
        }
    });
}

Task<int> ReadFirstValueOrDefaultAsync()
{
    return ReadValuesFromQueue().FirstOrDefaultAsync();
}
The FirstOrDefaultAsync method reads the first value from the IAsyncEnumerator and then simply
disposes it. However, disposing the AsyncEnumerator does not automatically dispose the queueClient
inside the lambda function, because async methods are just state machines that must somehow
reach a particular state to perform the clean-up.
To provide this behavior, when you dispose an AsyncEnumerator before reaching the end of the
enumeration, it resumes your async lambda function (at await yield.ReturnAsync()) with an
AsyncEnumerationCanceledException (which derives from OperationCanceledException).
This exception breaks the normal flow of the enumeration and drives the underlying state machine
to its terminal state, which performs the clean-up, i.e. disposes the queueClient in this case.
You don't need to (and shouldn't) catch that exception type, because AsyncEnumerator handles it
internally. The same exception is thrown when you call yield.Break().
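The same mechanism should apply to any clean-up code in the lambda, not only to using blocks. The sketch below illustrates this with a try/finally; ProduceForever and ReadOnlyFirstAsync are hypothetical names, and exactly when the finally block runs relative to the consumer's continuation is not specified here.

```csharp
using System;
using System.Collections.Async;
using System.Threading.Tasks;

static class CleanUpDemo
{
    // Hypothetical endless producer; the 'finally' block stands in for any clean-up work.
    static IAsyncEnumerable<int> ProduceForever()
    {
        return new AsyncEnumerable<int>(async yield =>
        {
            try
            {
                for (int number = 0; ; number++)
                    await yield.ReturnAsync(number).ConfigureAwait(false);
            }
            finally
            {
                // Expected to run when the enumerator is disposed early, because the
                // pending ReturnAsync() is resumed with AsyncEnumerationCanceledException.
                Console.WriteLine("Producer cleaned up.");
            }
        });
    }

    static async Task ReadOnlyFirstAsync()
    {
        // FirstOrDefaultAsync reads one element and then disposes the enumerator.
        int first = await ProduceForever().FirstOrDefaultAsync().ConfigureAwait(false);
        Console.WriteLine($"First value: {first}");
    }
}
```

Note that the lambda does not catch the exception; it only relies on finally, which matches the advice above to leave AsyncEnumerationCanceledException to the enumerator.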
There is another option to do the clean-up on Dispose:
IAsyncEnumerator<int> GetQueueEnumerator()
{
    var queueClient = CreateQueueClient();

    // The method returns an enumerator, so construct AsyncEnumerator (not AsyncEnumerable)
    return new AsyncEnumerator<int>(async yield =>
    {
        while (true)
        {
            var message = await queueClient.DequeueMessageAsync();
            if (message == null)
                break;

            await yield.ReturnAsync(message.Value);
        }
    },
    onDispose: () => queueClient.Dispose());
}
5: Why is GetAsyncEnumeratorAsync async?
The IAsyncEnumerable.GetAsyncEnumeratorAsync() method is async and returns a Task<IAsyncEnumerator>,
although the current implementation of AsyncEnumerable always runs that method synchronously and just
returns an instance of AsyncEnumerator. Having interfaces allows you to write your own implementation;
the classes mentioned above are just helpers. The initial idea was to support database-like
scenarios, where GetAsyncEnumeratorAsync() executes a query first (which internally returns a pointer),
and MoveNextAsync() then enumerates through the rows (by using that pointer).
6: Returning IAsyncEnumerable vs IAsyncEnumerator
When you implement a method that returns an async-enumerable collection, you can return either
IAsyncEnumerable or IAsyncEnumerator; the constructors of the helper classes AsyncEnumerable
and AsyncEnumerator are absolutely identical. Both interfaces have the same set of useful
extension methods, like ForEachAsync.
When you create an 'enumerable', you create a factory that produces 'enumerators', i.e. you can enumerate through a collection many times. On the other hand, you need an 'enumerator' when you can go through a collection only once.
Consider these 2 scenarios:
// You want to execute the same query against a database many times - you need an 'enumerable'
IAsyncEnumerable<DbRow> GetItemsFromDatabase()
{
    return new AsyncEnumerable<DbRow>(async yield =>
    {
        using (var dbReader = DbContext.ExecuteQuery(...))
        {
            while (true)
            {
                // ReadAsync is asynchronous, so it must be awaited
                DbRow row = await dbReader.ReadAsync();
                if (row == null)
                    break;

                await yield.ReturnAsync(row);
            }
        }
    });
}
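// Assume that you cannot seek in the stream - you need an 'enumerator'
IAsyncEnumerator<byte> EnumerateBytesInStream(Stream stream)
{
    return new AsyncEnumerator<byte>(async yield =>
    {
        // Stream has no ReadByteAsync, so read through a one-byte buffer
        var buffer = new byte[1];
        while (true)
        {
            int bytesRead = await stream.ReadAsync(buffer, 0, 1, yield.CancellationToken);
            if (bytesRead == 0)
                break; // end of stream

            await yield.ReturnAsync(buffer[0]);
        }
    });
}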
7: Where is Reset or ResetAsync?
The Reset method should never have been part of the IEnumerator interface, and should be considered deprecated. Create a new enumerator instead.
This is the reason why the 'oneTimeUse' flag was removed in version 2 of this library.
8: How can I do synchronous for-each enumeration through IAsyncEnumerable?
You can use extension methods like IAsyncEnumerable.ToEnumerable() to enable the built-in foreach
enumeration, BUT you should never do that!
The general idea of this library is to avoid thread-blocking calls on worker threads, and converting an IAsyncEnumerable to IEnumerable just defeats the whole purpose of this library. This is the reason why such synchronous extension methods are marked with the [Obsolete] attribute.
2.1.0: New extension methods: Batch, UnionAll, Single, SingleOrDefault, DefaultIfEmpty, Cast. Bug-fix: AsyncEnumerator.MoveNextAsync() must not succeed after Dispose().
2.0.1: Bug-fix: call onDispose when AsyncEnumerator is GC'ed but enumeration hasn't been started. Bug-fix: re-throw base exception instead of AggregateException in blocking synchronous methods.
2.0.0: Revise design of the library: same features, but slight paradigm shift and interface breaking changes.
1.5.0: Add support for .NET Standard, minor improvements.
1.4.2: Add finalizer to AsyncEnumerator and call Dispose in ForEachAsync and ParallelForEachAsync extension methods.
1.4.0: Add new generic type AsyncEnumeratorWithState for performance optimization. Now IAsyncEnumerator<T> is covariant. Add ForEachAsync, ParallelForEachAsync, and LINQ-style extension methods for IAsyncEnumerator.
1.3.0: Significantly improve performance of AsyncEnumerator by reducing thread switching and re-using instances of TaskCompletionSource. Add support for a state object that can be passed into AsyncEnumerable and AsyncEnumerator for performance optimization. Remove CancellationToken from Select/Take/Skip/Where extension methods - fix improper implementation. Move AsyncEnumerationCanceledException out of the generic AsyncEnumerator type. Change default behavior of the ToAsyncEnumerable extension method - now MoveNextAsync will run synchronously by default.
1.2.3: AsyncEnumerationCanceledException is thrown to the async enumeration function when the AsyncEnumerator is disposed before reaching the end of enumeration, which allows the function to do the clean-up. Fix MoveNextAsync(), which sometimes threw an exception after you passed the end of enumeration.
1.2.2: Fix exception propagation in AsyncEnumerator.
1.2.1: New Linq-style extension methods in System.Collections.Async namespace.
1.2.0: Contract breaking changes in ParallelForEachAsync: introduce ParallelForEachException to unify error outcome of the loop.
1.1.0: Add ParallelForEachAsync extension methods for IEnumerable<T> and IAsyncEnumerable<T> in System.Collections.Async namespace.