/clamq

Clojure JMS Adapter

Primary LanguageClojureApache License 2.0Apache-2.0

Clamq - Version 0.1 (in progress)

Clamq is a Clojure adapter for interacting with JMS brokers, providing simple APIs to connect to brokers and sending/consuming messages to/from message queues and topics.

Currently supported JMS brokers:

Connecting to brokers

ActiveMQ

Here is how to define an ActiveMQ connection:

(ns clamq.test (:use [clamq.connection.activemq]))
(def connection (activemq broker-uri))

Where:

  • broker-uri is the ActiveMQ broker URI.

You can also pass an optional sequence of keyed parameters as follows:

(ns clamq.test (:use [clamq.connection.activemq]))
(def connection (activemq :username username :password password :max-connection max-connections))

Producer/Consumer APIs

Producing messages

First, define a producer:

(ns clamq.test (:use [clamq.producer]))
(def producer (producer connection :transacted transacted :pubSub pubSub))

Where:

  • connection is a broker connection obtained as previously described.
  • transacted defines (true/false) if the message sending must be transactional.
  • pubSub defines if produced messages are for publish/subscribe, that is, must be sent to a topic (optional: default to false).

Then, send messages through it:

(send-to producer destination message attributes)

Where:

  • producer is a producer obtained as previously described.
  • destination is the name of the JMS destination.
  • message is the message to be sent, of type text or object.
  • attributes is the map of string attributes to set into the message.

Consuming messages

First, define a consumer:

(ns clamq.test (:use [clamq.consumer]))
(def consumer (consumer connection destination handler-fn :transacted transacted :pubSub pubSub :limit limit :on-failure on-failure))

Where:

  • connection is a broker connection obtained as previously described.
  • destination is the name of the JMS destination to consume from.
  • handler-fn is the handler function to call at each consumed message, accepting the message itself as unique argument.
  • transacted defines (true/false) if the message consumption must be transactional.
  • pubSub defines if consumer messages are from publish/subscribe, that is, must be consumed from a topic (optional: default to false).
  • limit defines the max number of consumed messages, after which the consumer stops itself (optional: default to 0, unlimited).
  • on-failure defines a function called in case of exception during message handling (optional: by default the exception is just rethrown).

Then, start/stop consuming:

(start consumer)
(stop consumer)

Pipes and Filters APIs

Creating Pipes

Pipes define a conduit between source and destination endpoints, which can be different queues/topics belonging to different brokers. Each message flowing between endpoints in a pipe is filtered by a filter function, and eventually processed by a failure function in case of errors.

Clamq provides two different kind of pipes: unicast pipes, connecting two single endpoints, and multicast pipes, connecting a source endpoint with multiple destination endpoints.

Unicast pipes are defined as follows:

(ns clamq.test (:use [clamq.pipes]))
(def pipe (pipe {
  :from {:connection broker :endpoint source :pubSub pubSub}
  :to {:connection broker :endpoint destination} :transacted true :pubSub pubSub :limit limit :filter-by filter-fn :on-failure failure-fn}))

Multicast pipes are pretty similar, except they get an array of destinations:

(ns clamq.test (:use [clamq.pipes]))
(def pipe (multi-pipe {
  :from {:connection broker :endpoint source :pubSub pubSub}
  :to [{:connection broker :endpoint destination1 :pubSub pubSub :limit limit :filter-by filter-fn :on-failure failure-fn}] :transacted true}))

Where:

  • connection is a broker connection obtained as previously described.
  • endpoint is the name of a JMS queue/topic.
  • transacted defines (true/false) if the message consumption must be transactional.
  • pubSub defines if messages are from/to a topic (optional: default to false).
  • limit defines the max number of flowing messages, after which the pipe stops consuming messages (optional: default to 0, unlimited).
  • filter-by defines the name of the filter function (optional, default to identity function).
  • on-failure defines a function called in case of exception during message handling (optional: by default the exception is just rethrown).

Once defined, you can open/close pipes to let messages flow:

(open pipe)
(close pipe)

Examples

  • clamq/test/activemq_test.clj