Akka.Streams.Channels ChannelSource is failing in the middle of processing with TaskCanceledException
Closed this issue · 4 comments
Version Information
Version of Akka.NET? 1.4.38
Which Akka.NET Modules? Akka.Streams 1.4.38, Akka.Streams.Channels 1.0.0-beta9
Describe the bug
I noticed strange behavior during working with Akka.Streams.Channels ChannelSource. I decided to use Channels as my backing store for some objects. I created ChannelSource to read these data in batches and send them to Azure Service Bus. Everything was working pretty well if I have around 1000 items stored in Channel. In case of the number increased I noticed that ChannelSource stopped processing data after some time. After some investigation, I noticed that ChannelSource is failing with TaskCanceledException which is raised with FailStage here - ChannelReaderSource.cs
To Reproduce
Steps to reproduce the behavior:
- Create new channel.
- Create simple ChannelSource with Sink.Ignore.
- Start writing items into Channel (it has to be at least 50 000 items).
- ChannelSource is failing after some time.
Here is the link for simple repo which is on my machine failing almost every time - https://github.com/pavoldecky/AkkaStreamChannel/blob/main/AkkaStreamChannel/Program.cs
Expected behavior
First of all, I am not sure if ChannelSource is intended to use a huge amount of items. I try to play with Channels Reader and Writer and I was able to process much more data as it is failing with ChannelSource. So I thought that ChannelSource should be able o handle the same amount of data.
Actual behavior
So after some more investigation, I noticed that problem could be with the continuation ValueTask in OnPull method
public override void OnPull()
{
if (_reader.TryRead(out var element))
{
Push(_outlet, element);
}
else
{
var continuation = _reader.WaitToReadAsync();
if (continuation.IsCompletedSuccessfully)
{
var dataAvailable = continuation.GetAwaiter().GetResult();
if (dataAvailable && _reader.TryRead(out element))
Push(_outlet, element);
else
CompleteStage();
}
else if (!continuation.IsCompleted)
continuation.AsTask().ContinueWith(t =>
{
if (t.IsFaulted) _onValueReadFailure(t.Exception);
else if (t.IsCanceled) _onValueReadFailure(new TaskCanceledException(t));
else _onValueRead(t.Result);
});
else if (continuation.IsFaulted)
FailStage(continuation.AsTask().Exception);
else
FailStage(new TaskCanceledException(continuation.AsTask()));
}
}
so the issue occurs when _reader.TryRead returns false and we are calling _reader.WaitToReadAsync(). In some cases, code execution ends up in last else. But when I check the status of continuation ValueTask in this case, this value task is CompletedSuccesfully. So could be the problem that continuation is completed with some delay and because of that it is not handled by first if (continuation.IsCompletedSuccessfully) ?
I tried to do some changes and it is working for my case, but as a user with limited knowledge of ValueTasks and Akka.Streams I would expect that this is not exactly what is needed :)
Here are my changes.
public override void OnPull()
{
if (_reader.TryRead(out var element))
{
Push(_outlet, element);
}
else
{
var task = _reader.WaitToReadAsync().AsTask();
if (task.IsCompletedSuccessfully)
{
var dataAvailable = task.GetAwaiter().GetResult();
if (dataAvailable && _reader.TryRead(out element))
{
Push(_outlet, element);
}
else
{
CompleteStage();
}
}
else if (task.IsFaulted)
{
FailStage(task.Exception);
}
else
{
task.ContinueWith(result =>
{
if (result.IsCanceled)
{
_onValueReadFailure(new TaskCanceledException(result));
}
else if (result.IsFaulted)
{
_onValueReadFailure(result.Exception);
}
else
{
_onValueRead(result.Result);
}
});
}
}
Environment
Windows 11, .NET 6.0.300
We'll look into this
@to11mtm was this related to your fix or something else?
@Aaronontheweb based on the info, I don't think this is related to my changes in #1068; those were on the Sink
side and this appears to be Source
side.
I would suggest considering the following, if one is worried about races in the if logic of the Source
:
public override void OnPull()
{
if (_reader.TryRead(out var element))
{
Push(_outlet, element);
}
else
{
var continuation = _reader.WaitToReadAsync();
if (continuation.IsCompletedSuccessfully)
{
var dataAvailable = continuation.GetAwaiter().GetResult();
if (dataAvailable && _reader.TryRead(out element))
Push(_outlet, element);
else
CompleteStage();
}
else
{
continuation.AsTask().ContinueWith(t =>
{
if (t.IsFaulted) _onValueReadFailure(t.Exception);
else if (t.IsCanceled) _onValueReadFailure(new TaskCanceledException(t));
else _onValueRead(t.Result);
});
}
}
}
Fault cases should be fairly infrequent, so just using the continuation every time should be OK.
hi, I can confirm that the suggested code change helped with my issue, and everything is working as expected, thanks for the help