Inter-Process Messaging
PAR 2021-01-26: To build the HEAD of develop, you'll need to make the following modifications to `dbt-settings`:

- Add `/cvmfs/dune.opensciencegrid.org/dunedaq/DUNE/products`
- Change the `zmq` line to `"zmq v4_3_1c e19:prof"`
- Add the `cppzmq` product: `"cppzmq v4_3_0 e19:prof"`

See https://github.com/DUNE-DAQ/appfwk/wiki/Compiling-and-running-under-v2.2.0#adding-extra-ups-products-and-product-pools for more about this.
The IPM library provides the low-level layer for sending messages between DUNE DAQ processes. IPM deals with messages consisting of arrays of bytes: higher-level concepts such as object serialization/deserialization will be handled by other libraries and processes building on IPM.
IPM provides two communication patterns:

- `Sender`/`Receiver`, a pattern in which one sender talks to one receiver
- `Publisher`/`Subscriber`, a pattern in which one sender talks to zero or more receivers. Each message goes to all subscribers.
Users should interact with IPM via the interfaces `dunedaq::ipm::Sender`, `dunedaq::ipm::Receiver` and `dunedaq::ipm::Subscriber`, which are created using the factory functions `dunedaq::ipm::makeIPM(Sender|Receiver|Subscriber)`, each of which takes a string argument giving the implementation type. The currently-available implementation types all use ZeroMQ, and are:
- `ZmqSender`, implementing `dunedaq::ipm::Sender` in the sender/receiver pattern
- `ZmqReceiver`, implementing `dunedaq::ipm::Receiver`
- `ZmqPublisher`, implementing `dunedaq::ipm::Sender` in the publisher/subscriber pattern
- `ZmqSubscriber`, implementing `dunedaq::ipm::Subscriber`
Basic example of the sender/receiver pattern:

```cpp
// Sender side
std::shared_ptr<dunedaq::ipm::Sender> sender = dunedaq::ipm::makeIPMSender("ZmqSender");
sender->connect_for_sends({ {"connection_string", "tcp://127.0.0.1:12345"} });
void* message = ... ;
// Last arg is send timeout
sender->send(message, message_size, std::chrono::milliseconds(10));
```

```cpp
// Receiver side
std::shared_ptr<dunedaq::ipm::Receiver> receiver = dunedaq::ipm::makeIPMReceiver("ZmqReceiver");
receiver->connect_for_receives({ {"connection_string", "tcp://127.0.0.1:12345"} });
// Arg is receive timeout
Receiver::Response response = receiver->receive(std::chrono::milliseconds(10));
// ... do something with response.data or response.metadata
```
Basic example of the publisher/subscriber pattern:

```cpp
// Publisher side
std::shared_ptr<dunedaq::ipm::Sender> publisher = dunedaq::ipm::makeIPMSender("ZmqPublisher");
publisher->connect_for_sends({ {"connection_string", "tcp://127.0.0.1:12345"} });
void* message = ... ;
// Third arg is send timeout; last arg is topic for subscribers to subscribe to
publisher->send(message, message_size, std::chrono::milliseconds(10), "topic");
```

```cpp
// Subscriber side
std::shared_ptr<dunedaq::ipm::Subscriber> subscriber = dunedaq::ipm::makeIPMReceiver("ZmqSubscriber");
subscriber->connect_for_receives({ {"connection_string", "tcp://127.0.0.1:12345"} });
subscriber->subscribe("topic");
// Arg is receive timeout
Receiver::Response response = subscriber->receive(std::chrono::milliseconds(10));
// ... do something with response.data or response.metadata
```
More complete examples can be found in the `test/plugins` directory.
The simplest tests to run are, of course, the unit tests; assuming you've got the ipm repo in your development area, run them in the standard manner described in the "Compiling and Running" instructions linked at the top of this document.
For integration tests, the simplest thing is to make sure that you can send data between processes on the same host. This can be checked with the `sourcecode/ipm/schema/ipm-zmqsender-job.json` and `sourcecode/ipm/schema/ipm-zmqreceiver-job.json` scripts, which can be passed to `daq_application`. Note that these scripts make use of appfwk's `FakeDataProducerDAQModule` and `FakeDataConsumerDAQModule` DAQ modules; as these modules are in appfwk's test area and consequently not installed, you'll need to put the appfwk repo in your source area rather than relying on its installed form.
In two separate terminals, log in to the same node twice, and set yourself up for compiling and running as described in the instructions. The variable which defines the socket over which the two processes will communicate is called `connection_string`, so, e.g.,

