dask_expr._collection.DataFrame.map_partitions

DataFrame.map_partitions(func, *args, meta=_NoDefault.no_default, enforce_metadata=True, transform_divisions=True, clear_divisions=False, align_dataframes=False, parent_meta=None, **kwargs)

Apply a Python function to each partition

Parameters
func : function

Function applied to each partition.

args, kwargs

Arguments and keywords to pass to the function. Arguments and keywords may contain FrameBase or regular Python objects. DataFrame-like args (both dask and pandas) must have the same number of partitions as self or comprise a single partition. Keyword arguments, single-partition arguments, and general Python-object arguments are broadcast to all partitions.

enforce_metadata : bool, default True

Whether to enforce at runtime that the structure of the DataFrame produced by func actually matches the structure of meta. This will rename and reorder columns for each partition, and will raise an error if this doesn’t work, but it won’t raise if dtypes don’t match.

transform_divisions : bool, default True

Whether to apply the function to the divisions and use the transformed divisions for the output.

clear_divisions : bool, default False

Whether divisions should be cleared. If True, transform_divisions will be ignored.

meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional

An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or an iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a Series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.

Examples

Given a DataFrame, Series, or Index, such as:

>>> import pandas as pd
>>> import dask_expr as dd
>>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
...                    'y': [1., 2., 3., 4., 5.]})
>>> ddf = dd.from_pandas(df, npartitions=2)

One can use map_partitions to apply a function on each partition. Extra arguments and keywords can optionally be provided, and will be passed to the function after the partition.

Here we apply a function with arguments and keywords to a DataFrame, resulting in a Series:

>>> def myadd(df, a, b=1):
...     return df.x + df.y + a + b
>>> res = ddf.map_partitions(myadd, 1, b=2)
>>> res.dtype
dtype('float64')

Here we apply a function to a Series resulting in a Series:

>>> res = ddf.x.map_partitions(lambda x: len(x)) # ddf.x is a Dask Series Structure
>>> res.dtype
dtype('int64')

By default, dask tries to infer the output metadata by running your provided function on some fake data. This works well in many cases, but can sometimes be expensive, or even fail. To avoid this, you can manually specify the output metadata with the meta keyword. The metadata can be specified in several forms; for more information, see dask.dataframe.utils.make_meta.

Here we specify the output is a Series with no name, and dtype float64:

>>> res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))

Here we map a function that takes in a DataFrame, and returns a DataFrame with a new column:

>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
>>> res.dtypes
x      int64
y    float64
z    float64
dtype: object

As before, the output metadata can also be specified manually. This time we pass in a dict, as the output is a DataFrame:

>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y),
...                          meta={'x': 'i8', 'y': 'f8', 'z': 'f8'})

In the case where the metadata doesn’t change, you can also pass in the object itself directly:

>>> res = ddf.map_partitions(lambda df: df.head(), meta=ddf)

Also note that the index and divisions are assumed to remain unchanged. If the function you’re mapping changes the index/divisions, you’ll need to pass clear_divisions=True.

>>> ddf.map_partitions(func, clear_divisions=True)  

Your map function gets information about where it is in the dataframe by accepting a special partition_info keyword argument.

>>> def func(partition, partition_info=None):
...     pass

This will receive the following information:

>>> partition_info  
{'number': 1, 'division': 3}

For each argument or keyword argument that is a dask dataframe, you will receive the number (n), which identifies the nth partition of the dataframe, and the division (the first index value in the partition). If divisions are not known (for instance, if the index is not sorted), the division will be None.