from __future__ import annotations
import datetime
from numbers import Integral
import pandas as pd
from pandas.api.types import is_datetime64_any_dtype
from pandas.core.window import Rolling as pd_Rolling
from dask.array.core import normalize_arg
from dask.base import tokenize
from dask.blockwise import BlockwiseDepDict
from dask.dataframe import methods
from dask.dataframe._compat import check_axis_keyword_deprecation
from dask.dataframe.core import (
Scalar,
_Frame,
_get_divisions_map_partitions,
_get_meta_map_partitions,
_maybe_from_pandas,
apply_and_enforce,
new_dd_object,
partitionwise_graph,
)
from dask.dataframe.io import from_pandas
from dask.dataframe.multi import _maybe_align_partitions
from dask.dataframe.utils import (
insert_meta_param_description,
is_dask_collection,
is_dataframe_like,
is_series_like,
)
from dask.delayed import unpack_collections
from dask.highlevelgraph import HighLevelGraph
from dask.typing import no_default
from dask.utils import M, apply, derived_from, funcname, has_keyword
# Marker subclass of ``tuple`` so that ``overlap_chunk`` can recognize the
# ``(combined_frame, prev_part_length, next_part_length)`` triples produced
# by ``_combined_parts`` among its arguments.
CombinedOutput = type("CombinedOutput", (tuple,), {})
def _combined_parts(prev_part, current_part, next_part, before, after):
msg = (
"Partition size is less than overlapping "
"window size. Try using ``df.repartition`` "
"to increase the partition size."
)
    # ``tail``/``head`` never return more rows than requested, so a length
    # mismatch means the neighboring partition was smaller than the window.
    if prev_part is not None and isinstance(before, Integral):
if prev_part.shape[0] != before:
raise NotImplementedError(msg)
if next_part is not None and isinstance(after, Integral):
if next_part.shape[0] != after:
raise NotImplementedError(msg)
parts = [p for p in (prev_part, current_part, next_part) if p is not None]
combined = methods.concat(parts)
return CombinedOutput(
(
combined,
len(prev_part) if prev_part is not None else None,
len(next_part) if next_part is not None else None,
)
)
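# Illustrative example (not part of the original source): combining a 2-row
# tail from the previous partition, a 5-row current partition, and a 1-row
# head from the next partition yields ``CombinedOutput((<8-row frame>, 2, 1))``.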
def overlap_chunk(func, before, after, *args, **kwargs):
dfs = [df for df in args if isinstance(df, CombinedOutput)]
combined, prev_part_length, next_part_length = dfs[0]
args = [arg[0] if isinstance(arg, CombinedOutput) else arg for arg in args]
out = func(*args, **kwargs)
if prev_part_length is None:
before = None
if isinstance(before, datetime.timedelta):
before = prev_part_length
    # If ``func`` changed the number of rows (e.g. produced several output
    # rows per input row), scale the trim amounts proportionally.
    expansion = None
    if combined.shape[0] != 0:
        expansion = out.shape[0] // combined.shape[0]
if before and expansion:
before *= expansion
if next_part_length is None:
return out.iloc[before:]
if isinstance(after, datetime.timedelta):
after = next_part_length
if after and expansion:
after *= expansion
return out.iloc[before:-after]
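# Illustrative walk-through (not from the original source): with before=2,
# after=1, and a 10-row combined frame, ``overlap_chunk`` applies ``func`` to
# all 10 rows and returns ``out.iloc[2:-1]``, i.e. only the rows that belong
# to the current partition.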
@insert_meta_param_description
def map_overlap(
func,
df,
before,
after,
*args,
meta=no_default,
enforce_metadata=True,
transform_divisions=True,
align_dataframes=True,
**kwargs,
):
"""Apply a function to each partition, sharing rows with adjacent partitions.
Parameters
----------
func : function
The function applied to each partition. If this function accepts
        the special ``partition_info`` keyword argument, it will receive
information on the partition's relative location within the
dataframe.
    df : dd.DataFrame, dd.Series
        The dask DataFrame or Series to apply the function to.
    before : int, timedelta or string timedelta
        The rows to prepend to partition ``i`` from the end of
        partition ``i - 1``.
    after : int, timedelta or string timedelta
        The rows to append to partition ``i`` from the beginning
        of partition ``i + 1``.
args, kwargs :
Positional and keyword arguments to pass to the function.
Positional arguments are computed on a per-partition basis, while
keyword arguments are shared across all partitions. The partition
itself will be the first positional argument, with all other
arguments passed *after*. Arguments can be ``Scalar``, ``Delayed``,
or regular Python objects. DataFrame-like args (both dask and
pandas) will be repartitioned to align (if necessary) before
applying the function; see ``align_dataframes`` to control this
behavior.
enforce_metadata : bool, default True
Whether to enforce at runtime that the structure of the DataFrame
produced by ``func`` actually matches the structure of ``meta``.
This will rename and reorder columns for each partition,
and will raise an error if this doesn't work,
but it won't raise if dtypes don't match.
transform_divisions : bool, default True
Whether to apply the function onto the divisions and apply those
transformed divisions to the output.
align_dataframes : bool, default True
Whether to repartition DataFrame- or Series-like args
(both dask and pandas) so their divisions align before applying
the function. This requires all inputs to have known divisions.
Single-partition inputs will be split into multiple partitions.
If False, all inputs must have either the same number of partitions
or a single partition. Single-partition inputs will be broadcast to
every partition of multi-partition inputs.
    $META

    See Also
    --------
    dd.DataFrame.map_overlap
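
    Examples
    --------
    A minimal sketch (values illustrative, output not asserted): share one
    trailing row with each partition so that ``diff`` is computed correctly
    across partition boundaries.

    >>> import pandas as pd
    >>> import dask.dataframe as dd
    >>> ddf = dd.from_pandas(pd.Series(range(8)), npartitions=4)
    >>> ddf.map_overlap(lambda s: s.diff(), 1, 0).compute()  # doctest: +SKIP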
"""
df = (
from_pandas(df, 1)
if (is_series_like(df) or is_dataframe_like(df)) and not is_dask_collection(df)
else df
)
args = (df,) + args
if isinstance(before, str):
before = pd.to_timedelta(before)
if isinstance(after, str):
after = pd.to_timedelta(after)
if isinstance(before, datetime.timedelta) or isinstance(after, datetime.timedelta):
if not is_datetime64_any_dtype(df.index._meta_nonempty.inferred_type):
raise TypeError(
"Must have a `DatetimeIndex` when using string offset "
"for `before` and `after`"
)
else:
if not (
isinstance(before, Integral)
and before >= 0
and isinstance(after, Integral)
and after >= 0
):
raise ValueError("before and after must be positive integers")
name = kwargs.pop("token", None)
parent_meta = kwargs.pop("parent_meta", None)
assert callable(func)
if name is not None:
token = tokenize(meta, before, after, *args, **kwargs)
else:
name = "overlap-" + funcname(func)
token = tokenize(func, meta, before, after, *args, **kwargs)
name = f"{name}-{token}"
if align_dataframes:
args = _maybe_from_pandas(args)
try:
args = _maybe_align_partitions(args)
except ValueError as e:
raise ValueError(
f"{e}. If you don't want the partitions to be aligned, and are "
"calling `map_overlap` directly, pass `align_dataframes=False`."
) from e
dfs = [df for df in args if isinstance(df, _Frame)]
meta = _get_meta_map_partitions(args, dfs, func, kwargs, meta, parent_meta)
if all(isinstance(arg, Scalar) for arg in args):
layer = {
(name, 0): (
apply,
func,
(tuple, [(arg._name, 0) for arg in args]),
kwargs,
)
}
graph = HighLevelGraph.from_collections(name, layer, dependencies=args)
return Scalar(graph, name, meta)
args2 = []
dependencies = []
divisions = _get_divisions_map_partitions(
align_dataframes, transform_divisions, dfs, func, args, kwargs
)
def _handle_frame_argument(arg):
dsk = {}
prevs_parts_dsk, prevs = _get_previous_partitions(arg, before)
dsk.update(prevs_parts_dsk)
nexts_parts_dsk, nexts = _get_nexts_partitions(arg, after)
dsk.update(nexts_parts_dsk)
name_a = "overlap-concat-" + tokenize(arg)
for i, (prev, current, next) in enumerate(
zip(prevs, arg.__dask_keys__(), nexts)
):
key = (name_a, i)
dsk[key] = (_combined_parts, prev, current, next, before, after)
graph = HighLevelGraph.from_collections(name_a, dsk, dependencies=[arg])
return new_dd_object(graph, name_a, meta, divisions)
for arg in args:
if isinstance(arg, _Frame):
arg = _handle_frame_argument(arg)
args2.append(arg)
dependencies.append(arg)
continue
arg = normalize_arg(arg)
arg2, collections = unpack_collections(arg)
if collections:
args2.append(arg2)
dependencies.extend(collections)
else:
args2.append(arg)
kwargs3 = {}
simple = True
for k, v in kwargs.items():
v = normalize_arg(v)
v, collections = unpack_collections(v)
dependencies.extend(collections)
kwargs3[k] = v
if collections:
simple = False
if has_keyword(func, "partition_info"):
partition_info = {
(i,): {"number": i, "division": division}
for i, division in enumerate(divisions[:-1])
}
args2.insert(0, BlockwiseDepDict(partition_info))
orig_func = func
def func(partition_info, *args, **kwargs):
return orig_func(*args, **kwargs, partition_info=partition_info)
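        # Illustrative (not from the original source): a user function defined
        # as ``def f(df, partition_info=None)`` receives a dict such as
        # ``{"number": 0, "division": <first division>}`` for the first partition.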
if enforce_metadata:
dsk = partitionwise_graph(
apply_and_enforce,
name,
func,
before,
after,
*args2,
dependencies=dependencies,
_func=overlap_chunk,
_meta=meta,
**kwargs3,
)
else:
kwargs4 = kwargs if simple else kwargs3
dsk = partitionwise_graph(
overlap_chunk,
name,
func,
before,
after,
*args2,
**kwargs4,
dependencies=dependencies,
)
graph = HighLevelGraph.from_collections(name, dsk, dependencies=dependencies)
return new_dd_object(graph, name, meta, divisions)
def _get_nexts_partitions(df, after):
"""
Helper to get the nexts partitions required for the overlap
"""
dsk = {}
df_name = df._name
timedelta_partition_message = (
"Partition size is less than specified window. "
"Try using ``df.repartition`` to increase the partition size"
)
name_b = "overlap-append-" + tokenize(df, after)
if after and isinstance(after, Integral):
nexts = []
for i in range(1, df.npartitions):
key = (name_b, i)
dsk[key] = (M.head, (df_name, i), after)
nexts.append(key)
nexts.append(None)
elif isinstance(after, datetime.timedelta):
# TODO: Do we have a use-case for this? Pandas doesn't allow negative rolling windows
deltas = pd.Series(df.divisions).diff().iloc[1:-1]
if (after > deltas).any():
raise ValueError(timedelta_partition_message)
nexts = []
for i in range(1, df.npartitions):
key = (name_b, i)
            dsk[key] = (_head_timedelta, (df_name, i - 1), (df_name, i), after)
nexts.append(key)
nexts.append(None)
else:
nexts = [None] * df.npartitions
return dsk, nexts
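# Illustrative sketch (not in the original source): for npartitions=3 and
# after=1, ``nexts`` is ``[(name_b, 1), (name_b, 2), None]``; each key holds
# ``head(1)`` of the following partition, and the last partition has nothing
# to append.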
def _get_previous_partitions(df, before):
"""
Helper to get the previous partitions required for the overlap
"""
dsk = {}
df_name = df._name
name_a = "overlap-prepend-" + tokenize(df, before)
if before and isinstance(before, Integral):
prevs = [None]
for i in range(df.npartitions - 1):
key = (name_a, i)
dsk[key] = (M.tail, (df_name, i), before)
prevs.append(key)
    elif isinstance(before, datetime.timedelta):
        # Assumes a monotonically increasing index
divs = pd.Series(df.divisions)
deltas = divs.diff().iloc[1:-1]
        # If the window size exceeds the span of at least one partition, we
        # must work out how many preceding partitions each overlap task needs.
        # Otherwise these calculations can be skipped (faster).
if (before > deltas).any():
pt_z = divs[0]
prevs = [None]
for i in range(df.npartitions - 1):
            # Walk backwards to find the earliest partition whose rows can
            # still fall inside the rolling window (``before``) of the
            # current partition.
pt_i = divs[i + 1]
# lower-bound the search to the first division
lb = max(pt_i - before, pt_z)
first, j = divs[i], i
while first > lb and j > 0:
first = first - deltas[j]
j = j - 1
key = (name_a, i)
dsk[key] = (
_tail_timedelta,
[(df_name, k) for k in range(j, i + 1)],
(df_name, i + 1),
before,
)
prevs.append(key)
else:
prevs = [None]
for i in range(df.npartitions - 1):
key = (name_a, i)
dsk[key] = (
_tail_timedelta,
[(df_name, i)],
(df_name, i + 1),
before,
)
prevs.append(key)
else:
prevs = [None] * df.npartitions
return dsk, prevs
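# Illustrative sketch (not in the original source): for npartitions=3 and
# before=2, ``prevs`` is ``[None, (name_a, 0), (name_a, 1)]``; each key holds
# ``tail(2)`` of the preceding partition, and the first partition has nothing
# to prepend.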
def _head_timedelta(current, next_, after):
"""Return rows of ``next_`` whose index is before the last
observation in ``current`` + ``after``.
Parameters
----------
current : DataFrame
next_ : DataFrame
after : timedelta
Returns
-------
overlapped : DataFrame
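
    Examples
    --------
    A sketch with assumed inputs (timestamps one second apart); only the rows
    of ``next_`` within ``after`` of the end of ``current`` are kept:

    >>> import pandas as pd
    >>> idx = pd.date_range("2000-01-01", periods=3, freq="1s")
    >>> current = pd.DataFrame({"x": [0, 1, 2]}, index=idx)
    >>> next_ = pd.DataFrame({"x": [3, 4, 5]}, index=idx + pd.Timedelta("3s"))
    >>> _head_timedelta(current, next_, pd.Timedelta("2s"))  # doctest: +SKIP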
"""
return next_[next_.index < (current.index.max() + after)]
def _tail_timedelta(prevs, current, before):
"""Return the concatenated rows of each dataframe in ``prevs`` whose
index is after the first observation in ``current`` - ``before``.
Parameters
----------
current : DataFrame
prevs : list of DataFrame objects
before : timedelta
Returns
-------
overlapped : DataFrame
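
    Examples
    --------
    A sketch with assumed inputs; only the rows of ``prevs`` within ``before``
    of the start of ``current`` are kept:

    >>> import pandas as pd
    >>> idx = pd.date_range("2000-01-01", periods=3, freq="1s")
    >>> prev = pd.DataFrame({"x": [0, 1, 2]}, index=idx)
    >>> current = pd.DataFrame({"x": [3]}, index=idx[-1:] + pd.Timedelta("1s"))
    >>> _tail_timedelta([prev], current, pd.Timedelta("2s"))  # doctest: +SKIP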
"""
selected = methods.concat(
[prev[prev.index > (current.index.min() - before)] for prev in prevs]
)
return selected
class Rolling:
"""Provides rolling window calculations."""
def __init__(
self,
obj,
window=None,
min_periods=None,
center=False,
win_type=None,
axis=no_default,
):
self.obj = obj # dataframe or series
self.window = window
self.min_periods = min_periods
self.center = center
self.axis = axis
self.win_type = win_type
# Allow pandas to raise if appropriate
obj._meta.rolling(**self._rolling_kwargs())
# Using .rolling(window='2s'), pandas will convert the
# offset str to a window in nanoseconds. But pandas doesn't
# accept the integer window with win_type='freq', so we store
# that information here.
# See https://github.com/pandas-dev/pandas/issues/15969
self._win_type = None if isinstance(self.window, int) else "freq"
def _rolling_kwargs(self):
kwargs = {
"window": self.window,
"min_periods": self.min_periods,
"center": self.center,
"win_type": self.win_type,
}
if self.axis is not no_default:
kwargs["axis"] = self.axis
return kwargs
@property
def _has_single_partition(self):
"""
Indicator for whether the object has a single partition (True)
or multiple (False).
"""
return (
self.axis in (1, "columns")
or (isinstance(self.window, Integral) and self.window <= 1)
or self.obj.npartitions == 1
)
@staticmethod
def pandas_rolling_method(df, rolling_kwargs, name, *args, **kwargs):
with check_axis_keyword_deprecation():
rolling = df.rolling(**rolling_kwargs)
return getattr(rolling, name)(*args, **kwargs)
def _call_method(self, method_name, *args, **kwargs):
rolling_kwargs = self._rolling_kwargs()
meta = self.pandas_rolling_method(
self.obj._meta_nonempty, rolling_kwargs, method_name, *args, **kwargs
)
if self._has_single_partition:
            # There's no overlap; just use map_partitions
return self.obj.map_partitions(
self.pandas_rolling_method,
rolling_kwargs,
method_name,
*args,
token=method_name,
meta=meta,
**kwargs,
)
# Convert window to overlap
if self.center:
before = self.window // 2
after = self.window - before - 1
elif self._win_type == "freq":
before = pd.Timedelta(self.window)
after = 0
else:
before = self.window - 1
after = 0
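        # Illustrative (not from the original source): window=5 with
        # center=True needs before=2 and after=2 rows from the neighbors,
        # while window=5 with center=False needs only before=4 and after=0.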
return map_overlap(
self.pandas_rolling_method,
self.obj,
before,
after,
rolling_kwargs,
method_name,
*args,
token=method_name,
meta=meta,
**kwargs,
)
    @derived_from(pd_Rolling)
def count(self):
return self._call_method("count")
@derived_from(pd_Rolling)
def cov(self):
return self._call_method("cov")
    @derived_from(pd_Rolling)
def sum(self):
return self._call_method("sum")
    @derived_from(pd_Rolling)
def mean(self):
return self._call_method("mean")
    @derived_from(pd_Rolling)
def min(self):
return self._call_method("min")
    @derived_from(pd_Rolling)
def max(self):
return self._call_method("max")
    @derived_from(pd_Rolling)
    def std(self, ddof=1):
        return self._call_method("std", ddof=ddof)
    @derived_from(pd_Rolling)
    def var(self, ddof=1):
        return self._call_method("var", ddof=ddof)
    @derived_from(pd_Rolling)
def skew(self):
return self._call_method("skew")
    @derived_from(pd_Rolling)
def kurt(self):
return self._call_method("kurt")
    @derived_from(pd_Rolling)
def quantile(self, quantile):
return self._call_method("quantile", quantile)
    @derived_from(pd_Rolling)
def apply(
self,
func,
raw=False,
engine="cython",
engine_kwargs=None,
args=None,
kwargs=None,
):
kwargs = kwargs or {}
args = args or ()
return self._call_method(
"apply",
func,
raw=raw,
engine=engine,
engine_kwargs=engine_kwargs,
args=args,
kwargs=kwargs,
)
@derived_from(pd_Rolling)
def aggregate(self, func, *args, **kwargs):
return self._call_method("agg", func, *args, **kwargs)
agg = aggregate
def __repr__(self):
def order(item):
k, v = item
_order = {
"window": 0,
"min_periods": 1,
"center": 2,
"win_type": 3,
"axis": 4,
}
return _order[k]
rolling_kwargs = self._rolling_kwargs()
rolling_kwargs["window"] = self.window
rolling_kwargs["win_type"] = self._win_type
return "Rolling [{}]".format(
",".join(
f"{k}={v}"
for k, v in sorted(rolling_kwargs.items(), key=order)
if v is not None
)
)
class RollingGroupby(Rolling):
def __init__(
self,
groupby,
window=None,
min_periods=None,
center=False,
win_type=None,
axis=0,
):
self._groupby_kwargs = groupby._groupby_kwargs
self._groupby_slice = groupby._slice
obj = groupby.obj
if self._groupby_slice is not None:
if isinstance(self._groupby_slice, str):
sliced_plus = [self._groupby_slice]
else:
sliced_plus = list(self._groupby_slice)
if isinstance(groupby.by, str):
sliced_plus.append(groupby.by)
else:
sliced_plus.extend(groupby.by)
obj = obj[sliced_plus]
super().__init__(
obj,
window=window,
min_periods=min_periods,
center=center,
win_type=win_type,
axis=axis,
)
def _rolling_kwargs(self):
kwargs = super()._rolling_kwargs()
if kwargs.get("axis", None) in (0, "index"):
# it's a default, no need to pass
kwargs.pop("axis")
return kwargs
@staticmethod
def pandas_rolling_method(
df,
rolling_kwargs,
name,
*args,
groupby_kwargs=None,
groupby_slice=None,
**kwargs,
):
groupby = df.groupby(**groupby_kwargs)
if groupby_slice:
groupby = groupby[groupby_slice]
rolling = groupby.rolling(**rolling_kwargs)
        # ``groupby(...).rolling(...)`` returns a MultiIndex result (group keys
        # plus the original index); sort by the last level to restore the
        # original index order within the partition.
        return getattr(rolling, name)(*args, **kwargs).sort_index(level=-1)
def _call_method(self, method_name, *args, **kwargs):
return super()._call_method(
method_name,
*args,
groupby_kwargs=self._groupby_kwargs,
groupby_slice=self._groupby_slice,
**kwargs,
)
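# Usage sketch (illustrative): RollingGroupby backs expressions such as
#     ddf.groupby("key").rolling("2s").mean()
# where each partition is grouped and rolled with pandas, then sorted back
# into index order.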