salesforce/reactive-grpc

Question about call cancellation mechanism

grigorryev opened this issue · 10 comments

In regular, non-reactive gRPC, if a server makes an outbound call while processing an inbound call and the inbound call gets cancelled, the outbound one is cancelled as well. As I understand it, this is implemented implicitly through Context, which is thread-local and therefore shared by the inbound and outbound calls.

I don't understand how this works in a reactive environment, where the inbound and outbound calls share no thread-local state.

Could you please explain this mechanism?
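
To make the question concrete, here is a minimal sketch of the thread-local behaviour described above, using only io.grpc.Context. The cancellable context stands in for the one gRPC attaches while handling an inbound call; the function name is hypothetical, and no real server or stub is involved.

```kotlin
import io.grpc.Context
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Returns (cancelled as seen on the handler thread, cancelled as seen on a worker thread).
fun observeCancellation(): Pair<Boolean, Boolean> {
    // Stand-in for the context attached to the thread that handles an inbound call.
    val inbound = Context.current().withCancellation()
    val previous = inbound.attach()
    val worker = Executors.newSingleThreadExecutor()
    try {
        // Code on the handler thread (e.g. an outbound call) sees the inbound context.
        val sameThread = Context.current()
        // A task on another thread does not inherit the thread-local context.
        val otherThread = worker.submit(Callable { Context.current() }).get()

        inbound.cancel(null) // simulate cancellation of the inbound call
        return Pair(sameThread.isCancelled, otherThread.isCancelled)
    } finally {
        inbound.detach(previous)
        worker.shutdown()
    }
}

fun main() {
    println(observeCancellation()) // (true, false)
}
```

The context captured on the handler thread reports the cancellation, while the one captured on the worker thread does not, which is the gap the question is about.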

I'm pretty sure transitive call cancellation works.

For RxJava, reactive-grpc can propagate the gRPC context easily between Rx threads. Context propagation with Reactor comes with a few additional caveats.
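
As a rough illustration of what propagating the gRPC context between threads means, the sketch below uses Context.wrap from io.grpc to carry the current context onto a worker thread. It does not show how reactive-grpc itself does this; the key name and helper function are hypothetical.

```kotlin
import io.grpc.Context
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical context key, for illustration only.
val REQUEST_ID: Context.Key<String> = Context.key("request-id")

// Reads REQUEST_ID on a worker thread, with or without propagating the caller's context.
fun readOnWorker(propagate: Boolean): String? {
    val executor = Executors.newSingleThreadExecutor()
    try {
        val withValue = Context.current().withValue(REQUEST_ID, "req-1")
        return withValue.call(Callable<String?> {
            val task = Callable<String?> { REQUEST_ID.get() }
            // wrap() captures the context that is current here and re-attaches it
            // on whichever thread eventually runs the task.
            val submitted = if (propagate) Context.current().wrap(task) else task
            executor.submit(submitted).get()
        })
    } finally {
        executor.shutdown()
    }
}

fun main() {
    println(readOnWorker(propagate = true))  // req-1
    println(readOnWorker(propagate = false)) // null
}
```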

For streaming gRPC requests, reactive-grpc captures and propagates the stream cancellation signals up and down the reactive call chain. I have a test that verifies the gRPC context is successfully marshaled across the wire, but I don't have a test for transitive call cancellation.

If it turns out transitive call cancellation doesn't work, can you let me know?

It definitely works, but I started seeing unexpected cancellations of outbound calls that should not be cancelled. They happen in production, they are intermittent, and I haven't managed to produce an example that reproduces them. So I'm trying to clarify how this mechanism is implemented internally.

I use Reactor context to propagate data (like auth tokens) from the server's current() gRPC context over a Flux, and I fill the current() gRPC context with this data again right before I make an outbound call, and it works just fine. But these current() calls are made on different threads and return different contexts, so I cannot figure out how cancellation works in this case :)

Hmm... I'm worried that you are getting unexpected cancellations. Debugging Rx is notoriously difficult at the edges.

I'm struggling to provide a solid example of code where transitive cancellation fails, as I don't understand how to validate it, but I'm pretty sure that something like this would do the trick (it's Kotlin):

@GRpcService
class Service(val upstreamStub: Upstream.Stub) : ReactorService.ImplBase() {

    override fun someApiMethod(): Flux<Dto> {
        return Mono.just(1)
            .publishOn(Schedulers.parallel()) // just to reschedule to another thread
            .flatMapMany { upstreamStub.getSubscription() }
    }
}

Since the ClientCall is created on a thread from the parallel() thread pool and takes Context.current() as its context, the chain of CancellationListeners between the ServerCall and the ClientCall is broken, and the upstream subscription will not be cancelled through it. This is also consistent with what I observe under the debugger: the ClientCall's CancellationListener is never called.

UPD: it turns out there is also a mechanism that cancels the subscription when the reactive pipeline is cancelled.
It is implemented in com.salesforce.reactorgrpc.stub.ReactorClientStreamObserverAndPublisher, and it works even for the example above. So, transitive call cancellation works just fine :)

My problem was with calls invoked outside of a server call context (e.g. by @Scheduled, or during Kafka event processing). In these cases, if the Context stored in thread-local storage gets cancelled once, all subsequent calls from that thread are aborted immediately. I solved it by implementing my own version of com.salesforce.reactorgrpc.stub.ClientCalls and wrapping the call with Context.current().fork().call(...).
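
The effect of fork() can be sketched in isolation with io.grpc.Context alone. The example below simulates a cancelled context left attached to a thread; the function name is hypothetical, and the real fix lived inside a custom ClientCalls that is not shown here.

```kotlin
import io.grpc.Context
import java.util.concurrent.Callable

// Returns (cancelled when using Context.current() directly, cancelled inside a forked context).
fun cancelledFlags(): Pair<Boolean, Boolean> {
    // Simulate a context that was cancelled but is still attached to this thread.
    val stale = Context.current().withCancellation()
    val previous = stale.attach()
    try {
        stale.cancel(null)

        // Anything that captures Context.current() now starts out cancelled.
        val direct = Context.current().isCancelled

        // fork() keeps the context values but does not cascade the cancellation.
        val forked = Context.current().fork().call(Callable { Context.current().isCancelled })

        return Pair(direct, forked)
    } finally {
        stale.detach(previous)
    }
}

fun main() {
    println(cancelledFlags()) // (true, false)
}
```

The trade-off is that a forked context also stops receiving legitimate cancellations from its parent, which matches the case described above where calls start outside any server call.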


I'm glad you found a solution, @grigorryev.

I've fought with Reactor context propagation before and must confess that it never made sense to me. Is Context.current().fork().call(...) something that should always be in the Reactor ClientCalls stub?

Would you be willing to share your experience by updating the Reactor context documentation in the Readme?

> Is Context.current().fork().call(...) something that should always be in the Reactor ClientCalls stub?

I believe so right now. However, to be sure, I think I need to provide an example of code demonstrating the problem, right? I'll try to do it in a few days.

> Would you be willing to share your experience by updating the Reactor context documentation in the Readme?

Of course, but I think we should properly validate it first.

Btw, what would be really great to have in ClientCalls is a way to inject a hook that fills the gRPC Context with data from Reactor's context. That was the main reason we implemented our own version of ClientCalls a long time ago.
Our current context propagation mechanism looks like this:

  1. When we receive a server call, we fill grpc-context with things like authentication and tracing ids in ServerCallInterceptors
  2. We copy grpc-context to reactor-context to ensure that we will have it in every part of our reactive pipeline
  3. Before making an outgoing call we (in our custom ClientCalls) fill the grpc-context again with data from reactor-context
  4. We fill grpc headers in ClientCallInterceptors with data from grpc-context

It would be much easier if there was a mechanism to inject grpc<->reactor context copying logic.
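
Steps 2 and 3 of the list above could look roughly like the sketch below. It assumes Reactor 3.4 or later (for Mono.deferContextual); the key names and the helper are hypothetical, and the block passed in is a plain function standing in for the outbound call, so this does not show how a real reactive-grpc stub would pick up the context.

```kotlin
import io.grpc.Context
import reactor.core.publisher.Mono
import java.util.concurrent.Callable

// Hypothetical keys, for illustration only.
val AUTH_TOKEN: Context.Key<String> = Context.key("auth-token")
const val REACTOR_AUTH_KEY = "auth-token"

// Step 3: rebuild a gRPC context from the Reactor context and run `block` inside it.
fun <T : Any> withGrpcContextFromReactor(block: () -> T): Mono<T> =
    Mono.deferContextual { view ->
        val token: String = view.getOrDefault(REACTOR_AUTH_KEY, "") ?: ""
        val grpcContext = Context.current().fork().withValue(AUTH_TOKEN, token)
        Mono.fromCallable { grpcContext.call(Callable { block() }) }
    }

fun main() {
    // The block reads the token the way a client interceptor would (step 4).
    val seen = withGrpcContextFromReactor { AUTH_TOKEN.get() }
        // Step 2: on the server side this value would be copied from the inbound gRPC context.
        .contextWrite { ctx -> ctx.put(REACTOR_AUTH_KEY, "secret") }
        .block()
    println(seen) // secret
}
```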

@OlegDokuka (asked me to tag him here)

@grigorryev Please let me know if there is more to do with Reactor context propagation.