/futiles

The missing utils for working with Scala Futures

Primary LanguageScalaApache License 2.0Apache-2.0

Futiles

Build Status Coverage Status License

The missing utils for working with Scala Futures

About

Throughout a few different Scala projects I have written these utility functions for working with futures over and over again.

Changelog

License: Apache Licence v2.

Quick start

The artifact is on maven central and can be used by adding it to your project dependencies in sbt:

libraryDependencies += "com.markatta" %% "futiles" % "2.0.2"

Latest stable version:

  • Scala 2.13 - Maven Central, Scala 2.13
  • Scala 2.12 - Maven Central, Scala 2.12
  • Scala 2.11 - Maven Central, Scala 2.11

Examples

Sequencing futures - markatta.futiles.Sequencing

The utilities inside of markatta.futiles.Sequencing allows you to sequence other functor types than collections, just like the built in scala.concurrent.Future.sequence function does.

Options

Make a future option value out of an option future value:

import scala.concurrent.Future
import scala.concurrent.Future.successful
import markatta.futiles.Sequencing.sequenceOpt
val optionFuture: Option[Future[String]] = Some(successful("woho!"))
val futureOption: Future[Option[String]] = sequenceOpt(optionFuture)

Eithers

The either sequencing allows you to go from one or two futures inside of an Either to a future with an either inside of it:

  • sequenceEither allows you to go from Either[Future[L], Future[R]] to Future[Either[L, R]]
  • sequenceL allows you to go from Either[Future[L], R] to Future[Either[L, R]]
  • sequenceR allows you to go from Either[L, Future[R]] to Future[Either[L, R]]

Tries

sequenceTries takes a collection of Future[A] and turn them into a future collection of each succeeded of failed future. So compared to the build in sequence it will not fail with the first failed future but rather collect all those and let you handle them when all has arrived.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.Future.{successful, failed}
import scala.util.Try
import markatta.futiles.Sequencing.sequenceTries

val futures: Seq[Future[String]] = Seq(successful("woho"), failed(new RuntimeException("bork")))

val allOfEm: Future[Seq[Try[String]]] = sequenceTries(futures)

Sequential traverse

scala.concurrent.Future.traverse allows you to have a collection of As, apply a function A => Future[B] to each of them and then sequence the resulting collection into a future collection of B.

Futiles contains one addition to that concept which basically does the same, but applies the function sequentially, so that at any time only one future is executing and the next one will not be done until it completes. If any future fails, it will stop and return a failed future with that exception.

Example:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future.{successful, failed}
import markatta.futiles.Traversal.traverseSequentially
import java.util.concurrent.CountDownLatch

val latch = new CountDownLatch(3)
val result = traverseSequentially(List(1,2,3)) { n =>
  Future {
    latch.countDown()
    (n , latch.getCount)
  }
}

// result will always be Future(List((1, 2L), (2, 1L), (3, 0L)))

Sequential fold left

A more general case of traversal, lets you build any kind of object asynchronously but sequentially:

Example

val latch = new CountDownLatch(3)
val result = foldLeftSequentially(List(1,2,3))(Seq.empty[Int]) { (acc, n) =>
  Future {
    latch.countDown()
    val l = latch.getCount.toInt
    acc :+ l
  }
}

// result will always be Future(List(2, 1, 0)))

Combining futures - markatta.futiles.Combining

multiple futures into one future tuple

Combine up to 6 futures into a tuple with the value of each future using product. If either fails the result will be a failed future. Or apply a function using mapN or flatMapN while you are at it

Example:

import scala.concurrent.Future
import scala.concurrent.Future.successful
import markatta.futiles.Combining._

val fTuple: Future[(Int, String)] = product(successful(1), successful("woho"))

val mapped: Future[String] = 
  map2(successful(1), successful("woho"))((a, b) => a.toString + b)

val flatMapped: Future[String] = 
  flatMap2(successful(1), successful("woho"))((a, b) => successful(a.toString + b)) 

Lifting and unlifting container types inside of futures - markatta.futiles.Lifting

Lift the implicit try into the future

If you want to lift a failed future into a successful future with a failed try inside, you can do that with markatta.futiles.Lifting.liftTry. This is probably not very interesting except for in the context of sequenceTries which is described above.

Options

A common problem when working with futures is that you have a nested option, and this makes it impossible to do clean for comprehensions, since you need to handle both Some(value) and None somehow.

val result =
  for {
    user <- userDao.findById(id): Future[Option[User]]
    cart <- shoppingCartDao.findByUserId(...oh noes, user is an option...)
  } yield (...oh noes, so is cart, what to do!? ...)

One easy way out is to "unlift" the value if it is a None into a failed future, but using unapply in the for comprehension gives us a MatchError with little info about what went wrong.

