AbsaOSS/spark-hats

Multiple calls of nestedMapColumn cause dataframe to hang

Closed this issue · 4 comments

Describe the bug
Hi there, I am using nestedMapColumn on multiple nested columns of a dataframe. After multiple uses of nestedMapColumn, any native method I call on the dataframe hangs: .show(), .write(), .select(), etc. By "hang" I mean the call simply blocks the application code. When I use nestedMapColumn just once on the dataframe, it works fine. Is this behavior expected? Is there any workaround?

I am using: libraryDependencies += "za.co.absa" %% "spark-hats" % "0.2.2"

To Reproduce
Steps to reproduce the behavior:

  1. Have a dataframe with multiple nested fields.
  2. Apply the nestedMapColumn on multiple fields.
  3. Try to run outputDataframe.show(), outputDataframe.select(), etc.

Expected behavior
I expect the dataframe to not hang after using the nestedMapColumn multiple times.

Hello @foti-nikko, thank you for the report.

Please share your Scala and Spark versions. Also, is it an application running in client or cluster mode?

Thanks for the response, @Zejnilovic. I am using Spark 3.3.0 and Scala 2.12.15, and I am running the application locally. Let me know if you need any more information!

And here is how I am using the api:

```scala
val output = df.nestedMapColumn(columnPath + ".value", columnPath + ".value", value => hashWithSaltUdf(value))
```

It works perfectly when I call it once. But the dataframe hangs when I call it multiple times.

Hi, this is a known issue. Each mapping adds a big projection to the execution plan. The Catalyst optimizer traverses the execution plan using an algorithm that has quadratic or cubic complexity, so after several mappings the optimization routine becomes extremely slow; what looks like a hang is the optimizer taking a very long time.
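To see the effect without spark-hats at all, here is a minimal sketch (assumed local setup; column names are made up) showing how each chained mapping enlarges the logical plan that Catalyst has to traverse:

```scala
// Sketch: every chained projection grows the logical plan, which is
// what makes Catalyst's optimization pass progressively slower.
import org.apache.spark.sql.SparkSession

object PlanGrowthDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("plan-growth").getOrCreate()
    import spark.implicits._

    var df = Seq(1, 2, 3).toDF("a")
    for (i <- 1 to 5) {
      df = df.withColumn(s"c$i", $"a" + i) // one more projection per mapping
      println(s"after mapping $i: logical plan size = " +
        df.queryExecution.logical.toString.length)
    }
    spark.stop()
  }
}
```

The plan string grows with every mapping; nested mappings in spark-hats produce much larger projections, so the slowdown appears much sooner.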

The issue has been raised with Spark (https://issues.apache.org/jira/browse/SPARK-28090), and it seems to be resolved in 3.4.0 🥳 (I only just found out).

There is a workaround.

  1. Enable cross joins:

     ```scala
     spark.conf.set("spark.sql.crossJoin.enabled", "true")
     ```

  2. Add a dummy cross join between the nested mappings:

     ```scala
     val dfWithId = inputDf.withColumn("some_id", lit(1))
     // cacheIfNotCachedYet() is an Enceladus helper; a plain .cache() works too
     val dfJustId = Seq(1).toDF("some_id_2").cacheIfNotCachedYet()
     val dfOutput = dfWithId.crossJoin(dfJustId).drop(col("some_id_2"))
     ```

A cross join creates an "optimization barrier": the optimizer still optimizes the mappings inside each group separated by the join, but cannot optimize across it, so the complexity of the plan traversal does not blow up. We use this optimization in our project Enceladus.
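Put together, the workaround could look like the following sketch. This is an illustration with assumed names (`optimizationBarrier` is a made-up helper, and plain `withColumn` calls stand in for batches of nestedMapColumn mappings); the barrier goes between the batches:

```scala
// Sketch of the barrier pattern (requires spark.sql.crossJoin.enabled=true).
// The dummy cross join between batches of mappings stops Catalyst from
// optimizing across them, keeping each optimization pass small.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

object BarrierDemo {
  // Insert an optimization barrier: a cross join with a one-row DataFrame.
  def optimizationBarrier(df: DataFrame)(implicit spark: SparkSession): DataFrame = {
    import spark.implicits._
    val dfJustId = Seq(1).toDF("some_id_2").cache()
    df.withColumn("some_id", lit(1))
      .crossJoin(dfJustId)
      .drop(col("some_id_2"))
      .drop(col("some_id"))
  }

  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.crossJoin.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    val input = Seq(1, 2, 3).toDF("a")
    val step1 = input.withColumn("b", $"a" * 2)   // first batch of mappings
    val barrier = optimizationBarrier(step1)      // barrier between batches
    val step2 = barrier.withColumn("c", $"b" + 1) // second batch of mappings
    step2.show()
  }
}
```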

Also, now that you have raised this issue after years of my not looking at this project, I have an idea for how we could work around it directly in spark-hats.

We could implement a method that takes more than one mapping at a time. All combined mappings would form a single big projection rather than multiple big projections, so the optimizer stays happy and the app should not hang.
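For illustration, here is what the idea looks like for flat columns (the hypothetical `mapColumnsAtOnce` below is not part of spark-hats; a real implementation would do the analogous rewrite for nested paths):

```scala
// Hypothetical sketch: apply many column transformations in ONE projection
// (a single select) instead of one projection per mapping.
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

object CombinedMappings {
  // Apply all (columnName -> transform) pairs in a single select.
  def mapColumnsAtOnce(df: DataFrame, mappings: Map[String, Column => Column]): DataFrame = {
    val projected = df.columns.map { name =>
      mappings.get(name) match {
        case Some(f) => f(df(name)).as(name)
        case None    => df(name)
      }
    }
    df.select(projected: _*) // one projection, however many mappings
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._
    val df = Seq((1, 2)).toDF("x", "y")
    val out = CombinedMappings.mapColumnsAtOnce(
      df,
      Map("x" -> ((c: Column) => c * 10), "y" -> ((c: Column) => c + 1))
    )
    out.show() // x = 10, y = 3
    spark.stop()
  }
}
```

Because all the transforms land in one select, the plan gains a single projection node regardless of how many mappings are combined.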

Thanks so much for the information! This is a big weight off my shoulders.