AsyncCollection IsEmpty
romerod opened this issue · 3 comments
We are using the AsyncCollection to queue messages that have to be sent to a server. The send operation usually takes some time, and sending multiple messages at once is faster than sending each message separately.
Currently we have something like this:
var cancelledToken = new CancellationToken(true);
while (true)
{
    var buffer = new Message[10000];
    // wait asynchronously for the first item
    buffer[0] = await asyncQueue.TakeAsync();
    var count = 1;
    while (count < buffer.Length)
    {
        try
        {
            // take currently available items
            buffer[count] = await asyncQueue.TakeAsync(cancelledToken);
            count++;
        }
        catch (TaskCanceledException)
        {
            break;
        }
    }
    await SendMessagesAsync(buffer, count);
}
This works great except that the debug output is flooded with TaskCanceledExceptions, especially when the functionality is used in a request/response scenario when messages do not pile up.
Would it be possible to expose the Empty and/or Count property or have a method which gets a specified number of items which are currently available?
Any suggestions on this?
@romerod @StephenCleary It seems to me that the inner wait loop in DoTakeAsync should probably be guarded on the cancellation token also:
while (Empty && !_completed && !cancellationToken.IsCancellationRequested)
{
    if (sync)
    {
        _completedOrNotEmpty.Wait(cancellationToken);
    }
    else
    {
        await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
    }
}
I appreciate that, as this is async, it might still throw the occasional TaskCanceledException if the token is cancelled between the test and the wait, but it would eliminate the problem in this scenario, where the token is always cancelled.
I think DoOutputAvailableAsync also needs the change, and then the main loop could be modified to be:
const int maxBatchSize = 10000;
var cancelledToken = new CancellationToken(true);
while (true)
{
    var buffer = new List<Message>(maxBatchSize);
    buffer.Add(await asyncQueue.TakeAsync());
    // buffer.Count is the number of items taken so far, so compare it against the batch limit
    while (buffer.Count < maxBatchSize && await asyncQueue.OutputAvailableAsync(cancelledToken))
    {
        try
        {
            // take currently available items
            var message = await asyncQueue.TakeAsync(cancelledToken);
            if (message == null) break;
            buffer.Add(message);
        }
        catch (InvalidOperationException) // Thrown if the producer completes
        {
            break;
        }
    }
    await SendMessagesAsync(buffer, buffer.Count);
}
I think that perhaps the AsyncCollection should have a TryTakeAsync(CancellationToken cancellationToken) method, which would be almost the same as the TakeAsync implementation, except that it would not throw the InvalidOperationException if the underlying take fails.
@romerod As this is open source, you could just include a private copy of the AsyncCollection class, modified for your needs if necessary...
Would it be possible to expose the Empty and/or Count property or have a method which gets a specified number of items which are currently available?
No; there's too much potential for misuse. With concurrent/asynchronous collections, any Empty or Count property can be outdated by the time your code even gets the value.
However, there is a better way to meet your need: a Try* method. In the past, I've avoided Try* methods because the meaning of "try" is ambiguous (the most common meaning is "don't throw", but it can also mean "don't behave asynchronously", which would be the meaning in this case). Since System.Threading.Channels has adopted the Try* approach with their high-performance asynchronous collections, I'm comfortable adding Try* methods to mine as well.
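To illustrate the "don't behave asynchronously" meaning of Try*, here is a minimal sketch of the batching loop written against System.Threading.Channels rather than AsyncCollection. It is not the AsyncCollection API, which does not have Try* methods at the time of this comment. The class name BatchDrain and the method TakeBatchAsync are hypothetical names chosen for the example; ChannelReader<T>.WaitToReadAsync and ChannelReader<T>.TryRead are the real Channels calls.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class BatchDrain
{
    // Waits asynchronously for the first item, then takes whatever else is
    // already available (up to maxBatch) without waiting and without
    // throwing when the channel runs dry.
    public static async Task<List<T>> TakeBatchAsync<T>(ChannelReader<T> reader, int maxBatch)
    {
        var batch = new List<T>();

        // Returns false once the channel is completed and drained.
        if (!await reader.WaitToReadAsync().ConfigureAwait(false))
        {
            return batch;
        }

        // TryRead never waits: it returns false as soon as nothing is available.
        while (batch.Count < maxBatch && reader.TryRead(out var item))
        {
            batch.Add(item);
        }

        return batch;
    }
}
```

With more than one consumer, another reader may take the item between WaitToReadAsync and TryRead, so the caller should treat an empty batch as "nothing to send right now" and loop again unless the channel has completed.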