消费kafka后,未提交偏移量
easonliu30624700 opened this issue · 3 comments
easonliu30624700 commented
当前发现,如果KafkaSource的并发度>(分区数/2)时,即有部分线程只消费一个kafka分区,会出现偏移量未自动提交到broker。
备注:未开启checkpoint,enable.auto.commit参数为true(没有被adjustAutoCommitConfig方法调整为false)
easonliu30624700 commented
解决了。AbstractKafkaSource将setStartFromLatest改成setStartFromGroupOffsets
easonliu30624700 commented
改成setStartFromGroupOffsets后,作业刚开始会提交偏移量。后面运行一段时间又不提交了。
指标dtTopicPartitionLag收集显示为0,即KafkaDeserializationMetricWrapper类里面通过反射获取当前的消费偏移量是正常没有延迟的。不知道是不是kafka的bug还是flink-connector-kafka问题
a49a commented
可能是Kafka版本没配置对,Kafka Connector有多个版本,要和Kafka集群的版本对齐。