Apache Kafka-消费重试 如何针对不同KafkaListener处理不同的ErrorHandler
Opened this issue · 0 comments
TerryGuan commented
@bean
@primary
public ErrorHandler kafkaErrorHandler(KafkaTemplate template) {
logger.warn("kafkaErrorHandler begin to Handle");
// <1> 创建 DeadLetterPublishingRecoverer 对象
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// <2> 创建 FixedBackOff 对象 设置重试间隔 10秒 次数为 3次
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
// <3> 创建 SeekToCurrentErrorHandler 对象
return new SeekToCurrentErrorHandler(recoverer, backOff);
}
这个是处理所有的KafkaListener抛出的异常的
@KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC)
public void onMessage(MessageMock messageMock){
logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
// 模拟抛出一次一行
throw new RuntimeException("MOCK Handle Exception Happened");
}
在这里,我想指定不同的ErrorHandler来处理重试?