The White Rabbit

License: MIT

The White Rabbit is a fast and asynchronous RabbitMQ (AMQP) client library based on Kotlin coroutines. Currently the following features are supported:

  • Queue and exchange manipulations
  • Message publishing with confirmation
  • Message consuming with acknowledgment
  • Transactional publishing and consuming
  • RPC pattern

Adding to project

Gradle
repositories {
    jcenter()
}

dependencies {
    implementation "com.viartemev:the-white-rabbit:$version"
}

Maven
<repositories>
    <repository>
        <id>jcenter</id>
        <url>https://jcenter.bintray.com/</url>
    </repository>
</repositories>

<dependency>
  <groupId>com.viartemev</groupId>
  <artifactId>the-white-rabbit</artifactId>
  <version>${version}</version>
</dependency>

Usage notes and examples

Use one of the extension methods on com.rabbitmq.client.Connection to get the kind of channel you need:

connection.channel { 
    /*
    The plain channel with consumer acknowledgments, supports:
        -- queue and exchange manipulations
        -- asynchronous consuming
        -- RPC pattern
     */
}

connection.confirmChannel {
    /*
    Channel with publisher confirmations, additionally supports:
        -- asynchronous message publishing
     */
}

connection.txChannel {
    /*
    Supports transactional publishing and consuming.
     */
}

Queue and exchange manipulations

Asynchronous exchange declaration

connection.channel.declareExchange(ExchangeSpecification(EXCHANGE_NAME))

Asynchronous queue declaration

connection.channel.declareQueue(QueueSpecification(QUEUE_NAME))

Asynchronous queue binding to an exchange

connection.channel.bindQueue(BindQueueSpecification(EXCHANGE_NAME, QUEUE_NAME))

Asynchronous message publishing with confirmation

connection.confirmChannel {
    publish {
        val messages = (1..n).map { createMessage("Hello #$it") }
        publishWithConfirmAsync(coroutineContext, messages).awaitAll()
    }
}

or

connection.confirmChannel {
    publish {
        coroutineScope {
            val messages = (1..n).map { createMessage("Hello #$it") }
            messages.map { async { publishWithConfirm(it) } }
        }
    }
}
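
For readers unfamiliar with publisher confirms, here is a minimal, self-contained sketch of the bookkeeping a confirm-mode publisher typically does: each published message gets a sequence number, and the broker later acks or nacks either that single number or every number up to it. The sketch uses no broker and none of the library's code; ConfirmTracker and its methods are hypothetical names, and the library's internals may well differ.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentSkipListMap

// Hypothetical helper, not part of the-white-rabbit:
// tracks unconfirmed messages by publish sequence number.
class ConfirmTracker {
    private val outstanding = ConcurrentSkipListMap<Long, CompletableFuture<Boolean>>()

    // Call right before publishing; the future completes with true on ack, false on nack.
    fun register(seqNo: Long): CompletableFuture<Boolean> {
        val future = CompletableFuture<Boolean>()
        outstanding[seqNo] = future
        return future
    }

    // Broker confirmation; `multiple` means "everything up to and including seqNo".
    fun confirm(seqNo: Long, multiple: Boolean, acked: Boolean) {
        if (multiple) {
            val confirmed = outstanding.headMap(seqNo, true)
            confirmed.values.forEach { it.complete(acked) }
            confirmed.clear()
        } else {
            outstanding.remove(seqNo)?.complete(acked)
        }
    }

    // Number of messages still waiting for a confirmation.
    fun pending(): Int = outstanding.size
}

fun main() {
    val tracker = ConfirmTracker()
    val futures = (1L..4L).map { tracker.register(it) }

    tracker.confirm(seqNo = 3, multiple = true, acked = true)   // acks 1, 2 and 3
    tracker.confirm(seqNo = 4, multiple = false, acked = false) // nacks 4

    println(futures.map { it.get() }) // [true, true, true, false]
    println(tracker.pending())        // 0
}
```

Awaiting such a future from a coroutine is what lets many publishes be in flight at once while each caller still learns the outcome of its own message.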

Asynchronous message consuming with acknowledgment

Consume exactly n messages:

connection.channel {
    consume(QUEUE_NAME, PREFETCH_COUNT) {
        (1..n).map { async { consumeMessageWithConfirm({ println(it) }) } }.awaitAll()
    }
}
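
As a rough illustration of what "consuming with acknowledgment" means, the sketch below acknowledges a delivery only after its handler returns and negatively acknowledges it (with requeue) if the handler throws. This policy is an assumption made for the example, not a description of the library's behaviour. AckOps, handleWithAck, and RecordingAcks are hypothetical names; basicAck and basicNack mirror the methods of the same name on com.rabbitmq.client.Channel.

```kotlin
// Hypothetical stand-in for the ack/nack subset of com.rabbitmq.client.Channel.
interface AckOps {
    fun basicAck(deliveryTag: Long, multiple: Boolean)
    fun basicNack(deliveryTag: Long, multiple: Boolean, requeue: Boolean)
}

// Assumed policy: ack after the handler returns, nack with requeue if it throws.
fun AckOps.handleWithAck(deliveryTag: Long, body: String, handler: (String) -> Unit) {
    try {
        handler(body)
        basicAck(deliveryTag, false)
    } catch (e: Exception) {
        basicNack(deliveryTag, false, true)
    }
}

// Fake channel that records acknowledgments instead of talking to a broker.
class RecordingAcks : AckOps {
    val log = mutableListOf<String>()
    override fun basicAck(deliveryTag: Long, multiple: Boolean) {
        log += "ack $deliveryTag"
    }
    override fun basicNack(deliveryTag: Long, multiple: Boolean, requeue: Boolean) {
        log += "nack $deliveryTag"
    }
}

fun main() {
    val channel = RecordingAcks()
    channel.handleWithAck(1, "Hello #1") { println(it) }
    channel.handleWithAck(2, "Hello #2") { error("handler failed") }
    println(channel.log) // [ack 1, nack 2]
}
```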

Transactional publishing and consuming

RabbitMQ, and AMQP itself, offer only limited support for transactions. When considering transactions, be aware that:

  • a transaction can span only one channel and one queue;
  • com.rabbitmq.client.Channel is not thread-safe;
  • a channel can be in either confirm mode or transaction mode, but not both at once;
  • transactions cannot be nested.

The library provides a convenient way to perform transactional publishing and consuming through the transaction extension function. This function commits the transaction when the block completes normally and rolls it back if a RuntimeException occurs. Exceptions are always propagated to the caller. Coroutines are not used for transactional publishing, since no asynchronous operations are involved.

connection.txChannel {
    transaction {
        val message = createMessage(queue = oneTimeQueue, body = "Hello from tx")
        publish(message)
    }
}
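
The commit-or-rollback behaviour of such a transaction helper can be pictured with a small stand-alone sketch. TxOps, inTransaction, and RecordingChannel are hypothetical names introduced for illustration; the three operations mirror the txSelect, txCommit, and txRollback methods of com.rabbitmq.client.Channel. Calling txSelect inside the helper is an assumption made to keep the example self-contained; the library may enable transaction mode once, when the tx channel is opened.

```kotlin
// Hypothetical stand-in for the transactional subset of com.rabbitmq.client.Channel.
interface TxOps {
    fun txSelect()
    fun txCommit()
    fun txRollback()
}

// Runs `block` in a transaction: commit on success,
// roll back on RuntimeException, and always rethrow.
fun <T> TxOps.inTransaction(block: TxOps.() -> T): T {
    txSelect() // assumption: done here only to keep the sketch self-contained
    try {
        val result = block()
        txCommit()
        return result
    } catch (e: RuntimeException) {
        txRollback()
        throw e
    }
}

// Fake channel that records which calls were made, so no broker is needed.
class RecordingChannel : TxOps {
    val calls = mutableListOf<String>()
    override fun txSelect() { calls += "select" }
    override fun txCommit() { calls += "commit" }
    override fun txRollback() { calls += "rollback" }
}

fun main() {
    val ok = RecordingChannel()
    ok.inTransaction { /* publish messages here */ }
    println(ok.calls) // [select, commit]

    val failing = RecordingChannel()
    try {
        failing.inTransaction<Unit> { throw IllegalStateException("boom") }
    } catch (e: IllegalStateException) {
        println(failing.calls) // [select, rollback]
    }
}
```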

RPC pattern

connection.channel {
    val message = RabbitMqMessage(MessageProperties.PERSISTENT_BASIC, "Hello world".toByteArray())
    coroutineScope {
        (1..10).map {
            async {
                rpc {
                    call(requestQueueName = "rpc_request", message = message)
                        .also { println("Reply: ${String(it.body)}") }
                }
            }
        }.awaitAll()
    }
}
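
RPC over RabbitMQ conventionally pairs each request with a unique correlation ID and a reply queue, then matches incoming replies against the pending requests. The sketch below shows only that matching step, in plain Kotlin with no broker. PendingReplies and its methods are hypothetical names, and this is not a description of how the library implements call.

```kotlin
import java.util.UUID
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: matches replies to requests by correlation ID.
class PendingReplies {
    private val pending = ConcurrentHashMap<String, CompletableFuture<ByteArray>>()

    // Called when sending a request; the returned ID would be placed
    // in the message's correlationId property.
    fun register(): Pair<String, CompletableFuture<ByteArray>> {
        val correlationId = UUID.randomUUID().toString()
        val reply = CompletableFuture<ByteArray>()
        pending[correlationId] = reply
        return correlationId to reply
    }

    // Called for each message arriving on the reply queue; returns false for unknown IDs.
    fun complete(correlationId: String, body: ByteArray): Boolean =
        pending.remove(correlationId)?.complete(body) ?: false
}

fun main() {
    val replies = PendingReplies()
    val (id, reply) = replies.register()

    // Simulate the server's answer arriving on the reply queue.
    replies.complete(id, "pong".toByteArray())
    println(String(reply.get())) // pong

    // A reply with an unknown correlation ID is ignored.
    println(replies.complete("unknown", ByteArray(0))) // false
}
```

Because every request has its own ID, the ten concurrent calls in the example above can share one reply queue without their answers being mixed up.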

Links