zmq-async fork
This is a fork of the zmq-async library at https://github.com/lynaghk/zmq-async
Main differences:
- Many deadlocks have been resolved
  - deadlocks relating to sloppy usage of async-control-chan
  - deadlocks relating to close! (do you know that closing a channel does not unblock pending puts already on it?)
  - deadlocks relating to sloppy usage of
- All reflective calls in the main namespace have been eliminated
- Explicit dependencies on jzmq, core.async and Clojure have been removed, so you need to add them in your own application. The reason: core.async changes too fast, and you may want to build your own jzmq, or even use jeromq.
- Logging has been added, so that debugging is hopefully less painful. To see the logging, set the timbre logging level to :trace or below.
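The close! behaviour mentioned in the list above can be checked at a REPL. This is a minimal sketch, assuming core.async is on the classpath and using its standard channels; the var names are only illustrative and are not part of this library:

```clojure
(require '[clojure.core.async :refer [chan put! close! <!!]])

(def c (chan))            ; unbuffered, so a put has to wait for a taker
(def put-done (promise))  ; delivered when the put's callback fires

;; Park a put on the channel; nobody is taking yet.
(put! c :pending (fn [accepted?] (deliver put-done accepted?)))

;; Closing the channel neither completes nor cancels the parked put.
(close! c)
(def done-after-close? (realized? put-done))

;; A later take still receives the value, and only then does the callback run.
(def taken (<!! c))
(def put-result (deref put-done 1000 :timed-out))
```

Code that waits for a put to finish after calling close! can therefore wait forever unless something still takes from the channel.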
Available on clojars:
The original README is reproduced below
ZeroMQ Async
ZeroMQ is a message-oriented socket system that supports many communication styles (request/reply, publish/subscribe, fan-out, &c.) on top of many transport layers with bindings to many languages. However, ZeroMQ sockets are not thread safe---concurrent usage typically requires explicit locking or dedicated threads and queues. This library handles all of that for you, taking your ZeroMQ sockets and hiding them behind thread safe core.async channels.
Quick start
Add to your project.clj:
[com.keminglabs/zmq-async "0.1.0"]
Your system should have ZeroMQ 3.2 installed:
brew install zeromq
or
apt-get install libzmq3
This library provides one function, register-socket!, which associates a ZeroMQ socket with core.async channel(s): in (into which you write strings or byte arrays) and/or out (from which you read byte arrays).
Writing a Clojure collection of strings and/or byte arrays sends a multipart message; received multipart messages are put on core.async channels as a vector of byte arrays.
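Because a value read from an out channel may be either a single byte array or a vector of byte arrays, it can help to normalize it before use. The helpers below are a hypothetical sketch, not part of zmq-async, and they assume the payloads are UTF-8 text:

```clojure
(defn frame->string
  "Decode one received frame (a byte array) as UTF-8 text."
  [^bytes frame]
  (String. frame "UTF-8"))

(defn message->strings
  "Return a vector of strings for a received value, whether it is a
  single frame or a multipart message (a vector of frames)."
  [msg]
  (if (sequential? msg)
    (mapv frame->string msg)
    [(frame->string msg)]))

;; Stand-ins for values read from an out channel:
(def single    (message->strings (.getBytes "ping" "UTF-8")))
(def multipart (message->strings [(.getBytes "topic" "UTF-8")
                                  (.getBytes "body" "UTF-8")]))
```

Java arrays are not sequential?, so a lone byte array takes the single-frame branch.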
The easiest way to get started is to have zmq-async create sockets and the backing message pumps automagically for you:
(require '[com.keminglabs.zmq-async.core :refer [register-socket!]]
         '[clojure.core.async :refer [>! <! go chan sliding-buffer close!]])
(let [n 3, addr "inproc://ping-pong"
      [s-in s-out c-in c-out] (repeatedly 4 #(chan (sliding-buffer 64)))]
  (register-socket! {:in s-in :out s-out :socket-type :rep
                     :configurator (fn [socket] (.bind socket addr))})
  (register-socket! {:in c-in :out c-out :socket-type :req
                     :configurator (fn [socket] (.connect socket addr))})
  (go (dotimes [_ n]
        (println (String. (<! s-out)))
        (>! s-in "pong"))
      (close! s-in))
  (go (dotimes [_ n]
        (>! c-in "ping")
        (println (String. (<! c-out))))
      (close! c-in)))
Note that you must provide a :configurator function to set up the newly instantiated socket, including binding/connecting it to addresses.
Take a look at the jzmq javadocs for more info on configuring ZeroMQ sockets.
Closing the core.async in channel associated with a socket closes the socket.
If you already have ZeroMQ sockets in hand, you can give them directly to this library:
(import '(org.zeromq ZMQ ZContext))
(let [my-sock (doto (.createSocket (ZContext.) ZMQ/PAIR)
                ;;twiddle ZeroMQ socket options here...
                (.bind addr))]
  (register-socket! {:socket my-sock :in in :out out}))
(Of course, after you've created a ZeroMQ socket and handed it off to the library, you shouldn't read/write against it since the sockets aren't thread safe and doing so may crash your JVM.)
The implicit context supporting register-socket! can only handle one incoming/outgoing message at a time.
If you need sockets to work in parallel (i.e., you don't want to miss a small control message just because you're slurping in a 10GB message on another socket), then you'll need multiple zmq-async contexts.
Contexts accept an optional name to aid in debugging/profiling:
(require '[com.keminglabs.zmq-async.core :refer [context initialize! register-socket!]])
(def my-context
  (doto (context "My Context")
    (initialize!)))
(register-socket! {:context my-context :in in :out out :socket my-sock})
Each context has an associated shutdown function, which will close all ZeroMQ sockets and core.async channels associated with the context and stop both threads:
((:shutdown my-context))
Caveats
- The out ports provided to the library should never block writes; otherwise the async message pump thread will block and no messages will be able to go through that context in either direction. This may be enforced in the future with an exception (once core.async provides a mechanism for checking if a port blocks writes).
- The ZeroMQ thread will drop messages on the floor rather than block while trying to hand them off to a socket.
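One way to satisfy the first caveat is to back every out channel with a sliding or dropping buffer, as the quick start does. The sketch below uses only core.async, with no ZeroMQ involved; fill-and-drain is a made-up helper that shows such channels accepting every put and simply discarding what does not fit:

```clojure
(require '[clojure.core.async
           :refer [chan sliding-buffer dropping-buffer >!! <!! close!]])

(defn fill-and-drain
  "Put 0..9 on a channel backed by `buffer`, close it, and return the
  values that are still readable."
  [buffer]
  (let [c (chan buffer)]
    (doseq [i (range 10)]
      (>!! c i))            ; returns immediately for both buffer types
    (close! c)
    (loop [acc []]
      (if-let [v (<!! c)]   ; nil once the closed channel is exhausted
        (recur (conj acc v))
        acc))))

(def kept-by-sliding  (fill-and-drain (sliding-buffer 3)))   ; newest values survive
(def kept-by-dropping (fill-and-drain (dropping-buffer 3)))  ; oldest values survive
```

Which buffer fits depends on whether stale or fresh messages matter more to the consumer; either way the message pump thread is never held up by a slow reader.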
Architecture
All sockets are associated with a context of two "message pump" threads:
- One thread manages ZeroMQ sockets and conveys incoming values to the application via a core.async channel (the "ZeroMQ thread")
- One thread manages core.async channels and writes to a ZeroMQ control socket (the "core.async thread")
Each thread blocks with the appropriate selection construct (zmq_poll and alts!!, respectively) rather than an explicit polling loop.
Thus, each thread must initially communicate with the other via the other's transport.
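On the core.async side, blocking selection looks roughly like the following. This only illustrates alts!! itself and is not the library's actual loop; the channel names are invented:

```clojure
(require '[clojure.core.async :refer [chan >!! alts!!]])

(def control (chan 1))
(def in-a (chan 1))
(def in-b (chan 1))

;; Only in-b has something ready.
(>!! in-b "outgoing")

;; alts!! blocks until one of the channels can deliver, then returns [value port].
(def selection (alts!! [control in-a in-b]))
```

The returned port tells the caller which channel the value came from, which is how a single thread can serve many channels without polling each one.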
The core.async thread notifies the ZeroMQ thread that it needs to do something by writing to an in-process control socket ("the ZeroMQ control socket").
However, since Java objects cannot be serialized over ZeroMQ, the core.async thread communicates out-of-band to the ZeroMQ thread via a java.util.concurrent queue (basically just yelling on the ZeroMQ control socket "Unblock yo, I just put something on the queue for you to handle").
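The hand-off described above can be sketched without ZeroMQ at all. In this hypothetical version, notify! stands in for the write to the ZeroMQ control socket, and the function names are illustrative rather than taken from the library's source:

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn enqueue-command!
  "Put `cmd` (any Clojure/Java object) on the queue, then poke the other
  thread. The poke itself carries no payload."
  [^LinkedBlockingQueue queue notify! cmd]
  (.put queue cmd)
  (notify!))

(defn drain-commands
  "Take everything currently on the queue without blocking."
  [^LinkedBlockingQueue queue]
  (loop [acc []]
    (if-let [cmd (.poll queue)]   ; .poll returns nil when the queue is empty
      (recur (conj acc cmd))
      acc)))

;; Usage: count the pokes instead of writing to a real control socket.
(def queue (LinkedBlockingQueue.))
(def pokes (atom 0))
(enqueue-command! queue #(swap! pokes inc) ["sock-1" "hello"])
(enqueue-command! queue #(swap! pokes inc) [:close "sock-1"])
(def drained (drain-commands queue))
```

The command is queued before the poke is sent, so the woken thread always finds it on the queue.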
The ZeroMQ thread will then take from the queue and:
- write a value out to a ZeroMQ socket, [sock-id val], where val can be a string or byte array.
- register a new socket, [:register sock-id sock], where sock-id is a string and sock is a ZeroMQ socket object that is ready to be read from or written to (i.e., it has already been bound or connected).
- close a socket, [:close sock-id].
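The three command shapes listed above lend themselves to a small dispatch function. The sketch below is a pure stand-in rather than the library's implementation: each "socket" is modelled as a vector of the values written to it, where real code would hold a ZeroMQ socket and send on it.

```clojure
(defn handle-command
  "Apply one queued command to `socks`, a map of sock-id -> socket.
  Hypothetical model: a socket is just a vector of values written so far."
  [socks [op a b]]
  (case op
    :register (assoc socks a b)            ; [:register sock-id sock]
    :close    (dissoc socks a)             ; [:close sock-id]
    (update-in socks [op] conj a)))        ; [sock-id val]

;; Usage: replay a short command sequence from an empty state.
(def final-state
  (reduce handle-command
          {}
          [[:register "s1" []]
           ["s1" "ping"]
           ["s1" "pong"]
           [:register "s2" []]
           [:close "s2"]]))
```

Since sock-id is a string and the other two commands start with a keyword, the default branch of case is enough to recognize a write.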
The ZeroMQ thread writes [sock-id val] to the core.async thread's control channel when it receives value val from the socket with identifier sock-id.
Sockets are closed when their corresponding core.async in channel is closed.
Now, you may be wondering: why not just create two new threads for each ZeroMQ socket (one blocking on incoming messages from the socket and another blocking on outgoing messages from the core.async channel)? Conceptually, this is much simpler, and that simplicity would be reflected in the codebase. However, the theory upon which this library rests is that ZeroMQ communications are IO-bound, not CPU-bound. A single pair of threads should be capable of handling all traffic, and is thus preferred over a thread pair for each ZeroMQ socket, if only because library consumers may want to handle hundreds of ZeroMQ socket connections and spinning up a pair of threads for each would be gratuitous. Your use case may vary, in which case you should benchmark.
Thanks
Thanks to @brandonbloom for the initial architecture idea, @zachallaun for pair programming/debugging, @ztellman for advice on error handling and all of the non-happy code paths, @puredanger for suggestions about naming and daemonizing the Java threads, @richhickey for the suggestions to explicitly handle all blocking combinations in a matrix, require explicit buffering semantics from consumers, and to accept byte buffers instead of just arrays, and @halgari for requesting multiple message pump pairs to avoid large-data reads from blocking potentially high-priority smaller messages.
Thanks also to the fine people at YourKit, who are supporting this project by providing a license for their awesome Java profiler. (The automatic deadlocked thread detection and detailed tracing info in particular has been very helpful for this project.) YourKit, LLC is the creator of innovative and intelligent tools for profiling Java and .NET applications.
Other Clojure ZeroMQ libraries
I looked at several ZeroMQ/Clojure bindings before writing this one: Zilch, clj-0MQ, and ezmq haven't been updated in the past two years and don't offer much more than a thin layer of Clojure over native Java interop calls.
After I started work on this library, an official ZeroMQ Clojure binding was released, but it also seems like just a thin layer of Clojure over jzmq (the underlying ZeroMQ Java binding that zmq-async also uses) and doesn't seem to offer any help for using ZeroMQ sockets concurrently.
Finally, this library ships with native Linux 64 and OS X 64 compiled bindings to ZeroMQ 3.2. As long as you're on x64 Linux or OS X, you don't have to manually compile and install jzmq. See the project.clj for the SHA of the jzmq commit compiled into this library.
TODO (?)
- Handle ByteBuffers in addition to just strings and byte arrays.
- Enforce that provided ports never block and/or are read/write only as appropriate.
- Add shutdown hooks to context objects?