PRQL/prql

Window functions, revisited

aljazerzen opened this issue · 16 comments

Working on the type checking lately, I've realized that window functions don't quite fit into the current type system.

If we ignore that we coerce scalars into tuples of scalars (select x into select {x}), the type of select would be:

let select = columns<{scalar..}> rel<relation> -> <relation> ...

... which says that it takes a tuple of any scalars and a relation to produce another relation.

But if you have:

select {x, sum x, lag 1 x}

... the type of x and sum x would be scalar, but type of lag 1 x would be [scalar] (array of scalars).

Also, what's the type of x? I just said that it is scalar, but how can it then be passed into sum and lag, which both expect an array of scalars?


What I propose to do:

  • change the window function to work similarly to select and derive:
    select {x}
    window {lag 1 x}
  • ban using aggregation or window functions anywhere except in aggregate and window.

This would mean that:

  • in select, column references resolve to scalars and the resulting tuple contains scalars,
  • in window, column references resolve to arrays of scalars and the resulting tuple contains arrays of scalars
  • in window, you cannot do x + y, because x is an array and y is an array,
  • in aggregate, column references resolve to arrays of scalars and the resulting tuple contains scalars,
  • in aggregate, you cannot do sum (x + y), because x is an array and y is an array, but you could do (sum x) + (sum y), because the sums are scalars.

Essentially, select and derive would operate row-wise, window would operate column-wise and aggregate would do something in-between.
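
To make that concrete, here is a rough Python sketch of the three evaluation orders (illustrative only; nothing below is PRQL):

# Toy model of the proposed evaluation orders, using a relation stored as a
# list of dicts.
rel = [{"x": 1, "y": 2}, {"x": 2, "y": 4}, {"x": 6, "y": 7}]

# select / derive: the expression sees one row at a time (scalars).
selected = [{"a": row["x"] + row["y"]} for row in rel]    # [{'a': 3}, {'a': 6}, {'a': 13}]

# window: the expression sees whole columns (arrays) and returns arrays.
xs = [row["x"] for row in rel]
windowed = [{"lagged": v} for v in [None] + xs[:-1]]      # lag over the whole column

# aggregate: the expression sees whole columns but must return scalars.
aggregated = {"total": sum(xs)}                           # a single row

print(selected, windowed, aggregated)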

Examples:

  from tracks
- derive {normalized_price = price / average price}
+ window {avg_price = average price}
+ derive {normalized_price = price / avg_price}

  from tracks
- derive {rnk = rank}
+ window {rnk = rank}

This would also resolve the unexpected "windowing by default" behavior.

To me this seems like a more consistent change, but I am concerned about the conflict between a function that takes only one argument, such as lag, and a window that takes a range.

from orders
sort day
# Collect
window rows:-3..3 {centered_weekly_average = average value}
from orders
sort day
# Collect?
window rows:-3..0 {lag_value = lag value}
from orders
sort day
# Error?
window rows:-3..3 {lag_value = lag value}
snth commented

I'm hesitant to weigh in here because I don't know the type system that well but since you asked for comments I will give it a go.

Aren't the arguments to select "scalar expressions" rather than scalars? It seems to me that in select {x, sum x, lag 1 x}, each of the three column projections is a scalar expression which evaluates to a scalar, including lag 1 x.

For example

from artists
take 3
select {artist_id, name, lag 1 artist_id}

produces the SQL

WITH table_1 AS (
  SELECT
    artist_id,
    name
  FROM
    artists
  LIMIT
    3
)
SELECT
  artist_id,
  name,
  LAG(artist_id, 1) OVER ()
FROM
  table_1 AS table_0

-- Generated by PRQL compiler version:0.8.1 (https://prql-lang.org)

which in the playground evaluates to

artist_id  name       lag(artist_id, 1) OVER ()
1          AC/DC      null
2          Accept     1
3          Aerosmith  2

Overall I really like the current behaviour of the window functions and would be sad to lose it because to me it's one of the great features of PRQL. Being able to do things like the following is a great selling point:

from tbl
filter x > average x

I understood less of the points below the line and will have another look at them in the light of day in the morning.

@eitsupi I'm not sure if I understand your concern here. Are you talking about how average produces only one value, so it makes sense to compute that value for different windows (expressed with rows:x..y)? And how lag produces the whole array at once, so it does not make sense to use different windows?


@snth Well, the type of select's arguments is "scalars"; there is no such type as "scalar expression" (or rather, when talking about types, those would be the same thing).

Type of lag 1 x cannot be scalar - that would mean that x is just one value (i.e. 42) and lag 1 x is just one value (i.e. 198). I think of lag as taking a whole array (i.e. [198, 42, 33]) and returning the whole array (i.e. [null, 198, 42]).

Current implementation does not concern itself with types too much, and does what SQL does: on the surface it seems like you are operating with scalar values, but actually most of the time you are operating with whole columns at once.

I do get that current behavior is convenient and I do hope we'll be able to tweak the type system to allow it.

snth commented

x is as much of an array as lag 1 x is. In your example x is [198, 42, 33] and lag 1 x is [null, 198, 42]. When evaluated for the current row, they are both scalars.

Similarly for sum x. While it is a reduction that reduces the whole array x to a single scalar, the semantics of the window function are such that that value is repeated for each row so it also evaluates to a scalar in the current row context.
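
To spell out that reading in plain Python (an illustration, not PRQL semantics):

x = [198, 42, 33]

# Under the "evaluated for the current row" reading, `sum x` yields the same
# scalar for every row ...
per_row_sum = [sum(x) for _ in x]     # [273, 273, 273]

# ... and `lag 1 x` yields one scalar per row as well.
per_row_lag = [None] + x[:-1]         # [None, 198, 42]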

This is the problem:

When evaluated for the current row, they are both scalars.

If x is an array and lag 1 x is an array, then {x, lag 1 x} is a tuple of arrays. How would that be evaluated for a row? I'm saying that the tuple is the row, and it is what's evaluated for each row of the input relation.

Am I explaining myself well enough?

@eitsupi I'm not sure if I understand your concern here. Are you talking about how average produces only one value, so it makes sense to compute that value for different windows (expressed with rows:x..y)? And how lag produces the whole array at once, so it does not make sense to use different windows?

Sorry for the incorrect explanation. I should have thought carefully about the meaning of your example.

Am I correct in understanding that perhaps functions like lag can only be used in a window that does not have a range?

window {foo = lag 1 bar} # Ok
window rows:.. {foo = lag 1 bar} # Ok
window rows:-1..0 {foo = lag 1 bar} # Err

@snth and I had a call to discuss this issue, because we figured it needed more
bandwidth. We found that the problem you brought to light, @eitsupi, really
highlights the core of the inconsistencies here.

So let me try to state the problem again:

Edit: this is getting really long, I hope this does not discourage the reader :D

The problem

There are three types of functions in PRQL (and SQL):

let floor   = <scalar>   -> <scalar>     # select / derive
let sum     = <[scalar]> -> <scalar>     # window / aggregate
let lag_one = <[scalar]> -> <[scalar]>   # window?

... and three transforms that we want to define dynamic semantics of:

let select    =           <{scalar..}> <relation> -> <relation>
let aggregate =           <{scalar..}> <relation> -> <relation>
let window    = rows:0..0 <{scalar..}> <relation> -> <relation>

By this I mean that we need to define a set of rules that can be applied to an
expression repeatedly, until the expression becomes a value, in this case a
relation literal.

For example, here are semantics for our arithmetic:

    5 * (6 / 2 + 2) % 2
=>  5 * (3     + 2) % 2
=>  5 * 5           % 2
=>  25              % 2
=>  1

So let's use this example relation:

let my_rel = [
    { x = 1, y = 2 },
    { x = 2, y = 4 },
    { x = 6, y = 7 },
    { x = 3, y = 7 },
]

And try to define what the result of this would be:

    my_rel | window rows:-1..1 { a = sum x }
=>  cols_to_rows {a = [
        [1, 2] | sum,
        [1, 2, 6] | sum,
        [2, 6, 3] | sum,
        [6, 3] | sum,
    ]}
=>  cols_to_rows {a = [
        3,
        9,
        11,
        9,
    ]}
=>  [
        {a = 3},
        {a = 9},
        {a = 11},
        {a = 9},
    ]

... here, I've inlined what window does: it creates windows into the input
relation and applies the argument expression to each of the windows. It then
gets a result for each of the windows and composes that back into a relation.

There are a few questionable things happening here, but let's focus on what
happens if we use lag_one instead of sum:

    my_rel | window rows:-1..1 { a = lag_one x }
=>  {a = [
        [1, 2] | lag_one,
        [1, 2, 6] | lag_one,
        [2, 6, 3] | lag_one,
        [6, 3] | lag_one,
    ]}
=>  {a = [
        [null, 1],
        [null, 1, 2],
        [null, 2, 6],
        [null, 6],
    ]}

... now what? The result should be [null, 1, 2, 6]? There is clearly something
wrong and it could only be my derivation rule of how window works.
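
To see the mismatch in plain Python (assuming the -1..1 windows above):

x = [1, 2, 6, 3]

def lag_one(xs):
    # shift a whole array by one place, padding with None
    return [None] + xs[:-1]

# Applying lag_one per window produces nested arrays, with no obvious way to
# get back to one value per row:
windows = [x[max(i - 1, 0):i + 2] for i in range(len(x))]
per_window = [lag_one(w) for w in windows]
# [[None, 1], [None, 1, 2], [None, 2, 6], [None, 6]]

# Applying lag_one to the whole column gives the result we actually want:
whole_column = lag_one(x)             # [None, 1, 2, 6]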

Solution 1: broadcasting

The idea is simple: if an array is expected but a scalar is found, the scalar
is broadcast into an array:

   [1, 2, 4] == 4
=> [1, 2, 4] == [4, 4, 4]
=> [1 == 4, 2 == 4, 4 == 4]

In the last derivation, I've also used vectorization, i.e. converting a
function that works on two scalars into a function that is applied to two
arrays, one pair of elements at a time.
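
A minimal Python sketch of those two rules combined (broadcast the scalar, then vectorize ==):

def broadcast(value, length):
    # if a scalar is found where an array is expected, repeat it
    return value if isinstance(value, list) else [value] * length

def vectorized_eq(left, right):
    # apply the scalar == pair-wise over two equal-length arrays
    length = max(len(left) if isinstance(left, list) else 1,
                 len(right) if isinstance(right, list) else 1)
    return [l == r for l, r in zip(broadcast(left, length), broadcast(right, length))]

print(vectorized_eq([1, 2, 4], 4))    # [False, False, True]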

We can now define that our transforms work like this:

    select {...} my_rel
=>  (x y -> {...}) { x = [1, 2, 6, 3], y = [2, 4, 7, 7] }

... in words, transforms convert the relation from row-wise format into a
column-wise format. Any references to the columns would then return arrays of
values for the whole column.
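
The row-wise/column-wise conversions could look like this (hypothetical helper names, plain Python):

def rows_to_cols(rel):
    # [{'x': 1, 'y': 2}, ...]  ->  {'x': [1, ...], 'y': [2, ...]}
    return {name: [row[name] for row in rel] for name in rel[0]}

def cols_to_rows(cols):
    # {'x': [1, ...], 'y': [2, ...]}  ->  [{'x': 1, 'y': 2}, ...]
    length = len(next(iter(cols.values())))
    return [{name: col[i] for name, col in cols.items()} for i in range(length)]

my_rel = [{"x": 1, "y": 2}, {"x": 2, "y": 4}, {"x": 6, "y": 7}, {"x": 3, "y": 7}]
print(rows_to_cols(my_rel))   # {'x': [1, 2, 6, 3], 'y': [2, 4, 7, 7]}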

    my_rel | select {
        a = floor x,
        b = sum x,
        c = lag_one x,
        d = 5,
    }
=>  cols_to_rows {
        a = floor [1, 2, 6, 3],
        b = sum [1, 2, 6, 3],
        c = lag_one [1, 2, 6, 3],
        d = 5,
    }
=>  cols_to_rows {
        a = [floor 1, floor 2, floor 6, floor 3],
        b = 12,
        c = [null, 1, 2, 6],
        d = 5,
    }
=>  cols_to_rows {
        a = [1, 2, 6, 3],
        b = [12, 12, 12, 12], # select expects an array -> broadcast
        c = [null, 1, 2, 6],
        d = [5, 5, 5, 5],
    }
=>  [
        { a = 1, b = 12, c = null, d = 5 },
        { a = 2, b = 12, c = 1,    d = 5 },
        { a = 6, b = 12, c = 2,    d = 5 },
        { a = 3, b = 12, c = 6,    d = 5 },
    ]

In Pythonic pseudocode, it would be expressed as:

    select { a = floor x, b = sum x, c = lag_one x, d = 5 }
=>  {
        a = [floor i for i in x],
        b = [sum x   for _ in x],
        c = lag_one x,
        d = [5       for _ in x]
    }
    window rows:-1..0 {a = floor x, b = sum x, c = lag_one x, d = 5}
=>  {
        a = [floor i for i in x],
        b = [sum(x[max(i-1, 0):i+1]) for i, _ in enumerate(x)],
        c = lag_one x,
        d = [5       for _ in x]
    }
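
The windowed case from this pseudocode can be made runnable, assuming rows:-1..0 means "from the previous row through the current row":

x = [1, 2, 6, 3]

def windowed(xs, lo, hi, f):
    # apply f to the slice of xs covering offsets lo..hi around each row
    return [f(xs[max(i + lo, 0):i + hi + 1]) for i in range(len(xs))]

print(windowed(x, -1, 0, sum))    # [1, 3, 8, 9]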

Pros:

  • this is what numpy does and to some degree, SQL too,
  • simple and convenient,

Cons:

  • sometimes it is not obvious what is being broadcast, especially if you are
    unfamiliar with the signatures of the functions,
  • if rows contain arrays, it may get confusing (see the example below),
  • I'm not certain whether this is ambiguous for functions with 3 arguments (what
    happens when you do lag x y, where x and y are both arrays?)

    [
        {x = 1, y = [5, 6]},
        {x = 2, y = [2]},
        {x = 6, y = [1, 2, 3]},
        {x = 3, y = [3, 5]},
    ] | select { x == y }
=>  cols_to_rows {
        [1, 2, 6, 3] == [[5, 6], [2], [1, 2, 3], [3, 5]]
    }
=>  cols_to_rows {
        [
            1 == [5, 6],
            2 == [2],
            6 == [1, 2, 3],
            3 == [3, 5]
        ]
    }
=>  cols_to_rows {
        [
            [1, 1] == [5, 6],
            [2] == [2],
            [6, 6, 6] == [1, 2, 3],
            [3, 3] == [3, 5]
        ]
    }
=>  cols_to_rows {
        [
            [false, false],
            [true],
            [false, false, false],
            [true, false]
        ]
    }

Solution 2: row-wise computation

The idea here is that the argument to select is evaluated once for each row.
This allows all expressions in the tuple to be scalars.

    my_rel | select {a = floor x}
=>  [
        { a = floor 1 },
        { a = floor 2 },
        { a = floor 6 },
        { a = floor 3 },
    ]
=>  [
        { a = 1 },
        { a = 2 },
        { a = 6 },
        { a = 3 },
    ]

This has the obvious advantage of not having to deal with vectorized operations,
at the cost of complicating the transforms. Here is a list of the changed
transforms, with the types that they provide to their argument expression (x)
and the types that their argument expression should have.

select:

  • provides scalars,
  • expects scalars,
  • executes once per input row,

window:

  • provides an array,
  • expects scalars,
  • executes once per input row,

aggregate:

  • provides an array,
  • expects scalars,
  • executes once,

columnar (this needs a better name):

  • provides an array,
  • expects an array,
  • executes once,

my_rel | select {
    a = floor x, # ok
    b = sum x,   # error: expected an array but got scalar
    c = lag 1 x, # error: lag expected an array but got scalar, select expected a scalar but got array
    d = 5        # ok
}

my_rel | window {
    a = floor x, # error: floor expected a scalar but got array
    b = sum x,   # ok
    c = lag 1 x, # error: window expected a scalar but got array
    d = 5        # ok
}

my_rel | aggregate {
    a = floor x, # error: expected a scalar but got array
    b = sum x,   # ok
    c = lag 1 x, # error: expected a scalar but got array
    d = 5        # ok
}

my_rel | columnar {
    a = floor x, # error: floor expected a scalar but got array, columnar expected an array but got scalar
    b = sum x,   # error: sum expected an array but got scalar
    c = lag 1 x, # ok
    d = 5        # error: columnar expected an array but got scalar
}
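
For completeness, a toy sketch of how such context-dependent checking could look (illustrative only, not the actual compiler):

# Toy type checker: each transform sets what column references resolve to
# and what type the resulting tuple fields must have.
SIGNATURES = {          # function name -> (argument type, result type)
    "floor": ("scalar", "scalar"),
    "sum": ("array", "scalar"),
    "lag": ("array", "array"),
}
CONTEXTS = {            # transform -> (type of column refs, expected field type)
    "select": ("scalar", "scalar"),
    "window": ("array", "scalar"),
    "aggregate": ("array", "scalar"),
    "columnar": ("array", "array"),
}

def check(transform, func):
    provided, expected = CONTEXTS[transform]
    arg, result = SIGNATURES[func]
    errors = []
    if arg != provided:
        errors.append(f"{func} expected {arg} but got {provided}")
    if result != expected:
        errors.append(f"{transform} expected {expected} but got {result}")
    return errors or ["ok"]

print(check("select", "sum"))     # ['sum expected array but got scalar']
print(check("window", "lag"))     # ['window expected scalar but got array']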

@aljazerzen Thank you for your very detailed and clear explanation!

R vectors (the columns of a data frame) do exactly this kind of broadcasting, which is why functions like dplyr::mutate() sometimes do not work well for list-type (array) columns, so dplyr has the rowwise() function to switch to row-wise computation.
https://dplyr.tidyverse.org/articles/rowwise.html#list-columns

df <- tibble::tibble(x = c(list(1:5), list(5:6)))

df |>
  dplyr::mutate(y = length(x))
#> # A tibble: 2 × 2
#>   x             y
#>   <list>    <int>
#> 1 <int [5]>     2
#> 2 <int [2]>     2

# Make `mutate` a row-wise operation
df |>
  dplyr::rowwise() |>
  dplyr::mutate(y = length(x))
#> # A tibble: 2 × 2
#> # Rowwise:
#>   x             y
#>   <list>    <int>
#> 1 <int [5]>     5
#> 2 <int [2]>     2

# Or, use map function
df |>
  dplyr::mutate(y = purrr::map_int(x, \(x) length(x)))
#> # A tibble: 2 × 2
#>   x             y
#>   <list>    <int>
#> 1 <int [5]>     5
#> 2 <int [2]>     2

Created on 2023-06-06 with reprex v2.0.2
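
For comparison, pandas shows the same split between vectorized and row-wise evaluation on list-valued columns (a rough analogue, not tied to PRQL):

import pandas as pd

df = pd.DataFrame({"x": [[1, 2, 3, 4, 5], [5, 6]]})

# Vectorized: len() sees the whole column, so every row gets the same value.
df["y_vectorized"] = len(df["x"])        # 2 for both rows

# Row-wise: apply len() to each cell, similar to dplyr::rowwise().
df["y_rowwise"] = df["x"].apply(len)     # 5 and 2

print(df)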

Separating out columnar operations seems like a worthwhile thing to me.

However, I think that just as select has derive, a columnar operation needs another operation that only adds columns. So would it be better to cover it with something that, like group, changes the behavior of select or derive?

my_rel | columnar (select {
    a = floor x, # error: floor expected a scalar but got array, columnar expected an array but got scalar
    b = sum x,   # error: sum expected an array but got scalar
    c = lag 1 x, # ok
    d = 5        # error: columnar expected an array but got scalar
})

my_rel | columnar (derive {
    a = floor x, # error: floor expected a scalar but got array, columnar expected an array but got scalar
    b = sum x,   # error: sum expected an array but got scalar
    c = lag 1 x, # ok
    d = 5        # error: columnar expected an array but got scalar
})

This is exactly the inverse of how dplyr switches between vectorised and row-wise operations with rowwise().

One problem is that the order becomes even more complicated when group is also added to this...

my_rel | group {} (columnar (select {})) # Ok?
my_rel | columnar (group {} (select {})) # ?

Given the reduction in complexity, would something like select:columnar, select:rowwise or select:window be better?

Edit: I realized after writing this that it should not be done, because it ruins the separation of SQL's SELECT into PRQL's select and aggregate...

Some of this (less well-developed) was in this comment back in the ancient days of 2022: #1069 (comment). I think it's worth considering the different types of window functions, which IIRC lead to the difference in results between LAG and SUM above. I think that:

  • The "Most window functions" map to "Columnar"
  • The "Aggregate analytic functions" map to "Window"

So I do think those are different types, and we could handle them differently. How would this look in terms of the SQL produced? Is the proposal (at least with Row-wise), to tighten up what PRQL processes, but otherwise produce similar SQL?


I'm trying to describe to myself why from foo | select (sum bar) is bad. What's the type violation there? That select must operate row-wise?
What's the reason it can't operate based on the type of the function that is passed in, i.e. it just runs the inner function on the column, and that function could operate on each row (e.g. floor) or on the whole column (e.g. sum)?

Is it that the typing is too dynamic / flimsy, and stops us from making it helpful?


(+ thank you @aljazerzen for the excellent and thorough issue!)

Is the proposal (at least with Row-wise), to tighten up what PRQL processes, but otherwise produce similar SQL?

Yes, just on the PRQL side. Emitted SQL should be the same.


I'm trying to describe to myself why from foo | select (sum bar) is bad. What's the type violation there?

That depends on which proposed solution you adhere to. Solution 1 would say "everything is alright", solution 2 would say:

from foo | select (sum bar)
                       ---
                        \___ sum expected an array, but found a scalar

What's the reason it can't operate based on the type of the function that is passed in, i.e. it just runs the inner function on the column, and that function could operate on each row (e.g. floor) or on the whole column (e.g. sum)?

That's solution 1.


PS: I'm still working on adapting what @eitsupi proposed.

snth commented

I believe the semantics of SQL are largely "row-wise operation". I think for PRQL the broadcasting model is slightly more general and nicer. For scalars (e.g. 5) and scalar functions (e.g. floor x) the results are the same and it's only aggregate functions (e.g. sum x) that provide an extension which is nice to have and I believe well-defined enough to include.

lag 1 x is a red herring IMHO and I will address that in the next section. However for the purposes of this discussion, window functions always produce a single scalar value for each row so we don't have to worry about the array/vector case. I guess they are probably best thought of as "columnar" in that the value produced for the current row may not depend on any value input from the current row (LAG and LEAD being good examples of this). Therefore the broadcasting model probably better incorporates this conceptually than row-wise operation.


Now to LAG. I don't think it works as in the examples above. I have been trying to figure out exactly how it works over the past two days and I still don't understand it properly. However, it's not just that the whole column vector is lagged by 1 (or n) places. When an ORDER BY clause is provided in the OVER clause, the windows are selected over logical ranges rather than physical rows, and if a RANGE argument is given, the number of elements in each window can differ by row.

I tried to read the Postgres docs on Window Function Calls closely but what I understood from that doesn't actually pan out in practice like I thought it would.

Another thing is that PostgreSQL and DuckDB don't produce the same results. I haven't tried other RDBMSs yet.

Let's look at the following example:

from_text format:json """
[
  {"i":1, "x":3},
  {"i":2, "x":1},
  {"i":3, "x":2},
  {"i":4, "x":3},
  {"i":5, "x":1}
]
"""
derive [a=lag 2 x]
window (
  sort x
  derive [b = lag 2 x]
  )
window rows:-1..0 (
  derive [c = lag 2 x]
  )
window range:-1..0 (
  derive [d = lag 2 x]
  )
sort i
select [x, a, b, c, d]

in the playground this produces the following SQL:

WITH table_0 AS (
  SELECT
    1 AS i,
    3 AS x
  UNION
  ALL
  SELECT
    2 AS i,
    1 AS x
  UNION
  ALL
  SELECT
    3 AS i,
    2 AS x
  UNION
  ALL
  SELECT
    4 AS i,
    3 AS x
  UNION
  ALL
  SELECT
    5 AS i,
    1 AS x
),
table_3 AS (
  SELECT
    x,
    LAG(x, 2) OVER () AS a,
    LAG(x, 2) OVER (
      ORDER BY
        x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS b,
    LAG(x, 2) OVER (
      ORDER BY
        x ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    ) AS c,
    LAG(x, 2) OVER (
      ORDER BY
        x RANGE BETWEEN 1 PRECEDING AND CURRENT ROW
    ) AS d,
    i
  FROM
    table_0 AS table_1
  ORDER BY
    i
)
SELECT
  x,
  a,
  b,
  c,
  d
FROM
  table_3 AS table_2

-- Generated by PRQL compiler version:0.8.1 (https://prql-lang.org)

and the following output (DuckDB):

x  a     b     c     d
3  null  1     1     1
1  null  null  null  null
2  3     1     1     1
3  1     2     2     2
1  2     null  null  null

In Postgres I get the following output:

x  a     b     c     d
3  1     1     1     1
1  null  null  null  null
2  1     1     1     1
3  2     2     2     2
1  null  null  null  null

I would have thought that column c should be NULL everywhere, since the window only contains two rows and the LAG(x, 2) would take you outside of that? 🤷

For column a, DuckDB acts based on the physical layout of the rows, whereas Postgres uses the logical ordering of x. However, my understanding of the Postgres docs was that since no ORDER BY was given, there is no ordering imposed on the values and all values should be "peers", so I would have thought this should be all NULLs as well. I'm thoroughly confused. I'd appreciate any insights you can share on this.

I guess they are probably best thought of as "columnar" in that the value produced for the current row may not depend on any value input from the current row

That's the best description of the semantics. Window functions in SQL are "tunnels" to other rows - they can pull scalar values from other rows! Quite magic :D


The nuances of ROWS vs RANGE are also a bit lost on me. I'm not sure what I'd expect the result to be in these cases, but fortunately this question is tangential to the types of the window functions. @snth could you move this finding to a new issue?

I found that there is no lag/lead function in ClickHouse.
https://clickhouse.com/docs/en/sql-reference/window-functions

It seems that the lagInFrame/leadInFrame functions can be used instead.

use lagInFrame/leadInFrame, which are analogous, but respect the window frame. To get behavior identical to lag/lead, use rows between unbounded preceding and unbounded following

I only post this because I feel it is relevant to the discussion here.

The nuances of ROWS vs RANGE are also a bit lost on me. I'm not sure what I'd expect the result to be in these cases, but fortunately this question is tangential to the types of the window functions. @snth could you move this finding to a new issue?

AFAIK rows decides which rows fit into the window based only on their position in the ordering, not on the values in the ordering column. On the other hand, range decides which rows fit into the window based on the ordering column's values, be it a date, timestamp or even an integer, i.e. types that can represent sparse data. For non-sparse data, where the ordering column is a full sequence 1..n (or a date with continuous values), there is no difference between rows and range, but if the ordering column has gaps (sparse data) then they will differ. A use case is also presented in Rdatatable/data.table#3241
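
A small Python sketch of that difference, assuming a sorted integer ordering column with a gap:

day = [1, 2, 5, 6]    # sorted ordering column with a gap between 2 and 5

# ROWS BETWEEN 1 PRECEDING AND CURRENT ROW: frame chosen by physical position.
rows_frames = [day[max(i - 1, 0):i + 1] for i in range(len(day))]
# [[1], [1, 2], [2, 5], [5, 6]]

# RANGE BETWEEN 1 PRECEDING AND CURRENT ROW: frame chosen by ordering value.
range_frames = [[v for v in day if d - 1 <= v <= d] for d in day]
# [[1], [1, 2], [5], [5, 6]]   <- the third frame differs because of the gap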

I just wanted to throw in a common use case: forward-filling / backfilling timeseries values.

SELECT 
      day,
      reading_time,
      LAST_VALUE(temperature) IGNORE NULLS OVER (
          PARTITION BY day
          ORDER BY reading_time
          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
     ) AS temperature,
     LAST_VALUE(pressure) IGNORE NULLS OVER  (
          PARTITION BY day
          ORDER BY reading_time
          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
     ) AS pressure
FROM weather_data

I can't find a clean way of doing this in PRQL right now. This is also related to this ticket: #2622
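
For reference, the LAST_VALUE(...) IGNORE NULLS pattern above amounts to a per-partition forward fill; here is a minimal Python sketch of that behaviour (not PRQL):

def forward_fill(values):
    # carry the last non-null value forward, like LAST_VALUE(...) IGNORE NULLS
    # over an unbounded-preceding-to-current-row frame
    filled, last = [], None
    for v in values:
        if v is not None:
            last = v
        filled.append(last)
    return filled

print(forward_fill([None, 21.5, None, None, 22.0]))
# [None, 21.5, 21.5, 21.5, 22.0]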

I just wanted to throw in a common use case: forward-filling / backfilling timeseries values.

I think it should go to its own issue. Another way to achieve it is as-of joins.