[BUG] Spark job won't start with maxBytesPerTrigger because of ClassNotFoundException
ronaldbuit opened this issue
Describe the bug
When we configure the option `maxBytesPerTrigger` for our Spark job that reads from a Pulsar topic, the job won't start. It crashes with this error:
ERROR MicroBatchExecution: Query [id = f01bfeff-d1b8-415f-8592-c8e3211bead7, runId = e5d87131-b874-4072-8f3e-3532cbf74c45] terminated with error
org.apache.pulsar.client.admin.PulsarAdminException$TimeoutException: java.util.concurrent.TimeoutException
With debug logging enabled, the following error appears early in the process:
DEBUG ObjectMapperFactory: Add LoadManagerReport deserializer failed because LoadManagerReport.class has been shaded
java.lang.ClassNotFoundException: org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport
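To check whether the class named in the DEBUG log is visible at all, a small probe can try to load it by name using the same classpath as the Spark driver. This is a minimal sketch: `ClassProbe` is a hypothetical helper, not part of Pulsar or the Spark connector, and the second name is simply the logged name with the `org.apache.pulsar.shade.` prefix removed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical diagnostic helper (not part of Pulsar or the connector):
 * reports whether each class name can be loaded by a given class loader.
 */
final class ClassProbe {
    private ClassProbe() {}

    /** Returns name -> true if the class loads, false if it cannot be found or linked. */
    static Map<String, Boolean> probe(List<String> classNames, ClassLoader loader) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : classNames) {
            try {
                // initialize=false: resolve the class without running static initializers
                Class.forName(name, false, loader);
                result.put(name, true);
            } catch (ClassNotFoundException | LinkageError e) {
                result.put(name, false);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = List.of(
            // Name taken from the DEBUG log (with the shade prefix)
            "org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport",
            // Same name with the shade prefix stripped
            "org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport");
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        probe(names, loader).forEach((n, ok) ->
            System.out.println((ok ? "FOUND   " : "MISSING ") + n));
    }
}
```

If the shaded name reports MISSING on the driver classpath, that matches the DEBUG output above.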
When we put `pulsar-client-admin.jar` on the classpath, the Spark job starts, but it cannot receive anything from the topic because of a conflict with the shaded classes for `GenericAvroRecord`.
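To see where conflicting copies of a class come from, it can help to list every classpath entry that provides it. A minimal sketch, assuming a hypothetical `ClassLocations` helper run with the same classpath as the Spark job; it relies only on the JDK's `ClassLoader.getResources`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical diagnostic helper: lists every classpath location that
 * provides a given class, to spot duplicate copies shipped by different jars.
 */
final class ClassLocations {
    private ClassLocations() {}

    /** Returns every URL (jar entry or directory file) that provides the class file. */
    static List<URL> find(String className, ClassLoader loader) {
        // Binary class name -> resource path, e.g. a.b.C -> a/b/C.class
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        for (String name : args) {
            List<URL> urls = find(name, loader);
            System.out.println(name + " -> " + urls.size() + " location(s)");
            urls.forEach(u -> System.out.println("  " + u));
        }
    }
}
```

For example, passing the fully qualified name of `GenericAvroRecord` (assumed here to be `org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord`; adjust it to the name in your own error) should print more than one location if more than one jar on the classpath ships that class.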
It looks related to apache/pulsar#15167.
To Reproduce
Steps to reproduce the behavior:
- Configure `maxBytesPerTrigger` with the corresponding settings.
- Start the Spark job with debug logging enabled.
- See the errors.
Expected behavior
With `maxBytesPerTrigger` enabled, the Spark job starts normally.
Additional context
Stacktrace for `ClassNotFoundException`:
DEBUG ObjectMapperFactory: Add LoadManagerReport deserializer failed because LoadManagerReport.class has been shaded
java.lang.ClassNotFoundException: org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:467)
at org.apache.pulsar.shade.org.apache.commons.lang3.ClassUtils.getClass(ClassUtils.java:1069)
at org.apache.pulsar.shade.org.apache.commons.lang3.ClassUtils.getClass(ClassUtils.java:1135)
at org.apache.pulsar.shade.org.apache.commons.lang3.ClassUtils.getClass(ClassUtils.java:1118)
at org.apache.pulsar.common.util.ObjectMapperFactory.setAnnotationsModule(ObjectMapperFactory.java:203)
at org.apache.pulsar.common.util.ObjectMapperFactory.create(ObjectMapperFactory.java:117)
at org.apache.pulsar.client.admin.internal.JacksonConfigurator.<init>(JacksonConfigurator.java:38)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
at org.apache.pulsar.shade.org.glassfish.hk2.utilities.reflection.ReflectionHelper.makeMe(ReflectionHelper.java:1356)
at org.apache.pulsar.shade.org.jvnet.hk2.internal.ClazzCreator.createMe(ClazzCreator.java:248)
at org.apache.pulsar.shade.org.jvnet.hk2.internal.ClazzCreator.create(ClazzCreator.java:342)
at org.apache.pulsar.shade.org.jvnet.hk2.internal.SystemDescriptor.create(SystemDescriptor.java:463)
at org.apache.pulsar.shade.org.jvnet.hk2.internal.SingletonContext$1.compute(SingletonContext.java:59)
at org.apache.pulsar.shade.org.jvnet.hk2.internal.SingletonContext$1.compute(SingletonContext.java:47)
at org.apache.pulsar.shade.org.glassfish.hk2.utilities.cache.Cache$OriginThreadAwareFuture$1.call(Cache.java:74)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at org.apache.pulsar.shade.org.glassfish.hk2.utilities.cache.Cache$OriginThreadAwareFuture.run(Cache.java:131)
at org.apache.pulsar.shade.org.glassfish.hk2.utilities.cache.Cache.compute(Cache.java:176)
at org.apache.pulsar.shade.org.jvnet.hk2.internal.SingletonContext.findOrCreate(SingletonContext.java:98)
at org.apache.pulsar.shade.org.jvnet.hk2.internal.Utilities.createService(Utilities.java:2102)
at org.apache.pulsar.shade.org.jvnet.hk2.internal.ServiceLocatorImpl.internalGetAllServiceHandles(ServiceLocatorImpl.java:1481)
at org.apache.pulsar.shade.org.jvnet.hk2.internal.ServiceLocatorImpl.getAllServices(ServiceLocatorImpl.java:799)
at org.apache.pulsar.shade.org.glassfish.jersey.inject.hk2.AbstractHk2InjectionManager.getAllInstances(AbstractHk2InjectionManager.java:170)
at org.apache.pulsar.shade.org.glassfish.jersey.inject.hk2.ImmediateHk2InjectionManager.getAllInstances(ImmediateHk2InjectionManager.java:30)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.ContextResolverFactory$ContextResolversConfigurator.postInit(ContextResolverFactory.java:69)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.lambda$initRuntime$2(ClientConfig.java:461)
at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.initRuntime(ClientConfig.java:461)
at org.apache.pulsar.shade.org.glassfish.jersey.internal.util.collection.Values$LazyValueImpl.get(Values.java:317)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig.getRuntime(ClientConfig.java:819)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getClientRuntime(ClientRequest.java:176)
at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getInjectionManager(ClientRequest.java:567)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.onBuilder(JerseyWebTarget.java:371)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.request(JerseyWebTarget.java:206)
at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.request(JerseyWebTarget.java:38)
at org.apache.pulsar.client.admin.internal.BaseResource.lambda$requestAsync$1(BaseResource.java:101)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325)
at org.apache.pulsar.client.admin.internal.BaseResource.requestAsync(BaseResource.java:92)
at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:72)
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:178)
at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:185)
at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStatsAsync(TopicsImpl.java:678)
at org.apache.pulsar.client.admin.internal.TopicsImpl.lambda$getInternalStats$22(TopicsImpl.java:665)
at org.apache.pulsar.client.admin.internal.BaseResource.sync(BaseResource.java:306)
at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:665)
at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:660)
at org.apache.spark.sql.pulsar.PulsarAdmissionControlHelper.latestOffsetForTopicPartition(PulsarHelper.scala:542)
at org.apache.spark.sql.pulsar.PulsarHelper.$anonfun$latestOffsets$2(PulsarHelper.scala:258)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:182)
at org.apache.spark.sql.pulsar.PulsarHelper.latestOffsets(PulsarHelper.scala:254)
at org.apache.spark.sql.pulsar.PulsarSource.latestOffset(PulsarSource.scala:90)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Stacktrace for `TimeoutException`:
ERROR MicroBatchExecution: Query [id = f01bfeff-d1b8-415f-8592-c8e3211bead7, runId = e5d87131-b874-4072-8f3e-3532cbf74c45] terminated with error
org.apache.pulsar.client.admin.PulsarAdminException$TimeoutException: java.util.concurrent.TimeoutException
at org.apache.pulsar.client.admin.internal.BaseResource.sync(BaseResource.java:311)
at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:665)
at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:660)
at org.apache.spark.sql.pulsar.PulsarAdmissionControlHelper.latestOffsetForTopicPartition(PulsarHelper.scala:542)
at org.apache.spark.sql.pulsar.PulsarHelper.$anonfun$latestOffsets$2(PulsarHelper.scala:258)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:182)
at org.apache.spark.sql.pulsar.PulsarHelper.latestOffsets(PulsarHelper.scala:254)
at org.apache.spark.sql.pulsar.PulsarSource.latestOffset(PulsarSource.scala:90)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
at org.apache.pulsar.client.admin.internal.BaseResource.sync(BaseResource.java:306)
... 42 more
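The `Caused by` section shows `getInternalStats` blocking in `BaseResource.sync` on a timed `CompletableFuture.get`, with the resulting `java.util.concurrent.TimeoutException` wrapped in `PulsarAdminException$TimeoutException`. A minimal sketch of that sync-over-async pattern, using hypothetical names (`SyncOverAsync`, `AdminTimeoutException`); it is not Pulsar's implementation. It illustrates that the blocking caller only learns that the future did not complete in time, not why, so an earlier failure (such as the one logged at DEBUG) may not show up in this trace.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical sketch of a blocking wrapper around an async admin request. */
final class SyncOverAsync {
    private SyncOverAsync() {}

    /** Hypothetical stand-in for PulsarAdminException$TimeoutException. */
    static final class AdminTimeoutException extends RuntimeException {
        AdminTimeoutException(Throwable cause) {
            super(cause); // detail message is cause.toString()
        }
    }

    /** Starts an async request and blocks until it completes or timeoutMs elapses. */
    static <T> T sync(Supplier<CompletableFuture<T>> request, long timeoutMs) {
        CompletableFuture<T> future = request.get();
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Only "did not complete in time" is known here, not the reason.
            throw new AdminTimeoutException(e);
        } catch (ExecutionException e) {
            // The request completed exceptionally: surface the real cause.
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With a request whose future never completes, `sync` throws `AdminTimeoutException` after `timeoutMs`, with the `TimeoutException` as its cause, mirroring the top of the trace above.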