RabbitMQ client

Scala wrapper over the standard RabbitMQ Java client library

This client is a lightweight wrapper over the standard RabbitMQ Java client. The underlying client's API may be difficult to use for inexperienced RabbitMQ users; the goal of this library is to simplify basic use cases and shield the programmer from the underlying client.

The library has both a Scala and a Java API. The Scala API is generic and lets you adapt it to your application's effect type - see Scala usage below.

The library is built around the concept of a connection from which producers and consumers are derived. Note that the connection shields you from the underlying concepts of AMQP connection and channels - it handles channels automatically according to best practices. Each producer and consumer can be closed separately, while closing the connection closes all derived channels and thus all producers and consumers.

Dependency

compile 'com.avast.clients.rabbitmq:rabbitmq-client-core_$scalaVersion:x.x.x'
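The coordinates above are in Gradle form. In an sbt build the same dependency might look like the sketch below (the version is a placeholder, as above):

```scala
// sbt equivalent of the Gradle coordinates above; "x.x.x" is a placeholder version
libraryDependencies += "com.avast.clients.rabbitmq" %% "rabbitmq-client-core" % "x.x.x"
```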

Modules

There are api and core modules available for the most common usage, but there are also a few extras modules which contain some optional functionality:

  1. extras
  2. extras-circe (adds some circe-dependent functionality)
  3. extras-cactus (adds some cactus-dependent functionality)

Migration

There is a migration guide between versions 5 and 6.

Usage

Configuration

Structured config

Since v5.x, it's necessary to have the config structured as follows:

rabbitConfig {
  // connection config
  
  consumer1 {
    //consumer config
  }
  
  consumer2 {
    //consumer config
  }
  
  producer1 {
    //producer config
  }
  
  producer2 {
    //producer config
  }
}

Config example

myConfig {
  hosts = ["localhost:5672"]
  virtualHost = "/"
  
  name = "Cluster01Connection" // used for logging AND is also visible in client properties in RabbitMQ management console

  ssl {
    enabled = false // enabled by default
  }

  credentials {
    //enabled = true // enabled by default

    username = "guest"
    password = "guest"
  }

  connectionTimeout = 5s // default value

  networkRecovery {
    enabled = true // default value
    period = 5s // default value
  }


  // CONSUMERS AND PRODUCERS:

  // this is the name you use while creating; it's recommended to use something more expressive, like "licensesConsumer"
  consumer {
    name = "Testing" // this is used for metrics, logging etc.

    consumerTag = Default // string or "Default"; default is randomly generated string (like "amq.ctag-ov2Sp8MYKE6ysJ9SchKeqQ"); visible in RabbitMQ management console

    queueName = "test"

    prefetchCount = 100 // don't change unless you have a reason to do so ;-)

    // should the consumer declare the queue it wants to read from?
    declare {
      enabled = true // disabled by default

      durable = true // default value
      autoDelete = false // default value
      exclusive = false // default value
    }

    // bindings from exchanges to the queue
    bindings = [
      {
        // all routing keys the queue should bind with
        // leave empty or use "" for binding to fanout exchange
        routingKeys = ["test"]

        // should the consumer declare the exchange it wants to bind to?
        exchange {
          name = "myclient"

          declare {
            enabled = true // disabled by default

            type = "direct" // fanout, topic
          }
        }
      }
    ]
  }

  // this is the name you use while creating; it's recommended to use something more expressive, like "licensesProducer"
  producer {
    name = "Testing" // this is used for metrics, logging etc.

    exchange = "myclient"

    // should the producer declare the exchange it wants to send to?
    declare {
      enabled = true // disabled by default

      type = "direct" // fanout, topic
      durable = true // default value
      autoDelete = false // default value
    }
  }
}

For a full list of options, please see reference.conf.

Scala usage

The Scala API is now finally tagless (read more e.g. here) - you can change the effect type it works with by specifying it when creating the connection. In general you have to provide cats.arrow.FunctionK[Task, A] and cats.arrow.FunctionK[A, Task]; however, some types are supported out-of-the-box just by importing com.avast.clients.rabbitmq._ (currently scala.util.Try, scala.concurrent.Future and monix.eval.Task).

The Scala API uses type conversions for both consumer and producer, which means you don't have to work directly with Bytes (though you still can, if you want) - you touch only your business class, which is then (de)serialized using the provided converter.

The library uses two types of executors - one for blocking (IO) operations and the other for callbacks. You have to provide both of them:

  1. Blocking executor as ExecutorService
  2. Callback executor as monix.execution.Scheduler - you can get it e.g. by calling Scheduler(myFavoriteExecutionContext)
import java.util.concurrent.{ExecutorService, Executors}

import com.typesafe.config.ConfigFactory
import com.avast.metrics.api.Monitor
import com.avast.clients.rabbitmq._ // for generic types support
import com.avast.bytes.Bytes
import monix.execution._
import monix.eval._

val config = ConfigFactory.load().getConfig("myRabbitConfig")

implicit val sch: Scheduler = ???
val blockingExecutor: ExecutorService = Executors.newCachedThreadPool()

val monitor: Monitor = ???

// here you create the connection; it's shared for all producers/consumers amongst one RabbitMQ server - they will share a single TCP connection
// but have separated channels
// if you expect very high load, you can use separate connections for each producer/consumer, but it's usually not needed
val rabbitConnection = RabbitMQConnection.fromConfig[Task](config, blockingExecutor) // DefaultRabbitMQConnection[Task]

val consumer = rabbitConnection.newConsumer[Bytes]("consumer", monitor) { 
  case delivery: Delivery.Ok[Bytes] =>
    println(delivery)
    Task.now(DeliveryResult.Ack)
    
  case _: Delivery.MalformedContent =>
    Task.now(DeliveryResult.Reject)
} // DefaultRabbitMQConsumer

val sender = rabbitConnection.newProducer("producer", monitor) // DefaultRabbitMQProducer[Task]

sender.send(...).runAsync // because it's Task, don't forget to run it ;-)

Providing converters for producer/consumer

Both the producer and consumer require a type argument when being created from the connection:

  1. connection.newConsumer[MyClass] which requires implicit DeliveryConverter[MyClass]
  2. connection.newProducer[MyClass] which requires implicit ProductConverter[MyClass]

There are multiple ways to get the converter (the same applies to DeliveryConverter and ProductConverter):

  1. Implement your own implicit converter for the type
  2. Modules extras-circe and extras-cactus provide support for JSON and GPB conversion.
  3. Use identity converter by specifying Bytes type argument. No further action needed in that case.
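To illustrate option 1, here is a minimal sketch of what a converter pair looks like. The traits below are simplified stand-ins for the library's DeliveryConverter and ProductConverter (the real ones live in com.avast.clients.rabbitmq, work with com.avast.bytes.Bytes and return ConversionException on failure); the plain UTF-8 encoding is purely illustrative:

```scala
// Simplified stand-ins for the library's converter type classes; the real
// traits work with com.avast.bytes.Bytes and ConversionException.
trait DeliveryConverter[A] { def convert(body: Array[Byte]): Either[String, A] }
trait ProductConverter[A] { def convert(value: A): Either[String, Array[Byte]] }

case class Event(name: String)

// implicit converters for our business class, using a trivial UTF-8 encoding
implicit val eventDeliveryConverter: DeliveryConverter[Event] =
  (body: Array[Byte]) => Right(Event(new String(body, "UTF-8")))

implicit val eventProductConverter: ProductConverter[Event] =
  (value: Event) => Right(value.name.getBytes("UTF-8"))

// serialize and deserialize again, as a producer/consumer pair would
def roundTrip[A](value: A)(implicit pc: ProductConverter[A],
                           dc: DeliveryConverter[A]): Either[String, A] =
  pc.convert(value).flatMap(dc.convert)
```

With both implicits in scope, the connection methods can resolve them automatically, so `connection.newConsumer[Event](...)` and `connection.newProducer[Event](...)` compile without passing converters explicitly.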

Caveats

  1. null instead of converter instance
    It may happen that you run into this problem:

    scala> import io.circe.generic.auto._
    import io.circe.generic.auto._
    
    scala> import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter
    import com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter
    
    scala> import com.avast.clients.rabbitmq.DeliveryConverter
    import com.avast.clients.rabbitmq.DeliveryConverter
    
    scala> case class Event(name: String)
    defined class Event
    
    scala> implicit val deliveryConverter: JsonDeliveryConverter[Event] = JsonDeliveryConverter.derive[Event]()
    deliveryConverter: com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter[Event] = null
    
    scala> implicit val deliveryConverter: DeliveryConverter[Event] = JsonDeliveryConverter.derive[Event]()
    deliveryConverter: com.avast.clients.rabbitmq.DeliveryConverter[Event] = com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter$$anon$1@5b977aaa
    
    scala> implicit val deliveryConverter = JsonDeliveryConverter.derive[Event]()
    deliveryConverter: com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter[Event] = com.avast.clients.rabbitmq.extras.format.JsonDeliveryConverter$$anon$1@4b024fb2

    Notice that the results of the last three calls differ even though they are supposed to be the same (non-null, respectively)! A very similar issue is discussed on StackOverflow, and the solution is similar too:

    1. Remove explicit type completely (not recommended)
    2. Make the explicit type more general (DeliveryConverter instead of JsonDeliveryConverter in this case)
  2. Cryptic errors
    It may happen that you have everything configured "correctly" and the compiler still reports an error; for example

    def rabbitConnection(): RabbitMQConnection[Future] = RabbitMQConnection.fromConfig[Future](rabbitProperties, blocking)

    can give you something like:

    Error:(22, 99) could not find implicit value for evidence parameter of type com.avast.clients.rabbitmq.FromTask[scala.concurrent.Future]
    Error occurred in an application involving default arguments.
      def rabbitConnection(): RabbitMQConnection[Future] = RabbitMQConnection.fromConfig[Future](rabbitProperties, blocking)
    Error:(22, 99) not enough arguments for method fromConfig: (implicit evidence$3: com.avast.clients.rabbitmq.FromTask[scala.concurrent.Future], implicit evidence$4: com.avast.clients.rabbitmq.ToTask[scala.concurrent.Future])com.avast.clients.rabbitmq.DefaultRabbitMQConnection[scala.concurrent.Future].
    Unspecified value parameters evidence$3, evidence$4.
    Error occurred in an application involving default arguments.
      def rabbitConnection(): RabbitMQConnection[Future] = RabbitMQConnection.fromConfig[Future](rabbitProperties, blocking)
    

    This is caused by the absence of an ExecutionContext, which makes def fkToFuture(implicit ec: ExecutionContext): FromTask[Future] impossible to use (unfortunately the compiler won't say that).
    Please bear in mind there is nothing this library can do to help you in this case - there is no way to provide any hint. However, there are some compiler plugins which may help you prevent such situations, e.g. Splain.
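The usual fix is simply to have an implicit ExecutionContext in scope at the call site. A sketch (the fromConfig call is shown commented out since it requires the library, its config and the blocking executor from the earlier example):

```scala
import scala.concurrent.ExecutionContext

// An implicit ExecutionContext in scope lets the compiler derive the
// FromTask[Future]/ToTask[Future] evidence via fkToFuture and friends.
implicit val ec: ExecutionContext = ExecutionContext.global

// def rabbitConnection(): RabbitMQConnection[Future] =
//   RabbitMQConnection.fromConfig[Future](rabbitProperties, blocking)
```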

Java usage

The Java API is placed in the javaapi subpackage (not all classes have Java counterparts, though; some have to be imported from the Scala API, depending on your usage).
Don't get confused by the Java API being partially implemented in Scala.

import com.avast.clients.rabbitmq.javaapi.*;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class ExampleJava {
    public static void main(String[] args) {
        final Config config = ConfigFactory.load().getConfig("myConfig");
        final String routingKey = config.getString("consumer.queueName");

        final ExecutorService executor = Executors.newCachedThreadPool();
        final ForkJoinPool callbackExecutor = new ForkJoinPool();

        final RabbitMQJavaConnection connection = RabbitMQJavaConnection.newBuilder(config, executor).build();

        final RabbitMQConsumer rabbitMQConsumer = connection.newConsumer(
                "consumer",
                ...,
                callbackExecutor,
                ...
        );

        final RabbitMQProducer rabbitMQProducer = connection.newProducer(
                "producer",
                ...,
                callbackExecutor
        );
    }

}

The Java API has some limitations compared to the Scala one - mainly it does not support type conversions and it offers only an asynchronous version with CompletableFuture as the result of all operations.

See full example

Notes

Extras

There is a module with some optional functionality called extras.

DeliveryResult

The consumer's readAction returns a Future of DeliveryResult. The DeliveryResult has 4 possible values (with the usual use-cases described):

  1. Ack - the message was processed; it will be removed from the queue
  2. Reject - the message is corrupted or for some other reason we don't want to see it again; it will be removed from the queue
  3. Retry - the message couldn't be processed at this moment (e.g. an unreachable 3rd party service); it will be requeued (inserted at the top of the queue)
  4. Republish - the message may be corrupted but we're not sure; it will be re-published to the bottom of the queue (as a new message; the original one will be removed). It's usually wise to prevent infinite republishing of the message - see Poisoned message handler.
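As a sketch of how these results are typically chosen, the following self-contained snippet models the decision with a stand-in ADT (the real DeliveryResult lives in the library's api package; the Outcome types are hypothetical failure categories invented for the example):

```scala
// stand-in for the library's DeliveryResult ADT
sealed trait DeliveryResult
case object Ack extends DeliveryResult
case object Reject extends DeliveryResult
case object Retry extends DeliveryResult
case object Republish extends DeliveryResult

// hypothetical outcomes of processing a message
sealed trait Outcome
case object Processed extends Outcome   // handled successfully
case object Malformed extends Outcome   // corrupted, never retry
case object ServiceDown extends Outcome // transient failure, try again soon
case object Unsure extends Outcome      // maybe corrupted, give it another pass later

def resultFor(outcome: Outcome): DeliveryResult = outcome match {
  case Processed   => Ack       // done, remove from the queue
  case Malformed   => Reject    // remove, we never want to see it again
  case ServiceDown => Retry     // requeue at the top of the queue
  case Unsure      => Republish // re-publish to the bottom of the queue
}
```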

Difference between Retry and Republish

When using Retry, the message can effectively starve other messages in the queue until it can be processed itself; Republish, on the other hand, inserts the message into the original queue as a new message and lets the consumer handle other messages (if they can be processed).

Bind/declare arguments

There is an option to specify bind/declare arguments for queues/exchanges, as you may read about in the RabbitMQ docs. Check reference.conf or the following example for usage:

  producer {
    name = "Testing" // this is used for metrics, logging etc.

    exchange = "myclient"

    // should the producer declare the exchange it wants to send to?
    declare {
      enabled = true // disabled by default

      type = "direct" // fanout, topic
      
      arguments = { "x-max-length" : 10000 }
    }
  }

Additional declarations and bindings

Sometimes it's necessary to declare an additional queue or exchange which is not directly related to the consumers or producers in your application (e.g. a dead-letter queue).
The library makes it possible to do so, e.g.:

    rabbitConnection.bindExchange("backupExchangeBinding")

where "backupExchangeBinding" is a link to the configuration (use a path relative to the factory configuration):

    backupExchangeBinding {
      sourceExchangeName = "mainExchange"
      destExchangeName = "backupExchange"
      routingKeys = []
      arguments {}
    }

Check reference.conf for all options or see application.conf in tests.

Pull consumer

Sometimes your use-case just doesn't fit the normal consumer scenario. In that case you can use the pull consumer, which gives you much more control over the received messages. You pull a new message from the queue and acknowledge (reject, ...) it somewhere in the future.

The pull consumer uses PullResult as return type:

  • Ok - contains DeliveryWithHandle instance
  • EmptyQueue - there was no message available in the queue

Additionally, you can call the .toOption method on the PullResult.

A simplified example:

import com.avast.bytes.Bytes
import monix.execution.Scheduler
import scala.concurrent.Future
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.api._

implicit val sch: Scheduler = ???

val connection: RabbitMQConnection[Future] = ???

val consumer = connection.newPullConsumer[Bytes](???, ???)


// receive "up to" 100 deliveries
val deliveries: Future[Seq[PullResult[Future, Bytes]]] = Future.sequence { (1 to 100).map(_ => consumer.pull()) }

// do your stuff!

???

// "handle" all deliveries, ignore failures and "empty queue" results
val handleResult: Future[Unit] = deliveries.flatMap(s => Future.sequence(s.flatMap(_.toOption).map(_.handle(DeliveryResult.Ack))).map(_ => ()))

consumer.close()
connection.close()

MultiFormatConsumer

Quite often you receive a single type of message but want to support multiple encoding formats (Protobuf, JSON, ...). This is where MultiFormatConsumer can be used.

Modules extras-circe and extras-cactus provide support for JSON and GPB conversion. They are both used in the example below.

The MultiFormatConsumer is Scala only.

Usage example:

Proto file

import com.avast.bytes.Bytes
import com.avast.cactus.bytes._ // Cactus support for Bytes, see https://github.com/avast/cactus#bytes
import com.avast.clients.rabbitmq.test.ExampleEvents.{NewFileSourceAdded => NewFileSourceAddedGpb}
import com.avast.clients.rabbitmq._
import com.avast.clients.rabbitmq.extras.format._
import io.circe.Decoder
import io.circe.generic.auto._ // to auto derive `io.circe.Decoder[A]` with https://circe.github.io/circe/codec.html#fully-automatic-derivation
import scala.concurrent.Future
import scala.collection.JavaConverters._

private implicit val d: Decoder[Bytes] = Decoder.decodeString.map(Utils.hexToBytesImmutable)

case class FileSource(fileId: Bytes, source: String)

case class NewFileSourceAdded(fileSources: Seq[FileSource])

val consumer = MultiFormatConsumer.forType[Future, NewFileSourceAdded](
  JsonDeliveryConverter.derive(), // requires implicit `io.circe.Decoder[NewFileSourceAdded]`
  GpbDeliveryConverter[NewFileSourceAddedGpb].derive() // requires implicit `com.avast.cactus.Converter[NewFileSourceAddedGpb, NewFileSourceAdded]`
)(businessLogic.processMessage)

(see unit test for full example)

Implementing own DeliveryConverter

The CheckedDeliveryConverter usually reacts to the Content-Type (like in the example below), but it's not required to - it could e.g. analyze the payload (or its first bytes) too.

val StringDeliveryConverter: CheckedDeliveryConverter[String] = new CheckedDeliveryConverter[String] {
  override def canConvert(d: Delivery[Bytes]): Boolean = d.properties.contentType.contains("text/plain")
  override def convert(b: Bytes): Either[ConversionException, String] = Right(b.toStringUtf8)
}