Hash$Value not working
Closed this issue · 4 comments
I have configured the Debezium SQL Server connector with the configuration below:
"transforms" : "newstate,FNHash",
"transforms.newstate.type" : "io.debezium.transforms.ExtractNewRecordState",
"transforms.FNHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.FNHash.field.name": "firstName",
"transforms.FNHash.function": "sha256"
and I am getting the following error:
"tasks":[
{
"id":0,
"state":"FAILED",
"worker_id":"localhost:8083",
"trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:354)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: firstName in value schema can't be missing: SourceRecord{sourcePartition={server=dev}, sourceOffset={commit_lsn=0000002a:00000398:0001, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='dev', kafkaPartition=0, key=Struct{databaseName=test}, keySchema=Schema{io.debezium.connector.sqlserver.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.8.1.Final,connector=sqlserver,name=dev,ts_ms=1652936307753,snapshot=true,db=test,schema=dbo,table=customers,commit_lsn=0000002a:00000398:0001},databaseName=test,schemaName=dbo,tableChanges=[Struct{type=CREATE,id="test"."dbo"."customers",table=Struct{primaryKeyColumnNames=[cust_id],columns=[Struct{name=cust_id,jdbcType=4,typeName=int identity,typeExpression=int identity,length=10,scale=0,position=1,optional=false,autoIncremented=false,generated=false}, Struct{name=firstName,jdbcType=12,typeName=varchar,typeExpression=varchar,length=50,position=2,optional=false,autoIncremented=false,generated=false}, Struct{name=lastName,jdbcType=12,typeName=varchar,typeExpression=varchar,length=50,position=3,optional=false,autoIncremented=false,generated=false}, Struct{name=email,jdbcType=12,typeName=varchar,typeExpression=varchar,length=50,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=phone,jdbcType=12,typeName=varchar,typeExpression=varchar,length=20,position=5,optional=true,autoIncremented=false,generated=false}, Struct{name=cardno,jdbcType=12,typeName=varchar,typeExpression=varchar,length=20,position=6,optional=true,autoIncremented=false,generated=false}]}}]}, valueSchema=Schema{io.debezium.connector.sqlserver.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
at io.aiven.kafka.connect.transforms.Hash.getNewValueForNamedField(Hash.java:113)
at io.aiven.kafka.connect.transforms.Hash.getNewValue(Hash.java:77)
at io.aiven.kafka.connect.transforms.Hash$Value.apply(Hash.java:217)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
}
],
Hi,
Please try setting `transforms.FNHash.skip.missing.or.null=true`. The failing record in your trace is a Debezium schema-change event (`io.debezium.connector.sqlserver.SchemaChangeValue`), which has no `firstName` field; this option makes the transform skip records where the configured field is missing or null instead of throwing a `DataException`.
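For context, here is a sketch of the full transform block with that property added, based on the configuration posted above:

```json
"transforms": "newstate,FNHash",
"transforms.newstate.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.FNHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.FNHash.field.name": "firstName",
"transforms.FNHash.function": "sha256",
"transforms.FNHash.skip.missing.or.null": "true"
```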
Thank you so much for your help. Adding the above parameter fixed my issue.
It is working for single-field hashing. How can I hash multiple fields in a single record? Something like:
"transforms.PhoneNoHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.PhoneNoHash.field.name": "phone,mobile",
You need to set up two transformations, one per field; `field.name` accepts only a single field name, not a comma-separated list.
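A sketch of what the two-transform chain could look like (the `MobileNoHash` alias and the `mobile` column name are just illustrative assumptions, adjust them to your schema):

```json
"transforms": "newstate,PhoneNoHash,MobileNoHash",
"transforms.newstate.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.PhoneNoHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.PhoneNoHash.field.name": "phone",
"transforms.PhoneNoHash.function": "sha256",
"transforms.PhoneNoHash.skip.missing.or.null": "true",
"transforms.MobileNoHash.type": "io.aiven.kafka.connect.transforms.Hash$Value",
"transforms.MobileNoHash.field.name": "mobile",
"transforms.MobileNoHash.function": "sha256",
"transforms.MobileNoHash.skip.missing.or.null": "true"
```

The transforms run in the order listed in `transforms`, so each record is hashed on `phone` first and then on `mobile`.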