Concatenation of EventStreams
cornerman opened this issue · 9 comments
Can we concatenate multiple EventStreams?
The use case is that I have two streams that should emit elements after each other:
val e1: EventStream = ???
val e2: EventStream = ???
// all elements in e2 should come only after e1 has finished emitting items
val combined = e1 concat e2
In the readme, I saw that there is no concept of closing Observables. So, I was wondering whether this is possible?
In the absence of a built-in observable completion feature, you could simulate it in a rather ugly way.
You could write custom logic using an EventBus that would add e2 as a source when some condition happens (e.g. e1 emits something like Done on "completion" after emitting Value(v)). Or, I guess, if not EventBus, using the merge / sample / filter operators.
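The sentinel idea above can be sketched without any stream library. This is a plain-Scala model, not Airstream code: Signal, Value, Done and SentinelGate are hypothetical names, and emit stands in for whatever pushes events downstream (for example, an EventBus writer). It assumes that events from the second source arriving before Done are simply dropped, which approximates adding e2 as a source only once e1 has "completed".

```scala
object SentinelGateSketch {
  // Hypothetical protocol: e1 wraps its payloads in Value and signals completion with Done.
  sealed trait Signal[+A]
  final case class Value[+A](value: A) extends Signal[A]
  case object Done extends Signal[Nothing]

  // Forwards e1's values until Done is seen, then forwards e2's values.
  final class SentinelGate[A](emit: A => Unit) {
    private var firstDone = false

    def onFirst(signal: Signal[A]): Unit = signal match {
      case Value(v) => if (!firstDone) emit(v) // ignore stray values after Done
      case Done     => firstDone = true
    }

    def onSecond(value: A): Unit =
      if (firstDone) emit(value) // anything e2 sends before Done is dropped
  }
}
```

In a real application, onFirst and onSecond would be called from observers of e1 and e2 respectively.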
Implementing a completion feature, while not a priority for now, is not out of the question. What are the real life use cases for concatenation?
Let's say I have a server-client scenario. In my client I have an event stream coming in from a remote API. The events are mapped into a state which renders the UI. Furthermore, I have some initialization logic in my app, which is also modeled as events:
val apiEvents: EventStream[ApiEvent] = ???
val localEvents: EventStream[ApiEvent] = EventStream.fromSeq(...)
Then I would like to combine these such that the localEvents come before the apiEvents. In this case, a function like apiEvents.startWith(Seq(...)) would even suffice (like monix). But what if localEvents comes in asynchronously or as a stream?
Another use case I had is observables derived from futures. A future can be converted into an observable that eventually emits a single value. I then wanted to combine these while keeping the order.
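The ordering requirement for futures can be illustrated with the standard library alone. The following is a sketch, not an Airstream feature: emitInOrder is a hypothetical helper, and emit stands in for pushing a value into a stream. It assumes the futures are already running and that the values should be delivered in list order, regardless of which future completes first.

```scala
import scala.concurrent.{ExecutionContext, Future}

object OrderedFuturesSketch {
  // Chains the futures so that each value is emitted only after all earlier ones were emitted.
  // If any future fails, the returned future fails and later values are not emitted.
  def emitInOrder[A](futures: List[Future[A]])(emit: A => Unit)(
      implicit ec: ExecutionContext
  ): Future[Unit] =
    futures.foldLeft(Future.successful(())) { (previous, next) =>
      previous.flatMap(_ => next.map(emit))
    }
}
```

The returned Future[Unit] completes once the last value has been emitted, which is effectively the "completion" signal that the streams themselves lack.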
So... do you want the apiEvents stream to be started (e.g. some ajax request triggered) when localEvents completes?
Or does apiEvents emit regardless of localEvents, and you just want to filter out events that come before localEvents completes?
Re: Creating observables from futures and flattening of observables of futures – that will be the next thing I implement in Airstream, regardless of completion / concat concepts.
Here, I was thinking that apiEvents and localEvents are both directly started. Then I want to prepend all local events before the api events.
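Since both streams are already running here, "prepending" implies holding back api events until the local ones are finished. A minimal plain-Scala sketch of that buffering follows. PrependBuffer and its method names are hypothetical, emit stands in for the downstream consumer, and the explicit onLocalDone call is an assumption that replaces the completion signal Airstream does not provide.

```scala
import scala.collection.mutable

object PrependBufferSketch {
  final class PrependBuffer[A](emit: A => Unit) {
    private var localDone = false
    private val pendingApi = mutable.Queue.empty[A]

    // Local events pass straight through until the local source is declared done.
    def onLocal(event: A): Unit =
      if (!localDone) emit(event)

    // Flushes buffered api events in arrival order, then lets later ones pass through.
    def onLocalDone(): Unit =
      if (!localDone) {
        localDone = true
        while (pendingApi.nonEmpty) emit(pendingApi.dequeue())
      }

    // Api events are queued while local events may still arrive.
    def onApi(event: A): Unit =
      if (localDone) emit(event) else pendingApi.enqueue(event)
  }
}
```

Unlike simply filtering out early api events, this variant loses nothing, at the cost of an unbounded queue if the local source never finishes.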
> Re: Creating observables from futures and flattening of observables of futures – that will be the next thing I implement in Airstream, regardless of completion / concat concepts.
Nice!
Ok so it sounds like you want it to behave more or less like rxjs concat, which starts the next stream if and when the previous one completes.
Another way to possibly accomplish that is the flatten operator that we already have in Airstream. You could make a meta stream that emits a stream from which you want to get events, then flatten it.
I'm not sure what the streams / events actually represent in your case, but I'm hoping the integration with futures that I mentioned will solve that issue.
> Another way to possibly accomplish that is the flatten operator that we already have in Airstream. You could make a meta stream that emits a stream from which you want to get events, then flatten it.
Indeed, that could work in this case. I will try it out and let you know. Thank you!
Though, from reading the code, I got the impression that flatten works like switchMap in monix. Meaning that each new value in the outer observable will switch the flattened observable to the newly emitted EventStream. But for the example, I then need to take care when I switch from localEvents to apiEvents.
You're correct that it works like switchMap. There is still no "stream completion" feature in Airstream, so yes, you'll need to manually trigger the switch somehow. Alternatively, as mentioned, you can emit a "sentinel value" event that the consuming stream can interpret as a "stream completed" event. For now there's no pretty solution to this, I'm afraid.
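To make the switching behaviour concrete, here is a toy model in plain Scala. It is not Airstream's implementation: Source and Switcher are hypothetical stand-ins for an event stream and for the flattened result, and switchTo plays the role of the meta stream emitting a new inner stream.

```scala
object SwitchSketch {
  // A toy push-based source; listeners are plain callbacks.
  final class Source[A] {
    private var listeners = List.empty[A => Unit]

    // Registers a listener and returns a function that removes it again.
    def subscribe(listener: A => Unit): () => Unit = {
      listeners = listener :: listeners
      () => { listeners = listeners.filterNot(_ eq listener) }
    }

    def fire(event: A): Unit = listeners.foreach(_(event))
  }

  // Switch semantics: only the most recently selected source is forwarded.
  final class Switcher[A](emit: A => Unit) {
    private var unsubscribe: () => Unit = () => ()

    def switchTo(inner: Source[A]): Unit = {
      unsubscribe() // stop listening to the previous source
      unsubscribe = inner.subscribe(emit)
    }
  }
}
```

In this model, whatever apiEvents fires before switchTo(apiEvents) is called is lost, which is why the moment of the switch has to be chosen carefully.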
For anyone else having similar problems with lacking operators, keep in mind that Airstream is very forgiving of impurity – all our stuff is shared execution, that is, the stream will run exactly once regardless of how many subscribers it has (as long as it's at least one). So it's relatively easy to create custom operators that keep internal state.
Thanks for the detailed response! I think I can work with a sentinel value for now. It should even be possible to make this generic enough to be reusable for my project :)