MagicStack/uvloop

pipes from subprocess_exec do not have extra info 'pipe' (incompatible with vanilla asyncio)

jensbjorgensen opened this issue · 1 comments

  • uvloop version:
    0.19.0

  • Python version:
    3.10.12

  • Platform:
    linux

  • Can you reproduce the bug with PYTHONASYNCIODEBUG in env?:
    yes

  • Does uvloop behave differently from vanilla asyncio? How?:
    Yes, this is the point of this issue report. So after you get your process transport from subprocess_exec(...) you then can use get_pipe_transport() to access the transport associated with stdin. With vanilla asyncio you can then call get_extra_info('pipe') on that transport to access the pipe directly. With uvloop get_extra_info('pipe') on the same transport returns None.

I can see in the source that when a pipe transport is created via loop.create_write_pipe(...) we have:

        transp = WriteUnixTransport.new(self, proto, None, waiter)
        transp._add_extra_info('pipe', pipe)

However no transp._add_extra_info(...) is done on the WriteUnixTransport that is created inside the UVProcessTransport code. There are ways to work around this (don't depend on getting access to that pipe) however this works fine in the asyncio loop implementation. I imagine it can be done without too much pain by wrapping the raw file descriptor created in the process transport code however I'm not super fluent in cython so I didn't attempt a patch.

import asyncio
import typing
import uvloop

class Test(asyncio.SubprocessProtocol):
    def __init__(self, asyncio_impl):
        self._exited = asyncio.Event()
        self._loop = asyncio_impl.new_event_loop()
        self._task = self._loop.create_task(self.test_subprocess_pipe_extra_info())
        self._task.add_done_callback(self._task_done)
        self._ok = True

    def _task_done(self, task: asyncio.Task) -> None:
        if not task.cancelled() and task.exception():
            self._ok = False
            task.print_stack()
        self._loop.stop()
        
    def run(self) -> bool:
        self._loop.run_forever()
        return self._ok
        
    async def test_subprocess_pipe_extra_info(self) -> None:
        proc_trans, _ = await self._loop.subprocess_exec(lambda : self, 'cat', stdin=asyncio.subprocess.PIPE)
        proc_stdin = proc_trans.get_pipe_transport(0)
        pipe = proc_stdin.get_extra_info('pipe')
        pipe.write('hello, test'.encode())
        pipe.close()
        await self._exited.wait()
        self._ok = True

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        print(f"pipe_data_received: {fd} {repr(data)}")

    def pipe_connection_lost(self, fd: int, exc: typing.Optional[Exception]) -> None:
        pass

    def process_exited(self):
        self._exited.set()

for impl_s in ('asyncio', 'uvloop'):
    if Test(globals()[impl_s]).run():
        print(f'{impl_s} works as expected')
    else:
        print(f'{impl_s} is broken it seems (see stack trace)')