Using Avro-based keys for Kafka messages is tricky: the schema version id is part of the byte sequence that Kafka uses to determine key-based identity. Both the default partitioner [1] and the log compactor only see this byte sequence, including the schema version id [2]. Unfortunately, the schema version id can change due to schema changes that would not otherwise affect the binary compatibility of the serialized keys, including backwards-compatible schema changes. As a consequence, records with the same logical Avro key but different schema version ids will be distributed across different partitions.
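To illustrate, the following sketch mimics the Confluent wire format for Avro-serialized data (a magic byte, a 4-byte schema id, then the Avro-encoded payload). The payload bytes here are placeholders rather than real Avro data, and `serializedKey` is an illustrative helper, not part of any library:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SchemaIdDemo {

  // Sketch of the Confluent Avro wire format: magic byte 0x0,
  // a 4-byte big-endian schema id, then the Avro payload.
  static byte[] serializedKey(int schemaId, byte[] avroPayload) {
    return ByteBuffer.allocate(5 + avroPayload.length)
        .put((byte) 0x0)      // magic byte
        .putInt(schemaId)     // schema version id
        .put(avroPayload)     // placeholder for the Avro-encoded key
        .array();
  }

  public static void main(String[] args) {
    byte[] payload = {42, 7};                   // same logical key bytes
    byte[] keyV1 = serializedKey(1, payload);   // registered under schema id 1
    byte[] keyV2 = serializedKey(2, payload);   // re-registered under schema id 2
    // Identical logical keys, yet different byte sequences -- so the
    // partitioner and log compactor treat them as different keys.
    System.out.println(Arrays.equals(keyV1, keyV2)); // false
  }
}
```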
This library is an attempt to ease the use of string-based keys derived from POJOs that expose a composite key (i.e., a key composed of multiple scalars). To promote decoupling, serialized keys are to be viewed as opaque strings. Consequently, the deserialization implementation does not provide key decomposition.
[2] AbstractKafkaAvroSerializer.java
To add this library to your project, include the following dependency in your POM:

```xml
<dependency>
  <groupId>io.dwpbank.movewp3</groupId>
  <artifactId>kafka-compoundkey-serde</artifactId>
  <version>${movewp3-kafka-compoundkey-serde.version}</version>
</dependency>
```
Then follow these three easy steps. First, declare an interface that extends `CompoundKeyProvider` and returns the components of the key:

```java
interface SampleCompoundKey extends CompoundKeyProvider {

  long tenantId();

  long customerId();

  @Override
  default List<Object> compoundKeyAttributes() {
    return List.of(tenantId(), customerId());
  }
}
```
Next, implement that interface in the POJO that will be produced to Kafka:

```java
class SampleProducerRecord implements SampleCompoundKey {

  private final long tenantId;
  private final long customerId;
  private final String customerName;

  public SampleProducerRecord(long tenantId, long customerId, String customerName) {
    this.tenantId = tenantId;
    this.customerId = customerId;
    this.customerName = customerName;
  }

  public long tenantId() { return tenantId; }

  public long customerId() { return customerId; }
}
```
Finally, configure your producer to use the `CompoundKeySerializer` for keys and send records keyed by `toCompoundKey()`:

```java
public class CompoundKeyDemo {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", io.dwpbank.movewp3.kafka.compoundkey.CompoundKeySerde.CompoundKeySerializer.class);
    props.put("value.serializer", io.confluent.kafka.serializers.KafkaAvroSerializer.class);

    try (Producer<CompoundKey, SampleProducerRecord> producer = new KafkaProducer<>(props)) {
      SampleProducerRecord record = new SampleProducerRecord(4711, 815, "the customer");
      producer.send(new ProducerRecord<>("my-topic", record.toCompoundKey(), record));
    }
  }
}
```
The key will be serialized as the opaque, length-encoded string `4:4711-3:815`.
Another implementation example can be found in the `CompoundKeyProviderTest` implementation.
Encoding is in UTF-8. The `CompoundKey`'s components are serialized using their `toString()` method, except for `BigDecimal`, which uses `BigDecimal.toPlainString()`, and `null` values (explained in the grammar below).
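The special-casing of `BigDecimal` is plausibly motivated by the fact that `BigDecimal.toString()` can emit scientific notation, whereas `toPlainString()` always yields a plain decimal representation (the rationale itself is an assumption; the method behavior below is standard `java.math.BigDecimal`):

```java
import java.math.BigDecimal;

public class BigDecimalKeyDemo {
  public static void main(String[] args) {
    BigDecimal value = new BigDecimal("1E+3");
    // toString() may use scientific notation, which is awkward in a key
    System.out.println(value.toString());      // 1E+3
    // toPlainString() always renders plain decimal digits
    System.out.println(value.toPlainString()); // 1000
  }
}
```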
```
SIZE            ::= [0-9]{1,9}
COLON           ::= ':'
DATA            ::= (.*)
NULL            ::= 'N'
PAYLOAD_ELEMENT ::= (NULL | SIZE COLON DATA)
PAYLOAD         ::= PAYLOAD_ELEMENT ('-' PAYLOAD_ELEMENT)*
```
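A minimal sketch of an encoder for this grammar (not the library's actual implementation; whether `SIZE` counts UTF-8 bytes or characters is not spelled out above, so byte length is assumed here):

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class CompoundKeyFormatDemo {

  // Encodes each component as "<size>:<data>", null as "N",
  // and joins the elements with '-'.
  static String encode(List<Object> attributes) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < attributes.size(); i++) {
      if (i > 0) sb.append('-');
      Object attribute = attributes.get(i);
      if (attribute == null) {
        sb.append('N');
      } else {
        String data = attribute instanceof BigDecimal
            ? ((BigDecimal) attribute).toPlainString()
            : attribute.toString();
        // Assumption: SIZE is the UTF-8 byte length of DATA
        sb.append(data.getBytes(StandardCharsets.UTF_8).length)
          .append(':')
          .append(data);
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(encode(java.util.Arrays.asList(4711L, 815L))); // 4:4711-3:815
    System.out.println(encode(java.util.Arrays.asList(4711L, null))); // 4:4711-N
  }
}
```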
Alternatives are netstrings and their derivative, tnetstrings. For our use case, however, those are too powerful, as they allow deserialization, which we explicitly want to disallow. In addition, we support `BigDecimal`.
Pull requests are welcome. In order to make sure that your change can be easily merged, please follow these steps:

- Develop your changes in a feature branch named `feature/...`
- Base your feature branch on `main`
- Open your pull request against `main`
- Don't forget to implement tests
In case of any questions, feel free to open an issue in this project to discuss intended changes upfront.