apache/beam

java.io.InvalidClassException with Spark 3.1.2

Closed this issue · 8 comments

This was reported on the mailing list.
 
----
 
Using Spark downloaded from the link below,
 
https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
 
I get the error below when submitting a pipeline.
The full error is at https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.
 
------------------------------------------------------------------------------------------------------------------
21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection from /192.168.11.2:35601
java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible: stream classdesc serialVersionUID = 3456489343829468865, local class serialVersionUID = 1028182004549731694
at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
...
------------------------------------------------------------------------------------------------------------------
 
The SDK harness and job service are deployed as below.
 

  1. Job service
     
    sudo docker run --net=host apache/beam_spark3_job_server:2.31.0 --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true
     
  2. SDK harness
     
    sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool
     
  • apache/beam_spark_job_server:2.31.0 was used as the job service for Spark 2.4.8
     
  3. SDK client code
     
    https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2
     
Spark 2.4.8 succeeded without any errors using the above components.
     
    https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz

 

Imported from Jira BEAM-12762. Original Jira may contain additional context.
Reported by: ibzib.
This issue has child subcomponents which were not migrated over. See the original Jira for more information.

I can confirm that I'm facing the same issue using a job service based on apache/beam_spark3_job_server:2.41.0, trying to run a Beam pipeline on a bitnami/spark:3.1 based Spark cluster.

stderr log from the task:

java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef; local class incompatible: stream classdesc serialVersionUID = 3456489343829468865, local class serialVersionUID = 1028182004549731694
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2005)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1852)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:299)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:352)
	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:298)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:298)
	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7(NettyRpcEnv.scala:246)
	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7$adapted(NettyRpcEnv.scala:246)
	at org.apache.spark.rpc.netty.RpcOutboxMessage.onSuccess(Outbox.scala:90)
	at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:195)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
	at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:750)

As far as I understand, this error usually indicates that the Spark version deployed on the cluster differs from the one used by the client submitting the job. In this case, I'm guessing the client is the job service.
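One way to narrow this down is to compare the Scala library that each Spark installation ships. The following is a minimal sketch, assuming each side has a Spark-style `jars` directory containing a file named `scala-library-<version>.jar`; the function name and the example paths are hypothetical.

```shell
# Print the Scala library version found in a directory of jars.
# Assumes the jar follows the usual scala-library-<version>.jar naming.
scala_lib_version() {
  dir=$1
  for jar in "$dir"/scala-library-*.jar; do
    # If the glob matched nothing, the literal pattern comes back; skip it.
    [ -e "$jar" ] || continue
    name=${jar##*/}               # strip the directory part
    name=${name#scala-library-}   # strip the artifact prefix
    printf '%s\n' "${name%.jar}"  # strip the extension, leaving the version
  done
}

# Example (paths are placeholders for your own installations):
#   scala_lib_version "$SPARK_HOME/jars"
#   scala_lib_version /path/to/local/spark/jars
```

If the two calls print different versions, the client and the cluster are not running the same Scala library.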

I can also confirm this after a quick test: I downloaded the same Spark distribution (matching the cluster version) locally and submitted a job from a local client to the cluster via something like:

./bin/spark-submit \
     --class org.apache.spark.examples.SparkPi \
     --master spark://localhost:7077 \
     ./examples/jars/spark-examples_2.12-3.1.3.jar 10000

Works fine.

I don't know what goes on inside the Beam Spark job server, but this leads me to think that this is probably an incompatibility between the Spark client version on the Beam job server and the version the target cluster is running.

Any idea if there is an easy workaround for this? Is there any way to know the exact version of Spark that apache/beam_spark_job_server:2.31.0 supports? I could adapt the Spark cluster version to that if needed.

Would greatly appreciate any feedback on this. Thanks!

@aymanfarhat Just FYI, this is a known Scala issue. You can find more details on the problem here including hints on how to resolve it.
The original Beam Jira also contains a bit more context, see https://issues.apache.org/jira/browse/BEAM-12762.

From the link you provided, it appears that Beam is using Scala 2.12, and when we build Beam it is built with Scala 2.12.14, which is incompatible with Spark 3.1.2. How do we set the Scala version for Beam's job server?

@aromanenko-dev I'm not really familiar with the portable runner / the job server, would you have additional insights here?

As far as I can see, both beam-spark and beam-spark-job-server are using Scala 2.12.15.
Spark 3.1.2 / 3.1.3 depend on Scala 2.12.10 with the mentioned binary incompatibility, so I suspect the problem is in fact the cluster running with a broken Scala version.

An option might be to bump Beam to use Spark 3.2.2 or later (with Scala 2.12.15).
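The serialVersionUID difference between two Scala library jars can be checked directly with the JDK's `serialver` tool, which prints the serialVersionUID a class has on a given classpath. The sketch below assumes `serialver` is on the PATH and that both `scala-library` jars are at hand; the helper names and jar paths are hypothetical.

```shell
# Extract the numeric serialVersionUID from a line of serialver output,
# which looks like:
#   some.Class:    private static final long serialVersionUID = 123L;
parse_serial_uid() {
  sed -n 's/.*serialVersionUID = \(-\{0,1\}[0-9][0-9]*\)L;.*/\1/p'
}

# Print the serialVersionUID of WrappedArray$ofRef as seen in the given jar.
wrapped_array_uid() {
  serialver -classpath "$1" 'scala.collection.mutable.WrappedArray$ofRef' |
    parse_serial_uid
}

# Example (jar paths are placeholders):
#   wrapped_array_uid /opt/spark/jars/scala-library-2.12.10.jar
#   wrapped_array_uid ./scala-library-2.12.15.jar
```

If the two calls print different numbers, Java serialization between processes using those two jars will fail with the `InvalidClassException` shown in the logs.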

@mosche AFAIK, there is no specific Scala dependencies for Spark portable runner and job server. They are sitting in the same package branch and have the same Spark/Scala dependencies as a native runner. I'm sure that @ibzib should know much better than me.

Also, I'm wondering why Scala version was specifically set to 2.12.15 here since by default it is that way for Spark 3:

  spark_version = '3.1.2'
  spark_scala_version = '2.12'

Indeed, the right solution would be to align the Spark/Scala versions. The question here is whether we need to support the different versions or not.

I tried setting Scala to 2.12.10 in spark_runner.gradle; it did not work. Do I have to set the Scala version somewhere else?

Also, when I use the gradlew :runners:spark:3:job-server:runShadow command to build and run the job server, I see that a jar file of over 200 MB, named beam-runners-spark-3-job-server-2.41.0-SNAPSHOT.jar, is created under the runners/spark/3/job-server/build/libs folder.

I opened that jar in 7-Zip to look at the content; the file scala-xml.properties says that the Scala version is 2.12.8, whereas I had explicitly set the Scala version to 2.12.10. Why is that happening?

Also, what is the working and tested Spark job server version and its compatible Spark version? If I cannot use Beam with Spark 3.1.2, then I will have to downgrade the Spark version.
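To see which Scala library actually ended up in the shaded jar, it is probably more reliable to read `library.properties`, the file that `scala-library` itself ships, than a module file such as `scala-xml.properties`. This is a minimal sketch, assuming `unzip` is available and the jar has `library.properties` at its root; the function names are hypothetical.

```shell
# Read Java-style properties on stdin and print the value of version.number.
version_from_properties() {
  sed -n 's/^version\.number=//p'
}

# Print the Scala library version recorded inside a (shaded) jar.
scala_version_in_jar() {
  unzip -p "$1" library.properties | version_from_properties
}

# Example (the path is a placeholder):
#   scala_version_in_jar path/to/beam-runners-spark-3-job-server-2.41.0-SNAPSHOT.jar
```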

@nitinlkoin1984 I finally found some time to look deeper into this. Sorry for the hassle, finding the job-server in this state is a bit disappointing.

Also, what is the working and tested Spark job server version and its compatible Spark version?

Unfortunately, this is a weakness of the existing test infrastructure: it uses Spark in local mode, and in that setup such a classpath issue won't be discovered.

Anyways, I've done some testing:

  • You can fairly easily build a custom version of the beam_spark3_job_server image on the latest Beam master for Spark 3.2.2 (or later). These versions of Spark use Scala 2.12.15 and don't suffer from the Scala bug causing this. Here are the detailed steps:

    1. Pull the source code of Beam from https://github.com/apache/beam and check out tag v2.41.0
    2. Update the Spark version to 3.2.2 in these two places (the current setting is shown below):

      spark_version = '3.1.2'
    3. In the project directory, run the gradle command to build the docker container:
      ./gradlew :runners:spark:3:job-server:container:docker
      
      This will build apache/beam_spark3_job_server:latest, which will then be available for local use.
  • Downgrading Scala in the job-server image to 2.12.10 is also possible, but not as obvious. The Scala version is bumped by a transitive dependency.

    1. Append the following lines to runners/spark/3/job-server/build.gradle
      configurations.runtimeClasspath {
        resolutionStrategy {
          force "org.scala-lang:scala-library:2.12.10"
        }
      }
      
    2. In the project directory, run the gradle command to build the docker container:
      ./gradlew :runners:spark:3:job-server:clean
      ./gradlew :runners:spark:3:job-server:container:docker
      
      This will build apache/beam_spark3_job_server:latest, which will then be available for local use.
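To find out which transitive dependency bumps the Scala version, and to confirm that a forced version took effect, Gradle's built-in `dependencyInsight` task can be pointed at the job-server project. This is a sketch, assuming it is run from the Beam project directory; the `GRADLEW` override and the function name are hypothetical conveniences, not part of Beam.

```shell
# Show how scala-library is resolved on the job-server runtime classpath.
# GRADLEW may be overridden (for example, to a different wrapper path);
# it defaults to the wrapper in the current directory.
scala_insight() {
  "${GRADLEW:-./gradlew}" :runners:spark:3:job-server:dependencyInsight \
    --configuration runtimeClasspath \
    --dependency scala-library
}

# Example:
#   scala_insight
```

The report lists the selected `scala-library` version together with the dependency paths that requested it.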

Alternatively you could build yourself a custom Spark 3.1.2 image that contains Scala 2.12.15 (instead of 2.12.10) on the classpath. But I don't think that's generally a feasible option.

Let me know if any of these options help!

@aromanenko-dev The 2nd option would fix the job-server without having to bump Spark. On the other hand, bumping to Spark 3.2.2 seems to be the more robust and long-term solution. But the biggest concern there is the Avro dependency upgrade (1.10). What do you think? Anyone else who could chime in?