Router/Dealer Pattern facing unnecessary delay during multiple connections
PanagiotisDrakatos opened this issue · 6 comments
I am using the JeroMQ socket model, specifically the ROUTER/DEALER pattern, because I want to validate the identity of the clients. My problem is that I see random latency spikes when I run a loop in which the server binds the socket and the client tries to connect: the round trip goes from a negligible delay up to a 500 ms delay. Why is this happening? How can I avoid such latency? Is that even possible? What am I doing wrong? Here is my simple code to test it.
package sockets;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.nio.charset.StandardCharsets;
import static sockets.rtdealer.NOFLAGS;
/**
 * Reproduction case: one ROUTER "broker" thread and two DEALER "worker" threads that
 * each create a fresh {@link ZContext} and socket on every loop iteration, exchange one
 * request/reply, and tear everything down again.
 *
 * <p>The second worker prints the elapsed time between sending its request and
 * receiving the reply, which is the latency being investigated.
 *
 * <p>All received frames are decoded with {@link ZMQ#CHARSET}, matching the charset
 * used when the identities are encoded for sending, instead of the platform default.
 */
public class ZmqStack {
    /**
     * Starts the two workers and the broker; every thread loops forever.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility; nothing here blocks on interruption
     */
    public static void main(String[] args) throws InterruptedException {
        Thread brokerThread = new Thread(() -> {
            while (true) {
                // A brand-new context and a fresh bind on every iteration: this is the
                // behaviour under test, so it is intentionally not hoisted out of the loop.
                try (ZContext context = new ZContext()) {
                    ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
                    broker.bind("tcp://*:5555");

                    // The code treats frames as (identity, payload) pairs and waits for two
                    // complete requests before answering either of them.
                    String identity = new String(broker.recv(), ZMQ.CHARSET);
                    String data1 = new String(broker.recv(), ZMQ.CHARSET);
                    String identity2 = new String(broker.recv(), ZMQ.CHARSET);
                    String data2 = new String(broker.recv(), ZMQ.CHARSET);
                    System.out.println("Identity: " + identity + " Data: " + data1);
                    System.out.println("Identity: " + identity2 + " Data: " + data2);

                    // Reply to each peer: identity frame first, then the payload frame.
                    broker.sendMore(identity.getBytes(ZMQ.CHARSET));
                    broker.send("xxx1".getBytes(StandardCharsets.UTF_8));
                    broker.sendMore(identity2.getBytes(ZMQ.CHARSET));
                    broker.send("xxx12");

                    // Explicit teardown kept exactly as in the original.
                    // NOTE(review): closing the socket directly before the context may affect
                    // whether queued replies are flushed (linger) - confirm before removing.
                    broker.close();
                    context.destroy();
                }
            }
        });
        brokerThread.setName("broker");

        Thread workerThread = new Thread(() -> {
            while (true) {
                try (ZContext context = new ZContext()) {
                    ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
                    String identity = "identity1";
                    worker.setIdentity(identity.getBytes(ZMQ.CHARSET));
                    worker.connect("tcp://localhost:5555");
                    worker.send("Hello1".getBytes(StandardCharsets.UTF_8));
                    String workload = new String(worker.recv(NOFLAGS), ZMQ.CHARSET);
                    System.out.println(Thread.currentThread().getName() + " - Received " + workload);
                }
            }
        });
        workerThread.setName("worker");

        Thread workerThread1 = new Thread(() -> {
            while (true) {
                try (ZContext context = new ZContext()) {
                    ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
                    worker.setIdentity("Identity2".getBytes(ZMQ.CHARSET));
                    worker.connect("tcp://localhost:5555");

                    // Measure only the request/reply round trip; connect() has already
                    // returned by the time the clock starts.
                    long start = System.currentTimeMillis();
                    worker.send("Hello2 " + Thread.currentThread().getName());
                    String workload = new String(worker.recv(NOFLAGS), ZMQ.CHARSET);
                    long finish = System.currentTimeMillis();
                    long timeElapsed = finish - start;

                    System.out.println(Thread.currentThread().getName() + " - Received " + workload);
                    System.out.println("Elapsed Time: " + timeElapsed);
                }
            }
        });
        workerThread1.setName("worker1");

        // Workers are started before the broker, as in the original.
        workerThread1.start();
        workerThread.start();
        brokerThread.start();
    }
}
Did you try with Epsilon, the no-op garbage collector?
@fbacchella I have been using this
-XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC
but the problem still persists. Are there any other possible solutions?
Thanks in advance
The context and the sockets are created inside the while loop, so everything is new on each iteration. I would rather look at low-level problems like socket exhaustion.
@fbacchella I forgot to mention that those spikes start after the third or fourth iteration, which is not enough reconnections to conclude that socket exhaustion is the problem. Are you sure about that? Any other clue?
I ran your code; there are a few hiccups of about 100-200 ms, but not a lot.
A long time ago, a similar question was asked and I did some investigation that might give you some leads: #723 (comment)
@fbacchella Things become even worse when I try it on a different machine instead of localhost, with just one client.
Here is the server Router part:
package sockets;
import org.zeromq.ZMQ;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
/**
 * Server half of the two-machine reproduction: repeatedly creates a ROUTER socket,
 * waits for one two-frame request, replies once, and tears everything down.
 */
