databendcloud/debezium-server-databend

Change event source executor was interrupted

Closed this issue · 35 comments

I am not familar with Java code, sorry for the inconvenience

Environment:

windows 10 22H2
docker desktop minio
docker destktop databend
mysql 5.7.37
sync just one table, 200M

Error Message:

uploadFromStream failed
{"timestamp":"2023-10-10T13:36:01.414+08:00","sequence":134,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Coordinator didn't stop in the expected time, shutting down executor now","threadName":"pool-7-thread-1","threadId":31,"mdc":{},"ndc":"","hostName":"desktop-bpsk61g","processName":"io.debezium.server.Main","processId":22964}
{"timestamp":"2023-10-10T13:36:06.185+08:00","sequence":135,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","level":"WARN","message":"Snapshot was interrupted before completion","threadName":"debezium-mysqlconnector-sales-change-event-source-coordinator","threadId":39,"mdc":{"dbz.taskId":"0","dbz.connectorName":"sales","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"desktop-bpsk61g","processName":"io.debezium.server.Main","processId":22964}
{"timestamp":"2023-10-10T13:36:06.187+08:00","sequence":136,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Change event source executor was interrupted","threadName":"debezium-mysqlconnector-sales-change-event-source-coordinator","threadId":39,"mdc":{"dbz.taskId":"0","dbz.connectorName":"sales","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"desktop-bpsk61g","processName":"io.debezium.server.Main","processId":22964,"exception":{"refId":1,"exceptionType":"java.lang.InterruptedException","message":null,"frames":[{"class":"java.lang.Object","method":"wait0"},{"class":"java.lang.Object","method":"wait","line":366},{"class":"io.debezium.connector.base.ChangeEventQueue","method":"doEnqueue","line":204},{"class":"io.debezium.connector.base.ChangeEventQueue","method":"enqueue","line":169},{"class":"io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver","method":"changeRecord","line":440},{"class":"io.debezium.pipeline.EventDispatcher$1","method":"changeRecord","line":166},{"class":"io.debezium.relational.RelationalChangeRecordEmitter","method":"emitReadRecord","line":90},{"class":"io.debezium.relational.RelationalChangeRecordEmitter","method":"emitChangeRecords","line":50},{"class":"io.debezium.pipeline.EventDispatcher","method":"dispatchSnapshotEvent","line":155},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEventsForTable","line":407},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEvents","line":316},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"doExecute","line":132},{"class":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","method":"execute","line":76},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"doSnapshot","line":155},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"executeChangeEventSources","line":137},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":109},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":577},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class":"java.lang.Thread","method":"run","line":1589}]}}

@peleusj Sorry for late reponse, I will look into this issue.

Hi @peleusj , first of all, there is Change event source executor was interrupted log and it seems your debezium mysql source connector may config error. Would like to share your conf/application.properities?

Secondly, uploadFromStream failed indicate that your minio server may not open the presign function. I suggest you use databend jdbc to run a test case first and then use the debezium server databend.

More detail logs will be better!

Thanks for your detailed response.

My configuration files are on my work computer, and I'll do some tests tomorrow following your suggestions.

If there are still issues, I'll do my best to provide more logs.

Thanks again for your time.

@hantmac Sorry for the delay, didn't have time to run the test last Friday.

I run test again this morning, uploadFromStream failed didn't occur as I run databend on 'fs', but Change event source executor was interrupted still occured.

I double checked conf/application.properities, it seems no problem, and the local databend database successfully retrive the table structure of the source mysql database table, just no data flushed in.

The source mysql database is a RDS of ctyun.cn.

Logs:

