micronaut-projects/micronaut-kafka

Allow customisation of the close() time in KafkaStreamsFactory


Feature description

I wrote several @MicronautTest classes in a project that uses Kafka Streams through Micronaut Kafka. The tests passed on Linux, but when I ran them on Windows, they started failing like this:

io.micronaut.context.exceptions.BeanInstantiationException: Bean definition [org.apache.kafka.streams.KafkaStreams] could not be loaded: Error instantiating bean of type  [org.apache.kafka.streams.KafkaStreams]

Message: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory

To get them to run, I had to make sure that my tests either:

  • did not start Kafka Streams at all, by using @MicronautTest(environments = "no_streams"), where the environment had kafka.streams.default.start-kafka-streams = false, or
  • used an @AfterAll method to wait indefinitely for Kafka Streams to close, like this:
@Property(name = "spec.name", value="MicronautStreamsTest")
@TestInstance(Lifecycle.PER_CLASS)
@MicronautTest
public class MicronautStreamsTest {
  // I would need a List<KafkaStreams> in the general case, but I only had one KStream @Singleton in my program
  @Inject
  KafkaStreams kStreams;

  @AfterAll
  public void cleanUp() {
    kStreams.close();
  }

  /* ... actual tests ... */
}

I think this issue is due to the fixed Duration that KafkaStreamsFactory uses when waiting for each KafkaStreams instance to close. It may be that 3 seconds leaves plenty of time to shut down Kafka Streams on my Linux system, but not on my Windows system. I'm referring to this code:

    @Override
    @PreDestroy
    public void close() {
        for (KafkaStreams stream : streams.keySet()) {
            try {
                stream.close(Duration.ofSeconds(3));
            } catch (Exception e) {
                // ignore
            }
        }
    }

Would it make sense to introduce something like this?

    @Value("${kafka.streams-close-seconds:3}")
    private int closeWaitSeconds;

    @Override
    @PreDestroy
    public void close() {
        for (KafkaStreams stream : streams.keySet()) {
            try {
                stream.close(Duration.ofSeconds(closeWaitSeconds));
            } catch (Exception e) {
                // ignore
            }
        }
    }
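
A related point: KafkaStreams.close(Duration) returns a boolean that is false when the timeout elapses before all stream threads have stopped, and the loops above discard it. The following is only a rough sketch of how a timed-out close could be surfaced instead of silently ignored. TimedCloseable and closeAll are hypothetical names, and the interface is a stand-in for KafkaStreams so that the example runs without Kafka on the classpath.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CloseReportSketch {

    /** Hypothetical stand-in for KafkaStreams: close returns false on timeout. */
    interface TimedCloseable {
        boolean close(Duration timeout);
    }

    /** Closes every stream and returns the names of those that did not stop in time. */
    static List<String> closeAll(Map<String, TimedCloseable> streams, Duration timeout) {
        List<String> notClosed = new ArrayList<>();
        for (Map.Entry<String, TimedCloseable> entry : streams.entrySet()) {
            try {
                if (!entry.getValue().close(timeout)) {
                    notClosed.add(entry.getKey());
                }
            } catch (Exception e) {
                // Treat a failed close the same as a timed-out one.
                notClosed.add(entry.getKey());
            }
        }
        return notClosed;
    }

    public static void main(String[] args) {
        Map<String, TimedCloseable> streams = new LinkedHashMap<>();
        // Simulated streams: "fast" needs 1 second to stop, "slow" needs 5.
        streams.put("fast", t -> t.compareTo(Duration.ofSeconds(1)) >= 0);
        streams.put("slow", t -> t.compareTo(Duration.ofSeconds(5)) >= 0);
        System.out.println(closeAll(streams, Duration.ofSeconds(3))); // prints [slow]
    }
}
```

With a report like this, the factory could at least log which streams were still running when the timeout expired, which would have made the state directory error easier to diagnose.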

I thought about using kafka.streams.close-seconds, but that key could conflict with the settings for a stream named close-seconds.
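
To illustrate the naming concern, here is a small sketch. It assumes that stream names are taken from the first segment after the kafka.streams. prefix, as the kafka.streams.default.start-kafka-streams key above suggests. The streamNames helper is hypothetical and is not how Micronaut resolves configuration internally; it only shows which keys would fall under that prefix.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class StreamNameSketch {

    static final String PREFIX = "kafka.streams.";

    /** Collects the first key segment after the prefix, i.e. the assumed stream name. */
    static Set<String> streamNames(List<String> keys) {
        Set<String> names = new TreeSet<>();
        for (String key : keys) {
            if (key.startsWith(PREFIX)) {
                String rest = key.substring(PREFIX.length());
                int dot = rest.indexOf('.');
                names.add(dot < 0 ? rest : rest.substring(0, dot));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // A key under the prefix looks like a stream called "close-seconds".
        System.out.println(streamNames(List.of(
                "kafka.streams.default.start-kafka-streams",
                "kafka.streams.close-seconds"))); // prints [close-seconds, default]

        // The hyphenated key sits outside the prefix, so only "default" is seen.
        System.out.println(streamNames(List.of(
                "kafka.streams.default.start-kafka-streams",
                "kafka.streams-close-seconds"))); // prints [default]
    }
}
```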