raquo/Airstream

Improve/replace combineWith

ajaychandran opened this issue · 10 comments

Consider EventStream.combineWith:

  def combineWith[AA >: A, B](otherEventStream: EventStream[B]): EventStream[(AA, B)] = {
    new CombineEventStream2[AA, B, (AA, B)](
      parent1 = this,
      parent2 = otherEventStream,
      combinator = CombineObservable.guardedCombinator((_, _))
    )
  }

// usage
case class Point(x: Int, y: Int)
val $x: EventStream[Int] = ???
val $y: EventStream[Int] = ???
val $p: EventStream[Point] = $x.combineWith($y).map2(Point)

This combinator can be replaced with the more generalized form:

// simply exposing the additional parameter avoids unnecessary tuple allocation
// also, type parameter AA is redundant
  def zipWith[B, C](that: EventStream[B])(f: (A, B) => C): EventStream[C] =
    new CombineEventStream2(
      parent1 = this,
      parent2 = that,
      combinator = CombineObservable.guardedCombinator(f)
    )

// replacement for combineWith
  def zip[B](that: EventStream[B]): EventStream[(A, B)] =
    zipWith(that)(_ -> _)

// usage
case class Point(x: Int, y: Int)
val $x: EventStream[Int] = ???
val $y: EventStream[Int] = ???
val $p: EventStream[Point] = $x.zipWith(y)(Point)

What do you think?

raquo commented

Nice 👍 . Also reduces the number of observables by half in this case, since map2(Point) is not needed.

I don't want to use zip name because Airstream's combineWith doesn't work like zip does in ReactiveX, it doesn't wait for all of its inputs to produce events before emitting, it's much closer to combineLatest.

I think renaming the original method to combine and naming the new method combineWith would be fine, except for the migration pain. In my own app migration would be trivial. Does anyone have concerns? I'm inclined to implement this.

Alternatively, I thought about overloading combineWith with an extra combinator param in the first argument list, but while it works for case classes, it requires types ascribtions if combinator is an anonymous lambda. And omitting ascribtions would be convenient for example if you wanted to do two subsequent combine-s and get an EventStream[Tuple3[A, B, C]]: then you'd need to provide an anonymous lambda to massage (A, B), C into (A, B, C).

@raquo I think we've briefly discussed a similar thing regarding signals a while ago (https://gitter.im/Laminar_/Lobby?at=5f63c08ef969413294f0cc70).

