long2ice/synch

Error when using Kafka for incremental subscription

Closed this issue · 14 comments

(sync2) [root@hadoop1 sync2]# synch -c synch.yaml --alias mysql_db produce
Traceback (most recent call last):
File "/data/sync2/sync2/bin/synch", line 8, in <module>
sys.exit(cli())
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/cli.py", line 91, in produce
broker = get_broker(alias)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/factory.py", line 87, in get_broker
b = KafkaBroker(alias)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/broker/kafka.py", line 27, in __init__
key_serializer=lambda x: x.encode(),
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/producer/kafka.py", line 383, in __init__
**self.config)
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/client_async.py", line 244, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/client_async.py", line 900, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
(sync2) [root@hadoop1 sync2]#

Kafka isn't running.

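Kafka is configured with the address 10.10.10.25 and the service is running. I suspect something is wrong with the connection: switching to a non-existent IP address gives exactly the same error.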

Is Python 3.7.7 too new? I installed it with: pip3 install git+https://github.com/long2ice/synch.git

kafka:
  servers:
    - kafka 10.10.10.25:9092
  topic_prefix: synch
(sync2) [root@hadoop1 sync2]# synch -c synch.yaml --alias mysql_db produce
Traceback (most recent call last):
File "/data/sync2/sync2/bin/synch", line 8, in <module>
sys.exit(cli())
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/click/decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/cli.py", line 91, in produce
broker = get_broker(alias)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/factory.py", line 87, in get_broker
b = KafkaBroker(alias)
File "/data/sync2/sync2/lib/python3.7/site-packages/synch/broker/kafka.py", line 27, in __init__
key_serializer=lambda x: x.encode(),
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/producer/kafka.py", line 383, in __init__
**self.config)
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/client_async.py", line 244, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/data/sync2/sync2/lib/python3.7/site-packages/kafka/client_async.py", line 900, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
(sync2) [root@hadoop1 sync2]# netstat -ntpl
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address      Foreign Address  State   PID/Program name
tcp        0      0 127.0.0.1:25       0.0.0.0:*        LISTEN  1051/master
tcp        0      0 0.0.0.0:6379       0.0.0.0:*        LISTEN  19526/redis-server
tcp        0      0 0.0.0.0:22         0.0.0.0:*        LISTEN  867/sshd
tcp6       0      0 ::1:25             :::*             LISTEN  1051/master
tcp6       0      0 10.10.10.25:9092   :::*             LISTEN  4924/java
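One way to tell a network problem from a configuration problem is a plain TCP check run from the host where synch runs. The sketch below uses only the Python standard library; `port_open` is a hypothetical helper, not part of synch or kafka-python. Note that netstat shows the broker bound to 10.10.10.25 rather than 0.0.0.0, so connections to 127.0.0.1:9092 would be refused even on this host.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout.

    Hypothetical helper. This only proves the port accepts connections;
    it does not prove the process behind it speaks the Kafka protocol.
    """
    try:
        # create_connection resolves the host and tries each address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # covers refused connections, timeouts and DNS resolution failures
        return False
```

For example, if `port_open("10.10.10.25", 9092)` returns True but synch still raises `NoBrokersAvailable`, that points more toward the client configuration (such as the `servers` list) than toward the network.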

Try the Docker deployment.

The Docker deployment gives the same error.

engine = ReplacingMergeTree partition by toYYYYMM(created_at) order by id;
Traceback (most recent call last):
File "/usr/local/bin/synch", line 5, in <module>
cli()
File "/usr/local/lib/python3.9/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.9/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/click/decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/synch/synch/cli.py", line 91, in produce
broker = get_broker(alias)
File "/synch/synch/factory.py", line 87, in get_broker
b = KafkaBroker(alias)
File "/synch/synch/broker/kafka.py", line 24, in __init__
self.producer = KafkaProducer(
File "/usr/local/lib/python3.9/site-packages/kafka/producer/kafka.py", line 381, in __init__
client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
File "/usr/local/lib/python3.9/site-packages/kafka/client_async.py", line 244, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.9/site-packages/kafka/client_async.py", line 927, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
(sync2) [root@hadoop1 synch]# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b759e9feac84 long2ice/synch "synch --alias mysql…" 7 minutes ago Restarting (1) 5 seconds ago synch_consumer.test_1
a977aff72385 long2ice/synch "synch --alias mysql…" 7 minutes ago Restarting (1) 10 seconds ago synch_producer_1
87a07211096a bitnami/zookeeper:3 "/opt/bitnami/script…" 8 minutes ago Up 7 minutes 2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp synch_zookeeper_1
82fecd31a59f redis:latest "docker-entrypoint.s…" 8 minutes ago Up 7 minutes 6379/tcp synch_redis_1
5b586c4830cc hlebalbau/kafka-manager "/cmak/bin/cmak -Dpi…" 8 minutes ago Up 7 minutes 0.0.0.0:9000->9000/tcp synch_kafka-manager_1
(sync2) [root@hadoop1 synch]#

Check whether Kafka started properly.

The port is reachable. This is the same error I hit with the manual deployment: kafka.errors.NoBrokersAvailable: NoBrokersAvailable

root@8bdb0112c28a:/synch# ping kafka
PING kafka (172.17.0.5) 56(84) bytes of data.
64 bytes from synch_kafka_1.synch_default (172.17.0.5): icmp_seq=1 ttl=64 time=0.114 ms
64 bytes from synch_kafka_1.synch_default (172.17.0.5): icmp_seq=2 ttl=64 time=0.125 ms
^C
--- kafka ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1000ms
rtt min/avg/max/mdev = 0.114/0.119/0.125/0.012 ms
root@8bdb0112c28a:/synch# telnet kafka 9092
Trying 172.17.0.5...
Connected to kafka.
Escape character is '^]'.

In synch/broker/kafka.py:
self.producer = KafkaProducer(
    bootstrap_servers=self.servers,  # for the producer, changing this to the Kafka IP and port is enough

self.consumer = KafkaConsumer(
    bootstrap_servers=self.servers,  # the consumer needs the same change

`self.servers` is simply read from the config file.
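Since `bootstrap_servers` comes straight from the `servers` list in the config, a malformed entry there can fail the same way as an unreachable broker. kafka-python expects each entry as `host[:port]`; the entry `kafka 10.10.10.25:9092` posted earlier contains a space, which would plausibly leave the host part unresolvable (an assumption: the reporter only confirms that the config file was wrong). The sketch below is a hypothetical pre-flight check, not part of synch, that flags entries which don't look like `host` or `host:port`. It accepts hostnames and IPv4 addresses only.

```python
import re
from typing import List

# Hypothetical pattern: hostname or IPv4 address, optionally ":port".
# IPv6 literals are deliberately out of scope for this sketch.
_SERVER_RE = re.compile(r"[A-Za-z0-9.-]+(?::(\d{1,5}))?")


def invalid_servers(servers: List[str]) -> List[str]:
    """Return the entries that do not look like 'host' or 'host:port'."""
    bad = []
    for entry in servers:
        match = _SERVER_RE.fullmatch(entry)
        if match is None:
            bad.append(entry)  # e.g. stray spaces or extra tokens
        elif match.group(1) and not 0 < int(match.group(1)) <= 65535:
            bad.append(entry)  # port outside the valid TCP range
    return bad
```

For example, `invalid_servers(["kafka 10.10.10.25:9092"])` flags the entry because of the space, while `"10.10.10.25:9092"` passes.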

Got it, I had the config file wrong.
Also, does synch support syncing sharded MySQL databases/tables into a single ClickHouse table?

Not supported yet.

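Subscription is very slow. Is there any way to optimize it? It processes 21 events per second; after catching up all night it had only synced 2 days of binlog, and the load isn't high.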
2020-11-19 10:08:14 - synch.reader:83 - INFO - success send 21 events
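At 21 events/s, synch handles 21 × 3600 = 75,600 events per hour. Whether a backlog ever drains also depends on how fast MySQL keeps writing new binlog events, so a rough estimate needs both rates. The sketch below is a back-of-envelope calculation; `catch_up_hours` is a hypothetical helper, and the backlog size and binlog write rate are inputs you would have to measure yourself, since the thread does not give them.

```python
import math


def catch_up_hours(backlog_events: float, consume_rate: float,
                   produce_rate: float) -> float:
    """Estimate the hours needed to drain a binlog backlog (hypothetical helper).

    backlog_events: binlog events not yet processed
    consume_rate:   events/second the pipeline processes (21 in the log above)
    produce_rate:   events/second MySQL keeps writing to the binlog
    Returns math.inf when the pipeline never gains on the binlog.
    """
    net_rate = consume_rate - produce_rate  # backlog shrinks only by the surplus
    if net_rate <= 0:
        return math.inf
    return backlog_events / net_rate / 3600
```

With a made-up backlog of 756,000 events and no new writes, `catch_up_hours(756000, 21, 0)` gives 10.0 hours; any nonzero write rate on the MySQL side stretches that further.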

Please share your config file.

If I truncate the table during sync, subsequent syncs produce duplicate rows, and I don't see any related option in the config file.
mysql-1 :) select order_id,count() from order group by order_id having count()>1;

SELECT
    order_id,
    count()
FROM order
GROUP BY order_id
HAVING count() > 1

┌─order_id─────────┬─count()─┐
│ 306895103433 │ 2 │
│ 307093771120 │ 2 │
│ 306895101287 │ 2 │
└──────────────────┴─────────┘

Duplicate data can occur; you can try a different ClickHouse table engine.
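For reference, the DDL earlier in the thread already uses `ReplacingMergeTree ... order by id`. ReplacingMergeTree collapses rows that share the same sorting key (the ORDER BY columns) within a partition, keeping the row with the highest version column if one is declared and otherwise the last inserted row, and it only does so when parts are merged in the background (or at query time with `FINAL`). So if `order` is sorted by `id`, two rows with the same `order_id` but different `id` values are never treated as duplicates. The sketch below is a hypothetical in-memory simulation of what a fully merged table keeps; `replacing_merge` and its parameters are illustrative, not a ClickHouse API, and `partition_col` stands in for a partition expression such as `toYYYYMM(created_at)`.

```python
from typing import Any, Dict, Hashable, Iterable, List, Optional, Sequence, Tuple

Row = Dict[str, Any]


def replacing_merge(
    rows: Iterable[Row],
    key_cols: Sequence[str],
    partition_col: Optional[str] = None,
    ver_col: Optional[str] = None,
) -> List[Row]:
    """Simulate the rows a fully merged ReplacingMergeTree table keeps.

    Rows that share a partition and the same sorting-key values collapse
    to one: the row with the highest ver_col value if ver_col is given
    (ties go to the later row), otherwise simply the last row inserted.
    """
    kept: Dict[Tuple[Hashable, ...], Row] = {}
    for row in rows:  # iteration order stands in for insertion order
        partition = row[partition_col] if partition_col else None
        key = (partition,) + tuple(row[c] for c in key_cols)
        prev = kept.get(key)
        if prev is None or ver_col is None or row[ver_col] >= prev[ver_col]:
            kept[key] = row
    return list(kept.values())
```

Feeding it two versions of `id = 1` plus a row with `id = 2` and the same `order_id` leaves two rows, which reproduces the `count() = 2` pattern in the query above: deduplication keys on the sorting key, not on `order_id`.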