Change event source executor was interrupted
I am not familiar with Java code, so sorry for any inconvenience.
Environment:
Windows 10 22H2
MinIO on Docker Desktop
Databend on Docker Desktop
MySQL 5.7.37
Syncing just one table, 200M
Error Message:
uploadFromStream failed
{"timestamp":"2023-10-10T13:36:01.414+08:00","sequence":134,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Coordinator didn't stop in the expected time, shutting down executor now","threadName":"pool-7-thread-1","threadId":31,"mdc":{},"ndc":"","hostName":"desktop-bpsk61g","processName":"io.debezium.server.Main","processId":22964}
{"timestamp":"2023-10-10T13:36:06.185+08:00","sequence":135,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","level":"WARN","message":"Snapshot was interrupted before completion","threadName":"debezium-mysqlconnector-sales-change-event-source-coordinator","threadId":39,"mdc":{"dbz.taskId":"0","dbz.connectorName":"sales","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"desktop-bpsk61g","processName":"io.debezium.server.Main","processId":22964}
{"timestamp":"2023-10-10T13:36:06.187+08:00","sequence":136,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Change event source executor was interrupted","threadName":"debezium-mysqlconnector-sales-change-event-source-coordinator","threadId":39,"mdc":{"dbz.taskId":"0","dbz.connectorName":"sales","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"desktop-bpsk61g","processName":"io.debezium.server.Main","processId":22964,"exception":{"refId":1,"exceptionType":"java.lang.InterruptedException","message":null,"frames":[{"class":"java.lang.Object","method":"wait0"},{"class":"java.lang.Object","method":"wait","line":366},{"class":"io.debezium.connector.base.ChangeEventQueue","method":"doEnqueue","line":204},{"class":"io.debezium.connector.base.ChangeEventQueue","method":"enqueue","line":169},{"class":"io.debezium.pipeline.EventDispatcher$BufferingSnapshotChangeRecordReceiver","method":"changeRecord","line":440},{"class":"io.debezium.pipeline.EventDispatcher$1","method":"changeRecord","line":166},{"class":"io.debezium.relational.RelationalChangeRecordEmitter","method":"emitReadRecord","line":90},{"class":"io.debezium.relational.RelationalChangeRecordEmitter","method":"emitChangeRecords","line":50},{"class":"io.debezium.pipeline.EventDispatcher","method":"dispatchSnapshotEvent","line":155},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEventsForTable","line":407},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEvents","line":316},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"doExecute","line":132},{"class":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","method":"execute","line":76},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"doSnapshot","line":155},{"class":"io.debezium.pipeline.ChangeEventSour
ceCoordinator","method":"executeChangeEventSources","line":137},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":109},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":577},{"class":"java.util.concurrent.FutureTask","method":"run","line":317},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1144},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":642},{"class":"java.lang.Thread","method":"run","line":1589}]}}
Hi @peleusj, first of all, the log shows "Change event source executor was interrupted", which suggests your Debezium MySQL source connector may be misconfigured. Could you share your conf/application.properties?
Secondly, "uploadFromStream failed" indicates that your MinIO server may not have the presign function enabled. I suggest you first run a test case with Databend JDBC and then use debezium-server-databend.
More detailed logs would help!
Thanks for your detailed response.
My configuration files are on my work computer, and I'll do some tests tomorrow following your suggestions.
If there are still issues, I'll do my best to provide more logs.
Thanks again for your time.
@hantmac Sorry for the delay, I didn't have time to run the test last Friday.
I ran the test again this morning. "uploadFromStream failed" didn't occur since I ran Databend on 'fs', but "Change event source executor was interrupted" still occurred.
I double-checked conf/application.properties and it seems fine. The local Databend database successfully retrieved the table structure of the source MySQL table, but no data was flushed in.
The source MySQL database is an RDS instance from ctyun.cn.
Logs:
{"timestamp":"2023-10-16T13:13:21.263+08:00","sequence":129,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:13:21.275+08:00","sequence":130,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:13:50.556+08:00","sequence":131,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.DatabendChangeConsumer","level":"WARN","message":"Creating table:\ncreate table debezium.\"from_mysql_scp_sale_t_xs_stock\" (\"stock_id\" varchar, \"sale_area_id\" varchar, \"sale_area_name\" varchar, \"warehouse_id\" varchar, \"warehouse_name\" varchar, \"stock_room_id\" varchar, \"stock_room_name\" varchar, \"stock_zone_id\" varchar, \"stock_zone_name\" varchar, \"sku_code\" varchar, \"sku_id\" varchar, \"resource_management\" varchar, \"product_class_id\" varchar, \"product_class_name\" varchar, \"product_brand_id\" varchar, \"product_brand_name\" varchar, \"product_texture_id\" varchar, \"product_texture_name\" varchar, \"prod_area_id\" varchar, \"prod_area_name\" varchar, \"specification\" varchar, \"specification1\" binary, \"specification2\" binary, \"expand1\" binary, \"expand2\" binary, \"length\" binary, \"meter_weight\" binary, \"quantity_type\" varchar, \"pieces\" bigint, \"tolerance\" binary, \"tolerance_range\" varchar, \"weight_range\" varchar, \"ten_meter_weight\" binary, \"supply_meter_weight\" binary, \"sale_lock_flag\" boolean, \"avble_amount\" bigint, \"avble_manager_weight\" binary, \"avble_pound_weight\" binary, \"stock_amount\" bigint, \"stock_pound_weight\" binary, \"stock_manager_weight\" binary, \"lock_amount\" bigint, \"lock_pound_weight\" binary, \"lock_manager_weight\" binary, \"contract_amount\" bigint, \"contract_pound_weight\" binary, \"contract_manager_weight\" binary, \"plan_amount\" bigint, \"plan_pound_weight\" binary, \"plan_manager_weight\" binary, \"lading_amount\" bigint, \"lading_pound_weight\" binary, \"lading_manager_weight\" binary, \"process_amount\" bigint, \"process_pound_weight\" binary, \"process_manager_weight\" binary, \"manual_amount\" bigint, \"manual_pound_weight\" binary, \"manual_manager_weight\" binary, \"average_manager_cost\" binary, \"average_pound_cost\" 
binary, \"average_pound_price\" binary, \"average_manager_price\" binary, \"price_pound\" binary, \"price_manager\" binary, \"price_discount_pound\" binary, \"price_discount_manager\" binary, \"version_price\" varchar, \"stock_room_type\" varchar, \"workgroup_id\" varchar, \"workgroup_name\" varchar, \"del_flag\" boolean, \"create_user_code\" varchar, \"create_user_name\" varchar, \"create_date\" bigint, \"update_user_code\" varchar, \"update_user_name\" varchar, \"update_date\" bigint, \"sys_id\" varchar, \"platform_id\" varchar, \"org_id\" varchar, \"org_name\" varchar, \"org_short_name\" varchar, \"owner_id\" varchar, \"owner_code\" varchar, \"owner_name\" varchar, \"demand_amount\" bigint, \"demand_manager_weight\" binary, \"demand_pound_weight\" binary, \"move_amount\" bigint, \"move_manager_weight\" binary, \"move_pound_weight\" binary, \"version\" bigint, \"sale_return_amount\" bigint, \"sale_return_manager_weight\" binary, \"sale_return_pound_weight\" binary, \"purchase_return_amount\" bigint, \"purchase_return_manager_weight\" binary, \"purchase_return_pound_weight\" binary, \"ship_type\" varchar, \"steel_coil_no\" varchar, \"profitLoss_amount\" bigint, \"profitLoss_manager_weight\" binary, \"profitLoss_pound_weight\" binary, \"spec1\" binary, \"spec2\" binary, \"spec3\" binary, \"spec4\" binary, \"tolerance_range_min\" binary, \"tolerance_range_max\" binary, \"weight_range_min\" binary, \"weight_range_max\" binary, \"purchase_date\" bigint, \"__deleted\" varchar)","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:13:50.814+08:00","sequence":132,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.tablewriter.RelationalTable","level":"WARN","message":"Loaded Databend table debezium.debezium.from_mysql_scp_sale_t_xs_stock \nColumns:{contract_amount=DatabendRawType{type=contract_amount, isNullable=false}, avble_amount=DatabendRawType{type=avble_amount, isNullable=false}, owner_id=DatabendRawType{type=owner_id, isNullable=false}, weight_range_max=DatabendRawType{type=weight_range_max, isNullable=false}, stock_zone_name=DatabendRawType{type=stock_zone_name, isNullable=false}, tolerance_range=DatabendRawType{type=tolerance_range, isNullable=false}, avble_pound_weight=DatabendRawType{type=avble_pound_weight, isNullable=false}, contract_pound_weight=DatabendRawType{type=contract_pound_weight, isNullable=false}, stock_zone_id=DatabendRawType{type=stock_zone_id, isNullable=false}, prod_area_id=DatabendRawType{type=prod_area_id, isNullable=false}, lading_amount=DatabendRawType{type=lading_amount, isNullable=false}, workgroup_name=DatabendRawType{type=workgroup_name, isNullable=false}, product_texture_id=DatabendRawType{type=product_texture_id, isNullable=false}, plan_pound_weight=DatabendRawType{type=plan_pound_weight, isNullable=false}, weight_range_min=DatabendRawType{type=weight_range_min, isNullable=false}, sale_return_amount=DatabendRawType{type=sale_return_amount, isNullable=false}, expand1=DatabendRawType{type=expand1, isNullable=false}, product_brand_name=DatabendRawType{type=product_brand_name, isNullable=false}, sku_id=DatabendRawType{type=sku_id, isNullable=false}, expand2=DatabendRawType{type=expand2, isNullable=false}, create_user_code=DatabendRawType{type=create_user_code, isNullable=false}, version=DatabendRawType{type=version, isNullable=false}, price_pound=DatabendRawType{type=price_pound, isNullable=false}, manual_manager_weight=DatabendRawType{type=manual_manager_weight, isNullable=false}, 
warehouse_name=DatabendRawType{type=warehouse_name, isNullable=false}, product_texture_name=DatabendRawType{type=product_texture_name, isNullable=false}, supply_meter_weight=DatabendRawType{type=supply_meter_weight, isNullable=false}, sale_lock_flag=DatabendRawType{type=sale_lock_flag, isNullable=false}, lock_pound_weight=DatabendRawType{type=lock_pound_weight, isNullable=false}, purchase_date=DatabendRawType{type=purchase_date, isNullable=false}, average_pound_cost=DatabendRawType{type=average_pound_cost, isNullable=false}, meter_weight=DatabendRawType{type=meter_weight, isNullable=false}, move_amount=DatabendRawType{type=move_amount, isNullable=false}, resource_management=DatabendRawType{type=resource_management, isNullable=false}, demand_manager_weight=DatabendRawType{type=demand_manager_weight, isNullable=false}, price_manager=DatabendRawType{type=price_manager, isNullable=false}, version_price=DatabendRawType{type=version_price, isNullable=false}, stock_room_name=DatabendRawType{type=stock_room_name, isNullable=false}, move_manager_weight=DatabendRawType{type=move_manager_weight, isNullable=false}, pieces=DatabendRawType{type=pieces, isNullable=false}, spec3=DatabendRawType{type=spec3, isNullable=false}, sale_return_pound_weight=DatabendRawType{type=sale_return_pound_weight, isNullable=false}, spec4=DatabendRawType{type=spec4, isNullable=false}, process_pound_weight=DatabendRawType{type=process_pound_weight, isNullable=false}, org_name=DatabendRawType{type=org_name, isNullable=false}, spec1=DatabendRawType{type=spec1, isNullable=false}, spec2=DatabendRawType{type=spec2, isNullable=false}, weight_range=DatabendRawType{type=weight_range, isNullable=false}, purchase_return_pound_weight=DatabendRawType{type=purchase_return_pound_weight, isNullable=false}, __deleted=DatabendRawType{type=__deleted, isNullable=false}, lading_manager_weight=DatabendRawType{type=lading_manager_weight, isNullable=false}, ship_type=DatabendRawType{type=ship_type, isNullable=false}, 
stock_id=DatabendRawType{type=stock_id, isNullable=false}, warehouse_id=DatabendRawType{type=warehouse_id, isNullable=false}, stock_room_id=DatabendRawType{type=stock_room_id, isNullable=false}, tolerance_range_min=DatabendRawType{type=tolerance_range_min, isNullable=false}, create_user_name=DatabendRawType{type=create_user_name, isNullable=false}, lock_manager_weight=DatabendRawType{type=lock_manager_weight, isNullable=false}, org_short_name=DatabendRawType{type=org_short_name, isNullable=false}, product_brand_id=DatabendRawType{type=product_brand_id, isNullable=false}, ten_meter_weight=DatabendRawType{type=ten_meter_weight, isNullable=false}, plan_manager_weight=DatabendRawType{type=plan_manager_weight, isNullable=false}, sale_area_id=DatabendRawType{type=sale_area_id, isNullable=false}, steel_coil_no=DatabendRawType{type=steel_coil_no, isNullable=false}, create_date=DatabendRawType{type=create_date, isNullable=false}, tolerance=DatabendRawType{type=tolerance, isNullable=false}, del_flag=DatabendRawType{type=del_flag, isNullable=false}, owner_name=DatabendRawType{type=owner_name, isNullable=false}, stock_amount=DatabendRawType{type=stock_amount, isNullable=false}, average_manager_price=DatabendRawType{type=average_manager_price, isNullable=false}, average_pound_price=DatabendRawType{type=average_pound_price, isNullable=false}, update_user_name=DatabendRawType{type=update_user_name, isNullable=false}, tolerance_range_max=DatabendRawType{type=tolerance_range_max, isNullable=false}, contract_manager_weight=DatabendRawType{type=contract_manager_weight, isNullable=false}, update_date=DatabendRawType{type=update_date, isNullable=false}, demand_pound_weight=DatabendRawType{type=demand_pound_weight, isNullable=false}, org_id=DatabendRawType{type=org_id, isNullable=false}, sale_area_name=DatabendRawType{type=sale_area_name, isNullable=false}, quantity_type=DatabendRawType{type=quantity_type, isNullable=false}, 
process_manager_weight=DatabendRawType{type=process_manager_weight, isNullable=false}, manual_pound_weight=DatabendRawType{type=manual_pound_weight, isNullable=false}, sku_code=DatabendRawType{type=sku_code, isNullable=false}, price_discount_pound=DatabendRawType{type=price_discount_pound, isNullable=false}, product_class_name=DatabendRawType{type=product_class_name, isNullable=false}, profitLoss_amount=DatabendRawType{type=profitLoss_amount, isNullable=false}, sale_return_manager_weight=DatabendRawType{type=sale_return_manager_weight, isNullable=false}, purchase_return_manager_weight=DatabendRawType{type=purchase_return_manager_weight, isNullable=false}, process_amount=DatabendRawType{type=process_amount, isNullable=false}, owner_code=DatabendRawType{type=owner_code, isNullable=false}, update_user_code=DatabendRawType{type=update_user_code, isNullable=false}, demand_amount=DatabendRawType{type=demand_amount, isNullable=false}, plan_amount=DatabendRawType{type=plan_amount, isNullable=false}, sys_id=DatabendRawType{type=sys_id, isNullable=false}, specification2=DatabendRawType{type=specification2, isNullable=false}, price_discount_manager=DatabendRawType{type=price_discount_manager, isNullable=false}, specification1=DatabendRawType{type=specification1, isNullable=false}, workgroup_id=DatabendRawType{type=workgroup_id, isNullable=false}, avble_manager_weight=DatabendRawType{type=avble_manager_weight, isNullable=false}, profitLoss_pound_weight=DatabendRawType{type=profitLoss_pound_weight, isNullable=false}, lading_pound_weight=DatabendRawType{type=lading_pound_weight, isNullable=false}, move_pound_weight=DatabendRawType{type=move_pound_weight, isNullable=false}, product_class_id=DatabendRawType{type=product_class_id, isNullable=false}, average_manager_cost=DatabendRawType{type=average_manager_cost, isNullable=false}, purchase_return_amount=DatabendRawType{type=purchase_return_amount, isNullable=false}, length=DatabendRawType{type=length, isNullable=false}, 
prod_area_name=DatabendRawType{type=prod_area_name, isNullable=false}, specification=DatabendRawType{type=specification, isNullable=false}, profitLoss_manager_weight=DatabendRawType{type=profitLoss_manager_weight, isNullable=false}, stock_pound_weight=DatabendRawType{type=stock_pound_weight, isNullable=false}, stock_manager_weight=DatabendRawType{type=stock_manager_weight, isNullable=false}, manual_amount=DatabendRawType{type=manual_amount, isNullable=false}, platform_id=DatabendRawType{type=platform_id, isNullable=false}, lock_amount=DatabendRawType{type=lock_amount, isNullable=false}, stock_room_type=DatabendRawType{type=stock_room_type, isNullable=false}} \nPK:{}","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:14:09.615+08:00","sequence":133,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","level":"WARN","message":"Snapshot was interrupted before completion","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399}
{"timestamp":"2023-10-16T13:14:09.62+08:00","sequence":134,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Change event source executor was interrupted","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":27399,"exception":{"refId":1,"exceptionType":"java.lang.InterruptedException","message":"Interrupted while snapshotting table scp_sale.t_xs_stock","frames":[{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEventsForTable","line":383},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEvents","line":316},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"doExecute","line":132},{"class":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","method":"execute","line":76},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"doSnapshot","line":155},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"executeChangeEventSources","line":137},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":109},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}
@peleusj In the directory where you run debezium-server-databend, a data dir is generated.
@hantmac Thanks. I removed the data dir and ran the test again; here are the logs:
{"timestamp":"2023-10-16T14:32:06.305+08:00","sequence":129,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:06.312+08:00","sequence":130,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:20.35+08:00","sequence":131,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.mysql.MySqlSnapshotChangeEventSource","level":"WARN","message":"Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables.","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:23.732+08:00","sequence":132,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.tablewriter.RelationalTable","level":"WARN","message":"Loaded Databend table debezium.debezium.from_mysql_scp_sale_t_xs_stock \nColumns:{contract_amount=DatabendRawType{type=contract_amount, isNullable=false}, avble_amount=DatabendRawType{type=avble_amount, isNullable=false}, owner_id=DatabendRawType{type=owner_id, isNullable=false}, weight_range_max=DatabendRawType{type=weight_range_max, isNullable=false}, stock_zone_name=DatabendRawType{type=stock_zone_name, isNullable=false}, tolerance_range=DatabendRawType{type=tolerance_range, isNullable=false}, avble_pound_weight=DatabendRawType{type=avble_pound_weight, isNullable=false}, contract_pound_weight=DatabendRawType{type=contract_pound_weight, isNullable=false}, stock_zone_id=DatabendRawType{type=stock_zone_id, isNullable=false}, prod_area_id=DatabendRawType{type=prod_area_id, isNullable=false}, lading_amount=DatabendRawType{type=lading_amount, isNullable=false}, workgroup_name=DatabendRawType{type=workgroup_name, isNullable=false}, product_texture_id=DatabendRawType{type=product_texture_id, isNullable=false}, plan_pound_weight=DatabendRawType{type=plan_pound_weight, isNullable=false}, weight_range_min=DatabendRawType{type=weight_range_min, isNullable=false}, sale_return_amount=DatabendRawType{type=sale_return_amount, isNullable=false}, expand1=DatabendRawType{type=expand1, isNullable=false}, product_brand_name=DatabendRawType{type=product_brand_name, isNullable=false}, sku_id=DatabendRawType{type=sku_id, isNullable=false}, expand2=DatabendRawType{type=expand2, isNullable=false}, create_user_code=DatabendRawType{type=create_user_code, isNullable=false}, version=DatabendRawType{type=version, isNullable=false}, price_pound=DatabendRawType{type=price_pound, isNullable=false}, manual_manager_weight=DatabendRawType{type=manual_manager_weight, isNullable=false}, 
warehouse_name=DatabendRawType{type=warehouse_name, isNullable=false}, product_texture_name=DatabendRawType{type=product_texture_name, isNullable=false}, supply_meter_weight=DatabendRawType{type=supply_meter_weight, isNullable=false}, sale_lock_flag=DatabendRawType{type=sale_lock_flag, isNullable=false}, lock_pound_weight=DatabendRawType{type=lock_pound_weight, isNullable=false}, purchase_date=DatabendRawType{type=purchase_date, isNullable=false}, average_pound_cost=DatabendRawType{type=average_pound_cost, isNullable=false}, meter_weight=DatabendRawType{type=meter_weight, isNullable=false}, move_amount=DatabendRawType{type=move_amount, isNullable=false}, resource_management=DatabendRawType{type=resource_management, isNullable=false}, demand_manager_weight=DatabendRawType{type=demand_manager_weight, isNullable=false}, price_manager=DatabendRawType{type=price_manager, isNullable=false}, version_price=DatabendRawType{type=version_price, isNullable=false}, stock_room_name=DatabendRawType{type=stock_room_name, isNullable=false}, move_manager_weight=DatabendRawType{type=move_manager_weight, isNullable=false}, pieces=DatabendRawType{type=pieces, isNullable=false}, spec3=DatabendRawType{type=spec3, isNullable=false}, sale_return_pound_weight=DatabendRawType{type=sale_return_pound_weight, isNullable=false}, spec4=DatabendRawType{type=spec4, isNullable=false}, process_pound_weight=DatabendRawType{type=process_pound_weight, isNullable=false}, org_name=DatabendRawType{type=org_name, isNullable=false}, spec1=DatabendRawType{type=spec1, isNullable=false}, spec2=DatabendRawType{type=spec2, isNullable=false}, weight_range=DatabendRawType{type=weight_range, isNullable=false}, purchase_return_pound_weight=DatabendRawType{type=purchase_return_pound_weight, isNullable=false}, __deleted=DatabendRawType{type=__deleted, isNullable=false}, lading_manager_weight=DatabendRawType{type=lading_manager_weight, isNullable=false}, ship_type=DatabendRawType{type=ship_type, isNullable=false}, 
stock_id=DatabendRawType{type=stock_id, isNullable=false}, warehouse_id=DatabendRawType{type=warehouse_id, isNullable=false}, stock_room_id=DatabendRawType{type=stock_room_id, isNullable=false}, tolerance_range_min=DatabendRawType{type=tolerance_range_min, isNullable=false}, create_user_name=DatabendRawType{type=create_user_name, isNullable=false}, lock_manager_weight=DatabendRawType{type=lock_manager_weight, isNullable=false}, org_short_name=DatabendRawType{type=org_short_name, isNullable=false}, product_brand_id=DatabendRawType{type=product_brand_id, isNullable=false}, ten_meter_weight=DatabendRawType{type=ten_meter_weight, isNullable=false}, plan_manager_weight=DatabendRawType{type=plan_manager_weight, isNullable=false}, sale_area_id=DatabendRawType{type=sale_area_id, isNullable=false}, steel_coil_no=DatabendRawType{type=steel_coil_no, isNullable=false}, create_date=DatabendRawType{type=create_date, isNullable=false}, tolerance=DatabendRawType{type=tolerance, isNullable=false}, del_flag=DatabendRawType{type=del_flag, isNullable=false}, owner_name=DatabendRawType{type=owner_name, isNullable=false}, stock_amount=DatabendRawType{type=stock_amount, isNullable=false}, average_manager_price=DatabendRawType{type=average_manager_price, isNullable=false}, average_pound_price=DatabendRawType{type=average_pound_price, isNullable=false}, update_user_name=DatabendRawType{type=update_user_name, isNullable=false}, tolerance_range_max=DatabendRawType{type=tolerance_range_max, isNullable=false}, contract_manager_weight=DatabendRawType{type=contract_manager_weight, isNullable=false}, update_date=DatabendRawType{type=update_date, isNullable=false}, demand_pound_weight=DatabendRawType{type=demand_pound_weight, isNullable=false}, org_id=DatabendRawType{type=org_id, isNullable=false}, sale_area_name=DatabendRawType{type=sale_area_name, isNullable=false}, quantity_type=DatabendRawType{type=quantity_type, isNullable=false}, 
process_manager_weight=DatabendRawType{type=process_manager_weight, isNullable=false}, manual_pound_weight=DatabendRawType{type=manual_pound_weight, isNullable=false}, sku_code=DatabendRawType{type=sku_code, isNullable=false}, price_discount_pound=DatabendRawType{type=price_discount_pound, isNullable=false}, product_class_name=DatabendRawType{type=product_class_name, isNullable=false}, profitLoss_amount=DatabendRawType{type=profitLoss_amount, isNullable=false}, sale_return_manager_weight=DatabendRawType{type=sale_return_manager_weight, isNullable=false}, purchase_return_manager_weight=DatabendRawType{type=purchase_return_manager_weight, isNullable=false}, process_amount=DatabendRawType{type=process_amount, isNullable=false}, owner_code=DatabendRawType{type=owner_code, isNullable=false}, update_user_code=DatabendRawType{type=update_user_code, isNullable=false}, demand_amount=DatabendRawType{type=demand_amount, isNullable=false}, plan_amount=DatabendRawType{type=plan_amount, isNullable=false}, sys_id=DatabendRawType{type=sys_id, isNullable=false}, specification2=DatabendRawType{type=specification2, isNullable=false}, price_discount_manager=DatabendRawType{type=price_discount_manager, isNullable=false}, specification1=DatabendRawType{type=specification1, isNullable=false}, workgroup_id=DatabendRawType{type=workgroup_id, isNullable=false}, avble_manager_weight=DatabendRawType{type=avble_manager_weight, isNullable=false}, profitLoss_pound_weight=DatabendRawType{type=profitLoss_pound_weight, isNullable=false}, lading_pound_weight=DatabendRawType{type=lading_pound_weight, isNullable=false}, move_pound_weight=DatabendRawType{type=move_pound_weight, isNullable=false}, product_class_id=DatabendRawType{type=product_class_id, isNullable=false}, average_manager_cost=DatabendRawType{type=average_manager_cost, isNullable=false}, purchase_return_amount=DatabendRawType{type=purchase_return_amount, isNullable=false}, length=DatabendRawType{type=length, isNullable=false}, 
prod_area_name=DatabendRawType{type=prod_area_name, isNullable=false}, specification=DatabendRawType{type=specification, isNullable=false}, profitLoss_manager_weight=DatabendRawType{type=profitLoss_manager_weight, isNullable=false}, stock_pound_weight=DatabendRawType{type=stock_pound_weight, isNullable=false}, stock_manager_weight=DatabendRawType{type=stock_manager_weight, isNullable=false}, manual_amount=DatabendRawType{type=manual_amount, isNullable=false}, platform_id=DatabendRawType{type=platform_id, isNullable=false}, lock_amount=DatabendRawType{type=lock_amount, isNullable=false}, stock_room_type=DatabendRawType{type=stock_room_type, isNullable=false}} \nPK:{}","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:34.861+08:00","sequence":133,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","level":"WARN","message":"Snapshot was interrupted before completion","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916}
{"timestamp":"2023-10-16T14:32:34.863+08:00","sequence":134,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Change event source executor was interrupted","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":31916,"exception":{"refId":1,"exceptionType":"java.lang.InterruptedException","message":"Interrupted while snapshotting table scp_sale.t_xs_stock","frames":[{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEventsForTable","line":383},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEvents","line":316},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"doExecute","line":132},{"class":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","method":"execute","line":76},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"doSnapshot","line":155},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"executeChangeEventSources","line":137},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":109},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}
@hantmac Thanks. I removed the data dir and ran the test again; the logs above are from that run.
"Change event source executor was interrupted" indicates that processing was interrupted. It seems that MySQL has some unknown problem. Would you like to share the config file?
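For triage, the fields that matter in a JSON log line like the ones above can be pulled out with a short script. This is only a minimal sketch and not part of Debezium: `summarize_log_line` is a hypothetical helper, and `SAMPLE` is an abbreviated copy of the last log line above (most fields and stack frames dropped).

```python
import json

# Abbreviated copy of the "executor was interrupted" log line (assumption:
# only the fields used below are kept, and only the first stack frame).
SAMPLE = (
    '{"timestamp":"2023-10-16T14:32:34.863+08:00",'
    '"loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator",'
    '"level":"WARN",'
    '"message":"Change event source executor was interrupted",'
    '"exception":{"exceptionType":"java.lang.InterruptedException",'
    '"message":"Interrupted while snapshotting table scp_sale.t_xs_stock",'
    '"frames":[{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource",'
    '"method":"createDataEventsForTable","line":383}]}}'
)


def summarize_log_line(line: str) -> dict:
    """Return the level, short logger name, message, and top stack frame."""
    record = json.loads(line)
    exception = record.get("exception") or {}
    frames = exception.get("frames") or []
    top = frames[0] if frames else None
    return {
        "timestamp": record.get("timestamp"),
        "level": record.get("level"),
        # Keep only the class name, not the full package path.
        "logger": record.get("loggerName", "").rsplit(".", 1)[-1],
        "message": record.get("message"),
        "exception": exception.get("exceptionType"),
        "exception_message": exception.get("message"),
        "top_frame": (
            f'{top.get("class")}.{top.get("method")}:{top.get("line")}'
            if top
            else None
        ),
    }


if __name__ == "__main__":
    for key, value in summarize_log_line(SAMPLE).items():
        print(f"{key}: {value}")
```

Running it over every line of `logs/debezium.log` would show at a glance which table the snapshot was on when the interrupt arrived.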
@hantmac OK. I also just found out that the source database is a read-only replica of a production RDS instance on ctyun.cn. Does this matter?
debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://localhost:8000
debezium.sink.databend.database.username=databend
debezium.sink.databend.database.password=databend
# debezium.sink.databend.database.primaryKey=id
# debezium.sink.databend.database.tableName=products
# additional databend parameters
debezium.sink.databend.database.param.ssl=false
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# mysql source, related docs: https://docs.confluent.io/kafka-connectors/debezium-mysql-source/current/mysql_source_connector_config.html
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=
debezium.source.database.port=3306
debezium.source.database.user=
debezium.source.database.password=
debezium.source.database.dbname=scp_sale
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=scp_sale.t_xs_stock
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message! delete mode must open there conf
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
#debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
debezium.transforms.unwrap.delete.handling.mode=rewrite
# make tombstones as true to soft delete
debezium.transforms.unwrap.drop.tombstones=true
# ############ SET LOG LEVELS ############
quarkus.log.console.json=true
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
quarkus.log.file.path=./logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3
quarkus.log.level=WARN
quarkus.log.file.enable=true
quarkus.http.port=8080
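In the config as posted, the source hostname, user, and password are blank (possibly redacted for the issue). A quick way to confirm that the file actually in use has them filled in is sketched below. This is an illustration under assumptions: `load_properties` and `blank_settings` are hypothetical helpers, and the parser handles only simple `key=value` lines and `#` comments, not the full Java properties syntax.

```python
# Excerpt of the source section of the config above, used as sample input.
SAMPLE = """\
# mysql source
debezium.source.database.hostname=
debezium.source.database.port=3306
debezium.source.database.user=
debezium.source.database.password=
debezium.source.database.dbname=scp_sale
"""


def load_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def blank_settings(props: dict, prefix: str = "debezium.source.database.") -> list:
    """List keys under the prefix whose value is empty."""
    return sorted(k for k, v in props.items() if k.startswith(prefix) and v == "")


if __name__ == "__main__":
    for key in blank_settings(load_properties(SAMPLE)):
        print(f"blank: {key}")
```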
@peleusj BTW, have you enabled the binlog for the RDS instance on ctyun.cn? Please follow this doc https://debezium.io/documentation/reference/stable/connectors/mysql.html#enable-mysql-binlog to check it first.
@hantmac Sorry for the delay.
I talked with a colleague and used the command SHOW GLOBAL VARIABLES to check the binlog status. The result indicates that it is on and matches the settings you mentioned:
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
Also, I tried Airbyte this morning; it can successfully sync the same database table to a destination database such as Postgres or ClickHouse.
I hope this information helps you find out what's going on.
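The comparison against the linked Debezium doc can also be written down as a small check. The sketch below is an assumption-laden illustration: `check_binlog_settings` is a hypothetical helper, the expected values (`binlog_format=ROW`, `binlog_row_image=FULL`) are the ones the Debezium MySQL binlog section lists, and `log_bin` is treated as enabled whenever it is not empty, `OFF`, or `0`.

```python
# Settings the Debezium MySQL connector documentation asks for.
REQUIRED = {"binlog_format": "ROW", "binlog_row_image": "FULL"}

# Values reported in this thread.
REPORTED = {
    "log_bin": "mysql-bin",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "expire_logs_days": "10",
}


def check_binlog_settings(variables: dict) -> list:
    """Return a list of problems; an empty list means the settings look fine."""
    problems = []
    log_bin = str(variables.get("log_bin", "")).strip()
    if log_bin.upper() in ("", "OFF", "0"):
        problems.append("log_bin is not enabled")
    for name, expected in REQUIRED.items():
        actual = str(variables.get(name, "")).strip().upper()
        if actual != expected:
            problems.append(f"{name} is {actual or 'unset'}, expected {expected}")
    return problems


if __name__ == "__main__":
    issues = check_binlog_settings(REPORTED)
    print("binlog settings OK" if not issues else "\n".join(issues))
```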
@peleusj Thanks for your information.
Yes, your binlog status info indicates that the binlog is on.
I have tried to reproduce this problem, but it can't be reproduced on macOS or Linux (I have no Windows machine). Would you like to run it on Linux or macOS? BTW, what is your JDK version? JDK >= 11 is required.
OK. I have a laptop with Ubuntu 22.04 installed; I will test on that machine and pay attention to the JDK version.
@hantmac I have run a fresh test on Ubuntu 22.04 with Java 11. Here are the Databend startup command and the logs:
docker run \
-p 8000:8000 \
-v ${HOME}/tmp/databend/meta:/var/lib/databend/meta \
-v ${HOME}/tmp/databend/query:/var/lib/databend/query \
-v ${HOME}/tmp/databend/log:/var/log/databend \
-e QUERY_DEFAULT_USER=databend \
-e QUERY_DEFAULT_PASSWORD=databend \
datafuselabs/databend
{"timestamp":"2023-10-18T15:34:18.855+08:00","sequence":129,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:34:18.862+08:00","sequence":130,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.runtime.WorkerConfig","level":"WARN","message":"Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:34:54.223+08:00","sequence":131,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.DatabendChangeConsumer","level":"WARN","message":"Creating table:\ncreate table debezium.\"from_mysql_scp_sale_t_xs_stock\" (\"stock_id\" varchar, \"sale_area_id\" varchar, \"sale_area_name\" varchar, \"warehouse_id\" varchar, \"warehouse_name\" varchar, \"stock_room_id\" varchar, \"stock_room_name\" varchar, \"stock_zone_id\" varchar, \"stock_zone_name\" varchar, \"sku_code\" varchar, \"sku_id\" varchar, \"resource_management\" varchar, \"product_class_id\" varchar, \"product_class_name\" varchar, \"product_brand_id\" varchar, \"product_brand_name\" varchar, \"product_texture_id\" varchar, \"product_texture_name\" varchar, \"prod_area_id\" varchar, \"prod_area_name\" varchar, \"specification\" varchar, \"specification1\" binary, \"specification2\" binary, \"expand1\" binary, \"expand2\" binary, \"length\" binary, \"meter_weight\" binary, \"quantity_type\" varchar, \"pieces\" bigint, \"tolerance\" binary, \"tolerance_range\" varchar, \"weight_range\" varchar, \"ten_meter_weight\" binary, \"supply_meter_weight\" binary, \"sale_lock_flag\" boolean, \"avble_amount\" bigint, \"avble_manager_weight\" binary, \"avble_pound_weight\" binary, \"stock_amount\" bigint, \"stock_pound_weight\" binary, \"stock_manager_weight\" binary, \"lock_amount\" bigint, \"lock_pound_weight\" binary, \"lock_manager_weight\" binary, \"contract_amount\" bigint, \"contract_pound_weight\" binary, \"contract_manager_weight\" binary, \"plan_amount\" bigint, \"plan_pound_weight\" binary, \"plan_manager_weight\" binary, \"lading_amount\" bigint, \"lading_pound_weight\" binary, \"lading_manager_weight\" binary, \"process_amount\" bigint, \"process_pound_weight\" binary, \"process_manager_weight\" binary, \"manual_amount\" bigint, \"manual_pound_weight\" binary, \"manual_manager_weight\" binary, \"average_manager_cost\" binary, \"average_pound_cost\" 
binary, \"average_pound_price\" binary, \"average_manager_price\" binary, \"price_pound\" binary, \"price_manager\" binary, \"price_discount_pound\" binary, \"price_discount_manager\" binary, \"version_price\" varchar, \"stock_room_type\" varchar, \"workgroup_id\" varchar, \"workgroup_name\" varchar, \"del_flag\" boolean, \"create_user_code\" varchar, \"create_user_name\" varchar, \"create_date\" bigint, \"update_user_code\" varchar, \"update_user_name\" varchar, \"update_date\" bigint, \"sys_id\" varchar, \"platform_id\" varchar, \"org_id\" varchar, \"org_name\" varchar, \"org_short_name\" varchar, \"owner_id\" varchar, \"owner_code\" varchar, \"owner_name\" varchar, \"demand_amount\" bigint, \"demand_manager_weight\" binary, \"demand_pound_weight\" binary, \"move_amount\" bigint, \"move_manager_weight\" binary, \"move_pound_weight\" binary, \"version\" bigint, \"sale_return_amount\" bigint, \"sale_return_manager_weight\" binary, \"sale_return_pound_weight\" binary, \"purchase_return_amount\" bigint, \"purchase_return_manager_weight\" binary, \"purchase_return_pound_weight\" binary, \"ship_type\" varchar, \"steel_coil_no\" varchar, \"profitLoss_amount\" bigint, \"profitLoss_manager_weight\" binary, \"profitLoss_pound_weight\" binary, \"spec1\" binary, \"spec2\" binary, \"spec3\" binary, \"spec4\" binary, \"tolerance_range_min\" binary, \"tolerance_range_max\" binary, \"weight_range_min\" binary, \"weight_range_max\" binary, \"purchase_date\" bigint, \"__deleted\" varchar)","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:34:54.446+08:00","sequence":132,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.databend.tablewriter.RelationalTable","level":"WARN","message":"Loaded Databend table debezium.debezium.from_mysql_scp_sale_t_xs_stock \nColumns:{contract_amount=DatabendRawType{type=contract_amount, isNullable=false}, avble_amount=DatabendRawType{type=avble_amount, isNullable=false}, owner_id=DatabendRawType{type=owner_id, isNullable=false}, weight_range_max=DatabendRawType{type=weight_range_max, isNullable=false}, stock_zone_name=DatabendRawType{type=stock_zone_name, isNullable=false}, tolerance_range=DatabendRawType{type=tolerance_range, isNullable=false}, avble_pound_weight=DatabendRawType{type=avble_pound_weight, isNullable=false}, contract_pound_weight=DatabendRawType{type=contract_pound_weight, isNullable=false}, stock_zone_id=DatabendRawType{type=stock_zone_id, isNullable=false}, prod_area_id=DatabendRawType{type=prod_area_id, isNullable=false}, lading_amount=DatabendRawType{type=lading_amount, isNullable=false}, workgroup_name=DatabendRawType{type=workgroup_name, isNullable=false}, product_texture_id=DatabendRawType{type=product_texture_id, isNullable=false}, plan_pound_weight=DatabendRawType{type=plan_pound_weight, isNullable=false}, weight_range_min=DatabendRawType{type=weight_range_min, isNullable=false}, sale_return_amount=DatabendRawType{type=sale_return_amount, isNullable=false}, expand1=DatabendRawType{type=expand1, isNullable=false}, product_brand_name=DatabendRawType{type=product_brand_name, isNullable=false}, sku_id=DatabendRawType{type=sku_id, isNullable=false}, expand2=DatabendRawType{type=expand2, isNullable=false}, create_user_code=DatabendRawType{type=create_user_code, isNullable=false}, version=DatabendRawType{type=version, isNullable=false}, price_pound=DatabendRawType{type=price_pound, isNullable=false}, manual_manager_weight=DatabendRawType{type=manual_manager_weight, isNullable=false}, 
warehouse_name=DatabendRawType{type=warehouse_name, isNullable=false}, product_texture_name=DatabendRawType{type=product_texture_name, isNullable=false}, supply_meter_weight=DatabendRawType{type=supply_meter_weight, isNullable=false}, sale_lock_flag=DatabendRawType{type=sale_lock_flag, isNullable=false}, lock_pound_weight=DatabendRawType{type=lock_pound_weight, isNullable=false}, purchase_date=DatabendRawType{type=purchase_date, isNullable=false}, average_pound_cost=DatabendRawType{type=average_pound_cost, isNullable=false}, meter_weight=DatabendRawType{type=meter_weight, isNullable=false}, move_amount=DatabendRawType{type=move_amount, isNullable=false}, resource_management=DatabendRawType{type=resource_management, isNullable=false}, demand_manager_weight=DatabendRawType{type=demand_manager_weight, isNullable=false}, price_manager=DatabendRawType{type=price_manager, isNullable=false}, version_price=DatabendRawType{type=version_price, isNullable=false}, stock_room_name=DatabendRawType{type=stock_room_name, isNullable=false}, move_manager_weight=DatabendRawType{type=move_manager_weight, isNullable=false}, pieces=DatabendRawType{type=pieces, isNullable=false}, spec3=DatabendRawType{type=spec3, isNullable=false}, sale_return_pound_weight=DatabendRawType{type=sale_return_pound_weight, isNullable=false}, spec4=DatabendRawType{type=spec4, isNullable=false}, process_pound_weight=DatabendRawType{type=process_pound_weight, isNullable=false}, org_name=DatabendRawType{type=org_name, isNullable=false}, spec1=DatabendRawType{type=spec1, isNullable=false}, spec2=DatabendRawType{type=spec2, isNullable=false}, weight_range=DatabendRawType{type=weight_range, isNullable=false}, purchase_return_pound_weight=DatabendRawType{type=purchase_return_pound_weight, isNullable=false}, __deleted=DatabendRawType{type=__deleted, isNullable=false}, lading_manager_weight=DatabendRawType{type=lading_manager_weight, isNullable=false}, ship_type=DatabendRawType{type=ship_type, isNullable=false}, 
stock_id=DatabendRawType{type=stock_id, isNullable=false}, warehouse_id=DatabendRawType{type=warehouse_id, isNullable=false}, stock_room_id=DatabendRawType{type=stock_room_id, isNullable=false}, tolerance_range_min=DatabendRawType{type=tolerance_range_min, isNullable=false}, create_user_name=DatabendRawType{type=create_user_name, isNullable=false}, lock_manager_weight=DatabendRawType{type=lock_manager_weight, isNullable=false}, org_short_name=DatabendRawType{type=org_short_name, isNullable=false}, product_brand_id=DatabendRawType{type=product_brand_id, isNullable=false}, ten_meter_weight=DatabendRawType{type=ten_meter_weight, isNullable=false}, plan_manager_weight=DatabendRawType{type=plan_manager_weight, isNullable=false}, sale_area_id=DatabendRawType{type=sale_area_id, isNullable=false}, steel_coil_no=DatabendRawType{type=steel_coil_no, isNullable=false}, create_date=DatabendRawType{type=create_date, isNullable=false}, tolerance=DatabendRawType{type=tolerance, isNullable=false}, del_flag=DatabendRawType{type=del_flag, isNullable=false}, owner_name=DatabendRawType{type=owner_name, isNullable=false}, stock_amount=DatabendRawType{type=stock_amount, isNullable=false}, average_manager_price=DatabendRawType{type=average_manager_price, isNullable=false}, average_pound_price=DatabendRawType{type=average_pound_price, isNullable=false}, update_user_name=DatabendRawType{type=update_user_name, isNullable=false}, tolerance_range_max=DatabendRawType{type=tolerance_range_max, isNullable=false}, contract_manager_weight=DatabendRawType{type=contract_manager_weight, isNullable=false}, update_date=DatabendRawType{type=update_date, isNullable=false}, demand_pound_weight=DatabendRawType{type=demand_pound_weight, isNullable=false}, org_id=DatabendRawType{type=org_id, isNullable=false}, sale_area_name=DatabendRawType{type=sale_area_name, isNullable=false}, quantity_type=DatabendRawType{type=quantity_type, isNullable=false}, 
process_manager_weight=DatabendRawType{type=process_manager_weight, isNullable=false}, manual_pound_weight=DatabendRawType{type=manual_pound_weight, isNullable=false}, sku_code=DatabendRawType{type=sku_code, isNullable=false}, price_discount_pound=DatabendRawType{type=price_discount_pound, isNullable=false}, product_class_name=DatabendRawType{type=product_class_name, isNullable=false}, profitLoss_amount=DatabendRawType{type=profitLoss_amount, isNullable=false}, sale_return_manager_weight=DatabendRawType{type=sale_return_manager_weight, isNullable=false}, purchase_return_manager_weight=DatabendRawType{type=purchase_return_manager_weight, isNullable=false}, process_amount=DatabendRawType{type=process_amount, isNullable=false}, owner_code=DatabendRawType{type=owner_code, isNullable=false}, update_user_code=DatabendRawType{type=update_user_code, isNullable=false}, demand_amount=DatabendRawType{type=demand_amount, isNullable=false}, plan_amount=DatabendRawType{type=plan_amount, isNullable=false}, sys_id=DatabendRawType{type=sys_id, isNullable=false}, specification2=DatabendRawType{type=specification2, isNullable=false}, price_discount_manager=DatabendRawType{type=price_discount_manager, isNullable=false}, specification1=DatabendRawType{type=specification1, isNullable=false}, workgroup_id=DatabendRawType{type=workgroup_id, isNullable=false}, avble_manager_weight=DatabendRawType{type=avble_manager_weight, isNullable=false}, profitLoss_pound_weight=DatabendRawType{type=profitLoss_pound_weight, isNullable=false}, lading_pound_weight=DatabendRawType{type=lading_pound_weight, isNullable=false}, move_pound_weight=DatabendRawType{type=move_pound_weight, isNullable=false}, product_class_id=DatabendRawType{type=product_class_id, isNullable=false}, average_manager_cost=DatabendRawType{type=average_manager_cost, isNullable=false}, purchase_return_amount=DatabendRawType{type=purchase_return_amount, isNullable=false}, length=DatabendRawType{type=length, isNullable=false}, 
prod_area_name=DatabendRawType{type=prod_area_name, isNullable=false}, specification=DatabendRawType{type=specification, isNullable=false}, profitLoss_manager_weight=DatabendRawType{type=profitLoss_manager_weight, isNullable=false}, stock_pound_weight=DatabendRawType{type=stock_pound_weight, isNullable=false}, stock_manager_weight=DatabendRawType{type=stock_manager_weight, isNullable=false}, manual_amount=DatabendRawType{type=manual_amount, isNullable=false}, platform_id=DatabendRawType{type=platform_id, isNullable=false}, lock_amount=DatabendRawType{type=lock_amount, isNullable=false}, stock_room_type=DatabendRawType{type=stock_room_type, isNullable=false}} \nPK:{}","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:35:11.961+08:00","sequence":133,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","level":"WARN","message":"Snapshot was interrupted before completion","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989}
{"timestamp":"2023-10-18T15:35:11.965+08:00","sequence":134,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.pipeline.ChangeEventSourceCoordinator","level":"WARN","message":"Change event source executor was interrupted","threadName":"debezium-mysqlconnector-from_mysql-change-event-source-coordinator","threadId":26,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"snapshot"},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":148989,"exception":{"refId":1,"exceptionType":"java.lang.InterruptedException","message":"Interrupted while snapshotting table scp_sale.t_xs_stock","frames":[{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEventsForTable","line":383},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"createDataEvents","line":316},{"class":"io.debezium.relational.RelationalSnapshotChangeEventSource","method":"doExecute","line":132},{"class":"io.debezium.pipeline.source.AbstractSnapshotChangeEventSource","method":"execute","line":76},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"doSnapshot","line":155},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"executeChangeEventSources","line":137},{"class":"io.debezium.pipeline.ChangeEventSourceCoordinator","method":"lambda$start$0","line":109},{"class":"java.util.concurrent.Executors$RunnableAdapter","method":"call","line":515},{"class":"java.util.concurrent.FutureTask","method":"run","line":264},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}
@peleusj Thanks!
Here is a copy of my application.properties. Please use this config to test again:
debezium.sink.type=databend
debezium.sink.databend.upsert=true
debezium.sink.databend.upsert-keep-deletes=false
debezium.sink.databend.database.databaseName=debezium
debezium.sink.databend.database.url=jdbc:databend://localhost:8000
debezium.sink.databend.database.username=databend
debezium.sink.databend.database.password=databend
debezium.sink.databend.database.primaryKey=id
debezium.sink.databend.database.tableName=products
debezium.sink.databend.database.param.ssl=false
# additional databend parameters
# enable event schemas
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json
# mysql source
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=60000
debezium.source.database.hostname=localhost
debezium.source.database.port=3306
debezium.source.database.user=root
debezium.source.database.password=123456
debezium.source.database.dbname=mydb
debezium.source.database.server.name=from_mysql
debezium.source.include.schema.changes=false
debezium.source.table.include.list=mydb.products
debezium.source.max.batch.size=200000
debezium.source.max.queue.size=800000
# debezium.source.database.ssl.mode=required
# Run without Kafka, use local file to store checkpoints
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/status.dat
# do event flattening. unwrap message!
# https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-new-record-state-drop-tombstones
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
#debezium.transforms.unwrap.add.fields=op,table,source.ts_ms,db
# soft delete
#debezium.transforms.unwrap.delete.handling.mode=rewrite
#debezium.transforms.unwrap.drop.tombstones=false
# hard delete
debezium.transforms.unwrap.delete.handling.mode=none
debezium.transforms.unwrap.drop.tombstones=false
# ############ SET LOG LEVELS ############
quarkus.log.console.json=true
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
quarkus.log.file.path=./logs/debezium.log
quarkus.log.file.rotation.max-file-size=5M
quarkus.log.file.rotation.file-suffix=.yyyy-MM-dd.gz
quarkus.log.file.rotation.max-backup-index=3
quarkus.log.level=INFO
quarkus.log.file.enable=true
@hantmac I used your config and only changed the source MySQL info. The logs look somewhat different this time. They are too long to paste here, so I saved them to a gist:
https://gist.github.com/peleusj/addb82f0d9496e0888df95ec3ef1bd73
Great! I found a very useful error message:
"message":"java.sql.SQLException: Query failed (#a722e8a1-a14a-4b73-b1b2-f888d2e44191): storage doesn't support presign operation"
This error occurs because your MinIO does not support presign. You must configure MinIO to support it (https://min.io/docs/minio/linux/integrations/presigned-put-upload-via-browser.html). Databend uses this function to write data into object storage.
I believe it will succeed once this problem is handled.
BTW, change this config to your own tables: debezium.source.table.include.list=mydb.products
@hantmac Glad to have made progress!
I already changed debezium.source.table.include.list in the last test.
I am a little confused. Does this mean I have to start a MinIO container to store the data and configure presign for MinIO? In the last test I only ran a Databend Docker container with the default config, which uses the fs backend. Why is MinIO needed here?
Anyway, I will try to start a MinIO Docker container and test again.
@peleusj
Databend uses presign to write data into object storage. The fs storage is just for testing and can't be used in production.
You can read this doc about presign: https://databend.rs/doc/contributing/rfcs/presign
Just use this config in databend-query.toml:
[storage]
type = "s3"
[storage.s3]
bucket = "databend"
endpoint_url = "http://0.0.0.0:9000"
access_key_id = "minioadmin"
secret_access_key = "minioadmin"
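For readers unfamiliar with the term, a presigned URL carries a signature in its query string, so a client can upload to object storage without holding the storage credentials itself. Below is a minimal sketch of how such a URL is built with AWS Signature Version 4 query signing, using only the Python standard library. It is not Databend's or MinIO's implementation; the function name `presign_s3_url` and the object path in the usage note are hypothetical and chosen only for illustration.

```python
import datetime
import hashlib
import hmac
from urllib.parse import quote


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    """One HMAC-SHA256 step of the SigV4 key derivation chain."""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def presign_s3_url(method, host, path, access_key, secret_key, region,
                   when, expires=3600, scheme="http"):
    """Build a SigV4 presigned URL (hypothetical helper, illustration only).

    `when` is a naive datetime interpreted as UTC; `host` must include the
    port when it is not the scheme's default (for example "localhost:9000").
    """
    amz_date = when.strftime("%Y%m%dT%H%M%SZ")
    date = when.strftime("%Y%m%d")
    scope = f"{date}/{region}/s3/aws4_request"
    params = {
        "X-Amz-Algorithm": "AWS4-HMAC-SHA256",
        "X-Amz-Credential": f"{access_key}/{scope}",
        "X-Amz-Date": amz_date,
        "X-Amz-Expires": str(expires),
        "X-Amz-SignedHeaders": "host",
    }
    # Canonical query string: keys sorted, keys and values percent-encoded.
    canonical_query = "&".join(
        f"{quote(k, safe='')}={quote(v, safe='')}"
        for k, v in sorted(params.items())
    )
    canonical_uri = quote(path, safe="/")
    # Only the host header is signed; the body is left unsigned.
    canonical_request = "\n".join([
        method, canonical_uri, canonical_query,
        f"host:{host}\n", "host", "UNSIGNED-PAYLOAD",
    ])
    string_to_sign = "\n".join([
        "AWS4-HMAC-SHA256", amz_date, scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    ])
    # Derive the signing key: date -> region -> service -> "aws4_request".
    key = ("AWS4" + secret_key).encode("utf-8")
    for part in (date, region, "s3", "aws4_request"):
        key = _hmac_sha256(key, part)
    signature = hmac.new(key, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()
    return (f"{scheme}://{host}{canonical_uri}"
            f"?{canonical_query}&X-Amz-Signature={signature}")
```

For example, `presign_s3_url("PUT", "localhost:9000", "/databend/some/object", "minioadmin", "minioadmin", "us-east-1", datetime.datetime.utcnow())` returns a URL that an HTTP client could use for a single upload until it expires. A local fs backend has no equivalent mechanism, which is consistent with the "storage doesn't support presign operation" error above.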
@hantmac I tried your databend-query.toml but got an error. I guess a region needs to be set, but I checked some docs and could not find how to configure it:
Databend Query start failure, cause: StorageOther. Code: 4000, Text = other error (ConfigInvalid (permanent) at Builder::build, context: { service: s3 } => region is missing. Please find it by S3::detect_region() or set them in env.).
Are you using AWS S3 storage? You can find more detailed error messages in databend-query.
@hantmac I am not using AWS S3. I am just using a docker-compose.yml and a databend-query.toml with the configs you suggested:
services:
  databend:
    image: datafuselabs/databend
    volumes:
      - /home/albert/test/databend/conf/databend-query.toml:/etc/databend/query.toml
    environment:
      QUERY_CONFIG_FILE: /etc/databend/query.toml
      QUERY_DEFAULT_USER: databend
      QUERY_DEFAULT_PASSWORD: databend
      MINIO_ENABLED: 'true'
    ports:
      - '8000:8000'
      - '9000:9000'
      - '3307:3307'
      - '8124:8124'
[query]
max_active_sessions = 256
wait_timeout_mills = 5000
flight_api_address = "0.0.0.0:9090"
admin_api_address = "0.0.0.0:8080"
metric_api_address = "0.0.0.0:7070"
mysql_handler_host = "0.0.0.0"
mysql_handler_port = 3307
clickhouse_http_handler_host = "0.0.0.0"
clickhouse_http_handler_port = 8124
http_handler_host = "0.0.0.0"
http_handler_port = 8000
flight_sql_handler_host = "0.0.0.0"
flight_sql_handler_port = 8900
tenant_id = "default"
cluster_id = "default"
[log]
[log.stderr]
level = "WARN"
format = "text"
[log.file]
level = "INFO"
dir = "/var/log/databend"
[meta]
endpoints = ["0.0.0.0:9191"]
username = "root"
password = "root"
client_timeout_in_second = 60
[storage]
type = "s3"
[storage.s3]
bucket = "databend"
endpoint_url = "http://0.0.0.0:9000"
access_key_id = "minioadmin"
secret_access_key = "minioadmin"
And where should I look for the databend-query you mentioned?
@Xuanwo Hi, would you like to help figure out this problem? 😄
I guess it need to set regions, but I checked some docs, could not find how to configure it:
You can set region this way:
[storage.s3]
bucket = "databend"
endpoint_url = "http://0.0.0.0:9000"
access_key_id = "minioadmin"
secret_access_key = "minioadmin"
+ region = "abc"
For MinIO, you can use us-east-1 if no other region has been set for MinIO.
@Xuanwo I added the region and got a new error:
Databend Query start failure, cause: StorageUnavailable. Code: 3901, Text = current configured storage is not available: config: S3(StorageS3Config { endpoint_url: "http://0.0.0.0:9000", region: "us-east-1", bucket: "databend", root: "", disable_credential_loader: false, enable_virtual_host_style: false, role_arn: "", external_id: "", access_key_id: "******min", secret_access_key: "******min", security_token: "", master_key: "", allow_anonymous: false }), cause: Unexpected (temporary) at Pager::next, context: { url: http://0.0.0.0:9000/databend?delimiter=%2F&list-type=2, called: http_util::Client::send_async, service: s3, path: / } => send async request, source: error sending request for url (http://0.0.0.0:9000/databend?delimiter=%2F&list-type=2): error trying to connect: tcp connect error: Connection refused (os error 111).
Connection refused
This error typically suggests improper storage setup. For instance, port 9000 might not be open or listening on 0.0.0.0. Please review your settings to ensure MinIO is configured correctly. To verify, run curl http://0.0.0.0:9000 in the same databend container.
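If curl is not available inside the container, the same reachability check can be sketched in a few lines of Python, assuming a Python interpreter is available where the check is run. The helper name `can_connect` is hypothetical. Note that inside a container, 0.0.0.0 and 127.0.0.1 refer to that container itself, so if MinIO runs in a separate container the check is expected to fail for those addresses. The hostname to use instead depends on your Docker network; with Docker Compose it is typically the service name.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`.

    Hypothetical helper for illustration. It only proves that something is
    listening on the port, not that it is a correctly configured MinIO.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers "Connection refused", timeouts, and DNS resolution failures.
        return False
```

Calling `can_connect("127.0.0.1", 9000)` from the environment where databend-query runs should return `False` in the situation that produces the "Connection refused (os error 111)" message above.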
And given that you are using a non-HTTPS endpoint like http://xxx, you will need to add an extra param in the databend-query config to allow it:
[storage]
type = "s3"
+ allow_insecure = true
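The S3 settings discussed so far (a region, `allow_insecure` for plain-http endpoints, and a reachable endpoint host) can be checked before starting Databend. The sketch below works on an already parsed databend-query.toml represented as a Python dict. The function name `check_s3_storage` is hypothetical, and the loopback warning rests on the assumption that MinIO runs in a different container than databend-query.

```python
from urllib.parse import urlparse


def check_s3_storage(config: dict) -> list:
    """Return human-readable problems found in a parsed databend-query.toml.

    Hypothetical helper for illustration; it checks only the three points
    raised in this thread and is not a full validation of the config.
    """
    problems = []
    storage = config.get("storage", {})
    if storage.get("type") != "s3":
        return problems  # only the S3 backend is checked here

    s3 = storage.get("s3", {})
    if not s3.get("region"):
        problems.append("storage.s3.region is missing")

    parsed = urlparse(s3.get("endpoint_url", ""))
    if parsed.scheme == "http" and not storage.get("allow_insecure", False):
        problems.append("http endpoint needs storage.allow_insecure = true")

    # Assumption: MinIO runs in a separate container, so a loopback address
    # would point at the databend container itself.
    if parsed.hostname in ("0.0.0.0", "127.0.0.1", "localhost"):
        problems.append("endpoint host is a loopback or wildcard address")
    return problems
```

On Python 3.11 or later the dict can come from the standard library, for example `tomllib.load(open("databend-query.toml", "rb"))`.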
@Xuanwo Sorry to bother you.
My Docker knowledge is limited, and I'm unsure how to run this test:
To verify, run curl http://0.0.0.0:9000 in the same databend container.
I can only test through a browser using the MinIO console port, 127.0.0.1:9090, which is defined in my configuration.
I've attached my complete configuration.
Could you please help me check it and find what is misconfigured?
Thanks a lot!
docker-compose.yaml
services:
  minio:
    image: quay.io/minio/minio
    volumes:
      - /home/albert/tmp/minio:/data
    environment:
      - "MINIO_ROOT_USER=test"
      - "MINIO_ROOT_PASSWORD=test1234"
    ports:
      - "9000:9000"
      - "9090:9090"
    command: [ "server", "/data", "--console-address", ":9090" ]
  databend:
    image: datafuselabs/databend
    volumes:
      - /home/albert/test/databend/conf/databend-query.toml:/etc/databend/query.toml
    environment:
      QUERY_CONFIG_FILE: /etc/databend/query.toml
      QUERY_DEFAULT_USER: databend
      QUERY_DEFAULT_PASSWORD: databend
    ports:
      - '8000:8000'
      - '3307:3307'
      - '8124:8124'
databend-query.toml
# Usage:
# databend-query -c databend-query.toml
[query]
max_active_sessions = 256
wait_timeout_mills = 5000
# Internal flight rpc for cluster communication.
flight_api_address = "0.0.0.0:9091"
# Admin REST API.
admin_api_address = "0.0.0.0:8080"
# Metrics REST API.
metric_api_address = "0.0.0.0:7070"
# Query Handler: MySQL
mysql_handler_host = "0.0.0.0"
mysql_handler_port = 3307
# Query Handler: Clickhouse HTTP
clickhouse_http_handler_host = "0.0.0.0"
clickhouse_http_handler_port = 8124
# Query Handler: HTTP API
http_handler_host = "0.0.0.0"
http_handler_port = 8000
# Query Handler: Experimental Arrow Flight SQL API
flight_sql_handler_host = "0.0.0.0"
flight_sql_handler_port = 8900
tenant_id = "default"
cluster_id = "default"
table_engine_memory_enabled = true
# [[query.users]]
# name = "root"
# auth_type = "no_password"
# [[query.users]]
# name = "databend"
# auth_type = "double_sha1_password"
# # echo -n "databend" | sha1sum | cut -d' ' -f1 | xxd -r -p | sha1sum
# auth_string = "3081f32caef285c232d066033c89a78d88a6d8a5"
# [[query.users]]
# name = "datafuselabs"
# auth_type = "sha256_password"
# # echo -n "datafuselabs" | sha256sum
# auth_string = "6db1a2f5da402b43c066fcadcbf78f04260b3236d9035e44dd463f21e29e6f3b"
[log]
[log.file]
level = "WARN"
format = "text"
dir = "/var/log/databend"
[meta]
# It is a list of `grpc_api_advertise_host:<grpc-api-port>` of databend-meta config
endpoints = ["0.0.0.0:9191"]
username = "root"
password = "root"
client_timeout_in_second = 60
auto_sync_interval = 60
# Storage config.
[storage]
# fs | s3 | azblob | gcs | oss | cos | hdfs | webhdfs
type = "s3"
allow_insecure = true
# Set a local folder to store your data.
# Comment out this block if you're NOT using local file system as storage.
# [storage.fs]
# data_path = "/var/lib/databend/data"
# To use an Amazon S3-like storage service, uncomment this block and set your values.
[storage.s3]
bucket = "databend"
endpoint_url = "http://127.0.0.1:9000"
access_key_id = "test"
secret_access_key = "test1234"
region = "us-east-1"
# enable_virtual_host_style = false
# To use Azure Blob Storage, uncomment this block and set your values.
# [storage.azblob]
# endpoint_url = "https://<your-storage-account-name>.blob.core.windows.net"
# container = "<your-azure-storage-container-name>"
# account_name = "<your-storage-account-name>"
# account_key = "<your-account-key>"
# To use Google Cloud Storage, uncomment this block and set your values.
# [storage.gcs]
# bucket = "<your-bucket-name>"
# endpoint_url = "<your-endpoint>"
# credential = "<your-credentials>"
# To use Alibaba Cloud OSS, uncomment this block and set your values.
# [storage.oss]
# bucket = "<your-bucket-name>"
# endpoint_url = "<your-endpoint>"
# access_key_id = "<your-key-id>"
# access_key_secret = "<your-account-key>"
# To use Tencent Cloud Object Storage, uncomment this block and set your values.
# [storage.cos]
# bucket = "<your-bucket-name>"
# endpoint_url = "<your-endpoint>"
# secret_id = "<your-secret-id>"
# secret_key = "<your-secret-key>"
# To use HDFS, uncomment this block and set your values.
# [storage.hdfs]
# name_node = "<your-name-node>"
# To use WebHDFS, uncomment this block and set your values.
# [storage.webhdfs]
# endpoint_url = "<your-endpoint>"
# Cache config.
[cache]
# Type of storage to keep the table data cache
#
# available options: [none|disk]
# default is "none", which disable table data cache
# use "disk" to enabled disk cache
data_cache_storage = "none"
[cache.disk]
# cache path
path = "/var/lib/databend/cache"
# max bytes of cached data 20G
max_bytes = 21474836480
@Xuanwo I successfully deployed a standalone Databend on my laptop by following these articles:
https://databend.rs/doc/deploy/deploying-databend
https://www.bilibili.com/read/cv25407731/
Unlike the Docker deployment, it does not need the region to be set in databend-query.toml.
@hantmac I finally started run.sh successfully and got part of my source data into the Databend target table, but not all of it. The sync was interrupted:
{"timestamp":"2023-10-20T12:35:07.165+08:00","sequence":1090,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-10-thread-1","threadId":31,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}
{"timestamp":"2023-10-20T12:35:07.183+08:00","sequence":1091,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}
{"timestamp":"2023-10-20T12:35:07.184+08:00","sequence":1092,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"INFO","message":"Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Java heap space', error = '{}'","threadName":"pool-7-thread-1","threadId":18,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324,"exception":{"refId":1,"exceptionType":"java.lang.OutOfMemoryError","message":"Java heap space","frames":[{"class":"java.util.LinkedHashMap","method":"newNode","line":256},{"class":"java.util.HashMap","method":"putVal","line":627},{"class":"java.util.HashMap","method":"put","line":608},{"class":"com.fasterxml.jackson.databind.node.ObjectNode","method":"replace","line":585},{"class":"com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer","method":"_deserializeContainerNoRecursion","line":594},{"class":"com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer","method":"deserialize","line":98},{"class":"com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer","method":"deserialize","line":23},{"class":"com.fasterxml.jackson.databind.deser.DefaultDeserializationContext","method":"readRootValue","line":323},{"class":"com.fasterxml.jackson.databind.ObjectMapper","method":"_readTreeAndClose","line":4772},{"class":"com.fasterxml.jackson.databind.ObjectMapper","method":"readTree","line":3138},{"class":"io.debezium.server.databend.DatabendChangeConsumer","method":"lambda$handleBatch$0","line":200},{"class":"io.debezium.server.databend.DatabendChangeConsumer$$Lambda$802/0x000000084061f840","method":"apply"},{"class":"java.util.stream.ReferencePipeline$3$1","method":"accept","line":195},{"class":"java.util.ArrayList$ArrayListSpliterator","method":"forEachRemaining","line":1655},{"class":"java.util.stream.AbstractPipeline","method":"copyInto","line":484},{"class":"java.util.stream.AbstractPipeline","method":"wrapAndCopyInto","line":474},{"class":"java.util.stream.ReduceOps$ReduceOp","method":"evaluateSequential","line":913},{"class":"java.util.stream.AbstractPipeline","method":"evaluate","line":234},{"class":"java.util.stream.ReferencePipeline","method":"collect","line":578},{"class":"io.debezium.server.databend.DatabendChangeConsumer","method":"handleBatch","line":207},{"class":"io.debezium.embedded.ConvertingEngineBuilder","method":"lambda$notifying$2","line":83},{"class":"io.debezium.embedded.ConvertingEngineBuilder$$Lambda$375/0x0000000840321c40","method":"handleBatch"},{"class":"io.debezium.embedded.EmbeddedEngine","method":"run","line":822},{"class":"io.debezium.embedded.ConvertingEngineBuilder$2","method":"run","line":192},{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":152},{"class":"io.debezium.server.DebeziumServer$$Lambda$415/0x0000000840371440","method":"run"},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}
{"timestamp":"2023-10-20T12:35:07.212+08:00","sequence":1093,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.DebeziumServer","level":"INFO","message":"Received request to stop the engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}
{"timestamp":"2023-10-20T12:35:07.212+08:00","sequence":1094,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.EmbeddedEngine","level":"INFO","message":"Stopping the embedded engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}
{"timestamp":"2023-10-20T12:35:07.255+08:00","sequence":1095,"loggerClassName":"org.jboss.logging.Logger","loggerName":"io.quarkus","level":"INFO","message":"debezium-server-databend-dist stopped in 0.068s","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"thinkpad-x250","processName":"io.debezium.server.Main","processId":26324}
Does this mean the data transfer is working, but my old ThinkPad X250 laptop with 8 GB of memory is not enough to complete the task? And what caused the earlier "Change event source executor was interrupted" error? Was it caused by some setting in my initial application.properties setup?
Lastly, a big thank you again for all your patience and time!
@peleusj Congratulations!
- Yes. The error 'Stopping connector after error in the application's handler method: Java heap space' means you do not have enough memory for the task. "Change event source executor was interrupted" was caused by incorrect MySQL Debezium settings.
- Open an issue any time if you have any questions!
Lastly, thanks @Xuanwo very much!
I have closed this issue since it is solved. Open an issue any time if you have any questions.
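As a rough way to reason about the "Java heap space" error: the application.properties shared earlier in this thread sets debezium.source.max.batch.size=200000 and debezium.source.max.queue.size=800000, while Debezium's documented defaults are 2048 and 8192. The sketch below multiplies those record counts by an average event size to get an order-of-magnitude estimate of buffered data. The 4 KiB event size is a purely hypothetical figure, not a measurement from this thread, and the real in-memory footprint of parsed JSON objects on the JVM is usually larger than the serialized size.

```python
def estimate_buffered_bytes(max_queue_size: int, max_batch_size: int,
                            avg_event_bytes: int) -> dict:
    """Rough upper bounds on bytes held by a full queue and by one full batch.

    Hypothetical helper for illustration; it ignores JVM object overhead.
    """
    return {
        "queue": max_queue_size * avg_event_bytes,
        "batch": max_batch_size * avg_event_bytes,
    }


if __name__ == "__main__":
    AVG_EVENT_BYTES = 4096  # assumption, not measured in this thread
    settings = {
        "thread config": (800000, 200000),
        "Debezium defaults": (8192, 2048),
    }
    for label, (queue_size, batch_size) in settings.items():
        est = estimate_buffered_bytes(queue_size, batch_size, AVG_EVENT_BYTES)
        print(f"{label}: queue ~{est['queue'] / 2**30:.2f} GiB, "
              f"batch ~{est['batch'] / 2**30:.2f} GiB")
```

If an estimate like this approaches the JVM heap limit, lowering the two settings or raising the heap size are the usual options to try. Which one resolves the error in this particular setup is not confirmed in the thread.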
@hantmac Sorry to bother you again.
Today I reran the previous test on a machine with more memory. All configurations were the same as in the previous test.
It synchronized only about 25,000 records before it was interrupted. The source has around 60,000 records.
I also noticed an issue this time that I missed before: certain MySQL columns were not synchronized with the correct data type, such as columns of type decimal and datetime.
Below is the log information:
{"timestamp":"2023-10-25T15:22:48.426+08:00","sequence":1102,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.connector.mysql.MySqlStreamingChangeEventSource","level":"INFO","message":"Stopped reading binlog after 0 events, last recorded offset: {transaction_id=null, ts_sec=1698218534, file=mysql-bin.011044, pos=212595309, gtids=c2932ac0-66e5-11ed-abba-fa163ed055f0:1-1295929881, server_id=1695037098, event=1}","threadName":"blc-172.16.251.20:3306","threadId":55,"mdc":{"dbz.taskId":"0","dbz.connectorName":"from_mysql","dbz.connectorType":"MySQL","dbz.connectorContext":"binlog"},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.434+08:00","sequence":1103,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.jdbc.JdbcConnection","level":"INFO","message":"Connection gracefully closed","threadName":"pool-10-thread-1","threadId":58,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.451+08:00","sequence":1104,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"org.apache.kafka.connect.storage.FileOffsetBackingStore","level":"INFO","message":"Stopped FileOffsetBackingStore","threadName":"pool-7-thread-1","threadId":27,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.452+08:00","sequence":1105,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.ConnectorLifecycle","level":"INFO","message":"Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: java.sql.SQLException: failed to upload input stream', error = '{}'","threadName":"pool-7-thread-1","threadId":27,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392,"exception":{"refId":1,"exceptionType":"java.lang.RuntimeException","message":"java.sql.SQLException: failed to upload input stream","frames":[{"class":"io.debezium.server.databend.tablewriter.UpsertTableWriter","method":"deleteUpsert","line":80},{"class":"io.debezium.server.databend.tablewriter.UpsertTableWriter","method":"addToTable","line":46},{"class":"io.debezium.server.databend.DatabendChangeConsumer","method":"handleBatch","line":212},{"class":"io.debezium.embedded.ConvertingEngineBuilder","method":"lambda$notifying$2","line":83},{"class":"io.debezium.embedded.EmbeddedEngine","method":"run","line":822},{"class":"io.debezium.embedded.ConvertingEngineBuilder$2","method":"run","line":192},{"class":"io.debezium.server.DebeziumServer","method":"lambda$start$1","line":152},{"class":"java.util.concurrent.ThreadPoolExecutor","method":"runWorker","line":1128},{"class":"java.util.concurrent.ThreadPoolExecutor$Worker","method":"run","line":628},{"class":"java.lang.Thread","method":"run","line":829}]}}
{"timestamp":"2023-10-25T15:22:48.485+08:00","sequence":1106,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.server.DebeziumServer","level":"INFO","message":"Received request to stop the engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.485+08:00","sequence":1107,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.embedded.EmbeddedEngine","level":"INFO","message":"Stopping the embedded engine","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
{"timestamp":"2023-10-25T15:22:48.525+08:00","sequence":1108,"loggerClassName":"org.jboss.logging.Logger","loggerName":"io.quarkus","level":"INFO","message":"debezium-server-databend-dist stopped in 0.071s","threadName":"main","threadId":1,"mdc":{},"ndc":"","hostName":"localhost.localdomain","processName":"io.debezium.server.Main","processId":27392}
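The thread does not say how the decimal and datetime values came out wrong, so the following is only one possible explanation, not a confirmed diagnosis. With Debezium's default decimal.handling.mode of precise and JSON output, DECIMAL values are typically emitted as a base64 string holding the big-endian two's complement unscaled integer, with the scale carried in the schema. MySQL DATETIME values with millisecond precision or less are typically emitted as an integer count of milliseconds since the epoch. A sink that stores these raw values will not show the original column types. The sketch below shows how such values decode; both function names are hypothetical.

```python
import base64
from datetime import datetime, timedelta, timezone
from decimal import Decimal


def decode_precise_decimal(encoded: str, scale: int) -> Decimal:
    """Decode a base64, big-endian two's complement unscaled value.

    `scale` is the number of digits after the decimal point, which the
    event schema provides. Hypothetical helper for illustration.
    """
    raw = base64.b64decode(encoded)
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


def decode_epoch_millis(millis: int) -> datetime:
    """Convert milliseconds since 1970-01-01 into a UTC datetime."""
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=millis)
```

Debezium also offers decimal.handling.mode values of string and double, which avoid the base64 form. Whether changing that setting fixes the columns in this setup is not confirmed here.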