Issue saving the model
sridhar opened this issue · 2 comments
- I've built the library using:
./gradlew clean build -x test -PsparkVersion=2.4.3 -PscalaVersion=2.11.12
- I have spark instantiated via
spark-2.4.3-hadoop2.6/sbin/start-all.sh
- I have the following code, which I'm building with sbt (example adapted from the README):
import com.linkedin.relevance.isolationforest._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions._
import org.apache.spark.SparkConf
import org.apache.spark.sql.{ SparkSession, DataFrame, Row }
object Main {
  def main(args: Array[String]): Unit = {
    //val conf = new SparkConf().setAppName("IsolShap").setMaster("spark://localhost:4040")
    val spark = SparkSession.builder
      .master("spark://ai-monster:7077")
      .appName("Isol")
      .config("spark.jars", "lib/isolation-forest_2.4.3_2.11-2.0.6.jar")
      .getOrCreate()

    val rawData = spark.read
      .format("csv")
      .option("comment", "#")
      .option("header", "false")
      .option("inferSchema", "true")
      .load("resources/shuttle.csv")

    val cols = rawData.columns
    val labelCol = cols.last

    val assembler = new VectorAssembler()
      .setInputCols(cols.slice(0, cols.length - 1))
      .setOutputCol("features")
    val data = assembler
      .transform(rawData)
      .select(col("features"), col(labelCol).as("label"))

    val contamination = 0.1
    val isolationForest = new IsolationForest()
      .setNumEstimators(100)
      .setBootstrap(false)
      .setMaxSamples(256)
      .setMaxFeatures(1.0)
      .setFeaturesCol("features")
      .setPredictionCol("predictedLabel")
      .setScoreCol("outlierScore")
      .setContamination(contamination)
      .setContaminationError(0.01 * contamination)
      .setRandomSeed(1)

    val isolationForestModel = isolationForest.fit(data)

    /**
     * Score the training data
     */
    val dataWithScores = isolationForestModel.transform(data)

    isolationForestModel.save("/tmp/mymodel") // <==== ERROR here
    dataWithScores.take(5).foreach(println)
    dataWithScores.printSchema()
  }
}
The above run fails at the model-save stage with the following error:
21/06/03 18:57:27 INFO IsolationForestModelReadWrite$IsolationForestModelWriter: Saving IsolationForestModel tree data to path /tmp/mymodel/data
[error] (run-main-0) java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.avro.AvroFileFormat. Please find packages at http://spark.apache.org/third-party-projects.html
[error] java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.avro.AvroFileFormat. Please find packages at http://spark.apache.org/third-party-projects.html
[error] at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
[error] at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
[error] at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
[error] at com.linkedin.relevance.isolationforest.IsolationForestModelReadWrite$IsolationForestModelWriter.saveImplHelper(IsolationForestModelReadWrite.scala:263)
[error] at com.linkedin.relevance.isolationforest.IsolationForestModelReadWrite$IsolationForestModelWriter.saveImpl(IsolationForestModelReadWrite.scala:242)
[error] at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)
[error] at org.apache.spark.ml.util.MLWritable$class.save(ReadWrite.scala:306)
[error] at com.linkedin.relevance.isolationforest.IsolationForestModel.save(IsolationForestModel.scala:20)
[error] at Main$.main(Main.scala:63)
[error] at Main.main(Main.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.avro.AvroFileFormat.DefaultSource
[error] at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[error] stack trace is suppressed; run last Compile / bgRun for the full output
21/06/03 18:57:27 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
Now, since spark-avro has already been compiled into the LinkedIn jar, I shouldn't have to include it again. But I tried multiple things:
- Converted the Gradle dependencies from compile to implementation, but that didn't help.
- Added spark-avro to the sbt dependency list when compiling the above code. Then I get:
[error] (run-main-0) org.apache.spark.SparkException: Job aborted.
[error] org.apache.spark.SparkException: Job aborted.
[error] at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
[error] at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
[error] at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
[error] at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
[error] at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
[error] at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
[error] at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
[error] at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
[error] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[error] at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
[error] at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
[error] at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
[error] at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
[error] at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
[error] at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
[error] at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
[error] at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
[error] at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
[error] at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
[error] at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
[error] at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
[error] at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
[error] at com.linkedin.relevance.isolationforest.IsolationForestModelReadWrite$IsolationForestModelWriter.saveImplHelper(IsolationForestModelReadWrite.scala:263)
[error] at com.linkedin.relevance.isolationforest.IsolationForestModelReadWrite$IsolationForestModelWriter.saveImpl(IsolationForestModelReadWrite.scala:242)
[error] at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:180)
[error] at org.apache.spark.ml.util.MLWritable$class.save(ReadWrite.scala:306)
[error] at com.linkedin.relevance.isolationforest.IsolationForestModel.save(IsolationForestModel.scala:20)
[error] at Main$.main(Main.scala:63)
[error] at Main.main(Main.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
[error] Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 187, 10.40.14.5, executor 0): java.lang.ClassNotFoundException: org.apache.spark.sql.avro.AvroOutputWriterFactory
[error] at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[error] at java.lang.Class.forName0(Native Method)
[error] at java.lang.Class.forName(Class.java:348)
[error] at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
[error] at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
[error] at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
[error] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
[error] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
[error] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
[error] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
[error] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
[error] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
[error] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
...
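(For reference, the sbt change tried in the second bullet amounts to something like the following build.sbt fragment; the exact artifact and version strings are my assumption, chosen to match the Spark 2.4.3 / Scala 2.11 build described above:)

```scala
// build.sbt fragment (sketch; versions assumed for Spark 2.4.3 / Scala 2.11)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "2.4.3" % "provided",
  "org.apache.spark" %% "spark-avro" % "2.4.3"
)
```

Note that this only puts spark-avro on the driver's compile/runtime classpath; it does not by itself ship the jar to the executors, which is consistent with the executor-side ClassNotFoundException above.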
I'm using the top-of-tree source. Am I missing something, or do I need to do something specific here?
Is there a bug in the packaging script of the library?
Note that the above code works without the save invocation.
The issue goes away when I manually copy the spark-avro jar to the executors. This should have been automatic, since I'm doing the same for the main jar:
.config("spark.jars","lib/isolation-forest_2.4.3_2.11-2.0.6.jar,lib/spark-avro_2.11-2.4.3.jar")
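(The same classpath fix expressed as a spark-submit invocation; the application jar path and main class name here are hypothetical placeholders:)

```shell
# Ship both jars to the driver and executors at submit time.
# The application jar path and --class value are assumptions for illustration.
spark-submit \
  --master spark://ai-monster:7077 \
  --class Main \
  --jars lib/isolation-forest_2.4.3_2.11-2.0.6.jar,lib/spark-avro_2.11-2.4.3.jar \
  target/scala-2.11/myapp_2.11-0.1.jar
```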
The spark-avro module is external and isn't included by default.
https://spark.apache.org/docs/latest/sql-data-sources-avro.html
As you point out, you can add the jar manually or specify the coordinates.
spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.3,com.linkedin.isolation-forest:isolation-forest_2.4.3_2.11:2.0.4
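(The same coordinates can also be set programmatically when building the session, so nothing needs to be copied to the executors by hand. A sketch, reusing the master and appName from the report above; like spark.jars, this setting is only honored when it is in effect before the driver's SparkContext starts:)

```scala
import org.apache.spark.sql.SparkSession

// Sketch: resolve spark-avro and isolation-forest from Maven coordinates
// instead of shipping local jars (coordinates taken from the command above).
val spark = SparkSession.builder
  .master("spark://ai-monster:7077")
  .appName("Isol")
  .config("spark.jars.packages",
    "org.apache.spark:spark-avro_2.11:2.4.3," +
    "com.linkedin.isolation-forest:isolation-forest_2.4.3_2.11:2.0.4")
  .getOrCreate()
```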