fd4s/fs2-kafka

Random cancellation of calls in KafkaAdminClient

vshalts opened this issue · 1 comments

Hi, I found a bug in the implementation of cancelToken (https://github.com/fd4s/fs2-kafka/blob/series/2.x/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala#L198). It breaks the contract of the Async.async cancellation token. What the current implementation actually does is execute the cancellation immediately and create a token that cancels this cancellation. Therefore, the cancellation competes with the completion of the future in https://github.com/fd4s/fs2-kafka/blob/series/2.x/modules/core/src/main/scala/fs2/kafka/internal/syntax.scala#L215. This causes requests to the admin client to be cancelled for no reason, which results in random behavior.

Here is an example that illustrates the issue, together with a proposed fixed version:

import cats.effect.syntax.all._
import cats.syntax.all._
import cats.effect._
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.internals.KafkaFutureImpl

import scala.concurrent.duration._

/** Minimal reproduction of the `cancelToken` problem described in this issue.
  *
  * Runs two scenarios back to back against a fresh `KafkaFutureImpl`: one using a copy of the
  * current `cancelToken` implementation and one using the proposed `cancelTokenFixed`, printing
  * `isCancelled` after each step so the difference in behaviour is visible.
  */
object TestFs2 extends IOApp {

  /** Extension methods on `KafkaFuture` that build a cancellation token of the shape
    * `F[Option[F[Unit]]]`: an outer effect that produces an optional inner "cancel" effect.
    */
  implicit final class KafkaFutureSyntax[A](
      private val future: KafkaFuture[A]
  ) extends AnyVal {

    /** Copy of the implementation being reported as buggy.
      *
      * Running the returned outer effect already starts a fiber that calls `future.cancel(true)`;
      * the inner effect handed back is that fiber's `cancel`. In other words the token cancels
      * the cancellation instead of cancelling the future.
      */
    def cancelToken[F[_]](implicit F: Async[F]): F[Option[F[Unit]]] =
      F.blocking { future.cancel(true); () }.start.map(_.cancel.some)

    /** Proposed replacement.
      *
      * Running the outer effect only constructs the token and does not touch the future.
      * Running the inner effect starts a fiber that calls `future.cancel(true)`; because of
      * `.start.void` the inner effect does not wait for that fiber to finish.
      */
    def cancelTokenFixed[F[_]](implicit F: Async[F]): F[Option[F[Unit]]] =
      F.delay(F.blocking { future.cancel(true); () }.start.void.some)

  }

  /** Runs both scenarios in sequence and exits with `ExitCode.Success`.
    *
    * The one-second sleeps give any fiber started in the background time to run before
    * `isCancelled` is inspected.
    */
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      _ <- IO.println("Testing original fs2 cancelToken")
      f  = new KafkaFutureImpl[String]()
      _ <- f.cancelToken[IO] // create the cancel token, but never invoke it
      _ <- IO.sleep(1.seconds)
      _ <- IO.println(f.isCancelled.toString) // reported output: true - this is the bug, the future is cancelled although the token was never invoked

      _     <- IO.println("Testing fixed cancelToken")
      f      = new KafkaFutureImpl[String]()
      token <- f.cancelTokenFixed[IO] // create the cancel token, but do not invoke it yet
      _     <- IO.sleep(1.seconds)
      _     <- IO.println(f.isCancelled.toString) // reported output: false, as expected
      _     <- token.getOrElse(IO.unit) // invoke the cancellation token (if there is one)
      _     <- IO.sleep(1.seconds)
      _     <- IO.println(f.isCancelled.toString) // reported output: true, as expected

    } yield ExitCode.Success
  }

}

Could you please fix it and release a version without this bug? Thanks!

Thanks!