grpc/grpc-dotnet

Exception when writing with bidirectional streaming

TheCamel opened this issue · 4 comments

What version of gRPC and what language are you using?

Server side: Grpc.AspNetCore 2.66.0
Client side: Grpc.protobuf 3.28.3, plus the DLL shared from the server

What operating system (Linux, Windows,...) and version?

Windows Server Version 10.0.20348 Build 20348
.NET 9

What did you do?

A bidirectional stream is defined in the proto file; the client starts it and reads the server's requests.

rpc RequestOneWay (google.protobuf.Empty) returns (stream RequestInfosGrpc);

The server writes as shown below:

public override async Task RequestTwoWay(IAsyncStreamReader<ClientInfosGrpc> requestStream, IServerStreamWriter<RequestInfosGrpc> responseStream, ServerCallContext context)
{
	try
	{
		bool finished = false;

		// Read incoming messages in a background task
		ClientInfosGrpc? lastMessageReceived = null;
		var readTask = Task.Run(async () =>
		{
			await foreach (var message in requestStream.ReadAllAsync())
			{
				lastMessageReceived = message;
			}
			finished = true;
		});

		while (!finished)
		{
			if (_state.HasClient)
			{
				GrpcState? grpc = _state.GetGrpcState(context.RequestHeaders.GetValue("client-id")!);
				if (grpc != null)
				{
					_state.TakeAll(grpc).ForEach(async x =>
					{
						await responseStream.WriteAsync(x.Transpose());
						_tracer.Debug("Request {@x} is sent in grpc_bi", x);
					});
				}
			}
			Thread.Sleep(2000);
		}
		await readTask;
	}
	catch (Exception err)
	{
		_tracer.Error(err);
	}
}

On the client side:

_asyncDuplex = _client.RequestTwoWay(GrpcClientStateHolder.Instance.Headers);

var responseReaderTask = Task.Run(async Task () =>
{
	if (_asyncDuplex is AsyncDuplexStreamingCall<ClientInfosGrpc, RequestInfosGrpc> duplex)
	{
		while (await duplex.ResponseStream.MoveNext(CancellationToken.None))
		{
			var requestFromSrv = duplex.ResponseStream.Current;

			_tracer.Information("Request from master {@requestFromSrv} is queued in", requestFromSrv);
			_requestQueue?.Enqueue(requestFromSrv.Transpose());
		}
	}
});

When the client is stopped, we close the stream:

if (_asyncDuplex is AsyncDuplexStreamingCall<ClientInfosGrpc, RequestInfosGrpc> duplex)
{
	await duplex.RequestStream.CompleteAsync();
	//await responseReaderTask;
}
_client?.UnSubscribe(new ClientInfosGrpc(), GrpcClientStateHolder.Instance.Headers);

Event logs

Exception Info: System.InvalidOperationException: Can't write the message because the previous write is in progress.
at Grpc.AspNetCore.Server.Internal.HttpContextStreamWriter`1.WriteCoreAsync(TResponse message, CancellationToken cancellationToken)
at Process.Grpc.MasterGrpcService.<>c__DisplayClass7_0.<b__1>d.MoveNext() in C:\CONDUENT\SSUP\SSUPv5_master\Source\Process\Process.Grpc\MasterGrpcService.cs:line 81
--- End of stack trace from previous location ---
at System.Threading.Tasks.Task.<>c.b__128_1(Object state)
at System.Threading.QueueUserWorkItemCallback.Execute()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()

The exception message explains the problem. You're either not awaiting a previous write, or you're trying to write to the stream from multiple threads. You can't write multiple messages at the same time.

Sure, but analysing my code (mostly copied from the internet and from your samples repo) does not explain to me how that is possible. No threads, one server, two clients, one stream for each...

_state.TakeAll(grpc).ForEach(async x =>

nothing awaits this

Oh, you mean that because the delegate is async, the ForEach becomes parallel...
OK, thanks a lot!
I would not have found that ;-)
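
For anyone landing here later, a minimal sketch of what happened. `List<T>.ForEach` takes an `Action<T>`, so an async lambda passed to it becomes `async void`: each call returns at its first `await`, and ForEach moves straight on to the next item. `SingleWriter`, `WriteDemo` and `Program` below are hypothetical stand-ins written for this illustration, not gRPC types; `SingleWriter` only imitates the "one write at a time" rule named in the exception message. The sketch collects the tasks so the failures can be counted rather than escaping from `async void` as in the stack trace above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for IServerStreamWriter<T>: it rejects a write
// that starts while another write is still in flight.
public sealed class SingleWriter
{
    private int _busy; // 1 while a write is in flight
    public List<int> Written { get; } = new List<int>();

    public async Task WriteAsync(int message)
    {
        if (Interlocked.Exchange(ref _busy, 1) == 1)
            throw new InvalidOperationException("previous write is in progress");
        try
        {
            await Task.Delay(100); // simulated network I/O
            Written.Add(message);
        }
        finally
        {
            Volatile.Write(ref _busy, 0);
        }
    }
}

public static class WriteDemo
{
    // Same shape as List<T>.ForEach(async x => ...): every write is started
    // without waiting for the previous one to finish.
    public static async Task<int> CountOverlapFailuresAsync(List<int> items)
    {
        var writer = new SingleWriter();
        var started = new List<Task>();
        items.ForEach(x => started.Add(writer.WriteAsync(x)));
        try { await Task.WhenAll(started); }
        catch (InvalidOperationException) { /* failures are counted below */ }
        return started.Count(t => t.IsFaulted);
    }

    // Sequential version: a plain foreach awaits each write before starting the next.
    public static async Task<List<int>> WriteSequentiallyAsync(List<int> items)
    {
        var writer = new SingleWriter();
        foreach (var x in items)
        {
            await writer.WriteAsync(x);
        }
        return writer.Written;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var items = new List<int> { 1, 2, 3 };
        Console.WriteLine(await WriteDemo.CountOverlapFailuresAsync(items));
        Console.WriteLine(string.Join(",", await WriteDemo.WriteSequentiallyAsync(items)));
    }
}
```

With three items, the overlapping version should report two failed writes, while the sequential version writes all three in order. Applied to the server method in this issue, that would presumably mean replacing `_state.TakeAll(grpc).ForEach(async x => ...)` with `foreach (var x in _state.TakeAll(grpc)) { await responseStream.WriteAsync(x.Transpose()); ... }`.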