MongoDB connector crash
Closed this issue · 1 comment
bovy89 commented
Source connector:
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
"name": "mongodb-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": 1,
"mongodb.connection.mode": "replica_set",
"mongodb.connection.string": "mongodb://db-mongo:27017/?replicaSet=rs0",
"mongodb.user" : "debezium",
"mongodb.password" : "dbz",
"topic.prefix" : "mongodb",
"collection.include.list": "inventory.products,inventory.orders,inventory.customers"
}
}'
Sink connector:
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
"name" :"iceberg-sink-mongodb",
"config": {
"connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
"topics.regex": "mongodb.*",
"iceberg.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
"iceberg.warehouse": "s3a://warehouse/iceberg",
"iceberg.uri": "thrift://hive-metastore:9083",
"iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.s3.endpoint": "http://minio:9000",
"iceberg.s3.access-key-id": "admin",
"iceberg.s3.secret-access-key": "password",
"iceberg.s3.path-style-access": true,
"table.auto-create": true,
"upsert.keep-deletes": false,
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.add.fields": "op,collection,source.ts_ms,db",
"transforms.unwrap.drop.tombstones": true,
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}'
Example data:
db.getSiblingDB('inventory').orders.insertMany([
{ _id : NumberLong("10004"), order_date : new ISODate("2016-02-21T00:00:00Z"), purchaser_id : NumberLong("4444"), quantity : NumberInt("1"), product_id : NumberLong("4666") }
])
This configuration does not work (error on insert):
2023-08-09 11:16:32,890 ERROR || WorkerSinkTask{id=iceberg-sink-mongodb-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.iceberg.exceptions.ValidationException: Table Row identifier field `id` not found in table columns
at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:342)
at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent.icebergSchema(IcebergChangeEvent.java:64)
at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.lambda$loadIcebergTable$0(IcebergChangeConsumer.java:68)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.loadIcebergTable(IcebergChangeConsumer.java:64)
at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:55)
at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
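For what it's worth, the mismatch seems to come from the record key: Debezium's MongoDB connector keys each record on a single field named id (holding the document id), while the unwrapped value carries _id, and the sink derives the Iceberg row identifier from the key. An illustrative key/value pair for the insert above (assumption: default JSON converters, schemas omitted):

key:   {"id": "10004"}
value: {"_id": 10004, "order_date": 1456012800000, "purchaser_id": 4444, "quantity": 1, "product_id": 4666}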
Changing the sink as follows:
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
"name" :"iceberg-sink-mongodb",
"config": {
"connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
"topics.regex": "mongodb.*",
"iceberg.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
"iceberg.warehouse": "s3a://warehouse/iceberg",
"iceberg.uri": "thrift://hive-metastore:9083",
"iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.s3.endpoint": "http://minio:9000",
"iceberg.s3.access-key-id": "admin",
"iceberg.s3.secret-access-key": "password",
"iceberg.s3.path-style-access": true,
"table.auto-create": true,
"upsert.keep-deletes": false,
"transforms": "unwrap,renamekeyfield",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.add.fields": "op,collection,source.ts_ms,db",
"transforms.unwrap.drop.tombstones": true,
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.renamekeyfield.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.renamekeyfield.renames": "id:_id"
}
}'
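With the key field renamed to match the value column, the task stays healthy; the standard Kafka Connect REST API can confirm (shown as a sanity check):

curl -s http://localhost:8083/connectors/iceberg-sink-mongodb/status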
It works:
trino:default> select * from mongodb_inventory_orders;
_id | order_date | purchaser_id | quantity | product_id | __deleted | __op | __collection | __source_ts_ms | __db | __source_ts
-------+---------------+--------------+----------+------------+-----------+------+--------------+----------------+-----------+--------------------------------
10004 | 1456012800000 | 4444 | 1 | 4666 | false | c | orders | 1691579754000 | inventory | 2023-08-09 11:15:54.000000 UTC
(1 row)
trino:default> desc mongodb_inventory_orders;
Column | Type | Extra | Comment
----------------+-----------------------------+-------+-----------------------------------------
_id | bigint | |
order_date | bigint | | org.apache.kafka.connect.data.Timestamp
purchaser_id | bigint | |
quantity | integer | |
product_id | bigint | |
__deleted | boolean | |
__op | varchar | |
__collection | varchar | |
__source_ts_ms | bigint | |
__db | varchar | |
__source_ts | timestamp(6) with time zone | |
(11 rows)
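As an aside, order_date lands as epoch milliseconds (note the org.apache.kafka.connect.data.Timestamp comment above); if needed, Trino can render it with something like:

trino:default> select from_unixtime(order_date / 1000) from mongodb_inventory_orders;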
But if I try to delete the document:
db.getSiblingDB('inventory').orders.deleteOne({"_id": 10004 })
the following error is triggered:
2023-08-09 11:20:41,471 ERROR || WorkerSinkTask{id=iceberg-sink-mongodb-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Table Row identifier field `_id` not found in table columns [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.iceberg.exceptions.ValidationException: Table Row identifier field `_id` not found in table columns
at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:342)
at com.getindata.kafka.connect.iceberg.sink.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:67)
at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:56)
at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2023-08-09 11:20:41,471 ERROR || WorkerSinkTask{id=iceberg-sink-mongodb-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.iceberg.exceptions.ValidationException: Table Row identifier field `_id` not found in table columns
at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:342)
at com.getindata.kafka.connect.iceberg.sink.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:67)
at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:56)
at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
... 11 more
Any ideas?
bovy89 commented
Resolved.
MongoDB >= 6.0 is required.
Debezium >= 2.4.0 is required (not released yet).
MongoDB collections must have changeStreamPreAndPostImages enabled:
db.getSiblingDB('inventory').runCommand ( { collMod: "orders", changeStreamPreAndPostImages: { enabled: true } } );
db.getSiblingDB('inventory').runCommand ( { collMod: "products", changeStreamPreAndPostImages: { enabled: true } } );
db.getSiblingDB('inventory').runCommand ( { collMod: "customers", changeStreamPreAndPostImages: { enabled: true } } );
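One way to double-check that the option took effect (standard mongosh helper, shown as a sanity check):

db.getSiblingDB('inventory').getCollectionInfos({ name: "orders" })[0].options
// should include: changeStreamPreAndPostImages: { enabled: true }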
The source connector must specify capture.mode: change_streams_with_pre_image
or capture.mode: change_streams_update_full_with_pre_image:
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
"name": "mongodb-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": 1,
"mongodb.connection.mode": "replica_set",
"mongodb.connection.string": "mongodb://db-mongo:27017/?replicaSet=rs0",
"mongodb.user" : "debezium",
"mongodb.password" : "dbz",
"topic.prefix" : "mongodb",
"collection.include.list": "inventory.products,inventory.orders,inventory.customers",
"capture.mode": "change_streams_with_pre_image"
}
}'
The sink connector must include the id -> _id key rename, as follows:
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
"name" :"iceberg-sink-mongodb",
"config": {
"connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
"topics.regex": "mongodb.*",
"iceberg.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
"iceberg.warehouse": "s3a://warehouse/iceberg",
"iceberg.uri": "thrift://hive-metastore:9083",
"iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.s3.endpoint": "http://minio:9000",
"iceberg.s3.access-key-id": "admin",
"iceberg.s3.secret-access-key": "password",
"iceberg.s3.path-style-access": true,
"table.auto-create": true,
"allow-field-addition": false,
"upsert.keep-deletes": false,
"transforms": "unwrap,renamekeyfield",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.add.fields": "op,collection,source.ts_ms,db",
"transforms.unwrap.drop.tombstones": true,
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.renamekeyfield.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
"transforms.renamekeyfield.renames": "id:_id"
}
}'
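With pre-images enabled and the capture mode set, re-running the delete from above should now go through:

db.getSiblingDB('inventory').orders.deleteOne({ "_id": NumberLong("10004") })

and, if I understand upsert.keep-deletes: false correctly, the row disappears from mongodb_inventory_orders instead of being kept with __deleted = true.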