audienceproject/spark-dynamodb

java.lang.NoSuchMethodError

boyaryn opened this issue · 9 comments

I'm trying to get data from S3 to DynamoDB, but I encounter this exception when I submit a Spark job to a cluster (Spark 2.4.4 without Hadoop + Hadoop 3.2.1).

This is the stack trace:

Exception in thread "main" java.lang.NoSuchMethodError: com.amazonaws.transform.JsonUnmarshallerContext.getCurrentToken()Lcom/amazonaws/thirdparty/jackson/core/JsonToken;
at com.amazonaws.services.dynamodbv2.model.transform.DescribeTableResultJsonUnmarshaller.unmarshall(DescribeTableResultJsonUnmarshaller.java:39)
at com.amazonaws.services.dynamodbv2.model.transform.DescribeTableResultJsonUnmarshaller.unmarshall(DescribeTableResultJsonUnmarshaller.java:29)
at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:118)
at com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1627)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1336)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:3443)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3419)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1660)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1635)
at com.amazonaws.services.dynamodbv2.document.Table.describe(Table.java:137)
at com.audienceproject.spark.dynamodb.connector.TableConnector.<init>(TableConnector.scala:47)
at com.audienceproject.spark.dynamodb.datasource.DynamoDataSourceWriter.<init>(DynamoDataSourceWriter.scala:32)
at com.audienceproject.spark.dynamodb.datasource.DefaultSource.createWriter(DefaultSource.scala:57)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
at example.S3$.main(S3.scala:102)
at example.S3.main(S3.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I have tried various things with excluding and shading different libraries, but nothing works.
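
For instance, one of the exclusion attempts looked roughly like this (an illustrative sketch, not my exact rules; the organizations I excluded varied between attempts):

// Hypothetical exclusion rules: try to keep a single copy of the AWS SDK /
// Jackson classes on the classpath by stripping them from spark-dynamodb.
libraryDependencies += ("com.audienceproject" %% "spark-dynamodb" % "1.0.2")
  .excludeAll(
    ExclusionRule(organization = "com.amazonaws"),
    ExclusionRule(organization = "com.fasterxml.jackson.core")
  )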

My build.sbt:

ThisBuild / scalaVersion     := "2.11.8"
ThisBuild / version          := "0.1.0-SNAPSHOT"
ThisBuild / organization     := "com.example"
ThisBuild / organizationName := "example"

lazy val root = (project in file("."))
  .settings(
    name := "ec",
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.4.4",  
      "com.amazonaws" % "aws-java-sdk-s3" % "1.11.678",
      "org.apache.hadoop" % "hadoop-common" % "3.2.1",
      "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.678",
      "com.audienceproject" %% "spark-dynamodb" % "1.0.2"
    )
  )

Could you help please?

That's a strange error. Off the top of my head, it looks like a classpath issue.

I also think that it's a classpath issue.

Is there any way to get the spark-dynamodb library working in a Scala or Java project for copying data from S3 to DynamoDB using Spark, i.e. in a project with Spark, AWS S3 and spark-dynamodb dependencies? Any versions of these dependencies would be okay.
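
For context, the core of the job I am trying to run looks roughly like this (a simplified sketch; the bucket path and table name are placeholders, and the credentials / S3A configuration are omitted):

import org.apache.spark.sql.SparkSession
import com.audienceproject.spark.dynamodb.implicits._

object S3ToDynamo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("s3-to-dynamodb")
      .getOrCreate()

    // Read the source data from S3 (placeholder path).
    val df = spark.read.parquet("s3a://my-bucket/input/")

    // Write to DynamoDB through the spark-dynamodb implicit writer
    // (placeholder table name; the table already exists).
    df.write.dynamodb("my-target-table")

    spark.stop()
  }
}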

That's actually what we use it for much of the time. We generally use either standard AWS EMR or Databricks clusters for this, but it works just as well in other environments.

Could you share a combination of library versions that works (Spark / AWS S3 / "hadoop-aws" / "spark-dynamodb" / ...), together with the Spark cluster distribution details (Spark version and the Hadoop version it uses)? Or, even better, any working source code (including build.sbt)?

All the library version combinations I tried led to run-time exceptions (an incompatible Jackson version, the exception I showed previously, ...). I also tried excluding/shading. I could send you all the files of the project I am trying to get working, if that is more convenient for you.

The AWS documentation thoroughly lists the library versions for each EMR release: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-5x.html. We generally tend to use one of the latest releases.

Here are some example dependencies we use for running on EMR version 5.26.0:

libraryDependencies ++= {
    val awsJavaSdkVersion = "1.11.336"
    val sparkVersion = "2.4.3"
    Seq(
        "com.amazonaws"             %   "aws-java-sdk-core"         %   awsJavaSdkVersion,
        "com.amazonaws"             %   "aws-java-sdk-s3"           %   awsJavaSdkVersion,
        ........
        "org.apache.spark"          %%  "spark-core"                %   sparkVersion % "provided",
        "org.apache.spark"          %%  "spark-sql"                 %   sparkVersion % "provided",
        .......
        "com.audienceproject"       %%  "spark-dynamodb"            %   "0.4.3",
        .......
    )
}
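
Since Spark is marked "provided", we build a fat jar before submitting. If you use sbt-assembly for that, a merge strategy along these lines is typically needed to handle duplicate files in the AWS/Hadoop jars (an illustrative sketch, not our exact build; it assumes the sbt-assembly plugin is enabled, which auto-imports PathList and MergeStrategy):

assemblyMergeStrategy in assembly := {
  // Drop duplicate metadata files contributed by the AWS SDK and Hadoop jars.
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // Keep the first copy of everything else.
  case x                             => MergeStrategy.first
}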

Thank you for the information. I switched to the versions you use: Hadoop 2.8.5, Spark 2.4.3 without Hadoop, and the following in build.sbt:

libraryDependencies  ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.3" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "2.8.5",
  "com.amazonaws" % "aws-java-sdk-s3" % "1.11.336",
  "com.audienceproject" %% "spark-dynamodb" % "0.4.3",
  scalaTest % Test
)

The previous error has gone but now this new exception is thrown:

Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/util/concurrent/RateLimiter
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2735)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2735)
at org.apache.spark.sql.Dataset$$anonfun$withNewRDDExecutionId$1.apply(Dataset.scala:3349)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3345)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2734)
at com.audienceproject.spark.dynamodb.rdd.DynamoWriteRelation.write(DynamoWriteRelation.scala:47)
at com.audienceproject.spark.dynamodb.DefaultSource.createRelation(DefaultSource.scala:57)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at com.audienceproject.spark.dynamodb.implicits$DynamoDBDataFrameWriter.dynamodb(implicits.scala:73)
at example.Hello$.main(Hello.scala:101)
at example.Hello.main(Hello.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.google.common.util.concurrent.RateLimiter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 78 more

I googled and tried various things with the Guava library (excluding, shading, etc.), but this error persists. How can I solve this issue?
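
For example, one of the shading attempts used sbt-assembly shade rules roughly like this (an illustrative sketch, not my exact configuration):

// Hypothetical shade rule: rename Guava classes into a private package so they
// cannot clash with the Guava version bundled with Spark/Hadoop.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)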

I've attached the screenshot and my project's source code (with AWS keys removed).
Screenshot from 2020-01-29 21-10-07
s3.zip

Right,

I've given you some older dependencies that came with the horrible caveat of Guava. Just try something like this:

libraryDependencies  ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.3" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "2.8.5",
  "com.amazonaws" % "aws-java-sdk-s3" % "1.11.336",
  "com.audienceproject" %% "spark-dynamodb" % "1.0.2",
  scalaTest % Test
)

Great, this version combination works! Thank you.

Great success 😊