dask.dataframe.Series.map_overlap

Series.map_overlap(func, before, after, *args, **kwargs)

Apply a function to each partition, sharing rows with adjacent partitions.

This can be useful for implementing windowing functions such as df.rolling(...).mean() or df.diff().

Parameters
funcfunction

Function applied to each partition.

beforeint

The number of rows to prepend to partition i from the end of partition i - 1.

afterint

The number of rows to append to partition i from the beginning of partition i + 1.

args, kwargs :

Arguments and keywords to pass to the function. The partition will be the first argument, and these will be passed after.

metapd.DataFrame, pd.Series, dict, iterable, tuple, optional

An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.

Notes

Given positive integers before and after, and a function func, map_overlap does the following:

  1. Prepend before rows to each partition i from the end of partition i - 1. The first partition has no rows prepended.

  2. Append after rows to each partition i from the beginning of partition i + 1. The last partition has no rows appended.

  3. Apply func to each partition, passing in any extra args and kwargs if provided.

  4. Trim before rows from the beginning of all but the first partition.

  5. Trim after rows from the end of all but the last partition.

Note that the index and divisions are assumed to remain unchanged.

Examples

Given a DataFrame, Series, or Index, such as:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> df = pd.DataFrame({'x': [1, 2, 4, 7, 11],
...                    'y': [1., 2., 3., 4., 5.]})
>>> ddf = dd.from_pandas(df, npartitions=2)

A rolling sum with a trailing moving window of size 2 can be computed by overlapping 2 rows before each partition, and then mapping calls to df.rolling(2).sum():

>>> ddf.compute()
    x    y
0   1  1.0
1   2  2.0
2   4  3.0
3   7  4.0
4  11  5.0
>>> ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute()
      x    y
0   NaN  NaN
1   3.0  3.0
2   6.0  5.0
3  11.0  7.0
4  18.0  9.0

The pandas diff method computes a discrete difference shifted by a number of periods (can be positive or negative). This can be implemented by mapping calls to df.diff to each partition after prepending/appending that many rows, depending on sign:

>>> def diff(df, periods=1):
...     before, after = (periods, 0) if periods > 0 else (0, -periods)
...     return df.map_overlap(lambda df, periods=1: df.diff(periods),
...                           periods, 0, periods=periods)
>>> diff(ddf, 1).compute()
     x    y
0  NaN  NaN
1  1.0  1.0
2  2.0  1.0
3  3.0  1.0
4  4.0  1.0

If you have a DatetimeIndex, you can use a pd.Timedelta for time- based windows.

>>> ts = pd.Series(range(10), index=pd.date_range('2017', periods=10))
>>> dts = dd.from_pandas(ts, npartitions=2)
>>> dts.map_overlap(lambda df: df.rolling('2D').sum(),
...                 pd.Timedelta('2D'), 0).compute()
2017-01-01     0.0
2017-01-02     1.0
2017-01-03     3.0
2017-01-04     5.0
2017-01-05     7.0
2017-01-06     9.0
2017-01-07    11.0
2017-01-08    13.0
2017-01-09    15.0
2017-01-10    17.0
Freq: D, dtype: float64