{"timestamp":"2023-10-16T13:13:21.263+08:00","sequence":129,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:13:21.275+08:00","sequence":130,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:13:50.556+08:00","sequence":131,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.DatabendChangeConsumer","level":"WARN","message":"Creating table:\ncreate table debezium.\"from_mysql_scp_sale_t_xs_stock\" (\"stock_id\" varchar, \"sale_area_id\" varchar, \"sale_area_name\" varchar, \"warehouse_id\" varchar, \"warehouse_name\" varchar, \"stock_room_id\" varchar, \"stock_room_name\" varchar, \"stock_zone_id\" varchar, \"stock_zone_name\" varchar, \"sku_code\" varchar, \"sku_id\" varchar, \"resource_management\" varchar, \"product_class_id\" varchar, \"product_class_name\" varchar, \"product_brand_id\" varchar, \"product_brand_name\" varchar, \"product_texture_id\" varchar, \"product_texture_name\" varchar, \"prod_area_id\" varchar, \"prod_area_name\" varchar, \"specification\" varchar, \"specification1\" binary, \"specification2\" binary, \"expand1\" binary, \"expand2\" binary, \"length\" binary, \"meter_weight\" binary, \"quantity_type\" varchar, \"pieces\" bigint, \"tolerance\" binary, \"tolerance_range\" varchar, \"weight_range\" varchar, \"ten_meter_weight\" binary, \"supply_meter_weight\" binary, \"sale_lock_flag\" boolean, \"avble_amount\" bigint, \"avble_manager_weight\" binary, \"avble_pound_weight\" binary, \"stock_amount\" bigint, \"stock_pound_weight\" binary, \"stock_manager_weight\" binary, \"lock_amount\" bigint, \"lock_pound_weight\" binary, \"lock_manager_weight\" binary, \"contract_amount\" bigint, \"contract_pound_weight\" binary, \"contract_manager_weight\" binary, \"plan_amount\" bigint, \"plan_pound_weight\" binary, \"plan_manager_weight\" binary, \"lading_amount\" bigint, \"lading_pound_weight\" binary, \"lading_manager_weight\" binary, \"process_amount\" bigint, \"process_pound_weight\" binary, \"process_manager_weight\" binary, \"manual_amount\" bigint, \"manual_pound_weight\" binary, \"manual_manager_weight\" binary, \"average_manager_cost\" binary, \"average_pound_cost\" binary, \"average_pound_price\" binary, \"average_manager_price\" binary, \"price_pound\" binary, \"price_manager\" binary, \"price_discount_pound\" binary, \"price_discount_manager\" binary, \"version_price\" varchar, \"stock_room_type\" varchar, \"workgroup_id\" varchar, \"workgroup_name\" varchar, \"del_flag\" boolean, \"create_user_code\" varchar, \"create_user_name\" varchar, \"create_date\" bigint, \"update_user_code\" varchar, \"update_user_name\" varchar, \"update_date\" bigint, \"sys_id\" varchar, \"platform_id\" varchar, \"org_id\" varchar, \"org_name\" varchar, \"org_short_name\" varchar, \"owner_id\" varchar, \"owner_code\" varchar, \"owner_name\" varchar, \"demand_amount\" bigint, \"demand_manager_weight\" binary, \"demand_pound_weight\" binary, \"move_amount\" bigint, \"move_manager_weight\" binary, \"move_pound_weight\" binary, \"version\" bigint, \"sale_return_amount\" bigint, \"sale_return_manager_weight\" binary, \"sale_return_pound_weight\" binary, \"purchase_return_amount\" bigint, \"purchase_return_manager_weight\" binary, \"purchase_return_pound_weight\" binary, \"ship_type\" varchar, \"steel_coil_no\" varchar, \"profitLoss_amount\" bigint, \"profitLoss_manager_weight\" binary, \"profitLoss_pound_weight\" binary, \"spec1\" binary, \"spec2\" binary, \"spec3\" binary, \"spec4\" binary, \"tolerance_range_min\" binary, \"tolerance_range_max\" binary, \"weight_range_min\" binary, \"weight_range_max\" binary, \"purchase_date\" bigint, \"__deleted\" varchar)","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:13:50.814+08:00","sequence":132,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.tablewriter.RelationalTable","level":"WARN","message":"Loaded Databend table debezium.debezium.from_mysql_scp_sale_t_xs_stock \nColumns:{contract_amount=DatabendRawType{type=contract_amount, isNullable=false}, avble_amount=DatabendRawType{type=avble_amount, isNullable=false}, owner_id=DatabendRawType{type=owner_id, isNullable=false}, weight_range_max=DatabendRawType{type=weight_range_max, isNullable=false}, stock_zone_name=DatabendRawType{type=stock_zone_name, isNullable=false}, tolerance_range=DatabendRawType{type=tolerance_range, isNullable=false}, avble_pound_weight=DatabendRawType{type=avble_pound_weight, isNullable=false}, contract_pound_weight=DatabendRawType{type=contract_pound_weight, isNullable=false}, stock_zone_id=DatabendRawType{type=stock_zone_id, isNullable=false}, prod_area_id=DatabendRawType{type=prod_area_id, isNullable=false}, lading_amount=DatabendRawType{type=lading_amount, isNullable=false}, workgroup_name=DatabendRawType{type=workgroup_name, isNullable=false}, product_texture_id=DatabendRawType{type=product_texture_id, isNullable=false}, plan_pound_weight=DatabendRawType{type=plan_pound_weight, isNullable=false}, weight_range_min=DatabendRawType{type=weight_range_min, isNullable=false}, sale_return_amount=DatabendRawType{type=sale_return_amount, isNullable=false}, expand1=DatabendRawType{type=expand1, isNullable=false}, product_brand_name=DatabendRawType{type=product_brand_name, isNullable=false}, sku_id=DatabendRawType{type=sku_id, isNullable=false}, expand2=DatabendRawType{type=expand2, isNullable=false}, create_user_code=DatabendRawType{type=create_user_code, isNullable=false}, version=DatabendRawType{type=version, isNullable=false}, price_pound=DatabendRawType{type=price_pound, isNullable=false}, manual_manager_weight=DatabendRawType{type=manual_manager_weight, isNullable=false}, warehouse_name=DatabendRawType{type=warehouse_name, isNullable=false}, product_texture_name=DatabendRawType{type=product_texture_name, isNullable=false}, supply_meter_weight=DatabendRawType{type=supply_meter_weight, isNullable=false}, sale_lock_flag=DatabendRawType{type=sale_lock_flag, isNullable=false}, lock_pound_weight=DatabendRawType{type=lock_pound_weight, isNullable=false}, purchase_date=DatabendRawType{type=purchase_date, isNullable=false}, average_pound_cost=DatabendRawType{type=average_pound_cost, isNullable=false}, meter_weight=DatabendRawType{type=meter_weight, isNullable=false}, move_amount=DatabendRawType{type=move_amount, isNullable=false}, resource_management=DatabendRawType{type=resource_management, isNullable=false}, demand_manager_weight=DatabendRawType{type=demand_manager_weight, isNullable=false}, price_manager=DatabendRawType{type=price_manager, isNullable=false}, version_price=DatabendRawType{type=version_price, isNullable=false}, stock_room_name=DatabendRawType{type=stock_room_name, isNullable=false}, move_manager_weight=DatabendRawType{type=move_manager_weight, isNullable=false}, pieces=DatabendRawType{type=pieces, isNullable=false}, spec3=DatabendRawType{type=spec3, isNullable=false}, sale_return_pound_weight=DatabendRawType{type=sale_return_pound_weight, isNullable=false}, spec4=DatabendRawType{type=spec4, isNullable=false}, process_pound_weight=DatabendRawType{type=process_pound_weight, isNullable=false}, org_name=DatabendRawType{type=org_name, isNullable=false}, spec1=DatabendRawType{type=spec1, isNullable=false}, spec2=DatabendRawType{type=spec2, isNullable=false}, weight_range=DatabendRawType{type=weight_range, isNullable=false}, purchase_return_pound_weight=DatabendRawType{type=purchase_return_pound_weight, isNullable=false}, __deleted=DatabendRawType{type=__deleted, isNullable=false}, lading_manager_weight=DatabendRawType{type=lading_manager_weight, isNullable=false}, ship_type=DatabendRawType{type=ship_type, isNullable=false}, stock_id=DatabendRawType{type=stock_id, isNullable=false}, warehouse_id=DatabendRawType{type=warehouse_id, isNullable=false}, stock_room_id=DatabendRawType{type=stock_room_id, isNullable=false}, tolerance_range_min=DatabendRawType{type=tolerance_range_min, isNullable=false}, create_user_name=DatabendRawType{type=create_user_name, isNullable=false}, lock_manager_weight=DatabendRawType{type=lock_manager_weight, isNullable=false}, org_short_name=DatabendRawType{type=org_short_name, isNullable=false}, product_brand_id=DatabendRawType{type=product_brand_id, isNullable=false}, ten_meter_weight=DatabendRawType{type=ten_meter_weight, isNullable=false}, plan_manager_weight=DatabendRawType{type=plan_manager_weight, isNullable=false}, sale_area_id=DatabendRawType{type=sale_area_id, isNullable=false}, steel_coil_no=DatabendRawType{type=steel_coil_no, isNullable=false}, create_date=DatabendRawType{type=create_date, isNullable=false}, tolerance=DatabendRawType{type=tolerance, isNullable=false}, del_flag=DatabendRawType{type=del_flag, isNullable=false}, owner_name=DatabendRawType{type=owner_name, isNullable=false}, stock_amount=DatabendRawType{type=stock_amount, isNullable=false}, average_manager_price=DatabendRawType{type=average_manager_price, isNullable=false}, average_pound_price=DatabendRawType{type=average_pound_price, isNullable=false}, update_user_name=DatabendRawType{type=update_user_name, isNullable=false}, tolerance_range_max=DatabendRawType{type=tolerance_range_max, isNullable=false}, contract_manager_weight=DatabendRawType{type=contract_manager_weight, isNullable=false}, update_date=DatabendRawType{type=update_date, isNullable=false}, demand_pound_weight=DatabendRawType{type=demand_pound_weight, isNullable=false}, org_id=DatabendRawType{type=org_id, isNullable=false}, sale_area_name=DatabendRawType{type=sale_area_name, isNullable=false}, quantity_type=DatabendRawType{type=quantity_type, isNullable=false}, process_manager_weight=DatabendRawType{type=process_manager_weight, isNullable=false}, manual_pound_weight=DatabendRawType{type=manual_pound_weight, isNullable=false}, sku_code=DatabendRawType{type=sku_code, isNullable=false}, price_discount_pound=DatabendRawType{type=price_discount_pound, isNullable=false}, product_class_name=DatabendRawType{type=product_class_name, isNullable=false}, profitLoss_amount=DatabendRawType{type=profitLoss_amount, isNullable=false}, sale_return_manager_weight=DatabendRawType{type=sale_return_manager_weight, isNullable=false}, purchase_return_manager_weight=DatabendRawType{type=purchase_return_manager_weight, isNullable=false}, process_amount=DatabendRawType{type=process_amount, isNullable=false}, owner_code=DatabendRawType{type=owner_code, isNullable=false}, update_user_code=DatabendRawType{type=update_user_code, isNullable=false}, demand_amount=DatabendRawType{type=demand_amount, isNullable=false}, plan_amount=DatabendRawType{type=plan_amount, isNullable=false}, sys_id=DatabendRawType{type=sys_id, isNullable=false}, specification2=DatabendRawType{type=specification2, isNullable=false}, price_discount_manager=DatabendRawType{type=price_discount_manager, isNullable=false}, specification1=DatabendRawType{type=specification1, isNullable=false}, workgroup_id=DatabendRawType{type=workgroup_id, isNullable=false}, avble_manager_weight=DatabendRawType{type=avble_manager_weight, isNullable=false}, profitLoss_pound_weight=DatabendRawType{type=profitLoss_pound_weight, isNullable=false}, lading_pound_weight=DatabendRawType{type=lading_pound_weight, isNullable=false}, move_pound_weight=DatabendRawType{type=move_pound_weight, isNullable=false}, product_class_id=DatabendRawType{type=product_class_id, isNullable=false}, average_manager_cost=DatabendRawType{type=average_manager_cost, isNullable=false}, purchase_return_amount=DatabendRawType{type=purchase_return_amount, isNullable=false}, length=DatabendRawType{type=length, isNullable=false}, prod_area_name=DatabendRawType{type=prod_area_name, isNullable=false}, specification=DatabendRawType{type=specification, isNullable=false}, profitLoss_manager_weight=DatabendRawType{type=profitLoss_manager_weight, isNullable=false}, stock_pound_weight=DatabendRawType{type=stock_pound_weight, isNullable=false}, stock_manager_weight=DatabendRawType{type=stock_manager_weight, isNullable=false}, manual_amount=DatabendRawType{type=manual_amount, isNullable=false}, platform_id=DatabendRawType{type=platform_id, isNullable=false}, lock_amount=DatabendRawType{type=lock_amount, isNullable=false}, stock_room_type=DatabendRawType{type=stock_room_type, isNullable=false}} \nPK:{}","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:14:09.615+08:00","sequence":133,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","level":"WARN","message":"Snapshot was interrupted before completion","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:14:09.62+08:00","sequence":134,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Change event source executor was interrupted","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399,"exception":{"refId":1,"exceptionType":"java.lang.InterruptedException","message":"Interrupted while snapshotting table scp_sale.t_xs_stock","frames":[{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEventsForTable","line":383},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEvents","line":316},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"doExecute","line":132},{"class":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","method":"execute","line":76},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"doSnapshot","line":155},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"executeChangeEventSources","line":137},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":109},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}

@peleusj Hi, just remove the data dir and run test again?

@hantmac Which data dir do you mean?

@peleusj In the directory which you run debezium-server-databend, it will generate a data dir.

@hantmac Thanks. I remove the data dir and run test again, here is the logs:

{"timestamp":"2023-10-16T14:32:06.305+08:00","sequence":129,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:06.312+08:00","sequence":130,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:20.35+08:00","sequence":131,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.mysql.MySqlSnapshotChangeEventSource","level":"WARN","message":"Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables.","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:23.732+08:00","sequence":132,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.tablewriter.RelationalTable","level":"WARN","message":"Loaded Databend table debezium.debezium.from_mysql_scp_sale_t_xs_stock \nColumns:{contract_amount=DatabendRawType{type=contract_amount, isNullable=false}, avble_amount=DatabendRawType{type=avble_amount, isNullable=false}, owner_id=DatabendRawType{type=owner_id, isNullable=false}, weight_range_max=DatabendRawType{type=weight_range_max, isNullable=false}, stock_zone_name=DatabendRawType{type=stock_zone_name, isNullable=false}, tolerance_range=DatabendRawType{type=tolerance_range, isNullable=false}, avble_pound_weight=DatabendRawType{type=avble_pound_weight, isNullable=false}, contract_pound_weight=DatabendRawType{type=contract_pound_weight, isNullable=false}, stock_zone_id=DatabendRawType{type=stock_zone_id, isNullable=false}, prod_area_id=DatabendRawType{type=prod_area_id, isNullable=false}, lading_amount=DatabendRawType{type=lading_amount, isNullable=false}, workgroup_name=DatabendRawType{type=workgroup_name, isNullable=false}, product_texture_id=DatabendRawType{type=product_texture_id, isNullable=false}, plan_pound_weight=DatabendRawType{type=plan_pound_weight, isNullable=false}, weight_range_min=DatabendRawType{type=weight_range_min, isNullable=false}, sale_return_amount=DatabendRawType{type=sale_return_amount, isNullable=false}, expand1=DatabendRawType{type=expand1, isNullable=false}, product_brand_name=DatabendRawType{type=product_brand_name, isNullable=false}, sku_id=DatabendRawType{type=sku_id, isNullable=false}, expand2=DatabendRawType{type=expand2, isNullable=false}, create_user_code=DatabendRawType{type=create_user_code, isNullable=false}, version=DatabendRawType{type=version, isNullable=false}, price_pound=DatabendRawType{type=price_pound, isNullable=false}, manual_manager_weight=DatabendRawType{type=manual_manager_weight, isNullable=false}, warehouse_name=DatabendRawType{type=warehouse_name, isNullable=false}, product_texture_name=DatabendRawType{type=product_texture_name, isNullable=false}, supply_meter_weight=DatabendRawType{type=supply_meter_weight, isNullable=false}, sale_lock_flag=DatabendRawType{type=sale_lock_flag, isNullable=false}, lock_pound_weight=DatabendRawType{type=lock_pound_weight, isNullable=false}, purchase_date=DatabendRawType{type=purchase_date, isNullable=false}, average_pound_cost=DatabendRawType{type=average_pound_cost, isNullable=false}, meter_weight=DatabendRawType{type=meter_weight, isNullable=false}, move_amount=DatabendRawType{type=move_amount, isNullable=false}, resource_management=DatabendRawType{type=resource_management, isNullable=false}, demand_manager_weight=DatabendRawType{type=demand_manager_weight, isNullable=false}, price_manager=DatabendRawType{type=price_manager, isNullable=false}, version_price=DatabendRawType{type=version_price, isNullable=false}, stock_room_name=DatabendRawType{type=stock_room_name, isNullable=false}, move_manager_weight=DatabendRawType{type=move_manager_weight, isNullable=false}, pieces=DatabendRawType{type=pieces, isNullable=false}, spec3=DatabendRawType{type=spec3, isNullable=false}, sale_return_pound_weight=DatabendRawType{type=sale_return_pound_weight, isNullable=false}, spec4=DatabendRawType{type=spec4, isNullable=false}, process_pound_weight=DatabendRawType{type=process_pound_weight, isNullable=false}, org_name=DatabendRawType{type=org_name, isNullable=false}, spec1=DatabendRawType{type=spec1, isNullable=false}, spec2=DatabendRawType{type=spec2, isNullable=false}, weight_range=DatabendRawType{type=weight_range, isNullable=false}, purchase_return_pound_weight=DatabendRawType{type=purchase_return_pound_weight, isNullable=false}, __deleted=DatabendRawType{type=__deleted, isNullable=false}, lading_manager_weight=DatabendRawType{type=lading_manager_weight, isNullable=false}, ship_type=DatabendRawType{type=ship_type, isNullable=false}, stock_id=DatabendRawType{type=stock_id, isNullable=false}, warehouse_id=DatabendRawType{type=warehouse_id, isNullable=false}, stock_room_id=DatabendRawType{type=stock_room_id, isNullable=false}, tolerance_range_min=DatabendRawType{type=tolerance_range_min, isNullable=false}, create_user_name=DatabendRawType{type=create_user_name, isNullable=false}, lock_manager_weight=DatabendRawType{type=lock_manager_weight, isNullable=false}, org_short_name=DatabendRawType{type=org_short_name, isNullable=false}, product_brand_id=DatabendRawType{type=product_brand_id, isNullable=false}, ten_meter_weight=DatabendRawType{type=ten_meter_weight, isNullable=false}, plan_manager_weight=DatabendRawType{type=plan_manager_weight, isNullable=false}, sale_area_id=DatabendRawType{type=sale_area_id, isNullable=false}, steel_coil_no=DatabendRawType{type=steel_coil_no, isNullable=false}, create_date=DatabendRawType{type=create_date, isNullable=false}, tolerance=DatabendRawType{type=tolerance, isNullable=false}, del_flag=DatabendRawType{type=del_flag, isNullable=false}, owner_name=DatabendRawType{type=owner_name, isNullable=false}, stock_amount=DatabendRawType{type=stock_amount, isNullable=false}, average_manager_price=DatabendRawType{type=average_manager_price, isNullable=false}, average_pound_price=DatabendRawType{type=average_pound_price, isNullable=false}, update_user_name=DatabendRawType{type=update_user_name, isNullable=false}, tolerance_range_max=DatabendRawType{type=tolerance_range_max, isNullable=false}, contract_manager_weight=DatabendRawType{type=contract_manager_weight, isNullable=false}, update_date=DatabendRawType{type=update_date, isNullable=false}, demand_pound_weight=DatabendRawType{type=demand_pound_weight, isNullable=false}, org_id=DatabendRawType{type=org_id, isNullable=false}, sale_area_name=DatabendRawType{type=sale_area_name, isNullable=false}, quantity_type=DatabendRawType{type=quantity_type, isNullable=false}, process_manager_weight=DatabendRawType{type=process_manager_weight, isNullable=false}, manual_pound_weight=DatabendRawType{type=manual_pound_weight, isNullable=false}, sku_code=DatabendRawType{type=sku_code, isNullable=false}, price_discount_pound=DatabendRawType{type=price_discount_pound, isNullable=false}, product_class_name=DatabendRawType{type=product_class_name, isNullable=false}, profitLoss_amount=DatabendRawType{type=profitLoss_amount, isNullable=false}, sale_return_manager_weight=DatabendRawType{type=sale_return_manager_weight, isNullable=false}, purchase_return_manager_weight=DatabendRawType{type=purchase_return_manager_weight, isNullable=false}, process_amount=DatabendRawType{type=process_amount, isNullable=false}, owner_code=DatabendRawType{type=owner_code, isNullable=false}, update_user_code=DatabendRawType{type=update_user_code, isNullable=false}, demand_amount=DatabendRawType{type=demand_amount, isNullable=false}, plan_amount=DatabendRawType{type=plan_amount, isNullable=false}, sys_id=DatabendRawType{type=sys_id, isNullable=false}, specification2=DatabendRawType{type=specification2, isNullable=false}, price_discount_manager=DatabendRawType{type=price_discount_manager, isNullable=false}, specification1=DatabendRawType{type=specification1, isNullable=false}, workgroup_id=DatabendRawType{type=workgroup_id, isNullable=false}, avble_manager_weight=DatabendRawType{type=avble_manager_weight, isNullable=false}, profitLoss_pound_weight=DatabendRawType{type=profitLoss_pound_weight, isNullable=false}, lading_pound_weight=DatabendRawType{type=lading_pound_weight, isNullable=false}, move_pound_weight=DatabendRawType{type=move_pound_weight, isNullable=false}, product_class_id=DatabendRawType{type=product_class_id, isNullable=false}, average_manager_cost=DatabendRawType{type=average_manager_cost, isNullable=false}, purchase_return_amount=DatabendRawType{type=purchase_return_amount, isNullable=false}, length=DatabendRawType{type=length, isNullable=false}, prod_area_name=DatabendRawType{type=prod_area_name, isNullable=false}, specification=DatabendRawType{type=specification, isNullable=false}, profitLoss_manager_weight=DatabendRawType{type=profitLoss_manager_weight, isNullable=false}, stock_pound_weight=DatabendRawType{type=stock_pound_weight, isNullable=false}, stock_manager_weight=DatabendRawType{type=stock_manager_weight, isNullable=false}, manual_amount=DatabendRawType{type=manual_amount, isNullable=false}, platform_id=DatabendRawType{type=platform_id, isNullable=false}, lock_amount=DatabendRawType{type=lock_amount, isNullable=false}, stock_room_type=DatabendRawType{type=stock_room_type, isNullable=false}} \nPK:{}","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:34.861+08:00","sequence":133,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","level":"WARN","message":"Snapshot was interrupted before completion","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:34.863+08:00","sequence":134,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Change event source executor was interrupted","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916,"exception":{"refId":1,"exceptionType":"java.lang.InterruptedException","message":"Interrupted while snapshotting table scp_sale.t_xs_stock","frames":[{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEventsForTable","line":383},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEvents","line":316},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"doExecute","line":132},{"class":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","method":"execute","line":76},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"doSnapshot","line":155},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"executeChangeEventSources","line":137},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":109},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}

