nomisRev/kotlin-kafka

Support interruptible `send` within the Producer

idugalic opened this issue · 4 comments

First of all, thanks for taking this initiative. I like it, a lot! It is an opportunity for idiomatic (async and reactive) Kotlin integration with Kafka (coroutines and Flow included).

I wonder if we can improve this suspending function in Producer.kt:

public suspend fun <A, B> KafkaProducer<A, B>.sendAwait(
  record: ProducerRecord<A, B>,
): RecordMetadata =
  suspendCoroutine { cont ->
    // Those can be a SerializationException when it fails to serialize the message,
    // a BufferExhaustedException or TimeoutException if the buffer is full,
    // or an InterruptException if the sending thread was interrupted.
    send(record) { a, e ->
      // null if an error occurred, see: org.apache.kafka.clients.producer.Callback
      if (a != null) cont.resume(a) else cont.resumeWithException(e)
    }
  }

Do we want to interrupt it (the thread) on coroutine cancellation? Or am I exaggerating, and this is not needed?
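To make the question concrete, here is a minimal sketch of a cancellation-aware variant. It assumes `suspendCancellableCoroutine` from kotlinx.coroutines; the name `sendAwaitCancellable` is hypothetical and the receiver is widened to the `Producer` interface for illustration. Whether `Future.cancel` actually aborts an in-flight send depends on the `Future` implementation the client returns, so treat that call as best effort: the coroutine stops waiting, but the record may still be delivered.

```kotlin
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical helper, not part of the library under discussion.
suspend fun <A, B> Producer<A, B>.sendAwaitCancellable(
  record: ProducerRecord<A, B>,
): RecordMetadata =
  suspendCancellableCoroutine { cont ->
    // send may still throw synchronously (e.g. a serialization failure);
    // that exception propagates out of this suspend function unchanged.
    val future = send(record) { metadata, exception ->
      // Resuming a continuation that was already cancelled is silently ignored.
      if (exception != null) cont.resumeWithException(exception) else cont.resume(metadata)
    }
    // Best effort (assumption): ask the returned Future to cancel once the
    // coroutine is cancelled. The client's Future may ignore this request.
    cont.invokeOnCancellation { future.cancel(true) }
  }
```

With this shape, a caller wrapped in `withTimeout` or a cancelled scope returns promptly instead of waiting for the broker acknowledgement.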

I can see that you have wrapped the poll method in Consumer.kt in runInterruptible:

runInterruptible(dispatcher) {
  poll(timeout.toJavaDuration())
}

Obviously, this is a blocking function, and there is no (async) callback like in the send method on the Producer side. It was interesting to learn that the exception is of type org.apache.kafka.common.errors.InterruptException :) But, this part is clear. It will work.

I am just trying to understand these two low-level methods first, and how we translate them into the Kotlin Coroutines (suspension) world.

Hey @idugalic,

Yes, we absolutely can and should! Most of the code that is currently on main will probably disappear, since Consumer.kt will be replaced by the currently open PR.

That PR takes care of providing guarantees around committing offsets, etc. (A commitAsync will not do anything unless you also call poll. So to commit offsets asynchronously in the face of cancellation/interruption, you need to perform an additional poll just for the commitAsync to work.) That is complexity I thought should be hidden from users.

Similarly, I wanted to add a more elaborate Producer API that implements all of this correctly, especially linking the Java interruption world to the KotlinX cancellation world and wiring CoroutineContext in the most efficient way.

There are other methods of send that return a KafkaFuture, perhaps simply wiring Future#cancel to suspend is sufficient to make send interruptible? 🤔
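As a rough illustration of wiring `Future#cancel` to suspension, the sketch below bridges a `KafkaFuture` (the type returned by, for example, the Admin client) to a cancellable suspend function. The name `awaitCancellable` is hypothetical, and this is only one possible shape, not the library's implementation.

```kotlin
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.common.KafkaFuture
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical bridge from KafkaFuture to a cancellable suspend function.
suspend fun <T> KafkaFuture<T>.awaitCancellable(): T =
  suspendCancellableCoroutine { cont ->
    // whenComplete fires once with either a value or an error.
    whenComplete { value, error ->
      if (error != null) cont.resumeWithException(error) else cont.resume(value)
    }
    // Propagate coroutine cancellation to the future.
    cont.invokeOnCancellation { cancel(true) }
  }
```

Cancelling the coroutine then cancels the future, which mirrors the one-shot behaviour of `await` in the KotlinX Coroutines JDK8 integration.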

Here are a couple of convenient methods to bridge KotlinX Coroutines JDK8 to KafkaFuture:

For the Producer API I had something like this in mind. Not sure what you think. It's inspired again by reactor-kafka, but it can evolve from there, or based on suggestions start from a point that makes more sense for Kotlin.

interface KafkaSender<K, V> {
  fun send(records: Iterable<ProducerRecord<K, V>>): Flow<RecordMetadata>
  fun sendTransactionally(records: Iterable<ProducerRecord<K, V>>): Flow<RecordMetadata>
  fun transactionManager(): TransactionManager
  suspend fun <A> withProducer(action: suspend KafkaProducer<K, V>.(KafkaProducer<K, V>) -> A): A
}

I like it


Utilizing KotlinX Coroutines JDK8 to bridge the Futures to coroutine suspension looks like a good idea.
I am not super experienced with the KotlinX Coroutines JDK8 library, but it looks like await is cancellable. This might be the way forward; we just need to take into account the NOTE (one-shot futures) below.

NOTE:

This method is intended to be used with one-shot futures, so on coroutine cancellation the CompletableFuture that corresponds to this CompletionStage (see CompletionStage.toCompletableFuture) is cancelled
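A small sketch of what that note means in practice, assuming `kotlinx.coroutines.future.await` and a plain `CompletableFuture`; the function name `cancelWhileAwaiting` is made up for illustration. Going by the note, cancelling the awaiting coroutine should also cancel the underlying future, so the function is expected to return true.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: reports whether the future was cancelled along with the coroutine.
fun cancelWhileAwaiting(): Boolean = runBlocking {
  val future = CompletableFuture<String>()
  // UNDISPATCHED runs the child up to its first suspension point (the await)
  // before launch returns, so the await is registered before we cancel.
  val job = launch(start = CoroutineStart.UNDISPATCHED) { future.await() }
  job.cancelAndJoin()
  future.isCancelled
}
```

This matters for any future that is shared between several awaiters: one cancelled coroutine would cancel it for everyone, which is why the note restricts the behaviour to one-shot futures.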

Producer API (interface KafkaSender<K, V>) looks good to me. I wonder if we can merge two send methods into one?