INatsKVStore.WatchAsync hunging. Never completes consuming all data from KVStore
fernandozago opened this issue ยท 4 comments
fernandozago commented
Observed behavior
Consuming a KV store using INatsKVStore.WatchAsync(), never completes.
Expected behavior
Should consume all data from KVStore, then continue consuming realtime data.
Server and client version
Server: 2.10.11
Client: 2.1.2
Host environment
Not relevant (I think)
Steps to reproduce
#region Setup
var loggerFactory = LoggerFactory.Create(builder =>
{
//builder.SetMinimumLevel(LogLevel.Trace);
builder.AddConsole();
});
var logger = loggerFactory.CreateLogger("NATS-by-Example");
var opts = new NatsOpts
{
Url = "192.168.2.10:4222",
LoggerFactory = loggerFactory,
Name = "NATS-by-Example"
};
var nats = new NatsConnection(opts);
var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
#endregion
int count = 0;
#region Prepare DataSet
try
{
await kv.DeleteStoreAsync("bug2");
}
catch
{
//
}
var store = await kv.CreateStoreAsync(new NatsKVConfig("bug2"));
await Parallel.ForAsync(1, 30_001, async (x, c) =>
{
await store.PutAsync(x.ToString(), x.ToString());
Interlocked.Increment(ref count);
if (x % 5_000 == 0)
{
logger.LogInformation("Wrote: {total}", x);
}
});
logger.LogInformation("Finished writting: {c} items", count);
#endregion
using var cts = new CancellationTokenSource();
try
{
await foreach (var v in store.WatchAsync<int>(cancellationToken: cts.Token))
{
count--;
if (v.Value % 5_000 == 0)
{
logger.LogInformation("Read: {total}", v.Value);
}
if (count == 0)
{
logger.LogInformation("Read All OK!");
break;
}
}
}
catch (Exception ex)
{
logger.LogError(ex, "Not finished! -- Pending values {p}", count);
}
mtmk commented
thanks for the report @fernandozago ๐ it's a bug fo sure!
KV push consumer isn't responding to flow control requests.
fernandozago commented
@mtmk
I appreciate your attention.
I'm creating some examples to use NATS where I work.
fernandozago commented
Just pulled your branch. Works like a charm !! Thanks @mtmk !
darkwatchuk commented
Spent a while yesterday trying to figure out why my KV data had stopped coming through and then saw this issue this morning!
New branch fixed it - thank you!