From 25dfe8cd5ecc6aab3890b9fada945480c34c4902 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 2 Aug 2024 11:06:31 -0600 Subject: [PATCH 1/6] Auto rechunk to enable blockwise reduction Done when 1. `method` is None 2. Grouping and reducing by a 1D array We gate this on fractional change in number of chunks and change in size of largest chunk. Closes #359 --- flox/core.py | 72 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 64 insertions(+), 8 deletions(-) diff --git a/flox/core.py b/flox/core.py index d8600b375..6caddb353 100644 --- a/flox/core.py +++ b/flox/core.py @@ -119,6 +119,15 @@ # _simple_combine. DUMMY_AXIS = -2 +# Thresholds below which we will automatically rechunk to blockwise if it makes sense +# 1. Fractional change in number of chunks after rechunking +BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD = 0.25 +# 2. Fractional change in max chunk size after rechunking +BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD = 0.25 +# 3. If input arrays have chunk size smaller than `dask.array.chunk-size`, +# then adjust chunks to meet that size first. 
+BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR = 1.25 + logger = logging.getLogger("flox") @@ -223,8 +232,11 @@ def identity(x: T) -> T: return x -def _issorted(arr: np.ndarray) -> bool: - return bool((arr[:-1] <= arr[1:]).all()) +def _issorted(arr: np.ndarray, ascending=True) -> bool: + if ascending: + return bool((arr[:-1] <= arr[1:]).all()) + else: + return bool((arr[:-1] >= arr[1:]).all()) def _is_arg_reduction(func: T_Agg) -> bool: @@ -325,6 +337,8 @@ def _get_optimal_chunks_for_groups(chunks, labels): Δl = abs(c - l) if c == 0 or newchunkidx[-1] > l: continue + f = f.item() # noqa + l = l.item() # noqa if Δf < Δl and f > newchunkidx[-1]: newchunkidx.append(f) else: @@ -716,7 +730,9 @@ def rechunk_for_cohorts( return array.rechunk({axis: newchunks}) -def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray: +def rechunk_for_blockwise( + array: DaskArray, axis: T_Axis, labels: np.ndarray, *, force: bool = True +) -> tuple[T_MethodOpt, DaskArray]: """ Rechunks array so that group boundaries line up with chunk boundaries, allowing embarrassingly parallel group reductions. @@ -739,14 +755,43 @@ def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray Rechunked array """ - # TODO: this should be unnecessary? - labels = factorize_((labels,), axes=())[0] + + import dask + from dask.utils import parse_bytes + chunks = array.chunks[axis] - newchunks = _get_optimal_chunks_for_groups(chunks, labels) + if len(chunks) == 1: + return "blockwise", array + + factor = parse_bytes(dask.config.get("array.chunk-size")) / ( + math.prod(array.chunksize) * array.dtype.itemsize + ) + if factor > BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR: + new_constant_chunks = math.ceil(factor) * max(chunks) + q, r = divmod(array.shape[axis], new_constant_chunks) + new_input_chunks = (new_constant_chunks,) * q + (r,) + else: + new_input_chunks = chunks + + # FIXME: this should be unnecessary? 
+ labels = factorize_((labels,), axes=())[0] + newchunks = _get_optimal_chunks_for_groups(new_input_chunks, labels) if newchunks == chunks: return array + + Δn = abs(len(newchunks) - len(new_input_chunks)) + if force or ( + (Δn / len(new_input_chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD) + and ( + abs(max(newchunks) - max(new_input_chunks)) / max(new_input_chunks) + < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD + ) + ): + logger.debug("Rechunking to enable blockwise.") + return "blockwise", array.rechunk({axis: newchunks}) else: - return array.rechunk({axis: newchunks}) + logger.debug("Didn't meet thresholds to do automatic rechunking for blockwise reductions.") + return None, array def reindex_numpy(array, from_: pd.Index, to: pd.Index, fill_value, dtype, axis: int): @@ -2712,6 +2757,17 @@ def groupby_reduce( has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_) + if ( + method is None + and is_duck_dask_array(array) + and not any_by_dask + and by_.ndim == 1 + and _issorted(by_, ascending=True) + ): + # Let's try rechunking for sorted 1D by. + (single_axis,) = axis_ + method, array = rechunk_for_blockwise(array, single_axis, by_, force=False) + is_first_last = _is_first_last_reduction(func) if is_first_last: if has_dask and nax != 1: @@ -2899,7 +2955,7 @@ def groupby_reduce( # if preferred method is already blockwise, no need to rechunk if preferred_method != "blockwise" and method == "blockwise" and by_.ndim == 1: - array = rechunk_for_blockwise(array, axis=-1, labels=by_) + _, array = rechunk_for_blockwise(array, axis=-1, labels=by_) result, groups = partial_agg( array=array, From 8bae19e63806e69be3b40665fa654b573cd03ffd Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 17 Jun 2025 10:33:10 -0600 Subject: [PATCH 2/6] Small fix. 
--- flox/core.py | 2 +- flox/xarray.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index 6caddb353..c444d2772 100644 --- a/flox/core.py +++ b/flox/core.py @@ -777,7 +777,7 @@ def rechunk_for_blockwise( labels = factorize_((labels,), axes=())[0] newchunks = _get_optimal_chunks_for_groups(new_input_chunks, labels) if newchunks == chunks: - return array + return "blockwise", array Δn = abs(len(newchunks) - len(new_input_chunks)) if force or ( diff --git a/flox/xarray.py b/flox/xarray.py index ea388ba05..e1357eb41 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd +import toolz import xarray as xr from packaging.version import Version @@ -589,7 +590,7 @@ def rechunk_for_blockwise(obj: T_DataArray | T_Dataset, dim: str, labels: T_Data DataArray or Dataset Xarray object with rechunked arrays. """ - return _rechunk(rechunk_array_for_blockwise, obj, dim, labels) + return _rechunk(toolz.compose(toolz.last, rechunk_array_for_blockwise), obj, dim, labels) def _rechunk(func, obj, dim, labels, **kwargs): From b6ac7f7fe2e08719b61be0f5f4c02b3cbbce97db Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 16 Jul 2025 08:34:16 -0600 Subject: [PATCH 3/6] Comment out chunk size factor --- flox/core.py | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/flox/core.py b/flox/core.py index c444d2772..8c2cdf423 100644 --- a/flox/core.py +++ b/flox/core.py @@ -123,10 +123,10 @@ # 1. Fractional change in number of chunks after rechunking BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD = 0.25 # 2. Fractional change in max chunk size after rechunking -BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD = 0.25 +BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD = 1.5 # 3. If input arrays have chunk size smaller than `dask.array.chunk-size`, # then adjust chunks to meet that size first. 
-BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR = 1.25 +# BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR = 1.25 logger = logging.getLogger("flox") @@ -756,22 +756,21 @@ def rechunk_for_blockwise( Rechunked array """ - import dask - from dask.utils import parse_bytes - chunks = array.chunks[axis] if len(chunks) == 1: return "blockwise", array - factor = parse_bytes(dask.config.get("array.chunk-size")) / ( - math.prod(array.chunksize) * array.dtype.itemsize - ) - if factor > BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR: - new_constant_chunks = math.ceil(factor) * max(chunks) - q, r = divmod(array.shape[axis], new_constant_chunks) - new_input_chunks = (new_constant_chunks,) * q + (r,) - else: - new_input_chunks = chunks + # import dask + # from dask.utils import parse_bytes + # factor = parse_bytes(dask.config.get("array.chunk-size")) / ( + # math.prod(array.chunksize) * array.dtype.itemsize + # ) + # if factor > BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR: + # new_constant_chunks = math.ceil(factor) * max(chunks) + # q, r = divmod(array.shape[axis], new_constant_chunks) + # new_input_chunks = (new_constant_chunks,) * q + (r,) + # else: + new_input_chunks = chunks # FIXME: this should be unnecessary? labels = factorize_((labels,), axes=())[0] From ed590ee61157d298c81c1cfb7e25993e6ed81e19 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 16 Jul 2025 08:44:15 -0600 Subject: [PATCH 4/6] Add options --- flox/__init__.py | 16 ++++++++++-- flox/core.py | 26 +++++++++---------- flox/options.py | 64 ++++++++++++++++++++++++++++++++++++++++++++++ tests/test_core.py | 38 ++++++++++++++++----------- 4 files changed, 113 insertions(+), 31 deletions(-) create mode 100644 flox/options.py diff --git a/flox/__init__.py b/flox/__init__.py index 898c10e24..132c952c7 100644 --- a/flox/__init__.py +++ b/flox/__init__.py @@ -3,7 +3,7 @@ """Top-level module for flox .""" from . 
import cache -from .aggregations import Aggregation, Scan # noqa +from .aggregations import Aggregation, Scan from .core import ( groupby_reduce, groupby_scan, @@ -11,7 +11,8 @@ rechunk_for_cohorts, ReindexStrategy, ReindexArrayType, -) # noqa +) +from .options import set_options def _get_version(): @@ -24,3 +25,14 @@ def _get_version(): __version__ = _get_version() + +__all__ = [ + "Aggregation", + "Scan", + "groupby_reduce", + "groupby_scan", + "rechunk_for_blockwise", + "rechunk_for_cohorts", + "ReindexStrategy", + "ReindexArrayType", +] diff --git a/flox/core.py b/flox/core.py index 8c2cdf423..7c14d35ec 100644 --- a/flox/core.py +++ b/flox/core.py @@ -49,6 +49,7 @@ ) from .cache import memoize from .lib import ArrayLayer, dask_array_type, sparse_array_type +from .options import OPTIONS from .xrutils import ( _contains_cftime_datetimes, _to_pytimedelta, @@ -119,14 +120,6 @@ # _simple_combine. DUMMY_AXIS = -2 -# Thresholds below which we will automatically rechunk to blockwise if it makes sense -# 1. Fractional change in number of chunks after rechunking -BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD = 0.25 -# 2. Fractional change in max chunk size after rechunking -BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD = 1.5 -# 3. If input arrays have chunk size smaller than `dask.array.chunk-size`, -# then adjust chunks to meet that size first. 
-# BLOCKWISE_DEFAULT_ARRAY_CHUNK_SIZE_FACTOR = 1.25 logger = logging.getLogger("flox") @@ -779,13 +772,18 @@ def rechunk_for_blockwise( return "blockwise", array Δn = abs(len(newchunks) - len(new_input_chunks)) - if force or ( - (Δn / len(new_input_chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD) - and ( - abs(max(newchunks) - max(new_input_chunks)) / max(new_input_chunks) - < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD - ) + if pass_num_chunks_threshold := ( + Δn / len(new_input_chunks) < OPTIONS["rechunk_blockwise_num_chunks_threshold"] + ): + logger.debug("blockwise rechunk passes num chunks threshold") + if pass_chunk_size_threshold := ( + # we just pick the max because number of chunks may have changed. + (abs(max(newchunks) - max(new_input_chunks)) / max(new_input_chunks)) + < OPTIONS["rechunk_blockwise_chunk_size_threshold"] ): + logger.debug("blockwise rechunk passes chunk size change threshold") + + if force or (pass_num_chunks_threshold and pass_chunk_size_threshold): logger.debug("Rechunking to enable blockwise.") return "blockwise", array.rechunk({axis: newchunks}) else: diff --git a/flox/options.py b/flox/options.py new file mode 100644 index 000000000..5013db696 --- /dev/null +++ b/flox/options.py @@ -0,0 +1,64 @@ +""" +Started from xarray options.py; vendored from cf-xarray +""" + +import copy +from collections.abc import MutableMapping +from typing import Any + +OPTIONS: MutableMapping[str, Any] = { + # Thresholds below which we will automatically rechunk to blockwise if it makes sense + # 1. Fractional change in number of chunks after rechunking + "rechunk_blockwise_num_chunks_threshold": 0.25, + # 2. Fractional change in max chunk size after rechunking + "rechunk_blockwise_chunk_size_threshold": 1.5, + # 3. If input arrays have chunk size smaller than `dask.array.chunk-size`, + # then adjust chunks to meet that size first. 
+ # "rechunk.blockwise.chunk_size_factor": 1.5, +} + + +class set_options: # numpydoc ignore=PR01,PR02 + """ + Set options for flox in a controlled context. + + Parameters + ---------- + rechunk_blockwise_num_chunks_threshold : float + Rechunk if fractional change in number of chunks after rechunking + is less than this amount. + rechunk_blockwise_chunk_size_threshold : float + Rechunk if fractional change in max chunk size after rechunking + is less than this threshold. + + Examples + -------- + + You can use ``set_options`` either as a context manager: + + >>> import flox + >>> with flox.set_options(rechunk_blockwise_num_chunks_threshold=1): + ... pass + + Or to set global options: + + >>> flox.set_options(rechunk_blockwise_num_chunks_threshold=1) + """ + + def __init__(self, **kwargs): + self.old = {} + for k in kwargs: + if k not in OPTIONS: + raise ValueError(f"argument name {k!r} is not in the set of valid options {set(OPTIONS)!r}") + self.old[k] = OPTIONS[k] + self._apply_update(kwargs) + + def _apply_update(self, options_dict): + options_dict = copy.deepcopy(options_dict) + OPTIONS.update(options_dict) + + def __enter__(self): + return + + def __exit__(self, type, value, traceback): + self._apply_update(self.old) diff --git a/tests/test_core.py b/tests/test_core.py index 7499bf996..d5e0eb3e6 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -14,8 +14,8 @@ from numpy_groupies.aggregate_numpy import aggregate import flox +from flox import set_options, xrutils from flox import xrdtypes as dtypes -from flox import xrutils from flox.aggregations import Aggregation, _initialize_aggregation from flox.core import ( HAS_NUMBAGG, @@ -31,6 +31,7 @@ find_group_cohorts, groupby_reduce, groupby_scan, + rechunk_for_blockwise, rechunk_for_cohorts, reindex_, subset_to_blocks, @@ -979,27 +980,34 @@ def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None: assert_equal(actual, expected) +@requires_dask @pytest.mark.parametrize( - "inchunks, 
expected", + "inchunks, expected, expected_method", [ - [(1,) * 10, (3, 2, 2, 3)], - [(2,) * 5, (3, 2, 2, 3)], - [(3, 3, 3, 1), (3, 2, 5)], - [(3, 1, 1, 2, 1, 1, 1), (3, 2, 2, 3)], - [(3, 2, 2, 3), (3, 2, 2, 3)], - [(4, 4, 2), (3, 4, 3)], - [(5, 5), (5, 5)], - [(6, 4), (5, 5)], - [(7, 3), (7, 3)], - [(8, 2), (7, 3)], - [(9, 1), (10,)], - [(10,), (10,)], + [(1,) * 10, (3, 2, 2, 3), None], + [(2,) * 5, (3, 2, 2, 3), None], + [(3, 3, 3, 1), (3, 2, 5), None], + [(3, 1, 1, 2, 1, 1, 1), (3, 2, 2, 3), None], + [(3, 2, 2, 3), (3, 2, 2, 3), "blockwise"], + [(4, 4, 2), (3, 4, 3), None], + [(5, 5), (5, 5), "blockwise"], + [(6, 4), (5, 5), None], + [(7, 3), (7, 3), "blockwise"], + [(8, 2), (7, 3), None], + [(9, 1), (10,), None], + [(10,), (10,), "blockwise"], ], ) -def test_rechunk_for_blockwise(inchunks, expected): +def test_rechunk_for_blockwise(inchunks, expected, expected_method): labels = np.array([1, 1, 1, 2, 2, 3, 3, 5, 5, 5]) assert _get_optimal_chunks_for_groups(inchunks, labels) == expected + with set_options(rechunk_blockwise_chunk_size_threshold=-1): + array = dask.array.ones(labels.size, chunks=(inchunks,)) + method, array = rechunk_for_blockwise(array, -1, labels, force=False) + assert method == expected_method + assert array.chunks == (inchunks,) + @requires_dask @pytest.mark.parametrize( From bb92309300524e3585dc9592539217f3932e45d2 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 16 Jul 2025 09:07:11 -0600 Subject: [PATCH 5/6] Support descending --- flox/core.py | 10 ++-------- tests/test_core.py | 6 ++++++ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flox/core.py b/flox/core.py index 7c14d35ec..8e7b79853 100644 --- a/flox/core.py +++ b/flox/core.py @@ -312,7 +312,7 @@ def _collapse_axis(arr: np.ndarray, naxis: int) -> np.ndarray: def _get_optimal_chunks_for_groups(chunks, labels): chunkidx = np.cumsum(chunks) - 1 # what are the groups at chunk boundaries - labels_at_chunk_bounds = _unique(labels[chunkidx]) + labels_at_chunk_bounds = 
pd.unique(labels[chunkidx]) # what's the last index of all groups last_indexes = npg.aggregate_numpy.aggregate(labels, np.arange(len(labels)), func="last") # what's the last index of groups at the chunk boundaries. @@ -2754,13 +2754,7 @@ def groupby_reduce( has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_) - if ( - method is None - and is_duck_dask_array(array) - and not any_by_dask - and by_.ndim == 1 - and _issorted(by_, ascending=True) - ): + if method is None and is_duck_dask_array(array) and not any_by_dask and by_.ndim == 1 and _issorted(by_): # Let's try rechunking for sorted 1D by. (single_axis,) = axis_ method, array = rechunk_for_blockwise(array, single_axis, by_, force=False) diff --git a/tests/test_core.py b/tests/test_core.py index d5e0eb3e6..31c6ab5a7 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1001,6 +1001,8 @@ def test_groupby_bins(chunk_labels, kwargs, chunks, engine, method) -> None: def test_rechunk_for_blockwise(inchunks, expected, expected_method): labels = np.array([1, 1, 1, 2, 2, 3, 3, 5, 5, 5]) assert _get_optimal_chunks_for_groups(inchunks, labels) == expected + # reversed + assert _get_optimal_chunks_for_groups(inchunks, labels[::-1]) == expected with set_options(rechunk_blockwise_chunk_size_threshold=-1): array = dask.array.ones(labels.size, chunks=(inchunks,)) @@ -1008,6 +1010,10 @@ def test_rechunk_for_blockwise(inchunks, expected, expected_method): assert method == expected_method assert array.chunks == (inchunks,) + method, array = rechunk_for_blockwise(array, -1, labels[::-1], force=False) + assert method == expected_method + assert array.chunks == (inchunks,) + @requires_dask @pytest.mark.parametrize( From 0789c1f8a9232e0d347782e370dfed618bc1adde Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Wed, 16 Jul 2025 09:10:41 -0600 Subject: [PATCH 6/6] fix init --- flox/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git 
a/flox/__init__.py b/flox/__init__.py index 132c952c7..dc14c8371 100644 --- a/flox/__init__.py +++ b/flox/__init__.py @@ -33,6 +33,7 @@ def _get_version(): "groupby_scan", "rechunk_for_blockwise", "rechunk_for_cohorts", + "set_options", "ReindexStrategy", "ReindexArrayType", ]