dask.dataframe.Series.map_overlap
dask.dataframe.Series.map_overlap¶
- Series.map_overlap(func, before, after, *args, **kwargs)¶
Apply a function to each partition, sharing rows with adjacent partitions.
This can be useful for implementing windowing functions such as
df.rolling(...).mean()
ordf.diff()
.- Parameters
- funcfunction
Function applied to each partition.
- beforeint
The number of rows to prepend to partition
i
from the end of partitioni - 1
.- afterint
The number of rows to append to partition
i
from the beginning of partitioni + 1
.- args, kwargs :
Arguments and keywords to pass to the function. The partition will be the first argument, and these will be passed after.
- metapd.DataFrame, pd.Series, dict, iterable, tuple, optional
An empty
pd.DataFrame
orpd.Series
that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of aDataFrame
, adict
of{name: dtype}
or iterable of(name, dtype)
can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of(name, dtype)
can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providingmeta
is recommended. For more information, seedask.dataframe.utils.make_meta
.
Notes
Given positive integers
before
andafter
, and a functionfunc
,map_overlap
does the following:Prepend
before
rows to each partitioni
from the end of partitioni - 1
. The first partition has no rows prepended.Append
after
rows to each partitioni
from the beginning of partitioni + 1
. The last partition has no rows appended.Apply
func
to each partition, passing in any extraargs
andkwargs
if provided.Trim
before
rows from the beginning of all but the first partition.Trim
after
rows from the end of all but the last partition.
Note that the index and divisions are assumed to remain unchanged.
Examples
Given a DataFrame, Series, or Index, such as:
>>> import pandas as pd >>> import dask.dataframe as dd >>> df = pd.DataFrame({'x': [1, 2, 4, 7, 11], ... 'y': [1., 2., 3., 4., 5.]}) >>> ddf = dd.from_pandas(df, npartitions=2)
A rolling sum with a trailing moving window of size 2 can be computed by overlapping 2 rows before each partition, and then mapping calls to
df.rolling(2).sum()
:>>> ddf.compute() x y 0 1 1.0 1 2 2.0 2 4 3.0 3 7 4.0 4 11 5.0 >>> ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute() x y 0 NaN NaN 1 3.0 3.0 2 6.0 5.0 3 11.0 7.0 4 18.0 9.0
The pandas
diff
method computes a discrete difference shifted by a number of periods (can be positive or negative). This can be implemented by mapping calls todf.diff
to each partition after prepending/appending that many rows, depending on sign:>>> def diff(df, periods=1): ... before, after = (periods, 0) if periods > 0 else (0, -periods) ... return df.map_overlap(lambda df, periods=1: df.diff(periods), ... periods, 0, periods=periods) >>> diff(ddf, 1).compute() x y 0 NaN NaN 1 1.0 1.0 2 2.0 1.0 3 3.0 1.0 4 4.0 1.0
If you have a
DatetimeIndex
, you can use apd.Timedelta
for time- based windows.>>> ts = pd.Series(range(10), index=pd.date_range('2017', periods=10)) >>> dts = dd.from_pandas(ts, npartitions=2) >>> dts.map_overlap(lambda df: df.rolling('2D').sum(), ... pd.Timedelta('2D'), 0).compute() 2017-01-01 0.0 2017-01-02 1.0 2017-01-03 3.0 2017-01-04 5.0 2017-01-05 7.0 2017-01-06 9.0 2017-01-07 11.0 2017-01-08 13.0 2017-01-09 15.0 2017-01-10 17.0 Freq: D, dtype: float64