dask_expr._collection.Index.map_overlap
Index.map_overlap(func, before, after, *args, meta=_NoDefault.no_default, enforce_metadata=True, transform_divisions=True, clear_divisions=False, align_dataframes=False, **kwargs)
Apply a function to each partition, sharing rows with adjacent partitions.
This can be useful for implementing windowing functions such as df.rolling(...).mean() or df.diff().

Parameters
func : function
    Function applied to each partition.
before : int, timedelta or string timedelta
    The rows to prepend to partition i from the end of partition i - 1.
after : int, timedelta or string timedelta
    The rows to append to partition i from the beginning of partition i + 1.
args, kwargs
    Positional and keyword arguments to pass to the function. Positional arguments are computed on a per-partition basis, while keyword arguments are shared across all partitions. The partition itself will be the first positional argument, with all other arguments passed after. Arguments can be Scalar, Delayed, or regular Python objects. DataFrame-like args (both dask and pandas) will be repartitioned to align (if necessary) before applying the function; see align_dataframes to control this behavior.
enforce_metadata : bool, default True
    Whether to enforce at runtime that the structure of the DataFrame produced by func actually matches the structure of meta. This will rename and reorder columns for each partition, and will raise an error if this doesn’t work, but it won’t raise if dtypes don’t match.
transform_divisions : bool, default True
    Whether to apply the function onto the divisions and apply those transformed divisions to the output.
align_dataframes : bool, default False
    Whether to repartition DataFrame- or Series-like args (both dask and pandas) so their divisions align before applying the function. This requires all inputs to have known divisions. Single-partition inputs will be split into multiple partitions.
    If False, all inputs must have either the same number of partitions or a single partition. Single-partition inputs will be broadcast to every partition of multi-partition inputs.
meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional
    An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.
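As a rough illustration of the meta forms described above, the sketch below builds the empty pandas objects that each shorthand stands for. It uses only pandas. The helper empty_frame_from_pairs is hypothetical, written for this example, and is not how dask.dataframe.utils.make_meta is implemented. The column names x and y and their dtypes are assumptions.

```python
import pandas as pd

# Canonical form: an empty DataFrame with the output's column names and dtypes.
meta_frame = pd.DataFrame({
    "x": pd.Series(dtype="float64"),
    "y": pd.Series(dtype="float64"),
})

# Shorthand forms that describe the same structure.
meta_dict = {"x": "float64", "y": "float64"}        # dict of {name: dtype}
meta_pairs = [("x", "float64"), ("y", "float64")]   # iterable of (name, dtype)
meta_series_tuple = ("x", "float64")                # tuple for Series output


def empty_frame_from_pairs(pairs):
    """Hypothetical helper (not part of dask): build an empty DataFrame
    from (name, dtype) pairs, preserving their order."""
    return pd.DataFrame({name: pd.Series(dtype=dtype) for name, dtype in pairs})


frame_from_dict = empty_frame_from_pairs(meta_dict.items())
frame_from_pairs = empty_frame_from_pairs(meta_pairs)

# For Series output, the tuple corresponds to an empty, named Series.
name, dtype = meta_series_tuple
series_from_tuple = pd.Series(dtype=dtype, name=name)
```

All three DataFrame forms carry the same column order and dtypes, which is the information that meta conveys.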
Notes
Given positive integers before and after, and a function func, map_overlap does the following:

1. Prepend before rows to each partition i from the end of partition i - 1. The first partition has no rows prepended.
2. Append after rows to each partition i from the beginning of partition i + 1. The last partition has no rows appended.
3. Apply func to each partition, passing in any extra args and kwargs if provided.
4. Trim before rows from the beginning of all but the first partition.
5. Trim after rows from the end of all but the last partition.
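The steps above can be sketched in plain Python, with lists standing in for partitions. This is a minimal illustration rather than dask's implementation: overlap_map and rolling_sum are hypothetical helpers, the input values and the split into two partitions are assumed, and func is assumed to return exactly one output per input row so that trimming by position is valid.

```python
def rolling_sum(values, window):
    """Trailing rolling sum; positions without a full window yield None."""
    return [
        sum(values[i - window + 1 : i + 1]) if i >= window - 1 else None
        for i in range(len(values))
    ]


def overlap_map(partitions, func, before, after):
    """Hypothetical stand-in for map_overlap over a list of list partitions."""
    results = []
    last = len(partitions) - 1
    for i, part in enumerate(partitions):
        # Steps 1 and 2: borrow rows from the neighbouring partitions, if any.
        head = partitions[i - 1][-before:] if i > 0 and before else []
        tail = partitions[i + 1][:after] if i < last and after else []
        # Step 3: apply the function to the extended partition.
        out = func(head + part + tail)
        # Steps 4 and 5: drop the outputs that correspond to borrowed rows.
        results.append(out[len(head) : len(out) - len(tail)])
    return results


# Five values split into two partitions (an assumed split).
partitions = [[1, 2, 4], [7, 11]]
result = overlap_map(partitions, lambda rows: rolling_sum(rows, 2), before=2, after=0)
flat = [value for part in result for value in part]
```

Because the second partition sees the last rows of the first before func runs, the flattened result matches a rolling sum computed over the unsplit sequence.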
Examples
Given a DataFrame, Series, or Index, such as:
>>> import pandas as pd
>>> import dask_expr as dd
>>> df = pd.DataFrame({'x': [1, 2, 4, 7, 11],
...                    'y': [1., 2., 3., 4., 5.]})
>>> ddf = dd.from_pandas(df, npartitions=2)
A rolling sum with a trailing moving window of size 2 can be computed by overlapping 2 rows before each partition, and then mapping calls to df.rolling(2).sum():

>>> ddf.compute()
    x    y
0   1  1.0
1   2  2.0
2   4  3.0
3   7  4.0
4  11  5.0
>>> ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute()
      x    y
0   NaN  NaN
1   3.0  3.0
2   6.0  5.0
3  11.0  7.0
4  18.0  9.0
The pandas diff method computes a discrete difference shifted by a number of periods (can be positive or negative). This can be implemented by mapping calls to df.diff to each partition after prepending or appending that many rows, depending on the sign:

>>> def diff(df, periods=1):
...     # Positive periods look backwards (prepend rows); negative look forwards (append rows).
...     before, after = (periods, 0) if periods > 0 else (0, -periods)
...     return df.map_overlap(lambda df, periods=1: df.diff(periods),
...                           before, after, periods=periods)
>>> diff(ddf, 1).compute()
     x    y
0  NaN  NaN
1  1.0  1.0
2  2.0  1.0
3  3.0  1.0
4  4.0  1.0
If you have a DatetimeIndex, you can use a pd.Timedelta for time-based windows or any pd.Timedelta convertible string:

>>> ts = pd.Series(range(10), index=pd.date_range('2017', periods=10))
>>> dts = dd.from_pandas(ts, npartitions=2)
>>> dts.map_overlap(lambda df: df.rolling('2D').sum(),
...                 pd.Timedelta('2D'), 0).compute()
2017-01-01     0.0
2017-01-02     1.0
2017-01-03     3.0
2017-01-04     5.0
2017-01-05     7.0
2017-01-06     9.0
2017-01-07    11.0
2017-01-08    13.0
2017-01-09    15.0
2017-01-10    17.0
Freq: D, dtype: float64