zeromq/jeromq

DEALER recv sometimes hangs

sazzer opened this issue · 5 comments

sazzer commented

I'm just getting started playing with JeroMQ, and I'm finding that my DEALER is especially unreliable.

I've got a test that is almost exactly one of the examples, but when I run it, more often than not it just hangs. It always hangs on worker.recvStr() on the DEALER side.

The code I'm using is this:

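import org.junit.Test; // assuming JUnit 4
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class DealerRouterTest {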
    private static final int NBR_WORKERS = 2;

    private static class Worker extends Thread
    {
        @Override
        public void run()
        {
            try (ZContext context = new ZContext()) {
                ZMQ.Socket worker = context.createSocket(SocketType.DEALER);

                worker.connect("tcp://localhost:5671");

                System.out.println(Thread.currentThread().getName() + " - Connected");

                int total = 0;
                while (true) {
                    //  Tell the broker we're ready for work
                    worker.sendMore("");
                    worker.send("Hi Boss from " + Thread.currentThread().getName());
                    System.out.println(Thread.currentThread().getName() + " - Sent Hi Boss");

                    //  Get workload from broker, until finished
                    worker.recvStr(); //  Envelope delimiter
                    System.out.println(Thread.currentThread().getName() + " - Received Envelope");
                    String workload = worker.recvStr();
                    System.out.println(Thread.currentThread().getName() + " - Received " + workload);

                    boolean finished = workload.equals("Fired!");
                    if (finished) {
                        System.out.printf("Completed: %d tasks\n", total);
                        break;
                    }
                    total++;

                    //  Do some random work
                    try {
                        Thread.sleep(500);
                    }
                    catch (InterruptedException e) {
                    }
                }
            }
        }
    }


    @Test
    public void test() {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
            broker.bind("tcp://*:5671");

            for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++) {
                Thread worker = new Worker();
                worker.setName("worker-" + workerNbr);
                worker.start();
            }

            //  Run for five seconds and then tell workers to end
            long endTime = System.currentTimeMillis() + 5000;
            int workersFired = 0;
            while (true) {
                //  Next message gives us least recently used worker
                String identity = broker.recvStr();
                broker.recv(0); //  Envelope delimiter
                String message = broker.recvStr(0); //  Response from worker
                System.out.println(Thread.currentThread().getName() + " - " + message);

                //  Encourage workers until it's time to fire them
                if (System.currentTimeMillis() < endTime) {
                    broker.sendMore(identity);
                    broker.sendMore("");
                    broker.send("Work harder");
                    System.out.println(Thread.currentThread().getName() + " - " + "Work harder");

                } else {
                    broker.sendMore(identity);
                    broker.sendMore("");
                    broker.send("Fired!");
                    System.out.println(Thread.currentThread().getName() + " - " + "Fired!");

                    if (++workersFired == NBR_WORKERS)
                        break;
                }
            }
        }
    }
}

The only real change from the example is that I've added some logging to see where it breaks. Sometimes it works fine; other times it stops in worker.recvStr() (a screenshot of the log output was attached here) and then just hangs there forever.

Am I missing something? Any idea why it's doing this?

Cheers

sazzer commented

I've just tried, and it hangs in exactly the same way whether I use ipc://, inproc:// or tcp://. However, it works fine with all three transports if I use REQ/REP instead of DEALER/ROUTER.

sazzer commented

I've also just pared it back to have no threading at all, and it still hangs:

        try (ZContext context = new ZContext()) {
            ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
            broker.bind("inproc://test");

            ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
            worker.connect("inproc://test");
            System.out.println(Thread.currentThread().getName() + " - Connected");

            worker.sendMore("");
            worker.send("Hello " + Thread.currentThread().getName());
            System.out.println(Thread.currentThread().getName() + " - Sent Hello");

            String identity = broker.recvStr();
            broker.recv(0); //  Envelope delimiter
            String message = broker.recvStr(0); //  Response from worker
            System.out.println(Thread.currentThread().getName() + " - " + message);

            broker.sendMore(identity);
            broker.sendMore("");
            broker.send("Hello back");

            worker.recvStr(); //  Envelope delimiter
            System.out.println(Thread.currentThread().getName() + " - Received Envelope");
            String workload = worker.recvStr();
            System.out.println(Thread.currentThread().getName() + " - Received " + workload);
        }
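A sketch of one possible ROUTER-side fix, assuming (this thread doesn't confirm it) that the hang comes from converting the routing id to a String: receive the id with recv(), which returns byte[], and send it back with sendMore(byte[]) so it never passes through a charset. The class and method names and the inproc endpoint name are illustrative; the socket calls are the ones the repro already uses, plus the byte[] overload of sendMore.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

class DealerRouterBytes {
    // Same single-threaded exchange as the repro, but the ROUTER keeps the
    // routing id as raw bytes instead of decoding it into a String.
    static String exchange() {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
            broker.bind("inproc://bytes-id");

            ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
            worker.connect("inproc://bytes-id"); // no identity set, so the ROUTER generates one

            worker.sendMore("");
            worker.send("Hello");

            byte[] identity = broker.recv(0); // routing id, kept byte-for-byte
            broker.recv(0);                   // empty delimiter frame
            broker.recvStr(0);                // the worker's "Hello"

            broker.sendMore(identity);        // byte[] overload: no charset conversion
            broker.sendMore("");
            broker.send("Hello back");

            worker.recvStr();                 // empty delimiter frame
            return worker.recvStr();          // the broker's reply
        }
    }

    public static void main(String[] args) {
        System.out.println(exchange());
    }
}
```

Unlike setting an identity on the DEALER, this keeps whatever id the ROUTER assigned and only changes how the broker handles it.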

Hi, I tried your test and it does indeed hang. I had much better success when I set the identity of the DEALER socket right after creating it.
For example, with a quick-and-dirty call to:
ZHelper.setId(worker); // Set a printable identity after the createSocket call.
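ZHelper.setId comes from the JeroMQ guide examples rather than the core socket API. A roughly equivalent sketch calls ZMQ.Socket.setIdentity directly with a fixed printable id. The id "worker-0", the class and method names, and the endpoint are illustrative. The identity has to be set before connect(), since it only applies to connections made afterwards.

```java
import java.nio.charset.StandardCharsets;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

class PrintableIdentity {
    // Returns the first frame the ROUTER sees from a DEALER whose identity
    // was set explicitly to a printable string.
    static String identitySeenByRouter(String id) {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
            broker.bind("inproc://printable-id");

            ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
            // Must come before connect(): it applies to subsequent connections only.
            worker.setIdentity(id.getBytes(StandardCharsets.UTF_8));
            worker.connect("inproc://printable-id");

            worker.sendMore("");
            worker.send("Hello");

            // Frame 1 on the ROUTER is the sender's identity. Because it is plain
            // ASCII, turning it into a String and back is lossless.
            return broker.recvStr();
        }
    }

    public static void main(String[] args) {
        System.out.println(identitySeenByRouter("worker-0"));
    }
}
```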

sazzer commented

Interesting. That does indeed seem to fix it.
I'd ignored that call because of its comment, "Set a printable identity after the createSocket call". I'd assumed it was only there for debugging and had nothing to do with the networking itself.

I assume that this identity is the value that's actually used between client and server so that messages go to the right place?

Cheers
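On the question of what the identity is for: a ROUTER prepends the sender's identity as the first frame of each incoming message, and uses the first frame of each outgoing message to choose the peer. If the DEALER doesn't set one, the ROUTER generates a binary id (typically a zero byte followed by a few more bytes). The test code reads that frame with recvStr(), which decodes it as UTF-8 (assuming JeroMQ's default charset), and echoes it with sendMore(String), which re-encodes it. The pure-Java sketch below needs no JeroMQ; the "binary" sample is made up, not a captured id. It shows that bytes which aren't valid UTF-8 don't survive that round trip.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class IdentityRoundTrip {
    // Mimics recvStr() followed by sendMore(String), assuming both use UTF-8.
    static boolean survivesStringRoundTrip(byte[] id) {
        String asString = new String(id, StandardCharsets.UTF_8); // invalid bytes become U+FFFD
        byte[] back = asString.getBytes(StandardCharsets.UTF_8);
        return Arrays.equals(id, back);
    }

    public static void main(String[] args) {
        byte[] printable = "worker-0".getBytes(StandardCharsets.UTF_8);
        // Hypothetical generated-style id: zero byte plus four arbitrary bytes.
        byte[] binary = {0x00, (byte) 0x8F, 0x12, (byte) 0xC3, 0x7A};
        System.out.println(survivesStringRoundTrip(printable)); // true
        System.out.println(survivesStringRoundTrip(binary));    // false
    }
}
```

If this is the cause, the reply goes to an identity that no peer has, and a ROUTER drops such messages silently by default. That would explain why the hang is intermittent (some generated ids happen to be valid UTF-8) and why a printable identity fixes it. Enabling ROUTER_MANDATORY (setRouterMandatory(true) on the JeroMQ socket) should turn the silent drop into a send error, which makes this kind of bug easier to spot.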