rayon-rs/rayon

cooperative yield in ThreadPool::install() causes unexpected behavior in nested pools

benkay86 opened this issue · 4 comments

It appears that calls to ThreadPool::install(op) will start running op inside the child thread pool and then cooperatively yield, potentially switching the calling thread to a different task. This causes unexpected behavior with nested iterators using thread pools. It would be nice if rayon didn't yield on ThreadPool::install(), or, if the authors believe it should yield, for some kind of warning to be added to the documentation for ThreadPool::install().

Consider the following example. You have an outer loop that performs some operation that requires a lot of memory. You have an inner loop that uses a lot of cpu but is memory-efficient. To avoid unbounded memory growth, you will run the outer and inner loops in separate thread pools where ncpus_outer is a small number and ncpus_inner is a big number.

let pool_outer = ThreadPoolBuilder::default().num_threads(ncpus_outer).build().unwrap();
let pool_inner = ThreadPoolBuilder::default().num_threads(ncpus_inner).build().unwrap();
pool_outer.install(|| {
    some_par_iter.for_each(|foo| {
        println!("Entered outer loop.");
        let bar = memory_intensive_op(foo);
        pool_inner.install(|| {
            println!("Entered inner pool.");
            thing.into_par_iter().for_each(|baz| {
                cpu_intensive_op(baz);
            });
        });
        println!("Left inner pool.");
    });
});

Suppose ncpus_outer is equal to one. You might reasonably expect that only one bar will be allocated at any given time, giving you control over memory use. However, the output of the program may actually look like:

Entered outer loop.
Entered inner pool.
Entered outer loop.
Entered inner pool.
Entered outer loop.
Entered outer loop.
Left inner pool.
...

This is possible because, rather than blocking until pool_inner.install() returns, the outer thread pool will yield and cooperatively schedule another iteration of the outer loop on the same thread. This gives you essentially zero control over how many instances of bar will be allocated at a given time.
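To see the effect concretely, here is a minimal, self-contained sketch of this scenario. The operations and pool sizes are stand-ins I made up for illustration; the counter just tracks how many "bar" values are live at once. With a single outer thread you might expect the printed maximum to be 1, but because of the yield inside install() it is often larger:

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let pool_outer = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool_inner = ThreadPoolBuilder::new().num_threads(8).build().unwrap();
    let live = AtomicUsize::new(0);
    let max_live = AtomicUsize::new(0);

    pool_outer.install(|| {
        (0..100).into_par_iter().for_each(|_foo| {
            // Stand-in for `memory_intensive_op`: record how many `bar`s are live.
            let now_live = live.fetch_add(1, Ordering::SeqCst) + 1;
            max_live.fetch_max(now_live, Ordering::SeqCst);

            pool_inner.install(|| {
                // Stand-in for the CPU-intensive inner loop.
                (0..1_000u64).into_par_iter().for_each(|baz| {
                    std::hint::black_box(baz.wrapping_mul(baz));
                });
            });

            live.fetch_sub(1, Ordering::SeqCst);
        });
    });

    // With one outer thread you might expect 1 here, but because the outer
    // thread yields inside pool_inner.install(), this often prints more.
    println!("max simultaneously live bars: {}", max_live.load(Ordering::SeqCst));
}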

This is an even bigger problem if you have to do some kind of synchronization (yes, there are real use cases for this):

let pool_outer = ThreadPoolBuilder::default().num_threads(ncpus_outer).build().unwrap();
let pool_inner = ThreadPoolBuilder::default().num_threads(ncpus_inner).build().unwrap();
let mutex = std::sync::Mutex::new(());
pool_outer.install(|| {
    some_par_iter.for_each(|foo| {
        println!("Entered outer loop.");
        let bar = memory_intensive_op(foo);
        let _lock = mutex.lock().unwrap();
        pool_inner.install(|| {
            println!("Entered inner pool.");
            thing.into_par_iter().for_each(|baz| {
                cpu_intensive_op(baz);
            });
        });
        println!("Left inner pool.");
        // Mutex lock released here.
    });
});

The output may look like:
Entered outer loop.
Entered inner pool.
Entered outer loop.
[ deadlock ]

The outer thread pool yields to a different task before the mutex is released, which is likely to deadlock unless the pool happens to switch back to the task holding the mutex before it runs out of outer-pool threads.

I do think yielding is the appropriate default behavior, and documentation PRs are fine to clarify this. I think we could also add a variant or two of install with different behavior. Here's the current install behavior, and some thoughts (a small sketch of the three cases follows the list):

  • If you're not in any pool, queue the work in the target pool and then block.
    • This should be uncontroversial.
  • If you're in a different pool, queue the work in the target pool and then yield.
    • We could add a mode that blocks here instead. The potential downside is that this pool could get completely blocked, and even cause deadlocks if any of that inner pool work tries to queue and wait for any work back in the outer pool. But as long as you know there's no "loopback", it may be fine.
  • If you're already in the target pool, execute it directly.
    • Note that this may also lead to work-stealing of outer work if it ends up waiting for other threads to complete its inner work.
    • We could have an even stronger mode that blocks here as well, but that would have an even higher risk of deadlocking if all its threads are tied up this way.
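Here is the small sketch mentioned above: a hedged illustration of those three call contexts, using throwaway pools and a trivial do_work helper. The comments just restate the behavior from the list; nothing here is a new API.

use rayon::ThreadPoolBuilder;

fn do_work() -> u32 {
    40 + 2
}

fn main() {
    let pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    let other_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();

    // 1. Caller is not a pool thread: the work is queued in `pool` and the
    //    main thread blocks until it is done.
    let a = pool.install(do_work);

    // 2. Caller is a thread of a *different* pool: the work is queued in
    //    `pool`, and the calling `other_pool` thread yields while waiting
    //    (it may pick up other `other_pool` work in the meantime).
    let b = other_pool.install(|| pool.install(do_work));

    // 3. Caller is already a `pool` thread: the nested install runs the
    //    closure directly on the current thread.
    let c = pool.install(|| pool.install(do_work));

    assert_eq!((a, b, c), (42, 42, 42));
}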

BTW, your Mutex scenario is also discussed in #592, which may or may not involve multiple pools.

@cuviper, I appreciate your explanation of the install() behavior. I do want to make an important distinction that yielding is not the opposite of blocking. For example, consider the behavior of tokio::spawn():

statement1;                                     // runs on thread1
let handle = tokio::spawn(async { some_work }); // returns immediately, no yield
statement2;                                     // still runs before any yield
handle.await.unwrap();                          // explicit yield point

statement1 is executed on what we'll call thread1. The call to spawn() returns immediately without blocking. statement2 is executed on thread1. Then the call to await blocks, potentially yielding execution to another task on thread1. Here, the non-blocking spawn and the yield occur in different statements, ensuring that statement2 will always execute after statement1 on thread1. Furthermore, the explicit await syntax makes it obvious to the programmer where the yield point is.

If you're in a different pool, queue the work in the target pool and then yield.

* We could add a mode that blocks here instead. The potential downside is that this pool could get completely blocked, and even cause deadlocks if any of that inner pool work tries to queue and wait for any work back in the outer pool. But as long as you know there's no "loopback", it may be fine.

In the simplest case:

some_par_iter.for_each(|_item| {
    statement1;
    other_pool.install(|| { some_work; });
    statement2;
});

There is no need to make install() block here. It could queue the work on the other threadpool and then return immediately just like spawn(). What's confusing is that install() doesn't just not block -- it also yields, which means there may be several calls to statement1 before statement2 ever gets a chance to execute. Rayon's syntax doesn't make yield points explicit, and the implicit yield is not documented (except in this GitHub issue). This leads to surprising results not only with mutexes as in #592, but also with unbounded memory growth and stack overflows as mentioned in my first comment. Even more surprisingly, install() may or may not implicitly yield() depending on the context!

IMHO, if we were writing Rayon from scratch, I would suggest making all yields explicit. The default behavior would then be expressed as follows. Making the yield explicit makes the behavior obvious to the programmer, and it also gives the programmer the choice of moving statement2 up between install() and yield().

some_par_iter.for_each(|_item| {
    statement1;
    other_pool.install(|| { some_work; });
    rayon::yield_now();
    statement2;
});

I do think yielding is the appropriate default behavior, and documentation PRs are fine to clarify this. I think we could also add a variant or two of install with different behavior.

I have to agree, but only because changing the behavior of install() could cause deadlocks in existing code that depends on the implicit yield. I would love to see clearer documentation and yield-free versions of install() and similar methods (not sure if spawn() can yield in rayon). It looks like this would require some intimate work on unsafe parts of rayon. Do you envision this as something a maintainer might work on, or something you would like an outside PR for?

In the simplest case:

some_par_iter.for_each(|_item| {
    statement1;
    other_pool.install(|| { some_work; });
    statement2;
});

There is no need to make install() block here.

Since install returns a value, that is inherently blocking with respect to following statements. It also allows borrowing since its closure is not constrained to 'static lifetimes -- from the compiler borrowck perspective, those borrows only last for the duration of this call, so we cannot safely return until we're done with that work. If you don't want that imperative control flow, that's what ThreadPool::spawn and Scope::spawn are for.
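For concreteness, a hedged sketch of that distinction; the vector, counter, and pool size are purely illustrative:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let pool = rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap();

    // install(): the closure may borrow `data`, and its result is handed back
    // to the caller, so install() cannot return until the work has finished.
    let data = vec![1u64, 2, 3];
    let sum: u64 = pool.install(|| data.iter().sum());
    assert_eq!(sum, 6);

    // ThreadPool::spawn(): fire-and-forget. The closure must be 'static, so
    // shared state has to be owned (e.g. behind an Arc), and nothing is
    // returned; the caller does not wait for this work at all.
    let counter = Arc::new(AtomicU64::new(0));
    let c = Arc::clone(&counter);
    pool.spawn(move || {
        c.fetch_add(1, Ordering::SeqCst);
    });
}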

So when I talk about yielding vs. blocking here, I'm only talking about what the thread should do while it waits, but we do have to wait for install completion regardless.

We've also had discussions about actual awaitable futures, like my old WIP #679, but that didn't address the executor part, which you would need if you wanted to await that within a parallel for_each -- or at least a block_on kind of thing.

I would love to see clearer documentation and yield-free versions of install() and similar methods (not sure if spawn() can yield in rayon). It looks like this would require some intimate work on unsafe parts of rayon. Do you envision this as something a maintainer might work on, or something you would like an outside PR for?

The design is the more interesting part that needs discussion. Regarding spawn, that only queues the work and returns right away. The other main primitives are join and scope, but I think it will get a lot hairier if we try to add variations of those, and I haven't fully thought about those implications. Higher-level stuff like the parallel for_each also runs on join, and I definitely don't want to fork the entire ParallelIterator API.
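For reference, a minimal sketch of ordinary use of those two primitives; nothing here is a proposed variant, just existing rayon::join and rayon::scope usage:

fn main() {
    // join: run two closures, potentially in parallel, and wait for both.
    let (a, b) = rayon::join(|| 1 + 1, || 2 + 2);
    assert_eq!((a, b), (2, 4));

    // scope: spawn tasks that may borrow from the enclosing stack frame;
    // scope() does not return until every spawned task has finished.
    let mut results = vec![0usize; 4];
    rayon::scope(|s| {
        for (i, slot) in results.iter_mut().enumerate() {
            s.spawn(move |_| *slot = i * i);
        }
    });
    assert_eq!(results, vec![0, 1, 4, 9]);
}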

Another idea that I've raised before is that we could have a kind of "critical section" call, inhibiting all work-stealing on that thread until that call returns. (Possibly with some nuance like allowing local work-stealing of work queued while in that call.) This might be a better way to have broad impact without forking a bunch of APIs.

If we decide on just adding install variations, I can mentor the implementation if you're keen, and as a first step take a look at Registry::in_worker. The variations I described before are pretty much contained in the three different branches in that function, where the "cold" part is the fully blocking case.

pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
where
    OP: FnOnce(&WorkerThread, bool) -> R + Send,
    R: Send,
{
    unsafe {
        let worker_thread = WorkerThread::current();
        if worker_thread.is_null() {
            self.in_worker_cold(op)
        } else if (*worker_thread).registry().id() != self.id() {
            self.in_worker_cross(&*worker_thread, op)
        } else {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*worker_thread, false)
        }
    }
}

The cold path currently debug_assert!s that it's not in any pool though, so we'd have to re-evaluate that at least. In a way, that represents the philosophy of the current design, that pool threads should always be trying to find work -- but I do think there are reasonable scenarios, like yours, where it makes sense to opt out of that work-stealing behavior.

Since install returns a value, that is inherently blocking with respect to following statements. It also allows borrowing since its closure is not constrained to 'static lifetimes -- from the compiler borrowck perspective, those borrows only last for the duration of this call, so we cannot safely return until we're done with that work. If you don't want that imperative control flow, that's what ThreadPool::spawn and Scope::spawn are for.

Ah, I stand corrected. Since install() borrows and returns a value, it must either block or yield before returning.

If you don't want that imperative control flow, that's what ThreadPool::spawn and Scope::spawn are for.

I'll refactor my code with spawn() and scope().

EDIT: It looks like calling ThreadPool::scope() will internally call ThreadPool::install(), triggering the yield behavior.
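For the record, here is a hedged sketch of one shape such a refactor could take: queue the inner work with ThreadPool::spawn (which only enqueues and returns) and park the outer thread on a std channel while it waits, so rayon never work-steals on the outer thread. The helper bodies and sizes are stand-ins, and this is a workaround pattern on my part, not an official rayon recipe:

use rayon::prelude::*;
use std::sync::mpsc;

// Stand-ins for the operations in the earlier examples.
fn memory_intensive_op(foo: u64) -> Vec<u64> {
    vec![foo; 1_000]
}
fn cpu_intensive_op(baz: &u64) -> u64 {
    baz.wrapping_mul(*baz)
}

fn main() {
    let pool_outer = rayon::ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool_inner = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();

    pool_outer.install(|| {
        (0..10u64).into_par_iter().for_each(|foo| {
            let bar = memory_intensive_op(foo);
            let (tx, rx) = mpsc::channel();
            // ThreadPool::spawn never yields; it just queues the work. The
            // closure must be 'static, so `bar` is moved into it.
            pool_inner.spawn(move || {
                let sum: u64 = bar.par_iter().map(cpu_intensive_op).sum();
                let _ = tx.send(sum);
            });
            // recv() parks this OS thread; rayon does not work-steal while we
            // wait here, so the single outer thread runs one `foo` at a time.
            let _sum = rx.recv().unwrap();
        });
    });
}

The trade-off is that the parked outer thread does no useful work while it waits, which is exactly the behavior the current "always be trying to find work" design avoids.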

Another idea that I've raised before is that we could have a kind of "critical section" call, inhibiting all work-stealing on that thread until that call returns. (Possibly with some nuance like allowing local work-stealing of work queued while in that call.) This might be a better way to have broad impact without forking a bunch of APIs.

This sounds difficult to implement, but it would probably be the most ergonomic solution in the long term.

EDIT: We might still need a non-yielding install() variant to prevent install() from work-stealing on the parent thread pool while still allowing work-stealing on the inner thread pool. Otherwise, it's not clear how you would specify the critical section to cover one but not the other.