mguenther/kafka-junit

lost messages after upgrade from 2.8 to 3.2

Closed this issue · 9 comments

I am trying to upgrade both my kafka client and the mguenther test library from 2.8 to 3.2, and I am getting lost messages causing my tests to fail at least 20% of the time (but not every time). By "lost" I mean that I wait 60 seconds and never see the message in my consumer. When it works it is almost instantaneous.

I am sending the message with KafkaProducer.send(ProducerRecord<K,V>, Callback) and logging from the callback, so I can see I get the callback showing success conditions (no exception), and no other errors or exceptions are logged.

I'm also using KafkaStreams, Avro SerDe, and rxjava3, so there is a lot going on here that is not shown, and it's not easy to provide a trivial code sample, but I wonder if there is a known issue, or whether you have any further ideas for troubleshooting.

What's the configuration of the Kafka clients that you use in your test case? (the complete configuration) Did you properly subscribe / hook up your reactive stream?

I'm just using the default config for the EmbeddedKafkaCluster:

    @Before
    public void before() {
        kafka = EmbeddedKafkaCluster.provisionWith(EmbeddedKafkaClusterConfig.defaultClusterConfig());
        kafka.start();
    }

    @After
    public void after() {
        kafka.stop();
    }

I am then passing kafka.getBrokerList() to both my producer and consumer.

To answer "Did you properly subscribe / hook up your reactive stream?" I can say that the same code works with kafka-junit:2.8.0 and kafka 2.8.2. I ran it 100 times and never got a failure, and it's been working in production for months with no complaint, so I think it must be wired properly. It's only when I went to 3.2 that I saw this problem (3.0 and 3.1 appear to have the problem too). I can give you more code if you want.

I do have an extensive suite of regression tests which unfortunately don't exhibit these issues. I'd like to understand what I've been missing there in order to catch these regressions in the future (if there are any). I've compared the effective configuration for the ProducerConfig and the ConsumerConfig (the ones Kafka for JUnit uses to assert that everything went ok) between versions 2.8 and 3.2 to check whether the default configuration changed between the corresponding Kafka versions. There are indeed a couple of small differences, but I can't link any of them to the behavior that you're seeing.
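For anyone who wants to repeat that comparison on their own clients, here is a minimal sketch of diffing two effective configurations. It uses only the JDK; the class name `ConfigDiff` and the sample maps are hypothetical. The sample values reflect my understanding of defaults that changed in Kafka 3.0 (`acks`, `enable.idempotence`, `session.timeout.ms`) and are not necessarily the differences referred to above. Real values should be copied from the `ProducerConfig values:` and `ConsumerConfig values:` blocks that the clients log on startup.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.TreeSet;

public class ConfigDiff {

    /** Returns key -> "old -> new" for every key whose value differs between the two maps. */
    static Map<String, String> diff(Map<String, String> before, Map<String, String> after) {
        Map<String, String> changed = new TreeMap<>();
        // Union of keys, so settings present on only one side are reported too (as null).
        TreeSet<String> keys = new TreeSet<>(before.keySet());
        keys.addAll(after.keySet());
        for (String key : keys) {
            String oldValue = before.get(key);
            String newValue = after.get(key);
            if (!Objects.equals(oldValue, newValue)) {
                changed.put(key, oldValue + " -> " + newValue);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // Hypothetical excerpts; replace with the values from your own client logs.
        Map<String, String> producer28 = Map.of("acks", "1", "enable.idempotence", "false");
        Map<String, String> producer32 = Map.of("acks", "all", "enable.idempotence", "true");
        Map<String, String> consumer28 = Map.of("session.timeout.ms", "10000", "auto.offset.reset", "latest");
        Map<String, String> consumer32 = Map.of("session.timeout.ms", "45000", "auto.offset.reset", "latest");

        System.out.println("producer:");
        diff(producer28, producer32).forEach((k, v) -> System.out.println("  " + k + ": " + v));
        System.out.println("consumer:");
        diff(consumer28, consumer32).forEach((k, v) -> System.out.println("  " + k + ": " + v));
    }
}
```

Once the differing keys are known, one way to bisect is to pin each of them back to its 2.8 value in the 3.2 test, one at a time, and see whether the failure rate changes.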

This is a rather odd one.

I can't reproduce it with the tests that I have. Are you able to extract enough code from your test setup to contribute a test that exhibits this behavior?

I'll try to get around to that at some point, but I'm a bit overbooked right now and the kafka upgrade was not urgent so I rolled it back.

Sure. I edited the reply above. Not sure why I made the connection to Kafka Connect, I guess I'm a bit overbooked as well.

I'll shortly release patch version 3.2.1 of Kafka for JUnit which will upgrade Kafka dependencies from 3.2.0 to 3.2.3. Apparently, these newer releases incorporate some bugfixes for the Kafka client library. I'll file a release note once it's available. Would be great if you could run your test against that version and report back if it fixes the issue for you.

@dlipofsky Patch version 3.2.1 is available at Maven Central. Could I trouble you for a re-run of your failing test with that version? Thanks!

Unfortunately it's still failing with kafka-junit 3.2.1 and kafka 3.2.3. I'm seeing failures 10-20% of the time. (I also tested in combination with some other library versions, rxjava 3.0.4, 3.0.13, and 3.1.5, and confluent 6.2.7 and 7.2.2, but none of those made a difference.)
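Since the failure is intermittent, it helps to know how many clean runs are needed before a candidate fix can be trusted. The sketch below does that arithmetic for the 10-20% failure rates reported here. It assumes test runs are independent with a fixed failure probability, which may not hold if failures depend on machine load; the class and method names are made up for illustration.

```java
public class FlakyRunMath {

    /** Probability that n independent runs all pass when each run fails with probability p. */
    static double probabilityAllPass(double p, int n) {
        return Math.pow(1.0 - p, n);
    }

    /**
     * Smallest n for which n consecutive passes would happen with probability
     * at most alpha if the per-run failure rate were still p.
     */
    static int runsNeeded(double p, double alpha) {
        return (int) Math.ceil(Math.log(alpha) / Math.log(1.0 - p));
    }

    public static void main(String[] args) {
        for (double p : new double[] {0.10, 0.20}) {
            System.out.printf(
                "failure rate %.2f: P(100 clean runs) = %.2e, runs needed at alpha 0.05 = %d%n",
                p, probabilityAllPass(p, 100), runsNeeded(p, 0.05));
        }
    }
}
```

Under these assumptions, 29 consecutive passes (at a 10% failure rate) or 14 (at 20%) would be enough to reject "nothing changed" at the 5% level, and the 100 clean runs on 2.8 are very unlikely to be luck if the same failure rate applied there.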

That's unfortunate. I guess we won't make any progress on this without a test that reproduces the errors that you experience. I'll keep this issue open for the time being, but will label it accordingly. If you find the time at some point, please do provide a test that we can work with and we'll take another look at this.

Closed due to inactivity and a missing set of tests that reproduce the described behavior.