Optimization needed with column window functions
Opened this issue · 1 comment
danzafar commented
> iris_tbl %>%
+ arrange(Petal_Length) %>%
+ group_by(Species) %>%
+ mutate(lead = lead(Petal_Width, window = windowOrderBy(col("Petal_Length")))) %>%
+ explain
== Physical Plan ==
Window [lead(Petal_Width#75, 1, null) windowspecdefinition(Species#76, Petal_Length#74 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS lead#275], [Species#76], [Petal_Length#74 ASC NULLS FIRST]
+- *(2) Sort [Species#76 ASC NULLS FIRST, Petal_Length#74 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Species#76, 200)
+- *(1) Sort [Petal_Length#74 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(Petal_Length#74 ASC NULLS FIRST, 200)
+- Scan ExistingRDD[Sepal_Length#72,Sepal_Width#73,Petal_Length#74,Petal_Width#75,Species#76]
The `arrange` should be subsumed by the `windowOrderBy` on the same column. Right now it causes an extra shuffle, which is not great. This will be out of scope for the v0.0.1 release.
danzafar commented
Also, when `column_window_functions` are left at the default window, this occurs:
> iris_tbl %>%
+ arrange(Petal_Length) %>%
+ group_by(Species) %>%
+ mutate(lead = lead(Petal_Width)) %>%
+ explain
== Physical Plan ==
*(3) Project [Sepal_Length#72, Sepal_Width#73, Petal_Length#74, Petal_Width#75, Species#76, lead#309]
+- Window [lead(Petal_Width#75, 1, null) windowspecdefinition(Species#76, _w0#318L ASC NULLS FIRST, specifiedwindowframe(RowFrame, 1, 1)) AS lead#309], [Species#76], [_w0#318L ASC NULLS FIRST]
+- *(2) Sort [Species#76 ASC NULLS FIRST, _w0#318L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(Species#76, 200)
+- *(1) Project [Sepal_Length#72, Sepal_Width#73, Petal_Length#74, Petal_Width#75, Species#76, monotonically_increasing_id() AS _w0#318L]
+- *(1) Sort [Petal_Length#74 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(Petal_Length#74 ASC NULLS FIRST, 200)
+- Scan ExistingRDD[Sepal_Length#72,Sepal_Width#73,Petal_Length#74,Petal_Width#75,Species#76]
The default is an order by `monotonicallyIncreasingId`, which should also be subsumed by the incoming `arrange`. This may require adding special attributes to `spark_tbl` (similar to what we did with grouping) to keep track of these things.
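The "special attributes" bookkeeping could look like this (a minimal Python sketch, assuming a hypothetical `SparkTbl` wrapper; tidyspark's actual attribute mechanism may differ): carry the last `arrange` columns on the tbl object, the way grouping columns are carried, so a window function with no explicit ordering can reuse them instead of falling back to `monotonically_increasing_id()`:

```python
# Hypothetical sketch: track arrange() columns on the tbl (as is done for
# grouping) so default window ordering can reuse them.
class SparkTbl:
    def __init__(self, sort_cols=None, group_cols=None):
        self.sort_cols = sort_cols or []    # set by arrange()
        self.group_cols = group_cols or []  # set by group_by()

    def arrange(self, *cols):
        # Record the sort columns instead of discarding them.
        return SparkTbl(sort_cols=list(cols), group_cols=self.group_cols)

    def group_by(self, *cols):
        return SparkTbl(sort_cols=self.sort_cols, group_cols=list(cols))

    def default_window_order(self):
        # Prefer the tracked sort; only synthesize a row id when the
        # user never called arrange().
        if self.sort_cols:
            return self.sort_cols
        return ["monotonically_increasing_id()"]

tbl = SparkTbl().arrange("Petal_Length").group_by("Species")
```

Here `tbl.default_window_order()` would yield `["Petal_Length"]`, so the window's ORDER BY matches the incoming `arrange` and the extra range shuffle and the `_w0` projection both disappear.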