zeromq/zmqpp

inheriting an actor

jgornowich opened this issue · 4 comments

I have tried to create a derived class from a zmqpp::actor to test embedding the routines into a more OO application. Here is s simple test example of what I was thinking was possible.

class Worker : public zmqpp::actor {
 public:
  Worker();

 private:
  std::string _name;
  int _count;

  bool doWork(zmqpp::socket *pipe);
};

Worker::Worker(const std::string &name)
    : actor(std::bind(&Worker::doWork, this, std::placeholders::_1)),
      _name(name) {
};

bool Worker::doWork(zmqpp::socket *pipe) {
  std::cout << "I am starting work" << std::endl;
  zmqpp::message msg;
  zmqpp::signal sig;
  pipe->send(zmqpp::signal::ok);
  bool terminated = false;
  while (!terminated) {
    while (pipe->receive(msg)) {
      if (msg.is_signal()) {
        msg.get(sig, 0);
        if (sig == zmqpp::signal::stop) {
          terminated = true;
          break;
        }
      }
      std::cout << _name << " got message : " << msg.get(0) << std::endl;
      _count++;
    }
  }

  zmqpp::message final_res;
  final_res << std::to_string(_count);
  // send the total count to the parent thread
  pipe->send(final_res);
  return true;
}

int main() {

  std::vector<std::unique_ptr<zmqpp::actor>> actors;

  // create 4 actors
  for (int ii = 0; ii < 4; ii++) {
    std::unique_ptr<zmqpp::actor> ptr(new Worker("worker-" + std::to_string(ii)));
    actors.push_back(std::move(ptr));
  }

  // send 12 times to each actor
  for (int i = 0; i < 12; i++) {
    std::cout << "sending " << i << std::endl;
    std::for_each(actors.begin(), actors.end(), [&](std::unique_ptr<zmqpp::actor> &a) {
      a->pipe()->send("hey" + std::to_string(i));
    });
  }

  // Shutdown each actor and get the results
  std::for_each(actors.begin(), actors.end(), [&](std::unique_ptr<zmqpp::actor> &a) {
      a->pipe()->send(zmqpp::signal::stop);
      zmqpp::message msg;
      a->pipe()->receive(msg);
      std::cout << " parent received" << msg.get(0) << std::endl;
  });

  return 0;
}

I was wondering if this is ok to do this with these actors or if there was some kind of underlying issue with this approach. If there is a better / simpler way to accomplish the same results, I would be curious to hear any suggestions. Also, am I overlooking where a zmqpp::context is required here somewhere or is that only necessary if the ActorRoutine needs to create a zmqpp::socket object?

I understand that there are thread safety concerns with accessing the member variables of this simplistic Worker inside the ActorRoutine here, my question is more related to the zmqpp usage.

Is this repo no longer active?

I'm not a contributor to this repository, but a constructor that is an infinite loop seems like an anti pattern. In this particular case you'll only ever have one worker running at a time because construction of the second worker only completes when the first worker terminates.

Edit: On closer inspection it looks like this code is half baked, you're using a non-existent Tester class and an undefined doWork method. Maybe rewrite the example so that it builds.

Sorry for for the confusion, there were two typos in the original example above, which I have edited. Where the infinite while loop was should have been the doWork method implementation not the constructor and the Tester class should have just said Worker.

In either event, a solution to my problem based upon this zmqpp library has been abandoned as this repo seems to be completely dead. I have instead gone back to using CZMQ instead, I was just hoping that this implementation would provide a little more OO like flavor to the zmq actor pattern.

I'd still be open to suggestions/criticism on this potential solution for future knowledge.

No worries!

I pulled down your code, built it, and executed and it works the way I'd expect -- after declaring the Worker(const std::string &s) ctor, anyway 😁.

As for whether deriving from zmqpp::actor is a good practice or not, my vote is for "bad practice" at least in the manner you did it. The only thing the ActorRoutine needs to get the job done is the zmqpp::socket pointer it's provided as a parameter. By deriving from zmqpp::actor you give the ActorRoutine access to the parent's pipe via the pipe() method and the stop() method. Utilizing either one of these methods in the ActorRoutine opens up a race-condition -- the parent end socket can be used from two threads and zmqpp::sockets aren't thread safe. It's safe to say you need to avoid using those methods in the ActorRoutine. That leaves exactly 0% of the public interface available to you to use making your Worker no-longer a zmq::actor.

You asked about whether a zmqpp::context is used and it definitely is. The zmqpp::actors are using zmqpp::sockets to communicate which necessitate a zmqpp::context. The sneaky bit is that the context is a static member of the class, so each process automatically gets one context to share between actors. Since contexts are thread-safe, there's no concern.

As for whether there is a simpler way to do what you're trying to do, it's hard to say without more meat to the example. If you're just trying to run some simple calculations in parallel and capture a result after stopping then using actors to do that is reasonable. You could consider a thread pool if zmq::actors come and go often.

You mentioned thread-safety concerns due to access of members in the Worker class doWork() method, but I don't think one exists. Since you're not using pipe() or stop() in doWork() things seem entirely sane to me because all members and methods used are private. Being private prevents the Worker state from being touched outside of the context of the zmqpp::actor's thread context -- exactly the way you'd want it.