[Reactor-grpc] Got problem using many to one
rockiee281 opened this issue · 1 comments
rockiee281 commented
I'm going to create service do some action as bellow:
- receive audio stream and cache to local file
- decode the audio data to PCM file when got all the audio data
- transform audio data to text
- return the text result
Service:
rpc InvokeWithUnaryResult(stream ServingRequest) returns (ServingResponse);
Server side:
@Override
public Mono<ServingResponse> invokeWithUnaryResult(Flux<ServingRequest> request) {
return request
.doOnNext(k -> MDC.put(MdcKeys.TASK_ID, k.getTaskId()))
.map(k -> handler.collectAudio(k))
.last()
.flatMap(v -> handler.decode(v))
.flatMap(k -> handler.recognize(k))
.doOnNext(k -> MDC.clear());
}
Client side:
long startTime = System.currentTimeMillis();
ReactorServingStub stub = ReactorServingGrpc.newReactorStub(channel);
// build flux request from local file
ServingResponse response = stub.invokeWithUnaryResult(buildRequest(testFile))
.block();
log.info("Got response {}, take {} mills", response, System.currentTimeMillis() - startTime);
I create a many to one service using reactive-grpc and got errors bellow. Seems http2 stream was closed before all the request data accepted by server side when using Flux.last()
Http2Exception$StreamException: Received DATA frame for an unknown stream 3
at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.shouldIgnoreHeadersOrDataFrame(DefaultHttp2ConnectionDecoder.java:596)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:239)
at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
I find TckService
demo create a many to one service bellow, does reactor-grpc works too ?
@Override
public Single<Message> manyToOne(Flowable<Message> request) {
return request.hide().map(this::maybeExplode).last(Message.newBuilder().setNumber(0).build()).hide();
}
Thanks for any suggest !
rockiee281 commented
Sorry but I forget to log runtime exception