RedHatInsights/connect-transforms

a few issues with com.redhat.insights.kafka.connect.transforms.Filter

Opened this issue · 0 comments

Hi team,

Below is my connector configuration on a MM2 org.apache.kafka.connect.mirror.MirrorSourceConnector.

    "key.converter": " org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",

    "transforms": "FilterByDomain",
    "transforms.FilterByDomain.type": "com.redhat.insights.kafka.connect.transforms.Filter",

when I configured the if predicate like this,
"transforms.FilterByDomain.if": "!!record['value']"
message payload on the target topic became a base64-encoded string, wondering how to reserve the original JSON format.

Secondly, when I added filter conditions on a specific top-level JSON field,
"transforms.FilterByDomain.if": "!!record['value'] && !!record['value']['name'] && record['value']['name'] == 'signout'"
The connector stopped ingesting messages to the target topic, while the connector tasks show running status with no errors.

Here is a sample message,
{
"_id": "64e644a3d36fe8004876b144",
"name": "signout",
"params": {
"user_id": "610c74e858a801009428fd45",
"reason": "auto-signout-refresh-session-expire-time"
},
"user_id": "610c74e858a801009428fd45",
"user_uuid": {
"$binary": {
"base64": "pxns/BqMUNC3hCiZhm7Cbw==",
"subType": "03"
}
},
.......

need advices to how to fix the issues.

Many thanks

Xinyu Liu