salesforce/reactive-grpc

Indeterminate cancel or error when streaming bidirectionally

stevenll opened this issue · 0 comments

When integration testing a gRPC service that contains a bidirectional streaming method, we have observed that, in tests where we expect the server to return an error, the server sporadically sends a server-side CANCEL of the request instead.

We traced this sporadic behavior to a thread that AbstractServerStreamObserverAndPublisher starts when it receives a cancel signal from its subscriber; that thread cancels the client stream after 100 ms. Apparently, if the subscriber runs on another thread and raises an error, the error signal races with the cancel thread to be the first response sent to the client.
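To make the race concrete, here is a simplified, stdlib-only Java model of it. It is not the reactive-grpc source: `CancelRaceModel` and its methods are hypothetical, and the 100 ms timer is just a parameter. Whichever terminal status reaches the "client" first wins and the other is dropped, which is the race described above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the cancel/error race; NOT the reactive-grpc implementation.
public class CancelRaceModel {

    // Holds the first terminal status "sent" to the client; later ones are dropped.
    private final AtomicReference<String> firstResponse = new AtomicReference<>();
    private final CountDownLatch responded = new CountDownLatch(1);

    private void respond(String status) {
        if (firstResponse.compareAndSet(null, status)) {
            responded.countDown();
        }
    }

    // Models the delayed-cancel thread: started when the server sees a cancel
    // from its subscriber, it cancels the client stream after cancelDelayMs.
    private void onSubscriberCancel(long cancelDelayMs) {
        startDaemon(() -> {
            sleep(cancelDelayMs);
            respond("CANCELLED");
        });
    }

    // Models the subscriber thread delivering its error after errorDelayMs
    // (for example, the sleep inside doOnError).
    private void onSubscriberError(long errorDelayMs) {
        startDaemon(() -> {
            sleep(errorDelayMs);
            respond("ERROR");
        });
    }

    private static void startDaemon(Runnable task) {
        Thread thread = new Thread(task);
        thread.setDaemon(true);
        thread.start();
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // The cancel is signalled first; the error follows on another thread.
    public static String run(long cancelDelayMs, long errorDelayMs) throws InterruptedException {
        CancelRaceModel model = new CancelRaceModel();
        model.onSubscriberCancel(cancelDelayMs);
        model.onSubscriberError(errorDelayMs);
        model.responded.await(5, TimeUnit.SECONDS);
        return model.firstResponse.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100, 500)); // error delayed past the 100 ms timer
        System.out.println(run(100, 0));   // error delivered right away
    }
}
```

With a 100 ms cancel timer and a 500 ms error delay the cancel always wins, which matches what we see; with no error delay the error normally wins, which is why the failure is sporadic in ordinary test runs.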

We can reproduce the faulty behavior consistently by deliberately introducing a delay into the subscriber when it emits an error. Consider a simple gRPC service:

message T {
  string data = 1;
}
service TestService {
  rpc bidirectionalMethod(stream T) returns (stream T);
}

Our TestService implementation introduces a 500 ms delay between the cancel signal and the error signal, as shown below.

@Slf4j
@GrpcService
public class TestService extends ReactorTestServiceGrpc.TestServiceImplBase {

  @Override
  public Flux<T> bidirectionalMethod(Flux<T> request) {
    return request
        .log()
        .subscribeOn(Schedulers.boundedElastic())
        .map(T::getData)
        .map(
            data -> {
              if (data.equals("error")) {
                throwError();
              }
              return data + "!";
            })
        .map(data -> T.newBuilder().setData(data).build())
        // Deliberate 500 ms delay on the error path, longer than the 100 ms cancel timer
        .doOnError(throwable -> Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS));
  }

  private void throwError() {
    throw new RuntimeException("error error error");
  }
}

This consistently causes the server to respond with "io.grpc.StatusRuntimeException: CANCELLED: Server canceled request" rather than the desired error.
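The reproduction relies on the cancel signal preceding the error: when the mapping function throws, the operator cancels the inbound request stream (which, per the analysis above, starts the 100 ms timer on the server side) before the error travels downstream to doOnError. The stdlib-only sketch below models that ordering with `java.util.concurrent.Flow`. `MapCancelOrder` and `MapSubscriber` are hypothetical; we assume, rather than quote from Reactor's source, that Reactor's `map` cancels upstream before emitting `onError` in the same way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical, stripped-down map operator on java.util.concurrent.Flow.
// NOT Reactor's implementation; it only reproduces the signal ordering.
public class MapCancelOrder {

    static final class MapSubscriber<T, R> implements Flow.Subscriber<T>, Flow.Subscription {
        private final Flow.Subscriber<? super R> downstream;
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription upstream;
        private boolean done;

        MapSubscriber(Flow.Subscriber<? super R> downstream, Function<? super T, ? extends R> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(this);
        }

        @Override public void onNext(T item) {
            if (done) return;
            R mapped;
            try {
                mapped = mapper.apply(item);
            } catch (RuntimeException e) {
                done = true;
                upstream.cancel();     // 1. cancel the inbound (request) stream
                downstream.onError(e); // 2. only then propagate the error
                return;
            }
            downstream.onNext(mapped);
        }

        @Override public void onError(Throwable t) {
            if (!done) { done = true; downstream.onError(t); }
        }

        @Override public void onComplete() {
            if (!done) { done = true; downstream.onComplete(); }
        }

        @Override public void request(long n) { upstream.request(n); }

        @Override public void cancel() { upstream.cancel(); }
    }

    // Runs items through the map and returns the observed signals in order.
    public static List<String> run(List<String> items) {
        List<String> events = new ArrayList<>();

        // Synchronous source standing in for the request stream.
        Flow.Publisher<String> source = subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index;
            private boolean cancelled;

            @Override public void request(long n) {
                for (long i = 0; i < n && !cancelled && index < items.size(); i++) {
                    subscriber.onNext(items.get(index++));
                }
                if (!cancelled && index == items.size()) subscriber.onComplete();
            }

            @Override public void cancel() {
                cancelled = true;
                events.add("cancel()");
            }
        });

        // Terminal subscriber that records every signal it receives.
        Flow.Subscriber<String> sink = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { events.add("onError(" + t.getMessage() + ")"); }
            @Override public void onComplete() { events.add("onComplete()"); }
        };

        // Same mapping logic as the TestService example.
        Function<String, String> mapper = data -> {
            if (data.equals("error")) throw new RuntimeException("error error error");
            return data + "!";
        };

        source.subscribe(new MapSubscriber<>(sink, mapper));
        return events;
    }

    public static void main(String[] args) {
        // Prints [onNext(a!), cancel(), onError(error error error)]
        System.out.println(run(List.of("a", "error", "b")));
    }
}
```

Because the cancel is issued synchronously inside the failing `onNext`, any delay added after that point (such as the sleep in `doOnError`) only postpones the error, never the cancel.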

Could you advise us how to change the TestService implementation so that the service always responds to the client with the intended error, even when the subscriber thread is delayed by more than 100 ms? We don't seem to have any direct way to control the cancellation timer.