Implement the "drain" feature for subscriptions and connections
Proposed change
Implement the "drain" functionality described in this NATS doc, which is already present in other languages' NATS libraries.
Use case
A service I work on makes pretty heavy use of this crate, and we've hit a couple of (admittedly rare) timing bugs that would (hopefully) be resolved by implementing the "drain" functionality described in this NATS doc. I poked around the issues and didn't see any mention or request of this feature for the new async crate. I also spent a while looking through the code, and you could almost convince me that drain() isn't needed given the way messages are handled. However, the internal subscription object is dropped immediately after the unsubscribe message is sent to the NATS server, so I still think there's a small chance that in-flight messages get dropped.
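Here is a deliberately simplified std-only Rust model of the suspected race. route() is a hypothetical stand-in for how Handler::handle_server_op() looks up a subscription by sid, and it is not the crate's actual code. The model shows that once the subscription is removed at UNSUB time, any message the server sent before it processed the UNSUB has nowhere to go.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for Handler::handle_server_op(): route one MSG to its
/// subscription's queue. Returns false when no subscription exists for `sid`,
/// i.e. the message is silently dropped.
fn route(subscriptions: &mut HashMap<u64, Vec<String>>, sid: u64, payload: &str) -> bool {
    match subscriptions.get_mut(&sid) {
        Some(queue) => {
            queue.push(payload.to_string());
            true
        }
        None => false,
    }
}

/// Models the current unsubscribe flow: the subscription is removed as soon as
/// UNSUB is queued, and only afterwards do the messages the server sent before
/// it processed UNSUB arrive. Returns how many of them were delivered.
fn unsubscribe_then_receive(in_flight: &[&str]) -> usize {
    let mut subscriptions: HashMap<u64, Vec<String>> = HashMap::new();
    subscriptions.insert(1, Vec::new());

    // UNSUB is queued and the internal subscription is deleted right away.
    subscriptions.remove(&1);

    let mut delivered = 0;
    for payload in in_flight {
        if route(&mut subscriptions, 1, payload) {
            delivered += 1;
        }
    }
    delivered
}
```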
Contribution
I would love to!
I've included a sort of design proposal below, and I have a fork PR with changes to match it (I didn't want to open a PR here without some discussion first). Obviously I'm not familiar with this code and there are a lot of ways to approach this, so I'd be happy to take a different approach.
Draining a single subscription
Generally the flow for draining a subscription (at least according to the doc above) is basically:
- Send UNSUB to the NATS server
- Deliver any remaining messages on the subscription
- Delete the subscription
Looking at the Python library, this seems to be achieved more or less by:
- Send UNSUB to the NATS server
- Flush the connection to ensure UNSUB is delivered and any messages from the server are received
- Ensure any messages received from the server are written to the client's queue
- Delete the internal subscription object
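The same four steps can be modeled synchronously to show their ordering. FakeConnection, drain_subscription() and the flush() round-trip below are hypothetical illustrations. They are not the Python library's API or this crate's API, and a real flush would wait for the server's PONG instead of emptying a local buffer.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical in-memory stand-in for a connection; not the async-nats API.
struct FakeConnection {
    /// Protocol lines written by the client but not yet flushed.
    outgoing: Vec<String>,
    /// (sid, payload) messages the server sent before it saw our UNSUB.
    in_flight: VecDeque<(u64, String)>,
    /// Protocol lines the "server" has received.
    server_received: Vec<String>,
}

impl FakeConnection {
    /// A flush round-trip: the server receives every pending line, and every
    /// message it had already sent reaches the client.
    fn flush(&mut self) -> Vec<(u64, String)> {
        self.server_received.append(&mut self.outgoing);
        self.in_flight.drain(..).collect()
    }
}

/// The four steps listed above, for one subscription. `queues` maps each sid
/// to the client-side queue its subscriber reads from.
fn drain_subscription(
    conn: &mut FakeConnection,
    queues: &mut HashMap<u64, Vec<String>>,
    sid: u64,
) -> Option<Vec<String>> {
    // 1. Send UNSUB to the NATS server.
    conn.outgoing.push(format!("UNSUB {sid}"));
    // 2. Flush so UNSUB is delivered and in-flight messages are received.
    let received = conn.flush();
    // 3. Write every received message to its subscription's queue.
    for (msg_sid, payload) in received {
        if let Some(queue) = queues.get_mut(&msg_sid) {
            queue.push(payload);
        }
    }
    // 4. Delete the subscription, handing back whatever was still queued for it.
    queues.remove(&sid)
}
```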
The challenge with implementing this within Rust's ownership model is that "drain" is effectively a "delayed delete": the crate triggers the final delete, but it can't force the client to drop its Subscriber. All we can really do is drop the sender that feeds the subscriber's receiver. Hence "Delete the subscription" really translates to "the crate will delete the internal subscription object", and the owner should handle any resulting "disconnected" errors. With that in mind, here's how I aimed to implement this:
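std::sync::mpsc can illustrate the ownership constraint by analogy, since the crate itself uses async channels. The crate side holds the sender. Dropping that sender discards nothing that is already buffered: the receiver yields every queued message and only then reports disconnection. Subscription, Subscriber and subscribe() here are simplified, hypothetical types.

```rust
use std::sync::mpsc;

/// Hypothetical stand-in for the crate-internal subscription: it only owns the sender.
struct Subscription {
    sender: mpsc::Sender<String>,
}

/// Hypothetical stand-in for the user-facing Subscriber: it owns the receiver.
struct Subscriber {
    receiver: mpsc::Receiver<String>,
}

fn subscribe() -> (Subscription, Subscriber) {
    let (sender, receiver) = mpsc::channel();
    (Subscription { sender }, Subscriber { receiver })
}

impl Subscriber {
    /// Returns the next message, or None once the crate has dropped its sender
    /// and every buffered message has been consumed.
    fn next(&mut self) -> Option<String> {
        self.receiver.recv().ok()
    }
}
```

The crate can therefore "finish" a drain just by writing the last messages and dropping the Subscription. The owner observes the end of the stream in its own time.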
Subscriber::drain()
On the Subscriber side there's really only one step:
- Subscriber sends a new Command::Drain { sid: u64 } to the Handler and awaits to ensure it is written to the queue
After this, the process is complete on the crate-user's side. The crate-internal Handler side will write any remaining messages to the associated sender for Subscriber::receiver and then drop it. The owner of the Subscriber can then consume the remaining messages and will know the drain is complete when it receives Option::None/Error::Disconnected.
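Below is a minimal sketch of the Subscriber side. It assumes a trimmed-down Command enum and uses a std::sync::mpsc command channel in place of the crate's async channel, so the send here is synchronous rather than awaited.

```rust
use std::sync::mpsc;

#[derive(Debug, PartialEq)]
enum Command {
    // Other variants (publish, subscribe, unsubscribe, flush, ...) omitted.
    Drain { sid: u64 },
}

/// Hypothetical trimmed-down Subscriber: just its sid and the command channel.
struct Subscriber {
    sid: u64,
    commands: mpsc::Sender<Command>,
}

impl Subscriber {
    /// Queue Command::Drain for the Handler. Once this returns Ok, the rest of the
    /// drain happens on the Handler side, and the caller keeps reading its message
    /// stream until it ends. Err means the Handler is already gone.
    fn drain(&self) -> Result<(), mpsc::SendError<Command>> {
        self.commands.send(Command::Drain { sid: self.sid })
    }
}
```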
An alternate approach here would be to have Subscriber::drain() block until the Handler explicitly notifies it (with an observer oneshot similar to flush()) and then return any remaining messages in a collection/iterator of some kind, instead of having the Subscriber owner process remaining messages from the existing stream. This would couple the stream and Subscriber lifetimes more tightly and allow the owner to drop the Subscriber as soon as drain() has completed, but it felt a little less idiomatic.
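For comparison, the alternate approach could look roughly like this. The done observer on Command::Drain and the consuming drain(self) signature are assumptions made for illustration. std channels stand in for the async oneshot the crate would use. The Handler is expected to write its last messages, drop its sender, and then signal done.

```rust
use std::sync::mpsc;

#[derive(Debug)]
enum Command {
    /// Hypothetical: carries a completion observer, similar in spirit to flush().
    Drain { sid: u64, done: mpsc::Sender<()> },
}

struct Subscriber {
    sid: u64,
    commands: mpsc::Sender<Command>,
    messages: mpsc::Receiver<String>,
}

impl Subscriber {
    /// Alternate design: block until the Handler confirms the drain, then return
    /// every message still buffered. Taking `self` by value means the Subscriber
    /// cannot be used after draining.
    fn drain(self) -> Vec<String> {
        let (done_tx, done_rx) = mpsc::channel();
        if self.commands.send(Command::Drain { sid: self.sid, done: done_tx }).is_ok() {
            // Err means the Handler dropped the observer; stop waiting either way.
            let _ = done_rx.recv();
        }
        // The Handler wrote its last messages before signalling, so a
        // non-blocking read collects all of them.
        self.messages.try_iter().collect()
    }
}
```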
Handler logic for Command::Drain
The main challenge here was that the Handler::process() poll loop needs to receive the Command::Drain message and keep the internal Subscription alive long enough to write any remaining messages to Subscription::sender after UNSUB is sent to the server and the connection is flushed. Currently the unsubscribe flow deletes the Subscription right away. Any messages published to the subject between the client unsubscribing and the server receiving the UNSUB message are therefore dropped. To address this, the Handler will:
- Upon receiving Command::Drain, queue an UNSUB message to the server and set Subscription::is_draining (a new field) to true. At the end of the current call to Handler::process()'s Future::poll, the UNSUB message will be flushed.
- On the next call to Handler::process()'s Future::poll, after processing any incoming messages from the server and delivering them to any subscriptions (via Handler::handle_server_op()), drop any subscriptions with is_draining: true.
This should ensure that the subscription isn't dropped until after any remaining messages on the subscription are delivered.
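The two-phase handling above might be sketched as follows. Handler and Subscription are heavily simplified. poll_once() is a hypothetical synchronous stand-in for one pass of Handler::process()'s Future::poll, and its returned Vec<String> stands in for the end-of-poll flush. The key detail is ordering: incoming messages are delivered before any subscription marked is_draining on an earlier pass is dropped.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

enum Command {
    Drain { sid: u64 },
}

/// Crate-internal subscription with the proposed is_draining flag.
struct Subscription {
    sender: mpsc::Sender<String>,
    is_draining: bool,
}

/// Hypothetical, synchronous stand-in for the Handler.
struct Handler {
    subscriptions: HashMap<u64, Subscription>,
    /// Protocol lines queued for the server during the current poll.
    outgoing: Vec<String>,
}

impl Handler {
    fn handle_command(&mut self, command: Command) {
        match command {
            Command::Drain { sid } => {
                if let Some(sub) = self.subscriptions.get_mut(&sid) {
                    // Queue UNSUB but keep the Subscription alive for now.
                    sub.is_draining = true;
                    self.outgoing.push(format!("UNSUB {sid}"));
                }
            }
        }
    }

    fn handle_server_op(&mut self, sid: u64, payload: String) {
        if let Some(sub) = self.subscriptions.get(&sid) {
            // A send error only means the Subscriber was already dropped.
            let _ = sub.sender.send(payload);
        }
    }

    /// One simplified pass of the poll loop. Returns the lines "flushed" to the server.
    fn poll_once(&mut self, incoming: Vec<(u64, String)>, commands: Vec<Command>) -> Vec<String> {
        // Deliver everything that arrived from the server first...
        for (sid, payload) in incoming {
            self.handle_server_op(sid, payload);
        }
        // ...then drop subscriptions marked on an earlier poll. Dropping a
        // Subscription drops its sender, which ends the Subscriber's stream.
        self.subscriptions.retain(|_, sub| !sub.is_draining);
        for command in commands {
            self.handle_command(command);
        }
        // End-of-poll flush.
        std::mem::take(&mut self.outgoing)
    }
}
```

Because retain() runs before new commands are handled, a subscription marked during one pass always survives until the following pass.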
One small risk with this approach is that the crate-user could try to perform further operations on the Subscriber after calling drain(). Those operations would be ignored, but this was already possible.
Draining a connection
As mentioned in the doc, this is more complex. The flow is effectively:
- Send UNSUB to the NATS server for each subscription
- Do not publish any further messages
- Flush any pending messages to the server and deliver any remaining messages to any subscriptions
- Close the connection
Following the pattern for an individual subscription, this might translate to:
- For each subscription, send UNSUB to the server
- Ignore any further Commands (we could drop the Command receiver, but that would complicate other operations by making the receiver optional)
- Ensure a flush occurs
- Deliver any remaining messages received from the NATS server to draining subscriptions
- Drop all subscriptions
- Terminate the connection
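The "ignore further Commands" step can keep the receiver non-optional: the Handler keeps reading commands and discards them once draining has started. A minimal sketch follows. It assumes a hypothetical DrainConnection command and uses a std::sync::mpsc receiver in place of the crate's async one.

```rust
use std::sync::mpsc;

#[derive(Debug)]
enum Command {
    Publish { subject: String, payload: String },
    /// Hypothetical placeholder for a connection-level drain request.
    DrainConnection,
}

/// Hypothetical, synchronous stand-in for the Handler.
struct Handler {
    /// Kept as a plain Receiver (not Option<Receiver>) for the whole lifetime.
    commands: mpsc::Receiver<Command>,
    is_draining: bool,
    outgoing: Vec<String>,
}

impl Handler {
    /// Read every queued command without blocking. Once a drain has started,
    /// commands are still read off the channel but ignored.
    fn process_commands(&mut self) {
        while let Ok(command) = self.commands.try_recv() {
            if self.is_draining {
                continue; // e.g. no new PUBs once the connection is draining
            }
            match command {
                Command::Publish { subject, payload } => {
                    self.outgoing.push(format!("PUB {subject} {}", payload.len()));
                }
                Command::DrainConnection => self.is_draining = true,
            }
        }
    }
}
```

With this approach, sends from the Client side keep succeeding, which avoids surprising errors mid-drain. The cost is that late publishes are silently dropped.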
Client::drain()
Update the Command::Drain { sid: u64 } above to make sid an Option<u64>. If sid.is_none(), the Handler can assume this is a connection-level drain. As with subscriptions, the Client-side logic is really only one step:
- Client::drain() sends Command::Drain { sid: None } and awaits to ensure it is written to the queue
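With sid as an Option<u64>, the Client and Subscriber entry points differ only in the value they send. The sketch below uses simplified types and a synchronous std::sync::mpsc channel instead of an awaited async one. describe() is a hypothetical helper that shows how the Handler can tell the two cases apart.

```rust
use std::sync::mpsc;

#[derive(Debug, PartialEq)]
enum Command {
    /// None means "drain the whole connection"; Some(sid) drains one subscription.
    Drain { sid: Option<u64> },
}

/// Hypothetical trimmed-down Client holding only the command sender.
struct Client {
    commands: mpsc::Sender<Command>,
}

impl Client {
    fn drain(&self) -> Result<(), mpsc::SendError<Command>> {
        self.commands.send(Command::Drain { sid: None })
    }
}

/// Hypothetical trimmed-down Subscriber.
struct Subscriber {
    sid: u64,
    commands: mpsc::Sender<Command>,
}

impl Subscriber {
    fn drain(&self) -> Result<(), mpsc::SendError<Command>> {
        self.commands.send(Command::Drain { sid: Some(self.sid) })
    }
}

/// Hypothetical helper: how the Handler might branch on the two cases.
fn describe(command: &Command) -> String {
    match command {
        Command::Drain { sid: Some(sid) } => format!("drain subscription {sid}"),
        Command::Drain { sid: None } => "drain connection".to_string(),
    }
}
```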
After this, any Subscribers held by the crate-user will deliver any remaining messages and then return Option::None. Calls to various Subscriber impl fns will eventually error once the Command receiver is dropped. As with the Subscriber flow, we could use a oneshot to await until the operation is complete.
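The oneshot variant, and what it means for Subscriber owners, might look like this. run_handler(), the done observer on Command::Drain, Command::Unsubscribe and the pending list are hypothetical simplifications, and std channels stand in for the async ones. In this model the Handler delivers pending messages, drops the subscription sender and its command receiver, and then signals completion.

```rust
use std::sync::mpsc;

#[derive(Debug)]
enum Command {
    /// sid: None is a connection-level drain; `done` is a hypothetical completion observer.
    Drain { sid: Option<u64>, done: mpsc::Sender<()> },
    Unsubscribe { sid: u64 },
}

/// Hypothetical Handler: waits for a connection-level drain, then finishes it.
fn run_handler(commands: mpsc::Receiver<Command>, subscription: mpsc::Sender<String>, pending: Vec<String>) {
    let done = loop {
        match commands.recv() {
            Ok(Command::Drain { sid: None, done }) => break done,
            Ok(_) => continue, // other commands omitted from this sketch
            Err(_) => return,  // every Client/Subscriber is gone
        }
    };
    // Deliver what the server sent before it processed the UNSUBs.
    for message in pending {
        let _ = subscription.send(message);
    }
    drop(subscription); // ends the Subscriber's message stream
    drop(commands); // later command sends now fail
    let _ = done.send(());
}

struct Client {
    commands: mpsc::Sender<Command>,
}

impl Client {
    /// Sends Command::Drain { sid: None } and blocks until the Handler reports completion.
    fn drain(&self) -> Result<(), mpsc::RecvError> {
        let (done, finished) = mpsc::channel();
        // If the send fails, `done` is dropped with the error and recv() below fails too.
        let _ = self.commands.send(Command::Drain { sid: None, done });
        finished.recv()
    }
}

struct Subscriber {
    sid: u64,
    commands: mpsc::Sender<Command>,
    messages: mpsc::Receiver<String>,
}

impl Subscriber {
    fn unsubscribe(&self) -> Result<(), mpsc::SendError<Command>> {
        self.commands.send(Command::Unsubscribe { sid: self.sid })
    }

    fn next(&self) -> Option<String> {
        self.messages.recv().ok()
    }
}
```

After Client::drain() returns, the Subscriber yields its pending messages and then None. Further command sends such as unsubscribe() fail because the command receiver is gone.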
Handler logic for Command::Drain
- When processing the Command::Drain message, we will check whether Command::Drain::sid.is_some(). If not, we will queue an UNSUB message and set is_draining for each entry in Handler::subscriptions. We will also set Handler::is_draining (a new field) to true. At the end of the current call to Handler::process()'s Future::poll, any outgoing messages will be flushed.
- On the next call to Handler::process()'s Future::poll, first process any incoming messages from the server and deliver them to any subscriptions (via Handler::handle_server_op()). Then drop any subscriptions with is_draining == true (which should be all subscriptions in this case), and check whether Handler::is_draining == true. If so, exit Handler::process() with ExitReason::Closed.
This order ensures that all required UNSUB messages are sent to the server, that any remaining messages (both incoming and outgoing) are processed and delivered, and that the process itself is closed.
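Putting the connection-level steps together, a simplified model of the Handler might look like this. poll_once() is a hypothetical synchronous stand-in for one pass of Handler::process()'s Future::poll. It returns std::task::Poll so that Poll::Ready(ExitReason::Closed) mirrors exiting the loop. outgoing and written are hypothetical buffers for queued and flushed protocol lines.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::task::Poll;

#[derive(Debug, PartialEq)]
enum ExitReason {
    Closed, // other exit reasons omitted
}

enum Command {
    /// Some(sid) drains one subscription; None drains the whole connection.
    Drain { sid: Option<u64> },
}

struct Subscription {
    sender: mpsc::Sender<String>,
    is_draining: bool,
}

/// Hypothetical, synchronous stand-in for the Handler.
#[derive(Default)]
struct Handler {
    subscriptions: HashMap<u64, Subscription>,
    is_draining: bool,
    outgoing: Vec<String>,
    /// Everything "flushed" to the server so far.
    written: Vec<String>,
}

impl Handler {
    fn mark_draining(&mut self, sid: u64) {
        if let Some(sub) = self.subscriptions.get_mut(&sid) {
            sub.is_draining = true;
            self.outgoing.push(format!("UNSUB {sid}"));
        }
    }

    fn handle_command(&mut self, command: Command) {
        match command {
            Command::Drain { sid: Some(sid) } => self.mark_draining(sid),
            Command::Drain { sid: None } => {
                let sids: Vec<u64> = self.subscriptions.keys().copied().collect();
                for sid in sids {
                    self.mark_draining(sid);
                }
                self.is_draining = true;
            }
        }
    }

    /// One simplified pass of Handler::process()'s poll.
    fn poll_once(&mut self, incoming: Vec<(u64, String)>, commands: Vec<Command>) -> Poll<ExitReason> {
        // 1. Deliver server messages (stand-in for handle_server_op()).
        for (sid, payload) in incoming {
            if let Some(sub) = self.subscriptions.get(&sid) {
                let _ = sub.sender.send(payload);
            }
        }
        // 2. Drop subscriptions marked on an earlier poll (ends their streams).
        self.subscriptions.retain(|_, sub| !sub.is_draining);
        // 3. A connection drain requested on an earlier poll ends the loop here.
        if self.is_draining {
            return Poll::Ready(ExitReason::Closed);
        }
        // 4. Handle new commands, ignoring any that follow a connection drain.
        for command in commands {
            if self.is_draining {
                break;
            }
            self.handle_command(command);
        }
        // 5. End-of-poll flush.
        self.written.append(&mut self.outgoing);
        Poll::Pending
    }
}
```

The exit check comes after delivery and after draining subscriptions are dropped. A message that arrives on the final pass therefore still reaches its subscriber before the Handler exits.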
PR merged, calling this one done
Thank you for all the help! Hopefully not my last contribution :)