How to use allow_threads with an existing stream to avoid deadlock?
Recently, I encountered the same deadlock issue mentioned in PyO3/pyo3#3540 (comment).
Here I use the par_stream crate to provide a parallel async stream implementation and wrap it with pyo3-async. par_map returns a flume RecvStream, which internally takes a lock on its queue.
```rust
use futures::stream::Stream;
use par_stream::prelude::*;
use pyo3::prelude::*;

#[pymodule]
fn snake(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(parallel_stream, m)?)?;
    Ok(())
}

fn tokio() -> &'static tokio::runtime::Runtime {
    use std::sync::OnceLock;
    static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
    RT.get_or_init(|| tokio::runtime::Runtime::new().unwrap())
}

fn fib(n: u64) -> u64 {
    match n {
        0 => 0,
        1 => 1,
        _ => fib(n - 1) + fib(n - 2),
    }
}

fn map() -> impl Stream<Item = PyResult<u64>> + Send {
    let _guard = tokio().enter();
    futures::stream::iter(0..100).par_map(None, |i| move || Ok(fib(i)))
}

#[pyfunction]
fn parallel_stream() -> pyo3_async::asyncio::AsyncGenerator {
    pyo3_async::asyncio::AsyncGenerator::from_stream(map())
}
```
This deadlocks and hangs in Python:
```python
import snake
import asyncio

async def fn():
    async for i in snake.parallel_stream():
        print(i)

asyncio.run(fn())
```
I have no idea where to add allow_threads here; just adding it to parallel_stream doesn't seem to work.
BTW, if I change it to use a tokio mpsc channel like this:
```rust
use async_stream::stream;
use futures::stream::StreamExt;
use par_stream::{prelude::*, ParParams};

fn map() -> impl Stream<Item = PyResult<u64>> + Send {
    let (tx, mut rx) = tokio::sync::mpsc::channel(100);
    tokio().spawn(async move {
        futures::stream::iter(0..1000)
            .par_map(
                ParParams {
                    num_workers: 32,
                    buf_size: Some(10),
                },
                |_| move || Ok(fib(45)),
            )
            .for_each(|item| {
                let tx = tx.clone();
                async move {
                    let _ = tx.send(item).await;
                }
            })
            .await;
    });
    stream! {
        while let Some(item) = rx.recv().await {
            yield item;
        }
    }
}
```
It works well and no deadlock occurs. I'm not quite sure what the essential difference is.
Have you tried pyo3_async::asyncio::AsyncGenerator::from_stream(AllowThreads(map()))?
I've just released version 0.3.2, because AllowThreads was bugged. Anyway, as you know, the crate is deprecated, so I invite you to use the PyO3 branches I mentioned in #2. I know there is no Stream support planned for now in PyO3, but I should add it soon in a new PR.
> It works well and no deadlock occurs. I'm not quite sure what the essential difference is.
As shown in the stack traces you posted, flume uses a mutex, while tokio::sync::mpsc doesn't.
Your deadlock seems to be the following:
- Thread T1 acquires mutex M1 (the asyncio thread runs holding the GIL)
- Thread T2 acquires mutex M2 (a background fib task finishes and sends its result)
- Thread T2 tries to acquire mutex M1 and waits (the background task tries to wake up the asyncio thread)
- Thread T1 tries to acquire mutex M2 and waits (perhaps the asyncio thread tries to pop a previous task result from the queue)
- deadlock
I've looked at the flume implementation, and indeed the channel lock is held during the wakeup (honestly, I don't understand why).
By using an intermediate tokio task with an mpsc channel, the flume channel wakes up the tokio task rather than the async generator's __next__ coroutine, so the wakeup doesn't need to acquire the GIL.
Your use case is interesting though, because you're the second person to give me feedback on the async support in PyO3, and both of you encountered a deadlock. That may indicate the issue is more common than I expected and deserves dedicated documentation. I will talk with the PyO3 maintainers about it.
Thanks for your detailed reply. I will give PyO3 master a try. One question: how is the performance? I don't know which is better:
- use AllowThreads with the stream, which may deadlock with the GIL, or
- add one level of indirection, like my use of an mpsc channel.
Since I would like to let Python users freely combine the exposed stream algorithms, I need to make sure the final combination always avoids deadlock.