
Sequential execution in multi_producer_consumer.cpp

JohnStd opened this issue · 6 comments

There is a problem in the multi_producer_consumer.cpp code.
The mutex stays locked even while a thread is paused in sleep_for(). As a result, all other threads wait while any one thread is paused in sleep_for(). sleep_for() should be moved out of the unique_lock scope.
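A minimal sketch of the effect, not taken from the repo: the helper name run_workers, the thread count, and the pause length are all assumptions made for illustration. Each worker takes the mutex once and sleeps either while still holding it or after releasing it, and the function returns the total wall-clock time in milliseconds.

```cpp
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not part of multi_producer_consumer.cpp): starts
// `threads` workers that each take the mutex once and sleep for `pause`,
// either while holding the lock or after releasing it. Returns the
// wall-clock time in milliseconds until all workers have finished.
long long run_workers(bool sleep_while_locked, int threads,
                      std::chrono::milliseconds pause)
{
    std::mutex m;
    int shared_counter = 0;
    const auto start = std::chrono::steady_clock::now();

    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&] {
            {
                std::unique_lock<std::mutex> lock(m);
                ++shared_counter;                        // the actual critical section
                if (sleep_while_locked) {
                    std::this_thread::sleep_for(pause);  // all other workers are blocked
                }
            }                                            // lock released here
            if (!sleep_while_locked) {
                std::this_thread::sleep_for(pause);      // others can take the mutex
            }
        });
    }
    for (auto &th : pool) {
        th.join();
    }

    const auto elapsed = std::chrono::steady_clock::now() - start;
    return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();
}
```

With 4 workers and a 50 ms pause, run_workers(true, 4, 50ms) cannot finish in less than 200 ms because the sleeps are serialized by the mutex, while run_workers(false, 4, 50ms) lets the sleeps overlap.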

Any comments?

@JohnStd I discussed this with the author. He said the repo is in its latest state and contains fixes and modernizations. Please check again, and if you still face the same issue, please reopen.

Yes, the issue still exists.

Output of the code from the repo:

   Producer 0 --> item   0
   Producer 0 --> item   1
   Producer 0 --> item   2
   Producer 0 --> item   3
   Producer 0 --> item   4
                  item   0 --> Consumer 3
                  item   1 --> Consumer 3
                  item   2 --> Consumer 3
                  item   3 --> Consumer 3
                  item   4 --> Consumer 3
   Producer 0 --> item   5
   Producer 0 --> item   6
   Producer 0 --> item   7
   Producer 0 --> item   8
   Producer 0 --> item   9
                  item   5 --> Consumer 1
                  item   6 --> Consumer 1
                  item   7 --> Consumer 1
                  item   8 --> Consumer 1
                  item   9 --> Consumer 1
   Producer 0 --> item  10
   Producer 0 --> item  11
   Producer 0 --> item  12

We can see that producers and consumers work sequentially. This happens because, after the consumer calls go_produce.notify_all();, it keeps holding the mutex (q_mutex), and the other producers and consumers cannot continue to work while the mutex is held. They all wait for the mutex to be released.

Possible correction could be:

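static void producer(size_t id, size_t items, size_t stock)
{
    for (size_t i = 0; i < items; ++i) {
        {   // extra scope: q_mutex is released at the closing brace
            unique_lock<mutex> lock(q_mutex);
            go_produce.wait(lock, [&] { return q.size() < stock; });
            q.push(id * 100 + i);
            pcout{} << "   Producer " << id << " --> item "
                    << setw(3) << q.back() << '\n';
            go_consume.notify_all();
        }
        this_thread::sleep_for(90ms);   // outside the lock; duration is illustrative
    }
    pcout{} << "EXIT: Producer " << id << '\n';
}

static void consumer(size_t id)
{
    while (!production_stopped || !q.empty()) {
        {   // extra scope: q_mutex is released at the closing brace
            unique_lock<mutex> lock(q_mutex);
            if (go_consume.wait_for(lock, 1s,
                            []() { return !q.empty(); })) {
                pcout{} << "        item "
                        << setw(3) << q.front() << " --> Consumer "
                        << id << '\n';
                q.pop();
                go_produce.notify_all();
            }
        }
        this_thread::sleep_for(130ms);  // outside the lock; duration is illustrative
    }
    pcout{} << "EXIT: Consumer " << id << "\n";
}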

I added an additional scope for unique_lock, wait, and wait_for, and I moved sleep_for out of that scope. So when a producer or consumer runs sleep_for, the mutex is not held and other threads can continue to work.
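For anyone who wants to try the scoping idea without the rest of the repo, here is a self-contained sketch. It is not the repository code: the wrapper run_pipeline, the 2 ms and 3 ms delays, and the consumed vector are assumptions for illustration, pcout is left out, and the consumer uses a plain wait with the stop flag in its predicate instead of the wait_for(lock, 1s, ...) polling shown above.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

using namespace std::chrono_literals;

// Hypothetical wrapper around the pattern from this issue. Returns every
// item that the consumers took out of the queue.
std::vector<std::size_t> run_pipeline(std::size_t producers, std::size_t consumers,
                                      std::size_t items, std::size_t stock)
{
    std::queue<std::size_t> q;
    std::mutex q_mutex;
    std::condition_variable go_produce, go_consume;
    bool production_stopped = false;    // guarded by q_mutex in this sketch
    std::vector<std::size_t> consumed;  // guarded by q_mutex

    auto producer = [&](std::size_t id) {
        for (std::size_t i = 0; i < items; ++i) {
            {
                std::unique_lock<std::mutex> lock(q_mutex);
                go_produce.wait(lock, [&] { return q.size() < stock; });
                q.push(id * 100 + i);
                go_consume.notify_all();
            }                                  // mutex released before sleeping
            std::this_thread::sleep_for(2ms);  // illustrative delay
        }
    };

    auto consumer = [&] {
        for (;;) {
            {
                std::unique_lock<std::mutex> lock(q_mutex);
                go_consume.wait(lock, [&] { return !q.empty() || production_stopped; });
                if (q.empty()) {
                    return;                    // production stopped and queue drained
                }
                consumed.push_back(q.front());
                q.pop();
                go_produce.notify_all();
            }                                  // mutex released before sleeping
            std::this_thread::sleep_for(3ms);  // illustrative delay
        }
    };

    std::vector<std::thread> prod_threads, cons_threads;
    for (std::size_t c = 0; c < consumers; ++c) { cons_threads.emplace_back(consumer); }
    for (std::size_t p = 0; p < producers; ++p) { prod_threads.emplace_back(producer, p); }

    for (auto &t : prod_threads) { t.join(); }
    {
        std::lock_guard<std::mutex> lock(q_mutex);
        production_stopped = true;
    }
    go_consume.notify_all();                   // wake consumers so they can exit
    for (auto &t : cons_threads) { t.join(); }
    return consumed;
}
```

Calling run_pipeline(3, 5, 10, 5) runs 3 producers and 5 consumers with a stock limit of 5 and should hand back all 30 items. Because the sleeps happen outside the locked scope, the order of the items is typically interleaved across producers rather than arriving in per-producer batches.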

Output of my program:

   Producer 1 --> item 100
   Producer 2 --> item 200
        item   0 --> Consumer 0
        item 100 --> Consumer 4
        item 200 --> Consumer 2
   Producer 2 --> item 201
   Producer 0 --> item   1
   Producer 1 --> item 101
        item 201 --> Consumer 3
        item   1 --> Consumer 1
        item 101 --> Consumer 0
   Producer 2 --> item 202
        item 202 --> Consumer 4
   Producer 1 --> item 102
   Producer 0 --> item   2
        item 102 --> Consumer 2
        item   2 --> Consumer 3
   Producer 2 --> item 203
   Producer 1 --> item 103
        item 203 --> Consumer 1
        item 103 --> Consumer 0
   Producer 0 --> item   3
        item   3 --> Consumer 4
   Producer 2 --> item 204
   Producer 1 --> item 104
   Producer 0 --> item   4
        item 204 --> Consumer 2
        item 104 --> Consumer 3
        item   4 --> Consumer 0
   Producer 2 --> item 205
   Producer 0 --> item   5
        item 205 --> Consumer 4
   Producer 1 --> item 105
        item   5 --> Consumer 1
        item 105 --> Consumer 2
   Producer 2 --> item 206

@sonam-packt if you face same issue again please reopen.

I don't know how to reopen. I may not have enough access rights.

@JohnStd I have reopened the issue.

tfc commented

@JohnStd thank you very much for looking into this!

I tested your solution and applied it to the repository. Thank you for the fix!