Memory Limited GroupBy (Externalized / Spill)
alamb opened this issue · 23 comments
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Support Grouping "arbitrarily" large inputs (e.g. when the group by hash table doesn't fit in RAM or within some user defined budget)
This typically happens when there are a "large" number of distinct groups. For example, with queries like
-- Find the top ten users by event count
SELECT user_id, count(events)
FROM users
GROUP BY user_id
ORDER BY count(events) DESC
limit 10
Such a query has one group per distinct user_id, so the number of groups can be large.
This ticket concerns the memory used by the HashAggregateExec operator; it doesn't cover other potential targets (e.g. externalized sort or join). Those will be covered by other tasks tracked by #587.
Describe the solution you'd like
- Allow DataFusion users to specify a RAM budget (via the config introduced in #1526) and have their queries complete without the group by exceeding the budget allocated to it via the memory manager.
For the HashAggregateExec operator, I think the best behavior would be:
- Use an in-memory hash table (as is done today in HashAggregateExec), if the memory budget allows
- If the input does not all fit in the RAM budget, the hash table and partial aggregates are sorted by group key and written to temporary disk files
- Temporary disk files are read / merged to produce the final results
Hopefully after #1568 is complete we'll have an efficient N-way merge that can be reused.
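The three steps above can be sketched in a few lines of plain Rust. This is only an illustration under stated assumptions: `SpillingCounter` is a hypothetical type (not part of DataFusion), the budget is expressed as a maximum number of groups rather than bytes, a `Vec` of sorted runs stands in for temporary disk files, and the final merge simply concatenates and re-sorts the runs instead of streaming an N-way merge.

```rust
use std::collections::HashMap;

/// Hypothetical illustration type; NOT DataFusion's HashAggregateExec.
pub struct SpillingCounter {
    /// Stand-in for a byte-based memory budget (assumption).
    max_groups: usize,
    /// In-memory hash table: group key -> partial count.
    table: HashMap<u64, u64>,
    /// Each run is sorted by key; stands in for one temporary spill file.
    runs: Vec<Vec<(u64, u64)>>,
}

impl SpillingCounter {
    pub fn new(max_groups: usize) -> Self {
        Self { max_groups, table: HashMap::new(), runs: Vec::new() }
    }

    /// Count one input row for `key`, spilling first if a new group
    /// would push the table over budget.
    pub fn push(&mut self, key: u64) {
        if !self.table.contains_key(&key) && self.table.len() >= self.max_groups {
            self.spill();
        }
        *self.table.entry(key).or_insert(0) += 1;
    }

    /// Sort the partial aggregates by group key and "write" them out.
    fn spill(&mut self) {
        if self.table.is_empty() {
            return;
        }
        let mut run: Vec<(u64, u64)> = self.table.drain().collect();
        run.sort_unstable_by_key(|&(k, _)| k);
        self.runs.push(run);
    }

    pub fn spill_count(&self) -> usize {
        self.runs.len()
    }

    /// Combine all partial aggregates into final (key, count) pairs,
    /// sorted by key. A real implementation would stream-merge the runs.
    pub fn finish(mut self) -> Vec<(u64, u64)> {
        self.spill();
        let mut all: Vec<(u64, u64)> = self.runs.into_iter().flatten().collect();
        all.sort_unstable_by_key(|&(k, _)| k);
        let mut out: Vec<(u64, u64)> = Vec::new();
        for (k, c) in all {
            if let Some(last) = out.last_mut() {
                if last.0 == k {
                    last.1 += c;
                    continue;
                }
            }
            out.push((k, c));
        }
        out
    }
}
```

With a budget of two groups, pushing the keys 3, 1, 3, 2, 1, 3, 2, 2 spills twice before `finish` is called, and the partial counts for each key are combined at the end.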
Some ideas of how to break this task down
Describe alternatives you've considered
TBD
Context
This is follow on work from the great PR from @yjshen in #1526 and part of the story of limiting memory used by DataFusion #587
For hash aggregation I quite like the implementation described in https://dl.acm.org/doi/pdf/10.1145/2588555.2610507 (4.4). The algorithm there optimizes for cache locality by pre-aggregating into fixed-size thread-local hash tables and then overflowing into main memory partitions, but I think it should be easy to extend with also flushing these partitions to disk if they get too large.
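A rough, single-threaded sketch of the pre-aggregation idea described in this comment might look like the following. It follows the comment's summary loosely and is not the paper's exact algorithm: `PreAggregator`, `partition_of`, and the "flush the whole table when full" policy are all assumptions made for illustration. In a spilling variant, a partition that grows too large would be written to disk instead of kept in a `Vec`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: map a group key to one of `n` partitions by hash.
fn partition_of(key: u64, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n as u64) as usize
}

/// Hypothetical illustration type: a small fixed-capacity local table
/// that overflows into hash partitions held in main memory.
pub struct PreAggregator {
    capacity: usize,
    local: HashMap<u64, u64>,
    partitions: Vec<Vec<(u64, u64)>>,
}

impl PreAggregator {
    pub fn new(capacity: usize, num_partitions: usize) -> Self {
        assert!(num_partitions > 0, "need at least one partition");
        Self {
            capacity,
            local: HashMap::new(),
            partitions: vec![Vec::new(); num_partitions],
        }
    }

    /// Count one row. Keys already in the local table are updated in place;
    /// a new key arriving at a full table triggers a flush first.
    pub fn push(&mut self, key: u64) {
        if let Some(c) = self.local.get_mut(&key) {
            *c += 1;
            return;
        }
        if self.local.len() >= self.capacity {
            self.flush();
        }
        self.local.insert(key, 1);
    }

    /// Move every partial aggregate into the partition chosen by its hash.
    fn flush(&mut self) {
        let n = self.partitions.len();
        for (k, c) in self.local.drain() {
            self.partitions[partition_of(k, n)].push((k, c));
        }
    }

    /// Aggregate each partition independently. All partials for a given
    /// key land in the same partition, so no cross-partition merge is needed.
    pub fn finish(mut self) -> Vec<HashMap<u64, u64>> {
        self.flush();
        self.partitions
            .into_iter()
            .map(|part| {
                let mut agg = HashMap::new();
                for (k, c) in part {
                    *agg.entry(k).or_insert(0) += c;
                }
                agg
            })
            .collect()
    }
}
```

The design point is that hash partitioning makes the final aggregation of each partition independent, so partitions can be processed one at a time (or in parallel) within a bounded memory footprint.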
An idea to brainstorm this feature. master...milenkovicm:arrow-datafusion:spill_aggregation_to_mmap has a POC code which uses a memory mapped file as the aggregation store, which should spill to disc in case of memory pressure.
This solution is just a prototype, far from perfect, and good chance it is not fit for purpose.
The main change is that Vec<RowGroupState> has been replaced with Vec<u8>, which is then replaced with an mmap file.
Problems with this solution:
- assumption is that aggregation buffer size will not change. This assumption may go away but working with underlying state will get way more complicated, as we will need to implement buffer reallocation, eviction ...
- we don't have control over when the data is going to be evicted from memory; we can use lock and unlock on the mmap to try to keep it in memory, but I'm not sure that would be the case
good point(s):
- this solution chunks aggregation state in multiple batches when downstream producer finishes, thus limiting memory needed to read aggregation state (in theory)
no performance numbers at the moment, and not much experience with datafusion
Any comments welcomed
I will try and review this sometime later today.
thanks @alamb,
just a follow up with some numbers
running a simple query on 10M distinct keys and 50M records total (each key has 5 records) (/usr/bin/time -l ./bench_10m)
with this change:
112.10 real 363.15 user 21.28 sys
2246549504 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
3689321 page reclaims
503785 page faults
0 swaps
0 block input operations
0 block output operations
0 messages sent
0 messages received
0 signals received
5924 voluntary context switches
442244 involuntary context switches
2212997211010 instructions retired
1232264629987 cycles elapsed
1254973440 peak memory footprint
without change:
109.09 real 355.84 user 23.10 sys
3891322880 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
5401700 page reclaims
0 page faults
0 swaps
0 block input operations
0 block output operations
0 messages sent
0 messages received
0 signals received
15 voluntary context switches
414334 involuntary context switches
2222284866547 instructions retired
1208398130091 cycles elapsed
3404832768 peak memory footprint
Thanks for the info @milenkovicm
In general I think database systems tend to try and shy away from using mmap for spilling operations, for the reasons very clearly articulated in "Are You Sure You Want to Use MMAP in Your Database Management System?" https://db.cs.cmu.edu/mmap-cidr2022/
I think the current state of the art / best practice for dealing with grouping when the hashtable doesn't fit in memory is to:
- Sort the data by groups and write that data to a spill file (ideally sharing / reusing the code used for externalized sort)
- Create a new hash table, spilling repeatedly if needed
- When all the input has been processed, then the spill files are read back and merged (again sharing code for externalized sort) and a merge group by is performed.
Thanks to @yjshen we have much of the spilling / merging code already. We are lacking:
- merge group by (where the groups are guaranteed to be contiguous in the input stream)
- The orchestration logic to spill and then re-merge
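To illustrate the missing "merge group by" piece: when the input is already sorted so that each group's rows are contiguous, the operator only needs to hold the state of one group at a time. The sketch below is an assumption-laden illustration in plain Rust, not DataFusion code; `MergeGroupBy` and `merge_group_by` are hypothetical names, the group key is a `String`, and the aggregate is a sum of partial counts.

```rust
use std::iter::Peekable;

/// Hypothetical iterator adapter: sums partial counts over runs of equal
/// keys. Assumes equal keys are contiguous in the input stream.
pub struct MergeGroupBy<I: Iterator<Item = (String, u64)>> {
    input: Peekable<I>,
}

/// Wrap any stream of (group key, partial count) pairs that is sorted
/// (or at least grouped) by key.
pub fn merge_group_by<I>(input: I) -> MergeGroupBy<I::IntoIter>
where
    I: IntoIterator<Item = (String, u64)>,
{
    MergeGroupBy { input: input.into_iter().peekable() }
}

impl<I: Iterator<Item = (String, u64)>> Iterator for MergeGroupBy<I> {
    type Item = (String, u64);

    fn next(&mut self) -> Option<Self::Item> {
        // Start a new group with the next row, if there is one.
        let (key, mut total) = self.input.next()?;
        // Consume following rows only while they belong to the same group.
        while let Some((_, c)) = self.input.next_if(|(k, _)| *k == key) {
            total += c;
        }
        Some((key, total))
    }
}
```

Because a finished group can be emitted as soon as the key changes, memory use is independent of the number of groups, which is what makes this usable after a spill.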
Does that make sense? I know it is a high level description
thanks @alamb,
your comment makes perfect sense, and I can't disagree with you. As I said in my comment, the approach is not without its problems and shortcomings. This solution is quite simple, and the question is whether that simplicity buys any speed :)
Would you have any design doc regarding merge group by and re-merge which you can share?
I do not have a design document, sadly. I can try and help write one (perhaps I can make a diagram and example) if you are interested in working on it @milenkovicm
I'd be interested. after a big sleep I think I get your approach, but if you can produce a diagram it would be great.
can't really commit on timeframe, will have to have a look at code first
thanks @alamb
Sounds good -- I will try and write up something in the coming days to start a discussion
An update on this task: @crepererum has added initial support for tracking memory use in the grouping operation. @tustvold, @crepererum and I (as ticket master) have plans to improve the grouping operation, which I will make a bit more discoverable shortly
@milenkovicm very belatedly, here is a document / diagrams: https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit?usp=sharing
@alamb @yjshen
Can we make the GroupState and the Accumulator states serializable?
With this approach, we do not need to do any sorting when spilling data to disk. When we read the data back, we can reconstruct the raw hash table quickly from the hash values and indexes, because our hashmap is very lightweight. The hash value can be re-calculated from the grouping rows, or we can cache it inside the GroupState to avoid the re-calculation.
You still need disk spilling, no? Or where do you store the serialized state? Also, I guess that serialization may become a major bottleneck for some of the accumulators.
Yes, we still need the disk spilling; it can be managed and tracked by the disk_manager in the RuntimeEnv. But in any case this avoids sorting the entire group data or hash table before spilling.
I'm not sure I see many benefits of having it serializable, would agree with @crepererum
Now this discussion would make more sense if we would know more about your implementation.
IMHO, aggregation should start with hash map, we can assume that there is not going to be spill, if we're wrong we would pay penalty of being wrong as we will have to sort it before spill.
Once we have spilled to disk, I'd argue it would make more sense to switch from a hash map to a b-tree, as we would need to merge it with the spill; it is slower, but from my experience it is a bit faster than sorting a hash map.
Spilling can be implemented using a two-column parquet file (key: blob, value: blob).
An implementation like this works quite well in my experience, especially since in most cases we won't trigger a spill
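As a small illustration of the b-tree point: an ordered map already iterates in key order, so producing a sorted spill run needs no separate sort step. The sketch below uses Rust's standard `BTreeMap` and encodes each entry as a (key blob, value blob) pair, mirroring the two-column layout suggested above. The names `drain_sorted` and `decode_count`, the little-endian `u64` value encoding, and the use of an in-memory `Vec` in place of a parquet file are all assumptions for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: empty the table and return its entries as
/// (key blob, value blob) pairs, already in ascending key order.
/// The count is encoded as 8 little-endian bytes (an assumption).
pub fn drain_sorted(table: &mut BTreeMap<Vec<u8>, u64>) -> Vec<(Vec<u8>, Vec<u8>)> {
    std::mem::take(table)
        .into_iter()
        .map(|(key, count)| (key, count.to_le_bytes().to_vec()))
        .collect()
}

/// Hypothetical helper: decode a value blob written by `drain_sorted`.
/// Returns None if the blob is not exactly 8 bytes long.
pub fn decode_count(value: &[u8]) -> Option<u64> {
    if value.len() != 8 {
        return None;
    }
    let mut buf = [0u8; 8];
    buf.copy_from_slice(value);
    Some(u64::from_le_bytes(buf))
}
```

The trade-off mentioned in the comment still applies: inserts into an ordered map are slower than into a hash map, so this only pays off once a spill has already happened.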
@mingmwang when we serialize the groups to disk when the hash table is full, we then need to read them back in again somehow and do the final combination. My assumption is that we don't have the memory to read them all back in at once as we had to spill in the first place
If we sort the spilled data on the group keys, we can stream data from the different spill files in parallel, merge, and then do a streaming group by, which we will have sufficient memory to accomplish.
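The merge-then-group step described here can be sketched with a min-heap that holds one row per spill run. This is a simplified illustration, not DataFusion's merge implementation: `merge_sorted_runs` is a hypothetical function, in-memory `Vec`s stand in for spill files, keys are `u64`, and the aggregate is a sum of partial counts. Results are collected into a `Vec` for brevity, where a real operator would emit output batches as groups complete.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical sketch: N-way merge of runs that are each sorted by key,
/// combining partial counts for equal keys as they stream past.
pub fn merge_sorted_runs(runs: Vec<Vec<(u64, u64)>>) -> Vec<(u64, u64)> {
    // One cursor per run; only the head row of each run is in the heap.
    let mut iters: Vec<std::vec::IntoIter<(u64, u64)>> =
        runs.into_iter().map(|r| r.into_iter()).collect();
    let mut heap = BinaryHeap::new();
    for (i, it) in iters.iter_mut().enumerate() {
        if let Some((k, v)) = it.next() {
            // Reverse turns the max-heap into a min-heap ordered by key.
            heap.push(Reverse((k, i, v)));
        }
    }

    let mut out: Vec<(u64, u64)> = Vec::new();
    while let Some(Reverse((k, i, v))) = heap.pop() {
        // Streaming group by: keys arrive in ascending order, so a row
        // either extends the most recent group or starts a new one.
        let merged = match out.last_mut() {
            Some(last) if last.0 == k => {
                last.1 += v;
                true
            }
            _ => false,
        };
        if !merged {
            out.push((k, v));
        }
        // Refill the heap from the run that just yielded a row.
        if let Some((nk, nv)) = iters[i].next() {
            heap.push(Reverse((nk, i, nv)));
        }
    }
    out
}
```

At any moment the heap holds at most one row per run, so the working memory of the merge depends on the number of spill files rather than on the number of groups.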
IMHO, aggregation should start with hash map, we can assume that there is not going to be spill, if we're wrong we would pay penalty of being wrong as we will have to sort it before spill.
Yes I agree with @milenkovicm - this approach will work well. It does have a disadvantage of a performance "cliff" when the query goes from in memory --> spilling (it doesn't degrade gracefully) but it is likely the fastest for queries that have sufficient memory
BTW after #6889 I think we'll be in pretty good shape to dump the accumulator state to disk (as Arrow arrays, perhaps) if we want to pursue this more
Hi @alamb i have a poc code, a bit stale though, which will dump state to arrow. I've tried to rebase it recently but did not have time to finish it. It's in my branch at https://github.com/milenkovicm/arrow-datafusion/commit/e9d17d2a456707f88497606b3977a8a82199c7d1
Not sure if it still makes sense after all this time.
Thanks @milenkovicm -- that actually looks quite cool. If/when someone picks up this ticket I think it would be a good place to start from. I especially like how you encapsulated the spilling / state code in its own module
Hello,
We have an internal spilling implementation for partial aggregation. We are currently in the middle of applying back the recent improvements of aggregation (such as #7095). After that is done, we plan to contribute back the work to DataFusion, but I just found this ticket. I am wondering what the current status of this ticket is.
Our implementation is quite similar to what the description of this ticket says, but (obviously) different from @milenkovicm's poc.
I am wondering if the community is willing to review yet another spilling implementation...
After that is done, we plan to contribute back the work to DataFusion, but I just found this ticket. I am wondering what the current status of this ticket is.
That sounds good
I am willing to review another spilling implementation for sure -- it is one of the missing features for datafusion to be a complete analytic solution
To assist review, it would help to explain / document how it works (e.g. is it based on mmap, or are the intermediate groups spilled as arrow, or are they sorted?), ideally in code comments
I made some diagrams here that might be helpful (but are mostly related to streaming groupby)
https://docs.google.com/document/d/16rm5VR1nGkY6DedMCh1NUmThwf3RduAweaBH9b1h6AY/edit?usp=sharing
Thank you @alamb
Our implementation spills the intermediate groups as arrow, very similar to ExternalSorter. I will post the PR and doc shortly
I finally posted a PR for this #7400
In particular, I explained how the implementation works at https://github.com/apache/arrow-datafusion/pull/7400/files#diff-0b72cdc7a5e02b6650430c3eb7bdea4f1d2d32867b927633d92abb4fcf31c81bR135
Thanks! cc @alamb