Stream-based library for RabbitMQ built-in on top of Fs2 and the RabbitMq Java Client.
Disclaimer: It's still under development and it was created just to solve my specific cases, but more features will be added as needed. Contributors are welcome :)
Add the only dependency to your build.sbt:
libraryDependencies += "com.github.gvolpe" %% "fs2-rabbit" % "0.2"
has the following dependencies and it's cross compiled to Scala 2.11.12
and 2.12.4
Dependency | Version |
cats | 1.0.1 |
cats-effect | 0.8 |
fs2 | 0.10.2 |
circe | 0.9.1 |
amqp-client | 4.1.0 |
By default, fs2-rabbit will look into the application.conf file for the configuration of the library, here's an example:
fs2-rabbit {
connection {
virtual-host = "/"
host = ""
username = "guest"
password = "guest"
port = 5672
ssl = false
connection-timeout = 3
requeue-on-nack = false
See reference.conf for more.
Creating connection, channel, "acker-consumer" and publisher + declare queue and exchange + binding queue
Connection and Channel will be acquired in a safe way, so in case of an error, the resources will be cleaned up.
represents the effect type. In the examples both cats.effect.IO
and monix.eval.Task
are used but it's possible to use any other effect with an implicit instance of cats.effect.Effect[F]
implicit val ec: ExecutionContext = ???
implicit val F: Fs2Rabbit[IO] = Fs2Rabbit[IO]
val exchangeName = ExchangeName("ex")
val queueName = QueueName("daQ")
val routingKey = RoutingKey("rk")
val program = F.createConnectionChannel flatMap { implicit channel => // Stream[F, AMQPChannel]
for {
_ <- F.declareQueue(queueName) // Stream[F, Unit]
_ <- F.declareExchange(exchangeName, ExchangeType.Topic) // Stream[F, Unit]
_ <- F.bindQueue(queueName, exchangeName, routingKey) // Stream[F, Unit]
ackerConsumer <- F.createAckerConsumer(queueName) // (StreamAcker[F], StreamConsumer[F])
(acker, consumer) = ackerConsumer
publisher <- F.createPublisher(exchangeName, routingKey) // StreamPublisher[F]
_ <- doSomething(consumer, acker, publisher)
} yield ()
// this will give you an Effect describing your program F[Unit]
val effect: F[Unit] =
// if using cats.effect.IO you can execute it like this
// StreamAcker is a type alias for Sink[F, AckResult]
// StreamConsumer is a type alias for Stream[F, AmqpEnvelope]
// StreamPublisher is a type alias for Sink[F, AmqpMessage[String]]
It is possible to create either an autoAckConsumer or an ackerConsumer. If we choose the first one then we only need to worry about consuming the message. If we choose the latter instead, then we are in control of acking / nacking back to RabbitMQ. Here's a simple example on how you can do it:
import com.github.gvolpe.fs2rabbit.model._
import fs2.{Pipe, Stream}
def logPipe: Pipe[F, AmqpEnvelope, AckResult] = { streamMsg =>
for {
amqpMsg <- streamMsg
_ <- Stream.eval(F.delay(println(s"Consumed: $amqpMsg")))
} yield Ack(amqpMsg.deliveryTag)
consumer through logPipe to acker
When creating a consumer, it's also possible to indicate whether it is exclusive, non-local and the basic QOS among others.
Both createAckerConsumer
and createAutoackConsumer
methods support two extra arguments: A BasicQos
and a ConsumerArgs
. By default, the basic QOS is set to a prefetch size of 0, a prefetch count of 1 and global is set to false. The ConsumerArgs is by default None since it's optional. When defined, you can indicate consumerTag
(default is ""), noLocal
(default is false), exclusive
(default is false) and args
(default is an empty Map[String, AnyRef]).
A stream-based Json Decoder that can be connected to a StreamConsumer is provided out of the box. Implicit decoders for your classes must be on scope (you can use Circe's codec auto derivation):
import io.circe._
case class Address(number: Int, streetName: String)
case class Person(id: Long, name: String, address: Address)
private val jsonDecoder = new Fs2JsonDecoder[F]
import jsonDecoder._
(consumer through jsonDecode[Person]) flatMap {
case (Left(error), tag) => (Stream.eval(F.delay(error)) to errorSink).map(_ => Nack(tag)) to acker
case (Right(msg), tag) => Stream.eval(F.delay((msg, tag))) to processorSink
To publish a simple String message is very simple:
import com.github.gvolpe.fs2rabbit.model._
import fs2._
val message = AmqpMessage("Hello world!", AmqpProperties.empty)
Stream(message).covary[F] to publisher
A stream-based Json Encoder that can be connected to a StreamPublisher is provided out of the box. Very similar to the Json Decoder shown above, but in this case, implicit encoders for your classes must be on scope (again you can use Circe's codec auto derivation):
import com.github.gvolpe.fs2rabbit.model._
import fs2._
case class Address(number: Int, streetName: String)
case class Person(id: Long, name: String, address: Address)
private val jsonEncoder = new Fs2JsonEncoder[F]
import jsonEncoder._
val message = AmqpMessage(Person(1L, "Sherlock", Address(212, "Baker St")), AmqpProperties.empty)
Stream(message).covary[F] through jsonEncode[F, Person] to publisher
If you want your program to run forever with automatic error recovery you can choose to run your program in a loop that will restart every certain amount of specified time. An useful StreamLoop
object that you can use to achieve this is provided by the library.
So, for the program defined above, this would be an example of a resilient app that restarts after 1 second and then exponentially (1, 2, 4, 8, etc) in case of failure:
import com.github.gvolpe.fs2rabbit.StreamLoop
import scala.concurrent.duration._ => program, 1.second)
See the examples to learn more!
