Will the result callback called in a threadsafe/coruntine safe way? #322
Closed this issue · 7 comments
Hi, I'm a little worried about the following callback, which might be called in C++ from another thread(s), and writing to asyncio.Queue
ScaleLLM/scalellm/llm_engine.py
Line 180 in ea0f6bf
Thanks to GIL, it is safe to call python callback from c++ in a async way.
you can find more details from pybind11 tests:
https://github.com/pybind/pybind11/blob/master/tests/test_callbacks.cpp#L222
https://github.com/pybind/pybind11/blob/master/tests/test_callbacks.cpp#L222
The test invlove a res.push_back
where res is type of List. In our case, res is type of asyncio.Queue, so asyncio.run_coroutine_threadsafe
might be needed
Could you elaborate the reason behind your suggestion? I am not sure if I get the point. Thanks
Could you elaborate the reason behind your suggestion? I am not sure if I get the point. Thanks
self._queue is a asyncio.Queue, call self._queue.put_nowait in another thread might not be safe.
some fixing like:
async def async_put(self, item: RequestOutput) -> bool:
# if the stream is cancelled, return False
if self._cancelled:
return False
if item.status is not None and not item.status.ok:
await self._queue.put(
ValidationError(item.status.code, item.status.message)
)
return False
# put the item into the queue
await self._queue.put(item)
if item.finished:
await self._queue.put(StopAsyncIteration())
return True
def put(self, item: RequestOutput) -> bool: # << callback
future = asyncio.run_coroutine_threadsafe(self.async_put(item), self.loop)
return future.result()
But im not sure if im right
Thank you! Let me dig more about this potential issue. Good catch!
After further investigation, I believe you are correct. asyncio.Queue
is not thread-safe, so we'll need to use run_coroutine_threadsafe or call_soon_threadsafe to schedule tasks into the event loop from a separate thread. I'm currently working on implementing the fix.
Good catch! Thank you!
looking forward to it