/flox

Fast & furious GroupBy operations for dask.array

Primary LanguagePythonApache License 2.0Apache-2.0

GitHub Workflow CI Status pre-commit.ci status image Documentation Status

PyPI Conda-forge

NASA-80NSSC18M0156 NASA-80NSSC22K0345

flox

This project explores strategies for fast GroupBy reductions with dask.array. It used to be called dask_groupby It was motivated by

  1. Dask Dataframe GroupBy blogpost
  2. numpy_groupies in Xarray issue

(See a presentation about this package, from the Pangeo Showcase).

Acknowledgements

This work was funded in part by

  1. NASA-ACCESS 80NSSC18M0156 "Community tools for analysis of NASA Earth Observing System Data in the Cloud" (PI J. Hamman, NCAR),
  2. NASA-OSTFL 80NSSC22K0345 "Enhancing analysis of NASA data with the open-source Python Xarray Library" (PIs Scott Henderson, University of Washington; Deepak Cherian, NCAR; Jessica Scheick, University of New Hampshire), and
  3. NCAR's Earth System Data Science Initiative.

It was motivated by very very many discussions in the Pangeo community.

API

There are two main functions

  1. flox.groupby_reduce(dask_array, by_dask_array, "mean") "pure" dask array interface
  2. flox.xarray.xarray_reduce(xarray_object, by_dataarray, "mean") "pure" xarray interface; though work is ongoing to integrate this package in xarray.

Implementation

See the documentation for details on the implementation.

Custom reductions

flox implements all common reductions provided by numpy_groupies in aggregations.py. It also allows you to specify a custom Aggregation (again inspired by dask.dataframe), though this might not be fully functional at the moment. See aggregations.py for examples.

mean = Aggregation(
    # name used for dask tasks
    name="mean",
    # operation to use for pure-numpy inputs
    numpy="mean",
    # blockwise reduction
    chunk=("sum", "count"),
    # combine intermediate results: sum the sums, sum the counts
    combine=("sum", "sum"),
    # generate final result as sum / count
    finalize=lambda sum_, count: sum_ / count,
    # Used when "reindexing" at combine-time
    fill_value=0,
    # Used when any member of `expected_groups` is not found
    final_fill_value=np.nan,
)