"""Dask-ified boost-histogram like API."""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Any, Mapping
import boost_histogram as bh
import boost_histogram.axis as axis
import boost_histogram.storage as storage
import dask.array as da
from dask.base import DaskMethodsMixin, dont_optimize, is_dask_collection, tokenize
from dask.context import globalmethod
from dask.delayed import Delayed, delayed
from dask.highlevelgraph import HighLevelGraph
from dask.threaded import get as tget
from dask.utils import is_arraylike, is_dataframe_like
from tlz import first
from dask_histogram.bins import normalize_bins_range
from dask_histogram.core import AggHistogram, factory, optimize
if TYPE_CHECKING:
from dask_histogram.typing import (
BinArg,
BinType,
DaskCollection,
RangeArg,
RangeType,
)
import dask_histogram
__all__ = ("Histogram", "histogram", "histogram2d", "histogramdd")
class Histogram(bh.Histogram, DaskMethodsMixin, family=dask_histogram):
    """Histogram object capable of lazy computation.

    Parameters
    ----------
    *axes : boost_histogram.axis.Axis
        Provide one or more Axis objects.
    storage : boost_histogram.storage.Storage, optional
        Select a storage to use in the histogram. The default storage
        type is :py:class:`boost_histogram.storage.Double`.
    metadata : Any
        Data that is passed along if a new histogram is created.

    See Also
    --------
    dask_histogram.histogram
    dask_histogram.histogram2d
    dask_histogram.histogramdd

    Examples
    --------
    A two dimensional histogram with one fixed bin width axis and
    another variable bin width axis:

    Note that (for convenience) the ``boost_histogram.axis`` namespace
    is mirrored as ``dask_histogram.axis`` and the
    ``boost_histogram.storage`` namespace is mirrored as
    ``dask_histogram.storage``.

    >>> import dask.array as da
    >>> import dask_histogram.boost as dhb
    >>> x = da.random.standard_normal(size=(1000,), chunks=200)
    >>> y = da.random.standard_normal(size=(1000,), chunks=200)
    >>> w = da.random.uniform(0.2, 0.8, size=(1000,), chunks=200)
    >>> h = dhb.Histogram(
    ...     dhb.axis.Regular(10, -3, 3),
    ...     dhb.axis.Variable([-3, -2, -1, 0, 1.1, 2.2, 3.3]),
    ...     storage=dhb.storage.Weight()
    ... ).fill(x, y, weight=w).compute()

    """

    def __init__(
        self,
        *axes: bh.axis.Axis,
        storage: bh.storage.Storage = bh.storage.Double(),
        metadata: Any = None,
    ) -> None:
        """Construct a Histogram object."""
        super().__init__(*axes, storage=storage, metadata=metadata)
        # Lazy-fill state; all three stay None until the first fill() call.
        self._staged: AggHistogram | None = None
        self._dask_name: str | None = None
        self._dask: HighLevelGraph | None = None

    @property
    def _histref(self):
        # (axes, storage, metadata) triple handed to the core factory so
        # workers can construct compatible empty histograms.
        return (
            tuple(self.axes),
            self.storage_type(),
            self.metadata,
        )

    def __iadd__(self, other):
        # Merge staged (delayed) fills; if only `other` has staged fills,
        # adopt its AggHistogram wholesale.
        if self.staged_fills() and other.staged_fills():
            self._staged += other._staged
        elif not self.staged_fills() and other.staged_fills():
            self._staged = other._staged
        if self.staged_fills():
            self._dask = self._staged.__dask_graph__()
            self._dask_name = self._staged.name
        return self

    def __add__(self, other):
        return self.__iadd__(other)

    def __radd__(self, other):
        return other.__iadd__(self)

    def __dask_graph__(self) -> HighLevelGraph:
        return self.dask

    def __dask_keys__(self) -> list[tuple[str, int]]:
        return [(self.dask_name, 0)]

    def __dask_layers__(self) -> tuple[str, ...]:
        if isinstance(self.dask, HighLevelGraph) and len(self.dask.layers) == 1:
            return tuple(self.dask.layers)
        return (self.dask_name,)

    def __dask_tokenize__(self) -> str:
        # Before any fill is staged there is no graph name; tokenize the
        # histogram definition itself instead.
        if self._dask_name is None:
            return tokenize(*self.axes, self.storage_type, self.metadata)
        return self.dask_name

    def __dask_postcompute__(self) -> Any:
        return lambda x: self._in_memory_type(first(x)), ()

    def __dask_postpersist__(self) -> Any:
        return self._rebuild, ()

    __dask_optimize__ = globalmethod(
        optimize, key="histogram_optimize", falsey=dont_optimize
    )

    __dask_scheduler__ = staticmethod(tget)

    def _rebuild(
        self,
        dsk: HighLevelGraph,
        *,
        rename: Mapping[str, str] | None = None,
    ) -> Any:
        # Used by persist(): rebuild an empty histogram of the same
        # definition pointing at the (possibly renamed) new graph.
        dask_name = self.dask_name
        if rename:
            dask_name = rename.get(dask_name, dask_name)
        new = type(self)(
            *self.axes, storage=self.storage_type(), metadata=self.metadata
        )
        new._dask_name = dask_name
        new._dask = dsk
        return new

    @property
    def _in_memory_type(self) -> type[bh.Histogram]:
        # Concrete type produced by compute(); subclasses should override
        # this to get instances of themselves back.
        if type(self) is not Histogram:
            warnings.warn(
                """dask_histogram.boost.Histogram has been subclassed without
                overriding '_in_memory_type', please do so if you would like to
                receive an instance of subclass rather than a boost_histogram.Histogram!"""
            )
        return bh.Histogram

    @property
    def dask_name(self) -> str:
        if self._dask_name is None:
            raise RuntimeError(
                "The dask name should never be None when it's requested."
            )
        return self._dask_name

    @property
    def dask(self) -> HighLevelGraph:
        if self._dask is None:
            raise RuntimeError(
                "The dask graph should never be None when it's requested."
            )
        return self._dask

    def fill(  # type: ignore
        self,
        *args: DaskCollection,
        weight: DaskCollection | None = None,
        sample: Any | None = None,
        threads: Any | None = None,
    ) -> Histogram:
        """Stage a fill call using a Dask collection as input.

        If materialized NumPy arrays are passed to this function, all
        arguments are forwarded to :func:`concrete_fill`.

        Parameters
        ----------
        *args : one or more Dask collections
            Provide one dask collection per dimension, or a single
            columnar Dask collection (DataFrame or 2D Array) where the
            total number of columns equals to the total number of
            histogram dimensions.

            * A single one dimensional collection
              (:obj:`dask.array.Array` or
              :obj:`dask.dataframe.Series`)
            * Multiple one dimensional collections, each representing
              an array of one coordinate of the dataset to be
              histogrammed.
            * A single two dimensional collection
              (:obj:`dask.array.Array` or
              :obj:`dask.dataframe.DataFrame`), each column
              representing one coordinate of the dataset to be
              histogrammed.

            If multiple one dimensional arguments are passed (i.e. an
            `x` array and a `y` array for a two dimensional
            histogram), the collections must have equal
            chunking/partitioning.

            If a single two dimensional array is passed (i.e. an array
            of shape ``(2000, 3)`` for a three dimensional histogram),
            chunking can only exist along the 0th (row) axis.
            (coordinates cannot be separated by a chunk boundary, only
            whole individual samples can be separated).
        weight : dask.array.Array or dask.dataframe.Series, optional
            Weights associated with each sample. The weights must be
            chunked/partitioned in a way compatible with the dataset.
        sample : dask.array.Array or dask.dataframe.Series, optional
            Provide samples if the histogram storage allows it. The
            partitioning/chunking of the samples must be compatible
            with the input data.
        threads : int, optional
            Ignored argument kept for compatibility with boost-histogram.
            We let Dask have complete control over threads.

        Returns
        -------
        dask_histogram.Histogram
            Class instance with a staged (delayed) fill added.

        """
        # Validate the shape of the input; the actual dispatch on input
        # kind happens inside factory().
        if len(args) == 1 and args[0].ndim == 1:
            pass
        elif len(args) == 1 and args[0].ndim == 2:
            pass
        elif len(args) > 1:
            pass
        else:
            raise ValueError(f"Cannot interpret input data: {args}")

        new_fill = factory(*args, histref=self._histref, weights=weight, sample=sample)
        if self._staged is None:
            self._staged = new_fill
        else:
            self._staged += new_fill
        self._dask = self._staged.__dask_graph__()
        self._dask_name = self._staged.name
        return self

    def staged_fills(self) -> bool:
        """Check if histogram has staged fills.

        Returns
        -------
        bool
            True if the object contains staged delayed fills.

        """
        return self._staged is not None

    def to_delayed(self) -> Delayed:
        """Histogram as a delayed object.

        Wraps the current state of the Histogram in
        :py:func:`dask.delayed.delayed` if no fills are staged;
        otherwise, the most downstream delayed Histogram is returned,
        such that:

        .. code-block:: python

            dask.compute(h.to_delayed())

        will yield a histogram with the same counts and variances
        yielded by:

        .. code-block:: python

            h.compute()

        In both cases if ``h`` doesn't have any delayed fill calls
        staged, then no concrete fill computations will be triggered
        and the eventual computed object from the Delayed object will
        be a ``dask_histogram.boost.Histogram`` object. If staged
        fills exist the computed object from the Delayed object will
        be a ``boost_histogram.Histogram``.

        Returns
        -------
        dask.delayed.Delayed
            Wrapping of the histogram as a delayed object.

        Examples
        --------
        >>> import dask_histogram.boost as dhb
        >>> import dask
        >>> h = dhb.Histogram(dhb.axis.Regular(10, -3, 3))
        >>> x = da.random.standard_normal(size=(100,), chunks=(20,))
        >>> h.fill(x)
        Histogram(Regular(10, -3, 3), storage=Double()) # (has staged fills)
        >>> h, = dask.compute(h.to_delayed())

        """
        if self._staged is not None:
            return self._staged.to_delayed()
        return delayed(bh.Histogram(self))

    def __repr__(self) -> str:
        """Text representation of the histogram.

        Mostly copied from the parent boost_histogram.Histogram class;
        appended to the end of the string information about staged
        fills.

        """
        newline = "\n  "
        sep = "," if len(self.axes) > 0 else ""
        ret = "{self.__class__.__name__}({newline}".format(
            self=self, newline=newline if len(self.axes) > 1 else ""
        )
        ret += f",{newline}".join(repr(ax) for ax in self.axes)
        ret += "{comma}{newline}storage={storage}".format(
            storage=self.storage_type(),
            newline=newline
            if len(self.axes) > 1
            else " "
            if len(self.axes) > 0
            else "",
            comma=sep,
        )
        ret += ")"
        outer = self.sum(flow=True)
        if outer:
            inner = self.sum(flow=False)
            ret += f" # Sum: {inner}"
            if inner != outer:
                ret += f" ({outer} with flow)"
        if self.staged_fills() and outer:
            ret += " (has staged fills)"
        elif self.staged_fills():
            ret += " # (has staged fills)"
        return ret

    def agg_histogram(self) -> AggHistogram | None:
        # Expose the underlying AggHistogram (None if nothing staged).
        if self._staged is None:
            warnings.warn("No staged AggHistogram; returning None")
        return self._staged

    def to_dask_array(self, flow: bool = False, dd: bool = True) -> Any:
        """Convert to dask.array style of return arrays.

        Edges are converted to match NumPy standards, with upper edge
        inclusive, unlike boost-histogram, where upper edge is
        exclusive.

        Parameters
        ----------
        flow : bool
            Include the flow bins.
        dd : bool
            Use the histogramdd return syntax, where the edges are in a tuple.
            Otherwise, this is the histogram/histogram2d return style.

        Returns
        -------
        contents : dask.array.Array
            The bin contents
        *edges : dask.array.Array
            The edges for each dimension

        """
        if self._staged is not None:
            return self._staged.to_dask_array(flow=flow, dd=dd)
        else:
            # No staged fills: wrap the concrete (in-memory) contents
            # and edges in dask arrays.
            counts, edges = self.to_numpy(flow=flow, dd=True, view=False)
            counts = da.from_array(counts)
            edges = [da.from_array(ea) for ea in edges]  # type: ignore
            if dd:
                return counts, edges
            return tuple([counts, *edges])
