DEALER recv sometimes hangs
sazzer opened this issue · 5 comments
I'm just getting started playing with JeroMQ, and I'm finding that my DEALER is especially unreliable.
I've got a test that is almost exactly one of the examples, but when I run it, more often than not it just hangs. It always hangs on worker.recvStr() on my DEALER side.
The code I'm using is this:
public class DealerRouterTest {
    private static final int NBR_WORKERS = 2;
    private static class Worker extends Thread
    {
        @Override
        public void run()
        {
            try (ZContext context = new ZContext()) {
                ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
                worker.connect("tcp://localhost:5671");
                System.out.println(Thread.currentThread().getName() + " - Connected");
                int total = 0;
                while (true) {
                    // Tell the broker we're ready for work
                    worker.sendMore("");
                    worker.send("Hi Boss from " + Thread.currentThread().getName());
                    System.out.println(Thread.currentThread().getName() + " - Sent Hi Boss");
                    // Get workload from broker, until finished
                    worker.recvStr(); // Envelope delimiter
                    System.out.println(Thread.currentThread().getName() + " - Received Envelope");
                    String workload = worker.recvStr();
                    System.out.println(Thread.currentThread().getName() + " - Received " + workload);
                    boolean finished = workload.equals("Fired!");
                    if (finished) {
                        System.out.printf("Completed: %d tasks\n", total);
                        break;
                    }
                    total++;
                    // Do some random work
                    try {
                        Thread.sleep(500);
                    }
                    catch (InterruptedException e) {
                    }
                }
            }
        }
    }
    @Test
    public void test() {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
            broker.bind("tcp://*:5671");
            for (int workerNbr = 0; workerNbr < NBR_WORKERS; workerNbr++) {
                Thread worker = new Worker();
                worker.setName("worker-" + workerNbr);
                worker.start();
            }
            // Run for five seconds and then tell workers to end
            long endTime = System.currentTimeMillis() + 5000;
            int workersFired = 0;
            while (true) {
                // Next message gives us least recently used worker
                String identity = broker.recvStr();
                broker.recv(0); // Envelope delimiter
                String message = broker.recvStr(0); // Response from worker
                System.out.println(Thread.currentThread().getName() + " - " + message);
                // Encourage workers until it's time to fire them
                if (System.currentTimeMillis() < endTime) {
                    broker.sendMore(identity);
                    broker.sendMore("");
                    broker.send("Work harder");
                    System.out.println(Thread.currentThread().getName() + " - " + "Work harder");
                } else {
                    broker.sendMore(identity);
                    broker.sendMore("");
                    broker.send("Fired!");
                    System.out.println(Thread.currentThread().getName() + " - " + "Fired!");
                    if (++workersFired == NBR_WORKERS)
                        break;
                }
            }
        }
    }
}
So the only real change here from the example is that I've added some logging to see why it's breaking. Sometimes it works fine, other times it just does this:
And then just hangs there forever.
Am I missing something? Any idea why it's doing this?
Cheers
I've just tried, and it hangs exactly the same if I use ipc://, inproc:// or tcp://. However, it works fine with all three of those transports if I'm using REQ/REP instead of DEALER/ROUTER.
I've also just pared it back to have no threading at all, and it still hangs:
try (ZContext context = new ZContext()) {
    ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
    broker.bind("inproc://test");
    ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
    worker.connect("inproc://test");
    System.out.println(Thread.currentThread().getName() + " - Connected");
    worker.sendMore("");
    worker.send("Hello " + Thread.currentThread().getName());
    System.out.println(Thread.currentThread().getName() + " - Sent Hello");
    String identity = broker.recvStr();
    broker.recv(0); // Envelope delimiter
    String message = broker.recvStr(0); // Response from worker
    System.out.println(Thread.currentThread().getName() + " - " + message);
    broker.sendMore(identity);
    broker.sendMore("");
    broker.send("Hello back");
    worker.recvStr(); // Envelope delimiter
    System.out.println(Thread.currentThread().getName() + " - Received Envelope");
    String workload = worker.recvStr();
    System.out.println(Thread.currentThread().getName() + " - Received " + workload);
}
Hi, I tried your test and it indeed hangs. I had much better success when setting the identity of the DEALER socket after it was created.
For example, with a quick and dirty call to
ZHelper.setId(worker); // Set a printable identity
after the createSocket call.
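One possible explanation for why a printable identity helps, offered as a sketch rather than a confirmed diagnosis: the test reads the ROUTER's identity frame with recvStr() and sends it back as a String. If the automatically assigned identity is arbitrary binary data (assumed here to be a zero byte followed by a 4-byte number) and the String conversion uses UTF-8, the bytes may not survive the round trip, and the reply would then be addressed to an identity that no connected peer has. The class and method names below are made up for illustration and use only the JDK, not JeroMQ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class IdentityRoundTrip {
    // Builds a 5-byte identity: a zero byte followed by a 4-byte number.
    // This layout is an assumption made for the illustration.
    static byte[] binaryIdentity(int peerId) {
        return ByteBuffer.allocate(5).put((byte) 0).putInt(peerId).array();
    }

    // Decodes the bytes to a String and encodes them again, which is what
    // reading a frame as text and sending that text back amounts to
    // (assuming UTF-8 in both directions).
    static byte[] throughString(byte[] identity) {
        String asText = new String(identity, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    // True if the identity comes back byte-for-byte identical.
    static boolean survives(byte[] identity) {
        return Arrays.equals(identity, throughString(identity));
    }

    public static void main(String[] args) {
        // Contains 0x80 and 0xFF, which are not valid UTF-8 on their own.
        byte[] unlucky = binaryIdentity(0x80FF1234);
        // Every byte is below 0x80, so it happens to be valid UTF-8.
        byte[] lucky = binaryIdentity(0x01020304);
        byte[] printable = "worker-0".getBytes(StandardCharsets.UTF_8);

        System.out.println("unlucky binary identity survives: " + survives(unlucky));
        System.out.println("lucky binary identity survives:   " + survives(lucky));
        System.out.println("printable identity survives:      " + survives(printable));
    }
}
```

If this is what is going on, it would also fit the "sometimes it works" behaviour, since some byte patterns happen to round-trip cleanly. An alternative to setting a printable identity would then be to keep the identity frame as raw bytes in the test: read it with broker.recv() and pass the resulting byte[] to broker.sendMore(...).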
Interesting. That does indeed seem to fix it.
I'd ignored that because of the comment - "Set a printable identity after the createSocket call". I'd assumed that meant it was for debugging purposes and not anything about the networking itself.
I assume that this identity is the value that's actually used between client and server so that messages go to the right place?
Cheers
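On the question above: as ROUTER sockets are documented, yes. The ROUTER prepends the sender's identity as the first frame of each message it receives, and on send it uses the first frame to choose which connected peer gets the message. By default, a message addressed to an identity it does not know is dropped without an error. The toy model below only illustrates that lookup. ToyRouter and its methods are hypothetical names and are not how JeroMQ is implemented.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ToyRouter {
    // Maps each peer's identity to that peer's inbox.
    // ByteBuffer keys compare by content, unlike raw byte[] keys.
    private final Map<ByteBuffer, List<String>> peers = new HashMap<>();

    // Registers a peer under the given identity and returns its inbox.
    List<String> connect(byte[] identity) {
        List<String> inbox = new ArrayList<>();
        peers.put(ByteBuffer.wrap(identity.clone()), inbox);
        return inbox;
    }

    // Delivers the payload to the peer with exactly this identity.
    // Returns false when no such peer exists; the payload is then dropped.
    boolean send(byte[] identity, String payload) {
        List<String> inbox = peers.get(ByteBuffer.wrap(identity));
        if (inbox == null) {
            return false;
        }
        inbox.add(payload);
        return true;
    }

    public static void main(String[] args) {
        ToyRouter router = new ToyRouter();
        byte[] identity = {0, (byte) 0x80, (byte) 0xFF, 0x12, 0x34};
        List<String> inbox = router.connect(identity);

        // Same bytes in, same bytes out: the message is routed.
        System.out.println("raw bytes routed: " + router.send(identity, "Work harder"));

        // Bytes that went through a String first no longer match the peer.
        byte[] mangled = new String(identity, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);
        System.out.println("mangled bytes routed: " + router.send(mangled, "Work harder"));
        System.out.println("messages in inbox: " + inbox.size());
    }
}
```

In this model the second send simply disappears, which from the worker's side would look like a recv that never returns.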