smol-rs/async-task

Task randomly stops being rescheduled

sgdxbc opened this issue · 1 comments

Code to reproduce:

use std::{
    cell::RefCell,
    thread::{self, sleep},
    time::{Duration, Instant},
};

use async_task::{Runnable, Task};
use futures::{channel::mpsc, Future, StreamExt};

thread_local! {
    static RUNNABLE_LIST: RefCell<Vec<Runnable>> = RefCell::new(Vec::new());
}

fn poll_once() -> usize {
    let runnable_list: Vec<_> =
        RUNNABLE_LIST.with(|runnable_list| runnable_list.borrow_mut().drain(..).collect());
    let count = runnable_list.len();
    for runnable in runnable_list {
        let waker = runnable.waker();
        runnable.run();
        waker.wake();
    }
    count
}

fn spawn(task: impl Future<Output = ()> + Send + 'static) -> Task<()> {
    let (runnable, handle) = async_task::spawn(task, |runnable| {
        RUNNABLE_LIST.with(|runnable_list| runnable_list.borrow_mut().push(runnable));
    });
    runnable.schedule();
    handle
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded();
    let handle = spawn(async move {
        loop {
            let _: () = rx.next().await.unwrap();
        }
    });

    thread::spawn(move || loop {
        sleep(Duration::from_millis(20));
        tx.unbounded_send(()).unwrap();
    });

    let start = Instant::now();
    while Instant::now() - start < Duration::from_millis(1 * 1000) {
        // assert_eq!(poll_once(), 1);
        poll_once();
    }
    assert_eq!(poll_once(), 1);
    drop(handle);
}

I understand that unconditionally waking the runnable right after running it wastes a lot of CPU, but I have a good reason to do it in my case.

The problem is that poll_once starts returning 0 after a random amount of time, usually before the 1-second loop finishes. That means the task disappears from the system at some point. From what I have observed, it never appears again after that, even when the channel's tx sends a new message.

One interesting point is that when I simplify the async task to futures::pending(), it never disappears. Am I doing something incorrectly?

I could indeed reproduce this, but I have a theory: I suspect this might happen if the call to waker.wake() happens while the task is being concurrently woken by the sender. When this happens, the first caller to wake (in this case the sender thread) is responsible for scheduling the task.

In such a scenario, the task may not yet be visible in the queue the next time poll_once is called, possibly because the sender was pre-empted before it could reschedule the task, because of inter-thread communication latency, or simply because the scheduling process takes more instructions. So my hunch is that this is a concurrency bug in the user code rather than in async-task.

Edit: I just realized that the queue is thread-local, so what happens is probably simpler. If at any moment the receiver finds an empty channel, it registers its waker. The next time the sender sends an item, the wake-up runs the schedule closure on the sender thread, so the task is pushed to the wrong queue (i.e. the RUNNABLE_LIST owned by the sender thread), which poll_once on the main thread never reads.
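
To illustrate the thread-local explanation in isolation, here is a minimal sketch that uses only the standard library. It does not use async-task at all: plain integers stand in for Runnable, and the names LOCAL_QUEUE, visible_after_thread_local_push and visible_after_shared_push are hypothetical, made up for this example. It only shows that a push made from another thread into a thread_local! queue is invisible to the polling thread, while a push into a shared queue is not.

```rust
use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use std::thread;

thread_local! {
    // Stand-in for RUNNABLE_LIST: every thread gets its own independent Vec.
    static LOCAL_QUEUE: RefCell<Vec<u32>> = RefCell::new(Vec::new());
}

/// Pushes an item from a second thread into the thread-local queue, then
/// returns how many items the calling thread sees in its own copy.
fn visible_after_thread_local_push() -> usize {
    thread::spawn(|| {
        // This lands in the spawned thread's copy of LOCAL_QUEUE,
        // which is dropped when that thread exits.
        LOCAL_QUEUE.with(|q| q.borrow_mut().push(1));
    })
    .join()
    .unwrap();

    LOCAL_QUEUE.with(|q| q.borrow().len())
}

/// Same experiment, but with one queue shared by both threads.
fn visible_after_shared_push() -> usize {
    let queue = Arc::new(Mutex::new(Vec::new()));
    let producer = Arc::clone(&queue);

    thread::spawn(move || {
        producer.lock().unwrap().push(1u32);
    })
    .join()
    .unwrap();

    // Bind the length first so the lock guard is released before returning.
    let len = queue.lock().unwrap().len();
    len
}

fn main() {
    println!("thread-local: {}", visible_after_thread_local_push()); // prints "thread-local: 0"
    println!("shared: {}", visible_after_shared_push()); // prints "shared: 1"
}
```

If this diagnosis is right, one possible fix for the reproduction would be for the schedule closure to push into a queue shared between threads (for example a Mutex-protected collection or a channel) instead of RUNNABLE_LIST, so that wake-ups triggered on the sender thread still reach the thread that calls poll_once. This is an untested suggestion rather than a confirmed solution.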