boostorg/fiber

Threads in thread pool for shared_work scheduler got stuck in suspend_until wait

Opened this issue · 0 comments

I am writing a test that exercises the shared_work scheduler together with buffered_channel, based on the work_sharing.cpp example.

In the test, I have a thread pool with 8 threads, all of them running the shared_work scheduler with suspend = true. Two buffered channels are used to synchronize the fibers.

Two fibers are pushing input into channel 1 like this.

// Feeder fiber: pushes the letters 'a'..'z' cyclically into chan1, yields
// after every push, and additionally sleeps for 10-19 ms after every 1024
// pushes. After each iteration it reports whether the fiber resumed on a
// different thread than the one it was on before yielding.
// NOTE: the `while (true)` loop has no exit, so the fiber_count bookkeeping
// that follows the loop is never reached.
boost::fibers::fiber{[&chan1] {
    
    int i = 0;
    int counter{0};
    while (true) {
      char item = (i++%26)+'a';
      chan1.push(item);
      std::cout << "pushed " << item << " into pipeline" << std::endl;
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      if (++counter == 1024) {
        counter = 0;
        boost::this_fiber::sleep_for(std::chrono::milliseconds(10+rand()%10));
      }
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "feeder 1 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }
    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
  }}.detach();

Four fibers pop from buffered channel 1, do some CPU-intensive work, and push the item into buffered channel 2, like this:

// Stage-1 fiber: pops one item from chan1, logs it, runs do_primes(), then
// forwards the same item to chan2 and yields. After the yield it reports
// whether the fiber resumed on a different thread.
// NOTE: the `while (true)` loop has no exit, so the fiber_count bookkeeping
// that follows the loop is never reached.
boost::fibers::fiber([&chan1, &chan2]() {
    
    while (true) {
      char item;
      chan1.pop(item);
      cout << "stage 1:2 consumed " << item << endl;
      do_primes();
      chan2.push(item);
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 1:2 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
   }).detach();

Another four fibers pop from buffered channel 2 and do some CPU-intensive work, like this:

// Stage-2 fiber: pops one item from chan2, runs do_primes1(), logs the item
// and yields. After the yield it reports whether the fiber resumed on a
// different thread.
// NOTE: the `while (true)` loop has no exit, so the fiber_count bookkeeping
// that follows the loop is never reached.
boost::fibers::fiber([&chan2]() {
    
    while (true) {
      char item;
      chan2.pop(item);
      do_primes1();
      cout << "stage 2:1 consumed " << item << endl;
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 2:1 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
  }).detach(); 

When running the test on an 8-core machine, I observed that some worker threads in the thread pool never run.
image

When I attach gdb to inspect the stuck threads, they appear to be blocked in the wait inside suspend_until.
image

Each time I run the test, the number of stuck threads is different. I also tried the test without the buffered channels; in that case the problem disappears and everything works as expected.
I have run the test with both libboost-fiber 1.71 and the newest release, 1.75. The issue is present in both.

Is there a race condition between the condition_variable used in the scheduler and buffered_channel that may cause this issue? Is there any workaround to avoid it?
Thank you!

Here is the complete test code to reproduce the issue

//          Copyright Nat Goodspeed + Oliver Kowalke 2015.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)

#include <chrono>
#include <string>
#include <cstdlib>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <boost/fiber/buffered_channel.hpp>
#include <boost/fiber/all.hpp>
#include <boost/fiber/detail/thread_barrier.hpp>
#include <boost/assert.hpp>

// Number of pipeline fibers that have been launched; main() increments it once
// per launched fiber, and the epilogue that follows each fiber's loop
// decrements it while holding mtx_count.
static std::size_t fiber_count{ 0 };
// Mutex locked around the fiber_count checks in start_thread(), main() and the
// fiber epilogues.
static std::mutex mtx_count{};
// Fiber-aware condition variable on which start_thread() and main() wait for
// fiber_count to reach zero.
static boost::fibers::condition_variable_any cnd_count{};
// Lock type passed to cnd_count.wait().
typedef std::unique_lock<std::mutex > lock_type;
// Barrier used by main() and start_thread() to rendezvous before processing.
using boost::fibers::detail::thread_barrier;

// Inclusive upper bound of the range scanned by do_primes() / do_primes1().
const int MAX_PRIME = 10000;

/**
 * CPU-bound busy work used by the stage-1 fibers.
 *
 * Counts the primes in [1, MAX_PRIME] by naive trial division and prints the
 * total to stdout, tagged with "0". Takes no arguments and returns nothing;
 * the only observable effect is the printed line.
 *
 * NOTE(review): the quadratic trial division looks intentional (the function
 * is described as a "CPU intensive task"), so the algorithm is left as is.
 */
void do_primes()
{
    const unsigned long limit = static_cast<unsigned long>(MAX_PRIME);
    unsigned long primes = 0;
    for (unsigned long num = 1; num <= limit; ++num) {
        // Advance i to the smallest divisor of num that is >= 2; for num < 2
        // the loop does not run and i stays at 2, so 1 is not counted.
        unsigned long i = 2;
        while ((i <= num) && (num % i != 0)) {
            ++i;
        }
        // num is prime exactly when its smallest divisor >= 2 is num itself.
        if (i == num) {
            ++primes;
        }
    }
    // primes is an unsigned long, so it must be printed with %lu; passing it
    // to a %d conversion is undefined behaviour.
    printf("Calculated %lu primes 0.\n", primes);
}


/**
 * CPU-bound busy work used by the stage-2 fibers.
 *
 * Counts the primes in [1, MAX_PRIME] by naive trial division and prints the
 * total to stdout, tagged with "1". Takes no arguments and returns nothing;
 * the only observable effect is the printed line.
 *
 * NOTE(review): the quadratic trial division looks intentional (the function
 * is described as a "CPU intensive task"), so the algorithm is left as is.
 */
void do_primes1()
{
    const unsigned long limit = static_cast<unsigned long>(MAX_PRIME);
    unsigned long primes = 0;
    for (unsigned long num = 1; num <= limit; ++num) {
        // Advance i to the smallest divisor of num that is >= 2; for num < 2
        // the loop does not run and i stays at 2, so 1 is not counted.
        unsigned long i = 2;
        while ((i <= num) && (num % i != 0)) {
            ++i;
        }
        // num is prime exactly when its smallest divisor >= 2 is num itself.
        if (i == num) {
            ++primes;
        }
    }
    // primes is an unsigned long, so it must be printed with %lu; passing it
    // to a %d conversion is undefined behaviour.
    printf("Calculated %lu primes 1.\n", primes);
}


/*****************************************************************************
*   example fiber function
*****************************************************************************/

/*****************************************************************************
*   example thread function
*****************************************************************************/

/**
 * Body of every worker thread in the pool.
 *
 * Names the OS thread "worker_thread_<i>", installs the shared_work
 * scheduler, rendezvous with the other threads on the barrier and then
 * blocks on cnd_count until fiber_count reaches zero, running pipeline
 * fibers in the meantime.
 *
 * @param b  barrier shared with main() and the other worker threads
 * @param i  index appended to the thread name
 */
void start_thread( thread_barrier * b, int i) {
    // Give the OS thread a recognisable name.
    const std::string worker_name = "worker_thread_" + std::to_string(i);
    pthread_setname_np(pthread_self(), worker_name.c_str());

    // Compose the start-up line now; it is written out further down, after
    // mtx_count has been acquired.
    std::ostringstream startup_line;
    startup_line << "thread started " << std::this_thread::get_id() << '\n';

    // Join the work-sharing pool so this thread can pick up shared fibers.
    boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(true);

    // Wait until every participant has reached this point.
    b->wait();

    // A fiber that does nothing but sleep one second at a time, forever.
    boost::fibers::fiber test_fiber([] {
        for (;;) {
            boost::this_fiber::sleep_for(std::chrono::seconds(1));
        }
    });

    lock_type count_lock(mtx_count);
    std::cout << startup_line.str() << std::flush;

    // Suspend this thread's main fiber until all counted fibers are done;
    // other fibers get to run on this thread while it is suspended.
    cnd_count.wait(count_lock, [] { return fiber_count == 0; });

    test_fiber.join();
    BOOST_ASSERT( 0 == fiber_count);
}
//]

/*****************************************************************************
*   main()
*****************************************************************************/
using std::endl;
using std::cout;
int main( int argc, char *argv[]) {
  //[main_ws
   /*<
Install the scheduling algorithm `boost::fibers::algo::shared_work` in the main
thread too, so each new fiber gets launched into the shared pool.
>*/
  boost::fibers::use_scheduling_algorithm<boost::fibers::algo::shared_work>(true);
  boost::fibers::buffered_channel<char> chan1(64);
  boost::fibers::buffered_channel<char> chan2(64);
  boost::fibers::fiber([&chan1, &chan2]() { 
    
    while (true) {
      char item;
      chan1.pop(item);
      cout << "stage 1:1 consumed " << item << endl;
      do_primes();
      chan2.push(item);
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 1:1 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
   }).detach();
   fiber_count ++;
  boost::fibers::fiber([&chan1, &chan2]() {
    
    while (true) {
      char item;
      chan1.pop(item);
      cout << "stage 1:2 consumed " << item << endl;
      do_primes();
      chan2.push(item);
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 1:2 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
   }).detach();
   fiber_count ++; 
   boost::fibers::fiber([&chan1, &chan2]() { 
    
    while (true) {
      char item;
      chan1.pop(item);
      cout << "stage 1:3 consumed " << item << endl;
      do_primes();
      chan2.push(item);
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 1:2 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
   }).detach();
   fiber_count ++;
   boost::fibers::fiber([&chan1, &chan2]() { 
    
    while (true) {
      char item;
      chan1.pop(item);
      cout << "stage 1:4 consumed " << item << endl;
      do_primes();
      chan2.push(item);
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 1:2 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
   }).detach();
   fiber_count ++;
  boost::fibers::fiber([&chan2]() {
    
    while (true) {
      char item;
      chan2.pop(item);
      do_primes1();
      cout << "stage 2:1 consumed " << item << endl;
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 2:1 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
  }).detach();
  fiber_count ++;
  boost::fibers::fiber([&chan2]() {
    
    while (true) {
      char item;
      chan2.pop(item);
      do_primes1();
      cout << "stage 2:2 consumed " << item << endl;
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 2:2 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
  }).detach();
  fiber_count ++;
  boost::fibers::fiber([&chan2]() {
    
    while (true) {
      char item;
      chan2.pop(item);
      do_primes1();
      cout << "stage 2:3 consumed " << item << endl;
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 2:2 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
  }).detach();
  fiber_count ++;
  boost::fibers::fiber([&chan2]() {
    
    while (true) {
      char item;
      chan2.pop(item);
      do_primes1();
      cout << "stage 2:4 consumed " << item << endl;
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "stage 2:2 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }

    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
  }).detach();
  fiber_count ++;

  boost::fibers::fiber{[&chan1] {
    
    int i = 0;
    int counter{0};
    while (true) {
      char item = (i++%26)+'a';
      chan1.push(item);
      std::cout << "pushed " << item << " into pipeline" << std::endl;
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      if (++counter == 1024) {
        counter = 0;
        boost::this_fiber::sleep_for(std::chrono::milliseconds(10+rand()%10));
      }
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "feeder 1 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }
    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
  }}.detach();
  fiber_count ++;
  boost::fibers::fiber{[&chan1] {
    
    int i = 0;
    int counter{0};
    while (true) {
      char item = (i++%26)+'a';
      chan1.push(item);
      std::cout << "pushed " << item << " into pipeline" << std::endl;
      auto prev_thread = std::this_thread::get_id();
      boost::this_fiber::yield();
      if (++counter == 1024) {
        counter = 0;
        boost::this_fiber::sleep_for(std::chrono::milliseconds(10+rand()%10));
      }
      auto after_thread = std::this_thread::get_id();
      if (prev_thread != after_thread) {
        std::cout << "feeder 2 migrated from " << prev_thread << " to "
                  << after_thread << std::endl;
      }
    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
    }
  }}.detach();
  fiber_count ++;
  thread_barrier b(8);
  std::thread threads[] = {
      std::thread(start_thread, &b, 1), std::thread(start_thread, &b, 5),
      std::thread(start_thread, &b, 2), std::thread(start_thread, &b, 6),
      std::thread(start_thread, &b, 3), std::thread(start_thread, &b, 7),
      std::thread(start_thread, &b, 4)};
  std::cout << "main thread started " << std::this_thread::get_id()
            << std::endl;
  b.wait(); /*< sync with other threads: allow them to start processing >*/
  {
    lock_type /*< `lock_type` is typedef'ed as __unique_lock__<
                 [@http://en.cppreference.com/w/cpp/thread/mutex `std::mutex`] >
                 >*/
                  lk(mtx_count);
    cnd_count.wait(lk, []() {return 0 == fiber_count;}); /*<
       Suspend main fiber and resume worker fibers in the meanwhile.
       Main fiber gets resumed (e.g returns from
   `condition_variable_any::wait()`) if all worker fibers are complete.
   >*/
  }                                                      
  BOOST_ASSERT(0 == fiber_count);
  for (std::thread &t : threads) { /*< wait for threads to terminate >*/
    t.join();
  }
  //]
  std::cout << "done." << std::endl;
  return EXIT_SUCCESS;
}