ivangfr/springboot-kafka-connect-jdbc-streams

JDBC connector task failed

ankit45621 opened this issue · 5 comments

Hi,

I am using the following JDBC connector configuration:

curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{"name": "oms-sink4",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/central_sink",
"connection.user": "test",
"connection.password": "mysql",
"tasks.max": "1",
"auto.offset.reset":"earliest",
"topics.regex": "data_lake.moglix.(.*)",
"table.name.format": "moglix_${topic}",
"auto.create": "true",
"insert.mode":"upsert",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms": "route,unwrap,updatedAt,retailInvoice,createdAt,invoiceDate",
"transforms.updatedAt.type":"com.abhi.connect.timestamp.TimeStampConverter$Value",
"transforms.updatedAt.target.type": "string",
"transforms.updatedAt.field":"created_at",
"transforms.retailInvoice.type":"com.abhi.connect.timestamp.TimeStampConverter$Value",
"transforms.retailInvoice.target.type": "string",
"transforms.retailInvoice.field":"retail_invoice_date",
"transforms.createdAt.type":"com.abhi.connect.timestamp.TimeStampConverter$Value",
"transforms.createdAt.target.type": "string",
"transforms.createdAt.field":"updated_at",
"transforms.invoiceDate.type":"com.abhi.connect.timestamp.TimeStampConverter$Value",
"transforms.invoiceDate.target.type": "string",
"transforms.invoiceDate.field":"invoice_date",
"transforms.route.replacement": "$3",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
}'
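
For readers following along, here is a rough Python sketch of how the `route` transform and `table.name.format` in the configuration above would probably combine into the table name that shows up in the error. It only approximates the Java `RegexRouter` (which uses `java.util.regex`), and the topic name `data_lake.moglix.supplier_payment_sheet` is an assumption inferred from `topics.regex` and the error message, not something stated in the thread.

```python
import re

# Values copied from the connector configuration in this thread.
ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"
ROUTE_REPLACEMENT = "$3"
TABLE_NAME_FORMAT = "moglix_${topic}"


def route_topic(topic: str) -> str:
    """Approximate RegexRouter: rewrite the topic only if the whole name matches."""
    match = re.fullmatch(ROUTE_REGEX, topic)
    if match is None:
        return topic  # non-matching topics pass through unchanged
    # Translate Java-style group references ($3) into Python's (\g<3>).
    template = re.sub(r"\$(\d+)", r"\\g<\1>", ROUTE_REPLACEMENT)
    return match.expand(template)


def table_name(topic: str) -> str:
    """Substitute the routed topic into the table.name.format pattern."""
    return TABLE_NAME_FORMAT.replace("${topic}", route_topic(topic))


# Hypothetical topic name, assumed for illustration only.
print(table_name("data_lake.moglix.supplier_payment_sheet"))
# prints "moglix_supplier_payment_sheet"
```

The result carries no schema qualifier, so under these assumptions the `analytics.` prefix in the error comes from the database connection rather than from the transforms.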

and getting below error:

{"name":"oms-sink4","connector":{"state":"RUNNING","worker_id":"localhost:8083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.SQLSyntaxErrorException: Table 'analytics.moglix_supplier_payment_sheet' doesn't exist\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)\n\t... 10 more\nCaused by: java.sql.SQLException: java.sql.SQLSyntaxErrorException: Table 'analytics.moglix_supplier_payment_sheet' doesn't exist\n\n\t... 12 more\n","id":0,"worker_id":"localhost:8083"}],"type":"sink"}
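
The status response above is hard to read as a single line. A minimal sketch, assuming the JSON has the same shape as the output shown here, that pulls the innermost `Caused by:` line out of each failed task's trace. The function name `failed_task_causes` and the abbreviated sample trace are illustrative, not part of Kafka Connect.

```python
import json


def failed_task_causes(status_json: str) -> dict:
    """Map each FAILED task id to the last 'Caused by:' line of its trace."""
    status = json.loads(status_json)
    causes = {}
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        lines = [line.strip() for line in task.get("trace", "").splitlines()]
        caused_by = [line for line in lines if line.startswith("Caused by:")]
        # Fall back to the first trace line when no cause chain is present.
        causes[task["id"]] = caused_by[-1] if caused_by else (lines[0] if lines else "")
    return causes


# Abbreviated copy of the trace from this thread (most stack frames omitted).
SAMPLE_TRACE = (
    "org.apache.kafka.connect.errors.ConnectException: "
    "Exiting WorkerSinkTask due to unrecoverable exception.\n"
    "\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)\n"
    "Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: "
    "java.sql.SQLSyntaxErrorException: Table 'analytics.moglix_supplier_payment_sheet' doesn't exist\n\n"
    "\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)\n"
    "Caused by: java.sql.SQLException: java.sql.SQLSyntaxErrorException: "
    "Table 'analytics.moglix_supplier_payment_sheet' doesn't exist\n\n"
    "\t... 12 more\n"
)
SAMPLE_STATUS = json.dumps({
    "name": "oms-sink4",
    "connector": {"state": "RUNNING", "worker_id": "localhost:8083"},
    "tasks": [{"state": "FAILED", "trace": SAMPLE_TRACE, "id": 0,
               "worker_id": "localhost:8083"}],
    "type": "sink",
})

for task_id, cause in failed_task_causes(SAMPLE_STATUS).items():
    print(f"task {task_id}: {cause}")
```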

Hi, it is complaining that the table analytics.moglix_supplier_payment_sheet doesn't exist:

java.sql.SQLException: java.sql.SQLSyntaxErrorException: Table 'analytics.moglix_supplier_payment_sheet' doesn't exist

But in my configuration I am using a different schema, central_sink, and yet the connector uses the analytics schema of MySQL.

OK, so you have two schemas:

  1. central_sink (with tables A and B)
  2. analytics (with tables C and moglix_supplier_payment_sheet)

You are connecting to the central_sink schema (jdbc:mysql://localhost:3306/central_sink), but the exception says that analytics.moglix_supplier_payment_sheet doesn't exist, right?

One question: why are you mixing JDBC connector configuration with Debezium configuration? For example, the connector class is "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", but you are unwrapping with "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope".
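
To make the schema mismatch discussed above concrete, here is a small sketch that compares the database named in a JDBC URL with the schema qualifier in a MySQL "doesn't exist" message. Both helper names are hypothetical, and the message pattern is assumed from the single error shown in this thread.

```python
import re
from urllib.parse import urlparse


def database_from_jdbc_url(url: str) -> str:
    """Return the database segment of a jdbc:mysql://host:port/db URL."""
    # Drop the "jdbc:" prefix so urlparse sees an ordinary URL.
    plain = url[len("jdbc:"):] if url.startswith("jdbc:") else url
    return urlparse(plain).path.lstrip("/")


def schema_from_error(message: str):
    """Return the schema qualifier from "Table 'schema.table' doesn't exist", if any."""
    found = re.search(r"Table '([^.']+)\.[^']+' doesn't exist", message)
    return found.group(1) if found else None


configured = database_from_jdbc_url("jdbc:mysql://localhost:3306/central_sink")
reported = schema_from_error(
    "java.sql.SQLSyntaxErrorException: "
    "Table 'analytics.moglix_supplier_payment_sheet' doesn't exist"
)
if configured != reported:
    print(f"configured database is {configured!r}, but the error refers to {reported!r}")
```

A mismatch like this could suggest that the running task is using a different connection.url than the one in the posted request, for example an earlier connector definition, though the thread does not confirm the actual cause.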

Hi @ankit45621, any update on this issue?

Yes, the issue is resolved. Thanks for the help.