issuu/ocaml-zmq

Router not waking up to receive

Cjen1 opened this issue · 5 comments

Cjen1 commented

I'm using two sockets connected over 127.0.0.1.

I have the server bound onto port 56789 and the client connecting to it.

If the server starts substantially before the client (on the order of 5 s) and the server has already reached its receive call, then the server does not wake up when the client sends a message.

Potentially relevant: this is with the latest version of ocaml-zmq, the server socket is a router, and the client socket is a dealer.

Additionally, a tcpdump suggests that the zmq side is correctly acknowledging the communication.

Cjen1 commented

I've instrumented ocaml-zmq as follows in zmq-deferred/src/socket.ml:

  (** The event loop repeats acting on events as long as there are
      sends or receives to be processed.
      According to the zmq specification, send and receive may update the event,
      and the fd can only be trusted after reading the status of the socket.
  *)
  let rec event_loop t =
    match t.closing with
    | true -> Deferred.return ()
    | false -> begin
        let open Zmq.Socket in
        let process queue =
          let f = Queue.peek queue in
          try
            f ();
            (* Success, pop the sender *)
            Queue.pop queue |> ignore
          with
          | Retry -> (* f raised Retry (EAGAIN): don't pop the message *) ()
        in
        match events t.socket, Queue.is_empty t.senders, Queue.is_empty t.receivers with
        | _, true, true ->
          Logs.debug ~src:t.logging (fun m -> m "Empty qs");
          Condition.wait t.condition >>= fun () ->
          event_loop t
        | Poll_error, _, _ -> failwith "Cannot poll socket"
        (* Prioritize send's to keep network busy *)
        | Poll_in_out, false, _
        | Poll_out, false, _ ->
          Logs.debug ~src:t.logging (fun m -> m "send queue not empty");
          process t.senders;
          event_loop t
        | Poll_in_out, _, false
        | Poll_in, _, false ->
          Logs.debug ~src:t.logging (fun m -> m "receive queue not empty");
          process t.receivers;
          event_loop t
        | Poll_in, _, true
        | Poll_out, true, _ ->
          Logs.debug ~src:t.logging (fun m -> m "Nothing waiting to consume, awaiting request, queue state: %b %b" (Queue.is_empty t.senders) (Queue.is_empty t.receivers));
          Condition.signal t.fd_condition ();
          Condition.wait t.condition >>= fun () ->
          event_loop t
        | No_event, _, _ ->
          Logs.debug ~src:t.logging (fun m -> m "No event, awaiting event");
          Condition.signal t.fd_condition ();
          Condition.wait t.condition >>= fun () ->
          event_loop t
        | exception Unix.Unix_error(Unix.ENOTSOCK, "zmq_getsockopt", "") ->
          Deferred.return ()
      end

  let condition_waker t =
    let (>>=.) = Lwt.bind in
    let rec loop () =
      Lwt_unix.sleep 5. >>=. fun () -> (
        Logs.debug ~src:t.logging (fun m -> m "forcing wakeup");
        T.(Condition.signal t.condition ();
        loop () )
      )
    in loop ()

  let of_socket: 'a Zmq.Socket.t -> Logs.src -> 'a t = fun socket logging ->
    let fd = Fd.create (Zmq.Socket.get_fd socket) in
    let t =
      { socket; fd;
        senders = Queue.create ();
        receivers = Queue.create ();
        condition = Condition.create ();
        fd_condition = Condition.create ();
        closing = false;
        logging;
      }
    in
    Deferred.don't_wait_for (fun () -> event_loop t);
    Deferred.don't_wait_for (fun () -> fd_monitor t);
    Lwt.async (fun () -> condition_waker t);
    t

Here t.logging just differentiates between sockets.

This is the pertinent area of the log when the client has connected and sent a message:

[2019-12-05 17:00:50.558764Z]     Zmq:client [DEBUG] Empty qs
[2019-12-05 17:00:50.558806Z]     Zmq:client [DEBUG] working with receivers
[2019-12-05 17:00:50.558819Z]     Zmq:client [DEBUG] Pushed onto queue
[2019-12-05 17:00:50.560789Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:00:55.563338Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:00:55.563405Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:00.567798Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:00.567864Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:05.572104Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:05.572171Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:10.576354Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:10.576434Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:15.580654Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:15.580720Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:20.584959Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:20.585040Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:25.589324Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:25.589404Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:30.593645Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:30.593713Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:32.998337Z]     Zmq:client [DEBUG] No event, awaiting event
[2019-12-05 17:01:32.998692Z]     Zmq:client [DEBUG] Nothing waiting to consume, awaiting request, queue state: true false
[2019-12-05 17:01:32.998818Z]     Zmq:client [DEBUG] Nothing waiting to consume, awaiting request, queue state: true false
[2019-12-05 17:01:35.595551Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:35.595618Z]     Zmq:client [DEBUG] Nothing waiting to consume, awaiting request, queue state: true false
[2019-12-05 17:01:40.599895Z]     Zmq:client [DEBUG] forcing wakeup
[2019-12-05 17:01:40.599961Z]     Zmq:client [DEBUG] Nothing waiting to consume, awaiting request, queue state: true false

As far as I can tell the server is waiting to receive, though the socket state doesn't seem to be correctly updated.
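To read the log against the match in event_loop, it can help to model the dispatch as a pure function. The sketch below is a self-contained model, not the library code: the `action` type and the `decide` function are hypothetical names introduced here, and `event` only mirrors the constructors used in the match above.

```ocaml
(* Mirrors the constructors matched on in event_loop. *)
type event = No_event | Poll_in | Poll_out | Poll_in_out | Poll_error

(* Hypothetical summary of what each branch of event_loop does. *)
type action =
  | Wait_for_work  (* both queues empty: wait on t.condition *)
  | Fail           (* "Cannot poll socket" *)
  | Send           (* process t.senders *)
  | Receive        (* process t.receivers *)
  | Wait_for_fd    (* signal t.fd_condition, then wait on t.condition *)

(* Same case order as the match in event_loop. *)
let decide event ~senders_empty ~receivers_empty =
  match event, senders_empty, receivers_empty with
  | _, true, true -> Wait_for_work
  | Poll_error, _, _ -> Fail
  | (Poll_in_out | Poll_out), false, _ -> Send
  | (Poll_in_out | Poll_in), _, false -> Receive
  | Poll_in, _, true | Poll_out, true, _ -> Wait_for_fd
  | No_event, _, _ -> Wait_for_fd
```

The repeated "queue state: true false" lines mean the send queue is empty and the receive queue is not. With a receiver waiting, the only branch that logs that message is `Poll_out, true, _`, so at that point the socket was reporting itself writable but not readable.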

Reading the zmq spec, it seems that events are only generated on a router socket if the socket is marked as mandatory.

From the zmq manual:
When a ZMQ_ROUTER socket has ZMQ_ROUTER_MANDATORY flag set to 1, the socket shall generate ZMQ_POLLIN events upon reception of messages from one or more peers. Likewise, the socket shall generate ZMQ_POLLOUT events when at least one message can be sent to one or more peers.

Could you test whether you get proper events when the mandatory flag is set on the router socket with set_router_mandatory?

Also - Have you tested this in blocking mode (i.e. plain zmq - not async / lwt)?
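A blocking-mode check along these lines might look like the sketch below. It assumes the plain `zmq` library from ocaml-zmq (no Async or Lwt); the function name `roundtrip`, the identity "client-a" and the payload "hello" are made up for illustration, and the endpoint is left as a parameter.

```ocaml
(* Blocking ROUTER/DEALER round trip using plain Zmq (no Lwt/Async).
   [roundtrip], the identity and the payload are illustrative only. *)
let roundtrip endpoint =
  let ctx = Zmq.Context.create () in
  let router = Zmq.Socket.create ctx Zmq.Socket.router in
  (* The flag suggested above, set before binding. *)
  Zmq.Socket.set_router_mandatory router true;
  Zmq.Socket.bind router endpoint;
  let dealer = Zmq.Socket.create ctx Zmq.Socket.dealer in
  (* Set the identity before connecting so the router sees it. *)
  Zmq.Socket.set_identity dealer "client-a";
  Zmq.Socket.connect dealer endpoint;
  Zmq.Socket.send dealer "hello";
  (* Blocks until a message arrives; a ROUTER prepends the sender's
     identity as the first frame. *)
  let frames = Zmq.Socket.recv_all router in
  Zmq.Socket.close dealer;
  Zmq.Socket.close router;
  Zmq.Context.terminate ctx;
  frames
```

If the round trip works, `roundtrip "tcp://127.0.0.1:56789"` should return the identity frame followed by the payload. If it blocks forever in `recv_all`, the problem is reproducible without the deferred wrapper.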

Cjen1 commented

I haven't tested blocking mode properly yet, though basic testing seems to show it waiting indefinitely. I'm investigating it further today.

Cjen1 commented

OK, testing in blocking mode was also failing. Looking into it further, it appears that two clients were registered with the same routing identity, and that was preventing the messages from being received. The lack of set_router_mandatory was also causing issues.
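For anyone hitting the same thing, a minimal sketch of giving each dealer its own identity is below. It assumes the plain `zmq` library; `connect_dealer`, `senders_seen`, the identities and the "ping" payload are hypothetical names and values chosen for illustration. If two entries in `identities` are equal, the blocking receive may never return, which matches what I was seeing.

```ocaml
(* Create a DEALER with an explicit identity, set before connecting. *)
let connect_dealer ctx endpoint identity =
  let dealer = Zmq.Socket.create ctx Zmq.Socket.dealer in
  Zmq.Socket.set_identity dealer identity;
  Zmq.Socket.connect dealer endpoint;
  dealer

(* Connect one DEALER per identity, send one message from each, and
   return the sorted identity frames the ROUTER received. *)
let senders_seen endpoint identities =
  let ctx = Zmq.Context.create () in
  let router = Zmq.Socket.create ctx Zmq.Socket.router in
  Zmq.Socket.bind router endpoint;
  let dealers = List.map (connect_dealer ctx endpoint) identities in
  List.iter (fun d -> Zmq.Socket.send d "ping") dealers;
  (* One blocking receive per dealer; the first frame is the identity. *)
  let seen =
    List.map (fun _ -> List.hd (Zmq.Socket.recv_all router)) identities
  in
  List.iter Zmq.Socket.close dealers;
  Zmq.Socket.close router;
  Zmq.Context.terminate ctx;
  List.sort compare seen
```

The result is sorted because the arrival order across dealers is not something to rely on.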

Thanks for letting me know about that flag!

Updated to clear up set_router_mandatory