Stream created from reactive publisher after completing or failing doesn't cancel subscription.
Zuchos opened this issue · 12 comments
val publisher: Publisher[Int] = ??? // publisher created with akka-streams with fanout = true
env.unsafeRunAsync(
  publisher
    .toStream(2)
    .takeWhile(_ < 300)
    .map { e =>
      println(s"ZIO 1: $e")
      e
    }
    .run(zio.stream.Sink.drain)
)(e => println(s"1 EXITING! $e"))
After running this stream, ZStream stops consuming from the publisher after element 300, but it does not cancel the subscription, which prevents the publisher from publishing to other subscribers. I have other subscriptions to that publisher, and they also stop receiving new elements when this stream stops.
I made my own decorators for Subscriber, Publisher and Subscription, and I verified that subscription.cancel is never called.
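The decorators themselves are not shown in the thread. A minimal sketch of what such wrappers might look like follows, assuming only the standard org.reactivestreams interfaces. The names RecordingPublisher, RecordingSubscriber and RecordingSubscription are hypothetical and are not taken from the gist.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import org.reactivestreams.{Publisher, Subscriber, Subscription}

// Hypothetical decorator: forwards every call and records whether cancel() was invoked.
final class RecordingSubscription(underlying: Subscription, cancelled: AtomicBoolean)
    extends Subscription {
  override def request(n: Long): Unit = underlying.request(n)
  override def cancel(): Unit = {
    cancelled.set(true) // remember that the downstream asked to cancel
    underlying.cancel()
  }
}

// Hypothetical decorator: hands the downstream a RecordingSubscription instead of the real one.
final class RecordingSubscriber[T](underlying: Subscriber[_ >: T], cancelled: AtomicBoolean)
    extends Subscriber[T] {
  override def onSubscribe(s: Subscription): Unit =
    underlying.onSubscribe(new RecordingSubscription(s, cancelled))
  override def onNext(t: T): Unit = underlying.onNext(t)
  override def onError(t: Throwable): Unit = underlying.onError(t)
  override def onComplete(): Unit = underlying.onComplete()
}

// Hypothetical decorator: wraps each subscriber so its subscription can be observed.
// The flag is shared, so it reports whether any subscriber of this wrapper cancelled.
final class RecordingPublisher[T](underlying: Publisher[T]) extends Publisher[T] {
  val cancelled = new AtomicBoolean(false)
  override def subscribe(s: Subscriber[_ >: T]): Unit =
    underlying.subscribe(new RecordingSubscriber[T](s, cancelled))
}
```

Wrapping the publisher in something like RecordingPublisher before calling .toStream(2) and checking the cancelled flag after the stream exits would show whether subscription.cancel was ever reached.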
I created a gist to reproduce this case:
https://gist.github.com/Zuchos/be215e25e78066db0cddfafe83d3cddd
@runtologist did you try running the gist that I pasted? (Sorry for the thread sleeps.) In the console, the println output should not reach 500; it should stop around ~316.
Akka 2.6.5
Zio interop 1.0.3.5-RC (Caliban 0.8.0 dependency)
I will try to check the library's tests in the meantime.
Which RC? There are 9 of them. ;-)
sorry, RC8.
I'm looking at the test suite, and I guess that the tests in PublisherToStreamSpec will always pass at Task(probe.expectCancelling()), since this call does not throw exceptions or errors. So I guess it should be asserted with testEnv.verifyNoAsyncErrors().
But I'm not familiar with this reactive test kit, so maybe I'm missing the point.
testM("test that is always passing") {
  assertM((for {
    probe <- makeProbe
    _     <- URIO(probe.expectCancelling())
  } yield ()).run)(
    succeeds(isUnit)
  )
}
It always passes and apparently it should not.
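The behaviour described above can be sketched without ZIO, using the Reactive Streams TCK directly. This assumes, as the comment suggests, that expectCancelling records a timeout in the TestEnvironment instead of throwing, so the failure only surfaces when the environment is verified. The object ExpectCancellingSketch and the method cancellationObserved are hypothetical names used for illustration.

```scala
import org.reactivestreams.tck.TestEnvironment
import org.reactivestreams.tck.TestEnvironment.ManualPublisher
import scala.util.Try

object ExpectCancellingSketch {
  // Returns true only if the probe saw its subscription cancelled within the
  // environment's default timeout.
  def cancellationObserved(env: TestEnvironment, probe: ManualPublisher[Int]): Boolean = {
    // Assumption: on timeout this records an async error in `env` and returns normally.
    probe.expectCancelling()
    // The recorded error only becomes visible here, as an AssertionError.
    Try(env.verifyNoAsyncErrors()).isSuccess
  }
}
```

A check of this shape returns false for a probe that nobody cancels, which is the case the always-passing test above fails to detect.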
Interesting. Good catch! I'll look into it.
I can confirm that the existing tests fail when rewritten as you proposed. The PR has been updated with the adjusted tests and a fix.
fixed in v1.0.3.5-RC10
Thanks again @Zuchos for your help in analyzing the issue.
It looks like the artifact was only published for Scala 2.12 and is missing for Scala 2.13.
All cross versions should be published now.