a few issues with com.redhat.insights.kafka.connect.transforms.Filter
Opened this issue · 0 comments
Hi team,
Below is my connector configuration on a MM2 org.apache.kafka.connect.mirror.MirrorSourceConnector.
"key.converter": " org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "FilterByDomain",
"transforms.FilterByDomain.type": "com.redhat.insights.kafka.connect.transforms.Filter",
when I configured the if
predicate like this,
"transforms.FilterByDomain.if": "!!record['value']"
message payload on the target topic became a base64-encoded string, wondering how to reserve the original JSON format.
Secondly, when I added filter conditions on a specific top-level JSON field,
"transforms.FilterByDomain.if": "!!record['value'] && !!record['value']['name'] && record['value']['name'] == 'signout'"
The connector stopped ingesting messages to the target topic, while the connector tasks show running
status with no errors.
Here is a sample message,
{
"_id": "64e644a3d36fe8004876b144",
"name": "signout",
"params": {
"user_id": "610c74e858a801009428fd45",
"reason": "auto-signout-refresh-session-expire-time"
},
"user_id": "610c74e858a801009428fd45",
"user_uuid": {
"$binary": {
"base64": "pxns/BqMUNC3hCiZhm7Cbw==",
"subType": "03"
}
},
.......
need advices to how to fix the issues.
Many thanks
Xinyu Liu