```from __future__ import annotations

import math
import warnings
from collections.abc import Iterable
from functools import partial, reduce, wraps
from numbers import Integral, Real

import numpy as np
from tlz import concat, interleave, sliding_window

Array,
asanyarray,
asarray,
blockwise,
concatenate,
elemwise,
from_array,
implements,
is_scalar_for_elemwise,
map_blocks,
stack,
tensordot_lookup,
)
from dask.array.creation import arange, diag, empty, indices, tri
from dask.array.einsumfuncs import einsum  # noqa
array_safe,
asarray_safe,
meta_from_array,
safe_wraps,
validate_axis,
)
from dask.utils import apply, derived_from, funcname, is_arraylike, is_cupy_type

# save built-in for histogram functions which use range as a kwarg.
_range = range

[docs]@derived_from(np)
def array(x, dtype=None, ndmin=None, *, like=None):
if not _numpy_120 and like is not None:
raise RuntimeError("The use of ``like`` required NumPy >= 1.20")
x = asarray(x, like=like)
while ndmin is not None and x.ndim < ndmin:
x = x[None, :]
if dtype is not None and x.dtype != dtype:
x = x.astype(dtype)
return x

[docs]@derived_from(np)
def result_type(*args):
args = [a if is_scalar_for_elemwise(a) else a.dtype for a in args]
return np.result_type(*args)

[docs]@derived_from(np)
def atleast_3d(*arys):
new_arys = []
for x in arys:
x = asanyarray(x)
if x.ndim == 0:
x = x[None, None, None]
elif x.ndim == 1:
x = x[None, :, None]
elif x.ndim == 2:
x = x[:, :, None]

new_arys.append(x)

if len(new_arys) == 1:
return new_arys
else:
return new_arys

[docs]@derived_from(np)
def atleast_2d(*arys):
new_arys = []
for x in arys:
x = asanyarray(x)
if x.ndim == 0:
x = x[None, None]
elif x.ndim == 1:
x = x[None, :]

new_arys.append(x)

if len(new_arys) == 1:
return new_arys
else:
return new_arys

[docs]@derived_from(np)
def atleast_1d(*arys):
new_arys = []
for x in arys:
x = asanyarray(x)
if x.ndim == 0:
x = x[None]

new_arys.append(x)

if len(new_arys) == 1:
return new_arys
else:
return new_arys

[docs]@derived_from(np)
def vstack(tup, allow_unknown_chunksizes=False):
if isinstance(tup, Array):
raise NotImplementedError(
"``vstack`` expects a sequence of arrays as the first argument"
)

tup = tuple(atleast_2d(x) for x in tup)
return concatenate(tup, axis=0, allow_unknown_chunksizes=allow_unknown_chunksizes)

[docs]@derived_from(np)
def hstack(tup, allow_unknown_chunksizes=False):
if isinstance(tup, Array):
raise NotImplementedError(
"``hstack`` expects a sequence of arrays as the first argument"
)

if all(x.ndim == 1 for x in tup):
return concatenate(
tup, axis=0, allow_unknown_chunksizes=allow_unknown_chunksizes
)
else:
return concatenate(
tup, axis=1, allow_unknown_chunksizes=allow_unknown_chunksizes
)

[docs]@derived_from(np)
def dstack(tup, allow_unknown_chunksizes=False):
if isinstance(tup, Array):
raise NotImplementedError(
"``dstack`` expects a sequence of arrays as the first argument"
)

tup = tuple(atleast_3d(x) for x in tup)
return concatenate(tup, axis=2, allow_unknown_chunksizes=allow_unknown_chunksizes)

@derived_from(np)
def swapaxes(a, axis1, axis2):
if axis1 == axis2:
return a
if axis1 < 0:
axis1 = axis1 + a.ndim
if axis2 < 0:
axis2 = axis2 + a.ndim
ind = list(range(a.ndim))
out = list(ind)
out[axis1], out[axis2] = axis2, axis1

return blockwise(np.swapaxes, out, a, ind, axis1=axis1, axis2=axis2, dtype=a.dtype)

[docs]@derived_from(np)
def transpose(a, axes=None):
if axes:
if len(axes) != a.ndim:
raise ValueError("axes don't match array")
axes = tuple(d + a.ndim if d < 0 else d for d in axes)
else:
axes = tuple(range(a.ndim))[::-1]
return blockwise(
np.transpose, axes, a, tuple(range(a.ndim)), dtype=a.dtype, axes=axes
)

[docs]def flip(m, axis=None):
"""
Reverse element order along axis.

Parameters
----------
m : array_like
Input array.
axis : None or int or tuple of ints, optional
Axis or axes to reverse element order of. None will reverse all axes.

Returns
-------
The flipped array.
"""

m = asanyarray(m)

sl = m.ndim * [slice(None)]
if axis is None:
axis = range(m.ndim)
if not isinstance(axis, Iterable):
axis = (axis,)
try:
for ax in axis:
sl[ax] = slice(None, None, -1)
except IndexError as e:
raise ValueError(
f"`axis` of {str(axis)} invalid for {str(m.ndim)}-D array"
) from e
sl = tuple(sl)

return m[sl]

[docs]@derived_from(np)
def flipud(m):
return flip(m, 0)

[docs]@derived_from(np)
def fliplr(m):
return flip(m, 1)

[docs]@derived_from(np)
def rot90(m, k=1, axes=(0, 1)):
axes = tuple(axes)
if len(axes) != 2:
raise ValueError("len(axes) must be 2.")

m = asanyarray(m)

if axes == axes or np.absolute(axes - axes) == m.ndim:
raise ValueError("Axes must be different.")

if axes >= m.ndim or axes < -m.ndim or axes >= m.ndim or axes < -m.ndim:
raise ValueError(f"Axes={axes} out of range for array of ndim={m.ndim}.")

k %= 4

if k == 0:
return m[:]
if k == 2:
return flip(flip(m, axes), axes)

axes_list = list(range(0, m.ndim))
(axes_list[axes], axes_list[axes]) = (axes_list[axes], axes_list[axes])

if k == 1:
return transpose(flip(m, axes), axes_list)
else:
# k == 3
return flip(transpose(m, axes_list), axes)

def _tensordot(a, b, axes, is_sparse):
x = max([a, b], key=lambda x: x.__array_priority__)
tensordot = tensordot_lookup.dispatch(type(x))
x = tensordot(a, b, axes=axes)
if is_sparse and len(axes) == 1:
return x
else:
ind = [slice(None, None)] * x.ndim
for a in sorted(axes):
ind.insert(a, None)
x = x[tuple(ind)]
return x

def _tensordot_is_sparse(x):
is_sparse = "sparse" in str(type(x._meta))
if is_sparse:
# exclude pydata sparse arrays, no workaround required for these in tensordot
is_sparse = "sparse._coo.core.COO" not in str(type(x._meta))
return is_sparse

[docs]@derived_from(np)
def tensordot(lhs, rhs, axes=2):
if not isinstance(lhs, Array):
lhs = from_array(lhs)
if not isinstance(rhs, Array):
rhs = from_array(rhs)

if isinstance(axes, Iterable):
left_axes, right_axes = axes
else:
left_axes = tuple(range(lhs.ndim - axes, lhs.ndim))
right_axes = tuple(range(0, axes))
if isinstance(left_axes, Integral):
left_axes = (left_axes,)
if isinstance(right_axes, Integral):
right_axes = (right_axes,)
if isinstance(left_axes, list):
left_axes = tuple(left_axes)
if isinstance(right_axes, list):
right_axes = tuple(right_axes)
is_sparse = _tensordot_is_sparse(lhs) or _tensordot_is_sparse(rhs)
if is_sparse and len(left_axes) == 1:
concatenate = True
else:
concatenate = False
dt = np.promote_types(lhs.dtype, rhs.dtype)
left_index = list(range(lhs.ndim))
right_index = list(range(lhs.ndim, lhs.ndim + rhs.ndim))
out_index = left_index + right_index
for l, r in zip(left_axes, right_axes):
out_index.remove(right_index[r])
right_index[r] = left_index[l]
if concatenate:
out_index.remove(left_index[l])
else:
intermediate = blockwise(
_tensordot,
out_index,
lhs,
left_index,
rhs,
right_index,
dtype=dt,
concatenate=concatenate,
axes=(left_axes, right_axes),
is_sparse=is_sparse,
)
if concatenate:
return intermediate
else:
return intermediate.sum(axis=left_axes)

[docs]@derived_from(np, ua_args=["out"])
def dot(a, b):
return tensordot(a, b, axes=((a.ndim - 1,), (b.ndim - 2,)))

[docs]@derived_from(np)
def vdot(a, b):
return dot(a.conj().ravel(), b.ravel())

def _chunk_sum(a, axis=None, dtype=None, keepdims=None):
# Caution: this is not your conventional array-sum: due
# to the special nature of the preceding blockwise con-
# traction,  each chunk is expected to have exactly the
# same shape,  with a size of 1 for the dimension given
# by `axis` (the reduction axis).  This makes mere ele-
# ment-wise addition of the arrays possible.   Besides,
# the output can be merely squeezed to lose the `axis`-
# dimension when keepdims = False
if type(a) is list:
else:
out = a

if keepdims:
return out
else:
return out.squeeze(axis)

def _sum_wo_cat(a, axis=None, dtype=None):
if dtype is None:
dtype = getattr(np.zeros(1, dtype=a.dtype).sum(), "dtype", object)

if a.shape[axis] == 1:
return a.squeeze(axis)

return reduction(
a, _chunk_sum, _chunk_sum, axis=axis, dtype=dtype, concatenate=False
)

def _matmul(a, b):
xp = np

if is_cupy_type(a):
# This branch appears to  be unnecessary since cupy
# version 9.0. See the following link:
# But it remains here  for  backward-compatibility.
# Consider removing it in a future version of dask.
import cupy

xp = cupy

chunk = xp.matmul(a, b)
# Since we have performed the contraction via xp.matmul
# but blockwise expects all dimensions back  (including
# the contraction-axis in  the 2nd-to-last position  of
# the output), we must then put it back in the expected
# the position ourselves:
return chunk[..., xp.newaxis, :]

[docs]@derived_from(np)
def matmul(a, b):
a = asanyarray(a)
b = asanyarray(b)

if a.ndim == 0 or b.ndim == 0:
raise ValueError("`matmul` does not support scalars.")

a_is_1d = False
if a.ndim == 1:
a_is_1d = True
a = a[np.newaxis, :]

b_is_1d = False
if b.ndim == 1:
b_is_1d = True
b = b[:, np.newaxis]

if a.ndim < b.ndim:
a = a[(b.ndim - a.ndim) * (np.newaxis,)]
elif a.ndim > b.ndim:
b = b[(a.ndim - b.ndim) * (np.newaxis,)]

# out_ind includes all dimensions to prevent contraction
# in the blockwise below.  We set the last two dimensions
# of the output to the contraction axis and the 2nd
# (last) dimension of b in that order
out_ind = tuple(range(a.ndim + 1))
# lhs_ind includes `a`/LHS dimensions
lhs_ind = tuple(range(a.ndim))
# on `b`/RHS everything above 2nd dimension, is the same
# as `a`, -2 dimension is "contracted" with the last dimension
# of `a`, last dimension of `b` is `b` specific
rhs_ind = tuple(range(a.ndim - 2)) + (lhs_ind[-1], a.ndim)

out = blockwise(
_matmul,
out_ind,
a,
lhs_ind,
b,
rhs_ind,
dtype=result_type(a, b),
concatenate=False,
)

# Because contraction + concatenate in blockwise leads to high
# memory footprints, we want to avoid them. Instead we will perform
# blockwise (without contraction) followed by reduction. More about

# We will also perform the reduction without concatenation
out = _sum_wo_cat(out, axis=-2)

if a_is_1d:
out = out.squeeze(-2)
if b_is_1d:
out = out.squeeze(-1)

return out

[docs]@derived_from(np)
def outer(a, b):
a = a.flatten()
b = b.flatten()

dtype = np.outer(a.dtype.type(), b.dtype.type()).dtype

return blockwise(np.outer, "ij", a, "i", b, "j", dtype=dtype)

def _inner_apply_along_axis(arr, func1d, func1d_axis, func1d_args, func1d_kwargs):
return np.apply_along_axis(func1d, func1d_axis, arr, *func1d_args, **func1d_kwargs)

[docs]@derived_from(np)
def apply_along_axis(func1d, axis, arr, *args, dtype=None, shape=None, **kwargs):
"""
This is a blocked variant of :func:`numpy.apply_along_axis` implemented via

Notes
-----
If either of `dtype` or `shape` are not provided, Dask attempts to
determine them by calling `func1d` on a dummy array. This may produce
incorrect values for `dtype` or `shape`, so we recommend providing them.
"""
arr = asarray(arr)

# Verify that axis is valid and throw an error otherwise
axis = len(arr.shape[:axis])

# If necessary, infer dtype and shape of the output of func1d by calling it on test data.
if shape is None or dtype is None:
test_data = np.ones((1,), dtype=arr.dtype)
test_result = np.array(func1d(test_data, *args, **kwargs))
if shape is None:
shape = test_result.shape
if dtype is None:
dtype = test_result.dtype

# Rechunk so that func1d is applied over the full axis.
arr = arr.rechunk(
arr.chunks[:axis] + (arr.shape[axis : axis + 1],) + arr.chunks[axis + 1 :]
)

# Map func1d over the data to get the result
# Adds other axes as needed.
result = arr.map_blocks(
_inner_apply_along_axis,
name=funcname(func1d) + "-along-axis",
dtype=dtype,
chunks=(arr.chunks[:axis] + shape + arr.chunks[axis + 1 :]),
drop_axis=axis,
new_axis=list(range(axis, axis + len(shape), 1)),
func1d=func1d,
func1d_axis=axis,
func1d_args=args,
func1d_kwargs=kwargs,
)

return result

[docs]@derived_from(np)
def apply_over_axes(func, a, axes):
# Validate arguments
a = asarray(a)
try:
axes = tuple(axes)
except TypeError:
axes = (axes,)

sl = a.ndim * (slice(None),)

# Compute using `apply_along_axis`.
result = a
for i in axes:
result = apply_along_axis(func, i, result, 0)

# Restore original dimensionality or error.
if result.ndim == (a.ndim - 1):
result = result[sl[:i] + (None,)]
elif result.ndim != a.ndim:
raise ValueError(
"func must either preserve dimensionality of the input"
" or reduce it by one."
)

return result

[docs]@derived_from(np)
def ptp(a, axis=None):
return a.max(axis=axis) - a.min(axis=axis)

[docs]@derived_from(np)
def diff(a, n=1, axis=-1, prepend=None, append=None):
a = asarray(a)
n = int(n)
axis = int(axis)

if n == 0:
return a
if n < 0:
raise ValueError("order must be non-negative but got %d" % n)

combined = []
if prepend is not None:
prepend = asarray_safe(prepend, like=meta_from_array(a))
if prepend.ndim == 0:
shape = list(a.shape)
shape[axis] = 1
combined.append(prepend)

combined.append(a)

if append is not None:
append = asarray_safe(append, like=meta_from_array(a))
if append.ndim == 0:
shape = list(a.shape)
shape[axis] = 1
combined.append(append)

if len(combined) > 1:
a = concatenate(combined, axis)

sl_1 = a.ndim * [slice(None)]
sl_2 = a.ndim * [slice(None)]

sl_1[axis] = slice(1, None)
sl_2[axis] = slice(None, -1)

sl_1 = tuple(sl_1)
sl_2 = tuple(sl_2)

r = a
for _ in range(n):
r = r[sl_1] - r[sl_2]

return r

[docs]@derived_from(np)
def ediff1d(ary, to_end=None, to_begin=None):
ary = asarray(ary)

aryf = ary.flatten()
r = aryf[1:] - aryf[:-1]

r = [r]
if to_begin is not None:
r = [asarray(to_begin).flatten()] + r
if to_end is not None:
r = r + [asarray(to_end).flatten()]
r = concatenate(r)

return r

"""
x: nd-array
array of one block
coord: 1d-array or scalar
coordinate along which the gradient is computed.
axis: int
axis along which the gradient is computed
array_locs:
actual location along axis. None if coordinate is scalar
keyword to be passed to np.gradient
"""
block_loc = block_id[axis]
if array_locs is not None:
coord = coord[array_locs[block_loc] : array_locs[block_loc]]

[docs]@derived_from(np)
f = asarray(f)

kwargs["edge_order"] = math.ceil(kwargs.get("edge_order", 1))
if kwargs["edge_order"] > 2:
raise ValueError("edge_order must be less than or equal to 2.")

drop_result_list = False
if axis is None:
axis = tuple(range(f.ndim))
elif isinstance(axis, Integral):
drop_result_list = True
axis = (axis,)

axis = validate_axis(axis, f.ndim)

if len(axis) != len(set(axis)):
raise ValueError("duplicate axes not allowed")

axis = tuple(ax % f.ndim for ax in axis)

if varargs == ():
varargs = (1,)
if len(varargs) == 1:
varargs = len(axis) * varargs
if len(varargs) != len(axis):
raise TypeError(
"Spacing must either be a single scalar, or a scalar / 1d-array per axis"
)

if issubclass(f.dtype.type, (np.bool_, Integral)):
f = f.astype(float)
elif issubclass(f.dtype.type, Real) and f.dtype.itemsize < 4:
f = f.astype(float)

results = []
for i, ax in enumerate(axis):
for c in f.chunks[ax]:
if np.min(c) < kwargs["edge_order"] + 1:
raise ValueError(
"Chunk size must be larger than edge_order + 1. "
"Minimum chunk for axis {} is {}. Rechunk to "
"proceed.".format(ax, np.min(c))
)

if np.isscalar(varargs[i]):
array_locs = None
else:
if isinstance(varargs[i], Array):
raise NotImplementedError("dask array coordinated is not supported.")
# coordinate position for each block taking overlap into account
chunk = np.array(f.chunks[ax])
array_loc_stop = np.cumsum(chunk) + 1
array_loc_start = array_loc_stop - chunk - 2
array_loc_stop[-1] -= 1
array_loc_start = 0
array_locs = (array_loc_start, array_loc_stop)

results.append(
f.map_overlap(
dtype=f.dtype,
depth={j: 1 if j == ax else 0 for j in range(f.ndim)},
boundary="none",
coord=varargs[i],
axis=ax,
array_locs=array_locs,
)
)

if drop_result_list:
results = results

return results

def _bincount_agg(bincounts, dtype, **kwargs):
if not isinstance(bincounts, list):
return bincounts

n = max(map(len, bincounts))
out = np.zeros_like(bincounts, shape=n, dtype=dtype)
for b in bincounts:
out[: len(b)] += b
return out

[docs]@derived_from(np)
def bincount(x, weights=None, minlength=0, split_every=None):
if x.ndim != 1:
raise ValueError("Input array must be one dimensional. Try using x.ravel()")
if weights is not None:
if weights.chunks != x.chunks:
raise ValueError("Chunks of input array x and weights must match.")

token = tokenize(x, weights, minlength)
args = [x, "i"]
if weights is not None:
meta = array_safe(np.bincount(, weights=), like=meta_from_array(x))
args.extend([weights, "i"])
else:
meta = array_safe(np.bincount([]), like=meta_from_array(x))

if minlength == 0:
output_size = (np.nan,)
else:
output_size = (minlength,)

chunked_counts = blockwise(
partial(np.bincount, minlength=minlength), "i", *args, token=token, meta=meta
)
chunked_counts._chunks = (
output_size * len(chunked_counts.chunks),
*chunked_counts.chunks[1:],
)

output = _tree_reduce(
chunked_counts,
aggregate=partial(_bincount_agg, dtype=meta.dtype),
axis=(0,),
keepdims=True,
dtype=meta.dtype,
split_every=split_every,
concatenate=False,
)
output._chunks = (output_size, *chunked_counts.chunks[1:])
output._meta = meta
return output

[docs]@derived_from(np)
def digitize(a, bins, right=False):
bins = asarray_safe(bins, like=meta_from_array(a))
dtype = np.digitize(asarray_safe(, like=bins), bins, right=False).dtype
return a.map_blocks(np.digitize, dtype=dtype, bins=bins, right=right)

def _searchsorted_block(x, y, side):
res = np.searchsorted(x, y, side=side)
# 0 is only correct for the first block of a, but blockwise doesn't have a way
# of telling which block is being operated on (unlike map_blocks),
# so set all 0 values to a special value and set back at the end of searchsorted
res[res == 0] = -1
return res[np.newaxis, :]

[docs]@derived_from(np)
def searchsorted(a, v, side="left", sorter=None):
if a.ndim != 1:
raise ValueError("Input array a must be one dimensional")

if sorter is not None:
raise NotImplementedError(
"da.searchsorted with a sorter argument is not supported"
)

# call np.searchsorted for each pair of blocks in a and v
meta = np.searchsorted(a._meta, v._meta)
out = blockwise(
_searchsorted_block,
list(range(v.ndim + 1)),
a,
,
v,
list(range(1, v.ndim + 1)),
side,
None,
meta=meta,
adjust_chunks={0: 1},  # one row for each block in a
)

# add offsets to take account of the position of each block within the array a
a_chunk_sizes = array_safe((0, *a.chunks), like=meta_from_array(a))
a_chunk_offsets = np.cumsum(a_chunk_sizes)[:-1]
a_chunk_offsets = a_chunk_offsets[(Ellipsis,) + v.ndim * (np.newaxis,)]
a_offsets = asarray(a_chunk_offsets, chunks=1)
out = where(out < 0, out, out + a_offsets)

# combine the results from each block (of a)
out = out.max(axis=0)

# fix up any -1 values
out[out == -1] = 0

return out

# TODO: dask linspace doesn't support delayed values
def _linspace_from_delayed(start, stop, num=50):
linspace_name = "linspace-" + tokenize(start, stop, num)
(start_ref, stop_ref, num_ref), deps = unpack_collections([start, stop, num])
if len(deps) == 0:
return np.linspace(start, stop, num=num)

linspace_dsk = {(linspace_name, 0): (np.linspace, start_ref, stop_ref, num_ref)}
linspace_graph = HighLevelGraph.from_collections(
linspace_name, linspace_dsk, dependencies=deps
)

chunks = ((np.nan,),) if is_dask_collection(num) else ((num,),)
return Array(linspace_graph, linspace_name, chunks, dtype=float)

def _block_hist(x, bins, range=None, weights=None):
return np.histogram(x, bins, range=range, weights=weights)[np.newaxis]

[docs]def histogram(a, bins=None, range=None, normed=False, weights=None, density=None):
"""
Blocked variant of :func:`numpy.histogram`.

Parameters
----------
Input data; the histogram is computed over the flattened
array. If the ``weights`` argument is used, the chunks of
``a`` are accessed to check chunking compatibility between
``a`` and ``weights``. If ``weights`` is ``None``, a
:py:class:`dask.dataframe.Series` object can be passed as
input data.
bins : int or sequence of scalars, optional
Either an iterable specifying the ``bins`` or the number of ``bins``
and a ``range`` argument is required as computing ``min`` and ``max``
over blocked arrays is an expensive operation that must be performed
explicitly.
If `bins` is an int, it defines the number of equal-width
bins in the given range (10, by default). If `bins` is a
sequence, it defines a monotonically increasing array of bin edges,
including the rightmost edge, allowing for non-uniform bin widths.
range : (float, float), optional
The lower and upper range of the bins.  If not provided, range
is simply ``(a.min(), a.max())``.  Values outside the range are
ignored. The first element of the range must be less than or
equal to the second. `range` affects the automatic bin
computation as well. While bin width is computed to be optimal
based on the actual data within `range`, the bin count will fill
the entire range including portions containing no data.
normed : bool, optional
This is equivalent to the ``density`` argument, but produces incorrect
results for unequal bin widths. It should not be used.
A dask.array.Array of weights, of the same block structure as ``a``.  Each value in
``a`` only contributes its associated weight towards the bin count
(instead of 1). If ``density`` is True, the weights are
normalized, so that the integral of the density over the range
remains 1.
density : bool, optional
If ``False``, the result will contain the number of samples in
each bin. If ``True``, the result is the value of the
probability *density* function at the bin, normalized such that
the *integral* over the range is 1. Note that the sum of the
histogram values will not be equal to 1 unless bins of unity
width are chosen; it is not a probability *mass* function.
Overrides the ``normed`` keyword if given.
If ``density`` is True, ``bins`` cannot be a single-number delayed
value. It must be a concrete number, or a (possibly-delayed)
array/sequence of the bin edges.

Returns
-------
The values of the histogram. See `density` and `weights` for a
description of the possible semantics.
bin_edges : dask Array of dtype float
Return the bin edges ``(length(hist)+1)``.

Examples
--------
Using number of bins and range:

>>> import numpy as np
>>> x = da.from_array(np.arange(10000), chunks=10)
>>> h, bins = da.histogram(x, bins=10, range=[0, 10000])
>>> bins
array([    0.,  1000.,  2000.,  3000.,  4000.,  5000.,  6000.,  7000.,
8000.,  9000., 10000.])
>>> h.compute()
array([1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000])

Explicitly specifying the bins:

>>> h, bins = da.histogram(x, bins=np.array([0, 5000, 10000]))
>>> bins
array([    0,  5000, 10000])
>>> h.compute()
array([5000, 5000])
"""
if isinstance(bins, Array):
scalar_bins = bins.ndim == 0
# ^ `np.ndim` is not implemented by Dask array.
elif isinstance(bins, Delayed):
scalar_bins = bins._length is None or bins._length == 1
else:
scalar_bins = np.ndim(bins) == 0

if bins is None or (scalar_bins and range is None):
raise ValueError(
"bins as an iterable or specifying both a range and "
"the number of bins"
)

if weights is not None and weights.chunks != a.chunks:
raise ValueError("Input array and weights must have the same chunked structure")

if normed is not False:
raise ValueError(
"The normed= keyword argument has been deprecated. "
)

if density and scalar_bins and isinstance(bins, (Array, Delayed)):
raise NotImplementedError(
"When `density` is True, `bins` cannot be a scalar Dask object. "
"It must be a concrete number or a (possibly-delayed) array/sequence of bin edges."
)

for argname, val in [("bins", bins), ("range", range), ("weights", weights)]:
if not isinstance(bins, (Array, Delayed)) and is_dask_collection(bins):
raise TypeError(
"Dask types besides Array and Delayed are not supported "
"for `histogram`. For argument `{}`, got: {!r}".format(argname, val)
)

if range is not None:
try:
if len(range) != 2:
raise ValueError(
f"range must be a sequence or array of length 2, but got {len(range)} items"
)
if isinstance(range, (Array, np.ndarray)) and range.shape != (2,):
raise ValueError(
f"range must be a 1-dimensional array of two items, but got an array of shape {range.shape}"
)
except TypeError:
raise TypeError(
f"Expected a sequence or array for range, not {range}"
) from None

token = tokenize(a, bins, range, weights, density)
name = "histogram-sum-" + token

if scalar_bins:
bins = _linspace_from_delayed(range, range, bins + 1)
# ^ NOTE `range` is safe because of the above check, and the initial check
# that range must not be None if `scalar_bins`
else:
if not isinstance(bins, (Array, np.ndarray)):
bins = asarray(bins)
if bins.ndim != 1:
raise ValueError(
f"bins must be a 1-dimensional array or sequence, got shape {bins.shape}"
)

(bins_ref, range_ref), deps = unpack_collections([bins, range])

# Map the histogram to all bins, forming a 2D array of histograms, stacked for each chunk
if weights is None:
dsk = {
(name, i, 0): (_block_hist, k, bins_ref, range_ref)
}
dtype = np.histogram([]).dtype
else:
dsk = {
(name, i, 0): (_block_hist, k, bins_ref, range_ref, w)
for i, (k, w) in enumerate(zip(a_keys, w_keys))
}
dtype = weights.dtype

deps = (a,) + deps
if weights is not None:
deps += (weights,)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=deps)

# Turn graph into a 2D Array of shape (nchunks, nbins)
nbins = bins.size - 1  # since `bins` is 1D
chunks = ((1,) * nchunks, (nbins,))
mapped = Array(graph, name, chunks, dtype=dtype)

# Sum over chunks to get the final histogram
n = mapped.sum(axis=0)

# We need to replicate normed and density options from numpy
if density is not None:
if density:
db = asarray(np.diff(bins).astype(float), chunks=n.chunks)
return n / db / n.sum(), bins
else:
return n, bins
else:
return n, bins

[docs]def histogram2d(x, y, bins=10, range=None, normed=None, weights=None, density=None):
"""Blocked variant of :func:`numpy.histogram2d`.

Parameters
----------
An array containing the `x`-coordinates of the points to be
histogrammed.
An array containing the `y`-coordinates of the points to be
histogrammed.
bins : sequence of arrays describing bin edges, int, or sequence of ints
The bin specification. See the `bins` argument description for
:py:func:`histogramdd` for a complete description of all
possible bin configurations (this function is a 2D specific
version of histogramdd).
range : tuple of pairs, optional.
The leftmost and rightmost edges of the bins along each
dimension when integers are passed to `bins`; of the form:
((xmin, xmax), (ymin, ymax)).
normed : bool, optional
An alias for the density argument that behaves identically. To
avoid confusion with the broken argument in the `histogram`
function, `density` should be preferred.
An array of values weighing each sample in the input data. The
chunks of the weights must be identical to the chunking along
the 0th (row) axis of the data sample.
density : bool, optional
If False (the default) return the number of samples in each
bin. If True, the returned array represents the probability
density function at each bin.

Returns
-------
The values of the histogram.
The edges along the `x`-dimension.
The edges along the `y`-dimension.

--------
histogram
histogramdd

Examples
--------
>>> x = da.array([2, 4, 2, 4, 2, 4])
>>> y = da.array([2, 2, 4, 4, 2, 4])
>>> bins = 2
>>> range = ((0, 6), (0, 6))
>>> h, xedges, yedges = da.histogram2d(x, y, bins=bins, range=range)
>>> h
dask.array<sum-aggregate, shape=(2, 2), dtype=float64, chunksize=(2, 2), chunktype=numpy.ndarray>
>>> xedges
>>> h.compute()
array([[2., 1.],
[1., 2.]])
"""
counts, edges = histogramdd(
(x, y),
bins=bins,
range=range,
normed=normed,
weights=weights,
density=density,
)
return counts, edges, edges

def _block_histogramdd_rect(sample, bins, range, weights):
"""Call numpy.histogramdd for a blocked/chunked calculation.

Slurps the result into an additional outer axis; this new axis
will be used to stack chunked calls of the numpy function and add
them together later.

Returns
-------
:py:object:`np.ndarray`
NumPy array with an additional outer dimension.

"""
return np.histogramdd(sample, bins, range=range, weights=weights)[0:1]

def _block_histogramdd_multiarg(*args):
"""Call numpy.histogramdd for a multi argument blocked/chunked calculation.

Slurps the result into an additional outer axis; this new axis
will be used to stack chunked calls of the numpy function and add
them together later.

The last three arguments _must be_ (bins, range, weights).

The difference between this function and
_block_histogramdd_rect is that here we expect the sample
to be composed of multiple arguments (multiple 1D arrays, each one
representing a coordinate), while _block_histogramdd_rect
expects a single rectangular (2D array where columns are
coordinates) sample.

"""
bins, range, weights = args[-3:]
sample = args[:-3]
return np.histogramdd(sample, bins=bins, range=range, weights=weights)[0:1]

[docs]def histogramdd(sample, bins, range=None, normed=None, weights=None, density=None):
"""Blocked variant of :func:`numpy.histogramdd`.

Chunking of the input data (``sample``) is only allowed along the
0th (row) axis (the axis corresponding to the total number of
samples). Data chunked along the 1st axis (column) axis is not
compatible with this function. If weights are used, they must be
chunked along the 0th axis identically to the input sample.

An example setup for a three dimensional histogram, where the
sample shape is ``(8, 3)`` and weights are shape ``(8,)``, sample
chunks would be ``((4, 4), (3,))`` and the weights chunks would be
``((4, 4),)`` a table of the structure:

+-------+-----------------------+-----------+
|       |      sample (8 x 3)   |  weights  |
+=======+=====+=====+=====+=====+=====+=====+
| chunk | row | `x` | `y` | `z` | row | `w` |
+-------+-----+-----+-----+-----+-----+-----+
|       |   0 |   5 |   6 |   6 |   0 | 0.5 |
|       +-----+-----+-----+-----+-----+-----+
|       |   1 |   8 |   9 |   2 |   1 | 0.8 |
|   0   +-----+-----+-----+-----+-----+-----+
|       |   2 |   3 |   3 |   1 |   2 | 0.3 |
|       +-----+-----+-----+-----+-----+-----+
|       |   3 |   2 |   5 |   6 |   3 | 0.7 |
+-------+-----+-----+-----+-----+-----+-----+
|       |   4 |   3 |   1 |   1 |   4 | 0.3 |
|       +-----+-----+-----+-----+-----+-----+
|       |   5 |   3 |   2 |   9 |   5 | 1.3 |
|   1   +-----+-----+-----+-----+-----+-----+
|       |   6 |   8 |   1 |   5 |   6 | 0.8 |
|       +-----+-----+-----+-----+-----+-----+
|       |   7 |   3 |   5 |   3 |   7 | 0.7 |
+-------+-----+-----+-----+-----+-----+-----+

If the sample 0th dimension and weight 0th (row) dimension are
chunked differently, a ``ValueError`` will be raised. If
coordinate groupings ((x, y, z) trios) are separated by a chunk
boundry, then a ``ValueError`` will be raised. We suggest that you
rechunk your data if it is of that form.

The chunks property of the data (and optional weights) are used to
check for compatibility with the blocked algorithm (as described
above); therefore, you must call `to_dask_array` on a collection

The function is also compatible with `x`, `y`, and `z` being
individual 1D arrays with equal chunking. In that case, the data
should be passed as a tuple: ``histogramdd((x, y, z), ...)``

Parameters
----------
Multidimensional data to be histogrammed.

Note the unusual interpretation of a sample when it is a

* When a (N, D) dask Array, each row is an entry in the sample
(coordinate in D dimensional space).
* When a sequence of dask Arrays, each element in the sequence
is the array of values for a single coordinate.
bins : sequence of arrays describing bin edges, int, or sequence of ints
The bin specification.

The possible binning configurations are:

* A sequence of arrays describing the monotonically increasing
bin edges along each dimension.
* A single int describing the total number of bins that will
be used in each dimension (this requires the ``range``
argument to be defined).
* A sequence of ints describing the total number of bins to be
used in each dimension (this requires the ``range`` argument
to be defined).

When bins are described by arrays, the rightmost edge is
included. Bins described by arrays also allows for non-uniform
bin widths.
range : sequence of pairs, optional
A sequence of length D, each a (min, max) tuple giving the
outer bin edges to be used if the edges are not given
explicitly in `bins`. If defined, this argument is required to
have an entry for each dimension. Unlike
:func:`numpy.histogramdd`, if `bins` does not define bin
edges, this argument is required (this function will not
automatically use the min and max of of the value in a given
dimension because the input data may be lazy in dask).
normed : bool, optional
An alias for the density argument that behaves identically. To
avoid confusion with the broken argument to `histogram`,
`density` should be preferred.
An array of values weighing each sample in the input data. The
chunks of the weights must be identical to the chunking along
the 0th (row) axis of the data sample.
density : bool, optional
If ``False`` (default), the returned array represents the
number of samples in each bin. If ``True``, the returned array
represents the probability density function at each bin.

--------
histogram

Returns
-------
The values of the histogram.
Sequence of arrays representing the bin edges along each
dimension.

Examples
--------
Computing the histogram in 5 blocks using different bin edges
along each dimension:

>>> x = da.random.uniform(0, 1, size=(1000, 3), chunks=(200, 3))
>>> edges = [
...     np.linspace(0, 1, 5), # 4 bins in 1st dim
...     np.linspace(0, 1, 6), # 5 in the 2nd
...     np.linspace(0, 1, 4), # 3 in the 3rd
... ]
>>> h, edges = da.histogramdd(x, bins=edges)
>>> result = h.compute()
>>> result.shape
(4, 5, 3)

Defining the bins by total number and their ranges, along with
using weights:

>>> bins = (4, 5, 3)
>>> ranges = ((0, 1),) * 3  # expands to ((0, 1), (0, 1), (0, 1))
>>> w = da.random.uniform(0, 1, size=(1000,), chunks=x.chunksize)
>>> h, edges = da.histogramdd(x, bins=bins, range=ranges, weights=w)
>>> np.isclose(h.sum().compute(), w.sum().compute())
True

Using a sequence of 1D arrays as the input:

>>> x = da.array([2, 4, 2, 4, 2, 4])
>>> y = da.array([2, 2, 4, 4, 2, 4])
>>> z = da.array([4, 2, 4, 2, 4, 2])
>>> bins = ([0, 3, 6],) * 3
>>> h, edges = da.histogramdd((x, y, z), bins)
>>> h
dask.array<sum-aggregate, shape=(2, 2, 2), dtype=float64, chunksize=(2, 2, 2), chunktype=numpy.ndarray>
>>> edges
>>> h.compute()
array([[[0., 2.],
[0., 1.]],
<BLANKLINE>
[[1., 0.],
[2., 0.]]])
>>> edges.compute()
array([0, 3, 6])
>>> edges.compute()
array([0, 3, 6])
>>> edges.compute()
array([0, 3, 6])

"""
# logic used in numpy.histogramdd to handle normed/density.
if normed is None:
if density is None:
density = False
elif density is None:
# an explicit normed argument was passed, alias it to the new name
density = normed
else:
raise TypeError("Cannot specify both 'normed' and 'density'")

# check if any dask collections (dc) were passed to bins= or
# range= these are unsupported.
if isinstance(bins, (list, tuple)):
dc_bins = dc_bins or any([is_dask_collection(b) for b in bins])
dc_range = (
any([is_dask_collection(r) for r in range]) if range is not None else False
)
if dc_bins or dc_range:
raise NotImplementedError(
"Passing dask collections to bins=... or range=... is not supported."
)

# generate token and name for task
token = tokenize(sample, bins, range, weights, density)
name = f"histogramdd-sum-{token}"

# N == total number of samples
# D == total number of dimensions
if hasattr(sample, "shape"):
if len(sample.shape) != 2:
raise ValueError("Single array input to histogramdd should be columnar")
else:
_, D = sample.shape
n_chunks = sample.numblocks
rectangular_sample = True
# Require data to be chunked along the first axis only.
if sample.shape[1:] != sample.chunksize[1:]:
raise ValueError("Input array can only be chunked along the 0th axis.")
elif isinstance(sample, (tuple, list)):
rectangular_sample = False
D = len(sample)
n_chunks = sample.numblocks
for i in _range(1, D):
if sample[i].chunks != sample.chunks:
raise ValueError("All coordinate arrays must be chunked identically.")
else:
raise ValueError(
"Incompatible sample. Must be a 2D array or a sequence of 1D arrays."
)

# Require only Array or Delayed objects for bins, range, and weights.
for argname, val in [("bins", bins), ("range", range), ("weights", weights)]:
if not isinstance(bins, (Array, Delayed)) and is_dask_collection(bins):
raise TypeError(
"Dask types besides Array and Delayed are not supported "
"for `histogramdd`. For argument `{}`, got: {!r}".format(argname, val)
)

# Require that the chunking of the sample and weights are compatible.
if weights is not None:
if rectangular_sample and weights.chunks != sample.chunks:
raise ValueError(
"Input array and weights must have the same shape "
"and chunk structure along the first dimension."
)
elif not rectangular_sample and weights.numblocks != n_chunks:
raise ValueError(
"Input arrays and weights must have the same shape "
"and chunk structure."
)

# if bins is a list, tuple, then make sure the length is the same
# as the number dimensions.
if isinstance(bins, (list, tuple)):
if len(bins) != D:
raise ValueError(
"The dimension of bins must be equal to the dimension of the sample."
)

# if range is defined, check that it's the right length and also a
# sequence of pairs.
if range is not None:
if len(range) != D:
raise ValueError(
"range argument requires one entry, a min max pair, per dimension."
)
if not all(len(r) == 2 for r in range):
raise ValueError("range argument should be a sequence of pairs")

# If bins is a single int, create a tuple of len `D` containing `bins`.
if isinstance(bins, int):
bins = (bins,) * D

# we will return the edges to mimic the NumPy API (we also use the
# edges later as a way to calculate the total number of bins).
if all(isinstance(b, int) for b in bins) and all(len(r) == 2 for r in range):
edges = [np.linspace(r, r, b + 1) for b, r in zip(bins, range)]
else:
edges = [np.asarray(b) for b in bins]

if rectangular_sample:
deps = (sample,)
else:
deps = tuple(sample)

if weights is not None:
deps += (weights,)
dtype = weights.dtype
else:
w_keys = (None,) * n_chunks
dtype = np.histogramdd([]).dtype

# This tuple of zeros represents the chunk index along the columns
# (we only allow chunking along the rows).
column_zeros = tuple(0 for _ in _range(D))

# With dsk below, we will construct a (D + 1) dimensional array
# stacked for each chunk. For example, if the histogram is going
# to be 3 dimensions, this creates a stack of cubes (1 cube for
# each sample chunk) that will be collapsed into a final cube (the
# result). Depending on the input data, we can do this in two ways
#
# 1. The rectangular case: when the sample is a single 2D array
#    where each column in the sample represents a coordinate of
#    the sample).
#
# 2. The sequence-of-arrays case, when the sample is a tuple or
#    list of arrays, with each array in that sequence representing
#    the entirety of one coordinate of the complete sample.

if rectangular_sample:
dsk = {
(name, i, *column_zeros): (_block_histogramdd_rect, k, bins, range, w)
for i, (k, w) in enumerate(zip(sample_keys, w_keys))
}
else:
sample_keys = [
]
fused_on_chunk_keys = [
tuple(sample_keys[j][i] for j in _range(D)) for i in _range(n_chunks)
]
dsk = {
(name, i, *column_zeros): (
_block_histogramdd_multiarg,
*(*k, bins, range, w),
)
for i, (k, w) in enumerate(zip(fused_on_chunk_keys, w_keys))
}

graph = HighLevelGraph.from_collections(name, dsk, dependencies=deps)
all_nbins = tuple((b.size - 1,) for b in edges)
stacked_chunks = ((1,) * n_chunks, *all_nbins)
mapped = Array(graph, name, stacked_chunks, dtype=dtype)
# Finally, sum over chunks providing to get the final D
# dimensional result array.
n = mapped.sum(axis=0)

if density:
# compute array of values to divide by the bin width along
# each dimension.
width_divider = np.ones(n.shape)
for i in _range(D):
shape = np.ones(D, int)
shape[i] = width_divider.shape[i]
width_divider *= np.diff(edges[i]).reshape(shape)
width_divider = asarray(width_divider, chunks=n.chunks)
return n / width_divider / n.sum(), edges

return n, [asarray(entry) for entry in edges]

[docs]@derived_from(np)
def cov(m, y=None, rowvar=1, bias=0, ddof=None):
# This was copied almost verbatim from np.cov
# or NUMPY_LICENSE.txt within this directory
if ddof is not None and ddof != int(ddof):
raise ValueError("ddof must be integer")

# Handles complex arrays too
m = asarray(m)
if y is None:
dtype = np.result_type(m, np.float64)
else:
y = asarray(y)
dtype = np.result_type(m, y, np.float64)
X = array(m, ndmin=2, dtype=dtype)

if X.shape == 1:
rowvar = 1
if rowvar:
N = X.shape
axis = 0
else:
N = X.shape
axis = 1

# check ddof
if ddof is None:
if bias == 0:
ddof = 1
else:
ddof = 0
fact = float(N - ddof)
if fact <= 0:
warnings.warn("Degrees of freedom <= 0 for slice", RuntimeWarning)
fact = 0.0

if y is not None:
y = array(y, ndmin=2, dtype=dtype)
X = concatenate((X, y), axis)

X = X - X.mean(axis=1 - axis, keepdims=True)
if not rowvar:
return (dot(X.T, X.conj()) / fact).squeeze()
else:
return (dot(X, X.T.conj()) / fact).squeeze()

[docs]@derived_from(np)
def corrcoef(x, y=None, rowvar=1):
c = cov(x, y, rowvar)
if c.shape == ():
return c / c
d = diag(c)
d = d.reshape((d.shape, 1))
sqr_d = sqrt(d)
return (c / sqr_d) / sqr_d.T

[docs]@implements(np.round, np.round_)
@derived_from(np)
def round(a, decimals=0):
return a.map_blocks(np.round, decimals=decimals, dtype=a.dtype)

@implements(np.ndim)
@derived_from(np)
def ndim(a):
return a.ndim

@implements(np.iscomplexobj)
@derived_from(np)
def iscomplexobj(x):
return issubclass(x.dtype.type, np.complexfloating)

def _unique_internal(ar, indices, counts, return_inverse=False):
"""
Helper/wrapper function for :func:`numpy.unique`.

Uses :func:`numpy.unique` to find the unique values for the array chunk.
Given this chunk may not represent the whole array, also take the
``indices`` and ``counts`` that are in 1-to-1 correspondence to ``ar``
and reduce them in the same fashion as ``ar`` is reduced. Namely sum
any counts that correspond to the same value and take the smallest
index that corresponds to the same value.

To handle the inverse mapping from the unique values to the original
array, simply return a NumPy array created with ``arange`` with enough
values to correspond 1-to-1 to the unique values. While there is more
work needed to be done to create the full inverse mapping for the
original array, this provides enough information to generate the

Given Dask likes to have one array returned from functions like
``blockwise``, some formatting is done to stuff all of the resulting arrays
into one big NumPy structured array. Dask is then able to handle this
object and can split it apart into the separate results on the Dask side,
which then can be passed back to this function in concatenated chunks for
further reduction or can be return to the user to perform other forms of
analysis.

By handling the problem in this way, it does not matter where a chunk
is in a larger array or how big it is. The chunk can still be computed
on the same way. Also it does not matter if the chunk is the result of
other chunks being run through this function multiple times. The end
result will still be just as accurate using this strategy.
"""

return_index = indices is not None
return_counts = counts is not None

u = np.unique(ar)

dt = [("values", u.dtype)]
if return_index:
dt.append(("indices", np.intp))
if return_inverse:
dt.append(("inverse", np.intp))
if return_counts:
dt.append(("counts", np.intp))

r = np.empty(u.shape, dtype=dt)
r["values"] = u
if return_inverse:
r["inverse"] = np.arange(len(r), dtype=np.intp)
if return_index or return_counts:
for i, v in enumerate(r["values"]):
m = ar == v
if return_index:
indices[m].min(keepdims=True, out=r["indices"][i : i + 1])
if return_counts:
counts[m].sum(keepdims=True, out=r["counts"][i : i + 1])

return r

def unique_no_structured_arr(
ar, return_index=False, return_inverse=False, return_counts=False
):
# A simplified version of `unique`, that allows computing unique for array
# types that don't support structured arrays (such as cupy.ndarray), but
# can only compute values at the moment.

if (
return_index is not False
or return_inverse is not False
or return_counts is not False
):
raise ValueError(
"dask.array.unique does not support `return_index`, `return_inverse` "
"or `return_counts` with array types that don't support structured "
"arrays."
)

ar = ar.ravel()

args = [ar, "i"]
meta = meta_from_array(ar)

out = blockwise(np.unique, "i", *args, meta=meta)
out._chunks = tuple((np.nan,) * len(c) for c in out.chunks)

out_parts = [out]

name = "unique-aggregate-" + out.name
dsk = {
(name, 0): (
(np.unique,)
+ tuple(
else o
for o in out_parts
)
)
}

dependencies = [o for o in out_parts if hasattr(o, "__dask_keys__")]
graph = HighLevelGraph.from_collections(name, dsk, dependencies=dependencies)
chunks = ((np.nan,),)
out = Array(graph, name, chunks, meta=meta)

result = [out]

if len(result) == 1:
result = result
else:
result = tuple(result)

return result

[docs]@derived_from(np)
def unique(ar, return_index=False, return_inverse=False, return_counts=False):
# Test whether the downstream library supports structured arrays. If the
# `np.empty_like` call raises a `TypeError`, the downstream library (e.g.,
# CuPy) doesn't support it. In that case we return the
# `unique_no_structured_arr` implementation, otherwise (e.g., NumPy) just
# continue as normal.
try:
meta = meta_from_array(ar)
np.empty_like(meta, dtype=[("a", int), ("b", float)])
except TypeError:
return unique_no_structured_arr(
ar,
return_index=return_index,
return_inverse=return_inverse,
return_counts=return_counts,
)

ar = ar.ravel()

# Run unique on each chunk and collect results in a Dask Array of
# unknown size.

args = [ar, "i"]
out_dtype = [("values", ar.dtype)]
if return_index:
args.extend([arange(ar.shape, dtype=np.intp, chunks=ar.chunks), "i"])
out_dtype.append(("indices", np.intp))
else:
args.extend([None, None])
if return_counts:
args.extend([ones((ar.shape,), dtype=np.intp, chunks=ar.chunks), "i"])
out_dtype.append(("counts", np.intp))
else:
args.extend([None, None])

out = blockwise(_unique_internal, "i", *args, dtype=out_dtype, return_inverse=False)
out._chunks = tuple((np.nan,) * len(c) for c in out.chunks)

# Take the results from the unique chunks and do the following.
#
# 1. Collect all results as arguments.
# 2. Concatenate each result into one big array.
# 3. Pass all results as arguments to the internal unique again.
#
# TODO: This should be replaced with a tree reduction using this strategy.

out_parts = [out["values"]]
if return_index:
out_parts.append(out["indices"])
else:
out_parts.append(None)
if return_counts:
out_parts.append(out["counts"])
else:
out_parts.append(None)

name = "unique-aggregate-" + out.name
dsk = {
(name, 0): (
(_unique_internal,)
+ tuple(
else o
for o in out_parts
)
+ (return_inverse,)
)
}
out_dtype = [("values", ar.dtype)]
if return_index:
out_dtype.append(("indices", np.intp))
if return_inverse:
out_dtype.append(("inverse", np.intp))
if return_counts:
out_dtype.append(("counts", np.intp))

dependencies = [o for o in out_parts if hasattr(o, "__dask_keys__")]
graph = HighLevelGraph.from_collections(name, dsk, dependencies=dependencies)
chunks = ((np.nan,),)
out = Array(graph, name, chunks, out_dtype)

result = [out["values"]]
if return_index:
result.append(out["indices"])
if return_inverse:
# Using the returned unique values and arange of unknown length, find
# each value matching a unique value and replace it with its
# corresponding index or `0`. There should be only one entry for this
# index in axis `1` (the one of unknown length). Reduce axis `1`
# through summing to get an array with known dimensionality and the
# mapping of the original values.
mtches = (ar[:, None] == out["values"][None, :]).astype(np.intp)
result.append((mtches * out["inverse"]).sum(axis=1))
if return_counts:
result.append(out["counts"])

if len(result) == 1:
result = result
else:
result = tuple(result)

return result

def _isin_kernel(element, test_elements, assume_unique=False):
values = np.in1d(element.ravel(), test_elements, assume_unique=assume_unique)
return values.reshape(element.shape + (1,) * test_elements.ndim)

@safe_wraps(getattr(np, "isin", None))
def isin(element, test_elements, assume_unique=False, invert=False):
element = asarray(element)
test_elements = asarray(test_elements)
element_axes = tuple(range(element.ndim))
test_axes = tuple(i + element.ndim for i in range(test_elements.ndim))
mapped = blockwise(
_isin_kernel,
element_axes + test_axes,
element,
element_axes,
test_elements,
test_axes,
adjust_chunks={axis: lambda _: 1 for axis in test_axes},
dtype=bool,
assume_unique=assume_unique,
)

result = mapped.any(axis=test_axes)
if invert:
result = ~result
return result

[docs]@derived_from(np)
def roll(array, shift, axis=None):
result = array

if axis is None:
result = ravel(result)

if not isinstance(shift, Integral):
raise TypeError(
"Expect `shift` to be an instance of Integral when `axis` is None."
)

shift = (shift,)
axis = (0,)
else:
try:
len(shift)
except TypeError:
shift = (shift,)
try:
len(axis)
except TypeError:
axis = (axis,)

if len(shift) != len(axis):
raise ValueError("Must have the same number of shifts as axes.")

for i, s in zip(axis, shift):
shape = result.shape[i]
s = 0 if shape == 0 else -s % shape

sl1 = result.ndim * [slice(None)]
sl2 = result.ndim * [slice(None)]

sl1[i] = slice(s, None)
sl2[i] = slice(None, s)

sl1 = tuple(sl1)
sl2 = tuple(sl2)

result = concatenate([result[sl1], result[sl2]], axis=i)

result = result.reshape(array.shape)
# Ensure that the output is always a new array object
result = result.copy() if result is array else result

return result

@derived_from(np)
def shape(array):
return array.shape

@derived_from(np)
def union1d(ar1, ar2):
return unique(concatenate((ar1.ravel(), ar2.ravel())))

[docs]@derived_from(np)
def ravel(array_like):
return asanyarray(array_like).reshape((-1,))

@derived_from(np)
def expand_dims(a, axis):
if type(axis) not in (tuple, list):
axis = (axis,)

out_ndim = len(axis) + a.ndim
axis = validate_axis(axis, out_ndim)

shape_it = iter(a.shape)
shape = [1 if ax in axis else next(shape_it) for ax in range(out_ndim)]

return a.reshape(shape)

[docs]@derived_from(np)
def squeeze(a, axis=None):
if axis is None:
axis = tuple(i for i, d in enumerate(a.shape) if d == 1)
elif not isinstance(axis, tuple):
axis = (axis,)

if any(a.shape[i] != 1 for i in axis):
raise ValueError("cannot squeeze axis with size other than one")

axis = validate_axis(axis, a.ndim)

sl = tuple(0 if i in axis else slice(None) for i, s in enumerate(a.shape))

# Return 0d Dask Array if all axes are squeezed,
if all(s == 0 for s in sl) and all(s == 1 for s in a.shape):
return a.map_blocks(
np.squeeze, meta=a._meta, drop_axis=tuple(range(len(a.shape)))
)

a = a[sl]

return a

[docs]@derived_from(np)
def compress(condition, a, axis=None):

if not is_arraylike(condition):
# Allow `condition` to be anything array-like, otherwise ensure `condition`
# is a numpy array.
condition = np.asarray(condition)
condition = condition.astype(bool)
a = asarray(a)

if condition.ndim != 1:
raise ValueError("Condition must be one dimensional")

if axis is None:
a = a.ravel()
axis = 0
axis = validate_axis(axis, a.ndim)

# Treat `condition` as filled with `False` (if it is too short)
a = a[
tuple(
slice(None, len(condition)) if i == axis else slice(None)
for i in range(a.ndim)
)
]

# Use `condition` to select along 1 dimension
a = a[tuple(condition if i == axis else slice(None) for i in range(a.ndim))]

return a

@derived_from(np)
def extract(condition, arr):
condition = asarray(condition).astype(bool)
arr = asarray(arr)
return compress(condition.ravel(), arr.ravel())

[docs]@derived_from(np)
def take(a, indices, axis=0):
axis = validate_axis(axis, a.ndim)

if isinstance(a, np.ndarray) and isinstance(indices, Array):
else:
return a[(slice(None),) * axis + (indices,)]

assert isinstance(a, np.ndarray)
assert isinstance(indices, Array)

return indices.map_blocks(
lambda block: np.take(a, block, axis), chunks=indices.chunks, dtype=a.dtype
)

[docs]@derived_from(np)
def around(x, decimals=0):
return map_blocks(partial(np.around, decimals=decimals), x, dtype=x.dtype)

def _asarray_isnull(values):
import pandas as pd

return np.asarray(pd.isnull(values))

[docs]def isnull(values):
# eagerly raise ImportError, if pandas isn't available
import pandas as pd  # noqa

return elemwise(_asarray_isnull, values, dtype="bool")

[docs]def notnull(values):
return ~isnull(values)

[docs]@derived_from(np)
def isclose(arr1, arr2, rtol=1e-5, atol=1e-8, equal_nan=False):
func = partial(np.isclose, rtol=rtol, atol=atol, equal_nan=equal_nan)
return elemwise(func, arr1, arr2, dtype="bool")

[docs]@derived_from(np)
def allclose(arr1, arr2, rtol=1e-5, atol=1e-8, equal_nan=False):
return isclose(arr1, arr2, rtol=rtol, atol=atol, equal_nan=equal_nan).all()

return np.choose(a, choices)

[docs]@derived_from(np)
def choose(a, choices):

def _isnonzero_vec(v):
return bool(np.count_nonzero(v))

_isnonzero_vec = np.vectorize(_isnonzero_vec, otypes=[bool])

def isnonzero(a):
if a.dtype.kind in {"U", "S"}:
# NumPy treats all-whitespace strings as falsy (like in `np.nonzero`).
# but not in `.astype(bool)`. To match the behavior of numpy at least until
# 1.19, we use `_isnonzero_vec`. When NumPy changes behavior, we should just
# use the try block below.
# https://github.com/numpy/numpy/issues/9875
return a.map_blocks(_isnonzero_vec, dtype=bool)
try:
np.zeros(tuple(), dtype=a.dtype).astype(bool)
except ValueError:
######################################################
# Handle special cases where conversion to bool does #
# not work correctly.                                #
#                                                    #
# xref: https://github.com/numpy/numpy/issues/9479   #
######################################################
return a.map_blocks(_isnonzero_vec, dtype=bool)
else:
return a.astype(bool)

[docs]@derived_from(np)
def argwhere(a):
a = asarray(a)

nz = isnonzero(a).flatten()

ind = indices(a.shape, dtype=np.intp, chunks=a.chunks)
if ind.ndim > 1:
ind = stack([ind[i].ravel() for i in range(len(ind))], axis=1)
ind = compress(nz, ind, axis=0)

return ind

[docs]@derived_from(np)
def where(condition, x=None, y=None):
if (x is None) != (y is None):
raise ValueError("either both or neither of x and y should be given")
if (x is None) and (y is None):
return nonzero(condition)

if np.isscalar(condition):
dtype = result_type(x, y)
x = asarray(x)
y = asarray(y)

out = x if condition else y

else:
return elemwise(np.where, condition, x, y)

[docs]@derived_from(np)
def count_nonzero(a, axis=None):
return isnonzero(asarray(a)).astype(np.intp).sum(axis=axis)

[docs]@derived_from(np)
def flatnonzero(a):
return argwhere(asarray(a).ravel())[:, 0]

[docs]@derived_from(np)
def nonzero(a):
ind = argwhere(a)
if ind.ndim > 1:
return tuple(ind[:, i] for i in range(ind.shape))
else:
return (ind,)

def _unravel_index_kernel(indices, func_kwargs):
return np.stack(np.unravel_index(indices, **func_kwargs))

[docs]@derived_from(np)
def unravel_index(indices, shape, order="C"):
if shape and indices.size:
unraveled_indices = tuple(
indices.map_blocks(
_unravel_index_kernel,
dtype=np.intp,
chunks=(((len(shape),),) + indices.chunks),
new_axis=0,
func_kwargs={"shape": shape, "order": order},
)
)
else:
unraveled_indices = tuple(empty((0,), dtype=np.intp, chunks=1) for i in shape)

return unraveled_indices

@wraps(np.ravel_multi_index)
def ravel_multi_index(multi_index, dims, mode="raise", order="C"):
if np.isscalar(dims):
dims = (dims,)
raise NotImplementedError(
f"Dask types are not supported in the `dims` argument: {dims!r}"
)

if is_arraylike(multi_index):
index_stack = asarray(multi_index)
else:
index_stack = stack(multi_index_arrs)

if not np.isnan(index_stack.shape).any() and len(index_stack) != len(dims):
raise ValueError(
f"parameter multi_index must be a sequence of length {len(dims)}"
)
if not np.issubdtype(index_stack.dtype, np.signedinteger):
raise TypeError("only int indices permitted")
return index_stack.map_blocks(
np.ravel_multi_index,
dtype=np.intp,
chunks=index_stack.chunks[1:],
drop_axis=0,
dims=dims,
mode=mode,
order=order,
)

def _int_piecewise(x, *condlist, **kwargs):
return np.piecewise(
x, list(condlist), kwargs["funclist"], *kwargs["func_args"], **kwargs["func_kw"]
)

[docs]@derived_from(np)
def piecewise(x, condlist, funclist, *args, **kw):
return map_blocks(
_int_piecewise,
x,
*condlist,
dtype=x.dtype,
name="piecewise",
funclist=funclist,
func_args=args,
func_kw=kw,
)

def _select(*args, **kwargs):
"""
This is a version of :func:`numpy.select` that acceptes an arbitrary number of arguments and
splits them in half to create ``condlist`` and ``choicelist`` params.
"""
split_at = len(args) // 2
condlist = args[:split_at]
choicelist = args[split_at:]
return np.select(condlist, choicelist, **kwargs)

@derived_from(np)
def select(condlist, choicelist, default=0):
# Making the same checks that np.select
# Check the size of condlist and choicelist are the same, or abort.
if len(condlist) != len(choicelist):
raise ValueError("list of cases must be same length as list of conditions")

if len(condlist) == 0:
raise ValueError("select with an empty condition list is not possible")

choicelist = [asarray(choice) for choice in choicelist]

try:
intermediate_dtype = result_type(*choicelist)
except TypeError as e:
msg = "Choicelist elements do not have a common dtype."
raise TypeError(msg) from e

blockwise_shape = tuple(range(choicelist.ndim))

condargs = [arg for elem in condlist for arg in (elem, blockwise_shape)]
choiceargs = [arg for elem in choicelist for arg in (elem, blockwise_shape)]

return blockwise(
_select,
blockwise_shape,
*condargs,
*choiceargs,
dtype=intermediate_dtype,
name="select",
default=default,
)

def _partition(total: int, divisor: int) -> tuple[tuple[int, ...], tuple[int, ...]]:
"""Given a total and a divisor, return two tuples: A tuple containing `divisor`
repeated the number of times it divides `total`, and length-1 or empty tuple
containing the remainder when `total` is divided by `divisor`. If `divisor` factors
`total`, i.e. if the remainder is 0, then `remainder` is empty.
"""
multiples = (divisor,) * (total // divisor)
remainder = (total % divisor,) if total % divisor else ()
return multiples, remainder

def aligned_coarsen_chunks(chunks: list[int], multiple: int) -> tuple[int, ...]:
"""
Returns a new chunking aligned with the coarsening multiple.
Any excess is at the end of the array.

Examples
--------
>>> aligned_coarsen_chunks(chunks=(1, 2, 3), multiple=4)
(4, 2)
>>> aligned_coarsen_chunks(chunks=(1, 20, 3, 4), multiple=4)
(4, 20, 4)
>>> aligned_coarsen_chunks(chunks=(20, 10, 15, 23, 24), multiple=10)
(20, 10, 20, 20, 20, 2)
"""
overflow = np.array(chunks) % multiple
excess = overflow.sum()
new_chunks = np.array(chunks) - overflow
# valid chunks are those that are already factorizable by `multiple`
chunk_validity = new_chunks == chunks
valid_inds, invalid_inds = np.where(chunk_validity), np.where(~chunk_validity)
# sort the invalid chunks by size (ascending), then concatenate the results of
# sorting the valid chunks by size (ascending)
chunk_modification_order = [
*invalid_inds[np.argsort(new_chunks[invalid_inds])],
*valid_inds[np.argsort(new_chunks[valid_inds])],
]
partitioned_excess, remainder = _partition(excess, multiple)
# add elements the partitioned excess to the smallest invalid chunks,
# then smallest valid chunks if needed.
for idx, extra in enumerate(partitioned_excess):
new_chunks[chunk_modification_order[idx]] += extra
# create excess chunk with remainder, if any remainder exists
new_chunks = np.array([*new_chunks, *remainder])
# remove 0-sized chunks
new_chunks = new_chunks[new_chunks > 0]
return tuple(new_chunks)

@wraps(chunk.coarsen)
def coarsen(reduction, x, axes, trim_excess=False, **kwargs):
if not trim_excess and not all(x.shape[i] % div == 0 for i, div in axes.items()):
msg = f"Coarsening factors {axes} do not align with array shape {x.shape}."
raise ValueError(msg)

reduction = getattr(np, reduction.__name__)

new_chunks = {}
for i, div in axes.items():
aligned = aligned_coarsen_chunks(x.chunks[i], div)
if aligned != x.chunks[i]:
new_chunks[i] = aligned
if new_chunks:
x = x.rechunk(new_chunks)

name = "coarsen-" + tokenize(reduction, x, axes, trim_excess)
dsk = {
(name,)
+ key[1:]: (apply, chunk.coarsen, [reduction, key, axes, trim_excess], kwargs)
}
chunks = tuple(
tuple(int(bd // axes.get(i, 1)) for bd in bds) for i, bds in enumerate(x.chunks)
)

meta = reduction(np.empty((1,) * x.ndim, dtype=x.dtype), **kwargs)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[x])
return Array(graph, name, chunks, meta=meta)

def split_at_breaks(array, breaks, axis=0):
"""Split an array into a list of arrays (using slices) at the given breaks

>>> split_at_breaks(np.arange(6), [3, 5])
[array([0, 1, 2]), array([3, 4]), array()]
"""
slices = [slice(i, j) for i, j in sliding_window(2, padded_breaks)]
preslice = (slice(None),) * axis
split_array = [array[preslice + (s,)] for s in slices]
return split_array

[docs]@derived_from(np)
def insert(arr, obj, values, axis):
# axis is a required argument here to avoid needing to deal with the numpy
# default case (which reshapes the array to make it flat)
axis = validate_axis(axis, arr.ndim)

if isinstance(obj, slice):
obj = np.arange(*obj.indices(arr.shape[axis]))
obj = np.asarray(obj)
scalar_obj = obj.ndim == 0
if scalar_obj:
obj = np.atleast_1d(obj)

obj = np.where(obj < 0, obj + arr.shape[axis], obj)
if (np.diff(obj) < 0).any():
raise NotImplementedError(
"da.insert only implemented for monotonic ``obj`` argument"
)

split_arr = split_at_breaks(arr, np.unique(obj), axis)

if getattr(values, "ndim", 0) == 0:
# we need to turn values into a dask array
name = "values-" + tokenize(values)
dtype = getattr(values, "dtype", type(values))
values = Array({(name,): values}, name, chunks=(), dtype=dtype)

values_shape = tuple(
len(obj) if axis == n else s for n, s in enumerate(arr.shape)
)
elif scalar_obj:
values = values[(slice(None),) * axis + (None,)]

values_chunks = tuple(
values_bd if axis == n else arr_bd
for n, (arr_bd, values_bd) in enumerate(zip(arr.chunks, values.chunks))
)
values = values.rechunk(values_chunks)

counts = np.bincount(obj)[:-1]
values_breaks = np.cumsum(counts[counts > 0])
split_values = split_at_breaks(values, values_breaks, axis)

interleaved = list(interleave([split_arr, split_values]))
interleaved = [i for i in interleaved if i.nbytes]
return concatenate(interleaved, axis=axis)

@derived_from(np)
def delete(arr, obj, axis):
"""
NOTE: If ``obj`` is a dask array it is implicitly computed when this function
is called.
"""
# axis is a required argument here to avoid needing to deal with the numpy
# default case (which reshapes the array to make it flat)
axis = validate_axis(axis, arr.ndim)

if isinstance(obj, slice):
tmp = np.arange(*obj.indices(arr.shape[axis]))
obj = tmp[::-1] if obj.step and obj.step < 0 else tmp
else:
obj = np.asarray(obj)
obj = np.where(obj < 0, obj + arr.shape[axis], obj)
obj = np.unique(obj)

target_arr = split_at_breaks(arr, obj, axis)

target_arr = [
arr[
tuple(slice(1, None) if axis == n else slice(None) for n in range(arr.ndim))
]
if i != 0
else arr
for i, arr in enumerate(target_arr)
]
return concatenate(target_arr, axis=axis)

[docs]@derived_from(np)
def append(arr, values, axis=None):
# based on numpy.append
arr = asanyarray(arr)
if axis is None:
if arr.ndim != 1:
arr = arr.ravel()
values = ravel(asanyarray(values))
axis = arr.ndim - 1
return concatenate((arr, values), axis=axis)

def _average(
a, axis=None, weights=None, returned=False, is_masked=False, keepdims=False
):
# This was minimally modified from numpy.average
# or NUMPY_LICENSE.txt within this directory
# Wrapper used by da.average or da.ma.average.
a = asanyarray(a)

if weights is None:
avg = a.mean(axis, keepdims=keepdims)
scl = avg.dtype.type(a.size / avg.size)
else:
wgt = asanyarray(weights)

if issubclass(a.dtype.type, (np.integer, np.bool_)):
result_dtype = result_type(a.dtype, wgt.dtype, "f8")
else:
result_dtype = result_type(a.dtype, wgt.dtype)

# Sanity checks
if a.shape != wgt.shape:
if axis is None:
raise TypeError(
"Axis must be specified when shapes of a and weights differ."
)
if wgt.ndim != 1:
raise TypeError(
"1D weights expected when shapes of a and weights differ."
)
if wgt.shape != a.shape[axis]:
raise ValueError(
"Length of weights not compatible with specified axis."
)

# setup wgt to broadcast along axis
wgt = broadcast_to(wgt, (a.ndim - 1) * (1,) + wgt.shape)
wgt = wgt.swapaxes(-1, axis)

scl = wgt.sum(axis=axis, dtype=result_dtype, keepdims=keepdims)
avg = multiply(a, wgt, dtype=result_dtype).sum(axis, keepdims=keepdims) / scl

if returned:
if scl.shape != avg.shape:
return avg, scl
else:
return avg

[docs]@derived_from(np)
def average(a, axis=None, weights=None, returned=False, keepdims=False):
return _average(a, axis, weights, returned, is_masked=False, keepdims=keepdims)

[docs]@derived_from(np)
def tril(m, k=0):
m = asarray_safe(m, like=m)
*m.shape[-2:],
k=k,
dtype=bool,
chunks=m.chunks[-2:],
like=meta_from_array(m) if _numpy_120 else None,
)

[docs]@derived_from(np)
def triu(m, k=0):
m = asarray_safe(m, like=m)
*m.shape[-2:],
k=k - 1,
dtype=bool,
chunks=m.chunks[-2:],
like=meta_from_array(m) if _numpy_120 else None,
)

@derived_from(np)
def tril_indices(n, k=0, m=None, chunks="auto"):
return nonzero(tri(n, m, k=k, dtype=bool, chunks=chunks))

@derived_from(np)
def tril_indices_from(arr, k=0):
if arr.ndim != 2:
raise ValueError("input array must be 2-d")
return tril_indices(arr.shape[-2], k=k, m=arr.shape[-1], chunks=arr.chunks)

@derived_from(np)
def triu_indices(n, k=0, m=None, chunks="auto"):
return nonzero(~tri(n, m, k=k - 1, dtype=bool, chunks=chunks))

@derived_from(np)
def triu_indices_from(arr, k=0):
if arr.ndim != 2:
raise ValueError("input array must be 2-d")
return triu_indices(arr.shape[-2], k=k, m=arr.shape[-1], chunks=arr.chunks)
```