matloff/partools

improve filesort()


Taking a look at the file based sort now, and recording observations and design notes here as I go.

Current implementation

based on bucket sort

  1. Manager reads the first nsamp=1000 rows of the file (or a sample of the distributed data frame)
  2. Manager uses quantiles of the sample to choose approximately equal-sized bins
  3. Each worker reads all the data in and keeps only the rows that belong to its bin
  4. Each worker sorts its chunk and stores it in a global variable, outdfnm
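
For reference, the four steps above can be sketched serially in base R. This is only an illustration of the bucket-sort idea, not the partools code: `choose_breaks()` and `assign_bin()` are hypothetical helper names, the keys are simulated, and the "workers" are just iterations of `lapply()`.

```r
# Hypothetical helpers for illustration; not part of partools.
choose_breaks <- function(sample_keys, k) {
  # Interior quantiles of the sample give k roughly equal-sized bins.
  probs <- seq_len(k - 1) / k
  unique(quantile(sample_keys, probs = probs, names = FALSE))
}

assign_bin <- function(keys, breaks) {
  # findInterval() returns 0..length(breaks); shift to 1-based bin ids.
  findInterval(keys, breaks) + 1L
}

set.seed(1)
keys <- runif(10000)   # simulated sort keys
k <- 4                 # number of workers (assumed)

breaks <- choose_breaks(keys[1:1000], k)  # steps 1-2: sample the first 1000 rows
bins <- assign_bin(keys, breaks)          # step 3: which worker keeps which row

# Step 4: each "worker" sorts only the rows in its own bin.
chunks <- lapply(seq_len(k), function(i) sort(keys[bins == i]))

# Concatenating the chunks in bin order gives the globally sorted keys.
sorted <- unlist(chunks)
```

Because every key in bin i is below every key in bin i + 1, no merge step is needed afterwards; the cost is that bin sizes are only as balanced as the sample is representative.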

Notes

  1. Data must fit in memory on the cluster
  2. The entire data is read k times, once for each of the k workers.

Alternative 1: Single read

data.table offers high-performance, multithreaded reading and sorting. If the data fit in memory, it may well be faster to read and sort in the manager and then send the sorted result to the workers using distribsplit().
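
A rough base R stand-in for this alternative, to show the shape of it. `read_sort_split()` is a hypothetical name; `read.table()` and `order()` are used here so the sketch runs without extra packages, where data.table's `fread()` and `setorder()` would be the faster choice. The final `split()` only mimics handing contiguous pieces to workers and is my assumption about how the pieces would line up, not a call into partools.

```r
# Hypothetical sketch of "read once, sort once, then split"; not partools code.
read_sort_split <- function(path, colnum, k, ...) {
  df <- read.table(path, ...)
  df <- df[order(df[[colnum]]), , drop = FALSE]
  # Contiguous, nearly equal-sized chunks, so chunk i precedes chunk i + 1.
  chunk_id <- sort(rep(seq_len(k), length.out = nrow(df)))
  split(df, chunk_id)
}

# Small demonstration on a temporary file with 100 shuffled integer keys.
tmp <- tempfile(fileext = ".txt")
set.seed(2)
write.table(data.frame(key = sample(100), val = rnorm(100)), tmp,
            row.names = FALSE, col.names = FALSE)

chunks <- read_sort_split(tmp, colnum = 1, k = 3)
```

Here the file is read exactly once, at the price of holding everything in the manager's memory.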

Alternative 2: Temporary file based

Use steps 1 and 2 in the current implementation to determine bins. Then:

  1. Assign each worker a subset of the distributed files to read and split
  2. Each worker reads these files in chunks, splitting each chunk and appending the pieces to its own private set of bins. This results in the following temporary directory structure:
- worker1
    file_bin1
    file_bin2
    ...
- worker2
    file_bin1
    file_bin2
    ...
etc.

Finally, each worker is responsible for reading and sorting a subset of the bins. If there are more bins than workers then we can write the sorted files to disk without requiring that the data fit in memory. This approach requires 2 reads and 1 write, instead of k reads.
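
A minimal serial sketch of this temporary-file approach in base R, assuming whitespace-separated input with no header. The helper names `split_to_bins()` and `sort_bin()`, the `chunk_rows` parameter, and the simulated input files are all hypothetical; in a real version each call would run on a separate worker.

```r
# Hypothetical helpers for illustration; not part of partools.

# Pass 1: read one input file in chunks and append each row to the
# bin file it belongs to, inside this worker's private directory.
split_to_bins <- function(infile, outdir, breaks, colnum, chunk_rows = 1000) {
  dir.create(outdir, recursive = TRUE, showWarnings = FALSE)
  con <- file(infile, open = "r")
  on.exit(close(con))
  repeat {
    lines <- readLines(con, n = chunk_rows)
    if (length(lines) == 0) break
    chunk <- read.table(text = lines)
    bin <- findInterval(chunk[[colnum]], breaks) + 1L
    for (b in unique(bin)) {
      write.table(chunk[bin == b, , drop = FALSE],
                  file.path(outdir, paste0("file_bin", b)),
                  append = TRUE, row.names = FALSE, col.names = FALSE)
    }
  }
}

# Pass 2: gather bin b from every worker directory and sort it.
sort_bin <- function(worker_dirs, b, colnum) {
  files <- file.path(worker_dirs, paste0("file_bin", b))
  files <- files[file.exists(files)]
  if (length(files) == 0) return(NULL)
  df <- do.call(rbind, lapply(files, read.table))
  df[order(df[[colnum]]), , drop = FALSE]
}

# Demonstration: two simulated input files, two "workers", four bins.
set.seed(3)
root <- tempfile("filesort_")
dir.create(root)
infiles <- file.path(root, paste0("input", 1:2))
for (f in infiles) {
  write.table(data.frame(key = sample(1e6, 2500), val = sample(2500)),
              f, row.names = FALSE, col.names = FALSE)
}

# Bins come from a sample of the first 1000 rows, as in the current implementation.
sample_keys <- read.table(infiles[1], nrows = 1000)[[1]]
breaks <- unique(quantile(sample_keys, probs = (1:3) / 4, names = FALSE))

worker_dirs <- file.path(root, paste0("worker", 1:2))
for (i in 1:2) split_to_bins(infiles[i], worker_dirs[i], breaks, colnum = 1)

sorted_bins <- lapply(seq_len(length(breaks) + 1),
                      function(b) sort_bin(worker_dirs, b, colnum = 1))
all_keys <- unlist(lapply(sorted_bins, function(d) d[[1]]))
```

Only one chunk (pass 1) or one bin (pass 2) is in memory at a time, so the peak footprint is set by `chunk_rows` and the largest bin rather than by the full data size. In the sketch the sorted bins are kept in a list for checking; writing each one straight to disk is what would remove the memory requirement.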

Alternative 1 can be done right now. Alternative 2 offers a new capability, and is something I could use in the project I'm working on, so I'm leaning towards that one.

Regarding leveraging existing stuff, I think using base R and data.table for the in memory parts of the sort will get us pretty far. Thanks for that paper link.

Started working on this today. To monitor progress I'd like to compare implementations here using the sort benchmarks at http://sortbenchmark.org/. This in turn has required passing more arguments through to read.table() and write.table() because of the messy strings in that data. So I'm working on that.

The sort benchmark is mainly a convenient source of data to be sorted. With it we can get a baseline on the current performance of partools for comparison as we work on it.