NatsConnection.SubscribeAsync<T>() not respecting CancellationToken. Takes 30 seconds to terminate program.
knallle opened this issue · 2 comments
Observed behavior
When cancelling my Console Application (minimal example provided below), it takes exactly 30 seconds for the program to terminate after calling for cancellation with CTRL-C.
Debugging with Parallel Stacks in Visual Studio led me to NATS.Client.Core.NatsConnection.SubScribeAsync()
:
/// <inheritdoc />
public async IAsyncEnumerable<NatsMsg<T>> SubscribeAsync<T>(string subject, string? queueGroup = default, INatsDeserialize<T>? serializer = default, NatsSubOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
serializer ??= Opts.SerializerRegistry.GetDeserializer<T>();
await using var sub = new NatsSub<T>(this, SubscriptionManager.GetManagerFor(subject), subject, queueGroup, opts, serializer, cancellationToken);
await SubAsync(sub, cancellationToken: cancellationToken).ConfigureAwait(false);
// We don't cancel the channel reader here because we want to keep reading until the subscription
// channel writer completes so that messages left in the channel can be consumed before exit the loop.
await foreach (var msg in sub.Msgs.ReadAllAsync(CancellationToken.None).ConfigureAwait(false))
{
yield return msg;
}
}
It is clearly stated that the channel reader is not cancelled and the incoming CancellationToken is not passed to sub.Msgs.ReadAllAsync()
.
Expected behavior
I expect the program to terminate sooner than 30 seconds after cancellation is called.
If immediate cancellation is impossible, enabling the developer to set the timeout to something less than 30 seconds would be great.
Server and client version
nats-server: v2.10.20
NATS.Net: 2.4.0
Host environment
- Operating system: Windows 10
- Target framework: .Net 8
- NATS.Net version: 2.4.0
- Visual Studio 2022 Version 17.11.3
Steps to reproduce
- Create a Console Application in Visual Studio, targetting .Net 8.
- Paste in the code for the minimal working example provided below.
- Run the program with nats-server running
- Request cancellation with CTRL-C (NOTE: You do not have to interact with nats-server)
- Wait for 30 seconds for the program to terminate.
Minimal working example
Program.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Hosting;
namespace NatsBugMinimalExample
{
internal class Program
{
static async Task Main(string[] args)
{
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((context, services) =>
{
services.AddSingleton<CancellationTokenSource>();
services.AddHostedService<MyService>();
})
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddSimpleConsole(options =>
{
options.TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff";
options.SingleLine = true;
});
})
.Build();
await host.RunAsync();
}
}
}
MyService.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
namespace NatsBugMinimalExample
{
internal class MyService : BackgroundService
{
private readonly ILogger<MyService> _logger;
private readonly CancellationTokenSource _cancellationTokenSource;
public MyService(ILogger<MyService> logger, CancellationTokenSource cancellationTokenSource)
{
_logger = logger;
_cancellationTokenSource = cancellationTokenSource;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
const string subject = "myservice.number";
var opts = new NatsOpts { Url = "127.0.0.1:4222" };
await using var connection = new NatsConnection(opts: opts);
await connection.ConnectAsync();
await foreach (var msg in connection.SubscribeAsync<int>(subject).WithCancellation(_cancellationTokenSource.Token))
{
await msg.ReplyAsync(_cancellationTokenSource.Token);
}
}
}
}
thanks for the report @knallle I'm not familiar with CancellationTokenSource
being used as singleton but if you pass the stoppingToken
it works as expected:
await foreach (var msg in connection.SubscribeAsync<int>(subject).WithCancellation(stoppingToken))
edit: even if I don't pass any token to SubscribeAsync I see the same behaviour described above.