vmagamedov/grpclib

Response Streaming - Client Blocked

ozangungor12 opened this issue · 7 comments

Hi,
I am trying to implement a response streaming RPC application but couldn't figure out one thing.
I want to continuously stream camera frames from my servicer so there's a infinite loop there, something like:


request = await stream.recv_message()
while True:
    await stream.send_message(response)

However, this implementation blocks my client, I guess since the stream never ends I can't receive anything on the client side.
What is the correct way of continuously streaming and reading the stream at the same time?#
Thank you

You mean async for msg in stream? That require a method __aiter__ __anext__. The Stream class just implement that.

When I do this in the client, the loop is never executed, that's what I mean.

response = await stub.UnaryStream(request)
async for reply in response:

I don't fully understand your problem.

You want to wait for new messages on the client-side and wait for some other events at the same time?
You want to terminate an infinite call at some point in time?
You're implementing stream-stream method type or unary-stream?

Please explain in more details what you want to do on the client-side.

I would like to stream live frames captured from the camera. I chose Unary-Stream, since I only need to send one single request (empty message) from the client at the beginning to start the camera, and then keep sending images from the server to the client.
So client sends a requests and server starts sending images.
In order to get new frames from the camera I use an infinite loop. So the moment I receive a request, I capture frames from the camera and send it.

request = await stream.recv_message()
while True:
    # capture frame from the camera
    ...
    # send frame
    await stream.send_message(response)

In the client side I want to receive the images and process them as they are coming by.
I send the request from the client like this, but I nothing is being executed in the client side after this line, for example response is not printed.

response = await stub.UnaryStream(request)
print(response)

I hope that was a bit more clear, thank you for your time.

You probably have to use .open() context manager:

This is the only way to work with large or infinite streams on the client-side. Regular stub methods like await stub.UnaryStream(request) are designed for simple cases, they wait until request finishes and return simple list instead of async iterator.

stub.method.open() context manager returns a Stream which is similar to the Stream on the server-side.

Thank you!