sasou/syncClient

kafka.java bug?

mjjian0 opened this issue · 8 comments

当一个批次的数据其中一条或几条数据发送失败,导致ret=false,然后程序继续处理这个批次后面的数据,这个时候,如果后面的数据(数据A)发送到kafka成功,到这个批次处理结束后,ret仍然是false吧,然后就回滚了,回滚后,继续按之前的位点消费,是不是导致发到kafka的数据(数据A)重复了?

sasou commented

一个批次的数据,是顺序处理,如果其中一条处理失败,就会回滚,然后重复处理刚刚失败的那条数据,直到成功,才会处理后面的数据。

@sasou 举个例子:一个批次的数据:
1
2
3
4
5
假如在处理到3的时候,处理失败,ret=false了,然后循环继续往下走,到4,5的时候是处理成功的。然后这个批次循环结束syncEntry方法返回ret为false,然后开始回滚,那还是处理1,2,3,4,5数据。如果全部处理成功,这个时候,是不是1,2,4,5发送到kafka重复了啊?

sasou commented

假如在处理到3的时候,处理失败,ret=false了,然后就会阻塞,不会继续循环往下走;如果退出进程,再次进入获取批次数据,就是从失败的位置获取,比如:3、4、5、6、7,已经成功的不会回滚,也不会重复发送

@sasou 兄弟,我可能代码水平没达到高度,麻烦给个提示,是在哪里堵塞的。[捂脸],谢谢!谢谢!

sasou commented
		while (true) {
			Message message = connector.getWithoutAck(batchSize); // get batch num 这里是批量从canal取batchSize条数据放在内存,但当前方法每次调用只会把内存里面的数据返回一条处理
			long batchId = message.getId();
			int size = message.getEntries().size();
			if (!(batchId == -1 || size == 0)) {
				if (syncEntry(message.getEntries())) {
					connector.ack(batchId); // commit 这里确认当前处理成功的一条记录
				} else {
					connector.rollback(batchId); // rollback 这里回滚当前处理失败的一条记录,然后再次循环时:Message message = connector.getWithoutAck(batchSize);还是返回当前失败的这条记录,从而阻塞
				}
			}
		}

我的理解 syncEntry方法里是这样的两层循环;
@第一层循环是循环一个批次的数据比如说上面那个例子:1,2,3,4,5,也就是entrys里的数据。
for (Entry entry : entrys) {
for (RowData rowData : rowChage.getRowDatasList()) {
//如果 处理到 数据 3 失败 ret=false;
//数据3处理失败后,外层循环继续处理4,5成功,然后外层循环结束,这个时候syncEntry才开始返回 ret=false吧。
//然后回滚,继续处理刚才那个批次,然后1,2,4,5数据就发重复了
}
}

@sasou 是这样的吗?请赐教!谢谢!

sasou commented

不是的,是阻塞顺序执行的,失败就会阻塞,直到成功,不然,binlog是有严格执行顺序的,不顺序执行,数据就混乱了

@sasou 你好!能加下qq吗,951770717,谢谢,我是做大数据这块的。