
Kryptonite for Kafka is a client-side 🔒 field level 🔓 crypto library for Apache Kafka® currently focused on Kafka Connect scenarios. It's an ! UNOFFICIAL ! community project


Kryptonite: Client-Side 🔒 Field-Level 🔓 Cryptography for Apache Kafka®


Disclaimer: This is an UNOFFICIAL community project!

Kryptonite is a library for field-level cryptography on records on their way into and out of Apache Kafka®. Currently, it targets data integration scenarios based on Kafka Connect and provides a turn-key ready single message transformation (SMT) to run encryption / decryption operations on selected fields of records with or without schema. It uses authenticated encryption with associated data (AEAD) and in particular applies AES in GCM mode for probabilistic encryption (default) or SIV mode for use cases which either require or at least benefit from deterministic encryption.

The preferred and new default way is to configure Kryptonite to use Google's Tink multi-language, cross-platform open-source cryptography library.

Kafka Connect Transformation (SMT)

Kryptonite provides a turn-key ready SMT called CipherField. The simple examples below show how to configure and use the SMT to encrypt and decrypt record fields.

Data Records without Schema

The following fictional data record value without schema - represented in JSON-encoded format - is used to illustrate a simple encrypt/decrypt scenario:

{
  "id": "1234567890",
  "myString": "some foo bla text",
  "myInt": 42,
  "myBoolean": true,
  "mySubDoc1": {"myString":"hello json"},
  "myArray1": ["str_1","str_2","...","str_N"],
  "mySubDoc2": {"k1":9,"k2":8,"k3":7}
}

Encryption of selected fields

Let's assume the fields "myString", "myArray1" and "mySubDoc2" of the above data record should get encrypted. The CipherField SMT can then be configured like so:

{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "ENCRYPT",
  "transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{<TINK_KEYSET_SPEC_JSON_HERE>}}]", //key materials of utmost secrecy!
  "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
  "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}

The result after applying this SMT is a record in which all the fields specified in the field_config parameter are encrypted using the keyset whose identifier is set with the cipher_data_key_identifier parameter. The keysets themselves are configured using the parameter cipher_data_keys where the key material itself is specified according to a Tink keyset configuration in JSON format (here is a concrete example). Obviously, the configured key materials have to be treated with utmost secrecy, because leaking any of the keyset materials renders encryption useless. The recommended way of doing this for now is to either

  • indirectly reference keyset materials by externalizing them into a separate properties file (find a few details here)

or

  • NOT store the keyset materials at the client-side in the first place, but instead resolve keysets at runtime from a cloud KMS such as Azure Key Vault, which is supported as well.

In general though, this can be considered a "chicken-and-egg" problem, since the confidential settings needed to access a remote KMS also have to be stored somewhere somehow.

Since the configuration parameter field_mode is set to OBJECT, complex field types are processed as a whole instead of element-wise. Element-wise processing can be achieved by choosing ELEMENT mode.
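To make the difference between the two field modes more tangible, here is a minimal Java sketch. It is not taken from Kryptonite's code base: the class FieldModeSketch, its method names, the comma-joined serialization and the Base64 "cipher" are all assumptions made purely for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.function.Function;

// Illustrative only: FieldModeSketch and its methods are hypothetical names,
// not part of Kryptonite's API.
final class FieldModeSketch {

    // OBJECT mode: the whole list is serialized (here simply comma-joined, an
    // assumption) and handed to the cipher once, so the field becomes one string.
    static String processAsObject(List<String> field, Function<String, String> cipher) {
        return cipher.apply(String.join(",", field));
    }

    // ELEMENT mode: each element is handed to the cipher on its own,
    // so the field stays a list of the same length.
    static List<String> processElementWise(List<String> field, Function<String, String> cipher) {
        List<String> result = new ArrayList<>();
        for (String element : field) {
            result.add(cipher.apply(element));
        }
        return result;
    }

    // Stand-in for real encryption: Base64 is an encoding, NOT a cipher.
    static String placeholderCipher(String plaintext) {
        return Base64.getEncoder().encodeToString(plaintext.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        List<String> myArray1 = List.of("str_1", "str_2", "str_N");
        System.out.println(processAsObject(myArray1, FieldModeSketch::placeholderCipher));
        System.out.println(processElementWise(myArray1, FieldModeSketch::placeholderCipher));
    }
}
```

In OBJECT mode the array field collapses into a single string, which matches the encrypted example record in this section, while ELEMENT mode would keep the array structure and only replace its elements.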

Below is an example of a JSON-encoded record after encryption:

{
  "id": "1234567890",
  "myString": "M007MIScg8F0A/cAddWbayvUPObjxuGFxisu5MUckDhBss6fo3gMWSsR4xOLPEfs4toSDDCxa7E=",
  "myInt": 42,
  "myBoolean": true,
  "mySubDoc1": {"myString":"hello json"},
  "myArray1": "UuEKnrv91bLImQvKqXTET7RTP93XeLfNRhzJaXVc6OGA4E+mbvGFs/q6WEFCAFy9wklJE5EPXJ+P85nTBCiVrTkU+TR+kUWB9zNplmOL70sENwwwsWux",
  "mySubDoc2": "fLAnBod5U8eS+LVNEm3vDJ1m32/HM170ASgJLKdPF78qDxcsiWj+zOkvZBsk2g44ZWHiSDy3JrI1btmUQhJc4OTnmqIPB1qAADqKhJztvyfcffOfM+y0ISsNk4+V6k0XHBdaT1tJXqLTsyoQfWmSZsnwpM4WARo5/cQWdAwwsWux"
}

NOTE: Encrypted fields are always represented as Base64-encoded strings which contain both the ciphertext of the fields' original values and authenticated but unencrypted(!) meta-data. If you want to learn a few more details, look here.

Decryption of selected fields

Provided that the keyset used to encrypt the original data record is made available to a specific sink connector, the CipherField SMT can be configured to decrypt the data as follows:

{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "DECRYPT",
  "transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{<TINK_KEYSET_SPEC_JSON_HERE>}}]", //key materials of utmost secrecy!
  "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}

The result after applying this SMT is a record in which all the fields specified in the field_config parameter are decrypted using the keyset that was used to encrypt the original data. Obviously, this only works if that keyset is properly configured.
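The decryption config above does not set cipher_data_key_identifier. Assuming the keyset identifier travels with each encrypted field's meta-data (as described in the section on cipher algorithm specifics), the lookup could be pictured roughly like the following sketch. KeysetLookupSketch and resolveKeyset are hypothetical names, and keysets are modeled as plain strings for brevity.

```java
import java.util.Map;

// Hypothetical sketch, not Kryptonite's actual implementation.
final class KeysetLookupSketch {

    // Looks up the keyset for the identifier found in an encrypted field's
    // meta-data and fails loudly if no such keyset was configured.
    static String resolveKeyset(Map<String, String> configuredKeysets, String identifierFromMetadata) {
        String keyset = configuredKeysets.get(identifierFromMetadata);
        if (keyset == null) {
            throw new IllegalStateException(
                "no keyset configured for identifier '" + identifierFromMetadata + "'");
        }
        return keyset;
    }

    public static void main(String[] args) {
        Map<String, String> keysets = Map.of("my-demo-secret-key-123", "<TINK_KEYSET_SPEC_JSON_HERE>");
        System.out.println(resolveKeyset(keysets, "my-demo-secret-key-123"));
        try {
            resolveKeyset(keysets, "unknown-key");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```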

Below is an example of a JSON-encoded record after decryption, which is equal to the original record:

{
  "id": "1234567890",
  "myString": "some foo bla text",
  "myInt": 42,
  "myBoolean": true,
  "mySubDoc1": {"myString":"hello json"},
  "myArray1": ["str_1","str_2","...","str_N"],
  "mySubDoc2": {"k1":9,"k2":8,"k3":7}
}

Data Records with Schema

The following example is based on an Avro value record and used to illustrate a simple encrypt/decrypt scenario for data records with schema. The schema could be defined as follows (the record names MyRecord and MySubDoc1 are placeholders added here, because Avro requires records to be named):

{
    "type": "record", "name": "MyRecord", "fields": [
        { "name": "id", "type": "string" },
        { "name": "myString", "type": "string" },
        { "name": "myInt", "type": "int" },
        { "name": "myBoolean", "type": "boolean" },
        { "name": "mySubDoc1", "type": { "type": "record", "name": "MySubDoc1",
            "fields": [
                { "name": "myString", "type": "string" }
            ]
        }},
        { "name": "myArray1", "type": { "type": "array", "items": "string"}},
        { "name": "mySubDoc2", "type": { "type": "map", "values": "int"}}
    ]
}

The data of one such fictional record - represented by its Struct.toString() output - might look as follows:

Struct{
  id=1234567890,
  myString=some foo bla text,
  myInt=42,
  myBoolean=true,
  mySubDoc1=Struct{myString=hello json},
  myArray1=[str_1, str_2, ..., str_N],
  mySubDoc2={k1=9, k2=8, k3=7}
}

Encryption of selected fields

Let's assume the fields "myString", "myArray1" and "mySubDoc2" of the above data record should get encrypted. The CipherField SMT can then be configured as follows:

{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "ENCRYPT",
  "transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{<TINK_KEYSET_SPEC_JSON_HERE>}}]", //key materials of utmost secrecy!
  "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
  "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}

The result after applying this SMT is a record in which all the fields specified in the field_config parameter are encrypted using the keyset whose identifier is set with the cipher_data_key_identifier parameter. The keysets themselves are configured using the parameter cipher_data_keys where the key material itself is specified according to a Tink keyset configuration in JSON format (here is a concrete example). Obviously, the configured key materials have to be treated with utmost secrecy, because leaking any of the keyset materials renders encryption useless. The recommended way of doing this for now is to either

  • indirectly reference keyset materials by externalizing them into a separate properties file (find a few details here)

or

  • NOT store the keyset materials at the client-side in the first place, but instead resolve keysets at runtime from a cloud KMS such as Azure Key Vault, which is supported as well.

In general though, this can be considered a "chicken-and-egg" problem, since the confidential settings needed to access a remote KMS also have to be stored somewhere somehow.

Since the configuration parameter field_mode in the configuration above is set to OBJECT, complex field types are processed as a whole instead of element-wise. Element-wise processing can be achieved by choosing ELEMENT mode.

Below is an example of the Struct.toString() output of the record after encryption:

Struct{
  id=1234567890,
  myString=MwpKn9k5V4prVVGvAZdm6iOp8GnVUR7zyT+Ljb+bhcrFaGEx9xSNOpbZaJZ4YeBsJAj7DDCxa7E=,
  myInt=42,
  myBoolean=true,
  mySubDoc1=Struct{myString=hello json},
  myArray1=Ujlij/mbI48akEIZ08q363zOfV+OMJ+ZFewZEMBiaCnk7NuZZH+mfw6HGobtRzvxeavRhTL3lKI1jYPz0CYl7PqS7DJOJtJ1ccKDa5FLAgP0BQwwsWux,
  mySubDoc2=fJxvxo1LX1ceg2/Ba4+vq2NlgyJNiWGZhjWh6rkHQzuG+C7I8lNW8ECLxqJkNhuYuMMlZjK51gAZfID4HEWcMPz026HexzurptZdgkM1fqJMTMIryDKVlAicXc8phZ7gELZCepQWE0XKmQg0UBXr924V46x9I9QwaWUAdgwwsWux
}

NOTE 1: Encrypted fields are always represented as Base64-encoded strings which contain both the ciphertext of the fields' original values and authenticated meta-data (unencrypted!) about the field in question. If you want to learn a few more details, look here.

NOTE 2: In order to support this, the original schema of the data record is automatically adapted such that any encrypted fields can be stored as strings, even though the original data types of the fields in question were different.

Decryption of selected fields

Provided that the keyset used to encrypt the original data record is made available to a specific sink connector, the CipherField SMT can be configured to decrypt the data as follows:

{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "DECRYPT",
  "transforms.cipher.cipher_data_keys": "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{<TINK_KEYSET_SPEC_JSON_HERE>}}]", //key materials of utmost secrecy!
  "transforms.cipher.field_config": "[{\"name\":\"myString\",\"schema\": {\"type\": \"STRING\"}},{\"name\":\"myArray1\",\"schema\": {\"type\": \"ARRAY\",\"valueSchema\": {\"type\": \"STRING\"}}},{\"name\":\"mySubDoc2\",\"schema\": { \"type\": \"MAP\", \"keySchema\": { \"type\": \"STRING\" }, \"valueSchema\": { \"type\": \"INT32\"}}}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}

Take notice of the extended field_config parameter settings. For decryption of schema-aware data, the SMT configuration expects the original schema information to be explicitly specified for each field to decrypt. This makes it possible to adapt the encrypted record's schema to a compatible decrypted record's schema upfront, such that the resulting plaintext field values can be stored in accordance with their original data types.
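The schema handling in both directions can be sketched as follows. This is a simplified illustration under stated assumptions: schemas are modeled as plain field-name to type-name maps, which is not how Kafka Connect or Kryptonite represent them, and SchemaAdaptationSketch with its two methods is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: schemas are modeled as simple field-name -> type-name
// maps, which is NOT how Kafka Connect or Kryptonite represent them.
final class SchemaAdaptationSketch {

    // Encryption side: every encrypted field is stored as a Base64 string,
    // so its type in the adapted schema becomes STRING.
    static Map<String, String> adaptForEncryption(Map<String, String> schema, Set<String> encryptedFields) {
        Map<String, String> adapted = new LinkedHashMap<>(schema);
        for (String field : encryptedFields) {
            adapted.computeIfPresent(field, (name, type) -> "STRING");
        }
        return adapted;
    }

    // Decryption side: the original types cannot be derived from the encrypted
    // record, so they are taken from the explicitly configured field settings.
    static Map<String, String> adaptForDecryption(Map<String, String> encryptedSchema,
                                                  Map<String, String> configuredTypes) {
        Map<String, String> adapted = new LinkedHashMap<>(encryptedSchema);
        configuredTypes.forEach((field, type) ->
            adapted.computeIfPresent(field, (name, old) -> type));
        return adapted;
    }

    public static void main(String[] args) {
        Map<String, String> original = new LinkedHashMap<>();
        original.put("id", "STRING");
        original.put("myInt", "INT32");
        original.put("myArray1", "ARRAY<STRING>");

        Map<String, String> encrypted = adaptForEncryption(original, Set.of("myArray1"));
        System.out.println(encrypted);
        System.out.println(adaptForDecryption(encrypted, Map.of("myArray1", "ARRAY<STRING>")));
    }
}
```

The decryption side shows why the extended field settings are needed: once a field is typed as a string, its original type is only recoverable from the configuration.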

The result after applying this SMT is a record in which all the fields specified in the field_config parameter are decrypted using the keyset that was used to encrypt the original data. Obviously, this only works if that keyset is properly configured.

Below is the decrypted data - represented by its Struct.toString() output - which is equal to the original record:

Struct{
  id=1234567890,
  myString=some foo bla text,
  myInt=42,
  myBoolean=true,
  mySubDoc1=Struct{myString=hello json},
  myArray1=[str_1, str_2, ..., str_N],
  mySubDoc2={k1=9, k2=8, k3=7}
}

Configuration Parameters

The parameters below are listed by name, each with its description, type, default, valid values and importance.

cipher_data_key_identifier
  • Description: keyset identifier to be used as default data encryption keyset for all fields which don't refer to a field-specific keyset identifier
  • Type: string
  • Default: ""
  • Valid values: non-empty string if cipher_mode=ENCRYPT, empty string if cipher_mode=DECRYPT
  • Importance: high

cipher_data_keys
  • Description: JSON array with data key objects specifying the key identifiers together with key sets for encryption / decryption which are defined in Tink's key specification format. The contained keyset objects are mandatory if kms_type=NONE, but the array may be left empty in order to resolve keysets from a remote KMS such as Azure Key Vault (kms_type=AZ_KV_SECRETS). Irrespective of their origin, all keysets ("material" fields) are expected to be valid Tink keyset descriptions in JSON format which are used for encryption / decryption purposes.
  • Type: password
  • Default: []
  • Valid values: JSON array either empty or holding N data key config objects each of which refers to a Tink keyset in JSON format, e.g.
[
    {
        "identifier": "my-demo-secret-key-123",
        "material": {
            "primaryKeyId": 1234567890,
            "key": [
                {
                    "keyData": {
                        "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey",
                        "value": "<BASE64_ENCODED_KEY_HERE>",
                        "keyMaterialType": "SYMMETRIC"
                    },
                    "status": "ENABLED",
                    "keyId": 1234567890,
                    "outputPrefixType": "TINK"
                }
            ]
        }
    }
]
  • Importance: high

cipher_mode
  • Description: defines whether the data should get encrypted or decrypted
  • Type: string
  • Default: (none)
  • Valid values: ENCRYPT, DECRYPT
  • Importance: high

field_config
  • Description: JSON array with field config objects specifying which fields together with their settings should get either encrypted / decrypted (nested field names are expected to be separated by '.' per default, or by a custom path_delimiter config)
  • Type: string
  • Default: (none)
  • Valid values: JSON array holding at least one valid field config object, e.g.
[
    {
        "name": "my-field-abc"
    },
    {
        "name": "my-nested.field-xyz"
    }
]
  • Importance: high

key_source
  • Description: defines the origin of the keysets which can be defined directly in the config or fetched from a remote KMS (see kms_type and kms_config)
  • Type: string
  • Default: CONFIG
  • Valid values: CONFIG, KMS
  • Importance: medium

kms_type
  • Description: defines if keysets are read from the config directly or resolved from a remote/cloud KMS (e.g. Azure Key Vault)
  • Type: string
  • Default: NONE
  • Valid values: NONE, AZ_KV_SECRETS
  • Importance: medium

kms_config
  • Description: JSON object specifying KMS-specific client authentication settings (currently only supports Azure Key Vault, i.e. kms_type=AZ_KV_SECRETS)
  • Type: string
  • Default: {}
  • Valid values: JSON object defining the KMS-specific client authentication settings, e.g. for Azure Key Vault access:
{
    "clientId": "...",
    "tenantId": "...",
    "clientSecret": "...",
    "keyVaultUrl": "..."
}
  • Importance: medium

field_mode
  • Description: defines how to process complex field types (maps, lists, structs), either as full objects or element-wise
  • Type: string
  • Default: ELEMENT
  • Valid values: ELEMENT, OBJECT
  • Importance: medium

cipher_algorithm
  • Description: cipher algorithm used for data encryption
  • Type: string
  • Default: TINK/AES_GCM
  • Valid values: JCE/AES_GCM, TINK/AES_GCM, TINK/AES_GCM_SIV
  • Importance: medium

cipher_text_encoding
  • Description: defines the encoding of the resulting ciphertext bytes (currently only supports BASE64)
  • Type: string
  • Default: BASE64
  • Valid values: BASE64
  • Importance: low

path_delimiter
  • Description: path delimiter used as field name separator when referring to nested fields in the input record
  • Type: string
  • Default: .
  • Valid values: non-empty string
  • Importance: low
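As an illustration of how field_config names and path_delimiter relate to nested fields, here is a small sketch that resolves a delimited path against a schemaless record modeled as nested maps. It is only a sketch under assumptions: FieldPathSketch and resolve are hypothetical names and do not reflect how Kryptonite actually traverses records.

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch, not Kryptonite's actual implementation.
final class FieldPathSketch {

    // Walks a nested map along the path segments obtained by splitting the
    // field name at the configured delimiter. Returns null if the path does
    // not lead to a value.
    static Object resolve(Map<String, Object> record, String path, String delimiter) {
        Object current = record;
        // Pattern.quote is needed because the default delimiter '.' is a regex metacharacter.
        for (String segment : path.split(Pattern.quote(delimiter))) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> record = Map.of(
            "id", "1234567890",
            "mySubDoc1", Map.of("myString", "hello json"));
        System.out.println(resolve(record, "mySubDoc1.myString", "."));
        System.out.println(resolve(record, "mySubDoc1/myString", "/"));
    }
}
```

A custom delimiter is mainly useful when field names themselves contain dots.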

Externalize configuration parameters

The problem with directly specifying configuration parameters which contain sensitive data, such as keyset materials, is that they are exposed via Kafka Connect's REST API. For Connect clusters that are shared among teams, this means the configured keyset materials would leak, which would be unacceptable. The way to deal with this for now is to indirectly reference such configuration parameters from external property files.

This approach can be used to configure any kind of sensitive data, such as the keyset materials themselves or KMS-specific client authentication settings in case the keysets aren't sourced from the config directly but rather retrieved from a cloud KMS such as Azure Key Vault.

Below is a quick example of what such a configuration looks like:

  1. Before you can make use of configuration parameters from external sources you have to customize your Kafka Connect worker configuration by adding the following two settings:
connect.config.providers=file
connect.config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
  2. Then you create the external properties file e.g. classified.properties which contains the keyset materials. This file needs to be available on all your Kafka Connect workers which you want to run Kryptonite on. Let's pretend the file is located at path /secrets/kryptonite/classified.properties on your worker nodes:
cipher_data_keys=[{"identifier":"my-demo-secret-key-123","material":{<TINK_KEYSET_SPEC_JSON_HERE>}}]
  3. Finally, you simply reference this file and the corresponding key of the property therein, from your SMT configuration like so:
{
  //...
  "transforms":"cipher",
  "transforms.cipher.type":"com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
  "transforms.cipher.cipher_mode": "ENCRYPT",
  "transforms.cipher.cipher_data_keys": "${file:/secrets/kryptonite/classified.properties:cipher_data_keys}",
  "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
  "transforms.cipher.field_config": "[{\"name\":\"myString\"},{\"name\":\"myArray1\"},{\"name\":\"mySubDoc2\"}]",
  "transforms.cipher.field_mode": "OBJECT",
  //...
}
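To illustrate what happens to such a ${file:...} reference, the following sketch resolves a placeholder by loading the referenced properties file and looking up the key. It is a simplified stand-in written for this README, not the code of Kafka's FileConfigProvider: the class FilePlaceholderSketch, its resolve method and the regular expression are assumptions, and paths containing ':' are not handled.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified, hypothetical stand-in for placeholder resolution;
// this is NOT the implementation used by Kafka Connect.
final class FilePlaceholderSketch {

    // Matches values of the form ${file:<path>:<key>}
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{file:([^:}]+):([^}]+)\\}");

    // Returns the property value the placeholder points to, or the input
    // unchanged if it is not a file placeholder.
    static String resolve(String value) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        if (!matcher.matches()) {
            return value;
        }
        Properties properties = new Properties();
        try (Reader reader = Files.newBufferedReader(Path.of(matcher.group(1)))) {
            properties.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String resolved = properties.getProperty(matcher.group(2));
        if (resolved == null) {
            throw new IllegalArgumentException("key not found: " + matcher.group(2));
        }
        return resolved;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("classified", ".properties");
        Files.writeString(file, "cipher_data_keys=[{\"identifier\":\"my-demo-secret-key-123\"}]\n");
        System.out.println(resolve("${file:" + file + ":cipher_data_keys}"));
    }
}
```

The point of the indirection is that only the placeholder, not the resolved secret, appears in the connector configuration.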

In case you want to learn more about configuration parameter externalization there is e.g. this nice blog post from the Debezium team showing how to externalize username and password settings using a docker-compose example.

Tink Keysets

Key material is configured in the cipher_data_keys property of the CipherField SMT which takes an array of JSON objects. The material field in one such JSON object represents a keyset and might look as follows:

{
  "primaryKeyId": 1234567890,
  "key": [
    {
      "keyData": {
        "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey",
        "value": "<BASE64_ENCODED_KEY_HERE>",
        "keyMaterialType": "SYMMETRIC"
      },
      "status": "ENABLED",
      "keyId": 1234567890,
      "outputPrefixType": "TINK"
    }
  ]
}

Note that the JSON snippet above needs to be specified either:

  • as single-line JSON object in an external properties file

... "material": { "primaryKeyId": 1234567890, "key": [ { "keyData": { "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey", "value": "<BASE64_ENCODED_KEY_HERE>", "keyMaterialType": "SYMMETRIC" }, "status": "ENABLED", "keyId": 1234567890, "outputPrefixType": "TINK" } ] } ...

or

  • as single-line escaped/quoted JSON string if included directly within a connector's JSON configuration

"... \"material\": { \"primaryKeyId\": 1234567890, \"key\": [ { \"keyData\": { \"typeUrl\": \"type.googleapis.com/google.crypto.tink.AesGcmKey\", \"value\": \"<BASE64_ENCODED_KEY_HERE>\", \"keyMaterialType\": \"SYMMETRIC\" }, \"status\": \"ENABLED\", \"keyId\": 1234567890, \"outputPrefixType\": \"TINK\" } ] } ..."

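Going from the single-line JSON form to the escaped/quoted form can be done mechanically. The helper below is a minimal sketch and an assumption of this README, not a Kryptonite utility: JsonStringEscapeSketch and toJsonStringLiteral are hypothetical names, and the method only handles backslashes and double quotes, which is sufficient for single-line input without control characters.

```java
// Hypothetical helper, not part of Kryptonite.
final class JsonStringEscapeSketch {

    // Turns single-line JSON into a quoted JSON string literal by escaping
    // backslashes first and double quotes second.
    static String toJsonStringLiteral(String singleLineJson) {
        String escaped = singleLineJson
            .replace("\\", "\\\\")
            .replace("\"", "\\\"");
        return "\"" + escaped + "\"";
    }

    public static void main(String[] args) {
        String json = "[{\"identifier\":\"my-demo-secret-key-123\",\"material\":{}}]";
        System.out.println(toJsonStringLiteral(json));
    }
}
```
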
Cipher algorithm specifics

Kryptonite version 0.2.0+ provides the following cipher algorithms:

  • AEAD using AES in GCM mode for probabilistic encryption based on Tink's implementation
  • DAEAD using AES in SIV mode for deterministic encryption based on Tink's implementation
  • for backwards compatibility to earlier versions of Kryptonite a JCE-based AEAD AES GCM implementation which should be considered deprecated in the context of this project and not used any longer

All three cryptographic primitives offer support for authenticated encryption with associated data (AEAD). This basically means that besides the ciphertext, an encrypted field additionally contains unencrypted but authenticated meta-data. In order to keep the storage overhead per encrypted field relatively low, the implementation currently only incorporates a version identifier for Kryptonite itself together with a short identifier representing the algorithm as well as the keyset identifier which was used to encrypt the field in question. Future versions might benefit from additional meta-data.
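The idea of carrying unencrypted but authenticated meta-data next to the ciphertext can be sketched with the JDK's own AES-GCM support. Note the assumptions: this uses plain JCE instead of Tink, the byte layout (length byte, meta-data, IV, ciphertext) and the meta-data string are invented for illustration, and AeadFieldSketch is a hypothetical name. Kryptonite's real wire format differs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Illustrative sketch with an invented layout; not Kryptonite's format.
final class AeadFieldSketch {
    private static final int IV_BYTES = 12;
    private static final int TAG_BITS = 128;

    // Output layout (assumption): [1-byte meta length][meta][iv][ciphertext+tag], Base64-encoded.
    static String encryptField(byte[] key, String metadata, String plaintext) {
        try {
            byte[] meta = metadata.getBytes(StandardCharsets.UTF_8);
            if (meta.length > 255) {
                throw new IllegalArgumentException("meta-data too long for this sketch");
            }
            byte[] iv = new byte[IV_BYTES];
            new SecureRandom().nextBytes(iv); // fresh random IV per call
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(TAG_BITS, iv));
            cipher.updateAAD(meta); // authenticated, but stored in the clear
            byte[] ciphertext = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            ByteBuffer out = ByteBuffer.allocate(1 + meta.length + iv.length + ciphertext.length);
            out.put((byte) meta.length).put(meta).put(iv).put(ciphertext);
            return Base64.getEncoder().encodeToString(out.array());
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Fails with IllegalStateException if ciphertext or meta-data were modified.
    static String decryptField(byte[] key, String encoded) {
        try {
            ByteBuffer in = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
            byte[] meta = new byte[in.get() & 0xFF];
            in.get(meta);
            byte[] iv = new byte[IV_BYTES];
            in.get(iv);
            byte[] ciphertext = new byte[in.remaining()];
            in.get(ciphertext);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(TAG_BITS, iv));
            cipher.updateAAD(meta);
            return new String(cipher.doFinal(ciphertext), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] key = new byte[16]; // all-zero demo key, never use this in practice
        String encrypted = encryptField(key, "v1|my-demo-secret-key-123", "some foo bla text");
        System.out.println(encrypted);
        System.out.println(decryptField(key, encrypted));
    }
}
```

Because the meta-data is passed as associated data, changing even one of its bytes makes decryption fail, although the meta-data itself remains readable without the key.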

By design, every application of AEAD in probabilistic mode on a specific record field results in different ciphertexts for one and the same plaintext. This is in general not only desirable but very important to make attacks harder. However, in the context of Kafka records this has an unfavorable consequence for producing clients e.g. a source connector. Applying Kryptonite using AEAD in probabilistic mode on a source record's key would result in a 'partition mix-up' because records with the same original plaintext key would end up in different topic partitions. In other words, if you plan to use Kryptonite for source record keys make sure to configure it to apply deterministic AEAD i.e. AES in SIV mode. Doing so safely supports the encryption of record keys and keeps topic partitioning and record ordering intact.
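The 'partition mix-up' can be reproduced with a few lines of Java. The sketch below makes two assumptions: it uses the JDK's AES-GCM with a random IV as the probabilistic cipher, and it replaces Kafka's default partitioner (which hashes the serialized key with murmur2) by a simple Arrays.hashCode based stand-in. PartitionMixupSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Illustrative sketch; neither the cipher setup nor the partitioner is Kryptonite's or Kafka's code.
final class PartitionMixupSketch {

    // Probabilistic encryption: a fresh random IV makes every ciphertext different.
    static byte[] encryptProbabilistic(byte[] key, String recordKey) {
        try {
            byte[] iv = new byte[12];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(128, iv));
            byte[] ciphertext = cipher.doFinal(recordKey.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.allocate(iv.length + ciphertext.length).put(iv).put(ciphertext).array();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Simplified stand-in for a hash-based partitioner (assumption: not murmur2).
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = new byte[16]; // all-zero demo key, never use this in practice
        byte[] plainKey = "customer-42".getBytes(StandardCharsets.UTF_8);

        // The plaintext key always maps to the same partition ...
        System.out.println(partitionFor(plainKey, 6) == partitionFor(plainKey.clone(), 6));

        // ... while two encryptions of the same key differ and may land on different partitions.
        byte[] first = encryptProbabilistic(key, "customer-42");
        byte[] second = encryptProbabilistic(key, "customer-42");
        System.out.println(Arrays.equals(first, second));
        System.out.println(partitionFor(first, 6) + " vs " + partitionFor(second, 6));
    }
}
```

With a deterministic scheme such as AES in SIV mode, equal plaintext keys would yield equal ciphertexts and therefore the same partition.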

Build, installation / deployment

You can either build this project from sources via Maven or download a pre-built, self-contained package of the latest version of the Kryptonite Kafka Connect SMT: kafka-connect-transform-kryptonite-0.2.0-SNAPSHOT.jar.

In order to deploy it you simply put the jar into a 'plugin path' that is configured to be scanned by your Kafka Connect worker nodes.

After that, configure Kryptonite as transformation for any of your source / sink connectors, sit back and relax! Happy 'binge watching' plenty of ciphertexts ;-)

Donate

If you like this project and want to support its further development and maintenance, we would be happy to receive your PayPal donation.

License Information

This project is licensed according to Apache License Version 2.0

Copyright (c) 2021. Hans-Peter Grahsl (grahslhp@gmail.com)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.