
Storm bolt and spout for RabbitMQ

Primary language: Java. License: Apache License 2.0.

Storm RabbitMQ


Provides implementations of IRichSpout and IRichBolt for RabbitMQ.




compile 'ru.burov4j.storm:storm-rabbitmq:1.0.1'

RabbitMQ Connection

You can set RabbitMQ connection properties using RabbitMqConfigBuilder:

RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder()

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(rabbitMqConfig, scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue");

The same can be done with Storm's API:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue")
       .addConfiguration(RabbitMqConfig.KEY_ADDRESSES, "localhost:5672")
       .addConfiguration(RabbitMqConfig.KEY_USERNAME, "guest")
       .addConfiguration(RabbitMqConfig.KEY_PASSWORD, "guest")
       .addConfiguration(RabbitMqConfig.KEY_REQUESTED_HEARTBEAT, 60)
       .addConfiguration(RabbitMqConfig.KEY_VIRTUAL_HOST, "/");

You do not have to set every property. For example, you can set only the RabbitMQ address, in which case the other properties take their default values:

RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder()

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(rabbitMqConfig, scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue");
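
The fallback to defaults can be pictured as user-supplied settings overlaid on a map of default values. The sketch below is a standalone illustration using plain `java.util` maps. The key strings and default values are assumptions chosen to mirror the example values in this README, not the library's actual constants or defaults.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: overlays user-supplied settings on a map of defaults. */
final class ConnectionDefaultsSketch {

    // Hypothetical keys and default values, mirroring the examples in this README.
    static Map<String, Object> defaults() {
        Map<String, Object> d = new HashMap<>();
        d.put("addresses", "localhost:5672");
        d.put("username", "guest");
        d.put("password", "guest");
        d.put("requestedHeartbeat", 60);
        d.put("virtualHost", "/");
        return d;
    }

    /** Returns a new map in which every key set by the user overrides the default. */
    static Map<String, Object> effectiveConfig(Map<String, Object> userSettings) {
        Map<String, Object> merged = defaults();
        merged.putAll(userSettings);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put("addresses", "rabbit-1:5672"); // only the address is set
        System.out.println(effectiveConfig(user));
    }
}
```

With only the address supplied, every other entry in the merged map keeps its default value.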

RabbitMQ Spout

RabbitMqSpout deserializes incoming messages and emits them into your Storm topology. To use it, implement the RabbitMqMessageScheme interface:

class MyRabbitMqMessageScheme implements RabbitMqMessageScheme {

    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }

    public StreamedTuple convertToStreamedTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws ConvertionException {
        // your implementation here
    }

    public Map<String, Fields> getStreamsOutputFields() {
        // your implementation here
    }

    public void cleanup() {
        // your implementation here
    }
}
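
One way to think about a multi-stream scheme: each stream ID maps to its own field list, and each incoming message is assigned to one of those streams. A minimal, library-independent sketch follows. The stream names, field names, and the routing-key rule are hypothetical, and plain collections stand in for Storm's `Fields` and the library's `StreamedTuple`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: picks an output stream per message and declares fields per stream. */
final class MultiStreamSketch {

    // Hypothetical stream IDs.
    static final String ORDERS = "orders";
    static final String OTHER = "other";

    /** Plays the role of getStreamsOutputFields(): stream ID -> field names. */
    static Map<String, List<String>> streamsOutputFields() {
        Map<String, List<String>> fields = new LinkedHashMap<>();
        fields.put(ORDERS, Arrays.asList("routingKey", "payload"));
        fields.put(OTHER, Arrays.asList("routingKey", "payload"));
        return fields;
    }

    /** Assumed rule: routing keys starting with "order." go to the orders stream. */
    static String selectStream(String routingKey) {
        return routingKey != null && routingKey.startsWith("order.") ? ORDERS : OTHER;
    }

    /** Builds the values for the chosen stream, in the declared field order. */
    static List<Object> toValues(String routingKey, byte[] body) {
        return Arrays.<Object>asList(routingKey, new String(body, StandardCharsets.UTF_8));
    }
}
```

In a real scheme, the stream chosen by `selectStream` and the values from `toValues` would be combined into the `StreamedTuple` returned from `convertToStreamedTuple`.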

If you need only one output stream, you can extend SingleStreamRabbitMqMessageScheme:

class MyRabbitMqMessageScheme extends SingleStreamRabbitMqMessageScheme {

    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }

    public List<Object> convertToTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws ConvertionException {
        // your implementation here
    }

    public Fields getOutputFields() {
        // your implementation here
    }

    public void cleanup() {
        // your implementation here
    }
}
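
As a rough illustration of what a `convertToTuple` body might do, the sketch below decodes a UTF-8 message body of the hypothetical form `id;text` into two values. It is deliberately standalone: the message format and field names are assumptions, and it throws `IllegalArgumentException` where a real scheme would throw the library's `ConvertionException`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: turns a raw message body into an ordered list of tuple values. */
final class BodyToTupleSketch {

    // Field names a matching getOutputFields() would declare (hypothetical).
    static final List<String> OUTPUT_FIELDS = Arrays.asList("id", "text");

    /** Parses "id;text" into [Long id, String text]; rejects anything else. */
    static List<Object> convertToTuple(byte[] body) {
        String message = new String(body, StandardCharsets.UTF_8);
        int separator = message.indexOf(';');
        if (separator < 0) {
            throw new IllegalArgumentException("Missing ';' separator: " + message);
        }
        long id;
        try {
            id = Long.parseLong(message.substring(0, separator).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Id is not a number: " + message, e);
        }
        String text = message.substring(separator + 1);
        // Values are returned in the same order as OUTPUT_FIELDS.
        return Arrays.<Object>asList(id, text);
    }
}
```

The order of the returned values has to match the order of the declared output fields, which is why both are kept side by side here.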

The next step is to pass your custom scheme to RabbitMqSpout:

MyRabbitMqMessageScheme scheme = new MyRabbitMqMessageScheme();
RabbitMqSpout rabbitMqSpout = new RabbitMqSpout(scheme);

You can also set some properties for RabbitMqSpout:

builder.setSpout("rabbitmq-spout", rabbitMqSpout)
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue") // required
       .addConfiguration(RabbitMqSpout.KEY_AUTO_ACK, false)
       .addConfiguration(RabbitMqSpout.KEY_PREFETCH_COUNT, 64)
       .addConfiguration(RabbitMqSpout.KEY_REQUEUE_ON_FAIL, false);

Note that the property RabbitMqSpout.KEY_QUEUE_NAME is required.

To run preparation logic on the channel, you can implement the RabbitMqInitializer interface:

class MyRabbitMqInitializer implements RabbitMqInitializer {
    public void initialize(Channel channel) throws IOException {
        // your implementation here
    }
}

and then put it in your spout:

RabbitMqInitializer myRabbitMqInitializer = new MyRabbitMqInitializer();
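
An initializer is a natural place to declare the exchange, queue, and binding that the spout depends on. The sketch below shows that sequence against a small local `TopologyChannel` interface, so it compiles without the RabbitMQ client on the classpath. Its three methods mirror `exchangeDeclare`, `queueDeclare`, and `queueBind` on `com.rabbitmq.client.Channel` (with return types and `IOException` simplified away), and the exchange, queue, and routing-key names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative only: the declare-and-bind sequence an initializer might perform. */
final class InitializerSketch {

    /** Local stand-in for the subset of com.rabbitmq.client.Channel used here. */
    interface TopologyChannel {
        void exchangeDeclare(String exchange, String type, boolean durable);
        void queueDeclare(String queue, boolean durable, boolean exclusive,
                          boolean autoDelete, Map<String, Object> arguments);
        void queueBind(String queue, String exchange, String routingKey);
    }

    /** Declares a durable direct exchange and a durable queue, then binds them. */
    static void initialize(TopologyChannel channel) {
        channel.exchangeDeclare("myExchange", "direct", true);
        channel.queueDeclare("myQueue", true, false, false, null);
        channel.queueBind("myQueue", "myExchange", "myRoutingKey");
    }

    /** Test double that records the calls it receives, in order. */
    static final class RecordingChannel implements TopologyChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void exchangeDeclare(String exchange, String type, boolean durable) {
            calls.add("exchangeDeclare " + exchange + " " + type);
        }

        @Override
        public void queueDeclare(String queue, boolean durable, boolean exclusive,
                                 boolean autoDelete, Map<String, Object> arguments) {
            calls.add("queueDeclare " + queue);
        }

        @Override
        public void queueBind(String queue, String exchange, String routingKey) {
            calls.add("queueBind " + queue + " -> " + exchange + " (" + routingKey + ")");
        }
    }
}
```

In a real `RabbitMqInitializer`, the same three calls would be made on the `Channel` passed to `initialize`, with the queue name matching the value set for `RabbitMqSpout.KEY_QUEUE_NAME`.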

RabbitMQ Bolt

To send messages from your Storm topology to RabbitMQ, use RabbitMqBolt. To do so, implement the TupleToRabbitMqMessageConverter interface:

class MyTupleToRabbitMqMessageConverter implements TupleToRabbitMqMessageConverter {

    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }

    public String getExchange(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    public String getRoutingKey(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    public AMQP.BasicProperties getProperties(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    public byte[] getMessageBody(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    public void cleanup() {
        // your implementation here
    }
}
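
To make the converter's role concrete, here is a standalone sketch of the per-tuple decisions it makes: exchange, routing key, properties, and body. A `Map<String, Object>` stands in for Storm's `Tuple`, a content-type string stands in for `AMQP.BasicProperties`, and the field names and exchange name are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Illustrative only: derives the parts of an outgoing message from a tuple-like map. */
final class TupleConverterSketch {

    /** Uses one fixed exchange for all tuples (hypothetical name). */
    static String getExchange(Map<String, Object> tuple) {
        return "myExchange";
    }

    /** Builds the routing key from the assumed "type" field; falls back to "unknown". */
    static String getRoutingKey(Map<String, Object> tuple) {
        Object type = tuple.get("type");
        return type == null ? "unknown" : "events." + type;
    }

    /** Stand-in for getProperties(): only a content type is modelled here. */
    static String getContentType(Map<String, Object> tuple) {
        return "text/plain";
    }

    /** Encodes the assumed "payload" field as UTF-8 bytes. */
    static byte[] getMessageBody(Map<String, Object> tuple) {
        Object payload = tuple.get("payload");
        if (payload == null) {
            throw new IllegalArgumentException("Tuple has no 'payload' field");
        }
        return payload.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

A real converter would read the same values through `Tuple` accessors and report bad input by throwing `ConvertionException` rather than `IllegalArgumentException`.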

The next step is to pass your custom converter to RabbitMqBolt:

MyTupleToRabbitMqMessageConverter converter = new MyTupleToRabbitMqMessageConverter();
RabbitMqBolt rabbitMqBolt = new RabbitMqBolt(converter);

You can also set some properties for RabbitMqBolt:

builder.setBolt("rabbitmq-bolt", rabbitMqBolt)
       .addConfiguration(RabbitMqBolt.KEY_MANDATORY, false)
       .addConfiguration(RabbitMqBolt.KEY_IMMEDIATE, false);

For more information about these RabbitMQ properties, see the AMQP 0-9-1 reference: https://www.rabbitmq.com/amqp-0-9-1-reference.html