Consolidate GroupByHash implementations `row_hash.rs` and `hash.rs` (remove duplication)
alamb opened this issue · 18 comments
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
As part of the transition to the faster "row format" (#1861), @yjshen implemented a Row based Hash Aggregate implementation in #2375 ❤️
However, the implementation currently supports only a subset of the data types that DataFusion supports. This made the code significantly faster for some cases but has some downsides:
- Not all data types benefit from the row format performance
- There are two parallel, similar-but-not-identical implementations of hash aggregate: `row_hash.rs` and `hash.rs`
You can already see the potential challenge in PRs like #2716, where test coverage may accidentally miss one of the hash aggregate implementations.
Describe the solution you'd like
I would like to consolidate the hash aggregate implementations -- success means deleting `hash.rs` by adding the remaining type support to `row_hash.rs`.
I think this would be a nice project for someone new to DataFusion to work on as the pattern is already defined, the outcome will be better performance, and they will get good experience with the code.
It will also increase the type support for the row format and make it easier to roll the format out through the rest of the codebase.
Describe alternatives you've considered
N/A
Additional context
More context about the ongoing row format conversion is in #1861
I went through the codebase and some related GitHub issues. It seems that we need to:
- Support encoding/decoding for all `DataType`s, including nested types such as `List` and `Struct`.
- Go through all types of `AggregateExpr` and implement the related `RowAccumulator` if one doesn't exist yet.
- Benchmark the row-based aggregation above and compare it with the original column-based one.
- Delete `create_accumulator` in `AggregateExpr` and always use `create_row_accumulator`.
Is there anything I am missing? I think I need to do some research on how to implement `List` and `Struct`.
Thanks @ming535 ! That list looks good to me
> Is there anything I am missing? I think I need to do some research on how to implement List, Struct.
I am not sure DataFusion supports aggregating on `List` and `Struct` -- so I am not sure we need to support a row format for them. If that turns out not to be the case, maybe we will 🤔
I agree we need `List` support in Row since it's used by `ApproxPercentileCont`, `ArrayAgg`, `Distinct*`, etc., as state fields. `Struct` is not currently used as a state for existing accumulators, but we will need it soon for other purposes.
`List` and `Struct` would not be too complicated to implement in the row format:
- For a struct, we could regard it as a variable-length field (varlena), and interpret/serialize the data as another row.
- List would need one more size field denoting its length.
You could refer to the Spark repo to borrow some ideas from its `UnsafeRow` implementation.
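To make the varlena idea concrete, here is a minimal, self-contained sketch of how a `List` of nullable `i64` values could round-trip through a length-prefixed, variable-length encoding. It is illustrative only: the function names and layout are assumptions for this sketch, not DataFusion's actual row format.

```rust
use std::convert::TryInto;

/// Encode a `List` of nullable i64s as one variable-length field:
/// a u32 length prefix followed by (validity byte, value) pairs.
fn encode_list(values: &[Option<i64>]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(4 + values.len() * 9);
    // Length prefix so the reader knows how many elements follow.
    buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
    for value in values {
        match value {
            Some(v) => {
                buf.push(1); // validity byte: present
                buf.extend_from_slice(&v.to_le_bytes());
            }
            None => {
                buf.push(0); // validity byte: null
                buf.extend_from_slice(&0i64.to_le_bytes()); // placeholder
            }
        }
    }
    buf
}

/// Decode the encoding above back into a `Vec<Option<i64>>`.
fn decode_list(buf: &[u8]) -> Vec<Option<i64>> {
    let len = u32::from_le_bytes(buf[0..4].try_into().unwrap()) as usize;
    let mut out = Vec::with_capacity(len);
    let mut offset = 4;
    for _ in 0..len {
        let valid = buf[offset] == 1;
        let v = i64::from_le_bytes(buf[offset + 1..offset + 9].try_into().unwrap());
        out.push(if valid { Some(v) } else { None });
        offset += 9;
    }
    out
}

fn main() {
    let list: Vec<Option<i64>> = vec![Some(1), None, Some(3)];
    assert_eq!(decode_list(&encode_list(&list)), list);
    println!("round-tripped {} elements", list.len());
}
```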
On the other hand, I suggest we evaluate the `Distinct*` aggregates through double aggregation (an agg logic rewrite through an optimizer rule) for more predictable/reasonable memory usage; the current "collecting values into list" logic would OOM fast with hot groupings.
This optimizer rule would rewrite an aggregate query with distinct aggregations into an expanded double aggregation, in which the regular aggregation expressions and every distinct clause are aggregated in a separate group. The results are then combined in a second aggregate. Check this Spark optimizer rule
Note on `RowType::WordAligned`, which is used as the grouping state for hash aggregation: since a varlena field would expand its width as new updates come in, fields after the varlena should be moved accordingly to keep them readable/updatable after a varlena update.
We may consider an "out of place" store for these varlenas for `RowType::WordAligned`, to relieve the repeated need for moving/copying fixed-sized fields stored after varlena fields, and put them in place when the update is finalized later.
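A rough sketch of that "out of place" idea, assuming a hypothetical `VarlenaStore` side buffer: the word-aligned row would hold only a fixed-width handle, so the fixed-width fields stored after it never move when the varlena grows. This is not existing DataFusion code, just an illustration of the design.

```rust
/// Hypothetical side buffer for variable-length values; rows store only
/// a fixed-width handle (u32 index) into this store.
struct VarlenaStore {
    buffers: Vec<Vec<u8>>,
}

impl VarlenaStore {
    fn new() -> Self {
        Self { buffers: Vec::new() }
    }

    /// Append a new varlena and return its fixed-width handle.
    fn insert(&mut self, data: Vec<u8>) -> u32 {
        self.buffers.push(data);
        (self.buffers.len() - 1) as u32
    }

    /// Growing an existing varlena only touches the side buffer,
    /// leaving the row's fixed-width fields untouched.
    fn append(&mut self, handle: u32, more: &[u8]) {
        self.buffers[handle as usize].extend_from_slice(more);
    }

    fn get(&self, handle: u32) -> &[u8] {
        &self.buffers[handle as usize]
    }
}

fn main() {
    let mut store = VarlenaStore::new();
    // The row itself would store only `handle` (4 bytes, fixed width).
    let handle = store.insert(b"abc".to_vec());
    store.append(handle, b"def");
    assert_eq!(store.get(handle), b"abcdef");
    println!("varlena now has {} bytes", store.get(handle).len());
}
```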
I have a prototype working for recursive `List` now. I will submit separate pull requests for this issue.
@yjshen Hi, do you know why `Distinct*` accumulators -- for example `DistinctCount` -- have `state_fields` that return a vector of `DataType::List` instead of just a vector of `DataType`? I have run a few examples, and all of them actually use a `List` of length 1.
My understanding is that `DistinctCount` is used in queries like `SELECT COUNT(DISTINCT c1) from foo;`. The logical plan is `Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT #foo.c1)]]`. The `aggr` in the plan will be translated into `DistinctCount` (when `SingleDistinctToGroupBy` is disabled). Since `COUNT` is not valid when there are multiple columns, I don't understand why `DistinctCount`'s `state_fields` is a vector of `DataType::List` rather than just a vector of `DataType`.
I think Distinct basically serializes partial results as `List`s when combining partial aggregates
> I think Distinct basically serializes partial results as `List`s when combining partial aggregates
Does this mean that the design was meant to support something like `COUNT(DISTINCT (c1, c2))`, which counts the distinct combinations of c1 and c2?
> Does this mean that the design was meant to support something like `COUNT(DISTINCT (c1, c2))`, which counts the distinct combinations of c1 and c2?
I am not totally sure to be honest
Grand Plan
So @tustvold and I came up with a plan for this. Let me illustrate.
Prior Art
Let's first quickly illustrate what the current state is and what the problem is. We have two group-by implementations:
V1 (`hash.rs`)
This is the original, fully-featured implementation. It uses `Box<[ScalarValue]>` as a group key and, for each group, a `Box<dyn Accumulator>` to perform the aggregation and store the per-group state (so there's one `Accumulator` per group). `hash.rs` manages the entire group->state mapping. The grouping is calculated once per record batch, then a `take` kernel reshuffles the batch and we pass slices of that to the individual `Accumulator`s.
While this is nice from a flexibility PoV, it is also very slow, because:
- `Box<[ScalarValue]>` is not exactly fast to compute
- there's one dynamically dispatched object per group
V1 currently does NOT support dynamic memory tracking, see #3940.
V2 (`row_hash.rs`)
This uses a row format ... or two, to be precise. The group key is stored in the DataFusion compact row format. There's one `Box<dyn RowAccumulator>` per aggregation target (NOT per group) and the per-group state is stored in the DataFusion word-aligned row format. `row_hash.rs` manages the mapping from group key to state and passes a `RowAccessor` to the `RowAccumulator`. The processing is similar to V1: groups are calculated once per batch, then a `take` kernel reshuffles everything and slices are passed to the `RowAccumulator`.
V2 is faster than V1, but in our opinion the reasoning is blurry: there are two differences (group key calculation and state storage) and we believe that especially the first one is responsible for the improvement.
The DF word-aligned row format will never work for some aggregations, like a non-approximate median, so this is somewhat of a dead end.
The good thing is that V2 supports memory tracking (see #3940).
Row Formats
See #4179 -- TL;DR: we have three row formats in our stack and we should focus on one.
Proposal
We think that V2 has some good ideas but will never get finished and could also be improved. As weird as it may sound (and please read the implementation plan before arguing against it) we propose a V3.
Design
Use the arrow row format as the group key. I (= @crepererum) have some early benchmarks showing that this change alone would improve V2 performance. Ditch the word-aligned row format for the state management and change the aggregator to:
```rust
trait Aggregator: MemoryConsumer {
    /// Update aggregator state for given groups.
    fn update_batch(&mut self, keys: Rows, batch: &RecordBatch) -> Result<()>;

    // ...
}
```
Hence the aggregator will be dyn-dispatched ONCE per record batch and will keep its own internal state. This moves the key->state map from `[row_]hash.rs` to the aggregators. We will provide tooling (macros or generics) to simplify the implementation and to avoid boilerplate code as much as possible.
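To make the shape of this proposal concrete, here is a minimal sketch of an aggregator that owns its own key->state map and is dyn-dispatched once per batch. The types are simplified stand-ins (byte-string keys instead of arrow `Rows`, a plain value slice instead of a `RecordBatch`, no `MemoryConsumer`); `GroupKey`, `CountAggregator`, and `finalize` are hypothetical names for illustration, not existing DataFusion APIs.

```rust
use std::collections::HashMap;

/// Stand-in for an arrow-row-encoded group key.
type GroupKey = Vec<u8>;

trait Aggregator {
    /// Update per-group state for one batch; dyn-dispatched once per batch.
    fn update_batch(&mut self, keys: &[GroupKey], values: &[Option<i64>]);
    /// Produce (group key, final value) pairs.
    fn finalize(self: Box<Self>) -> Vec<(GroupKey, u64)>;
}

/// COUNT aggregator that owns its own key -> state map,
/// as proposed: the map lives in the aggregator, not in `row_hash.rs`.
#[derive(Default)]
struct CountAggregator {
    counts: HashMap<GroupKey, u64>,
}

impl Aggregator for CountAggregator {
    fn update_batch(&mut self, keys: &[GroupKey], values: &[Option<i64>]) {
        for (key, value) in keys.iter().zip(values) {
            if value.is_some() {
                // One hash-map lookup per input row, no `take` reshuffle.
                *self.counts.entry(key.clone()).or_insert(0) += 1;
            }
        }
    }

    fn finalize(self: Box<Self>) -> Vec<(GroupKey, u64)> {
        self.counts.into_iter().collect()
    }
}

fn main() {
    let mut agg: Box<dyn Aggregator> = Box::new(CountAggregator::default());
    let keys = vec![b"a".to_vec(), b"b".to_vec(), b"a".to_vec()];
    let values: Vec<Option<i64>> = vec![Some(1), None, Some(3)];
    agg.update_batch(&keys, &values);
    for (key, count) in agg.finalize() {
        println!("{:?} -> {}", String::from_utf8_lossy(&key), count);
    }
}
```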
Note that this also removes the `take` kernel, since we think it doesn't provide any performance improvement over iterating over the hashable rows and performing the aggregation row by row. We may bring the `take` handling back (as a "perform `take` if at least one aggregator wants it" mode) if we can prove (via benchmarks) that this is desirable for certain aggregators, but we leave this out for now.
This also moves the memory consumer handling into the aggregator since the aggregator knows best how to split states and spill data.
Implementation Plan
- arrow row format prep: Fill the gaps that the arrow row format currently has:
  - size estimation: so we can hook it into the DF memory management
  - types: support all the arrow types (except for the very few that are so weird that nobody cares)
- row format use: Use the arrow row format as the group key in V2. This will strictly allow more group-bys to pass through V2, because we now allow more group key types.
- V2 state management: Change the V2 state management to be like the V3 version above. We don't want to migrate all accumulators in one go, so we'll provide an adapter that uses the new `Aggregator` interface described above with the `RowAccumulator` internally. This may result in a small performance regression for the current V2, but we think it will be very small (if even noticeable; it may even improve perf due to reduced dyn dispatch). Feature support for V2 will stay the same.
- aggregator migration: Migrate one aggregator at a time to support the new interface. Notice that in contrast to the V2 `RowAccumulator`, the V3 proposal is flexible enough to support all aggregations (even the ones that require dynamic allocations).
- remove V1: V2 is now basically V3 but with full feature support. Delete V1.
In general, other than the fact that this proposal sounds like a lot of work, I think it sounds wonderful!
I did have a question about the proposed trait implementation:
```rust
trait Aggregator: MemoryConsumer {
    /// Update aggregator state for given groups.
    fn update_batch(&mut self, keys: Rows, batch: &RecordBatch) -> Result<()>;

    // ...
}
```
Is the idea that each aggregator would contain a HashMap or some other way to map keys --> intermediates (rather than the hash map being in `row_hash.rs`)? This seems like it would result in a fair amount of duplication.
I would have expected something like the following (basically push the `take` into the aggregator):
```rust
trait Aggregator: MemoryConsumer {
    /// Update aggregator state for given rows.
    fn update_batch(&mut self, indices: Vec<usize>, batch: &RecordBatch) -> Result<()>;

    // ...
}
```
The big danger of the plan initially seems to be that it wouldn't get finished and then we would be in an even worse state (3 implementations!), but I think your idea to incrementally rewrite V2 to V3 and then remove V1 sounds like a good mitigation strategy.
> Is the idea that each aggregator would contain a HashMap or some other way to map keys --> intermediates (rather than the hash map being in `row_hash.rs`)? This seems like it would result in a fair amount of duplication.
Sure, but it reduces dyn dispatch by a lot (once per batch instead of once per group), removes the `take` kernel, and the duplication can be hidden by careful macros/generics.
> I would have expected something like the following (basically push the `take` into the aggregator)
The aggregator CAN perform a `take` but doesn't have to. Our position is that in most cases it doesn't make sense. I also don't fully understand how your interface would work, because there is no group key in your signature and we want to avoid having an aggregator per group.
> The big danger of the plan initially seems to be that it wouldn't get finished and then we would be in an even worse state (3 implementations!), but I think your idea to incrementally rewrite V2 to V3 and then remove V1 sounds like a good mitigation strategy.
There will never be a 3rd implementation. We morph V2 into V3 and then delete V1.
> Sure, but it reduces dyn dispatch by a lot (once per batch instead of once per group), removes the `take` kernel, and the duplication can be hidden by careful macros/generics.
I understand your point. I would probably have to see a prototype to really understand how complicated it would be in practice. It doesn't feel right to me.
Another thing to consider is other potential aggregation algorithms:
- Externalization (how would aggregator state be dumped to / read from external files)? Ideally this wouldn't have to be implemented and tested per algorithm
- GroupBy Merge (where the data is sorted by group keys, so all values for each group are contiguous in the input) -- this is sometimes used as part of externalized group by hash (to avoid rehashing inputs)
> Ideally this wouldn't have to be implemented and tested per algorithm
Ideally we would follow the approach I've increasingly been applying to arrow-rs, and recently applied to `InList`: namely, use concrete types and only use dyn-dispatch to type-erase when dispatching the batches. It would be fairly trivial to implement a `HashMapAggregator` that takes a concrete aggregation function, or something like that.
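A minimal sketch of what such a `HashMapAggregator` could look like, using simplified stand-ins in the same spirit as the earlier sketch: the aggregation function is a concrete generic parameter (statically dispatched per row), and only the outer aggregator is type-erased at the batch boundary. `AggregateFn`, `HashMapAggregator`, and `Sum` are hypothetical names, not existing DataFusion or arrow-rs APIs.

```rust
use std::collections::HashMap;

/// Stand-in for an arrow-row-encoded group key.
type GroupKey = Vec<u8>;

trait Aggregator {
    fn update_batch(&mut self, keys: &[GroupKey], values: &[Option<i64>]);
    fn finalize(self: Box<Self>) -> Vec<(GroupKey, i64)>;
}

/// A concrete, monomorphized aggregation function: fold one value into state.
trait AggregateFn: Default {
    fn update(&mut self, value: i64);
    fn evaluate(&self) -> i64;
}

/// Generic over the aggregation function; only the outer `Box<dyn Aggregator>`
/// is type-erased, the per-row work stays statically dispatched.
#[derive(Default)]
struct HashMapAggregator<F: AggregateFn> {
    states: HashMap<GroupKey, F>,
}

impl<F: AggregateFn> Aggregator for HashMapAggregator<F> {
    fn update_batch(&mut self, keys: &[GroupKey], values: &[Option<i64>]) {
        for (key, value) in keys.iter().zip(values) {
            if let Some(v) = value {
                self.states.entry(key.clone()).or_default().update(*v);
            }
        }
    }

    fn finalize(self: Box<Self>) -> Vec<(GroupKey, i64)> {
        self.states.into_iter().map(|(k, f)| (k, f.evaluate())).collect()
    }
}

/// Example concrete function: SUM over i64.
#[derive(Default)]
struct Sum(i64);

impl AggregateFn for Sum {
    fn update(&mut self, value: i64) {
        self.0 += value;
    }
    fn evaluate(&self) -> i64 {
        self.0
    }
}

fn main() {
    // Type erasure happens once, at the batch-dispatch boundary.
    let mut agg: Box<dyn Aggregator> = Box::new(HashMapAggregator::<Sum>::default());
    agg.update_batch(
        &[b"a".to_vec(), b"a".to_vec(), b"b".to_vec()],
        &[Some(1), Some(2), Some(10)],
    );
    println!("{:?}", agg.finalize());
}
```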
To be clear, I think the outcome of implementing the plan described by @crepererum in #2723 (comment) will be:
- Faster group by hash performance
- More accurate memory usage accounting for grouping
- Set us up for out-of-core grouping (aka spilling group by)
- Make the code easier to work with
So I think this ticket is now a bit complicated: it was originally about avoiding two GroupBy operations, which #4924 from @mustafasrepo does, but it has since grown to include consolidating / improving the aggregate implementation as well.
So what I plan to do is break out @crepererum's plan in #2723 (comment) into a new ticket, and close this ticket when #4924 is merged.
It is actually quite cool to see that @crepererum's plan from #2723 (comment) is moving right along.