vmagamedov/grpclib

stream-stream rpc method send\receive in any order

antonio-antuan opened this issue · 14 comments

The gRPC docs say:

Bidirectional streaming RPCs where both sides send a sequence of messages using a read-write stream. The two streams operate independently, so clients and servers can read and write in whatever order they like: for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes. 

It looks like on the client side I can send a message to a stream, then read from the stream, then send another message, then read again, and so on.
But the current grpclib implementation does not allow it. Is it possible to change that? :)

Can you show an example of what is not possible in grpclib?

Sure. Here it is:

from grpclib.client import Channel
from provider_grpc import SubscriptionProviderStub
from provider_pb2 import Request

channel = Channel('0.0.0.0', port=7011)
stub = SubscriptionProviderStub(channel)


async def recv_without_end():
    async with stub.Subscribe.open() as stream:
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                    obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                    obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )

        await stream.recv_message()  # grpclib.exceptions.ProtocolError: Outgoing stream was not ended


async def send_after_end():
    async with stub.Subscribe.open() as stream:
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                    obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                    obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )
        await stream.end()
        await stream.recv_message()
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                    obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )  # grpclib.exceptions.ProtocolError: Stream is ended

proto:

syntax = 'proto3';

service SubscriptionProvider {
    rpc Subscribe (stream Request) returns (stream Data) {}
}

message Request {
    string user_id = 1;
    repeated string obj_ids = 2;
}


message Data {
    repeated string objects = 1;
}

Actually, the code is more complex. Here is what I want to do:

  1. open a channel and a stream in a proxy server (asyncio + websockets)
  2. send initial data to the stream when the websocket channel is opened and the first data is received from any client
  3. send the first data to the gRPC server
  4. create two asyncio.Task objects. The first listens for websocket messages and sends them to the gRPC server. The second listens to the gRPC stream and sends received data to the websocket client. Something like this:
async def from_ws_to_grpc():
    while True:
        try:
            msg = json.loads(await websocket.recv())
            await stream.send_message(
                Request(user_id=msg['user_id'], obj_ids=msg['obj_ids'])
            )
        finally:
            return


async def from_grpc_to_ws():
    while True:
        try:
            subscription_data = await stream.recv_message()
            await websocket.send(json.dumps([json.loads(ev) for ev in subscription_data.objects]))
        finally:
            return


await from_ws_to_grpc()
_, pending = await asyncio.wait(
    [asyncio.ensure_future(from_grpc_to_ws()),
     asyncio.ensure_future(from_ws_to_grpc())],
    return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
    task.cancel()
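
The two-task relay described in the steps above can be sketched without any network code. This is a minimal illustration only: the `asyncio.Queue` objects stand in for the websocket and the gRPC stream, and `relay`, `run_proxy`, `demo` and the `None` "connection closed" marker are hypothetical names chosen for the example, not part of grpclib or websockets.

```python
import asyncio


async def relay(source: asyncio.Queue, sink: asyncio.Queue) -> None:
    """Forward items from source to sink until a None sentinel arrives."""
    while True:
        item = await source.get()
        if item is None:  # hypothetical "connection closed" marker
            return
        await sink.put(item)


async def run_proxy(ws_in, grpc_out, grpc_in, ws_out) -> None:
    # One task per direction, so neither side blocks the other.
    tasks = [
        asyncio.ensure_future(relay(ws_in, grpc_out)),  # websocket -> gRPC
        asyncio.ensure_future(relay(grpc_in, ws_out)),  # gRPC -> websocket
    ]
    # Stop as soon as either direction finishes, then cancel the other one.
    _, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Let the cancelled task finish unwinding before returning.
    await asyncio.gather(*pending, return_exceptions=True)


async def demo() -> list:
    ws_in, grpc_out, grpc_in, ws_out = (asyncio.Queue() for _ in range(4))
    for item in ("subscribe-1", "subscribe-2", None):
        ws_in.put_nowait(item)
    await run_proxy(ws_in, grpc_out, grpc_in, ws_out)
    forwarded = []
    while not grpc_out.empty():
        forwarded.append(grpc_out.get_nowait())
    return forwarded
```

Running `asyncio.run(demo())` forwards both queued messages and then stops once the websocket side signals that it is closed. In a real proxy the queues would be replaced by the websocket and the open gRPC stream.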

grpclib currently assumes that you call end() on the stream explicitly before exiting the stub.Subscribe.open() context manager:

async with stub.Subscribe.open() as stream:
    ...
    # exchange messages
    ...
    await asyncio.wait(tasks)
    await stream.end()  # !!!

Maybe this will be improved in the future, but it is possible to make stream-stream requests with grpclib right now.

The "Outgoing stream was not ended" error is raised not when you call recv_message() but when you exit the context manager.

I understand that, but my approach is different. Here is a working example of bidirectional streaming with purerpc:

async def main():
    async with purerpc.insecure_channel("channel", 81) as channel:
        stub = ServiceStub(channel)
        stream = await stub.Call()
        await stream.send_message(Request(
            params='{"object_ids": ["cc76b6ea-dc92-46c4-ae00-fa7956c0eda8"]}'))
        msg = await stream.receive_message()
        print(msg)
        msg = await stream.receive_message()
        print(msg)
        await stream.send_message(Request(
            params='{"object_ids": ["19a6a096-3fa0-47e8-bd05-836a6d33ffb1"]}'))
        msg = await stream.receive_message()
        print(msg)
        msg = await stream.receive_message()
        print(msg)

Can you show the traceback you get when you try the same approach with grpclib? And can you show the same working example, but with grpclib and with my tip about adding await stream.end() at the end?

I do not want to mark sending as ended. I want to send and receive indefinitely: send initial data, wait to receive, then send again and wait again, over and over. Then, maybe, receive data multiple times, then send and receive again.
If I call stream.end(), I can't send any more data to the stream.

In my example I placed await stream.end() after await asyncio.wait(tasks). That wait() may block as long as you wish, waiting indefinitely for from_ws_to_grpc and from_grpc_to_ws to finish. end() is only needed to close the stream properly, just before exiting the .open() context manager.
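
To make the ordering concrete, here is a small sketch of "exchange in any order, end once at the very end". `EchoStream` is a hypothetical in-memory stand-in that only mirrors the method names used in this thread (send_message, recv_message, end); it is not grpclib and its error type is an assumption made for the example.

```python
import asyncio


class EchoStream:
    """Hypothetical stand-in for a client stream; it simply echoes messages."""

    def __init__(self):
        self._replies = asyncio.Queue()
        self.ended = False

    async def send_message(self, message):
        if self.ended:
            # Assumed behaviour for the sketch: no sending after end().
            raise RuntimeError("Stream is ended")
        await self._replies.put(f"echo:{message}")

    async def recv_message(self):
        return await self._replies.get()

    async def end(self):
        self.ended = True


async def exchange(stream, requests):
    """Alternate send and receive, then end the outgoing side exactly once."""
    replies = []
    for request in requests:
        await stream.send_message(request)
        replies.append(await stream.recv_message())
    # Only after all sending is done: close the outgoing half of the stream.
    await stream.end()
    return replies


async def demo():
    stream = EchoStream()
    replies = await exchange(stream, ["a", "b"])
    return replies, stream.ended
```

The loop in `exchange` could run for as long as the connection lives; the single end() call belongs after it, not between messages.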

Calling await stream.recv_message() before any stream.send_message(req) throws an exception. Is that intentional?

@malarinv call await stream.send_request() before any await stream.send_message(...) or await stream.recv_message() and you'll be fine. Yes, this is intentional.
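
A sketch of that call order, for a client that wants to listen before it sends anything. `RecordingStream` and `listen_then_reply` are hypothetical names for this example; the stand-in only records which methods were called and does not talk to a server. The method names (send_request, recv_message, send_message, end) are the ones mentioned in this thread.

```python
import asyncio


class RecordingStream:
    """Hypothetical stand-in that records call order; it is not grpclib."""

    def __init__(self, replies):
        self.calls = []
        self._replies = list(replies)

    async def send_request(self):
        self.calls.append("send_request")

    async def recv_message(self):
        self.calls.append("recv_message")
        return self._replies.pop(0) if self._replies else None

    async def send_message(self, message):
        self.calls.append("send_message")

    async def end(self):
        self.calls.append("end")


async def listen_then_reply(stream, reply):
    # Start the call explicitly, so receiving first is allowed.
    await stream.send_request()
    first = await stream.recv_message()
    await stream.send_message(reply)
    # Close the outgoing side once nothing more will be sent.
    await stream.end()
    return first


async def demo():
    stream = RecordingStream(replies=["hello"])
    first = await listen_then_reply(stream, "ack")
    return first, stream.calls
```

With a real grpclib stream, the same four calls would be made inside the `async with stub.Subscribe.open() as stream:` block.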

For some reason the generated documentation at https://grpclib.readthedocs.io/en/latest/client.html is incomplete. This method is documented, but I don't see any of Stream's methods in the docs.

fixed: https://grpclib.readthedocs.io/en/latest/client.html#grpclib.client.Stream.send_request

thanks. sorry, missed your last message

@AcLr Can you show a working example of a bidirectional stream client, if you succeeded?

There was a mistake in the docs: 2e9d222