getindata/kafka-connect-iceberg-sink

mongodb connector crash

Closed · 1 comment

bovy89 commented

source connector:

curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
  "name": "mongodb-connector",
  "config": {
    "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": 1,
    "mongodb.connection.mode": "replica_set",
    "mongodb.connection.string": "mongodb://db-mongo:27017/?replicaSet=rs0",
    "mongodb.user" : "debezium",
    "mongodb.password" : "dbz",
    "topic.prefix" : "mongodb",
    "collection.include.list": "inventory.products,inventory.orders,inventory.customers"
  }
}'

sink connector:

curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
  "name" :"iceberg-sink-mongodb",
  "config": {
    "connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
    "topics.regex": "mongodb.*",
    "iceberg.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
    "iceberg.warehouse": "s3a://warehouse/iceberg",
    "iceberg.uri": "thrift://hive-metastore:9083",
    "iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.s3.endpoint": "http://minio:9000",
    "iceberg.s3.access-key-id": "admin",
    "iceberg.s3.secret-access-key": "password",
    "iceberg.s3.path-style-access": true,
    "table.auto-create": true,
    "upsert.keep-deletes": false,
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.add.fields": "op,collection,source.ts_ms,db",
    "transforms.unwrap.drop.tombstones": true,
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'

example data:

db.getSiblingDB('inventory').orders.insertMany([
    { _id : NumberLong("10004"), order_date : new ISODate("2016-02-21T00:00:00Z"), purchaser_id : NumberLong("4444"), quantity : NumberInt("1"), product_id : NumberLong("4666") }
])

This configuration does not work; the sink task fails on the first insert:

2023-08-09 11:16:32,890 ERROR  ||  WorkerSinkTask{id=iceberg-sink-mongodb-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.iceberg.exceptions.ValidationException: Table Row identifier field `id` not found in table columns
	at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:342)
	at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent.icebergSchema(IcebergChangeEvent.java:64)
	at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.lambda$loadIcebergTable$0(IcebergChangeConsumer.java:68)
	at java.base/java.util.Optional.orElseGet(Optional.java:369)
	at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.loadIcebergTable(IcebergChangeConsumer.java:64)
	at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:55)
	at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
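
The exception suggests that the sink takes the row identifier from the field names of the record key and then looks for a column with the same name. Debezium's MongoDB connector emits keys with a single id field, while the unwrapped document carries _id. The sketch below only illustrates that mismatch; the helper name and the record shapes are assumptions, not the sink's actual code.

```javascript
// Hypothetical illustration; not the sink's real implementation.
// Returns the key field names that have no matching column in the value.
function missingIdentifierFields(key, value) {
  return Object.keys(key).filter((field) => !(field in value));
}

// Shapes assumed from the Debezium MongoDB key and the unwrapped document.
const key = { id: "10004" };
const value = {
  _id: 10004,
  order_date: 1456012800000,
  purchaser_id: 4444,
  quantity: 1,
  product_id: 4666,
};

console.log(missingIdentifierFields(key, value)); // [ 'id' ]

// Renaming the key field, as ReplaceField$Key with "id:_id" does,
// removes the mismatch.
const renamedKey = { _id: key.id };
console.log(missingIdentifierFields(renamedKey, value)); // []
```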

Changing the sink as follows (adding a ReplaceField$Key transform that renames the key field id to _id):

curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
  "name" :"iceberg-sink-mongodb",
  "config": {
    "connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
    "topics.regex": "mongodb.*",
    "iceberg.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
    "iceberg.warehouse": "s3a://warehouse/iceberg",
    "iceberg.uri": "thrift://hive-metastore:9083",
    "iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.s3.endpoint": "http://minio:9000",
    "iceberg.s3.access-key-id": "admin",
    "iceberg.s3.secret-access-key": "password",
    "iceberg.s3.path-style-access": true,
    "table.auto-create": true,
    "upsert.keep-deletes": false,
    "transforms": "unwrap,renamekeyfield",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.add.fields": "op,collection,source.ts_ms,db",
    "transforms.unwrap.drop.tombstones": true,
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.renamekeyfield.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
    "transforms.renamekeyfield.renames": "id:_id"
  }
}'

makes the insert work:

trino:default> select * from mongodb_inventory_orders;
  _id  |  order_date   | purchaser_id | quantity | product_id | __deleted | __op | __collection | __source_ts_ms |   __db    |          __source_ts
-------+---------------+--------------+----------+------------+-----------+------+--------------+----------------+-----------+--------------------------------
 10004 | 1456012800000 |         4444 |        1 |       4666 | false     | c    | orders       |  1691579754000 | inventory | 2023-08-09 11:15:54.000000 UTC
(1 row)

trino:default> desc mongodb_inventory_orders;
     Column     |            Type             | Extra |                 Comment
----------------+-----------------------------+-------+-----------------------------------------
 _id            | bigint                      |       |
 order_date     | bigint                      |       | org.apache.kafka.connect.data.Timestamp
 purchaser_id   | bigint                      |       |
 quantity       | integer                     |       |
 product_id     | bigint                      |       |
 __deleted      | boolean                     |       |
 __op           | varchar                     |       |
 __collection   | varchar                     |       |
 __source_ts_ms | bigint                      |       |
 __db           | varchar                     |       |
 __source_ts    | timestamp(6) with time zone |       |
(11 rows)

But if I try to delete the document:

db.getSiblingDB('inventory').orders.deleteOne({"_id": 10004 })

the following error will be triggered:

2023-08-09 11:20:41,471 ERROR  ||  WorkerSinkTask{id=iceberg-sink-mongodb-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Table Row identifier field `_id` not found in table columns   [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.iceberg.exceptions.ValidationException: Table Row identifier field `_id` not found in table columns
	at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:342)
	at com.getindata.kafka.connect.iceberg.sink.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:67)
	at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:56)
	at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
2023-08-09 11:20:41,471 ERROR  ||  WorkerSinkTask{id=iceberg-sink-mongodb-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.iceberg.exceptions.ValidationException: Table Row identifier field `_id` not found in table columns
	at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:342)
	at com.getindata.kafka.connect.iceberg.sink.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:67)
	at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:56)
	at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
	... 11 more
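
The log says the task will not recover until it is restarted manually. Kafka Connect exposes this through its REST API (GET /connectors/{name}/status and POST /connectors/{name}/tasks/{id}/restart). A minimal sketch that builds those two requests for the sink above; the helper name is hypothetical, and a restart only helps once the underlying cause has been fixed.

```javascript
// Build the Kafka Connect REST calls for inspecting and restarting a task.
// taskRequests is a hypothetical helper, not part of any library.
function taskRequests(baseUrl, connector, taskId) {
  const root = `${baseUrl}/connectors/${encodeURIComponent(connector)}`;
  return {
    status: { method: "GET", url: `${root}/status` },
    restart: { method: "POST", url: `${root}/tasks/${taskId}/restart` },
  };
}

const requests = taskRequests("http://localhost:8083", "iceberg-sink-mongodb", 0);
console.log(requests.status.url);
console.log(requests.restart.url);
```

With Node 18 or later, each entry can be passed to fetch, for example fetch(requests.restart.url, { method: requests.restart.method }).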

Any idea?

bovy89 commented

Resolved.

MongoDB >= 6.0 required

Debezium >= 2.4.0 required (not released yet)

Pre- and post-images must be enabled on the MongoDB collections (changeStreamPreAndPostImages: { enabled: true }):

db.getSiblingDB('inventory').runCommand ( { collMod: "orders", changeStreamPreAndPostImages: { enabled: true } } );
db.getSiblingDB('inventory').runCommand ( { collMod: "products", changeStreamPreAndPostImages: { enabled: true } } );
db.getSiblingDB('inventory').runCommand ( { collMod: "customers", changeStreamPreAndPostImages: { enabled: true } } );
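
With more collections, the same collMod commands can be generated in a loop rather than written out one by one. A minimal sketch, assuming the inventory database and collection names used above; preImageCommands is a hypothetical helper.

```javascript
// Build one collMod command per collection that enables pre- and post-images.
function preImageCommands(collections) {
  return collections.map((name) => ({
    collMod: name,
    changeStreamPreAndPostImages: { enabled: true },
  }));
}

const commands = preImageCommands(["orders", "products", "customers"]);

// `db` exists only inside mongosh; outside it, just print the commands.
if (typeof db !== "undefined") {
  commands.forEach((cmd) => printjson(db.getSiblingDB("inventory").runCommand(cmd)));
} else {
  console.log(JSON.stringify(commands, null, 2));
}
```

Run inside mongosh, the script applies the commands; run with plain Node, it only prints them so they can be reviewed first.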

Source connectors must set capture.mode to change_streams_with_pre_image or change_streams_update_full_with_pre_image:

curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
  "name": "mongodb-connector",
  "config": {
    "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": 1,
    "mongodb.connection.mode": "replica_set",
    "mongodb.connection.string": "mongodb://db-mongo:27017/?replicaSet=rs0",
    "mongodb.user" : "debezium",
    "mongodb.password" : "dbz",
    "topic.prefix" : "mongodb",
    "collection.include.list": "inventory.products,inventory.orders,inventory.customers",
    "capture.mode": "change_streams_with_pre_image"
  }
}'

Sink connectors must include the id rename, as follows:

curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors --data '
{
  "name" :"iceberg-sink-mongodb",
  "config": {
    "connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
    "topics.regex": "mongodb.*",
    "iceberg.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
    "iceberg.warehouse": "s3a://warehouse/iceberg",
    "iceberg.uri": "thrift://hive-metastore:9083",
    "iceberg.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.s3.endpoint": "http://minio:9000",
    "iceberg.s3.access-key-id": "admin",
    "iceberg.s3.secret-access-key": "password",
    "iceberg.s3.path-style-access": true,
    "table.auto-create": true,
    "allow-field-addition": false,
    "upsert.keep-deletes": false,
    "transforms": "unwrap,renamekeyfield",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.add.fields": "op,collection,source.ts_ms,db",
    "transforms.unwrap.drop.tombstones": true,
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.renamekeyfield.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
    "transforms.renamekeyfield.renames": "id:_id"
  }
}'