JDBC connector task failed
ankit45621 opened this issue · 5 comments
Hi,
I am using the following JDBC connector configuration:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{"name": "oms-sink4",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/central_sink",
"connection.user": "test",
"connection.password": "mysql",
"tasks.max": "1",
"auto.offset.reset":"earliest",
"topics.regex": "data_lake.moglix.(.*)",
"table.name.format": "moglix_${topic}",
"auto.create": "true",
"insert.mode":"upsert",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms": "route,unwrap,updatedAt,retailInvoice,createdAt,invoiceDate",
"transforms.updatedAt.type":"com.abhi.connect.timestamp.TimeStampConverter$Value",
"transforms.updatedAt.target.type": "string",
"transforms.updatedAt.field":"created_at",
"transforms.retailInvoice.type":"com.abhi.connect.timestamp.TimeStampConverter$Value",
"transforms.retailInvoice.target.type": "string",
"transforms.retailInvoice.field":"retail_invoice_date",
"transforms.createdAt.type":"com.abhi.connect.timestamp.TimeStampConverter$Value",
"transforms.createdAt.target.type": "string",
"transforms.createdAt.field":"updated_at",
"transforms.invoiceDate.type":"com.abhi.connect.timestamp.TimeStampConverter$Value",
"transforms.invoiceDate.target.type": "string",
"transforms.invoiceDate.field":"invoice_date",
"transforms.route.replacement": "$3",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
}'
and I am getting the error below:
{"name":"oms-sink4","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.SQLSyntaxErrorException: Table 'analytics.moglix_supplier_payment_sheet' doesn't exist\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)\n\t... 10 more\nCaused by: java.sql.SQLException: java.sql.SQLSyntaxErrorException: Table 'analytics.moglix_supplier_payment_sheet' doesn't exist\n\n\t... 12 more\n","id":0,"worker_id":"localhost:8083"}],"type":"sink"}
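The table name in the trace can be traced back to the configuration. The following is a minimal Python sketch of how the `route` transform and `table.name.format` combine, not the connector's actual code. The topic name `data_lake.moglix.supplier_payment_sheet` is an assumption inferred from `topics.regex` and the error message (it does not appear in the thread), and `regex_route` and `table_name` are hypothetical helpers that only approximate what RegexRouter and the JDBC sink do.

```python
import re


def regex_route(topic, regex, replacement):
    """Approximate RegexRouter: rewrite the topic only if the whole name matches."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic  # non-matching topics pass through unchanged
    # Translate Java-style group references ($3) into Python's form (\g<3>).
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_replacement)


def table_name(table_name_format, topic):
    """Substitute the (already routed) topic into table.name.format."""
    return table_name_format.replace("${topic}", topic)


# Assumed topic name; the regex, replacement and format come from the config above.
topic = "data_lake.moglix.supplier_payment_sheet"
routed = regex_route(topic, r"([^.]+)\.([^.]+)\.([^.]+)", "$3")
print(table_name("moglix_${topic}", routed))
```

Under these assumptions the sketch yields `moglix_supplier_payment_sheet`, the unqualified table name that appears in the exception. The `analytics` prefix does not come from any of these settings.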
Hi, it is complaining that the analytics.moglix_supplier_payment_sheet table doesn't exist:
java.sql.SQLException: java.sql.SQLSyntaxErrorException: Table 'analytics.moglix_supplier_payment_sheet' doesn't exist
But in my configuration I am using a different schema, central_sink, and it is using the analytics schema of MySQL.
OK, so you have 2 schemas:
central_sink (with tables A and B)
analytics (with tables C and moglix_supplier_payment_sheet)
You are connecting to the central_sink schema, jdbc:mysql://localhost:3306/central_sink, but the exception says that analytics.moglix_supplier_payment_sheet doesn't exist, right?
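The mismatch can be checked mechanically. This is a small Python sketch under stated assumptions: `database_from_jdbc_url` and `schema_from_error` are hypothetical helpers written for illustration, and the error pattern is tailored to the MySQL message quoted in this thread.

```python
import re
from urllib.parse import urlparse


def database_from_jdbc_url(jdbc_url):
    """Return the database named in a jdbc:mysql://host:port/db style URL."""
    # Drop the "jdbc:" prefix so urlparse sees an ordinary scheme://host/path URL.
    plain = jdbc_url[len("jdbc:"):] if jdbc_url.startswith("jdbc:") else jdbc_url
    return urlparse(plain).path.lstrip("/")


def schema_from_error(message):
    """Return the schema part of "Table 'schema.table' doesn't exist", or None."""
    found = re.search(r"Table '([^.']+)\.([^']+)' doesn't exist", message)
    return found.group(1) if found else None


configured = database_from_jdbc_url("jdbc:mysql://localhost:3306/central_sink")
reported = schema_from_error(
    "java.sql.SQLException: java.sql.SQLSyntaxErrorException: "
    "Table 'analytics.moglix_supplier_payment_sheet' doesn't exist"
)
print(f"configured={configured} reported={reported} match={configured == reported}")
```

If the two values differ, as they do here, the task is probably not running with the connection.url shown in the configuration, so the deployed connector configuration is worth comparing with the one that was posted.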
One question: why are you mixing JDBC connector configuration with Debezium configuration? For example, the connector class is "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector" and you are unwrapping with "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope".
Hi @ankit45621, any update on this issue?
Yes, the issue is resolved. Thanks for the help.