upperbound is a purely functional rate limiter. It allows you to submit jobs concurrently, which will then be started at a rate no higher than what you specify.
To get upperbound, add the following line to your build.sbt:
libraryDependencies += "org.systemfw" %% "upperbound" % "version"
You can find the latest version in the releases tab.
upperbound depends on fs2, cats, cats-effect and cats-collections.
Note:
For the time being binary compatibility is not guaranteed. This is not a problem for usage in applications (which is where you would mostly use a rate limiter anyway), but risky if used in libraries. Binary compatibility will be guaranteed in the future.
upperbound is an interval based limiter, which means jobs are
started at a constant rate. This strategy prevents spikes in
throughput, and makes it a very good fit for client side limiting,
e.g. calling a rate limited API.
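The effect of starting jobs at a constant interval can be shown with a small stdlib-only model. This is hypothetical and is not upperbound's implementation. Each job starts no earlier than the moment it was submitted, and no earlier than one interval after the previous job started. A burst of simultaneous submissions therefore comes out evenly spaced instead of as a spike.

```scala
import scala.concurrent.duration._

object IntervalSchedule {
  // Hypothetical model, not upperbound's implementation: given when each job was
  // submitted (as an offset from t = 0), compute when it would start if jobs may
  // start at most once per `interval`.
  def startTimes(submitted: List[FiniteDuration], interval: FiniteDuration): List[FiniteDuration] =
    submitted
      .foldLeft((List.empty[FiniteDuration], Option.empty[FiniteDuration])) {
        case ((acc, lastStart), submittedAt) =>
          // Earliest slot: one interval after the previous start (if any).
          val earliest = lastStart.fold(submittedAt)(_ + interval)
          // A job can never start before it has been submitted.
          val start = if (earliest > submittedAt) earliest else submittedAt
          (start :: acc, Some(start))
      }
      ._1
      .reverse

  def main(args: Array[String]): Unit = {
    // A burst of five simultaneous submissions comes out evenly spaced.
    println(startTimes(List.fill(5)(0.millis), 200.millis).map(_.toMillis)) // List(0, 200, 400, 600, 800)
  }
}
```

A job submitted after a quiet period starts as soon as it arrives. For example, submissions at 0 ms, 0 ms and 1000 ms with a 200 ms interval start at 0 ms, 200 ms and 1000 ms.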
upperbound is completely pure, which allows for ease of reasoning
and composability. On a practical level, this means that some
familiarity with cats, cats-effect and fs2 is required.
The main entity of the library is a Limiter, which is defined as:
trait Limiter[F[_]] {
  def submit[A](job: F[A], priority: Int = 0): F[Unit]
  def interval: SignallingRef[F, FiniteDuration]
  def initial: FiniteDuration
  def pending: F[Int]
}
The submit method takes an F[A], which can represent any program, and returns an F[Unit] that represents the action of submitting it to the limiter, with the given priority. The semantics of submit are fire-and-forget: the returned F[Unit] immediately returns, without waiting for the input F[A] to complete its execution.
Limiter.submit is designed to be called concurrently: every concurrent call submits a job to the Limiter, and the jobs are then started (in order of priority) at a rate which is no higher than the maximum rate you specify on construction. A higher number indicates higher priority, and FIFO order is used when multiple jobs with the same priority are being throttled.
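The ordering rule (higher priority first, FIFO among equal priorities) can be written as an `Ordering` over pending jobs. The following is a hypothetical model, not upperbound's code. `Pending` and its `seq` submission counter are illustrative names.

```scala
object PriorityOrder {
  // Hypothetical model of the ordering described above (not upperbound's code):
  // each pending job records its priority and a sequence number for submission order.
  final case class Pending(name: String, priority: Int, seq: Long)

  // Higher priority starts first; ties are broken FIFO (lower sequence number first).
  val startOrder: Ordering[Pending] = new Ordering[Pending] {
    def compare(x: Pending, y: Pending): Int = {
      val byPriority = Integer.compare(y.priority, x.priority) // descending priority
      if (byPriority != 0) byPriority
      else java.lang.Long.compare(x.seq, y.seq) // ascending submission order
    }
  }

  val example: List[Pending] = List(
    Pending("a", priority = 0, seq = 0),
    Pending("b", priority = 5, seq = 1),
    Pending("c", priority = 0, seq = 2),
    Pending("d", priority = 5, seq = 3)
  )

  def order(jobs: List[Pending]): List[String] = jobs.sorted(startOrder).map(_.name)

  def main(args: Array[String]): Unit = println(order(example)) // List(b, d, a, c)
}
```

Comparing with `Integer.compare` rather than negating the priority avoids overflow at `Int.MinValue`.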
interval is an fs2.concurrent.SignallingRef that allows you to sample, change or react to changes to the current interval between two tasks. Finally, initial and pending return the initial interval specified on creation, and the number of jobs that are queued up waiting to start, respectively.
The Limiter algebra is the basic building block of the library; additional functionality is expressed as combinators over it.
To create a Limiter, use the start method:
case class Rate(n: Int, t: FiniteDuration)
object Limiter {
  def start[F[_]: Concurrent: Timer](maxRate: Rate, n: Int = Int.MaxValue): Resource[F, Limiter[F]]
}
start creates a new Limiter and starts processing the jobs submitted to it, which are started at a rate no higher than maxRate.
import upperbound.syntax.rate._ exposes the every syntax for creating Rates:
import upperbound.syntax.rate._
import scala.concurrent.duration._
Limiter.start[F](100 every 1.minute)
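Syntax like `100 every 1.minute` is usually an extension method on Int that builds a Rate. The sketch below re-creates the idea with hypothetical stand-ins. `EveryOps` and the `interval` helper are illustrative and are not upperbound's source. It also assumes that the interval between two job starts is t / n, so `100 every 1.minute` corresponds to at most one start every 600 milliseconds.

```scala
import scala.concurrent.duration._

object RateSyntax {
  // Hypothetical stand-ins with the same shape as Rate above; upperbound ships its
  // own Rate and `every` syntax, this only shows what such syntax builds.
  final case class Rate(n: Int, t: FiniteDuration) {
    // Assumption: the interval between two job starts is t / n (computed in nanos
    // so that e.g. 1 minute / 100 does not truncate to zero minutes).
    def interval: FiniteDuration = (t.toNanos / n).nanos
  }

  // Extension method on Int: `100 every 1.minute` becomes Rate(100, 1.minute).
  implicit class EveryOps(private val n: Int) extends AnyVal {
    def every(t: FiniteDuration): Rate = Rate(n, t)
  }

  def main(args: Array[String]): Unit = {
    val rate = 100 every 1.minute
    println(rate.interval.toMillis) // 600
  }
}
```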
Additionally, n enforces a bound on the maximum number of jobs allowed to queue up while waiting for execution. Once this number is reached, calling submit will fail the resulting F[Unit] with a LimitReachedException, so that you can in turn signal for backpressure downstream. Processing restarts as soon as the number of jobs waiting goes below n again.
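The behaviour of the bound n can be modelled in a few lines of stdlib Scala. This is a hypothetical sketch. `PendingQueue` and `LimitReached` are illustrative names, not upperbound's types. The real submit fails the F[Unit] with LimitReachedException rather than returning an Either.

```scala
object BoundedQueue {
  // Hypothetical stand-in for LimitReachedException.
  final case class LimitReached(max: Int) extends Exception(s"queue limit of $max reached")

  // Immutable model of the bound `n`: submissions are rejected while `n` jobs are pending.
  final case class PendingQueue(pending: Vector[String], n: Int) {
    def submit(job: String): Either[LimitReached, PendingQueue] =
      if (pending.size >= n) Left(LimitReached(n))
      else Right(copy(pending = pending :+ job))

    // Starting the oldest job frees a slot for new submissions.
    def startNext: PendingQueue = copy(pending = pending.drop(1))
  }

  def demo(): (Boolean, Boolean) = {
    val full = for {
      q1 <- PendingQueue(Vector.empty, n = 2).submit("a")
      q2 <- q1.submit("b")
    } yield q2
    val rejected = full.flatMap(_.submit("c")).isLeft                         // queue is full
    val acceptedAgain = full.map(_.startNext).flatMap(_.submit("c")).isRight // back below n
    (rejected, acceptedAgain)
  }

  def main(args: Array[String]): Unit = println(demo()) // (true,true)
}
```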
The reason start returns a cats.effect.Resource is so that processing can be stopped gracefully when the Limiter's lifetime is over.
To assemble your program, all the places that need limiting at the same rate should take a Limiter as an argument, which is then created at the end of a region of sharing (typically main) and injected via Limiter.start(...).use or Stream.resource(Limiter.start(...)).flatMap. If this sentence didn't make sense to you, it's recommended to watch this talk.
Note:
It's up to you whether you want to pass the Limiter algebra implicitly (as an F[_]: Limiter bound) or explicitly.
My position is that it's ok to pass algebras implicitly as long as
the instance is made implicit at call site, as close as possible to
where it's actually injected. This avoids any problems related to
mixing things up, and is essentially equivalent to having an instance
of your algebra for a newtype over Kleisli.
Reasonable people might disagree however, and I myself pass algebras
around both ways, in different codebases.
upperbound is slightly skewed towards the F[_]: Limiter style: internal combinators are expressed that way, and Limiter has a summoner method to allow Limiter[F].submit.
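The F[_]: Limiter style and the summoner can be shown with a cut-down, hypothetical version of the algebra. It has only `submit` and uses `Function0` as a toy effect, and neither is upperbound's code. Code that needs limiting only states the constraint. The instance is made implicit at the call site, as recommended above. Here that instance is a synchronous stub with no rate limiting.

```scala
object ImplicitAlgebra {
  // Cut-down, hypothetical algebra: only `submit`, same signature shape as Limiter.
  trait Limiter[F[_]] {
    def submit[A](job: F[A], priority: Int = 0): F[Unit]
  }

  object Limiter {
    // Summoner: makes `Limiter[F].submit(...)` work under an `F[_]: Limiter` bound.
    def apply[F[_]](implicit l: Limiter[F]): Limiter[F] = l
  }

  // Code that needs limiting only states the constraint; it never constructs a Limiter.
  def submitAll[F[_]: Limiter, A](jobs: List[F[A]]): List[F[Unit]] =
    jobs.map(job => Limiter[F].submit(job))

  def demo(): Int = {
    var ran = 0
    // Instance made implicit at the call site, right where it is injected.
    // Toy effect: Function0; this stub runs each job synchronously, with no limiting.
    implicit val direct: Limiter[Function0] = new Limiter[Function0] {
      def submit[A](job: () => A, priority: Int): () => Unit = () => { job(); () }
    }
    val submitted = submitAll[Function0, Int](List(() => { ran += 1; 1 }, () => { ran += 1; 2 }))
    submitted.foreach(run => run())
    ran
  }

  def main(args: Array[String]): Unit = println(demo()) // 2
}
```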
As mentioned above, submit has fire-and-forget semantics. When this is not sufficient, you can use await:
object Limiter {
  def await[F[_]: Concurrent: Limiter, A](job: F[A], priority: Int = 0): F[A]
}
await looks very similar to submit, except its semantics are blocking: the returned F[A] only completes when job has finished its execution. Note, however, that the blocking is only semantic: no actual threads are blocked by the implementation.
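Blocking semantics can be layered on top of fire-and-forget submission with a promise. The submitted job forwards its outcome to the promise, and the caller waits on the promise. The sketch below uses `scala.concurrent.Future` and `Promise` as stand-ins for F and a cats-effect Deferred. Its `submit` is hypothetical and does no rate limiting. It illustrates the technique and is not upperbound's source.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object AwaitFromSubmit {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical fire-and-forget submit (no rate limiting): hands the job to the
  // execution context and completes immediately, without waiting for the job.
  def submit[A](job: () => Future[A]): Future[Unit] = {
    ec.execute(() => { job(); () })
    Future.unit
  }

  // Blocking-style await on top of submit: the submitted job forwards its outcome
  // (success or failure) to a Promise, and the caller waits on that Promise.
  def await[A](job: () => Future[A]): Future[A] = {
    val p = Promise[A]()
    submit { () =>
      // Future.unit.flatMap turns a synchronous throw from job() into a failed Future.
      val running = Future.unit.flatMap(_ => job())
      running.onComplete(outcome => p.complete(outcome))
      running
    }.flatMap(_ => p.future)
  }

  def main(args: Array[String]): Unit =
    // Await.result is only used here to get the value out in this demo.
    println(Await.result(await(() => Future(21 * 2)), 5.seconds)) // 42
}
```

No thread is parked while the job runs. The returned Future simply completes once the promise does.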
Limiter[F].interval offers flexible control over the rate, which can be used as a mechanism for applying backpressure based on the result of a specific job (e.g. a REST call that got rejected upstream).
Although this can be implemented entirely in user land, upperbound
provides some backpressure helpers and combinators out of the box.
class BackPressure[F[_]: Limiter, A](job: F[A]) {
  def withBackoff(
      backOff: FiniteDuration => FiniteDuration,
      ack: BackPressure.Ack[A]
  ): F[A]
}
object BackPressure {
  case class Ack[-A](slowDown: Either[Throwable, A] => Boolean)
}
withBackoff enriches an F[A] that has a Limiter constraint with the ability to apply backpressure to the Limiter: every time a job signals through ack that backpressure is needed, the Limiter will adjust its rate by applying backOff to the current interval. This means the interval will be adjusted by calling backOff repeatedly whenever multiple consecutive jobs signal for backpressure, and reset to its original value when a job signals that backpressure is no longer needed.
Note that since jobs submitted to the Limiter are processed asynchronously, rate changes will not propagate instantly when the interval between jobs is shorter than the time a job takes to complete. However, the rate will eventually converge to its most up-to-date value.
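The backoff rule can be traced with a small stdlib-only model. It is hypothetical, ignores the asynchrony caveat above, and only computes which interval is in effect after each job. Here backOff doubles the interval. Two consecutive signals take it from 1 second to 2 and then 4 seconds, and the next job that does not signal resets it to 1 second.

```scala
import scala.concurrent.duration._

object BackoffModel {
  // Hypothetical model of the rule described above (not upperbound's code):
  // a job signalling backpressure applies `backOff` to the current interval,
  // a job that does not signal it resets the interval to `initial`.
  def intervals(
      initial: FiniteDuration,
      backOff: FiniteDuration => FiniteDuration,
      slowDown: List[Boolean]
  ): List[FiniteDuration] =
    slowDown
      .scanLeft(initial)((current, signal) => if (signal) backOff(current) else initial)
      .tail // interval in effect after each job, without the starting value

  def main(args: Array[String]): Unit = {
    // Doubling backoff starting from 1 second.
    val result = intervals(1.second, d => d * 2L, List(true, true, false, true))
    println(result.map(_.toSeconds)) // List(2, 4, 1, 2)
  }
}
```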
BackPressure.Ack[A] is a wrapper over Either[Throwable, A] => Boolean, and it's used to assert that backpressure is needed based on a specific result (or error) of the submitted job. You can write your own Acks, but the library provides some for you, including:
- BackPressure.onAllErrors: signal backpressure every time a job fails with any error.
- BackPressure.onError[E <: Throwable]: signal backpressure if a job fails with a specific type of exception. Meant to be used with explicit type application, e.g. BackPressure.onError[MyException].
- BackPressure.onResult(cond: A => Boolean): signal backpressure when the result of a job satisfies the given condition.
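To show what each of these helpers decides, here is a self-contained re-creation of the Ack shape from the signature above, with hypothetical look-alikes of onAllErrors, onError and onResult. They mirror the behaviour described in the list but are not upperbound's source. Implementing onError with a ClassTag is an assumption. The sample results are made-up job outcomes.

```scala
import scala.reflect.ClassTag

object AckModel {
  // Same shape as BackPressure.Ack above; this is a hypothetical re-creation.
  final case class Ack[-A](slowDown: Either[Throwable, A] => Boolean)

  // Slow down on any failure.
  val onAllErrors: Ack[Any] = Ack[Any](_.isLeft)

  // Slow down only on failures of type E (checked at runtime via a ClassTag).
  def onError[E <: Throwable](implicit ct: ClassTag[E]): Ack[Any] = Ack[Any] {
    case Left(e)  => ct.runtimeClass.isInstance(e)
    case Right(_) => false
  }

  // Slow down when a successful result satisfies `cond`.
  def onResult[A](cond: A => Boolean): Ack[A] = Ack[A] {
    case Right(a) => cond(a)
    case Left(_)  => false
  }

  // Made-up outcomes of four jobs: two status codes and two failures.
  val results: List[Either[Throwable, Int]] = List(
    Right(200),
    Right(429),
    Left(new java.io.IOException("connection reset")),
    Left(new IllegalStateException("bad state"))
  )

  def main(args: Array[String]): Unit = {
    println(results.map(onAllErrors.slowDown))                  // List(false, false, true, true)
    println(results.map(onError[java.io.IOException].slowDown)) // List(false, false, true, false)
    println(results.map(onResult[Int](_ == 429).slowDown))      // List(false, true, false, false)
  }
}
```

Because Ack is contravariant in A, an Ack[Any] such as onAllErrors can be used wherever an Ack[Int] is expected.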
Note that withBackoff only transforms the input job; you still need to actually submit or await it yourself. This is done to allow further combinators to operate on a job as a chain of F[A] => F[A] functions before actually submitting it to the Limiter.
It's also available as syntax:
import scala.concurrent.duration._
import upperbound._
import upperbound.syntax.backpressure._

// Add one second to the interval every time the job fails, and submit the result
def prog[F[_]: Limiter, A](fa: F[A]): F[Unit] =
  Limiter[F].submit {
    fa.withBackoff(_ + 1.second, BackPressure.onAllErrors)
  }
Finally, please be aware that the backpressure support doesn't interfere with your own error handling, nor does it do any error handling (e.g. retrying) for you.
If you need to satisfy a Limiter constraint in test code, where you don't actually care about rate limiting, you can use Limiter.noOp, which gives you a stub Limiter with no actual rate limiting and a synchronous submit method.