```
grep connection_string sourcecode/ipm/schema/ipm-zmqsender-job.json
```

yields

```
"connection_string": "tcp://127.0.0.1:29870"
```

and similarly for `ipm-zmqreceiver-job.json`. This means that the socket will be on the localhost at port 29870. Of course, make sure that port isn't being used, i.e., that

```
netstat -tlpn | grep 29870
```

doesn't list that port as already being listened on. If it is in use, you'll want to use a different port, and of course mentally substitute that port for the 29870 used here.
Then, in terminal one, we'll launch a process which will send out vectors via TCP:

```
daq_application -c stdin://sourcecode/ipm/schema/ipm-zmqsender-job.json
```

and take the job up through the "conf" transition. Since the receiving process isn't receiving (and in fact doesn't exist), an error will occur if you try to go through the "start" transition. In the second terminal, if you type `netstat -tlpn | grep 29870` you'll see that the port is now in use, something along the lines of:

```
tcp 0 0 127.0.0.1:29870 0.0.0.0:* LISTEN 38624/daq_applicati
```
Now let's launch the receiving process in the second terminal:

```
daq_application -c stdin://sourcecode/ipm/schema/ipm-zmqreceiver-job.json
```

and take it through the "start" transition. You should see something like:

```
2020-Dec-30 15:00:12,241 INFO [dunedaq::appfwk::FakeDataConsumerDAQModule::do_work(...) at /home/jcfree/daqbuild_v2.0.0_instructions/sourcecode/appfwk/test/plugins/FakeDataConsumerDAQModule.cpp:96] FDC: do_work
```
Then go back to the first terminal, and issue the "start" transition to the sending process. Data should begin flowing. In particular, the sending process should look like:

```
2020-Dec-30 15:01:24,907 DEBUG_0 [dunedaq::appfwk::FakeDataProducerDAQModule::do_work(...) at /home/jcfree/daqbuild_v2.0.0_instructions/sourcecode/appfwk/test/plugins/FakeDataProducerDAQModule.cpp:118] Produced vector 2 with contents {-3, -2, -1, 0, 1, 2, 3, 4, 5, 6} and size 10 DAQModule: fdp
2020-Dec-30 15:01:24,907 INFO [dunedaq::appfwk::FakeDataProducerDAQModule::do_work(...) at /home/jcfree/daqbuild_v2.0.0_instructions/sourcecode/appfwk/test/plugins/FakeDataProducerDAQModule.cpp:122] FDP "fdp" push 2
12-30 15:01:25.907032 ZmqSender INFO send_(...): Starting send of 40 bytes
12-30 15:01:25.907060 ZmqSender INFO send_(...): Completed send of 40 bytes
2020-Dec-30 15:01:25,907 INFO [dunedaq::ipm::VectorIntIPMSenderDAQModule::do_work(...) at /home/jcfree/daqbuild_v2.0.0_instructions/sourcecode/ipm/test/plugins/VectorIntIPMSenderDAQModule.cpp:107] : Sent 3 vectors DAQModule: viis
```
and the receiving process should look like:

```
2020-Dec-30 15:01:31,909 INFO [dunedaq::ipm::VectorIntIPMReceiverDAQModule::do_work(...) at /home/jcfree/daqbuild_v2.0.0_instructions/sourcecode/ipm/test/plugins/VectorIntIPMReceiverDAQModule.cpp:106] : Received vector 8 with size 10 DAQModule: viir
2020-Dec-30 15:01:32,252 INFO [dunedaq::appfwk::FakeDataConsumerDAQModule::do_work(...) at /home/jcfree/daqbuild_v2.0.0_instructions/sourcecode/appfwk/test/plugins/FakeDataConsumerDAQModule.cpp:108] FDC "fdc" pop 8
2020-Dec-30 15:01:32,252 DEBUG_0 [dunedaq::appfwk::FakeDataConsumerDAQModule::do_work(...) at /home/jcfree/daqbuild_v2.0.0_instructions/sourcecode/appfwk/test/plugins/FakeDataConsumerDAQModule.cpp:122] Received vector 8: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} DAQModule: fdc
```
Once you're satisfied with the result, you can terminate `daq_application` as you normally would.