ClientStubStreamStream uses yield inside a task group
VincentVanlaer opened this issue · 5 comments
See purerpc/src/purerpc/wrappers.py, lines 110 to 115 in 9b70109.
This causes internal corruption in trio.
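For illustration, the pattern boils down to something like this (a minimal sketch, not the actual purerpc code):

```python
import anyio

async def broken_stream():
    """Minimal sketch of the anti-pattern: an async generator that yields
    while an anyio/trio task group is still open."""
    async def sender():
        await anyio.sleep(10)  # stands in for the request-sending loop

    async with anyio.create_task_group() as task_group:
        task_group.start_soon(sender)
        # The generator is suspended here with the task group's cancel scope
        # still active. If the caller raises or abandons the iterator at this
        # point, the cancel scope is exited from a different context than it
        # was entered, and trio reports its internal state as corrupted.
        yield "response"
```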
I think there are two potential solutions:
- Don't run the sending side of the client stream while yielding. One way to accomplish this is to cancel the sending side whenever a message is received from the server. This does require some form of cancel safety from the sending stream (meaning that if `send_message` is cancelled, the message must not be sent). An implementation would then look something like this:
```python
NO_MESSAGE_WAITING = object()


class ClientStubStreamStream(ClientStub):
    async def call_aiter(self, message_aiter, metadata):
        # If send_multiple is cancelled while trying to send a message,
        # that message will be saved in this variable
        current_message = NO_MESSAGE_WAITING

        async def send_multiple():
            nonlocal current_message
            while True:
                try:
                    # Check if we got cancelled while trying to send a message
                    if current_message is NO_MESSAGE_WAITING:
                        current_message = await message_aiter.__anext__()
                except StopAsyncIteration:
                    return
                await stream.send_message(current_message)  # This must be cancel safe
                current_message = NO_MESSAGE_WAITING

        stream = await self._stream_fn(metadata=metadata)
        async with aclosing(message_aiter) as message_aiter:
            try:
                while True:
                    async with anyio.create_task_group() as task_group:
                        task_group.start_soon(send_multiple)
                        try:
                            value = await stream_to_async_iterator(stream).__anext__()
                        except StopAsyncIteration:
                            return
                        task_group.cancel_scope.cancel()
                    # Outside the task group
                    yield value
            finally:
                stream.close()
```
- Wrap the generator in a context manager, like trio-utils provides. This would be a significant API change.
I prefer the second option, as it is significantly cleaner and has fewer sharp edges regarding cancel safety. A rough sketch of what that could look like is below.
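A rough sketch of the trio-util pattern, with hypothetical names (this is not purerpc's current API):

```python
from trio_util import trio_async_generator

# Hypothetical wrapper, for illustration only (not purerpc's current API).
# The decorator turns the async generator into a context manager, so the
# generator is only consumed inside an `async with` block with a
# well-defined lifetime, which avoids the yield-inside-task-group problem.
@trio_async_generator
async def call_aiter(message_aiter, metadata):
    # send/receive logic would go here
    async for message in message_aiter:
        yield message

# Caller side:
async def caller(requests):
    async with call_aiter(requests, metadata={}) as responses:
        async for response in responses:
            print(response)
```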
Thank you. I wonder why my project hasn't hit this. Do you have an example that demonstrates it?
Adding the following to tests/test_errors.py triggers the bug for me:
```python
@purerpc_channel("port")
async def test_errors_purerpc_in_client(greeter_pb2, greeter_grpc, channel):
    async def generator():
        for _ in range(7):
            yield greeter_pb2.HelloRequest()

    stub = greeter_grpc.GreeterStub(channel)
    async for resp in stub.SayHelloToMany(generator()):
        if resp.message == "2":
            raise ValueError("oops")
```
> Wrap the generator in a context manager, like trio-utils provides. This would be a significant API change.

purerpc has few users and, judging by the absence of bug reports about calling the stub with an async generator in five years, it doesn't seem anyone is using the feature. Therefore, I'm not so concerned about an API change.

However, it's not trivial to use trio-util, because purerpc is built on anyio. (I don't have plans for an anyio variation of trio-util due to lack of sane test support -- namely, missing autojump_clock.)

It seems like the async generator support is only a convenience of the API. Isn't it possible for the API user to achieve the equivalent, given only the standard stub call? If so, I propose removing the feature from the API, considering that this project is in "maintenance mode".
> > Wrap the generator in a context manager, like trio-utils provides. This would be a significant API change.
>
> purerpc has few users and, judging by the absence of bug reports about calling the stub with an async generator in five years, it doesn't seem anyone is using the feature. Therefore, I'm not so concerned about an API change.
>
> However, it's not trivial to use trio-util, because purerpc is built on anyio. (I don't have plans for an anyio variation of trio-util due to lack of sane test support -- namely, missing autojump_clock.)
I don't think this needs a full wrapper like trio-util provides. I can't test it now, but the following should work:
```python
async def stream_stream_sender(stream, message_aiter):
    async with aclosing(message_aiter) as message_aiter:
        async for message in message_aiter:
            await stream.send_message(message)


async def stream_stream_receiver(stream):
    stream = stream_to_async_iterator(stream)
    async for message in stream:
        yield message


@asynccontextmanager
async def stream_stream(self, message_aiter, metadata):
    stream = await self._stream_fn(metadata=metadata)
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(stream_stream_sender, stream, message_aiter)
        yield stream_stream_receiver(stream)
```
You would then need to use it as:

```python
async with some_stub.StreamStreamCall(some_generator()) as resp_gen:
    async for resp in resp_gen:
        ...
```
> It seems like the async generator support is only a convenience of the API. Isn't it possible for the API user to achieve the equivalent, given only the standard stub call? If so, I propose removing the feature from the API, considering that this project is in "maintenance mode".
If I understand it correctly, the generators are the only way to use stream-stream calls in the current purerpc API. You'd have to replicate the logic in lines 44 to 66 in 9e96b7d.
About maintenance mode: I was planning to submit some PRs with improvements (adding typing information to the stubs, unix socket support, proper handling of connection failures). I could let those changes live in a fork, but that seems like needless fragmentation to me. Unless I missed something in my search, there is no alternative for using gRPC with trio, is there? I'm happy to help maintain this project a bit, but would still need someone to review the PRs (given the "don't merge your own PRs" rule).
> the generators are the only way to use stream-stream calls in the current purerpc API

> I don't think this needs a full wrapper like trio-util provides. I can't test it now, but the following should work:
I see. If you have time to work on a PR, I'm happy to review. It would be important to add unit test coverage for the new stub method.
About the other improvements, I'm happy to review, but I'd like to consider carefully whether the changes will add to the maintenance burden. In other projects, I've found even things like adding type annotations to be a burden, because of added dependencies on alpha-stability projects and the need to battle the type system in every future PR.