AsyncStream.scanLeft is one element behind
adleong opened this issue · 6 comments
AsyncStream.scanLeft is one element behind when the original AsyncStream is an Embed.
Consider the snippet:
import com.twitter.concurrent.AsyncStream
import com.twitter.io.{Buf, Reader}
val reader = Reader.writable()
val as = AsyncStream.fromReader(reader)
val accum = as.scanLeft(Buf.Empty)(_ concat _)
accum.foreach { case Buf.Utf8(s) => println(s) }
reader.write(Buf.Utf8("one")) // "" is printed
reader.write(Buf.Utf8("two")) // "one" is printed
reader.write(Buf.Utf8("three")) // "onetwo" is printed
Expected behavior
"" should be printed as soon as the scanLeft is called. "one" should be printed when "one" is written. "onetwo" should be printed when "two" is written. "onetwothree" should be printed when "three" is written.
Actual behavior
The resulting AsyncStream is always one element behind, as indicated in the snippet's comments.
Steps to reproduce the behavior
See above snippet.
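For reference, the expected sequence can be modeled without any Reader or AsyncStream. The sketch below is only an analogy: it treats the three writes as an in-memory list and uses the standard library's `List.scanLeft`. The `ExpectedScan` object and its field names are made up for illustration.

```scala
// Analogy only: the three writes become a plain List, and string
// concatenation stands in for Buf concat.
object ExpectedScan {
  val writes: List[String] = List("one", "two", "three")

  // scanLeft yields the initial value first, then one accumulated value per input.
  val expected: List[String] = writes.scanLeft("")(_ + _)

  def main(args: Array[String]): Unit =
    expected.foreach(s => println("\"" + s + "\""))
}
```

Here `expected` is `List("", "one", "onetwo", "onetwothree")`, i.e. one output per write plus the initial value up front, which is what the AsyncStream version should produce in step with the writes.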
I hit this issue while creating a pull request for linkerd and was thinking about how to solve it. This comes to mind as a replacement for the current scanLeft:
def scanLeft[B](z: B)(f: (B, A) => B): AsyncStream[B] =
  this match {
    // Emit z right away; the embedded stream is scanned without emitting z again.
    case Embed(fas) => Cons(Future.value(z), () => Embed(fas.map(_.scanLeftEmbed(z)(f))))
    case Empty => FromFuture(Future.value(z))
    case FromFuture(fa) =>
      Cons(Future.value(z), () => FromFuture(fa.map(f(z, _))))
    case Cons(fa, more) =>
      Cons(Future.value(z), () => Embed(fa.map(a => more().scanLeft(f(z, a))(f))))
  }

// Same as scanLeft, except that the initial value z is not emitted.
private def scanLeftEmbed[B](z: B)(f: (B, A) => B): AsyncStream[B] =
  this match {
    case Embed(fas) => Embed(fas.map(_.scanLeftEmbed(z)(f)))
    case Empty => Empty
    case FromFuture(fa) =>
      FromFuture(fa.map(f(z, _)))
    case Cons(fa, more) =>
      Embed(fa.map(a => more().scanLeft(f(z, a))(f)))
  }
If this makes sense as an approach then I'll create a pull request in the next few days.
I was also thinking about this and wondering if something like this would fix it:
case Embed(fas) => Cons(Future.value(z), () => Embed(fas.map(_.scanLeft(z)(f).drop(1))))
interesting find, thanks @mejran and @adleong.
shouldn't the first value printed by foreach be the initial value passed into scanLeft. scanLeft is documented as:
* The resulting stream always begins with the initial value `z`,
* not subject to the fate of the underlying future, i.e.:
*
* {{{
* val never = AsyncStream.fromFuture(Future.never)
* never.scanLeft(z)(f) == z +:: never // logical equality
* }}}
given that, i think @mejran's proposed solution looks correct.
I believe adleong's approach is functionally identical to mine. Both approaches output the initial value that's passed in for all cases. adleong's takes advantage of that fact and just strips it back out of any nested calls from Embed so that it's not output multiple times. Mine never prepends it in the first place for nested calls. I suppose there's more overhead in adleong's code since elements are added and then stripped out, but given how drop works it should be tiny.
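To sanity-check the equivalence claim, here is a rough, self-contained toy model. It is not the real AsyncStream: thunks stand in for Futures, the FromFuture case is left out, and every name in it (`Toy`, `ToyCons`, `ToyEmbed`, `scanViaDrop`, `scanSkipping`, `scanTail`, `ToyDemo`) is hypothetical. It only mirrors the shape of the two Embed cases proposed above.

```scala
// Toy, synchronous stand-in for the AsyncStream cases: thunks replace Futures.
sealed trait Toy[+A] {
  def toList: List[A] = this match {
    case ToyEmpty            => Nil
    case ToyCons(head, more) => head() :: more().toList
    case ToyEmbed(inner)     => inner().toList
  }

  def drop(n: Int): Toy[A] =
    if (n <= 0) this
    else this match {
      case ToyEmpty         => ToyEmpty
      case ToyCons(_, more) => more().drop(n - 1)
      case ToyEmbed(inner)  => ToyEmbed[A](() => inner().drop(n))
    }

  // Shape of the drop(1) idea: scan the nested stream normally, then strip its leading z.
  def scanViaDrop[B](z: B)(f: (B, A) => B): Toy[B] = this match {
    case ToyEmpty => ToyCons[B](() => z, () => ToyEmpty)
    case ToyCons(head, more) =>
      ToyCons[B](() => z, () => ToyEmbed[B](() => more().scanViaDrop(f(z, head()))(f)))
    case ToyEmbed(inner) =>
      ToyCons[B](() => z, () => ToyEmbed[B](() => inner().scanViaDrop(z)(f).drop(1)))
  }

  // Shape of the scanLeftEmbed idea: nested calls never emit z in the first place.
  def scanSkipping[B](z: B)(f: (B, A) => B): Toy[B] = this match {
    case ToyEmpty => ToyCons[B](() => z, () => ToyEmpty)
    case ToyCons(head, more) =>
      ToyCons[B](() => z, () => ToyEmbed[B](() => more().scanSkipping(f(z, head()))(f)))
    case ToyEmbed(inner) =>
      ToyCons[B](() => z, () => ToyEmbed[B](() => inner().scanTail(z)(f)))
  }

  private def scanTail[B](z: B)(f: (B, A) => B): Toy[B] = this match {
    case ToyEmpty => ToyEmpty
    case ToyCons(head, more) =>
      ToyEmbed[B](() => more().scanSkipping(f(z, head()))(f))
    case ToyEmbed(inner) => ToyEmbed[B](() => inner().scanTail(z)(f))
  }
}
case object ToyEmpty extends Toy[Nothing]
final case class ToyCons[+A](head: () => A, more: () => Toy[A]) extends Toy[A]
final case class ToyEmbed[+A](inner: () => Toy[A]) extends Toy[A]

object ToyDemo {
  // Set to true once the outer embedded thunk has been run.
  var innerForced = false

  // Embed(one, Embed(two, three)): the outermost case is an Embed, as in the report.
  private def source: Toy[String] =
    ToyEmbed[String](() => {
      innerForced = true
      ToyCons[String](() => "one", () =>
        ToyEmbed[String](() =>
          ToyCons[String](() => "two", () =>
            ToyCons[String](() => "three", () => ToyEmpty))))
    })

  val scanned: Toy[String] = source.scanSkipping("")(_ + _)

  // The initial value is readable before the embedded part has been forced.
  val headBeforeForce: Option[String] = scanned match {
    case ToyCons(h, _) => Some(h())
    case _             => None
  }
  val forcedAfterHead: Boolean = innerForced

  val viaSkip: List[String] = scanned.toList
  val viaDrop: List[String] = source.scanViaDrop("")(_ + _).toList
}
```

In this toy, both variants produce `List("", "one", "onetwo", "onetwothree")`, and the initial value is available before the embedded thunk runs. That says nothing about overhead or about Future semantics in the real implementation, so treat it as a plausibility check only.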
I'll submit a PR this week unless someone else is already doing so. Please let me know which solution makes more sense for the project.
@mejran a PR would be great. thanks!
This got fixed by #197
Thanks again for the patch.