Data race when `split` sender is cancelled
Opened this issue · 0 comments
msimberg commented
The following program will, with high probability, segfault or hit an assertion on the latest main (f11f711):
#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <stdexec/execution.hpp>
// Reproducer for the reported crash: repeatedly builds a `split` sender on a
// thread-pool scheduler, starts several detached copies of it, and then lets
// the sender go out of scope right away.
int main() {
exec::static_thread_pool pool{4};
auto sched = pool.get_scheduler();
// The failure is intermittent, so the whole scenario is repeated many times.
for (std::size_t i = 0; i < 100; ++i) {
// both start_detaching work...
for (std::size_t j = 0; j < 10; ++j) {
// A fresh split sender is created on every iteration of the j loop.
auto s = stdexec::schedule(sched) | stdexec::split();
// Start ten detached operations from the same split sender.
for (std::size_t k = 0; k < 10; ++k) {
stdexec::start_detached(s);
}
// s requests stop
// (`s` is destroyed at the end of this scope. In the ThreadSanitizer report
// in this issue, that destructor reaches inplace_stop_source::request_stop()
// on the main thread while a pool thread is deleting the same shared state.)
}
// Alternative trigger, left disabled: the same pattern using
// exec::async_scope::spawn instead of start_detached.
// ... as well as async_scope-ing work triggers the issue
// exec::async_scope scope;
// for (std::size_t j = 0; j < 10; ++j) {
// auto s = stdexec::schedule(sched) | stdexec::split();
// for (std::size_t k = 0; k < 10; ++k) {
// scope.spawn(s);
// }
// // s requests stop
// }
// stdexec::sync_wait(scope.on_empty());
}
}
(If it doesn't fail the first time, rerun or increase the number of iterations.)
It typically fails with a segfault or, with assertions enabled, reports:
output.s: /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/../stop_token.hpp:244: stdexec::inplace_stop_source::~inplace_stop_source(): Assertion `(__state_.load(std::memory_order_relaxed) & __locked_flag_) == 0' failed.
With thread sanitizer it may report the following (or a segfault):
WARNING: ThreadSanitizer: data race (pid=1)
Write of size 8 at 0x7228000186a0 by thread T4:
#0 operator delete(void*, unsigned long) /root/llvm-project/compiler-rt/lib/tsan/rtl/tsan_new_delete.cpp:150:3 (output.s+0xe6cb8)
#1 stdexec::__ptr::__control_block<stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>, 2ul>::__dec_ref_() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__intrusive_ptr.hpp:113:11 (output.s+0x1023fc)
#2 stdexec::__ptr::__enable_intrusive_from_this<stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>, 2ul>::__dec_ref() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__intrusive_ptr.hpp:282:22 (output.s+0x101e52)
#3 stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__notify_waiters() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__shared.hpp:346:17 (output.s+0x105756)
#4 void stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__complete<stdexec::__rcvrs::set_value_t>(stdexec::__rcvrs::set_value_t) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__shared.hpp:317:9 (output.s+0x105d8a)
#5 exec::_pool_::static_thread_pool_::operation<stdexec::__shared::__receiver<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>>::__t::__t(exec::_pool_::static_thread_pool_&, exec::_pool_::remote_queue*, stdexec::__shared::__receiver<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__t, unsigned long, exec::nodemask const&)::'lambda'(exec::_pool_::task_base*, unsigned int)::operator()(exec::_pool_::task_base*, unsigned int) const /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:1034:15 (output.s+0x1053a0)
#6 exec::_pool_::static_thread_pool_::operation<stdexec::__shared::__receiver<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>>::__t::__t(exec::_pool_::static_thread_pool_&, exec::_pool_::remote_queue*, stdexec::__shared::__receiver<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__t, unsigned long, exec::nodemask const&)::'lambda'(exec::_pool_::task_base*, unsigned int)::__invoke(exec::_pool_::task_base*, unsigned int) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:1025:27 (output.s+0x1051f5)
#7 exec::_pool_::static_thread_pool_::run(unsigned int) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:711:9 (output.s+0xf7794)
#8 exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()::operator()() const /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:682:45 (output.s+0xf765d)
#9 void std::__invoke_impl<void, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(std::__invoke_other, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/invoke.h:61:14 (output.s+0xf75f5)
#10 std::__invoke_result<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>::type std::__invoke<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/invoke.h:96:14 (output.s+0xf7565)
#11 void std::thread::_Invoker<std::tuple<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>>::_M_invoke<0ul>(std::_Index_tuple<0ul>) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/std_thread.h:301:13 (output.s+0xf751d)
#12 std::thread::_Invoker<std::tuple<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>>::operator()() /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/std_thread.h:308:11 (output.s+0xf74c5)
#13 std::thread::_State_impl<std::thread::_Invoker<std::tuple<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>>>::_M_run() /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/std_thread.h:253:13 (output.s+0xf7389)
#14 <null> <null> (libstdc++.so.6+0xed0e3) (BuildId: 998334304023149e8c44e633d4a2c69800a2eb79)
Previous atomic write of size 1 at 0x7228000186a0 by main thread:
#0 std::__atomic_base<unsigned char>::compare_exchange_weak(unsigned char&, unsigned char, std::memory_order, std::memory_order) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/atomic_base.h:536:9 (output.s+0x10312e)
#1 stdexec::inplace_stop_source::__try_lock_unless_stop_requested_(bool) const /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/../stop_token.hpp:318:24 (output.s+0x10312e)
#2 stdexec::inplace_stop_source::request_stop() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/../stop_token.hpp:249:10 (output.s+0x101ec1)
#3 stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>::__detach(stdexec::__ptr::__intrusive_ptr<stdexec::__shared::__shared_state<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>>, 2ul>&) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__shared.hpp:252:66 (output.s+0x101d0d)
#4 stdexec::__shared::__box<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>, true>::~__box() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__shared.hpp:391:9 (output.s+0x101c69)
#5 auto stdexec::__detail::__captures<stdexec::__split::__split_t, stdexec::__shared::__box<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>, true>>(stdexec::__split::__split_t, stdexec::__shared::__box<exec::_pool_::static_thread_pool_::scheduler::_sender, stdexec::__env::env<>, true>&&)::'lambda'<typename $T, typename $T0>(stdexec::__split::__split_t, $T0&&)::~() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__basic_sender.hpp:457:14 (output.s+0x101c25)
#6 stdexec::__sexpr<stdexec::(anonymous namespace)::'lambda13'(){}, stdexec::(anonymous namespace)::__anon>::~__sexpr() /opt/compiler-explorer/libs/stdexec/trunk/include/exec/../stdexec/__detail/__basic_sender.hpp:512:10 (output.s+0xe76d5)
#7 main /app/example.cpp:18:9 (output.s+0xe758d)
Thread T4 (tid=6, running) created by main thread at:
#0 pthread_create /root/llvm-project/compiler-rt/lib/tsan/rtl/tsan_interceptors_posix.cpp:1023:3 (output.s+0x630d1)
#1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State>>, void (*)()) <null> (libstdc++.so.6+0xed1b8) (BuildId: 998334304023149e8c44e633d4a2c69800a2eb79)
#2 decltype(::new((void*)(0)) std::thread(std::declval<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>())) std::construct_at<std::thread, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(std::thread*, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/stl_construct.h:97:39 (output.s+0xf6f59)
#3 void std::allocator_traits<std::allocator<std::thread>>::construct<std::thread, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(std::allocator<std::thread>&, std::thread*, exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/alloc_traits.h:536:4 (output.s+0xea8ce)
#4 std::thread& std::vector<std::thread, std::allocator<std::thread>>::emplace_back<exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()>(exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy)::'lambda'()&&) /opt/compiler-explorer/gcc-14.2.0/lib/gcc/x86_64-linux-gnu/14.2.0/../../../../include/c++/14.2.0/bits/vector.tcc:117:6 (output.s+0xea8ce)
#5 exec::_pool_::static_thread_pool_::static_thread_pool_(unsigned int, exec::bwos_params, exec::numa_policy) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:682:20 (output.s+0xe9b29)
#6 exec::static_thread_pool::static_thread_pool(unsigned int, exec::bwos_params, exec::numa_policy) /opt/compiler-explorer/libs/stdexec/trunk/include/exec/static_thread_pool.hpp:1558:9 (output.s+0xe9027)
#7 main /app/example.cpp:6:30 (output.s+0xe7383)
Compiler explorer reproducer: https://compiler-explorer.com/z/4TGo8o67c.