streamnative/pulsar-spark

[BUG] with maxBytesPerTrigger Spark job won't start because of ClassNotFoundException

ronaldbuit opened this issue · 0 comments

Describe the bug
When we configure the option maxBytesPerTrigger for our Spark job that reads from a Pulsar topic, the job won't start. It crashes with this error:

ERROR MicroBatchExecution: Query [id = f01bfeff-d1b8-415f-8592-c8e3211bead7, runId = e5d87131-b874-4072-8f3e-3532cbf74c45] terminated with error
org.apache.pulsar.client.admin.PulsarAdminException$TimeoutException: java.util.concurrent.TimeoutException

If debug logging is enabled, the following error is logged earlier in the process:

DEBUG ObjectMapperFactory: Add LoadManagerReport deserializer failed because LoadManagerReport.class has been shaded
java.lang.ClassNotFoundException: org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport

When we put the pulsar-client-admin.jar on the classpath, the Spark job will start, but it cannot receive anything from the topic because there is a conflict with the shaded classes for the GenericAvroRecord.

It looks related to apache/pulsar#15167.

To Reproduce
Steps to reproduce the behavior:

  1. Configure maxBytesPerTrigger with corresponding settings.
  2. Start the Spark job with debug logging enabled.
  3. See errors

Expected behavior
With maxBytesPerTrigger enabled, the Spark job starts normally.

Additional context
Stacktrace for ClassNotFoundException:

DEBUG ObjectMapperFactory: Add LoadManagerReport deserializer failed because LoadManagerReport.class has been shaded
java.lang.ClassNotFoundException: org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:467)
	at org.apache.pulsar.shade.org.apache.commons.lang3.ClassUtils.getClass(ClassUtils.java:1069)
	at org.apache.pulsar.shade.org.apache.commons.lang3.ClassUtils.getClass(ClassUtils.java:1135)
	at org.apache.pulsar.shade.org.apache.commons.lang3.ClassUtils.getClass(ClassUtils.java:1118)
	at org.apache.pulsar.common.util.ObjectMapperFactory.setAnnotationsModule(ObjectMapperFactory.java:203)
	at org.apache.pulsar.common.util.ObjectMapperFactory.create(ObjectMapperFactory.java:117)
	at org.apache.pulsar.client.admin.internal.JacksonConfigurator.<init>(JacksonConfigurator.java:38)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at org.apache.pulsar.shade.org.glassfish.hk2.utilities.reflection.ReflectionHelper.makeMe(ReflectionHelper.java:1356)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.ClazzCreator.createMe(ClazzCreator.java:248)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.ClazzCreator.create(ClazzCreator.java:342)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.SystemDescriptor.create(SystemDescriptor.java:463)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.SingletonContext$1.compute(SingletonContext.java:59)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.SingletonContext$1.compute(SingletonContext.java:47)
	at org.apache.pulsar.shade.org.glassfish.hk2.utilities.cache.Cache$OriginThreadAwareFuture$1.call(Cache.java:74)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at org.apache.pulsar.shade.org.glassfish.hk2.utilities.cache.Cache$OriginThreadAwareFuture.run(Cache.java:131)
	at org.apache.pulsar.shade.org.glassfish.hk2.utilities.cache.Cache.compute(Cache.java:176)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.SingletonContext.findOrCreate(SingletonContext.java:98)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.Utilities.createService(Utilities.java:2102)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.ServiceLocatorImpl.internalGetAllServiceHandles(ServiceLocatorImpl.java:1481)
	at org.apache.pulsar.shade.org.jvnet.hk2.internal.ServiceLocatorImpl.getAllServices(ServiceLocatorImpl.java:799)
	at org.apache.pulsar.shade.org.glassfish.jersey.inject.hk2.AbstractHk2InjectionManager.getAllInstances(AbstractHk2InjectionManager.java:170)
	at org.apache.pulsar.shade.org.glassfish.jersey.inject.hk2.ImmediateHk2InjectionManager.getAllInstances(ImmediateHk2InjectionManager.java:30)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.ContextResolverFactory$ContextResolversConfigurator.postInit(ContextResolverFactory.java:69)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.lambda$initRuntime$2(ClientConfig.java:461)
	at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4204)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig$State.initRuntime(ClientConfig.java:461)
	at org.apache.pulsar.shade.org.glassfish.jersey.internal.util.collection.Values$LazyValueImpl.get(Values.java:317)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientConfig.getRuntime(ClientConfig.java:819)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getClientRuntime(ClientRequest.java:176)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.ClientRequest.getInjectionManager(ClientRequest.java:567)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.onBuilder(JerseyWebTarget.java:371)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.request(JerseyWebTarget.java:206)
	at org.apache.pulsar.shade.org.glassfish.jersey.client.JerseyWebTarget.request(JerseyWebTarget.java:38)
	at org.apache.pulsar.client.admin.internal.BaseResource.lambda$requestAsync$1(BaseResource.java:101)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887)
	at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2325)
	at org.apache.pulsar.client.admin.internal.BaseResource.requestAsync(BaseResource.java:92)
	at org.apache.pulsar.client.admin.internal.BaseResource.request(BaseResource.java:72)
	at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:178)
	at org.apache.pulsar.client.admin.internal.BaseResource.asyncGetRequest(BaseResource.java:185)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStatsAsync(TopicsImpl.java:678)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.lambda$getInternalStats$22(TopicsImpl.java:665)
	at org.apache.pulsar.client.admin.internal.BaseResource.sync(BaseResource.java:306)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:665)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:660)
	at org.apache.spark.sql.pulsar.PulsarAdmissionControlHelper.latestOffsetForTopicPartition(PulsarHelper.scala:542)
	at org.apache.spark.sql.pulsar.PulsarHelper.$anonfun$latestOffsets$2(PulsarHelper.scala:258)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:182)
	at org.apache.spark.sql.pulsar.PulsarHelper.latestOffsets(PulsarHelper.scala:254)
	at org.apache.spark.sql.pulsar.PulsarSource.latestOffset(PulsarSource.scala:90)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)

Stacktrace for TimeoutException:

ERROR MicroBatchExecution: Query [id = f01bfeff-d1b8-415f-8592-c8e3211bead7, runId = e5d87131-b874-4072-8f3e-3532cbf74c45] terminated with error
org.apache.pulsar.client.admin.PulsarAdminException$TimeoutException: java.util.concurrent.TimeoutException
	at org.apache.pulsar.client.admin.internal.BaseResource.sync(BaseResource.java:311)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:665)
	at org.apache.pulsar.client.admin.internal.TopicsImpl.getInternalStats(TopicsImpl.java:660)
	at org.apache.spark.sql.pulsar.PulsarAdmissionControlHelper.latestOffsetForTopicPartition(PulsarHelper.scala:542)
	at org.apache.spark.sql.pulsar.PulsarHelper.$anonfun$latestOffsets$2(PulsarHelper.scala:258)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:182)
	at org.apache.spark.sql.pulsar.PulsarHelper.latestOffsets(PulsarHelper.scala:254)
	at org.apache.spark.sql.pulsar.PulsarSource.latestOffset(PulsarSource.scala:90)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: java.util.concurrent.TimeoutException
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
	at org.apache.pulsar.client.admin.internal.BaseResource.sync(BaseResource.java:306)
	... 42 more