zio/interop-reactive-streams

Streaming from publisher stops when using zio.App

Closed this issue · 4 comments

I have found some strange behavior when converting a ZStream to a Publisher inside a zio.App.
In that case the stream is never consumed.

This example, without the Publisher conversion, works fine; "hello" is printed every 10 seconds:

object TestPub extends zio.App {

  implicit val system = ActorSystem()

  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] = {
    val stream = ZIO.effect(ZStream.tick(10.second).map(_ => "hello"))
    val tick = ZStream.tick(1.second).map(_ => "waiting...")
    val merged = stream.map(s => tick.merge(s))
    merged.flatMap(_.foreach(e => putStrLn(e)))
      .exitCode
  }

}

This example, with the Publisher conversion inside zio.App, does not work; "hello" is never printed:

object TestPubBis extends zio.App {

  implicit val system = ActorSystem()

  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] = {
    val stream = ZIO.effect(ZStream.tick(10.second).map(_ => "hello"))
    val tick = ZStream.tick(1.second).map(_ => "waiting...")
    val merged = stream.map(s => tick.merge(s))
    merged.flatMap(_.toPublisher).map(pub => Source.fromPublisher(pub))
      .flatMap(source => ZIO.effect(source.runForeach(println)))
      .exitCode
  }

}

The same example with scala.App instead of zio.App works fine:

object TestPubTer extends /*zio.*/App {

  implicit val system = ActorSystem()

  //override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, ExitCode] = {
    val stream = ZIO.effect(ZStream.tick(10.second).map(_ => "hello"))
    val tick = ZStream.tick(1.second).map(_ => "waiting...")
    val merged = stream.map(s => tick.merge(s))
    zio.Runtime.default.unsafeRun(
      merged.flatMap(_.toPublisher).map(pub => Source.fromPublisher(pub))
        .flatMap(source => ZIO.effect(source.runForeach(println)))
    )
  //}
}

Note 1: I've tried .fold(_ => ExitCode.failure, _ => ExitCode.success) instead of exitCode, but it does not change the behavior.

Note 2: I also had an issue with ZStreams built from a ZIO Queue (using ZStream.fromQueue(queue)). I don't know whether it is related, but that kind of stream was "frozen" after being converted to a Publisher. I could open an issue for that one too if it helps.

@loicdescotte Thank you for reporting this issue and for the detailed examples. Please open a second issue regarding fromQueue.

toPublisher does not immediately start a stream. It creates a Publisher, which will run the stream as soon as a Subscriber subscribes. For this reason, the fiber running the stream is a daemon fiber. It does not run in the scope of the fiber running toPublisher.
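The laziness described above can be probed with a minimal sketch. It assumes ZIO 1.x with zio-interop-reactive-streams on the classpath; `LazyPublisherDemo`, `program` and the `started` flag are names made up for this illustration. It records whether the stream's effect has run before and after something subscribes, using the library's own `toStream` to act as the subscriber.

```scala
import zio._
import zio.interop.reactivestreams._
import zio.stream.ZStream

// Hypothetical demo object, not part of the original report.
object LazyPublisherDemo {

  // Yields (flag before any subscription, flag after the stream was consumed).
  val program: Task[(Boolean, Boolean)] =
    for {
      started <- Ref.make(false)
      stream   = ZStream.fromEffect(started.set(true)) // flips the flag when the stream runs
      pub     <- stream.toPublisher                    // only builds the Publisher
      before  <- started.get                           // read before anyone subscribes
      _       <- pub.toStream().runDrain               // subscribing is what starts the stream
      after   <- started.get
    } yield (before, after)

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(program))
}
```

If the description above holds, the first flag should still be unset when it is read and the second should be set, since nothing runs until a Subscriber arrives.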

Your observation is due to slightly different behaviour of scala.App and zio.App:

scala.App will terminate only if no running threads are left in the default Executor. As the stream has been started by main, it keeps running.

zio.App will terminate once the ZIO returned by run finishes. This happens as soon as the publisher has been created. As the stream runs in the background, it does not block run. Simply sleeping for a few seconds will show you that it gets executed.

If you want to block run, you could, for example, complete a promise after merged has been consumed, or simply return ZIO.never.exitCode.
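One way to keep run blocked is sketched below. It is a variation on the promise idea rather than a literal implementation of it: instead of wiring a Promise by hand, it suspends on the Future that Akka's runForeach returns, using ZIO.fromFuture. It assumes ZIO 1.x, zio-interop-reactive-streams and Akka 2.6 (where an implicit ActorSystem is enough to materialize a stream); `TestPubBlocking` and the `consume` helper are hypothetical names introduced for this example.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import zio._
import zio.duration._
import zio.interop.reactivestreams._
import zio.stream.ZStream

// Hypothetical variant of TestPubBis, not taken from the original report.
object TestPubBlocking extends zio.App {

  implicit val system: ActorSystem = ActorSystem()

  // Converts the stream to a Publisher, feeds it to Akka, and waits for Akka to finish.
  def consume[R](stream: ZStream[R, Throwable, String]): ZIO[R, Throwable, Done] =
    stream.toPublisher
      .map(pub => Source.fromPublisher(pub))
      // fromFuture suspends this fiber until the Future[Done] completes,
      // whereas ZIO.effect(...) would only create the Future and move on.
      .flatMap(source => ZIO.fromFuture(_ => source.runForeach(println)))

  override def run(args: List[String]): URIO[ZEnv, ExitCode] = {
    val hello = ZStream.tick(10.seconds).map(_ => "hello")
    val tick  = ZStream.tick(1.second).map(_ => "waiting...")
    consume(tick.merge(hello)).exitCode
  }
}
```

Because both tick streams are infinite, run never returns here, which is the intended effect. Appending `*> ZIO.never` before `.exitCode` in the original TestPubBis would be the more direct form of the second suggestion.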

@runtologist Thanks for the explanation!
I will try to reproduce the queue issue and open a ticket for it.

I have opened a separate issue for fromQueue: #141