raquo/Airstream

`composeEither`: fixed-scope `flatten` without transactions

Opened this issue · 4 comments

Background

Flattening Airstream observables (using flatMap, flatten, or any equivalent means) necessarily results in an observable that establishes a transaction boundary, i.e. it always emits events in a new transaction. Emitting events in new transactions is generally undesirable because it may cause FRP glitches, but in practice it does no harm if you only use flatMap / flatten when necessary, i.e. when no other operator (such as combineWith) would suffice – review the docs if any of this is not clear.
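
For readers who have not run into an FRP glitch before, here is a minimal toy sketch in plain Scala. It is not Airstream code, and every name in it (GlitchSketch, naiveEmissions, orderedEmissions) is made up for illustration. It models a diamond where a feeds b and c, and d combines b and c, and compares a naive propagation order with one that updates both parents before recomputing d.

```scala
object GlitchSketch {
  // Toy diamond, not Airstream code: a feeds b and c, and d combines b and c.
  def b(a: Int): Int = a * 10
  def c(a: Int): Int = a * 100
  def d(b: Int, c: Int): Int = b + c

  // Naive depth-first propagation: d is recomputed right after b changes,
  // while c still holds the value derived from the old a.
  def naiveEmissions(oldA: Int, newA: Int): List[Int] = {
    val staleC = c(oldA)
    val newB = b(newA)
    val glitch = d(newB, staleC) // inconsistent: mixes new b with old c
    val newC = c(newA)
    List(glitch, d(newB, newC))
  }

  // Glitch-free propagation: both parents are updated first,
  // then d is recomputed exactly once.
  def orderedEmissions(newA: Int): List[Int] =
    List(d(b(newA), c(newA)))
}
```

With a going from 1 to 2, the naive order emits an intermediate value that never corresponds to any single value of a, followed by the correct one, while the ordered version emits only the correct one.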

However, sometimes, (hopefully rarely, but still) we need flatMap not because the required behaviour is conceptually impossible to express without a general-purpose flatMap method, but because it is merely practically impossible, i.e. because Airstream lacks a specialized operator (that could possibly exist in principle) that would perform the task without creating a new transaction.

This issue identifies a use case that currently requires flattening, and proposes a specialized operator that would perform the same task without creating a new transaction, enabling users to express branched computations without fearing FRP glitches.

Use case: splitEither + flatten

Currently (in the latest 17.x release) we can do this (optional types ascribed for clarity):

val signalOfEither: Signal[Either[L, R]] = ???
def makeLeftSignal(signal: Signal[L]): Signal[Left[L2, Nothing]] = ???
def makeRightSignal(signal: Signal[R]): Signal[Right[Nothing, R2]] = ???

val metaSignal: Signal[Signal[Either[L2, R2]]] = signalOfEither.splitEither(
  (initialLeft: L, leftSignal: Signal[L]) => makeLeftSignal(leftSignal),
  (initialRight: R, rightSignal: Signal[R]) => makeRightSignal(rightSignal)
)

val outputSignal: Signal[Either[L2, R2]] = metaSignal.flatten

Remember, the new splitEither operator has the same semantics as the regular split operator, except instead of operating on each item in a collection, it operates on the left and right branches of Either. Basically, we treat each incoming Either as a list of exactly one item, with .isRight as the key of that item. Hopefully that makes sense.

So, the code above splits the processing of Either's left and right branches, and then merges them back together. It works, but the problem is that we need this flatten at the end. That flatten is a general-purpose operator, and it fires all events in a new transaction.

However, I believe that this flatten here is not necessary, conceptually, because if you look closely at this use case, you can see that we only need to mirror a fixed, static set of signals (makeLeftSignal(leftSignal) and makeRightSignal(rightSignal)). That is significantly less powerful than what flatten can do. In fact, it is precisely because flatten can mirror an arbitrary set of signals, one that is not fully known at signal creation time, that it must emit events in a new transaction, so perhaps we can drop that requirement with a careful implementation.

MergeStream equivalence

A signal.flatten operator that only mirrors a fixed set of input signals would be somewhat equivalent to MergeStream, so it would be... MergeSignal I guess. We don't have a general-purpose MergeSignal, because it's not clear how to merge the signals' initial values, but in case of splitEither().flatten, the initial values of the left and right branches are actually mutually exclusive, meaning that, if the parent signal emits a Left, the "flattened" output only needs to mirror the value of the left signal, ignoring the value of the right signal, and vice-versa. That works out very well for us to create a sort of a narrowly specialized MergeSignal for this specific use case.
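
To make the "mutually exclusive" point concrete, here is a tiny sketch in plain Scala. It is not Airstream code, and ExclusiveMerge and mirror are hypothetical names. The two branch values are passed by name, so the value of the inactive branch is never even evaluated, which is why this narrow kind of merge would not need to reconcile two initial values.

```scala
object ExclusiveMerge {
  // Hypothetical helper: the output mirrors only the branch that matches
  // the side of the parent's current Either. The by-name parameters mean
  // the inactive branch is never evaluated.
  def mirror[L2, R2](
    parentIsRight: Boolean,
    leftValue: => L2,
    rightValue: => R2
  ): Either[L2, R2] =
    if (parentIsRight) Right(rightValue) else Left(leftValue)
}
```

For example, ExclusiveMerge.mirror[String, Int](parentIsRight = false, "left value", sys.error("never evaluated")) returns a Left without touching the right-hand argument.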

Proposal: composeEither

So, instead of using a combination of splitEither + flatten, I propose to implement a dedicated composeEither operator that would work just like splitEither, except it would include the .flatten functionality internally, and avoid the need to fire events in a new transaction.

I was hoping to sneak this into 17.x release, thinking that it wouldn't be too hard, but actually even though the idea is simple in principle, the implementation needs to bypass a lot of Airstream's protections in order to work, and that means a lot of ugly code, careful analysis, testing, etc. Bottom line, I still think it should be possible, but it's much more work than I can afford to expend right now. And so, here's a brain dump ticket instead.

Long term

I think once we have this composeEither functionality, other similar use cases might pop up. Certainly we need the same for Option, Status, etc. – all the data types that have a fixed number of mutually exclusive branches should probably use the same implementation.

Perhaps the new functionality and the new understanding obtained from implementing this will help us find more opportunities to further narrow the gap between "conceptually impossible" and "practically impossible" when it comes to avoiding the use of flatMap / flatten.

Current status

Note to self – I sketched out some notes in the compose-either branch. Not much code, just some comments for the most part.

I encountered a case in the past that also required a combo of split and flatMap. It basically looks like this:

val inputsSignal: Signal[List[I]] = ???
def veryExpensiveFn(input: I): O = ???
val outputsSignal: Signal[List[O]] = inputsSignal
   .split(???) {(_, _, inputSignal) => inputSignal.map(veryExpensiveFn)}
   .flatMap { (outputSignals: List[Signal[O]]) => new CombineSignalN[???](???) }

Like you said, the code above is practically impossible without flatMap.

IMO, this will pop up more often, especially if we introduce #103, so long-term support would be very welcome.
I think that we do not need a Signal equivalent of MergeStream, because of the ambiguity in merging the initialValue, but rather something like splitAndMerge, which splits a parent signal into child signals, modifies them, and then merges them back together. We'll have more context when merging those child signals than when merging arbitrary signals (they will be mutually exclusive in sum type cases, composable in product type cases, etc.).

@HollandDM I'm afraid that the approach I outlined is not scalable to the more general split operator, because split operates on a dynamic collection of items – its size and items are not only unknown upfront, but can also change over time.

A signal of Either[L, R] always has two possible branches – left and right. We can create the signals for both of those branches upfront, and make the output (splitMerge signal) depend on them. Thus, we'll be able to compute the topoRank of the output signal when creating said signal, and this is, roughly speaking, what we need to avoid firing events in new transactions.

In contrast, a signal of List[A] has N possible branches (one for each item), and that N changes over time. When instantiating the output signal (the splitMerge signal), we are unable to compute its topoRank in any meaningful way, because we don't yet know all of the observables that it would end up merging (and thus depending on) – the same problem as we have with the flatten signal.
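
The difference can be sketched with a toy model in plain Scala. This is not Airstream's implementation: Node and all the values below are hypothetical, and the rule "rank = highest parent rank + 1, with sources at 1" is a simplifying assumption used only for illustration.

```scala
object TopoRankSketch {
  // Toy observable node (hypothetical, not an Airstream class): the rank is
  // computed once, at construction, from the parents known at that moment.
  final class Node(val name: String, val parents: List[Node]) {
    val topoRank: Int =
      if (parents.isEmpty) 1 else parents.map(_.topoRank).max + 1
  }

  // Either case: both branches exist upfront, so the output can list them
  // as parents and get a rank that is higher than everything it mirrors.
  val parent   = new Node("signalOfEither", Nil)
  val leftIn   = new Node("leftSignal", List(parent))
  val rightIn  = new Node("rightSignal", List(parent))
  val leftOut  = new Node("makeLeftSignal(leftSignal)", List(leftIn))
  val rightOut = new Node("makeRightSignal(rightSignal)", List(rightIn))
  val output   = new Node("composeEither output", List(parent, leftOut, rightOut))

  // flatten-like case: only the meta signal is known at construction time.
  val flattenLike = new Node("flatten output", List(parent))
  // An inner signal that shows up later may sit deeper in the graph...
  val lateInner = new Node("inner signal created later", List(leftOut))
  // ...so the rank fixed at construction is not above what it ends up mirroring.
  val staticRankIsStale: Boolean = flattenLike.topoRank <= lateInner.topoRank
}
```

In this model the fixed-branch output always ranks above both branch signals, while the flatten-like node has no way to account for inner signals that do not exist yet.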

Airstream uses topoRank to avoid FRP glitches, and with the current architecture, the topoRank does need to be static. Currently, the observable's topoRank needs to be computed when instantiating the observable, but in theory, its evaluation can perhaps be delayed, at least until just before this observable fires its first event. However, that does not seem to be useful.

It would be useful to support dynamic topoRank-s, but it would be incompatible with our de-glitching algorithm. I thought about perhaps attaching topoRank to events, not observables, but so far I couldn't figure out anything. Perhaps I will some day get more insight into this problem, but for now I'm not aware of a better approach to de-glitching.

After some reading, I think Angular's push & pull approach can be modified to be suitable for our system. Maybe one push to determine topoRank-s, and a pull to actually calculate the value? It does make the system two times slower, though.

Hm. I could be wrong, but I don't think that Angular's approach requires topoRank-s at all.

It's easy to see how their algorithm solves the basic diamond glitch case, so let's skip that.

I wonder if this kind of algorithm is able to handle the flatMap operator...

The push stage needs to mark as dirty both the changed signal and all signals that directly or indirectly depend on it, notifying them about the need to pull changes. That... we can do, I think. We know which signals depend on each signal, including signal.flatMap(...). Well, sort of. We don't know the insides of flatMap, but we know enough to mark the dependents of flatMap itself as dirty – it may turn out that they didn't need to be marked as such, but that does not seem to be a problem in this approach (aside from performance).

Then, for each signal that the push stage marked as dirty, the pull stage is run. It tells each of the signal's parents to fetch a fresh value, and if their values were marked as dirty in the push stage, they'll need to ask their own parents, etc. Because of that, the actual computation will happen in the opposite order – first the most upstream of the affected signals will recompute its value, then with that, its dependents can compute theirs, and so on.
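
Here is a rough sketch of those two stages in plain Scala, under several assumptions: it is neither Angular's nor Airstream's code, all names (PushPullSketch, Cell, Source, Derived) are made up, values are plain Ints, and pulling happens lazily when someone asks for a value rather than being scheduled for every dirty signal.

```scala
import scala.collection.mutable

object PushPullSketch {
  // Toy cell in a dependency graph (hypothetical, for illustration only).
  sealed abstract class Cell {
    val dependents = mutable.ListBuffer.empty[Derived]
    def pull(): Int
  }

  final class Source(private var value: Int) extends Cell {
    def pull(): Int = value
    // Push stage: store the new value and mark everything downstream dirty.
    def set(newValue: Int): Unit = {
      value = newValue
      dependents.foreach(d => d.markDirty())
    }
  }

  final class Derived(parents: List[Cell], compute: List[Int] => Int) extends Cell {
    private var dirty = true // nothing has been computed yet
    private var cached = 0
    var recomputations = 0
    parents.foreach(p => p.dependents += this)

    // A cell that is already dirty has already notified its dependents.
    def markDirty(): Unit =
      if (!dirty) {
        dirty = true
        dependents.foreach(d => d.markDirty())
      }

    // Pull stage: ask the parents first, so the most upstream cells
    // recompute before the cells that depend on them.
    def pull(): Int = {
      if (dirty) {
        cached = compute(parents.map(p => p.pull()))
        recomputations += 1
        dirty = false
      }
      cached
    }
  }
}
```

In a diamond (a feeding b and c, with d combining them), calling a.set(...) only marks b, c and d as dirty, and the next d.pull() recomputes b and c before computing d once, so d never observes a mix of old and new parent values.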

So... suppose signal.flatMap(ev => innerSignal()) was marked as dirty in the push stage. Now it's the pull stage, and its dependent signal comes knocking, asking for its new value. So we ask signal for its new value, we get ev, and we can create innerSignal(). Suppose we can pull the value from it too, in a similar manner, now that we know where to pull from. Finally, we can provide this value to the dependent of flatMap that asked for it in the first place. So, I guess that works? At first glance, at least.

Now, how does the pull stage decide which observables will be pulled first, and more importantly – does it matter?

At first glance, it seems that order shouldn't matter. When a signal's value is being pulled, it knows which other signals it depends on, it can pull their values, and so on, recursively. After that is done, nothing else should be able to update those values we pulled... right? Well, it seems that way, at least.

There are several questions remaining:

  • Async operators. You can't "pull" a value from a delay(1) operator – the value won't be available until later. In Airstream's current pulling (onWillStart), this is not a problem because Airstream signals don't have async operators – but streams do, and if we were to apply this logic universally (which we need to do, if we are to replace topoRank), then we'll need a solution to that. This should be pretty straightforward – we'll just need to delay pulling, handle it in a separate "transaction".
  • Loops. You can make loops in observable graphs with flatMap, e.g. you can make two signals that depend on each other. This will need special handling. Possibly similar to the async treatment, where looped events get branched off into their own, separate, propagation, similar to how transactions behave now.
  • Streams emitting multiple events in response to a single input event. As above – it will need similar kind of treatment for subsequent events where their propagation is done separately, I think.
  • As a corollary to the previous bullet point – we'll need to make sure that the push-pull system does not "swallow" events. If we would expect any signal or stream to update e.g. twice under the current rules, this expectation should be maintained, we shouldn't skip the first update to only process the second update.

It does seem that this kind of system requires more overhead. Not only does it need two passes, but the passes need to go deeper (e.g. past any filter operators even if they would have resolved to false), and it seems that every push stage would need to build a potentially large array of signals that need to be pulled. I think? I dunno, maybe people have come up with some optimizations, I would need to look it up.

I'm not in a rush to look more into this, but it's an interesting avenue to explore eventually. Although, I'm not sure how applicable the findings will be, since Angular-style signals are intended for a very different, callback-driven usage style, and they don't seem to have a concept of event streams.