@hantmac Thanks. I remove the data dir and run test again, here is the logs:

{"timestamp":"2023-10-16T14:32:06.305+08:00","sequence":129,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:06.312+08:00","sequence":130,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:20.35+08:00","sequence":131,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.mysql.MySqlSnapshotChangeEventSource","level":"WARN","message":"Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables.","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:23.732+08:00","sequence":132,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.tablewriter.RelationalTable","level":"WARN","message":"Loaded Databend table debezium.debezium.from_mysql_scp_sale_t_xs_stock \nColumns:{contract_amount=DatabendRawType{type=contract_amount, isNullable=false}, avble_amount=DatabendRawType{type=avble_amount, isNullable=false}, owner_id=DatabendRawType{type=owner_id, isNullable=false}, weight_range_max=DatabendRawType{type=weight_range_max, isNullable=false}, stock_zone_name=DatabendRawType{type=stock_zone_name, isNullable=false}, tolerance_range=DatabendRawType{type=tolerance_range, isNullable=false}, avble_pound_weight=DatabendRawType{type=avble_pound_weight, isNullable=false}, contract_pound_weight=DatabendRawType{type=contract_pound_weight, isNullable=false}, stock_zone_id=DatabendRawType{type=stock_zone_id, isNullable=false}, prod_area_id=DatabendRawType{type=prod_area_id, isNullable=false}, lading_amount=DatabendRawType{type=lading_amount, isNullable=false}, workgroup_name=DatabendRawType{type=workgroup_name, isNullable=false}, product_texture_id=DatabendRawType{type=product_texture_id, isNullable=false}, plan_pound_weight=DatabendRawType{type=plan_pound_weight, isNullable=false}, weight_range_min=DatabendRawType{type=weight_range_min, isNullable=false}, sale_return_amount=DatabendRawType{type=sale_return_amount, isNullable=false}, expand1=DatabendRawType{type=expand1, isNullable=false}, product_brand_name=DatabendRawType{type=product_brand_name, isNullable=false}, sku_id=DatabendRawType{type=sku_id, isNullable=false}, expand2=DatabendRawType{type=expand2, isNullable=false}, create_user_code=DatabendRawType{type=create_user_code, isNullable=false}, version=DatabendRawType{type=version, isNullable=false}, price_pound=DatabendRawType{type=price_pound, isNullable=false}, manual_manager_weight=DatabendRawType{type=manual_manager_weight, isNullable=false}, warehouse_name=DatabendRawType{type=warehouse_name, isNullable=false}, product_texture_name=DatabendRawType{type=product_texture_name, isNullable=false}, supply_meter_weight=DatabendRawType{type=supply_meter_weight, isNullable=false}, sale_lock_flag=DatabendRawType{type=sale_lock_flag, isNullable=false}, lock_pound_weight=DatabendRawType{type=lock_pound_weight, isNullable=false}, purchase_date=DatabendRawType{type=purchase_date, isNullable=false}, average_pound_cost=DatabendRawType{type=average_pound_cost, isNullable=false}, meter_weight=DatabendRawType{type=meter_weight, isNullable=false}, move_amount=DatabendRawType{type=move_amount, isNullable=false}, resource_management=DatabendRawType{type=resource_management, isNullable=false}, demand_manager_weight=DatabendRawType{type=demand_manager_weight, isNullable=false}, price_manager=DatabendRawType{type=price_manager, isNullable=false}, version_price=DatabendRawType{type=version_price, isNullable=false}, stock_room_name=DatabendRawType{type=stock_room_name, isNullable=false}, move_manager_weight=DatabendRawType{type=move_manager_weight, isNullable=false}, pieces=DatabendRawType{type=pieces, isNullable=false}, spec3=DatabendRawType{type=spec3, isNullable=false}, sale_return_pound_weight=DatabendRawType{type=sale_return_pound_weight, isNullable=false}, spec4=DatabendRawType{type=spec4, isNullable=false}, process_pound_weight=DatabendRawType{type=process_pound_weight, isNullable=false}, org_name=DatabendRawType{type=org_name, isNullable=false}, spec1=DatabendRawType{type=spec1, isNullable=false}, spec2=DatabendRawType{type=spec2, isNullable=false}, weight_range=DatabendRawType{type=weight_range, isNullable=false}, purchase_return_pound_weight=DatabendRawType{type=purchase_return_pound_weight, isNullable=false}, __deleted=DatabendRawType{type=__deleted, isNullable=false}, lading_manager_weight=DatabendRawType{type=lading_manager_weight, isNullable=false}, ship_type=DatabendRawType{type=ship_type, isNullable=false}, stock_id=DatabendRawType{type=stock_id, isNullable=false}, warehouse_id=DatabendRawType{type=warehouse_id, isNullable=false}, stock_room_id=DatabendRawType{type=stock_room_id, isNullable=false}, tolerance_range_min=DatabendRawType{type=tolerance_range_min, isNullable=false}, create_user_name=DatabendRawType{type=create_user_name, isNullable=false}, lock_manager_weight=DatabendRawType{type=lock_manager_weight, isNullable=false}, org_short_name=DatabendRawType{type=org_short_name, isNullable=false}, product_brand_id=DatabendRawType{type=product_brand_id, isNullable=false}, ten_meter_weight=DatabendRawType{type=ten_meter_weight, isNullable=false}, plan_manager_weight=DatabendRawType{type=plan_manager_weight, isNullable=false}, sale_area_id=DatabendRawType{type=sale_area_id, isNullable=false}, steel_coil_no=DatabendRawType{type=steel_coil_no, isNullable=false}, create_date=DatabendRawType{type=create_date, isNullable=false}, tolerance=DatabendRawType{type=tolerance, isNullable=false}, del_flag=DatabendRawType{type=del_flag, isNullable=false}, owner_name=DatabendRawType{type=owner_name, isNullable=false}, stock_amount=DatabendRawType{type=stock_amount, isNullable=false}, average_manager_price=DatabendRawType{type=average_manager_price, isNullable=false}, average_pound_price=DatabendRawType{type=average_pound_price, isNullable=false}, update_user_name=DatabendRawType{type=update_user_name, isNullable=false}, tolerance_range_max=DatabendRawType{type=tolerance_range_max, isNullable=false}, contract_manager_weight=DatabendRawType{type=contract_manager_weight, isNullable=false}, update_date=DatabendRawType{type=update_date, isNullable=false}, demand_pound_weight=DatabendRawType{type=demand_pound_weight, isNullable=false}, org_id=DatabendRawType{type=org_id, isNullable=false}, sale_area_name=DatabendRawType{type=sale_area_name, isNullable=false}, quantity_type=DatabendRawType{type=quantity_type, isNullable=false}, process_manager_weight=DatabendRawType{type=process_manager_weight, isNullable=false}, manual_pound_weight=DatabendRawType{type=manual_pound_weight, isNullable=false}, sku_code=DatabendRawType{type=sku_code, isNullable=false}, price_discount_pound=DatabendRawType{type=price_discount_pound, isNullable=false}, product_class_name=DatabendRawType{type=product_class_name, isNullable=false}, profitLoss_amount=DatabendRawType{type=profitLoss_amount, isNullable=false}, sale_return_manager_weight=DatabendRawType{type=sale_return_manager_weight, isNullable=false}, purchase_return_manager_weight=DatabendRawType{type=purchase_return_manager_weight, isNullable=false}, process_amount=DatabendRawType{type=process_amount, isNullable=false}, owner_code=DatabendRawType{type=owner_code, isNullable=false}, update_user_code=DatabendRawType{type=update_user_code, isNullable=false}, demand_amount=DatabendRawType{type=demand_amount, isNullable=false}, plan_amount=DatabendRawType{type=plan_amount, isNullable=false}, sys_id=DatabendRawType{type=sys_id, isNullable=false}, specification2=DatabendRawType{type=specification2, isNullable=false}, price_discount_manager=DatabendRawType{type=price_discount_manager, isNullable=false}, specification1=DatabendRawType{type=specification1, isNullable=false}, workgroup_id=DatabendRawType{type=workgroup_id, isNullable=false}, avble_manager_weight=DatabendRawType{type=avble_manager_weight, isNullable=false}, profitLoss_pound_weight=DatabendRawType{type=profitLoss_pound_weight, isNullable=false}, lading_pound_weight=DatabendRawType{type=lading_pound_weight, isNullable=false}, move_pound_weight=DatabendRawType{type=move_pound_weight, isNullable=false}, product_class_id=DatabendRawType{type=product_class_id, isNullable=false}, average_manager_cost=DatabendRawType{type=average_manager_cost, isNullable=false}, purchase_return_amount=DatabendRawType{type=purchase_return_amount, isNullable=false}, length=DatabendRawType{type=length, isNullable=false}, prod_area_name=DatabendRawType{type=prod_area_name, isNullable=false}, specification=DatabendRawType{type=specification, isNullable=false}, profitLoss_manager_weight=DatabendRawType{type=profitLoss_manager_weight, isNullable=false}, stock_pound_weight=DatabendRawType{type=stock_pound_weight, isNullable=false}, stock_manager_weight=DatabendRawType{type=stock_manager_weight, isNullable=false}, manual_amount=DatabendRawType{type=manual_amount, isNullable=false}, platform_id=DatabendRawType{type=platform_id, isNullable=false}, lock_amount=DatabendRawType{type=lock_amount, isNullable=false}, stock_room_type=DatabendRawType{type=stock_room_type, isNullable=false}} \nPK:{}","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:34.861+08:00","sequence":133,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","level":"WARN","message":"Snapshot was interrupted before completion","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:34.863+08:00","sequence":134,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Change event source executor was interrupted","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916,"exception":{"refId":1,"exceptionType":"java.lang.InterruptedException","message":"Interrupted while snapshotting table scp_sale.t_xs_stock","frames":[{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEventsForTable","line":383},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEvents","line":316},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"doExecute","line":132},{"class":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","method":"execute","line":76},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"doSnapshot","line":155},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"executeChangeEventSources","line":137},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":109},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}

