High performance, Akka based Amazon SQS messages consumer for Scala. Tested with scala 2.10 and Java 7
SQS is a complete message solution powered by Amazon, including monitoring, alarms, redundancy, etc. Due to its particularities, we decided to build a message consumer from scratch, to better handle costs, latency and others.
- Add sqsfork to your project:
libraryDependencies += "sqsfork" %% "sqsfork" % "1.0"
- Add a worker to process messages asynchronously:
import com.sqsfork.SQSWorker
import com.amazonaws.services.sqs.model.Message
class HardWorker extends SQSWorker {
def config = Map("queueName" -> "test", "concurrency" -> "10")
def perform(message: Message) {
println(message.getBody())
}
}
Configure the parallelism using the param concurrency, the default is 10.
- Start the worker:
import com.sqsfork.Credentials
object Main extends App {
implicit val credendials = Credentials("yourAwsAccessKey","yourAwsSecretAccessKey")
implicit val waitForever = true
val worker = new HardWorker
worker.run
}
With waitForever = false, the engine starts and this thread continues to execute. If waitForever = true, the thread will be blocked (suitable to be executed in dedicated process, like heroku workers)
You can call worker.run in any initializer hook from your application (in this case use waitForever = false). If you want to run your worker in a different process, use a Procfile (like Heroku does) to start your workers. To generate a start script, see sbt-start-script
- Make sure to disable data caches (or at least clean the data after each execution) in your workers, otherwise your workers will explode.
- Workers will automatically retry a job if the execution raises an exception
- If the execution does not raise an exception, workers assume the execution ran cleanly and will remove the message from queue
- Be aware of the costs of using SQS, even while under the service's free tier
- Implement a better retry policy (today the message will be retried forever)
- Add more tests
- Delay new fetches if the queue is empty