raquo/Airstream

New operator: take the first n elements of an EventStream

doofin opened this issue · 3 comments

Sometimes we need to take only the first n elements of an EventStream, something like the RxJS take operator: https://www.learnrxjs.io/learn-rxjs/operators/filtering/take

vic commented
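implicit class EventStreamExt[A](private val aStream: EventStream[A]) extends AnyVal {
  // Lets the first n events through and filters out everything after them.
  // The parent stream stays subscribed even after the limit is reached.
  def take(n: Int): EventStream[A] = {
    var seen = 0 // number of events let through so far
    aStream.collect {
      case event if seen < n =>
        seen += 1
        event
    }
  }
}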

Thanks @vic. For now, I guess something like this would also work:

a combineWith EventStream.fromSeq(0 to n) filter (x => x._2 < n) map (_._1)
raquo commented

I will implement this eventually.

@vic's version works just fine in 99% of cases, but if we were to add a take operator to Airstream, it would need to account for the stream lifecycle, i.e. it should unsubscribe from the parent observable when it reaches the limit. We would also need to decide whether the take count should reset if the stream stops (loses all observers), and what, if anything, to do about Signals.
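
To make the lifecycle point concrete, here is a minimal, self-contained sketch of a take that unsubscribes from its parent once the limit is reached. It deliberately does not use Airstream's API: Source, Subscription, Bus, and observerCount are hypothetical names made up for this illustration, and a real Airstream operator would have to go through the library's own observable internals instead.

```scala
import scala.collection.mutable

// Hypothetical handle for cancelling a subscription (not an Airstream type).
final class Subscription(val cancel: () => Unit)

// Hypothetical minimal push-based source (not Airstream's EventStream).
trait Source[A] { self =>
  def subscribe(onNext: A => Unit): Subscription

  // Passes on at most n events, then unsubscribes from the parent source.
  def take(n: Int): Source[A] = new Source[A] {
    def subscribe(onNext: A => Unit): Subscription = {
      var seen = 0          // kept per subscription, so it resets on resubscribe
      var done = n <= 0     // nothing to take: never subscribe to the parent
      var parentSub: Option[Subscription] = None
      if (!done) {
        val sub = self.subscribe { event =>
          if (!done) {
            seen += 1
            if (seen >= n) {
              done = true
              parentSub.foreach(_.cancel()) // limit reached: release the parent
            }
            onNext(event)
          }
        }
        // The parent may have emitted synchronously while subscribing.
        if (done) sub.cancel() else parentSub = Some(sub)
      }
      new Subscription(() => { done = true; parentSub.foreach(_.cancel()) })
    }
  }
}

// Hypothetical manual event source that exposes its observer count.
final class Bus[A] extends Source[A] {
  private val observers = mutable.LinkedHashSet.empty[A => Unit]
  def observerCount: Int = observers.size
  def subscribe(onNext: A => Unit): Subscription = {
    observers += onNext
    new Subscription(() => { observers -= onNext; () })
  }
  // Iterate over a snapshot so observers can cancel while being notified.
  def emit(event: A): Unit = observers.toList.foreach(_(event))
}

object TakeDemo {
  def main(args: Array[String]): Unit = {
    val bus = new Bus[Int]
    val received = mutable.ListBuffer.empty[Int]
    bus.take(2).subscribe(received += _)
    bus.emit(1); bus.emit(2); bus.emit(3)
    println(received.toList)   // List(1, 2)
    println(bus.observerCount) // 0: take released the parent after two events
  }
}
```

In this sketch the count lives inside each subscription, so it starts over whenever someone subscribes again. That is only one possible answer to the reset question raised above, and the sketch says nothing about Signals.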