val result =
  for {
    Some(user) <- userDao.findById(id): Future[Option[User]]
    Some(cart) <- shoppingCartDao.findByUserId(user.id)
  } yield (user, cart)
// hard to know that it wasn't some other MatchError here

Futiles contains two methods Lifting.unliftOption and Lifting.unliftOptionEx for dealing with this, one where you decide what exception to fail the future with and one where you just provide a text, and an UnliftException is used.

To make it really concise an implicit class is provided that will decorate Future[Option[A]] with corresponding methods called unlift

Example:

import scala.concurrent.ExecutionContext.Implicits.global
import markatta.futiles.UnliftException
import markatta.futiles.Lifting.Implicits._

val result =
  for {
    user <- userDao.findById(id).unlift(s"No user with $id")
    cart <- shoppingCartDao.findByUserId(user.id).unlift(s"No cart for user $user")
  } yield (user, cart)

result.recover {
  case UnliftException(msg) if msg.startsWith("No user") => ...
  case UnliftException(msg) if msg.startsWith("No cart") => ...
}

Eithers

Much like the option unlifting above unliftL and unliftLEx will fail the future if the value is a Right and succeed with the value inside of a Left while unliftR and unliftREx does the exact opposite.

There is also implicit decoration for the two:

import scala.concurrent.Future
import markatta.futiles.Lifting.Implicits._

val futureEither: Future[Either[String, Int]] = ???
val result: Future[Int] = futureEither.unliftR("Danger Danger!")

Boolean && and || for Future[Boolean]s. Operations are short circuited just like regular boolean expressions. Important: this affects failures, if the operation is short circuited the second Future[Boolean] might not be evaluated at all. If it is created before the boolean op it might be outright ignored even if it fails.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import markatta.futiles.Boolean.Implicits._

// short circuit will only happen if we do not create 
// the futures before hand
def a: Future[Boolean] = ???
def b: Future[Boolean] = ???
val both: Future[Boolean] = a && b
val either: Future[Boolean] = a || b
val negated: Future[Boolean] = !a

There is an idea to be able to short circuit for the first arrived future, but this makes failure handling racey (see #5).

Some times you want to wait a specific amount of time before triggering a future, or you want it to complete at a specific timeout with a value you already have. This is available for example in the play framework future libraries, but maybe you would want to do that without depending on play.

If an exception is thrown by the by-name-parameter the future will be failed instead of completed when the timeout is reached.

Example:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import markatta.futiles.Timeouts.timeout

val timeoutF = timeout(1.seconds)("It was too slow, you get default instead")
val result = Future.firstCompletedOf(Seq(doItForReals(), timeoutF))

Implicit timeout methods

Timeouts is such a common concept that there also is implicit decorations for futures. Note that the original future will always complete at some point, so this does in no way cancel the future if it times out.

Example:

import Timeouts.Implicits._

val future: Future[String] = ???

val willFailIfNotCompletedWithin2s = future.withTimeoutError(2.seconds)
val willDefaultIfNotCompletedInTime = future.withTimeoutDefault(2.seconds, "Sensible default")

Retrying failed futures - markatta.futiles.Retry

A common scenario is that you use Futures to interact with remote systems, but what if the remote system is down exactly when the request is done, or the network cable was disconnected by your little brother.

Futiles contains two flavours of retry for futures:

  • Retry right away with markatta.futiles.Retry.retry
  • Retry with an exponential back off, waiting longer and longer before each retry

Both methods allows you to specify the maximum number of retries to perform and a predicate function Throwable => Boolean that will be given any exception and decides if it should lead to a retry or not.

By default all throwables lead to retry.

Example of the simple retry

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import markatta.futiles.Retry._

val result: Future[Int] = retry(5) {
  callThatService(): Future[Int]
}

Exponential back off additionally takes a back off time unit, which decides a base for the calculation (2 ^ tries) * jitter * timeUnit, where jitter will be a random value between 0.5 and 1.5. this means that a time unit of 1 second the first retry will back off between 1 and 3 seconds, the second try between 2 and 6 seconds, the third try between 4 and 12 and so on until the maximum number of retries is reached and at that time the future will be failed.

Example with exponential back off

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import markatta.futiles.Retry._

val result: Future[Int] = retryWithBackOff(5, 5.seconds) {
  callThatService(): Future[Int]
}

If you need to create a Future that you want to cancel at a later point in time then you can use a CancelableFuture.

import markatta.futiles.CancellableFuture

val cancellableFuture = CancellableFuture {
  someLongOperation()
}

cancellableFuture.cancel()

Note that the .cancel method on CancelableFuture is a best effort implementation and it also does not handle cleaning up of resources (such as file handles) since further computations deriving from .map/.flatMap/onComplete may not execute. If a CancelableFuture was cancelled this way it will fail with a CancellationException exception.

Note that CancellableFuture implements the scala.concurrent.Future interface which means you can also use it as a standard Future.