# Source code for dask_histogram.core

"""Dask Histogram core High Level Graph API."""

from __future__ import annotations

import operator
from typing import TYPE_CHECKING, Any, Callable, Hashable, Literal, Mapping, Sequence

import boost_histogram as bh
import dask.config
import numpy as np
from dask.base import DaskMethodsMixin, dont_optimize, is_dask_collection, tokenize
from dask.blockwise import BlockwiseDep, blockwise, fuse_roots, optimize_blockwise
from dask.context import globalmethod
from dask.core import flatten
from dask.delayed import Delayed
from dask.highlevelgraph import HighLevelGraph
from dask.threaded import get as tget
from dask.utils import is_dataframe_like, key_split

from dask_histogram.layers import MockableDataFrameTreeReduction

if TYPE_CHECKING:
    from dask.blockwise import Blockwise
    from numpy.typing import NDArray

    from dask_histogram.typing import DaskCollection

# Public API of this module. `partitioned_factory` is listed because it is
# a documented, user-facing function that `factory` directs users to when
# `keep_partitioned=True` is requested.
__all__ = (
    "AggHistogram",
    "PartitionedHistogram",
    "clone",
    "factory",
    "partitioned_factory",
)


def clone(histref: bh.Histogram | None = None) -> bh.Histogram:
    """Build a fresh Histogram modelled on a reference Histogram.

    Only the axes and the storage type of `histref` are used to
    construct the new object.

    Parameters
    ----------
    histref : bh.Histogram, optional
        The reference Histogram. If ``None``, a Histogram constructed
        with no arguments is returned.

    Returns
    -------
    bh.Histogram
        New Histogram with the same axes and storage type as
        `histref`.

    """
    if histref is not None:
        return bh.Histogram(*histref.axes, storage=histref.storage_type())
    return bh.Histogram()


def _hist_from_ref(histref: Any) -> bh.Histogram:
    """Create the empty histogram that a single block will fill.

    Parameters
    ----------
    histref : bh.Histogram | tuple | None
        Either a reference histogram (or ``None``), which is handed to
        :func:`clone`, or a tuple whose first three items are used as
        ``(axes, storage, metadata)`` to construct a new histogram.

    Returns
    -------
    bh.Histogram
        A new histogram object.

    """
    if isinstance(histref, tuple):
        return bh.Histogram(*histref[0], storage=histref[1], metadata=histref[2])
    return clone(histref)


def _fill_1d_or_2d(thehist: bh.Histogram, data: Any, **fill_kwargs: Any) -> bh.Histogram:
    """Fill `thehist` from a single one or two dimensional array-like.

    One dimensional data is passed to ``fill`` as is; two dimensional
    data is transposed and unpacked so that each item of ``data.T`` is a
    separate positional argument.

    Raises
    ------
    ValueError
        If ``data.ndim`` is neither 1 nor 2.

    """
    if data.ndim == 1:
        return thehist.fill(data, **fill_kwargs)
    if data.ndim == 2:
        return thehist.fill(*(data.T), **fill_kwargs)
    raise ValueError("Data must be one or two dimensional.")


def _length_zero_if_typetracer(array: Any) -> Any:
    """Swap a typetracer-backed awkward Array for a length-zero array.

    Anything that is not an ``ak.Array`` on the ``"typetracer"`` backend
    (including ``None``) is returned unchanged.
    """
    import awkward as ak

    if isinstance(array, ak.Array) and ak.backend(array) == "typetracer":
        # Touch every buffer before discarding the tracer.
        # NOTE(review): presumably this lets dask-awkward's optimizer see
        # that all of the data is required -- confirm.
        array.layout._touch_data(recursive=True)
        return ak.Array(
            array.layout.form.length_zero_array(highlevel=False),
            behavior=array.behavior,
        )
    return array


