kafka-compoundkey-serde

A library to make dealing with composite string keys on Kafka topics a little less painful

Primary language: Java. License: Apache License 2.0 (Apache-2.0).

dwpbank MoveWP3 Kafka Composite Key Serde


Using Avro-based keys for Kafka messages is tricky: the schema version id is part of the byte sequence that Kafka uses to determine key-based identity. Both the default partitioner [1] and the log compactor see only this byte sequence, including the schema version id [2]. Unfortunately, the schema version id can change even for schema changes that would not otherwise affect binary compatibility of the serialized keys, including backward-compatible schema changes. Consequently, records with the same logical Avro key but different schema version ids can be routed to different partitions, and log compaction does not treat them as the same key.
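To make the problem concrete, here is a minimal, self-contained sketch. It assumes the Confluent wire format (a zero magic byte, a 4-byte big-endian schema ID, then the Avro-encoded payload) and ports the murmur2 hash that Kafka's default partitioner applies to key bytes (`toPositive(murmur2(keyBytes)) % numPartitions`). The class `SchemaIdKeyDemo` and its methods are hypothetical illustrations, not part of this library.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SchemaIdKeyDemo {

  // Frames already Avro-encoded key bytes the way the Confluent serializer does:
  // magic byte 0x0, 4-byte big-endian schema ID, then the payload.
  public static byte[] frame(int schemaId, byte[] avroPayload) {
    return ByteBuffer.allocate(1 + 4 + avroPayload.length)
        .put((byte) 0x0)
        .putInt(schemaId)
        .put(avroPayload)
        .array();
  }

  // Port of the murmur2 variant used by Kafka's client-side partitioning.
  public static int murmur2(byte[] data) {
    int length = data.length;
    final int m = 0x5bd1e995;
    final int r = 24;
    int h = 0x9747b28c ^ length;
    int length4 = length / 4;
    for (int i = 0; i < length4; i++) {
      int i4 = i * 4;
      int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
          + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
      k *= m;
      k ^= k >>> r;
      k *= m;
      h *= m;
      h ^= k;
    }
    switch (length % 4) {
      case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16; // fall through
      case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;  // fall through
      case 1: h ^= data[length & ~3] & 0xff;
              h *= m;
    }
    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;
    return h;
  }

  // Partition for a keyed record: positive hash modulo partition count.
  public static int partitionFor(byte[] keyBytes, int numPartitions) {
    return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
  }

  public static void main(String[] args) {
    byte[] sameLogicalKey = {0x02, 0x04, 0x06}; // stand-in for identical Avro key bytes
    byte[] underSchema1 = frame(1, sameLogicalKey);
    byte[] underSchema2 = frame(2, sameLogicalKey);
    System.out.println("byte-equal: " + Arrays.equals(underSchema1, underSchema2));
    System.out.println("partition (schema 1): " + partitionFor(underSchema1, 12));
    System.out.println("partition (schema 2): " + partitionFor(underSchema2, 12));
  }
}
```

Whether the two partitions actually differ depends on the hash values and the partition count; the point is that the framed key bytes differ, so neither the partitioner nor the log compactor has any reason to treat the two records as having the same key.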

This library is an attempt to ease the use of string-based keys derived from POJOs that expose a composite key (i.e. a key composed of multiple scalars). To promote decoupling, serialized keys are to be treated as opaque strings. Consequently, the deserialization implementation does not provide key decomposition.

[1] DefaultPartitioner.java (Apache Kafka clients)

[2] AbstractKafkaAvroSerializer.java (Confluent Schema Registry)

Usage

To add this library to your project, include the following dependency in your POM:

<dependency>
    <groupId>io.dwpbank.movewp3</groupId>
    <artifactId>kafka-compoundkey-serde</artifactId>
    <version>${movewp3-kafka-compoundkey-serde.version}</version>
</dependency>

Then follow these three easy steps:

(1) Implement the interface CompoundKeyProvider in your record type

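// CompoundKeyProvider's package is assumed from the serializer class used in step (2)
import io.dwpbank.movewp3.kafka.compoundkey.CompoundKeyProvider;
import java.util.List;

interface SampleCompoundKey extends CompoundKeyProvider {
  long tenantId();
  long customerId();

  // The attributes that make up the key, in a fixed order
  @Override
  default List<Object> compoundKeyAttributes() {
    return List.of(tenantId(), customerId());
  }
}

class SampleProducerRecord implements SampleCompoundKey {
  private final long tenantId;
  private final long customerId;
  private final String customerName;

  public SampleProducerRecord(long tenantId, long customerId, String customerName) {
    this.tenantId = tenantId;
    this.customerId = customerId;
    this.customerName = customerName;
  }

  @Override public long tenantId() { return tenantId; }
  @Override public long customerId() { return customerId; }
  public String customerName() { return customerName; }
}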

(2) Use CompoundKeySerde to publish and receive records

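import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.dwpbank.movewp3.kafka.compoundkey.CompoundKeySerde;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompoundKeyDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", CompoundKeySerde.CompoundKeySerializer.class);
    // KafkaAvroSerializer requires a schema registry; the value type must be one it can
    // serialize (e.g. an Avro-generated class)
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("value.serializer", KafkaAvroSerializer.class);

    try (Producer<SampleCompoundKey, SampleProducerRecord> producer = new KafkaProducer<>(props)) {
      SampleProducerRecord record = new SampleProducerRecord(4711, 815, "the customer");
      // The record implements SampleCompoundKey, so it can serve as its own key
      producer.send(new ProducerRecord<SampleCompoundKey, SampleProducerRecord>("my-topic", record, record));
    }
  }
}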

(3) Enjoy key encoding

The key will be serialized as the opaque, length-encoded string "4:4711-3:815": each component is prefixed with its length and a colon, and components are separated by '-'.

Another implementation example can be found in the CompoundKeyProviderTest class.

Encoding

Keys are encoded in UTF-8. Each component of the compound key is serialized using its toString() method, with two exceptions: BigDecimal values are serialized using BigDecimal.toPlainString(), and null values are encoded as described in the grammar below.

Grammar

SIZE ::= [0-9]{1,9}
COLON ::= ':'
DATA ::= (.*)
NULL ::= 'N'
PAYLOAD_ELEMENT ::= (NULL | SIZE COLON DATA)
payload ::= PAYLOAD_ELEMENT ('-' PAYLOAD_ELEMENT)*
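The encoding rules and grammar above can be sketched as a small Java encoder. This is an illustration under assumptions, not the library's implementation: the class `CompoundKeyEncodingSketch` is hypothetical, and SIZE is assumed to count the UTF-8 bytes of DATA (as netstrings do), since the text above does not say whether it counts bytes or characters.

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class CompoundKeyEncodingSketch {

  private CompoundKeyEncodingSketch() {}

  // Encodes key attributes as PAYLOAD_ELEMENTs joined by '-'.
  // ASSUMPTION: SIZE is the UTF-8 byte length of DATA.
  public static String encode(List<?> attributes) {
    List<String> elements = new ArrayList<>();
    for (Object attribute : attributes) {
      if (attribute == null) {
        elements.add("N"); // NULL element
        continue;
      }
      String data = (attribute instanceof BigDecimal)
          ? ((BigDecimal) attribute).toPlainString() // no scientific notation such as "1E+3"
          : attribute.toString();
      int size = data.getBytes(StandardCharsets.UTF_8).length;
      if (size > 999_999_999) { // SIZE allows at most nine digits
        throw new IllegalArgumentException("component too long: " + size + " bytes");
      }
      elements.add(size + ":" + data); // SIZE COLON DATA
    }
    return String.join("-", elements);
  }

  public static void main(String[] args) {
    System.out.println(encode(List.of(4711L, 815L)));             // 4:4711-3:815
    System.out.println(encode(Arrays.asList(null, "ab")));        // N-2:ab
    System.out.println(encode(List.of(new BigDecimal("1E+3"))));  // 4:1000
  }
}
```

The length prefix is what keeps the format unambiguous: a plain '-'-join would map both ["a-b", "c"] and ["a", "b-c"] to "a-b-c", whereas the length-encoded forms are "3:a-b-1:c" and "1:a-3:b-c".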

Alternatives

Alternatives include netstrings and their derivative, tnetstrings. For our use case, however, they are too powerful: they are designed to be deserialized, which we explicitly wanted to disallow. In addition, this library supports BigDecimal.
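For comparison, a netstring (per D. J. Bernstein's netstrings specification) is the decimal byte length, a colon, the data, and a trailing comma. The hypothetical helper below encodes the sample key components as netstrings; it only illustrates how close the two formats are and is not part of this library.

```java
import java.nio.charset.StandardCharsets;

public final class NetstringComparison {

  private NetstringComparison() {}

  // Netstring: <decimal byte length> ":" <data> ","
  public static String netstring(String data) {
    return data.getBytes(StandardCharsets.UTF_8).length + ":" + data + ",";
  }

  public static void main(String[] args) {
    // The sample components 4711 and 815 as concatenated netstrings
    System.out.println(netstring("4711") + netstring("815")); // 4:4711,3:815,
  }
}
```

This library's encoding of the same key, "4:4711-3:815", uses '-' between elements instead of a trailing comma and adds the 'N' element for null values.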

Contributing

Pull requests are welcome. In order to make sure that your change can be easily merged, please follow these steps:

  • Develop your changes in a feature branch named feature/...
  • Base your feature branch on main
  • Open your pull request against main
  • Don't forget to implement tests

If you have any questions, feel free to open an issue in this project to discuss intended changes upfront.