/franz

Better Api for using SQS with Scala

Primary LanguageScalaMIT LicenseMIT

Franz is a simple reactive-ish Scala wrapper around the Amazons SQS persistent message queue service.

#Initialization First you will need an instance of the trait SQSClient. The only currently available implementation is SimpleSQSClient, which has three constructors

new SimpleSQSClient(
	credentialProvider: com.amazonaws.auth.AWSCredentialsProvider,
	region: com.amazonaws.regions.Regions,
	buffered: Boolean
)

SimpleSQSClient(
	credentials: com.amazonaws.auth.AWSCredentials,
	region: com.amazonaws.regions.Regions,
	buffered: Boolean=false
)

SimpleSQSClient(key: String, secret: String, region: com.amazonaws.regions.Regions)

(Warning: Be careful when using buffered=true. It can improve performance, but it's buggy. Use at your own risk.)

Let's use the third.

import com.amazonaws.regions.Regions
val sqs = SimpleSQSClient(<your aws access key>, <your aws secret key>, Regions.US_WEST_1)

We'll come back to how to actually get a queue from the client shortly.

#SQSQueue The type you'll be using to actually interact with an SQS Queue is SQSQueue[T]. It provides all the primitives for sending and receiving messages.

##Sending SQSQueue[T] provides one method for sending messages:

def send(msg: T)(implicit ec: ExecutionContext): Future[MessageId]

There is no current use for the returned MessageId, but you can use the success of the Future as a send confimation.

If you need to pass one or more SQS message attributes along with the message, provide a Map[String,String] to the optional messageAttributes.

def send(msg: T, messageAttributes: Option[Map[String, String])(implicit ec: ExecutionContext): Future[MessageId]

##Receiving

###Direct SQSQueue provides several methods for getting the next message in the queue

def next(implicit ec: ExecutionContext): Future[Option[SQSMessage[T]]]
def nextBatch(maxBatchSize: Int)(implicit ec: ExecutionContext): Future[Seq[SQSMessage[T]]]
def nextWithLock(lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Future[Option[SQSMessage[T]]]
def nextBatchWithLock(maxBatchSize: Int, lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Future[Seq[SQSMessage[T]]]

The returned SQSMessage[T] objects have the fields

val body: T //actual message payload
val attributes: Map[String,String] //raw attributes from com.amazonaws.services.sqs.model.Message
val consume: () => Unit //deletes the message from the queue

and the method

def consume[K](block: T => K): K

Which will call consume if no exception is thrown so you can do either

    processMyEvent(sqsMessage.body)
    sqsMessage.consume()

or

    sqsMessage.consume { body =>
        processMyEvent(body)
    }

The *WithLock methods lock (or rather, hide) the retrieved message(s) in the queue so that no other call will retrieve them during the lock timeout. You need to call consume on the message before the timeout expires in order to permanently remove it form the queue.

If the lock expires the message will again be available for retrieval, which is useful e.g. in case of an error when consume was never called.

The implementation uses 20 second long polls behind the scenes. If no message was available within that time a None or Seq.empty will be returned (depending on the method used). Note that due to the distributed and eventually consistent nature of SQS it is sometimes possible to get an empty response even if there are some (but few) messages in the queue if you happen to poll an empty node. The best practice solution to that is continuous retries, i.e. you'll make 3 requests per minute.

###Iteratees For the more functionally inclined SQSQueue[T] also provides enumerators to be used with your favorite Iteratee

def enumerator(implicit ec: ExecutionContext): Enumerator[SQSMessage[T]]
def enumeratorWithLock(lockTimeout: FiniteDuration)(implicit ec: ExecutionContext): Enumerator[SQSMessage[T]]

The semantics of retrievel and locking are identical to those of the next* methods.

#Getting a Queue

SQSClient currently has three methods for getting a specific queue

def simple(queue: QueueName, createIfNotExists: Boolean=false): SQSQueue[String]
def json(queue: QueueName, createIfNotExists: Boolean=false): SQSQueue[JsValue]
def formatted[T](queue: QueueName, createIfNotExists: Boolean=false)(implicit format: Format[T]): SQSQueue[T]

Where Format[T] and JsValue are form play.api.libs.json. QueueName is simply a typed wrapper around a string, which should be the full queue name (not the queue url).

#SQS Limitations

  • Fairly high latency. Not really suitable for things that require immediate action.
  • Message size is limited to ~64KB.
  • FIFO not guaranteed for messages sent close together (i.e. there is no strict ordering of messages).
  • Multicasting is somewhat cumbersome (could be done through SNS fanout).
  • No replay. Once a message is consumed, it's gone.

#Installation

You can get Franz from maven central. The artifact is franz_2.10 or franz_2.11 and the group id is com.kifi. The current version is 0.3.14. For example, if you are using sbt, just add this to your dependencies:

"com.kifi" % "franz_2.11" % "0.3.14"

To add a dependency that matches your scala version (2.10.x or 2.11.x), use

"com.kifi" %% "franz" % "0.3.13"

All classes are in in com.kifi.franz.

#See Also

Kifi's Reactive Scala Wrapper for Amazon SQS blog post