Aiven-Open/transforms-for-apache-kafka-connect

Hash$Value not working

Closed this issue · 4 comments

I have configured the Debezium SQL Server connector with the transforms below:
"transforms" : "newstate,FNHash",
"transforms.newstate.type" : "io.debezium.transforms.ExtractNewRecordState",
"transforms.FNHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.FNHash.field.name": "firstName",
"transforms.FNHash.function": "sha256"

and I am getting the following error:
"tasks":[
{
"id":0,
"state":"FAILED",
"worker_id":"localhost:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:354)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: firstName in value schema can't be missing: SourceRecord{sourcePartition={server=dev}, sourceOffset={commit_lsn=0000002a:00000398:0001, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='dev', kafkaPartition=0, key=Struct{databaseName=test}, keySchema=Schema{io.debezium.connector.sqlserver.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.8.1.Final,connector=sqlserver,name=dev,ts_ms=1652936307753,snapshot=true,db=test,schema=dbo,table=customers,commit_lsn=0000002a:00000398:0001},databaseName=test,schemaName=dbo,tableChanges=[Struct{type=CREATE,id="test"."dbo"."customers",table=Struct{primaryKeyColumnNames=[cust_id],columns=[Struct{name=cust_id,jdbcType=4,typeName=int identity,typeExpression=int identity,length=10,scale=0,position=1,optional=false,autoIncremented=false,generated=false}, Struct{name=firstName,jdbcType=12,typeName=varchar,typeExpression=varchar,length=50,position=2,optional=false,autoIncremented=false,generated=false}, Struct{name=lastName,jdbcType=12,typeName=varchar,typeExpression=varchar,length=50,position=3,optional=false,autoIncremented=false,generated=false}, Struct{name=email,jdbcType=12,typeName=varchar,typeExpression=varchar,length=50,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=phone,jdbcType=12,typeName=varchar,typeExpression=varchar,length=20,position=5,optional=true,autoIncremented=false,generated=false}, Struct{name=cardno,jdbcType=12,typeName=varchar,typeExpression=varchar,length=20,position=6,optional=true,autoIncremented=false,generated=false}]}}]}, valueSchema=Schema{io.debezium.connector.sqlserver.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
at io.aiven.kafka.connect.transforms.Hash.getNewValueForNamedField(Hash.java:113)
at io.aiven.kafka.connect.transforms.Hash.getNewValue(Hash.java:77)
at io.aiven.kafka.connect.transforms.Hash$Value.apply(Hash.java:217)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)

  }

],

Hi

Please try setting transforms.FNHash.skip.missing.or.null=true

The failing record in your trace is a Debezium schema-change event (valueSchema is io.debezium.connector.sqlserver.SchemaChangeValue), which has no firstName field, so the transform throws. With this option the transform skips records where the field is missing or null instead of failing.
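For reference, a minimal sketch of the adjusted transform configuration, assuming the rest of your connector config stays the same:

"transforms" : "newstate,FNHash",
"transforms.newstate.type" : "io.debezium.transforms.ExtractNewRecordState",
"transforms.FNHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.FNHash.field.name": "firstName",
"transforms.FNHash.function": "sha256",
"transforms.FNHash.skip.missing.or.null": "true"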

Thank you so much for your help. Adding the above parameter fixed my issue.

It is working for single-field hashing. How can I hash multiple fields in a single record? I tried something like:
"transforms.PhoneNoHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.PhoneNoHash.field.name": "phone,mobile",

You need to set up two transformations, one per field.
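For example, a sketch of two chained Hash$Value transforms, assuming your fields are named phone and mobile as in your question (the aliases PhoneHash and MobileHash are just illustrative names):

"transforms": "newstate,PhoneHash,MobileHash",
"transforms.newstate.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.PhoneHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.PhoneHash.field.name": "phone",
"transforms.PhoneHash.function": "sha256",
"transforms.PhoneHash.skip.missing.or.null": "true",
"transforms.MobileHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.MobileHash.field.name": "mobile",
"transforms.MobileHash.function": "sha256",
"transforms.MobileHash.skip.missing.or.null": "true"

The transforms are applied in the order listed, so each record has its phone field hashed first and then its mobile field.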