nanomsg/mangos-v1

reqrep device msg drop

daveamit opened this issue · 3 comments

Hi,
I'm kinda new to this lib. We are migrating a set of existing services which were talking to each other via ZMQ / CZMQ.
In ZMQ, we used a ROUTER/DEALER device as a load balancer, with both ends listening on well-known ports: dynamically scaled services connect to the DEALER end of the device, and clients connect to the ROUTER end.
We are trying to accomplish the same using mangos.Device, but we are getting message drops.

I know I'm definitely doing something wrong, can someone please guide me?

Following is the device code

func device() {
	var front mangos.Socket
	var back mangos.Socket
	var err error

	if front, err = rep.NewSocket(); err != nil {
		die("can't get new rep socket: %s", err.Error())
	}
	if back, err = req.NewSocket(); err != nil {
		die("can't get new req socket: %s", err.Error())
	}

	back.AddTransport(tcp.NewTransport())
	front.AddTransport(tcp.NewTransport())

	if err = back.Listen("tcp://127.0.0.1:40900"); err != nil {
		die("Problem in back: %s", err.Error())
	}
	if err = front.Listen("tcp://127.0.0.1:40899"); err != nil {
		die("Problem in front: %s", err.Error())
	}

	err = mangos.Device(front, back)
	if err != nil {
		die("Can't run device: %s", err.Error())
	}

	forever := make(chan bool)
	<-forever
}

The server (in RAW mode)

func node0(url string) {
	var sock mangos.Socket
	var err error
	if sock, err = rep.NewSocket(); err != nil {
		die("can't get new rep socket: %s", err)
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())

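	// Raw mode lets replies be sent out of order, from separate goroutines.
	if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
		die("can't set raw mode: %s", err.Error())
	}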

	url = "tcp://127.0.0.1:40900"
	if err = sock.Dial(url); err != nil {
		die("can't dial on rep socket: %s", err.Error())
	}
	for {
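		// RecvMsg returns the whole message, including the header that a
		// raw-mode REP socket needs to route the reply back.
		msg, err := sock.RecvMsg()
		if err != nil {
			die("can't receive on rep socket: %s", err.Error())
		}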
		go func(msg *mangos.Message) {
			if string(msg.Body) == "DATE" { // no need to terminate
				fmt.Println("NODE0: RECEIVED DATE REQUEST")
				d := date()
				msg.Body = []byte(d)
				fmt.Printf("NODE0: SENDING DATE %s\n", d)
				err = sock.SendMsg(msg)
				if err != nil {
					die("can't send reply: %s", err.Error())
				}
			}
		}(msg)
	}
}

Set of clients

func c(url string) {
	var sock mangos.Socket
	var err error
	var msg []byte

	if sock, err = req.NewSocket(); err != nil {
		die("can't get new req socket: %s", err.Error())
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Dial(url); err != nil {
		die("can't dial on req socket: %s", err.Error())
	}

	count := 100
	for i := 0; i < count; i++ {
		// time.Sleep(time.Millisecond * 100)
		// fmt.Printf("NODE1: SENDING DATE REQUEST %s\n", "DATE")
		if err = sock.Send([]byte("DATE")); err != nil {
			die("can't send message on req socket: %s", err.Error())
		}
		if msg, err = sock.Recv(); err != nil {
			die("can't receive date: %s %s", err.Error(), msg)
		}
		// fmt.Printf("NODE1: RECEIVED DATE %s\n", string(msg))
	}
	fmt.Println("Done")
	sock.Close()
}

func node1(url string) {
	start := time.Now()
	count := 100
	var wg sync.WaitGroup
	for i := 0; i < count; i++ {
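		// Add must happen before the goroutine starts; otherwise
		// wg.Wait() can return before any client has registered.
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c(url)
		}(i)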
	}
	wg.Wait()
	end := time.Since(start)
	fmt.Printf("%d in %s (%f calls / sec)\n", count*100, end, float64(count*100)/end.Seconds())
}

I don't see anything immediately wrong, but note that the device framework uses raw mode sockets, and will quite happily drop messages if there is no peer on the far side, or if the peer cannot receive messages due to backpressure. Unfortunately, one of the drawbacks of using the device framework is that you lose any support for backpressure being reported back to the client, which means that clients can easily overwhelm the server resulting in dropped messages.
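To make the drop behaviour concrete, here is a minimal, stdlib-only Go sketch. It does not use mangos: the buffered channel and the `forward` and `burst` helpers are hypothetical stand-ins for a raw-mode hop with a fixed-depth queue and no backpressure, so treat it as an illustration rather than a description of how the device is implemented.

```go
package main

import "fmt"

// forward models one raw-mode hop: it tries to enqueue msg for the peer and
// returns false (message dropped) when the peer's queue is already full.
func forward(queue chan string, msg string) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false // queue full: the sender is never told
	}
}

// burst pushes n messages into a queue of the given depth that nobody is
// draining, and reports how many were accepted and how many were dropped.
func burst(depth, n int) (accepted, dropped int) {
	queue := make(chan string, depth)
	for i := 0; i < n; i++ {
		if forward(queue, fmt.Sprintf("req-%d", i)) {
			accepted++
		} else {
			dropped++
		}
	}
	return accepted, dropped
}

func main() {
	accepted, dropped := burst(4, 10)
	fmt.Printf("accepted=%d dropped=%d\n", accepted, dropped) // accepted=4 dropped=6
}
```

A blocking send in `forward` would give backpressure instead of drops, which is the property that is lost when a device sits in the middle.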

The solution to this is to ensure that you have adequate servers to handle this, or that you tune the messaging timeouts and retries so that you get automatic retries quickly enough to satisfy your needs. (Recall that REQ/REP automatically retries on the client side if it doesn't receive a reply from the server in a given period of time.)
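The retry idea can be sketched with the standard library alone. This is a simulation, not mangos code: `lossyServer`, `requestWithRetry`, and `demo` are hypothetical names, and the channels stand in for the path through the device. In mangos itself the corresponding knob is, as far as I know, the REQ socket's `mangos.OptionRetryTime` (a `time.Duration` set via `SetOption`); a shorter value means a dropped request is resent sooner.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// lossyServer answers requests from in on out, but silently discards the
// first `drops` requests, standing in for a device that drops messages.
func lossyServer(in <-chan string, out chan<- string, drops int) {
	for req := range in {
		if drops > 0 {
			drops--
			continue
		}
		out <- "reply to " + req
	}
}

// requestWithRetry resends req each time the retry interval elapses without
// a reply, up to maxTries attempts. It returns the reply and attempts used.
func requestWithRetry(in chan<- string, out <-chan string, req string,
	retry time.Duration, maxTries int) (string, int, error) {
	for try := 1; try <= maxTries; try++ {
		in <- req
		select {
		case rep := <-out:
			return rep, try, nil
		case <-time.After(retry):
			// No reply in time: fall through and resend.
		}
	}
	return "", maxTries, errors.New("no reply after retries")
}

// demo runs one request against a server that drops the first two attempts.
func demo() (string, int, error) {
	in := make(chan string, 1)
	out := make(chan string, 1)
	go lossyServer(in, out, 2)
	defer close(in) // lets the server goroutine exit
	return requestWithRetry(in, out, "DATE", 50*time.Millisecond, 5)
}

func main() {
	rep, tries, err := demo()
	fmt.Println(rep, tries, err)
}
```

With a 50 ms interval the two drops cost roughly 100 ms in total; with a retry interval measured in seconds or minutes, the same two drops would look like a hung client.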

Well, when I connect the client (req) directly to the async server (rep in raw mode), there are zero message drops. But when I use the reqrep device, I can't even send 50 consecutive requests without dropping messages.

What I'm looking for is a load-balancer configuration. The load balancer has a well-known location, and clients connect to it. The servers are dynamic: they come and go depending on load, and they also connect to the load balancer. The load balancer round-robins requests among all available (connected) servers.

Can you please advise on an approach?

PS: We have achieved this in zeromq using router/dealer configuration.
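For reference, the round-robin selection over a changing set of servers described above can be sketched as follows. This is only the bookkeeping, written against the standard library: the `Balancer` type and its `Add`, `Remove`, and `Pick` methods are hypothetical and are not part of mangos, and no sockets are involved.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Balancer hands out backends in round-robin order. Backends may be added
// or removed at any time, as servers scale up and down.
type Balancer struct {
	mu       sync.Mutex
	backends []string
	next     int // index of the backend to hand out next
}

// Add registers a newly connected backend.
func (b *Balancer) Add(name string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.backends = append(b.backends, name)
}

// Remove drops a disconnected backend and keeps the rotation position stable.
func (b *Balancer) Remove(name string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for i, n := range b.backends {
		if n == name {
			b.backends = append(b.backends[:i], b.backends[i+1:]...)
			if b.next > i {
				b.next--
			}
			return
		}
	}
}

// Pick returns the next backend in rotation, or an error if none is connected.
func (b *Balancer) Pick() (string, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backends) == 0 {
		return "", errors.New("no backends connected")
	}
	b.next %= len(b.backends)
	name := b.backends[b.next]
	b.next++
	return name, nil
}

func main() {
	b := &Balancer{}
	b.Add("s1")
	b.Add("s2")
	b.Add("s3")
	for i := 0; i < 4; i++ {
		n, _ := b.Pick()
		fmt.Print(n, " ") // s1 s2 s3 s1
	}
	b.Remove("s2")
	n, _ := b.Pick()
	fmt.Println(n) // s3
}
```

Returning an error from `Pick` when no backend is connected is a deliberate choice here: it gives the caller a chance to queue or reject the request instead of dropping it silently.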

Closing issue as the first comment partially explains the solution.

The solution to this is to ensure that you have adequate servers to handle this, or that you tune the messaging timeouts and retries so that you get automatic retries quickly enough to satisfy your needs. (Recall that REQ/REP automatically retries on the client side if it doesn't receive a reply from the server in a given period of time.)