def histogramdd(
    a: DaskCollection | tuple[DaskCollection, ...],
    bins: BinArg = 10,
    range: RangeArg = None,
    normed: bool | None = None,
    weights: DaskCollection | None = None,
    density: bool = False,
    *,
    histogram: Any | None = None,
    storage: storage.Storage = storage.Double(),
    threads: int | None = None,
) -> Any:
    """Histogram Dask data in multiple dimensions.

    Parameters
    ----------
    a : dask collection or tuple of dask collections
        Data to histogram. Acceptable input data can be of the form:

        * A dask.array.Array of shape (N, D) where each row is a
          sample and each column is a specific coordinate for the
          data.
        * A sequence of dask collections where each collection (e.g.
          array or series) contains all values for one coordinate of
          all data.
    bins : sequence of arrays, int, or sequence of ints
        The bin specification.

        The possible binning configurations are:

        * A sequence of arrays describing the monotonically increasing
          bin edges along each dimension.
        * A single int describing the total number of bins that will
          be used in each dimension (this requires the `range`
          argument to be defined).
        * A sequence of ints describing the total number of bins to be
          used in each dimension (this requires the `range` argument
          to be defined).

        When bins are described by arrays, the rightmost edge is
        included. Bins described by arrays also allows for non-uniform
        bin widths.
    range : tuple(tuple(float, float), ...) optional
        A sequence of length D, each a (min, max) tuple giving the
        outer bin edges to be used if the edges are not given
        explicitly in `bins`. If defined, this argument is required to
        have an entry for each dimension. Unlike
        :func:`numpy.histogramdd`, if `bins` does not define bin
        edges, this argument is required (this function will not
        automatically use the min and max of the value in a given
        dimension because the input data may be lazy in dask).
    normed : bool, optional
        An unsupported argument that has been deprecated in the NumPy
        API (preserved to maintain calls dependent on argument order).
    weights : dask.array.Array or dask.dataframe.Series, optional
        An array of values weighing each sample in the input data. The
        chunks of the weights must be identical to the chunking along
        the 0th (row) axis of the data sample.
    density : bool
        If ``False`` (default), the returned array represents the
        number of samples in each bin. If ``True``, the returned array
        represents the probability density function at each bin.
    histogram : dask_histogram.Histogram, optional
        If `dh.Histogram`, object based output is enabled.
    storage : boost_histogram.storage.Storage
        Define the storage used by the :py:class:`Histogram` object.
    threads : int, optional
        Ignored argument kept for compatibility with boost-histogram.
        We let Dask have complete control over threads.

    Returns
    -------
    tuple(dask.array.Array, tuple(dask.array.Array)) or Histogram
        The default return is the style of
        :func:`dask.array.histogramdd`: An array of bin contents and a
        tuple of edges arrays (one for each dimension). If the
        `histogram` argument is used then the return is a
        :obj:`dask_histogram.Histogram` object.

    See Also
    --------
    histogram
    histogram2d

    Examples
    --------
    Creating a three dimensional histogram with variable width bins in
    each dimension. First, using three 1D arrays for each coordinate:

    >>> import dask.array as da
    >>> import dask_histogram.boost as dhb
    >>> x = da.random.standard_normal(size=(10000,), chunks=(2000,))
    >>> y = da.random.standard_normal(size=(10000,), chunks=(2000,))
    >>> z = da.random.standard_normal(size=(10000,), chunks=(2000,))
    >>> bins = [
    ...     [-3, -2, 0, 1, 3],
    ...     [-3, -1, 1, 2, 3],
    ...     [-3, -2, 0, 2, 3],
    ... ]
    >>> h, edges = dhb.histogramdd((x, y, z), bins=bins)
    >>> type(h)
    <class 'dask.array.core.Array'>
    >>> h.shape
    (4, 4, 4)
    >>> len(edges)
    3

    Now the same histogram but instead of a
    :py:func:`dask.array.histogramdd` style return (which mirrors the
    return style of :py:func:`numpy.histogramdd`), we use the
    `histogram` argument to trigger the return of a
    :obj:`dask_histogram.Histogram` object:

    >>> import dask.array as da
    >>> import dask_histogram.boost as dhb
    >>> x = da.random.standard_normal(size=(10000,), chunks=(2000,))
    >>> y = da.random.standard_normal(size=(10000,), chunks=(2000,))
    >>> z = da.random.standard_normal(size=(10000,), chunks=(2000,))
    >>> bins = [
    ...     [-3, -2, 0, 1, 3],
    ...     [-3, -1, 1, 2, 3],
    ...     [-3, -2, 0, 2, 3],
    ... ]
    >>> h = dhb.histogramdd((x, y, z), bins=bins, histogram=dhb.Histogram)
    >>> h
    Histogram(
      Variable([-3, -2, 0, 1, 3]),
      Variable([-3, -1, 1, 2, 3]),
      Variable([-3, -2, 0, 2, 3]),
      storage=Double()) # (has staged fills)
    >>> h.staged_fills()
    True
    >>> h = h.compute()
    >>> h # doctest: +SKIP
    Histogram(
      Variable([-3, -2, 0, 1, 3]),
      Variable([-3, -1, 1, 2, 3]),
      Variable([-3, -2, 0, 2, 3]),
      storage=Double()) # Sum: 9919.0 (10000.0 with flow)

    Another 3D histogram example but with an alternative dataset form
    (a single array with three columns), fixed bin widths, sample
    weights, and usage of the boost-histogram ``Weight()`` storage:

    >>> import dask.array as da
    >>> import dask_histogram.boost as dhb
    >>> a = da.random.standard_normal(size=(10000, 3), chunks=(2000, 3))
    >>> w = da.random.uniform(0.5, 0.7, size=(10000,), chunks=2000)
    >>> bins = (7, 5, 6)
    >>> range = ((-3, 3), (-2.9, 2.9), (-3.1, 3.1))
    >>> h = dhb.histogramdd(
    ...     a,
    ...     bins=bins,
    ...     range=range,
    ...     weights=w,
    ...     histogram=dhb.Histogram,
    ...     storage=dhb.storage.Weight()
    ... )
    >>> h
    Histogram(
      Regular(7, -3, 3),
      Regular(5, -2.9, 2.9),
      Regular(6, -3.1, 3.1),
      storage=Weight()) # Sum: WeightedSum(value=0, variance=0) (has staged fills)
    >>> h.staged_fills()
    True
    >>> h = h.compute()

    """
    # Check for invalid argument combinations.
    if normed is not None:
        raise KeyError(
            "normed=True is deprecated in NumPy and not supported by dask-histogram."
        )
    if density and histogram is not None:
        raise KeyError(
            "dask-histogram does not support the density keyword when returning a "
            "dask-histogram object."
        )

    # If input is a multidimensional array or dataframe, we wrap it in
    # a tuple that will be passed to fill and unrolled in the backend.
    if (is_arraylike(a) and a.ndim > 1) or is_dataframe_like(a):  # type: ignore
        ndim = a.shape[1]  # type: ignore
        a = (a,)  # type: ignore
    else:
        ndim = len(a)
        for entry in a:
            if not is_dask_collection(entry):
                raise ValueError(
                    "non-dask collection was passed; this function only supports dask "
                    "collections as input"
                )

    bins, range = normalize_bins_range(ndim, bins, range)

    # Create the axes based on the bins and range values.
    axes = []
    for b, r in zip(bins, range):
        if r is None:
            # Bin edges given explicitly -> variable width axis.
            axes.append(axis.Variable(b))  # type: ignore
        else:
            axes.append(axis.Regular(bins=b, start=r[0], stop=r[1]))  # type: ignore

    # Finally create and fill the histogram object.
    hist = Histogram(*axes, storage=storage).fill(*a, weight=weights)

    if histogram is not Histogram:
        return hist.to_dask_array(flow=False, dd=True)
    return hist
