zio/interop-reactive-streams

Interop with Flux and Mono types

bbarker opened this issue · 7 comments

Hi,

I was wondering if interop with the Flux and Mono types from https://projectreactor.io is something that might be considered for this repository. In particular, it would be great to be able to convert a ZStream to a Flux or Mono and vice versa. Maybe this is already easy enough with the existing interfaces. I just wanted to check before taking a stab at it. If it works out, is it OK to submit a PR with this support enabled?

As near as I can tell, this should work out of the box. The one thing preventing me from testing it is that .toStream(Int) is already a method on Flux itself, albeit with a different return type:

[Screenshot: calling .toStream on a Flux resolves to Flux's own method instead of returning a ZStream]

So as you can see in the screenshot, I'm not getting back a ZStream, despite having:
import zio.interop.reactivestreams.*

I'm not sure if there's a good approach to this, other than perhaps renaming publisherToStream.toStream, or adding an alias for it, such as publisherToStream.toZioStream.

I can work around it by using publisherToStream as a regular class instead of relying on the extension syntax, but it is a bit unwieldy:

[Screenshot: workaround that uses publisherToStream as a regular class]

We ended up doing the following upcast to work around the naming clash:

import org.reactivestreams.Publisher
import reactor.core.publisher.{ Flux, Mono }
import zio.interop.reactivestreams._
import zio.stream.ZStream

object InteropCheck {
  val flux: Flux[Int] = Flux.just(1, 2, 3)
  val mono: Mono[Int] = Mono.just(1)

  val monoIsAPublisher: Publisher[Int] = mono
  val fluxIsAPublisher: Publisher[Int] = flux

  val mStream: ZStream[Any, Throwable, Int] = monoIsAPublisher.toStream()
  val fStream: ZStream[Any, Throwable, Int] = fluxIsAPublisher.toStream()
}

Yeah, based on @calvinlfer's approach, the above code snippet looks like this, which is a definite improvement:

[Screenshot: the snippet rewritten with the upcast approach]

I guess we could rename the methods to toZIOStream and toZIOSink to avoid name clashes in 2.0. Do you want to open a PR against the series/2.x branch?

If you need this for 1.0, then I propose copying package.scala to your codebase and namespace, renaming the extension methods, and importing them from the copy.
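For anyone reading along, here is a minimal sketch of why the clash happens and why a rename (or the upcast) gets around it. It deliberately uses hypothetical stand-in types (Publisher, FakeFlux, FakeZStream, PublisherOps) instead of the real Reactor and ZIO classes, so it compiles without any dependencies. It is not the library's actual code, only an illustration of how Scala prefers a class's own member over an extension method with the same name.

```scala
// Hypothetical stand-ins, NOT the real Reactor / ZIO types.
trait Publisher[A]                                    // plays the role of org.reactivestreams.Publisher
final case class FakeZStream[A](source: Publisher[A]) // plays the role of ZStream

// Like Flux, this class already has its own member named toStream.
final class FakeFlux[A](val items: List[A]) extends Publisher[A] {
  def toStream(batchSize: Int): Iterator[A] = items.iterator
}

object syntax {
  // Extension methods in the style of the interop package (assumed shape).
  implicit final class PublisherOps[A](val publisher: Publisher[A]) {
    // Same name as the member on FakeFlux: never chosen for a FakeFlux receiver.
    def toStream(qSize: Int): FakeZStream[A] = FakeZStream(publisher)
    // Renamed variant: no member with this name exists, so the extension applies.
    def toZIOStream(qSize: Int): FakeZStream[A] = FakeZStream(publisher)
  }
}

object ClashDemo {
  import syntax._

  val flux = new FakeFlux(List(1, 2, 3))

  // The member on FakeFlux wins, so this is NOT a FakeZStream.
  val viaMember: Iterator[Int] = flux.toStream(16)

  // Upcasting hides the member, so the extension method is picked up.
  val viaUpcast: FakeZStream[Int] = (flux: Publisher[Int]).toStream(16)

  // With a non-clashing name, no upcast is needed.
  val viaRename: FakeZStream[Int] = flux.toZIOStream(16)
}
```

The compiler only looks for an implicit conversion when the receiver has no applicable member with that name, which is why the upcast to Publisher and the rename both work while the plain call does not.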

We can open an MR for 2.x. Thanks, @runtologist!

v2.0.0 with the renamings proposed here has been released. Thanks again for bringing this up!