shotover/shotover-proxy

KafkaSinkCluster - rewrite fetch responses to not refer to real brokers


rukai commented

Rewrite FetchResponse::node_endpoints, PartitionData::preferred_read_replica and PartitionData::current_leader so that they no longer point at the real Kafka brokers.
The logic can go in KafkaSinkCluster::process_responses.
We should be able to just clear these values in the fetch response (empty node_endpoints, and reset preferred_read_replica and current_leader to their -1 defaults) and then call Message::invalidate_cache().
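A minimal sketch of that rewrite, written against simplified stand-in structs rather than the real kafka-protocol types (the crate's collection types and BrokerId newtypes vary between versions). Message, its cached_raw_bytes field and process_fetch_response are hypothetical placeholders for Shotover's own types; only the idea of clearing the fields and then invalidating the cached encoding comes from this issue. The -1 values are assumed to match the defaults declared for PreferredReadReplica and CurrentLeader in FetchResponse.json.

```rust
// Simplified stand-ins for the kafka-protocol FetchResponse types.
// Field names loosely mirror the crate; these are NOT the crate's definitions.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct NodeEndpoint {
    node_id: i32,
    host: String,
    port: i32,
    rack: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
struct LeaderIdAndEpoch {
    leader_id: i32,
    leader_epoch: i32,
}

impl Default for LeaderIdAndEpoch {
    // -1 / -1 means "no leader information" (assumed to match the JSON defaults).
    fn default() -> Self {
        LeaderIdAndEpoch { leader_id: -1, leader_epoch: -1 }
    }
}

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct PartitionData {
    partition_index: i32,
    preferred_read_replica: i32,
    current_leader: LeaderIdAndEpoch,
}

#[derive(Debug, Clone, PartialEq)]
struct FetchableTopicResponse {
    partitions: Vec<PartitionData>,
}

#[derive(Debug, Clone, PartialEq)]
struct FetchResponse {
    responses: Vec<FetchableTopicResponse>,
    node_endpoints: Vec<NodeEndpoint>,
}

/// Hypothetical stand-in for Shotover's Message: the decoded response plus
/// a cached copy of the raw bytes it was parsed from.
struct Message {
    fetch: FetchResponse,
    cached_raw_bytes: Option<Vec<u8>>,
}

impl Message {
    /// Mirrors the intent of Message::invalidate_cache(): drop the original
    /// bytes so the modified response gets re-encoded before being sent.
    fn invalidate_cache(&mut self) {
        self.cached_raw_bytes = None;
    }
}

/// Clear every KIP-951 field that could point a client at a real broker.
fn strip_broker_references(fetch: &mut FetchResponse) {
    fetch.node_endpoints.clear();
    for topic in &mut fetch.responses {
        for partition in &mut topic.partitions {
            partition.preferred_read_replica = -1; // -1 = no preferred read replica
            partition.current_leader = LeaderIdAndEpoch::default();
        }
    }
}

/// Hypothetical hook, roughly where KafkaSinkCluster::process_responses would do this.
fn process_fetch_response(message: &mut Message) {
    strip_broker_references(&mut message.fetch);
    message.invalidate_cache();
}

fn main() {
    let mut message = Message {
        fetch: FetchResponse {
            node_endpoints: vec![NodeEndpoint {
                node_id: 1,
                host: "10.0.0.5".to_string(),
                port: 9092,
                rack: None,
            }],
            responses: vec![FetchableTopicResponse {
                partitions: vec![PartitionData {
                    partition_index: 0,
                    preferred_read_replica: 1,
                    current_leader: LeaderIdAndEpoch { leader_id: 1, leader_epoch: 7 },
                }],
            }],
        },
        cached_raw_bytes: Some(vec![0u8; 16]),
    };

    process_fetch_response(&mut message);
    println!("{:?}", message.fetch);
}
```

Resetting to the declared defaults rather than substituting Shotover's own node IDs keeps the fix minimal: the client simply sees no KIP-951 hints and falls back to its normal metadata-based routing through Shotover.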

To learn more about these Kafka fields, take a look at: https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client
The general idea is that they are new, optional fields that let a client reroute faster when it sends a request for a partition that has recently moved to another broker.
The problem with these fields is that they expose internal Kafka state, such as broker IP addresses and broker IDs, that is incorrect from the client's perspective.
If the client attempted to connect to those IP addresses, it would bypass Shotover: either the connection would fail (because the IP is not reachable from the client) or it would succeed and skip Shotover's transform logic. Both outcomes are bad.
Since these new fields are optional, the easiest fix is to rewrite them to be empty so that the optional functionality remains unused.

Normally when working with the Kafka protocol I like to refer to https://kafka.apache.org/protocol.html#protocol_messages
However, node_endpoints is not listed there, since it's implemented as one of the _tagged_fields.
In this case you are better off looking at the Rust protocol crate: https://docs.rs/kafka-protocol/latest/kafka_protocol/messages/fetch_response/struct.FetchResponse.html
Or at the JSON files from which both the Java driver and the Rust protocol crate are generated:
https://github.com/apache/kafka/blob/a6718dbbdbcf3c71a8a3ab9844107cd994ceec25/clients/src/main/resources/common/message/FetchResponse.json#L108-L115
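Because node_endpoints and current_leader are tagged fields, clearing them has a useful side effect: Kafka's generated serializers generally only write a tagged field when its value differs from the default, so an emptied field disappears from the encoded response entirely. The sketch below illustrates the KIP-482 tagged-field layout (an unsigned varint count, then tag, size and payload for each present field) with a hand-rolled encoder. The TaggedField type, the tag number and the payload bytes are illustrative assumptions, not the real FetchResponse tags or any crate's API.

```rust
/// Append an unsigned varint: 7 bits per byte, high bit set on all but the last.
fn write_unsigned_varint(buf: &mut Vec<u8>, mut value: u32) {
    while value >= 0x80 {
        buf.push((value as u8 & 0x7f) | 0x80);
        value >>= 7;
    }
    buf.push(value as u8);
}

/// Hypothetical representation of one tagged field with its payload already encoded.
struct TaggedField {
    tag: u32,
    payload: Vec<u8>,
    is_default: bool,
}

/// Write a tagged-field section, skipping fields that hold their default value.
/// Assumes `fields` is already sorted by ascending tag, as the protocol requires.
fn write_tagged_fields(buf: &mut Vec<u8>, fields: &[TaggedField]) {
    let present: Vec<&TaggedField> = fields.iter().filter(|f| !f.is_default).collect();
    write_unsigned_varint(buf, present.len() as u32);
    for field in present {
        write_unsigned_varint(buf, field.tag);
        write_unsigned_varint(buf, field.payload.len() as u32);
        buf.extend_from_slice(&field.payload);
    }
}

fn main() {
    // Illustrative tag number and payload only.
    let populated = [TaggedField { tag: 0, payload: vec![1, 2, 3], is_default: false }];
    let cleared = [TaggedField { tag: 0, payload: Vec::new(), is_default: true }];

    let mut with_field = Vec::new();
    write_tagged_fields(&mut with_field, &populated);
    let mut without_field = Vec::new();
    write_tagged_fields(&mut without_field, &cleared);

    println!("{:?}", with_field); // [1, 0, 3, 1, 2, 3]
    println!("{:?}", without_field); // [0]
}
```

With the field cleared, the tagged section shrinks to a single zero count byte, so the client never sees the field at all.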