vectorch-ai/ScaleLLM

Will the result callback be called in a thread-safe/coroutine-safe way? #322

Closed this issue · 7 comments

Hi, I'm a little worried about the following callback, which might be called from C++ on another thread (or threads) while writing to an asyncio.Queue:

def callback(output: RequestOutput) -> bool:

Thanks to the GIL, it is safe to call a Python callback from C++ asynchronously.
You can find more details in the pybind11 tests:
https://github.com/pybind/pybind11/blob/master/tests/test_callbacks.cpp#L222


The test involves a res.push_back, where res is a list. In our case, the target is an asyncio.Queue, so asyncio.run_coroutine_threadsafe might be needed.

Could you elaborate the reason behind your suggestion? I am not sure if I get the point. Thanks


self._queue is an asyncio.Queue, so calling self._queue.put_nowait from another thread might not be safe.

A possible fix would be something like:

    async def async_put(self, item: RequestOutput) -> bool:
        # if the stream is cancelled, return False
        if self._cancelled:
            return False

        if item.status is not None and not item.status.ok:
            await self._queue.put(
                ValidationError(item.status.code, item.status.message)
            )
            return False

        # put the item into the queue
        await self._queue.put(item)
        if item.finished:
            await self._queue.put(StopAsyncIteration())
        return True

    def put(self, item: RequestOutput) -> bool:  # << callback
        # schedule the coroutine on the event loop from the calling thread,
        # then block this (non-loop) thread until it has completed
        future = asyncio.run_coroutine_threadsafe(self.async_put(item), self.loop)
        return future.result()

But I'm not sure if I'm right.

Thank you! Let me dig more about this potential issue. Good catch!

After further investigation, I believe you are correct. asyncio.Queue is not thread-safe, so we'll need to use run_coroutine_threadsafe or call_soon_threadsafe to schedule tasks into the event loop from a separate thread. I'm currently working on implementing the fix.
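For reference, here is a minimal sketch of what the call_soon_threadsafe variant might look like. It is not the actual ScaleLLM fix: OutputStream, drain, the worker thread, and the None end-of-stream sentinel are hypothetical stand-ins chosen to keep the example self-contained. The idea is that the callback thread never touches the queue directly and only asks the loop to run put_nowait on its own thread.

```python
import asyncio
import threading


class OutputStream:
    """Hypothetical stand-in for the stream that owns the asyncio.Queue."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._queue: asyncio.Queue = asyncio.Queue()
        self._cancelled = False

    def put(self, item: str, finished: bool = False) -> bool:
        """Callback intended to be invoked from a non-event-loop thread."""
        if self._cancelled:
            return False
        # Hand the queue mutation over to the loop; put_nowait then runs on
        # the loop thread, never on the caller's thread.
        self._loop.call_soon_threadsafe(self._queue.put_nowait, item)
        if finished:
            # None is an assumed end-of-stream sentinel for this sketch.
            self._loop.call_soon_threadsafe(self._queue.put_nowait, None)
        return True

    async def drain(self) -> list:
        """Collect items on the loop side until the sentinel arrives."""
        items = []
        while (item := await self._queue.get()) is not None:
            items.append(item)
        return items


async def main() -> list:
    stream = OutputStream(asyncio.get_running_loop())

    def worker() -> None:
        # Simulates the C++ side calling back from another thread.
        for i in range(3):
            stream.put(f"token-{i}", finished=(i == 2))

    thread = threading.Thread(target=worker)
    thread.start()
    items = await stream.drain()
    thread.join()
    return items


result = asyncio.run(main())
```

Compared with run_coroutine_threadsafe, this version is fire-and-forget: the callback returns without waiting for the loop to process the item, so it cannot report a failure that happens on the loop side. Blocking on future.result() gives that feedback, but it must only be done from a thread other than the one running the loop, otherwise it would deadlock.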

Good catch! Thank you!

Looking forward to it.