twitter/util

AsyncStream.scanLeft is one element behind

adleong opened this issue · 6 comments

AsyncStream.scanLeft is one element behind when the original AsyncStream is an Embed.

Consider the snippet:

import com.twitter.concurrent.AsyncStream
import com.twitter.io.{Buf, Reader}

val reader = Reader.writable()
val as = AsyncStream.fromReader(reader)
val accum = as.scanLeft(Buf.Empty)(_ concat _)
accum.foreach { case Buf.Utf8(s) => println(s) }

reader.write(Buf.Utf8("one")) // "" is printed
reader.write(Buf.Utf8("two")) // "one" is printed
reader.write(Buf.Utf8("three")) // "onetwo" is printed

Expected behavior

"" should be printed as soon as scanLeft is called. "one" should be printed when "one" is written, "onetwo" when "two" is written, and "onetwothree" when "three" is written.

Actual behavior

The resulting AsyncStream is always one element behind, as indicated in the snippet's comments.

Steps to reproduce the behavior

See above snippet.
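To see why the output lags, here is a minimal, stdlib-only model of the situation. It is a sketch under several assumptions. `AStream`, `Embed`, `Cons`, `FromFuture`, `Empty` and `Source` are hypothetical toy stand-ins, not util's `AsyncStream` and `Reader`. Scala's `Future`/`Promise` replace `com.twitter.util.Future`, and callbacks run synchronously on the calling thread. `scanLeftOld` is a guess at the pre-fix behavior, in which the `Embed` case waits for its future before emitting anything, including `z`. In this model every pending read surfaces as an `Embed`. The accumulated value `f(z, a)` is therefore handed to a fresh `scanLeftOld` call on the next `Embed`, and that call waits for the following write before emitting it.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}

object OneBehindDemo {
  // Assumption: run every callback immediately on the calling thread so the demo is deterministic.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(r: Runnable): Unit = r.run()
    def reportFailure(t: Throwable): Unit = throw t
  }

  // Hypothetical toy stand-ins for AsyncStream's constructors (not the real classes).
  sealed trait AStream[+A]
  case object Empty extends AStream[Nothing]
  final case class FromFuture[+A](fa: Future[A]) extends AStream[A]
  final case class Cons[+A](fa: Future[A], more: () => AStream[A]) extends AStream[A]
  final case class Embed[+A](fas: Future[AStream[A]]) extends AStream[A]

  // Hypothetical stand-in for Reader.writable() + AsyncStream.fromReader: each write
  // completes the pending Embed with one element followed by a new pending Embed.
  final class Source[A] {
    private var pending = Promise[AStream[A]]()
    val stream: AStream[A] = Embed(pending.future)
    def write(a: A): Unit = {
      val current = pending
      val next = Promise[AStream[A]]()
      pending = next
      current.success(Cons(Future.successful(a), () => Embed(next.future)))
    }
  }

  // Calls f on each element as soon as it becomes available.
  def foreach[A](s: AStream[A])(f: A => Unit): Unit = s match {
    case Empty          => ()
    case FromFuture(fa) => fa.foreach(f)
    case Cons(fa, more) => fa.foreach { a => f(a); foreach(more())(f) }
    case Embed(fas)     => fas.foreach(inner => foreach(inner)(f))
  }

  // Assumed pre-fix shape: the Embed case emits nothing, not even z, until fas resolves.
  def scanLeftOld[A, B](s: AStream[A], z: B)(f: (B, A) => B): AStream[B] = s match {
    case Embed(fas)     => Embed(fas.map(inner => scanLeftOld(inner, z)(f)))
    case Empty          => FromFuture(Future.successful(z))
    case FromFuture(fa) => Cons(Future.successful(z), () => FromFuture(fa.map(f(z, _))))
    case Cons(fa, more) =>
      Cons(Future.successful(z), () => Embed(fa.map(a => scanLeftOld(more(), f(z, a))(f))))
  }

  // Records what is printed right after subscribing and after each write.
  def run(): List[List[String]] = {
    val source = new Source[String]
    val printed = ListBuffer.empty[String]
    foreach(scanLeftOld(source.stream, "")(_ + _))(s => printed += s)
    val steps = ListBuffer(printed.toList)
    for (word <- List("one", "two", "three")) {
      printed.clear()
      source.write(word)
      steps += printed.toList
    }
    steps.toList
  }
}
```

Under these assumptions `run()` returns `List(Nil, List(""), List("one"), List("onetwo"))`. Nothing appears on subscription, and each later step prints the value that should have appeared one write earlier, matching the comments in the snippet.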

I hit this issue while creating a pull request for linkerd. Thinking about how to solve it, I came up with this replacement for the current scanLeft:

  def scanLeft[B](z: B)(f: (B, A) => B): AsyncStream[B] =
    this match {
      case Embed(fas) => Cons(Future.value(z), () => Embed(fas.map(_.scanLeftEmbed(z)(f))))
      case Empty => FromFuture(Future.value(z))
      case FromFuture(fa) =>
        Cons(Future.value(z), () => FromFuture(fa.map(f(z, _))))
      case Cons(fa, more) =>
        Cons(Future.value(z), () => Embed(fa.map(a => more().scanLeft(f(z, a))(f))))
    }

  private def scanLeftEmbed[B](z: B)(f: (B, A) => B): AsyncStream[B] =
    this match {
      case Embed(fas) => Embed(fas.map(_.scanLeftEmbed(z)(f)))
      case Empty => Empty
      case FromFuture(fa) =>
        FromFuture(fa.map(f(z, _)))
      case Cons(fa, more) =>
        Embed(fa.map(a => more().scanLeft(f(z, a))(f)))
    }

If this makes sense as an approach, I'll create a pull request in the next few days.
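Plugging the proposed `scanLeft`/`scanLeftEmbed` pair into a toy model suggests that it removes the lag. This is a sketch, not util's code. The stream constructors, `Source`, and the synchronous `ExecutionContext` are hypothetical stand-ins built on Scala's standard `Future`/`Promise`. The two methods are written as functions that take the stream as their first argument instead of as `AsyncStream` members. `Future.successful` plays the role of `Future.value`.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}

object ScanLeftEmbedFix {
  // Assumption: run callbacks on the calling thread so the output is deterministic.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(r: Runnable): Unit = r.run()
    def reportFailure(t: Throwable): Unit = throw t
  }

  // Hypothetical toy stand-ins for AsyncStream's constructors (not the real classes).
  sealed trait AStream[+A]
  case object Empty extends AStream[Nothing]
  final case class FromFuture[+A](fa: Future[A]) extends AStream[A]
  final case class Cons[+A](fa: Future[A], more: () => AStream[A]) extends AStream[A]
  final case class Embed[+A](fas: Future[AStream[A]]) extends AStream[A]

  // Hypothetical stand-in for Reader.writable() + AsyncStream.fromReader.
  final class Source[A] {
    private var pending = Promise[AStream[A]]()
    val stream: AStream[A] = Embed(pending.future)
    def write(a: A): Unit = {
      val current = pending
      val next = Promise[AStream[A]]()
      pending = next
      current.success(Cons(Future.successful(a), () => Embed(next.future)))
    }
  }

  def foreach[A](s: AStream[A])(f: A => Unit): Unit = s match {
    case Empty          => ()
    case FromFuture(fa) => fa.foreach(f)
    case Cons(fa, more) => fa.foreach { a => f(a); foreach(more())(f) }
    case Embed(fas)     => fas.foreach(inner => foreach(inner)(f))
  }

  // Proposed scanLeft: every case emits z immediately, including Embed.
  def scanLeft[A, B](s: AStream[A], z: B)(f: (B, A) => B): AStream[B] = s match {
    case Embed(fas) =>
      Cons(Future.successful(z), () => Embed(fas.map(inner => scanLeftEmbed(inner, z)(f))))
    case Empty          => FromFuture(Future.successful(z))
    case FromFuture(fa) => Cons(Future.successful(z), () => FromFuture(fa.map(f(z, _))))
    case Cons(fa, more) =>
      Cons(Future.successful(z), () => Embed(fa.map(a => scanLeft(more(), f(z, a))(f))))
  }

  // Same traversal for nested streams: z was already emitted, so it is not repeated.
  private def scanLeftEmbed[A, B](s: AStream[A], z: B)(f: (B, A) => B): AStream[B] = s match {
    case Embed(fas)     => Embed(fas.map(inner => scanLeftEmbed(inner, z)(f)))
    case Empty          => Empty
    case FromFuture(fa) => FromFuture(fa.map(f(z, _)))
    case Cons(fa, more) => Embed(fa.map(a => scanLeft(more(), f(z, a))(f)))
  }

  // Records what is printed right after subscribing and after each write.
  def run(): List[List[String]] = {
    val source = new Source[String]
    val printed = ListBuffer.empty[String]
    foreach(scanLeft(source.stream, "")(_ + _))(s => printed += s)
    val steps = ListBuffer(printed.toList)
    for (word <- List("one", "two", "three")) {
      printed.clear()
      source.write(word)
      steps += printed.toList
    }
    steps.toList
  }
}
```

Here `run()` should return `List(List(""), List("one"), List("onetwo"), List("onetwothree"))`. The initial `""` appears on subscription, and every later value appears as soon as the write that produces it happens, which is the expected behavior described in the issue.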

I was also thinking about this and wondering if something like this would fix it:

case Embed(fas) => Cons(Future.value(z), () => Embed(fas.map(_.scanLeft(z)(f).drop(1))))
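The `drop(1)` variant can be checked in a similar toy model. This is a sketch under stated assumptions. The stream constructors, `Source`, and the synchronous `ExecutionContext` are hypothetical stand-ins built on Scala's standard `Future`/`Promise`. The `drop` below is a simplified helper written for the model, not util's code: it skips a `Cons` without waiting on its head, and for an `Embed` it waits for the inner stream and keeps dropping. Apart from the `Embed` case, `scanLeft` is written the same way as in the `scanLeftEmbed` proposal.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}

object DropFix {
  // Assumption: run callbacks on the calling thread so the output is deterministic.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(r: Runnable): Unit = r.run()
    def reportFailure(t: Throwable): Unit = throw t
  }

  // Hypothetical toy stand-ins for AsyncStream's constructors (not the real classes).
  sealed trait AStream[+A]
  case object Empty extends AStream[Nothing]
  final case class FromFuture[+A](fa: Future[A]) extends AStream[A]
  final case class Cons[+A](fa: Future[A], more: () => AStream[A]) extends AStream[A]
  final case class Embed[+A](fas: Future[AStream[A]]) extends AStream[A]

  // Hypothetical stand-in for Reader.writable() + AsyncStream.fromReader.
  final class Source[A] {
    private var pending = Promise[AStream[A]]()
    val stream: AStream[A] = Embed(pending.future)
    def write(a: A): Unit = {
      val current = pending
      val next = Promise[AStream[A]]()
      pending = next
      current.success(Cons(Future.successful(a), () => Embed(next.future)))
    }
  }

  def foreach[A](s: AStream[A])(f: A => Unit): Unit = s match {
    case Empty          => ()
    case FromFuture(fa) => fa.foreach(f)
    case Cons(fa, more) => fa.foreach { a => f(a); foreach(more())(f) }
    case Embed(fas)     => fas.foreach(inner => foreach(inner)(f))
  }

  // Simplified, hypothetical drop: skips n elements without waiting on their values.
  def drop[A](s: AStream[A], n: Int): AStream[A] =
    if (n < 1) s
    else s match {
      case Empty | FromFuture(_) => Empty
      case Cons(_, more)         => drop(more(), n - 1)
      case Embed(fas)            => Embed(fas.map(inner => drop(inner, n)))
    }

  // Emit z now; once fas resolves, rerun scanLeft and drop its leading z so it is not repeated.
  def scanLeft[A, B](s: AStream[A], z: B)(f: (B, A) => B): AStream[B] = s match {
    case Embed(fas) =>
      Cons(Future.successful(z), () => Embed(fas.map(inner => drop(scanLeft(inner, z)(f), 1))))
    case Empty          => FromFuture(Future.successful(z))
    case FromFuture(fa) => Cons(Future.successful(z), () => FromFuture(fa.map(f(z, _))))
    case Cons(fa, more) =>
      Cons(Future.successful(z), () => Embed(fa.map(a => scanLeft(more(), f(z, a))(f))))
  }

  // Records what is printed right after subscribing and after each write.
  def run(): List[List[String]] = {
    val source = new Source[String]
    val printed = ListBuffer.empty[String]
    foreach(scanLeft(source.stream, "")(_ + _))(s => printed += s)
    val steps = ListBuffer(printed.toList)
    for (word <- List("one", "two", "three")) {
      printed.clear()
      source.write(word)
      steps += printed.toList
    }
    steps.toList
  }
}
```

In this model `run()` should return `List(List(""), List("one"), List("onetwo"), List("onetwothree"))`, the expected behavior from the issue. The extra work is one leading `Cons` built and immediately skipped each time an `Embed` resolves.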

Interesting find, thanks @mejran and @adleong.

Shouldn't the first value printed by foreach be the initial value passed into scanLeft? scanLeft is documented as:

   * The resulting stream always begins with the initial value `z`,
   * not subject to the fate of the underlying future, i.e.:
   *
   * {{{
   * val never = AsyncStream.fromFuture(Future.never)
   * never.scanLeft(z)(f) == z +:: never // logical equality
   * }}}

Given that, I think @mejran's proposed solution looks correct.

I believe adleong's approach is functionally identical to mine. In both approaches, scanLeft emits the initial value first in every case. adleong's relies on that fact and simply strips it back out of any nested calls from Embed so that it isn't output multiple times; mine never emits it in the first place for nested calls. I suppose there's more overhead in adleong's code, since elements are added and then stripped out, but given how drop works it should be tiny.

I'll submit a PR this week unless someone else is already doing so. Please let me know which solution makes more sense for the project.

@mejran a PR would be great. thanks!

This got fixed by #197
Thanks again for the patch.