Change event source executor was interrupted indicates that the processing was interrupted. It seems that the MySQL has some unknown problems. Would u like to share the config file?

@hantmac OK. and I just find out the source database is a read-only replica of production RDS of ctyun.cn. Does this matter?

debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://localhost:8000
debezium.sink.databend.database.username=databend
debezium.sink.databend.database.password=databend
# debezium.sink.databend.database.primaryKey=id
# debezium.sink.databend.database.tableName=products
# additional databend parameters
debezium.sink.databend.database.param.ssl=false

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source, related docs: https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/mysql_source_connector_config.html
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000

debezium.source.database.hostname=
debezium.source.database.port=3306
debezium.source.database.user=
debezium.source.database.password=
debezium.source.database.dbname=scp_sale
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=scp_sale.t_xs_stock
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message! delete mode must open there conf
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
#debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
# make tombstones as true to soft delete
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET LOG LEVELS ############
quarkus.log.console.json=true
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
quarkus.log.file.path=./logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3
quarkus.log.level=WARN
quarkus.log.file.enable=true
quarkus.http.port=8080

@peleusj BTW, has you enable the binlog for the RDS of ctyun.cn? Please follow this doc https://debezium.io/documentation/reference/stable/connectors/mysql.html#enable-mysql-binlog to check it first.

