audienceproject/spark-dynamodb

java.math.BigDecimal cannot be cast to java.util.Map when inferring schema


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 11, ip-10-44-5-206.ec2.internal, executor 2): java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.util.Map
	at com.amazonaws.services.dynamodbv2.document.Item.getRawMap(Item.java:891)
	at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$apply$21$$anonfun$apply$22.apply(TypeConversion.scala:51)
	at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$apply$21$$anonfun$apply$22.apply(TypeConversion.scala:51)
	at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$nullableGet$1.apply(TypeConversion.scala:88)
	at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$nullableGet$1.apply(TypeConversion.scala:87)
	at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow$1.apply(ScanPartition.scala:100)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow$1.apply(ScanPartition.scala:100)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition.com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow(ScanPartition.scala:100)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader$$anonfun$nextPage$2.apply(ScanPartition.scala:94)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader$$anonfun$nextPage$2.apply(ScanPartition.scala:94)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader.get(ScanPartition.scala:82)
	at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader.get(ScanPartition.scala:59)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:59)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:291)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:283)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:401)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
  ... 54 elided
Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.util.Map
  at com.amazonaws.services.dynamodbv2.document.Item.getRawMap(Item.java:891)
  at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$apply$21$$anonfun$apply$22.apply(TypeConversion.scala:51)
  at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$apply$21$$anonfun$apply$22.apply(TypeConversion.scala:51)
  at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$nullableGet$1.apply(TypeConversion.scala:88)
  at com.audienceproject.spark.dynamodb.datasource.TypeConversion$$anonfun$nullableGet$1.apply(TypeConversion.scala:87)
  at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow$1.apply(ScanPartition.scala:100)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$$anonfun$com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow$1.apply(ScanPartition.scala:100)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition.com$audienceproject$spark$dynamodb$datasource$ScanPartition$$itemToRow(ScanPartition.scala:100)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader$$anonfun$nextPage$2.apply(ScanPartition.scala:94)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader$$anonfun$nextPage$2.apply(ScanPartition.scala:94)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader.get(ScanPartition.scala:82)
  at com.audienceproject.spark.dynamodb.datasource.ScanPartition$PartitionReader.get(ScanPartition.scala:59)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:59)
  at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:291)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:283)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  ... 3 more

Hello
The issue seems to be that you have mixed types in your DynamoDB table.
You have a column that contains both Maps and Numbers, and the schema is being inferred as Map, which is incompatible with Number. Spark cannot handle mixed data types within a single column.
Unfortunately, you will have to clean up your DynamoDB table first, or explicitly exclude the problematic column from the select in Spark (see the sketch below).
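As a rough illustration of the second workaround, something like the following should work, assuming the connector's standard `dynamodb` read implicit; the table name and column names here are placeholders, and `mixedCol` stands for whichever attribute holds both Maps and Numbers in your table:

```scala
import com.audienceproject.spark.dynamodb.implicits._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dynamodb-read").getOrCreate()

// Read the table, then keep only the columns with consistent types,
// leaving out the hypothetical "mixedCol" that mixes Map and Number values.
// With column pruning, the excluded attribute is never converted to a row value.
val df = spark.read.dynamodb("MyTable")
  .select("id", "name", "createdAt") // omit the mixed-type column here

df.show()
```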

Thanks!