Loading a saved Pipeline including SparkAsyncDL makes the model lose its inputCol
PowerToThePeople111 opened this issue · 3 comments
Hi guys,
I am using sparkflow 0.7.0 on a Spark 2.4 EMR cluster and am trying to load a pipeline that was created with the code from one of your example scripts.
from sparkflow.graph_utils import build_graph
from sparkflow.tensorflow_async import SparkAsyncDL
import tensorflow as tf
from pyspark.ml.feature import VectorAssembler, OneHotEncoder
from pyspark.ml.pipeline import Pipeline
from pyspark.sql import SparkSession
import os
def small_model():
    x = tf.placeholder(tf.float32, shape=[None, 784], name='x')
    y = tf.placeholder(tf.float32, shape=[None, 10], name='y')
    layer1 = tf.layers.dense(x, 256, activation=tf.nn.relu)
    layer2 = tf.layers.dense(layer1, 256, activation=tf.nn.relu)
    out = tf.layers.dense(layer2, 10)
    z = tf.argmax(out, 1, name='out')
    loss = tf.losses.softmax_cross_entropy(y, out)
    return loss
if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("examples") \
        .getOrCreate()

    df = spark.read.option("inferSchema", "true").csv('s3://abucket/mnist_train.csv')
    df_test = spark.read.option("inferSchema", "true").csv('s3://abucket/mnist_test.csv')

    mg = build_graph(small_model)

    # Assemble and one hot encode
    va = VectorAssembler(inputCols=df.columns[1:785], outputCol='features')
    encoded = OneHotEncoder(inputCol='_c0', outputCol='labels', dropLast=False)

    spark_model = SparkAsyncDL(
        inputCol='features',
        tensorflowGraph=mg,
        tfInput='x:0',
        tfLabel='y:0',
        tfOutput='out:0',
        tfLearningRate=.001,
        iters=20,
        predictionCol='predicted',
        labelCol='labels',
        verbose=1,
        partitions=100
    )

    p = Pipeline(stages=[va, encoded, spark_model]).fit(df)
    p.write().overwrite().save("s3://abucket/mnist_model")
After creating the model, I tried to load the pipeline again and transform the mnist_test.csv dataset.
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
pipeline = PipelineModel.load("s3://abucket/mnist_model")
df_test = spark.read.option("inferSchema", "true").csv("s3://abucket/mnist_test.csv")
predictions = pipeline.transform(df_test)
This leads to the following stack trace:
Py4JJavaError: An error occurred while calling o161.transform.
: java.util.NoSuchElementException: Failed to find a default value for inputCol
at org.apache.spark.ml.param.Params$$anonfun$getOrDefault$2.apply(params.scala:780)
at org.apache.spark.ml.param.Params$$anonfun$getOrDefault$2.apply(params.scala:780)
at scala.Option.getOrElse(Option.scala:121)
...
Executing the stages one by one instead:
predictions = pipeline.stages[2].transform(
    pipeline.stages[1].transform(
        pipeline.stages[0].transform(df_test)
    )
)  # the outermost call, i.e. the SparkAsyncDL model stage, is the one that fails
Maybe there is something wrong with serialisation and deserialisation?
Best,
My bad,
it was documented, but I did not see it before: using the PysparkPipelineWrapper.unwrap method helps. ;)
from pyspark.ml import Pipeline
from sparkflow.pipeline_util import PysparkPipelineWrapper
from pyspark.ml import PipelineModel
pipeline = PysparkPipelineWrapper.unwrap(PipelineModel.load("s3://abucket/mnist_model"))
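For completeness, the unwrapped pipeline can then be used for scoring as usual. A minimal sketch, assuming the same SparkSession and the mnist_test.csv path from above:

df_test = spark.read.option("inferSchema", "true").csv("s3://abucket/mnist_test.csv")
predictions = pipeline.transform(df_test)
predictions.select('predicted').show(5)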
Thanks for filing the issue; it reminded me that I believe pyspark recently added the capability for custom Transformers to be saved and loaded natively. That missing capability is why this hack had to go in, and I have wanted it gone for a while now.
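For reference, here is a minimal sketch of that pyspark capability as I understand it (available since Spark 2.3): a params-only custom Transformer can mix in DefaultParamsReadable and DefaultParamsWritable to get save/load for free. ColumnCopier below is a hypothetical toy example, not part of sparkflow:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class ColumnCopier(Transformer, HasInputCol, HasOutputCol,
                   DefaultParamsReadable, DefaultParamsWritable):
    # Toy transformer that copies inputCol to outputCol; its params
    # (including inputCol) survive a save/load round trip.
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(ColumnCopier, self).__init__()
        self._set(**self._input_kwargs)

    def _transform(self, df):
        return df.withColumn(self.getOutputCol(), df[self.getInputCol()])

# copier = ColumnCopier(inputCol='features', outputCol='features_copy')
# copier.write().overwrite().save('s3://abucket/copier')
# restored = ColumnCopier.load('s3://abucket/copier')  # inputCol is preserved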
I am happy if that helped in some way. :)