Kafka::Client#deliver_message does not refresh metadata of a stale cluster
wamaral opened this issue · 0 comments
When using Kafka::Client#deliver_message, the producer is not able to recover when the broker that is the leader of the partition goes offline, even after the Kafka cluster assigns a new leader for the partition. This is true even if the method is called with retries enabled: all retries are exhausted and finally an error is raised.
I noticed that the Kafka::Producer interface does not have this issue: when the leader broker goes offline, subsequent retries fetch the new topology from the cluster, and the producer is able to target the newly assigned leader.
Upon inspection, both Kafka::Client#deliver_message and Kafka::Producer#deliver_messages_with_retries mark the cluster as stale whenever they are unable to connect to the broker:
- https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/client.rb#L219
- https://github.com/zendesk/ruby-kafka/blob/master/lib/kafka/producer.rb#L433
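For context, the stale flag works roughly as follows. This is a simplified toy model (ToyCluster is hypothetical, not the real Kafka::Cluster class): marking the cluster stale is cheap, and the actual metadata fetch only happens when someone later calls refresh_metadata_if_necessary!.

```ruby
# Toy model of the stale-flag mechanism (a hypothetical simplification,
# not the actual ruby-kafka implementation).
class ToyCluster
  attr_reader :refreshes

  def initialize
    @stale = true      # no metadata has been fetched yet
    @refreshes = 0
  end

  def mark_as_stale!
    @stale = true
  end

  # Only fetches fresh metadata when the cache is stale.
  def refresh_metadata_if_necessary!
    return unless @stale
    @refreshes += 1    # stands in for fetching metadata from a broker
    @stale = false
  end
end

cluster = ToyCluster.new
cluster.refresh_metadata_if_necessary!  # refreshes: cache starts out stale
cluster.refresh_metadata_if_necessary!  # no-op: cache is now fresh
cluster.mark_as_stale!                  # e.g. after a connection error
cluster.refresh_metadata_if_necessary!  # refreshes again
puts cluster.refreshes                  # => 2
```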
After the cluster is marked as stale, Kafka::Producer#deliver_messages_with_retries triggers a metadata refresh on the cluster on subsequent retries, as expected.
However, Kafka::Client#deliver_message does not trigger the metadata refresh.
This means that retries in Kafka::Client#deliver_message always target the leader broker found in the current in-memory metadata cache, which does not account for the situation where that broker is no longer available.
I have a working fix that consists of merely adding @cluster.refresh_metadata_if_necessary! just before this line. I will submit it in a PR after I run more extensive tests in our live cluster.
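In outline, the fix makes the retry loop refresh any stale metadata before each attempt, so that after a connection failure the next attempt targets the newly elected leader. A sketch of that control flow, using a stubbed cluster rather than the real Kafka::Cluster (StubCluster and its methods are illustrative assumptions, not ruby-kafka code):

```ruby
# StubCluster simulates a cluster where broker_1 (the cached leader) has
# been killed and a metadata refresh reveals the new leader, broker_2.
class StubCluster
  attr_reader :leader

  def initialize
    @leader = :broker_1
    @stale = false
  end

  def mark_as_stale!
    @stale = true
  end

  def refresh_metadata_if_necessary!
    return unless @stale
    @leader = :broker_2   # pretend the cluster elected a new leader
    @stale = false
  end

  # Producing to the dead broker fails; the new leader accepts writes.
  def produce!
    raise "connection refused" if @leader == :broker_1
    :delivered
  end
end

# Sketch of the retry loop with the proposed one-line fix applied.
def deliver_with_retries(cluster, retries:)
  attempts = 0
  begin
    attempts += 1
    cluster.refresh_metadata_if_necessary!  # the proposed addition
    cluster.produce!
  rescue RuntimeError
    cluster.mark_as_stale!                  # the client already does this
    retry if attempts <= retries
    raise
  end
end

result = deliver_with_retries(StubCluster.new, retries: 5)
puts result  # => delivered
```

Without the refresh_metadata_if_necessary! call, every retry would keep producing to the cached (dead) broker_1 and eventually raise, which matches the behavior described above.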
- Version of Ruby: 2.7.2
- Version of Kafka: 2.0.1
- Version of ruby-kafka: master branch, unreleased, > 1.3.0, commit 36e6a4b1aac00023e169d490848a78ef2c0c44d8
Steps to reproduce
Not so trivial: you need a cluster with at least three brokers running, so that it can run an election when one of the brokers is killed.
Leave this code running, and kill the broker that is the leader of partition 0:
kafka = Kafka.new(...)
1.upto(1000).each do |i|
  kafka.deliver_message(i.to_s, topic: "test_topic", retries: 5, partition: 0)
  sleep 1
end
Expected outcome
deliver_message retries, fetching the new topology from the cluster, and resumes producing messages to the newly elected leader.
Actual outcome
deliver_message retries, always trying to produce messages to the broker that was killed, eventually raising an error.