nats-io/nats.net.v2

Enhancement to KV WatchAsync to notify if the watch has been started on an empty store

darkwatchuk opened this issue · 7 comments

Proposed change

For the KV WatchAsync command, provide a mechanism to signal when the initial set of data has been returned. Currently this can only be detected when data comes back from the initial watch; if the KV is empty, you have no way of knowing. One option is an optional parameter on WatchAsync, such as an Action onInitialComplete that fires once after the initial load. Another is an Action endOfData that fires every time delta hits 0, i.e. whenever there is no data currently left to come in (the latter is probably better).

Use case

When using the latest .NET client library (v2) with kvStore.WatchAsync, I currently release a semaphore the first time delta hits 0. This denotes that I've received all values to date, and the watch then continues. I tend to start this watch at the beginning of the program and want to make sure that I've received all current KV values before the rest of the program continues. This works well: all current values come in, the semaphore is released, and further events come through and get processed. However, when the KV is initially empty, nothing comes back from WatchAsync (obviously!), so the program can't continue. In V1 of the library there was an event that signalled that the first lot of data was back.
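The behaviour described above can be reproduced without a NATS server. The following is only a minimal sketch: FakeWatchAsync, Entry and InitialLoadSignalledAsync are hypothetical names invented for illustration, and the simulated watch merely assumes that Delta counts down to 0 on the last existing entry, as described in this issue.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a KV watch entry; only Delta mirrors the behaviour discussed here.
public record Entry(string Key, string Value, int Delta);

public static class Demo
{
    // Simulated watch (assumption, not the library): replays the existing entries,
    // then stays open waiting for updates until cancelled.
    public static async IAsyncEnumerable<Entry> FakeWatchAsync(
        IReadOnlyList<(string Key, string Value)> existing,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        for (var i = 0; i < existing.Count; i++)
            yield return new Entry(existing[i].Key, existing[i].Value, existing.Count - 1 - i);

        await Task.Delay(Timeout.Infinite, ct); // no further updates in this sketch
    }

    // Returns true if an entry with Delta == 0 was seen before the timeout elapsed.
    public static async Task<bool> InitialLoadSignalledAsync(
        IReadOnlyList<(string Key, string Value)> existing, TimeSpan timeout)
    {
        var loaded = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        using var cts = new CancellationTokenSource();

        var pump = Task.Run(async () =>
        {
            try
            {
                await foreach (var e in FakeWatchAsync(existing, cts.Token))
                {
                    if (e.Delta == 0)
                        loaded.TrySetResult(); // the "semaphore release" from the use case
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the watch is cancelled below.
            }
        });

        var winner = await Task.WhenAny(loaded.Task, Task.Delay(timeout));
        cts.Cancel();
        await pump;
        return winner == loaded.Task;
    }

    public static async Task Main()
    {
        var timeout = TimeSpan.FromMilliseconds(200);
        // Populated store: the Delta == 0 entry arrives, so the signal fires.
        Console.WriteLine(await InitialLoadSignalledAsync(new[] { ("a", "1"), ("b", "2") }, timeout));
        // Empty store: no entry ever arrives, so only the timeout ends the wait.
        Console.WriteLine(await InitialLoadSignalledAsync(Array.Empty<(string, string)>(), timeout));
    }
}
```

With a populated list the signal is observed; with an empty list the wait ends only because of the timeout, which is the hang reported here.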

Contribution

No response

mtmk commented

Implementation suggestions:

if (watcher.InitialConsumer.Info.NumPending == 0)
    yield break;
  • Would need an option to activate this behaviour
  • Also consider ending the enumeration once the existing data has been read, i.e. if (entry.Delta == 0) yield break;

edit: I would argue against a callback mechanism since it would make the program flow more difficult to manage e.g. using a shared variable or another synchronization mechanism.

darkwatchuk commented

Thanks for that. That does in fact work, but it obviously finishes the watch early. The advantage of the callback is that you wouldn't need to make an additional call to start the watch again. In the time it takes to go from reading the history to re-establishing a watch that only looks at new values, you may have missed something.

I patched with the following, and does work for me...

public async IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default, Action? endOfCurrentData = null)
{
    await using var watcher = await WatchInternalAsync<T>(key, serializer, opts, cancellationToken);

    // Nothing pending on the initial consumer: there is nothing to replay, so signal immediately.
    if (watcher.InitialConsumer.Info.NumPending == 0)
        endOfCurrentData?.Invoke();

    while (await watcher.Entries.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        while (watcher.Entries.TryRead(out var entry))
        {
            yield return entry;

            // Delta == 0 means no further entries are currently pending.
            if (entry.Delta == 0)
                endOfCurrentData?.Invoke();
        }
    }
}

Another option could be to add a ResumeWatchAsync that somehow takes the original watcher variable. That might be messier, though.
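To illustrate how the callback variant covers the empty case, here is a minimal sketch that runs without a NATS server. It is not the library code: FakeWatchAsync and CountUntilLoadedAsync are hypothetical names, and the length of the keys array stands in for NumPending on the initial consumer.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CallbackWatchDemo
{
    // Simulated watch (assumption, not the library): keys.Length stands in for NumPending.
    public static async IAsyncEnumerable<(string Key, int Delta)> FakeWatchAsync(
        string[] keys,
        Action? endOfCurrentData = null,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        // Nothing to replay: signal straight away.
        if (keys.Length == 0)
            endOfCurrentData?.Invoke();

        for (var i = 0; i < keys.Length; i++)
        {
            var delta = keys.Length - 1 - i;
            yield return (keys[i], delta);

            // Runs once the consumer asks for the next item after the last existing entry.
            if (delta == 0)
                endOfCurrentData?.Invoke();
        }

        await Task.Delay(Timeout.Infinite, ct); // keep the watch open until cancelled
    }

    // Waits for the end-of-current-data signal and returns how many entries were seen by then.
    public static async Task<int> CountUntilLoadedAsync(string[] keys)
    {
        var loaded = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        using var cts = new CancellationTokenSource();
        var seen = 0;

        var pump = Task.Run(async () =>
        {
            try
            {
                await foreach (var entry in FakeWatchAsync(keys, () => loaded.TrySetResult(), cts.Token))
                    seen++;
            }
            catch (OperationCanceledException)
            {
                // Expected when the watch is cancelled below.
            }
        });

        await loaded.Task; // completes for populated and empty inputs alike
        cts.Cancel();
        await pump;
        return seen;
    }

    public static async Task Main()
    {
        Console.WriteLine(await CountUntilLoadedAsync(new[] { "a", "b" }));     // two entries seen
        Console.WriteLine(await CountUntilLoadedAsync(Array.Empty<string>())); // none seen, still signalled
    }
}
```

In this sketch the same enumeration keeps running after the signal, so there is no window between reading the history and watching for new values.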

mtmk commented

I patched with the following, and does work for me...

I see, nice! How would you use it though?

darkwatchuk commented

A basic example would be something like this, to prefill a local cache before the program starts up:



kv = await kvc.CreateStoreAsync(new NatsKVConfig(.....)
{
    History = 1,
    ....
});

// Fill cache
await kv.LoadAllAndWatchAsync<string>(OnPut: async (key, value) =>
{
    _localDict[key] = value;
});

// Cache prefilled....

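public static async Task LoadAllAndWatchAsync<T>(this INatsKVStore kvStore, Func<string, T, Task> OnPut, Func<string, T, Task>? OnPreloadPut = null)
{
    // Starts with no permits; released once when the initial load completes.
    var semaphoreInitialLoad = new SemaphoreSlim(0, 1);

    _ = Task.Run(async () =>
    {
        bool initialLoadDone = false;
        using var cts = new CancellationTokenSource();

        try
        {
            await foreach (var x in kvStore.WatchAsync<T>(cancellationToken: cts.Token, endOfCurrentData: () =>
            {
                // Fires every time delta hits 0; only the first occurrence unblocks the caller.
                if (!initialLoadDone)
                {
                    initialLoadDone = true;
                    semaphoreInitialLoad.Release();
                }
            }))
            {
                Console.WriteLine("update");
                try
                {
                    // Entries seen before the initial load completes go to OnPreloadPut when supplied.
                    var handler = initialLoadDone ? OnPut : (OnPreloadPut ?? OnPut);
                    if (handler != null)
                        await handler(x.Key, x.Value);
                }
                catch (Exception)
                {
                    // Handler errors are ignored so that one bad entry does not stop the watch.
                }
            }
        }
        catch (Exception)
        {
            // Watch errors are swallowed here.
        }
        finally
        {
            // If the watch ended before the initial load completed, unblock the caller rather than hang.
            if (!initialLoadDone)
                semaphoreInitialLoad.Release();
        }
    });

    await semaphoreInitialLoad.WaitAsync();
}

mtmk commented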

Thanks @darkwatchuk, looks like you've already done the work 😄 You didn't say anything about contributing but would you like to create a PR? We can work on the details there.