def _blocked_sa(
    data: Any,
    *,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; unweighted; no sample."""
    return _fill_1d_or_2d(_hist_from_ref(histref), data)


def _blocked_sa_s(
    data: Any,
    sample: Any,
    *,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; unweighted; with sample."""
    return _fill_1d_or_2d(_hist_from_ref(histref), data, sample=sample)


def _blocked_sa_w(
    data: Any,
    weights: Any,
    *,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; weighted; no sample."""
    return _fill_1d_or_2d(_hist_from_ref(histref), data, weight=weights)


def _blocked_sa_w_s(
    data: Any,
    weights: Any,
    sample: Any,
    *,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; single argument; weighted; with sample."""
    return _fill_1d_or_2d(
        _hist_from_ref(histref), data, weight=weights, sample=sample
    )


def _blocked_ma(
    *data: Any,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; unweighted; no sample."""
    return _hist_from_ref(histref).fill(*data)


def _blocked_ma_s(
    *data: Any,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; unweighted; with sample.

    The last positional argument is the sample.
    """
    sample = data[-1]
    coords = data[:-1]
    return _hist_from_ref(histref).fill(*coords, sample=sample)


def _blocked_ma_w(
    *data: Any,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; weighted; no sample.

    The last positional argument is the weights.
    """
    weights = data[-1]
    coords = data[:-1]
    return _hist_from_ref(histref).fill(*coords, weight=weights)


def _blocked_ma_w_s(
    *data: Any,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; weighted; with sample.

    The last two positional arguments are the weights and the sample,
    in that order.
    """
    weights = data[-2]
    sample = data[-1]
    coords = data[:-2]
    return _hist_from_ref(histref).fill(*coords, weight=weights, sample=sample)


def _blocked_df(
    data: Any,
    *,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; dataframe; unweighted; no sample.

    Each column of `data` (in ``data.columns`` order) is one positional
    argument to ``fill``.
    """
    thehist = _hist_from_ref(histref)
    return thehist.fill(*(data[c] for c in data.columns), weight=None)


def _blocked_df_s(
    data: Any,
    sample: Any,
    *,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; dataframe; unweighted; with sample."""
    thehist = _hist_from_ref(histref)
    return thehist.fill(*(data[c] for c in data.columns), sample=sample)


def _blocked_df_w(
    data: Any,
    weights: Any,
    *,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; dataframe; weighted; no sample."""
    thehist = _hist_from_ref(histref)
    return thehist.fill(*(data[c] for c in data.columns), weight=weights)


def _blocked_df_w_s(
    data: Any,
    weights: Any,
    sample: Any,
    *,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; dataframe; weighted; with sample."""
    thehist = _hist_from_ref(histref)
    return thehist.fill(*(data[c] for c in data.columns), weight=weights, sample=sample)


def _blocked_dak(
    data: Any,
    weights: Any | None,
    sample: Any | None,
    *,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; single awkward array; optional weights/sample.

    Typetracer-backed inputs are replaced by length-zero arrays before
    the histogram is filled.
    """
    thedata = _length_zero_if_typetracer(data)
    theweights = _length_zero_if_typetracer(weights)
    thesample = _length_zero_if_typetracer(sample)
    return _hist_from_ref(histref).fill(thedata, weight=theweights, sample=thesample)


def _blocked_dak_ma(
    *data: Any,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiple awkward arrays; unweighted; no sample."""
    thedata = [_length_zero_if_typetracer(adatum) for adatum in data]
    return _hist_from_ref(histref).fill(*thedata)


def _blocked_dak_ma_w(
    *data: Any,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiple awkward arrays; weighted; no sample.

    The last positional argument is the weights.
    """
    thedata = [_length_zero_if_typetracer(adatum) for adatum in data[:-1]]
    theweights = _length_zero_if_typetracer(data[-1])
    return _hist_from_ref(histref).fill(*thedata, weight=theweights)


def _blocked_dak_ma_s(
    *data: Any,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiple awkward arrays; unweighted; with sample.

    The last positional argument is the sample.
    """
    thedata = [_length_zero_if_typetracer(adatum) for adatum in data[:-1]]
    thesample = _length_zero_if_typetracer(data[-1])
    return _hist_from_ref(histref).fill(*thedata, sample=thesample)


def _blocked_dak_ma_w_s(
    *data: Any,
    histref: bh.Histogram | tuple[Any, ...] | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiple awkward arrays; weighted; with sample.

    The last two positional arguments are the weights and the sample,
    in that order.
    """
    thedata = [_length_zero_if_typetracer(adatum) for adatum in data[:-2]]
    theweights = _length_zero_if_typetracer(data[-2])
    thesample = _length_zero_if_typetracer(data[-1])
    return _hist_from_ref(histref).fill(
        *thedata, weight=theweights, sample=thesample
    )


def optimize(
    dsk: Mapping,
    keys: Hashable | list[Hashable] | set[Hashable],
    **kwargs: Any,
) -> Mapping:
    """Optimize the task graph behind a histogram collection.

    Parameters
    ----------
    dsk : Mapping
        The task graph. Anything that is not already a
        ``HighLevelGraph`` is wrapped in one.
    keys : Hashable | list[Hashable] | set[Hashable]
        The output key(s) to keep; nested lists are flattened.
    **kwargs : Any
        Accepted for signature compatibility; not used here.

    Returns
    -------
    Mapping
        The graph after blockwise optimization, root fusion and culling
        to `keys`.

    """
    flat_keys = list(flatten(keys if isinstance(keys, (list, set)) else [keys]))

    if isinstance(dsk, HighLevelGraph):
        graph = dsk
    else:
        graph = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())

    # Run the dask-awkward optimizations when the "awkward" config key is
    # set (presumably registered when dask-awkward is imported -- confirm).
    # The intent is that this is harmless when it is unnecessary: a graph
    # without dask-awkward input layers is expected to come back unchanged.
    if dask.config.get("awkward", default=False):
        from dask_awkward.lib.optimize import optimize as dak_optimize

        graph = dak_optimize(graph, keys=flat_keys)

    graph = optimize_blockwise(graph, keys=flat_keys)
    graph = fuse_roots(graph, keys=flat_keys)  # type: ignore
    return graph.cull(set(flat_keys))  # type: ignore


class AggHistogram(DaskMethodsMixin):
    """Aggregated Histogram collection.

    The class constructor is typically used internally;
    :py:func:`dask_histogram.factory` is recommended for users (along
    with the `dask_histogram.routines` module).

    See Also
    --------
    dask_histogram.factory

    """

    def __init__(
        self,
        dsk: HighLevelGraph,
        name: str,
        histref: bh.Histogram,
        layer: Any | None = None,
    ) -> None:
        """Store the graph, the collection name and the reference histogram.

        Parameters
        ----------
        dsk : HighLevelGraph
            Task graph that produces the histogram.
        name : str
            Name of the collection; the output key is ``(name, 0)``.
        histref : bh.Histogram
            Reference histogram describing the result.
        layer : Any, optional
            Name of the graph layer holding the output key; defaults to
            `name`.

        Raises
        ------
        ValueError
            If `dsk` is a ``HighLevelGraph`` and the layer is not one of
            its layers.

        """
        self._dask: HighLevelGraph = dsk
        self._name: str = name
        self._meta: bh.Histogram = histref

        # NOTE: Layer only used by `Item.from_delayed`, to handle
        # Delayed objects created by other collections. e.g.:
        # Item.from_delayed(da.ones(1).to_delayed()[0]) See
        # Delayed.__init__
        self._layer = layer or name
        if isinstance(dsk, HighLevelGraph) and self._layer not in dsk.layers:
            raise ValueError(
                f"Layer {self._layer} not in the HighLevelGraph's layers: {list(dsk.layers)}"
            )

    def __dask_graph__(self) -> HighLevelGraph:
        return self._dask

    def __dask_keys__(self) -> list[tuple[str, Literal[0]]]:
        return [self.key]

    def __dask_layers__(self) -> tuple[str, ...]:
        if isinstance(self._dask, HighLevelGraph) and len(self._dask.layers) == 1:
            return tuple(self._dask.layers)
        return (self.name,)

    def __dask_tokenize__(self) -> Any:
        return self.key

    def __dask_postcompute__(self) -> Any:
        return _finalize_agg_histogram, ()

    def __dask_postpersist__(self) -> Any:
        return self._rebuild, ()

    __dask_optimize__ = globalmethod(
        optimize, key="histogram_optimize", falsey=dont_optimize
    )

    __dask_scheduler__ = staticmethod(tget)

    def _rebuild(
        self,
        dsk: HighLevelGraph,
        *,
        rename: Mapping[str, str] | None = None,
    ) -> Any:
        """Recreate the collection on top of a new graph, optionally renamed."""
        name = self._name
        if rename:
            name = rename.get(name, name)
        return type(self)(dsk, name, self.histref)

    @property
    def name(self) -> str:
        return self._name

    @property
    def dask(self) -> HighLevelGraph:
        return self._dask

    @property
    def key(self) -> tuple[str, Literal[0]]:
        return (self.name, 0)

    @property
    def histref(self) -> bh.Histogram:
        """Empty reference boost-histogram object."""
        return self._meta

    @property
    def _storage_type(self) -> type[bh.storage.Storage]:
        """Storage type of the histogram."""
        return self.histref.storage_type

    @property
    def ndim(self) -> int:
        """Total number of dimensions."""
        return self.histref.ndim

    @property
    def shape(self) -> tuple[int, ...]:
        """Shape of the histogram as an array."""
        return self.histref.shape

    @property
    def size(self) -> int:
        """Size of the histogram."""
        return self.histref.size

    def __str__(self) -> str:
        return (
            "dask_histogram.AggHistogram<"
            f"{key_split(self.name)}, "
            f"ndim={self.ndim}, "
            f"storage={self._storage_type()}"
            ">"
        )

    __repr__ = __str__

    def __reduce__(self):
        # Carry the layer name through pickling: rebuilding without it
        # would fall back to ``layer=name`` and could fail the layer
        # check in ``__init__`` for instances created with an explicit
        # `layer`.
        return (
            AggHistogram,
            (self._dask, self._name, self._meta, self._layer),
        )

    def to_dask_array(self, flow: bool = False, dd: bool = False) -> Any:
        """Convert histogram object to dask.array form.

        Parameters
        ----------
        flow : bool
            Include the flow bins.
        dd : bool
            Use the histogramdd return syntax, where the edges are in a tuple.
            Otherwise, this is the histogram/histogram2d return style.

        Returns
        -------
        contents : dask.array.Array
            The bin contents
        *edges : dask.array.Array
            The edges for each dimension

        """
        return to_dask_array(self, flow=flow, dd=dd)

    def to_boost(self) -> bh.Histogram:
        """Convert to a boost_histogram.Histogram via computation.

        This is an alias of `.compute()`.

        """
        return self.compute()

    def to_delayed(self, optimize_graph: bool = True) -> Delayed:
        """Convert the collection to a single ``Delayed`` object.

        Parameters
        ----------
        optimize_graph : bool
            If True, optimize the graph first and wrap the result in a
            new single-layer graph named ``delayed-<name>``.

        """
        keys = self.__dask_keys__()
        graph = self.__dask_graph__()
        layer = self.__dask_layers__()[0]
        if optimize_graph:
            graph = self.__dask_optimize__(graph, keys)
            layer = f"delayed-{self.name}"
            graph = HighLevelGraph.from_collections(layer, graph, dependencies=())
        return Delayed(keys[0], graph, layer=layer)

    def values(self, flow: bool = False) -> NDArray[Any]:
        """Compute the histogram and return its ``values(flow=flow)``."""
        return self.to_boost().values(flow=flow)

    def variances(self, flow: bool = False) -> Any:
        """Compute the histogram and return its ``variances(flow=flow)``."""
        return self.to_boost().variances(flow=flow)

    def counts(self, flow: bool = False) -> NDArray[Any]:
        """Compute the histogram and return its ``counts(flow=flow)``."""
        return self.to_boost().counts(flow=flow)

    def __array__(self) -> NDArray[Any]:
        return self.compute().__array__()

    # Arithmetic: the in-place and regular forms delegate to the same
    # graph-building helpers, each of which returns a new collection.

    def __iadd__(self, other: Any) -> AggHistogram:
        return _iadd(self, other)

    def __add__(self, other: Any) -> AggHistogram:
        return self.__iadd__(other)

    def __radd__(self, other: Any) -> AggHistogram:
        return self.__iadd__(other)

    def __isub__(self, other: Any) -> AggHistogram:
        return _isub(self, other)

    def __sub__(self, other: Any) -> AggHistogram:
        return self.__isub__(other)

    def __itruediv__(self, other: Any) -> AggHistogram:
        return _itruediv(self, other)

    def __truediv__(self, other: Any) -> AggHistogram:
        return self.__itruediv__(other)

    def __idiv__(self, other: Any) -> AggHistogram:
        return self.__itruediv__(other)

    def __div__(self, other: Any) -> AggHistogram:
        return self.__idiv__(other)

    def __imul__(self, other: Any) -> AggHistogram:
        return _imul(self, other)

    def __mul__(self, other: Any) -> AggHistogram:
        return self.__imul__(other)

    def __rmul__(self, other: Any) -> AggHistogram:
        return self.__mul__(other)
def _finalize_partitioned_histogram(results: Any) -> Any: return results def _finalize_agg_histogram(results: Any) -> Any: return results[0]
class PartitionedHistogram(DaskMethodsMixin):
    """Partitioned Histogram collection.

    The class constructor is typically used internally;
    :py:func:`dask_histogram.factory` is recommended for users (along
    with the `dask_histogram.routines` module).

    See Also
    --------
    dask_histogram.factory
    dask_histogram.AggHistogram

    """

    def __init__(
        self, dsk: HighLevelGraph, name: str, npartitions: int, histref: bh.Histogram
    ) -> None:
        """Store the graph, name, partition count and reference histogram.

        Parameters
        ----------
        dsk : HighLevelGraph
            Task graph producing one histogram per partition.
        name : str
            Name of the collection; output keys are ``(name, i)``.
        npartitions : int
            Number of partitions (and therefore of output keys).
        histref : bh.Histogram
            Reference histogram.

        """
        self._dask: HighLevelGraph = dsk
        self._name: str = name
        self._npartitions: int = npartitions
        self._meta: bh.Histogram = histref

    @property
    def name(self) -> str:
        return self._name

    @property
    def dask(self) -> HighLevelGraph:
        return self._dask

    @property
    def npartitions(self) -> int:
        return self._npartitions

    def __dask_graph__(self) -> HighLevelGraph:
        return self.dask

    def __dask_keys__(self) -> list[tuple[str, int]]:
        return [(self.name, i) for i in range(self.npartitions)]

    def __dask_layers__(self) -> tuple[str]:
        return (self.name,)

    def __dask_tokenize__(self) -> str:
        return self.name

    def __dask_postcompute__(self) -> Any:
        return _finalize_partitioned_histogram, ()

    def __dask_postpersist__(self) -> Any:
        # Mirrors AggHistogram: without this hook `_rebuild` is never
        # reachable and the collection cannot be rebuilt after persisting.
        return self._rebuild, ()

    def _rebuild(self, dsk: Any, *, rename: Any = None) -> Any:
        """Recreate the collection on top of a new graph, optionally renamed."""
        name = self.name
        if rename:
            name = rename.get(name, name)
        return type(self)(dsk, name, self.npartitions, self.histref)

    __dask_optimize__ = globalmethod(
        optimize, key="histogram_optimize", falsey=dont_optimize
    )

    __dask_scheduler__ = staticmethod(tget)

    def __str__(self) -> str:
        # Same "<...>" layout as AggHistogram's repr (no comma before "<").
        return (
            "dask_histogram.PartitionedHistogram<"
            f"{key_split(self.name)}, "
            f"npartitions={self.npartitions}"
            ">"
        )

    __repr__ = __str__

    def __reduce__(self):
        return (
            PartitionedHistogram,
            (
                self._dask,
                self._name,
                self._npartitions,
                self._meta,
            ),
        )

    @property
    def histref(self) -> bh.Histogram:
        """boost_histogram.Histogram: reference histogram."""
        return self._meta

    def collapse(self, split_every: int | None = None) -> AggHistogram:
        """Translate into a reduced aggregated histogram."""
        return _reduction(self, split_every=split_every)

    def to_delayed(self, optimize_graph: bool = True) -> list[Delayed]:
        """Convert the collection to one ``Delayed`` object per partition.

        Parameters
        ----------
        optimize_graph : bool
            If True, optimize the graph first and wrap the result in a
            new single-layer graph named ``delayed-<name>``.

        """
        keys = self.__dask_keys__()
        graph = self.__dask_graph__()
        layer = self.__dask_layers__()[0]
        if optimize_graph:
            graph = self.__dask_optimize__(graph, keys)
            layer = f"delayed-{self.name}"
            graph = HighLevelGraph.from_collections(layer, graph, dependencies=())
        return [Delayed(k, graph, layer=layer) for k in keys]
def _reduction(
    ph: PartitionedHistogram,
    split_every: int | None = None,
) -> AggHistogram:
    """Build the tree reduction that sums all partitions of `ph`.

    Parameters
    ----------
    ph : PartitionedHistogram
        The partitioned collection to aggregate.
    split_every : int, optional
        How many inputs each reduction node combines. ``None`` reads
        the ``histogram.aggregation.split_every`` config value (falling
        back to 8); ``False`` uses ``ph.npartitions``.

    Returns
    -------
    AggHistogram
        Collection whose single key holds the aggregated histogram.

    """
    if split_every is None:
        split_every = dask.config.get("histogram.aggregation.split_every", 8)
    if split_every is False:
        split_every = ph.npartitions

    token = tokenize(ph, sum, split_every)

    label = "histreduce"

    name_comb = f"{label}-combine-{token}"
    name_agg = f"{label}-agg-{token}"

    def hist_safe_sum(items):
        # Tuples are dropped before summing; only the remaining items
        # contribute to the result.
        safe_items = [item for item in items if not isinstance(item, tuple)]
        return sum(safe_items)

    mdftr = MockableDataFrameTreeReduction(
        name=name_agg,
        name_input=ph.name,
        npartitions_input=ph.npartitions,
        concat_func=hist_safe_sum,
        tree_node_func=lambda x: x,
        finalize_func=lambda x: x,
        split_every=split_every,
        tree_node_name=name_comb,
    )

    graph = HighLevelGraph.from_collections(name_agg, mdftr, dependencies=(ph,))
    return AggHistogram(graph, name_agg, histref=ph.histref)


def _dependencies(
    *args: DaskCollection,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
) -> tuple[DaskCollection, ...]:
    """Collect every Dask collection among `args`, `weights` and `sample`."""
    dask_args = [arg for arg in args if is_dask_collection(arg)]
    if is_dask_collection(weights):
        dask_args.append(weights)
    if is_dask_collection(sample):
        dask_args.append(sample)
    return tuple(dask_args)


def _weight_sample_check(
    *data: DaskCollection,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
) -> int:
    """Validate `weights` and `sample` against the first data argument.

    Returns
    -------
    int
        Always 0 when validation passes.

    Raises
    ------
    ValueError
        If `weights` or `sample` is not one dimensional, or does not
        have the same number of partitions as ``data[0]``.

    """
    if weights is None and sample is None:
        return 0
    if weights is not None:
        if weights.ndim != 1:
            raise ValueError("weights must be one dimensional.")
        if data[0].npartitions != weights.npartitions:
            raise ValueError("weights must have as many partitions as the data.")
    if sample is not None:
        if sample.ndim != 1:
            raise ValueError("sample must be one dimensional.")
        if data[0].npartitions != sample.npartitions:
            raise ValueError("sample must have as many partitions as the data.")
    return 0


def _is_dask_dataframe(obj):
    """Check, by class module and name, for ``dask.dataframe.core.DataFrame``."""
    return (
        obj.__class__.__module__ == "dask.dataframe.core"
        and obj.__class__.__name__ == "DataFrame"
    )


def _is_dask_series(obj):
    """Check, by class module and name, for ``dask.dataframe.core.Series``."""
    return (
        obj.__class__.__module__ == "dask.dataframe.core"
        and obj.__class__.__name__ == "Series"
    )


def _partitionwise(
    func: Callable,
    layer_name: str,
    *args: Any,
    **kwargs: Any,
) -> Blockwise:
    """Create a Blockwise layer applying `func` to aligned partitions.

    Dask arrays (0, 1 or 2 dimensional), Dask DataFrames/Series and
    ``BlockwiseDep`` objects (1 or 2 dimensional) are mapped block by
    block; any other argument is passed through as a literal.

    Raises
    ------
    ValueError
        For a Dask array with more than two dimensions, or a
        ``BlockwiseDep`` that is not 1 or 2 dimensional.

    """
    from dask.array.core import Array as DaskArray

    pairs: list[Any] = []
    numblocks: dict[Any, int | tuple[int, ...]] = {}
    for arg in args:
        if isinstance(arg, DaskArray):
            if arg.ndim == 1:
                pairs.extend([arg.name, "i"])
            elif arg.ndim == 0:
                pairs.extend([arg.name, ""])
            elif arg.ndim == 2:
                pairs.extend([arg.name, "ij"])
            else:
                raise ValueError("Can't add multi-dimensional array to dataframes")
            numblocks[arg._name] = arg.numblocks

        elif _is_dask_dataframe(arg) or _is_dask_series(arg):
            pairs.extend([arg._name, "i"])
            numblocks[arg._name] = (arg.npartitions,)
        elif isinstance(arg, BlockwiseDep):
            if len(arg.numblocks) == 1:
                pairs.extend([arg, "i"])
            elif len(arg.numblocks) == 2:
                pairs.extend([arg, "ij"])
            else:
                raise ValueError(
                    f"BlockwiseDep arg {arg!r} has {len(arg.numblocks)} dimensions; "
                    "only 1 or 2 are supported."
                )
        else:
            # Not a collection: passed to `func` unchanged for every block.
            pairs.extend([arg, None])
    return blockwise(
        func,
        layer_name,
        "i",
        *pairs,
        numblocks=numblocks,
        concatenate=True,
        **kwargs,
    )


def _partitioned_histogram(
    *data: DaskCollection,
    histref: bh.Histogram,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
    split_every: int | None = None,
) -> PartitionedHistogram:
    """Create the per-partition histogram collection for `data`.

    Parameters
    ----------
    *data : DaskCollection
        A single array/DataFrame/awkward collection, or several
        collections (one per histogram axis).
    histref : bh.Histogram
        Reference histogram forwarded to every blocked fill function.
    weights : DaskCollection, optional
        Weights forwarded to the fill.
    sample : DaskCollection, optional
        Samples forwarded to the fill.
    split_every : int, optional
        Only used as part of the layer-name token.

    Returns
    -------
    PartitionedHistogram
        Collection with as many partitions as the first Dask collection
        found in `data`.

    Raises
    ------
    ValueError
        If `weights` or `sample` is a Dask collection that is not one
        dimensional or whose partition count differs from the data's.

    """
    name = f"hist-on-block-{tokenize(data, histref, weights, sample, split_every)}"
    dask_data = tuple(datum for datum in data if is_dask_collection(datum))
    if len(dask_data) == 0:
        dask_data = data
    data_is_df = is_dataframe_like(dask_data[0])
    data_is_dak = is_dask_awkward_like(dask_data[0])

    # Validate whichever of weights/sample is a Dask collection. Sample is
    # validated here too so that its "sample must ..." errors can be raised.
    _weight_sample_check(
        *dask_data,
        weights=weights if is_dask_collection(weights) else None,
        sample=sample if is_dask_collection(sample) else None,
    )

    has_weights = weights is not None
    has_sample = sample is not None
    # Optional trailing arguments, in the order the blocked functions
    # expect them: weights first, then sample.
    extras = tuple(arg for arg in (weights, sample) if arg is not None)
    single = len(data) == 1

    if data_is_dak:
        from dask_awkward.lib.core import partitionwise_layer as dak_pwl

        if single:
            # _blocked_dak takes weights and sample positionally and
            # accepts None for either one.
            g = dak_pwl(_blocked_dak, name, data[0], weights, sample, histref=histref)
        else:
            blocked = {
                (False, False): _blocked_dak_ma,
                (True, False): _blocked_dak_ma_w,
                (False, True): _blocked_dak_ma_s,
                (True, True): _blocked_dak_ma_w_s,
            }[has_weights, has_sample]
            g = dak_pwl(blocked, name, *data, *extras, histref=histref)
    else:
        if single and data_is_df:
            table = {
                (False, False): _blocked_df,
                (True, False): _blocked_df_w,
                (False, True): _blocked_df_s,
                (True, True): _blocked_df_w_s,
            }
        elif single:
            table = {
                (False, False): _blocked_sa,
                (True, False): _blocked_sa_w,
                (False, True): _blocked_sa_s,
                (True, True): _blocked_sa_w_s,
            }
        else:
            table = {
                (False, False): _blocked_ma,
                (True, False): _blocked_ma_w,
                (False, True): _blocked_ma_s,
                (True, True): _blocked_ma_w_s,
            }
        g = _partitionwise(
            table[has_weights, has_sample], name, *data, *extras, histref=histref
        )

    dependencies = _dependencies(*data, weights=weights, sample=sample)
    hlg = HighLevelGraph.from_collections(name, g, dependencies=dependencies)
    return PartitionedHistogram(hlg, name, dask_data[0].npartitions, histref=histref)


def to_dask_array(agghist: AggHistogram, flow: bool = False, dd: bool = False) -> Any:
    """Convert `agghist` to a `dask.array` return style.

    Parameters
    ----------
    agghist : AggHistogram
        The aggregated histogram collection to convert.
    flow : bool
        If ``True``, include under- and over-flow bins
    dd : bool
        If True, use ``histogramdd`` style return.

    See Also
    --------
    dask_histogram.AggHistogram.to_dask_array

    Returns
    -------
    Union[Tuple[DaskCollection, List[DaskCollection]], Tuple[DaskCollection, ...]]
        The first return is always the bin counts. If `dd` is ``True``
        the second return is a list where each element is an array of
        bin edges for each axis. If `dd` is ``False``, the bin edge
        arrays will not be stored in a list (`histogram2d` style
        return).

    """
    from dask.array import Array, asarray

    name = f"to-dask-array-{tokenize(agghist)}"
    thehist = agghist.histref
    if isinstance(thehist, tuple):
        # Tuple form of the reference: (axes, storage, metadata).
        thehist = bh.Histogram(
            *agghist.histref[0], storage=agghist.histref[1], metadata=agghist.histref[2]
        )
    zeros = (0,) * thehist.ndim
    dsk = {(name, *zeros): (lambda x, f: x.to_numpy(flow=f)[0], agghist.key, flow)}
    graph = HighLevelGraph.from_collections(name, dsk, dependencies=(agghist,))
    shape = thehist.shape
    if flow:
        # NOTE(review): adds two bins per axis, i.e. assumes every axis has
        # both an underflow and an overflow bin -- confirm for axes created
        # with flow bins disabled.
        shape = tuple(i + 2 for i in shape)
    int_storage = thehist.storage_type in (
        bh.storage.Int64,
        bh.storage.AtomicInt64,
    )
    dt = int if int_storage else float
    c = Array(graph, name=name, shape=shape, chunks=shape, dtype=dt)
    axes = thehist.axes

    if flow:
        edges = [
            asarray(np.concatenate([[-np.inf], ax.edges, [np.inf]])) for ax in axes
        ]
    else:
        edges = [asarray(ax.edges) for ax in axes]

    if dd:
        return c, edges
    return (c, *tuple(edges))


class BinaryOpAgg:
    """Callable that lazily applies a binary operator to histograms.

    Parameters
    ----------
    func : Callable[[Any, Any], Any]
        The two-argument function placed in the task graph.
    name : str, optional
        Prefix for the generated layer name; defaults to
        ``func.__name__``.

    """

    def __init__(
        self,
        func: Callable[[Any, Any], Any],
        name: str | None = None,
    ) -> None:
        self.func = func
        self.__name__ = func.__name__ if name is None else name

    def __call__(self, a: AggHistogram, b: AggHistogram) -> AggHistogram:
        """Return a new AggHistogram whose task is ``func(a, b)``.

        Operands that are Dask collections are referenced by their
        first key and recorded as dependencies; anything else is
        embedded in the task as a literal.
        """
        name = f"{self.__name__}-hist-{tokenize(a, b)}"
        deps = []
        if is_dask_collection(a):
            deps.append(a)
        if is_dask_collection(b):
            deps.append(b)
        k1 = a.__dask_keys__()[0] if is_dask_collection(a) else a  # type: ignore
        k2 = b.__dask_keys__()[0] if is_dask_collection(b) else b  # type: ignore
        llg = {(name, 0): (self.func, k1, k2)}
        g = HighLevelGraph.from_collections(name, llg, dependencies=deps)
        # Take the reference histogram from whichever operand has one.
        try:
            ref = a.histref
        except AttributeError:
            ref = b.histref
        return AggHistogram(g, name, histref=ref)


_iadd = BinaryOpAgg(operator.iadd, name="add")
_isub = BinaryOpAgg(operator.isub, name="sub")
_imul = BinaryOpAgg(operator.imul, name="mul")
_itruediv = BinaryOpAgg(operator.itruediv, name="div")
def factory(
    *data: DaskCollection,
    histref: bh.Histogram | None = None,
    axes: Sequence[bh.axis.Axis] | None = None,
    storage: bh.storage.Storage | None = None,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
    split_every: int | None = None,
    keep_partitioned: bool = False,
) -> AggHistogram:
    """Daskified Histogram collection factory function.

    Given some data represented by Dask collections and the
    characteristics of a histogram (either a reference
    :py:obj:`boost_histogram.Histogram` object or a set of axes), this
    routine creates an :py:obj:`AggHistogram` collection: the
    per-partition histograms are built with
    :py:func:`partitioned_factory` and then collapsed into a single
    aggregated histogram.

    Parameters
    ----------
    *data : DaskCollection
        The data to histogram. The supported forms of input data:

        * Single one dimensional dask array or Series: for creating a
          1D histogram.
        * Single multidimensional dask array or DataFrame: for
          creating multidimensional histograms.
        * Multiple one dimensional dask arrays or Series: for creating
          multidimensional histograms.
    histref : bh.Histogram, optional
        A reference histogram object, required if `axes` is not used.
        The dimensionality of `histref` must be compatible with the
        input data.
    axes : Sequence[bh.axis.Axis], optional
        The axes of the histogram, required if `histref` is not used.
        The total number of axes must be equal to the number of
        dimensions of the resulting histogram given the structure of
        `data`.
    storage : bh.storage.Storage, optional
        Storage type of the histogram, only compatible with use of the
        `axes` argument.
    weights : DaskCollection, optional
        Weights associated with the `data`. The partitioning/chunking
        of the weights must be compatible with the input data.
    sample : DaskCollection, optional
        Provide samples if the histogram storage allows it. The
        partitioning/chunking of the samples must be compatible with
        the input data.
    split_every : int, optional
        How many blocks to use in each split during aggregation.
    keep_partitioned : bool, optional
        **No longer supported**: a truthy value raises ``ValueError``.
        Use :py:func:`partitioned_factory` instead.

    Returns
    -------
    AggHistogram
        The resulting aggregated histogram collection.

    Raises
    ------
    ValueError
        If `keep_partitioned` is truthy; if `histref` and `axes` are
        both ``None``; or if `storage` is used with `histref`.

    Examples
    --------
    Creating a three dimensional histogram using the `axes` argument:

    >>> import boost_histogram as bh
    >>> import dask.array as da
    >>> import dask_histogram as dh
    >>> x = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> y = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> z = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> bins = [
    ...    [0.0, 0.3, 0.4, 0.5, 1.0],
    ...    [0.0, 0.1, 0.2, 0.8, 1.0],
    ...    [0.0, 0.2, 0.3, 0.4, 1.0],
    ... ]
    >>> axes = [bh.axis.Variable(b) for b in bins]
    >>> h = dh.factory(x, y, z, axes=axes)
    >>> h.shape
    (4, 4, 4)
    >>> h.compute()
    Histogram(
      Variable([0, 0.3, 0.4, 0.5, 1]),
      Variable([0, 0.1, 0.2, 0.8, 1]),
      Variable([0, 0.2, 0.3, 0.4, 1]),
      storage=Double()) # Sum: 10000.0

    Creating a weighted one dimensional histogram with the `histref`
    argument, then converting to the dask.array histogramming return
    style.

    >>> x = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> w = da.random.uniform(size=(10000,), chunks=(2000,))
    >>> ref = bh.Histogram(bh.axis.Regular(10, 0, 1))
    >>> h = dh.factory(x, histref=ref, weights=w)
    >>> counts, edges = h.to_dask_array()
    >>> counts
    dask.array<to-dask-array, shape=(10,), dtype=float64, chunksize=(10,), chunktype=numpy.ndarray>
    >>> edges
    dask.array<array, shape=(11,), dtype=float64, chunksize=(11,), chunktype=numpy.ndarray>

    """
    # Reject the removed option up front, before any graph is built.
    if keep_partitioned:
        raise ValueError(
            "keep_partitioned=True is no longer supported; "
            "use dask_histogram.partitioned_factory."
        )

    hist_kwargs = {
        "histref": histref,
        "axes": axes,
        "storage": storage,
        "weights": weights,
        "sample": sample,
    }
    partitioned = partitioned_factory(*data, **hist_kwargs)
    return partitioned.collapse(split_every=split_every)
def partitioned_factory(
    *data: DaskCollection,
    histref: bh.Histogram | None = None,
    axes: Sequence[bh.axis.Axis] | None = None,
    storage: bh.storage.Storage | None = None,
    weights: DaskCollection | None = None,
    sample: DaskCollection | None = None,
) -> PartitionedHistogram:
    """Daskified Histogram collection factory function; keep partitioned.

    This is a version of the :py:func:`factory` function that **remains
    partitioned**. The :py:func:`factory` function includes a step in the
    task graph that aggregates all partitions into a single final
    histogram.

    Parameters
    ----------
    *data : DaskCollection
        The data to histogram.
    histref : bh.Histogram, optional
        Reference histogram; when given it is used as is (and `axes`
        is not consulted).
    axes : Sequence[bh.axis.Axis], optional
        Axes used to build the reference histogram when `histref` is
        not given.
    storage : bh.storage.Storage, optional
        Storage used together with `axes`; defaults to
        ``bh.storage.Double()``.
    weights : DaskCollection, optional
        Weights associated with the `data`.
    sample : DaskCollection, optional
        Samples associated with the `data`.

    Returns
    -------
    PartitionedHistogram
        The resulting partitioned histogram collection.

    Raises
    ------
    ValueError
        If `histref` and `axes` are both ``None``, or if `storage` is
        given together with `histref`.

    See Also
    --------
    dask_histogram.factory

    """
    if histref is None:
        if axes is None:
            raise ValueError("Either histref or axes must be defined.")
        # No reference given: build one from the axes and the storage.
        chosen_storage = bh.storage.Double() if storage is None else storage
        histref = bh.Histogram(*axes, storage=chosen_storage)  # type: ignore
    elif storage is not None:
        raise ValueError("Storage cannot be defined along with histref.")

    return _partitioned_histogram(
        *data, histref=histref, weights=weights, sample=sample
    )
def is_dask_awkward_like(x: Any) -> bool:
    """Check if an object is Awkward collection like.

    The check is purely attribute based: `x` qualifies when it exposes
    ``__dask_graph__``, ``layout`` and ``fields``.

    Parameters
    ----------
    x : Any
        The object of interest.

    Returns
    -------
    bool
        ``True`` if `x` has all three attributes, ``False`` otherwise.

    """
    required_attrs = ("__dask_graph__", "layout", "fields")
    return all(hasattr(x, attr) for attr in required_attrs)