DGuco/shmqueue

多个生产者一个消费者的时候,如果生产者动态加入会导致消费者读到的数据异常,请看下。

Closed this issue · 3 comments

你好,大神
我用您这个方法测试多进程通讯的时候发现了一个问题:
在多生产者,一个消费者的时候:比如原来一个生产者和一个消费者正在通讯(生产者每次生产+1的整数数据,生产者生产的快,消费者消费的慢),如果中途有一个新的生产者进程加入但是不生产,将会覆盖掉共享内存已有的数据,导致消费者读到的数据发生跳变。
如下,写了一个例程,消费者读的数据在新生产者创建时刻发生了跳变。
如图从34跳变到1294
image

通过代码发现可能是在每次创建新的生产者进程的时候都会调用
CMessageQueue::CreateInstance(SHAR_KEY_1, 10240, eQueueModel::ONE_READ_MUL_WRITE);
thread write_thread(remote_write_func, messQueue, 1, "remoteWrite");
进而调用
CMessageQueue::CMessageQueue(BYTE pCurrAddr, eQueueModel module, key_t shmKey, int shmId, size_t size)
{
m_pShm = (void
) pCurrAddr;
m_pQueueAddr = pCurrAddr;
m_stMemTrunk = new (m_pQueueAddr) stMemTrunk();
m_pQueueAddr += sizeof(stMemTrunk);
m_stMemTrunk->m_iBegin = 0;
m_stMemTrunk->m_iEnd = 0;
m_stMemTrunk->m_iShmKey = shmKey;
m_stMemTrunk->m_iShmId = shmId;
m_stMemTrunk->m_iSize = (unsigned int)size;
m_stMemTrunk->m_eQueueModule = module;
InitLock();
}
此处将重置m_stMemTrunk->m_iBegin = 0; 和 m_stMemTrunk->m_iEnd = 0;
从而新创建进程的时候共享内存读写指针发生了重置,导致原本消费者未读的数据就没读到。
是这样的吗,我顺便向您的邮箱发了下我的测试代码,
您有空可以看下能帮忙解决下吗。
多谢

第一个终端编译通过之后创建一个消费者
image

第二个终端创建第一个生产者
image

第三个终端创建第2个生产者
image

然后回到第一个终断输出窗口发现数据跳变了
image

DGuco commented

目前是不支持在运行中动态加入生产者或者消费者的,你可以通过在运行前把所有的生产者和消费者创建好来解决,或者修改源码,添加新的构造函数如下:
CMessageQueue::CMessageQueue(BYTE pCurrAddr)
{
m_pShm = (void) pCurrAddr;
m_pQueueAddr = pCurrAddr;
m_stMemTrunk = new (m_pQueueAddr) stMemTrunk();
m_pQueueAddr += sizeof(stMemTrunk);
InitLock();
}
在创建messagequeue是如果之前共享内存中已经创建过了,队列是attach到之前的队列中则调用上面的构造函数函数来初始化messagequeue,不过还是建议你在队列进入工作前把所有的生产者和消费者都提前创建好。

感谢