/rabbitmq-worker

A small wrapper around the Langohr RabbitMQ client

Primary LanguageClojure

rabbitmq-worker

A Clojure library that wraps the excellent Langohr library in order to hide the connecting to RabbitMQ, , consuming, acking/nacking of messages, publishing etc. For us this has been suprisingly error / confusion prone in projects that roll their own setup.

This library tries to be a "Sugar-coated API for task queues that hides all the AMQP machinery from the developer", unlike the quite low level langohr. This library provides sensible defaults for most things, in an attempt to give a maximally simple interface to consume from queues and publish messages, so that you as a developer don't have to think as much.

Usage

Getting the library

For leiningen: [uswitch/rabbitmq-worker "0.3.1"]

Opening a connection

(require '[rabbitmq-worker.worker :as worker])

(worker/open-connection {:uri "amqp://guest:guest@localhost:5672"})

(worker/open-connection {:host "127.0.0.1", :port 5672, :vhost "/", :password "guest", :username "guest"})

Consuming messages

From default exchange:

(worker/consume connection 
                {:queue "banana"} 
                (fn [message] (println message)))

The queue will be automatically declared with the defaults :queue-auto-delete false, :queue-durable true and :queue-exclusive false.

From another exchange:

(worker/consume connection 
                {:exchange "banana-exchange" :queue "fruits"} 
                (fn [message] (println message)))

Messages that throw an error will be automatically pushed onto a queue named {original-queue-name}-failed. The failed queue is declared with :failed-queue-auto-delete false, :failed-queue-durable true and :failed-queue-exclusive false.

The following map shows all the options supported if you need to configure more advanced consumers, but don't override unless you know what you're doing.

{:queue                    String
 :queue-auto-delete        Boolean
 :queue-exclusive          Boolean
 :queue-durable            Boolean
 :queue-arguments          {:x-dead-letter-exchange String
                            :x-dead-letter-routing-key String}
 :metadata                 Boolean (provide metadata to message hander function as 2nd arg.)
 :raw-payload              Boolean (provide message as raw payload (Byte array) instead of string)
 :failed-queue             String
 :failed-queue-auto-delete Boolean
 :failed-queue-durable     Boolean
 :failed-queue-exclusive   Boolean}

Handling errors

Messages are automatically acknowledged if the handler successfully processes the payload and automatically rejected if it fails. The consumer config accepts an :on-error key, which is a function that will be invoked when an error happens. The function should take the error and the received payload.

(worker/consume connection
                {:queue    "pineapple-queue"
                 :on-error (fn [error payload] (log/error error))}
                (fn [message] (println message)))

Development

Running tests

For local testing with lein test, you need a RabbitMQ instance running locally.

License

Copyright © 2017-2018 uSwitch.

Distributed under the Eclipse Public License, the same as Clojure.