fd4s/fs2-kafka

Feature request (possibly) : Dynamic metering on Streams

Closed this issue · 2 comments

In the same way that a Signal[F, Boolean] can be used to pause consumption (which I've found incredibly useful, e.g. for dynamic feature flags that turn consumption on or off), I would like to be able to throttle consumption dynamically.

1. Is there currently a built-in way to have dynamic metering on a stream, other than `evalTap(_ => doSomeDelay())`?

The limitation of this approach is that doSomeDelay has no access to when the stream last emitted, how much time has elapsed since then, and so on, which leads me to my actual question:

2. I think this is the feature request I'm asking for, if people would find this useful

// ignoring the details of where these signals come from
val targetDelaySignal: Signal[F, FiniteDuration] = createDelaySignal[F]()
val pauseSignal: Signal[F, Boolean] = createPauseSignal[F]()

// I would like to be able to:
someStream
    .pauseWhen(pauseSignal)
    .meteredBy(targetDelaySignal) // the proposed combinator; it does not exist today
    .flatMap(...etcetc)

I would want it to keep track internally of when it last emitted a message. If the signal changes to a delay shorter than the previous one, and at least that much time has already passed since the last emission, it should emit immediately and then wait for the new duration before emitting again. Conversely, if the delay is increased, the time that has already elapsed since the last emission should count toward the new delay.
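A minimal sketch of how these semantics might be approximated today, assuming fs2 3.x and cats-effect 3. `meteredBy` is not an existing fs2 or fs2-kafka combinator; it is written here as a standalone, hypothetical function. The names `DynamicMetering`, `awaitSlot` and `lastEmitRef` are illustrative only. It also assumes the very first element is emitted without waiting, which the description above leaves open.

```scala
import cats.effect.Temporal
import cats.syntax.all._
import fs2.Stream
import fs2.concurrent.Signal
import scala.concurrent.duration._

object DynamicMetering {

  // Hypothetical helper, not part of fs2: emits each element of `source`
  // no sooner than the current value of `targetDelay` after the previous one.
  def meteredBy[F[_], A](source: Stream[F, A], targetDelay: Signal[F, FiniteDuration])(
      implicit F: Temporal[F]
  ): Stream[F, A] = {

    // Waits until (target - time elapsed since lastEmit) has passed.
    // `discrete` emits the current delay and then every change; `switchMap`
    // cancels the pending sleep on a change and recomputes the remaining
    // wait against the new target.
    def awaitSlot(lastEmit: FiniteDuration): F[Unit] =
      targetDelay.discrete
        .switchMap { target =>
          Stream.eval(
            F.monotonic.flatMap { now =>
              val remaining = target - (now - lastEmit)
              if (remaining > Duration.Zero) F.sleep(remaining) else F.unit
            }
          )
        }
        .head
        .compile
        .drain

    // Tracks the monotonic time of the last emission; None until the first one.
    Stream.eval(F.ref(Option.empty[FiniteDuration])).flatMap { lastEmitRef =>
      source.evalTap { _ =>
        lastEmitRef.get.flatMap(_.traverse_(awaitSlot)) *>
          F.monotonic.flatMap(now => lastEmitRef.set(Some(now)))
      }
    }
  }
}
```

With something like this, the pipeline above could be written as `DynamicMetering.meteredBy(someStream.pauseWhen(pauseSignal), targetDelaySignal)`. How it should interact with time spent paused is not addressed in this sketch.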

So - in BDD format, this is the behaviour that I'm after:

Scenario 1: Initial startup
GIVEN target delay is initially 1 minute
WHEN the stream is started
THEN it emits a message every minute as long as there are more elements to emit

Scenario 2: Delay is changed to be shorter than it was previously
GIVEN targetDelay is initially 1 minute
AND the stream emits one message
AND 30 seconds pass
WHEN the targetDelay signal is changed to 5 seconds
THEN a message is immediately emitted because (newTime - timePassedSinceLastInvocation) is negative or 0
AND every 5 seconds after this, a new message is emitted

Scenario 3: Delay is changed to be longer than it was previously
GIVEN targetDelay is initially 1 minute
AND the stream emits one message
AND 30 seconds pass
WHEN the targetDelay signal is changed to 45 seconds
THEN once an additional 15 seconds have elapsed, a message is emitted
AND every 45 seconds after this, a new message is emitted
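
The arithmetic behind the last two scenarios can be checked in isolation. The snippet below is only an illustration of the rule "wait for the new target delay minus the time already elapsed, never less than zero"; `MeterMath` and `remainingWait` are made-up names, not part of fs2 or fs2-kafka.

```scala
import scala.concurrent.duration._

object MeterMath {
  // Time still to wait before the next emission, given the current target
  // delay and the time elapsed since the last emission. Clamped at zero so
  // that an overdue emission happens immediately.
  def remainingWait(target: FiniteDuration, elapsed: FiniteDuration): FiniteDuration =
    (target - elapsed).max(Duration.Zero)
}

// Delay shortened from 1 minute to 5 seconds, 30 seconds after the last emission:
// 5s - 30s is negative, so the remaining wait is zero and the message goes out at once.
val shortened = MeterMath.remainingWait(5.seconds, 30.seconds)

// Delay changed from 1 minute to 45 seconds, 30 seconds after the last emission:
// 45s - 30s leaves 15 seconds to wait.
val changedTo45 = MeterMath.remainingWait(45.seconds, 30.seconds)
```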

... well, actually, thinking about it, this might be a better issue for fs2 itself 🤔

Closing, moving over to the proper issue tracker