nats-io/nats.net.v2

INatsKVStore.WatchAsync hanging: never finishes consuming all data from KVStore

fernandozago opened this issue · 4 comments

Observed behavior

Consuming a KV store using INatsKVStore.WatchAsync() never completes.

Expected behavior

Should consume all existing data from the KVStore, then continue consuming real-time data.

Server and client version

Server: 2.10.11
Client: 2.1.2

Host environment

Not relevant (I think)

Steps to reproduce

    #region Setup
    var loggerFactory = LoggerFactory.Create(builder =>
    {
        //builder.SetMinimumLevel(LogLevel.Trace);
        builder.AddConsole();
    });
    var logger = loggerFactory.CreateLogger("NATS-by-Example");

    var opts = new NatsOpts
    {
        Url = "192.168.2.10:4222",
        LoggerFactory = loggerFactory,
        Name = "NATS-by-Example"
    };
    var nats = new NatsConnection(opts);
    var js = new NatsJSContext(nats);
    var kv = new NatsKVContext(js); 
    #endregion
    int count = 0;

    #region Prepare DataSet
    try
    {
        await kv.DeleteStoreAsync("bug2");
    }
    catch
    {
        //
    }
    var store = await kv.CreateStoreAsync(new NatsKVConfig("bug2"));

    await Parallel.ForAsync(1, 30_001, async (x, c) =>
    {
        await store.PutAsync(x.ToString(), x.ToString());
        Interlocked.Increment(ref count);
        if (x % 5_000 == 0)
        {
            logger.LogInformation("Wrote: {total}", x);
        }
    });
    logger.LogInformation("Finished writing: {c} items", count);
    #endregion


    using var cts = new CancellationTokenSource();
    try
    {
        await foreach (var v in store.WatchAsync<int>(cancellationToken: cts.Token))
        {
            count--;
            if (v.Value % 5_000 == 0)
            {
                logger.LogInformation("Read: {total}", v.Value);
            }
            
            if (count == 0)
            {
                logger.LogInformation("Read All OK!");
                break;
            }
        }
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Not finished! -- Pending values {p}", count);
    }
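
Because the loop above simply blocks when the watch stalls, the hang can be made visible with an idle timeout. The following is only a sketch that uses standard .NET types: `ReadUntilIdleAsync` and `StalledSource` are hypothetical names, and `StalledSource` merely simulates a stream that stops delivering, so no NATS server is involved.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: counts items from any async stream and gives up once
// no item has arrived within idleTimeout.
static async Task<(int Read, bool Stalled)> ReadUntilIdleAsync<T>(
    IAsyncEnumerable<T> source, TimeSpan idleTimeout)
{
    using var cts = new CancellationTokenSource();
    var count = 0;
    cts.CancelAfter(idleTimeout);
    try
    {
        await foreach (var _ in source.WithCancellation(cts.Token))
        {
            count++;
            cts.CancelAfter(idleTimeout); // re-arm the idle timer after every item
        }
        return (count, false); // the stream ended on its own
    }
    catch (OperationCanceledException)
    {
        return (count, true); // nothing arrived within idleTimeout
    }
}

// Simulated stream (an assumption for the demo): yields a few items, then
// waits forever, the way the stalled watch appears to the caller.
static async IAsyncEnumerable<int> StalledSource(
    int items, [EnumeratorCancellation] CancellationToken ct = default)
{
    for (var i = 0; i < items; i++)
        yield return i;
    await Task.Delay(Timeout.Infinite, ct);
}

var (readCount, timedOut) = await ReadUntilIdleAsync(StalledSource(3), TimeSpan.FromMilliseconds(200));
Console.WriteLine($"read={readCount}, stalled={timedOut}");
```

With the simulated source this reports three items read and a stall. Passing `store.WatchAsync<int>()` from the repro as the source should report how many entries arrived before delivery stopped, although a watch that is working correctly will also show up as "stalled" once it has caught up and no new writes arrive.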

mtmk commented

thanks for the report @fernandozago 🙏 it's a bug for sure!

The KV push consumer isn't responding to flow control requests.
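
For context, here is a rough sketch of what responding to flow control involves. It is based on how JetStream push consumers generally behave, not on the actual fix in the client. With flow control enabled, the server delivers status messages with code 100 to the consumer. A "FlowControl Request" carries a reply subject, and the client is expected to publish an empty message to it. An "Idle Heartbeat" may carry a `Nats-Consumer-Stalled` header naming the subject to reply to. If the client never replies, the server stops delivering. `FlowControlReplySubject` and the subjects below are hypothetical and not part of NATS.Net.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical helper: given the status line and headers of a message delivered
// to a push consumer, return the subject an empty reply should be published to,
// or null when no flow control reply is needed.
static string? FlowControlReplySubject(
    int statusCode,
    string statusText,
    string? replyTo,
    IReadOnlyDictionary<string, string>? headers = null)
{
    // Flow control requests and idle heartbeats both arrive as status 100.
    if (statusCode != 100) return null;

    // Explicit request: answer on the message's own reply subject.
    if (statusText.StartsWith("FlowControl", StringComparison.OrdinalIgnoreCase))
        return replyTo;

    // Heartbeat reporting a stalled consumer: the header names the reply subject.
    if (headers is not null && headers.TryGetValue("Nats-Consumer-Stalled", out var stalled))
        return stalled;

    return null;
}

// The subject names here are made up for illustration.
var heartbeatHeaders = new Dictionary<string, string> { ["Nats-Consumer-Stalled"] = "fc.reply.example" };
Console.WriteLine(FlowControlReplySubject(100, "FlowControl Request", "fc.reply.example") ?? "(none)");
Console.WriteLine(FlowControlReplySubject(100, "Idle Heartbeat", null, heartbeatHeaders) ?? "(none)");
Console.WriteLine(FlowControlReplySubject(100, "Idle Heartbeat", null) ?? "(none)");
```

A real client would then publish an empty message to the returned subject. That step is left out here so the sketch does not depend on any particular client API.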

@mtmk
I appreciate your attention.

I'm creating some examples to use NATS where I work.

Just pulled your branch. Works like a charm!! Thanks @mtmk!

Spent a while yesterday trying to figure out why my KV data had stopped coming through and then saw this issue this morning!
New branch fixed it - thank you!