autodeployai/pypmml-spark

java.io.NotSerializableException: org.pmml4s.transformations.FieldColumnPair #1

alitrack opened this issue · 2 comments

I tried to using PMML model export with JPMML-SparkML, when run

import warnings
warnings.filterwarnings('ignore')
from pyspark import SparkConf
conf=SparkConf()\
.set("spark.jars.packages","org.pmml4s:pmml4s-spark_2.11:0.9.0")
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.config(conf=conf)\
.getOrCreate()

from pypmml_spark import ScoreModel
model = ScoreModel.fromFile('DecisionTreeIris.pmml').setPredictionCol("prediction")
df = spark.read.csv("iris.csv", header = True, inferSchema = True)
df=df.toDF(*(c.replace('.', '_').lower() for c in df.columns))

score_df = model.transform(df)

got the following exception,

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-53-82aed306ef91> in <module>
----> 1 score_df = model.transform(df)

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/pyspark/ml/base.py in transform(self, dataset, params)
    171                 return self.copy(params)._transform(dataset)
    172             else:
--> 173                 return self._transform(dataset)
    174         else:
    175             raise ValueError("Params must be a param map but got %s." % type(params))

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/pyspark/ml/wrapper.py in _transform(self, dataset)
    310     def _transform(self, dataset):
    311         self._transfer_params_to_java()
--> 312         return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
    313 
    314 

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/.pyenv/versions/3.6.4/envs/ts/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o475.transform.
: org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
	at org.pmml4s.spark.ScoreModel.transform(ScoreModel.scala:59)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.pmml4s.transformations.FieldColumnPair
Serialization stack:
	- object not serializable (class: org.pmml4s.transformations.FieldColumnPair, value: org.pmml4s.transformations.FieldColumnPair@3e25a3a)
	- element of array (index: 0)
	- array (class [Lorg.pmml4s.transformations.FieldColumnPair;, size 1)
	- field (class: org.pmml4s.transformations.MapValues, name: fieldColumnPairs, type: class [Lorg.pmml4s.transformations.FieldColumnPair;)
	- object (class org.pmml4s.transformations.MapValues, org.pmml4s.transformations.MapValues@313fbaaf)
	- field (class: scala.Some, name: x, type: class java.lang.Object)
	- object (class scala.Some, Some(org.pmml4s.transformations.MapValues@313fbaaf))
	- field (class: org.pmml4s.metadata.OutputField, name: expr, type: class scala.Option)
	- object (class org.pmml4s.metadata.OutputField, OutputField(name=prediction, displayName=None, dataType=double, opType=nominal, feature=transformedValue, targetField=None, value=None, ruleFeature=consequent, algorithm=exclusiveRecommendation, rank=1, rankBasis=confidence, rankOrder=descending, isMultiValued=false, segmentId=None, isFinalResult=true, decisions=None, expr=Some(org.pmml4s.transformations.MapValues@313fbaaf)))
	- element of array (index: 1)
	- array (class [Lorg.pmml4s.metadata.OutputField;, size 5)
	- field (class: org.pmml4s.metadata.Output, name: outputFields, type: class [Lorg.pmml4s.metadata.OutputField;)
	- object (class org.pmml4s.metadata.Output, org.pmml4s.metadata.Output@49f822da)
	- field (class: scala.Some, name: x, type: class java.lang.Object)
	- object (class scala.Some, Some(org.pmml4s.metadata.Output@49f822da))
	- field (class: org.pmml4s.model.TreeModel, name: output, type: class scala.Option)
	- object (class org.pmml4s.model.TreeModel, org.pmml4s.model.TreeModel@7ddb40e2)
	- field (class: org.pmml4s.spark.ScoreModel, name: model, type: class org.pmml4s.model.Model)
	- object (class org.pmml4s.spark.ScoreModel, scoreModel_957436760f62)
	- field (class: org.pmml4s.spark.ScoreModel$$anonfun$2, name: $outer, type: class org.pmml4s.spark.ScoreModel)
	- object (class org.pmml4s.spark.ScoreModel$$anonfun$2, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
	... 21 more

the attachment is pmml file and iris.csv
data.zip

btw, I tried the example, it works.

Thanks for your finding. The issue about Serialization above has been fixed in the PMML4S library versioned 0.9.1. Please, reinstall the latest version from github by the following command:

pip install --upgrade git+https://github.com/autodeployai/pypmml-spark.git

If you used the script link_pmml4s_jars_into_spark.py to link jars of pmml4s into Spark, you need to run this script again, which will link the new jars instead of old ones.

Please, let me know if you have problem.

works, and thanks!