Not ready for use yet
A simple implementation of distributed shuffle, intended for learning.
Distributed shuffle is at the core of distributed data-processing algorithms such as distributed sort, group by, deduplicate, and join. It is employed by many frameworks, for example PySpark and Ray.
Conceptually, distributed shuffle is quite simple, yet surprisingly few people know about it. This repository is meant to teach what it is, how to implement it, and how it can be used.
The core idea of distributed shuffle is to partition the data into N groups. That's it. Partitioning functions include:
- hash partitioning: each item goes to the group given by its hash modulo N, so all items with hash modulo N equal to 0 go to group 0, those equal to 1 go to group 1, etc.
- range partitioning: all items falling in range 1 go to group 1, all items falling in range 2 go to group 2, etc.
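To make this concrete, here is a minimal sketch of both partitioning functions in plain Python; `hash_partition` and `range_partition` are illustrative names, not part of this library's API:

```python
def hash_partition(item, n_groups):
    # Items with the same hash always land in the same group.
    # Note: Python's built-in hash() is salted per process for strings;
    # a real implementation would use a stable hash instead.
    return hash(item) % n_groups

def range_partition(item, boundaries):
    # `boundaries` is a sorted list of upper bounds, one per group
    # except the last; the last group takes everything above them.
    for group, upper in enumerate(boundaries):
        if item <= upper:
            return group
    return len(boundaries)

# range_partition(7, boundaries=[10, 100])  -> 0
# range_partition(42, boundaries=[10, 100]) -> 1
```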
Distributed shuffle can then be used for:
- group by, deduplicate: these consist of first using distributed shuffle with hash or range partitioning (ensuring that each group is disjoint from the others), then applying local deduplication or grouping; each group can then simply be written to the output independently
- joining: items of the 2 datasets are first partitioned with hash or range partitioning, then locally joined; each group can then be written independently
- sorting: items can be partitioned using range partitioning, then sorted locally; each group can then be written independently
Note that range partitioning may require an efficient distributed sampler in order to establish the range boundaries.
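For example, the boundaries can be estimated by having every worker sample its local items and then taking evenly spaced quantiles of the merged sample; a minimal sketch (function names are illustrative, not this library's API):

```python
import random

def sample_partition(items, k=100):
    # Each worker draws a small random sample of its local items.
    return random.sample(items, min(k, len(items)))

def compute_boundaries(all_samples, n_groups):
    # The driver merges every worker's sample and takes evenly spaced
    # quantiles as the upper bounds of the first n_groups - 1 ranges.
    merged = sorted(s for worker_samples in all_samples for s in worker_samples)
    step = len(merged) / n_groups
    return [merged[int(i * step)] for i in range(1, n_groups)]
```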
The pattern of these use cases is the same:
- distributed shuffle using the appropriate partitioning function
- local combine
- independent writing
Steps 2 and 3 can be done fully independently of the other workers, which is ideal.
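To make the pattern concrete, here is a single-process simulation of deduplication following those three steps; in the real distributed version each group would live on a separate worker, and `write_group` is a hypothetical callback standing in for the output step:

```python
def dedup_with_shuffle(items, n_groups, write_group):
    # Step 1: "shuffle" - route every item to its group (here in memory,
    # in the distributed version each group lives on a different worker).
    groups = [[] for _ in range(n_groups)]
    for item in items:
        groups[hash(item) % n_groups].append(item)

    # Steps 2 and 3: each group is combined and written without ever
    # looking at the other groups.
    for group_id, group_items in enumerate(groups):
        deduped = set(group_items)      # local combine
        write_group(group_id, deduped)  # independent write

# dedup_with_shuffle(["a", "b", "a"], 2, lambda gid, items: print(gid, sorted(items)))
```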
So as you can see, the difficult part is the distributed shuffle.
The key idea of the implementation of distributed shuffle is to use N x N communication:
- the data is first split arbitrarily into N groups
- each worker will process one group, and send the data of partitioned group i to worker i, which will accumulate it and progressively write it to disk (or keep it fully in RAM if it fits)
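In pseudo-Python, the work done by one worker could look like the sketch below, where `send_to` and `receive_all` are placeholders standing in for whichever transport is chosen:

```python
def run_worker(worker_id, my_items, n_workers, send_to, receive_all):
    # Partition this worker's slice of the data: one bucket per destination.
    buckets = [[] for _ in range(n_workers)]
    for item in my_items:
        buckets[hash(item) % n_workers].append(item)

    # Send bucket i to worker i (our own bucket is "sent" to ourselves).
    for dest, bucket in enumerate(buckets):
        send_to(dest, bucket)

    # Accumulate everything routed to us; a real implementation would
    # progressively spill to disk if the group does not fit in RAM.
    return receive_all()
```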
That means that each worker needs to talk to all the other workers in an efficient way. The options here are:
- 1 to 1 tcp communication with a custom protocol
- going through a shared file system
- using an established collective-communication standard like MPI
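As an illustration of the shared file system option, each worker can write one file per destination group and later read back every file addressed to it; the directory layout and pickle format below are assumptions for the sketch, not this repo's actual format:

```python
import os
import pickle

def write_buckets(shuffle_dir, worker_id, buckets):
    # Each worker writes one file per destination group.
    for dest, bucket in enumerate(buckets):
        path = os.path.join(shuffle_dir, f"from_{worker_id}_to_{dest}.pkl")
        with open(path, "wb") as f:
            pickle.dump(bucket, f)

def read_my_group(shuffle_dir, worker_id, n_workers):
    # Once every worker has finished writing, each worker reads back
    # every file addressed to it and accumulates the result.
    items = []
    for source in range(n_workers):
        path = os.path.join(shuffle_dir, f"from_{source}_to_{worker_id}.pkl")
        with open(path, "rb") as f:
            items.extend(pickle.load(f))
    return items
```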
Let's explore in this repo which of these is fast.
pip install distributed_shuffle
Check out these examples to call this as a library:
This module exposes a single function, distribute_shuffle, which takes the same arguments as the command-line tool:
- input_file_paths: a collection of parquet files on any storage supported by fsspec (required)
- partitioner: range or hash (required)
- output_dir: an output directory on any storage supported by fsspec (required)
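Assuming that signature, a call could look like this; the import name and the file paths below are placeholders to adapt to your data:

```python
from distributed_shuffle import distribute_shuffle

distribute_shuffle(
    input_file_paths=[
        "s3://my-bucket/input/part-0.parquet",
        "s3://my-bucket/input/part-1.parquet",
    ],
    partitioner="hash",
    output_dir="s3://my-bucket/shuffled/",
)
```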
Either locally, or in gitpod (do export PIP_USER=false there)
Set up a virtualenv:
python3 -m venv .env
source .env/bin/activate
pip install -e .
To run tests:
pip install -r requirements-test.txt
then
make lint
make test
You can use make black to reformat the code.
Run python -m pytest -x -s -v tests -k "dummy" to run a specific test.