apache/datafusion

Consolidate GroupByHash implementations `row_hash.rs` and `hash.rs` (remove duplication)

alamb opened this issue · 18 comments

alamb commented

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
As part of the transition to the faster "row format" (#1861), @yjshen implemented a row-based hash aggregate in #2375 ❤️

However, the implementation currently supports only a subset of the data types that DataFusion handles. This made the code significantly faster in some cases, but it has some downsides:

  1. Not all data types benefit from the row format's performance
  2. There are two parallel, similar but not identical, implementations of hash aggregate -- row_hash.rs and hash.rs

You can already see the potential challenge in PRs like #2716, where test coverage may accidentally miss one of the hash aggregate implementations

Describe the solution you'd like
I would like to consolidate the hash aggregate implementations -- success is to delete hash.rs by adding the remaining type support to row_hash.rs

I think this would be a nice project for someone new to DataFusion to work on as the pattern is already defined, the outcome will be better performance, and they will get good experience with the code.

It will also increase the type support for the row format and make it easier to roll out through the rest of the codebase

Describe alternatives you've considered
N/A

Additional context
More context about the ongoing row format conversion is #1861

I went through the codebase and some related GitHub issues. It seems that we need to:

  1. Support encoding/decoding for all DataTypes, including nested types such as List and Struct.
  2. Go through all types of AggregateExpr and implement a corresponding RowAccumulator for each one that doesn't have one.
  3. Benchmark the row-based aggregation above and compare it with the original column-based one.
  4. Delete create_accumulator in AggregateExpr and always use create_row_accumulator.

Is there anything I am missing? I think I need to do some research on how to implement List, Struct.

alamb commented

Thanks @ming535 ! That list looks good to me

Is there anything I am missing? I think I need to do some research on how to implement List, Struct.

I am not sure DataFusion supports aggregating on List and Struct -- so I am not sure we need to support a row format for them. If that turns out not to be the case, maybe we will 🤔

I agree we need List support in Row since it's used by ApproxPercentileCont, ArrayAgg, Distinct*, etc., as state fields. Struct is not currently used as a state for existing accumulators, but we will need it soon for other purposes.

List and Struct would not be too complicated to implement in the row format:

  • For a Struct, we could regard it as a variable-length field (varlena) and interpret/serialize its data as a nested row.
  • A List would additionally need a size field denoting its length (see the rough sketch after this list).
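
To make the layout concrete, here is a minimal, self-contained sketch of the varlena idea; the function and buffer names are hypothetical and this is not DataFusion's actual row code. A List of i64 is stored as a length-prefixed payload in a variable-length section, while the fixed-width part only records an (offset, length) pair.

// Hypothetical illustration only; DataFusion's real row encoding differs in detail.
fn write_list_varlena(fixed: &mut Vec<u8>, var: &mut Vec<u8>, values: &[i64]) {
    let offset = var.len() as u32;
    let len = values.len() as u32;
    // size field denoting the list length, followed by the elements themselves
    var.extend_from_slice(&len.to_le_bytes());
    for v in values {
        var.extend_from_slice(&v.to_le_bytes());
    }
    // the fixed-width slot only records where the varlena lives and how long it is
    fixed.extend_from_slice(&offset.to_le_bytes());
    fixed.extend_from_slice(&len.to_le_bytes());
}

fn main() {
    let (mut fixed, mut var) = (Vec::new(), Vec::new());
    write_list_varlena(&mut fixed, &mut var, &[1, 2, 3]);
    assert_eq!(fixed.len(), 8);       // 4-byte offset + 4-byte length
    assert_eq!(var.len(), 4 + 3 * 8); // length prefix + three i64 elements
}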

You could refer to the Spark repo to borrow some ideas from its UnsafeRow implementation.

On the other hand, I suggest we evaluate the Distinct* aggregates through double aggregation (an aggregation-logic rewrite through an optimizer rule) for more predictable/reasonable memory usage; the current "collect values into a list" logic would OOM quickly with hot groupings.

This optimizer rule would rewrite an aggregate query with distinct aggregations into an expanded double aggregation, in which the regular aggregation expressions and every distinct clause are aggregated in separate groups. The results are then combined in a second aggregate (e.g., COUNT(DISTINCT c1) becomes a first aggregate grouped by c1, followed by a second aggregate that counts the resulting groups). Check Spark's optimizer rule for this.

Note on RowType::WordAligned, which is used as the grouping state for hash aggregation:
Since a varlena field would expand its width as new updates come in, the fields after it would have to be moved accordingly to keep them readable/updatable after a varlena update.

We may consider an "out-of-place" store for these varlenas in RowType::WordAligned to avoid repeatedly moving/copying the fixed-size fields stored after varlena fields, and put them in place once the update is finalized.
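
A rough sketch of that out-of-place idea, with hypothetical types that are not DataFusion code: the word-aligned row keeps only a fixed-width handle, so growing a varlena never shifts the fields stored after it.

// Hypothetical sketch: varlena values live in a side store; rows hold fixed-width handles.
struct WordAlignedState {
    fixed: Vec<u64>,        // fixed-width slots; one slot stores a varlena handle
    varlenas: Vec<Vec<u8>>, // out-of-place variable-length values
}

impl WordAlignedState {
    fn new_varlena_slot(&mut self) -> u64 {
        self.varlenas.push(Vec::new());
        (self.varlenas.len() - 1) as u64
    }

    fn append_to_varlena(&mut self, handle: u64, bytes: &[u8]) {
        // growing this value does not move any fixed-width field after it
        self.varlenas[handle as usize].extend_from_slice(bytes);
    }

    // once the update is finalized, the varlenas could be serialized back inline
}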

I have a prototype working for recursive List now. I will submit separate pull requests for this issue.

@yjshen Hi, do you know why, for Distinct* aggregates such as DistinctCount, state_fields returns a vector of DataType::List instead of just a vector of DataType? I have run a few examples, and all of them actually use a List of length 1.

My understanding is that DistinctCount is used in queries like: SELECT COUNT(DISTINCT c1) FROM foo;

The logical plan is Aggregate: groupBy=[[]], aggr=[[COUNT(DISTINCT #foo.c1)]]. The aggr in the plan will be translated into DistinctCount (when SingleDistinctToGroupBy is disabled). Since COUNT is not valid when there are multiple columns, I don't understand why DistinctCount's state_fields is a vector of DataType::List rather than just a vector of DataType.

alamb commented

I think Distinct basically serializes partial results as Lists when combining partial aggregates

I think Distinct basically serializes partial results as Lists when combining partial aggregates

Does this mean that the design was meant to support something like COUNT(DISTINCT (c1, c2)), which counts the distinct combinations of c1 and c2?

alamb commented

Does this mean that the design was meant to support something like COUNT(DISTINCT (c1, c2)), which counts the distinct combinations of c1 and c2?

I am not totally sure to be honest

Grand Plan

So @tustvold and I came up with a plan for this. Let me illustrate.

Prior Art

Let's first quickly illustrate what the current state is and what the problem is. We have two group-by implementations:

V1 (hash.rs)

This is the original, fully-featured implementation. It uses Box<[ScalarValue]> as a group key and, for each group, a Box<dyn Accumulator> to perform the aggregation and store the per-group state (so there's one Accumulator per group). hash.rs manages the entire group->state mapping. The grouping is calculated once per record batch; then a take kernel reshuffles the batch, and we pass slices of it to the individual Accumulators.

While this is nice from a flexibility PoV, this is also very slow, because:

  • Box<[ScalarValue]> is not exactly fast to compute
  • There's one dynamically-dispatched object per group

V1 currently does NOT support dynamic memory tracking; see #3940.
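
For illustration, here is a highly simplified sketch of the V1 shape described above; the types are hypothetical stand-ins (i64 replaces ScalarValue), not the actual hash.rs code.

use std::collections::HashMap;

// Stand-in for DataFusion's Accumulator trait, reduced to a single-value update.
trait Accumulator {
    fn update(&mut self, value: i64);
}

// The V1 shape: group key -> one boxed accumulator per aggregate expression,
// so the number of dyn-dispatched objects grows with the number of groups.
struct V1GroupByHash {
    groups: HashMap<Box<[i64]>, Vec<Box<dyn Accumulator>>>,
}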

V2 (row_hash.rs)

This uses a row format ... or two, to be precise. The group key is stored in the DataFusion compact row format. There's one Box<dyn RowAccumulator> per aggregation target (NOT per group), and the per-group state is stored in the DataFusion word-aligned row format. row_hash.rs manages the mapping from group key to state and passes a RowAccessor to the RowAccumulator. The processing is similar to V1: groups are calculated once per batch, then a take kernel reshuffles everything and slices are passed to the RowAccumulators.
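
Again highly simplified, and with hypothetical stand-ins for the compact/word-aligned row formats and the RowAccessor, the V2 shape looks roughly like this:

use std::collections::HashMap;

// Stand-in for RowAccumulator: updates a slot inside a group's state row.
trait RowAccumulator {
    fn update(&mut self, state_row: &mut [u64], value: i64);
}

struct V2GroupByHash {
    // compact-row group key (raw bytes) -> index of that group's state row
    group_index: HashMap<Vec<u8>, usize>,
    // one word-aligned state row per group
    state_rows: Vec<Vec<u64>>,
    // one accumulator per aggregate expression, NOT per group
    accumulators: Vec<Box<dyn RowAccumulator>>,
}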

V2 is faster than V1, but in our opinion the reasoning is blurry: there are two differences (group key calculation and state storage), and we believe that especially the first one is responsible for the improvement.

The DF word-aligned row format will never work for some aggregations, like a non-approximate median, so this is somewhat of a dead end.

The good thing is that V2 supports memory tracking (see #3940).

Row Formats

See #4179 -- TL;DR: we have three row formats in our stack and we should focus on one.

Proposal

We think that V2 has some good ideas but will never get finished and could also be improved. As weird as it may sound (and please read the implementation plan before arguing against it), we propose a V3.

Design

Use the arrow row format as the group key. I (@crepererum) have some early benchmarks showing that this change alone would improve V2 performance. Ditch the word-aligned row format for state management and change the aggregator to:

trait Aggregator: MemoryConsumer {
    /// Update aggregator state for given groups.
    fn update_batch(&mut self, keys: Rows, batch: &RecordBatch) -> Result<()>;
    ...
}

Hence the aggregator will be dyn-dispatched ONCE per record batch and will keep its own internal state. This moves the key->state map from [row_]hash.rs into the aggregators. We will provide tooling (macros or generics) to simplify the implementation and avoid boilerplate code as much as possible.
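
As a sketch of what that could look like, here is a count aggregator that owns its own key->state map; Vec<u8> stands in for an arrow-row-encoded group key, and the batch/memory-accounting types are omitted to keep it self-contained, so this is an assumption-laden illustration rather than the proposed API.

use std::collections::HashMap;

struct CountAggregator {
    counts: HashMap<Vec<u8>, u64>, // key -> state lives inside the aggregator now
}

impl CountAggregator {
    // One call (and, in the real design, one dyn dispatch) per record batch;
    // `keys` holds one encoded group key per input row.
    fn update_batch(&mut self, keys: &[Vec<u8>]) {
        for key in keys {
            *self.counts.entry(key.clone()).or_insert(0) += 1;
        }
    }
}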

Note that this also removes the take kernel, since we think it doesn't provide any performance improvement over iterating over the hashable rows and performing the aggregation row by row. We may bring the take handling back (as a "perform take if at least one aggregator wants it" option) if we can prove (via benchmarks) that this is desirable for certain aggregators, but we leave this out for now.

This also moves the memory consumer handling into the aggregator since the aggregator knows best how to split states and spill data.

Implementation Plan

  1. arrow row format prep: Fill the gaps that the arrow row format currently has:
    • size estimation: So we can hook it into the DF memory management
    • types: Support all the arrow types (except for the very few that are so weird nobody cares about them)
  2. row format use: Use the arrow row format as a group key in V2. This will strictly allow more group-bys to pass through V2 because we now allow more group key types.
  3. V2 state management: Change V2 state management to be like the V3 version above. We don't want to migrate all accumulators in one go, so we'll provide an adapter that uses the new Aggregator interface described above with the RowAggregator internally (see the sketch after this list). This may result in a small performance regression for the current V2, but we think it will be very small (if even noticeable; it may even improve performance due to reduced dyn dispatch). Feature support for V2 will stay the same.
  4. aggregator migration: Migrate one aggregator at a time to support the new interface. Notice that, in contrast to the V2 RowAggregator, the V3 proposal is flexible enough to support all aggregations (even the ones that require dynamic allocations).
  5. remove V1: V2 is now basically V3 but with full feature support. Delete V1.
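
To make step 3 a bit more concrete, here is a rough, hypothetical adapter sketch with simplified types (the real DataFusion traits differ): it exposes the new one-dispatch-per-batch interface on the outside while still driving a V2-style accumulator and keeping the per-group state rows itself.

use std::collections::HashMap;

// Simplified stand-in for the V2-style accumulator that writes into a state row.
trait RowStyleAccumulator {
    fn update(&mut self, state_row: &mut Vec<u64>, value: i64);
}

// Hypothetical adapter: new-style batch interface outside, old-style accumulator inside.
struct RowAccumulatorAdapter<A: RowStyleAccumulator> {
    inner: A,
    // group key bytes -> this group's state row (moved here from row_hash.rs)
    states: HashMap<Vec<u8>, Vec<u64>>,
}

impl<A: RowStyleAccumulator> RowAccumulatorAdapter<A> {
    fn update_batch(&mut self, keys: &[Vec<u8>], values: &[i64]) {
        for (key, value) in keys.iter().zip(values) {
            let state = self.states.entry(key.clone()).or_insert_with(Vec::new);
            self.inner.update(state, *value);
        }
    }
}
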
alamb commented

In general, other than the fact that this proposal sounds like a lot of work, I think it sounds wonderful 🏆

I did have a question about the proposed trait implementation:

trait Aggregator: MemoryConsumer {
    /// Update aggregator state for given groups.
    fn update_batch(&mut self, keys: Rows, batch: &RecordBatch) -> Result<()>;
    ...
}

Is the idea that each aggregator would contain a HashMap or some other way to map keys --> intermediates (rather than the hash map living in the grouping operator)? This seems like it would result in a fair amount of duplication

I would have expected something like the following (basically push the take into the aggregator)

trait Aggregator: MemoryConsumer {
    /// Update aggregator state for given rows
    fn update_batch(&mut self, indices: Vec<usize>, batch: &RecordBatch) -> Result<()>;
    ...
}

The big danger of the plan initially seems to be that it wouldn't be finished and then we would be in an even worse state (3 implementations!), but I think your idea to incrementally rewrite V2 into V3 and then remove V1 sounds like a good mitigation strategy

Is the idea that each aggregator would contain a HashMap or some other way to map keys --> intermediates (rather than the hash map living in the grouping operator)? This seems like it would result in a fair amount of duplication

Sure, but it reduces dyn dispatch by a lot (once per batch instead of once per group), removes the take kernel, and the duplication can be hidden by careful macros/generics.

I would have expected something like the following (basically push the take into the aggregator)

The aggregator CAN perform a take but doesn't have to. Our position is that in most cases it doesn't make sense. I also don't fully understand how your interface should work because there is no group key in your signature and we want to avoid having an aggregator per group.

The big danger of the plan initially seems to be that it wouldn't be finished and then we would be in an even worse state (3 implementations!), but I think your idea to incrementally rewrite V2 into V3 and then remove V1 sounds like a good mitigation strategy

There will never be a 3rd implementation. We morph V2 into V3 and then delete V1.

alamb commented

Sure, but it reduces dyn dispatch by a lot (once per batch instead of once per group), removes the take kernel, and the duplication can be hidden by careful macros/generics.

I understand your point. I would probably have to see a prototype to really understand how complicated it would be in practice. It doesn't feel right to me.

Another thing to consider is other potential aggregation algorithms:

  1. Externalization (how would aggregator state be dumped to / read back from external files)? Ideally this wouldn't have to be implemented and tested per algorithm
  2. GroupBy Merge (where the data is sorted by group keys, so all values for each group are contiguous in the input) -- this is sometimes used as part of externalized group by hash (to avoid rehashing inputs)

Ideally this wouldn't have to be implemented and tested per algorithm

Ideally we would follow the approach I've increasingly been applying to arrow-rs, and recently applied to InList: namely, use concrete types, and only use dyn dispatch to type-erase when dispatching the batches. It would be fairly trivial to implement a HashMapAggregator that takes a concrete aggregation function, or something along those lines
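
Roughly what that pattern could look like; the names (HashMapAggregator, Aggregator) and types here are hypothetical and not an existing arrow-rs or DataFusion API. The aggregation function stays a concrete, monomorphized type, and dyn dispatch happens only once at the outer boundary when a batch is handed over.

use std::collections::HashMap;

trait Aggregator {
    fn update_batch(&mut self, keys: &[Vec<u8>], values: &[i64]);
}

struct HashMapAggregator<F: Fn(&mut i64, i64)> {
    update_fn: F, // concrete aggregation function, no per-value dyn dispatch
    states: HashMap<Vec<u8>, i64>,
}

impl<F: Fn(&mut i64, i64)> Aggregator for HashMapAggregator<F> {
    fn update_batch(&mut self, keys: &[Vec<u8>], values: &[i64]) {
        for (key, value) in keys.iter().zip(values) {
            let state = self.states.entry(key.clone()).or_insert(0);
            (self.update_fn)(state, *value);
        }
    }
}

// Type erasure only at the outer boundary, e.g. when the operator stores it:
fn sum_aggregator() -> Box<dyn Aggregator> {
    Box::new(HashMapAggregator { update_fn: |state, v| *state += v, states: HashMap::new() })
}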

alamb commented

To be clear, I think the outcome of implementing the plan described by @crepererum in #2723 (comment) will be:

  1. Faster group by hash performance
  2. More accurate memory usage accounting for grouping
  3. Set us up for out-of-core grouping (aka spilling group by)
  4. Make the code easier to work with

alamb commented

So I think this ticket is now a bit complicated, as it was originally about avoiding two GroupBy operations, which #4924 from @mustafasrepo does, but it has since grown to include consolidating / improving the aggregate implementation as well.

So what I plan to do is break out @crepererum's plan in #2723 (comment) into a new ticket, and close this ticket when #4924 is merged.

alamb commented

I filed #4973 to track consolidating the aggregators

alamb commented

It is actually quite cool to see that @crepererum's plan from #2723 (comment) is moving right along.