Creating race conditions when converting akka streams to fs2 streams is too easy
Daenyth opened this issue · 6 comments
The problem:
Akka streams have materialized values, but the default .toStream() or .toSink() discard them. I've created race conditions by converting sinks with a materialized Future result and not waiting on that future.
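To make the race concrete, here is a minimal sketch using only the Scala standard library. `runSink` is a hypothetical stand-in for running an Akka sink, not a streamz or Akka API: it returns immediately and hands back a Future that completes only when the write has finished. Code that drops that Future has no way to know the write is still pending.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object MatValueRace {
  // Hypothetical stand-in for running an Akka sink: the call returns immediately
  // and hands back a materialized Future that completes only once the write is done.
  def runSink(writeFinished: Promise[Int]): Future[Int] = writeFinished.future

  // Returns (was the write finished when the sink call returned?, the awaited result).
  def demo(): (Boolean, Int) = {
    val writeFinished = Promise[Int]()
    val mat = runSink(writeFinished)

    // A converter that discards `mat` would let downstream code proceed here,
    // even though the write has not finished yet.
    val finishedTooEarly = mat.isCompleted

    // Simulate the write completing later, after three elements were written.
    writeFinished.success(3)

    // Waiting on the materialized Future is what removes the race.
    val written = Await.result(mat, 1.second)
    (finishedTooEarly, written)
  }
}
```

Here `MatValueRace.demo()` yields `(false, 3)`: the sink call had returned while the write was still outstanding, and only waiting on the Future observed the final result.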
Ad-hoc solution:
I created this:
implicit class RichAkkaSink[A, B](val sink: AkkaSink[A, Future[B]])
    extends AnyVal {
  /** Converts an akka sink with a success-status-indicating Future[B]
    * materialized result into an fs2 stream which will fail if the Future fails.
    * The stream returned by this will emit the Future's value one time at the end,
    * then terminate.
    */
  def toSinkWithStatusMat(
      implicit ec: ExecutionContext,
      m: Materializer
  ): Pipe[IO, A, B] =
    Fs2AkkaCompat.toSinkWithStatusMat(sink)
  /** The same as toSinkWithStatusMat, but ignoring the materialized value */
  def toSinkWithStatusMat_(
      implicit ec: ExecutionContext,
      m: Materializer
  ): Sink[IO, A] =
    in => in.through(Fs2AkkaCompat.toSinkWithStatusMat(sink)).void
}
/** Converts an akka sink with a success-status-indicating Future[B]
  * materialized result into an fs2 stream which will fail if the Future fails.
  * The stream returned by this will emit the Future's value one time at the end,
  * then terminate.
  */
def toSinkWithStatusMat[A, B](
    akkaSink: AkkaSink[A, Future[B]]
)(
    implicit ec: ExecutionContext,
    m: Materializer
): Pipe[IO, A, B] = {
  val mkPromise = Promise.empty[IO, Either[Throwable, B]]
  // `Pipe[F, A, B]` is just a function of Stream[F, A] => Stream[F, B], so we take a stream as input.
  in =>
    Stream.eval(mkPromise).flatMap { p =>
      // Akka streams produce a materialized value as a side effect of being run.
      // streamz-converters allows us to have a `Future[B] => Unit` callback when that materialized value is created.
      // This callback tells the akka materialized future to store its result status into the Promise
      val captureMaterializedResult: Future[B] => Unit = _.onComplete {
        case Failure(ex) => p.complete(Left(ex)).unsafeRunSync()
        case Success(value) => p.complete(Right(value)).unsafeRunSync()
      }
      // toSink is from streamz-converters; convert an akka sink to fs2 sink with a callback for the materialized values
      val fs2Sink: Sink[IO, A] = akkaSink.toSink(captureMaterializedResult)
      val fs2Stream: Stream[IO, Unit] = fs2Sink.apply(in)
      val materializedResultStream: Stream[IO, B] = Stream.eval {
        p.get // Async wait on the promise to be completed; => IO[Either[Throwable, B]]
          .rethrow // IO[Either[Throwable, B]] => IO[B]
      }
      // Run the akka sink for its effects, then run the stream that reads the Promise's result
      fs2Stream.drain ++ materializedResultStream
    }
}
The problem with my approach is that the user is still required to realize they need the alternate converter method.
A more comprehensive solution:
I'd like to make a breaking API change before we make the 1.x release.
// currently
def toSink(onMaterialization: M => Unit = _ => ())
Proposed:
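// In package streamz
package object converter {
  // For convenience on wildcard imports.
  // Any => Unit conforms to M => Unit for every M, because functions are contravariant in their argument.
  val Discard: Any => Unit = _ => ()
}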
def toSink(onMaterialization: M => Unit)
def toSink()(implicit ev: M =:= akka.NotUsed)
This guarantees that a caller must either provide an explicit onMaterialization, for which Discard will be wildcard imported for convenience, or, if they don't want to pass one, that M must be NotUsed.
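A self-contained sketch of how the type-equality evidence could enforce this, under stated assumptions: `NotUsed`, `SinkOps` and the `String` return values are hypothetical stand-ins so the example runs without Akka or streamz on the classpath. It is not the actual converter API.

```scala
object MatEvidence {
  // Hypothetical stand-in for akka.NotUsed, so the sketch needs no dependencies.
  sealed trait NotUsed
  case object NotUsed extends NotUsed

  // Works as a callback for any M, because functions are contravariant in their argument.
  val Discard: Any => Unit = _ => ()

  // Hypothetical stand-in for the converter syntax on a sink with materialized type M.
  final class SinkOps[M](mat: M) {
    // Explicit callback: always available.
    def toSink(onMaterialization: M => Unit): String = {
      onMaterialization(mat)
      "converted with callback"
    }

    // No callback: compiles only when M is exactly NotUsed.
    def toSink()(implicit ev: M =:= NotUsed): String =
      "converted, nothing to discard"
  }

  def demo(): (String, String) = {
    // The type argument is spelled out; otherwise M would be inferred as NotUsed.type.
    val unused = new SinkOps[NotUsed](NotUsed).toSink()
    val discarded = new SinkOps[Int](42).toSink(Discard)
    // new SinkOps[Int](42).toSink() // rejected: no evidence that Int =:= NotUsed
    (unused, discarded)
  }
}
```

The point of the design is that silently dropping a meaningful materialized value becomes a compile error, while discarding it deliberately costs one extra word at the call site.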
I haven't tested this yet, but I believe the general approach should work.
If you're willing to take this, I'll send a PR implementing it.
I like the general idea of exposing the materialized value as an FS2 stream instead of providing a callback (which makes composition on the FS2 side hard or impossible). But for better symmetry with Akka Sink|Source|Flow plus materialized value, what do you think about adding converters like toSinkMat, toSourceMat and toPipeMat that return a tuple containing the converted Sink|Stream|Pipe plus a Stream that emits the single materialized value? For example (in pseudocode):
val (fs2Sink, fs2MatStream) = akkaSink.toSinkMat()
val (fs2Source, fs2MatStream) = akkaSource.toSourceMat()
...
This would allow applications to compose the result tuple elements as they like in FS2. Also, when converting an Akka Source to an FS2 Stream, the Stream element type and the materialized value type usually differ and cannot be combined into a single Stream (unless you use something like Either[A,B] as the element type, but this has less symmetry with Akka Streams).
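A rough sketch of the tuple shape, using only the standard library. The assumptions are significant: `FakeSink` is a hypothetical stand-in for an Akka sink, the consumer is a plain function rather than an FS2 Sink, and the materialized value is exposed as a Future rather than a single-element Stream. It only illustrates that the two halves of the tuple can be used independently.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object SinkMatSketch {
  // Hypothetical stand-in for an Akka sink: consuming the input yields a materialized Future.
  final case class FakeSink[A, M](run: List[A] => Future[M])

  // Returns the consumer and, separately, a handle on the materialized value.
  def toSinkMat[A, M](sink: FakeSink[A, M]): (List[A] => Unit, Future[M]) = {
    val mat = Promise[M]()
    val consume: List[A] => Unit = in => {
      // Running the sink is what produces the materialized value.
      mat.completeWith(sink.run(in))
      ()
    }
    (consume, mat.future)
  }

  // Returns (was the materialized value available before running?, its final value).
  def demo(): (Boolean, Int) = {
    val summing = FakeSink[Int, Int](xs => Future.successful(xs.sum))
    val (consume, matValue) = toSinkMat(summing)
    val completedBeforeRun = matValue.isCompleted
    consume(List(1, 2, 3))
    (completedBeforeRun, Await.result(matValue, 1.second))
  }
}
```

`SinkMatSketch.demo()` yields `(false, 6)`: the materialized value only becomes available once the consumer half has been run, and the caller decides how to sequence the two.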
The onMaterialization parameter of the existing converters toSink, toStream and toPipe could then be removed/deprecated. WDYT?
That's not a bad idea - we can have toFoo(implicit ev: Mat =:= NotUsed) and toFooMat returning the tuple. That sounds like a nice interface!
Sounds good, would be great to see a PR! I'm not sure if toFoo(implicit ev: Mat =:= NotUsed) is too restrictive, but we can discuss that in the PR later. Thanks for your useful proposal!
I don't think it's too bad - if there's a used Mat value, I think discarding it should be done explicitly, hence the Discard constant that I'd put in the package object to be imported with the implicits.
I have a WIP proposal here: master...Daenyth:matvalue
Next, I'm going to play around a bit to see if I can keep a single method name while still having the return type differ based on Mat.
> I don't think it's too bad - if there's a used Mat value, I think discarding it should be done explicitly, hence the Discard constant that I'd put in the package object to be imported with the implicits
Understood. If discarding can be done explicitly, then it's fine. I thought users would be forced to use toFooMat. So please ignore my previous concern.