gnu-gnu/spring-kafka-example

SeekToCurrentErrorHandler & RetryTemplate 관련 문의

eastglow opened this issue · 0 comments

안녕하세요. 지난주 카프카 사용자모임 밋업 발표 잘 들었습니다.

저도 사내에서 스프링부트와 카프카를 사용하게 되면서 최근 들어 계속 만져보고 있는 중인데요, 한가지 궁금한 점이 있어 깃헙까지(?) 찾아오게 되었습니다.

제목에도 적어두었듯이 저 2개의 쓰임새가 궁금한 부분인데 SeekToCurrentErrorHandler는 보통 컨슈머 쪽에서 ConcurrentKafkaListenerContainerFactory를 만들어줄 때 setErrorHandler를 통해 컨슘과정에서 에러가 발생할 경우, 이 핸들러로 공통 처리를 할 수 있는 것으로 이해하고 있습니다.

실제로 SeekToCurrentErrorHandler를 별도의 Bean으로 만들어서 FixedBackOff를 set하여 설정한 뒤, setErrorHandler에 담아주면 해당 핸들러 설정대로 에러가 났을 경우 다시 컨슘하여 재시도를 하게 되는 것을 확인하였습니다. (아래 코드와 같은 방식)

@Bean
public SeekToCurrentErrorHandler defaultConsumerErrorHandler() {

	SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((record, exception) -> {
		// TODO: 필요시 별도의 로직 추가 
	}, new FixedBackOff(3000L, 3L)) {
		@Override
		public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
			super.handle(thrownException, records, consumer, container);
		}
	};

	return handler;
}

예기치 못한 exception 발생시 재시도에 대한 부분을 찾아보다보니 위의 핸들러 말고도 RetryTemplate라는 것도 있더라고요. 그래서 이것 역시 별도의 Bean으로 만들어서 setRetryTemplate를 해보았는데 뭔가 생각처럼(?) 잘 되진 않더라고요.

여튼, 궁금한 부분을 정리해보자면...

  1. 컨슈머 쪽에서 예기치 못한 exception에 대비한 재시도를 위해서는 SeekToCurrentErrorHandler와 RetryTemplate 중 어떤 것을 어떻게 쓰는지 궁금합니다. (아니면 둘 다 사용해야하나요??...)
  2. 메시지를 생산하는 프로듀서 쪽에는 이와 같은 핸들러나 RetryTemplate에 대한 부분을 찾지 못했는데요, 그냥 send 메소드를 실행하는 부분을 RetryTemplate 객체의 execute로 감싸서 별도로 exception 발생시 재시도에 부분을 처리해줘야할까요?
    (실제로 공식 docs나 여러 예제코드, 근우님의 예제코드도 보았는데 메시지를 생산하는 KafkaTemplate 쪽에는 별도로 핸들러나 Retry 관련 설정은 없고, 컨슈머 쪽 리스너 컨테이너 팩토리 쪽에는 setErrorHandler와 같은 설정이 있더라고요.)
public DefaultProducer(KafkaTemplate<String, String> defaultKafkaTemplate, RetryTemplate retryTemplate) {
	this.defaultKafkaTemplate = defaultKafkaTemplate;
	this.retryTemplate = retryTemplate;
}

private final KafkaTemplate<String, String> defaultKafkaTemplate;
private final RetryTemplate retryTemplate;

public void produceMessageAsync(final String topic, final String key, final String msg) {
	try {
		retryTemplate.execute(
				new RetryCallback<Void, Exception>() {

					@Override
					public Void doWithRetry(RetryContext retryContext) throws Exception {
						ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, msg);

						ListenableFuture<SendResult<String, String>> future = defaultKafkaTemplate.send(record);
						future.addCallback(new ListenableFutureCallback<>() {

							@Override
							public void onFailure(Throwable throwable) {
								handleFailureAsync(topic, key, msg, throwable);
							}

							@Override
							public void onSuccess(SendResult<String, String> result) {
								handleSuccessAsync(result, msg);
							}
						});
						return null;
					}
				},
				// retry fail logging
				new RecoveryCallback<Void>() {
					@Override
					public Void recover(RetryContext context) throws Exception {
						log.error("### Kafka Produce Retry Error, topic; {}, key: {}, msg: {}", topic, StringUtils.defaultIfBlank(key, ""), msg, context.getLastThrowable());
						return null;
					}
				}
		);
	} catch (Exception e) {
		log.error("### Kafka Produce Error", e);
	}
}