Introduce Token Throttle
andreas-schroeder opened this issue · 3 comments
Short description
While Akka Stream provides a backpressuring throttle flow working remarkably well, it is not possible to control or change the token refill rate externally. The Akka docs acknowledge this and also propose an actor-based solution in its cookbook.
However, while building a token-bucket-based throttling backed by a token bucket in Redis, I found another design that might be useful to a wider audience for controlling rate externally.
Details
Using a graph stage that fetches tokens from an externally provided upstream (and storing them internally), it is possible to throttle a flow based on a central token store. To give an example:
// query Redis up to 10 times / sec for tokens, expecting a Long to be returned
val client: RedisClient = ???
val reactive = client.connect().reactive()
val tokens: Source[Long, NotUsed] = Source
.unfold(NotUsed)(_ => Some(NotUsed, reactive.eval[Long]("lua script", ScriptOutputType.INTEGER, "tokens")))
.throttle(10, 1.second)
.flatMapConcat(Source.fromPublisher)
// throttle processing based on availability of tokens from Redis
mySource.via(TokenThrottle(tokens)(_ => 1)).map(processing)
With the used lua script subtracts the returned tokens and additionally does bucket refill behind the scenes similar to here, and some failure handling for on top, that would be almost all there is to be added.
TokenThrottle.apply
used above requires a tokenSource: Graph[SourceShape[Long], M]
and a costCalculation: A => Long
to compute costs of the current item. For fix-sized cost, it would be actually possible to achieve the same logic with the built-in zipWith
. For variable per-item cost, having a custom stage is simpler.
PR follows suit.
Let me know if this would be a valuable addition to akka-stream-contrib.
I've published a Gist for the redis scripts in case you want to get a better idea.
Here's how this can then be used (once you have the hash of the script):
val reactive = client.connect().reactive()
def callRedis = reactive.evalsha[Long](
fetchTokenScriptHash,
ScriptOutputType.INTEGER,
Array("tokens"),
"10", "20", "3")
val tokens: Source[Long, NotUsed] = Source
.unfold(NotUsed)(_ => Some((NotUsed, callRedis)))
.throttle(3, 1.second)
.flatMapConcat(Source.fromPublisher)
Any comments on this? It would also be fine for me if you decide that akka-stream-contrib isn't the right place for this. In case the example was too short or a bit too cryptic, here's a full one without any details left out (except loading the lua scripts and the scripts themselves). The benefit of the new stage would be that tokens can be stored in a central token bucket, and consumed by multiple streams (even possibly running on different nodes):
val client = RedisClient.create(RedisURI.create("localhost", 6379))
implicit val system: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
val tokensScript = loadScript("fetch-tokens.lua")
val initScript = loadScript("init-bucket.lua")
val connection = client.connect()
val sync = connection.sync()
val initId = sync.scriptLoad(initScript)
val scriptId = sync.scriptLoad(tokensScript)
sync.evalsha(initId,
ScriptOutputType.STATUS,
Array("my_token_bucket"), // bucket name
"20", // bucket capacity: 20 tokens
"3", // refill 3 ...
"1000000") // every second
val reactive = client.connect().reactive()
// ask for tokens in batches of 10 tokens per request, with fallback of 5 tokens in case redis in unavailable.
def callRedis =
reactive
.evalsha[Long](scriptId, ScriptOutputType.INTEGER, Array("my_token_bucket"), "10")
.retryBackoff(10, 10.millis.asJava)
.onErrorReturn(5L)
val tokens: Source[Long, NotUsed] = Source
.unfold(NotUsed)(_ => Some((NotUsed, callRedis)))
.throttle(3, 1.second) // fetch tokens from redis at most 3 times per second.
.flatMapConcat(Source.fromPublisher)
.map { t =>
println(s"${Instant.now}: Fetched $t tokens.")
t
}
Source
.repeat(1)
.via(TokenThrottle(tokens)(_ => 1))
.runForeach(x => println(s"${Instant.now}: processing one item."))
this would produce the following output:
2019-07-12T17:52:08.640Z: Fetched 10 tokens.
2019-07-12T17:52:08.641Z: processing one item.
2019-07-12T17:52:08.641Z: processing one item.
2019-07-12T17:52:08.641Z: processing one item.
2019-07-12T17:52:08.641Z: processing one item.
2019-07-12T17:52:08.641Z: processing one item.
2019-07-12T17:52:08.641Z: processing one item.
2019-07-12T17:52:08.641Z: processing one item.
2019-07-12T17:52:08.641Z: processing one item.
2019-07-12T17:52:08.641Z: processing one item.
2019-07-12T17:52:08.642Z: processing one item.
2019-07-12T17:52:08.966Z: Fetched 10 tokens.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:08.966Z: processing one item.
2019-07-12T17:52:09.294Z: Fetched 0 tokens.
2019-07-12T17:52:09.635Z: Fetched 3 tokens.
2019-07-12T17:52:09.635Z: processing one item.
2019-07-12T17:52:09.636Z: processing one item.
2019-07-12T17:52:09.636Z: processing one item.
2019-07-12T17:52:09.965Z: Fetched 0 tokens.
2019-07-12T17:52:10.294Z: Fetched 0 tokens.
2019-07-12T17:52:10.636Z: Fetched 3 tokens.
2019-07-12T17:52:10.636Z: processing one item.
2019-07-12T17:52:10.636Z: processing one item.
2019-07-12T17:52:10.637Z: processing one item.
2019-07-12T17:52:10.965Z: Fetched 0 tokens.
2019-07-12T17:52:11.296Z: Fetched 0 tokens.
2019-07-12T17:52:11.628Z: Fetched 3 tokens.
2019-07-12T17:52:11.629Z: processing one item.
2019-07-12T17:52:11.629Z: processing one item.
2019-07-12T17:52:11.629Z: processing one item.
2019-07-12T17:52:11.955Z: Fetched 0 tokens.
Sorry for not getting back to this sooner. I think it is fine to have this here first. But also it generalizes throttle quite well. I think it could be promoted to the akka-stream module eventually.