Kafka Dead Letter Publishing

When consuming event streams in Apache Kafka, there are various ways of handling exceptions. This blog post will give a detailed example of publishing dead-letter records with Spring Kafka. Areas where we deviate from the defaults will be highlighted, along with the considerations, and tests are provided.

@KafkaListener application

To start we need a small Kafka Consumer application. Ours will process orders by logging them when received. As we are using Java 17, we can use a Java record for our @Payload.

@Component
class OrderListener {

  private static final Logger log = LoggerFactory.getLogger(OrderListener.class);

  @KafkaListener(topics = KafkaConfiguration.ORDERS)
  void listen(@Payload @Validated Order order) {
    log.info("Received: {}", order);
  }

}

record Order(
  @NotNull UUID orderId,
  @NotNull UUID articleId,
  @Positive int amount) {
}

Notice how we are using @KafkaListener @Payload validation by annotating our order payload as @Validated. We only need the following bit of configuration to enable validation.

@Configuration
@EnableKafka
public class KafkaConfiguration implements KafkaListenerConfigurer {

  @Autowired
  private LocalValidatorFactoryBean validator;

  @Override
  public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
    registrar.setValidator(this.validator);
  }

  ...

}

We will trigger an exception in our tests by sending in an order with a negative amount. You would then expect to see a ListenerExecutionFailedException with an underlying cause of MethodArgumentNotValidException.

Configuring topics

Next we need two topics; one regular topic for our incoming orders, and another outgoing dead letter topic (DLT) for any orders we fail to handle successfully. Both are created in the same way using @Bean NewTopic, which in turn uses a TopicBuilder.

public static final String ORDERS = "orders";

@Bean
public NewTopic ordersTopic() {
  return TopicBuilder.name(ORDERS)
    // Use more than one partition for frequently used input topic
    .partitions(6)
    .build();
}

@Bean
public NewTopic deadLetterTopic(AppKafkaProperties properties) {
  // https://docs.spring.io/spring-kafka/docs/2.8.2/reference/html/#configuring-topics
  return TopicBuilder.name(ORDERS + properties.deadletter().suffix())
    // Use only one partition for infrequently used Dead Letter Topic
    .partitions(1)
    // Use longer retention for Dead Letter Topic, allowing for more time to troubleshoot
    .config(TopicConfig.RETENTION_MS_CONFIG, "" + properties.deadletter().retention().toMillis())
    .build();
}

You will notice two differences between the topics:

  1. The dead letter topic only has one partition. The reference documentation says to by default to use as many partitions on a dead letter topic as are available on the original topic. That way the DLT record can arrive at the same partition as the original record. However, with a small change below you can use a single partition for the infrequently used DLT, as the original partition is also already available as a header. This way your brokers need not manage a lot of unused partitions.

  2. The dead letter topic has an explicitly set retention. This way we can keep DLT records around longer than the original topic records, for debugging and recovery later if needed.

Configuring dead letter publishing

Next we will configure our error handler, at first only looking at the ConsumerRecordRecoverer. We will use the default DeadLetterPublishingRecoverer, with a custom argument destinationResolver to route to the first and only partition of our dead letter topic.

@Bean
public DefaultErrorHandler defaultErrorHandler(
    KafkaOperations<Object, Object> operations,
    AppKafkaProperties properties) {

  // Publish to dead letter topic any messages dropped after retries with back off
  var recoverer = new DeadLetterPublishingRecoverer(operations,
    // Always send to first/only partition of DLT suffixed topic
    (cr, e) -> new TopicPartition(cr.topic() + properties.deadletter().suffix(), 0));

  // Spread out attempts over time, taking a little longer between each attempt
  // Set a max for retries below max.poll.interval.ms; default: 5m, as otherwise we trigger a consumer rebalance
  Backoff backoff = properties.backoff();
  var exponentialBackOff = new ExponentialBackOffWithMaxRetries(backoff.maxRetries());
  exponentialBackOff.setInitialInterval(backoff.initialInterval().toMillis());
  exponentialBackOff.setMultiplier(backoff.multiplier());
  exponentialBackOff.setMaxInterval(backoff.maxInterval().toMillis());

  // Do not try to recover from validation exceptions when validation of orders failed
  var errorHandler = new DefaultErrorHandler(recoverer, exponentialBackOff);
  errorHandler.addNotRetryableExceptions(javax.validation.ValidationException.class);
  return errorHandler;
}

We will want to give our service some time to try to recover before publishing records onto the dead letter topic. For that we use an ExponentialBackOffWithMaxRetries, which is a BackOff implementation that can increase the time to wait between a maximum number of retries before publishing a record to the DLT. That way if for instance the database is restarting, you won’t immediately get DLT records, provided the restart completes soon enough. In any case you will want to stay below your max.poll.interval.ms (default: 5 minutes) to prevent triggering a consumer rebalance.

Finally, we bring together the ConsumerRecordRecoverer and BackOff in a DefaultErrorHandler. Notice how we explicitly tell the errorHandler to not retry any ValidationException, as any validation issues are permanent. That also allows us to test rapidly, as we need not exhaust all our retries before getting a DLT record. And with that we conclude our main application code.

Testing dead letter publishing recovery

