nats-io/nats.net.v2

No reply received

Closed this issue · 8 comments

Observed behavior

A "No reply received" error occurs, followed by a "No responders" error.
Requests succeed when the request count is around 100 to 200,
but fail once it goes above roughly 500 to 1000.
Both the subscriber and the requester run on the same machine.

My error log:
2024-05-19 16:29:44.736 +09:00 [ERR] No reply received
2024-05-19 16:29:44.737 +09:00 [ERR] NATS.Client.Core.NatsNoReplyException: No reply received
at NATS.Client.Core.NatsConnection.RequestAsync[TRequest,TReply](String subject, TRequest data, NatsHeaders headers, INatsSerialize`1 requestSerializer, INatsDeserialize`1 replySerializer, NatsPubOpts requestOpts, NatsSubOpts replyOpts, CancellationToken cancellationToken)
at NATS.Client.Core.NatsConnection.RequestAsync[TRequest,TReply](String subject, TRequest data, NatsHeaders headers, INatsSerialize`1 requestSerializer, INatsDeserialize`1 replySerializer, NatsPubOpts requestOpts, NatsSubOpts replyOpts, CancellationToken cancellationToken)
2024-05-19 16:29:44.738 +09:00 [ERR] No responders
2024-05-19 16:29:44.738 +09:00 [ERR] NATS.Client.Core.NatsNoRespondersException: No responders
at NATS.Client.Core.NatsSubBase.DisposeAsync()
at NATS.Client.Core.NatsConnection.RequestAsync[TRequest,TReply](String subject, TRequest data, NatsHeaders headers, INatsSerialize`1 requestSerializer, INatsDeserialize`1 replySerializer, NatsPubOpts requestOpts, NatsSubOpts replyOpts, CancellationToken cancellationToken)

My subscriber code:

NatsSubOpts subOpts = new NatsSubOpts() { ChannelOpts = new NatsSubChannelOpts() { Capacity = 5000 } };
await using var sub = await natsConn.SubscribeCoreAsync<byte[]>($"{subJect}", queueGroup: groupName, opts: subOpts);
var reader = sub.Msgs;
while (await reader.WaitToReadAsync())
{
    while (reader.TryRead(out NatsMsg<byte[]> msg))
    {
        var t = Task.Run(() =>
        {
            Interlocked.Increment(ref _listenCount);

            if (msg.ReplyTo != null)
            {
                string strMsg = $"Tester{_name}:{subJect} received : {_listenCount}";
                msg.ReplyAsync(strMsg);
            }
        });
    }
}

My requester code:

var replyOpts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(5) };
for (int i = 0; i < requestCount; i++)
{
    try
    {
        var reply = await natsConn.RequestAsync<byte[], string>(subject, sendBytes, replyOpts: replyOpts);
    }
    catch (Exception e)
    {
        _logger.err(e.Message);
        _logger.err($"{e}");
    }
}

Expected behavior

No errors.

Server and client version

server Version: 2.10.14 (windows)
client NATS.Net ( 2.2.1 )

Host environment

No response

Steps to reproduce

No response

mtmk commented

Thanks for the report. Does changing the message dropping behaviour help?

using System.Threading.Channels;
using NATS.Client.Core;

await using var nats = new NatsConnection(new NatsOpts
{
    SubPendingChannelFullMode = BoundedChannelFullMode.Wait,
});
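For reference, SubPendingChannelFullMode takes a BoundedChannelFullMode value from System.Threading.Channels. The standalone sketch below uses plain .NET (no NATS involved) to write three items into a bounded channel with an arbitrarily chosen capacity of 2 and print what is left for the reader under each mode. It only illustrates the channel semantics; how the NATS client wires this setting into its subscription channels is not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Writes the values 1, 2, 3 into a bounded channel of capacity 2 (an arbitrary
// choice for illustration) and returns the result of each TryWrite call plus
// whatever a reader can still drain from the channel afterwards.
static (bool[] writes, List<int> contents) Fill(BoundedChannelFullMode mode)
{
    var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2) { FullMode = mode });

    var writes = new bool[3];
    for (int i = 0; i < 3; i++)
    {
        // TryWrite never blocks: in Wait mode it returns false when the channel is full,
        // while the Drop* modes accept the write and discard one item instead.
        writes[i] = channel.Writer.TryWrite(i + 1);
    }

    var contents = new List<int>();
    while (channel.Reader.TryRead(out var item))
    {
        contents.Add(item);
    }
    return (writes, contents);
}

var modes = new[]
{
    BoundedChannelFullMode.Wait,
    BoundedChannelFullMode.DropNewest,
    BoundedChannelFullMode.DropOldest,
    BoundedChannelFullMode.DropWrite,
};

foreach (var mode in modes)
{
    var (writes, contents) = Fill(mode);
    Console.WriteLine($"{mode}: writes=[{string.Join(", ", writes)}] contents=[{string.Join(", ", contents)}]");
}

// Expected output:
// Wait: writes=[True, True, False] contents=[1, 2]
// DropNewest: writes=[True, True, True] contents=[1, 3]
// DropOldest: writes=[True, True, True] contents=[2, 3]
// DropWrite: writes=[True, True, True] contents=[1, 2]
```

Wait is the only mode in which TryWrite reports failure; the three drop modes accept the write and silently discard the newest buffered item, the oldest buffered item, or the item being written, respectively.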

Thanks for your answer.
Even with that change, the same error occurs.
By the way, I found another case.
Keeping the subscriber code the same as above,
I wrote the publisher in Java instead, and even with over 10,000 requests, the subscriber receives them all correctly.

mtmk commented

Thank you for following up. I just noticed you're not awaiting the reply here: msg.ReplyAsync(strMsg); Also, putting that in a fire-and-forget Task.Run looked a bit odd to me.
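To see why an un-awaited reply inside a fire-and-forget Task.Run can hide problems, here is a minimal plain-.NET sketch. FailingReplyAsync is a hypothetical stand-in for msg.ReplyAsync that always fails; it is not NATS code. Discarding the returned task means the failure is never observed by the caller, while awaiting it lets the caller catch and log the error.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for msg.ReplyAsync: it fails asynchronously,
// e.g. as if the reply could not be sent. This is not NATS code.
static async Task FailingReplyAsync()
{
    await Task.Yield();
    throw new InvalidOperationException("reply failed");
}

// Fire-and-forget: the returned Task is discarded, so the exception it
// carries is never observed by this code.
static void FireAndForget()
{
    _ = FailingReplyAsync();
}

// Awaited: the exception propagates to the caller, which can log or handle it.
static async Task<string?> AwaitedAsync()
{
    try
    {
        await FailingReplyAsync();
        return null;
    }
    catch (InvalidOperationException e)
    {
        return e.Message;
    }
}

FireAndForget();                                   // returns normally; the failure goes unnoticed
string? observed = await AwaitedAsync();
Console.WriteLine(observed ?? "no error observed"); // prints "reply failed"
```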

Here is what I tried; it worked for me out of the box in a console app, without any errors logged:

// Program.cs
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

await using var nats = new NatsConnection(new NatsOpts
{
    LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()),
});

await using var subscription = await nats.SubscribeCoreAsync<byte[]>("foo");
var cts = new CancellationTokenSource();
var reader = Task.Run(async () =>
{
    var i = 0;
    try
    {
        await foreach (var msg in subscription.Msgs.ReadAllAsync(cts.Token))
        {
            await msg.ReplyAsync($"{i++}");
        }
    }
    catch (OperationCanceledException)
    {
    }
});

await nats.PingAsync();

for (int i = 0; i < 10_000; i++)
{
    var reply = await nats.RequestAsync<byte[], string>("foo", new byte[128]);
    if (int.Parse(reply.Data!) != i)
    {
        throw new Exception($"Expected {i}, got {reply.Data}");
    }
}

cts.Cancel();

await reader;

Edit: note that I only kept the reply type as string because that was in your example; I could have used int as well. The issue might be somewhere else. Could you perhaps provide a full example?

I can repro the issue with the code below. It is a slightly altered version of the original posting: I've added multiple sender tasks and also included the await and async as @mtmk suggested. However, the async/await change seems to make little difference.

Interestingly, the issue doesn't seem to occur when running this code inside Visual Studio. Running it outside VS, directly from PowerShell, I get the following:

creating sender task 12
creating sender task 13
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
warn: NATS.Client.Core.Internal.SubscriptionManager[1002]
      Subscription GCd but was never disposed subject/1
Send 75 : No responders
Send 76 : No responders
Send 76 : No responders
Send 76 : No responders

Repro code:

using Microsoft.Extensions.Logging;
using NATS.Client.Core;

var config = new NatsOpts
{
    Url = "nats://127.0.0.1",
    LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()),
};

var natsConn = new NatsConnection(config);

string subject = "subject";
string _name = "bob";

_ = Task.Run(Receive);

await Task.Delay(2000);

List<Task> tasks = new List<Task>();
for (int i = 0; i < 14; i++)
{

    Console.WriteLine($"creating sender task {i}");
    tasks.Add(Send(16000));
}

Task.WaitAll(Task.WhenAll(tasks));

Console.WriteLine("done");

async Task Send(int requestCount)
{
    byte[] sendBytes = new byte[123024];

    var replyOpts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(5) };
    for (int i = 0; i < requestCount; i++)
    {
        try
        {
            var reply = await natsConn.RequestAsync<byte[], string>(subject, sendBytes, replyOpts: replyOpts);
            //Console.WriteLine(reply);
        }
        catch (Exception e)
        {
            Console.WriteLine($"Send {i} : {e.Message}");            
        }
    }

}

async Task Receive()
{
    long _listenCount = 0;
    string groupName = "group";

    NatsSubOpts subOpts = new NatsSubOpts() { ChannelOpts = new NatsSubChannelOpts() { Capacity = 5000 } };
    await using var sub = await natsConn.SubscribeCoreAsync<byte[]>($"{subject}", queueGroup: groupName, opts: subOpts);
    var reader = sub.Msgs;
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out NatsMsg<byte[]> msg))
        {
            var t = Task.Run(async () =>   // <<<<<<---- added async
            {
                Interlocked.Increment(ref _listenCount);
                try
                {
                    if (msg.ReplyTo != null)
                    {
                        string strMsg = $"Tester{_name}:{subject} received : {_listenCount}";
                        await msg.ReplyAsync(strMsg);   // <<<<<<---- added await
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Receive : {e.Message}");
                }
            });
        }
    }
}
mtmk commented

Thanks @darkwatchuk, I can reproduce that too. I think the problem might be the "Subscription GCd but was never disposed subject/1" warning, which means either the inbox or the responder subscription is quitting for some reason. That causes the no responders error (which means the server could not find a subscription for a given published message). I will try to dig deeper.

mtmk commented

OK, I think not holding on to the subscription might be the issue. A few people trip up on this now and again. We keep a weak reference to the subscription object so that, in cases where application code doesn't properly dispose the subscription, we can still unsubscribe from the server and keep things nice and tidy. But in some cases it unsubscribes unexpectedly, because the GC kicks in and collects a subscription that is only weakly referenced. You can either hold on to the subscription yourself or use the SubscribeAsync enumerable, which does that for you internally. I was able to run the code to completion with the changes below. Thanks again for the repro @darkwatchuk 💯
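The weak-reference behaviour described above can be modeled in a few lines of plain .NET. This is a toy, not the client's actual SubscriptionManager: plain objects stand in for subscriptions, and a WeakReference<object> stands in for the registry entry. An object that nothing references strongly may be collected at the next GC, while one the application holds on to survives. The strong reference only helps if it is itself reachable, for example from a static field or a long-lived object.

```csharp
using System;
using System.Runtime.CompilerServices;

// Toy model: the "registry" keeps only weak references to the objects it tracks.
// Plain objects stand in for subscriptions; this is not NATS code.

// Creates an object and returns only a weak reference to it, so no strong
// reference survives the call. NoInlining keeps the object out of Main's frame.
[MethodImpl(MethodImplOptions.NoInlining)]
static WeakReference<object> RegisterUnheld() => new WeakReference<object>(new object());

WeakReference<object> unheld = RegisterUnheld();

object held = new object();                         // the application holds on to this one
WeakReference<object> heldRef = new WeakReference<object>(held);

GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();

bool unheldAlive = unheld.TryGetTarget(out _);      // usually false once a GC has run
bool heldAlive = heldRef.TryGetTarget(out _);       // true: 'held' is still strongly referenced
Console.WriteLine($"unheld alive: {unheldAlive}, held alive: {heldAlive}");

GC.KeepAlive(held);                                 // keeps 'held' reachable up to this point
```

Whether the unheld object is actually gone after GC.Collect depends on the runtime, the JIT, and whether a debugger is attached, so only the held case is guaranteed.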

using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

var config = new NatsOpts
{
    Url = "nats://127.0.0.1",
    LoggerFactory = LoggerFactory.Create(builder => builder.AddConsole()),
};

var natsConn = new NatsConnection(config);

string subject = "subject";
string _name = "bob";

// string groupName = "group";
// var subOpts = new NatsSubOpts() { ChannelOpts = new NatsSubChannelOpts() { Capacity = 5000 } };
// await using var sub = await natsConn.SubscribeCoreAsync<byte[]>($"{subject}", queueGroup: groupName, opts: subOpts);
// ChannelReader<NatsMsg<byte[]>> reader1 = sub.Msgs;
//
// _ = Task.Run(async () => await Receive(reader1));

_ = Task.Run(Receive2);

await Task.Delay(2000);

List<Task> tasks = new List<Task>();
for (int i = 0; i < 14; i++)
{

    Console.WriteLine($"creating sender task {i}");
    tasks.Add(Send(16000));
}

Task.WaitAll(Task.WhenAll(tasks));

Console.WriteLine("done");

async Task Send(int requestCount)
{
    byte[] sendBytes = new byte[123024];

    var replyOpts = new NatsSubOpts { Timeout = TimeSpan.FromSeconds(5) };
    for (int i = 0; i < requestCount; i++)
    {
        try
        {
            var reply = await natsConn.RequestAsync<byte[], string>(subject, sendBytes, replyOpts: replyOpts);
            //Console.WriteLine(reply);
        }
        catch (Exception e)
        {
            //Console.WriteLine($"Send {i} : {e.Message}");
        }
    }

}

async Task Receive(ChannelReader<NatsMsg<byte[]>> reader)
{
    long _listenCount = 0;


    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out NatsMsg<byte[]> msg))
        {
            var t = Task.Run( async () =>                                                           // <<<<<<---- added async
            {
                Interlocked.Increment(ref _listenCount);
                try
                {
                    if (msg.ReplyTo != null)
                    {
                        string strMsg = $"Tester{_name}:{subject} received : {_listenCount}";
                        await msg.ReplyAsync(strMsg);                                                     //  <<<<<<<<---------- added await
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Receive : {e.Message}");
                }

            });
        }
    }
}


async Task Receive2()
{
    long _listenCount = 0;

    string groupName = "group";

    NatsSubOpts subOpts = new NatsSubOpts() { ChannelOpts = new NatsSubChannelOpts() { Capacity = 5000 } };


    await foreach (var msg in natsConn.SubscribeAsync<byte[]>(subject, groupName, opts: subOpts))
    {
        var t = Task.Run(async () =>   // <<<<<<---- added async
        {
            Interlocked.Increment(ref _listenCount);
            try
            {
                if (msg.ReplyTo != null)
                {
                    string strMsg = $"Tester{_name}:{subject} received : {_listenCount}";
                    await msg.ReplyAsync(strMsg);   // <<<<<<---- added await
                }
            }
            catch (Exception e)
            {
                Console.WriteLine($"Receive : {e.Message}");
            }
        });
    }
}

TBH, I wasn't aware you could access the ChannelReader in the first place. Isn't it a bit dangerous to expose such a thing? @copycd used a common pattern to read the channel, and of course the weak-reference system is behind the scenes; who would know? What's the point in exposing the ChannelReader?

mtmk commented

In general, application code doesn't need to know about the channels. For example, SubscribeAsync returns an async enumerable for use with await foreach (as do most other JetStream APIs), and we take care of the object life cycles internally. We suggest using SubscribeAsync for that reason. But for someone who wants to use and fine-tune a channel, perhaps in slightly more advanced scenarios, we try to provide some knobs for that too.