dask.dataframe.DataFrame.map_partitions

DataFrame.map_partitions(func, *args, **kwargs)

Apply a Python function to each DataFrame partition.

Note that the index and divisions are assumed to remain unchanged.

Parameters
func : function

Function applied to each partition.

args, kwargs :

Arguments and keywords to pass to the function. The partition will be the first argument, and these will be passed after. Arguments and keywords may contain Scalar, Delayed, partition_info or regular Python objects. DataFrame-like args (both dask and pandas) will be repartitioned to align (if necessary) before applying the function (see align_dataframes to control this behavior).

enforce_metadata : bool, default True

Whether to enforce at runtime that the structure of the DataFrame produced by func actually matches the structure of meta. This will rename and reorder columns for each partition, and will raise an error if this doesn’t work or types don’t match.

transform_divisions : bool, default True

Whether to apply the function to the divisions and use the transformed divisions for the output.

align_dataframes : bool, default True

Whether to repartition DataFrame- or Series-like args (both dask and pandas) so their divisions align before applying the function. This requires all inputs to have known divisions. Single-partition inputs will be split into multiple partitions.

If False, all inputs must have either the same number of partitions or a single partition. Single-partition inputs will be broadcast to every partition of multi-partition inputs.

meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional

An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.

Examples

Given a DataFrame, Series, or Index, such as:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
...                    'y': [1., 2., 3., 4., 5.]})
>>> ddf = dd.from_pandas(df, npartitions=2)

One can use map_partitions to apply a function to each partition. Extra arguments and keywords can optionally be provided, and will be passed to the function after the partition.

Here we apply a function with arguments and keywords to a DataFrame, resulting in a Series:

>>> def myadd(df, a, b=1):
...     return df.x + df.y + a + b
>>> res = ddf.map_partitions(myadd, 1, b=2)
>>> res.dtype
dtype('float64')

By default, dask tries to infer the output metadata by running your provided function on some fake data. This works well in many cases, but can sometimes be expensive, or even fail. To avoid this, you can manually specify the output metadata with the meta keyword. The metadata can be specified in several forms; for more information, see dask.dataframe.utils.make_meta.

Here we specify the output is a Series with no name, and dtype float64:

>>> res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))

Here we map a function that takes in a DataFrame, and returns a DataFrame with a new column:

>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
>>> res.dtypes
x      int64
y    float64
z    float64
dtype: object

As before, the output metadata can also be specified manually. This time we pass in a dict, as the output is a DataFrame:

>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y),
...                          meta={'x': 'i8', 'y': 'f8', 'z': 'f8'})

In the case where the metadata doesn’t change, you can also pass in the object itself directly:

>>> res = ddf.map_partitions(lambda df: df.head(), meta=ddf)

Also note that the index and divisions are assumed to remain unchanged. If the function you’re mapping changes the index/divisions, you’ll need to clear them afterwards:

>>> ddf.map_partitions(func).clear_divisions()

Your map function can find out which partition of the dataframe it is processing by accepting a special partition_info keyword argument.

>>> def func(partition, partition_info=None):
...     pass

The partition_info argument will be a dictionary such as:

>>> partition_info
{'number': 1, 'division': 3}

For each argument or keyword argument that is a dask dataframe, you will receive the number (n), which identifies the nth partition of the dataframe, and the division (the first index value in the partition). If divisions are not known (for instance, if the index is not sorted), the division will be None.