zeromq/jeromq

Router/Dealer Pattern facing unnecessary delay during multiple connections

PanagiotisDrakatos opened this issue · 6 comments

I am using the Jer0mq server socket model and specifically the router dealer pattern because I want to validate the identity of the clients. My problem is that I notice random upsurge spikes of 500 ms when I use a loop case where the server binds the socket and the client tries to connect. From negligible delays up to 500 ms delay, why is this happening? how can I avoid such latency? is this possible? What am I doing wrong? Here is my simple code to test it.

package sockets;

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

import java.nio.charset.StandardCharsets;

import static sockets.rtdealer.NOFLAGS;

public class ZmqStack {
    public static void main(String[] args) throws InterruptedException {
        Thread brokerThread = new Thread(() -> {
            while (true) {
                try (ZContext context = new ZContext()) {
                    ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
                    broker.bind("tcp://*:5555");
                    String identity = new String(broker.recv());
                    String data1 = new String(broker.recv());
                    String identity2 = new String(broker.recv());
                    String data2 = new String(broker.recv());
                    System.out.println("Identity: " + identity + " Data: " + data1);
                    System.out.println("Identity: " + identity2 + " Data: " + data2);

                    broker.sendMore(identity.getBytes(ZMQ.CHARSET));
                    broker.send("xxx1".getBytes(StandardCharsets.UTF_8));

                    broker.sendMore(identity2.getBytes(ZMQ.CHARSET));
                    broker.send("xxx12");

                    broker.close();
                    context.destroy();
                }
            }
        });
        brokerThread.setName("broker");

        Thread workerThread = new Thread(() -> {
            while (true) {
                try (ZContext context = new ZContext()) {
                    ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
                    String identity = "identity1";
                    worker.setIdentity(identity.getBytes(ZMQ.CHARSET));
                    worker.connect("tcp://localhost:5555");


                    worker.send("Hello1".getBytes(StandardCharsets.UTF_8));

                    String workload = new String(worker.recv(NOFLAGS));
                    System.out.println(Thread.currentThread().getName() + " - Received " + workload);
                }
            }
        });
        workerThread.setName("worker");

        Thread workerThread1 = new Thread(() -> {
            while (true) {
                try (ZContext context = new ZContext()) {
                    ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
                    worker.setIdentity("Identity2".getBytes(ZMQ.CHARSET));

                    worker.connect("tcp://localhost:5555");

                    long start = System.currentTimeMillis();
                    worker.send("Hello2 " + Thread.currentThread().getName());
                    String workload = new String(worker.recv(NOFLAGS));
                    long finish = System.currentTimeMillis();
                    long timeElapsed = finish - start;
                    System.out.println(Thread.currentThread().getName() + " - Received " + workload);
                    System.out.println("Elapsed Time: " + timeElapsed);
                }
            }
        });
        workerThread1.setName("worker1");

        workerThread1.start();
        workerThread.start();
        brokerThread.start();
    }
}

enter image description here

@fbacchella I have been using this
-XX:+UnlockExperimentalVMOptions -XX:+UseEpsilonGC
but the problem still persists any other possible solutions?

Thanks in advance

Context and sockets are created in the while loop, so every thing is new. I rather look at low level problems like socket exhaustion.

@fbacchella i forgot to mention that those upsurges started after the fourth/third time not exactly after many reconnections to conclude and say that socket exhaustion is the problem. Are y sure about that? Any other clue?

I ran you code, there is a few hiccups of about 100-200ms, but not a lot.
A long time ago, a similar question was asked and I do some investigation, that might give you some leads: #723 (comment)

@fbacchella Things become even worse when i try localhost on a different machine just with one client.

Here is the server Router part:

package sockets;
import org.zeromq.ZMQ;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
public class Zmq6 {
    static final String URI = "tcp://192.168.1.106:5555";
    static final int NOFLAGS = 0;

    public static class Server implements Runnable {
        public final String name;

        Server(String name) { this.name = name; }

        public void run() {
            ZMQ.Context context = ZMQ.context(1);
            ZMQ.Socket socket = context.socket(ZMQ.ROUTER);
            socket.setImmediate(true);
            socket.setProbeRouter(true);
            socket.bind(URI);

            String address1 = "A";

            long start = System.currentTimeMillis();
            String identity = new String(socket.recv());
            String data1 = new String(socket.recv());
            long finish = System.currentTimeMillis();
            long timeElapsed = finish - start;
            System.out.println(timeElapsed);
            // Wait a second for the workers to connect their sockets.
            System.out.println("Workers started, sleeping 1 second for warmup.");

            socket.send(address1, ZMQ.SNDMORE);
            socket.send("This is the workload.".getBytes(), NOFLAGS);
            socket.close();
            context.term();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        while (true) {
         String address = "A";
         Thread warmup = new Thread(new Server(address));
         warmup.start();
         warmup.join();
        }
    }
}

Here is the client Dealer's part

package io.Adrestus;

import org.junit.jupiter.api.Test;
import org.zeromq.ZMQ;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
public class justTest {
    static final String URI = "tcp://192.168.1.106:5555";
    static final int NOFLAGS = 0;
    public static class Worker implements Runnable {
        public final String name;
        private final byte[] END = "END".getBytes();

        Worker(String name) { this.name = name; }

        public void run() {
            ZMQ.Context context = ZMQ.context(1);
            ZMQ.Socket socket = context.socket(ZMQ.DEALER);
            socket.setIdentity(name.getBytes());
            socket.setIdentity(name.getBytes());
            socket.setImmediate(true);
            socket.setProbeRouter(true);
            socket.connect(URI);

            socket.send("Hello1".getBytes(StandardCharsets.UTF_8));
            long start = System.currentTimeMillis();
            byte[] data = socket.recv(NOFLAGS);
            long finish = System.currentTimeMillis();
            long timeElapsed = finish - start;
            System.out.println(new String(data, StandardCharsets.UTF_8)+" "+timeElapsed);
            socket.close();
            context.term();
        }
    }
    @Test
    public void test() throws InterruptedException {
        while (true) {
            String address1 = "A";
            Thread workerA = new Thread(new Worker(address1));
            workerA.start();
            workerA.join();
        }
    }
}

Here is random spike ms:
image