shotover/shotover-proxy

KafkaSinkCluster

Opened this issue · 0 comments

rukai commented

We need a kafka sink that will allow us to run shotover on a different host from kafka.
For this use case performance is a critical concern.

message rewriting

In order to ensure clients connect to shotover instead of the upstream kafka instances we need to rewrite messages to refer to shotover addresses instead.
But we need a way to tell which shotover instance's are assigned to which kafka instances.

For CassandraSinkCluster we defined each shotover instance as being responsible for a single cassandra rack.
This allows us to nicely assign a shotover node to a subset of the cluster.

However kafka has no such "rack" or "dc" level separation between nodes.
Edit: this turned out to be not true, but the original design is based off this assumption
In order to load balance between multiple shotover nodes we could have the user set a list of known shotover nodes in the topology.yaml and then rewrite kafka addresses by kafka_address = shotover_addresses[hash(kafka_address) % shotover_addresses.len()].
That way we have a consistent shotover address per kafka node in case the client relies on kafka nodes being consistent.

The transform config would look like:

- KafkaSinkCluster:
    first_contact_points:
      - "2.1.1.1:9092"
      - "2.1.1.2:9092"
      - "2.1.1.3:9092"
    shotover_nodes:
      - "1.1.1.1:9092"
      - "1.1.1.2:9092"
      - "1.1.1.3:9092"

The list would include the nodes own address to simplify generating config.

An alternative to this is adding raft support to shotover #1218 but manual config is a good MVP and raft support can come later.

message routing

Some useful background:

We also need to perform routing of messages to the kafka cluster.
To achieve this we must keep track of messages sent by the client to build up a topology of the kafka cluster.

Requirements:

  • A produce message must go to the partition leader
    • we must route on a hash of the key. The hash function doesnt matter we just need to guarantee that a certain key always results in being delivered to the same partition. This is needed to maintain ordering within a single key value.
  • A fetch message must go to the broker that contains the replica/original partition that the client was assigned when joining the group
    • again we need to route on the hash of the key
  • JoinGroup messages must be routed to the "group coordinator broker".

There is quite likely more routing requirements that will be uncovered during development.

Development chunking

  • Initial checkin of transform - implement rewriting addresses from shotover_nodes, no routing is performed which means the transform will only work on a single node kafka cluster.
  • Implement routing for produce messages
  • Implement routing for join group messages
  • Implement routing for fetch messages
  • Redesign to handle multiple shotover nodes in cluster as described in #1228
  • #1659
  • #1661
  • #1664
  • #1665
  • #1526
  • fetch controller when missing (needed for multi shotover setups)
  • handle DescribeCluster request, used by kafka-topics.sh --bootstrap-server 127.0.0.1:9192 --topic foo --describe