Interop with Flux and Mono types
bbarker opened this issue · 7 comments
Hi,
I was wondering if interop with the Flux and Mono types from https://projectreactor.io is something that might be considered for this repository. In particular, it would be great to be able to convert a ZStream to Flux or Mono and vice versa. Maybe this is already easy enough with the existing interfaces. I just wanted to check before taking a stab at it. If it works out, is it OK to submit a PR with this support enabled?
As near as I can tell, this should work out of the box, but one thing preventing me from testing it is that .toStream(Int) is already a method on Flux itself, albeit with a different return type:
import zio.interop.reactivestreams.*
I'm not sure if there's a good approach to this, other than perhaps adding an alias to (or renaming) publisherToStream.toStream, to something like publisherToStream.toZioStream.
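For anyone following along, the clash can be reproduced without either library. This is only a sketch with hypothetical stand-in types (Source, FakeFlux, SourceOps are not real Reactor or ZIO names); it shows that a member method wins over an extension method of the same name, and that upcasting to the interface makes the extension reachable again.

```scala
// Hypothetical stand-ins, not the real Reactor or ZIO types.
trait Source[A]

// Plays the role of Flux: it already has its own member named toStream.
final class FakeFlux[A](val items: List[A]) extends Source[A] {
  def toStream(batchSize: Int): String = s"member(batchSize=$batchSize)"
}

object Syntax {
  // Plays the role of publisherToStream: an extension method with the same name.
  implicit final class SourceOps[A](private val source: Source[A]) extends AnyVal {
    def toStream(qSize: Int = 16): String = s"extension(qSize=$qSize)"
  }
}

object ClashDemo {
  import Syntax._

  val flux = new FakeFlux(List(1, 2, 3))

  // The member is applicable, so the compiler never looks for the extension.
  val viaMember: String = flux.toStream(8)

  // Upcasting hides the member, so the extension method is selected.
  val upcast: Source[Int] = flux
  val viaExtension: String = upcast.toStream()
}
```

The upcast here is the same trick as assigning a Flux to a Publisher-typed value.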
I can work around it by using it as a regular class, but it is a bit unwieldy:
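A minimal sketch of what calling the wrapper explicitly might look like, assuming the 1.x implicit class publisherToStream can be instantiated directly and that toStream has a default queue size (the object name ExplicitWrapper is made up for illustration):

```scala
import reactor.core.publisher.Flux
import zio.interop.reactivestreams.publisherToStream
import zio.stream.ZStream

object ExplicitWrapper {
  val flux: Flux[Int] = Flux.just(1, 2, 3)

  // Wrap the Flux by hand so that toStream refers to the ZIO interop method,
  // not to Flux's own toStream.
  val fStream: ZStream[Any, Throwable, Int] = new publisherToStream(flux).toStream()
}
```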
We ended up doing the following upcast to work around the naming clash:
import org.reactivestreams.Publisher
import reactor.core.publisher.{ Flux, Mono }
import zio.interop.reactivestreams._
import zio.stream.ZStream

object InteropCheck {
  val flux: Flux[Int] = Flux.just(1, 2, 3)
  val mono: Mono[Int] = Mono.just(1)

  // Upcast to Publisher so that toStream resolves to the ZIO extension method
  val monoIsAPublisher: Publisher[Int] = mono
  val fluxIsAPublisher: Publisher[Int] = flux

  val mStream: ZStream[Any, Throwable, Int] = monoIsAPublisher.toStream()
  val fStream: ZStream[Any, Throwable, Int] = fluxIsAPublisher.toStream()
}
Yeah, based on @calvinlfer's approach, the above code snippet looks like this, which is a definite improvement:
I guess we could rename the methods to toZIOStream and toZIOSink to avoid name clashes in 2.0. Do you want to open a PR against the series/2.x branch?
If you need this for 1.0, then I propose copying package.scala to your codebase and namespace, renaming the extension methods, and importing them from the copy.
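As a lighter variation on copying the whole file, a thin alias that delegates to the existing 1.x wrapper might be enough. This is a sketch under assumptions: ReactorSyntax, PublisherZioOps and toZioStream are hypothetical names, and it relies on publisherToStream being constructible as a regular class.

```scala
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import zio.stream.ZStream

// Hypothetical names; only publisherToStream.toStream comes from the library.
object ReactorSyntax {
  implicit final class PublisherZioOps[O](private val publisher: Publisher[O]) extends AnyVal {
    // Renamed alias that does not collide with Flux#toStream.
    def toZioStream(qSize: Int = 16): ZStream[Any, Throwable, O] =
      new zio.interop.reactivestreams.publisherToStream(publisher).toStream(qSize)
  }
}

object AliasUsage {
  // Import only the alias, not zio.interop.reactivestreams._
  import ReactorSyntax._

  val flux: Flux[Int] = Flux.just(1, 2, 3)
  val fStream: ZStream[Any, Throwable, Int] = flux.toZioStream()
}
```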
We can open an MR for 2.x. Thanks, @runtologist!
v2.0.0 with the renamings proposed here has been released. Thanks again for bringing this up!
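With the renamed method, the upcast should no longer be needed. A sketch assuming zio-interop-reactivestreams 2.x and that toZIOStream keeps a default buffer size (InteropCheckV2 is a made-up name):

```scala
import reactor.core.publisher.{ Flux, Mono }
import zio.interop.reactivestreams._
import zio.stream.ZStream

object InteropCheckV2 {
  val flux: Flux[Int] = Flux.just(1, 2, 3)
  val mono: Mono[Int] = Mono.just(1)

  // toZIOStream does not exist on Flux or Mono, so the extension method applies directly.
  val fStream: ZStream[Any, Throwable, Int] = flux.toZIOStream()
  val mStream: ZStream[Any, Throwable, Int] = mono.toZIOStream()
}
```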