ZStream made from a Queue is stopping prematurely when converting to Publisher
loicdescotte opened this issue · 2 comments
This code creates an infinite stream:
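import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import zio.{Queue, Schedule, ZIO}
import zio.console.putStrLn
import zio.duration._
import zio.stream.ZStream

object TestPubFromQueue extends App {
  import zio.interop.reactivestreams._
  implicit val system = ActorSystem()

  // A forked fiber offers "hello" to the queue once per second; the stream reads from that queue.
  val stream = for {
    q <- Queue.bounded[String](100)
    _ <- q.offer("hello").repeat(Schedule.spaced(1.seconds)).fork
  } yield ZStream.fromQueue(q)

  zio.Runtime.default.unsafeRun(
    putStrLn("start") *> stream.flatMap(_.toPublisher).map(pub => Source.fromPublisher(pub))
      .flatMap(source => ZIO.effect(source.runForeach(println)))
  )
}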
When I run it, it stops after the first "hello":
start
hello
[end]
By contrast, it works if I don't use a publisher (1) or if I use a publisher but no queue (2).
1 (queue, no publisher):
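import zio.{Queue, Schedule}
import zio.console.putStrLn
import zio.duration._
import zio.stream.ZStream

object TestStreamFromQueue extends App {
  val stream = for {
    q <- Queue.bounded[String](100)
    _ <- q.offer("hello").repeat(Schedule.spaced(1.seconds)).fork
  } yield ZStream.fromQueue(q)

  // The stream is consumed inside ZIO itself, with no publisher involved.
  zio.Runtime.default.unsafeRun(
    putStrLn("start") *> stream.flatMap(_.foreach(e => putStrLn(e)))
  )
}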
2 (publisher, no queue):
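import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import zio.ZIO
import zio.console.putStrLn
import zio.duration._
import zio.stream.ZStream

object TestPub extends App {
  implicit val system = ActorSystem()
  import zio.interop.reactivestreams._

  // No queue and no forked fiber: the stream ticks once per second on its own.
  val stream = ZIO.effect(ZStream.tick(1.second).map(_ => "hello"))

  zio.Runtime.default.unsafeRun(
    putStrLn("start") *> stream.flatMap(_.toPublisher).map(pub => Source.fromPublisher(pub))
      .flatMap(source => ZIO.effect(source.runForeach(println)))
  )
}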
Both print an infinite stream:
start
hello
hello
hello
hello
[...]
The reason is similar to the one in #136: Akka Streams' Source.runForeach returns a Future. The example forks a fiber, which lives only as long as its parent fiber. It then does some work and creates a Future, but uses ZIO.effect instead of ZIO.fromFuture, so the Future is never turned into a ZIO. ZIO.effect succeeds immediately and the life of the parent fiber ends, taking the forked fiber with it. The reason it prints the first "hello" is probably that you have multiple cores. I guess if you restrict the runtime to a single core, it will stop after "start".
1. This example uses ZIO's foreach, so the issue does not arise. You have a single ZIO here, which runs till the end.
2. Here I'm not sure why it works. This example also creates a Future. ZStream.tick does not fork, however. I guess this is just scala.App staying alive and running a background task for some reason. I guess if you add a println after the unsafeRun, it will get executed.
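To make the difference between ZIO.effect and ZIO.fromFuture concrete, here is a minimal sketch that leaves Akka out entirely. It assumes ZIO 1.x, and slowFuture is a hypothetical stand-in for source.runForeach(println), not anything taken from the issue. ZIO.effect only captures the creation of the Future, while ZIO.fromFuture waits for it to complete.

```scala
import scala.concurrent.{ExecutionContext, Future}
import zio.ZIO

object EffectVsFromFuture {
  // Hypothetical stand-in for source.runForeach(println):
  // a Future that needs some time before it completes.
  def slowFuture(implicit ec: ExecutionContext): Future[String] =
    Future { Thread.sleep(500); "done" }

  def main(args: Array[String]): Unit = {
    val runtime = zio.Runtime.default

    // ZIO.effect wraps only the call that creates the Future. The effect
    // succeeds as soon as the Future object exists, so unsafeRun returns
    // the Future itself, usually before it has completed.
    val wrapped: Future[String] =
      runtime.unsafeRun(ZIO.effect(slowFuture(ExecutionContext.global)))
    println(s"ZIO.effect returned, future completed: ${wrapped.isCompleted}")

    // ZIO.fromFuture turns the Future into a ZIO that finishes only when
    // the Future finishes, so unsafeRun returns the Future's result.
    val awaited: String =
      runtime.unsafeRun(ZIO.fromFuture(ec => slowFuture(ec)))
    println(s"ZIO.fromFuture returned: $awaited")
  }
}
```

In the first case the fiber has nothing left to do once the Future has been created, which would match the behaviour reported above: the parent fiber ends and anything it forked goes with it.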
Thanks, this version does indeed work:
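import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.ExecutionContext
import zio.{Queue, Schedule, ZIO}
import zio.console.putStrLn
import zio.duration._
import zio.stream.ZStream

object TestPubFromQueue extends App {
  import zio.interop.reactivestreams._
  implicit val system = ActorSystem()

  val stream = for {
    q <- Queue.bounded[String](100)
    _ <- q.offer("hello").repeat(Schedule.spaced(1.seconds)).fork
  } yield ZStream.fromQueue(q)

  val publisher = stream.flatMap(_.toPublisher).map(pub => Source.fromPublisher(pub))

  // ZIO.fromFuture waits for the Future returned by runForeach, so the parent fiber stays alive.
  zio.Runtime.default.unsafeRun(
    putStrLn("start") *> publisher.flatMap(source => ZIO.fromFuture((_: ExecutionContext) => source.runForeach(println)))
  )
}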