```from __future__ import annotations

import operator
import warnings
from functools import partial
from numbers import Number

import numpy as np
import tlz as toolz

from dask.array.core import Array, concatenate, dotmany, from_delayed
array_safe,
meta_from_array,
solve_triangular_safe,
svd_flip,
)

def _cumsum_blocks(it):
total = 0
for x in it:
total_previous = total
total += x
yield (total_previous, total)

def _cumsum_part(last, new):
return (last, last + new)

def _nanmin(m, n):
k_0 = min([m, n])
k_1 = m if np.isnan(n) else n
return k_1 if np.isnan(k_0) else k_0

def _wrapped_qr(a):
"""
A wrapper for np.linalg.qr that handles arrays with 0 rows

Notes: Created for tsqr so as to manage cases with uncertain
array dimensions. In particular, the case where arrays have
(uncertain) chunks with 0 rows.
"""
# workaround may be removed when numpy stops rejecting edge cases
if a.shape == 0:
return np.zeros_like(a, shape=(0, 0)), np.zeros_like(a, shape=(0, a.shape))
else:
return np.linalg.qr(a)

[docs]def tsqr(data, compute_svd=False, _max_vchunk_size=None):
"""Direct Tall-and-Skinny QR algorithm

As presented in:

A. Benson, D. Gleich, and J. Demmel.
Direct QR factorizations for tall-and-skinny matrices in
MapReduce architectures.
IEEE International Conference on Big Data, 2013.
https://arxiv.org/abs/1301.1071

This algorithm is used to compute both the QR decomposition and the
Singular Value Decomposition.  It requires that the input array have a
single column of blocks, each of which fit in memory.

Parameters
----------
data: Array
compute_svd: bool
Whether to compute the SVD rather than the QR decomposition
_max_vchunk_size: Integer
Used internally in recursion to set the maximum row dimension
of chunks in subsequent recursive calls.

Notes
-----
With ``k`` blocks of size ``(m, n)``, this algorithm has memory use that
scales as ``k * n * n``.

The implementation here is the recursive variant due to the ultimate
need for one "single core" QR decomposition. In the non-recursive version
of the algorithm, given ``k`` blocks, after ``k`` ``m * n`` QR
decompositions, there will be a "single core" QR decomposition that will
have to work with a ``(k * n, n)`` matrix.

Here, recursion is applied as necessary to ensure that ``k * n`` is not
larger than ``m`` (if ``m / n >= 2``). In particular, this is done
to ensure that single core computations do not have to work on blocks
larger than ``(m, n)``.

Where blocks are irregular, the above logic is applied with the "height" of
the "tallest" block used in place of ``m``.

Consider use of the ``rechunk`` method to control this behavior.
Taller blocks will reduce overall memory use (assuming that many of them
still fit in memory at once).

--------
Variant for short-and-fat arrays
"""
nr, nc = len(data.chunks), len(data.chunks)
cr_max, cc = max(data.chunks), data.chunks

if not (data.ndim == 2 and nc == 1):  # Is a matrix  # Only one column block
raise ValueError(
"Input must have the following properties:\n"
"  1. Have two dimensions\n"
"  2. Have only one column of blocks\n\n"
"Note: This function (tsqr) supports QR decomposition in the case of\n"
"tall-and-skinny matrices (single column chunk/block; see qr)\n"
"Current shape: {},\nCurrent chunksize: {}".format(
data.shape, data.chunksize
)
)

token = "-" + tokenize(data, compute_svd)

m, n = data.shape
numblocks = (nr, 1)

qq, rr = np.linalg.qr(np.ones(shape=(1, 1), dtype=data.dtype))

# Block qr
name_qr_st1 = "qr" + token
dsk_qr_st1 = blockwise(
_wrapped_qr,
name_qr_st1,
"ij",
data.name,
"ij",
numblocks={data.name: numblocks},
)
layers[name_qr_st1] = dsk_qr_st1

# Block qr
name_q_st1 = "getitem" + token + "-q1"
dsk_q_st1 = {
(name_q_st1, i, 0): (operator.getitem, (name_qr_st1, i, 0), 0)
for i in range(numblocks)
}
layers[name_q_st1] = dsk_q_st1
dependencies[name_q_st1] = {name_qr_st1}

# Block qr
name_r_st1 = "getitem" + token + "-r1"
dsk_r_st1 = {
(name_r_st1, i, 0): (operator.getitem, (name_qr_st1, i, 0), 1)
for i in range(numblocks)
}
layers[name_r_st1] = dsk_r_st1
dependencies[name_r_st1] = {name_qr_st1}

# Next step is to obtain a QR decomposition for the stacked R factors, so either:
# - gather R factors into a single core and do a QR decomposition
# - recurse with tsqr (if single core computation too large and a-priori "meaningful
#   reduction" possible, meaning that chunks have to be well defined)

single_core_compute_m = nr * cc
chunks_well_defined = not any(np.isnan(c) for cs in data.chunks for c in cs)
prospective_blocks = np.ceil(single_core_compute_m / cr_max)
meaningful_reduction_possible = (
cr_max if _max_vchunk_size is None else _max_vchunk_size
) >= 2 * cc
can_distribute = chunks_well_defined and int(prospective_blocks) > 1

if chunks_well_defined and meaningful_reduction_possible and can_distribute:
# stack chunks into blocks and recurse using tsqr

# Prepare to stack chunks into blocks (from block qr)
all_blocks = []
curr_block = []
curr_block_sz = 0
for idx, a_m in enumerate(data.chunks):
m_q = a_m
n_q = min(m_q, cc)
m_r = n_q
# n_r = cc
if curr_block_sz + m_r > cr_max:
all_blocks.append(curr_block)
curr_block = []
curr_block_sz = 0
curr_block.append((idx, m_r))
curr_block_sz += m_r
if len(curr_block) > 0:
all_blocks.append(curr_block)

# R_stacked
name_r_stacked = "stack" + token + "-r1"
dsk_r_stacked = {
(name_r_stacked, i, 0): (
np.vstack,
(tuple, [(name_r_st1, idx, 0) for idx, _ in sub_block_info]),
)
for i, sub_block_info in enumerate(all_blocks)
}
layers[name_r_stacked] = dsk_r_stacked
dependencies[name_r_stacked] = {name_r_st1}

# retrieve R_stacked for recursion with tsqr
vchunks_rstacked = tuple(
sum(map(lambda x: x, sub_block_info)) for sub_block_info in all_blocks
)
graph = HighLevelGraph(layers, dependencies)
# dsk.dependencies[name_r_stacked] = {data.name}
r_stacked_meta = meta_from_array(
data, len((sum(vchunks_rstacked), n)), dtype=rr.dtype
)
r_stacked = Array(
graph,
name_r_stacked,
shape=(sum(vchunks_rstacked), n),
chunks=(vchunks_rstacked, n),
meta=r_stacked_meta,
)

# recurse
q_inner, r_inner = tsqr(r_stacked, _max_vchunk_size=cr_max)

# Q_inner: "unstack"
name_q_st2 = "getitem" + token + "-q2"
dsk_q_st2 = {
(name_q_st2, j, 0): (
operator.getitem,
(q_inner.name, i, 0),
((slice(e, e)), (slice(0, n))),
)
for i, sub_block_info in enumerate(all_blocks)
for j, e in zip(
[x for x in sub_block_info],
_cumsum_blocks([x for x in sub_block_info]),
)
}
layers[name_q_st2] = dsk_q_st2

# R: R_inner
name_r_st2 = "r-inner" + token
dsk_r_st2 = {(name_r_st2, 0, 0): (r_inner.name, 0, 0)}
layers[name_r_st2] = dsk_r_st2

# Q: Block qr (*) Q_inner
name_q_st3 = "dot" + token + "-q3"
dsk_q_st3 = blockwise(
np.dot,
name_q_st3,
"ij",
name_q_st1,
"ij",
name_q_st2,
"ij",
numblocks={name_q_st1: numblocks, name_q_st2: numblocks},
)
layers[name_q_st3] = dsk_q_st3
dependencies[name_q_st3] = {name_q_st1, name_q_st2}
else:
# Do single core computation

# Stacking for in-core QR computation
to_stack = [(name_r_st1, i, 0) for i in range(numblocks)]
name_r_st1_stacked = "stack" + token + "-r1"
dsk_r_st1_stacked = {(name_r_st1_stacked, 0, 0): (np.vstack, (tuple, to_stack))}
layers[name_r_st1_stacked] = dsk_r_st1_stacked
dependencies[name_r_st1_stacked] = {name_r_st1}

# In-core QR computation
name_qr_st2 = "qr" + token + "-qr2"
dsk_qr_st2 = blockwise(
np.linalg.qr,
name_qr_st2,
"ij",
name_r_st1_stacked,
"ij",
numblocks={name_r_st1_stacked: (1, 1)},
)
layers[name_qr_st2] = dsk_qr_st2
dependencies[name_qr_st2] = {name_r_st1_stacked}

# In-core qr
name_q_st2_aux = "getitem" + token + "-q2-aux"
dsk_q_st2_aux = {
(name_q_st2_aux, 0, 0): (operator.getitem, (name_qr_st2, 0, 0), 0)
}
layers[name_q_st2_aux] = dsk_q_st2_aux
dependencies[name_q_st2_aux] = {name_qr_st2}

chucks_are_all_known = not any(np.isnan(c) for cs in data.chunks for c in cs)
if chucks_are_all_known:
# when chunks are all known...
# obtain slices on q from in-core compute (e.g.: (slice(10, 20), slice(0, 5)))
q2_block_sizes = [min(e, n) for e in data.chunks]
block_slices = [
(slice(e, e), slice(0, n)) for e in _cumsum_blocks(q2_block_sizes)
]
dsk_q_blockslices = {}
deps = set()
else:
# when chunks are not already known...

# request shape information: vertical chunk sizes & column dimension (n)
name_q2bs = "shape" + token + "-q2"
dsk_q2_shapes = {
(name_q2bs, i): (min, (getattr, (data.name, i, 0), "shape"))
for i in range(numblocks)
}
name_n = "getitem" + token + "-n"
dsk_n = {
name_n: (operator.getitem, (getattr, (data.name, 0, 0), "shape"), 1)
}

# cumulative sums (start, end)
name_q2cs = "cumsum" + token + "-q2"
dsk_q2_cumsum = {(name_q2cs, 0): [0, (name_q2bs, 0)]}

for i in range(1, numblocks):
dsk_q2_cumsum[(name_q2cs, i)] = (
_cumsum_part,
(name_q2cs, i - 1),
(name_q2bs, i),
)

# obtain slices on q from in-core compute (e.g.: (slice(10, 20), slice(0, 5)))
name_blockslice = "slice" + token + "-q"
dsk_block_slices = {
(name_blockslice, i): (
tuple,
[(apply, slice, (name_q2cs, i)), (slice, 0, name_n)],
)
for i in range(numblocks)
}

dsk_q_blockslices = toolz.merge(
dsk_n, dsk_q2_shapes, dsk_q2_cumsum, dsk_block_slices
)

deps = {data.name}
block_slices = [(name_blockslice, i) for i in range(numblocks)]

layers["q-blocksizes" + token] = dsk_q_blockslices
dependencies["q-blocksizes" + token] = deps

# In-core qr unstacking
name_q_st2 = "getitem" + token + "-q2"
dsk_q_st2 = {
(name_q_st2, i, 0): (operator.getitem, (name_q_st2_aux, 0, 0), b)
for i, b in enumerate(block_slices)
}
layers[name_q_st2] = dsk_q_st2
if chucks_are_all_known:
dependencies[name_q_st2] = {name_q_st2_aux}
else:
dependencies[name_q_st2] = {name_q_st2_aux, "q-blocksizes" + token}

# Q: Block qr (*) In-core qr
name_q_st3 = "dot" + token + "-q3"
dsk_q_st3 = blockwise(
np.dot,
name_q_st3,
"ij",
name_q_st1,
"ij",
name_q_st2,
"ij",
numblocks={name_q_st1: numblocks, name_q_st2: numblocks},
)
layers[name_q_st3] = dsk_q_st3
dependencies[name_q_st3] = {name_q_st1, name_q_st2}

# R: In-core qr
name_r_st2 = "getitem" + token + "-r2"
dsk_r_st2 = {(name_r_st2, 0, 0): (operator.getitem, (name_qr_st2, 0, 0), 1)}
layers[name_r_st2] = dsk_r_st2
dependencies[name_r_st2] = {name_qr_st2}

if not compute_svd:
is_unknown_m = np.isnan(data.shape) or any(
np.isnan(c) for c in data.chunks
)
is_unknown_n = np.isnan(data.shape) or any(
np.isnan(c) for c in data.chunks
)

if is_unknown_m and is_unknown_n:
# assumption: m >= n
q_shape = data.shape
q_chunks = (data.chunks, (np.nan,))
r_shape = (np.nan, np.nan)
r_chunks = ((np.nan,), (np.nan,))
elif is_unknown_m and not is_unknown_n:
# assumption: m >= n
q_shape = data.shape
q_chunks = (data.chunks, (n,))
r_shape = (n, n)
r_chunks = (n, n)
elif not is_unknown_m and is_unknown_n:
# assumption: m >= n
q_shape = data.shape
q_chunks = (data.chunks, (np.nan,))
r_shape = (np.nan, np.nan)
r_chunks = ((np.nan,), (np.nan,))
else:
q_shape = (
data.shape
if data.shape >= data.shape
else (data.shape, data.shape)
)
q_chunks = (
data.chunks
if data.shape >= data.shape
else (data.chunks, data.chunks)
)
r_shape = (n, n) if data.shape >= data.shape else data.shape
r_chunks = r_shape

# dsk.dependencies[name_q_st3] = {data.name}
# dsk.dependencies[name_r_st2] = {data.name}
graph = HighLevelGraph(layers, dependencies)
q_meta = meta_from_array(data, len(q_shape), qq.dtype)
r_meta = meta_from_array(data, len(r_shape), rr.dtype)
q = Array(graph, name_q_st3, shape=q_shape, chunks=q_chunks, meta=q_meta)
r = Array(graph, name_r_st2, shape=r_shape, chunks=r_chunks, meta=r_meta)
return q, r
else:
# In-core SVD computation
name_svd_st2 = "svd" + token + "-2"
dsk_svd_st2 = blockwise(
np.linalg.svd,
name_svd_st2,
"ij",
name_r_st2,
"ij",
numblocks={name_r_st2: (1, 1)},
)
# svd
name_u_st2 = "getitem" + token + "-u2"
dsk_u_st2 = {(name_u_st2, 0, 0): (operator.getitem, (name_svd_st2, 0, 0), 0)}
# svd
name_s_st2 = "getitem" + token + "-s2"
dsk_s_st2 = {(name_s_st2, 0): (operator.getitem, (name_svd_st2, 0, 0), 1)}
# svd
name_v_st2 = "getitem" + token + "-v2"
dsk_v_st2 = {(name_v_st2, 0, 0): (operator.getitem, (name_svd_st2, 0, 0), 2)}
# Q * U
name_u_st4 = "getitem" + token + "-u4"
dsk_u_st4 = blockwise(
dotmany,
name_u_st4,
"ij",
name_q_st3,
"ik",
name_u_st2,
"kj",
numblocks={name_q_st3: numblocks, name_u_st2: (1, 1)},
)

layers[name_svd_st2] = dsk_svd_st2
dependencies[name_svd_st2] = {name_r_st2}
layers[name_u_st2] = dsk_u_st2
dependencies[name_u_st2] = {name_svd_st2}
layers[name_u_st4] = dsk_u_st4
dependencies[name_u_st4] = {name_q_st3, name_u_st2}
layers[name_s_st2] = dsk_s_st2
dependencies[name_s_st2] = {name_svd_st2}
layers[name_v_st2] = dsk_v_st2
dependencies[name_v_st2] = {name_svd_st2}

uu, ss, vvh = np.linalg.svd(np.ones(shape=(1, 1), dtype=data.dtype))

k = _nanmin(m, n)  # avoid RuntimeWarning with np.nanmin([m, n])

m_u = m
n_u = int(k) if not np.isnan(k) else k
n_s = n_u
m_vh = n_u
n_vh = n
d_vh = max(m_vh, n_vh)  # full matrix returned: but basically n
graph = HighLevelGraph(layers, dependencies)
u_meta = meta_from_array(data, len((m_u, n_u)), uu.dtype)
s_meta = meta_from_array(data, len((n_s,)), ss.dtype)
vh_meta = meta_from_array(data, len((d_vh, d_vh)), vvh.dtype)
u = Array(
graph,
name_u_st4,
shape=(m_u, n_u),
chunks=(data.chunks, (n_u,)),
meta=u_meta,
)
s = Array(graph, name_s_st2, shape=(n_s,), chunks=((n_s,),), meta=s_meta)
vh = Array(
graph, name_v_st2, shape=(d_vh, d_vh), chunks=((n,), (n,)), meta=vh_meta
)
return u, s, vh

[docs]def sfqr(data, name=None):
"""Direct Short-and-Fat QR

Currently, this is a quick hack for non-tall-and-skinny matrices which
are one chunk tall and (unless they are one chunk wide) have chunks
that are wider than they are tall

Q [R_1 R_2 ...] = [A_1 A_2 ...]

it computes the factorization Q R_1 = A_1, then computes the other
R_k's in parallel.

Parameters
----------
data: Array

--------
Main user API that uses this function
Variant for tall-and-skinny case
"""
nr, nc = len(data.chunks), len(data.chunks)
cr, cc = data.chunks, data.chunks

if not (
(data.ndim == 2)
and (nr == 1)  # Is a matrix
and (  # Has exactly one block row
(cr <= cc)
or (nc == 1)  # Chunking dimension on rows is at least that on cols or...
)
):  # ... only one block col
raise ValueError(
"Input must have the following properties:\n"
"  1. Have two dimensions\n"
"  2. Have only one row of blocks\n"
"  3. Either one column of blocks or (first) chunk size on cols\n"
"     is at most that on rows (e.g.: for a 5x20 matrix,\n"
"     chunks=((5), (8,4,8)) is fine, but chunks=((5), (4,8,8)) is not;\n"
"     still, prefer something simple like chunks=(5,10) or chunks=5)\n\n"
"Note: This function (sfqr) supports QR decomposition in the case\n"
"of short-and-fat matrices (single row chunk/block; see qr)"
)

prefix = name or "sfqr-" + tokenize(data)
prefix += "_"

m, n = data.shape

qq, rr = np.linalg.qr(np.ones(shape=(1, 1), dtype=data.dtype))

# data = A = [A_1 A_rest]
name_A_1 = prefix + "A_1"
name_A_rest = prefix + "A_rest"
layers[name_A_1] = {(name_A_1, 0, 0): (data.name, 0, 0)}
layers[name_A_rest] = {
(name_A_rest, 0, idx): (data.name, 0, 1 + idx) for idx in range(nc - 1)
}
if len(layers[name_A_rest]) > 0:
else:
dependencies[name_A_rest] = set()

# Q R_1 = A_1
name_Q_R1 = prefix + "Q_R_1"
name_Q = prefix + "Q"
name_R_1 = prefix + "R_1"
layers[name_Q_R1] = {(name_Q_R1, 0, 0): (np.linalg.qr, (name_A_1, 0, 0))}
dependencies[name_Q_R1] = {name_A_1}

layers[name_Q] = {(name_Q, 0, 0): (operator.getitem, (name_Q_R1, 0, 0), 0)}
dependencies[name_Q] = {name_Q_R1}

layers[name_R_1] = {(name_R_1, 0, 0): (operator.getitem, (name_Q_R1, 0, 0), 1)}
dependencies[name_R_1] = {name_Q_R1}

graph = HighLevelGraph(layers, dependencies)

Q_meta = meta_from_array(data, len((m, min(m, n))), dtype=qq.dtype)
R_1_meta = meta_from_array(data, len((min(m, n), cc)), dtype=rr.dtype)
Q = Array(graph, name_Q, shape=(m, min(m, n)), chunks=(m, min(m, n)), meta=Q_meta)
R_1 = Array(graph, name_R_1, shape=(min(m, n), cc), chunks=(cr, cc), meta=R_1_meta)

# R = [R_1 Q'A_rest]
Rs = [R_1]

if nc > 1:
A_rest_meta = meta_from_array(data, len((min(m, n), n - cc)), dtype=rr.dtype)
A_rest = Array(
graph,
name_A_rest,
shape=(min(m, n), n - cc),
chunks=(cr, data.chunks[1:]),
meta=A_rest_meta,
)
Rs.append(Q.T.dot(A_rest))

R = concatenate(Rs, axis=1)

return Q, R

def compression_level(n, q, n_oversamples=10, min_subspace_size=20):
"""Compression level to use in svd_compressed

Given the size ``n`` of a space, compress that that to one of size
``q`` plus n_oversamples.

The oversampling allows for greater flexibility in finding an
appropriate subspace, a low value is often enough (10 is already a
very conservative choice, it can be further reduced).
``q + oversampling`` should not be larger than ``n``.  In this
specific implementation, ``q + n_oversamples`` is at least
``min_subspace_size``.

Parameters
----------
n: int
Column/row dimension of original matrix
q: int
Size of the desired subspace (the actual size will be bigger,
because of oversampling, see ``da.linalg.compression_level``)
n_oversamples: int, default=10
Number of oversamples used for generating the sampling matrix.
min_subspace_size : int, default=20
Minimum subspace size.

Examples
--------
>>> compression_level(100, 10)
20
"""
return min(max(min_subspace_size, q + n_oversamples), n)

def compression_matrix(
data,
q,
iterator="power",
n_power_iter=0,
n_oversamples=10,
seed=None,
compute=False,
):
"""Randomly sample matrix to find most active subspace

This compression matrix returned by this algorithm can be used to
compute both the QR decomposition and the Singular Value
Decomposition.

Parameters
----------
data: Array
q: int
Size of the desired subspace (the actual size will be bigger,
because of oversampling, see ``da.linalg.compression_level``)
iterator: {'power', 'QR'}, default='power'
Define the technique used for iterations to cope with flat
singular spectra or when the input matrix is very large.
n_power_iter: int
Number of power iterations, useful when the singular values
decay slowly. Error decreases exponentially as `n_power_iter`
increases. In practice, set `n_power_iter` <= 4.
n_oversamples: int, default=10
Number of oversamples used for generating the sampling matrix.
This value increases the size of the subspace computed, which is more
accurate at the cost of efficiency.  Results are rarely sensitive to this choice
though and in practice a value of 10 is very commonly high enough.
compute : bool
Whether or not to compute data at each use.
Recomputing the input while performing several passes reduces memory
pressure, but means that we have to compute the input multiple times.
This is a good choice if the data is larger than memory and cheap to
recreate.

References
----------
N. Halko, P. G. Martinsson, and J. A. Tropp.
Finding structure with randomness: Probabilistic algorithms for
constructing approximate matrix decompositions.
SIAM Rev., Survey and Review section, Vol. 53, num. 2,
pp. 217-288, June 2011
https://arxiv.org/abs/0909.4061
"""
if iterator not in ["power", "QR"]:
raise ValueError(
f"Iterator '{iterator}' not valid, must one one of ['power', 'QR']"
)
m, n = data.shape
comp_level = compression_level(min(m, n), q, n_oversamples=n_oversamples)
if isinstance(seed, RandomState):
state = seed
else:
state = default_rng(seed)
datatype = np.float64
if (data.dtype).type in {np.float32, np.complex64}:
datatype = np.float32
omega = state.standard_normal(
size=(n, comp_level), chunks=(data.chunks, (comp_level,))
).astype(datatype, copy=False)
mat_h = data.dot(omega)
if iterator == "power":
for _ in range(n_power_iter):
if compute:
mat_h = mat_h.persist()
wait(mat_h)
tmp = data.T.dot(mat_h)
if compute:
tmp = tmp.persist()
wait(tmp)
mat_h = data.dot(tmp)
q, _ = tsqr(mat_h)
else:
q, _ = tsqr(mat_h)
for _ in range(n_power_iter):
if compute:
q = q.persist()
wait(q)
q, _ = tsqr(data.T.dot(q))
if compute:
q = q.persist()
wait(q)
q, _ = tsqr(data.dot(q))
return q.T

[docs]def svd_compressed(
a,
k,
iterator="power",
n_power_iter=0,
n_oversamples=10,
seed=None,
compute=False,
coerce_signs=True,
):
"""Randomly compressed rank-k thin Singular Value Decomposition.

This computes the approximate singular value decomposition of a large
array.  This algorithm is generally faster than the normal algorithm
but does not provide exact results.  One can balance between
performance and accuracy with input parameters (see below).

Parameters
----------
a: Array
Input array
k: int
Rank of the desired thin SVD decomposition.
iterator: {'power', 'QR'}, default='power'
Define the technique used for iterations to cope with flat
singular spectra or when the input matrix is very large.
n_power_iter: int, default=0
Number of power iterations, useful when the singular values
decay slowly. Error decreases exponentially as `n_power_iter`
increases. In practice, set `n_power_iter` <= 4.
n_oversamples: int, default=10
Number of oversamples used for generating the sampling matrix.
This value increases the size of the subspace computed, which is more
accurate at the cost of efficiency.  Results are rarely sensitive to this choice
though and in practice a value of 10 is very commonly high enough.
compute : bool
Whether or not to compute data at each use.
Recomputing the input while performing several passes reduces memory
pressure, but means that we have to compute the input multiple times.
This is a good choice if the data is larger than memory and cheap to
recreate.
coerce_signs : bool
Whether or not to apply sign coercion to singular vectors in
order to maintain deterministic results, by default True.

Examples
--------
>>> u, s, v = svd_compressed(x, 20)  # doctest: +SKIP

Returns
-------
u:  Array, unitary / orthogonal
s:  Array, singular values in decreasing order (largest first)
v:  Array, unitary / orthogonal

References
----------
N. Halko, P. G. Martinsson, and J. A. Tropp.
Finding structure with randomness: Probabilistic algorithms for
constructing approximate matrix decompositions.
SIAM Rev., Survey and Review section, Vol. 53, num. 2,
pp. 217-288, June 2011
https://arxiv.org/abs/0909.4061
"""
comp = compression_matrix(
a,
k,
iterator=iterator,
n_power_iter=n_power_iter,
n_oversamples=n_oversamples,
seed=seed,
compute=compute,
)
if compute:
comp = comp.persist()
wait(comp)
a_compressed = comp.dot(a)
v, s, u = tsqr(a_compressed.T, compute_svd=True)
u = comp.T.dot(u.T)
v = v.T
u = u[:, :k]
s = s[:k]
v = v[:k, :]
if coerce_signs:
u, v = svd_flip(u, v)
return u, s, v

[docs]def qr(a):
"""
Compute the qr factorization of a matrix.

Parameters
----------
a : Array

Returns
-------
q:  Array, orthonormal
r:  Array, upper-triangular

Examples
--------
>>> q, r = da.linalg.qr(x)  # doctest: +SKIP

--------
numpy.linalg.qr: Equivalent NumPy Operation
"""

if len(a.chunks) == 1 and len(a.chunks) > 1:
return tsqr(a)
elif len(a.chunks) == 1:
return sfqr(a)
else:
raise NotImplementedError(
"qr currently supports only tall-and-skinny (single column chunk/block; see tsqr)\n"
"and short-and-fat (single row chunk/block; see sfqr) matrices\n\n"
"Consider use of the rechunk method. For example,\n\n"
"x.rechunk({0: -1, 1: 'auto'}) or x.rechunk({0: 'auto', 1: -1})\n\n"
"which rechunk one shorter axis to a single chunk, while allowing\n"
"the other axis to automatically grow/shrink appropriately."
)

[docs]def svd(a, coerce_signs=True):
"""
Compute the singular value decomposition of a matrix.

Parameters
----------
a : (M, N) Array
coerce_signs : bool
Whether or not to apply sign coercion to singular vectors in
order to maintain deterministic results, by default True.

Examples
--------

>>> u, s, v = da.linalg.svd(x)  # doctest: +SKIP

Returns
-------

u : (M, K) Array, unitary / orthogonal
Left-singular vectors of `a` (in columns) with shape (M, K)
where K = min(M, N).
s : (K,) Array, singular values in decreasing order (largest first)
Singular values of `a`.
v : (K, N) Array, unitary / orthogonal
Right-singular vectors of `a` (in rows) with shape (K, N)
where K = min(M, N).

Warnings
--------

SVD is only supported for arrays with chunking in one dimension.
This requires that all inputs either contain a single column
of chunks (tall-and-skinny) or a single row of chunks (short-and-fat).
For arrays with chunking in both dimensions, see da.linalg.svd_compressed.

--------

np.linalg.svd : Equivalent NumPy Operation
da.linalg.svd_compressed : Randomized SVD for fully chunked arrays
dask.array.linalg.tsqr : QR factorization for tall-and-skinny arrays
dask.array.utils.svd_flip : Sign normalization for singular vectors
"""
nb = a.numblocks
if a.ndim != 2:
raise ValueError(
"Array must be 2D.\n"
"Input shape: {}\n"
"Input ndim: {}\n".format(a.shape, a.ndim)
)
if nb > 1 and nb > 1:
raise NotImplementedError(
"Array must be chunked in one dimension only. "
"This function (svd) only supports tall-and-skinny or short-and-fat "
"matrices (see da.linalg.svd_compressed for SVD on fully chunked arrays).\n"
"Input shape: {}\n"
"Input numblocks: {}\n".format(a.shape, nb)
)

# Single-chunk case
if nb == nb == 1:
m, n = a.shape
k = min(a.shape)
mu, ms, mv = np.linalg.svd(
np.ones_like(a._meta, shape=(1, 1), dtype=a._meta.dtype)
)
u, s, v = delayed(np.linalg.svd, nout=3)(a, full_matrices=False)
u = from_delayed(u, shape=(m, k), meta=mu)
s = from_delayed(s, shape=(k,), meta=ms)
v = from_delayed(v, shape=(k, n), meta=mv)
# Multi-chunk cases
else:
# Tall-and-skinny case
if nb > nb:
u, s, v = tsqr(a, compute_svd=True)
truncate = a.shape < a.shape
# Short-and-fat case
else:
vt, s, ut = tsqr(a.T, compute_svd=True)
u, s, v = ut.T, s, vt.T
truncate = a.shape > a.shape
# Only when necessary, remove extra singular vectors if array
# has shape that contradicts chunking, e.g. the array is a
# column of chunks but still has more columns than rows overall
if truncate:
k = min(a.shape)
u, v = u[:, :k], v[:k, :]
if coerce_signs:
u, v = svd_flip(u, v)
return u, s, v

def _solve_triangular_lower(a, b):
return solve_triangular_safe(a, b, lower=True)

[docs]def lu(a):
"""
Compute the lu decomposition of a matrix.

Examples
--------

>>> p, l, u = da.linalg.lu(x)  # doctest: +SKIP

Returns
-------

p:  Array, permutation matrix
l:  Array, lower triangular matrix with unit diagonal.
u:  Array, upper triangular matrix
"""
import scipy.linalg

if a.ndim != 2:
raise ValueError("Dimension must be 2 to perform lu decomposition")

xdim, ydim = a.shape
if xdim != ydim:
raise ValueError("Input must be a square matrix to perform lu decomposition")
if not len(set(a.chunks + a.chunks)) == 1:
msg = (
"All chunks must be a square matrix to perform lu decomposition. "
"Use .rechunk method to change the size of chunks."
)
raise ValueError(msg)

vdim = len(a.chunks)
hdim = len(a.chunks)

token = tokenize(a)
name_lu = "lu-lu-" + token

name_p = "lu-p-" + token
name_l = "lu-l-" + token
name_u = "lu-u-" + token

# for internal calculation
name_p_inv = "lu-p-inv-" + token
name_l_permuted = "lu-l-permute-" + token
name_u_transposed = "lu-u-transpose-" + token
name_plu_dot = "lu-plu-dot-" + token
name_lu_dot = "lu-lu-dot-" + token

dsk = {}
for i in range(min(vdim, hdim)):
target = (a.name, i, i)
if i > 0:
prevs = []
for p in range(i):
prev = name_plu_dot, i, p, p, i
dsk[prev] = (np.dot, (name_l_permuted, i, p), (name_u, p, i))
prevs.append(prev)
target = (operator.sub, target, (sum, prevs))
# diagonal block
dsk[name_lu, i, i] = (scipy.linalg.lu, target)

# sweep to horizontal
for j in range(i + 1, hdim):
target = (np.dot, (name_p_inv, i, i), (a.name, i, j))
if i > 0:
prevs = []
for p in range(i):
prev = name_lu_dot, i, p, p, j
dsk[prev] = (np.dot, (name_l, i, p), (name_u, p, j))
prevs.append(prev)
target = (operator.sub, target, (sum, prevs))
dsk[name_lu, i, j] = (_solve_triangular_lower, (name_l, i, i), target)

# sweep to vertical
for k in range(i + 1, vdim):
target = (a.name, k, i)
if i > 0:
prevs = []
for p in range(i):
prev = name_plu_dot, k, p, p, i
dsk[prev] = (np.dot, (name_l_permuted, k, p), (name_u, p, i))
prevs.append(prev)
target = (operator.sub, target, (sum, prevs))
# solving x.dot(u) = target is equal to u.T.dot(x.T) = target.T
dsk[name_lu, k, i] = (
np.transpose,
(
_solve_triangular_lower,
(name_u_transposed, i, i),
(np.transpose, target),
),
)

for i in range(min(vdim, hdim)):
for j in range(min(vdim, hdim)):
if i == j:
dsk[name_p, i, j] = (operator.getitem, (name_lu, i, j), 0)
dsk[name_l, i, j] = (operator.getitem, (name_lu, i, j), 1)
dsk[name_u, i, j] = (operator.getitem, (name_lu, i, j), 2)

# permuted l is required to be propagated to i > j blocks
dsk[name_l_permuted, i, j] = (np.dot, (name_p, i, j), (name_l, i, j))
dsk[name_u_transposed, i, j] = (np.transpose, (name_u, i, j))
# transposed permutation matrix is equal to its inverse
dsk[name_p_inv, i, j] = (np.transpose, (name_p, i, j))
elif i > j:
dsk[name_p, i, j] = (np.zeros, (a.chunks[i], a.chunks[j]))
# calculations are performed using permuted l,
# thus the result should be reverted by inverted (=transposed) p
# to have the same row order as diagonal blocks
dsk[name_l, i, j] = (np.dot, (name_p_inv, i, i), (name_lu, i, j))
dsk[name_u, i, j] = (np.zeros, (a.chunks[i], a.chunks[j]))
dsk[name_l_permuted, i, j] = (name_lu, i, j)
else:
dsk[name_p, i, j] = (np.zeros, (a.chunks[i], a.chunks[j]))
dsk[name_l, i, j] = (np.zeros, (a.chunks[i], a.chunks[j]))
dsk[name_u, i, j] = (name_lu, i, j)
# l_permuted is not referred in upper triangulars

pp, ll, uu = scipy.linalg.lu(np.ones(shape=(1, 1), dtype=a.dtype))
pp_meta = meta_from_array(a, dtype=pp.dtype)
ll_meta = meta_from_array(a, dtype=ll.dtype)
uu_meta = meta_from_array(a, dtype=uu.dtype)

graph = HighLevelGraph.from_collections(name_p, dsk, dependencies=[a])
p = Array(graph, name_p, shape=a.shape, chunks=a.chunks, meta=pp_meta)

graph = HighLevelGraph.from_collections(name_l, dsk, dependencies=[a])
l = Array(graph, name_l, shape=a.shape, chunks=a.chunks, meta=ll_meta)

graph = HighLevelGraph.from_collections(name_u, dsk, dependencies=[a])
u = Array(graph, name_u, shape=a.shape, chunks=a.chunks, meta=uu_meta)

return p, l, u

[docs]def solve_triangular(a, b, lower=False):
"""
Solve the equation `a x = b` for `x`, assuming a is a triangular matrix.

Parameters
----------
a : (M, M) array_like
A triangular matrix
b : (M,) or (M, N) array_like
Right-hand side matrix in `a x = b`
lower : bool, optional
Use only data contained in the lower triangle of `a`.
Default is to use upper triangle.

Returns
-------
x : (M,) or (M, N) array
Solution to the system `a x = b`. Shape of return matches `b`.
"""

if a.ndim != 2:
raise ValueError("a must be 2 dimensional")
if b.ndim <= 2:
if a.shape != b.shape:
raise ValueError("a.shape and b.shape must be equal")
if a.chunks != b.chunks:
msg = (
"a.chunks and b.chunks must be equal. "
"Use .rechunk method to change the size of chunks."
)
raise ValueError(msg)
else:
raise ValueError("b must be 1 or 2 dimensional")

vchunks = len(a.chunks)
hchunks = 1 if b.ndim == 1 else len(b.chunks)
token = tokenize(a, b, lower)
name = "solve-triangular-" + token

# for internal calculation
# (name, i, j, k, l) corresponds to a_ij.dot(b_kl)
name_mdot = "solve-tri-dot-" + token

def _b_init(i, j):
if b.ndim == 1:
return b.name, i
else:
return b.name, i, j

def _key(i, j):
if b.ndim == 1:
return name, i
else:
return name, i, j

dsk = {}
if lower:
for i in range(vchunks):
for j in range(hchunks):
target = _b_init(i, j)
if i > 0:
prevs = []
for k in range(i):
prev = name_mdot, i, k, k, j
dsk[prev] = (np.dot, (a.name, i, k), _key(k, j))
prevs.append(prev)
target = (operator.sub, target, (sum, prevs))
dsk[_key(i, j)] = (_solve_triangular_lower, (a.name, i, i), target)
else:
for i in range(vchunks):
for j in range(hchunks):
target = _b_init(i, j)
if i < vchunks - 1:
prevs = []
for k in range(i + 1, vchunks):
prev = name_mdot, i, k, k, j
dsk[prev] = (np.dot, (a.name, i, k), _key(k, j))
prevs.append(prev)
target = (operator.sub, target, (sum, prevs))
dsk[_key(i, j)] = (
solve_triangular_safe,
(a.name, i, i),
target,
)

graph = HighLevelGraph.from_collections(name, dsk, dependencies=[a, b])

a_meta = meta_from_array(a)
b_meta = meta_from_array(b)
res = _solve_triangular_lower(
array_safe([[1, 0], [1, 2]], dtype=a.dtype, like=a_meta),
array_safe([0, 1], dtype=b.dtype, like=b_meta),
)
meta = meta_from_array(a, b.ndim, dtype=res.dtype)
return Array(graph, name, shape=b.shape, chunks=b.chunks, meta=meta)

[docs]def solve(a, b, sym_pos=None, assume_a="gen"):
"""
Solve the equation ``a x = b`` for ``x``. By default, use LU
decomposition and forward / backward substitutions. When ``assume_a = "pos"``
use Cholesky decomposition.

Parameters
----------
a : (M, M) array_like
A square matrix.
b : (M,) or (M, N) array_like
Right-hand side matrix in ``a x = b``.
sym_pos : bool, optional
Assume a is symmetric and positive definite. If ``True``, use Cholesky
decomposition.

.. note::
``sym_pos`` is deprecated and will be removed in a future version.

assume_a : {'gen', 'pos'}, optional
Type of data matrix. It is used to choose the dedicated solver.
Note that Dask does not support 'her' and 'sym' types.

.. versionchanged:: 2022.8.0
``assume_a = 'pos'`` was previously defined as ``sym_pos = True``.

Returns
-------
x : (M,) or (M, N) Array
Solution to the system ``a x = b``.  Shape of the return matches the
shape of `b`.

--------
scipy.linalg.solve
"""
if sym_pos is not None:
warnings.warn(
"The sym_pos keyword is deprecated and should be replaced by using ``assume_a = 'pos'``."
"``sym_pos`` will be removed in a future version.",
category=FutureWarning,
)
if sym_pos:
assume_a = "pos"

if assume_a == "pos":
l, u = _cholesky(a)
elif assume_a == "gen":
p, l, u = lu(a)
b = p.T.dot(b)
else:
raise ValueError(
f"{assume_a = } is not a recognized matrix structure, valid structures in Dask are 'pos' and 'gen'."
)

uy = solve_triangular(l, b, lower=True)
return solve_triangular(u, uy)

[docs]def inv(a):
"""
Compute the inverse of a matrix with LU decomposition and
forward / backward substitutions.

Parameters
----------
a : array_like
Square matrix to be inverted.

Returns
-------
ainv : Array
Inverse of the matrix `a`.
"""
return solve(a, eye(a.shape, chunks=a.chunks))

def _cholesky_lower(a):
return np.linalg.cholesky(a)

[docs]def cholesky(a, lower=False):
"""
Returns the Cholesky decomposition, :math:`A = L L^*` or
:math:`A = U^* U` of a Hermitian positive-definite matrix A.

Parameters
----------
a : (M, M) array_like
Matrix to be decomposed
lower : bool, optional
Whether to compute the upper or lower triangular Cholesky
factorization.  Default is upper-triangular.

Returns
-------
c : (M, M) Array
Upper- or lower-triangular Cholesky factor of `a`.
"""

l, u = _cholesky(a)
if lower:
return l
else:
return u

def _cholesky(a):
"""
Private function to perform Cholesky decomposition, which returns both
lower and upper triangulars.
"""

if a.ndim != 2:
raise ValueError("Dimension must be 2 to perform cholesky decomposition")

xdim, ydim = a.shape
if xdim != ydim:
raise ValueError(
"Input must be a square matrix to perform cholesky decomposition"
)
if not len(set(a.chunks + a.chunks)) == 1:
msg = (
"All chunks must be a square matrix to perform cholesky decomposition. "
"Use .rechunk method to change the size of chunks."
)
raise ValueError(msg)

vdim = len(a.chunks)
hdim = len(a.chunks)

token = tokenize(a)
name = "cholesky-" + token

# (name_lt_dot, i, j, k, l) corresponds to l_ij.dot(l_kl.T)
name_lt_dot = "cholesky-lt-dot-" + token
# because transposed results are needed for calculation,
# we can build graph for upper triangular simultaneously
name_upper = "cholesky-upper-" + token

# calculates lower triangulars because subscriptions get simpler
dsk = {}
for i in range(vdim):
for j in range(hdim):
if i < j:
dsk[name, i, j] = (
partial(np.zeros_like, shape=(a.chunks[i], a.chunks[j])),
meta_from_array(a),
)
dsk[name_upper, j, i] = (name, i, j)
elif i == j:
target = (a.name, i, j)
if i > 0:
prevs = []
for p in range(i):
prev = name_lt_dot, i, p, i, p
dsk[prev] = (np.dot, (name, i, p), (name_upper, p, i))
prevs.append(prev)
target = (operator.sub, target, (sum, prevs))
dsk[name, i, i] = (_cholesky_lower, target)
dsk[name_upper, i, i] = (np.transpose, (name, i, i))
else:
# solving x.dot(L11.T) = (A21 - L20.dot(L10.T)) is equal to
# L11.dot(x.T) = A21.T - L10.dot(L20.T)
# L11.dot(x.T) = A12 - L10.dot(L02)
target = (a.name, j, i)
if j > 0:
prevs = []
for p in range(j):
prev = name_lt_dot, j, p, i, p
dsk[prev] = (np.dot, (name, j, p), (name_upper, p, i))
prevs.append(prev)
target = (operator.sub, target, (sum, prevs))
dsk[name_upper, j, i] = (_solve_triangular_lower, (name, j, j), target)
dsk[name, i, j] = (np.transpose, (name_upper, j, i))

graph_upper = HighLevelGraph.from_collections(name_upper, dsk, dependencies=[a])
graph_lower = HighLevelGraph.from_collections(name, dsk, dependencies=[a])
a_meta = meta_from_array(a)
cho = np.linalg.cholesky(array_safe([[1, 2], [2, 5]], dtype=a.dtype, like=a_meta))
meta = meta_from_array(a, dtype=cho.dtype)

lower = Array(graph_lower, name, shape=a.shape, chunks=a.chunks, meta=meta)
# do not use .T, because part of transposed blocks are already calculated
upper = Array(graph_upper, name_upper, shape=a.shape, chunks=a.chunks, meta=meta)
return lower, upper

def _reverse(x):
return x[::-1]

[docs]def lstsq(a, b):
"""
Return the least-squares solution to a linear matrix equation using
QR decomposition.

Solves the equation `a x = b` by computing a vector `x` that
minimizes the Euclidean 2-norm `|| b - a x ||^2`.  The equation may
be under-, well-, or over- determined (i.e., the number of
linearly independent rows of `a` can be less than, equal to, or
greater than its number of linearly independent columns).  If `a`
is square and of full rank, then `x` (but for round-off error) is
the "exact" solution of the equation.

Parameters
----------
a : (M, N) array_like
"Coefficient" matrix.
b : {(M,), (M, K)} array_like
Ordinate or "dependent variable" values. If `b` is two-dimensional,
the least-squares solution is calculated for each of the `K` columns
of `b`.

Returns
-------
x : {(N,), (N, K)} Array
Least-squares solution. If `b` is two-dimensional,
the solutions are in the `K` columns of `x`.
residuals : {(1,), (K,)} Array
Sums of residuals; squared Euclidean 2-norm for each column in
``b - a*x``.
If `b` is 1-dimensional, this is a (1,) shape array.
Otherwise the shape is (K,).
rank : Array
Rank of matrix `a`.
s : (min(M, N),) Array
Singular values of `a`.
"""
q, r = qr(a)
x = solve_triangular(r, q.T.conj().dot(b))
residuals = b - a.dot(x)
residuals = abs(residuals**2).sum(axis=0, keepdims=b.ndim == 1)

token = tokenize(a, b)

# r must be a triangular with single block

# rank
rname = "lstsq-rank-" + token
rdsk = {(rname,): (np.linalg.matrix_rank, (r.name, 0, 0))}
graph = HighLevelGraph.from_collections(rname, rdsk, dependencies=[r])
# rank must be an integer
rank = Array(graph, rname, shape=(), chunks=(), dtype=int)

# singular
sname = "lstsq-singular-" + token
rt = r.T.conj()
sdsk = {
(sname, 0): (
_reverse,
(np.sqrt, (np.linalg.eigvalsh, (np.dot, (rt.name, 0, 0), (r.name, 0, 0)))),
)
}
graph = HighLevelGraph.from_collections(sname, sdsk, dependencies=[rt, r])
meta = meta_from_array(residuals, 1)
s = Array(graph, sname, shape=(r.shape,), chunks=r.shape, meta=meta)

return x, residuals, rank, s

[docs]@derived_from(np.linalg)
def norm(x, ord=None, axis=None, keepdims=False):
if axis is None:
axis = tuple(range(x.ndim))
elif isinstance(axis, Number):
axis = (int(axis),)
else:
axis = tuple(axis)

if len(axis) > 2:
raise ValueError("Improper number of dimensions to norm.")

if ord == "fro":
ord = None
if len(axis) == 1:
raise ValueError("Invalid norm order for vectors.")

# Coerce to double precision.
r = x.astype(np.promote_types(x.dtype, float))

if ord is None:
r = (abs(r) ** 2).sum(axis=axis, keepdims=keepdims) ** 0.5
elif ord == "nuc":
if len(axis) == 1:
raise ValueError("Invalid norm order for vectors.")
if x.ndim > 2:
raise NotImplementedError("SVD based norm not implemented for ndim > 2")

r = svd(x)[None].sum(keepdims=keepdims)
elif ord == np.inf:
r = abs(r)
if len(axis) == 1:
r = r.max(axis=axis, keepdims=keepdims)
else:
r = r.sum(axis=axis, keepdims=True).max(axis=axis, keepdims=True)
if keepdims is False:
r = r.squeeze(axis=axis)
elif ord == -np.inf:
r = abs(r)
if len(axis) == 1:
r = r.min(axis=axis, keepdims=keepdims)
else:
r = r.sum(axis=axis, keepdims=True).min(axis=axis, keepdims=True)
if keepdims is False:
r = r.squeeze(axis=axis)
elif ord == 0:
if len(axis) == 2:
raise ValueError("Invalid norm order for matrices.")

r = (r != 0).astype(r.dtype).sum(axis=axis, keepdims=keepdims)
elif ord == 1:
r = abs(r)
if len(axis) == 1:
r = r.sum(axis=axis, keepdims=keepdims)
else:
r = r.sum(axis=axis, keepdims=True).max(axis=axis, keepdims=True)
if keepdims is False:
r = r.squeeze(axis=axis)
elif len(axis) == 2 and ord == -1:
r = abs(r).sum(axis=axis, keepdims=True).min(axis=axis, keepdims=True)
if keepdims is False:
r = r.squeeze(axis=axis)
elif len(axis) == 2 and ord == 2:
if x.ndim > 2:
raise NotImplementedError("SVD based norm not implemented for ndim > 2")
r = svd(x)[None].max(keepdims=keepdims)
elif len(axis) == 2 and ord == -2:
if x.ndim > 2:
raise NotImplementedError("SVD based norm not implemented for ndim > 2")
r = svd(x)[None].min(keepdims=keepdims)
else:
if len(axis) == 2:
raise ValueError("Invalid norm order for matrices.")

r = (abs(r) ** ord).sum(axis=axis, keepdims=keepdims) ** (1.0 / ord)

return r
```