org.apache.spark.SparkException: Task not serializable
sudhakaru opened this issue · 4 comments
Describe the bug
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2477)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:753)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:325)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:391)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:338)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:366)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
at com.DeequSatisfiesTest.main(DeequSatisfiesTest.java:106)
Caused by: java.io.NotSerializableException: com.DeequSatisfiesTest$3
Serialization stack:
- object not serializable (class: com.DeequSatisfiesTest$3, value: )
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 4)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lorg/apache/spark/sql/catalyst/expressions/ScalaUDF;Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=4])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2892/9452563, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2892/9452563@5df54296)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(knownnotnull(input[1, double, true])))
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: canonicalized, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(knownnotnull(input[1, double, true])))
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 2)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$catalystConverter$3:(Lorg/apache/spark/sql/catalyst/expressions/ScalaUDF;Lscala/Function1;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=2])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2894/1598961818, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2894/1598961818@301e5d23)
- element of array (index: 1)
- array (class [Lscala.Function1;, size 2)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 6)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2053/651023535, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2053/651023535@21a0795f)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
... 36 more
To Reproduce
Steps to reproduce the behavior:
- Run the Java test case below:
import com.amazon.deequ.VerificationResult;
import com.amazon.deequ.VerificationSuite;
import com.amazon.deequ.checks.Check;
import com.amazon.deequ.checks.CheckLevel;
import com.amazon.deequ.checks.CheckStatus;
import com.amazon.deequ.constraints.Constraint;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import java.util.ArrayList;
import java.util.List;
public class DeequSatisfiesTest {
public static void main(String s[]) {
SparkSession spark = SparkSession.builder().appName("SparkSQLApp").master("local").getOrCreate();
StructType sourceSchema = new StructType();
sourceSchema = sourceSchema.add("test", DataTypes.StringType);
sourceSchema = sourceSchema.add("dropoff_latitude", DataTypes.DoubleType);
List<Object> obj1 = new ArrayList<>();
obj1.add("test1");
obj1.add(41.0);
List<Object> obj2 = new ArrayList<>();
obj2.add("test2");
obj2.add(42.0);
List<Row> listOfRows = new ArrayList<>();
listOfRows.add(RowFactory.create(obj1.toArray()));
listOfRows.add(RowFactory.create(obj2.toArray()));
Dataset<Row> sourceData = spark.createDataFrame(listOfRows,sourceSchema);
sourceData.show();
Check satisfiesCheck = new Check(CheckLevel.Error(), "statisfies",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.satisfies("dropoff_latitude > 41.0", "statisfies",
new scala.runtime.AbstractFunction1<Object, Object>() {
@Override
public Object apply(Object size) {
if ((double) size == 1.0) return true;
else return false;
}
},Option.empty(), JavaConverters.asScalaIteratorConverter(new ArrayList<String>().iterator())
.asScala().toList());
Check hasMinCheck = new Check(CheckLevel.Error(), "hasMin",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.hasMin("dropoff_latitude", new scala.runtime.AbstractFunction1<Object, Object>() {
@Override
public Object apply(Object size) {
if ((double) size < 100.0) return true;
else return false;
}
},Option.empty());
Check hasMaxCheck = new Check(CheckLevel.Error(), "hasMax",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.hasMin("dropoff_latitude", new scala.runtime.AbstractFunction1<Object, Object>() {
@Override
public Object apply(Object size) {
if ((double) size > 100.0) return true;
else return false;
}
},Option.empty());
Check isUniqueCheck = new Check(CheckLevel.Error(), "isUnique",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.isUnique("test", Option.empty());
Check iscompleteCheck = new Check(CheckLevel.Error(), "iscomplete",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.isComplete("test", Option.empty());
List<Check> checks = new ArrayList<>();
checks.add(hasMinCheck);
checks.add(hasMaxCheck);
checks.add(satisfiesCheck);
checks.add(isUniqueCheck);
checks.add(iscompleteCheck);
Seq<Check> checkSeq = JavaConverters.asScalaBuffer(checks).toSeq();
VerificationResult vrRow = new VerificationSuite().onData(sourceData).addChecks(checkSeq).run();
if (vrRow.status() != CheckStatus.Success()) {
Dataset<Row> vrr = vrRow.rowLevelResultsAsDataFrame(spark, vrRow,sourceData);
vrr.printSchema();
vrr.show();
}
}
}
Once I run the above test case, I get the Task not serializable error.
I downloaded the latest Deequ source code and built a new jar file, deequ_2.12-2.0.3-spark-3.3.jar.
Java version : 1.8
Spark version: 3.3.0
Please help with the above error.
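For reference, the Caused by entry above points at com.DeequSatisfiesTest$3, i.e. one of the anonymous scala.runtime.AbstractFunction1 assertions, which does not implement java.io.Serializable and therefore cannot be shipped to executors. A minimal sketch of a common workaround (my own assumption, not a fix taken from this thread) is to move the assertion into a named class that explicitly implements Serializable; the class name and threshold parameter below are hypothetical:
import java.io.Serializable;
// Hypothetical helper, not part of the Deequ API: a named assertion class that,
// unlike the anonymous AbstractFunction1 instances in the test case, explicitly
// implements Serializable so Spark's closure serializer can write it out.
public class SerializableAssertion extends scala.runtime.AbstractFunction1<Object, Object>
        implements Serializable {
    private final double threshold;
    public SerializableAssertion(double threshold) {
        this.threshold = threshold;
    }
    @Override
    public Object apply(Object value) {
        // Same style of comparison as the inline assertions above.
        return ((Number) value).doubleValue() >= threshold;
    }
}
The checks above would then pass, for example, new SerializableAssertion(1.0) instead of the inline anonymous class.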
I tried to re-create this on my machine. Here is the modified class that compiled for me. Note the modification to the satisfies check: I commented out the unnecessary parameter.
I didn't see the issue and the check was successful. Please let me know if this is still an issue for you.
package com.text.app;
import com.amazon.deequ.VerificationResult;
import com.amazon.deequ.VerificationSuite;
import com.amazon.deequ.checks.Check;
import com.amazon.deequ.checks.CheckLevel;
import com.amazon.deequ.checks.CheckStatus;
import com.amazon.deequ.constraints.Constraint;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import java.util.ArrayList;
import java.util.List;
public class DeequSatisfiesTest {
public static void main(String[] s) {
SparkSession spark = SparkSession.builder().appName("SparkSQLApp").master("local").getOrCreate();
StructType sourceSchema = new StructType();
sourceSchema = sourceSchema.add("test", DataTypes.StringType);
sourceSchema = sourceSchema.add("dropoff_latitude", DataTypes.DoubleType);
List<Object> obj1 = new ArrayList<Object>();
obj1.add("test1");
obj1.add(41.0);
List<Object> obj2 = new ArrayList<Object>();
obj2.add("test2");
obj2.add(42.0);
List<Row> listOfRows = new ArrayList<Row>();
listOfRows.add(RowFactory.create(obj1.toArray()));
listOfRows.add(RowFactory.create(obj2.toArray()));
Dataset<Row> sourceData = spark.createDataFrame(listOfRows,sourceSchema);
sourceData.show();
Check satisfiesCheck = new Check(CheckLevel.Error(), "statisfies",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.satisfies("dropoff_latitude > 41.0", "statisfies",
new scala.runtime.AbstractFunction1<Object, Object>() {
@Override
public Object apply(Object size) {
if (new Double(size.toString()) == 1.0) return true;
else return false;
}
}, Option.<String>empty());
// , JavaConverters.asScalaIteratorConverter(new ArrayList<String>().iterator())
// .asScala().toList());
Check hasMinCheck = new Check(CheckLevel.Error(), "hasMin",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.hasMin("dropoff_latitude", new scala.runtime.AbstractFunction1<Object, Object>() {
@Override
public Object apply(Object size) {
if (new Double(size.toString()) < 100.0) return true;
else return false;
}
}, Option.<String>empty());
Check hasMaxCheck = new Check(CheckLevel.Error(), "hasMax",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.hasMin("dropoff_latitude", new scala.runtime.AbstractFunction1<Object, Object>() {
@Override
public Object apply(Object size) {
if (new Double(size.toString()) > 100.0) return true;
else return false;
}
}, Option.<String>empty());
Check isUniqueCheck = new Check(CheckLevel.Error(), "isUnique",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.isUnique("test", Option.<String>empty());
Check iscompleteCheck = new Check(CheckLevel.Error(), "iscomplete",
JavaConverters.asScalaIteratorConverter(new ArrayList<Constraint>().iterator())
.asScala().toSeq())
.isComplete("test", Option.<String>empty());
List<Check> checks = new ArrayList<Check>();
checks.add(hasMinCheck);
checks.add(hasMaxCheck);
checks.add(satisfiesCheck);
checks.add(isUniqueCheck);
checks.add(iscompleteCheck);
Seq<Check> checkSeq = JavaConverters.asScalaBuffer(checks).toSeq();
VerificationResult vrRow = new VerificationSuite().onData(sourceData).addChecks(checkSeq).run();
if (vrRow.status() != CheckStatus.Success()) {
Dataset<Row> vrr = vrRow.rowLevelResultsAsDataFrame(spark, vrRow,sourceData);
vrr.printSchema();
vrr.show();
}
}
}
Thanks for verifying. You used the released version of the Deequ jar; that's why the class did not compile as posted. I downloaded the latest source code and generated the Deequ jar from it. Please use the latest code and you will get this Task not serializable error.
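A small diagnostic (my own suggestion, not something from this thread) to confirm which jar actually ends up on the classpath: print the location the Deequ Check class was loaded from and verify it is the locally built deequ_2.12-2.0.3-spark-3.3.jar rather than the released artifact.
// Diagnostic sketch: prints the jar (or directory) that provided the Deequ Check class.
public class WhichDeequJar {
    public static void main(String[] args) {
        System.out.println(
                com.amazon.deequ.checks.Check.class
                        .getProtectionDomain()
                        .getCodeSource()
                        .getLocation());
    }
}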
This issue got resolved, so I am closing it.
Thank you @sudhakaru