issuu/ocaml-zmq

Implement zmq-lwt.

andersfugmann opened this issue · 20 comments

Just a heads up:

Now that we have a zmq-async, I have started working on zmq-lwt by creating a functor to abstract the concurrency monad.

This would essentially implement the same interface as https://github.com/hcarty/lwt-zmq/blob/master/src/lwt_zmq.ml, but keep the implementations identical, albeit functorized over the concurrency monad (Async_kernel.Deferred.t / Lwt.t).
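
A minimal sketch of the functor idea, for illustration only. The `Deferred` signature, the `Make` functor, and `recv_n` are hypothetical names, not the code in the PR. `Lwt.return` and `Lwt.bind` fit this shape directly; Async's `Deferred.bind` takes a labelled `~f` argument, so it would presumably need a thin adapter. An identity monad stands in here so the sketch runs without either library.

```ocaml
(* Hypothetical signature: the minimal monad operations the shared code needs. *)
module type Deferred = sig
  type 'a t
  val return : 'a -> 'a t
  val bind : 'a t -> ('a -> 'b t) -> 'b t
end

(* Shared logic, written once against the abstract monad. *)
module Make (D : Deferred) = struct
  let ( >>= ) = D.bind

  (* Stand-in for a socket operation: call [recv] n times and collect
     the messages in arrival order. *)
  let rec recv_n (recv : unit -> string D.t) n : string list D.t =
    if n <= 0 then D.return []
    else
      recv () >>= fun msg ->
      recv_n recv (n - 1) >>= fun rest ->
      D.return (msg :: rest)
end

(* Trivial identity monad, used only to make the sketch runnable. *)
module Identity = struct
  type 'a t = 'a
  let return x = x
  let bind x f = f x
end

module Sync = Make (Identity)

let received =
  let counter = ref 0 in
  let recv () = incr counter; Printf.sprintf "msg-%d" !counter in
  Sync.recv_n recv 3
```

Instantiating `Make` with an Lwt-backed or Async-backed module would give two libraries that share every line of the logic inside the functor.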

I will create a branch today, as I do have some questions about both implementations.

Basic import PR is in as #57.

I have updated PR #56 with a version that I believe is working. However, it's currently completely untested, so I will need to write some tests.

@rgrinberg Can you give examples of idiomatic Lwt / Async in the context of the deferred bindings? I don't see how they are not idiomatic to both.

Sure, so basically idiomatic Async accepts slightly different types than Lwt. For example, it should have a function to send an Iobuf.t, which is the preferred type for zero-copy IO. Another common type that Core writers accept is a bigstring/bigsubstring, which are just Core's wrappers for bigarrays. Basically this requires additions to the Async API, so it shouldn't really affect your functor work as far as I can tell.
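
To illustrate what "wrappers for bigarrays" buys you, here is a small sketch using only the standard library's `Bigarray` module. The `bigstring` alias and the `of_string`, `sub`, and `to_string` helpers are written for this example and are not the Core or Async API. The point is that `Bigarray.Array1.sub` returns a view onto the same memory, so a slice can be handed to a writer without copying.

```ocaml
(* A char bigarray, the representation Core's bigstring is built on. *)
type bigstring =
  (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t

(* Copy a string into a freshly allocated bigstring. *)
let of_string (s : string) : bigstring =
  let b =
    Bigarray.Array1.create Bigarray.char Bigarray.c_layout (String.length s)
  in
  String.iteri (fun i c -> Bigarray.Array1.set b i c) s;
  b

(* A view over part of the buffer; no bytes are copied. *)
let sub (b : bigstring) ~pos ~len : bigstring = Bigarray.Array1.sub b pos len

(* Copy the contents back out, for inspection only. *)
let to_string (b : bigstring) : string =
  String.init (Bigarray.Array1.dim b) (fun i -> Bigarray.Array1.get b i)

let buf = of_string "hello world"
let view = sub buf ~pos:6 ~len:5

(* Writing through the view changes the underlying buffer as well. *)
let () = Bigarray.Array1.set view 0 'W'
```

After the last line, both `buf` and `view` observe the modified byte, which shows that they share storage.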

I completed what I believe is a safe version of zmq-async / zmq-lwt, which share the same implementation apart from the monad, which is functorized. The implementation is based on what I can read from the ZMQ documentation, and tested through the unit tests (which actually exposed some deadlock and fd bugs in the code). See #56

I elected to have two threads running in the background.
event
This thread monitors the state of the socket and takes appropriate action (sending if there are senders on the queue and the socket accepts a send without blocking, and receiving if there are messages pending and waiters). The challenge is that the state of the socket can change through calls to send/recv, and that change may not be caught by the fd, so when we sleep on the fd, we must be able to be woken up if the socket is sent to or read from.

I could have elected to let every send / recv monitor the socket, but then every send/recv would wake up all other waiting threads, which is more work for the scheduler than needed.
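
A rough model of one pass of such an event loop, with queues of waiting senders and receivers. Everything here (`socket`, `state`, `process`) is a hypothetical stand-in made up for this sketch; the real code talks to ZMQ and resolves Lwt/Async promises rather than calling plain callbacks. It only shows the policy: each waiter is woken exactly when the socket can serve it.

```ocaml
(* Hypothetical model of the socket state; not the real ZMQ bindings. *)
type socket = {
  mutable can_send : int;      (* how many sends would not block *)
  incoming : string Queue.t;   (* messages ready to be received *)
  outbox : string Queue.t;     (* messages that have been sent *)
}

type state = {
  senders : (string * (unit -> unit)) Queue.t;  (* message, wake-up callback *)
  receivers : (string -> unit) Queue.t;         (* wake-up callback per waiter *)
}

(* One pass of the event loop: serve queued senders and receivers for as
   long as the socket allows, and return the number of waiters woken. *)
let process socket state =
  let woken = ref 0 in
  while socket.can_send > 0 && not (Queue.is_empty state.senders) do
    let msg, wake = Queue.pop state.senders in
    Queue.push msg socket.outbox;
    socket.can_send <- socket.can_send - 1;
    wake ();
    incr woken
  done;
  while
    not (Queue.is_empty socket.incoming)
    && not (Queue.is_empty state.receivers)
  do
    let wake = Queue.pop state.receivers in
    wake (Queue.pop socket.incoming);
    incr woken
  done;
  !woken

(* Two senders and two receivers wait; the socket can take one send and
   has one message pending, so one of each is served. *)
let demo () =
  let socket =
    { can_send = 1; incoming = Queue.create (); outbox = Queue.create () }
  in
  let state = { senders = Queue.create (); receivers = Queue.create () } in
  let got = ref [] in
  Queue.push ("a", fun () -> ()) state.senders;
  Queue.push ("b", fun () -> ()) state.senders;
  Queue.push (fun m -> got := m :: !got) state.receivers;
  Queue.push (fun m -> got := m :: !got) state.receivers;
  Queue.push "x" socket.incoming;
  let woken = process socket state in
  (woken, Queue.length state.senders, Queue.length state.receivers, List.rev !got)
```

The waiters that could not be served stay queued and untouched until a later pass, which is what keeps the scheduler from doing redundant work.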

fd_monitor
When asked (by notification on the fd_condition), the fd_monitor will send a signal to the event thread once the fd becomes readable. This makes it simple to "wait for fd or other event" without spawning a new fd thread (which also does not work well with async).

There are still caveats. If the ZMQ.Socket is used both through async/lwt and outside of it, then the user may inadvertently make the system lose important events (reading state/sending/receiving), causing a deadlock.

Alternative approach
ZMQ supports poll, which resembles Unix select. We could fire up a separate global (Unix) thread that does the poll and signals when a registered socket is ready to send or to receive. When a deferred thread wants to send, it would (after consulting the state) register the socket with the 'global' thread.
(By global thread, I mean one thread per zmq context).

I haven't had a chance to give #56 a try in real code yet but it seems very promising!

Does the alternative approach help in the case where a user may (hopefully accidentally...) mix async/lwt and vanilla calls to a socket?

No. Mixing lwt/async socket operations and vanilla (blocking) calls will fail horribly, as we might miss events. The documentation says that the state of the socket might change when calling send/receive and this might not be reflected on the fd.

Btw. I compared the test time on this branch with that of the current implementation (zmq-lwt on master), after increasing the number of messages by 10x (one of the tests now sends 8000 messages).
Master:

$ time /tmp/test.exe 
.......
Ran: 7 tests in: 1.86 seconds.
OK

real	0m1.894s
user	0m1.270s
sys	0m0.599s

New functorized implementation (#56):

$ time _build/default/zmq-lwt/test/test.exe
.......
Ran: 7 tests in: 0.29 seconds.
OK

real	0m0.323s
user	0m0.193s
sys	0m0.104s

The difference is really noticeable. When increasing by yet another order of magnitude, the current implementation takes forever (100% CPU), whereas this implementation completes in 1.2 seconds.

@andersfugmann That performance result looks amazing.

Regarding the two options, I'm not sure we can use a separate Unix thread for these operations as most zeromq sockets are not safe to share between threads. I don't know if that holds for polling as well as send/recv operations but it seems somewhat risky.

So I vote for the first, currently implemented option. Do you think your PR is ready for testing in an existing codebase? I can do some tests early next week if everything goes smoothly between now and then.

You are right that ZMQ might not be thread safe, so I also vote for the first (currently implemented) solution.

Yes - I do think the code is ready for serious testing.

The only thing missing is the async sexp stuff, but I'll see if I can add it just for the async interface, so as not to introduce a dependency on Jane Street libs for the lwt implementation.

Some simple tests in an application that currently uses lwt-zmq (changing only Lwt_zmq to Zmq_lwt in the codebase) result in effectively the same performance, maybe a slight improvement, when there is little to no concurrency. I also haven't seen any change in behavior or stability in these tests. This is all running under a VM on my laptop.

I'm pretty happy with these results.

I'm happy to hear that. I think the speed increase in the tests is a result of waking up only one thread at a time to do a read or write, rather than all threads. This changes the complexity from O(n^2) to O(n). Under low concurrency (few waiters and few messages), I would actually expect the difference to be negligible.
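
As a back-of-the-envelope check of that complexity claim, here is a simplified counting model. It assumes n waiters each receive one message, and that under a wake-all policy every message wakes all waiters still pending. That is an idealisation for illustration, not a measurement of either implementation.

```ocaml
(* Count waiter wake-ups needed to deliver one message to each of n waiters. *)

(* Wake-all: each message wakes every waiter still pending; one of them
   takes the message, the rest go back to sleep. *)
let wakeups_wake_all n =
  let rec go pending total =
    if pending = 0 then total else go (pending - 1) (total + pending)
  in
  go n 0

(* Wake-one: each message wakes only the waiter that receives it. *)
let wakeups_wake_one n = n
```

Under this model the wake-all count is n(n+1)/2. For the 8000-message test that is 32,004,000 wake-ups against 8000, and the ratio grows linearly with n, which is consistent with the gap widening at the next order of magnitude.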

What is your take on a merge? Have you had a chance to take a look at the code in detail?

Btw. I give up on adding sexp of Socket.t for just zmq-async. If we want it in, maybe we should depend on base and add it for both implementations. Adding it for only one is a hassle and just creates extra maintenance.

I'm ok with a merge. Then we can add Msg.t support and any other missing pieces necessary from the core library.

I'd personally prefer to avoid adding a dependency on Base in the core library if we can help it. There are relatively frequent backwards-incompatible updates to Base and related libraries so it's easy for libraries depending on them to fall out of sync with the latest and greatest.

@rgrinberg Do you have some comments regarding the sexp stuff being removed before we merge?

#56 is merged. Closing.