Library implementing the Claim Check pattern for use with Kafka and Azure Blob Storage
Example usage for the claim-check-interceptors-azure (Azure SDK v12) backend. See also the Note about Azure SDK versions.
Add the dependency:
<dependency>
<groupId>se.irori.kafka</groupId>
<artifactId>claim-check-interceptors-azure</artifactId>
<version>1.0.0</version>
</dependency>
Configure your Kafka consumer/producer properties:
// common
config.put(
BaseClaimCheckConfig.Keys.CLAIMCHECK_BACKEND_CLASS_CONFIG,
AzureBlobStorageClaimCheckBackend.class);
config.put(
AzureClaimCheckConfig.Keys.AZURE_STORAGE_ACCOUNT_ENDPOINT_CONFIG,
"https://MY_STORAGE_ACCOUNT_NAME.blob.core.windows.net");
config.put(
AzureClaimCheckConfig.Keys.AZURE_STORAGE_ACCOUNT_SASTOKEN_FROM_CONFIG,
"file:/path/to/textfile/with/sas.token");
// producer (interceptor + wrapping serializer)
// any serializer as the wrapped serializer
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ClaimCheckSerializer.class);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ClaimCheckProducerInterceptor.class.getName());
config.put(BaseClaimCheckConfig.Keys.CLAIMCHECK_WRAPPED_VALUE_SERIALIZER_CLASS,
StringSerializer.class);
// consumer (wrapping deserializer)
// any deserializer as the wrapped deserializer
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClaimCheckDeserializer.class);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(BaseClaimCheckConfig.Keys.CLAIMCHECK_WRAPPED_VALUE_DESERIALIZER_CLASS,
StringDeserializer.class);
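Putting the configuration to work, a minimal end-to-end sketch could look like the following. This is a sketch only: it re-uses the single config map built above for both clients (in practice you would keep separate producer and consumer configs), and the topic name and payload are illustrative.
// minimal sketch: "my-topic" and the payload are illustrative
String largePayload = new String(new char[2_000_000]).replace('\0', 'x');
try (Producer<String, String> producer = new KafkaProducer<>(config)) {
  producer.send(new ProducerRecord<>("my-topic", "key-1", largePayload));
}
try (Consumer<String, String> consumer = new KafkaConsumer<>(config)) {
  consumer.subscribe(Collections.singletonList("my-topic"));
  // values above the size threshold are resolved from Blob Storage transparently
  for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
    System.out.println(record.value().length());
  }
}
The claim check handling happens in the configured interceptor, serializer and deserializer, so apart from configuration the application code is unchanged.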
A streaming mode is implemented based on java.io.InputStream, which can help reduce memory usage since the payload does not need to be held fully in memory.
Note: you will need to provide an additional header in the Producer (message-claim-check-payload-size) as a serialized Long. A convenience method is provided to help set this header: se.irori.kafka.claimcheck.ClaimCheckStreamingUtils.setPayloadSize(Headers headers, long payloadSize)
# producer
# Need to have KafkaProducer<T, InputStream>
# Need to send header message-claim-check-payload-size as Long
interceptor.classes=se.irori.kafka.claimcheck.ClaimCheckStreamingProducerInterceptor
value.serializer.wrapped.serializer=se.irori.kafka.claimcheck.InputStreamSerializer
# consumer
# Need to have KafkaConsumer<T, InputStream>
value.deserializer=se.irori.kafka.claimcheck.ClaimCheckStreamingDeserializer
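Putting the streaming pieces together, a sketch might look like the following. Assumptions: the streaming producer/consumer settings above are loaded into producerProperties and consumerProperties, and the topic name and payload are illustrative.
// sketch only: producerProperties/consumerProperties are assumed to hold the
// streaming producer/consumer settings shown above
byte[] payload = new byte[10 * 1024 * 1024]; // illustrative 10 MiB payload
ProducerRecord<String, InputStream> record =
    new ProducerRecord<>("my-topic", "key-1", new ByteArrayInputStream(payload));
// required header: the payload size as a serialized Long
ClaimCheckStreamingUtils.setPayloadSize(record.headers(), payload.length);
try (Producer<String, InputStream> producer = new KafkaProducer<>(producerProperties)) {
  producer.send(record);
}
try (Consumer<String, InputStream> consumer = new KafkaConsumer<>(consumerProperties)) {
  consumer.subscribe(Collections.singletonList("my-topic"));
  for (ConsumerRecord<String, InputStream> r : consumer.poll(Duration.ofSeconds(5))) {
    try (InputStream in = r.value()) {
      // read the payload incrementally instead of materializing it in memory
    }
  }
}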
See example usage in claim-check-interceptors-azure/.../ProduceConsumeStreamingKafkaAzureIT.java.
claimcheck.backend.class
The fully qualified name of the backend implementation. E.g. se.irori.kafka.claimcheck.azure.AzureBlobStorageClaimCheckBackend
- Type: class
- Importance: medium
claimcheck.checkin.uncompressed-batch-size.over.bytes
The byte limit above which Kafka record batches are checked in using the Claim Check backend. Note: this applies to the uncompressed message batch size. If compression is enabled and you want fewer messages to be checked in, experiment with the compression ratios of your specific flow and raise this limit accordingly (for example, if payloads typically compress about 5:1, an uncompressed limit of ~5 MB corresponds to roughly 1 MB of compressed batch size).
- Type: long
- Default: 1048064
- Importance: medium
interceptor.classes
Set to se.irori.kafka.claimcheck.ClaimCheckProducerInterceptor for the Producer in a Claim Check enabled flow. If using streaming mode, use ClaimCheckStreamingProducerInterceptor instead.
- Type: list
- Default: null
- Importance: medium
key.serializer
Standard Kafka key.serializer option. Used when calculating the message size to determine whether the record should be checked in.
- Type: class
- Default: null
- Importance: medium
value.deserializer
Set to se.irori.kafka.claimcheck.ClaimCheckDeserializer for the Consumer in a Claim Check enabled flow. If using streaming mode, use ClaimCheckStreamingDeserializer instead.
- Type: class
- Default: null
- Importance: medium
value.deserializer.wrapped.deserializer
Set to the normal Kafka Consumer deserializer that would have been used before enabling Claim Check interceptors on the flow.
- Type: class
- Default: null
- Importance: medium
value.serializer
Set to se.irori.kafka.claimcheck.ClaimCheckSerializer for the Producer in a Claim Check enabled flow.
- Type: class
- Default: null
- Importance: medium
value.serializer.wrapped.serializer
Set to the normal Kafka Producer serializer that would have been used before enabling Claim Check interceptors on the flow.
- Type: class
- Default: null
- Importance: medium
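As a consolidated illustration of the keys above, a producer-side configuration could be assembled as follows. This is a sketch only: the serializer choices and the size value are examples, and the backend-specific settings (such as the Azure endpoint and SAS token shown earlier) still need to be added.
// sketch: producer-side Claim Check configuration using the string keys listed above
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("claimcheck.backend.class",
    "se.irori.kafka.claimcheck.azure.AzureBlobStorageClaimCheckBackend");
producerConfig.put("claimcheck.checkin.uncompressed-batch-size.over.bytes", 1048064L); // default value
producerConfig.put("interceptor.classes", "se.irori.kafka.claimcheck.ClaimCheckProducerInterceptor");
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("value.serializer", "se.irori.kafka.claimcheck.ClaimCheckSerializer");
producerConfig.put("value.serializer.wrapped.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
// plus the backend-specific settings (e.g. Azure endpoint and SAS token) from the earlier example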
See additional config reference per backend:
./mvnw clean install
Commit messages should follow the Conventional Commits standard.
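For example, a commit adding a feature to the Azure backend might be titled feat(azure): create container if it does not exist, while a bug fix would use the fix: prefix (titles are illustrative).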
We use testcontainers for integration tests against Kafka and an Azure Blob Storage emulator (Azurite). The tests are run with Maven Failsafe.
See Docker pre-requisites for running these tests locally.
With Azurite (Azure API emulator): mvn verify -Pazurite
With a real Azure storage account, and SAS tokens stored as local files: mvn verify -Pazure (see the next section for setup).
If using a storage account other than the example one, you can override the endpoints on the command line:
mvn verify -Pazure -Dproducer.azure.blob.storage.account.endpoint=https://???.blob.core.windows.net/ \
-Dconsumer.azure.blob.storage.account.endpoint=https://???.blob.core.windows.net/
You will need two files in the project root: my-topic-sas-read.sastoken and my-topic-sas-write.sastoken.
az group create -l westeurope -n claimcheckrg
az storage account create \
--resource-group claimcheckrg \
--name claimcheckcitest \
--location westeurope
# get the SA key, to create SAS tokens
az storage account keys list -g claimcheckrg -n claimcheckcitest --query '[0].value'
export AZURE_STORAGE_KEY=...
# for container specific sas tokens the container (topic) needs to be created
az storage container create --name my-topic --account-name claimcheckcitest --resource-group claimcheckrg
# container (topic) restricted write sas, +6 months expiry
# Producer: rcl
# (r) read
# (c) create
# (l) list
az storage container generate-sas \
--account-name claimcheckcitest \
--permissions rcl \
--name my-topic \
--https-only \
--expiry $(date -v +6m +%Y-%m-%d) | tr -d '"' > my-topic-sas-write.sastoken
# container (topic) restricted read sas, +6 months expiry
# consumer: rl
# (r) read
# (l) list
az storage container generate-sas \
--account-name claimcheckcitest \
--permissions rl \
--name my-topic \
--https-only \
--expiry $(date -v +6m +%Y-%m-%d) | tr -d '"' > my-topic-sas-read.sastoken
Note: if you want to use the create-container-if-not-exists feature, you need general storage account write permission that is not tied to a specific container, e.g.:
az storage account generate-sas \
--account-name claimcheckcitest \
--permissions rwlac \
--services b \
--resource-types co \
--https-only \
--expiry $(date -v +6m +%Y-%m-%d) | tr -d '"' > general-write.sastoken
The following sets a Storage Account Lifecycle Management policy that will delete blobs after 14 days:
cat << EOF > example-expiry-policy-14-days.json
{
"rules": [
{
"enabled": true,
"name": "expire-claim-check-messages",
"type": "Lifecycle",
"definition": {
"actions": {
"baseBlob": {
"delete": {
"daysAfterModificationGreaterThan": 14
}
}
},
"filters": {
"blobTypes": [
"blockBlob"
]
}
}
}
]
}
EOF
az storage account management-policy create --account-name myaccount --policy @example-expiry-policy-14-days.json --resource-group myresourcegroup