JohnSnowLabs/spark-nlp

KMeans throws “Column features must be of type equal to one of the following types”

Loyoyoyoyo opened this issue · 2 comments

Hi, I think the exception arises from passing the wrong input to KMeans, because when I remove cluster_alg from the pipeline, no exception is raised.
I read a similar issue where you advised using SentenceEmbeddings, but I don't need sentence embeddings, so I chose EmbeddingsFinisher instead.

Exception: IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>.

Code

I have a dataset (lang: zh) that has already been segmented with jieba and passed through StopWordsRemover.

from pyspark.sql.functions import udf, concat_ws, col
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import StopWordsRemover

seg_udf = udf(seg, ArrayType(StringType()))
data = data.withColumn('jieba', seg_udf(data['content']))
remover = StopWordsRemover(inputCol="jieba", outputCol="stopwords", stopWords=stop_words)
data = remover.transform(data)
data = data.withColumn("stopwords", concat_ws(" ", col("stopwords")))

Then, following #5685, I use RegexTokenizer, followed by BertEmbeddings, EmbeddingsFinisher, and KMeans:
documentAssembler = DocumentAssembler().setInputCol("stopwords").setOutputCol("document")

regexTokenizer = RegexTokenizer().setInputCols(["document"]).setOutputCol("token").setToLowercase(True)

bertEmbedding_model = BertEmbeddings.load(model_path).setInputCols("document", "token").setOutputCol("embeddings").setDimension(768).setStorageRef('bert_multi_cased')

embeddingsFinisher = EmbeddingsFinisher().setInputCols("embeddings").setOutputCols("features").setOutputAsVector(True).setCleanAnnotations(False)

cluster_alg = KMeans().setFeaturesCol("features").setK(10)

pipeline = Pipeline(stages=[documentAssembler, regexTokenizer, bertEmbedding_model, embeddingsFinisher, cluster_alg])
model = pipeline.fit(data)
data = model.transform(data)
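
As a side note, the mismatch is easy to see by fitting the pipeline without the KMeans stage and printing the schema (a sketch reusing the stages above):

check_pipeline = Pipeline(stages=[documentAssembler, regexTokenizer, bertEmbedding_model, embeddingsFinisher])
checked = check_pipeline.fit(data).transform(data)
# 'features' shows up as array<vector> (one vector per token), not the single vector column KMeans expects
checked.select("features").printSchema()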

Spark NLP version and Apache Spark

Apache Spark 3.2.0
Spark NLP 4.2.4 (2022.11.29)

Current Behavior


IllegalArgumentException Traceback (most recent call last)
Cell In[42], line 9
1 pipeline = Pipeline(
2 stages = [
3 documentAssembler,
(...)
7 cluster_alg
8 ])
----> 9 model = pipeline.fit(data)
10 data = model.transform(data)

File D:\Kit\Anaconda\envs\BERTopic_Env\lib\site-packages\pyspark\ml\base.py:161, in Estimator.fit(self, dataset, params)
159 return self.copy(params)._fit(dataset)
160 else:
--> 161 return self._fit(dataset)
162 else:
163 raise TypeError("Params must be either a param map or a list/tuple of param maps, "
164 "but got %s." % type(params))

File D:\Kit\Anaconda\envs\BERTopic_Env\lib\site-packages\pyspark\ml\pipeline.py:114, in Pipeline._fit(self, dataset)
112 dataset = stage.transform(dataset)
113 else: # must be an Estimator
--> 114 model = stage.fit(dataset)
115 transformers.append(model)
116 if i < indexOfLastEstimator:

File D:\Kit\Anaconda\envs\BERTopic_Env\lib\site-packages\pyspark\ml\base.py:161, in Estimator.fit(self, dataset, params)
159 return self.copy(params)._fit(dataset)
160 else:
--> 161 return self._fit(dataset)
162 else:
163 raise TypeError("Params must be either a param map or a list/tuple of param maps, "
164 "but got %s." % type(params))

File D:\Kit\Anaconda\envs\BERTopic_Env\lib\site-packages\pyspark\ml\wrapper.py:335, in JavaEstimator._fit(self, dataset)
334 def _fit(self, dataset):
--> 335 java_model = self._fit_java(dataset)
336 model = self._create_model(java_model)
337 return self._copyValues(model)

File D:\Kit\Anaconda\envs\BERTopic_Env\lib\site-packages\pyspark\ml\wrapper.py:332, in JavaEstimator._fit_java(self, dataset)
318 """
319 Fits a Java model to the input dataset.
320
(...)
329 fitted Java model
330 """
331 self._transfer_params_to_java()
--> 332 return self._java_obj.fit(dataset._jdf)

File D:\Kit\Anaconda\envs\BERTopic_Env\lib\site-packages\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):

File D:\Kit\Anaconda\envs\BERTopic_Env\lib\site-packages\pyspark\sql\utils.py:117, in capture_sql_exception.<locals>.deco(*a, **kw)
113 converted = convert_exception(e.java_exception)
114 if not isinstance(converted, UnknownException):
115 # Hide where the exception came from that shows a non-Pythonic
116 # JVM exception message.
--> 117 raise converted from None
118 else:
119 raise

IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>.

Hi,

KMeans operates on 1 vector per row, so all the features (in this case your text) must result in 1 vector with N dimensions. If you are using BertEmbeddings, you get 1 vector with N dimensions for each token. You MUST either use an annotator that outputs 1 vector per row, or use SentenceEmbeddings to average all these token vectors into 1 vector.
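
For example, a minimal sketch of how the pipeline above could be adapted, assuming the same column names and adding a SentenceEmbeddings stage to average the token vectors (untested against your data):

from sparknlp.annotator import SentenceEmbeddings

# Pool the per-token BERT vectors into one vector per document
sentenceEmbeddings = SentenceEmbeddings().setInputCols(["document", "embeddings"]).setOutputCol("sentence_embeddings").setPoolingStrategy("AVERAGE")

embeddingsFinisher = EmbeddingsFinisher().setInputCols("sentence_embeddings").setOutputCols("finished_embeddings").setOutputAsVector(True).setCleanAnnotations(False)

pipeline = Pipeline(stages=[documentAssembler, regexTokenizer, bertEmbedding_model, sentenceEmbeddings, embeddingsFinisher])

Note that finished_embeddings is still an array of vectors (with one element per document here), so it needs one more flattening step before KMeans, as shown further below.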

Some examples: https://github.com/JohnSnowLabs/spark-nlp/blob/master/src/test/scala/com/johnsnowlabs/nlp/EmbeddingsFinisherTestSpec.scala

PS: EmbeddingsFinisher is what you need to go from Spark NLP to Spark ML, but the column still has to have an acceptable type.
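
Concretely, since EmbeddingsFinisher with setOutputAsVector(True) produces an array of vectors, that array still has to be flattened into a plain vector column before KMeans will accept it. One way (a sketch, assuming the finished_embeddings column from the snippet above) is a SQLTransformer that explodes the array inside the pipeline:

from pyspark.ml.feature import SQLTransformer
from pyspark.ml.clustering import KMeans

# Turn the single-element array<vector> column into a vector column named 'features'
explodeVectors = SQLTransformer(statement="SELECT EXPLODE(finished_embeddings) AS features, * FROM __THIS__")
cluster_alg = KMeans().setFeaturesCol("features").setK(10)

pipeline = Pipeline(stages=[documentAssembler, regexTokenizer, bertEmbedding_model, sentenceEmbeddings, embeddingsFinisher, explodeVectors, cluster_alg])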
