python-trio/purerpc

ClientStubStreamStream uses yield inside a task group

VincentVanlaer opened this issue · 5 comments

See

async def call_aiter(self, message_aiter, metadata):
    stream = await self._stream_fn(metadata=metadata)
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(send_multiple_messages_client, stream, message_aiter)
        async for value in stream_to_async_iterator(stream):
            # Yields to the caller while the task group is still open.
            yield value

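This yields to the caller while the task group (and its cancel scope) is still open, which causes internal corruption in trio, for example when the caller stops iterating early and the generator is finalized later.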

I think there are two potential solutions:

  • Don't run the sending stream side of the client when yielding. One way to accomplish this is to cancel the sending side whenever a message is received from the server. This does require some form of cancel safety of the sending stream (meaning that if send_message is canceled, the message must not be sent). An implementation would then look something like this:
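import anyio
from contextlib import aclosing  # Python 3.10+
# ClientStub and stream_to_async_iterator come from purerpc's client internals.

NO_MESSAGE_WAITING = object()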


class ClientStubStreamStream(ClientStub):
    async def call_aiter(self, message_aiter, metadata):
        # If send_multiple is cancelled while trying to send a message, that message will be saved in this variable
        current_message = NO_MESSAGE_WAITING

        async def send_multiple():
            nonlocal current_message

            while True:
                try:
                    # Check if we got cancelled while trying to send a message
                    if current_message is NO_MESSAGE_WAITING:
                        current_message = await message_aiter.__anext__()
                except StopAsyncIteration:
                    return
                await stream.send_message(current_message)  # This must be cancel safe
                current_message = NO_MESSAGE_WAITING

        stream = await self._stream_fn(metadata=metadata)
        async with aclosing(message_aiter) as message_aiter:
            try:
                while True:
                    async with anyio.create_task_group() as task_group:
                        task_group.start_soon(send_multiple)
                        try:
                            value = await stream_to_async_iterator(stream).__anext__()
                        except StopAsyncIteration:
                            return
                        task_group.cancel_scope.cancel()
                    # Outside the task group
                    yield value
            finally:
                stream.close()
  • Wrap the generator in a context manager, like trio-util provides. This would be a significant API change.

I prefer the second option, as it is significantly cleaner and has fewer sharp edges regarding cancel safety.
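The cancel-safety requirement of the first option can be illustrated without purerpc. This is a minimal sketch, assuming plain asyncio instead of anyio; `GatedStream` and `Sender` are hypothetical stand-ins. `GatedStream.send_message` is cancel safe because it records the message only after its last await, so a send cancelled mid-wait has sent nothing. The cancelled message stays parked in `current_message`, and the next run of `send_multiple` retries it instead of fetching a new one.

```python
import asyncio

NO_MESSAGE_WAITING = object()


class GatedStream:
    """Hypothetical cancel-safe stream: send_message() waits for the gate and
    only then records the message, so a cancelled send has sent nothing."""

    def __init__(self):
        self.gate = asyncio.Event()
        self.sent = []

    async def send_message(self, message):
        await self.gate.wait()     # the only point where cancellation can land
        self.sent.append(message)  # reached only if the wait completed


class Sender:
    """Hypothetical sender mirroring send_multiple() from the proposal."""

    def __init__(self, stream, message_aiter):
        self.stream = stream
        self.message_aiter = message_aiter
        # A message whose send was cancelled is parked here and retried.
        self.current_message = NO_MESSAGE_WAITING

    async def send_multiple(self):
        while True:
            if self.current_message is NO_MESSAGE_WAITING:
                try:
                    self.current_message = await self.message_aiter.__anext__()
                except StopAsyncIteration:
                    return
            await self.stream.send_message(self.current_message)
            self.current_message = NO_MESSAGE_WAITING


async def messages():
    for message in ("a", "b"):
        yield message


async def main():
    stream = GatedStream()
    sender = Sender(stream, messages())

    # First run: the gate is closed, so sending "a" is cancelled mid-wait.
    task = asyncio.create_task(sender.send_multiple())
    await asyncio.sleep(0)  # let the task start and block on the gate
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    assert stream.sent == [] and sender.current_message == "a"

    # Second run: the gate is open, so the parked "a" is retried, then "b".
    stream.gate.set()
    await sender.send_multiple()
    return stream.sent


print(asyncio.run(main()))  # ['a', 'b']
```

Each message is sent exactly once even though the first attempt was cancelled. The sketch also shows the sharp edge the second option avoids: correctness depends entirely on `send_message` never committing a message before its last await.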

belm0 commented

Thank you. I wonder why my project hasn't hit this. Do you have an example that demonstrates it?

Adding the following to tests/test_errors.py triggers the bug for me:

@purerpc_channel("port")
async def test_errors_purerpc_in_client(greeter_pb2, greeter_grpc, channel):
    async def generator():
        for _ in range(7):
            yield greeter_pb2.HelloRequest()

    stub = greeter_grpc.GreeterStub(channel)

    async for resp in stub.SayHelloToMany(generator()):
        if resp.message == "2":
            raise ValueError("oops")

belm0 commented

> Wrap the generator in a context manager, like trio-util provides. This would be a significant API change.

purerpc has few users, and judging by the absence of bug reports about calling the stub with an async generator in 5 years, it seems nobody is using the feature. Therefore, I'm not so concerned about an API change.

However, it's not trivial to use trio-util, because purerpc is built on anyio. (I don't have plans for an anyio variant of trio-util due to the lack of sane test support, namely a missing autojump_clock.)

It seems like the async generator support is only a convenience of the API. Isn't it possible for the API user to achieve the equivalent, given only the standard stub call? If so, I propose removing the feature from the API, considering that this project is in "maintenance mode".

> Wrap the generator in a context manager, like trio-util provides. This would be a significant API change.

> purerpc has few users, and judging by the absence of bug reports about calling the stub with an async generator in 5 years, it seems nobody is using the feature. Therefore, I'm not so concerned about an API change.

> However, it's not trivial to use trio-util, because purerpc is built on anyio. (I don't have plans for an anyio variant of trio-util due to the lack of sane test support, namely a missing autojump_clock.)

I don't think this needs a full wrapper like trio-util provides. I can't test it now, but the following should work:

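import anyio
from contextlib import aclosing, asynccontextmanager  # aclosing needs Python 3.10+
# stream_to_async_iterator is the existing purerpc helper used by call_aiter.


async def stream_stream_sender(stream, message_aiter):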
    async with aclosing(message_aiter) as message_aiter:
        async for message in message_aiter:
            await stream.send_message(message)


async def stream_stream_receiver(stream):
    stream = stream_to_async_iterator(stream)

    async for message in stream:
        yield message


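@asynccontextmanager
async def stream_stream(self, message_aiter, metadata):
    # Intended as a ClientStubStreamStream method; self._stream_fn opens the call.
    stream = await self._stream_fn(metadata=metadata)

    async with anyio.create_task_group() as task_group:
        task_group.start_soon(stream_stream_sender, stream, message_aiter)
        # Yielding here is fine: the task group is entered and exited by the
        # caller's `async with` block, not by an async generator being iterated.
        yield stream_stream_receiver(stream)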

You would then use it as:

async with some_stub.StreamStreamCall(some_generator()) as resp_gen:
    async for resp in resp_gen:
        ...
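The shape of this context-manager API can be checked with a self-contained simulation that uses only the standard library. It is a sketch under stated assumptions: `FakeStream` is a hypothetical in-memory echo stream rather than purerpc's stream class, and a plain asyncio task awaited on exit stands in for the anyio task group. The receiver is an ordinary async generator with no task group inside it, so stopping iteration early leaves no cancel scope suspended; the sender's lifetime is bound to the `async with` block.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeStream:
    """Hypothetical in-memory stand-in for a purerpc stream: every sent
    message comes back as an "echo:" response; close() ends the responses."""

    def __init__(self):
        self._responses = asyncio.Queue()

    async def send_message(self, message):
        await self._responses.put(f"echo:{message}")

    async def close(self):
        await self._responses.put(None)  # end-of-stream sentinel

    async def receive_message(self):
        return await self._responses.get()


async def sender(stream, message_aiter):
    # Background task: forward every request, then signal end-of-stream.
    try:
        async for message in message_aiter:
            await stream.send_message(message)
    finally:
        await stream.close()


async def receiver(stream):
    # Plain async generator: it never yields while a task group is open.
    while (message := await stream.receive_message()) is not None:
        yield message


@asynccontextmanager
async def stream_stream(message_aiter):
    stream = FakeStream()
    send_task = asyncio.create_task(sender(stream, message_aiter))
    try:
        yield receiver(stream)
    finally:
        # Like leaving a task group: wait for the sender before returning.
        await send_task


async def requests():
    for i in range(3):
        yield i


async def main():
    async with stream_stream(requests()) as responses:
        return [response async for response in responses]


async def first_only():
    # Stopping early is harmless: no task group is suspended inside receiver().
    async with stream_stream(requests()) as responses:
        async for response in responses:
            return response


print(asyncio.run(main()))  # ['echo:0', 'echo:1', 'echo:2']
```

The design choice is that only the context manager owns concurrency; the object handed to the caller is a passive iterator, which is what makes the API safe to abandon mid-stream.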

> It seems like the async generator support is only a convenience of the API. Isn't it possible for the API user to achieve the equivalent, given only the standard stub call? If so, I propose removing the feature from the API, considering that this project is in "maintenance mode".

If I understand correctly, the generators are the only way to use stream-stream calls in the current purerpc API. You'd have to replicate the logic in

async def rpc(self, method_name: str, request_type, response_type, metadata=None):
    message_type = request_type.DESCRIPTOR.full_name
    if metadata is None:
        metadata = ()
    stream = await self.channel._grpc_socket.start_request(
        "http", self.service_name, method_name, message_type,
        "{}:{}".format(self.channel._host, self.channel._port),
        custom_metadata=metadata)
    stream.expect_message_type(response_type)
    return stream

def get_method_stub(self, method_name: str, signature: RPCSignature):
    stream_fn = functools.partial(self.rpc, method_name, signature.request_type,
                                  signature.response_type)
    if signature.cardinality == Cardinality.STREAM_STREAM:
        return ClientStubStreamStream(stream_fn)
    elif signature.cardinality == Cardinality.UNARY_STREAM:
        return ClientStubUnaryStream(stream_fn)
    elif signature.cardinality == Cardinality.STREAM_UNARY:
        return ClientStubStreamUnary(stream_fn)
    else:
        return ClientStubUnaryUnary(stream_fn)

along with a custom ClientStubStreamStream implementation.

About the maintenance mode: I was planning to submit some PRs with improvements (type information for the stubs, Unix socket support, proper handling of connection failures). I could keep those changes in a fork, but that seems like needless fragmentation. Unless I missed something in my search, there is no alternative for using gRPC with trio. I'm happy to help maintain this project a bit, but would still need someone to review the PRs (given the "don't merge your own PR" rule).

belm0 commented

> the generators are the only way to use stream-stream calls in the current purerpc api

> I don't think this needs a full wrapper like trio-util provides. I can't test it now, but the following should work:

I see. If you have time to work on a PR, I'm happy to review. It would be important to add unit test coverage for the new stub method.

About other improvements, I'm happy to review, but I'd like to consider carefully whether the changes will add to the maintenance burden. In other projects, I've found even adding type annotations to be a burden, because of added dependencies on alpha-stability projects and having to battle the type system in every future PR.