/rabbitmq-client

Akka actor-based RabbitMq Client Library

Primary LanguageScala

RabbitMq-Client Build Status Codacy Badge Coverage Status Release Version

rabbitmq-client is an Akka actor-based wrapper for the standard java RabbitMQ API.

There are two basic use-cases; publish and consume.

How to publish

object MyApp extends App {
  val addresses: List[com.rabbitmq.client.Address] = ???

  // Use constructor or setters to apply connection details of your rabbit broker.
  val connectionFactory = new com.rabbitmq.client.ConnectionFactory()

  val rabbitActor = actorSystem.actorOf(Props(classOf[RabbitMqActor], connectionFactory, addresses), "my-rabbitmq-actor")

  val myPublishingActor = system.actorOf(MyPublishingActor.props())

  rabbitActor ! RegisterPublisher(myPublishingActor, declarations = Nil, publisherConfirms = false)
}

myPublishingActor will now receive a PublisherChannelActor which contains the ActorRef that must be used by myPublishingActor when it wants to publish to RabbitMq.

class MyPublishingActor extends Actor {
  def receive: Receive = {
    case PublisherChannelActor(rabbitPublisher) => context.become(readyToPublish(rabbitPublisher))
  }

  def readyToPublish(rabbitPublisher: ActorRef): Receive = {
    case myJsonMessage: String =>
      rabbitPublisher !
    BasicPublish(
      "exchangeName",
      "routingKey",
      myBasicProperties, // reference to BasicProperties which defines AMQP Headers
      myJsonMessage.getBytes("UTF-8")
    )
  }
}

How to consume

Below is a very basic sketch of an application that will consume from a RabbitMq Queue.

object BasicSampleApp extends App {
  val addresses: List[com.rabbitmq.client.Address] = ???
  val connectionFactory = new com.rabbitmq.client.ConnectionFactory()

  val rabbitActor: ActorRef = system.actorOf(Props(new RabbitMqActor(connectionFactory, addresses)), "rabbitActor")

  val myConsumingActor: ActorRef = system.actorOf(Props(new MyConsumingActor()))

  rabbitActor ! RegisterConsumer("myQueue", myConsumingActor, List.empty[Declaration])
}

class MyConsumingActor extends Actor {
  override def receive: Receive = {
    case d: Delivery =>
      log.info(s"Message successfully consumed - ${d.body.map(_.toChar).mkString}")
      sender() ! Ack
  }
}

How can I contribute?

Please see CONTRIBUTING.md.

What licence is this released under?

This is released under a modified version of the BSD licence. Please see LICENCE.md.