jpmml/jpmml-evaluator-spark

Invalid lambda deserialization at org.shaded.jpmml.evaluator.OutputFilters.$deserializeLambda$

Closed this issue · 4 comments

Hi There,

I have been using this library in my project. I get the following error when I run the K-means clustering algorithm (or any other clustering algorithm) on a Hadoop data lake.

It works fine on a standalone machine and fails only on the data lake when run in YARN cluster mode. Interestingly, the logistic regression, XGBoost, and decision tree classification algorithms work fine in both standalone and yarn-cluster modes.

The error stack trace, a snippet of the program, and the contents of the pom.xml file are pasted below.
Note that it throws java.lang.IllegalArgumentException: Invalid lambda deserialization
at org.shaded.jpmml.evaluator.OutputFilters.$deserializeLambda$(OutputFilters.java:21)

Error:

19/01/30 13:06:58 ERROR executor.Executor: Exception in task 0.2 in stage 1.0 (TID 3)
java.io.IOException: unexpected exception type
	at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682)
	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1973)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1565)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
	at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248)
	... 78 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
	at org.shaded.jpmml.evaluator.OutputFilters.$deserializeLambda$(OutputFilters.java:21)
	... 88 more

My program is:

public class AppNew {
    public static void main(String[] args) throws IOException, JAXBException, org.xml.sax.SAXException {
        // TODO Auto-generated method stub

        String fileName = args[0];
        String dataFileName = args[1];
        String writeLocation =args[2];

        SparkSession spark = SparkSession.builder().appName("jpmml").config("spark.master",args[3]).getOrCreate();

        Configuration conf = spark.sparkContext().hadoopConfiguration();
        FileSystem fs =  FileSystem.get(conf);


        EvaluatorBuilder evaluatorBuilder = new LoadingModelEvaluatorBuilder()
                .setLocatable(false)
                .setVisitors(new DefaultVisitorBattery())
                .load(fs.open(new Path(fileName)).getWrappedStream());
        Evaluator evaluator = evaluatorBuilder.build();
        evaluator.verify();

        TransformerBuilder pmmlTransformerBuilder = new TransformerBuilder(evaluator)
                .withTargetCols()
                .withOutputCols()
                .exploded(true);

        Transformer pmmlTransformer = pmmlTransformerBuilder.build();

        Random r= new Random();
        Dataset<Row> df = spark.read().option("header", "true").csv(dataFileName).toDF();
        Dataset<Row> tdf = pmmlTransformer.transform(df);

        tdf.printSchema();
        tdf.write().option("header","true").csv(String.format("%s_%s", writeLocation, r.nextLong()));
        spark.stop();
    }
}

My pom.xml file looks like this:

<dependencies>
    <dependency>
        <groupId>org.jpmml</groupId>
        <artifactId>jpmml-evaluator-spark</artifactId>
        <version>1.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
        <version>2.2.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.jpmml</groupId>
                <artifactId>pmml-model</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>

                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.fxc.rpc.impl.member.MemberProvider</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.dmg.pmml</pattern>
                                    <shadedPattern>org.shaded.dmg.pmml</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>org.jpmml</pattern>
                                    <shadedPattern>org.shaded.jpmml</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.guava</pattern>
                                    <shadedPattern>com.shaded.google.guava</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>com.shaded.google.common</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <compilerArgument>-XDignore.symbol.file</compilerArgument>
                </configuration>
            </plugin>
        </plugins>
    </build>

Appreciate your help in resolving this issue.

Thanks in advance.

Regards,
Ibrahim.

Note that it throws java.lang.IllegalArgumentException: Invalid lambda deserialization

This is the code location:
https://github.com/jpmml/jpmml-evaluator/blob/master/pmml-evaluator/src/main/java/org/jpmml/evaluator/OutputFilters.java#L21-L38

Maybe this whole OutputFilters interface should be marked as java.io.Serializable too?

Anyway, it's extremely strange that this deserialization error happens with some model types (e.g. ClusteringModel) but not with others (GeneralRegressionModel, MiningModel elements). That doesn't make sense, and suggests that there may be some deeper configuration issue with your runtime system. Maybe you're using different Java versions on different machines, and they do lambda serialization differently?
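
For readers unfamiliar with the mechanism in the stack trace: a serializable lambda is written to the stream as a java.lang.invoke.SerializedLambda, and on the reading side SerializedLambda.readResolve calls the synthetic $deserializeLambda$ method of the class that declared the lambda. That method throws "Invalid lambda deserialization" when it does not recognise the description it is handed. The following is a minimal sketch of a successful round trip inside a single JVM. The class name LambdaRoundTrip, the SerializablePredicate interface and the NON_EMPTY constant are hypothetical stand-ins and are not part of JPMML.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

public class LambdaRoundTrip {

    // Hypothetical stand-in for a lambda-based filter type (not a JPMML class).
    public interface SerializablePredicate<T> extends Predicate<T>, Serializable {
    }

    // Because the target type is Serializable, the compiler also generates
    // a synthetic $deserializeLambda$ method in LambdaRoundTrip.
    public static final SerializablePredicate<String> NON_EMPTY = s -> !s.isEmpty();

    // Writes the object with plain Java serialization.
    public static byte[] serialize(Object object) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
        }
        return buffer.toByteArray();
    }

    // Reads the object back; for a lambda this goes through
    // SerializedLambda.readResolve and then $deserializeLambda$.
    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(NON_EMPTY);

        @SuppressWarnings("unchecked")
        Predicate<String> copy = (Predicate<String>) deserialize(bytes);

        System.out.println(copy.test("x"));
        System.out.println(copy.test(""));
    }
}
```

Here the writer and the reader share the same class file, so the round trip succeeds. On a cluster, the driver and the executors each load their own copy of the declaring class, which is one place where a mismatch could arise; whether that is what happens in this issue is not established in this thread.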

@vruusmann I appreciate your swift response. We will investigate and post our findings.
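
One simple way to start such an investigation is to print the Java runtime details on each machine and compare them. The sketch below uses only standard system properties; the class name RuntimeInfo is hypothetical. Running it on the standalone machine and on the cluster nodes is left to the reader, since how to reach the executor JVMs depends on the cluster setup.

```java
public class RuntimeInfo {

    // Collects the standard system properties that identify the running JVM.
    public static String describe() {
        return String.format(
                "java.version=%s, java.vendor=%s, java.vm.name=%s, java.home=%s",
                System.getProperty("java.version"),
                System.getProperty("java.vendor"),
                System.getProperty("java.vm.name"),
                System.getProperty("java.home"));
    }

    public static void main(String[] args) {
        // Run this on every machine involved and compare the printed lines.
        System.out.println(describe());
    }
}
```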

@vruusmann Thanks for fixing this issue, appreciate your help.
Is it possible to release these fixes soon? We are very keen to test and deploy these packages to our production environment. Thanks in advance.

@Ibrahim2008 The current version uses these lambda-based output filters by default; if they break your application, then simply replace them with custom manually-created ones:

EvaluatorBuilder evaluatorBuilder = new ModelEvaluatorBuilder(pmml)
	.setOutputFilter(new OutputFilter(){
		@Override
		public boolean test(org.dmg.pmml.OutputField outputField){
			// Do whatever is appropriate; returning true is only a placeholder
			return true;
		}
	});
Evaluator evaluator = evaluatorBuilder.build();
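
The reason a manually created filter sidesteps the error is that a named class is serialized as an ordinary object, so reading it back never calls $deserializeLambda$. The sketch below illustrates this with plain JDK types only. PrefixFilter and NamedFilterDemo are hypothetical names, and java.util.function.Predicate<String> stands in for the JPMML filter type, which is not reproduced here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Predicate;

public class NamedFilterDemo {

    // A named, explicitly serializable filter (hypothetical example class).
    // It accepts field names that start with the configured prefix.
    public static class PrefixFilter implements Predicate<String>, Serializable {

        private static final long serialVersionUID = 1L;

        private final String prefix;

        public PrefixFilter(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public boolean test(String name) {
            return name.startsWith(prefix);
        }
    }

    // Serializes the object to a byte array and reads it back.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T object)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PrefixFilter copy = roundTrip(new PrefixFilter("cluster"));
        System.out.println(copy.test("cluster_id"));
        System.out.println(copy.test("affinity"));
    }
}
```

The explicit serialVersionUID keeps the serialized form stable across recompilations of the class, which matters when the writing and reading JVMs might not load byte-identical class files.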