NVIDIA/stdexec

Data race when `split` sender is cancelled

Opened this issue · 0 comments

The following program will, with high probability, segfault or hit an assertion on the latest main (f11f711):

#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <stdexec/execution.hpp>

// Reproducer: repeatedly starts work on copies of a `split` sender and then
// lets the sender go out of scope while that work may still be in flight on
// the thread pool.
int main() {
    // Iteration counts; raise `outer_rounds` if the failure does not show up.
    constexpr std::size_t outer_rounds = 100;
    constexpr std::size_t senders_per_round = 10;
    constexpr std::size_t starts_per_sender = 10;

    exec::static_thread_pool pool{4};
    auto sched = pool.get_scheduler();

    for (std::size_t round = 0; round != outer_rounds; ++round) {

        // Variant 1: launching the split sender with start_detached triggers the issue.
        for (std::size_t sender_idx = 0; sender_idx != senders_per_round; ++sender_idx) {
            auto s = stdexec::schedule(sched) | stdexec::split();
            for (std::size_t start_idx = 0; start_idx != starts_per_sender; ++start_idx) {
                stdexec::start_detached(s);
            }
            // `s` goes out of scope here and requests stop, possibly while
            // the detached operations are still completing on a pool thread.
        }

        // Variant 2 (disabled): spawning the same work into an async_scope
        // triggers the issue as well.
        // exec::async_scope scope;
        // for (std::size_t sender_idx = 0; sender_idx != senders_per_round; ++sender_idx) {
        //     auto s = stdexec::schedule(sched) | stdexec::split();
        //     for (std::size_t start_idx = 0; start_idx != starts_per_sender; ++start_idx) {
        //         scope.spawn(s);
        //     }
        //     // `s` goes out of scope here and requests stop
        // }
        // stdexec::sync_wait(scope.on_empty());
    }
}

