reqrep device msg drop
daveamit opened this issue · 3 comments
Hi,
I'm kinda new to this lib. We are migrating a set of existing services that were talking to each other via ZMQ/CZMQ.
In ZMQ, we were using a `ROUTER`/`DEALER` device as a load balancer, with both ends listening on well-known ports. Dynamically scaled services connect to the `DEALER` end of the device, and clients connect to the `ROUTER` end.
We are trying to accomplish the same using `mangos.Device`, but messages are getting dropped.
I know I'm definitely doing something wrong; can someone please guide me?
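Following is the device code (`die` is a helper defined elsewhere in the program):
```go
// device forwards requests between clients (front end) and servers (back end).
func device() {
	var front mangos.Socket
	var back mangos.Socket
	var err error

	// The front end faces the clients' REQ sockets, so it is a REP socket.
	if front, err = rep.NewSocket(); err != nil {
		die("can't get new rep socket: %s", err.Error())
	}
	// The back end faces the servers' REP sockets, so it is a REQ socket.
	if back, err = req.NewSocket(); err != nil {
		die("can't get new req socket: %s", err.Error())
	}
	back.AddTransport(tcp.NewTransport())
	front.AddTransport(tcp.NewTransport())

	// Servers dial the back end; clients dial the front end.
	if err = back.Listen("tcp://127.0.0.1:40900"); err != nil {
		die("Problem in back: %s", err.Error())
	}
	if err = front.Listen("tcp://127.0.0.1:40899"); err != nil {
		die("Problem in front: %s", err.Error())
	}

	// Device starts the forwarding goroutines and returns right away.
	if err = mangos.Device(front, back); err != nil {
		die("Can't run device: %s", err.Error())
	}

	// Block forever so the process (and the forwarders) keep running.
	select {}
}
```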
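As a rough mental model of the device (a simplification, not the mangos implementation), think of two forwarding loops, one per direction, started in the background. The device call therefore returns immediately, which is why the code above has to block afterwards. The sketch below uses buffered channels as stand-ins for the two sockets. `endpoint`, `forward` and `device` are hypothetical, illustrative names.

```go
package main

// endpoint is a hypothetical stand-in for one socket: In carries
// messages the socket has received, Out carries messages it should send.
type endpoint struct {
	In  chan []byte
	Out chan []byte
}

// newEndpoint creates an endpoint whose queues hold up to qlen messages.
func newEndpoint(qlen int) endpoint {
	return endpoint{In: make(chan []byte, qlen), Out: make(chan []byte, qlen)}
}

// forward copies every message received on src to dst until src.In is closed.
func forward(src, dst endpoint) {
	for m := range src.In {
		dst.Out <- m
	}
}

// device wires two endpoints together with one goroutine per direction
// and returns immediately, leaving the goroutines running.
func device(front, back endpoint) {
	go forward(front, back) // client requests -> servers
	go forward(back, front) // server replies -> clients
}
```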
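The server (in raw mode; `die` and `date` are helpers defined elsewhere in the program):
```go
// node0 is a raw-mode REP server that dials the device's back end and
// answers each request on its own goroutine.
func node0(url string) {
	var sock mangos.Socket
	var err error
	if sock, err = rep.NewSocket(); err != nil {
		die("can't get new rep socket: %s", err)
	}
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	// Raw mode lets this server have several requests outstanding at once.
	if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
		die("can't set raw mode: %s", err.Error())
	}
	url = "tcp://127.0.0.1:40900" // hard-coded: the device's back-end address
	if err = sock.Dial(url); err != nil {
		die("can't dial on rep socket: %s", err.Error())
	}
	for {
		// RecvMsg (rather than Recv) keeps the header needed to route the reply.
		msg, err := sock.RecvMsg()
		if err != nil {
			die("can't receive request: %s", err.Error())
		}
		go func(msg *mangos.Message) {
			if string(msg.Body) != "DATE" {
				return // unknown request: no reply is sent
			}
			fmt.Println("NODE0: RECEIVED DATE REQUEST")
			d := date()
			msg.Body = []byte(d) // msg.Header is kept so the reply is routed back
			fmt.Printf("NODE0: SENDING DATE %s\n", d)
			// Goroutine-local err avoids a data race on the loop's err.
			if err := sock.SendMsg(msg); err != nil {
				die("can't send reply: %s", err.Error())
			}
		}(msg)
	}
}
```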
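In raw mode the server, not the socket, is responsible for pairing each reply with its request. The request's header carries the routing information needed to get the reply back to the right client, so it must come back unchanged, with only the body replaced. The stdlib-only sketch below models that contract under those assumptions. `message` and `serveRaw` are hypothetical names; this is not mangos code.

```go
package main

import "sync"

// message is a hypothetical model of a raw-mode message: Header is an
// opaque routing token that the server must echo back untouched.
type message struct {
	Header []byte
	Body   []byte
}

// serveRaw answers each request on its own goroutine, replacing only the
// Body so every reply keeps its request's Header. It returns after reqs
// is closed and all replies have been sent.
func serveRaw(reqs <-chan message, replies chan<- message, handle func([]byte) []byte) {
	var wg sync.WaitGroup
	for m := range reqs {
		wg.Add(1)
		go func(m message) {
			defer wg.Done()
			m.Body = handle(m.Body) // Header is deliberately left as-is
			replies <- m
		}(m)
	}
	wg.Wait()
}
```

Because replies can leave in any order, the header (not arrival order) is what lets the far side deliver each one correctly.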
A set of clients:
```go
// c runs one REQ client that sends 100 sequential DATE requests.
func c(url string) {
	var sock mangos.Socket
	var err error
	var msg []byte
	if sock, err = req.NewSocket(); err != nil {
		die("can't get new req socket: %s", err.Error())
	}
	defer sock.Close()
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())
	if err = sock.Dial(url); err != nil {
		die("can't dial on req socket: %s", err.Error())
	}
	count := 100
	for i := 0; i < count; i++ {
		if err = sock.Send([]byte("DATE")); err != nil {
			die("can't send message on req socket: %s", err.Error())
		}
		if msg, err = sock.Recv(); err != nil {
			die("can't receive date: %s %s", err.Error(), msg)
		}
	}
	fmt.Println("Done")
}

// node1 starts 100 concurrent clients and reports the overall call rate.
func node1(url string) {
	start := time.Now()
	count := 100
	var wg sync.WaitGroup
	for i := 0; i < count; i++ {
		// Add must happen before the goroutine starts; calling it inside the
		// goroutine races with Wait, which could return before any client runs.
		wg.Add(1)
		go func() {
			defer wg.Done()
			c(url)
		}()
	}
	wg.Wait()
	end := time.Since(start)
	// count clients x 100 requests each
	fmt.Printf("%d in %s (%f calls / sec)\n", count*100, end, float64(count*100)/end.Seconds())
}
```
I don't see anything immediately wrong, but note that the device framework uses raw mode sockets, and will quite happily drop messages if there is no peer on the far side, or if the peer cannot receive messages due to backpressure. Unfortunately, one of the drawbacks of using the device framework is that you lose any support for backpressure being reported back to the client, which means that clients can easily overwhelm the server resulting in dropped messages.
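A simplified model of the drop behaviour described here: a forwarder that refuses to block hands each message to the peer's queue if there is room, and otherwise discards it without telling the client. This is a hypothetical sketch (`forwardOrDrop` is an illustrative name), not the mangos internals. It only shows why a burst larger than the queue loses messages. A `nil` channel stands in for "no peer connected", because a send on a nil channel can never proceed.

```go
package main

// forwardOrDrop models a non-blocking forwarder: each message goes to out
// if it has room; otherwise (queue full, or out is nil = no peer) the
// message is discarded. It reports how many were delivered and dropped.
func forwardOrDrop(msgs [][]byte, out chan []byte) (delivered, dropped int) {
	for _, m := range msgs {
		select {
		case out <- m:
			delivered++
		default:
			dropped++ // no room: the sender is never told
		}
	}
	return delivered, dropped
}
```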
The fix is to ensure that you have enough servers to handle the load, or to tune the messaging timeouts and retries so that automatic retries happen quickly enough to satisfy your needs. (Recall that REQ/REP automatically retries on the client side if it doesn't receive a reply from the server within a given period of time.)
Well, when I connect the client (REQ) directly to the async server (REP in raw mode), there is zero message drop. But when I go through the REQ/REP device, I'm not even able to send 50 consecutive requests without dropping messages.
What I'm looking for is a load-balancer configuration. Clients connect to the load balancer, which has a well-known location. The servers are dynamic and come and go depending on load. They also connect to the load balancer, which round-robins the requests among all available (connected) servers.
Can you please advise on an approach?
PS: We have achieved this in zeromq using router/dealer configuration.
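The behaviour asked for here, with servers joining and leaving while requests are spread across whoever is currently connected, amounts to round-robin dispatch over a changing set. In the device from the question, the back-end REQ socket is what spreads requests over the servers that dial in, so this selection happens inside the socket rather than in user code. The stdlib-only sketch below models only the selection logic. `balancer`, `join`, `leave` and `pick` are hypothetical names.

```go
package main

import (
	"errors"
	"sync"
)

var errNoWorkers = errors.New("no workers connected")

// balancer is a hypothetical round-robin dispatcher: servers may join or
// leave at any time, and each request goes to the next one in turn.
type balancer struct {
	mu      sync.Mutex
	workers []string
	next    int
}

// join registers a newly connected server.
func (b *balancer) join(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.workers = append(b.workers, id)
}

// leave removes a server, keeping the rotation position stable.
func (b *balancer) leave(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for i, w := range b.workers {
		if w == id {
			b.workers = append(b.workers[:i], b.workers[i+1:]...)
			if b.next > i {
				b.next-- // shift back so no server is skipped
			}
			return
		}
	}
}

// pick returns the server that should receive the next request.
func (b *balancer) pick() (string, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.workers) == 0 {
		return "", errNoWorkers
	}
	b.next %= len(b.workers)
	w := b.workers[b.next]
	b.next++
	return w, nil
}
```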
Closing issue as the first comment partially explains the solution.