jbaldwin/libcoro

`coro::sync_wait(coro::when_all())` paradigm (very rarely) hangs

Closed this issue · 7 comments

When running a set of tasks in parallel, to ensure completion one typically uses a coro::sync_wait(coro::when_all(std::move(tasks))); line. Sometimes, but very rarely, this hangs at the

m_cv.wait(lk, [this] { return m_set.load(std::memory_order::acquire); });
line.
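
For concreteness, here is a rough sketch of the pattern in question (the names are purely illustrative, not from any particular code base):

coro::thread_pool tp{};

auto make_task = [&tp](int i) -> coro::task<void>
{
    co_await tp.schedule(); // resume the task on the thread pool
    // ... short-lived work for item i ...
    co_return;
};

std::vector<coro::task<void>> tasks{};
tasks.reserve(100);
for (int i = 0; i < 100; ++i)
{
    tasks.emplace_back(make_task(i));
}

// Blocks the calling thread until every task has completed.
coro::sync_wait(coro::when_all(std::move(tasks)));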

Curiously, m_set is true when checked in the debugger - I'm not sure how that can be the case when that line is stuck waiting. The code looks proper and I suspect some subtle race condition. As an aside, since this is C++20, we don't need the condition variable at all because atomics now have std::atomic::wait(), notify_all(), and notify_one() - I'm not sure whether refactoring to use those would fix the issue (I don't currently have time to try).
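
For what it's worth, a minimal sketch of what that refactor could look like - a hypothetical standalone type (not libcoro's actual sync_wait_event), just reusing the m_set name:

#include <atomic>

struct atomic_event
{
    auto set() noexcept -> void
    {
        m_set.store(true, std::memory_order::release);
        m_set.notify_all(); // wake every thread blocked in wait()
    }

    auto wait() noexcept -> void
    {
        // Blocks until the observed value differs from `false`, i.e. until set()
        // has run; std::atomic::wait() only returns once the value has changed,
        // so no extra loop is needed here.
        m_set.wait(false, std::memory_order::acquire);
    }

    std::atomic<bool> m_set{false};
};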

I've taken to doing something like:

auto when_all = [](coro::thread_pool& my_thread_pool, std::vector<coro::task<void>>&& my_tasks) -> coro::task<void>
{
    co_await my_thread_pool.schedule(); // hop onto the thread pool first
    co_await coro::when_all(std::move(my_tasks));
};
coro::sync_wait(when_all(thread_pool, std::move(tasks)));

which seems to mitigate the issue but also, in my case, adds a noticeable performance hit (about 1-5%). I tried this purely as a wild guess that coro::when_all needed to slow down a bit by going through the thread pool.

The hang does seem more prevalent when tasks.size() is higher. That is, when processing only 1-5 tasks, I'm not sure I've seen it happen. When it is 50-100 tasks, it seems to happen more regularly (like one out of every 80,000 or so calls). In my case, the tasks are very short-lived (like 10 lookups in a std::map).

I've diagnosed this on a 16-core box with the source compiled with Microsoft Visual C++ in debug mode. I believe I've seen the same behavior on Linux compiled with g++ 13, but I didn't run it in the debugger there so I cannot be certain. I had built both debug and release on Linux and I don't recall which executable I was running when I saw the issue.

Interesting. I think there are probably some subtle issues around memory order acquire/release with std::condition_variable::notify_all(), because there was another bug reported in coro::thread_pool that has a similar mechanism. That one was solved by locking the mutex before calling notify_all() (it's on shutdown of the thread pool, so not a huge perf hit); std::jthread handles it correctly as well, but libcoro was backported to use std::thread so wasm could be supported.

An easy solution to try would be modifying coro::sync_wait to lock m_mutex prior to calling notify_all() and verifying the hang no longer happens 🤞. I'm also worried about the possible performance hit, just like with your workaround, though it should be a lot less of a perf hit than spinning up a full coro::thread_pool.

I'm pretty sure this would work every time, but the std::condition_variable docs say you shouldn't need to call notify_all() while holding the lock.

auto sync_wait_event::set() noexcept -> void
{
    std::unique_lock<std::mutex> lk{m_mutex};
    m_set.exchange(true, std::memory_order::release);
    m_cv.notify_all();
}

So I wonder if the solution below is enough to flush the caches/memory across threads, since the std::memory_order::release|acquire pairing doesn't seem to work all the time.

auto sync_wait_event::set() noexcept -> void
{
    {
        std::unique_lock<std::mutex> lk{m_mutex};
        m_set.exchange(true, std::memory_order::release);
    }
    m_cv.notify_all();
}

And lastly, we could just try using std::memory_order::seq_cst; it's supposed to basically negate these problems and would probably be the quickest of all, but I'm not super confident it would fix the problem based on the prior coro::thread_pool bug I worked through. Probably worth a try though.

auto sync_wait_event::set() noexcept -> void
{
    m_set.exchange(true, std::memory_order::seq_cst);
    m_cv.notify_all();
}

auto sync_wait_event::wait() noexcept -> void
{
    std::unique_lock<std::mutex> lk{m_mutex};
    m_cv.wait(lk, [this] { return m_set.load(std::memory_order::seq_cst); });
}

If you have time to make a PR to test these, that would be awesome. I realize it's probably pretty hard to trigger the failure, so having your use case boiled down into a test case would be great to add -- it'll at least get run quite a bit on each pull. If you don't have time, I can probably get to it in a few weeks; I'm going to be busy with 4th of July stuff for a bit.


As for atomic::wait|notify*, last time I looked into it I think it basically degrades to a std::condition_variable depending on the implementation, and every compiler I looked at a few years ago pretty much did just that. Maybe it's worth checking out again since it's been a while? I could be remembering wrong though.

@mheyman I'm trying to reproduce this but so far I'm not getting anything with gcc/clang based on your setup description. (I don't have a ready Windows box to test on...) Do you have some code that reproduces the hang that you could share here, so I can make a test case from it and try to reproduce?

Ok, I got it to hang with the following code; it didn't take that long either. I'm going to go through my 3 proposed solutions and run them overnight to see if the hang reproduces. It would still be good to get an example from you to make sure my test code here aligns with your setup.

TEST_CASE("sync_wait very rarely hangs issue-270", "[sync_wait]")
{
    coro::thread_pool tp{};

    // const int ITERATIONS = 1'000'000;
    const int ITERATIONS = 100;

    std::unordered_set<int> data{};
    data.reserve(ITERATIONS);

    std::random_device dev;
    std::mt19937 rng(dev());
    std::uniform_int_distribution<std::mt19937::result_type> dist(0, ITERATIONS);

    for (int i = 0; i < ITERATIONS; ++i)
    {
        data.insert(dist(rng));
    }

    std::atomic<int> count{0};

    auto make_task = [&](int i) -> coro::task<void>
    {
        co_await tp.schedule();

        if (data.find(i) != data.end())
        {
            count.fetch_add(1);
        }

        co_return;
    };

    std::vector<coro::task<void>> tasks{};
    tasks.reserve(ITERATIONS);
    for(int i = 0; i < ITERATIONS; ++i)
    {
        tasks.emplace_back(make_task(i));
    }

    coro::sync_wait(coro::when_all(std::move(tasks)));

    REQUIRE(count > 0);
}

#6  coro::detail::sync_wait_event::wait (this=this@entry=0x7fffbb929f80) at /home/jbaldwin/repos/libcoro/src/sync_wait.cpp:26
26	    m_cv.wait(lk, [this] { return m_set.load(std::memory_order::acquire); });
(gdb) p m_set
$1 = std::atomic<bool> = { true }

It's definitely some kind of race condition where the woken-up thread is reading the variable before the release on m_set propagates to all cores. I'm going to try seq_cst, but if that triggers the hang too, it probably just needs to be behind a full lock 😭

edit: well, I got it to hang using seq_cst as well, so I think locking is the only proper solution now.
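
For reference, a minimal sketch of what a fully locked variant could look like - this is just an illustration under the assumption that m_set becomes a plain bool only ever touched while holding m_mutex, not the actual change in the PR:

auto sync_wait_event::set() noexcept -> void
{
    {
        // Flip the flag while holding the mutex. Without the lock, set() can run
        // and notify in the window after wait()'s predicate has observed false but
        // before the waiter has blocked on the condition variable, so the
        // notification is lost and the waiter sleeps forever even though the flag
        // is already true - which matches what the debugger shows.
        std::unique_lock<std::mutex> lk{m_mutex};
        m_set = true;
    }
    // Notifying outside the lock is fine once the store itself happens under the
    // mutex: a waiter either sees m_set == true in its predicate check or is
    // already blocked and will receive this notification.
    m_cv.notify_all();
}

auto sync_wait_event::wait() noexcept -> void
{
    std::unique_lock<std::mutex> lk{m_mutex};
    m_cv.wait(lk, [this] { return m_set; });
}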

Thanks for your work on this - I threw this at you and then went on vacation :-)

No prob, vacation is important; I've been busy since I looked at this as well. Have you had a chance to look at the PR? I think it just needs a little cleanup, and I can merge it if you think it's good and roll a point version release.

I did look at it, and the bad news is that it only seems to have helped rather than fixed the problem. A similar lockup occurred at a different point, and none of my ad hoc mitigation strategies helped. I did verify that my when_all wrapper also only helped, greatly reducing the occurrence of the hang but not eliminating it.

I only checked out the PR on Windows, and I was under pressure to get past the issue, so I ported our code base to stdexec. I like libcoro better for my use case, and I think it may be slightly more efficient on Linux for what we have (going by how many iterations need to be in individual parallelized loops to optimize CPU usage, but, to be honest, I don't have identical code with numbers to verify that).

Sadly, I no longer have the ability to easily test the issue.