zio/interop-reactive-streams

Stream created from a reactive publisher doesn't cancel its subscription after completing or failing.

Zuchos opened this issue · 12 comments

    val publisher: Publisher[Int]  = ??? //(publisher created with akka-streams with fanout = true)
    env.unsafeRunAsync(
      publisher
        .toStream(2)
        .takeWhile(_ < 300)
        .map { e =>
          println(s"ZIO 1: $e")
          e
        }
        .run(zio.stream.Sink.drain)
    )(e => println(s"1 EXITING! $e"))

After running that stream, ZStream stops consuming from the publisher once it reaches element 300, but it doesn't cancel the subscription, which prevents the publisher from publishing to other subscribers. I have other subscriptions to that publisher, and they also stop receiving new elements when this stream stops.

I made my own decorators on Subscriber, Publisher and Subscription and I verified that subscription.cancel is never called.
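A check like the one described above can be sketched roughly as follows. This is not the reporter's actual code: `RecordingSubscription` and `NoopSubscription` are hypothetical names, and the `Subscription` trait is a local stand-in that mirrors `org.reactivestreams.Subscription` so the snippet compiles without the real dependency.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Local stand-in mirroring org.reactivestreams.Subscription (assumption:
// defined here only so the sketch compiles without the real library).
trait Subscription {
  def request(n: Long): Unit
  def cancel(): Unit
}

// Hypothetical decorator: forwards every call to the wrapped subscription
// and remembers whether cancel() was ever invoked.
final class RecordingSubscription(underlying: Subscription) extends Subscription {
  private val cancelled = new AtomicBoolean(false)

  def request(n: Long): Unit = underlying.request(n)

  def cancel(): Unit = {
    cancelled.set(true)
    underlying.cancel()
  }

  def wasCancelled: Boolean = cancelled.get()
}

// A do-nothing subscription, used only to exercise the decorator.
object NoopSubscription extends Subscription {
  def request(n: Long): Unit = ()
  def cancel(): Unit = ()
}
```

With the real library, a decorated Subscriber would wrap the subscription it receives in `onSubscribe` this way before handing it downstream, and `wasCancelled` could be inspected once the stream has finished.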

Thanks for reporting, @Zuchos. Unfortunately, I can't reproduce the issue you describe. Which version do you use? I have added tests for cancellation in #126. If these tests do not cover what you mean, please comment on the MR.

@runtologist did you try running the gist that I pasted? (Sorry for the thread sleeps.) In the console, the println output should not reach 500; it should stop around ~316.
Akka 2.6.5
Zio interop 1.0.3.5-RC (Caliban 0.8.0 dependency)
I will try to check the lib's tests in the meantime.

Which RC? There are 9 of them. ;-)

Sorry, RC8.
I'm looking at the test suite, and I guess that the tests in PublisherToStreamSpec will always pass, since Task(probe.expectCancelling()) does not throw exceptions or errors. So I guess it should be asserted with testEnv.verifyNoAsyncErrors().
But I'm not familiar with this reactive test kit, so maybe I'm missing the point.

    testM("test that is always passing") {
      assertM((for {
        probe <- makeProbe
        _     <- URIO(probe.expectCancelling())
      } yield ()).run)(
        succeeds(isUnit)
      )
    }

It always passes and apparently it should not.
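Why such a test cannot fail can be illustrated with a minimal model. It assumes, as the comment above suggests, that a failed expectation is recorded in the test environment rather than thrown at the call site. `RecordingEnv` and `Probe` are hypothetical stand-ins, not the actual test kit classes.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a test environment that records failures
// instead of throwing them where the expectation is checked.
final class RecordingEnv {
  private val asyncErrors = ListBuffer.empty[Throwable]

  // Record a failure without throwing.
  def flop(msg: String): Unit = synchronized {
    asyncErrors += new AssertionError(msg)
  }

  // Only this call turns a recorded failure into a thrown error.
  def verifyNoAsyncErrors(): Unit = synchronized {
    asyncErrors.headOption.foreach(e => throw e)
  }
}

// Hypothetical probe: the expectation returns normally even when
// no cancellation was observed; the failure is only recorded.
final class Probe(env: RecordingEnv) {
  @volatile private var cancelled = false

  def cancel(): Unit = {
    cancelled = true
  }

  def expectCancelling(): Unit =
    if (!cancelled) env.flop("Did not receive expected cancelling of upstream subscription")
}
```

In this model, wrapping `probe.expectCancelling()` in an effect always succeeds, and only a later `env.verifyNoAsyncErrors()` surfaces the missing cancellation. That is why the test needs the extra verification step.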

Interesting. Good catch! I'll look into it.

I can confirm that the existing tests fail when rewritten as you proposed. The PR has been updated with the adjusted tests and a fix.

Fixed in v1.0.3.5-RC10.

Thanks again @Zuchos for your help in analyzing the issue.

It looks like the artifact was only published for Scala 2.12 and is missing for Scala 2.13.

All cross versions should be published now.