mguenther/kafka-junit

Question regarding a multi-threading error after updating from Kafka 2.8 to 3.0


I'm updating my connectors from Kafka 2.8 to 3.0 and I'm running into an error that is likely not related to Kafka for JUnit, but I'm wondering if there is any configuration I'm missing.
I get this error when my tasks are starting:

[KafkaBasedLog Work Thread - offset-topic] ERROR org.apache.kafka.connect.util.KafkaBasedLog - Unexpected exception in Thread[KafkaBasedLog Work Thread - offset-topic,5,main]
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2464)
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2448)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1218)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:311)
	at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:75)
	at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:425)

I'm asking because I also noticed that, going from release 2.8 to 3.0, I at least needed to configure the log cleanup policy, because I got an error saying not to use the "delete" cleanup policy for the Connect topics.

Caused by: org.apache.kafka.common.config.ConfigException: Topic 'offset-topic' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'.

So now I have something like this:

    this.kafkaCluster = provisionWith(
        newClusterConfig()
            .configure(brokers()
                .withNumberOfBrokers(1)
                .with(KafkaConfig$.MODULE$.LogCleanupPolicyProp(), "compact"))
            .configure(kafkaConnect()
                .deployConnector(connectorConfig)
                .with("config.storage.topic", "config-topic")
                .with("offset.storage.topic", "offset-topic")
                .with("status.storage.topic", "status-topic")
                .with("config.storage.replication.factor", "1")
                .with("offset.storage.replication.factor", "1")
                .with("status.storage.replication.factor", "1")
                .with("consumer.override.auto.offset.reset", "latest")));

This is the line I needed to add:
.with(KafkaConfig$.MODULE$.LogCleanupPolicyProp(), "compact")

I am wondering if there is some new parameter for this concurrency problem as well.

Any experience or issue with version 3.0?

These are the default configurations I normally use for my connector, where I basically always set tasks.max to 1 so I don't run into multi-threading:

  public static Properties defaultSourceConnectorConfigs(String connectorCls,
    String valueConverter) {
    Properties connectorProps = new Properties();
    connectorProps.put("name", "test-connector");
    connectorProps.put("tasks.max", "1");
    connectorProps.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");

    if (connectorCls != null) {
      connectorProps.put("connector.class", connectorCls);
    }
    if (valueConverter != null) {
      connectorProps.put("value.converter", valueConverter);
    }

    return connectorProps;
  }
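
For context, a configuration built with this helper is what gets passed to deployConnector in the cluster setup above; a hypothetical call could look like this (the connector class name is just a placeholder):

    // hypothetical usage; "com.example.MySourceConnector" is a placeholder class name
    Properties connectorConfig = defaultSourceConnectorConfigs(
        "com.example.MySourceConnector",
        "org.apache.kafka.connect.storage.StringConverter");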

Also, I'm producing data (this is a source connector), so it's not clear to me why the error talks about a consumer. I wonder if it has something to do with offsets or the Kafka Connect configuration.

I should clarify that the reason I'm asking here is that all my other tests seem fine; it's only the integration tests, where I use this tool to emulate Kafka, that are failing. So I assume there is some new Kafka configuration I might need to "tune".

Sorry for the late response - but with holidays and all, some issues might linger longer than I'd like them to.

Did you figure this one out already?

I haven't experienced anything like you've described, and I'm in close contact with companies that already deploy Kafka >= 3.0 and use the lib for their tests. If you choose to manually create the state and offset management topics for your Connect worker, always be sure to set their log cleanup policy to compact. This is to prevent data loss that could cause your Connect worker to lose information about its latest "restart point".

I'm not sure whether this was only a recommendation prior to 3.0 and thus not enforced when using these topics with the default log.cleanup.policy (which is delete).
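
For illustration, pre-creating such a topic with a compacted cleanup policy via the plain Kafka AdminClient could look like the following sketch. The bootstrap address, partition count, and class name are assumptions; the topic name mirrors the snippet above.

    // minimal sketch: pre-create the Connect offset topic with cleanup.policy=compact
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class CreateCompactedOffsetTopic {

        public static void main(String[] args) throws Exception {
            Properties adminProps = new Properties();
            adminProps.put("bootstrap.servers", "localhost:9092"); // adjust to your (embedded) cluster

            try (Admin admin = Admin.create(adminProps)) {
                // one partition, replication factor 1, and a compacted cleanup policy
                NewTopic offsetTopic = new NewTopic("offset-topic", 1, (short) 1)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(List.of(offsetTopic)).all().get();
            }
        }
    }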

We also encountered this issue and I haven't found a solution myself yet. Thank you so much @cstmgl for the workaround.
@mguenther Are there any plans on fixing this?

Also, this problem only appears if I start a second EmbeddedConnect in my tests (not concurrently, but two tests using it, with everything shut down properly in between).

I'm willing to take a peek at this if any of you can produce an integration test that exhibits this exact behavior. The configuration part is certainly not an issue with the testing library. If this fails for common cases though, we could argue that Kafka for JUnit should mitigate this in the first place by setting a sensible default (that can be overridden).

For the concurrency part: Kafka for JUnit has to somewhat rely on things playing nicely with each other at the level of Kafka components. If there is no proper isolation, components that run within a single JVM will exhibit occasional errors. Kafka for JUnit can't introduce consistency boundaries within components not under its control.

@mguenther Here we have a test which does not run without the workaround: https://github.com/bakdata/kafka-connect-resetter/blob/main/src/test/java/com/bakdata/kafka/KafkaConnectorSinkResetterApplicationTest.java#L84. It works fine with Kafka 2.x. The problem only occurs if multiple tests using embedded Connect run after each other.

Did you try using separate offset topics for your tests?

> Did you try using separate offset topics for your tests?

No. I guess that would also work, but I think there is still an underlying problem. The offset topic is compacted at first, but after shutting Kafka down and starting it again, the offset topic still lives on as a zombie with delete as the cleanup policy.
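
To confirm that zombie state, the effective cleanup policy of the topic can be read back with the AdminClient; a rough sketch (bootstrap address and class name are assumptions):

    // rough sketch: read back cleanup.policy of the offset topic to check whether
    // it survived the restart with "delete" instead of "compact"
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.config.TopicConfig;

    import java.util.List;
    import java.util.Properties;

    public class InspectCleanupPolicy {

        public static void main(String[] args) throws Exception {
            Properties adminProps = new Properties();
            adminProps.put("bootstrap.servers", "localhost:9092"); // adjust to your (embedded) cluster

            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "offset-topic");
            try (Admin admin = Admin.create(adminProps)) {
                Config config = admin.describeConfigs(List.of(topic)).all().get().get(topic);
                System.out.println(config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value());
            }
        }
    }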

Okay, I have a better understanding now of what's going on here. I'm also able to trigger this behavior for your test if I run it in isolation. As of 3.0.0, the following classes required by the Connect runtime expose a new constructor that consumes a Supplier<TopicAdmin>:

  • KafkaOffsetBackingStore
  • KafkaStatusBackingStore
  • KafkaConfigBackingStore

Kafka Connect uses TopicAdmin to fetch information on the topics that are used for these stores. Of course, if you don't pass on a Supplier<TopicAdmin>, the call to TopicAdmin.verifyTopicCleanupPolicyOnlyCompact won't happen right away. I suppose that is why the first test that you are running succeeds, as this is only a safeguard so that you don't lose that kind of state information in a production-grade deployment. I'm guessing that the Connect runtime leaves some traces after test execution (it is supposed to clean up properly; I don't know what the issue is here). Anyhow, we've had a recent Hacktoberfest-related contribution (#77) which resolves the deprecation. I've packaged a snapshot release and ran your tests against it. Both Connect-based tests fail immediately.

I suppose that safeguard has been introduced in preparation for the release of Kafka 3.3.0 (which introduces KIP-618: Exactly-once support for source connectors), as unsafe state handling for these topics would break the guarantees that Kafka claims with KIP-618. Again, this wasn't enforced - only recommended - prior to 3.0.0.

To remedy the need for a workaround, we'll provide a simple fix for this. I'm thinking of something along the lines of: if you configure Kafka Connect through the EmbeddedKafkaClusterConfig, then log compaction will be enforced globally. I don't think that we need to address mixed usage of both supported log cleanup policies as part of a single test instance or - for that matter - a single EmbeddedKafkaCluster deployment. You'll have to make sure, though, that you configure your Connect deployment through EmbeddedKafkaClusterConfig. EmbeddedKafkaClusterConfig ensures consistency across the individual parts of the provided configuration. If you create an EmbeddedConnect instance apart from that, you'll have to continue to use the workaround.
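
Once that fix is in place, the configuration from the original post should reduce to something along these lines (the manual LogCleanupPolicyProp entry would no longer be needed; connectorConfig and the topic and replication settings are the ones from the snippet above):

    this.kafkaCluster = provisionWith(
        newClusterConfig()
            .configure(brokers().withNumberOfBrokers(1))
            .configure(kafkaConnect()
                .deployConnector(connectorConfig)
                .with("config.storage.topic", "config-topic")
                .with("offset.storage.topic", "offset-topic")
                .with("status.storage.topic", "status-topic")
                .with("config.storage.replication.factor", "1")
                .with("offset.storage.replication.factor", "1")
                .with("status.storage.replication.factor", "1")));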

I've tested this with a simple fix and noticed that your tests exhibit other problems as well. Your test cases assume that the cluster comes up with a clean slate, yet you re-use the existing configuration (including state directories, and thus also topics) across these tests. Once you've hooked that up properly, and with the aforementioned fix in place, there shouldn't be a problem anymore.
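
As a rough illustration of a per-test lifecycle that avoids carrying state directories and topics from one test to the next (JUnit 5 annotations assumed; provisionWith, newClusterConfig and brokers are the same static factory methods used in the original post):

    // sketch of the relevant test class body, imports omitted
    private EmbeddedKafkaCluster kafkaCluster;

    @BeforeEach
    void provisionCluster() {
        // re-provision a fresh cluster per test so that no state directories
        // or topics leak into the next test
        kafkaCluster = provisionWith(newClusterConfig()
            .configure(brokers().withNumberOfBrokers(1)));
        kafkaCluster.start();
    }

    @AfterEach
    void tearDownCluster() {
        kafkaCluster.stop();
    }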

See #87 for progress on the fix.

@philipp94831 : 3.2.2 is available @ Maven Central.

Thanks for the fix!

> I've tested this with a simple fix and noticed that your tests exhibit other problems as well. Your test cases assume that the cluster comes up with a clean slate, yet you re-use the existing configuration (including state directories, and thus also topics) across these tests. Once you've hooked that up properly, and with the aforementioned fix in place, there shouldn't be a problem anymore.

I call stop() after every test. Does that not delete local storage, i.e., topics etc.?

Calling stop() on an EmbeddedKafka instance attempts to delete all local storage. Depending on your OS, this operation might fail and not clean up state properly, since the JVM process still holds a lock on the working directory of the temporary cluster.

The public API of Kafka expects the path to the working directory of the broker as a String. For that reason, Kafka for JUnit creates a temporary directory and passes on its location as a String. At some point, this has to be converted to a java.nio.Path object, of course. Unfortunately, we're not able to pass on a java.nio.Path object at this level ourselves, meaning that we have to have some existing local path which we pass on, so that Kafka is able to construct the necessary file handles somewhat later. This, however, leads to the aforementioned problem that the cleanup as part of the same JVM process can fail.
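
To illustrate the constraint (a sketch only, not the library's actual code; the class name and property usage are just for illustration): a real on-disk directory is created and handed over by its String representation, for example:

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Properties;

    public class BrokerWorkingDirSketch {

        public static void main(String[] args) throws Exception {
            // a real on-disk directory is created and handed to the broker as a
            // String, since the broker configuration does not accept a
            // java.nio.Path (or a virtual file system) directly
            Path workingDirectory = Files.createTempDirectory("kafka-junit");
            Properties brokerProperties = new Properties();
            brokerProperties.put("log.dir", workingDirectory.toString());
            System.out.println(brokerProperties.getProperty("log.dir"));
        }
    }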

In an ideal world, we would simply create a virtual, in-memory directory abstraction masquerading as java.nio.Path and pass this to Kafka (using Jimfs, for example). Unfortunately, this is not possible with Apache Kafka.

Shouldn't stop() release all locks before cleaning local storage? Maybe this is also a discussion for another issue.

Yes, you'd think so, but I wasn't able to get this working consistently.

This is something that bugs me to no end, to be honest. If you have a good idea, please go ahead and create an issue and we can discuss this further.

Since #87 fixes the addressed issue, I'm considering this issue closed.