My question here is: is it different for streams? (and does it mean that we'd get this new zip for streams only, not signals?)

Also, how about having an additional combinator (though this would introduce an additional lib dependency: https://github.com/tulz-app/tuplez/)

  // naming can be better
  def tupleWith[B](that: EventStream[B])(implicit composition: Composition[A, B]): EventStream[composition.Composed] =
    zipWith(that)(TupleComposition.compose)

// libraryDependencies += "app.tulz" %%% "tuplez" % "0.2.0"

which would behave like this:

  val s1: EventStream[String]  = ???
  val s2: EventStream[Int]     = ???
  val s3: EventStream[Boolean] = ???
  val s4: EventStream[MyClass] = ???

  val tupled: EventStream[  (String, Int, Boolean, MyClass)  ] =  
    s1.tupleWith(s2).tupleWith(s3).tupleWith(s4)
// vs
  val zipped EventStream[  (((String, Int), Boolean), MyClass)  ] = 
    s1.zip(s2).zip(s3).zip(s4)

Or, maybe have zip itself behave like this (that's what I've wanted in about 100% cases anyways).

If we don't want to complicate the core — no problem, I'm including this in a lib that I'm preparing to release soon (a bunch of laminar extensions and utilities). :)

Also, I'd love to see this for withCurrentValueOf, as well (and ideally — for signals).

What about some code generation to generate 22 methods ( like a lot of libraries do ) and have:

val s1: EventStream[String]  = ???
val s2: EventStream[Int]     = ???
val s3: EventStream[Boolean] = ???
val s4: EventStream[MyClass] = ???
val combined: EventStream[(String, Int, Boolean, MyClass)] = EventStream.combine(s1, s2, s3, s4)

You could generate a file with a private [laminar] trait containing this methods and the mix it in EventStream.
This should work unless I don't see some problems it might have.

As per the documentation,

In the diamond-combine case described above Airstream avoids a glitch because CombineObservable-s (those created using the combineWith method) do not propagate downstream immediately. Instead, they are put into a pendingObservables queue in the current Transaction (we'll get to those soon).

Doesn't this ensure that the combinator function is called only once (in the "diamond" case)?

What about some code generation to generate 22 methods ( like a lot of libraries do ) and have:

I've been thinking about it, too (tuplez is also 100% generated code). I just haven't yet figured out an efficient way of combining n signals/streams (I also want to get Signal.seq[A](signals: Seq[Signal[A]]): Signal[Seq[A]] done right).

Actually, I ended up implementing this here: #51

raquo commented

Doesn't this ensure that the combinator function is called only once (in the "diamond" case)?

No, the way combineWith is implemented, the combinator function is called once per incoming event, so if you combine two signals and each of them emits in the same transaction, the combinator will be called twice, not once.

This wasn't an issue before, because combineWith didn't expose the ability to provide a custom combinator. If we do expose it, it would be nicer if combinator didn't need to be pure, but that's not super critical in this particular case, and I'm not sure if the reimplementation effort would be worth it.

raquo commented

I feel like this is shaping up nicely (pardon namings and typos, rough ideas here). Using Signals in examples but all of this should work on both Signals and EventStream-s.

1. combineWith with a project function

class Signal[A] {
  def combineWith[B, C](that: Signal[B])(project: (A, B) => C): Signal[C]
  def combineWith[B, C, D](that1: Signal[B], that2: Signal[C])(project: (A, B, C) => D): Signal[D]
  def combineWith[B, C, D, E](that1: Signal[B], that2: Signal[C], that3: Signal[D])(project: (A, B, C, D) => E): Signal[E]
  def combineWith[B, C, D, E, F](that1: Signal[B], that2: Signal[C], that3: Signal[D], that4: Signal[E])(project: (A, B, C, D, E) => F): Signal[F]
  def combineWith[B, C, D, E, F, G](that1: Signal[B], that2: Signal[C], that3: Signal[D], that4: Signal[E], that5: Signal[F])(project: (A, B, C, D, E, F) => G): Signal[G]
}

signalX.combineWith(signalY)(Point)
signalX.combineWith(signalY, signalZ)(Point3D)

This one is easy. Ideally we would make sure that project is called only once per output C. Although it is not very likely that someone will put side effects there, this is the general expectation with other user-provided callbacks. This can probably be achieved without too much refactoring at the cost of initializing a tuple behind the scenes. I would be ok with this.

For implementing combineWith methods with multiple args, we could use Iurii's generators from #51, they would be more efficient than chaining combineWith(a, b).

2. combine with Combinator evidence

Using the same idea as Composition in https://github.com/tulz-app/tuplez/ but I would like to avoid third party deps if possible (even from awesome third parties) and also in this case it seems like we can avoid code generation without too much pain.

class Signal[A] {
  def combine[B, C](that: Signal[B])(implicit val c: Combinator[A, B, C]): Signal[C] = ???

  def combine[B, C, D](that1: Signal[B], that2: Signal[C])(implicit val c: Combinator[A, (B, C), D]): Signal[D] = ???
  def combine[B, C, D, E](that1: Signal[B], that2: Signal[C], that3: Signal[D])(implicit val c: Combinator[A, (B, C, D), E]): Signal[E] = ???
  def combine[B, C, D, E, F](that1: Signal[B], that2: Signal[C], that3: Signal[D], that4: Signal[E])(implicit val c: Combinator[A, (B, C, D, E), F]): Signal[F] = ???
  def combine[B, C, D, E, F, G](that1: Signal[B], that2: Signal[C], that3: Signal[D], that4: Signal[E], that5: Signal[F])(implicit val c: Combinator[A, (B, C, D, E, F), G]): Signal[G] = ???
}

class Combinator[A, B, C](combine: (A, B) => C)

trait LowPriorityCombinators { // Should only be used if other implicits are not found
  implicit def tuple_1_1_combinator[A, B]: Combinator[A, B, (A, B)] = Combinator((a, b) => (a, b))
  implicit def tuple_1_2_combinator[A, B, C]: Combinator[A, (B, C), (A, B, C)] =Combinator((a, t2) => (a, t2._1, t2._2))
  implicit def tuple_1_3_combinator[A, B, C, D]: Combinator[A, (B, C, D), (A, B, C, D)] = ???
  implicit def tuple_1_4_combinator[A, B, C, D, E]: Combinator[A, (B, C, D, E), (A, B, C, D, E)] = ???
  implicit def tuple_1_5_combinator[A, B, C, D, E, F]: Combinator[A, (B, C, D, E, F), (A, B, C, D, E, F)] = ???
}

object Combinator extends LowPriorityCombinators {

  def apply[A, B, C](combine: (A, B) => C) = new Combinator(combine)

  implicit def tuple_2_1_combinator[A, B, C]: Combinator[(A, B), C, (A, B, C)] = Combinator((t1, c) => (t1._1, t1._2, c))
  implicit def tuple_2_2_combinator[A, B, C, D]: Combinator[(A, B), (C, D), (A, B, C, D)] = Combinator((t1, t2) => (t1._1, t1._2, t2._1, t2._2))
  implicit def tuple_2_3_combinator[A, B, C, D, E]: Combinator[(A, B), (C, D, E), (A, B, C, D, E)] = ???
  implicit def tuple_2_4_combinator[A, B, C, D, E, F]: Combinator[(A, B), (C, D, E, F), (A, B, C, D, E, F)] = ???

  implicit def tuple_3_1_combinator[A, B, C, D]: Combinator[(A, B, C), D, (A, B, C, D)] = ???
  implicit def tuple_3_2_combinator[A, B, C, D, E]: Combinator[(A, B, C), (D, E), (A, B, C, D, E)] = ???
  implicit def tuple_3_3_combinator[A, B, C, D, E, E]: Combinator[(A, B, C), (D, E, F), (A, B, C, D, E, F)] = ???

  implicit def tuple_4_1_combinator[A, B, C, D, E]: Combinator[(A, B, C, D), E, (A, B, C, D, E)] = ???
  implicit def tuple_4_2_combinator[A, B, C, D, E, F]: Combinator[(A, B, C, D), (E, F), (A, B, C, D, E, F)] = ???

  implicit def tuple_5_1_combinator[A, B, C, D, E, F]: Combinator[(A, B, C, D, E), (F), (A, B, C, D, E, F)] = ???
}

signal1.combine(signal2) // Signal[(T1, T2)] (tuple_1_1_combinator)
signal1.combine(signal2).combine(signal3) // Signal[(T1, T2, T3)] (tuple_1_1_combinator + tuple_2_1_combinator)
signal1.combine(signal2).combine(signal3, signal4) // Signal[(T1, T2, T3, T4)] (tuple_1_1_combinator + tuple_2_2_combinator)
signal1.combine(signal2, signal3) // Signal[(T1, T2, T3)] (tuple_1_2_combinator)

I haven't tried to compile this but I think it should work, except might need to move the implicits somewhere else. The implicits above should be enough to combine up to 6 non-tupled signals in arbitrary ways. We could add more, but I don't think we need to go all the way up to 22, seems like overkill. Well maybe we can do that just for tuple_*_1 to allow for long chaining.

For implementing combine with multiple args we can probably use Iurii's code generators from #51

3. Signal.combine(signals)

This is Iurii's SeqSignal from #51

Signal.combine(signals: Seq[Signal[A]]) // Signal[Seq[A]]

4. withCurrentValueOf?

It would be nice to add the same custom combinator functionality to withCurrentValueOf. Same pattern as combine / combineWith will work, except I can't come up with two meaningful names to distinguish these. Ideas?

Does anyone chain several withCurrentValueOf operators in a row? I wonder if we need a version with multiple arguments. Unless people use these heavily, we could implement those by chaining withCurrentValueOf(a, b) instead of using generators, and optimize later if needed.

Note: I already made withCurrentValueOf available on Signals in #48.

5. All the same stuff on EventStream-s

None of this is Signal-specific except the implementation of combine / combineWith / withCurrentValueOf methods.

raquo commented

Closed by #51