salesforce/reactive-grpc

[Reactor-grpc] Got problem using many to one

rockiee281 opened this issue · 1 comments

I'm going to create service do some action as bellow:

  1. receive audio stream and cache to local file
  2. decode the audio data to PCM file when got all the audio data
  3. transform audio data to text
  4. return the text result

Service:

    rpc InvokeWithUnaryResult(stream ServingRequest) returns (ServingResponse);

Server side:

 @Override
    public Mono<ServingResponse> invokeWithUnaryResult(Flux<ServingRequest> request) {
      
         return request
            .doOnNext(k -> MDC.put(MdcKeys.TASK_ID, k.getTaskId()))
            .map(k -> handler.collectAudio(k))
            .last()
            .flatMap(v -> handler.decode(v))
            .flatMap(k -> handler.recognize(k))
            .doOnNext(k -> MDC.clear());
    }

Client side:

       long startTime = System.currentTimeMillis();

       ReactorServingStub stub = ReactorServingGrpc.newReactorStub(channel);

       // build flux request from local file 
       ServingResponse response = stub.invokeWithUnaryResult(buildRequest(testFile))
            .block();

        log.info("Got response {}, take {} mills", response, System.currentTimeMillis() - startTime);

I create a many to one service using reactive-grpc and got errors bellow. Seems http2 stream was closed before all the request data accepted by server side when using Flux.last()

Http2Exception$StreamException: Received DATA frame for an unknown stream 3
	at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.shouldIgnoreHeadersOrDataFrame(DefaultHttp2ConnectionDecoder.java:596)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:239)
	at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
	at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

I find TckService demo create a many to one service bellow, does reactor-grpc works too ?

 @Override
    public Single<Message> manyToOne(Flowable<Message> request) {
        return request.hide().map(this::maybeExplode).last(Message.newBuilder().setNumber(0).build()).hide();
    }

Thanks for any suggest !

Sorry but I forget to log runtime exception