JohnSnowLabs/spark-nlp-workshop

Sentence similarity with SparkNLP only works on Google DataProc with ONE sentence, FAILS when multiple sentences are provided

askorostelev opened this issue · 1 comment

Deployed the following Colab Python code (see link below) to Dataproc on Google Cloud, and it only works when input_list is an array with one item. When input_list has two items, the PySpark job dies with the following error on the line "for r in result.collect()" in the get_similarity method below:

java.io.IOException: Premature EOF from inputStream
        at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:739)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
        at java.lang.Thread.run(Thread.java:745)
input_list=["no error"]                 <---- works
input_list=["this", "throws EOF error"] <---- does not work

link to colab for sentence similarity using spark-nlp:
https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/streamlit_notebooks/SENTENCE_SIMILARITY.ipynb#scrollTo=6E0Y5wtunFi4

import numpy as np
import pandas as pd

def get_similarity(input_list):
    # spark and light_pipeline come from the notebook's setup cells.
    df = spark.createDataFrame(pd.DataFrame({'text': input_list}))
    result = light_pipeline.transform(df)
    embeddings = []
    # Collect every transformed row to the driver and keep each
    # document's first sentence embedding.
    for r in result.collect():
        embeddings.append(r.sentence_embeddings[0].embeddings)
    embeddings_matrix = np.array(embeddings)
    # Pairwise dot products between all embedding vectors.
    return np.matmul(embeddings_matrix, embeddings_matrix.transpose())
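A variant that collects only the embeddings column (rather than the full annotated rows) sends less data back to the driver, and normalizing first makes the matmul a proper cosine similarity. A minimal sketch, assuming the same spark session and light_pipeline from the notebook (get_cosine_similarity is a name introduced here, and I haven't verified it avoids the error):

import numpy as np
import pandas as pd

def get_cosine_similarity(input_list):
    df = spark.createDataFrame(pd.DataFrame({'text': input_list}))
    result = light_pipeline.transform(df)
    # Collect only the embeddings field instead of the full annotation rows.
    rows = result.select('sentence_embeddings.embeddings').collect()
    embeddings_matrix = np.array([row.embeddings[0] for row in rows])
    # Normalize so the matmul yields cosine similarities in [-1, 1].
    norms = np.linalg.norm(embeddings_matrix, axis=1, keepdims=True)
    normalized = embeddings_matrix / norms
    return np.matmul(normalized, normalized.T)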

I've tried raising "dfs.datanode.max.transfer.threads" to 8192 in the Hadoop cluster config, and still no luck:

hadoop_config.set('dfs.datanode.max.transfer.threads', "8192")
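For context, the hadoop_config handle above comes from the SparkContext, roughly like this (note that _jsc is an internal PySpark accessor, so treat this as a sketch):

# Obtain the Hadoop configuration of the running Spark context
# and raise the DataNode transfer-thread limit.
hadoop_config = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_config.set('dfs.datanode.max.transfer.threads', '8192')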

How can I get this code working when input_list contains multiple items?

Hey @askorostelev,

The notebook has been updated. Let me know if it works for you or if you still encounter issues.

Thanks a lot!