New operator: take the first n elements of an EventStream
doofin opened this issue · 3 comments
doofin commented
Sometimes we need to take only the first n elements of an EventStream, something like this: https://www.learnrxjs.io/learn-rxjs/operators/filtering/take
vic commented
implicit class EventStreamExt[A](private val aStream: EventStream[A]) extends AnyVal {
  // Emits only the first n events of the underlying stream.
  def take(n: Int): EventStream[A] = {
    var seen = 0 // number of events let through so far
    aStream.collect {
      case event if seen < n =>
        seen += 1
        event
    }
  }
}
doofin commented
Thanks @vic. For now, I guess something like this would also work:
a combineWith EventStream.fromSeq(0 to n) filter (x => x._2 < n) map (_._1)
raquo commented
I will implement this eventually.
@vic's version works just fine in 99% of cases, but if we were to add a take operator to Airstream, it would need to account for the stream lifecycle, i.e. it should unsubscribe from the parent observable when it reaches the limit. We also need to decide whether the take count should reset if the stream stops (loses all observers), and what, if anything, to do about Signals.
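To make the lifecycle point concrete, here is a minimal sketch of a take that drops its parent subscription once the limit is reached. It deliberately does not use Airstream: `Source`, `Subscription`, and `TakeSketch` are hypothetical names for a toy push-based model, so treat this as an illustration of the idea rather than how Airstream would implement it. It also leaves the open questions (resetting the count on restart, Signals) untouched.

```scala
// Toy model only: none of these types are part of Airstream.
trait Subscription { def cancel(): Unit }

final class Source[A] {
  private var observers = Vector.empty[A => Unit]

  // Registers an observer and returns a handle that removes it again.
  def subscribe(onNext: A => Unit): Subscription = {
    observers = observers :+ onNext
    new Subscription {
      def cancel(): Unit = {
        observers = observers.filterNot(_ eq onNext)
      }
    }
  }

  // Iterates over an immutable snapshot, so cancelling mid-emit is safe.
  def emit(a: A): Unit = observers.foreach(_(a))

  def observerCount: Int = observers.size
}

object TakeSketch {
  private val noop: Subscription = new Subscription { def cancel(): Unit = () }

  // Forwards the first n events of `parent` to `onNext`, then unsubscribes
  // from `parent` so that it no longer holds a reference to this observer.
  def take[A](parent: Source[A], n: Int)(onNext: A => Unit): Subscription =
    if (n <= 0) noop // nothing to take, so never subscribe
    else {
      var seen = 0
      var sub: Option[Subscription] = None
      val handler: A => Unit = { a =>
        if (seen < n) {
          seen += 1
          onNext(a)
          if (seen == n) sub.foreach(_.cancel()) // limit reached: detach
        }
      }
      val s = parent.subscribe(handler)
      sub = Some(s)
      s
    }
}
```

With this model, after `TakeSketch.take(src, 2)(println)` and two calls to `src.emit(...)`, the source has no observers left, whereas the `collect`-based version stays subscribed and silently discards every later event.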