A common library to abstract the Amazon SQS and SNS producers/consumers interactions.
Features:
- Auto SQS url discovery;
- Auto SNS topic ARN discovery;
- Back-pressure out of the box (via Akka Streams);
- Optional message compression (via MsgPack);
Add the package to your dependencies and the bintray resolver.
libraryDependencies += "com.taxis99" %% "common-sqs" % "0.3.3"
resolvers += Resolver.bintrayRepo("99taxis", "maven")
Configure the queues in your configuration file (assuming you are using TypeSafe Config).
sqs {
my-queue = "my-queue-name"
another-queue = "queues-are-great"
}
sns {
my-topic = "my-cool-topic"
}
At last but not least you have to implement the SqsConsumer[T]
and SqsProducer[T]
traits. Both the consumers and producers are strong typed, the client handles the serialization under the hood, but you must define a type that can be serialized and deserialized in JsValue
:
package models
import play.api.libs.json.Json
case class MyCustomType(foo: String, bar: Int)
object MyCustomType {
implicit val myCustomTypeFormat = Json.format[MyCustomType]
}
package consumers
import javax.inject.{Inject, Singleton}
import com.taxis99.amazon.sqs.{SqsClient, SqsConsumer}
import models.MyCustomType
import scala.concurrent.{ExecutionContext, Future}
@Singleton
class MyConsumer @Inject()(implicit val ec: ExecutionContext, val sqs: SqsClient)
extends SqsConsumer[MyCustomType] {
def queue = "my-queue"
def consume(message: MyCustomType) = ???
// Starts the consumer when the class is initialized
startConsumer()
}
package producers
import javax.inject.{Inject, Singleton}
import com.taxis99.amazon.sqs.{SqsClient, SqsProducer}
import models.MyCustomType
import scala.concurrent.{ExecutionContext, Future}
@Singleton
class MyProducer @Inject()(implicit val ec: ExecutionContext, val sqs: SqsClient)
extends SqsProducer[MyCustomType] {
def queue = "my-queue"
}
package notifications
import javax.inject.{Inject, Singleton}
import com.taxis99.amazon.sns.{SnsClient, SnsPublisher}
import models.MyCustomType
import scala.concurrent.{ExecutionContext, Future}
@Singleton
class MyNotification @Inject()(implicit val ec: ExecutionContext, val sns: SnsClient)
extends SnsPublisher[MyCustomType] {
def topic = "my-topic"
}
Since this API relies at the JSR-330 Dependency Injection interface, the integration with the Play Framework using Guice is quite straightforward.
One just need to create an AmazonSQSClientAsync
and register your instances at the application Module
.
import com.google.inject.{AbstractModule, Provides}
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}
import com.amazonaws.services.sqs.{AmazonSNSAsync, AmazonSNSAsyncClientBuilder}
import com.typesafe.config.Config
import com.taxis99.amazon.sqs.SqsClientFactory
import play.api.{Configuration, Environment}
import play.api.Mode.Prod
import consumers.MyConsumer
import producers.MyProducer
class Module extends AbstractModule {
@Provides
def amazonSqsClient(env: Environment): AmazonSQSAsync = {
if (env.mode == Prod) {
AmazonSQSAsyncClientBuilder.defaultClient()
} else {
SqsClientFactory.atLocalhost()
}
}
@Provides
def amazonSnsClient(env: Environment): AmazonSNSAsync = {
if (env.mode == Prod) {
AmazonSNSAsyncClientBuilder.defaultClient()
} else {
SqsClientFactory.atLocalhost()
}
}
@Provides
def config(config: Configuration): Config = config.underlying
def configure = {
bind(classOf[MyConsumer]).asEagerSingleton()
bind(classOf[MyProducer]).asEagerSingleton()
}
}
If you wish to use the advanced message compression (only for version 0.2.x
), you can specify the MsgPack
serialization at your consumers, producers and publishers:
package producers
import javax.inject.{Inject, Singleton}
import com.taxis99.amazon.serializers.{ISerializer, MsgPack}
import com.taxis99.amazon.sqs.{SqsClient, SqsProducer}
import models.MyCustomType
import scala.concurrent.{ExecutionContext, Future}
@Singleton
class MyProducer @Inject()(implicit val ec: ExecutionContext, val sqs: SqsClient)
extends SqsProducer[MyCustomType] {
override def serializer: ISerializer = MsgPack
def queue = "my-queue"
}
Take in consideration that your consumers and producers MUST specify the same serialization method to work properly.
The best way to develop is through a TDD style, the test uses a in memory ElasticMQ to run the flows, allowing a fast interaction with the code.
Other caveat is to run the tests against several scala versions:
$ sbt
> // Run tests against Scala 2.11 and 2.12
> + test
Integration tests
First you must launch an instance of the ElasticMQ server at localhost:9324
then run the tests.
$ docker-compose up -d
$ sbt it:test
Building
To cross compile the jar
for Scala 2.11 and 2.12 use the +
modifier.
$ sbt "+ package"
common-sqs
is open source software released under the Apache 2.0 License by 99Taxis.
See the LICENSE file for details.