@hantmac OK, I will check that with my workmate asap.

@hantmac Sorry for the delay.

I have communicate with workmate, and use comand SHOW GLOBAL VARIABLES check the binlog status, the result indicates it is on and it matches status you mentioned:

log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 10

Also, I tried Airbyte this morning, it can successfully sync the same database table to destination database, like Porstgres or ClickHouse.

Hope these information may help you finding out what's going on.

@peleusj Thanks for your information.
Yes, from your binlog status info it indicates binlong on.

And I have tried to reproduce this problem but can't be reproduced on Mac OS and Linux (I have no Windows machine). Would like to run it on Linux or MacOS? BTW, what is your JDK version? It is required JDK >= 11.

OK. I have a laptop with Ubuntu 22.04 installed, I will test on that machine and pay attention to JDK version.

@hantmac I have run a fresh test on Ubuntu 22.04, with Java 11, here is the logs:

docker run \
    -p 8000:8000 \
    -v ${HOME}/tmp/databend/meta:/var/lib/databend/meta \
    -v ${HOME}/tmp/databend/query:/var/lib/databend/query \
    -v ${HOME}/tmp/databend/log:/var/log/databend \
    -e QUERY_DEFAULT_USER=databend \
    -e QUERY_DEFAULT_PASSWORD=databend \
    datafuselabs/databend
{"timestamp":"2023-10-18T15:34:18.855+08:00","sequence":129,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:34:18.862+08:00","sequence":130,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:34:54.223+08:00","sequence":131,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.DatabendChangeConsumer","level":"WARN","message":"Creating table:\ncreate table debezium.\"from_mysql_scp_sale_t_xs_stock\" (\"stock_id\" varchar, \"sale_area_id\" varchar, \"sale_area_name\" varchar, \"warehouse_id\" varchar, \"warehouse_name\" varchar, \"stock_room_id\" varchar, \"stock_room_name\" varchar, \"stock_zone_id\" varchar, \"stock_zone_name\" varchar, \"sku_code\" varchar, \"sku_id\" varchar, \"resource_management\" varchar, \"product_class_id\" varchar, \"product_class_name\" varchar, \"product_brand_id\" varchar, \"product_brand_name\" varchar, \"product_texture_id\" varchar, \"product_texture_name\" varchar, \"prod_area_id\" varchar, \"prod_area_name\" varchar, \"specification\" varchar, \"specification1\" binary, \"specification2\" binary, \"expand1\" binary, \"expand2\" binary, \"length\" binary, \"meter_weight\" binary, \"quantity_type\" varchar, \"pieces\" bigint, \"tolerance\" binary, \"tolerance_range\" varchar, \"weight_range\" varchar, \"ten_meter_weight\" binary, \"supply_meter_weight\" binary, \"sale_lock_flag\" boolean, \"avble_amount\" bigint, \"avble_manager_weight\" binary, \"avble_pound_weight\" binary, \"stock_amount\" bigint, \"stock_pound_weight\" binary, \"stock_manager_weight\" binary, \"lock_amount\" bigint, \"lock_pound_weight\" binary, \"lock_manager_weight\" binary, \"contract_amount\" bigint, \"contract_pound_weight\" binary, \"contract_manager_weight\" binary, \"plan_amount\" bigint, \"plan_pound_weight\" binary, \"plan_manager_weight\" binary, \"lading_amount\" bigint, \"lading_pound_weight\" binary, \"lading_manager_weight\" binary, \"process_amount\" bigint, \"process_pound_weight\" binary, \"process_manager_weight\" binary, \"manual_amount\" bigint, \"manual_pound_weight\" binary, \"manual_manager_weight\" binary, \"average_manager_cost\" binary, \"average_pound_cost\" binary, \"average_pound_price\" binary, \"average_manager_price\" binary, \"price_pound\" binary, \"price_manager\" binary, \"price_discount_pound\" binary, \"price_discount_manager\" binary, \"version_price\" varchar, \"stock_room_type\" varchar, \"workgroup_id\" varchar, \"workgroup_name\" varchar, \"del_flag\" boolean, \"create_user_code\" varchar, \"create_user_name\" varchar, \"create_date\" bigint, \"update_user_code\" varchar, \"update_user_name\" varchar, \"update_date\" bigint, \"sys_id\" varchar, \"platform_id\" varchar, \"org_id\" varchar, \"org_name\" varchar, \"org_short_name\" varchar, \"owner_id\" varchar, \"owner_code\" varchar, \"owner_name\" varchar, \"demand_amount\" bigint, \"demand_manager_weight\" binary, \"demand_pound_weight\" binary, \"move_amount\" bigint, \"move_manager_weight\" binary, \"move_pound_weight\" binary, \"version\" bigint, \"sale_return_amount\" bigint, \"sale_return_manager_weight\" binary, \"sale_return_pound_weight\" binary, \"purchase_return_amount\" bigint, \"purchase_return_manager_weight\" binary, \"purchase_return_pound_weight\" binary, \"ship_type\" varchar, \"steel_coil_no\" varchar, \"profitLoss_amount\" bigint, \"profitLoss_manager_weight\" binary, \"profitLoss_pound_weight\" binary, \"spec1\" binary, \"spec2\" binary, \"spec3\" binary, \"spec4\" binary, \"tolerance_range_min\" binary, \"tolerance_range_max\" binary, \"weight_range_min\" binary, \"weight_range_max\" binary, \"purchase_date\" bigint, \"__deleted\" varchar)","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:34:54.446+08:00","sequence":132,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.tablewriter.RelationalTable","level":"WARN","message":"Loaded Databend table debezium.debezium.from_mysql_scp_sale_t_xs_stock \nColumns:{contract_amount=DatabendRawType{type=contract_amount, isNullable=false}, avble_amount=DatabendRawType{type=avble_amount, isNullable=false}, owner_id=DatabendRawType{type=owner_id, isNullable=false}, weight_range_max=DatabendRawType{type=weight_range_max, isNullable=false}, stock_zone_name=DatabendRawType{type=stock_zone_name, isNullable=false}, tolerance_range=DatabendRawType{type=tolerance_range, isNullable=false}, avble_pound_weight=DatabendRawType{type=avble_pound_weight, isNullable=false}, contract_pound_weight=DatabendRawType{type=contract_pound_weight, isNullable=false}, stock_zone_id=DatabendRawType{type=stock_zone_id, isNullable=false}, prod_area_id=DatabendRawType{type=prod_area_id, isNullable=false}, lading_amount=DatabendRawType{type=lading_amount, isNullable=false}, workgroup_name=DatabendRawType{type=workgroup_name, isNullable=false}, product_texture_id=DatabendRawType{type=product_texture_id, isNullable=false}, plan_pound_weight=DatabendRawType{type=plan_pound_weight, isNullable=false}, weight_range_min=DatabendRawType{type=weight_range_min, isNullable=false}, sale_return_amount=DatabendRawType{type=sale_return_amount, isNullable=false}, expand1=DatabendRawType{type=expand1, isNullable=false}, product_brand_name=DatabendRawType{type=product_brand_name, isNullable=false}, sku_id=DatabendRawType{type=sku_id, isNullable=false}, expand2=DatabendRawType{type=expand2, isNullable=false}, create_user_code=DatabendRawType{type=create_user_code, isNullable=false}, version=DatabendRawType{type=version, isNullable=false}, price_pound=DatabendRawType{type=price_pound, isNullable=false}, manual_manager_weight=DatabendRawType{type=manual_manager_weight, isNullable=false}, warehouse_name=DatabendRawType{type=warehouse_name, isNullable=false}, product_texture_name=DatabendRawType{type=product_texture_name, isNullable=false}, supply_meter_weight=DatabendRawType{type=supply_meter_weight, isNullable=false}, sale_lock_flag=DatabendRawType{type=sale_lock_flag, isNullable=false}, lock_pound_weight=DatabendRawType{type=lock_pound_weight, isNullable=false}, purchase_date=DatabendRawType{type=purchase_date, isNullable=false}, average_pound_cost=DatabendRawType{type=average_pound_cost, isNullable=false}, meter_weight=DatabendRawType{type=meter_weight, isNullable=false}, move_amount=DatabendRawType{type=move_amount, isNullable=false}, resource_management=DatabendRawType{type=resource_management, isNullable=false}, demand_manager_weight=DatabendRawType{type=demand_manager_weight, isNullable=false}, price_manager=DatabendRawType{type=price_manager, isNullable=false}, version_price=DatabendRawType{type=version_price, isNullable=false}, stock_room_name=DatabendRawType{type=stock_room_name, isNullable=false}, move_manager_weight=DatabendRawType{type=move_manager_weight, isNullable=false}, pieces=DatabendRawType{type=pieces, isNullable=false}, spec3=DatabendRawType{type=spec3, isNullable=false}, sale_return_pound_weight=DatabendRawType{type=sale_return_pound_weight, isNullable=false}, spec4=DatabendRawType{type=spec4, isNullable=false}, process_pound_weight=DatabendRawType{type=process_pound_weight, isNullable=false}, org_name=DatabendRawType{type=org_name, isNullable=false}, spec1=DatabendRawType{type=spec1, isNullable=false}, spec2=DatabendRawType{type=spec2, isNullable=false}, weight_range=DatabendRawType{type=weight_range, isNullable=false}, purchase_return_pound_weight=DatabendRawType{type=purchase_return_pound_weight, isNullable=false}, __deleted=DatabendRawType{type=__deleted, isNullable=false}, lading_manager_weight=DatabendRawType{type=lading_manager_weight, isNullable=false}, ship_type=DatabendRawType{type=ship_type, isNullable=false}, stock_id=DatabendRawType{type=stock_id, isNullable=false}, warehouse_id=DatabendRawType{type=warehouse_id, isNullable=false}, stock_room_id=DatabendRawType{type=stock_room_id, isNullable=false}, tolerance_range_min=DatabendRawType{type=tolerance_range_min, isNullable=false}, create_user_name=DatabendRawType{type=create_user_name, isNullable=false}, lock_manager_weight=DatabendRawType{type=lock_manager_weight, isNullable=false}, org_short_name=DatabendRawType{type=org_short_name, isNullable=false}, product_brand_id=DatabendRawType{type=product_brand_id, isNullable=false}, ten_meter_weight=DatabendRawType{type=ten_meter_weight, isNullable=false}, plan_manager_weight=DatabendRawType{type=plan_manager_weight, isNullable=false}, sale_area_id=DatabendRawType{type=sale_area_id, isNullable=false}, steel_coil_no=DatabendRawType{type=steel_coil_no, isNullable=false}, create_date=DatabendRawType{type=create_date, isNullable=false}, tolerance=DatabendRawType{type=tolerance, isNullable=false}, del_flag=DatabendRawType{type=del_flag, isNullable=false}, owner_name=DatabendRawType{type=owner_name, isNullable=false}, stock_amount=DatabendRawType{type=stock_amount, isNullable=false}, average_manager_price=DatabendRawType{type=average_manager_price, isNullable=false}, average_pound_price=DatabendRawType{type=average_pound_price, isNullable=false}, update_user_name=DatabendRawType{type=update_user_name, isNullable=false}, tolerance_range_max=DatabendRawType{type=tolerance_range_max, isNullable=false}, contract_manager_weight=DatabendRawType{type=contract_manager_weight, isNullable=false}, update_date=DatabendRawType{type=update_date, isNullable=false}, demand_pound_weight=DatabendRawType{type=demand_pound_weight, isNullable=false}, org_id=DatabendRawType{type=org_id, isNullable=false}, sale_area_name=DatabendRawType{type=sale_area_name, isNullable=false}, quantity_type=DatabendRawType{type=quantity_type, isNullable=false}, process_manager_weight=DatabendRawType{type=process_manager_weight, isNullable=false}, manual_pound_weight=DatabendRawType{type=manual_pound_weight, isNullable=false}, sku_code=DatabendRawType{type=sku_code, isNullable=false}, price_discount_pound=DatabendRawType{type=price_discount_pound, isNullable=false}, product_class_name=DatabendRawType{type=product_class_name, isNullable=false}, profitLoss_amount=DatabendRawType{type=profitLoss_amount, isNullable=false}, sale_return_manager_weight=DatabendRawType{type=sale_return_manager_weight, isNullable=false}, purchase_return_manager_weight=DatabendRawType{type=purchase_return_manager_weight, isNullable=false}, process_amount=DatabendRawType{type=process_amount, isNullable=false}, owner_code=DatabendRawType{type=owner_code, isNullable=false}, update_user_code=DatabendRawType{type=update_user_code, isNullable=false}, demand_amount=DatabendRawType{type=demand_amount, isNullable=false}, plan_amount=DatabendRawType{type=plan_amount, isNullable=false}, sys_id=DatabendRawType{type=sys_id, isNullable=false}, specification2=DatabendRawType{type=specification2, isNullable=false}, price_discount_manager=DatabendRawType{type=price_discount_manager, isNullable=false}, specification1=DatabendRawType{type=specification1, isNullable=false}, workgroup_id=DatabendRawType{type=workgroup_id, isNullable=false}, avble_manager_weight=DatabendRawType{type=avble_manager_weight, isNullable=false}, profitLoss_pound_weight=DatabendRawType{type=profitLoss_pound_weight, isNullable=false}, lading_pound_weight=DatabendRawType{type=lading_pound_weight, isNullable=false}, move_pound_weight=DatabendRawType{type=move_pound_weight, isNullable=false}, product_class_id=DatabendRawType{type=product_class_id, isNullable=false}, average_manager_cost=DatabendRawType{type=average_manager_cost, isNullable=false}, purchase_return_amount=DatabendRawType{type=purchase_return_amount, isNullable=false}, length=DatabendRawType{type=length, isNullable=false}, prod_area_name=DatabendRawType{type=prod_area_name, isNullable=false}, specification=DatabendRawType{type=specification, isNullable=false}, profitLoss_manager_weight=DatabendRawType{type=profitLoss_manager_weight, isNullable=false}, stock_pound_weight=DatabendRawType{type=stock_pound_weight, isNullable=false}, stock_manager_weight=DatabendRawType{type=stock_manager_weight, isNullable=false}, manual_amount=DatabendRawType{type=manual_amount, isNullable=false}, platform_id=DatabendRawType{type=platform_id, isNullable=false}, lock_amount=DatabendRawType{type=lock_amount, isNullable=false}, stock_room_type=DatabendRawType{type=stock_room_type, isNullable=false}} \nPK:{}","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:35:11.961+08:00","sequence":133,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","level":"WARN","message":"Snapshot was interrupted before completion","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:35:11.965+08:00","sequence":134,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Change event source executor was interrupted","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989,"exception":{"refId":1,"exceptionType":"java.lang.InterruptedException","message":"Interrupted while snapshotting table scp_sale.t_xs_stock","frames":[{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEventsForTable","line":383},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEvents","line":316},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"doExecute","line":132},{"class":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","method":"execute","line":76},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"doSnapshot","line":155},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"executeChangeEventSources","line":137},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":109},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}

@peleujs Thanks!

I copy my application.properties and use there config to test it again:

debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://localhost:8000
debezium.sink.databend.database.username=databend
debezium.sink.databend.database.password=databend
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=false
# additional databend parameters

# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000

debezium.source.database.hostname=localhost
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
debezium.source.max.batch.size=200000
debezium.source.max.queue.size=800000
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
# https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-new-record-state-drop-tombstones
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
#debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
# soft delete
#debezium.transforms.unwrap.delete.handling.mode=rewrite
#debezium.transforms.unwrap.drop.tombstones=false
# hard delete
debezium.transforms.unwrap.delete.handling.mode=none
debezium.transforms.unwrap.drop.tombstones=false

# ############ SET LOG LEVELS ############
quarkus.log.console.json=true
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
quarkus.log.file.path=./logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3
quarkus.log.level=INFO
quarkus.log.file.enable=true

@hantmac I used your conf and just change the source mysql info, the logs seems have something different, it is too long, I save it to gist:

https://gist.github.com/peleusj/addb82f0d9496e0888df95ec3ef1bd73

@peleusj

Great! I found some very useful error message:

"message":"java.sql.SQLException: Query failed (#a722e8a1-a14a-4b73-b1b2-f888d2e44191): storage doesn't support presign operation"

This error was caused by your minio does not support presign. You must config your minio to support it(https://min.io/docs/minio/linux/integrations/presigned-put-upload-via-browser.html). Databend use this fucntion to write data into object storage.

I believe that it will success after handling this problem.

BTW, change this config to your own tables: debezium.source.table.include.list=mydb.products.

@hantmac Glad to have progrss!

I have changed debezium.source.table.include.list in last test.

I have a little confusion, does this mean I have to start a minio container to storage data, and configure the presign of minio, which in last test I just run a databend docker container which is default config with fs backend. My little confusion is why minio is needed here.

Any way, I will try to start a minio docker container and test it again.

@hantmac Glad to have progrss!

I have changed debezium.source.table.include.list in last test.

I have a little confusion, does this mean I have to start a minio container to storage data, and configure the presign of minio, which in last test I just run a databend docker container which is default config with fs backend. My little confusion is why minio is needed here.

Any way, I will try to start a minio docker container and test it again.

@peleusj
Databend use presign to write data into object storage. The fs storage is just for testing and can't be used in production.
You can see this doc about presign https://databend.rs/doc/contributing/rfcs/presign.

Just use this config in databend-query.toml:

[storage]
type = "s3"
[storage.s3]
bucket = "databend"
endpoint_url = "http://0.0.0.0:9000"
access_key_id = "minioadmin"
secret_access_key = "minioadmin"

@hantmac I try to use your databend-query.toml, but get some errors, I guess it need to set regions, but I checked some docs, could not find how to configure it:

Databend Query start failure, cause: StorageOther. Code: 4000, Text = other error (ConfigInvalid (permanent) at Builder::build, context: { service: s3 } => region is missing. Please find it by S3::detect_region() or set them in env.).

@hantmac I try to use your databend-query.toml, but get some errors, I guess it need to set regions, but I checked some docs, could not find how to configure it:

Databend Query start failure, cause: StorageOther. Code: 4000, Text = other error (ConfigInvalid (permanent) at Builder::build, context: { service: s3 } => region is missing. Please find it by S3::detect_region() or set them in env.).

Are u uring aws s3 storage? You can find more error message in databend-query.

@hantmac I am not using aws s3, I am just using a docker-compose.yml and databend-query.toml with your suggest configs:

services:
  databend:
    image: datafuselabs/databend
    volumes:
      - /home/albert/test/databend/conf/databend-query.toml:/etc/databend/query.toml
    environment:
      QUERY_CONFIG_FILE: /etc/databend/query.toml
      QUERY_DEFAULT_USER: databend
      QUERY_DEFAULT_PASSWORD: databend
      MINIO_ENABLED: 'true'
    ports:
      - '8000:8000'
      - '9000:9000'
      - '3307:3307'
      - '8124:8124'
[query]
max_active_sessions = 256
wait_timeout_mills = 5000

flight_api_address = "0.0.0.0:9090"
admin_api_address = "0.0.0.0:8080"
metric_api_address = "0.0.0.0:7070"

mysql_handler_host = "0.0.0.0"
mysql_handler_port = 3307

clickhouse_http_handler_host = "0.0.0.0"
clickhouse_http_handler_port = 8124

http_handler_host = "0.0.0.0"
http_handler_port = 8000

flight_sql_handler_host = "0.0.0.0"
flight_sql_handler_port = 8900

tenant_id = "default"
cluster_id = "default"

[log]

[log.stderr]
level = "WARN"
format = "text"

[log.file]
level = "INFO"
dir = "/var/log/databend"

[meta]
endpoints = ["0.0.0.0:9191"]
username = "root"
password = "root"
client_timeout_in_second = 60

[storage]
type = "s3"

[storage.s3]
bucket = "databend"
endpoint_url = "http://0.0.0.0:9000"
access_key_id = "minioadmin"
secret_access_key = "minioadmin"

And where should I look after for databend-query you metioned?

@hantmac I try to use your databend-query.toml, but get some errors, I guess it need to set regions, but I checked some docs, could not find how to configure it:

Databend Query start failure, cause: StorageOther. Code: 4000, Text = other error (ConfigInvalid (permanent) at Builder::build, context: { service: s3 } => region is missing. Please find it by S3::detect_region() or set them in env.).

Are u uring aws s3 storage? You can find more error message in databend-query.

@Xuanwo Hi, would you like to help figure out this problem? 😄

Xuanwo commented

I guess it need to set regions, but I checked some docs, could not find how to configure it:

You can set region this way:

[storage.s3]
bucket = "databend"
endpoint_url = "http://0.0.0.0:9000"
access_key_id = "minioadmin"
secret_access_key = "minioadmin"
+ region = "abc"

For minio, you can use us-east-1 if no other region has been set for minio.

@Xuanwo I try to add the region, get some new errors:

Databend Query start failure, cause: StorageUnavailable. Code: 3901, Text = current configured storage is not available: config: S3(StorageS3Config { endpoint_url: "http://0.0.0.0:9000", region: "us-east-1", bucket: "databend", root: "", disable_credential_loader: false, enable_virtual_host_style: false, role_arn: "", external_id: "", access_key_id: "******min", secret_access_key: "******min", security_token: "", master_key: "", allow_anonymous: false }), cause: Unexpected (temporary) at Pager::next, context: { url: http://0.0.0.0:9000/databend?delimiter=%2F&list-type=2, called: http_util::Client::send_async, service: s3, path: / } => send async request, source: error sending request for url (http://0.0.0.0:9000/databend?delimiter=%2F&list-type=2): error trying to connect: tcp connect error: Connection refused (os error 111).
Xuanwo commented

Connection refused

This error typically suggests improper storage setup. For instance, port 9000 might not be open or listening on 0.0.0.0. Please review your settings to ensure correct minio configuration. To verify, run curl http://0.0.0.0:9000 in the same databend container.

And given you are using a non-https endpoint like http://xxx, you will need to add an extra param in databend query to allow it:

[storage]
type = "s3"
+ allow_insecure = true

@Xuanwo Sorry for the bothering.

My Docker knowledge is limited, and I'm unsure about how to run this test.

To verify, run curl http://0.0.0.0:9000 in the same databend container.

I can only test through browser with minio console port 127.0.0.1:9090 which is defined in my configuration.

I've attached my complete configuration.
Could you please help me check it to find out the wrong configuration?
Thanks a lot!

docker-compose.yaml

services:
  minio:
    image: quay.io/minio/minio
    volumes:
      - /home/albert/tmp/minio:/data
    environment:
      - "MINIO_ROOT_USER=test"
      - "MINIO_ROOT_PASSWORD=test1234"
    ports:
      - "9000:9000"
      - "9090:9090"
    command: [ "server", "/data", "--console-address", ":9090" ]

  databend:
    image: datafuselabs/databend
    volumes:
      - /home/albert/test/databend/conf/databend-query.toml:/etc/databend/query.toml
    environment:
      QUERY_CONFIG_FILE: /etc/databend/query.toml
      QUERY_DEFAULT_USER: databend
      QUERY_DEFAULT_PASSWORD: databend
    ports:
      - '8000:8000'
      - '3307:3307'
      - '8124:8124'

databend-query.toml

# Usage:
# databend-query -c databend-query.toml

[query]
max_active_sessions = 256
wait_timeout_mills = 5000

# Internal flight rpc for cluster communication.
flight_api_address = "0.0.0.0:9091"

# Admin REST API.
admin_api_address = "0.0.0.0:8080"

# Metrics REST API.
metric_api_address = "0.0.0.0:7070"

# Query Handler: MySQL
mysql_handler_host = "0.0.0.0"
mysql_handler_port = 3307

# Query Handler: Clickhouse HTTP
clickhouse_http_handler_host = "0.0.0.0"
clickhouse_http_handler_port = 8124

# Query Handler: HTTP API
http_handler_host = "0.0.0.0"
http_handler_port = 8000

# Query Handler: Experimental Arrow Flight SQL API
flight_sql_handler_host = "0.0.0.0"
flight_sql_handler_port = 8900

tenant_id = "default"
cluster_id = "default"

table_engine_memory_enabled = true

# [[query.users]]
# name = "root"
# auth_type = "no_password"

# [[query.users]]
# name = "databend"
# auth_type = "double_sha1_password"
# # echo -n "databend" | sha1sum | cut -d' ' -f1 | xxd -r -p | sha1sum
# auth_string = "3081f32caef285c232d066033c89a78d88a6d8a5"

# [[query.users]]
# name = "datafuselabs"
# auth_type = "sha256_password"
# #  echo -n "datafuselabs" | sha256sum
# auth_string = "6db1a2f5da402b43c066fcadcbf78f04260b3236d9035e44dd463f21e29e6f3b"


[log]

[log.file]
level = "WARN"
format = "text"
dir = "/var/log/databend"

[meta]
# It is a list of `grpc_api_advertise_host:<grpc-api-port>` of databend-meta config
endpoints = ["0.0.0.0:9191"]
username = "root"
password = "root"
client_timeout_in_second = 60
auto_sync_interval = 60

# Storage config.
[storage]
# fs | s3 | azblob | gcs | oss | cos | hdfs | webhdfs
type = "s3"
allow_insecure = true

# Set a local folder to store your data.
# Comment out this block if you're NOT using local file system as storage.
# [storage.fs]
# data_path = "/var/lib/databend/data"

# To use an Amazon S3-like storage service, uncomment this block and set your values.
[storage.s3]
bucket = "databend"
endpoint_url = "http://127.0.0.1:9000"
access_key_id = "test"
secret_access_key = "test1234"
region = "us-east-1"
# enable_virtual_host_style = false

# To use Azure Blob Storage, uncomment this block and set your values.
# [storage.azblob]
# endpoint_url = "https://<your-storage-account-name>.blob.core.windows.net"
# container = "<your-azure-storage-container-name>"
# account_name = "<your-storage-account-name>"
# account_key = "<your-account-key>"

# To use Google Cloud Storage, uncomment this block and set your values.
# [storage.gcs]
# bucket = "<your-bucket-name>"
# endpoint_url = "<your-endpoint>"
# credential = "<your-credentials>"

# To use Alibaba Cloud OSS, uncomment this block and set your values.
# [storage.oss]
# bucket = "<your-bucket-name>"
# endpoint_url = "<your-endpoint>"
# access_key_id = "<your-key-id>"
# access_key_secret = "<your-account-key>"

# To use Tencent Cloud Object Storage, uncomment this block and set your values.
# [storage.cos]
# bucket = "<your-bucket-name>"
# endpoint_url = "<your-endpoint>"
# secret_id = "<your-secret-id>"
# secret_key = "<your-secret-key>"

# To use HDFS, uncomment this block and set your values.
# [storage.hdfs]
# name_node = "<your-name-node>"

# To use WebHDFS, uncomment this block and set your values.
# [storage.webhdfs]
# endpoint_url = "<your-endpoint>"

# Cache config.
[cache]
# Type of storage to keep the table data cache
#
# available options: [none|disk]
# default is "none", which disable table data cache
# use "disk" to enabled disk cache
data_cache_storage = "none"

[cache.disk]
# cache path
path = "/var/lib/databend/cache"
# max bytes of cached data 20G
max_bytes = 21474836480

@Xuanwo I successfully deployed a standalone databend in my latop, follow these articles:

https://databend.rs/doc/deploy/deploying-databend
https://www.bilibili.com/read/cv25407731/

and it does not need to set the region in databend-query.toml as deploy by Docker.

@hantmac I finally succfully start the run.sh, and get part of my source data to databend target table, but not all of the data, it is terrupted:

{"timestamp":"2023-10-20T12:35:07.165+08:00","sequence":1090,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-10-thread-1","threadId":31,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}
{"timestamp":"2023-10-20T12:35:07.183+08:00","sequence":1091,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}
{"timestamp":"2023-10-20T12:35:07.184+08:00","sequence":1092,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"INFO","message":"Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Java heap space', error = '{}'","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324,"exception":{"refId":1,"exceptionType":"java.lang.OutOfMemoryError","message":"Java heap space","frames":[{"class":"java.util.LinkedHashMap","method":"newNode","line":256},{"class":"java.util.HashMap","method":"putVal","line":627},{"class":"java.util.HashMap","method":"put","line":608},{"class":"com.fasterxml.jackson.databind.node.ObjectNode","method":"replace","line":585},{"class":"com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer","method":"_deserializeContainerNoRecursion","line":594},{"class":"com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer","method":"deserialize","line":98},{"class":"com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer","method":"deserialize","line":23},{"class":"com.fasterxml.jackson.databind.deser.DefaultDeserializationContext","method":"readRootValue","line":323},{"class":"com.fasterxml.jackson.databind.ObjectMapper","method":"_readTreeAndClose","line":4772},{"class":"com.fasterxml.jackson.databind.ObjectMapper","method":"readTree","line":3138},{"class":"io.debezium.server.databend.DatabendChangeConsumer","method":"lambda$handleBatch$0","line":200},{"class":"io.debezium.server.databend.DatabendChangeConsumer$$Lambda$802/0x000000084061f840","method":"apply"},{"class":"java.util.stream.ReferencePipeline$3$1","method":"accept","line":195},{"class":"java.util.ArrayList$ArrayListSpliterator","method":"forEachRemaining","line":1655},{"class":"java.util.stream.AbstractPipeline","method":"copyInto","line":484},{"class":"java.util.stream.AbstractPipeline","method":"wrapAndCopyInto","line":474},{"class":"java.util.stream.ReduceOps$ReduceOp","method":"evaluateSequential","line":913},{"class":"java.util.stream.AbstractPipeline","method":"evaluate","line":234},{"class":"java.util.stream.ReferencePipeline","method":"collect","line":578},{"class":"io.debezium.server.databend.DatabendChangeConsumer","method":"handleBatch","line":207},{"class":"io.debezium.embedded.ConvertingEngineBuilder","method":"lambda$notifying$2","line":83},{"class":"io.debezium.embedded.ConvertingEngineBuilder$$Lambda$375/0x0000000840321c40","method":"handleBatch"},{"class":"io.debezium.embedded.EmbeddedEngine","method":"run","line":822},{"class":"io.debezium.embedded.ConvertingEngineBuilder$2","method":"run","line":192},{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":152},{"class":"io.debezium.server.DebeziumServer$$Lambda$415/0x0000000840371440","method":"run"},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}
{"timestamp":"2023-10-20T12:35:07.212+08:00","sequence":1093,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.DebeziumServer","level":"INFO","message":"Received request to stop the engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}
{"timestamp":"2023-10-20T12:35:07.212+08:00","sequence":1094,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.EmbeddedEngine","level":"INFO","message":"Stopping the embedded engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}
{"timestamp":"2023-10-20T12:35:07.255+08:00","sequence":1095,"loggerClassName":"org.jboss.logging.Logger","loggerName":"io.quarkus","level":"INFO","message":"debezium-server-databend-dist stopped in 0.068s","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}

does this mean the data transfer is working, but my old laptop Thinkpad X250 8G memory is not enough to complete the task?And what is the cause of former Change event source executor was interrupted error? Is that caused by some setting in my initial application.properties setup?

Lastly, a big thank your again for all your patience and time!

'Stopping connector after error in the application's handler method: Java heap space

@peleusj Congratulation!

  1. Yes, because of this error 'Stopping connector after error in the application's handler method: Java heap space , it means you have no enough memory for the task.
  2. Change event source executor was interrupted was caused by error mysql debezium settings.
  3. Open issue any time if you have any questions!

Lastly, thanks @Xuanwo very much!

So I closed this issue since solved. Open the issue any time if you have any questions.

@hantmac Sorry for bothering again.

Today, I rerun previous test on a machine with more memory. All the configurations were the same as the previous test.
It only synchronized approximately 25,000 records before it was interrupted. The source was around 60,000 records.

Also, I noticed some issues this time that I didn't catch last time. Certain columns in MySQL didn't synchronize correctly in terms of their data type, such as columns with data type of decimal and datetime.

Below is the log information:

{"timestamp":"2023-10-25T15:22:48.426+08:00","sequence":1102,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.mysql.MySqlStreamingChangeEventSource","level":"INFO","message":"Stopped reading binlog after 0 events, last recorded offset: {transaction_id=null, ts_sec=1698218534, file=mysql-bin.011044, pos=212595309, gtids=c2932ac0-66e5-11ed-abba-fa163ed055f0:1-1295929881, server_id=1695037098, event=1}","threadName":"blc-172.16.251.20:3306","threadId":55,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"binlog"},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.434+08:00","sequence":1103,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-10-thread-1","threadId":58,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.451+08:00","sequence":1104,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":27,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.452+08:00","sequence":1105,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"INFO","message":"Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: java.sql.SQLException: failed to upload input stream', error = '{}'","threadName":"pool-7-thread-1","threadId":27,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392,"exception":{"refId":1,"exceptionType":"java.lang.RuntimeException","message":"java.sql.SQLException: failed to upload input stream","frames":[{"class":"io.debezium.server.databend.tablewriter.UpsertTableWriter","method":"deleteUpsert","line":80},{"class":"io.debezium.server.databend.tablewriter.UpsertTableWriter","method":"addToTable","line":46},{"class":"io.debezium.server.databend.DatabendChangeConsumer","method":"handleBatch","line":212},{"class":"io.debezium.embedded.ConvertingEngineBuilder","method":"lambda$notifying$2","line":83},{"class":"io.debezium.embedded.EmbeddedEngine","method":"run","line":822},{"class":"io.debezium.embedded.ConvertingEngineBuilder$2","method":"run","line":192},{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":152},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}
{"timestamp":"2023-10-25T15:22:48.485+08:00","sequence":1106,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.DebeziumServer","level":"INFO","message":"Received request to stop the engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.485+08:00","sequence":1107,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.EmbeddedEngine","level":"INFO","message":"Stopping the embedded engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.525+08:00","sequence":1108,"loggerClassName":"org.jboss.logging.Logger","loggerName":"io.quarkus","level":"INFO","message":"debezium-server-databend-dist stopped in 0.071s","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}

@peleusj Hi bro, there is an error message : "message":"java.sql.SQLException: failed to upload input stream", it indicates that the debezium-server can't connect with your oss, there has a network error.

@hantmac Thanks for your quick responsing.

I deploy minio and databend both locally, and my connection to my RDS seems solid.

Also do you have any insights with the target coloum data type issues, the decimal column became varchar, the datetime column became bigint.

Thanks again.