Our tests strive to get as close as possible to a real world scenario, while still being practical about our test setup. TestContainers for Kafka allows us to quickly spin up a Kafka container, which we wire up to our Spring Boot application using the @DynamicPropertySource annotation.

We use @Autowired to get a KafkaOperations instance with which to produce our input records. And we create a KafkaConsumer using KafkaTestUtils to read any produced dead letter topic records.

@SpringBootTest
@Testcontainers
class KafkaDeadLetterPublishingApplicationTests {

  private static final String ORDERS_DLT = "orders.DLT";

  private static final Logger log = LoggerFactory.getLogger(KafkaDeadLetterPublishingApplicationTests.class);

  @Container // https://www.testcontainers.org/modules/kafka/
  static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1"));

  @DynamicPropertySource
  static void setProperties(DynamicPropertyRegistry registry) {
    // Connect our Spring application to our Testcontainers Kafka instance
    registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
  }

  @Autowired
  private KafkaOperations<String, Order> operations;

  private static KafkaConsumer<String, String> kafkaConsumer;

  @BeforeAll
  static void setup() {
    // Create a test consumer that handles <String, String> records, listening to orders.DLT
    // https://docs.spring.io/spring-kafka/docs/2.8.2/reference/html/#testing
    var consumerProps = KafkaTestUtils.consumerProps(kafka.getBootstrapServers(), "test-consumer", "true");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(List.of(ORDERS_DLT));
  }

  @AfterAll
  static void close() {
    // Close the consumer before shutting down Testcontainers Kafka instance
    kafkaConsumer.close();
  }

  ...

}

Now we will first want to make sure we can handle a valid order, without producing anything onto our dead letter topic. The following test produces a record onto the input topic, and asserts that over a set amount of time no dead letter topic records arrive.

@Test
void should_not_produce_onto_dlt_for_ok_message() throws Exception {
  // Send in valid order
  Order order = new Order(randomUUID(), randomUUID(), 1);
  operations.send("orders", order.orderId().toString(), order)
    .addCallback(
      success -> log.info("Success: {}", success),
      failure -> log.info("Failure: {}", failure));

  // Verify no message was produced onto Dead Letter Topic
  assertThrows(
    IllegalStateException.class,
    () -> KafkaTestUtils.getSingleRecord(kafkaConsumer, ORDERS_DLT, 5000),
    "No records found for topic");
}

Secondly we will want to make sure that any invalid orders are immediately produced onto our dead letter topic. The following test produces an order with a negative amount, which should trigger a ValidationException in our consumer. We assert that a record is produced onto our dead letter topic, and that the record has the expected header values and payload.

@Test
void should_produce_onto_dlt_for_bad_message() throws Exception {
  // Amount can not be negative, validation will fail
  Order order = new Order(randomUUID(), randomUUID(), -2);
  operations.send("orders", order.orderId().toString(), order)
    .addCallback(
      success -> log.info("Success: {}", success),
      failure -> log.info("Failure: {}", failure));

  // Verify message produced onto Dead Letter Topic
  ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(kafkaConsumer, ORDERS_DLT, 2000);

  // Verify headers present, and single header value
  Headers headers = record.headers();
  assertThat(headers).map(Header::key).containsAll(List.of(
    "kafka_dlt-exception-fqcn",
    "kafka_dlt-exception-cause-fqcn",
    "kafka_dlt-exception-message",
    "kafka_dlt-exception-stacktrace",
    "kafka_dlt-original-topic",
    "kafka_dlt-original-partition",
    "kafka_dlt-original-offset",
    "kafka_dlt-original-timestamp",
    "kafka_dlt-original-timestamp-type",
    "kafka_dlt-original-consumer-group"));
  assertThat(new String(headers.lastHeader("kafka_dlt-exception-fqcn").value()))
    .isEqualTo("org.springframework.kafka.listener.ListenerExecutionFailedException");
  assertThat(new String(headers.lastHeader("kafka_dlt-exception-cause-fqcn").value()))
    .isEqualTo("org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException");
  assertThat(new String(headers.lastHeader("kafka_dlt-exception-message").value()))
    .contains("Field error in object 'order' on field 'amount': rejected value [-2]");

  // Verify payload value matches sent in order
  assertThat(record.value()).isEqualToIgnoringWhitespace("""
    { "orderId": "%s", "articleId": "%s", "amount":-2 }""".formatted(order.orderId(), order.articleId()));
}

Conclusion

We have seen that it is fairly easy to add retries with back off, dead letter topic publishing and recovery to Spring Kafka. This allows you to inspect any failed records on a separate topic, with diagnostic details available in the headers and payload. Tools such as AKHQ can then be used to publish the dead letter topic records onto the input topic again after a fix has been applied.

Now of course as said in the outline, this is just one of various ways of handling exceptions. Notably this method provides no automated way of processing records published onto a dead letter topic. It is fine to use for infrequent dead letter topic publication, where fully automated recovery is not necessary. Also take into account that processing order guarantees are not maintained for subsequent records using the same key. You can look into retry topics and redirected events if you need more advanced ordered processing guarantees.

Developed using Spring Kafka version 2.8.2 and Confluent Platform for Apache Kafka version 7.0.1. The full application is available at GitHub.