Event sourcing aims to capture all changes to the state of a microservice as a sequence of events. The persistence mechanism used to store these events is referred to as an event journal. This is usually a NoSQL or SQL database, but it could also be a bespoke solution.
A microservice's state can then be recreated by replaying the sequence of events read from an event journal. Before the microservice can write an event to the journal it must validate that the event can be applied to the current state. This validation may involve network or disk I/O.
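The two ideas above, replaying a journal and validating before writing, can be sketched with a plain in-memory Vector standing in for the journal. The AccountEvent, Deposited, Withdrawn, Account and AccountJournal names are hypothetical and exist only for this illustration; they are not part of the design that follows.

```scala
// Hypothetical domain, for illustration only.
sealed trait AccountEvent
case class Deposited(amount: Long) extends AccountEvent
case class Withdrawn(amount: Long) extends AccountEvent

case class Account(balance: Long)

object AccountJournal {
  // Apply a single event to the current state.
  private def applyEvent(state: Account, event: AccountEvent): Account = event match {
    case Deposited(a) => state.copy(balance = state.balance + a)
    case Withdrawn(a) => state.copy(balance = state.balance - a)
  }

  // Recreate the state by replaying the journal from the beginning.
  def replay(journal: Vector[AccountEvent]): Account =
    journal.foldLeft(Account(0L))(applyEvent)

  // Validate the event against the current state before appending it.
  def write(journal: Vector[AccountEvent], event: AccountEvent): Either[String, Vector[AccountEvent]] = {
    val next = applyEvent(replay(journal), event)
    if (next.balance < 0) Left("insufficient funds") else Right(journal :+ event)
  }
}
```

Here validation is a pure check; in a real service it could involve I/O, which is why the traits defined later return a Future.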
Each resource collection, e.g. /posts, /comments and /links, will usually have its own event journal. The state of a given resource, e.g. /posts/:id, /comments/:id or /links/:id, can be recreated by replaying the events with that identifier in the respective journal. Recreating the state of an entire resource collection requires every event in the journal to be replayed. This can reduce read performance, so a separate query model is often maintained (see CQRS).
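The difference between replaying one resource and replaying a whole collection can be sketched with a journal held in memory as an ordered sequence of entries. The Entry, Created, Updated, Deleted and JournalReplay names are hypothetical, and the state of a resource is reduced to a String body to keep the sketch short.

```scala
import java.util.UUID

// Hypothetical events, for illustration only.
sealed trait Event
case class Created(body: String) extends Event
case class Updated(body: String) extends Event
case object Deleted extends Event

// One journal entry: the resource identifier plus the event.
case class Entry(id: UUID, event: Event)

object JournalReplay {
  private def step(state: Option[String], e: Event): Option[String] = e match {
    case Created(b) => Some(b)
    case Updated(b) => state.map(_ => b)
    case Deleted    => None
  }

  // State of a single resource: only events carrying its identifier are replayed.
  def resource(journal: Seq[Entry], id: UUID): Option[String] =
    journal.filter(_.id == id).map(_.event).foldLeft(Option.empty[String])(step)

  // State of the whole collection: every event in the journal is replayed.
  def collection(journal: Seq[Entry]): Map[UUID, String] =
    journal.foldLeft(Map.empty[UUID, String]) { (acc, entry) =>
      step(acc.get(entry.id), entry.event) match {
        case Some(body) => acc.updated(entry.id, body)
        case None       => acc - entry.id
      }
    }
}
```

The collection replay touches every entry, which is the cost a separate query model is meant to avoid.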
In much the same way that the representation of a resource, e.g. application/json or application/xml, could, and perhaps should, vary, so could the representation of events. This is especially true if the content or structure of events changes over time. The representation may start off as something human readable, say JSON, and then transition to something binary, say Protobuf, if or when performance becomes an issue.
To implement event sourcing in Scala then, at a minimum, the types State, Event and EventJournal need to be represented. The functions Seq[Event] => Option[State] and (Event, Option[State]) => Future[Option[State]] are also needed. These types and functions can be defined using generic traits:
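import scala.concurrent.Future

// Rebuilds the state (if any) from a sequence of events.
trait CanReplay[Event, State] {
  def replay(s: Seq[Event]): Option[State]
}

// Checks an event against the current state and yields the resulting state.
trait CanValidate[Event, State] {
  def validate(e: Event, s: Option[State]): Future[Option[State]]
}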
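To make the two traits concrete, here is a minimal sketch of instances for a counter. The CounterEvent, Incremented, Reset and CounterInstances names are hypothetical, the state is a plain Int, and the traits are repeated only so that the block is self-contained.

```scala
import scala.concurrent.Future

trait CanReplay[Event, State] {
  def replay(s: Seq[Event]): Option[State]
}
trait CanValidate[Event, State] {
  def validate(e: Event, s: Option[State]): Future[Option[State]]
}

// Hypothetical domain, for illustration only.
sealed trait CounterEvent
case object Incremented extends CounterEvent
case object Reset extends CounterEvent

object CounterInstances {
  // Shared transition function used by both replay and validate.
  private def update(e: CounterEvent, s: Option[Int]): Option[Int] = e match {
    case Incremented => Some(s.getOrElse(0) + 1)
    case Reset       => None
  }

  implicit object CanReplayCounter extends CanReplay[CounterEvent, Int] {
    // Replaying is a left fold of the transition function over the events.
    def replay(es: Seq[CounterEvent]): Option[Int] =
      es.foldLeft(Option.empty[Int])((s, e) => update(e, s))
  }

  implicit object CanValidateCounter extends CanValidate[CounterEvent, Int] {
    // Reject events that make no sense for the current state.
    def validate(e: CounterEvent, s: Option[Int]): Future[Option[Int]] = e match {
      case Reset if s.isEmpty => Future.failed(new IllegalStateException("nothing to reset"))
      case _                  => Future.successful(update(e, s))
    }
  }
}
```

Sharing one transition function between replay and validate keeps the two from drifting apart.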
It would also be nice to support reading and writing multiple Event and Identifier representations:
// Converts a representation into a model.
trait CanRead[Model, Rep] {
  def read(a: Rep): Model
}

// Converts a model into a representation.
trait CanWrite[Model, Rep] {
  def write(a: Model): Rep
}
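As a rough illustration of how these two traits allow one model to have several representations, the sketch below gives a hypothetical Tagged event both a human-readable String form and a binary Array[Byte] form. The encodings are hand-rolled stand-ins rather than JSON or Protobuf, and Tagged, TaggedInstances and RoundTrip are assumed names.

```scala
import java.nio.charset.StandardCharsets.UTF_8

trait CanRead[Model, Rep] {
  def read(a: Rep): Model
}
trait CanWrite[Model, Rep] {
  def write(a: Model): Rep
}

// Hypothetical event, for illustration only.
case class Tagged(tag: String)

object TaggedInstances {
  // Human-readable representation.
  implicit object ReadString extends CanRead[Tagged, String] {
    def read(a: String): Tagged = Tagged(a.stripPrefix("tagged:"))
  }
  implicit object WriteString extends CanWrite[Tagged, String] {
    def write(a: Tagged): String = "tagged:" + a.tag
  }

  // Binary representation.
  implicit object ReadBytes extends CanRead[Tagged, Array[Byte]] {
    def read(a: Array[Byte]): Tagged = Tagged(new String(a, UTF_8))
  }
  implicit object WriteBytes extends CanWrite[Tagged, Array[Byte]] {
    def write(a: Tagged): Array[Byte] = a.tag.getBytes(UTF_8)
  }
}

object RoundTrip {
  // The representation is selected by the Rep type parameter alone.
  def apply[Model, Rep](m: Model)(implicit
      w: CanWrite[Model, Rep],
      r: CanRead[Model, Rep]): Model = r.read(w.write(m))
}
```

Calling code picks a representation purely through the type parameter, so switching from text to binary later should not require changes to the journal itself.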
With these generic traits it is possible to define the following generic EventJournal:
trait EventJournal[Identifier, Event, IdentifierRep, EventRep] {

  // Returns the events recorded for the given resource.
  def events(
    resourceId: Identifier)(implicit
    canWriteIdentifier: CanWrite[Identifier, IdentifierRep],
    canReadEvent: CanRead[Event, EventRep]): Future[Seq[Event]]

  // Replays the resource's events to recreate its current state.
  def read[State](
    resourceId: Identifier)(implicit
    canReplay: CanReplay[Event, State],
    canWriteIdentifier: CanWrite[Identifier, IdentifierRep],
    canReadEvent: CanRead[Event, EventRep]): Future[Option[State]]

  // Validates the event against the current state and, if valid, appends it.
  def write[State](
    resourceId: Identifier,
    event: Event)(implicit
    canReplay: CanReplay[Event, State],
    canValidate: CanValidate[Event, State],
    canReadEvent: CanRead[Event, EventRep],
    canWriteEvent: CanWrite[Event, EventRep],
    canWriteIdentifier: CanWrite[Identifier, IdentifierRep]): Future[Option[State]]
}
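Before looking at a database-backed version, it may help to see the contract in its simplest form. The following is a minimal in-memory sketch that mirrors the three operations of the EventJournal trait above. InMemoryEventJournal and the Demo instances are hypothetical, the type classes are repeated so the block is self-contained, and unlike a transactional store the read and the append in write are not atomic.

```scala
import scala.concurrent.{ExecutionContext, Future}

trait CanReplay[Event, State] { def replay(s: Seq[Event]): Option[State] }
trait CanValidate[Event, State] { def validate(e: Event, s: Option[State]): Future[Option[State]] }
trait CanRead[Model, Rep] { def read(a: Rep): Model }
trait CanWrite[Model, Rep] { def write(a: Model): Rep }

// Hypothetical journal that keeps event representations in a Map, keyed by identifier representation.
class InMemoryEventJournal[Identifier, Event, IdentifierRep, EventRep](implicit ec: ExecutionContext) {
  private var rows = Map.empty[IdentifierRep, Vector[EventRep]]

  def events(resourceId: Identifier)(implicit
      canWriteIdentifier: CanWrite[Identifier, IdentifierRep],
      canReadEvent: CanRead[Event, EventRep]): Future[Seq[Event]] = Future.successful {
    val stored = synchronized(rows.getOrElse(canWriteIdentifier.write(resourceId), Vector.empty))
    stored.map(canReadEvent.read)
  }

  def read[State](resourceId: Identifier)(implicit
      canReplay: CanReplay[Event, State],
      canWriteIdentifier: CanWrite[Identifier, IdentifierRep],
      canReadEvent: CanRead[Event, EventRep]): Future[Option[State]] =
    events(resourceId).map(canReplay.replay)

  def write[State](resourceId: Identifier, event: Event)(implicit
      canReplay: CanReplay[Event, State],
      canValidate: CanValidate[Event, State],
      canReadEvent: CanRead[Event, EventRep],
      canWriteEvent: CanWrite[Event, EventRep],
      canWriteIdentifier: CanWrite[Identifier, IdentifierRep]): Future[Option[State]] =
    for {
      current <- read[State](resourceId)               // replay the existing events
      next    <- canValidate.validate(event, current)  // a failed Future aborts the write
    } yield {
      val key = canWriteIdentifier.write(resourceId)
      synchronized {
        rows = rows.updated(key, rows.getOrElse(key, Vector.empty) :+ canWriteEvent.write(event))
      }
      next
    }
}

// Hypothetical instances: events are Strings and the state is their concatenation.
object Demo {
  implicit val idWrite: CanWrite[Int, Int] = new CanWrite[Int, Int] { def write(a: Int): Int = a }
  implicit val eventWrite: CanWrite[String, String] = new CanWrite[String, String] { def write(a: String): String = a }
  implicit val eventRead: CanRead[String, String] = new CanRead[String, String] { def read(a: String): String = a }
  implicit val canReplay: CanReplay[String, String] = new CanReplay[String, String] {
    def replay(s: Seq[String]): Option[String] = if (s.isEmpty) None else Some(s.mkString)
  }
  implicit val canValidate: CanValidate[String, String] = new CanValidate[String, String] {
    def validate(e: String, s: Option[String]): Future[Option[String]] =
      if (e.isEmpty) Future.failed(new IllegalArgumentException("empty event"))
      else Future.successful(Some(s.getOrElse("") + e))
  }
}
```

A sketch like this is mainly useful in tests; anything that needs durability or atomic validate-then-append needs a real store.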
And an example implementation using Slick:
// Imports assume Slick 3.1 and Guava. EventJournal is the trait defined above;
// Initializable is not shown and is assumed to declare init(): Future[Unit].
import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag

import com.google.common.base.CaseFormat
import slick.driver.JdbcDriver
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcType
import slick.jdbc.meta.MTable

class SlickEventJournal[
    Identifier,
    Event : ClassTag,
    IdentifierRep : JdbcType,
    EventRep : JdbcType](
    private val driver: JdbcDriver,
    private val database: Database)(implicit
    private val context: ExecutionContext)
  extends EventJournal[Identifier, Event, IdentifierRep, EventRep] with Initializable {

  // Derive the table name from the event class, e.g. PostEvent => post_event_journal.
  private val eventName = implicitly[ClassTag[Event]].runtimeClass.getSimpleName
  private val underscoreEventName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, eventName)
  private val name = s"${underscoreEventName}_journal"

  import driver.api._

  private case class EventRow(resourceId: IdentifierRep, seqNo: Long, event: EventRep)

  private class EventsTable(tag: Tag) extends Table[EventRow](tag, name) {
    def resourceId = column[IdentifierRep]("resource_id")
    def seqNo = column[Long]("seq_no")
    def event = column[EventRep]("event")
    def * = (resourceId, seqNo, event) <> (EventRow.tupled, EventRow.unapply)
    def pk = primaryKey(s"${tableName}_pk", (resourceId, seqNo))
  }

  // Named eventRows so that it does not clash with the events method below.
  private val eventRows = TableQuery[EventsTable]

  def events(
    resourceId: Identifier)(implicit
    canWriteIdentifier: CanWrite[Identifier, IdentifierRep],
    canReadEvent: CanRead[Event, EventRep]): Future[Seq[Event]] = {
    database.
      run(eventRows.
        filter(e => e.resourceId === canWriteIdentifier.write(resourceId)).
        sortBy(e => e.seqNo).result).
      map(es => es.map(e => canReadEvent.read(e.event)).toVector)
  }

  def read[State](
    resourceId: Identifier)(implicit
    canReplay: CanReplay[Event, State],
    canWriteIdentifier: CanWrite[Identifier, IdentifierRep],
    canReadEvent: CanRead[Event, EventRep]): Future[Option[State]] = {
    database.
      run(eventRows.
        filter(e => e.resourceId === canWriteIdentifier.write(resourceId)).
        sortBy(e => e.seqNo).
        result).
      map(es => es.map(e => canReadEvent.read(e.event)).toVector).
      map(canReplay.replay)
  }

  def write[State](
    resourceId: Identifier,
    event: Event)(implicit
    canReplay: CanReplay[Event, State],
    canValidate: CanValidate[Event, State],
    canReadEvent: CanRead[Event, EventRep],
    canWriteEvent: CanWrite[Event, EventRep],
    canWriteIdentifier: CanWrite[Identifier, IdentifierRep]): Future[Option[State]] = {
    val resourceEvents = eventRows.filter(e => e.resourceId === canWriteIdentifier.write(resourceId))
    // Replay the existing events in sequence order, then validate the new event
    // against the resulting state. A failed validation fails the whole action.
    val isValid = resourceEvents.sortBy(e => e.seqNo).result.
      map(es => es.map(e => canReadEvent.read(e.event)).toVector).
      map(canReplay.replay).
      flatMap(state => DBIO.from(canValidate.validate(event, state)))
    // Append the event with the next sequence number (0 for the first event).
    val writeIfValid = isValid.flatMap { state =>
      resourceEvents.map(e => e.seqNo).max.result.
        map(o => o.map(_ + 1).getOrElse(0L)).
        flatMap(s => eventRows +=
          EventRow(canWriteIdentifier.write(resourceId), s, canWriteEvent.write(event))).
        map(_ => state)
    }
    database.run(writeIfValid.transactionally)
  }

  // Creates the journal table unless a table with the same name already exists.
  override def init(): Future[Unit] = {
    database.run(MTable.getTables).flatMap { ts =>
      if (ts.map(t => t.name.name).exists(n => n == name)) {
        Future.successful(()) // NOP
      } else {
        database.run(eventRows.schema.create)
      }
    }
  }
}
Using the JdbcType type class means that the SlickEventJournal can read and write identifier and event representations of type String, UUID, Array[Byte] and many more. This enables support for both binary and human-readable representations of identifiers and events. The extensive use of implicits makes for verbose method definitions, but it also means that client code need only provide (define or import) a minimal set of implementations. To use the SlickEventJournal, client code must provide implementations of the type classes defined above. Here are example definitions that use Play's JSON library to generate event representations:
import java.util.UUID

import scala.annotation.tailrec
import scala.concurrent.Future
import scala.util.Try

case class Post(id: UUID, body: String)

object Post {
  import play.api.libs.json._

  val MaxBody = 1024

  // Reject posts whose body is too long when reading from JSON.
  private val reads = Json.reads[Post].
    filter(p => p.body.length < MaxBody)
  private val writes = Json.writes[Post]

  implicit val format = Format(reads, writes)
}

sealed trait PostEvent
case class PostCreated(id: UUID, body: String) extends PostEvent
case class PostUpdated(id: UUID, body: String) extends PostEvent
case class PostDeleted(id: UUID) extends PostEvent

object PostEvent {
  import play.api.libs.json._

  // The "type" field records which kind of event was serialised.
  implicit val eventWrites = new Writes[PostEvent] {
    def writes(e: PostEvent) = e match {
      case e: PostCreated => Json.obj("id" -> e.id, "type" -> "post_created", "body" -> e.body)
      case e: PostUpdated => Json.obj("id" -> e.id, "type" -> "post_updated", "body" -> e.body)
      case e: PostDeleted => Json.obj("id" -> e.id, "type" -> "post_deleted")
    }
  }

  // Any missing field or unknown "type" results in a JsError.
  implicit val eventReads = new Reads[PostEvent] {
    override def reads(j: JsValue): JsResult[PostEvent] = Try {
      (j \ "type").as[String] match {
        case "post_created" => PostCreated((j \ "id").as[UUID], (j \ "body").as[String])
        case "post_updated" => PostUpdated((j \ "id").as[UUID], (j \ "body").as[String])
        case "post_deleted" => PostDeleted((j \ "id").as[UUID])
      }
    } map { case e => JsSuccess(e) } getOrElse { JsError() }
  }

  implicit object CanReadPostEvent extends CanRead[PostEvent, String] {
    override def read(a: String): PostEvent = Json.fromJson[PostEvent](Json.parse(a)).get
  }

  implicit object CanWritePostEvent extends CanWrite[PostEvent, String] {
    override def write(a: PostEvent): String = Json.stringify(Json.toJson(a))
  }

  implicit object CanWriteUUID extends CanWrite[UUID, UUID] {
    override def write(a: UUID): UUID = a
  }

  // Applies a single event to the current state.
  private def update(event: PostEvent, state: Option[Post]): Option[Post] = {
    event match {
      case e: PostCreated => Some(Post(e.id, e.body))
      case e: PostUpdated => state.map(p => p.copy(body = e.body))
      case e: PostDeleted => None
    }
  }

  implicit object CanReplayPostEvent extends CanReplay[PostEvent, Post] {
    override def replay(es: Seq[PostEvent]): Option[Post] = {
      @tailrec
      def loop(es: Seq[PostEvent], state: Option[Post]): Option[Post] = {
        es.headOption match {
          case Some(e) => loop(es.tail, update(e, state))
          case None => state
        }
      }
      loop(es, None)
    }
  }

  // ConflictException and NotFoundException are assumed to be defined elsewhere.
  implicit object CanValidatePostEvent extends CanValidate[PostEvent, Post] {
    override def validate(event: PostEvent, state: Option[Post]): Future[Option[Post]] = {
      event match {
        case e: PostCreated if state.isDefined => Future.failed(new ConflictException)
        case e: PostUpdated if state.isEmpty => Future.failed(new NotFoundException)
        case e: PostDeleted if state.isEmpty => Future.failed(new NotFoundException)
        case _ => Future.successful(update(event, state))
      }
    }
  }
}
A pleasant result of using type classes is that all of the methods defined in the instances above are pure functions. More concretely, they are referentially transparent and, by definition, do not cause side effects. This makes unit testing the replay and validate methods trivial. Testing the SlickEventJournal is slightly less trivial, however. With that in mind, here is one approach to testing the SlickEventJournal using ScalaTest's new AsyncWordSpec:
import java.util.UUID

import org.scalatest.{AsyncWordSpec, MustMatchers}
import slick.driver.JdbcDriver
import slick.jdbc.JdbcBackend.Database

class SlickEventJournalSpec extends AsyncWordSpec with MustMatchers {
  import PostEvent._

  val uuid = UUID.randomUUID()
  val body = "Test body"
  val createEvent = PostCreated(uuid, body)
  val updateEvent = PostUpdated(uuid, body.reverse)
  val deleteEvent = PostDeleted(uuid)

  "SlickEventJournal::events" when {
    "no events have been written" must {
      "return an empty Seq" in {
        withJournal { journal =>
          journal.init().
            flatMap(_ => journal.events(uuid)).
            map(es => es must be (Seq.empty))
        }
      }
    }
    "one event has been written" must {
      "return a Seq containing that event" in {
        withJournal { journal =>
          val write = journal.write[Post] _
          journal.init().
            flatMap(_ => write(uuid, createEvent)).
            flatMap(_ => journal.events(uuid)).
            map(es => es must be (Seq(createEvent)))
        }
      }
    }
    "three events have been written" must {
      "return a Seq containing those events in order" in {
        withJournal { journal =>
          val write = journal.write[Post] _
          journal.init().
            flatMap(_ => write(uuid, createEvent)).
            flatMap(_ => write(uuid, updateEvent)).
            flatMap(_ => write(uuid, deleteEvent)).
            flatMap(_ => journal.events(uuid)).
            map(es => es must be (Seq(createEvent, updateEvent, deleteEvent)))
        }
      }
    }
  }

  "SlickEventJournal::read" when {
    "no events have been written" must {
      "return None" in {
        withJournal { journal =>
          val read = journal.read[Post] _
          journal.init().
            flatMap(_ => read(uuid)).
            map(state => state must be (None))
        }
      }
    }
    "one event has been written" must {
      "return Some(Post(...))" in {
        withJournal { journal =>
          val write = journal.write[Post] _
          val read = journal.read[Post] _
          journal.init().
            flatMap(_ => write(uuid, createEvent)).
            flatMap(_ => read(uuid)).
            map(state => state must be (Some(Post(uuid, body))))
        }
      }
    }
    "two events have been written" must {
      "return Some(Post(...))" in {
        withJournal { journal =>
          val write = journal.write[Post] _
          val read = journal.read[Post] _
          journal.init().
            flatMap(_ => write(uuid, createEvent)).
            flatMap(_ => write(uuid, updateEvent)).
            flatMap(_ => read(uuid)).
            map(state => state must be (Some(Post(uuid, body.reverse))))
        }
      }
    }
    "three events have been written" must {
      "return None" in {
        withJournal { journal =>
          val write = journal.write[Post] _
          val read = journal.read[Post] _
          journal.init().
            flatMap(_ => write(uuid, createEvent)).
            flatMap(_ => write(uuid, updateEvent)).
            flatMap(_ => write(uuid, deleteEvent)).
            flatMap(_ => read(uuid)).
            map(state => state must be (None))
        }
      }
    }
  }

  // Runs the test body against a journal backed by a fresh in-memory H2 database.
  def withJournal[T](thunk: SlickEventJournal[UUID, PostEvent, UUID, String] => T): T = {
    val driver: JdbcDriver = slick.driver.H2Driver
    val database: Database = Database.forURL(s"jdbc:h2:mem:${UUID.randomUUID()};DB_CLOSE_DELAY=-1")
    // Brings the implicit JdbcType instances for UUID and String into scope.
    import driver.api._
    val journal = new SlickEventJournal[UUID, PostEvent, UUID, String](driver, database)
    thunk(journal)
  }
}
The above code aims to strike a balance between compile-time type safety and ease of use. It could be made more type-safe, however, by using an event "pack" to ensure that all events are handled. This could be implemented using an HList. Even so, this would not prevent a bad actor (whether that be another microservice, a DBA or a failing disk) from corrupting the event journal. In addition, the above code ignores snapshotting. That said, it should be reasonably trivial to update CanReplay to implement a replay function along the lines of (Seq[Event], Option[State]) => Option[State]. It would also be possible to reduce the amount of boilerplate serialisation code using Scala macros. In terms of future work, it should be possible to create an AkkaEventJournal that makes use of akka-actor, akka-cluster, akka-cluster-sharding and akka-persistence for the persistence.
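As a rough sketch of the snapshot-aware replay function mentioned above, the following variant of CanReplay takes an optional snapshot as the starting state. CanReplayFrom, TallyEvent, Added, Cleared and TallyReplay are hypothetical names, and how snapshots would be stored and loaded is left out entirely.

```scala
// Hypothetical variant of CanReplay that starts from an optional snapshot.
trait CanReplayFrom[Event, State] {
  def replay(es: Seq[Event], snapshot: Option[State]): Option[State]
}

// Hypothetical domain, for illustration only.
sealed trait TallyEvent
case object Added extends TallyEvent
case object Cleared extends TallyEvent

object TallyReplay extends CanReplayFrom[TallyEvent, Int] {
  // Fold the events over the snapshot instead of always starting from None.
  def replay(es: Seq[TallyEvent], snapshot: Option[Int]): Option[Int] =
    es.foldLeft(snapshot) {
      case (s, Added)   => Some(s.getOrElse(0) + 1)
      case (_, Cleared) => None
    }
}
```

Replaying the events written after a snapshot on top of that snapshot should give the same state as replaying the full journal from None, which is the property a snapshotting journal would rely on.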