Allow customisation of the close() time in KafkaStreamsFactory
Feature description
I wrote several @MicronautTest tests in a project that uses Kafka Streams through Micronaut Kafka. My tests passed on Linux, but when I ran them on Windows, they started failing like this:
io.micronaut.context.exceptions.BeanInstantiationException: Bean definition [org.apache.kafka.streams.KafkaStreams] could not be loaded: Error instantiating bean of type [org.apache.kafka.streams.KafkaStreams]
Message: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
To get them to run, I had to make sure that my tests either:
- did not start Kafka Streams at all, by using @MicronautTest(environments = "no_streams"), where the environment had kafka.streams.default.start-kafka-streams = false, or
- used an @AfterAll method to wait indefinitely for Kafka Streams to close, like this:
    @Property(name = "spec.name", value = "MicronautStreamsTest")
    @TestInstance(Lifecycle.PER_CLASS)
    @MicronautTest
    public class MicronautStreamsTest {

        // I would need a List<KafkaStreams> in the general case, but I only had one KStream @Singleton in my program
        @Inject
        KafkaStreams kStreams;

        @AfterAll
        public void cleanUp() {
            // close() without a timeout blocks until all stream threads have stopped
            kStreams.close();
        }

        /* ... actual tests ... */
    }
I think this issue is due to the use of a fixed Duration to wait for the various KafkaStreams instances to close in KafkaStreamsFactory (here). It may be that 3 seconds is plenty of time to shut down Kafka Streams on my Linux system, but not on my Windows system. I'm referring to this code:
    @Override
    @PreDestroy
    public void close() {
        for (KafkaStreams stream : streams.keySet()) {
            try {
                stream.close(Duration.ofSeconds(3));
            } catch (Exception e) {
                // ignore
            }
        }
    }
Would it make sense to introduce something like this?
    @Value("${kafka.streams-close-seconds:3}")
    private int closeWaitSeconds;

    @Override
    @PreDestroy
    public void close() {
        for (KafkaStreams stream : streams.keySet()) {
            try {
                stream.close(Duration.ofSeconds(closeWaitSeconds));
            } catch (Exception e) {
                // ignore
            }
        }
    }
I thought about using kafka.streams.close-seconds, but then it could conflict with the settings for a stream with that name.
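To illustrate the idea independently of Micronaut and Kafka, here is a minimal, self-contained sketch. TimedCloseable and StreamsCloser are hypothetical names that I made up for this example: the first stands in for KafkaStreams, and the second for the factory's close logic. Neither is part of Micronaut Kafka. The sketch relies on the fact that KafkaStreams.close(Duration) returns false when the timeout is reached before all stream threads have stopped, so that instead of ignoring the outcome, the caller can find out which streams are still shutting down.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for KafkaStreams: returns true if fully stopped within the timeout. */
interface TimedCloseable {
    boolean close(Duration timeout);
}

/** Hypothetical helper (not part of Micronaut Kafka) that closes streams with a configurable timeout. */
final class StreamsCloser {
    private final Duration closeTimeout;

    StreamsCloser(Duration closeTimeout) {
        this.closeTimeout = closeTimeout;
    }

    /** Closes every stream and returns the ones that failed or did not stop in time. */
    List<TimedCloseable> closeAll(Iterable<? extends TimedCloseable> streams) {
        List<TimedCloseable> notClosed = new ArrayList<>();
        for (TimedCloseable stream : streams) {
            try {
                if (!stream.close(closeTimeout)) {
                    // Timeout reached before the stream finished shutting down
                    notClosed.add(stream);
                }
            } catch (RuntimeException e) {
                // Keep closing the remaining streams, but report this one to the caller
                notClosed.add(stream);
            }
        }
        return notClosed;
    }
}
```

With something like this, the timeout would come from configuration (3 seconds by default, as today), and a test environment on a slower machine could simply raise it.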