typelevel/fs2

Call to `Stream#flatTap` (impl from `cats.FlatMap`) completes stream

Closed this issue · 2 comments

After a call to .flatTap method (provided by default cats.FlatMap implementation) stream completes and releases all resources.

I would expect .flatTap behaviour to be consistent with .observe (or .evalTap), which do not complete the stream. Also I would expect it to be consistent with cats.effect.Resource#flatTap which does not releases resource after tapping.

FS2 version: 3.10.2.

Scastie link: https://scastie.scala-lang.org/DtMkmLqbRFKNoar7uy3KfA

Sample scala-cli script below:

//> using scala "3.3.0"

//> using lib "org.typelevel::cats-effect::3.5.4"
//> using lib "co.fs2::fs2-core::3.10.2"

import cats.*
import cats.syntax.all.*
import cats.effect.*
import fs2.concurrent.*
import fs2.*

import scala.concurrent.duration.*

object StreamFlatTapTest extends IOApp.Simple:
  val run =
    val res = Resource.make(IO.ref(true))(ref => ref.set(false))

    IO.deferred[Ref[IO, Boolean]]
      .flatMap: dr =>
        Stream
          .resource(res)
          .flatTap: ref =>
            Stream.exec(dr.complete(ref) *> ref.get.flatMap(b => IO.println(s"Stream#flatTap allocated: $b")))
          .onFinalizeCaseWeak: ec =>
            dr.get.flatMap(_.get).flatMap(ref => IO.println(s"Stream#flatTap finalize: $ec, allocated: $ref"))
          .flatMap: ref => 
            Stream.exec(ref.get.flatMap(b => IO.println(s"Stream#flatMap allocated: $b")))
          .onFinalizeCaseWeak: ec =>
            dr.get.flatMap(_.get).flatMap(ref => IO.println(s"Stream#flatMap finalize: $ec, allocated: $ref"))
          .compile
          .drain
    *>
    IO.deferred[Ref[IO, Boolean]]
      .flatMap: dr =>
        res
          .flatTap: ref =>
            Resource.eval(dr.complete(ref) *> ref.get.flatMap(b => IO.println(s"Resource#flatTap allocated: $b")))
          .flatMap: ref => 
            Resource.eval(ref.get.flatMap(b => IO.println(s"Resource#flatMap allocated: $b")))
          .use_
          .guaranteeCase: out =>
            dr.get.flatMap(_.get).flatMap(ref => IO.println(s"Resource guarantee: $out, allocated: $ref"))
  end run

end StreamFlatTapTest

Output:

$> scala-cli run StreamFlatTap.scala
Compiling project (Scala 3.3.0, JVM)
Compiled project (Scala 3.3.0, JVM)
Stream#flatTap allocated: true
Stream#flatTap finalize: Succeeded, allocated: false
Stream#flatMap finalize: Succeeded, allocated: false
Resource#flatTap allocated: true
Resource#flatMap allocated: true
Resource guarantee: Succeeded(IO(())), allocated: false

The same flatTap behavior occurs with any list-like monad. For example:

scala> List(1, 2, 3).flatTap(_ => Nil)
val res0: List[Int] = List()

The same flatTap behavior occurs with any list-like monad.

Bummer! 8-0 But understandable, if flatTap(fa, f) <-> flatMap(fa, a => as(f(a), a)).

Also it's understandable why Resource#flatTap behaves differently -- it's closure inner type is Unit with value, and not Nothing. If I change Stream.exec to Stream.eval, Stream#flatTap behaves the same.

Thanks for the answer, I'll close this issue then.