yangshangwei/boot2

Apache Kafka-消费重试 如何针对不同KafkaListener处理不同的ErrorHandler

Opened this issue · 0 comments

@bean
@primary
public ErrorHandler kafkaErrorHandler(KafkaTemplate template) {

    logger.warn("kafkaErrorHandler begin to Handle");

    // <1> 创建 DeadLetterPublishingRecoverer 对象
    ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // <2> 创建 FixedBackOff 对象   设置重试间隔 10秒 次数为 3次
    BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
    // <3> 创建 SeekToCurrentErrorHandler 对象
    return new SeekToCurrentErrorHandler(recoverer, backOff);
}

这个是处理所有的KafkaListener抛出的异常的

@KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC)
public void onMessage(MessageMock messageMock){
logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);

    // 模拟抛出一次一行
    throw new RuntimeException("MOCK Handle Exception Happened");
}

在这里,我想指定不同的ErrorHandler来处理重试?