Call to `Stream#flatTap` (impl from `cats.FlatMap`) completes stream
Closed this issue · 2 comments
After a call to .flatTap
method (provided by default cats.FlatMap
implementation) stream completes and releases all resources.
I would expect .flatTap
behaviour to be consistent with .observe
(or .evalTap
), which do not complete the stream. Also I would expect it to be consistent with cats.effect.Resource#flatTap
which does not releases resource after tapping.
FS2 version: 3.10.2.
Scastie link: https://scastie.scala-lang.org/DtMkmLqbRFKNoar7uy3KfA
Sample scala-cli
script below:
//> using scala "3.3.0"
//> using lib "org.typelevel::cats-effect::3.5.4"
//> using lib "co.fs2::fs2-core::3.10.2"
import cats.*
import cats.syntax.all.*
import cats.effect.*
import fs2.concurrent.*
import fs2.*
import scala.concurrent.duration.*
object StreamFlatTapTest extends IOApp.Simple:
val run =
val res = Resource.make(IO.ref(true))(ref => ref.set(false))
IO.deferred[Ref[IO, Boolean]]
.flatMap: dr =>
Stream
.resource(res)
.flatTap: ref =>
Stream.exec(dr.complete(ref) *> ref.get.flatMap(b => IO.println(s"Stream#flatTap allocated: $b")))
.onFinalizeCaseWeak: ec =>
dr.get.flatMap(_.get).flatMap(ref => IO.println(s"Stream#flatTap finalize: $ec, allocated: $ref"))
.flatMap: ref =>
Stream.exec(ref.get.flatMap(b => IO.println(s"Stream#flatMap allocated: $b")))
.onFinalizeCaseWeak: ec =>
dr.get.flatMap(_.get).flatMap(ref => IO.println(s"Stream#flatMap finalize: $ec, allocated: $ref"))
.compile
.drain
*>
IO.deferred[Ref[IO, Boolean]]
.flatMap: dr =>
res
.flatTap: ref =>
Resource.eval(dr.complete(ref) *> ref.get.flatMap(b => IO.println(s"Resource#flatTap allocated: $b")))
.flatMap: ref =>
Resource.eval(ref.get.flatMap(b => IO.println(s"Resource#flatMap allocated: $b")))
.use_
.guaranteeCase: out =>
dr.get.flatMap(_.get).flatMap(ref => IO.println(s"Resource guarantee: $out, allocated: $ref"))
end run
end StreamFlatTapTest
Output:
$> scala-cli run StreamFlatTap.scala
Compiling project (Scala 3.3.0, JVM)
Compiled project (Scala 3.3.0, JVM)
Stream#flatTap allocated: true
Stream#flatTap finalize: Succeeded, allocated: false
Stream#flatMap finalize: Succeeded, allocated: false
Resource#flatTap allocated: true
Resource#flatMap allocated: true
Resource guarantee: Succeeded(IO(())), allocated: false
The same flatTap
behavior occurs with any list-like monad. For example:
scala> List(1, 2, 3).flatTap(_ => Nil)
val res0: List[Int] = List()
The same
flatTap
behavior occurs with any list-like monad.
Bummer! 8-0 But understandable, if flatTap(fa, f) <-> flatMap(fa, a => as(f(a), a))
.
Also it's understandable why Resource#flatTap
behaves differently -- it's closure inner type is Unit
with value, and not Nothing
. If I change Stream.exec
to Stream.eval
, Stream#flatTap
behaves the same.
Thanks for the answer, I'll close this issue then.