PacktPublishing/Cpp17-STL-Cookbook

Sequential execution in multi_producer_consumer.cpp

JohnStd opened this issue · 6 comments

There is a problem in the multi_producer_consumer.cpp code.
The mutex stays locked even while a thread is paused in sleep_for(..ms). As a result, all threads wait while any other thread is paused in sleep_for(..ms). The sleep_for call should be moved out of the unique_lock scope.
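
To make the effect concrete, here is a minimal sketch that is separate from the repository code. The helper `run_workers`, its parameters, and the 20ms pause are assumptions chosen for illustration. It starts a few threads that repeatedly enter a critical section and sleep either inside or outside the lock scope, then returns the elapsed wall-clock time.

```cpp
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

using namespace std::chrono_literals;

// Hypothetical helper (not part of the repository): runs `threads` workers,
// each entering the critical section `rounds` times, and returns the elapsed
// wall-clock time in milliseconds.
std::chrono::milliseconds run_workers(bool sleep_inside_lock,
                                      int threads = 2, int rounds = 10)
{
    std::mutex m;
    const auto pause = 20ms;
    const auto start = std::chrono::steady_clock::now();

    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&, sleep_inside_lock] {
            for (int i = 0; i < rounds; ++i) {
                {
                    std::unique_lock<std::mutex> lock(m);
                    if (sleep_inside_lock) {
                        // The mutex is held for the whole pause, so every
                        // other worker is blocked until this sleep ends.
                        std::this_thread::sleep_for(pause);
                    }
                }   // the lock is released here
                if (!sleep_inside_lock) {
                    // The mutex is free, so workers can sleep concurrently.
                    std::this_thread::sleep_for(pause);
                }
            }
        });
    }
    for (auto &th : pool) {
        th.join();
    }

    return std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start);
}
```

With the defaults, `run_workers(true)` should take about threads × rounds × 20ms, because all the pauses are serialized behind the mutex. `run_workers(false)` should take roughly rounds × 20ms, since the pauses overlap. Exact timings depend on the scheduler.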

Any comments?

@JohnStd I discussed this with the author. He said the repo is in its latest state and contains fixes and modernizations. Please check again, and if you face the same issue, please reopen.

https://github.com/PacktPublishing/Cpp17-STL-Cookbook

Yes, the issue still exists.

The output of the code from the repo is:

   Producer 0 --> item   0
   Producer 0 --> item   1
   Producer 0 --> item   2
   Producer 0 --> item   3
   Producer 0 --> item   4
                  item   0 --> Consumer 3
                  item   1 --> Consumer 3
                  item   2 --> Consumer 3
                  item   3 --> Consumer 3
                  item   4 --> Consumer 3
   Producer 0 --> item   5
   Producer 0 --> item   6
   Producer 0 --> item   7
   Producer 0 --> item   8
   Producer 0 --> item   9
                  item   5 --> Consumer 1
                  item   6 --> Consumer 1
                  item   7 --> Consumer 1
                  item   8 --> Consumer 1
                  item   9 --> Consumer 1
   Producer 0 --> item  10
   Producer 0 --> item  11
   Producer 0 --> item  12
...

We can see that producers and consumers work sequentially. This happens because after the consumer calls go_produce.notify_all(), it keeps holding the mutex (q_mutex) while it sleeps, so the other producers and consumers cannot continue to work. They all wait for the mutex to be released.

Possible correction could be:

...
static void producer(size_t id, size_t items, size_t stock)
{
    for (size_t i = 0; i < items; ++i) {
        {
            unique_lock<mutex> lock(q_mutex);
            go_produce.wait(lock, [&] { return q.size() < stock; });
            
            q.push(id * 100 + i);
            pcout{} << "   Producer " << id << " --> item "
                    << setw(3) << q.back() << '\n';
        }
        go_consume.notify_all();
        this_thread::sleep_for(90ms);
    }
    pcout{} << "EXIT: Producer " << id << '\n';
}

static void consumer(size_t id)
{
    while (!production_stopped || !q.empty()) {
        {
            unique_lock<mutex> lock(q_mutex);
            if (go_consume.wait_for(lock, 1s,
                            []() { return !q.empty(); })) {
                pcout{} << "        item "
                        << setw(3) << q.front() << " --> Consumer "
                        << id << '\n';
                q.pop();
                go_produce.notify_all();
            }
        }
        this_thread::sleep_for(130ms);
    }
    pcout{} << "EXIT: Consumer " << id << "\n";
}
...

I added an additional scope around unique_lock, wait, and wait_for, and I moved sleep_for out of that scope. So when a producer or consumer runs sleep_for, the mutex is not held and other threads can continue to work.

Output of my program:

   Producer 1 --> item 100
   Producer 2 --> item 200
        item   0 --> Consumer 0
        item 100 --> Consumer 4
        item 200 --> Consumer 2
   Producer 2 --> item 201
   Producer 0 --> item   1
   Producer 1 --> item 101
        item 201 --> Consumer 3
        item   1 --> Consumer 1
        item 101 --> Consumer 0
   Producer 2 --> item 202
        item 202 --> Consumer 4
   Producer 1 --> item 102
   Producer 0 --> item   2
        item 102 --> Consumer 2
        item   2 --> Consumer 3
   Producer 2 --> item 203
   Producer 1 --> item 103
        item 203 --> Consumer 1
        item 103 --> Consumer 0
   Producer 0 --> item   3
        item   3 --> Consumer 4
   Producer 2 --> item 204
   Producer 1 --> item 104
   Producer 0 --> item   4
        item 204 --> Consumer 2
        item 104 --> Consumer 3
        item   4 --> Consumer 0
   Producer 2 --> item 205
   Producer 0 --> item   5
        item 205 --> Consumer 4
   Producer 1 --> item 105
        item   5 --> Consumer 1
        item 105 --> Consumer 2
   Producer 2 --> item 206
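
The same idea can also be written with an explicit `lock.unlock()` instead of an extra scope. The following is a stand-alone sketch with one producer and one consumer, not the code from the repository. The function `run_pipeline`, its local variables, and the short sleep durations are assumptions for illustration, and `stock` is assumed to be at least 1.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

using namespace std::chrono_literals;

// Hypothetical stand-alone variant: one producer, one consumer, bounded queue.
// Returns the items in the order in which the consumer received them.
std::vector<int> run_pipeline(std::size_t items, std::size_t stock)
{
    std::queue<int> q;
    std::mutex q_mutex;
    std::condition_variable go_produce, go_consume;
    std::vector<int> consumed;   // touched only by the consumer until join()

    std::thread producer([&] {
        for (std::size_t i = 0; i < items; ++i) {
            std::unique_lock<std::mutex> lock(q_mutex);
            go_produce.wait(lock, [&] { return q.size() < stock; });
            q.push(static_cast<int>(i));
            lock.unlock();                     // release before notify and sleep
            go_consume.notify_all();
            std::this_thread::sleep_for(2ms);  // the mutex is not held here
        }
    });

    std::thread consumer([&] {
        while (consumed.size() < items) {
            std::unique_lock<std::mutex> lock(q_mutex);
            go_consume.wait(lock, [&] { return !q.empty(); });
            consumed.push_back(q.front());
            q.pop();
            lock.unlock();                     // release before notify and sleep
            go_produce.notify_all();
            std::this_thread::sleep_for(3ms);  // the mutex is not held here
        }
    });

    producer.join();
    consumer.join();
    return consumed;
}
```

Whether to use a nested scope or an explicit `unlock()` is mostly a matter of taste. The scope makes the extent of the critical section visible in the layout, while `unlock()` avoids one level of indentation.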

@sonam-packt you wrote: "if you face same issue again please reopen."

I don't know how to reopen. I may not have enough access rights.

@JohnStd I have reopened the issue.

tfc commented

@JohnStd thank you very much for looking into this!

I tested your solution and applied it to the repository. Thank you for the fix!