wyfo/pyo3-async

How to use allow_threads with an existing stream to avoid deadlock?

Opened this issue · 2 comments

Recently, I encountered the same deadlock issue mentioned here: PyO3/pyo3#3540 (comment)

Here I use the par_stream crate for a parallel async stream implementation and wrap it with pyo3-async. par_map returns a flume RecvStream, which internally holds a lock for its queue.

use futures::Stream;
use par_stream::prelude::*;
use pyo3::prelude::*;

#[pymodule]
fn snake(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(parallel_stream, m)?)?;
    Ok(())
}

fn tokio() -> &'static tokio::runtime::Runtime {
    use std::sync::OnceLock;
    static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
    RT.get_or_init(|| tokio::runtime::Runtime::new().unwrap())
}

fn fib(n: u64) -> u64 {
    match n {
        0 => 0,
        1 => 1,
        _ => fib(n - 1) + fib(n - 2),
    }
}

fn map() -> impl Stream<Item = PyResult<u64>> + Send {
    let _guard = tokio().enter();
    futures::stream::iter(0..100).par_map(None, |i| move || Ok(fib(i)))
}

#[pyfunction]
fn parallel_stream() -> pyo3_async::asyncio::AsyncGenerator {
    pyo3_async::asyncio::AsyncGenerator::from_stream(map())
}

Now it deadlocks and hangs in Python:

import snake
import asyncio

async def fn():
    async for i in snake.parallel_stream():
        print(i)

asyncio.run(fn())

(screenshots of the deadlocked stack traces)

I have no idea where to add allow_threads here; just adding it to parallel_stream doesn't seem to work.

By the way, if I change it to use a tokio mpsc channel like this:

use async_stream::stream;
use futures::StreamExt; // for `for_each`

fn map() -> impl Stream<Item = PyResult<u64>> + Send {
    let (tx, mut rx) = tokio::sync::mpsc::channel(100);
    tokio().spawn(async move {
        futures::stream::iter(0..1000)
            .par_map(
                ParParams {
                    num_workers: 32,
                    buf_size: Some(10),
                },
                |_| move || Ok(fib(45)),
            )
            .for_each(|item| {
                let tx = tx.clone();
                async move {
                    let _ = tx.send(item).await;
                }
            })
            .await;
    });

    stream! {
        while let Some(item) = rx.recv().await {
            yield item;
        }
    }
}

It works well and no deadlock occurs. I'm not quite sure what the essential difference is.

Have you tried pyo3_async::asyncio::AsyncGenerator::from_stream(AllowThreads(map()))?

I've just released version 0.3.2, because AllowThreads was bugged. Anyway, as you know, this crate is deprecated, so I invite you to use the PyO3 branches I mentioned in #2. I know there is no Stream support planned for now in PyO3, but I should add it soon in a new PR.

It works well and no deadlock occurs. I'm not quite sure what the essential difference is.

As shown in the stack traces, flume uses a mutex, while tokio::sync::mpsc doesn't.
Your deadlock seems to be the following:

  • Thread T1 acquires mutex M1 (the asyncio thread runs with the GIL)
  • Thread T2 acquires mutex M2 (a background fib task finishes and sends its result)
  • Thread T2 tries to acquire mutex M1 and waits (the background task tries to wake up the asyncio thread)
  • Thread T1 tries to acquire mutex M2 and waits (perhaps the asyncio thread tries to pop a previous result from the queue)
  • deadlock

I've looked at the flume implementation, and indeed, the channel lock is held during the wakeup (I honestly don't understand why).
By using an intermediate tokio task with an mpsc, the flume channel doesn't wake up the async generator's __next__ coroutine but the tokio task, so the wakeup doesn't need to acquire the GIL.

Your use case is interesting, though, because you're the second person to give me feedback on the async support in PyO3, and both of you encountered a deadlock. So the issue may be more common than I expected and may require dedicated documentation. I will talk with the PyO3 maintainers about that.

Thanks for your detailed reply. I will give PyO3 master a try. One question: how is the performance? I don't know which is better:

  1. Use AllowThreads with a stream that may deadlock on the GIL.
  2. Add one level of indirection, like my use of an mpsc channel.

Since I would like to let Python users freely combine the exposed stream algorithms, I need to make sure the final result always avoids deadlocks.