"""Dask Histogram core High Level Graph API."""

from __future__ import annotations

import operator
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Hashable, Literal, Mapping, Sequence

import boost_histogram as bh
import dask.config
import numpy as np
from dask.base import DaskMethodsMixin, dont_optimize, is_dask_collection, tokenize
from dask.blockwise import BlockwiseDep, blockwise, fuse_roots, optimize_blockwise
from dask.context import globalmethod
from dask.core import flatten
from dask.delayed import Delayed
from dask.highlevelgraph import HighLevelGraph
from dask.local import identity
from dask.threaded import get as tget
from dask.utils import is_dataframe_like, key_split

from dask_histogram.layers import MockableDataFrameTreeReduction

if TYPE_CHECKING:
    from numpy.typing import NDArray

    from dask_histogram.typing import DaskCollection

__all__ = (
    "AggHistogram",
    "PartitionedHistogram",
    "clone",
    "factory",
)


def hist_safe_sum(items):
    """Sum histogram-like items, skipping any entries that are tuples."""
    return sum(item for item in items if not isinstance(item, tuple))


def clone(histref: bh.Histogram | None = None) -> bh.Histogram:
    """Create a Histogram object based on another.

    The axes and storage of the `histref` will be used to create a new
    Histogram object.

    Parameters
    ----------
    histref : bh.Histogram
        The reference Histogram.

    Returns
    -------
    bh.Histogram
        New Histogram with identical axes and storage.

    """
    if histref is None:
        return bh.Histogram()
    return bh.Histogram(*histref.axes, storage=histref.storage_type())

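# A minimal sketch of ``clone`` in use (illustrative; the names below are
# not part of the library): the result shares axes and storage with the
# reference histogram but starts empty.
#
#     import boost_histogram as bh
#     ref = bh.Histogram(bh.axis.Regular(10, 0, 1), storage=bh.storage.Weight())
#     ref.fill([0.1, 0.2, 0.3])
#     fresh = clone(ref)  # same axes and Weight storage, zero entries
#     assert fresh.sum().value == 0.0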

def _blocked_sa(
    data: Any,
    *,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; unweighted; no sample."""
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    if data.ndim == 1:
        return thehist.fill(data)
    elif data.ndim == 2:
        return thehist.fill(*(data.T))
    else:
        raise ValueError("Data must be one or two dimensional.")

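# Note: throughout the blocked helpers below, ``histref`` may arrive either
# as a ``bh.Histogram`` or as a pre-unpacked ``(axes, storage, metadata)``
# tuple, which is cheap to tokenize and ship around in the task graph. A
# sketch of the equivalence, with illustrative names:
#
#     ref = bh.Histogram(bh.axis.Regular(10, 0, 1))
#     as_tuple = (tuple(ref.axes), ref.storage_type(), ref.metadata)
#     rebuilt = bh.Histogram(*as_tuple[0], storage=as_tuple[1], metadata=as_tuple[2])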

def _blocked_sa_s(
    data: Any,
    sample: Any,
    *,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; unweighted; with sample."""
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    if data.ndim == 1:
        return thehist.fill(data, sample=sample)
    elif data.ndim == 2:
        return thehist.fill(*(data.T), sample=sample)
    else:
        raise ValueError("Data must be one or two dimensional.")


def _blocked_sa_w(
    data: Any,
    weights: Any,
    *,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; weighted; no sample."""
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    if data.ndim == 1:
        return thehist.fill(data, weight=weights)
    elif data.ndim == 2:
        return thehist.fill(*(data.T), weight=weights)
    else:
        raise ValueError("Data must be one or two dimensional.")


def _blocked_sa_w_s(
    data: Any,
    weights: Any,
    sample: Any,
    *,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; weighted; with sample."""
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    if data.ndim == 1:
        return thehist.fill(data, weight=weights, sample=sample)
    elif data.ndim == 2:
        return thehist.fill(*(data.T), weight=weights, sample=sample)
    else:
        raise ValueError("Data must be one or two dimensional.")


def _blocked_ma(
    *data: Any,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; unweighted; no sample."""
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*data)


def _blocked_ma_s(
    *data: Any,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; unweighted; with sample."""
    sample = data[-1]
    data = data[:-1]
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*data, sample=sample)


def _blocked_ma_w(
    *data: Any,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; weighted; no sample."""
    weights = data[-1]
    data = data[:-1]
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*data, weight=weights)


def _blocked_ma_w_s(
    *data: Any,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; weighted; with sample."""
    weights = data[-2]
    sample = data[-1]
    data = data[:-2]
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*data, weight=weights, sample=sample)


def _blocked_df(
    data: Any,
    *,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; DataFrame input; unweighted; no sample."""
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*(data[c] for c in data.columns), weight=None)


def _blocked_df_s(
    data: Any,
    sample: Any,
    *,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; DataFrame input; unweighted; with sample."""
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*(data[c] for c in data.columns), sample=sample)


def _blocked_df_w(
    data: Any,
    weights: Any,
    *,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; weighted; no sample."""
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*(data[c] for c in data.columns), weight=weights)


def _blocked_df_w_s(
    data: Any,
    weights: Any,
    sample: Any,
    *,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; weighted; with sample."""
    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*(data[c] for c in data.columns), weight=weights, sample=sample)


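# Note on the dask-awkward helpers below:
# ``ak.typetracer.length_zero_if_typetracer`` replaces a metadata-only
# (typetracer) array with a concrete length-zero array, so a ``fill``
# during graph tracing exercises the same code path as a real fill
# without needing any actual data.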
def _blocked_dak(
    data: Any,
    weights: Any | None,
    sample: Any | None,
    *,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; optional weight/sample; awkward input."""
    import awkward as ak

    thedata = (
        ak.typetracer.length_zero_if_typetracer(data)
        if isinstance(data, ak.Array)
        else data
    )
    theweights = (
        ak.typetracer.length_zero_if_typetracer(weights)
        if isinstance(weights, ak.Array)
        else weights
    )
    thesample = (
        ak.typetracer.length_zero_if_typetracer(sample)
        if isinstance(sample, ak.Array)
        else sample
    )

    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(thedata, weight=theweights, sample=thesample)


def _blocked_dak_ma(
    *data: Any,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; unweighted; no sample; awkward input."""
    import awkward as ak

    thedata = [
        (
            ak.typetracer.length_zero_if_typetracer(datum)
            if isinstance(datum, ak.Array)
            else datum
        )
        for datum in data
    ]

    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*tuple(thedata))


def _blocked_dak_ma_w(
    *data: Any,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; weighted; no sample; awkward input."""
    import awkward as ak

    thedata = [
        (
            ak.typetracer.length_zero_if_typetracer(datum)
            if isinstance(datum, ak.Array)
            else datum
        )
        for datum in data[:-1]
    ]
    theweights = (
        ak.typetracer.length_zero_if_typetracer(data[-1])
        if isinstance(data[-1], ak.Array)
        else data[-1]
    )

    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )

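    # On a typetracer (metadata-only) backend there is nothing concrete to
    # fill; return the empty histogram so graph construction can proceed.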
    if ak.backend(*data) != "typetracer":
        thehist.fill(*tuple(thedata), weight=theweights)

    return thehist


def _blocked_dak_ma_s(
    *data: Any,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; unweighted; with sample; awkward input."""
    import awkward as ak

    thedata = [
        (
            ak.typetracer.length_zero_if_typetracer(datum)
            if isinstance(datum, ak.Array)
            else datum
        )
        for datum in data[:-1]
    ]
    thesample = (
        ak.typetracer.length_zero_if_typetracer(data[-1])
        if isinstance(data[-1], ak.Array)
        else data[-1]
    )

    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*tuple(thedata), sample=thesample)


def _blocked_dak_ma_w_s(
    *data: Any,
    histref: tuple | bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; weighted; with sample; awkward input."""
    import awkward as ak

    thedata = [
        (
            ak.typetracer.length_zero_if_typetracer(datum)
            if isinstance(datum, ak.Array)
            else datum
        )
        for datum in data[:-2]
    ]
    theweights = (
        ak.typetracer.length_zero_if_typetracer(data[-2])
        if isinstance(data[-2], ak.Array)
        else data[-2]
    )
    thesample = (
        ak.typetracer.length_zero_if_typetracer(data[-1])
        if isinstance(data[-1], ak.Array)
        else data[-1]
    )

    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )
    return thehist.fill(*tuple(thedata), weight=theweights, sample=thesample)


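# The ``_blocked_multi*`` helpers below receive a ``repacker`` produced by
# ``dask.base.unpack_collections`` (see ``_partitioned_histogram_multifill``):
# given the flattened per-partition blocks, it rebuilds the original nested
# ``(data_list, weights, samples, histref)`` structure. A rough sketch with
# illustrative names:
#
#     from dask.base import unpack_collections
#     flattened, repacker = unpack_collections(data, weights, samples, histref)
#     data_list, weights, samples, histref = repacker(per_partition_blocks)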
def _blocked_multi(
    repacker: Callable,
    *flattened_inputs: tuple[Any],
) -> bh.Histogram:
    """Blocked calculation; multiple fills per block; generic input."""
    data_list, weights, samples, histref = repacker(flattened_inputs)

    weights = weights or (None for _ in range(len(data_list)))
    samples = samples or (None for _ in range(len(data_list)))

    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )

    for (
        datatuple,
        weight,
        sample,
    ) in zip(data_list, weights, samples):
        data = datatuple
        if len(data) == 1 and data[0].ndim == 2:
            data = data[0].T
        thehist.fill(*data, weight=weight, sample=sample)

    return thehist


def _blocked_multi_df(
    repacker: Callable,
    *flattened_inputs: tuple[Any],
) -> bh.Histogram:
    """Blocked calculation; multiple fills per block; DataFrame input."""
    data_list, weights, samples, histref = repacker(flattened_inputs)

    weights = weights or (None for _ in range(len(data_list)))
    samples = samples or (None for _ in range(len(data_list)))

    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )

    for (
        datatuple,
        weight,
        sample,
    ) in zip(data_list, weights, samples):
        data = datatuple
        if len(datatuple) == 1:
            data = data[0]
        thehist.fill(*(data[c] for c in data.columns), weight=weight, sample=sample)

    return thehist


def _blocked_multi_dak(
    repacker: Callable,
    *flattened_inputs: tuple[Any],
) -> bh.Histogram:
    """Blocked calculation; multiple fills per block; dask-awkward input."""
    import awkward as ak

    data_list, weights, samples, histref = repacker(flattened_inputs)

    weights = weights or (None for _ in range(len(data_list)))
    samples = samples or (None for _ in range(len(data_list)))

    thehist = (
        clone(histref)
        if not isinstance(histref, tuple)
        else bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    )

    backend = ak.backend(*flattened_inputs)

    for (
        data,
        weight,
        sample,
    ) in zip(data_list, weights, samples):
        if backend != "typetracer":
            thehist.fill(*data, weight=weight, sample=sample)
        else:
            for datum in data:
                if isinstance(datum, ak.highlevel.Array):
                    ak.typetracer.touch_data(datum)
            if isinstance(weight, ak.highlevel.Array):
                ak.typetracer.touch_data(weight)
            if isinstance(sample, ak.highlevel.Array):
                ak.typetracer.touch_data(sample)

    return thehist


def optimize(
    dsk: Mapping,
    keys: Hashable | list[Hashable] | set[Hashable],
    **kwargs: Any,
) -> Mapping:
    keys = tuple(flatten(keys))

    if not isinstance(dsk, HighLevelGraph):
        dsk = HighLevelGraph.from_collections(str(id(dsk)), dsk, dependencies=())

    dsk = optimize_blockwise(dsk, keys=keys)
    dsk = fuse_roots(dsk, keys=keys)  # type: ignore
    dsk = dsk.cull(set(keys))  # type: ignore
    return dsk


def _get_optimization_function():
    # Try to use the optimizations from dask-awkward (if dask-awkward
    # is importable). There is no cost to running that optimization
    # even when it's unnecessary, because if no AwkwardInputLayers are
    # detected the original graph is returned unchanged.
    try:
        from dask_awkward.lib.optimize import all_optimizations

        return all_optimizations
    except (ImportError, ModuleNotFoundError):
        pass
    return optimize

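# Illustrative note (not itself part of the public API): because the
# collections below declare ``__dask_optimize__`` as a ``globalmethod``
# keyed on ``"histogram_optimize"``, the optimizer can be swapped through
# dask's configuration, e.g.:
#
#     import dask
#     with dask.config.set(histogram_optimize=optimize):
#         agghist.compute()  # ``agghist`` is a hypothetical AggHistogram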

class AggHistogram(DaskMethodsMixin):
    """Aggregated Histogram collection.

    The class constructor is typically used internally;
    :py:func:`dask_histogram.factory` is recommended for users (along
    with the `dask_histogram.routines` module).

    See Also
    --------
    dask_histogram.factory

    """

    def __init__(
        self,
        dsk: HighLevelGraph,
        name: str,
        histref: bh.Histogram,
        layer: Any | None = None,
    ) -> None:
        self._dask: HighLevelGraph = dsk
        self._name: str = name
        self._meta: bh.Histogram = histref

        # NOTE: `layer` is only used by `Item.from_delayed`, to handle
        # Delayed objects created by other collections, e.g.:
        #     Item.from_delayed(da.ones(1).to_delayed()[0])
        # See Delayed.__init__.
        self._layer = layer or name
        if isinstance(dsk, HighLevelGraph) and self._layer not in dsk.layers:
            raise ValueError(
                f"Layer {self._layer} not in the HighLevelGraph's "
                f"layers: {list(dsk.layers)}"
            )

    def __dask_graph__(self) -> HighLevelGraph:
        return self._dask

    def __dask_keys__(self) -> list[tuple[str, int]]:
        return [self.key]

    def __dask_layers__(self) -> tuple[str, ...]:
        if isinstance(self._dask, HighLevelGraph) and len(self._dask.layers) == 1:
            return tuple(self._dask.layers)
        return (self.name,)

    def __dask_tokenize__(self) -> Any:
        return self.key

    def __dask_postcompute__(self) -> Any:
        return _finalize_agg_histogram, ()

    def __dask_postpersist__(self) -> Any:
        return self._rebuild, ()

    __dask_optimize__ = globalmethod(
        _get_optimization_function(), key="histogram_optimize", falsey=dont_optimize
    )

    __dask_scheduler__ = staticmethod(tget)

    def _rebuild(
        self,
        dsk: HighLevelGraph,
        *,
        rename: Mapping[str, str] | None = None,
    ) -> Any:
        name = self._name
        if rename:
            name = rename.get(name, name)
        return type(self)(dsk, name, self.histref)

    @property
    def name(self) -> str:
        return self._name

    @property
    def dask(self) -> HighLevelGraph:
        return self._dask

    @property
    def key(self) -> tuple[str, Literal[0]]:
        return (self.name, 0)

    @property
    def histref(self):
        """Empty reference boost-histogram object."""
        return self._meta

    @property
    def _storage_type(self) -> type[bh.storage.Storage]:
        """Storage type of the histogram."""
        return self.histref.storage_type

    @property
    def ndim(self) -> int:
        """Total number of dimensions."""
        return self.histref.ndim

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the histogram as an array."""
        return self.histref.shape

    @property
    def size(self) -> int:
        """Size of the histogram."""
        return self.histref.size

    def __str__(self) -> str:
        return (
            "dask_histogram.AggHistogram<"
            f"{key_split(self.name)}, "
            f"ndim={self.ndim}, "
            f"storage={self._storage_type()}"
            ">"
        )

    __repr__ = __str__

    def __reduce__(self):
        return (AggHistogram, (self._dask, self._name, self._meta))

    def to_dask_array(self, flow: bool = False, dd: bool = False) -> Any:
        """Convert histogram object to dask.array form.

        Parameters
        ----------
        flow : bool
            Include the flow bins.
        dd : bool
            Use the histogramdd return syntax, where the edges are in a
            tuple. Otherwise, this is the histogram/histogram2d return
            style.

        Returns
        -------
        contents : dask.array.Array
            The bin contents.
        *edges : dask.array.Array
            The edges for each dimension.

        """
        return to_dask_array(self, flow=flow, dd=dd)

    def to_boost(self) -> bh.Histogram:
        """Convert to a boost_histogram.Histogram via computation.

        This is an alias of `.compute()`.

        """
        return self.compute()

    def to_delayed(self, optimize_graph: bool = True) -> Delayed:
        keys = self.__dask_keys__()
        graph = self.__dask_graph__()
        layer = self.__dask_layers__()[0]
        if optimize_graph:
            graph = self.__dask_optimize__(graph, keys)
            layer = f"delayed-{self.name}"
            graph = HighLevelGraph.from_collections(layer, graph, dependencies=())
        return Delayed(keys[0], graph, layer=layer)

    def values(self, flow: bool = False) -> NDArray[Any]:
        return self.to_boost().values(flow=flow)

    def variances(self, flow: bool = False) -> Any:
        return self.to_boost().variances(flow=flow)

    def counts(self, flow: bool = False) -> NDArray[Any]:
        return self.to_boost().counts(flow=flow)

    def __array__(self) -> NDArray[Any]:
        return self.compute().__array__()

    def __iadd__(self, other: Any) -> AggHistogram:
        return _iadd(self, other)

    def __add__(self, other: Any) -> AggHistogram:
        return self.__iadd__(other)

    def __radd__(self, other: Any) -> AggHistogram:
        return self.__iadd__(other)

    def __isub__(self, other: Any) -> AggHistogram:
        return _isub(self, other)

    def __sub__(self, other: Any) -> AggHistogram:
        return self.__isub__(other)

    def __itruediv__(self, other: Any) -> AggHistogram:
        return _itruediv(self, other)

    def __truediv__(self, other: Any) -> AggHistogram:
        return self.__itruediv__(other)

    def __idiv__(self, other: Any) -> AggHistogram:
        return self.__itruediv__(other)

    def __div__(self, other: Any) -> AggHistogram:
        return self.__idiv__(other)

    def __imul__(self, other: Any) -> AggHistogram:
        return _imul(self, other)

    def __mul__(self, other: Any) -> AggHistogram:
        return self.__imul__(other)

    def __rmul__(self, other: Any) -> AggHistogram:
        return self.__mul__(other)


def _finalize_partitioned_histogram(results: Any) -> Any:
    return results


def _finalize_agg_histogram(results: Any) -> Any:
    return results[0]


class PartitionedHistogram(DaskMethodsMixin):
    """Partitioned Histogram collection.

    The class constructor is typically used internally;
    :py:func:`dask_histogram.factory` is recommended for users (along
    with the `dask_histogram.routines` module).

    See Also
    --------
    dask_histogram.factory
    dask_histogram.AggHistogram

    """

    def __init__(
        self,
        dsk: HighLevelGraph,
        name: str,
        npartitions: int,
        histref: bh.Histogram | tuple,
    ) -> None:
        self._dask: HighLevelGraph = dsk
        self._name: str = name
        self._npartitions: int = npartitions
        self._meta: tuple | bh.Histogram = histref

    @property
    def name(self) -> str:
        return self._name

    @property
    def dask(self) -> HighLevelGraph:
        return self._dask

    @property
    def npartitions(self) -> int:
        return self._npartitions

    def __dask_graph__(self) -> HighLevelGraph:
        return self.dask

    def __dask_keys__(self) -> list[tuple[str, int]]:
        return [(self.name, i) for i in range(self.npartitions)]

    def __dask_layers__(self) -> tuple[str]:
        return (self.name,)

    def __dask_tokenize__(self) -> str:
        return self.name

    def __dask_postcompute__(self) -> Any:
        return _finalize_partitioned_histogram, ()

    def _rebuild(self, dsk: Any, *, rename: Any = None) -> Any:
        name = self.name
        if rename:
            name = rename.get(name, name)
        return type(self)(dsk, name, self.npartitions, self.histref)

    __dask_optimize__ = globalmethod(
        _get_optimization_function(), key="histogram_optimize", falsey=dont_optimize
    )

    __dask_scheduler__ = staticmethod(tget)

    def __str__(self) -> str:
        return "dask_histogram.PartitionedHistogram<%s, npartitions=%d>" % (
            key_split(self.name),
            self.npartitions,
        )

    __repr__ = __str__

    def __reduce__(self):
        return (
            PartitionedHistogram,
            (
                self._dask,
                self._name,
                self._npartitions,
                self._meta,
            ),
        )

    @property
    def histref(self):
        """boost_histogram.Histogram: reference histogram."""
        return self._meta

    def collapse(self, split_every: int | None = None) -> AggHistogram:
        """Translate into a reduced aggregated histogram."""
        return _reduction(self, split_every=split_every)

    def to_delayed(self, optimize_graph: bool = True) -> list[Delayed]:
        keys = self.__dask_keys__()
        graph = self.__dask_graph__()
        layer = self.__dask_layers__()[0]
        if optimize_graph:
            graph = self.__dask_optimize__(graph, keys)
            layer = f"delayed-{self.name}"
            graph = HighLevelGraph.from_collections(layer, graph, dependencies=())
        return [Delayed(k, graph, layer=layer) for k in keys]
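

# Illustrative sketch: a PartitionedHistogram holds one filled histogram
# per input partition; ``collapse`` tree-reduces those partials into a
# single AggHistogram.
#
#     import boost_histogram as bh
#     import dask.array as da
#     from dask_histogram.core import partitioned_factory
#
#     x = da.random.uniform(size=(1000,), chunks=(250,))
#     ph = partitioned_factory(x, axes=(bh.axis.Regular(10, 0, 1),))
#     ah = ph.collapse(split_every=2)  # reduce 4 partitions, 2 at a time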


def _reduction(
    ph: PartitionedHistogram,
    split_every: int | None = None,
) -> AggHistogram:
    if split_every is None:
        split_every = dask.config.get("histogram.aggregation.split-every", 8)
    if split_every is False:
        split_every = ph.npartitions

    token = tokenize(ph, sum, split_every)

    label = "histreduce"

    name_comb = f"{label}-combine-{token}"
    name_agg = f"{label}-agg-{token}"

    mdftr = MockableDataFrameTreeReduction(
        name=name_agg,
        name_input=ph.name,
        npartitions_input=ph.npartitions,
        concat_func=hist_safe_sum,
        tree_node_func=identity,
        finalize_func=identity,
        split_every=split_every,
        tree_node_name=name_comb,
    )

    graph = HighLevelGraph.from_collections(name_agg, mdftr, dependencies=(ph,))

    return AggHistogram(graph, name_agg, histref=ph.histref)


def _dependencies(
    *args: DaskCollection,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
) -> tuple[DaskCollection, ...]:
    dask_args = [arg for arg in args if is_dask_collection(arg)]
    if is_dask_collection(weights):
        dask_args.append(weights)  # type: ignore[arg-type]
    if is_dask_collection(sample):
        dask_args.append(sample)  # type: ignore[arg-type]
    return tuple(dask_args)


def _weight_sample_check(
    *data: DaskCollection,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
) -> int:
    if weights is None and sample is None:
        return 0
    if weights is not None:
        if weights.ndim != 1:
            raise ValueError("weights must be one dimensional.")
        if data[0].npartitions != weights.npartitions:
            raise ValueError("weights must have as many partitions as the data.")
    if sample is not None:
        if sample.ndim != 1:
            raise ValueError("sample must be one dimensional.")
        if data[0].npartitions != sample.npartitions:
            raise ValueError("sample must have as many partitions as the data.")
    return 0


def _is_dask_dataframe(obj):
    return (
        obj.__class__.__module__ == "dask.dataframe.core"
        and obj.__class__.__name__ == "DataFrame"
    )


def _is_dask_series(obj):
    return (
        obj.__class__.__module__ == "dask.dataframe.core"
        and obj.__class__.__name__ == "Series"
    )


def _partitionwise(func, layer_name, *args, **kwargs):
    from dask.array.core import Array as DaskArray

    pairs = []
    numblocks = {}
    for arg in args:
        if isinstance(arg, DaskArray):
            if arg.ndim == 1:
                pairs.extend([arg.name, "i"])
            elif arg.ndim == 0:
                pairs.extend([arg.name, ""])
            elif arg.ndim == 2:
                pairs.extend([arg.name, "ij"])
            else:
                raise ValueError("Can't add multi-dimensional array to dataframes")
            numblocks[arg._name] = arg.numblocks
        elif _is_dask_dataframe(arg) or _is_dask_series(arg):
            pairs.extend([arg._name, "i"])
            numblocks[arg._name] = (arg.npartitions,)
        elif isinstance(arg, BlockwiseDep):
            if len(arg.numblocks) == 1:
                pairs.extend([arg, "i"])
            elif len(arg.numblocks) == 2:
                pairs.extend([arg, "ij"])
            else:
                raise ValueError(
                    f"BlockwiseDep arg {arg!r} has {len(arg.numblocks)} dimensions; "
                    "only 1 or 2 are supported."
                )
        else:
            pairs.extend([arg, None])
    return blockwise(
        func,
        layer_name,
        "i",
        *pairs,
        numblocks=numblocks,
        concatenate=True,
        **kwargs,
    )


def _partitioned_histogram_multifill(
    data: tuple[DaskCollection | tuple],
    histref: bh.Histogram | tuple,
    weights: tuple[DaskCollection] | None = None,
    samples: tuple[DaskCollection] | None = None,
):
    name = f"hist-on-block-{tokenize(data, histref, weights, samples)}"

    from dask.base import unpack_collections

    flattened_deps, repacker = unpack_collections(data, weights, samples, histref)

    if is_dask_awkward_like(flattened_deps[0]):
        from dask_awkward.lib.core import partitionwise_layer as dak_pwl

        unpacked_multifill = partial(_blocked_multi_dak, repacker)
        graph = dak_pwl(unpacked_multifill, name, *flattened_deps)
    elif is_dataframe_like(flattened_deps[0]):
        unpacked_multifill = partial(_blocked_multi_df, repacker)
        graph = _partitionwise(unpacked_multifill, name, *flattened_deps)
    else:
        unpacked_multifill = partial(_blocked_multi, repacker)
        graph = _partitionwise(unpacked_multifill, name, *flattened_deps)

    hlg = HighLevelGraph.from_collections(name, graph, dependencies=flattened_deps)
    return PartitionedHistogram(
        hlg, name, flattened_deps[0].npartitions, histref=histref
    )


def _partitioned_histogram(
    *data: DaskCollection,
    histref: bh.Histogram | tuple,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
    split_every: int | None = None,
) -> PartitionedHistogram:
    name = f"hist-on-block-{tokenize(data, histref, weights, sample, split_every)}"
    dask_data = tuple(datum for datum in data if is_dask_collection(datum))
    if len(dask_data) == 0:
        dask_data = data
    data_is_df = is_dataframe_like(dask_data[0])
    data_is_dak = is_dask_awkward_like(dask_data[0])

    if is_dask_collection(weights):
        _weight_sample_check(*dask_data, weights=weights)

    # Single awkward array object.
    if len(data) == 1 and data_is_dak:
        from dask_awkward.lib.core import partitionwise_layer as dak_pwl

        f = partial(_blocked_dak, histref=histref)
        g = dak_pwl(f, name, data[0], weights, sample)

    # Single object, not a dataframe.
    elif len(data) == 1 and not data_is_df:
        x = data[0]
        if weights is not None and sample is not None:
            g = _partitionwise(
                _blocked_sa_w_s, name, x, weights, sample, histref=histref
            )
        elif weights is not None and sample is None:
            g = _partitionwise(_blocked_sa_w, name, x, weights, histref=histref)
        elif weights is None and sample is not None:
            g = _partitionwise(_blocked_sa_s, name, x, sample, histref=histref)
        else:
            g = _partitionwise(_blocked_sa, name, x, histref=histref)

    # Single object, is a dataframe.
    elif len(data) == 1 and data_is_df:
        x = data[0]
        if weights is not None and sample is not None:
            g = _partitionwise(
                _blocked_df_w_s, name, x, weights, sample, histref=histref
            )
        elif weights is not None and sample is None:
            g = _partitionwise(_blocked_df_w, name, x, weights, histref=histref)
        elif weights is None and sample is not None:
            g = _partitionwise(_blocked_df_s, name, x, sample, histref=histref)
        else:
            g = _partitionwise(_blocked_df, name, x, histref=histref)

    # Multiple objects.
    else:
        # Awkward array collection detected as first argument.
        if data_is_dak:
            from dask_awkward.lib.core import partitionwise_layer as dak_pwl

            if weights is not None and sample is None:
                g = dak_pwl(_blocked_dak_ma_w, name, *data, weights, histref=histref)
            elif weights is not None and sample is not None:
                g = dak_pwl(
                    _blocked_dak_ma_w_s,
                    name,
                    *data,
                    weights,
                    sample,
                    histref=histref,
                )
            elif weights is None and sample is not None:
                g = dak_pwl(_blocked_dak_ma_s, name, *data, sample, histref=histref)
            else:
                g = dak_pwl(_blocked_dak_ma, name, *data, histref=histref)
        # Not an awkward array collection.
        elif weights is not None and sample is not None:
            g = _partitionwise(
                _blocked_ma_w_s, name, *data, weights, sample, histref=histref
            )
        elif weights is not None and sample is None:
            g = _partitionwise(_blocked_ma_w, name, *data, weights, histref=histref)
        elif weights is None and sample is not None:
            g = _partitionwise(_blocked_ma_s, name, *data, sample, histref=histref)
        else:
            g = _partitionwise(_blocked_ma, name, *data, histref=histref)

    dependencies = _dependencies(*data, weights=weights, sample=sample)
    hlg = HighLevelGraph.from_collections(name, g, dependencies=dependencies)
    return PartitionedHistogram(hlg, name, dask_data[0].npartitions, histref=histref)


def to_dask_array(agghist: AggHistogram, flow: bool = False, dd: bool = False) -> Any:
    """Convert `agghist` to a `dask.array` return style.

    Parameters
    ----------
    agghist : AggHistogram
        The aggregated histogram collection to convert.
    flow : bool
        If ``True``, include under- and over-flow bins.
    dd : bool
        If ``True``, use the ``histogramdd`` style return.

    See Also
    --------
    dask_histogram.AggHistogram.to_dask_array

    Returns
    -------
    Union[Tuple[DaskCollection, List[DaskCollection]], Tuple[DaskCollection, ...]]
        The first return is always the bin counts. If `dd` is ``True``
        the second return is a list where each element is an array of
        bin edges for each axis. If `dd` is ``False``, the bin edge
        arrays are not stored in a list (`histogram2d` style return).

    """
    from dask.array import Array, asarray

    name = f"to-dask-array-{tokenize(agghist)}"
    thehist = agghist.histref
    if isinstance(thehist, tuple):
        thehist = bh.Histogram(
            *agghist.histref[0],
            storage=agghist.histref[1],
            metadata=agghist.histref[2],
        )
    zeros = (0,) * thehist.ndim
    dsk = {(name, *zeros): (lambda x, f: x.to_numpy(flow=f)[0], agghist.key, flow)}
    graph = HighLevelGraph.from_collections(name, dsk, dependencies=(agghist,))
    shape = thehist.shape
    if flow:
        shape = tuple(i + 2 for i in shape)
    int_storage = thehist.storage_type in (
        bh.storage.Int64,
        bh.storage.AtomicInt64,
    )
    dt = int if int_storage else float
    c = Array(graph, name=name, shape=shape, chunks=shape, dtype=dt)
    axes = thehist.axes
    if flow:
        edges = [
            asarray(np.concatenate([[-np.inf], ax.edges, [np.inf]])) for ax in axes
        ]
    else:
        edges = [asarray(ax.edges) for ax in axes]
    if dd:
        return c, edges
    return (c, *tuple(edges))


class BinaryOpAgg:
    def __init__(
        self,
        func: Callable[[Any, Any], Any],
        name: str | None = None,
    ) -> None:
        self.func = func
        self.__name__ = func.__name__ if name is None else name

    def __call__(self, a: AggHistogram, b: AggHistogram) -> AggHistogram:
        name = f"{self.__name__}-hist-{tokenize(a, b)}"
        deps = []
        if is_dask_collection(a):
            deps.append(a)
        if is_dask_collection(b):
            deps.append(b)
        k1 = a.__dask_keys__()[0] if is_dask_collection(a) else a
        k2 = b.__dask_keys__()[0] if is_dask_collection(b) else b
        llg = {(name, 0): (self.func, k1, k2)}
        g = HighLevelGraph.from_collections(name, llg, dependencies=deps)
        try:
            ref = a.histref
        except AttributeError:
            ref = b.histref
        return AggHistogram(g, name, histref=ref)


_iadd = BinaryOpAgg(operator.iadd, name="add")
_isub = BinaryOpAgg(operator.isub, name="sub")
_imul = BinaryOpAgg(operator.imul, name="mul")
_itruediv = BinaryOpAgg(operator.itruediv, name="div")


def factory(
    *data: DaskCollection,
    histref: bh.Histogram | tuple | None = None,
    axes: Sequence[bh.axis.Axis] | None = None,
    storage: bh.storage.Storage | None = None,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
    split_every: int | None = None,
    keep_partitioned: bool = False,
) -> AggHistogram:
    """Daskified Histogram collection factory function.

    Given some data represented by Dask collections and the
    characteristics of a histogram (either a reference
    :py:obj:`boost_histogram.Histogram` object or a set of axes), this
    routine will create an :py:obj:`AggHistogram` or
    :py:obj:`PartitionedHistogram` collection.

    Parameters
    ----------
    *data : DaskCollection
        The data to histogram. The supported forms of input data:

        * Single one dimensional dask array or Series: for creating a
          1D histogram.
        * Single multidimensional dask array or DataFrame: for
          creating multidimensional histograms.
        * Multiple one dimensional dask arrays or Series: for creating
          multidimensional histograms.
    histref : bh.Histogram or tuple, optional
        A reference histogram object, required if `axes` is not used.
        The dimensionality of `histref` must be compatible with the
        input data. If a tuple, it must have three elements: a tuple
        of axes, the storage, and the metadata.
    axes : Sequence[bh.axis.Axis], optional
        The axes of the histogram, required if `histref` is not used.
        The total number of axes must be equal to the number of
        dimensions of the resulting histogram given the structure of
        `data`.
    storage : bh.storage.Storage, optional
        Storage type of the histogram, only compatible with use of the
        `axes` argument.
    weights : DaskCollection, optional
        Weights associated with the `data`. The partitioning/chunking
        of the weights must be compatible with the input data.
    sample : DaskCollection, optional
        Provide samples if the histogram storage allows it. The
        partitioning/chunking of the samples must be compatible with
        the input data.
    split_every : int, optional
        How many blocks to use in each split during aggregation.
    keep_partitioned : bool, optional
        **Deprecated argument**. Use :py:func:`partitioned_factory`.

    Returns
    -------
    AggHistogram or PartitionedHistogram
        The resulting histogram collection.

    Raises
    ------
    ValueError
        If `histref` and `axes` are both ``None``, or if `storage` is
        used along with `histref`.

    Examples
    --------
    Creating a three dimensional histogram using the `axes` argument:

    >>> import boost_histogram as bh
    >>> import dask.array as da
    >>> import dask_histogram as dh
    >>> x = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> y = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> z = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> bins = [
    ...     [0.0, 0.3, 0.4, 0.5, 1.0],
    ...     [0.0, 0.1, 0.2, 0.8, 1.0],
    ...     [0.0, 0.2, 0.3, 0.4, 1.0],
    ... ]
    >>> axes = [bh.axis.Variable(b) for b in bins]
    >>> h = dh.factory(x, y, z, axes=axes)
    >>> h.shape
    (4, 4, 4)
    >>> h.compute()
    Histogram(
      Variable([0, 0.3, 0.4, 0.5, 1]),
      Variable([0, 0.1, 0.2, 0.8, 1]),
      Variable([0, 0.2, 0.3, 0.4, 1]),
      storage=Double()) # Sum: 10000.0

    Creating a weighted one dimensional histogram with the `histref`
    argument, then converting to the dask.array histogramming return
    style:

    >>> x = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> w = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> ref = bh.Histogram(bh.axis.Regular(10, 0, 1))
    >>> h = dh.factory(x, histref=ref, weights=w)
    >>> counts, edges = h.to_dask_array()
    >>> counts
    dask.array<to-dask-array, shape=(10,), dtype=float64, chunksize=(10,), chunktype=numpy.ndarray>
    >>> edges
    dask.array<array, shape=(11,), dtype=float64, chunksize=(11,), chunktype=numpy.ndarray>

    """
    if keep_partitioned:
        raise ValueError(
            "keep_partitioned=True is no longer supported; "
            "use dask_histogram.partitioned_factory."
        )
    ph = partitioned_factory(
        *data,
        histref=histref,
        axes=axes,
        storage=storage,
        weights=weights,
        sample=sample,
    )
    return ph.collapse(split_every=split_every)


def partitioned_factory(
    *data: DaskCollection,
    histref: bh.Histogram | tuple | None = None,
    axes: Sequence[bh.axis.Axis] | None = None,
    storage: bh.storage.Storage | None = None,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
) -> PartitionedHistogram:
    """Daskified Histogram collection factory function; keep partitioned.

    This is a version of the :py:func:`factory` function that
    **remains partitioned**. The :py:func:`factory` function includes
    a step in the task graph that aggregates all partitions into a
    single final histogram.

    See Also
    --------
    dask_histogram.factory

    """
    if histref is None and axes is None:
        raise ValueError("Either histref or axes must be defined.")
    if histref is not None and storage is not None:
        raise ValueError("Storage cannot be defined along with histref.")
    elif histref is None:
        if storage is None:
            storage = bh.storage.Double()
        histref = bh.Histogram(*axes, storage=storage)  # type: ignore

    return _partitioned_histogram(
        *data, histref=histref, weights=weights, sample=sample
    )


def is_dask_awkward_like(x: Any) -> bool:
    """Check whether an object looks like an Awkward Dask collection.

    Parameters
    ----------
    x : Any
        The object of interest.

    Returns
    -------
    bool
        ``True`` if `x` is an Awkward Dask collection.

    """
    return (
        hasattr(x, "__dask_graph__") and hasattr(x, "layout") and hasattr(x, "fields")
    )