Dasync/AsyncEnumerable

yield.ReturnAsync inside ParallelForEachAsync

galmok opened this issue · 5 comments

We are having trouble using ParallelForEachAsync to start a number of async web requests in parallel (they are actually Azure Blob Storage retrievals) and return each result with yield.ReturnAsync.

The problem is that before the first result is returned, several parallel web requests have already started and completed, yet only the last result is returned to the consumer (which iterates using ForEachAsync).

The producer:

    public static IAsyncEnumerable<(MemoryStream, string)> Stream(this IEnumerable<string> blobNames,
        IBlobManager blobManager,
        CloudBlobContainer container, int maxDegreeOfParallelism = 5)
    {
        return new AsyncEnumerable<(MemoryStream, string)>(async yield =>
        {
            var cancellationToken1 = yield.CancellationToken;

            await blobNames.ParallelForEachAsync(async blobName =>
            {
                Console.WriteLine($"Trying to download blob: {blobName}");

                // TODO: try-catch; what happens if one download fails?
                var memoryStream = await blobManager
                    .DownloadBlobAsStreamAsync(container, blobName, cancellationToken1)
                    .ConfigureAwait(false);

                // Return immediately instead of waiting for all the blobs to complete
                await yield.ReturnAsync((memoryStream, blobName)).ConfigureAwait(false);
            }, maxDegreeOfParallelism, cancellationToken1).ConfigureAwait(false);
        });
    }

The consumer:

        var blobNames = MyFactory.BuildBlobNames(from, to);

        var asyncEnumerable = blobNames.Stream(BlobManager, Container, 4);

        // Assert
        var concurrentList = new ConcurrentBag<string>();
        await asyncEnumerable.ForEachAsync(async tuple =>
        {
            using (var ms = tuple.Item1)
            {
                var decoded = Encoding.UTF8.GetString(ms.ToArray());
                //TODO: Convert to text to assert the content
                concurrentList.Add(decoded);
                Console.WriteLine($"Blob: {tuple.Item2}. Content: {decoded}");
            }
        }, cancellationToken).ConfigureAwait(false);

What did we do wrong?

I have exactly the same issue.

Good try, but this is simply an unsupported scenario with concurrent producers.
Imagine a synchronous version of the producer side:

IEnumerable<...> Stream(...)
{
    Parallel.ForEach(... =>
    {
        yield return ...; // won't compile
    });

    yield return ...; // will compile
}

The yield keyword cannot be used inside the lambda function passed to the Parallel.ForEach method, because the lambda is a separate function that lies outside the scope of the synchronous enumerator method.

Because this library only mimics the C# language feature, it cannot impose the same restriction: in the async version, yield is an ordinary variable, not a keyword.

IAsyncEnumerable<...> Stream(...)
{
    return new AsyncEnumerable<...>(async yield => // 'yield' is a variable
    {
        await ....ParallelForEachAsync(async ...=>
        {
            // 'yield' is captured by the closure, which is why this compiles, but you must not use it here
            await yield.ReturnAsync(...);
        }, ...);
    });
}

If you try the same thing with C# 8.0 async streams, it won't work either: yield return does not compile inside the lambda. So this is something you have to be aware of.

A concurrent producer/consumer pattern with limiters is a slightly more complex problem, and it is not available as an extension method in this library. I posted a possible solution on StackOverflow.
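For readers who want a feel for what such a pattern involves, here is a minimal sketch. It is not the StackOverflow answer mentioned above and not part of this library. It assumes .NET Core 3.0 or later with C# 8.0 async streams and System.Threading.Channels; the names ParallelProducer and SelectParallelAsync are hypothetical. Workers write to a channel, and only the single iterator method reads from it and yields, so nothing ever yields concurrently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of Dasync.Collections.
public static class ParallelProducer
{
    public static async IAsyncEnumerable<TResult> SelectParallelAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, CancellationToken, Task<TResult>> work,
        int maxDegreeOfParallelism,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Linked token lets us stop the workers if the consumer stops early.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var token = cts.Token;

        // Bounded channel: workers wait when the consumer falls behind.
        var channel = Channel.CreateBounded<TResult>(maxDegreeOfParallelism);
        // The limiter caps how many work items are in flight at once.
        var limiter = new SemaphoreSlim(maxDegreeOfParallelism);

        _ = Task.Run(async () =>
        {
            var running = new List<Task>();
            try
            {
                foreach (var item in source)
                {
                    await limiter.WaitAsync(token).ConfigureAwait(false);
                    running.Add(Task.Run(async () =>
                    {
                        try
                        {
                            var result = await work(item, token).ConfigureAwait(false);
                            await channel.Writer.WriteAsync(result, token).ConfigureAwait(false);
                        }
                        finally
                        {
                            limiter.Release();
                        }
                    }));
                }
                await Task.WhenAll(running).ConfigureAwait(false);
                channel.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                // Surface the failure (or cancellation) to the reader.
                channel.Writer.TryComplete(ex);
            }
        });

        try
        {
            // Single reader: the only place that yields to the consumer.
            await foreach (var result in channel.Reader.ReadAllAsync(token).ConfigureAwait(false))
                yield return result;
        }
        finally
        {
            cts.Cancel();
        }
    }
}
```

Results arrive in completion order, not source order. In this sketch a failed work item is reported only after the remaining items have been started and finished, so real code would probably want to cancel earlier.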

One thing I can recommend is to swap the methods in your case, if that works for you: have the producer use ForEachAsync and the consumer use ParallelForEachAsync.

P.S. this is a duplicate of #44

Using SemaphoreSlim would definitely solve the problem; however, it would hurt the performance of enumeration in regular cases, because every iteration would have to consult a synchronization primitive. How would you solve that problem?