Kafka cannot be loaded through our ExporterJarClassLoader
npepinpe opened this issue · 1 comments
Describe the bug
If loading an exporter with Kafka as a dependency through an isolated ExporterJarClassLoader
, Kafka will produce ClassNotFoundError
. The class, however, is definitely found by our class loader - it is simply unavailable to Kafka as it tries to load it through the Thread#getContextClassLoader()
, which is (if unset) the class loader of the thread's creator - in our case, the system class loader.
To Reproduce
I used one of our user's exporter - https://github.com/openMF/ph-ee-exporter - and could reproduce the problem by loading an exporter and pointing the JAR path to the fat JAR it produces. I also verified the issue is indeed the Thread#getContextClassLoader
by wrapping all of the accesses to an exporter instance's methods (e.g. #configure
, #open
, #close
, etc.) in a try/finally block where I override the thread context class loader, and the problem went away.
Expected behavior
I expect that we support exporters providing their own dependency versions in an isolated class loader to avoid dependency collisions, and that we support most common access cases (e.g. getClass().getClassLoader()
, Thread.currentThread().getContextClassLoader()
, etc.).
Log/Stacktrace
If possible add the full stacktrace or Zeebe log which contains the issue.
Full Stacktrace
19:30:22.391 [Broker-0-Exporter-1] [Broker-0-zb-fs-workers-0] ERROR io.zeebe.util.actor - Uncaught exception in 'Broker-0-Exporter-1' in phase 'STARTED'. Continuing with next job.
org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.StringSerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.StringSerializer could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:728) ~[?:?]
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) ~[?:?]
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) ~[?:?]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) ~[?:?]
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) ~[?:?]
at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:409) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:326) ~[?:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:270) ~[?:?]
at hu.dpc.rt.kafkastreamer.exporter.KafkaExporterClient.<init>(KafkaExporterClient.java:57) ~[?:?]
at hu.dpc.rt.kafkastreamer.exporter.KafkaExporter.createClient(KafkaExporter.java:77) ~[?:?]
at hu.dpc.rt.kafkastreamer.exporter.KafkaExporter.open(KafkaExporter.java:43) ~[?:?]
at io.zeebe.broker.exporter.stream.ExporterDirector.onSnapshotRecovered(ExporterDirector.java:225) ~[classes/:?]
at io.zeebe.broker.exporter.stream.ExporterDirector.onActorStarted(ExporterDirector.java:140) ~[classes/:?]
at io.zeebe.util.sched.ActorJob.invoke(ActorJob.java:73) ~[classes/:?]
at io.zeebe.util.sched.ActorJob.execute(ActorJob.java:39) [classes/:?]
at io.zeebe.util.sched.ActorTask.execute(ActorTask.java:115) [classes/:?]
at io.zeebe.util.sched.ActorThread.executeCurrentTask(ActorThread.java:107) [classes/:?]
at io.zeebe.util.sched.ActorThread.doWork(ActorThread.java:91) [classes/:?]
at io.zeebe.util.sched.ActorThread.run(ActorThread.java:195) [classes/:?]
19:30:22.591 [GatewayTopologyManager] [Broker-0-zb-actors-0] DEBUG io.zeebe.gateway - Received metadata change from Broker 0, partitions {1=LEADER} and terms {1=12}.
I had some free time today to look at it, the solution is pretty straight forward, just some changes in the ExporterDirector
, but testing it is...interesting 😅
There I will need some brainstorming help on how to write a good test.