Router not waking up to receive
Cjen1 opened this issue · 5 comments
I'm using two sockets connected via 127.0.0.1. I have the server bound to port 56789 and the client connecting to it.
If the server starts substantially before the client (on the order of 5 s) and the server has already called receive, then when the client sends a message the server never wakes up.
Potentially relevant: this is using the latest version of ocaml-zmq, the server socket is a router, and the client socket is a dealer.
Additionally, a tcpdump shows that the zmq side is correctly acknowledging the communication.
I've instrumented ocaml-zmq as follows in zmq-deferred/src/socket.ml:
(** The event loop repeats acting on events as long as there are
    sends or receives to be processed.
    According to the zmq specification, send and receive may update the event,
    and the fd can only be trusted after reading the status of the socket.
*)
let rec event_loop t =
  match t.closing with
  | true -> Deferred.return ()
  | false -> begin
      let open Zmq.Socket in
      let process queue =
        let f = Queue.peek queue in
        try
          f ();
          (* Success, pop the completed entry *)
          Queue.pop queue |> ignore
        with
        | Retry -> (* If f raised EAGAIN, don't pop the message *) ()
      in
      match events t.socket, Queue.is_empty t.senders, Queue.is_empty t.receivers with
      | _, true, true ->
        Logs.debug ~src:t.logging (fun m -> m "Empty qs");
        Condition.wait t.condition >>= fun () ->
        event_loop t
      | Poll_error, _, _ -> failwith "Cannot poll socket"
      (* Prioritize sends to keep the network busy *)
      | Poll_in_out, false, _
      | Poll_out, false, _ ->
        Logs.debug ~src:t.logging (fun m -> m "send queue not empty");
        process t.senders;
        event_loop t
      | Poll_in_out, _, false
      | Poll_in, _, false ->
        Logs.debug ~src:t.logging (fun m -> m "receive queue not empty");
        process t.receivers;
        event_loop t
      | Poll_in, _, true
      | Poll_out, true, _ ->
        Logs.debug ~src:t.logging (fun m ->
            m "Nothing waiting to consume, awaiting request, queue state: %b %b"
              (Queue.is_empty t.senders) (Queue.is_empty t.receivers));
        Condition.signal t.fd_condition ();
        Condition.wait t.condition >>= fun () ->
        event_loop t
      | No_event, _, _ ->
        Logs.debug ~src:t.logging (fun m -> m "No event, awaiting event");
        Condition.signal t.fd_condition ();
        Condition.wait t.condition >>= fun () ->
        event_loop t
      | exception Unix.Unix_error(Unix.ENOTSOCK, "zmq_getsockopt", "") ->
        Deferred.return ()
    end
let condition_waker t =
  let (>>=.) = Lwt.bind in
  let rec loop () =
    Lwt_unix.sleep 5. >>=. fun () -> (
      Logs.debug ~src:t.logging (fun m -> m "forcing wakeup");
      T.(Condition.signal t.condition ();
         loop ())
    )
  in
  loop ()
let of_socket : 'a Zmq.Socket.t -> Logs.src -> 'a t = fun socket logging ->
  let fd = Fd.create (Zmq.Socket.get_fd socket) in
  let t =
    { socket; fd;
      senders = Queue.create ();
      receivers = Queue.create ();
      condition = Condition.create ();
      fd_condition = Condition.create ();
      closing = false;
      logging;
    }
  in
  Deferred.don't_wait_for (fun () -> event_loop t);
  Deferred.don't_wait_for (fun () -> fd_monitor t);
  Lwt.async (fun () -> condition_waker t);
  t
Here t.logging just differentiates between sockets.
This is the pertinent area of the log when the client has connected and sent a message:
[2019-12-05 17:00:50.558764Z] Zmq:client [DEBUG] Empty qs
[2019-12-05 17:00:50.558806Z] Zmq:client [DEBUG] working with receivers
[2019-12-05 17:00:50.558819Z] Zmq:client [DEBUG] Pushed onto queue
[2019-12-05 17:00:50.560789Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:00:55.563338Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:00:55.563405Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:00.567798Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:00.567864Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:05.572104Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:05.572171Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:10.576354Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:10.576434Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:15.580654Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:15.580720Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:20.584959Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:20.585040Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:25.589324Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:25.589404Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:30.593645Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:30.593713Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:32.998337Z] Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:32.998692Z] Zmq:client [DEBUG] Nothing waiting to consume, awaiting request, queue state: true false
[2019-12-05 17:01:32.998818Z] Zmq:client [DEBUG] Nothing waiting to consume, awaiting request, queue state: true false
[2019-12-05 17:01:35.595551Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:35.595618Z] Zmq:client [DEBUG] Nothing waiting to consume, awaiting request, queue state: true false
[2019-12-05 17:01:40.599895Z] Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:40.599961Z] Zmq:client [DEBUG] Nothing waiting to consume, awaiting request, queue state: true false
As far as I can tell, the server is waiting to receive, but the socket's event state doesn't seem to be updated correctly.
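In the log, "queue state: true false" means the send queue is empty and the receive queue is not. Given the match in event_loop, the "Nothing waiting to consume" message can then only come from the Poll_out, true, _ branch: zmq is reporting the socket as writable but never as readable, so the queued receive is never attempted. The sketch below is a stdlib-only model of that dispatch. The poll_event type merely mirrors the Zmq.Socket constructors used above, and the action names are hypothetical labels, not part of ocaml-zmq.

```ocaml
(* Mirrors the Zmq.Socket poll-event constructors used in event_loop. *)
type poll_event = No_event | Poll_in | Poll_out | Poll_in_out | Poll_error

(* Hypothetical labels for what each branch of event_loop does. *)
type action =
  | Wait_for_request  (* both queues empty: wait on t.condition *)
  | Fail              (* Poll_error: failwith "Cannot poll socket" *)
  | Send              (* process t.senders *)
  | Receive           (* process t.receivers *)
  | Park              (* work queued, but zmq reports no matching readiness:
                         signal fd_condition and wait on t.condition *)

(* [senders_empty] and [receivers_empty] are the two booleans printed as
   "queue state: %b %b" in the log. Case order matches event_loop. *)
let dispatch ev ~senders_empty ~receivers_empty =
  match ev, senders_empty, receivers_empty with
  | _, true, true -> Wait_for_request
  | Poll_error, _, _ -> Fail
  | (Poll_in_out | Poll_out), false, _ -> Send
  | (Poll_in_out | Poll_in), _, false -> Receive
  | Poll_in, _, true | Poll_out, true, _ | No_event, _, _ -> Park
```

With the logged state (true, false), only Poll_in or Poll_in_out leads to Receive; a socket stuck reporting Poll_out keeps the loop parked no matter how often it is woken.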
Reading the zmq spec, it seems that events are only generated on a router socket if the socket is marked as mandatory.
From the zmq manual:
When a ZMQ_ROUTER socket has ZMQ_ROUTER_MANDATORY flag set to 1, the socket shall generate ZMQ_POLLIN events upon reception of messages from one or more peers. Likewise, the socket shall generate ZMQ_POLLOUT events when at least one message can be sent to one or more peers.
Could you test whether you get proper events when the mandatory flag is set on the router socket (set_router_mandatory)?
Also, have you tested this in blocking mode (i.e. plain zmq, not async/lwt)?
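A blocking-mode check with the mandatory flag set might look like the following sketch. It assumes the ocaml-zmq library (Zmq.Context.create, Zmq.Socket.create, bind, connect, send, recv_all, set_router_mandatory, close) and reuses the 127.0.0.1:56789 endpoint from the report. No identity is set on the dealer, so the router generates one and only the payload frame is inspected.

```ocaml
(* Blocking (no async/lwt) router/dealer round trip; assumes the "zmq"
   opam package. Endpoint taken from the report. *)
let endpoint = "tcp://127.0.0.1:56789"

let frames =
  let ctx = Zmq.Context.create () in
  let router = Zmq.Socket.create ctx Zmq.Socket.router in
  (* The flag suggested above, set before binding. *)
  Zmq.Socket.set_router_mandatory router true;
  Zmq.Socket.bind router endpoint;
  let dealer = Zmq.Socket.create ctx Zmq.Socket.dealer in
  Zmq.Socket.connect dealer endpoint;
  (* Queued by zmq until the connection is established. *)
  Zmq.Socket.send dealer "hello";
  (* Blocking receive: the router prepends the dealer's routing identity,
     so the result is [identity; "hello"]. *)
  let frames = Zmq.Socket.recv_all router in
  Zmq.Socket.close dealer;
  Zmq.Socket.close router;
  Zmq.Context.terminate ctx;
  frames

let () =
  Printf.printf "received %d frames, payload %S\n"
    (List.length frames) (List.nth frames 1)
```

If this round trip completes but the async version does not, the problem lies in how readiness is observed rather than in the sockets themselves.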
I haven't tested blocking mode yet, though basic testing seems to show it waiting indefinitely. I'm investigating it further today.
OK, testing in blocking mode was also failing. Looking into it further, it appears that two clients were registered with the same routing identity, which prevented their messages from being received. The missing set_router_mandatory flag was also causing issues.
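Why a duplicated identity can silence a peer is illustrated by the following simplified, hypothetical model of a router's identity table (it is not libzmq code). It assumes the default ZMQ_ROUTER_HANDOVER behaviour described in the zmq_setsockopt manual: with handover off, a router rejects a client that connects with an identity already in use; with handover on, the new connection takes the identity over.

```ocaml
(* Hypothetical model of a ROUTER's routing-identity table.
   Assumption: handover = false models the default ZMQ_ROUTER_HANDOVER = 0. *)
type router = {
  peers : (string, int) Hashtbl.t;  (* routing identity -> connection id *)
  handover : bool;
}

let create_router ?(handover = false) () =
  { peers = Hashtbl.create 8; handover }

(* Returns [true] if connection [conn] is accepted under [identity]. *)
let connect r ~identity ~conn =
  match Hashtbl.find_opt r.peers identity with
  | None ->
    Hashtbl.replace r.peers identity conn;
    true
  | Some _ when r.handover ->
    (* The new connection replaces the previous owner of the identity. *)
    Hashtbl.replace r.peers identity conn;
    true
  | Some _ -> false  (* identity already in use: connection rejected *)

(* A message from [conn] is delivered only if [conn] owns [identity]. *)
let deliverable r ~identity ~conn =
  Hashtbl.find_opt r.peers identity = Some conn
```

In practice the remedy is to give each dealer a distinct identity (for example with Zmq.Socket.set_identity) or to leave it unset so the router assigns one.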
Thanks for letting me know about that flag!
Updated to clear up set_router_mandatory