Random cancellation of calls in KafkaAdminClient
vshalts opened this issue · 1 comments
Hi, I found a bug in the implementation of cancelToken
(https://github.com/fd4s/fs2-kafka/blob/series/2.x/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala#L198). It breaks the contract of Async.async
cancellation token. What the current implementation is actually doing is immediately executing cancelation and creating a token to cancel this cancelation. Therefore, cancellation competes with the completion of the future in https://github.com/fd4s/fs2-kafka/blob/series/2.x/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala#L215. It causes the cancellation of requests to the admin client without any reason and random behavior.
Here is example that illustrates the issue and proposed fixed version:
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.effect._
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.internals.KafkaFutureImpl
import scala.concurrent.duration._
object TestFs2 extends IOApp {
implicit final class KafkaFutureSyntax[A](
private val future: KafkaFuture[A]
) extends AnyVal {
def cancelToken[F[_]](implicit F: Async[F]): F[Option[F[Unit]]] =
F.blocking { future.cancel(true); () }.start.map(_.cancel.some)
def cancelTokenFixed[F[_]](implicit F: Async[F]): F[Option[F[Unit]]] =
F.delay(F.blocking { future.cancel(true); () }.start.void.some)
}
override def run(args: List[String]): IO[ExitCode] = {
for {
_ <- IO.println("Testing original fs2 cancelToken")
f = new KafkaFutureImpl[String]()
_ <- f.cancelToken[IO] // create cancel token, but never call it
_ <- IO.sleep(1.seconds)
_ <- IO.println(f.isCancelled.toString) // true?? bug is here, future is canceled without calling cancel token
_ <- IO.println("Testing fixed cancelToken")
f = new KafkaFutureImpl[String]()
token <- f.cancelTokenFixed[IO] // create cancel token, but do not call it yet
_ <- IO.sleep(1.seconds)
_ <- IO.println(f.isCancelled.toString) // false, as expected
_ <- token.getOrElse(IO.unit) // call cancellation token (if we have it)
_ <- IO.sleep(1.seconds)
_ <- IO.println(f.isCancelled.toString) // true, as expected
} yield ExitCode.Success
}
}
Can you please fix it and release the version without this bug. Thanks!
Thanks!