ucarGroup/DataLink

worker运行bug以及kafka消息乱码

oozhen0000003 opened this issue · 0 comments

reader是mysql,write是kafka,运行中报如下错误以及消费kafka是乱码

1、运行时候报错,出现如下错误(已解决)
Caused by: java.lang.LinkageError: loader constraint violation: when resolving method "org.slf4j.impl.StaticLoggerBinder.getLoggerFactory()Lorg/slf4j/ILoggerFactory;" the class loader (instance of com/ucar/datalink/worker/core/util/classloader/RelPluginClassLoader) of the current class, org/slf4j/LoggerFactory, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, org/slf4j/impl/StaticLoggerBinder, have different Class objects for the type org/slf4j/ILoggerFactory used in the signature

解决办法:kafka的plugin的lib依赖去除sl4j依赖包即可

2、kafka消息为乱码数据,如下片段
{"databaseName":"test","dbTableRowCellVOList":[{"afterValue":"啊啊啊啊啊啊","beforeValue":"中文啊啊啊","columnName":"name"},{"afterValue":"1","columnName":"id"}],"eventType":"UPDATE","id":"1","tableName":"test1"} ��*����� �| 3����mysql-bin.000008���� �*�UTF-80�ڠ��-8�B J PVX����C��Z?UPDATE test1SETname='啊啊啊啊啊啊' WHERE (id='1')��� L����mysql-bin.000008����� �*�UTF-80�ڠ��-8�B�testJ�test1PYX�b� rowsCount��1������I��P b�� &� �������������id �( 0 B�1R bigint(20) /��� ��name ( 0 B�中文啊啊啊R varchar(100)�&� �������������id �( 0 B�1R bigint(20)�2��� ��name (�0 B�啊啊啊啊啊啊R varchar(100)�@ 1����mysql-bin.000008�Е�� �*�UTF-80�ڠ��-8�B J P���� ��8104191 ��*����� ��� 4����mysql-bin.000008����� �*�UTF-80�����-8�B J P��X����r��ZnINSERT INTO t_dl_task_delaytime (task_id,delay_time,create_time) VALUES (1,633, now())��� ^����mysql-bin.000008���� �*�UTF-80�����-8�B�datalinkJ�t_dl_task_delaytimeP?X�b� rowsCount��1������7��P b���/� �������������id �(�0 B�6R�BIGINT(20) unsigned�0�������������� delay_time (�0 B�633R BIGINT(20)�6���]��create_time (�0 B�2020-01-17 10:14:59R�DATETIME�+���������������task_id (�0 B�1R BIGINT(20)��� 4����mysql-bin.000008���� �*�UTF-80�����-8�B J P��X��������Z��INSERT INTO t_dl_task_statistic (task_id, records_per_minute, size_per_minute, write_time_per_record, exceptions_per_minute, read_write_count_per_minute, create_time) VALUES (1, 2, 398, 4.00, 0, 2, now())��� ^����mysql-bin.000008����� �*�UTF-80�����-8�B�datalinkJ�t_dl_task_statisticPX�b�
rowsCount��1������;��P b���1� �������������id �(�0 B�970R�BIGINT(20) unsigned�+���������������task_id (�0 B�1R
BIGINT(20)�6���������������records_per_minute (�0 B�2R
BIGINT(20)�5���������������size_per_minute (�0 B�398R
DECIMAL(20,2)�9���������������exceptions_per_minute (�0 B�0R
BIGINT(20)�6���]��create_time (�0 B�2020-01-17 10:14:59R�DATETIME�?���������������read_write_count_per_minute (�0 B�2R
BIGINT(20)���
4����mysql-bin.000008�ܜ�� ��UTF-80�����-8�B J P��X��������Z��INSERT INTO t_dl_worker_jvm_state
(worker_id, host, old_mem_used, old_mem_max, young_mem_max, young_mem_used, create_time, interval_old_collection_count, interval_young_collection_count, interval_old_collection_time, interval_young_collection_time, current_thread_count)
VALUE
(2, '10.104.17.192', 0, 4294967296, 1610612736, 611795072, now(), 0, 0, 0, 0, 50)���
a����mysql-bin.000008�Š�� �
�UTF-80�����-8�B�datalinkJ�t_dl_worker_jvm_stateP��X�b�
rowsCount��1������F��P b���1� �������������id �(�0 B�978R�BIGINT(20) unsigned�-�������������� worker_id (�0 B�2R
10.104.17.192R�VARCHAR(20)�0�������������� old_mem_used (�0 B�0R
BIGINT(20)�8���������������old_mem_max (�0 B
4294967296R
young_mem_max (�0 B
1610612736R
BIGINT(20)�:���������������young_mem_used (�0 B 611795072R
BIGINT(20)�7���]��create_time (�0 B�2020-01-17 10:14:59R TIMESTAMP�A���������������interval_old_collection_count (�0 B�0R
BIGINT(20)�C� �������������interval_young_collection_count (�0 B�0R
BIGINT(20)�@�
�������������interval_old_collection_time (�0 B�0R
BIGINT(20)�B���������������interval_young_collection_time (�0 B�0R
BIGINT(20)�9� �������������current_thread_count (�0 B�50R
BIGINT(20)���
4����mysql-bin.000008����� ��UTF-80�����-8�B J P��X��������Z��INSERT INTO t_dl_worker_system_state
(worker_id, host, load_average, user_cpu_utilization, sys_cpu_utilization, incoming_network_traffic, outgoing_network_traffic, tcp_current_estab, create_time)
VALUE
(2, '10.104.17.192', 0, 1.26, 0.57, 1313505, 1517376, 101, now())���
c����mysql-bin.000008����� �
�UTF-80�����-8�B�datalinkJ�t_dl_worker_system_statePyX�b�
rowsCount��1������G��P b���1� �������������id �(�0 B�978R�BIGINT(20) unsigned�-�������������� worker_id (�0 B�2R
DECIMAL(20,2)�B���������������incoming_network_traffic (�0 B�1313505R
BIGINT(20)�B���������������outgoing_network_traffic (�0 B�1517376R
BIGINT(20)�7���������������tcp_current_estab (�0 B�101R
BIGINT(20)�7� �]��create_time (�0 B�2020-01-17 10:14:59R TIMESTAMP`

如上,会包含正常的数据结果,但是有多于的信息打入kafka

worker的日志如下:
2020-01-17 10:03:24.146 [Task-1-Writer-writer-kafka-chunk-pool-1] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'value.serializer' was supplied but isn't a known config. 2020-01-17 10:03:24.146 [Task-1-Writer-writer-kafka-chunk-pool-1] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'acks' was supplied but isn't a known config. 2020-01-17 10:03:24.146 [Task-1-Writer-writer-kafka-chunk-pool-1] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'batch.size' was supplied but isn't a known config. 2020-01-17 10:03:24.146 [Task-1-Writer-writer-kafka-chunk-pool-1] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'key.serializer' was supplied but isn't a known config