dask_expr.from_map

dask_expr.from_map(func, *iterables, args=None, meta=_NoDefault.no_default, divisions=None, label=None, enforce_metadata=False, **kwargs)

Create a DataFrame collection from a custom function map.

from_map is the preferred option when reading from data sources that are not natively supported by Dask or when the data source requires custom handling before handing things off to Dask DataFrames. Examples include binary files or other unstructured data that does not have an IO connector.

from_map supports column projection by the optimizer. The optimizer tries to push column selections into the from_map call if the function supports a columns argument.

Parameters
func : callable

Function used to create each partition. Column projection will be enabled if the function has a columns keyword argument.

*iterables : Iterable objects

Iterable objects to map to each output partition. All iterables must be the same length. This length determines the number of partitions in the output collection (only one element of each iterable will be passed to func for each partition).

args : list or tuple, optional

Positional arguments to broadcast to each output partition. Note that these arguments will always be passed to func after the iterables positional arguments.

meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional

An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended.

divisions : tuple, str, optional

Partition boundaries along the index. For a tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions. For the string 'sorted', the delayed values will be computed to find the index values; this assumes that the indexes are mutually sorted. If None, no index information is used.

label : str, optional

String to use as the function-name label in the output collection-key names.

token : str, optional

String to use as the “token” in the output collection-key names.

enforce_metadata : bool, default False

Whether to enforce at runtime that the structure of the DataFrame produced by func actually matches the structure of meta. This will rename and reorder columns for each partition, and will raise an error if this doesn’t work, but it won’t raise if dtypes don’t match.

**kwargs

Keyword arguments to broadcast to each output partition. These same arguments will be passed to func for every output partition.

Examples

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> func = lambda x, size=0: pd.Series([x] * size)
>>> inputs = ["A", "B"]
>>> dd.from_map(func, inputs, size=2).compute()
0    A
1    A
0    B
1    B
dtype: object
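
Multiple iterables can be combined with the args keyword described above: each partition receives one element from every iterable, followed by the broadcast positional arguments. A small sketch (make_series and its inputs are made up for illustration):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> letters = ["A", "B"]   # one element per output partition
>>> sizes = [2, 3]         # same length as letters
>>> def make_series(letter, size, prefix):
...     # letter and size come from the iterables, prefix from args
...     return pd.Series([prefix + letter] * size)
>>> dd.from_map(make_series, letters, sizes, args=["x-"]).compute()
0    x-A
1    x-A
0    x-B
1    x-B
2    x-B
dtype: object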

The optimizer will identify a column selection that happens after from_map and push the columns argument into the actual map call to drop unnecessary columns as early as possible.

>>> def map_function(x, columns=None):
...     df = pd.DataFrame({"a": [1, 2], "b": x})
...     if columns is not None:
...         df = df[columns]
...     return df
>>>
>>> dd.from_map(map_function, [1, 2])["b"].compute()
0    1
1    1
0    2
1    2
Name: b, dtype: int64
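
When the output schema is known ahead of time, it can be passed as meta to avoid schema inference, and enforce_metadata=True asks Dask to align each partition with that schema at runtime (renaming and reordering columns as described above). A minimal sketch, using a hypothetical make_partition helper:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> def make_partition(i):
...     # columns come back in a different order than meta declares
...     return pd.DataFrame({"b": [str(i)], "a": [i]})
>>> meta = pd.DataFrame({"a": pd.Series(dtype="int64"), "b": pd.Series(dtype="object")})
>>> ddf = dd.from_map(make_partition, range(3), meta=meta, enforce_metadata=True)
>>> ddf.compute().columns.tolist()
['a', 'b']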

This API can also be used as an alternative to other file-based IO functions, like read_csv (which are already just from_map wrapper functions):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> dd.from_map(pd.read_csv, paths).head()  
                    name
timestamp
2000-01-01 00:00:00   Laura
2000-01-01 00:00:01  Oliver
2000-01-01 00:00:02   Alice
2000-01-01 00:00:03  Victor
2000-01-01 00:00:04     Bob
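
Because keyword arguments are broadcast to every call of func, read options can be forwarded straight through to pd.read_csv in this pattern. A sketch reusing the hypothetical paths from above:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> # usecols is passed to every pd.read_csv call
>>> ddf = dd.from_map(pd.read_csv, paths, usecols=["name"])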

Since from_map allows you to map an arbitrary function to any number of iterable objects, it can be a very convenient means of implementing functionality that may be missing from other DataFrame-creation methods. For example, if you happen to have a priori knowledge about the number of rows in each of the files in a dataset, you can generate a DataFrame collection with a global RangeIndex:

>>> import pandas as pd
>>> import numpy as np
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> file_sizes = [86400, 86400, 86400]
>>> def func(path, row_offset):
...     # Read CSV file and set RangeIndex offset
...     df = pd.read_csv(path)
...     return df.set_index(
...         pd.RangeIndex(row_offset, row_offset+len(df))
...     )
>>> def get_ddf(paths, file_sizes):
...     offsets = [0] + list(np.cumsum(file_sizes))
...     return dd.from_map(
...         func, paths, offsets[:-1], divisions=offsets
...     )
>>> ddf = get_ddf(paths, file_sizes)  
>>> ddf.index  
Dask Index Structure:
npartitions=3
0         int64
86400       ...
172800      ...
259200      ...
dtype: int64
Dask Name: func, 6 tasks