def histogram2d(
    x: DaskCollection,
    y: DaskCollection,
    bins: BinArg = 10,
    range: RangeArg = None,
    normed: bool | None = None,
    weights: DaskCollection | None = None,
    density: bool = False,
    *,
    histogram: Any | None = None,
    storage: storage.Storage = storage.Double(),
    threads: int | None = None,
) -> Any:
    """Histogram Dask data in two dimensions.

    Parameters
    ----------
    x : dask.array.Array or dask.dataframe.Series
        Array representing the `x` coordinates of the data to be
        histogrammed.
    y : dask.array.Array or dask.dataframe.Series
        Array representing the `y` coordinates of the data to be
        histogrammed.
    bins : int, (int, int), array, (array, array), optional
        The bin specification:

        * If a single int, both dimensions will have that number of bins
        * If a pair of ints, the first int is the total number of bins
          along the `x`-axis, and the second is the total number of
          bins along the `y`-axis.
        * If a single array, the array represents the bin edges along
          each dimension.
        * If a pair of arrays, the first array corresponds to the
          edges along `x`-axis, the second corresponds to the edges
          along the `y`-axis.
    range : ((float, float), (float, float)), optional
        If integers are passed to the `bins` argument, `range` is
        required to define the min and max of each axis, that is:
        `((xmin, xmax), (ymin, ymax))`.
    normed : bool, optional
        An unsupported argument that has been deprecated in the NumPy
        API (preserved to maintain calls dependent on argument order).
    weights : dask.array.Array or dask.dataframe.Series, optional
        An array of values weighing each sample in the input data. The
        chunks of the weights must be identical to the chunking along
        the 0th (row) axis of the data sample.
    density : bool
        If ``False`` (default), the returned array represents the
        number of samples in each bin. If ``True``, the returned array
        represents the probability density function at each bin.
    histogram : dask_histogram.Histogram, optional
        If `dh.Histogram`, object based output is enabled.
    storage : boost_histogram.storage.Storage
        Define the storage used by the :py:class:`Histogram` object.
    threads : int, optional
        Ignored argument kept for compatibility with boost-histogram.
        We let Dask have complete control over threads.

    Returns
    -------
    tuple(dask.array.Array, dask.array.Array, dask.array.Array) or Histogram
        The default return is the style of
        :func:`dask.array.histogram2d`: An array of bin contents, an
        array of the x-edges, and an array of the y-edges. If the
        `histogram` argument is used then the return is a
        :obj:`dask_histogram.Histogram` object.

    See Also
    --------
    histogram
    histogramdd

    Examples
    --------
    Uniform distributions along each dimension with the array return style:

    >>> import dask_histogram.boost as dhb
    >>> import dask.array as da
    >>> x = da.random.uniform(0.0, 1.0, size=(1000,), chunks=200)
    >>> y = da.random.uniform(0.4, 0.6, size=(1000,), chunks=200)
    >>> h, edgesx, edgesy = dhb.histogram2d(x, y, bins=(12, 4), range=((0, 1), (0.4, 0.6)))

    Now with the object return style:

    >>> h = dhb.histogram2d(
    ...     x, y, bins=(12, 4), range=((0, 1), (0.4, 0.6)), histogram=dhb.Histogram
    ... )

    With variable bins and sample weights from a
    :py:obj:`dask.dataframe.Series` originating from a
    :py:obj:`dask.dataframe.DataFrame` column (`df` below must have
    `npartitions` equal to the size of the chunks in `x` and `y`):

    >>> x = da.random.uniform(0.0, 1.0, size=(1000,), chunks=200)
    >>> y = da.random.uniform(0.4, 0.6, size=(1000,), chunks=200)
    >>> df = dask_dataframe_factory() # doctest: +SKIP
    >>> w = df["weights"] # doctest: +SKIP
    >>> binsx = [0.0, 0.2, 0.6, 0.8, 1.0]
    >>> binsy = [0.40, 0.45, 0.50, 0.55, 0.60]
    >>> h, e1, e2 = dhb.histogram2d(
    ...     x, y, bins=[binsx, binsy], weights=w
    ... ) # doctest: +SKIP

    """
    # Delegate everything to the N-dimensional implementation; only the
    # return style differs (dd=False drops the edge tuple wrapping).
    hist = histogramdd(
        (x, y),
        bins=bins,
        range=range,
        normed=normed,
        weights=weights,
        density=density,
        histogram=Histogram,
        storage=storage,
        threads=threads,
    )
    if histogram is not Histogram:
        return hist.to_dask_array(flow=False, dd=False)
    return hist
