Improve/replace combineWith
ajaychandran opened this issue · 10 comments
Consider EventStream.combineWith:
def combineWith[AA >: A, B](otherEventStream: EventStream[B]): EventStream[(AA, B)] = {
new CombineEventStream2[AA, B, (AA, B)](
parent1 = this,
parent2 = otherEventStream,
combinator = CombineObservable.guardedCombinator((_, _))
)
}
// usage
case class Point(x: Int, y: Int)
val $x: EventStream[Int] = ???
val $y: EventStream[Int] = ???
val $p: EventStream[Point] = $x.combineWith($y).map2(Point)
This combinator can be replaced with the more generalized form:
// simply exposing the additional parameter avoids unnecessary tuple allocation
// also, type parameter AA is redundant
def zipWith[B, C](that: EventStream[B])(f: (A, B) => C): EventStream[C] =
new CombineEventStream2(
parent1 = this,
parent2 = that,
combinator = CombineObservable.guardedCombinator(f)
)
// replacement for combineWith
def zip[B](that: EventStream[B]): EventStream[(A, B)] =
zipWith(that)(_ -> _)
// usage
case class Point(x: Int, y: Int)
val $x: EventStream[Int] = ???
val $y: EventStream[Int] = ???
val $p: EventStream[Point] = $x.zipWith($y)(Point)
What do you think?
Nice 👍. This also halves the number of observables in this case, since map2(Point) is not needed.
I don't want to use the zip name because Airstream's combineWith doesn't work like zip does in ReactiveX: it doesn't wait for all of its inputs to produce events before emitting. It's much closer to combineLatest.
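To make the naming concern concrete, here is a minimal sketch of the two ReactiveX-style semantics over a recorded sequence of events. It is plain Scala with no Airstream dependency; the object name and both helper functions are made up for illustration and only model the general zip / combineLatest behaviour, not Airstream's implementation.

```scala
object ZipVsCombineLatest {
  // Each recorded event comes from the left input (Left) or the right input (Right).

  // combineLatest-style: once both inputs have emitted at least once,
  // every new event is paired with the other side's latest value.
  def combineLatest[A, B](events: List[Either[A, B]]): List[(A, B)] = {
    var lastA: Option[A] = None
    var lastB: Option[B] = None
    val out = List.newBuilder[(A, B)]
    events.foreach { e =>
      e match {
        case Left(a)  => lastA = Some(a)
        case Right(b) => lastB = Some(b)
      }
      for (a <- lastA; b <- lastB) out += ((a, b))
    }
    out.result()
  }

  // zip-style: the n-th left event is paired with the n-th right event.
  def zip[A, B](events: List[Either[A, B]]): List[(A, B)] = {
    val as = events.collect { case Left(a) => a }
    val bs = events.collect { case Right(b) => b }
    as.zip(bs)
  }

  val events: List[Either[Int, String]] =
    List(Left(1), Left(2), Right("a"), Left(3), Right("b"))

  val latest = combineLatest(events) // List((2,a), (3,a), (3,b))
  val zipped = zip(events)           // List((1,a), (2,b))
}
```

The same input yields different pairs under the two models, which is why reusing the zip name for a combineLatest-like operator could mislead users coming from ReactiveX.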
I think renaming the original method to combine and naming the new method combineWith would be fine, except for the migration pain. In my own app, migration would be trivial. Does anyone have concerns? I'm inclined to implement this.
Alternatively, I thought about overloading combineWith with an extra combinator param in the first argument list. While that works for case classes, it requires type ascriptions if the combinator is an anonymous lambda. Omitting ascriptions would be convenient if, for example, you wanted to do two subsequent combine-s and get an EventStream[Tuple3[A, B, C]]: then you'd need to provide an anonymous lambda to massage (A, B), C into (A, B, C).
@raquo I think we've briefly discussed a similar thing regarding signals a while ago (https://gitter.im/Laminar_/Lobby?at=5f63c08ef969413294f0cc70).
My question here is: is it different for streams? (And does it mean that we'd get this new zip for streams only, not signals?)
Also, how about having an additional combinator (though this would introduce an additional lib dependency: https://github.com/tulz-app/tuplez/)
// naming can be better
def tupleWith[B](that: EventStream[B])(implicit composition: Composition[A, B]): EventStream[composition.Composed] =
zipWith(that)(TupleComposition.compose)
// libraryDependencies += "app.tulz" %%% "tuplez" % "0.2.0"
which would behave like this:
val s1: EventStream[String] = ???
val s2: EventStream[Int] = ???
val s3: EventStream[Boolean] = ???
val s4: EventStream[MyClass] = ???
val tupled: EventStream[ (String, Int, Boolean, MyClass) ] =
s1.tupleWith(s2).tupleWith(s3).tupleWith(s4)
// vs
val zipped: EventStream[ (((String, Int), Boolean), MyClass) ] =
s1.zip(s2).zip(s3).zip(s4)
Or, maybe have zip itself behave like this (that's what I've wanted in about 100% of cases anyway).
If we don't want to complicate the core — no problem, I'm including this in a lib that I'm preparing to release soon (a bunch of laminar extensions and utilities). :)
Also, I'd love to see this for withCurrentValueOf as well (and ideally for signals, too).
What about some code generation to generate 22 methods (like a lot of libraries do) and have:
val s1: EventStream[String] = ???
val s2: EventStream[Int] = ???
val s3: EventStream[Boolean] = ???
val s4: EventStream[MyClass] = ???
val combined: EventStream[(String, Int, Boolean, MyClass)] = EventStream.combine(s1, s2, s3, s4)
You could generate a file with a private[laminar] trait containing these methods and then mix it into EventStream.
This should work, unless there are problems I'm not seeing.
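As a rough illustration of the code-generation idea, the sketch below builds the source text of such a trait as a string. The trait name GeneratedCombine, the T1..Tn and s1..sn naming, and the `???` bodies are placeholders chosen for this example; a real generator would have to emit implementations based on the library's internals.

```scala
object CombineGenerator {
  // Emits the source text of one `combine` signature for the given arity.
  // The method body is left as ??? because it depends on library internals.
  def combineMethod(arity: Int): String = {
    require(arity >= 2 && arity <= 22, "Scala tuples exist for arities 2 to 22")
    val types  = (1 to arity).map(i => s"T$i")
    val params = (1 to arity).map(i => s"s$i: EventStream[T$i]")
    s"def combine[${types.mkString(", ")}](${params.mkString(", ")}): " +
      s"EventStream[(${types.mkString(", ")})] = ???"
  }

  // Wraps the signatures for arities 2..maxArity in a trait that could be mixed in.
  def combineTrait(maxArity: Int): String =
    (2 to maxArity)
      .map(n => "  " + combineMethod(n))
      .mkString("private[laminar] trait GeneratedCombine {\n", "\n", "\n}")
}
```

Calling CombineGenerator.combineTrait(22) would produce the text for all 21 overloads, which a build step could write into a managed source file.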
As per the documentation,
In the diamond-combine case described above Airstream avoids a glitch because CombineObservable-s (those created using the combineWith method) do not propagate downstream immediately. Instead, they are put into a pendingObservables queue in the current Transaction (we'll get to those soon).
Doesn't this ensure that the combinator function is called only once (in the "diamond" case)?
What about some code generation to generate 22 methods (like a lot of libraries do) and have:
I've been thinking about it, too (tuplez is also 100% generated code). I just haven't yet figured out an efficient way of combining n signals/streams (I also want to get Signal.seq[A](signals: Seq[Signal[A]]): Signal[Seq[A]] done right).
Doesn't this ensure that the combinator function is called only once (in the "diamond" case)?
No, the way combineWith is implemented, the combinator function is called once per incoming event. So if you combine two signals and each of them emits in the same transaction, the combinator will be called twice, not once.
This wasn't an issue before, because combineWith didn't expose the ability to provide a custom combinator. If we do expose it, it would be nicer if the combinator didn't need to be pure, but that's not super critical in this particular case, and I'm not sure the reimplementation effort would be worth it.
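The difference between "once per incoming event" and "once per output" can be sketched with a toy model. The Combined class below is hypothetical and only mimics the behaviour described in this thread (inputs update during a transaction, the value is emitted once afterwards); it is not Airstream code.

```scala
object CombinatorCallCount {
  // Toy stand-in for a two-input combined observable inside one transaction.
  final class Combined[A, B, C](
    initA: A,
    initB: B,
    combinator: (A, B) => C,
    perEvent: Boolean // true: call combinator on every input event; false: only when emitting
  ) {
    private var a = initA
    private var b = initB
    private var pending: Option[C] = None
    private var dirty = false
    var emitted: Vector[C] = Vector.empty

    private def inputChanged(): Unit = {
      dirty = true
      if (perEvent) pending = Some(combinator(a, b))
    }

    def updateA(next: A): Unit = { a = next; inputChanged() }
    def updateB(next: B): Unit = { b = next; inputChanged() }

    // Called once when the transaction reaches this observable.
    def flush(): Unit = if (dirty) {
      // getOrElse takes its argument by name, so the combinator only runs here
      // when no value was precomputed.
      val value = pending.getOrElse(combinator(a, b))
      emitted = emitted :+ value
      pending = None
      dirty = false
    }
  }

  // Runs one transaction in which both inputs change.
  // Returns (number of combinator calls, emitted values).
  def run(perEvent: Boolean): (Int, Vector[Int]) = {
    var calls = 0
    val node = new Combined[Int, Int, Int](0, 0, (x, y) => { calls += 1; x + y }, perEvent)
    node.updateA(1)
    node.updateB(2)
    node.flush()
    (calls, node.emitted)
  }

  val perEventResult  = run(perEvent = true)  // (2, Vector(3))
  val perOutputResult = run(perEvent = false) // (1, Vector(3))
}
```

Both strategies emit the same single value, so the difference is only observable if the combinator has side effects, which matches the point that purity matters once the combinator is user-provided.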
I feel like this is shaping up nicely (pardon namings and typos, rough ideas here). Using Signals in examples but all of this should work on both Signals and EventStream-s.
1. combineWith with a project function
class Signal[A] {
def combineWith[B, C](that: Signal[B])(project: (A, B) => C): Signal[C]
def combineWith[B, C, D](that1: Signal[B], that2: Signal[C])(project: (A, B, C) => D): Signal[D]
def combineWith[B, C, D, E](that1: Signal[B], that2: Signal[C], that3: Signal[D])(project: (A, B, C, D) => E): Signal[E]
def combineWith[B, C, D, E, F](that1: Signal[B], that2: Signal[C], that3: Signal[D], that4: Signal[E])(project: (A, B, C, D, E) => F): Signal[F]
def combineWith[B, C, D, E, F, G](that1: Signal[B], that2: Signal[C], that3: Signal[D], that4: Signal[E], that5: Signal[F])(project: (A, B, C, D, E, F) => G): Signal[G]
}
signalX.combineWith(signalY)(Point)
signalX.combineWith(signalY, signalZ)(Point3D)
This one is easy. Ideally we would make sure that project is called only once per output C. Although it is not very likely that someone will put side effects there, this is the general expectation with other user-provided callbacks. This can probably be achieved without too much refactoring, at the cost of initializing a tuple behind the scenes. I would be ok with this.
For implementing combineWith methods with multiple args, we could use Iurii's generators from #51; they would be more efficient than chaining combineWith(a, b).
2. combine with Combinator evidence
This uses the same idea as Composition in https://github.com/tulz-app/tuplez/, but I would like to avoid third-party deps if possible (even from awesome third parties), and in this case it seems like we can avoid code generation without too much pain.
class Signal[A] {
def combine[B, C](that: Signal[B])(implicit c: Combinator[A, B, C]): Signal[C] = ???
def combine[B, C, D](that1: Signal[B], that2: Signal[C])(implicit c: Combinator[A, (B, C), D]): Signal[D] = ???
def combine[B, C, D, E](that1: Signal[B], that2: Signal[C], that3: Signal[D])(implicit c: Combinator[A, (B, C, D), E]): Signal[E] = ???
def combine[B, C, D, E, F](that1: Signal[B], that2: Signal[C], that3: Signal[D], that4: Signal[E])(implicit c: Combinator[A, (B, C, D, E), F]): Signal[F] = ???
def combine[B, C, D, E, F, G](that1: Signal[B], that2: Signal[C], that3: Signal[D], that4: Signal[E], that5: Signal[F])(implicit c: Combinator[A, (B, C, D, E, F), G]): Signal[G] = ???
}
class Combinator[A, B, C](combine: (A, B) => C)
trait LowPriorityCombinators { // Should only be used if other implicits are not found
implicit def tuple_1_1_combinator[A, B]: Combinator[A, B, (A, B)] = Combinator((a, b) => (a, b))
implicit def tuple_1_2_combinator[A, B, C]: Combinator[A, (B, C), (A, B, C)] = Combinator((a, t2) => (a, t2._1, t2._2))
implicit def tuple_1_3_combinator[A, B, C, D]: Combinator[A, (B, C, D), (A, B, C, D)] = ???
implicit def tuple_1_4_combinator[A, B, C, D, E]: Combinator[A, (B, C, D, E), (A, B, C, D, E)] = ???
implicit def tuple_1_5_combinator[A, B, C, D, E, F]: Combinator[A, (B, C, D, E, F), (A, B, C, D, E, F)] = ???
}
object Combinator extends LowPriorityCombinators {
def apply[A, B, C](combine: (A, B) => C) = new Combinator(combine)
implicit def tuple_2_1_combinator[A, B, C]: Combinator[(A, B), C, (A, B, C)] = Combinator((t1, c) => (t1._1, t1._2, c))
implicit def tuple_2_2_combinator[A, B, C, D]: Combinator[(A, B), (C, D), (A, B, C, D)] = Combinator((t1, t2) => (t1._1, t1._2, t2._1, t2._2))
implicit def tuple_2_3_combinator[A, B, C, D, E]: Combinator[(A, B), (C, D, E), (A, B, C, D, E)] = ???
implicit def tuple_2_4_combinator[A, B, C, D, E, F]: Combinator[(A, B), (C, D, E, F), (A, B, C, D, E, F)] = ???
implicit def tuple_3_1_combinator[A, B, C, D]: Combinator[(A, B, C), D, (A, B, C, D)] = ???
implicit def tuple_3_2_combinator[A, B, C, D, E]: Combinator[(A, B, C), (D, E), (A, B, C, D, E)] = ???
implicit def tuple_3_3_combinator[A, B, C, D, E, F]: Combinator[(A, B, C), (D, E, F), (A, B, C, D, E, F)] = ???
implicit def tuple_4_1_combinator[A, B, C, D, E]: Combinator[(A, B, C, D), E, (A, B, C, D, E)] = ???
implicit def tuple_4_2_combinator[A, B, C, D, E, F]: Combinator[(A, B, C, D), (E, F), (A, B, C, D, E, F)] = ???
implicit def tuple_5_1_combinator[A, B, C, D, E, F]: Combinator[(A, B, C, D, E), (F), (A, B, C, D, E, F)] = ???
}
signal1.combine(signal2) // Signal[(T1, T2)] (tuple_1_1_combinator)
signal1.combine(signal2).combine(signal3) // Signal[(T1, T2, T3)] (tuple_1_1_combinator + tuple_2_1_combinator)
signal1.combine(signal2).combine(signal3, signal4) // Signal[(T1, T2, T3, T4)] (tuple_1_1_combinator + tuple_2_2_combinator)
signal1.combine(signal2, signal3) // Signal[(T1, T2, T3)] (tuple_1_2_combinator)
I haven't tried to compile this, but I think it should work, except that we might need to move the implicits somewhere else. The implicits above should be enough to combine up to 6 non-tupled signals in arbitrary ways. We could add more, but I don't think we need to go all the way up to 22; that seems like overkill. Maybe we can do that just for tuple_*_1 to allow for long chaining.
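The implicit-priority part of this idea can be checked in isolation, without Signals. Below is a minimal sketch using plain values: combineValues is a hypothetical stand-in for Signal#combine, and only two of the proposed instances are included. It relies on the usual rule that implicits defined in the companion object take precedence over those inherited from a parent trait.

```scala
object CombinatorSketch {
  final class Combinator[A, B, C](val combine: (A, B) => C)

  trait LowPriorityCombinators {
    // Fallback: pair the two values as they are.
    implicit def tuple_1_1[A, B]: Combinator[A, B, (A, B)] =
      new Combinator[A, B, (A, B)]((a, b) => (a, b))
  }

  object Combinator extends LowPriorityCombinators {
    // Preferred when the left side is already a pair: flatten into a triple.
    implicit def tuple_2_1[A, B, C]: Combinator[(A, B), C, (A, B, C)] =
      new Combinator[(A, B), C, (A, B, C)]((t, c) => (t._1, t._2, c))
  }

  // Plain-value stand-in for Signal#combine (hypothetical helper).
  // C is not fixed by the arguments; the implicit search determines it.
  def combineValues[A, B, C](a: A, b: B)(implicit c: Combinator[A, B, C]): C =
    c.combine(a, b)

  val pair   = combineValues(1, "a")                      // uses tuple_1_1
  val triple = combineValues(combineValues(1, "a"), true) // uses tuple_2_1

  // These ascriptions only compile if the inferred result types are the flat tuples.
  val pairTyped: (Int, String)            = pair
  val tripleTyped: (Int, String, Boolean) = triple
}
```

Without the low-priority trait, both instances would apply to a pair on the left-hand side and the second call would be ambiguous, so the split into LowPriorityCombinators and the companion object is what makes chaining produce flat tuples.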
For implementing combine with multiple args, we can probably use Iurii's code generators from #51.
3. Signal.combine(signals)
This is Iurii's SeqSignal from #51
Signal.combine(signals: Seq[Signal[A]]) // Signal[Seq[A]]
4. withCurrentValueOf?
It would be nice to add the same custom combinator functionality to withCurrentValueOf. The same pattern as combine / combineWith will work, except I can't come up with two meaningful names to distinguish these. Ideas?
Does anyone chain several withCurrentValueOf operators in a row? I wonder if we need a version with multiple arguments. Unless people use these heavily, we could implement those by chaining withCurrentValueOf(a, b) instead of using generators, and optimize later if needed.
Note: I already made withCurrentValueOf available on Signals in #48.
5. All the same stuff on EventStream-s
None of this is Signal-specific except the implementation of combine / combineWith / withCurrentValueOf methods.