/storm-rabbitmq

Storm bolt and spout for RabbitMQ

Primary LanguageJavaApache License 2.0Apache-2.0

Storm RabbitMQ

Build Status codecov Maven Version

Provides implementations of IRichSpout and IRichBolt for RabbitMQ.

Maven

<dependency>
    <groupId>ru.burov4j.storm</groupId>
    <artifactId>storm-rabbitmq</artifactId>
    <version>1.0.1</version>
</dependency>

Gradle

compile 'ru.burov4j.storm:storm-rabbitmq:1.0.1'

RabbitMQ Connection

You can set RabbitMQ connection properties using RabbitMqConfigBuilder:

RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder()
                .setAddresses("localhost:5672")
                .setUsername("guest")
                .setPassword("guest")
                .setRequestedHeartbeat(60)
                .setVirtualHost("/")
                .build();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(rabbitMqConfig, scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue");

The same with Storm's API:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue")
       .addConfiguration(RabbitMqConfig.KEY_ADDRESSES, "localhost:5672")
       .addConfiguration(RabbitMqConfig.KEY_USERNAME, "guest")
       .addConfiguration(RabbitMqConfig.KEY_PASSWORD, "guest")
       .addConfiguration(RabbitMqConfig.KEY_REQUESTED_HEARTBEAT, 60)
       .addConfiguration(RabbitMqConfig.KEY_VIRTUAL_HOST, "/");

It is not required to set all of properties: for example, you can set only RabbitMQ address. In the case another properties will set as defaults:

RabbitMqConfig rabbitMqConfig = new RabbitMqConfigBuilder()
                .setAddresses("localhost:5672")
                .build();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitmq-spout", new RabbitMqSpout(rabbitMqConfig, scheme))
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue");

RabbitMQ Spout

RabbitMqSpout deserializes input messages and then sends it in your Storm's topology. For using the class you should implement RabbitMqMessageScheme interface:

class MyRabbitMqMessageScheme implements RabbitMqMessageScheme {

    @Override
    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }

    @Override
    public StreamedTuple convertToStreamedTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws ConvertionException {
        // your implementation here
    }

    @Override
    public Map<String, Fields> getStreamsOutputFields() {
        // your implementation here
    }

    @Override
    public void cleanup() {
        // your implementation here
    }
}

If you want to use only one output stream you can extends SingleStreamRabbitMqMessageScheme:

class MyRabbitMqMessageScheme extends SingleStreamRabbitMqMessageScheme {

    @Override
    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }
                
    @Override
    public List<Object> convertToTuple(Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws ConvertionException {
        // your implementation here
    }
    
    @Override
    public Fields getOutputFields() {
        // your implementation here
    }
    
    @Override
    public void cleanup() {
        // your implementation here
    }
}

The next step is to pass your custom scheme to RabbitMqSpout:

MyRabbitMqMessageScheme scheme = new MyRabbitMqMessageScheme();
RabbitMqSpout rabbitMqSpout = new RabbitMqSpout(scheme);

You can also set some properties for RabbitMqSpout:

builder.setSpout("rabbitmq-spout", rabbitMqSpout)
       .addConfiguration(RabbitMqSpout.KEY_QUEUE_NAME, "myQueue") // required
       .addConfiguration(RabbitMqSpout.KEY_AUTO_ACK, false)
       .addConfiguration(RabbitMqSpout.KEY_PREFETCH_COUNT, 64)
       .addConfiguration(RabbitMqSpout.KEY_REQUEUE_ON_FAIL, false);

Note that the property RabbitMqSpout.KEY_QUEUE_NAME is required.

To do some preparation logic you can implement RabbitMqInitializer interface:

class MyRabbitMqInitializer implements RabbitMqInitializer {
    
    @Override
    public void initialize(Channel channel) throws IOException {
        // your implementation here
    }
}

and then put it in your spout:

RabbitMqInitializer myRabbitMqInitializer = new MyRabbitMqInitializer();
rabbitMqSpout.setInitializer(myRabbitMqInitializer);

RabbitMQ Bolt

If you want to send messages from your Storm's topology to RabbitMQ, you can use RabbitMqBolt. In the case you should implement TupleToRabbitMqMessageConverter interface:

class MyTupleToRabbitMqMessageConverter implements TupleToRabbitMqMessageConverter {

    @Override
    public void prepare(Map config, TopologyContext context) {
        // your implementation here
    }

    @Override
    public String getExchange(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public String getRoutingKey(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public AMQP.BasicProperties getProperties(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public byte[] getMessageBody(Tuple tuple) throws ConvertionException {
        // your implementation here
    }

    @Override
    public void cleanup() {
        // your implementation here
    }
}

The next step is to pass your custom converter to RabbitMqBolt:

MyTupleToRabbitMqMessageConverter converter = new MyTupleToRabbitMqMessageConverter();
RabbitMqBolt rabbitMqBolt = new RabbitMqBolt(converter);

You can also set some properties for RabbitMqBolt:

builder.setBolt("rabbitmq-bolt", rabbitMqBolt)
       .addConfiguration(RabbitMqBolt.KEY_MANDATORY, false)
       .addConfiguration(RabbitMqBolt.KEY_IMMEDIATE, false);

You can read more information about RabbitMQ properties here: https://www.rabbitmq.com/amqp-0-9-1-reference.html