def histogram(
    x: DaskCollection,
    bins: BinType = 10,
    range: RangeType = None,
    normed: bool | None = None,
    weights: DaskCollection | None = None,
    density: bool = False,
    *,
    histogram: Any | None = None,
    storage: storage.Storage = storage.Double(),
    threads: int | None = None,
) -> Any:
    """Histogram Dask data in one dimension.

    Parameters
    ----------
    x : dask.array.Array or dask.dataframe.Series
        Data to be histogrammed.
    bins : int or sequence of scalars.
        If `bins` is an int, it defines the total number of bins to be
        used (this requires the `range` argument to be defined). If
        `bins` is a sequence of scalars (e.g. an array) then it
        defines the bin edges.
    range : (float, float)
        The minimum and maximum of the histogram axis.
    normed : bool, optional
        An unsupported argument that has been deprecated in the NumPy
        API (preserved to maintain calls dependent on argument order).
    weights : dask.array.Array or dask.dataframe.Series, optional
        An array of values weighing each sample in the input data. The
        chunks of the weights must be identical to the chunking along
        the 0th (row) axis of the data sample.
    density : bool
        If ``False`` (default), the returned array represents the
        number of samples in each bin. If ``True``, the returned array
        represents the probability density function at each bin.
    histogram : dask_histogram.Histogram, optional
        If `dh.Histogram`, object based output is enabled.
    storage : boost_histogram.storage.Storage
        Define the storage used by the :py:class:`Histogram` object.
    threads : int, optional
        Ignored argument kept for compatibility with boost-histogram.
        We let Dask have complete control over threads.

    Returns
    -------
    tuple(dask.array.Array, dask.array.Array) or Histogram
        The default return is the style of
        :func:`dask.array.histogram`: An array of bin contents and an
        array of bin edges. If the `histogram` argument is used then
        the return is a :obj:`dask_histogram.Histogram` object.

    See Also
    --------
    histogram2d
    histogramdd

    Examples
    --------
    Gaussian distribution with object return style and ``Weight`` storage:

    >>> import dask_histogram.boost as dhb
    >>> import dask.array as da
    >>> x = da.random.standard_normal(size=(1000,), chunks=(250,))
    >>> h = dhb.histogram(
    ...     x, bins=10, range=(-3, 3), histogram=dhb.Histogram, storage=dhb.storage.Weight()
    ... )

    Now with variable width bins and the array return style:

    >>> bins = [-3, -2.2, -1.0, -0.2, 0.2, 1.2, 2.2, 3.2]
    >>> h, edges = dhb.histogram(x, bins=bins)

    Now with weights and the object return style:

    >>> w = da.random.uniform(0.0, 1.0, size=x.shape[0], chunks=x.chunksize[0])
    >>> h = dhb.histogram(x, bins=bins, weights=w, histogram=dhb.Histogram)
    >>> h
    Histogram(Variable([-3, -2.2, -1, -0.2, 0.2, 1.2, 2.2, 3.2]), storage=Double()) # (has staged fills)

    """
    # Delegate to the N-dimensional implementation with a 1-tuple of
    # input data; `threads` is forwarded (and ignored there) to stay
    # consistent with histogram2d.
    hist = histogramdd(
        (x,),
        bins=bins,
        range=range,
        normed=normed,
        weights=weights,
        density=density,
        histogram=Histogram,
        storage=storage,
        threads=threads,
    )
    if histogram is not Histogram:
        return hist.to_dask_array(flow=False, dd=False)
    return hist