zio/interop-reactive-streams

ZStream made from a Queue is stopping prematurely when converting to Publisher

loicdescotte opened this issue · 2 comments

This code is creating an infinite stream :

object TestPubFromQueue extends App {

  import zio.interop.reactivestreams._

  implicit val system = ActorSystem()

  val stream = for {
    q <- Queue.bounded[String](100)
    _ <- q.offer("hello").repeat(Schedule.spaced(1.seconds)).fork
  } yield ZStream.fromQueue(q)

  zio.Runtime.default.unsafeRun(
    putStrLn("start") *> stream.flatMap(_.toPublisher).map(pub => Source.fromPublisher(pub))
      .flatMap(source => ZIO.effect(source.runForeach(println)))
  )
}

When I run it, it stops after the first "hello" :

start
hello
[end]

On the opposite, it works if I don't use a publisher (1) or if I use a publisher but no queue (2).

1 (queue, no publisher) :

object TestStreamFromQueue extends App {

  val stream = for {
    q <- Queue.bounded[String](100)
    _ <- q.offer("hello").repeat(Schedule.spaced(1.seconds)).fork
  } yield ZStream.fromQueue(q)

  zio.Runtime.default.unsafeRun(
    putStrLn("start") *> stream.flatMap(_.foreach(e => putStrLn(e)))
  )
}

2 : publisher, no queue

object TestPub extends App {

  implicit val system = ActorSystem()
  import zio.interop.reactivestreams._

  val stream = ZIO.effect(ZStream.tick(1.second).map(_ => "hello"))
  zio.Runtime.default.unsafeRun(
    putStrLn("start") *> stream.flatMap(_.toPublisher).map(pub => Source.fromPublisher(pub))
      .flatMap(source => ZIO.effect(source.runForeach(println)))
  )

}

Both will print an infinite stream :

start
hello
hello
hello
hello
[...]

The reason is similar to the one in #136:

Akka Streams' Source.runForeach returns a Future. The example forks a fiber, which lives for the lifespan of the parent fiber. Then it does stuff, creates a Future, but usees ZIO.effect instead of ZIO.fromFuture, so the Future never is turned into a ZIO. ZIO.effect succeeds immediately and the life of the parent fiber ends. The reason it prints the first "hello" probably is that you have multiple cores. I guess if you restrict the runtime to a single core, it will stop after "start".

  1. Does use ZIO's forEach, so the issue does not arise. You have a single ZIO here, which runs till the end.

  2. Here I'm not sure why it works: This example also creates a Future. ZStream.tick does not fork, however. I guess this is just scala.App staying alive and running a background task for some reason. I guess if you add a println after the unsafeRun, It will get executed.

Thanks, indeed this version is working :

object TestPubFromQueue extends App {

  import zio.interop.reactivestreams._

  implicit val system = ActorSystem()

  val stream = for {
    q <- Queue.bounded[String](100)
    _ <- q.offer("hello").repeat(Schedule.spaced(1.seconds)).fork
  } yield ZStream.fromQueue(q)

  val publisher = stream.flatMap(_.toPublisher).map(pub => Source.fromPublisher(pub))

  zio.Runtime.default.unsafeRun(
    putStrLn("start") *>  publisher.flatMap(source => ZIO.fromFuture((_: ExecutionContext) => source.runForeach(println)))
  )
}