danzafar/tidyspark

Optimization needed with column window functions

> iris_tbl %>% 
+     arrange(Petal_Length) %>% 
+     group_by(Species) %>%
+     mutate(lead = lead(Petal_Width, window = windowOrderBy(col("Petal_Length")))) %>% 
+     explain
== Physical Plan ==
Window [lead(Petal_Width#75, 1, null) windowspecdefinition(Species#76, Petal_Length#74 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS lead#275], [Species#76], [Petal_Length#74 ASC NULLS FIRST]
+- *(2) Sort [Species#76 ASC NULLS FIRST, Petal_Length#74 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(Species#76, 200)
      +- *(1) Sort [Petal_Length#74 ASC NULLS FIRST], true, 0
         +- Exchange rangepartitioning(Petal_Length#74 ASC NULLS FIRST, 200)
            +- Scan ExistingRDD[Sepal_Length#72,Sepal_Width#73,Petal_Length#74,Petal_Width#75,Species#76]

The arrange should be subsumed by the windowOrderBy on the same column. Right now it causes an extra shuffle (the rangepartitioning exchange before the hashpartitioning exchange), which is not great. This will be out of scope for the v0.0.1 release.
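
For illustration, the rewrite amounts to dropping the now-redundant outer sort. Assuming the global output order from the arrange is not otherwise needed, this hand-written equivalent should plan with just the hashpartitioning exchange:

# Hand-rewritten equivalent: windowOrderBy(col("Petal_Length")) already sorts
# rows within each Species partition, so the up-front arrange (and its
# rangepartitioning exchange) can be dropped.
iris_tbl %>%
  group_by(Species) %>%
  mutate(lead = lead(Petal_Width, window = windowOrderBy(col("Petal_Length")))) %>%
  explain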

Also, when column_window_functions are left with the default window (no window argument supplied), this occurs:

> iris_tbl %>% 
+     arrange(Petal_Length) %>% 
+     group_by(Species) %>%
+     mutate(lead = lead(Petal_Width)) %>% 
+     explain
== Physical Plan ==
*(3) Project [Sepal_Length#72, Sepal_Width#73, Petal_Length#74, Petal_Width#75, Species#76, lead#309]
+- Window [lead(Petal_Width#75, 1, null) windowspecdefinition(Species#76, _w0#318L ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS lead#309], [Species#76], [_w0#318L ASC NULLS FIRST]
   +- *(2) Sort [Species#76 ASC NULLS FIRST, _w0#318L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(Species#76, 200)
         +- *(1) Project [Sepal_Length#72, Sepal_Width#73, Petal_Length#74, Petal_Width#75, Species#76, monotonically_increasing_id() AS _w0#318L]
            +- *(1) Sort [Petal_Length#74 ASC NULLS FIRST], true, 0
               +- Exchange rangepartitioning(Petal_Length#74 ASC NULLS FIRST, 200)
                  +- Scan ExistingRDD[Sepal_Length#72,Sepal_Width#73,Petal_Length#74,Petal_Width#75,Species#76]

The default window orders by monotonicallyIncreasingId, which should also be subsumed by the incoming arrange. This may require adding special attributes to spark_tbl (similar to what we did with grouping) that keep track of the incoming sort.
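
A minimal sketch of that bookkeeping, assuming SparkR-style builders (windowOrderBy, col, monotonically_increasing_id) are in scope; set_sort_cols and default_window_spec are hypothetical names, not existing tidyspark functions:

# Hypothetical sketch, not tidyspark's actual internals: arrange() could stamp
# its sort columns onto the spark_tbl as an attribute (the same trick used for
# grouping) so that later verbs can see the incoming sort.
set_sort_cols <- function(.data, cols) {
  attr(.data, "sort_cols") <- cols  # character vector of column names
  .data
}

# The default window could then reuse the recorded sort instead of minting a
# monotonically_increasing_id() ordering column.
default_window_spec <- function(.data) {
  sort_cols <- attr(.data, "sort_cols")
  if (is.null(sort_cols)) {
    windowOrderBy(monotonically_increasing_id())    # current fallback
  } else {
    do.call(windowOrderBy, lapply(sort_cols, col))  # subsumed by arrange
  }
}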