Source code for dask.dataframe.io.demo

from __future__ import annotations

import re
import string
from collections.abc import Callable
from dataclasses import asdict, dataclass, field
from typing import Any, cast

import numpy as np
import pandas as pd

from dask.dataframe._compat import PANDAS_GE_220, PANDAS_GE_300
from dask.dataframe._pyarrow import is_object_string_dtype
from dask.dataframe.core import tokenize
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.utils import random_state_data

__all__ = [
    "make_timeseries",
    "with_spec",
    "ColumnSpec",
    "RangeIndexSpec",
    "DatetimeIndexSpec",
    "DatasetSpec",
]

default_int_args: dict[str, tuple[tuple[Any, ...], dict[str, Any]]] = {
    "poisson": ((), {"lam": 1000}),
    "normal": ((), {"scale": 1000}),
    "uniform": ((), {"high": 1000}),
    "binomial": ((1000, 0.5), {}),
    "random": ((0,), {"high": 1000}),
}
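
# Illustrative sketch (not part of the original module): each entry above maps
# the name of a ``numpy.random.RandomState`` method to its default positional
# and keyword arguments. For example, the "poisson" and "binomial" defaults
# are equivalent to calls like these (the seed is arbitrary):
#
#     rstate = np.random.RandomState(42)
#     rstate.poisson(lam=1000, size=10)    # args=(), kwargs={"lam": 1000}
#     rstate.binomial(1000, 0.5, size=10)  # args=(1000, 0.5), kwargs={}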


@dataclass
class ColumnSpec:
    """Encapsulates properties of a family of columns with the same dtype.

    A different generation method can be specified for integer dtypes
    ("poisson", "uniform", "binomial", etc.)

    Notes
    -----
    This API is still experimental, and will likely change in the future"""

    prefix: str | None = None
    """Column prefix. If not specified, defaults to str(dtype)"""

    dtype: str | type | None = None
    """Column data type. Only numpy dtypes are supported"""

    number: int = 1
    """How many columns to create with these properties. Default 1. If more
    than one column is specified, the columns are numbered: "int1", "int2", etc."""

    nunique: int | None = None  # number of unique categories
    """For a "category" column, how many unique categories to generate"""

    choices: list = field(default_factory=list)
    """For a "category" or str column, list of possible values"""

    low: int | None = None
    """Low end of the range for an int column. With ``random=True``,
    defaults to 0"""

    high: int | None = None
    """For an int column, high end of the range"""

    length: int | None = None
    """For a str or "category" column with ``random=True``, how long a string
    to generate"""

    random: bool = False
    """For an int column, whether to use ``randint``. For a str column,
    whether to produce a random string of the specified ``length``"""

    method: str | None = None
    """For an int column, method to use when generating the value, such as
    "poisson", "uniform" or "binomial". Default "poisson". Delegates to the
    method of the same name on ``RandomState``"""

    args: tuple[Any, ...] = field(default_factory=tuple)
    """Positional args to pass into the method"""

    kwargs: dict[str, Any] = field(default_factory=dict)
    """Any other kwargs to pass into the method"""
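
# Illustrative sketch (not part of the original module): a few ways a column
# family can be described; the prefixes and values here are arbitrary:
#
#     ColumnSpec(dtype=int, prefix="id", low=0, high=100, random=True)  # randint in [0, 100)
#     ColumnSpec(dtype=int, method="normal", kwargs={"loc": 50})  # rstate.normal(loc=50, scale=1000)
#     ColumnSpec(dtype="category", nunique=5)  # categories "1" through "5"
#     ColumnSpec(dtype=str, number=3, random=True, length=8)  # str1..str3, random 8-char strings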


@dataclass
class RangeIndexSpec:
    """Properties of the dataframe RangeIndex

    Notes
    -----
    This API is still experimental, and will likely change in the future"""

    dtype: str | type = int
    """Index dtype"""

    step: int = 1
    """Step for the RangeIndex"""


@dataclass
class DatetimeIndexSpec:
    """Properties of the dataframe DatetimeIndex

    Notes
    -----
    This API is still experimental, and will likely change in the future"""

    dtype: str | type = int
    """Index dtype"""

    start: str | None = None
    """First value of the index"""

    freq: str = "1H"
    """Frequency for the index ("1H", "1D", etc.)"""

    partition_freq: str | None = None
    """Partition frequency ("1D", "1M", etc.)"""


@dataclass
class DatasetSpec:
    """Defines a dataset with random data, such as which columns and data
    types to generate

    Notes
    -----
    This API is still experimental, and will likely change in the future"""

    npartitions: int = 1
    """How many partitions to generate in the dataframe. If the dataframe has
    a DatetimeIndex, specify its ``partition_freq`` instead"""

    nrecords: int = 1000
    """Total number of records to generate"""

    index_spec: RangeIndexSpec | DatetimeIndexSpec = field(
        default_factory=RangeIndexSpec
    )
    """Properties of the index"""

    column_specs: list[ColumnSpec] = field(default_factory=list)
    """List of column definitions"""
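
# Illustrative sketch (not part of the original module): a spec for a one-day
# hourly dataset, partitioned every six hours (the values are arbitrary):
#
#     spec = DatasetSpec(
#         nrecords=24,
#         index_spec=DatetimeIndexSpec(
#             dtype="datetime64[ns]",
#             start="2000-01-01",
#             freq="1h",
#             partition_freq="6h",
#         ),
#         column_specs=[ColumnSpec(dtype=float, prefix="x")],
#     )
#     ddf = with_spec(spec, seed=0)  # with_spec is defined below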


def make_float(n, rstate, random=False, **kwargs):
    kwargs.pop("dtype", None)
    kwargs.pop("args", None)
    if random:
        return rstate.random(size=n, **kwargs)
    return rstate.rand(n) * 2 - 1


def make_int(
    n: int,
    rstate: Any,
    random: bool = False,
    dtype: str | type = int,
    method: str | Callable = "poisson",
    args: tuple[Any, ...] = (),
    **kwargs,
):
    def _with_defaults(_method):
        handler_args, handler_kwargs = default_int_args.get(_method, ((), {}))
        handler_kwargs = handler_kwargs.copy()
        handler_kwargs.update(**kwargs)
        handler_args = args if args else handler_args
        return handler_args, handler_kwargs

    if random:
        handler_args, handler_kwargs = _with_defaults("random")
        if "low" in handler_kwargs:
            handler_args = ()
        data = rstate.randint(*handler_args, size=n, **handler_kwargs)
    else:
        if isinstance(method, str):
            # "poisson", "binomial", etc.
            handler_args, handler_kwargs = _with_defaults(method)
            handler = getattr(rstate, method)
            data = handler(*handler_args, size=n, **handler_kwargs)
        else:
            # method is a Callable
            data = method(*args, state=rstate, size=n, **kwargs)
    return data


names = [
    "Alice", "Bob", "Charlie", "Dan", "Edith", "Frank", "George", "Hannah",
    "Ingrid", "Jerry", "Kevin", "Laura", "Michael", "Norbert", "Oliver",
    "Patricia", "Quinn", "Ray", "Sarah", "Tim", "Ursula", "Victor", "Wendy",
    "Xavier", "Yvonne", "Zelda",
]


def make_random_string(n, rstate, length: int = 25) -> list[str]:
    choices = list(string.ascii_letters + string.digits + string.punctuation + " ")
    return ["".join(rstate.choice(choices, size=length)) for _ in range(n)]


def make_string(n, rstate, choices=None, random=False, length=None, **kwargs):
    kwargs.pop("args", None)
    if random:
        return make_random_string(n, rstate, length=length)
    choices = choices or names
    return rstate.choice(choices, size=n)


def make_categorical(n, rstate, choices=None, nunique=None, **kwargs):
    kwargs.pop("args", None)
    if nunique is not None:
        cat_len = len(str(nunique))
        choices = [str(x + 1).zfill(cat_len) for x in range(nunique)]
    else:
        choices = choices or names
    return pd.Categorical.from_codes(rstate.randint(0, len(choices), size=n), choices)


make: dict[type | str, Callable] = {
    float: make_float,
    int: make_int,
    str: make_string,
    object: make_string,
    "string[python]": make_string,
    "string[pyarrow]": make_string,
    "category": make_categorical,
    "int8": make_int,
    "int16": make_int,
    "int32": make_int,
    "int64": make_int,
    "float8": make_float,
    "float16": make_float,
    "float32": make_float,
    "float64": make_float,
}


class MakeDataframePart(DataFrameIOFunction):
    """
    Wrapper class for ``make_dataframe_part``

    Makes a timeseries partition.
    """

    def __init__(self, index_dtype, dtypes, kwargs, columns=None):
        self.index_dtype = index_dtype
        self._columns = columns or list(dtypes.keys())
        self.dtypes = dtypes
        self.kwargs = kwargs

    @property
    def columns(self):
        return self._columns

    def project_columns(self, columns):
        """Return a new MakeDataframePart object with a sub-column projection."""
        if columns == self.columns:
            return self
        return MakeDataframePart(
            self.index_dtype,
            self.dtypes,
            self.kwargs,
            columns=columns,
        )

    def __call__(self, part):
        divisions, state_data = part
        return make_dataframe_part(
            self.index_dtype,
            divisions[0],
            divisions[1],
            self.dtypes,
            self.columns,
            state_data,
            self.kwargs,
        )


def make_dataframe_part(index_dtype, start, end, dtypes, columns, state_data, kwargs):
    state = np.random.RandomState(state_data)
    if pd.api.types.is_datetime64_any_dtype(index_dtype):
        # FIXME: tzinfo would be lost in pd.date_range
        index = pd.date_range(
            start=start, end=end, freq=kwargs.get("freq"), name="timestamp"
        )
    elif pd.api.types.is_integer_dtype(index_dtype):
        step = kwargs.get("freq")
        index = pd.RangeIndex(start=start, stop=end + step, step=step).astype(
            index_dtype
        )
    else:
        raise TypeError(f"Unhandled index dtype: {index_dtype}")
    df = make_partition(columns, dtypes, index, kwargs, state)
    while df.index[-1] >= end:
        df = df.iloc[:-1]
    return df


def same_astype(a: str | type, b: str | type):
    """Same as pandas.api.types.is_dtype_equal, but also returns True for
    str / object"""
    return pd.api.types.is_dtype_equal(a, b) or (
        is_object_string_dtype(a) and is_object_string_dtype(b)
    )


def make_partition(columns: list, dtypes: dict[str, type | str], index, kwargs, state):
    data = {}
    for k, dt in dtypes.items():
        kws = {
            kk.rsplit("_", 1)[1]: v
            for kk, v in kwargs.items()
            if kk.rsplit("_", 1)[0] == k
        }
        # Note: we compute data for all dtypes in order, not just those in the
        # output columns. This ensures the same output given the same
        # state_data, regardless of whether there is any column projection.
        # cf. https://github.com/dask/dask/pull/9538#issuecomment-1267461887
        result = make[dt](len(index), state, **kws)
        if k in columns:
            data[k] = result
    df = pd.DataFrame(data, index=index, columns=columns)
    update_dtypes = {
        k: v
        for k, v in dtypes.items()
        if k in columns and not same_astype(v, df[k].dtype)
    }
    if update_dtypes:
        kwargs = {} if PANDAS_GE_300 else {"copy": False}
        df = df.astype(update_dtypes, **kwargs)
    return df


_ME = "ME" if PANDAS_GE_220 else "M"


def make_timeseries(
    start="2000-01-01",
    end="2000-12-31",
    dtypes=None,
    freq="10s",
    partition_freq=f"1{_ME}",
    seed=None,
    **kwargs,
):
    """Create timeseries dataframe with random data

    Parameters
    ----------
    start: datetime (or datetime-like string)
        Start of time series
    end: datetime (or datetime-like string)
        End of time series
    dtypes: dict (optional)
        Mapping of column names to types.
        Valid types include {float, int, str, 'category'}
    freq: string
        String like '2s' or '1H' or '12W' for the time series frequency
    partition_freq: string
        String like '1M' or '2Y' to divide the dataframe into partitions
    seed: int (optional)
        RandomState seed
    kwargs:
        Keywords to pass down to individual column creation functions.
        Keywords should be prefixed by the column name and then an underscore.

    Examples
    --------
    >>> import dask.dataframe as dd
    >>> df = dd.demo.make_timeseries('2000', '2010',
    ...                              {'value': float, 'name': str, 'id': int},
    ...                              freq='2h', partition_freq='1D', seed=1)
    >>> df.head()  # doctest: +SKIP
                           id      name     value
    2000-01-01 00:00:00   969     Jerry -0.309014
    2000-01-01 02:00:00  1010       Ray -0.760675
    2000-01-01 04:00:00  1016  Patricia -0.063261
    2000-01-01 06:00:00   960   Charlie  0.788245
    2000-01-01 08:00:00  1031     Kevin  0.466002
    """
    if dtypes is None:
        dtypes = {"name": str, "id": int, "x": float, "y": float}

    divisions = list(pd.date_range(start=start, end=end, freq=partition_freq))
    npartitions = len(divisions) - 1
    if seed is None:
        # Get random integer seed for each partition. We can
        # call `random_state_data` in `MakeDataframePart`
        state_data = np.random.randint(2e9, size=npartitions)
    else:
        state_data = random_state_data(npartitions, seed)

    # Build parts
    parts = []
    for i in range(len(divisions) - 1):
        parts.append((divisions[i : i + 2], state_data[i]))

    kwargs["freq"] = freq
    index_dtype = "datetime64[ns]"
    meta_start, meta_end = list(pd.date_range(start="2000", freq=freq, periods=2))

    from dask.dataframe import _dask_expr_enabled

    if _dask_expr_enabled():
        from dask_expr import from_map

        k = {}
    else:
        from dask.dataframe.io.io import from_map

        k = {"token": tokenize(start, end, dtypes, freq, partition_freq, state_data)}

    # Construct the output collection with from_map
    return from_map(
        MakeDataframePart(index_dtype, dtypes, kwargs),
        parts,
        meta=make_dataframe_part(
            index_dtype,
            meta_start,
            meta_end,
            dtypes,
            list(dtypes.keys()),
            state_data[0],
            kwargs,
        ),
        divisions=divisions,
        label="make-timeseries",
        enforce_metadata=False,
        **k,
    )
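
# Illustrative sketch (not part of the original module): per-column keywords
# are prefixed with the column name and an underscore, so ``id_lam`` below is
# forwarded as ``lam`` to the Poisson generator for the "id" column:
#
#     import dask.dataframe as dd
#     df = dd.demo.make_timeseries(
#         "2000-01-01", "2000-01-31",
#         dtypes={"id": int, "x": float},
#         freq="1h", partition_freq="7D",
#         seed=1, id_lam=500,
#     )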


def with_spec(spec: DatasetSpec, seed: int | None = None):
    """Generate a random dataset according to the provided spec

    Parameters
    ----------
    spec : DatasetSpec
        Specifies all the parameters of the dataset
    seed: int (optional)
        RandomState seed

    Notes
    -----
    This API is still experimental, and will likely change in the future

    Examples
    --------
    >>> from dask.dataframe.io.demo import ColumnSpec, DatasetSpec, with_spec
    >>> ddf = with_spec(
    ...     DatasetSpec(
    ...         npartitions=10,
    ...         nrecords=10_000,
    ...         column_specs=[
    ...             ColumnSpec(dtype=int, number=2, prefix="p"),
    ...             ColumnSpec(dtype=int, number=2, prefix="n", method="normal"),
    ...             ColumnSpec(dtype=float, number=2, prefix="f"),
    ...             ColumnSpec(dtype=str, prefix="s", number=2, random=True, length=10),
    ...             ColumnSpec(dtype="category", prefix="c", choices=["Y", "N"]),
    ...         ],
    ...     ), seed=42)
    >>> ddf.head(10)  # doctest: +SKIP
         p1    p2    n1    n2        f1        f2          s1          s2 c1
    0  1002   972  -811    20  0.640846 -0.176875  L#h98#}J`?  _8C607/:6e  N
    1   985   982 -1663  -777  0.790257  0.792796  u:XI3,omoZ  w~@ /d)'-@  N
    2   947   970   799  -269  0.740869 -0.118413  O$dnwCuq\\  !WtSe+(;#9  Y
    3  1003   983  1133   521 -0.987459  0.278154  j+Qr_2{XG&  &XV7cy$y1T  Y
    4  1017  1049   826     5 -0.875667 -0.744359  \4bJ3E-{:o  {+jC).?vK+  Y
    5   984  1017  -492  -399  0.748181  0.293761  ~zUNHNgD"!  yuEkXeVot|  Y
    6   992  1027  -856    67 -0.125132 -0.234529  j.7z;o]Gc9  g|Fi5*}Y92  Y
    7  1011   974   762 -1223  0.471696  0.937935  yT?j~N/-u]  JhEB[W-}^$  N
    8   984   974   856    74  0.109963  0.367864  _j"&@ i&;/  OYXQ)w{hoH  N
    9  1030  1001  -792  -262  0.435587 -0.647970  Pmrwl{{|.K  3UTqM$86Sg  N
    """
    if len(spec.column_specs) == 0:
        spec.column_specs = [
            ColumnSpec(prefix="i", dtype="int64", low=0, high=1_000_000, random=True),
            ColumnSpec(prefix="f", dtype=float, random=True),
            ColumnSpec(prefix="c", dtype="category", choices=["a", "b", "c", "d"]),
            ColumnSpec(prefix="s", dtype=str),
        ]

    columns = []
    dtypes = {}
    partition_freq: str | int | None
    step: str | int
    if isinstance(spec.index_spec, DatetimeIndexSpec):
        start = pd.Timestamp(spec.index_spec.start)
        step = spec.index_spec.freq
        partition_freq = spec.index_spec.partition_freq
        end = pd.Timestamp(spec.index_spec.start) + spec.nrecords * pd.Timedelta(step)
        divisions = list(pd.date_range(start=start, end=end, freq=partition_freq))
        if divisions[-1] < end:
            divisions.append(end)
        meta_start, meta_end = start, start + pd.Timedelta(step)
    elif isinstance(spec.index_spec, RangeIndexSpec):
        step = spec.index_spec.step
        partition_freq = spec.nrecords * step // spec.npartitions
        end = spec.nrecords * step - 1
        divisions = list(pd.RangeIndex(0, stop=end, step=partition_freq))
        if divisions[-1] < (end + 1):
            divisions.append(end + 1)
        meta_start, meta_end = 0, step
    else:
        raise ValueError(f"Unhandled index: {spec.index_spec}")

    kwargs: dict[str, Any] = {"freq": step}
    for col in spec.column_specs:
        if col.prefix:
            prefix = col.prefix
        elif isinstance(col.dtype, str):
            prefix = re.sub(r"[^a-zA-Z0-9]", "_", f"{col.dtype}").rstrip("_")
        elif hasattr(col.dtype, "name"):
            prefix = col.dtype.name  # type: ignore[union-attr]
        else:
            prefix = col.dtype.__name__  # type: ignore[union-attr]

        for i in range(col.number):
            col_n = i + 1
            while (col_name := f"{prefix}{col_n}") in dtypes:
                col_n = col_n + 1
            columns.append(col_name)
            dtypes[col_name] = col.dtype
            kwargs.update(
                {
                    f"{col_name}_{k}": v
                    for k, v in asdict(col).items()
                    if k not in {"prefix", "number", "kwargs"} and v not in (None, [])
                }
            )
            # set untyped kwargs, if any
            for kw_name, kw_val in col.kwargs.items():
                kwargs[f"{col_name}_{kw_name}"] = kw_val

    npartitions = len(divisions) - 1
    if seed is None:
        state_data = cast(list[Any], np.random.randint(int(2e9), size=npartitions))
    else:
        state_data = random_state_data(npartitions, seed)
    parts = [(divisions[i : i + 2], state_data[i]) for i in range(npartitions)]

    from dask.dataframe import _dask_expr_enabled

    if _dask_expr_enabled():
        from dask_expr import from_map

        k = {}
    else:
        from dask.dataframe.io.io import from_map

        k = {
            "token": tokenize(
                0, spec.nrecords, dtypes, step, partition_freq, state_data
            )
        }

    return from_map(
        MakeDataframePart(spec.index_spec.dtype, dtypes, kwargs, columns=columns),
        parts,
        meta=make_dataframe_part(
            spec.index_spec.dtype,
            meta_start,
            meta_end,
            dtypes,
            columns,
            state_data[0],
            kwargs,
        ),
        divisions=divisions,
        label="make-random",
        enforce_metadata=False,
        **k,
    )
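
# Illustrative sketch (not part of the original module): with the default
# RangeIndexSpec, ``npartitions`` controls the partitioning directly
# (1000 records split into 4 partitions of 250 rows each):
#
#     from dask.dataframe.io.demo import ColumnSpec, DatasetSpec, with_spec
#     ddf = with_spec(
#         DatasetSpec(
#             npartitions=4,
#             nrecords=1000,
#             column_specs=[ColumnSpec(dtype=int, prefix="i")],
#         ),
#         seed=0,
#     )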