An Apache Camel Component that will natively communicate with a RabbitMQ broker. This is implemented using Spring's AMQP project, so it should ultimately become vendor-agnostic.
The simplest Fanout exchange can be defined as:
spring-amqp:<Exchange Name>
spring-amqp:<Exchange Name>:<Queue Name>
Here a simple message producer will send a message to Exchange Name, and a simple consumer will bind the Exchange Name to the Queue Name.
If you wish to use a routing key, URIs have the structure:
spring-amqp:<Exchange Name>:<Queue Name>:<Routing Key>?type=<Exchange Type>
The routing key is optional, but Queue Name and Exchange Name are required for consumers. Just the Exchange Name is required for producers.
Producers can also defer the routing key to the message header, where the ROUTING_KEY header could be set to the appropriate routing key.
Producers can override the exchange name specified in the URI by setting the EXCHANGE_NAME Camel message header.
Options to the URI include the exchange type, which defaults to direct if none is specified.
For header based exchanges, the URI is similar but name/value pairs can be specified in place of the routing key. For example:
spring-amqp:myExchange:qName:cheese=gouda?type=headers
This example will fetch all messages where a header named "cheese" has the value of "gouda." You can also add additional name/value pairs:
spring-amqp:myExchange:qName:cheese=gouda&fromage=jack?type=headers
Which will create a binding for headers where "cheese" has the value of "gouda" AND "fromage" has the value of "jack." You can also choose to create an OR relationship:
spring-amqp:myExchange:qName:cheese=gouda|fromage=jack?type=headers
Additional properties can be added to the endpoint as URI parameters. For example, to create a topic exchange that is non-exclusive and not durable:
spring-amqp:myExchange_10:writeQueue:write.*?type=topic&durable=false&autodelete=true&exclusive=false
Parameters available include:
prefetchCount | How many messages a consumer should pre-fetch |
concurrentConsumers | The number of concurrent threads that will consume messages from a queue |
transactional | Mark messages coming to/from this endpoint as transactional |
autodelete | Allow this endpoint to be automagically deleted from the broker once it is gone |
durable | Make messages being produced from this endpoint persistent |
type | One of the AMQP exchange types: direct, fanout, headers, or topic. Defaults to direct. |
exclusive | Mark this endpoint as an exclusive point for message exchanges |
acknowledgeMode | Sets the acknowledge mode (NONE, AUTO) |
connection | Configure a specific connection factory (for systems with multiple AMQP brokers) |
autoReply | Consumer sends back a response message when ReplyTo header is present in the consumed message. Defaults to true. |
The camel-spring-amqp component will attempt to fetch as much information from the application context it sits within. For example, if we are using Spring we could issue the following:
<bean id="messageConverter" class="amqp.spring.converter.XStreamConverter"/>
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" message-converter="messageConverter"/>
<rabbit:admin connection-factory="connectionFactory"/>
The message converter amqp.spring.converter.XStreamConverter is provided by the camel-spring-amqp component; it provides JSON marshalling using the XStream libraries. If you would rather use the Jackson JSON marshalling (or another conversion method) provided by the Spring AMQP framework, you can swap out the appropriate message converter class in the above example.
You may wish to use JSON marshalling for the majority of your inter-process communication, but may have the need to do XML marshalling for REST API calls or the like. If you wish you can specify multiple types of message conversion based on the content type of the message. For example, we may have messages marshalled to JSON by default but want content types of application/text to just be printed out as strings. Within the Spring XML DSL you may define:
<bean id="jsonMessageConverter" class="amqp.spring.converter.XStreamConverter"/>
<bean id="textMessageConverter" class="amqp.spring.converter.StringConverter"/>
<bean id="messageConverter" class="amqp.spring.converter.ContentTypeConverterFactory">
<property name="converters">
<map>
<entry key="application/json" value-ref="jsonMessageConverter"/>
<entry key="application/xml" value-ref="textMessageConverter"/>
</map>
</property>
<property name="fallbackConverter" ref="jsonMessageConverter"/>
</bean>
This would allow messages with a "Content-Type" of application/json to be marshalled with the XStream converter, while messages with a content type of application/xml will be marshalled into a simple character string. If no content type is specified, the XStream JSON message converter will be used.
Release builds of the Camel Spring AMQP Component are hosted within the Sonatype repository. You can include this component within your Maven POM as:
<dependency>
<groupId>com.bluelock</groupId>
<artifactId>camel-spring-amqp</artifactId>
<version>1.6.2</version>
</dependency>
- Transactions are currently not supported
- Lifecycle events (e.g. stop, shutdown) need to be refined
- Unit tests require a running AMQP broker. I may end up creating a VM local Qpid instance as an AMQP broker...
- Validate with other AMQP brokers (such as Qpid)
This package, the Camel Spring AMQP component is licensed under the Mozilla Public License v2.0. See LICENSE for details.