Support interruptible `send` within the Producer
idugalic opened this issue · 4 comments
First of all, thanks for taking this initiative. I like it, a lot! It is an opportunity for idiomatic (async and reactive) Kotlin integration with Kafka (coroutines and Flow included).
I wonder if we can improve this suspending function in `Producer.kt`:
public suspend fun <A, B> KafkaProducer<A, B>.sendAwait(
  record: ProducerRecord<A, B>,
): RecordMetadata =
  suspendCoroutine { cont ->
    // Those can be a SerializationException when it fails to serialize the message,
    // a BufferExhaustedException or TimeoutException if the buffer is full,
    // or an InterruptException if the sending thread was interrupted.
    send(record) { a, e ->
      // null if an error occurred, see: org.apache.kafka.clients.producer.Callback
      if (a != null) cont.resume(a) else cont.resumeWithException(e)
    }
  }
Do we want to interrupt it (the thread) on coroutine cancellation? Or am I overthinking this, and it is not needed?
I can see that you have wrapped the `poll` method in `Consumer.kt` in `runInterruptible`:
runInterruptible(dispatcher) {
  poll(timeout.toJavaDuration())
}
Obviously, this is a blocking function, and there is no (async) callback like in the `send` method on the Producer side. It was interesting to learn that the exception is of type `org.apache.kafka.common.errors.InterruptException` :) But this part is clear. It will work.
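To make the `runInterruptible` behaviour concrete, here is a minimal sketch that does not touch Kafka at all. `blockingPoll` and `pollWithTimeout` are hypothetical names, and `Thread.sleep` is only a stand-in for a blocking call such as `poll`; it reacts to interruption with a plain `InterruptedException`, which may differ from how the Kafka client reports interruption.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for a blocking call such as KafkaConsumer.poll.
fun blockingPoll(): String {
  // Thread.sleep throws InterruptedException when the thread is interrupted.
  Thread.sleep(10_000)
  return "records"
}

// Returns null when the timeout cancels the coroutine before blockingPoll finishes.
suspend fun pollWithTimeout(timeoutMillis: Long): String? =
  withTimeoutOrNull(timeoutMillis) {
    // On cancellation, runInterruptible interrupts the thread running the block.
    runInterruptible(Dispatchers.IO) { blockingPoll() }
  }
```

Calling `pollWithTimeout(100)` should come back well before the ten-second sleep would have ended, because the timeout cancels the coroutine and the blocked thread is interrupted.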
I am just trying to understand these two low-level methods first, and how we translate them into the Kotlin Coroutines (suspension) world.
Hey @idugalic,
Yes, we absolutely can and should! Most of the code that is currently on `main` will probably disappear, since `Consumer.kt` will be replaced by the currently open PR.
That PR takes care of providing guarantees around committing offsets, etc. (A `commitAsync` will not do anything unless you also call `poll`. So to commit offsets asynchronously in the face of cancellation/interruption, you need to perform an additional `poll` just for the `commitAsync` to work. That is complexity I thought should be hidden from users.)
Similarly, I wanted to add a more elaborate Producer API that implements all of this correctly, especially linking the Java interruption world to the KotlinX cancellation world, and wiring `CoroutineContext` in the most efficient way.
There are other methods of `send` that return a `KafkaFuture`; perhaps simply wiring `Future#cancel` to `suspend` is sufficient to make `send` interruptible?
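A rough sketch of that idea, under a few assumptions: the name `sendAwaitCancellable` is hypothetical, the extension is declared on the `Producer` interface (which `KafkaProducer` implements) so it can be exercised with a mock, and it relies on `send(record, callback)` returning a `java.util.concurrent.Future<RecordMetadata>` in the Java client. Whether `cancel` actually aborts an in-flight send depends on the client's future implementation; if it is a no-op there, cancellation only stops the coroutine from waiting and the record may still be delivered.

```kotlin
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical name; a sketch, not the library's implementation.
public suspend fun <A, B> Producer<A, B>.sendAwaitCancellable(
  record: ProducerRecord<A, B>,
): RecordMetadata =
  suspendCancellableCoroutine { cont ->
    // send may still throw synchronously (e.g. on serialization failure);
    // such an exception propagates to the caller of this function.
    val future = send(record) { metadata, exception ->
      // Check the exception first: depending on the client version, metadata
      // may be null or a placeholder value when an error occurred.
      if (exception != null) cont.resumeWithException(exception)
      else cont.resume(metadata)
    }
    // Forward coroutine cancellation to the future returned by send.
    // Resuming an already cancelled continuation from the callback is ignored.
    cont.invokeOnCancellation { future.cancel(true) }
  }
```

The only structural change from `sendAwait` is swapping `suspendCoroutine` for `suspendCancellableCoroutine` and registering a cancellation handler, so the caller no longer stays suspended after its scope is cancelled.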
Here are a couple of convenient methods to bridge KotlinX Coroutines JDK8 to KafkaFuture:
For the Producer API I had something like this in mind. Not sure what you think. It's inspired again by reactor-kafka, but it can evolve from there, or based on suggestions start from a point that makes more sense for Kotlin.
interface KafkaSender<K, V> {
  fun send(records: Iterable<ProducerRecord<K, V>>): Flow<RecordMetadata>
  fun sendTransactionally(records: Iterable<ProducerRecord<K, V>>): Flow<RecordMetadata>
  fun transactionManager(): TransactionManager
  suspend fun <A> withProducer(action: suspend KafkaProducer<K, V>.(KafkaProducer<K, V>) -> A): A
}
I like it
Utilizing KotlinX Coroutines JDK8 to bridge the `Future`s to coroutine suspension looks like a good idea. I am not super experienced with the KotlinX Coroutines JDK8 library, but it looks like `await` is cancellable. This might be the way forward; we just need to take into account the NOTE (one-shot futures) below.
NOTE:
This method is intended to be used with one-shot futures, so on coroutine cancellation the CompletableFuture that corresponds to this CompletionStage (see CompletionStage.toCompletableFuture) is cancelled.
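The one-shot behaviour described in the NOTE can be observed with a small sketch that uses only a plain `CompletableFuture` and no Kafka types. `cancelWhileAwaiting` is a hypothetical helper name. Keep in mind that `await` is defined on `CompletionStage`, while the Java producer's `send` returns a plain `java.util.concurrent.Future`, so some adapter would still be needed before this applies to `send`.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CompletableFuture

// Hypothetical helper: reports whether cancelling the awaiting coroutine
// also cancelled the underlying future.
fun cancelWhileAwaiting(): Boolean = runBlocking {
  val future = CompletableFuture<String>() // never completed by anyone
  // UNDISPATCHED runs the body immediately up to its first suspension (await).
  val job = launch(start = CoroutineStart.UNDISPATCHED) { future.await() }
  job.cancelAndJoin()
  future.isCancelled
}
```

If the future is shared with other consumers, this propagation is exactly what the NOTE warns about: cancelling one awaiting coroutine cancels the future for everyone.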
The Producer API (`interface KafkaSender<K, V>`) looks good to me. I wonder if we can merge the two `send` methods into one?