This project implements a set of generic Kafka Connect transformations that complement the built-in transformations.
The library can be downloaded from Maven Central using the `com.redhat.insights.kafka:connect-transforms:${version}` coordinates.
`DropIf` sets either the message key (`com.redhat.insights.kafka.connect.transforms.DropIf$Key`) or the message value (`com.redhat.insights.kafka.connect.transforms.DropIf$Value`) to `null` if the given predicate evaluates to `true`.
Unlike `Drop`, this transform does not alter the key/value schema. If the schema does not allow a null key/value, the user of this transform is responsible for altering the schema accordingly.
This transform evaluates the predicate using an ECMAScript `ScriptEngine` instance. The message is made available to the expression as the `record` variable. Use `record.key()`, `record.value()` and `record.headers()` to access the key, value and headers, respectively.
If the predicate evaluation fails, or the predicate returns a non-boolean result, this is treated as an error.
Name | Description | Type | Default | Valid values | Importance |
---|---|---|---|---|---|
if | The predicate to be evaluated on each message | string | - | Any valid ECMAScript expression that evaluates to a boolean | HIGH |
Assume the following configuration:
"transforms": "dropValue",
"transforms.dropValue.type":"com.redhat.insights.kafka.connect.transforms.DropIf$Value",
"transforms.dropValue.if": "record.value().get('country') == 'CZ'"
Example 1
- Before: `{"country": "CZ"}`
- After: `null`
Example 2
- Before: `{"country": "SK"}`
- After: `{"country": "SK"}`
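To make the `DropIf$Value` semantics concrete, here is a minimal, self-contained Java sketch that reproduces the two examples above. It is not the library's implementation: `SimpleRecord` and `DropIfSketch` are hypothetical names, schemas are modelled as plain strings, and a Java `Function` stands in for the ECMAScript predicate `record.value().get('country') == 'CZ'`. It illustrates the two documented rules: the value is set to `null` while the value schema is left untouched, and a failing or non-boolean predicate is an error.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a Connect record; schemas are plain strings here.
record SimpleRecord(Object key, String keySchema, Object value, String valueSchema) {}

final class DropIfSketch {
    // Enforces the documented contract: a failing predicate or a non-boolean result is an error.
    static boolean evaluate(Function<SimpleRecord, Object> predicate, SimpleRecord record) {
        Object result;
        try {
            result = predicate.apply(record);
        } catch (RuntimeException e) {
            throw new IllegalStateException("Predicate evaluation failed", e);
        }
        if (!(result instanceof Boolean)) {
            throw new IllegalStateException("Predicate returned a non-boolean result: " + result);
        }
        return (Boolean) result;
    }

    // DropIf$Value: null out the value but keep the value schema as it was.
    static SimpleRecord dropValueIf(Function<SimpleRecord, Object> predicate, SimpleRecord record) {
        return evaluate(predicate, record)
                ? new SimpleRecord(record.key(), record.keySchema(), null, record.valueSchema())
                : record;
    }

    // DropIf$Key: the same rule applied to the key; the key schema is kept.
    static SimpleRecord dropKeyIf(Function<SimpleRecord, Object> predicate, SimpleRecord record) {
        return evaluate(predicate, record)
                ? new SimpleRecord(null, record.keySchema(), record.value(), record.valueSchema())
                : record;
    }

    public static void main(String[] args) {
        // Stand-in for the ECMAScript expression: record.value().get('country') == 'CZ'
        Function<SimpleRecord, Object> isCz = r -> "CZ".equals(((Map<?, ?>) r.value()).get("country"));
        SimpleRecord cz = new SimpleRecord(null, null, Map.of("country", "CZ"), "country-struct");
        SimpleRecord sk = new SimpleRecord(null, null, Map.of("country", "SK"), "country-struct");
        System.out.println(dropValueIf(isCz, cz)); // value becomes null, valueSchema is kept
        System.out.println(dropValueIf(isCz, sk)); // predicate is false, record returned as-is
    }
}
```

The sketch returns a new record instead of mutating the input, in the spirit of Connect transformations, which typically build their output with `ConnectRecord.newRecord(...)`.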
`FieldToJson` converts the given original field of a record value to its JSON representation and stores that JSON in the given destination field.
Name | Description | Type | Default | Valid values | Importance |
---|---|---|---|---|---|
originalField | The field that will be serialized into JSON | string | - | Any field name containing serializable data | HIGH |
destinationField | The field that will contain the JSON data | string | - | Any valid field name | HIGH |
Assume the following configuration:
"transforms": "tagsToJson",
"transforms.tagsToJson.type": "com.redhat.insights.kafka.connect.transforms.FieldToJson$Value",
"transforms.tagsToJson.originalField": "orig",
"transforms.tagsToJson.destinationField": "dest"
- Before: `{orig=[{key=value}]}`
- After: `{orig=[{key=value}], dest="[{"key":"value"}]"}`
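The serialization step can be sketched in Java with a small hand-rolled encoder for maps, lists, strings, numbers and booleans. This is an illustration only: `FieldToJsonSketch` is a hypothetical name, the record value is modelled as a `Map` rather than a Connect `Struct`, and the encoder the library actually uses is not described here, so details such as whitespace or key ordering may differ.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class FieldToJsonSketch {
    // Returns a copy of the value with destinationField set to the JSON form of originalField.
    static Map<String, Object> apply(Map<String, Object> value, String originalField, String destinationField) {
        Map<String, Object> updated = new LinkedHashMap<>(value);
        updated.put(destinationField, toJson(value.get(originalField)));
        return updated;
    }

    // Minimal encoder for null, strings, numbers, booleans, maps and lists; anything else is rejected.
    // Number.toString() is used as-is, so NaN or Infinity would not yield valid JSON.
    static String toJson(Object o) {
        if (o == null) return "null";
        if (o instanceof String s) return quote(s);
        if (o instanceof Number || o instanceof Boolean) return o.toString();
        if (o instanceof Map<?, ?> m) {
            return m.entrySet().stream()
                    .map(e -> quote(String.valueOf(e.getKey())) + ":" + toJson(e.getValue()))
                    .collect(Collectors.joining(",", "{", "}"));
        }
        if (o instanceof List<?> l) {
            return l.stream()
                    .map(FieldToJsonSketch::toJson)
                    .collect(Collectors.joining(",", "[", "]"));
        }
        throw new IllegalArgumentException("Unsupported type: " + o.getClass().getName());
    }

    // Escapes quotes, backslashes and control characters as JSON requires.
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
                }
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("orig", List.of(Map.of("key", "value")));
        System.out.println(apply(value, "orig", "dest").get("dest")); // [{"key":"value"}]
    }
}
```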
`Filter` drops the entire record if the given predicate evaluates to `false`.
This transform evaluates the predicate using an ECMAScript `ScriptEngine` instance. The message is made available to the expression as the `record` variable. Use `record.key()`, `record.value()` and `record.headers()` to access the key, value and headers, respectively.
If the predicate evaluation fails, or the predicate returns a non-boolean result, this is treated as an error.
Name | Description | Type | Default | Valid values | Importance |
---|---|---|---|---|---|
if | The predicate to be evaluated on each message | string | - | Any valid ECMAScript expression that evaluates to a boolean | HIGH |
Assume the following configuration:
"transforms": "filter",
"transforms.filter.type":"com.redhat.insights.kafka.connect.transforms.Filter",
"transforms.filter.if": "record['value']['country'] === 'CZ'"
Example 1
- Before: `SinkRecord{value={country=SK}}`
- After: `null`
Example 2
- Before: `SinkRecord{value={country=CZ}}`
- After: `SinkRecord{value={country=CZ}}`
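In Kafka Connect, a transformation drops a record by returning `null` from `apply`, which is what the `null` result in Example 1 reflects. The Java sketch below models that behaviour. `ValueOnlyRecord` and `FilterSketch` are hypothetical names, and a `java.util.function.Predicate` stands in for the ECMAScript expression `record['value']['country'] === 'CZ'`; the script evaluation and its error handling are not modelled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified record carrying only a map-shaped value.
record ValueOnlyRecord(Map<String, ?> value) {}

final class FilterSketch {
    // Keeps the record when the predicate holds; otherwise returns null,
    // which is how a Connect transformation signals that the record is dropped.
    static ValueOnlyRecord apply(Predicate<ValueOnlyRecord> predicate, ValueOnlyRecord record) {
        return predicate.test(record) ? record : null;
    }

    // Illustrates the effect over a batch by discarding the null results.
    static List<ValueOnlyRecord> applyAll(Predicate<ValueOnlyRecord> predicate, List<ValueOnlyRecord> records) {
        List<ValueOnlyRecord> kept = new ArrayList<>();
        for (ValueOnlyRecord r : records) {
            ValueOnlyRecord out = apply(predicate, r);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }
}
```

`applyAll` only exists to show the effect on a batch; the Connect runtime applies the transformation to one record at a time and skips any record for which it returns `null`.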
`FilterFields` removes keys from the given field according to an allowlist or a denylist.
Name | Description | Type | Default | Valid values | Importance |
---|---|---|---|---|---|
field | Name of the field to remove keys from | string | - | Any valid field name | HIGH |
allowlist | If set, keys not in this list will be removed from the field | list | - | Any valid field name | HIGH |
denylist | If set, keys in this list will be removed from the field | list | - | Any valid field name | HIGH |
Assume the following configuration:
"transforms": "filterFields",
"transforms.filterFields.type": "com.redhat.insights.kafka.connect.transforms.FilterFields$Value",
"transforms.filterFields.field": "data",
"transforms.filterFields.denylist": "key1, key2"
- Before: `{"data":{"key1":"value1", "key2":"value2", "key3":"value3"}}`
- After: `{"data":{"key3":"value3"}}`
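The allowlist/denylist logic can be sketched in Java on a plain nested `Map`. `FilterFieldsSketch` is a hypothetical name, and two behaviours are assumptions of this sketch rather than documented facts: a missing or non-map `field` leaves the value unchanged, and when both lists are set, a key must be allowlisted and not denylisted to survive. `parseList` imitates how Kafka's `ConfigDef` splits `LIST` values on commas while trimming the surrounding whitespace, which is why `"key1, key2"` yields `key1` and `key2`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FilterFieldsSketch {
    // Splits a LIST-style config string on commas, trimming whitespace around them,
    // so "key1, key2" becomes [key1, key2].
    static List<String> parseList(String raw) {
        String trimmed = raw.trim();
        if (trimmed.isEmpty()) {
            return List.of();
        }
        return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
    }

    // Returns a copy of value in which the map stored under field keeps only the permitted keys.
    // A null allowlist or denylist means "not set".
    // Assumptions of this sketch: a missing or non-map field leaves the value unchanged, and if both
    // lists are set, a key must be allowlisted and not denylisted to be kept.
    static Map<String, Object> apply(Map<String, Object> value, String field,
                                     List<String> allowlist, List<String> denylist) {
        if (!(value.get(field) instanceof Map<?, ?> nested)) {
            return value;
        }
        Map<Object, Object> filtered = new LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : nested.entrySet()) {
            Object key = entry.getKey();
            boolean allowed = allowlist == null || allowlist.contains(key);
            boolean denied = denylist != null && denylist.contains(key);
            if (allowed && !denied) {
                filtered.put(key, entry.getValue());
            }
        }
        Map<String, Object> result = new LinkedHashMap<>(value);
        result.put(field, filtered);
        return result;
    }
}
```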
`InjectSchema` sets the key (`com.redhat.insights.kafka.connect.transforms.InjectSchema$Key`) or value (`com.redhat.insights.kafka.connect.transforms.InjectSchema$Value`) schema of a record.
It is intended primarily to enrich schemaless messages, but it can also be used to override an existing schema on the record.
This transform uses `JsonConverter` to parse a Connect schema from its JSON representation.
Name | Description | Type | Default | Valid values | Importance |
---|---|---|---|---|---|
schema | The schema to be used | string | - | Any valid Kafka Connect schema definition in the JSON format | HIGH |
Assume the following configuration:
"transforms": "injectSchema",
"transforms.injectSchema.type":"com.redhat.insights.kafka.connect.transforms.InjectSchema$Value",
"transforms.injectSchema.schema": "{\"type\":\"string\",\"optional\":false}"
- Before: value `"foo"`, schema `null`
- After: value `"foo"`, schema `Schema{STRING}`
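The transform delegates schema parsing to `JsonConverter`. The toy Java sketch below only illustrates the shape of the JSON schema string and the effect of `InjectSchema$Value` on a record: it handles flat primitive schemas such as `{"type":"string","optional":false}` with regular expressions and is not a substitute for `JsonConverter`. `ToySchema`, `SchemaRecord` and `InjectSchemaSketch` are hypothetical names; the type table lists the primitive names of Connect's JSON schema format, where `float` and `double` correspond to `FLOAT32` and `FLOAT64`.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy stand-in for a Connect schema; toString imitates the "Schema{STRING}" form used in the example.
record ToySchema(String type, boolean optional) {
    @Override
    public String toString() {
        return "Schema{" + type + "}";
    }
}

// Hypothetical record with a value and a value schema (null when schemaless).
record SchemaRecord(Object value, ToySchema valueSchema) {}

final class InjectSchemaSketch {
    // Primitive type names of Connect's JSON schema format, mapped to Connect type names.
    private static final Map<String, String> TYPES = Map.of(
            "boolean", "BOOLEAN", "int8", "INT8", "int16", "INT16", "int32", "INT32",
            "int64", "INT64", "float", "FLOAT32", "double", "FLOAT64",
            "bytes", "BYTES", "string", "STRING");

    private static final Pattern TYPE = Pattern.compile("\"type\"\\s*:\\s*\"(\\w+)\"");
    private static final Pattern OPTIONAL = Pattern.compile("\"optional\"\\s*:\\s*(true|false)");

    // Handles flat primitive schemas only; struct, array and map schemas are out of scope for this toy.
    static ToySchema parse(String json) {
        Matcher type = TYPE.matcher(json);
        if (!type.find() || !TYPES.containsKey(type.group(1))) {
            throw new IllegalArgumentException("Unsupported schema: " + json);
        }
        Matcher optional = OPTIONAL.matcher(json);
        // An absent "optional" flag is treated as required here.
        boolean isOptional = optional.find() && Boolean.parseBoolean(optional.group(1));
        return new ToySchema(TYPES.get(type.group(1)), isOptional);
    }

    // InjectSchema$Value: keep the value as-is and set (or override) its schema.
    static SchemaRecord injectValueSchema(SchemaRecord record, String schemaJson) {
        return new SchemaRecord(record.value(), parse(schemaJson));
    }
}
```

Note that the schema in the connector configuration is itself a JSON string, so its inner quotes must be escaped, as in the `"{\"type\":\"string\",\"optional\":false}"` value shown in the configuration.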