(If it doesn't fail the first time, rerun or increase the number of iterations.)

It typically fails with a segfault or, with assertions enabled, reports:

output.s: /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/../stop_token.hpp:244: stdexec::inplace_stop_source::~inplace_stop_source(): Assertion `(__state_.load(std::memory_order_relaxed) & __locked_flag_) == 0' failed.

With ThreadSanitizer enabled, it may report the following data race (or segfault):

WARNING: ThreadSanitizer: data race (pid=1)
  Write of size 8 at 0x7228000186a0 by thread T4:
    #0 operator delete(void*, unsigned long) /root/llvm-project/compiler-rt/lib/tsan/rtl/tsan_new_delete.cpp:150:3 (output.s+0xe6cb8)
    #1 stdexec::__ptr::__control_block<stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>, 2ul>::__dec_ref_() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__intrusive_ptr.hpp:113:11 (output.s+0x1023fc)
    #2 stdexec::__ptr::__enable_intrusive_from_this<stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>, 2ul>::__dec_ref() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__intrusive_ptr.hpp:282:22 (output.s+0x101e52)
    #3 stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__notify_waiters() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__shared.hpp:346:17 (output.s+0x105756)
    #4 void stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__complete<stdexec::__rcvrs::set_value_t>(stdexec::__rcvrs::set_value_t) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__shared.hpp:317:9 (output.s+0x105d8a)
    #5 exec::_pool_::static_thread_pool_::operation<stdexec::__shared::__receiver<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>>::__t::__t(exec::_pool_::static_thread_pool_&, exec::_pool_::remote_queue*, stdexec::__shared::__receiver<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__t, unsigned long, exec::nodemask const&)::'lambda'(exec::_pool_::task_base*, unsigned int)::operator()(exec::_pool_::task_base*, unsigned int) const /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:1034:15 (output.s+0x1053a0)
    #6 exec::_pool_::static_thread_pool_::operation<stdexec::__shared::__receiver<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>>::__t::__t(exec::_pool_::static_thread_pool_&, exec::_pool_::remote_queue*, stdexec::__shared::__receiver<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__t, unsigned long, exec::nodemask const&)::'lambda'(exec::_pool_::task_base*, unsigned int)::__invoke(exec::_pool_::task_base*, unsigned int) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:1025:27 (output.s+0x1051f5)
    #7 exec::_pool_::static_thread_pool_::run(unsigned int) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:711:9 (output.s+0xf7794)
    #8 exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()::operator()() const /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:682:45 (output.s+0xf765d)
    #9 void std::__invoke_impl<void, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(std::__invoke_other, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/invoke.h:61:14 (output.s+0xf75f5)
    #10 std::__invoke_result<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>::type std::__invoke<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/invoke.h:96:14 (output.s+0xf7565)
    #11 void std::thread::_Invoker<std::tuple<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>>::_M_invoke<0ul>(std::_Index_tuple<0ul>) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/std_thread.h:301:13 (output.s+0xf751d)
    #12 std::thread::_Invoker<std::tuple<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>>::operator()() /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/std_thread.h:308:11 (output.s+0xf74c5)
    #13 std::thread::_State_impl<std::thread::_Invoker<std::tuple<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>>>::_M_run() /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/std_thread.h:253:13 (output.s+0xf7389)
    #14 <null> <null> (libstdc++.so.6+0xed0e3) (BuildId: 998334304023149e8c44e633d4a2c69800a2eb79)

  Previous atomic write of size 1 at 0x7228000186a0 by main thread:
    #0 std::__atomic_base<unsigned char>::compare_exchange_weak(unsigned char&, unsigned char, std::memory_order, std::memory_order) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/atomic_base.h:536:9 (output.s+0x10312e)
    #1 stdexec::inplace_stop_source::__try_lock_unless_stop_requested_(bool) const /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/../stop_token.hpp:318:24 (output.s+0x10312e)
    #2 stdexec::inplace_stop_source::request_stop() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/../stop_token.hpp:249:10 (output.s+0x101ec1)
    #3 stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__detach(stdexec::__ptr::__intrusive_ptr<stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>, 2ul>&) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__shared.hpp:252:66 (output.s+0x101d0d)
    #4 stdexec::__shared::__box<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>, true>::~__box() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__shared.hpp:391:9 (output.s+0x101c69)
    #5 auto stdexec::__detail::__captures<stdexec::__split::__split_t, stdexec::__shared::__box<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>, true>>(stdexec::__split::__split_t, stdexec::__shared::__box<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>, true>&&)::'lambda'<typename $T, typename $T0>(stdexec::__split::__split_t, $T0&&)::~() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__basic_sender.hpp:457:14 (output.s+0x101c25)
    #6 stdexec::__sexpr<stdexec::(anonymous namespace)::'lambda13'(){}, stdexec::(anonymous namespace)::__anon>::~__sexpr() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__basic_sender.hpp:512:10 (output.s+0xe76d5)
    #7 main /app/example.cpp:18:9 (output.s+0xe758d)

  Thread T4 (tid=6, running) created by main thread at:
    #0 pthread_create /root/llvm-project/compiler-rt/lib/tsan/rtl/tsan_interceptors_posix.cpp:1023:3 (output.s+0x630d1)
    #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State>>, void (*)()) <null> (libstdc++.so.6+0xed1b8) (BuildId: 998334304023149e8c44e633d4a2c69800a2eb79)
    #2 decltype(::new((void*)(0)) std::thread(std::declval<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>())) std::construct_at<std::thread, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(std::thread*, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/stl_construct.h:97:39 (output.s+0xf6f59)
    #3 void std::allocator_traits<std::allocator<std::thread>>::construct<std::thread, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(std::allocator<std::thread>&, std::thread*, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/alloc_traits.h:536:4 (output.s+0xea8ce)
    #4 std::thread& std::vector<std::thread, std::allocator<std::thread>>::emplace_back<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/vector.tcc:117:6 (output.s+0xea8ce)
    #5 exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:682:20 (output.s+0xe9b29)
    #6 exec::static_thread_pool::static_thread_pool(unsigned int, exec::bwos_params, exec::numa_policy) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:1558:9 (output.s+0xe9027)
    #7 main /app/example.cpp:6:30 (output.s+0xe7383)

Compiler explorer reproducer: https://compiler-explorer.com/z/4TGo8o67c.