cascadium/OpenMAMA-zmq

thread safety for subscriptions

bill-torpey opened this issue · 2 comments

The zmq transport references the collection of subscription endpoints from two threads. The IO thread reads from the zmq socket (in zmqBridgeMamaTransportImpl_dispatchThread), checks the endpoint pool to see if there are any subscribers for the message, and if so enqueues it once for each subscriber. The messages are dequeued in zmqBridgeMamaTransportImpl_queueCallback, running on a different thread, which also queries the endpoint pool.

However, application code running in the zmqBridgeMamaTransportImpl_queueCallback thread can also modify the endpoint pool, typically by deleting subscriptions. Mama subscriptions must be deleted from the thread that created them (by calling mamaSubscription_destroy) or, if that is not possible, by calling mamaSubscription_destroyEx, which queues the destroy on the callback thread, where it is carried out by mamaSubscription_DestroyThroughQueueCB.

Additionally, subscriptions can be created on any thread, either directly or by calling functions that create subscriptions "under the covers" (like creating an inbox).

It would seem that the endpoint pool would need to be synchronized with a mutex, but it is not.
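To make the idea concrete, here is a rough sketch of what guarding a pool with a mutex could look like. This is not the OpenMAMA endpoint pool code and not the contents of the pull request: `safe_pool`, its functions, and the fixed-size array are all hypothetical names invented for illustration. The only real API used is POSIX threads.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical toy pool, NOT the OpenMAMA endpointPool API. */
#define POOL_MAX  64   /* assumed fixed capacity, for the sketch only */
#define TOPIC_MAX 64

typedef struct {
    char  topic[TOPIC_MAX];
    void *subscriber;      /* opaque subscriber handle */
    int   in_use;
} pool_entry;

typedef struct {
    pthread_mutex_t lock;  /* guards every access to entries[] */
    pool_entry      entries[POOL_MAX];
} safe_pool;

void safe_pool_init(safe_pool *p)
{
    memset(p->entries, 0, sizeof p->entries);
    pthread_mutex_init(&p->lock, NULL);
}

/* Called when a subscription is created. Returns 0, or -1 if the pool is full. */
int safe_pool_register(safe_pool *p, const char *topic, void *sub)
{
    int rc = -1;
    assert(strlen(topic) < TOPIC_MAX);
    pthread_mutex_lock(&p->lock);
    for (int i = 0; i < POOL_MAX; i++) {
        if (!p->entries[i].in_use) {
            strcpy(p->entries[i].topic, topic);
            p->entries[i].subscriber = sub;
            p->entries[i].in_use = 1;
            rc = 0;
            break;
        }
    }
    pthread_mutex_unlock(&p->lock);
    return rc;
}

/* Called when a subscription is destroyed. Returns 0, or -1 if not found. */
int safe_pool_unregister(safe_pool *p, const char *topic, void *sub)
{
    int rc = -1;
    pthread_mutex_lock(&p->lock);
    for (int i = 0; i < POOL_MAX; i++) {
        pool_entry *e = &p->entries[i];
        if (e->in_use && e->subscriber == sub && strcmp(e->topic, topic) == 0) {
            e->in_use = 0;
            rc = 0;
            break;
        }
    }
    pthread_mutex_unlock(&p->lock);
    return rc;
}

/* Called by a reader (e.g. a dispatch thread): copies up to max matching
 * subscriber handles into out[] while holding the lock, returns the count. */
size_t safe_pool_snapshot(safe_pool *p, const char *topic, void **out, size_t max)
{
    size_t n = 0;
    pthread_mutex_lock(&p->lock);
    for (int i = 0; i < POOL_MAX && n < max; i++) {
        const pool_entry *e = &p->entries[i];
        if (e->in_use && strcmp(e->topic, topic) == 0)
            out[n++] = e->subscriber;
    }
    pthread_mutex_unlock(&p->lock);
    return n;
}
```

The reader copies the handles out under the lock instead of iterating the pool unlocked, so a concurrent unregister cannot change the array mid-scan. Note that this only protects the pool itself: a handle in the snapshot can still refer to a subscription that is destroyed a moment later, so the lifetime of the subscriber objects would need separate handling.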

Also, for some reason, this synchronization does not appear to be necessary with the qpid transport (why?), but it is definitely required with zmq.

I have a pull request ready to go -- in my case I've made the change in the OpenMAMA code, so it would affect both qpid and zmq. However, if the synchronization is really not required for qpid (why?), then it might make more sense to make the change in zmq. In that case, it would be necessary to come up with a way to ensure that the correct (thread-safe) version of the endpoint pool code gets loaded at runtime for zmq (and any other transports that require it).

Please let me know your thoughts.

I've submitted the pull request against OpenMAMA here: finos/OpenMAMA#312

Again, not sure if that's the best place for it. Although, even if qpid doesn't need it, it does no harm.

Yeah, I think it's feasible enough - endpointpool is only really an example implementation anyway - people are free to use more thread-unsafe approaches if they understand their threading model well enough. There are many issues which don't present themselves in qpid because qpid spends so much time blocking and waiting for messages. The issue was probably there in qpid too; you were just much less likely to hit it.