camunda/camunda

Kafka cannot be loaded through our ExporterJarClassLoader

npepinpe opened this issue · 1 comment

Describe the bug

When an exporter that bundles Kafka as a dependency is loaded through an isolated ExporterJarClassLoader, Kafka fails because it cannot find classes that live inside the exporter's fat JAR (surfaced as a ConfigException, see the stacktrace below). The class is definitely found by our class loader - it is simply unavailable to Kafka, because Kafka tries to load it through Thread#getContextClassLoader(), which (if unset) is the class loader of the thread's creator - in our case, the system class loader.
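To illustrate the mechanism, here is a minimal, standalone sketch (not Zeebe code; the JAR path is a hypothetical stand-in for the fat JAR case): a class that only exists inside an isolated JAR is visible through the isolated class loader, but not through the thread context class loader, which defaults to the system class loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;

public class ContextClassLoaderDemo {
  public static void main(final String[] args) throws Exception {
    // hypothetical fat JAR containing org.apache.kafka.common.serialization.StringSerializer
    final Path exporterJar = Path.of("exporter-with-kafka.jar");
    final ClassLoader isolated =
        new URLClassLoader(
            new URL[] {exporterJar.toUri().toURL()}, ClassLoader.getPlatformClassLoader());

    final String name = "org.apache.kafka.common.serialization.StringSerializer";

    // succeeds: the isolated class loader sees the classes bundled in the JAR
    Class.forName(name, true, isolated);

    // throws ClassNotFoundException (assuming Kafka is not on the application classpath):
    // the context class loader of this thread is the system class loader, which knows
    // nothing about the exporter JAR - this is effectively what Kafka runs into
    Class.forName(name, true, Thread.currentThread().getContextClassLoader());
  }
}
```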

To Reproduce

I used one of our users' exporters - https://github.com/openMF/ph-ee-exporter - and could reproduce the problem by loading an exporter and pointing the JAR path to the fat JAR it produces. I also verified that the cause really is Thread#getContextClassLoader by wrapping all accesses to an exporter instance's methods (e.g. #configure, #open, #close, etc.) in a try/finally block that overrides the thread context class loader, after which the problem went away; a sketch of that wrapper follows below.
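The wrapper I used for that verification looked roughly like this (a sketch with illustrative names - the helper class and the Runnable-based signature are not actual Zeebe code):

```java
// Minimal sketch of the try/finally workaround; in the verification this was applied
// around every exporter callback (#configure, #open, #export, #close, ...).
final class ExporterClassLoaderUtil {

  static void runWithExporterClassLoader(
      final ClassLoader exporterClassLoader, final Runnable exporterCall) {
    final Thread thread = Thread.currentThread();
    final ClassLoader previous = thread.getContextClassLoader();

    // make the exporter's isolated class loader visible to code (like Kafka) that
    // resolves classes through the thread context class loader
    thread.setContextClassLoader(exporterClassLoader);
    try {
      exporterCall.run();
    } finally {
      // always restore the previous context class loader, even if the exporter throws
      thread.setContextClassLoader(previous);
    }
  }
}
```

At a (hypothetical) call site this would look like `ExporterClassLoaderUtil.runWithExporterClassLoader(exporter.getClass().getClassLoader(), () -> exporter.open(controller));`.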

Expected behavior

I expect that we support exporters providing their own dependency versions in an isolated class loader to avoid dependency collisions, and that we support the most common class-loading access patterns (e.g. getClass().getClassLoader(), Thread.currentThread().getContextClassLoader(), etc.).

Log/Stacktrace

Full Stacktrace

19:30:22.391 [Broker-0-Exporter-1] [Broker-0-zb-fs-workers-0] ERROR io.zeebe.util.actor - Uncaught exception in 'Broker-0-Exporter-1' in phase 'STARTED'. Continuing with next job.
org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728) ~[?:?]
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) ~[?:?]
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) ~[?:?]
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[?:?]
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[?:?]
	at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:409) ~[?:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326) ~[?:?]
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:270) ~[?:?]
	at hu.dpc.rt.kafkastreamer.exporter.KafkaExporterClient.<init>(KafkaExporterClient.java:57) ~[?:?]
	at hu.dpc.rt.kafkastreamer.exporter.KafkaExporter.createClient(KafkaExporter.java:77) ~[?:?]
	at hu.dpc.rt.kafkastreamer.exporter.KafkaExporter.open(KafkaExporter.java:43) ~[?:?]
	at io.zeebe.broker.exporter.stream.ExporterDirector.onSnapshotRecovered(ExporterDirector.java:225) ~[classes/:?]
	at io.zeebe.broker.exporter.stream.ExporterDirector.onActorStarted(ExporterDirector.java:140) ~[classes/:?]
	at io.zeebe.util.sched.ActorJob.invoke(ActorJob.java:73) ~[classes/:?]
	at io.zeebe.util.sched.ActorJob.execute(ActorJob.java:39) [classes/:?]
	at io.zeebe.util.sched.ActorTask.execute(ActorTask.java:115) [classes/:?]
	at io.zeebe.util.sched.ActorThread.executeCurrentTask(ActorThread.java:107) [classes/:?]
	at io.zeebe.util.sched.ActorThread.doWork(ActorThread.java:91) [classes/:?]
	at io.zeebe.util.sched.ActorThread.run(ActorThread.java:195) [classes/:?]
19:30:22.591 [GatewayTopologyManager] [Broker-0-zb-actors-0] DEBUG io.zeebe.gateway - Received metadata change from Broker 0, partitions {1=LEADER} and terms {1=12}.

I had some free time today to look at it. The solution is pretty straightforward - just some changes in the ExporterDirector - but testing it is... interesting 😅

That is where I will need some brainstorming help on how to write a good test.