bytedance/monoio

`JoinHandle.await` in another thread may block forever.

drmingdrmer opened this issue · 3 comments

Version
monoio v0.2.2

Platform

uname -a
Darwin drdrxps-MBP 22.3.0 Darwin Kernel Version 22.3.0: Mon Jan 30 20:38:37 PST 2023; root:xnu-8792.81.3~2/RELEASE_ARM64_T6000 arm64

ulimit -l
unlimited

Description
Calling JoinHandle.await in another thread (and therefore in another monoio runtime) sometimes returns the expected value and sometimes blocks forever.

The code to reproduce this issue is in the following repo:
https://github.com/drmingdrmer/t-monoio/blob/ec306d33e9f581eacd06c51034de7d0d4698f9dd/src/bin/cross-rt-join.rs#L1

It is quite simple. Create a monoio runtime to run a future, send the JoinHandle to another thread and await it in another monoio runtime:

use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    std::thread::spawn(move || {
        let mut rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
            .enable_all()
            .build()
            .expect("Failed building the Runtime");

        rt.block_on(async move {
            let fu = async move {
                // println!("inner-fu: ready");
                1u64
            };

            let handle = monoio::spawn(fu);
            tx.send(handle).unwrap();

            monoio::time::sleep(Duration::from_millis(1_000)).await;
            println!("outer-fu: after sending handle and sleep");
        });
    });

    let handle = rx.recv().unwrap();

    let mut rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
        .enable_all()
        .build()
        .expect("Failed building the Runtime");

    rt.block_on(async move {
        println!("joiner: before handle.await");
        let got = handle.await;
        println!("joiner: after handle.await: {:?}", got);
    });
}

Clone it and run it with:

cargo run --bin cross-rt-join

Sometimes it blocks forever:

joiner: before handle.await
outer-fu: after sending handle and sleep

Sometimes it returns the expected value:

joiner: before handle.await
joiner: after handle.await: 1

The main issue here is that you are running it on macOS, so you have no guarantee about which core each task is going to run on. That's why you see erratic behavior.
You could run it on Linux and pin those threads to specific cores; it would then either work every time or fail every time.

For this part, you can enable the sync feature flag on monoio, which allows the cross-thread async communication documented here (though performance is better if you avoid cross-thread async communication altogether).
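To make concrete what a cross-thread wake-up needs, here is a minimal std-only sketch. It is not monoio's implementation, and whether the sync feature works this way internally is an assumption. `ThreadWaker`, `ToyJoin`, `block_on` and `join_across_threads` are hypothetical names made up for illustration. The point is only that the waker must be callable from another thread and must be able to interrupt the sleeping thread that owns the future; if the wake never reaches that thread, the await never completes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that records a notification and unparks the thread owning the future.
struct ThreadWaker {
    thread: Thread,
    notified: AtomicBool,
}

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.notified.store(true, Ordering::Release);
        self.thread.unpark();
    }
}

/// Tiny single-future executor (hypothetical): poll, then park until woken.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let state = Arc::new(ThreadWaker {
        thread: thread::current(),
        notified: AtomicBool::new(false),
    });
    let waker = Waker::from(state.clone());
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        // Sleep until some thread calls wake(); tolerate spurious unparks.
        while !state.notified.swap(false, Ordering::Acquire) {
            thread::park();
        }
    }
}

/// Result slot shared between the producing thread and the joiner.
#[derive(Default)]
struct Slot {
    value: Option<u64>,
    waker: Option<Waker>,
}

/// Hypothetical stand-in for a join handle; not monoio's JoinHandle.
pub struct ToyJoin(Arc<Mutex<Slot>>);

impl Future for ToyJoin {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        let mut slot = self.0.lock().unwrap();
        match slot.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Register the waker so the producer can wake this thread.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Produce a value on one thread and await it on the calling thread.
pub fn join_across_threads() -> u64 {
    let slot = Arc::new(Mutex::new(Slot::default()));
    let producer = slot.clone();
    let t = thread::spawn(move || {
        let waker = {
            let mut s = producer.lock().unwrap();
            s.value = Some(1);
            s.waker.take()
        };
        // Cross-thread wake: this is the step that has to reach the joiner.
        if let Some(w) = waker {
            w.wake();
        }
    });
    let got = block_on(ToyJoin(slot));
    t.join().unwrap();
    got
}
```

Calling `join_across_threads()` returns the value produced on the other thread regardless of which side runs first, because the slot and the waker registration are protected by the same mutex.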

@Miaxos
Thank you so much! It looks like the missing sync feature flag is the cause. With sync enabled, the program produces consistent results.

The docs do not give any hint about how to use JoinHandle correctly in another thread.
Is it possible to remove the Send and Sync bounds from JoinHandle if the sync feature flag is not enabled?

Another issue is that if the task panics, the JoinHandle.await in another thread blocks forever.

Hey @drmingdrmer I took some time to provide you an example of a working solution without sync. If you change the pinned core, the solution won't work anymore without sync.

(To run these tests and io_uring on macOS, I suggest running them inside a VM backed by tart; that's what I'm using and it works very well.)

A working solution without `sync` (Linux only)

(If you change the pinned core, it won't work without the sync feature.)

use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let t1 = std::thread::spawn(move || {
        // If you want to force a thread to be pinned on a core
        // (Only available on linux)
        monoio::utils::bind_to_cpu_set(Some(0)).unwrap();
        let mut rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async move {
            let fu = async move {
                println!("inner-fu: ready");
                1u64
            };

            let handle = monoio::spawn(fu);
            tx.send(handle).unwrap();

            monoio::time::sleep(Duration::from_millis(1_000)).await;
            println!("outer-fu: after sending handle and sleep");
        });
    });

    let t2 = std::thread::spawn(move || {
        // If you want to force a thread to be pinned on a core
        // (Only available on linux)
        monoio::utils::bind_to_cpu_set(Some(0)).unwrap();

        let mut rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async move {
            println!("joiner: before handle.await");
            // Busy-poll the channel until the handle arrives from the other thread.
            loop {
                if let Ok(handle) = rx.try_recv() {
                    let got = handle.await;
                    println!("joiner: after handle.await: {:?}", got);
                    break;
                }
            }
        });
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

Is it possible to remove the Send and Sync bounds from JoinHandle if the sync feature flag is not enabled?

Based on the implementation, you'll need to have T: !Send for JoinHandle<T> to not be Send.
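For reference, here is a generic, std-only sketch of how auto traits behave in this situation. It does not show monoio's JoinHandle; `LocalHandle`, `PlainHandle`, `SendFallback` and `Probe` are hypothetical names made up for illustration. A struct is Send exactly when all of its fields are, so a handle that only stores a `T` follows `T`. A zero-sized `PhantomData<*const ()>` field is one common way for a type to opt out of Send and Sync regardless of `T`; in a real crate such a field could be placed behind a `cfg` on a feature flag, which is an assumption here and not something the library is confirmed to do.

```rust
use std::marker::PhantomData;

/// Hypothetical handle that only stores a T: it is Send exactly when T is Send.
pub struct PlainHandle<T> {
    pub value: Option<T>,
}

/// Hypothetical handle that opts out of Send and Sync for every T.
pub struct LocalHandle<T> {
    value: Option<T>,
    // Raw pointers are neither Send nor Sync, so this zero-sized marker
    // makes the whole struct !Send and !Sync.
    _not_send: PhantomData<*const ()>,
}

impl<T> LocalHandle<T> {
    pub fn new(value: T) -> Self {
        Self { value: Some(value), _not_send: PhantomData }
    }

    pub fn take(&mut self) -> Option<T> {
        self.value.take()
    }
}

/// Compile-time probe: the inherent constant is chosen when `T: Send`,
/// otherwise name resolution falls back to the trait default (`false`).
pub trait SendFallback {
    const IS_SEND: bool = false;
}
impl<T: ?Sized> SendFallback for T {}

pub struct Probe<T: ?Sized>(PhantomData<T>);

impl<T: ?Sized + Send> Probe<T> {
    pub const IS_SEND: bool = true;
}
```

With these definitions, `Probe::<PlainHandle<u64>>::IS_SEND` is `true`, while `Probe::<PlainHandle<std::rc::Rc<u64>>>::IS_SEND` and `Probe::<LocalHandle<u64>>::IS_SEND` are both `false`.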

Another issue is that if the task panics, the JoinHandle.await in another thread blocks forever.

Yeah, right now you would need to wrap the spawned future in a catch_unwind and return a manual Result.
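A minimal sketch of that wrapping, using only the standard library. `CatchPanic`, `may_panic` and `poll_once` are hypothetical names made up for illustration, not monoio APIs. The wrapper polls the inner future inside `std::panic::catch_unwind`, so a panic becomes an `Err` output and the future still completes. This assumes panics unwind (it does nothing under `panic = "abort"`).

```rust
use std::any::Any;
use std::future::Future;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper: turns a panic inside `inner` into an `Err` output.
pub struct CatchPanic<F> {
    inner: Pin<Box<F>>,
}

impl<F: Future> CatchPanic<F> {
    pub fn new(inner: F) -> Self {
        Self { inner: Box::pin(inner) }
    }
}

impl<F: Future> Future for CatchPanic<F> {
    type Output = Result<F::Output, Box<dyn Any + Send>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = self.inner.as_mut();
        // Run one poll of the inner future and catch an unwinding panic.
        match catch_unwind(AssertUnwindSafe(|| inner.poll(cx))) {
            Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
            Ok(Poll::Pending) => Poll::Pending,
            Err(payload) => Poll::Ready(Err(payload)),
        }
    }
}

/// Example task that panics on request.
pub async fn may_panic(fail: bool) -> u64 {
    if fail {
        panic!("task failed");
    }
    1
}

/// Waker that does nothing; enough to poll a future once in this sketch.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll a future a single time (helper for demonstration only).
pub fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

In the reproduction above, the idea would be to pass something like `CatchPanic::new(fu)` to `monoio::spawn`, so that awaiting the handle yields a `Result` instead of never completing. The `futures` crate offers a ready-made `FutureExt::catch_unwind` adapter that serves the same purpose.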