public class Zmq6 {
    static final String URI = "tcp://192.168.1.106:5555";
    static final int NOFLAGS = 0;

    /**
     * One bind / receive / reply / close cycle on a fresh context and ROUTER socket.
     */
    public static class Server implements Runnable {
        /** Address (peer identity) that the reply is sent to. */
        public final String name;

        /**
         * @param name identity of the peer this server replies to
         */
        Server(String name) {
            this.name = name;
        }

        /**
         * Binds to {@link Zmq6#URI}, prints how long it took to receive the first two
         * frames, sends a two-frame reply addressed to {@link #name}, and always releases
         * the socket and the context, even when a ZeroMQ call throws.
         */
        @Override
        public void run() {
            ZMQ.Context context = ZMQ.context(1);
            try {
                ZMQ.Socket socket = context.socket(ZMQ.ROUTER);
                try {
                    socket.setImmediate(true);
                    socket.setProbeRouter(true);
                    socket.bind(URI);

                    // Time spent blocked until two frames have arrived. The code treats them
                    // as (identity, payload); their contents are not used afterwards.
                    long start = System.currentTimeMillis();
                    socket.recv();
                    socket.recv();
                    long finish = System.currentTimeMillis();
                    long timeElapsed = finish - start;
                    System.out.println(timeElapsed);

                    // Note: despite the message below, no sleep is performed here.
                    System.out.println("Workers started, sleeping 1 second for warmup.");

                    // Address frame first, then the payload frame.
                    socket.send(name, ZMQ.SNDMORE);
                    socket.send("This is the workload.".getBytes(StandardCharsets.UTF_8), NOFLAGS);
                } finally {
                    socket.close();
                }
            } finally {
                context.term();
            }
        }
    }

    /**
     * Runs server cycles back to back, forever; each cycle runs on its own thread and
     * is joined before the next one starts.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        while (true) {
            String address = "A";
            Thread warmup = new Thread(new Server(address));
            warmup.start();
            warmup.join();
        }
    }
}
Here is the client DEALER part:
package io.Adrestus;
import org.junit.jupiter.api.Test;
import org.zeromq.ZMQ;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
/**
 * Client half of the two-machine reproduction: repeatedly creates a DEALER socket,
 * sends one request, and prints the reply together with the time spent waiting for it.
 */
public class justTest {
    static final String URI = "tcp://192.168.1.106:5555";
    static final int NOFLAGS = 0;

    /**
     * One connect / send / receive / close cycle on a fresh context and DEALER socket.
     */
    public static class Worker implements Runnable {
        /** Identity set on the DEALER socket before connecting. */
        public final String name;

        /**
         * @param name identity to set on the socket
         */
        Worker(String name) {
            this.name = name;
        }

        /**
         * Connects to {@link justTest#URI}, sends {@code "Hello1"}, blocks for one reply
         * frame, prints it followed by the wait time in milliseconds, and always releases
         * the socket and the context, even when a ZeroMQ call throws.
         */
        @Override
        public void run() {
            ZMQ.Context context = ZMQ.context(1);
            try {
                ZMQ.Socket socket = context.socket(ZMQ.DEALER);
                try {
                    // Encode explicitly as UTF-8 so the identity bytes do not depend on the
                    // platform default charset (the reply below is decoded as UTF-8 too).
                    socket.setIdentity(name.getBytes(StandardCharsets.UTF_8));
                    socket.setImmediate(true);
                    socket.setProbeRouter(true);
                    socket.connect(URI);
                    socket.send("Hello1".getBytes(StandardCharsets.UTF_8));

                    // The clock starts after send(), so only the wait for the reply is measured.
                    long start = System.currentTimeMillis();
                    byte[] data = socket.recv(NOFLAGS);
                    long finish = System.currentTimeMillis();
                    long timeElapsed = finish - start;
                    System.out.println(new String(data, StandardCharsets.UTF_8) + " " + timeElapsed);
                } finally {
                    socket.close();
                }
            } finally {
                context.term();
            }
        }
    }

    /**
     * Runs worker cycles back to back, forever; each cycle runs on its own thread and
     * is joined before the next one starts.
     *
     * @throws InterruptedException if the test thread is interrupted while joining
     */
    @Test
    public void test() throws InterruptedException {
        while (true) {
            String address1 = "A";
            Thread workerA = new Thread(new Worker(address1));
            workerA.start();
            workerA.join();
        }
    }
}