nats-io/nats.rs

Implement the "drain" feature for subscriptions and connections


Proposed change

Implement the "drain" functionality described in this NATS doc, which is already present in other languages' NATS libraries.

Use case

A service I work on makes fairly heavy use of this crate, and we've hit a couple of (admittedly rare) timing bugs that would hopefully be resolved by implementing the "drain" functionality described in this NATS doc. I poked around the issues and didn't see any mention of, or request for, this feature in the new async crate. I also spent a while looking through the code, and you could almost convince me that drain() isn't needed given the way messages are handled. However, because the internal subscription object is dropped immediately after the unsubscribe message is sent to the NATS server, I still think there's a small chance that in-flight messages get dropped.

Contribution

I would love to!

I've included a rough design proposal below, and I have a PR on my fork with changes that match it (I didn't want to open a PR here without some discussion first). Obviously I'm not familiar with this code and there are many ways to approach this, so I'd be happy to take a different approach.

Draining a single subscription

Generally, the flow for draining a subscription (at least according to the doc above) is:

  1. Send UNSUB to the NATS server
  2. Deliver any remaining messages on the subscription
  3. Delete the subscription

Looking at the Python library, this seems to be achieved, more or less, by:

  1. Send UNSUB to the NATS server
  2. Flush the connection to ensure UNSUB is delivered and any messages from the server are received
  3. Ensure any messages received from the server are written to the client's queue
  4. Delete the internal subscription object
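To make the ordering of those four steps concrete, here is a minimal Rust sketch. It is not the Python library's code: `MockConnection` and `drain_subscription` are hypothetical names, there is no real networking, and "flush" is modelled as moving queued lines to `sent` and delivering whatever the server had already sent for that sid.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical, simplified connection used only to show the drain ordering.
struct MockConnection {
    /// Protocol lines queued but not yet flushed to the server.
    pending: Vec<String>,
    /// Protocol lines the server has received (i.e. flushed lines).
    sent: Vec<String>,
    /// Messages the server sent before it saw the UNSUB, keyed by sid.
    in_flight: HashMap<u64, VecDeque<String>>,
    /// Client-side per-subscription queues that user code reads from.
    queues: HashMap<u64, Vec<String>>,
}

impl MockConnection {
    fn drain_subscription(&mut self, sid: u64) -> Vec<String> {
        // 1. Send UNSUB (queued until the next flush).
        self.pending.push(format!("UNSUB {sid}"));
        // 2. Flush: the UNSUB reaches the server and in-flight messages arrive.
        self.sent.append(&mut self.pending);
        let arrived = self.in_flight.remove(&sid).unwrap_or_default();
        // 3. Make sure everything that arrived lands in the client's queue.
        self.queues.entry(sid).or_default().extend(arrived);
        // 4. Delete the subscription; this sketch hands back what was queued.
        self.queues.remove(&sid).unwrap_or_default()
    }
}
```

The point is only that the delete happens after the flush and after delivery, so nothing the server sent before processing the UNSUB is lost.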

The challenge with implementing this within Rust's ownership model is that "drain" is effectively a "delayed delete": the final delete is triggered by the crate, but the crate can't force the client to drop its Subscriber. All it can really do is drop the sender that feeds the Subscriber's receiver. Hence "Delete the subscription" really translates to "the crate deletes the internal subscription object", and the owner of the Subscriber should handle any resulting "disconnected" errors. With that in mind, here's how I aimed to implement this:
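The "delayed delete" works because of standard channel semantics: once the sending half is dropped, the receiver still yields every buffered message and only then reports disconnection. The sketch below uses `std::sync::mpsc` as a stand-in for the crate's async channel (an async receiver such as tokio's `mpsc::Receiver::recv` behaves the same way, returning `None` instead of `Err`); `InternalSubscription`, `finish_drain`, and `consume_until_disconnected` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical crate-internal side of a subscription: the Handler owns the
/// sending half, so only the crate can "delete" the subscription.
struct InternalSubscription {
    sender: Sender<String>,
}

/// Crate-internal delete: dropping the internal object drops its Sender.
fn finish_drain(sub: InternalSubscription) {
    drop(sub);
}

/// User-side loop: read until the channel reports disconnection, which is how
/// the owner of the Subscriber learns that the drain is complete.
fn consume_until_disconnected(receiver: &Receiver<String>) -> Vec<String> {
    let mut seen = Vec::new();
    // `recv` keeps returning buffered messages after the Sender is dropped
    // and only returns Err once the buffer is empty.
    while let Ok(msg) = receiver.recv() {
        seen.push(msg);
    }
    seen
}
```

For example, sending two messages through `sub.sender`, calling `finish_drain(sub)`, and then calling `consume_until_disconnected(&rx)` returns both messages before the loop ends.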

Subscriber::drain()

On the Subscriber side there's really only one step:

  1. Subscriber sends a new Command::Drain { sid: u64 } to the Handler and awaits the send to ensure it is written to the queue

After this, the process is complete on the crate user's side. The crate-internal Handler will write any remaining messages to the sender associated with Subscriber::receiver and then drop it, so the owner of the Subscriber can consume the remaining messages and knows the drain is complete when it receives Option::None/Error::Disconnected.
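A minimal sketch of the Subscriber-side step, assuming a stripped-down `Subscriber` that only holds its sid and a channel to the Handler. The `Command` enum here contains only the proposed variant, and `std::sync::mpsc::Sender::send` is synchronous, so there is nothing to `.await`; in the async crate this would be an awaited send on its own channel type.

```rust
use std::sync::mpsc::{SendError, Sender};

/// Hypothetical subset of the Handler's command enum: only the new variant.
#[derive(Debug, PartialEq)]
enum Command {
    Drain { sid: u64 },
}

/// Hypothetical, stripped-down Subscriber.
struct Subscriber {
    sid: u64,
    sender: Sender<Command>,
}

impl Subscriber {
    /// Queue a drain request for this subscription. Success only means the
    /// command was queued; remaining messages still arrive on the
    /// subscription's own stream until the Handler drops its sender.
    fn drain(&self) -> Result<(), SendError<Command>> {
        self.sender.send(Command::Drain { sid: self.sid })
    }
}
```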

An alternate approach here would be to have Subscriber::drain() block until the Handler explicitly notifies it (with an observer oneshot similar to flush()) and return any remaining messages in a collection/iterator of some kind (instead of having the Subscriber owner process remaining messages from the existing stream). This would keep the stream and the Subscriber lifetimes more tightly-coupled and allow the owner to drop the Subscriber as soon as drain() has completed, but felt a little less idiomatic.
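The alternate design can be sketched with a reply channel carried inside the command, similar in spirit to how flush() waits on an observer. Everything here is hypothetical: `DrainAndCollect`, `request_drain`, and `handle` are illustrative names, the buffered messages are modelled as a `HashMap<u64, Vec<String>>`, and a `std::sync::mpsc` channel stands in for a oneshot.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical command: the Handler answers on `reply` with whatever was
/// still buffered for `sid`.
enum Command {
    DrainAndCollect { sid: u64, reply: Sender<Vec<String>> },
}

/// Subscriber side: send the request and return the receiving end to wait on.
fn request_drain(handler: &Sender<Command>, sid: u64) -> Receiver<Vec<String>> {
    let (reply, response) = channel();
    // If the Handler is gone, the command (and `reply`) is dropped here, so
    // `response.recv()` returns Err instead of blocking forever.
    let _ = handler.send(Command::DrainAndCollect { sid, reply });
    response
}

/// Handler side: remove the subscription and hand back what was left.
fn handle(cmd: Command, buffered: &mut HashMap<u64, Vec<String>>) {
    match cmd {
        Command::DrainAndCollect { sid, reply } => {
            let remaining = buffered.remove(&sid).unwrap_or_default();
            // The requester may have given up; ignore a closed reply channel.
            let _ = reply.send(remaining);
        }
    }
}
```

The trade-off is the one described above: the caller gets the leftovers as a collection and can drop the Subscriber immediately, at the cost of a second delivery path alongside the normal stream.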

Handler logic for Command::Drain

The main challenge here was that we need the Handler::process() poll loop to receive the Command::Drain message and leave the internal Subscription alive for long enough to write any remaining messages to Subscription::sender after UNSUB is sent to the server and the connection is flushed. Currently the unsubscribe flow just deletes the Subscription right away, which means any messages published to the subject between the client unsubscribing and server receiving the UNSUB message will be dropped. To address this, the Handler will:

  1. Upon receiving Command::Drain, queue an UNSUB message to the server and set Subscription::is_draining (a new field) to true. At the end of the current call to Handler::process()'s Future::poll, the UNSUB message will be flushed.
  2. On the next call to Handler::process()'s Future::poll, after processing any incoming messages from the server and delivering them to any subscriptions (via Handler::handle_server_op()), drop any subscriptions with is_draining: true. This should ensure that the subscription isn't dropped until after any remaining messages on the subscription are delivered.
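A minimal sketch of this two-phase Handler logic, assuming a heavily simplified `Handler` with only a subscription map and an outgoing queue. `on_drain` and `on_next_poll` are hypothetical names standing in for the relevant parts of Handler::process(); the flush at the end of a poll and the parsing done by Handler::handle_server_op() are not modelled, and incoming messages are passed in as `(sid, payload)` pairs.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Hypothetical internal subscription with the proposed `is_draining` flag.
struct Subscription {
    sender: Sender<String>,
    is_draining: bool,
}

/// Hypothetical, heavily simplified Handler.
struct Handler {
    subscriptions: HashMap<u64, Subscription>,
    /// Protocol lines to be flushed at the end of the current poll.
    outgoing: Vec<String>,
}

impl Handler {
    /// Step 1: on Command::Drain, queue UNSUB and mark the subscription.
    fn on_drain(&mut self, sid: u64) {
        if let Some(sub) = self.subscriptions.get_mut(&sid) {
            self.outgoing.push(format!("UNSUB {sid}"));
            sub.is_draining = true;
        }
    }

    /// Step 2 (next poll): deliver what the server sent first, then drop
    /// draining subscriptions, which drops their Senders and ends each stream.
    fn on_next_poll(&mut self, incoming: Vec<(u64, String)>) {
        for (sid, payload) in incoming {
            if let Some(sub) = self.subscriptions.get(&sid) {
                // A closed receiver just means the user dropped the Subscriber.
                let _ = sub.sender.send(payload);
            }
        }
        self.subscriptions.retain(|_, sub| !sub.is_draining);
    }
}
```

Delivering before `retain` is the key ordering: a message that was in flight when the UNSUB went out still reaches the user's stream, and only then does the stream end.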

There is a small risk with this approach that the crate user could try to perform further operations on the Subscriber after calling drain(), which would be ignored, but this was already possible.

Draining a connection

As mentioned in the doc, this is more complex. The flow is effectively:

  1. Send UNSUB to the NATS server for each subscription
  2. Do not publish any further messages
  3. Flush any pending messages to the server and deliver any remaining messages to any subscriptions
  4. Close the connection

So, following the pattern for an individual subscription, it might translate to:

  1. For each subscription, send UNSUB to the server
  2. Ignore any further Commands (we could drop the Command receiver, but that would complicate other operations by making the receiver optional)
  3. Ensure a flush occurs
  4. Deliver any remaining messages received from the NATS server to draining subscriptions
  5. Drop all subscriptions
  6. Terminate the connection
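Step 2 ("ignore any further Commands") can be sketched as a guard at the top of the command handler rather than by dropping the Command receiver, which keeps the receiver non-optional. This is a minimal sketch: `Command::Publish`, `handle_command`, and the `Handler` fields are hypothetical simplifications, and the per-subscription drain branch is left out.

```rust
/// Hypothetical subset of Handler commands.
enum Command {
    Publish { subject: String, payload: String },
    Drain { sid: Option<u64> },
}

/// Hypothetical, simplified Handler.
struct Handler {
    is_draining: bool,
    /// Protocol lines queued for the next flush.
    outgoing: Vec<String>,
}

impl Handler {
    fn handle_command(&mut self, cmd: Command) {
        // Once a connection-level drain has started, ignore every new command
        // (including further publishes) instead of dropping the receiver.
        if self.is_draining {
            return;
        }
        match cmd {
            Command::Publish { subject, payload } => {
                self.outgoing
                    .push(format!("PUB {subject} {}\r\n{payload}\r\n", payload.len()));
            }
            Command::Drain { sid: None } => self.is_draining = true,
            Command::Drain { sid: Some(_) } => {
                // Per-subscription drain is omitted from this sketch.
            }
        }
    }
}
```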

Client::drain()

Update the Command::Drain { sid: u64 } above to make sid an Option<u64>. If sid.is_none(), the Handler can assume this is a connection-level drain. As above, the Client-side logic is really only one step:

  1. Client::drain() sends Command::Drain { sid: None } and awaits the send to ensure it is written to the queue

After this, any Subscribers held by the crate user will deliver any remaining messages and then return Option::None, and calls to the various Subscriber methods will eventually error once the Command receiver is dropped. As with the Subscriber flow, we could use a oneshot to await until the operation is complete.
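A minimal sketch of the Client-side step, assuming `Command::Drain` carries `sid: Option<u64>` as proposed, with `None` meaning "drain the whole connection". `Client` here is a hypothetical stripped-down type, and `std::sync::mpsc` stands in for the crate's async channel, so the send is synchronous rather than awaited.

```rust
use std::sync::mpsc::{SendError, Sender};

/// Hypothetical subset of the Handler's command enum.
#[derive(Debug, PartialEq)]
enum Command {
    /// `None` drains the whole connection; `Some(sid)` drains one subscription.
    Drain { sid: Option<u64> },
}

/// Hypothetical, stripped-down Client that only knows how to reach the Handler.
struct Client {
    sender: Sender<Command>,
}

impl Client {
    /// Queue a connection-level drain; completion is observed through the
    /// Subscribers' streams ending, not through this return value.
    fn drain(&self) -> Result<(), SendError<Command>> {
        self.sender.send(Command::Drain { sid: None })
    }
}
```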

Handler logic for Command::Drain

  1. When processing the Command::Drain message, we will check whether Command::Drain::sid.is_some(). If not, we will queue an UNSUB message and set is_draining for each entry in Handler::subscriptions. We will also set Handler::is_draining (a new field) to true. At the end of the current call to Handler::process()'s Future::poll, any outgoing messages will be flushed.
  2. On the next call to Handler::process()'s Future::poll, after processing any incoming messages from the server, delivering them to any subscriptions (via Handler::handle_server_op()), and dropping any subscriptions with is_draining == true (which should be all subscriptions in this case), check whether Handler::is_draining == true. If so, exit Handler::process() with ExitReason::Closed. This order ensures that all required UNSUB messages are sent to the server, that any remaining messages (both incoming and outgoing) are processed and delivered, and that the process itself is closed.
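The connection-level version of the same two-phase logic can be sketched as follows. `drain_connection` and `poll_once` are hypothetical stand-ins for the relevant parts of Handler::process(); the flush of `outgoing` at the end of each poll is not modelled, incoming server messages are passed in as `(sid, payload)` pairs, and `ExitReason` contains only the variant named in the proposal.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

#[derive(Debug, PartialEq)]
enum ExitReason {
    Closed,
}

/// Hypothetical internal subscription with the proposed flag.
struct Subscription {
    sender: Sender<String>,
    is_draining: bool,
}

/// Hypothetical, simplified Handler with the proposed `is_draining` field.
struct Handler {
    subscriptions: HashMap<u64, Subscription>,
    outgoing: Vec<String>,
    is_draining: bool,
}

impl Handler {
    /// Poll N, on Command::Drain { sid: None }: UNSUB and mark every subscription.
    fn drain_connection(&mut self) {
        for (sid, sub) in self.subscriptions.iter_mut() {
            self.outgoing.push(format!("UNSUB {sid}"));
            sub.is_draining = true;
        }
        self.is_draining = true;
    }

    /// Poll N+1: deliver incoming messages, drop draining subscriptions, and
    /// only then decide whether the whole Handler should exit.
    fn poll_once(&mut self, incoming: Vec<(u64, String)>) -> Option<ExitReason> {
        for (sid, payload) in incoming {
            if let Some(sub) = self.subscriptions.get(&sid) {
                let _ = sub.sender.send(payload);
            }
        }
        self.subscriptions.retain(|_, sub| !sub.is_draining);
        if self.is_draining {
            Some(ExitReason::Closed)
        } else {
            None
        }
    }
}
```

Checking `is_draining` last mirrors the ordering argument above: by the time the Handler exits, every subscription has received its remaining messages and had its sender dropped.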

PR merged, calling this one done

Will be part of next release.

Thank you again @jsudano for the valuable contribution!

Thank you for all the help! Hopefully not my last contribution :)