eclipse-cyclonedds/cyclonedds-cxx

Managing DataReaderListener(s)

adam-lee opened this issue · 4 comments

I have a listener for each topic (extended on dds::sub::NoOpDataReaderListener<T>) with on_data_available().
This means I have as many threads as there are topics (T). For example, 100 topics would result in 100 Listener threads.

Keeping everything the same (no reduction of IDLs / topics), is it possible to reduce the # of threads? I think there is a very good chance I'm approaching this completely wrong.

ps. Let me know if this is not the right place to ask this sort of question. Ty!

Let me know if this is not the right place to ask this sort of question.

GitHub is fine, no worries

I have a listener for each topic (extended on dds::sub::NoOpDataReaderListener) with on_data_available().
This means I have as many threads as there are topics (T). For example, 100 topics would result in 100 Listener threads.

Cyclone doesn't create threads for listeners1, so why would there be 100 listener threads just because you have 100 readers?

Footnotes

  1. There are downsides to this and because of those downsides, it'll change at some point. Even then it'll be just one thread, or perhaps a small pool.

Right, I'm creating (and detaching) a listener thread for each reader (for each topic) during initialization of my program.

Are you saying it is indeed possible to have a single reader with multiple listeners for different topics? If so, where should I start from? Using dds::sub::NoOpDataReaderListener<T> / on_data_available(dds::sub::DataReader<T> &reader) perhaps is not the right start.

Ty!

With the caveat that I may be misimagining what you are doing:

I think you're making an incorrect assumption about threads/topics/readers/listeners. Your listener function, so here your on_data_available function, is invoked by the core of Cyclone on whatever thread happens to detect the event. For data available, that is by definition on updating the reader history cache, which itself happens:

  • in the internal short-circuit for local readers, synchronously during a write (and similar) operation, on the thread doing the write;
  • in the case of data coming in from the network, typically the thread receiving the packet containing the data;
  • in the non-typical case, it could be one of the other receive threads if it switches between unicast and multicast (on Linux it defaults to three if multicast is enabled, but it can be configured to use a single one);
  • in the case of Iceoryx, it would be a thread that is created by Iceoryx for this purpose. It is my understanding that there is one such thread in each DDS domain used by the process that has Iceoryx enabled, but I don't know Iceoryx that well ...

What is very much absent is the thread that creates the reader, or any thread that you create for that matter.

What I imagining you're doing is something like this, for each topic:

  • you spawn a detached thread
  • that thread creates the reader with a listener
  • and that sits doing nothing until the process is terminating.

That means you have all these threads and they're doing absolutely nothing, you'd have had exactly the same behaviour if you created them all in one thread!

If you want control over which thread processes data for which (set of) reader(s), then your best bet is to create one waitset for each thread you want to do processing, attach the readers to those waitsets (it involves conditions in the API), then spawn those threads do wait/process loop. You can even use the dispatch operation on the waitset to simplify that loop. For example, 10 threads each handling 1 waitset with with 10 readers.

What you can't do is create a single reader that handles multiple topics — that's simply impossible given DDS' model. What you can do is use the same listener functions for multiple readers.

I think you're making an incorrect assumption about threads/topics/readers/listeners. Your listener function, so here your on_data_available function, is invoked by the core of Cyclone on whatever thread happens to detect the event.

You are correct. I was wrong to assume that I'd need a listener (on_data_available) on a thread of its own.

What is very much absent is the thread that creates the reader, or any thread that you create for that matter.

Nice!

What I imagining you're doing is something like this, for each topic:

  • you spawn a detached thread
  • that thread creates the reader with a listener
  • and that sits doing nothing until the process is terminating.

Correct 💯 .

That means you have all these threads and they're doing absolutely nothing, you'd have had exactly the same behaviour if you created them all in one thread!

Thanks for confirming my suspicion. I indeed did this all wrong.

If you want control over which thread processes data for which (set of) reader(s), then your best bet is to create one waitset for each thread you want to do processing, attach the readers to those waitsets (it involves conditions in the API), then spawn those threads do wait/process loop. You can even use the dispatch operation on the waitset to simplify that loop. For example, 10 threads each handling 1 waitset with with 10 readers.

This also explains why all the example in the Internet does this 👍 .

What you can't do is create a single reader that handles multiple topics — that's simply impossible given DDS' model. What you can do is use the same listener functions for multiple readers.

Thanks for the explanation, and once again I appreciate quick and thorough response 💯 .