typelevel/fs2

Stream hangs on error with broadcastThrough and readOutputStream

Opened this issue · 0 comments

fs2 version: 3.11.0
scala version: 3.3.4

Scastie

The following code snippet throws a java.util.NoSuchElementException: None.get, which means that the stream is hanging.

I expect it to throw java.lang.Exception: boom instead.

import cats.effect.IO
import cats.effect.unsafe.implicits.global

import scala.concurrent.duration.DurationInt

fs2.Stream
  .emit[IO, Byte](1)
  .evalTap(_ => IO.raiseError(new Exception("boom")))
  .broadcastThrough(in =>
    fs2.io.readOutputStream(1) { out =>
      in.evalMap(b => IO.interruptible(out.write(b))).compile.drain
    }
  )
  .compile
  .drain
  .unsafeRunTimed(2.second)
  .get