dask_expr.from_map
dask_expr.from_map(func, *iterables, args=None, meta=_NoDefault.no_default, divisions=None, label=None, enforce_metadata=False, **kwargs)
Create a DataFrame collection from a custom function map.
from_map is the preferred option when reading from data sources that are not natively supported by Dask or when the data source requires custom handling before handing things off to Dask DataFrames. Examples are binary files or other unstructured data that doesn't have an IO connector.

from_map supports column projection by the optimizer. The optimizer tries to push column selections into the from_map call if the function supports a columns argument.

Parameters
- func : callable
  Function used to create each partition. Column projection will be enabled if the function has a columns keyword argument.
- *iterables : Iterable objects
  Iterable objects to map to each output partition. All iterables must be the same length. This length determines the number of partitions in the output collection (only one element of each iterable will be passed to func for each partition).
- args : list or tuple, optional
  Positional arguments to broadcast to each output partition. Note that these arguments will always be passed to func after the iterables positional arguments.
- meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional
  An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. If not provided, Dask will try to infer the metadata; providing meta explicitly is recommended.
- divisions : tuple, str, optional
  Partition boundaries along the index. For a tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions. Passing the string 'sorted' will compute the delayed values to find the index values; this assumes that the indexes are mutually sorted. If None, index information will not be used.
- label : str, optional
  String to use as the function-name label in the output collection-key names.
- token : str, optional
  String to use as the "token" in the output collection-key names.
- enforce_metadata : bool, default True
  Whether to enforce at runtime that the structure of the DataFrame produced by func actually matches the structure of meta. This will rename and reorder columns for each partition, and will raise an error if this doesn't work, but it won't raise if dtypes don't match.
- **kwargs
  Keyword arguments to broadcast to each output partition. These same arguments will be passed to func for every output partition.
Examples
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> func = lambda x, size=0: pd.Series([x] * size)
>>> inputs = ["A", "B"]
>>> dd.from_map(func, inputs, size=2).compute()
0    A
1    A
0    B
1    B
dtype: object
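Broadcast arguments can also be combined with an explicit meta. The following minimal sketch (the function, column names, and values are made up for illustration) passes one positional argument via args to every partition and declares the expected output structure up front:

>>> def make_partition(tag, base, scale=1):
...     # tag comes from the iterable, base from args, scale from **kwargs
...     return pd.DataFrame({"label": [tag], "value": [base * scale]})
>>> meta = pd.DataFrame({"label": pd.Series(dtype="object"),
...                      "value": pd.Series(dtype="int64")})
>>> ddf = dd.from_map(make_partition, ["x", "y"], args=[10], meta=meta, scale=2)
>>> ddf.npartitions
2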
The optimizer will identify a column selection that happens after from_map and push the columns argument into the actual map call to drop unnecessary columns as early as possible.
>>> def map_function(x, columns=None):
...     df = pd.DataFrame({"a": [1, 2], "b": x})
...     if columns is not None:
...         df = df[columns]
...     return df
>>> dd.from_map(map_function, [1, 2])["b"].compute()
0    1
1    1
0    2
1    2
Name: b, dtype: int64
This API can also be used as an alternative to other file-based IO functions, like read_csv (which are already just from_map wrapper functions):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> dd.from_map(pd.read_csv, paths).head()
                       name
timestamp
2000-01-01 00:00:00   Laura
2000-01-01 00:00:01  Oliver
2000-01-01 00:00:02   Alice
2000-01-01 00:00:03  Victor
2000-01-01 00:00:04     Bob
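Because keyword arguments are broadcast to every call of func, reader options can be forwarded through from_map directly. A small sketch reusing the hypothetical CSV paths above (assuming the files contain a name column):

>>> ddf = dd.from_map(pd.read_csv, paths, usecols=["name"])  # usecols is passed to every pd.read_csv call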
Since from_map allows you to map an arbitrary function to any number of iterable objects, it can be a very convenient means of implementing functionality that may be missing from other DataFrame-creation methods. For example, if you happen to have a priori knowledge about the number of rows in each of the files in a dataset, you can generate a DataFrame collection with a global RangeIndex:

>>> import pandas as pd
>>> import numpy as np
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> file_sizes = [86400, 86400, 86400]
>>> def func(path, row_offset):
...     # Read csv file and set RangeIndex offset
...     df = pd.read_csv(path)
...     return df.set_index(
...         pd.RangeIndex(row_offset, row_offset + len(df))
...     )
>>> def get_ddf(paths, file_sizes):
...     offsets = [0] + list(np.cumsum(file_sizes))
...     return dd.from_map(
...         func, paths, offsets[:-1], divisions=offsets
...     )
>>> ddf = get_ddf(paths, file_sizes)
>>> ddf.index
Dask Index Structure:
npartitions=3
0         int64
86400       ...
172800      ...
259200      ...
dtype: int64
Dask Name: myfunc, 6 tasks
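Because the resulting collection has known divisions, index-based selection only touches the partitions that overlap the requested range. A short follow-up sketch using the hypothetical ddf built above:

>>> subset = ddf.loc[100000:120000]  # range lies entirely within the second partition
>>> subset.npartitions
1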