kithara is a (limited) RabbitMQ client for Clojure, based on Lyra. Its specific scope - and thus source of its limitations - is the simple creation of RabbitMQ-based consumers with appropriate recovery and retry facilities.
If you’re looking for a more complete RabbitMQ library you should check out langohr.
With kithara you have a handler:
(defn message-handler
[{:keys [body]}]
(println "I received a message:" body)
{:status :ack})
Since the handler takes a message and returns a description of what to do with it, you can wrap it easily:
(defn message-handler-with-nack
[{:keys [body] :as message}]
(if (= body "error")
{:status :nack, :requeue? false}
(message-handler message)))
You turn the handler into a component, adding coercion and a name:
(defonce consumer
(-> message-handler-with-nack
(k/consumer {:as :string, :consumer-name "kithara-test"})
...))
Followed by a channel, a queue, some bindings and a connection:
(defonce consumer
(-> message-handler-with-nack
(k/consumer {:as :string, :consumer-name "kithara-test"})
(k/with-channel {:prefetch-count 256})
(k/with-queue "kithara-queue" {:exchange "kithara", :routing-keys ["#"]})
(k/with-connection {:host "localhost", :vhost "/kithara"})) )
And you watch it goooooo:
(alter-var-root #'consumer com.stuartsierra.component/start)
;; => ...
17:37:25.372 [nREPL-worker-0] INFO net.jodah.lyra.internal.ConnectionHandler - Creating connection cxn-2 to [docker:5672]
17:37:25.390 [nREPL-worker-0] INFO net.jodah.lyra.internal.ConnectionHandler - Created connection cxn-2 to amqp://192.168.59.103:5672/
17:37:25.395 [nREPL-worker-0] INFO net.jodah.lyra.internal.ConnectionHandler - Created channel-1 on cxn-2
17:37:25.410 [nREPL-worker-0] INFO net.jodah.lyra.internal.ConnectionHandler - Created channel-2 on cxn-2
17:37:25.421 [nREPL-worker-0] DEBUG kithara.components.base-consumer - [kithara-test] starting consumer on queue kithara-queue (desired tag: 'kithara-test:96CQWOfPfKf8tjQ328') ...
17:37:25.425 [nREPL-worker-0] INFO net.jodah.lyra.internal.ChannelHandler - Created consumer-kithara-test:96CQWOfPfKf8tjQ328 of kithara-queue via channel-2 on cxn-2
I received a message: Hello, World!
17:38:29.819 [rabbitmq-cxn-2-consumer] DEBUG kithara.middlewares.logging - [kithara-test] [kithara-test:96CQWOfPfKf8tjQ328] [ack] exchange="kithara", routing-key="test-message", size=13
(And then you probably disable some of Lyra’s logging.)
kithara is basically a toolbox with components handling RabbitMQ connection, channel and queue setup. Here you’ll see how to combine them.
A handler is a function. It takes a single map (the message) and produces
another map (the confirmation) specifying how to react to the message, i.e.
whether to ACK
, NACK
or REJECT
it.
This is an example message:
{:channel #object[com.sun.proxy.$Proxy1 0x17cd517c "net.jodah.lyra.config.Config@190abc2d"],
:body #object["[B" 0x13d05618 "[B@13d05618"],
:body-raw #object["[B" 0x13d05618 "[B@13d05618"],
:properties {...},
:routing-key "the-routing-key",
:exchange "the-exchange",
:redelivered? false,
:delivery-tag 33,
:consumer-tag "kithara:96CDgYbID9a1IlDwDS"}
Most of the semantics should be clear, so just a few notes:
-
:body-raw
always contains a byte array with the raw payload. -
:body
contains a potentially preprocessed payload (see Message Coercion), otherwise it’s identical to:body-raw
. -
:properties
represents the message’sBasicProperties
.
The confirmation is a map describing how to react to the message that was
processed, mainly using the :status
key which can take any of the following
values:
-
:ack
(ACK
the message) -
:nack
(NACK
the message) -
:reject
(REJECT
the message) -
:error
(an exception occured) -
:done
(the message was explicitly confirmed within the handler)
:nack
, :reject
and :error
also rely on the :requeue?
key to decide
whether or not to re-add a message to the back of the queue (defaults to true
for NACK
). Additionally, :message
and :error
can be given to augment the
log messages printed by the consumer.
An example confirmation could thus be:
{:status :nack
:requeue? false
:message "this is unacceptable."}
The reliance on pure data for message handling should make your handlers a little more testable in the long run.
Once your handler is ready, you can create the base consumer component. It’s not yet bound to any connection, queue or channel, it just encapsulates the handling logic.
(k/consumer handler)
(k/consumer handler {... options ...})
Options can be used to tweak its behaviour a bit.
The :as
option specifies a coercer for the incoming payload. It can be one of
the following:
-
:bytes
(default): just use the raw byte array, -
:string
: convert the byte array to a UTF-8 string, -
a function: apply the function to the byte array,
-
any value implementing
kithara.protocols/Coercer
.
So, a consumer that prints every incoming message’s :body
as a string would
be constructed as:
(k/consumer
(fn [{:keys [body]}]
(println body)
{:status :ack})
{:as :string})
If a handler does not return a map (or a map without the :status
key) the
message will be confirmed using ACK
. In the same vein, if the handler throws
an exception the message will be confirmed with NACK
and requeued.
This can be adjusted using the :default-confirmation
and :error-confirmation
keys, e.g.:
(k/consumer
...
{:default-confirmation {:status :nack}
:error-confirmation {:status :reject}})
It’s often useful to be able to identify a consumer, e.g. in the RabbitMQ management plugin displaying only the consumer tag.
You can thus either set the consumer tag explicitly (using the :consumer-tag
option) or you can give your consumer a name (:consumer-name
) that will be
included in a custom, unique consumer tag chosen by kithara.
Consumers have to be bound to a channel before they can be started, which is easily achieved:
(k/with-channel
consumer
{:prefetch-count 256})
See the documentation for available options. You should set at least
:prefetch-count
, though, to prevent your consumer from loading more messages
into memory than it can stomach. A shorthand for this can be found in
with-prefetch-channel
.
Consumers need a queue to receive messages from and that queue is bound to exchange/routing-key pairs. It can be easily set up:
(k/with-queue
consumer
"queue-name"
{:durable? true
:exclusive? false
:auto-delete? false
:exchange "exchange"
:routing-keys ["#"]})
If no options are given, the queue will not be actively declared but expected
to already exist. Note that there are shorthands for commonly used queue types
like
with-server-named-queue
and
with-durable-queue
.
Without a connection to your RabbitMQ cluster there isn’t really a lot your consumer can accomplish. Set it thus up via:
(k/with-connection
consumer
{:host "rabbitmq.host.com"
:vhost "/kithara"
:username "..."
:password "..."})
See
kithara.config/connection
for endpoint configuration and
kithara.config/behaviour
for recovery/retry semantics. By default, the connection will employ backing-off
recovery (up to 60s) and immediate and unlimited retry.
Kithara aims to provide easily usable implementations for common messaging patterns and scenarios.
Usually, messages trigger some kind of change to the world, i.e. writing to a DB, starting of some pre-defined task, etc … And the "world" these changes rely on needs to be available to the message handler. There are several ways this can be achieved.
The message handler function can close over its dependencies, no matter whether they are available as global variables (like in mount) or parameters to a constructor function:
(defn build-message-handler
[database]
(fn [message]
(write! database message)
{:status :ack}))
(defonce closure-rabbitmq-consumer
(-> (build-message-handler my-database)
(k/consumer ...)
...))
The environment can be passed to the message handler directly
within the message map, facilitated by kithara’s with-env
function:
(defn message-handler
[{:keys [env] :as message}]
(write! (:database env) message)
{:status :ack})
(defonce injection-rabbitmq-consumer
(-> message-handler
(k/consumer ...)
...
(k/with-env {:database my-database})))
The with-env
wrapper plays nicely with the
component library since it allows
for arbitrary keys to be added to the environment using plain assoc
, making
it possible to employ consumers in component systems:
(defonce system
(component/system-map
:consumer (-> message-handler
(k/consumer ...)
...
(k/with-env)
(component/using [:database]))
:database (map->DB {...})))
Note that with-env
has to be on the top-level for this to work.
A combination of the two previous approaches uses an outer component to gather
the environment, then builds the consumer internally on startup.
peripheral's defcomponent
will let
you formulate this concisely (but note that it can be done with plain
component just as well):
(defcomponent Consumer [database]
:message-handler (build-message-handler database)
:component/consumer (-> message-handler
(k/consumer ...)
...))
(defonce system
(component/system-map
:consumer (component/using (map->Consumer {}) [:database])
:database (map->DB {...})))
Or using with-env
:
(defcomponent Consumer [database]
:component/consumer
(-> message-handler
(k/consumer ...)
...
(k/with-env {:database database})))
This approach arguably offers the most flexibility since building the stack is explicit and thus completely under your control.
Every with-*
function takes either a single component or a seq of them,
allowing your topology to "branch out" however you desire.
For example, you can parallelise processing by adding multiple identical consumers to a channel:
(-> (repeat 5 (k/consumer ...))
(k/with-channel ...)
...)
Although, following the one-channel-per-thread model, it should probably look like this:
(-> (k/consumer ...))
(k/with-channel ...)
(->> (repeat 5))
...)
It would be just as easy to have two completely independent consumers on the same connection:
(def consumer-1
(-> (k/consumer ...)
(k/with-channel ...)
(k/with-queue "consumer-1")))
(def consumer-2
(-> (k/consumer ...)
(k/with-channel ...)
(k/with-queue "consumer-2")))
(def consumer
(k/with-connection
[consumer-1 consumer-2]
...))
You have full control over who shares what, on each layer of the consumer stack.
In some cases you might want to declare just a single consumer but dispatch
message processing to a number of worker threads. This functionality is offered
by
with-threads
which will setup and teardown a fixed-size thread pool for one or more
consumers:
(require '[kithara.patterns.threads :refer [with-threads])
(defonce rabbitmq-consumer-with-backoff
(-> (k/consumer ...)
(with-threads 4)
(k/with-queue ...)
...))
Sometimes, throughput can be gained by batchwise processing of messages (i.e.
by bundling writes to a datastore), which can be achieved using kithara’s
batching-consumer
,
a drop-in replacement for the default consumer:
(defn batch-handler
[messages]
...
{:status :ack})
(defonce rabbitmq-batching-consumer
(-> batch-handler
(k/batching-consumer
{:as :string
:consumer-name "badger"
:batch-size 10})
...))
The batch handler can return either a single confirmation or a seq of them for
more nuanced processing. See the docstring of batching-consumer
for more
details.
The two wrappers
with-dead-letter-backoff
and
with-durable-dead-letter-backoff
provide delayed requeuing of messages by dispatching them to a secondary queue,
the "dead letter queue", from which they’ll eventually be republished. Both
wrappers have to be applied after with-queue
.
The simplest version infers names of additional exchanges/queues using the original consumer queue:
(require '[kithara.patterns.dead-letter-backoff :as dlx])
(defonce rabbitmq-consumer-with-backoff
(-> (k/consumer ...)
(dlx/with-dead-letter-backoff)
(k/with-queue ...)
...))
Kithara wraps the official Java RabbitMQ client - but only as far as necessary
to build consumers (and patterns). You can access those functions using
the kithara.rabbitmq.*
namespaces.
(Alghtough, if you crave this level of control you should probably use something like langohr.)
Contributions are always welcome!
-
Create a new branch where you apply your changes (ideally also adding tests).
-
Make sure existing tests are passing.
-
Open a Pull Request on Github.
The MIT License (MIT) Copyright (c) 2016 Yannick Scherer Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.