A collection of Akka Streams tools I use quite frequently in my dataflows.
Takes a stream sorted by key as input and accumulates a value for as long as the key stays the same. When the key changes, it emits the accumulator's final result for that group, after applying an optional transform.
// Count groups of sorted things (runWith needs an implicit Materializer in scope)
import com.kosmyna.util.streams.AggregateSortedGroupFlow
import akka.stream.scaladsl.{ Sink, Source }
def keyFn(item: (String, Int)): String = item._1
def newAccFn(acc: Option[Int]): Int = 0
def reduceFn(acc: Int, next: (String, Int)): Int = acc + next._2
val r = Source(List(("a", 3), ("a", 2), ("b", 1)))
  .via(AggregateSortedGroupFlow[(String, Int), String, Int](keyFn, newAccFn, reduceFn))
  .runWith(Sink.seq)
// r == Future{ Seq(5, 1) }
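To make the grouping rule concrete, here is a minimal sketch of the same idea on a plain `Iterator` instead of an Akka stream. It is not the library's implementation: `aggregateSortedGroups` and its parameters are hypothetical names, and it leaves out the optional transform. It can be pasted into a REPL or run as a Scala 3 / scala-cli script.

```scala
// Hypothetical helper, not part of the library: lazily folds each run of equal keys.
def aggregateSortedGroups[A, K, Acc](items: Iterator[A])(
    keyFn: A => K,
    zero: => Acc,
    reduceFn: (Acc, A) => Acc
): Iterator[Acc] = {
  val buffered = items.buffered
  new Iterator[Acc] {
    def hasNext: Boolean = buffered.hasNext
    def next(): Acc = {
      // The first element of a run fixes the key for the whole group.
      val key = keyFn(buffered.head)
      var acc = reduceFn(zero, buffered.next())
      // Keep folding while the upcoming element still belongs to the same group.
      while (buffered.hasNext && keyFn(buffered.head) == key) {
        acc = reduceFn(acc, buffered.next())
      }
      acc
    }
  }
}

val totals = aggregateSortedGroups[(String, Int), String, Int](
  Iterator(("a", 3), ("a", 2), ("b", 1))
)(_._1, 0, (acc, next) => acc + next._2).toList
// totals == List(5, 1)
```

Because only the current accumulator is held in memory, this only works when the input really is sorted by key; an unsorted input would produce several partial results for the same key.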
Logs on a timer how many messages have passed through this flow.
import akka.stream.DelayOverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import com.kosmyna.util.streams.ThroughputLoggerFlow
import scala.concurrent.duration._
Source(Stream.from(1))
  .delay(20.millis, DelayOverflowStrategy.backpressure)
  .via(ThroughputLoggerFlow(40.millis))
  .take(20)
  .runWith(Sink.ignore)
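The counting side of such a logger can be sketched without any stream machinery. This is only an illustration under my own assumptions: `ThroughputCounter`, `pass`, and `drain` are hypothetical names, and the timer that would call `drain()` once per interval and log the value is left out.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper: counts elements as they pass through unchanged.
final class ThroughputCounter {
  private val count = new AtomicLong(0L)

  // Call once per element; returns the element untouched.
  def pass[A](item: A): A = { count.incrementAndGet(); item }

  // Call once per logging interval; returns the count and resets it to zero.
  def drain(): Long = count.getAndSet(0L)
}

val counter = new ThroughputCounter
val passed = List(1, 2, 3).map(x => counter.pass(x))
val firstTick = counter.drain()  // 3 elements were seen since the last tick
val secondTick = counter.drain() // nothing passed in between, so 0
```

Using `getAndSet` makes the read and the reset a single atomic step, so elements that arrive while a tick is being logged are counted in the next interval rather than lost.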
A flow that makes it easy to retry async calls, such as requests going over a network.
import akka.stream.scaladsl.{ Sink, Source }
import com.kosmyna.util.streams.AsyncRetryFlow
import scala.concurrent.Future
import scala.concurrent.duration._
// This should throw a RetryException any time it wants to be retried
def replaceWithSomethingLikeHttpRequests(value: Int): Future[Int] = Future.successful(value)
val responsesThatWereRetriedOnFailures = Source(List(1, 2, 0, 3, 4))
  .via(AsyncRetryFlow[Int, Int](replaceWithSomethingLikeHttpRequests, 1.minute))
  .runWith(Sink.seq)
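The retry-on-a-marker-exception pattern can be sketched with plain `Future`s. This is a sketch under stated assumptions, not how `AsyncRetryFlow` works internally: `RetryableFailure` is a hypothetical stand-in for the library's `RetryException`, and `retryAsync` limits retries by attempt count, which may differ from what the duration argument above controls.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the library's RetryException.
final class RetryableFailure(msg: String) extends RuntimeException(msg)

// Hypothetical helper: re-invokes `call` while it fails with RetryableFailure
// and attempts remain. Any other failure is passed through unchanged.
def retryAsync[A](maxAttempts: Int)(call: () => Future[A])(
    implicit ec: ExecutionContext): Future[A] =
  call().recoverWith {
    case _: RetryableFailure if maxAttempts > 1 =>
      retryAsync(maxAttempts - 1)(call)
  }

// A fake network call that fails twice before succeeding.
val attempts = new AtomicInteger(0)
def flaky(): Future[Int] =
  if (attempts.incrementAndGet() < 3) Future.failed(new RetryableFailure("try again"))
  else Future.successful(42)

val result = Await.result(retryAsync(5)(() => flaky()), 5.seconds)
// result == 42, reached on the third attempt
```

Retrying only on a dedicated exception type keeps genuine bugs (bad input, parse errors) from being retried forever.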
The implementation uses AsyncRetryFlow. A simple flow that retries on network errors; because Elasticsearch times out when overloaded, this can also act as a dumb throttle.
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.requests.bulk.BulkCompatibleRequest
import com.kosmyna.util.streams.RetryElasticSearchFlow
import com.kosmyna.util.streams.IndexStats
val esClient: ElasticClient = ???
// A Flow on its own cannot be run with only a sink, so start from a Source of bulk request batches
val requests: Source[Seq[BulkCompatibleRequest], NotUsed] = ???
val indexStats = requests
  .via(RetryElasticSearchFlow(esClient))
  .runWith(IndexStats.perIndexStats())
A simple flow that throws an exception if the data running through it isn't sorted.
import akka.stream.scaladsl.{ Sink, Source }
import com.kosmyna.util.streams.SortCheckerFlow
// This will throw an exception because 4 arrives after 5
val result = Source(List(1, 2, 3, 5, 4))
  .via(SortCheckerFlow(identity))
  .runWith(Sink.ignore)
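The check itself only needs to remember the previous key. Here is a minimal sketch on a plain `Iterator`, not the library's code: `checkSorted` is a hypothetical name, the exception type is my own choice, and I am assuming that equal neighbouring keys count as sorted.

```scala
import scala.util.Try

// Hypothetical helper: passes elements through and throws on the first
// element whose key is smaller than the key before it.
def checkSorted[A, K](items: Iterator[A])(keyFn: A => K)(
    implicit ord: Ordering[K]): Iterator[A] = {
  var previous: Option[K] = None
  items.map { item =>
    val key = keyFn(item)
    if (previous.exists(p => ord.gt(p, key)))
      throw new IllegalStateException(s"Out of order: $key arrived after ${previous.get}")
    previous = Some(key)
    item
  }
}

val ok = checkSorted[Int, Int](Iterator(1, 2, 3))(identity).toList
// ok == List(1, 2, 3)

val failed = Try(checkSorted[Int, Int](Iterator(1, 2, 3, 5, 4))(identity).toList)
// failed is a Failure, because 4 arrives after 5
```

A check like this is cheap enough to leave in front of anything that depends on sorted input, such as the group aggregation at the top of this page.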