Source code for dask_expr._collection

from __future__ import annotations

import datetime
import functools
import inspect
import warnings
from collections.abc import Callable, Hashable, Mapping
from numbers import Integral, Number
from typing import Any, ClassVar, Iterable, Literal

import dask.array as da
import dask.dataframe.methods as methods
import numpy as np
import pandas as pd
import pyarrow as pa
from dask import compute, get_annotations
from dask.array import Array
from dask.base import DaskMethodsMixin, is_dask_collection, named_schedulers
from dask.core import flatten
from dask.dataframe._compat import PANDAS_GE_210, PANDAS_GE_220, PANDAS_VERSION
from dask.dataframe.accessor import CachedAccessor
from dask.dataframe.core import (
    _concat,
    _convert_to_numeric,
    _Frame,
    _repr_data_series,
    _sqrt_and_convert_to_timedelta,
    check_divisions,
    has_parallel_type,
    is_arraylike,
    is_dataframe_like,
    is_series_like,
    meta_warning,
    new_dd_object,
)
from dask.dataframe.dispatch import is_categorical_dtype, make_meta, meta_nonempty
from dask.dataframe.multi import warn_dtype_mismatch
from dask.dataframe.utils import (
    AttributeNotImplementedError,
    has_known_categories,
    index_summary,
    insert_meta_param_description,
    meta_frame_constructor,
    meta_series_constructor,
    pyarrow_strings_enabled,
)
from dask.delayed import delayed
from dask.utils import (
    IndexCallable,
    M,
    derived_from,
    get_default_shuffle_method,
    get_meta_library,
    key_split,
    maybe_pluralize,
    memory_repr,
    put_lines,
    random_state_data,
    typename,
)
from dask.widgets import get_template
from fsspec.utils import stringify_path
from packaging.version import parse as parse_version
from pandas import CategoricalDtype
from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_numeric_dtype
from pandas.api.types import is_scalar as pd_is_scalar
from pandas.api.types import is_timedelta64_dtype
from pyarrow import fs as pa_fs
from tlz import first

import dask_expr._backends  # noqa: F401
from dask_expr import _expr as expr
from dask_expr._backends import dataframe_creation_dispatch
from dask_expr._categorical import CategoricalAccessor, Categorize, GetCategories
from dask_expr._concat import Concat
from dask_expr._core import OptimizerStage
from dask_expr._datetime import DatetimeAccessor
from dask_expr._describe import DescribeNonNumeric, DescribeNumeric
from dask_expr._dispatch import get_collection_type
from dask_expr._expr import (
    BFill,
    Diff,
    Eval,
    FFill,
    FillnaCheck,
    Query,
    Shift,
    ToDatetime,
    ToNumeric,
    ToTimedelta,
    no_default,
)
from dask_expr._merge import JoinRecursive, Merge
from dask_expr._quantile import SeriesQuantile
from dask_expr._quantiles import RepartitionQuantiles
from dask_expr._reductions import (
    Corr,
    Cov,
    CustomReduction,
    DropDuplicates,
    IndexCount,
    IsMonotonicDecreasing,
    IsMonotonicIncreasing,
    Len,
    MemoryUsageFrame,
    MemoryUsageIndex,
    Moment,
    NLargest,
    NSmallest,
    PivotTable,
    Unique,
    ValueCounts,
)
from dask_expr._repartition import Repartition, RepartitionFreq
from dask_expr._shuffle import (
    RearrangeByColumn,
    SetIndex,
    SetIndexBlockwise,
    SortValues,
)
from dask_expr._str_accessor import StringAccessor
from dask_expr._util import (
    _BackendData,
    _convert_to_list,
    _get_shuffle_preferring_order,
    _is_any_real_numeric_dtype,
    _maybe_from_pandas,
    _raise_if_object_series,
    _tokenize_deterministic,
    _validate_axis,
    get_specified_shuffle,
    is_scalar,
)
from dask_expr.io import FromPandasDivisions, FromScalars

#
# Utilities to wrap Expr API
# (Helps limit boiler-plate code in collection APIs)
#


def _wrap_expr_api(*args, wrap_api=None, **kwargs):
    # Use Expr API, but convert to/from Expr objects
    assert wrap_api is not None
    result = wrap_api(
        *[arg.expr if isinstance(arg, FrameBase) else arg for arg in args],
        **kwargs,
    )
    if isinstance(result, expr.Expr):
        return new_collection(result)
    return result


def _wrap_expr_op(self, other, op=None):
    # Wrap expr operator
    assert op is not None
    if isinstance(other, FrameBase):
        other = other.expr
    elif isinstance(other, da.Array):
        other = from_dask_array(
            other, index=self.index.to_legacy_dataframe(), columns=self.columns
        )
        if self.ndim == 1 and len(self.columns):
            other = other[self.columns[0]]

    if (
        not isinstance(other, expr.Expr)
        and is_dataframe_like(other)
        or is_series_like(other)
    ):
        other = self._create_alignable_frame(other).expr

    if not isinstance(other, expr.Expr):
        return new_collection(getattr(self.expr, op)(other))
    elif expr.are_co_aligned(self.expr, other):
        return new_collection(getattr(self.expr, op)(other))
    else:
        return new_collection(expr.OpAlignPartitions(self, other, op))


def _wrap_expr_method_operator(name, class_):
    """
    Add method operators to Series or DataFrame like DataFrame.add.
    _wrap_expr_method_operator("add", DataFrame)
    """
    if class_ == DataFrame:

        def method(self, other, axis="columns", level=None, fill_value=None):
            if level is not None:
                raise NotImplementedError("level must be None")

            axis = _validate_axis(axis)

            if (
                is_dataframe_like(other)
                or is_series_like(other)
                and axis in (0, "index")
            ) and not is_dask_collection(other):
                other = self._create_alignable_frame(other)

            if axis in (1, "columns"):
                if isinstance(other, Series):
                    msg = f"Unable to {name} dd.Series with axis=1"
                    raise ValueError(msg)

            frame = self
            if isinstance(other, FrameBase) and not expr.are_co_aligned(
                self.expr, other.expr
            ):
                return new_collection(
                    expr.MethodOperatorAlign(
                        op=name,
                        frame=frame,
                        other=other,
                        axis=axis,
                        level=level,
                        fill_value=fill_value,
                    )
                )

            return new_collection(
                expr.MethodOperator(
                    name=name,
                    left=frame,
                    right=other,
                    axis=axis,
                    level=level,
                    fill_value=fill_value,
                )
            )

    elif class_ == Series:

        def method(self, other, level=None, fill_value=None, axis=0):
            if level is not None:
                raise NotImplementedError("level must be None")

            axis = _validate_axis(axis)

            if is_series_like(other) and not is_dask_collection(other):
                other = self._create_alignable_frame(other)

            frame = self
            if isinstance(other, FrameBase) and not expr.are_co_aligned(
                self.expr, other.expr
            ):
                return new_collection(
                    expr.MethodOperatorAlign(
                        op=name,
                        frame=frame,
                        other=other,
                        axis=axis,
                        level=level,
                        fill_value=fill_value,
                    )
                )

            return new_collection(
                expr.MethodOperator(
                    name=name,
                    left=frame,
                    right=other,
                    axis=axis,
                    fill_value=fill_value,
                    level=level,
                )
            )

    else:
        raise NotImplementedError(f"Cannot create method operator for {class_=}")

    method.__name__ = name
    return method


def _wrap_unary_expr_op(self, op=None):
    # Wrap expr operator
    assert op is not None
    return new_collection(getattr(self.expr, op)())


_WARN_ANNOTATIONS = True
#
# Collection classes
#


class FrameBase(DaskMethodsMixin):
    """Base class for Expr-backed Collections"""

    __dask_scheduler__ = staticmethod(
        named_schedulers.get("threads", named_schedulers["sync"])
    )
    __dask_optimize__ = staticmethod(lambda dsk, keys, **kwargs: dsk)

    def __init__(self, expr):
        global _WARN_ANNOTATIONS
        if _WARN_ANNOTATIONS and (annot := get_annotations()):
            _WARN_ANNOTATIONS = False
            warnings.warn(
                f"Dask annotations {annot} detected. Annotations will be ignored when using query-planning."
            )
        self._expr = expr

    @property
    def expr(self) -> expr.Expr:
        return self._expr

    @property
    def _meta(self):
        return self.expr._meta

    @functools.cached_property
    def _meta_nonempty(self):
        return meta_nonempty(self._meta)

    @property
    def divisions(self):
        """
        Tuple of ``npartitions + 1`` values, in ascending order, marking the
        lower/upper bounds of each partition's index. Divisions allow Dask
        to know which partition will contain a given value, significantly
        speeding up operations like `loc`, `merge`, and `groupby` by not
        having to search the full dataset.

        Example: for ``divisions = (0, 10, 50, 100)``, there are three partitions,
        where the index in each partition contains values [0, 10), [10, 50),
        and [50, 100], respectively. Dask therefore knows ``df.loc[45]``
        will be in the second partition.

        When every item in ``divisions`` is ``None``, the divisions are unknown.
        Most operations can still be performed, but some will be much slower,
        and a few may fail.

        It is not supported to set ``divisions`` directly. Instead, use ``set_index``,
        which sorts and splits the data as needed.
        See https://docs.dask.org/en/latest/dataframe-design.html#partitions.
        """
        return self.expr.divisions

    @property
    def npartitions(self):
        """Return number of partitions"""
        return self.expr.npartitions

    @property
    def dtypes(self):
        """Return data types"""
        return self.expr._meta.dtypes

    @property
    def size(self):
        """Size of the Series or DataFrame as a Delayed object.

        Examples
        --------
        >>> series.size  # doctest: +SKIP
        <dask_expr.expr.Scalar: expr=df.size(), dtype=int64>
        """
        return new_collection(self.expr.size)

    @property
    def columns(self):
        return self._meta.columns

    @columns.setter
    def columns(self, columns):
        if len(columns) != len(self.columns):
            # surface pandas error
            self._expr._meta.columns = columns
        self._expr = expr.ColumnsSetter(self, columns)

    def clear_divisions(self):
        """Forget division information.

        This is useful if the divisions are no longer meaningful.
        """
        return new_collection(expr.ClearDivisions(self))

    def __len__(self):
        return new_collection(Len(self)).compute()

    @property
    def nbytes(self):
        raise NotImplementedError("nbytes is not implemented on DataFrame")

    def __reduce__(self):
        return new_collection, (self._expr,)

    def __getitem__(self, other):
        if isinstance(other, FrameBase):
            return new_collection(self.expr.__getitem__(other.expr))
        elif isinstance(other, slice):
            from pandas.api.types import is_float_dtype

            is_integer_slice = any(
                isinstance(i, Integral) for i in (other.start, other.step, other.stop)
            )
            if (
                self.ndim == 2
                and is_integer_slice
                and not is_float_dtype(self.index.dtype)
            ):
                return self.iloc[other]
            else:
                return self.loc[other]
        if isinstance(other, np.ndarray) or is_series_like(other):
            other = list(other)
        elif isinstance(other, list):
            other = other.copy()
        return new_collection(self.expr.__getitem__(other))

    def __dask_tokenize__(self):
        return type(self).__name__, self._expr._name

    def __repr__(self):
        data = self._repr_data().to_string(max_rows=5)
        _str_fmt = """Dask {klass} Structure:
{data}
Dask Name: {name}, {n_expr}
Expr={expr}"""
        if not isinstance(self, Series) and not len(self.columns):
            data = data.partition("\n")[-1].replace("Index", "Divisions")
            _str_fmt = f"Empty {_str_fmt}"
        n_expr = len({e._name for e in self.expr.walk()})

        return _str_fmt.format(
            klass=self.__class__.__name__,
            data=data,
            name=key_split(self._name),
            n_expr=maybe_pluralize(n_expr, "expression"),
            expr=self.expr,
        )

    def __bool__(self):
        raise ValueError(
            f"The truth value of a {self.__class__.__name__} is ambiguous. "
            "Use a.any() or a.all()."
        )

    def __array__(self, dtype=None, **kwargs):
        return np.array(self.compute())

    def persist(self, fuse=True, **kwargs):
        out = self.optimize(fuse=fuse)
        return DaskMethodsMixin.persist(out, **kwargs)

    def compute(self, fuse=True, **kwargs):
        """Compute this DataFrame.

        This turns a lazy Dask DataFrame into an in-memory pandas DataFrame.
        The entire dataset must fit into memory before calling this operation.

        The optimizer runs over the DataFrame before triggering the computation.
        The optimizer injects a repartition operation that reduces the partition
        count to 1 to enable better optimization strategies.

        Parameters
        ----------
        fuse : bool, default True
            Whether to fuse the expression tree before computing. Fusing significantly
            reduces the number of tasks and improves performance. It shouldn't be
            disabled unless absolutely necessary.
        kwargs
            Extra keywords to forward to the base compute function.

        See Also
        --------
        dask.compute
        """
        out = self
        if not isinstance(out, Scalar):
            out = out.repartition(npartitions=1)
        out = out.optimize(fuse=fuse)
        return DaskMethodsMixin.compute(out, **kwargs)

    def analyze(self, filename: str | None = None, format: str | None = None) -> None:
        """Outputs statistics about every node in the expression.

        analyze optimizes the expression and triggers a computation. It records statistics
        like memory usage per partition to analyze how data flow through the graph.

        .. warning::
            analyze adds plugins to the scheduler and the workers that have a non-trivial
            cost. This method should not be used in production workflows.

        Parameters
        ----------
        filename: str, None
            File to store the graph representation.
        format: str, default is png
            File format for the graph representation.

        Returns
        -------
            None, but writes a graph representation of the expression enriched with
            statistics to disk.
        """
        out = self
        if not isinstance(out, Scalar):
            out = out.repartition(npartitions=1)
        return out.expr.analyze(filename=filename, format=format)

    def explain(self, stage: OptimizerStage = "fused", format: str | None = None):
        """Create a graph representation of the Expression.

        explain runs the optimizer and creates a graph of the optimized expression
        with graphviz. No computation is triggered.

        Parameters
        ----------
        stage: {"logical", "simplified-logical", "tuned-logical", "physical", "simplified-physical", "fused"}
            The optimizer stage that is returned. Default is "fused".

            - logical: outputs the expression as is
            - simplified-logical: simplifies the expression which includes predicate
              pushdown and column projection.
            - tuned-logical: applies additional optimizations like partition squashing
            - physical: outputs the physical expression; this expression can actually
              be computed
            - simplified-physical: runs another simplification after the physical
              plan is generated
            - fused: fuses the physical expression to reduce the nodes in thr graph.

            .. warning::
                The optimizer stages are subject to change.
        format: str, default None
            The format of the output. Default is "png".

        Returns
        -------
            None, but opens a new window with the graph visualization and outputs
            a file with the graph representation.
        """
        out = self
        if not isinstance(out, Scalar):
            out = out.repartition(npartitions=1)
        return out.expr.explain(stage, format)

    def pprint(self):
        """Outputs a string representation of the DataFrame.

        The expression is returned as is. Please run optimize manually if necessary.

        Returns
        -------
            None, the representation is put into stdout.
        """
        return self.expr.pprint()

    @property
    def dask(self):
        return self.__dask_graph__()

    def __dask_graph__(self):
        out = self.expr
        out = out.lower_completely()
        return out.__dask_graph__()

    def __dask_keys__(self):
        out = self.expr
        out = out.lower_completely()
        return out.__dask_keys__()

    def simplify(self):
        return new_collection(self.expr.simplify())

    def lower_once(self):
        return new_collection(self.expr.lower_once())

    def optimize(self, fuse: bool = True):
        """Optimizes the DataFrame.

        Runs the optimizer with all steps over the DataFrame and wraps the result in a
        new DataFrame collection. Only use this method if you want to analyze the
        optimized expression.

        Parameters
        ----------
        fuse: bool, default True
            Whether to fuse the expression tree after running the optimizer.
            It is often easier to look at the non-fused expression when analyzing
            the result.

        Returns
        -------
            The optimized Dask Dataframe
        """
        return new_collection(self.expr.optimize(fuse=fuse))

    @property
    def dask(self):
        return self.__dask_graph__()

    def __dask_postcompute__(self):
        state = new_collection(self.expr.lower_completely())
        if type(self) != type(state):
            return state.__dask_postcompute__()
        return _concat, ()

    def __dask_postpersist__(self):
        state = new_collection(self.expr.lower_completely())
        return from_graph, (
            state._meta,
            state.divisions,
            state.__dask_keys__(),
            key_split(state._name),
        )

    def __getattr__(self, key):
        try:
            # Prioritize `FrameBase` attributes
            return object.__getattribute__(self, key)
        except AttributeError as err:
            try:
                # Fall back to `expr` API
                # (Making sure to convert to/from Expr)
                val = getattr(self.expr, key)
                if callable(val):
                    return functools.partial(_wrap_expr_api, wrap_api=val)
                return val
            except AttributeError:
                # Raise original error
                raise err

    def visualize(self, tasks: bool = False, **kwargs):
        """Visualize the expression or task graph

        Parameters
        ----------
        tasks:
            Whether to visualize the task graph. By default
            the expression graph will be visualized instead.
        """
        if tasks:
            return super().visualize(**kwargs)
        return self.expr.visualize(**kwargs)

    @property
    def known_divisions(self):
        """Whether the divisions are known.

        This check can be expensive if the division calculation is expensive.
        DataFrame.set_index is a good example where the calculation needs an
        inspection of the data.
        """
        return self.expr.known_divisions

    @property
    def index(self):
        """Return dask Index instance"""
        return new_collection(self.expr.index)

    @index.setter
    def index(self, value):
        assert expr.are_co_aligned(
            self.expr, value.expr
        ), "value needs to be aligned with the index"
        _expr = expr.AssignIndex(self, value)
        self._expr = _expr

    def reset_index(self, drop: bool = False):
        """Reset the index to the default index.

        Note that unlike in ``pandas``, the reset index for a Dask DataFrame will
        not be monotonically increasing from 0. Instead, it will restart at 0
        for each partition (e.g. ``index1 = [0, ..., 10], index2 = [0, ...]``).
        This is due to the inability to statically know the full length of the
        index.

        For DataFrame with multi-level index, returns a new DataFrame with
        labeling information in the columns under the index names, defaulting
        to 'level_0', 'level_1', etc. if any are None. For a standard index,
        the index name will be used (if set), otherwise a default 'index' or
        'level_0' (if 'index' is already taken) will be used.

        Parameters
        ----------
        drop : boolean, default False
            Do not try to insert index into dataframe columns.
        """
        return new_collection(expr.ResetIndex(self, drop))

    def head(self, n: int = 5, npartitions=1, compute: bool = True):
        """First n rows of the dataset

        Parameters
        ----------
        n : int, optional
            The number of rows to return. Default is 5.
        npartitions : int, optional
            Elements are only taken from the first ``npartitions``, with a
            default of 1. If there are fewer than ``n`` rows in the first
            ``npartitions`` a warning will be raised and any found rows
            returned. Pass -1 to use all partitions.
        compute : bool, optional
            Whether to compute the result, default is True.
        """
        out = new_collection(expr.Head(self, n=n, npartitions=npartitions))
        if compute:
            out = out.compute()
        return out

    def tail(self, n: int = 5, compute: bool = True):
        """Last n rows of the dataset

        Caveat, the only checks the last n rows of the last partition.
        """
        out = new_collection(expr.Tail(self, n=n))
        if compute:
            out = out.compute()
        return out

    def copy(self, deep: bool = False):
        """Make a copy of the dataframe

        This is strictly a shallow copy of the underlying computational graph.
        It does not affect the underlying data

        Parameters
        ----------
        deep : boolean, default False
            The deep value must be `False` and it is declared as a parameter just for
            compatibility with third-party libraries like cuDF and pandas
        """
        if deep is not False:
            raise ValueError(
                "The `deep` value must be False. This is strictly a shallow copy "
                "of the underlying computational graph."
            )
        return new_collection(self.expr)

    @derived_from(pd.DataFrame)
    def isin(self, values):
        if isinstance(self, DataFrame):
            # DataFrame.isin does weird alignment stuff
            bad_types = (FrameBase, pd.Series, pd.DataFrame)
        else:
            bad_types = (FrameBase,)
        if isinstance(values, bad_types):
            if (
                isinstance(values, FrameBase)
                and values.ndim == 1
                and values.npartitions == 1
            ):
                # Can broadcast
                return new_collection(expr.Isin(self, values=values))
            raise NotImplementedError("Passing a %r to `isin`" % typename(type(values)))

        # We wrap values in a delayed for two reasons:
        # - avoid serializing data in every task
        # - avoid cost of traversal of large list in optimizations
        if isinstance(values, list):
            # Motivated by https://github.com/dask/dask/issues/9411.  This appears to be
            # caused by https://github.com/dask/distributed/issues/6368, and further
            # exacerbated by the fact that the list contains duplicates.  This is a patch until
            # we can create a better fix for Serialization.
            try:
                values = list(set(values))
            except TypeError:
                pass
            if not any(is_dask_collection(v) for v in values):
                try:
                    values = np.fromiter(values, dtype=object)
                except ValueError:
                    # Numpy 1.23 supports creating arrays of iterables, while lower
                    # version 1.21.x and 1.22.x do not
                    pass
        from dask_expr.io._delayed import _DelayedExpr

        return new_collection(
            expr.Isin(
                self,
                values=_DelayedExpr(
                    delayed(values, name="delayed-" + _tokenize_deterministic(values))
                ),
            )
        )

    def _partitions(self, index):
        # Used by `partitions` for partition-wise slicing

        # Convert index to list
        if isinstance(index, int):
            index = [index]
        index = np.arange(self.npartitions, dtype=object)[index].tolist()

        # Check that selection makes sense
        assert set(index).issubset(range(self.npartitions))

        # Return selected partitions
        return new_collection(expr.Partitions(self, index))

    @property
    def partitions(self):
        """Slice dataframe by partitions

        This allows partitionwise slicing of a Dask Dataframe.  You can perform normal
        Numpy-style slicing, but now rather than slice elements of the array you
        slice along partitions so, for example, ``df.partitions[:5]`` produces a new
        Dask Dataframe of the first five partitions. Valid indexers are integers, sequences
        of integers, slices, or boolean masks.

        Examples
        --------
        >>> df.partitions[0]  # doctest: +SKIP
        >>> df.partitions[:3]  # doctest: +SKIP
        >>> df.partitions[::10]  # doctest: +SKIP

        Returns
        -------
        A Dask DataFrame
        """
        return IndexCallable(self._partitions)

    def get_partition(self, n):
        """
        Get a dask DataFrame/Series representing the `nth` partition.

        Parameters
        ----------
        n : int
            The 0-indexed partition number to select.

        Returns
        -------
        Dask DataFrame or Series
            The same type as the original object.

        See Also
        --------
        DataFrame.partitions
        """
        if not 0 <= n < self.npartitions:
            msg = f"n must be 0 <= n < {self.npartitions}"
            raise ValueError(msg)
        return self.partitions[n]

    def shuffle(
        self,
        on: str | list | no_default = no_default,
        ignore_index: bool = False,
        npartitions: int | None = None,
        shuffle_method: str | None = None,
        on_index: bool = False,
        **options,
    ):
        """Rearrange DataFrame into new partitions

        Uses hashing of `on` to map rows to output partitions. After this
        operation, rows with the same value of `on` will be in the same
        partition.

        Parameters
        ----------
        on : str, list of str, or Series, Index, or DataFrame
            Column names to shuffle by.
        ignore_index : optional
            Whether to ignore the index. Default is ``False``.
        npartitions : optional
            Number of output partitions. The partition count will
            be preserved by default.
        shuffle_method : optional
            Desired shuffle method. Default chosen at optimization time.
        on_index : bool, default False
            Whether to shuffle on the index. Mutually exclusive with 'on'.
            Set this to ``True`` if 'on' is not provided.
        **options : optional
            Algorithm-specific options.

        Notes
        -----
        This does not preserve a meaningful index/partitioning scheme. This
        is not deterministic if done in parallel.

        Examples
        --------
        >>> df = df.shuffle(df.columns[0])  # doctest: +SKIP
        """
        if on is no_default and not on_index:
            raise TypeError(
                "Must shuffle on either columns or the index; currently shuffling on "
                "neither. Pass column(s) to 'on' or set 'on_index' to True."
            )
        elif on is not no_default and on_index:
            raise TypeError(
                "Cannot shuffle on both columns and the index. Do not pass column(s) "
                "to 'on' or set 'on_index' to False."
            )

        # Preserve partition count by default
        npartitions = npartitions or self.npartitions

        if isinstance(on, FrameBase):
            if not expr.are_co_aligned(self.expr, on.expr):
                raise TypeError(
                    "index must be aligned with the DataFrame to use as shuffle index."
                )
        elif pd.api.types.is_list_like(on) and not is_dask_collection(on):
            on = list(on)

        if (shuffle_method or get_default_shuffle_method()) == "p2p":
            from distributed.shuffle._arrow import check_dtype_support

            check_dtype_support(self._meta)

            if any(not isinstance(c, str) for c in self._meta.columns):
                unsupported = {
                    c: type(c) for c in self._meta.columns if not isinstance(c, str)
                }
                raise TypeError(
                    f"p2p requires all column names to be str, found: {unsupported}",
                )

        # Returned shuffled result
        return new_collection(
            RearrangeByColumn(
                self,
                on,
                npartitions,
                ignore_index,
                get_specified_shuffle(shuffle_method),
                options,
                index_shuffle=on_index,
            )
        )

    @derived_from(pd.DataFrame)
    def resample(self, rule, closed=None, label=None):
        from dask_expr._resample import Resampler

        return Resampler(self, rule, **{"closed": closed, "label": label})

    def rolling(self, window, **kwargs):
        """Provides rolling transformations.

        Parameters
        ----------
        window : int, str, offset
           Size of the moving window. This is the number of observations used
           for calculating the statistic. When not using a ``DatetimeIndex``,
           the window size must not be so large as to span more than one
           adjacent partition. If using an offset or offset alias like '5D',
           the data must have a ``DatetimeIndex``
        min_periods : int, default None
            Minimum number of observations in window required to have a value
            (otherwise result is NA).
        center : boolean, default False
            Set the labels at the center of the window.
        win_type : string, default None
            Provide a window type. The recognized window types are identical
            to pandas.
        axis : int, str, None, default 0
            This parameter is deprecated with ``pandas>=2.1``.

        Returns
        -------
        a Rolling object on which to call a method to compute a statistic
        """
        from dask_expr._rolling import Rolling

        return Rolling(self, window, **kwargs)

    @insert_meta_param_description(pad=12)
    def map_partitions(
        self,
        func,
        *args,
        meta=no_default,
        enforce_metadata=True,
        transform_divisions=True,
        clear_divisions=False,
        align_dataframes=False,
        parent_meta=None,
        **kwargs,
    ):
        """Apply a Python function to each partition

        Parameters
        ----------
        func : function
            Function applied to each partition.
        args, kwargs :
            Arguments and keywords to pass to the function. Arguments and
            keywords may contain ``FrameBase`` or regular python objects.
            DataFrame-like args (both dask and pandas) must have the same
            number of partitions as ``self`` or comprise a single partition.
            Key-word arguments, Single-partition arguments, and general
            python-object arguments will be broadcasted to all partitions.
        enforce_metadata : bool, default True
            Whether to enforce at runtime that the structure of the DataFrame
            produced by ``func`` actually matches the structure of ``meta``.
            This will rename and reorder columns for each partition, and will
            raise an error if this doesn't work, but it won't raise if dtypes
            don't match.
        transform_divisions : bool, default True
            Whether to apply the function onto the divisions and apply those
            transformed divisions to the output.
        clear_divisions : bool, default False
            Whether divisions should be cleared. If True, `transform_divisions`
            will be ignored.
        $META

        Examples
        --------
        Given a DataFrame, Series, or Index, such as:

        >>> import pandas as pd
        >>> import dask_expr as dd
        >>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
        ...                    'y': [1., 2., 3., 4., 5.]})
        >>> ddf = dd.from_pandas(df, npartitions=2)

        One can use ``map_partitions`` to apply a function on each partition.
        Extra arguments and keywords can optionally be provided, and will be
        passed to the function after the partition.

        Here we apply a function with arguments and keywords to a DataFrame,
        resulting in a Series:

        >>> def myadd(df, a, b=1):
        ...     return df.x + df.y + a + b
        >>> res = ddf.map_partitions(myadd, 1, b=2)
        >>> res.dtype
        dtype('float64')

        Here we apply a function to a Series resulting in a Series:

        >>> res = ddf.x.map_partitions(lambda x: len(x)) # ddf.x is a Dask Series Structure
        >>> res.dtype
        dtype('int64')

        By default, dask tries to infer the output metadata by running your
        provided function on some fake data. This works well in many cases, but
        can sometimes be expensive, or even fail. To avoid this, you can
        manually specify the output metadata with the ``meta`` keyword. This
        can be specified in many forms, for more information see
        ``dask.dataframe.utils.make_meta``.

        Here we specify the output is a Series with no name, and dtype
        ``float64``:

        >>> res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))

        Here we map a function that takes in a DataFrame, and returns a
        DataFrame with a new column:

        >>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
        >>> res.dtypes
        x      int64
        y    float64
        z    float64
        dtype: object

        As before, the output metadata can also be specified manually. This
        time we pass in a ``dict``, as the output is a DataFrame:

        >>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y),
        ...                          meta={'x': 'i8', 'y': 'f8', 'z': 'f8'})

        In the case where the metadata doesn't change, you can also pass in
        the object itself directly:

        >>> res = ddf.map_partitions(lambda df: df.head(), meta=ddf)

        Also note that the index and divisions are assumed to remain unchanged.
        If the function you're mapping changes the index/divisions, you'll need
        to pass ``clear_divisions=True``.

        >>> ddf.map_partitions(func, clear_divisions=True)  # doctest: +SKIP

        Your map function gets information about where it is in the dataframe by
        accepting a special ``partition_info`` keyword argument.

        >>> def func(partition, partition_info=None):
        ...     pass

        This will receive the following information:

        >>> partition_info  # doctest: +SKIP
        {'number': 1, 'division': 3}

        For each argument and keyword arguments that are dask dataframes you will
        receive the number (n) which represents the nth partition of the dataframe
        and the division (the first index value in the partition). If divisions
        are not known (for instance if the index is not sorted) then you will get
        None as the division.
        """
        return map_partitions(
            func,
            self,
            *args,
            meta=meta,
            enforce_metadata=enforce_metadata,
            transform_divisions=transform_divisions,
            clear_divisions=clear_divisions,
            align_dataframes=align_dataframes,
            parent_meta=parent_meta,
            **kwargs,
        )

    @insert_meta_param_description(pad=12)
    def map_overlap(
        self,
        func,
        before,
        after,
        *args,
        meta=no_default,
        enforce_metadata=True,
        transform_divisions=True,
        clear_divisions=False,
        align_dataframes=False,
        **kwargs,
    ):
        """Apply a function to each partition, sharing rows with adjacent partitions.

        This can be useful for implementing windowing functions such as
        ``df.rolling(...).mean()`` or ``df.diff()``.

        Parameters
        ----------
        func : function
            Function applied to each partition.
        before : int, timedelta or string timedelta
            The rows to prepend to partition ``i`` from the end of
            partition ``i - 1``.
        after : int, timedelta or string timedelta
            The rows to append to partition ``i`` from the beginning
            of partition ``i + 1``.
        args, kwargs :
            Positional and keyword arguments to pass to the function.
            Positional arguments are computed on a per-partition basis, while
            keyword arguments are shared across all partitions. The partition
            itself will be the first positional argument, with all other
            arguments passed *after*. Arguments can be ``Scalar``, ``Delayed``,
            or regular Python objects. DataFrame-like args (both dask and
            pandas) will be repartitioned to align (if necessary) before
            applying the function; see ``align_dataframes`` to control this
            behavior.
        enforce_metadata : bool, default True
            Whether to enforce at runtime that the structure of the DataFrame
            produced by ``func`` actually matches the structure of ``meta``.
            This will rename and reorder columns for each partition,
            and will raise an error if this doesn't work,
            but it won't raise if dtypes don't match.
        transform_divisions : bool, default True
            Whether to apply the function onto the divisions and apply those
            transformed divisions to the output.
        align_dataframes : bool, default True
            Whether to repartition DataFrame- or Series-like args
            (both dask and pandas) so their divisions align before applying
            the function. This requires all inputs to have known divisions.
            Single-partition inputs will be split into multiple partitions.

            If False, all inputs must have either the same number of partitions
            or a single partition. Single-partition inputs will be broadcast to
            every partition of multi-partition inputs.
        $META

        Notes
        -----
        Given positive integers ``before`` and ``after``, and a function
        ``func``, ``map_overlap`` does the following:

        1. Prepend ``before`` rows to each partition ``i`` from the end of
           partition ``i - 1``. The first partition has no rows prepended.

        2. Append ``after`` rows to each partition ``i`` from the beginning of
           partition ``i + 1``. The last partition has no rows appended.

        3. Apply ``func`` to each partition, passing in any extra ``args`` and
           ``kwargs`` if provided.

        4. Trim ``before`` rows from the beginning of all but the first
           partition.

        5. Trim ``after`` rows from the end of all but the last partition.

        Examples
        --------
        Given a DataFrame, Series, or Index, such as:

        >>> import pandas as pd
        >>> import dask_expr as dd
        >>> df = pd.DataFrame({'x': [1, 2, 4, 7, 11],
        ...                    'y': [1., 2., 3., 4., 5.]})
        >>> ddf = dd.from_pandas(df, npartitions=2)

        A rolling sum with a trailing moving window of size 2 can be computed by
        overlapping 2 rows before each partition, and then mapping calls to
        ``df.rolling(2).sum()``:

        >>> ddf.compute()
            x    y
        0   1  1.0
        1   2  2.0
        2   4  3.0
        3   7  4.0
        4  11  5.0
        >>> ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute()
              x    y
        0   NaN  NaN
        1   3.0  3.0
        2   6.0  5.0
        3  11.0  7.0
        4  18.0  9.0

        The pandas ``diff`` method computes a discrete difference shifted by a
        number of periods (can be positive or negative). This can be
        implemented by mapping calls to ``df.diff`` to each partition after
        prepending/appending that many rows, depending on sign:

        >>> def diff(df, periods=1):
        ...     before, after = (periods, 0) if periods > 0 else (0, -periods)
        ...     return df.map_overlap(lambda df, periods=1: df.diff(periods),
        ...                           periods, 0, periods=periods)
        >>> diff(ddf, 1).compute()
             x    y
        0  NaN  NaN
        1  1.0  1.0
        2  2.0  1.0
        3  3.0  1.0
        4  4.0  1.0

        If you have a ``DatetimeIndex``, you can use a ``pd.Timedelta`` for time-
        based windows or any ``pd.Timedelta`` convertible string:

        >>> ts = pd.Series(range(10), index=pd.date_range('2017', periods=10))
        >>> dts = dd.from_pandas(ts, npartitions=2)
        >>> dts.map_overlap(lambda df: df.rolling('2D').sum(),
        ...                 pd.Timedelta('2D'), 0).compute()
        2017-01-01     0.0
        2017-01-02     1.0
        2017-01-03     3.0
        2017-01-04     5.0
        2017-01-05     7.0
        2017-01-06     9.0
        2017-01-07    11.0
        2017-01-08    13.0
        2017-01-09    15.0
        2017-01-10    17.0
        Freq: D, dtype: float64
        """
        return map_overlap(
            func,
            self,
            before,
            after,
            *args,
            meta=meta,
            enforce_metadata=enforce_metadata,
            transform_divisions=transform_divisions,
            clear_divisions=clear_divisions,
            align_dataframes=align_dataframes,
            **kwargs,
        )

    def repartition(
        self,
        divisions: tuple | None = None,
        npartitions: int | None = None,
        partition_size: str = None,
        freq=None,
        force: bool = False,
    ):
        """Repartition a collection

        Exactly one of `divisions`, `npartitions` or `partition_size` should be
        specified. A ``ValueError`` will be raised when that is not the case.

        Parameters
        ----------
        divisions : list, optional
            The "dividing lines" used to split the dataframe into partitions.
            For ``divisions=[0, 10, 50, 100]``, there would be three output partitions,
            where the new index contained [0, 10), [10, 50), and [50, 100), respectively.
            See https://docs.dask.org/en/latest/dataframe-design.html#partitions.
        npartitions : int, Callable, optional
            Approximate number of partitions of output. The number of
            partitions used may be slightly lower than npartitions depending
            on data distribution, but will never be higher.
            The Callable gets the number of partitions of the input as an argument
            and should return an int.
        partition_size : str, optional
            Max number of bytes of memory for each partition. Use numbers or strings
            like 5MB. If specified npartitions and divisions will be ignored. Note that
            the size reflects the number of bytes used as computed by
            pandas.DataFrame.memory_usage, which will not necessarily match the size
            when storing to disk.

            .. warning::

               This keyword argument triggers computation to determine
               the memory size of each partition, which may be expensive.

        force : bool, default False
            Allows the expansion of the existing divisions.
            If False then the new divisions' lower and upper bounds must be
            the same as the old divisions'.
        freq : str, pd.Timedelta
            A period on which to partition timeseries data like ``'7D'`` or
            ``'12h'`` or ``pd.Timedelta(hours=12)``.  Assumes a datetime index.

        Notes
        -----
        Exactly one of `divisions`, `npartitions`, `partition_size`, or `freq`
        should be specified. A ``ValueError`` will be raised when that is
        not the case.

        Also note that ``len(divisons)`` is equal to ``npartitions + 1``. This is because ``divisions``
        represents the upper and lower bounds of each partition. The first item is the
        lower bound of the first partition, the second item is the lower bound of the
        second partition and the upper bound of the first partition, and so on.
        The second-to-last item is the lower bound of the last partition, and the last
        (extra) item is the upper bound of the last partition.

        Examples
        --------
        >>> df = df.repartition(npartitions=10)  # doctest: +SKIP
        >>> df = df.repartition(divisions=[0, 5, 10, 20])  # doctest: +SKIP
        >>> df = df.repartition(freq='7d')  # doctest: +SKIP

        See Also
        --------
        DataFrame.memory_usage_per_partition
        pandas.DataFrame.memory_usage
        """

        if (
            sum(
                [
                    divisions is not None,
                    npartitions is not None,
                    partition_size is not None,
                    freq is not None,
                ]
            )
            != 1
        ):
            raise ValueError(
                "Please provide exactly one of the ``npartitions=`` or "
                "``divisions=`` keyword arguments."
            )
        if divisions is not None:
            check_divisions(divisions)
        if freq is not None:
            if not isinstance(self.divisions[0], pd.Timestamp):
                raise TypeError("Can only repartition on frequency for timeseries")
            return new_collection(RepartitionFreq(self, freq))
        else:
            return new_collection(
                Repartition(self, npartitions, divisions, force, partition_size, freq)
            )

    def to_dask_dataframe(self, *args, **kwargs) -> _Frame:
        """Convert to a legacy dask-dataframe collection

        WARNING: This API is deprecated. Please use `to_legacy_dataframe`.

        Parameters
        ----------
        optimize
            Whether to optimize the underlying `Expr` object before conversion.
        **optimize_kwargs
            Key-word arguments to pass through to `optimize`.
        """
        warnings.warn(
            "`to_dask_dataframe` is deprecated, please use `to_legacy_dataframe`.",
            FutureWarning,
        )
        return self.to_legacy_dataframe(*args, **kwargs)

    def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> _Frame:
        """Convert to a legacy dask-dataframe collection

        Parameters
        ----------
        optimize
            Whether to optimize the underlying `Expr` object before conversion.
        **optimize_kwargs
            Key-word arguments to pass through to `optimize`.
        """
        df = self.optimize(**optimize_kwargs) if optimize else self
        return new_dd_object(df.dask, df._name, df._meta, df.divisions)

    def to_dask_array(
        self, lengths=None, meta=None, optimize: bool = True, **optimize_kwargs
    ) -> Array:
        """Convert a dask DataFrame to a dask array.

        Parameters
        ----------
        lengths : bool or Sequence of ints, optional
            How to determine the chunks sizes for the output array.
            By default, the output array will have unknown chunk lengths
            along the first axis, which can cause some later operations
            to fail.

            * True : immediately compute the length of each partition
            * Sequence : a sequence of integers to use for the chunk sizes
              on the first axis. These values are *not* validated for
              correctness, beyond ensuring that the number of items
              matches the number of partitions.
        meta : object, optional
            An optional `meta` parameter can be passed for dask to override the
            default metadata on the underlying dask array.
        optimize : bool
            Whether to optimize the expression before converting to an Array.

        Returns
        -------
        A Dask Array
        """
        return self.to_legacy_dataframe(optimize, **optimize_kwargs).to_dask_array(
            lengths=lengths, meta=meta
        )

    @property
    def values(self):
        """Return a dask.array of the values of this dataframe

        Warning: This creates a dask.array without precise shape information.
        Operations that depend on shape information, like slicing or reshaping,
        will not work.
        """
        return self.to_dask_array()

    def __divmod__(self, other):
        result = self.expr.__divmod__(other)
        return new_collection(result[0]), new_collection(result[1])

    def __rdivmod__(self, other):
        result = self.expr.__rdivmod__(other)
        return new_collection(result[0]), new_collection(result[1])

    def __abs__(self):
        return self.abs()

    @derived_from(pd.DataFrame)
    def sum(
        self,
        axis=0,
        skipna=True,
        numeric_only=False,
        min_count=0,
        split_every=False,
        **kwargs,
    ):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(
                M.sum,
                skipna=skipna,
                numeric_only=numeric_only,
                axis=axis,
                min_count=min_count,
            )

        result = new_collection(self.expr.sum(skipna, numeric_only, split_every, axis))
        return self._apply_min_count(result, min_count)

    def _apply_min_count(self, result, min_count):
        if min_count:
            cond = self.notnull().sum() >= min_count
            cond_meta = cond._meta
            if not is_series_like(cond_meta):
                result = result.to_series()
                cond = cond.to_series()

            result = result.where(cond, other=np.nan)
            if not is_series_like(cond_meta):
                return result.min()
            else:
                return result
        else:
            return result

    @derived_from(pd.DataFrame)
    def prod(
        self,
        axis=0,
        skipna=True,
        numeric_only=False,
        min_count=0,
        split_every=False,
        **kwargs,
    ):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(
                M.prod,
                skipna=skipna,
                numeric_only=numeric_only,
                axis=axis,
                min_count=min_count,
            )
        result = new_collection(self.expr.prod(skipna, numeric_only, split_every, axis))
        return self._apply_min_count(result, min_count)

    product = prod

    @derived_from(pd.DataFrame)
    def var(
        self,
        axis=0,
        skipna=True,
        ddof=1,
        numeric_only=False,
        split_every=False,
        **kwargs,
    ):
        _raise_if_object_series(self, "var")
        axis = self._validate_axis(axis)
        self._meta.var(axis=axis, skipna=skipna, numeric_only=numeric_only)
        frame = self
        if is_dataframe_like(self._meta) and numeric_only:
            frame = frame[list(self._meta.var(numeric_only=True).index)]
        return new_collection(
            frame.expr.var(axis, skipna, ddof, numeric_only, split_every=split_every)
        )

    @derived_from(pd.DataFrame)
    def std(
        self,
        axis=0,
        skipna=True,
        ddof=1,
        numeric_only=False,
        split_every=False,
        **kwargs,
    ):
        _raise_if_object_series(self, "std")
        axis = self._validate_axis(axis)
        numeric_dd = self
        meta = meta_nonempty(self._meta).std(
            axis=axis, skipna=skipna, ddof=ddof, numeric_only=numeric_only
        )
        needs_time_conversion, time_cols = False, None
        if is_dataframe_like(self._meta):
            if axis == 0:
                numeric_dd = numeric_dd[list(meta.index)]
            else:
                numeric_dd = numeric_dd.copy()

            if numeric_only is True:
                _meta = numeric_dd._meta.select_dtypes(include=[np.number])
            else:
                _meta = numeric_dd._meta
            time_cols = _meta.select_dtypes(include=["datetime", "timedelta"]).columns
            if len(time_cols) > 0:
                if axis == 1 and len(time_cols) != len(self.columns):
                    numeric_dd = from_pandas(
                        meta_frame_constructor(self)(
                            {"_": meta_series_constructor(self)([np.nan])},
                            index=self.index,
                        ),
                        npartitions=self.npartitions,
                    )
                else:
                    needs_time_conversion = True
                    for col in time_cols:
                        numeric_dd[col] = _convert_to_numeric(numeric_dd[col], skipna)
        else:
            needs_time_conversion = is_datetime64_any_dtype(self._meta)
            if needs_time_conversion:
                numeric_dd = _convert_to_numeric(self, skipna)

        if axis == 1:
            return numeric_dd.map_partitions(
                M.std if not needs_time_conversion else _sqrt_and_convert_to_timedelta,
                meta=meta,
                axis=axis,
                skipna=skipna,
                ddof=ddof,
                enforce_metadata=False,
                numeric_only=numeric_only,
            )

        result = numeric_dd.var(
            skipna=skipna, ddof=ddof, numeric_only=numeric_only, split_every=split_every
        )

        if needs_time_conversion:
            sqrt_func_kwargs = {
                "is_df_like": is_dataframe_like(self._meta),
                "time_cols": time_cols,
                "axis": axis,
                "dtype": getattr(meta, "dtype", None),
            }
            sqrt_func = _sqrt_and_convert_to_timedelta
        else:
            sqrt_func_kwargs = {}
            sqrt_func = np.sqrt

        result = result.map_partitions(
            sqrt_func,
            meta=meta,
            enforce_metadata=False,
            parent_meta=self._meta,
            **sqrt_func_kwargs,
        )
        return result

    def enforce_runtime_divisions(self):
        """Enforce the current divisions at runtime.

        Injects a layer into the Task Graph that checks that the current divisions
        match the expected divisions at runtime.
        """
        if not self.known_divisions:
            raise ValueError("No known divisions to enforce!")
        return new_collection(expr.EnforceRuntimeDivisions(self))

    @derived_from(pd.DataFrame)
    def skew(
        self,
        axis=0,
        bias=True,
        nan_policy="propagate",
        numeric_only=False,
    ):
        """
        .. note::

           This implementation follows the dask.array.stats implementation
           of skewness and calculates skewness without taking into account
           a bias term for finite sample size, which corresponds to the
           default settings of the scipy.stats skewness calculation. However,
           Pandas corrects for this, so the values differ by a factor of
           (n * (n - 1)) ** 0.5 / (n - 2), where n is the number of samples.

           Further, this method currently does not support filtering out NaN
           values, which is again a difference to Pandas.
        """
        _raise_if_object_series(self, "skew")
        if axis is None:
            raise ValueError("`axis=None` isn't currently supported for `skew`")
        axis = self._validate_axis(axis)

        if is_dataframe_like(self):
            # Let pandas raise errors if necessary
            meta = self._meta_nonempty.skew(axis=axis, numeric_only=numeric_only)
        else:
            meta = self._meta_nonempty.skew()

        if axis == 1:
            return self.map_partitions(
                M.skew,
                meta=meta,
                axis=axis,
                enforce_metadata=False,
            )

        if not bias:
            raise NotImplementedError("bias=False is not implemented.")
        if nan_policy != "propagate":
            raise NotImplementedError(
                "`nan_policy` other than 'propagate' have not been implemented."
            )

        frame = self
        if frame.ndim > 1:
            frame = frame.select_dtypes(
                include=["number", "bool"], exclude=[np.timedelta64]
            )
        m2 = new_collection(Moment(frame, order=2))
        m3 = new_collection(Moment(frame, order=3))
        result = m3 / m2**1.5
        if result.ndim == 1:
            result = result.fillna(0.0)
        return result

    @derived_from(pd.DataFrame)
    def kurtosis(
        self,
        axis=0,
        fisher=True,
        bias=True,
        nan_policy="propagate",
        numeric_only=False,
    ):
        """
        .. note::

           This implementation follows the dask.array.stats implementation
           of kurtosis and calculates kurtosis without taking into account
           a bias term for finite sample size, which corresponds to the
           default settings of the scipy.stats kurtosis calculation. This differs
           from pandas.

           Further, this method currently does not support filtering out NaN
           values, which is again a difference to Pandas.
        """
        _raise_if_object_series(self, "kurtosis")
        if axis is None:
            raise ValueError("`axis=None` isn't currently supported for `skew`")
        axis = self._validate_axis(axis)

        if is_dataframe_like(self):
            # Let pandas raise errors if necessary
            meta = self._meta_nonempty.kurtosis(axis=axis, numeric_only=numeric_only)
        else:
            meta = self._meta_nonempty.kurtosis()

        if axis == 1:
            return map_partitions(
                M.kurtosis,
                self,
                meta=meta,
                token=self._token_prefix + "kurtosis",
                axis=axis,
                enforce_metadata=False,
            )

        if not bias:
            raise NotImplementedError("bias=False is not implemented.")
        if nan_policy != "propagate":
            raise NotImplementedError(
                "`nan_policy` other than 'propagate' have not been implemented."
            )

        frame = self
        if frame.ndim > 1:
            frame = frame.select_dtypes(
                include=["number", "bool"], exclude=[np.timedelta64]
            )
        m2 = new_collection(Moment(frame, order=2))
        m4 = new_collection(Moment(frame, order=4))
        result = m4 / m2**2.0
        if result.ndim == 1:
            result = result.fillna(0.0)
        if fisher:
            return result - 3
        else:
            return result

    kurt = kurtosis

    @derived_from(pd.DataFrame)
    def sem(
        self, axis=None, skipna=True, ddof=1, split_every=False, numeric_only=False
    ):
        axis = self._validate_axis(axis)
        _raise_if_object_series(self, "sem")
        if axis == 1:
            return self.map_partitions(
                M.sem,
                axis=axis,
                skipna=skipna,
                ddof=ddof,
                numeric_only=numeric_only,
            )
        meta = self._meta.sem(skipna=skipna, ddof=ddof, numeric_only=numeric_only)
        frame = self
        if self.ndim == 2:
            frame = self[list(meta.index)]

        v = frame.var(skipna=skipna, ddof=ddof, split_every=split_every)
        n = frame.count(split_every=split_every)
        result = map_partitions(
            np.sqrt,
            v / n,
            meta=meta,
            enforce_metadata=False,
            parent_meta=self._meta,
        )
        return result

    def _prepare_cov_corr(self, min_periods, numeric_only):
        if min_periods is None:
            min_periods = 2
        elif min_periods < 2:
            raise ValueError("min_periods must be >= 2")

        self._meta.cov(numeric_only=numeric_only, min_periods=min_periods)

        frame = self
        if numeric_only:
            numerics = self._meta._get_numeric_data()
            if len(numerics.columns) != len(self.columns):
                frame = frame[list(numerics.columns)]
        return frame, min_periods

    def _cov(
        self, min_periods=None, numeric_only=False, split_every=False, scalar=False
    ):
        frame, min_periods = self._prepare_cov_corr(min_periods, numeric_only)
        return new_collection(Cov(frame, min_periods, split_every, scalar))

    def _corr(
        self,
        method="pearson",
        min_periods=None,
        numeric_only=False,
        split_every=False,
        scalar=False,
    ):
        if method != "pearson":
            raise NotImplementedError("Only Pearson correlation has been implemented")
        frame, min_periods = self._prepare_cov_corr(min_periods, numeric_only)
        return new_collection(Corr(frame, min_periods, split_every, scalar))

    @derived_from(pd.DataFrame)
    def mean(
        self, axis=0, skipna=True, numeric_only=False, split_every=False, **kwargs
    ):
        _raise_if_object_series(self, "mean")
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(
                M.mean, skipna=skipna, numeric_only=numeric_only, axis=axis
            )
        return new_collection(
            self.expr.mean(skipna, numeric_only, split_every=split_every, axis=axis)
        )

    @derived_from(pd.DataFrame)
    def max(self, axis=0, skipna=True, numeric_only=False, split_every=False, **kwargs):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(
                M.max, skipna=skipna, numeric_only=numeric_only, axis=axis
            )
        return new_collection(self.expr.max(skipna, numeric_only, split_every, axis))

    @derived_from(pd.DataFrame)
    def any(self, axis=0, skipna=True, split_every=False, **kwargs):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(M.any, skipna=skipna, axis=axis)
        return new_collection(self.expr.any(skipna, split_every))

    @derived_from(pd.DataFrame)
    def all(self, axis=0, skipna=True, split_every=False, **kwargs):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(M.all, skipna=skipna, axis=axis)
        return new_collection(self.expr.all(skipna, split_every))

    @derived_from(pd.DataFrame)
    def idxmin(self, axis=0, skipna=True, numeric_only=False, split_every=False):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(
                M.idxmin, skipna=skipna, numeric_only=numeric_only, axis=axis
            )
        return new_collection(self.expr.idxmin(skipna, numeric_only, split_every))

    @derived_from(pd.DataFrame)
    def idxmax(self, axis=0, skipna=True, numeric_only=False, split_every=False):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(
                M.idxmax, skipna=skipna, numeric_only=numeric_only, axis=axis
            )
        return new_collection(self.expr.idxmax(skipna, numeric_only, split_every))

    @derived_from(pd.DataFrame)
    def min(self, axis=0, skipna=True, numeric_only=False, split_every=False, **kwargs):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(
                M.min, skipna=skipna, numeric_only=numeric_only, axis=axis
            )
        return new_collection(self.expr.min(skipna, numeric_only, split_every, axis))

    @derived_from(pd.DataFrame)
    def count(self, axis=0, numeric_only=False, split_every=False):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(M.count, numeric_only=numeric_only, axis=axis)
        return new_collection(self.expr.count(numeric_only, split_every))

    @derived_from(pd.DataFrame)
    def abs(self):
        # Raise pandas errors
        _raise_if_object_series(self, "abs")
        meta_nonempty(self._meta).abs()
        return new_collection(self.expr.abs())

    @derived_from(pd.DataFrame)
    def astype(self, dtypes):
        return new_collection(self.expr.astype(dtypes))

    @derived_from(pd.DataFrame)
    def combine_first(self, other):
        other = self._create_alignable_frame(other, "outer").expr
        return new_collection(self.expr.combine_first(other))

    @derived_from(pd.DataFrame)
    def to_timestamp(self, freq=None, how="start"):
        return new_collection(self.expr.to_timestamp(freq, how))

    @derived_from(pd.DataFrame)
    def isna(self):
        return new_collection(self.expr.isna())

    def random_split(self, frac, random_state=None, shuffle=False):
        """Pseudorandomly split dataframe into different pieces row-wise

        Parameters
        ----------
        frac : list
            List of floats that should sum to one.
        random_state : int or np.random.RandomState
            If int or None create a new RandomState with this as the seed.
            Otherwise draw from the passed RandomState.
        shuffle : bool, default False
            If set to True, the dataframe is shuffled (within partition)
            before the split.

        Examples
        --------

        50/50 split

        >>> a, b = df.random_split([0.5, 0.5])  # doctest: +SKIP

        80/10/10 split, consistent random_state

        >>> a, b, c = df.random_split([0.8, 0.1, 0.1], random_state=123)  # doctest: +SKIP

        See Also
        --------
        dask.DataFrame.sample
        """
        if not np.allclose(sum(frac), 1):
            raise ValueError("frac should sum to 1")
        frame = expr.Split(self, frac, random_state, shuffle)

        out = []
        for i in range(len(frac)):
            out.append(new_collection(expr.SplitTake(frame, i, self.ndim)))
        return out

    def isnull(self):
        return new_collection(self.expr.isnull())

    @derived_from(pd.DataFrame)
    def round(self, decimals=0):
        return new_collection(self.expr.round(decimals))

    @derived_from(pd.DataFrame)
    def where(self, cond, other=np.nan):
        cond = self._create_alignable_frame(cond)
        other = self._create_alignable_frame(other)
        cond = cond.expr if isinstance(cond, FrameBase) else cond
        other = other.expr if isinstance(other, FrameBase) else other
        return new_collection(self.expr.where(cond, other))

    @derived_from(pd.DataFrame)
    def mask(self, cond, other=np.nan):
        cond = self._create_alignable_frame(cond)
        other = self._create_alignable_frame(other)
        cond = cond.expr if isinstance(cond, FrameBase) else cond
        other = other.expr if isinstance(other, FrameBase) else other
        return new_collection(self.expr.mask(cond, other))

    @derived_from(pd.DataFrame)
    def replace(self, to_replace=None, value=no_default, regex=False):
        return new_collection(self.expr.replace(to_replace, value, regex))

    @derived_from(pd.DataFrame)
    def ffill(self, axis=0, limit=None):
        axis = _validate_axis(axis)
        if axis == 1:
            return self.map_partitions(M.ffill, axis=axis, limit=limit)
        frame = self
        if limit is None:
            frame = FillnaCheck(self, "ffill", lambda x: 0)
        return new_collection(FFill(frame, limit))

    @derived_from(pd.DataFrame)
    def bfill(self, axis=0, limit=None):
        axis = _validate_axis(axis)
        if axis == 1:
            return self.map_partitions(M.bfill, axis=axis, limit=limit)
        frame = self
        if limit is None:
            frame = FillnaCheck(self, "bfill", lambda x: x.npartitions - 1)
        return new_collection(BFill(frame, limit))

    @derived_from(pd.DataFrame)
    def fillna(self, value=None, axis=None):
        axis = self._validate_axis(axis)
        if axis == 1:
            return self.map_partitions(M.fillna, value, axis=axis)
        if isinstance(value, FrameBase):
            value = value.expr
        return new_collection(self.expr.fillna(value))

    @derived_from(pd.DataFrame)
    def shift(self, periods=1, freq=None, axis=0):
        if not isinstance(periods, Integral):
            raise TypeError("periods must be an integer")

        axis = _validate_axis(axis)
        if axis == 0:
            return new_collection(Shift(self, periods, freq))

        return self.map_partitions(
            func=Shift.func,
            enforce_metadata=False,
            transform_divisions=False,
            periods=periods,
            axis=axis,
            freq=freq,
        )

    @derived_from(pd.DataFrame)
    def diff(self, periods=1, axis=0):
        """
        .. note::

           Pandas currently uses an ``object``-dtype column to represent
           boolean data with missing values. This can cause issues for
           boolean-specific operations, like ``|``. To enable boolean-
           specific operations, at the cost of metadata that doesn't match
           pandas, use ``.astype(bool)`` after the ``shift``.
        """
        axis = _validate_axis(axis)
        if axis == 0:
            return new_collection(Diff(self, periods))
        return self.map_partitions(
            func=Diff.func,
            enforce_metadata=False,
            transform_divisions=False,
            clear_divisions=False,
            periods=periods,
            axis=axis,
        )

    @derived_from(pd.DataFrame)
    def rename_axis(
        self, mapper=no_default, index=no_default, columns=no_default, axis=0
    ):
        return new_collection(self.expr.rename_axis(mapper, index, columns, axis))

    def _create_alignable_frame(self, other, join="outer"):
        if not is_dask_collection(other) and (
            is_series_like(other) or is_dataframe_like(other)
        ):
            if join in ("inner", "left"):
                npartitions = 1
            else:
                # We have to trigger alignment, otherwise pandas will add
                # the same values to every partition
                npartitions = 2
            other = from_pandas(other, npartitions=npartitions)
        return other

    @derived_from(pd.DataFrame)
    def align(self, other, join="outer", axis=None, fill_value=None):
        other = self._create_alignable_frame(other, join)
        return self.expr.align(other.expr, join, axis, fill_value)

    def nunique_approx(self, split_every=None):
        """Approximate number of unique rows.

        This method uses the HyperLogLog algorithm for cardinality
        estimation to compute the approximate number of unique rows.
        The approximate error is 0.406%.

        Parameters
        ----------
        split_every : int, optional
            Group partitions into groups of this size while performing a
            tree-reduction. If set to False, no tree-reduction will be used.
            Default is 8.

        Returns
        -------
        a float representing the approximate number of elements
        """
        return new_collection(self.expr.nunique_approx(split_every=split_every))

    @derived_from(pd.DataFrame)
    def cumsum(self, axis=0, skipna=True, **kwargs):
        if axis == 1:
            return self.map_partitions(M.cumsum, axis=axis, skipna=skipna)
        return new_collection(self.expr.cumsum(skipna=skipna))

    @derived_from(pd.DataFrame)
    def cumprod(self, axis=0, skipna=True, **kwargs):
        if axis == 1:
            return self.map_partitions(M.cumprod, axis=axis, skipna=skipna)
        return new_collection(self.expr.cumprod(skipna=skipna))

    @derived_from(pd.DataFrame)
    def cummax(self, axis=0, skipna=True):
        if axis == 1:
            return self.map_partitions(M.cummax, axis=axis, skipna=skipna)
        return new_collection(self.expr.cummax(skipna=skipna))

    @derived_from(pd.DataFrame)
    def cummin(self, axis=0, skipna=True):
        if axis == 1:
            return self.map_partitions(M.cummin, axis=axis, skipna=skipna)
        return new_collection(self.expr.cummin(skipna=skipna))

    def reduction(
        self,
        chunk,
        aggregate=None,
        combine=None,
        meta=no_default,
        token=None,
        split_every=None,
        chunk_kwargs=None,
        aggregate_kwargs=None,
        combine_kwargs=None,
        **kwargs,
    ):
        """Generic row-wise reductions.

        Parameters
        ----------
        chunk : callable
            Function to operate on each partition. Should return a
            ``pandas.DataFrame``, ``pandas.Series``, or a scalar.
        aggregate : callable, optional
            Function to operate on the concatenated result of ``chunk``. If not
            specified, defaults to ``chunk``. Used to do the final aggregation
            in a tree reduction.

            The input to ``aggregate`` depends on the output of ``chunk``.
            If the output of ``chunk`` is a:

            - scalar: Input is a Series, with one row per partition.
            - Series: Input is a DataFrame, with one row per partition. Columns
              are the rows in the output series.
            - DataFrame: Input is a DataFrame, with one row per partition.
              Columns are the columns in the output dataframes.

            Should return a ``pandas.DataFrame``, ``pandas.Series``, or a
            scalar.
        combine : callable, optional
            Function to operate on intermediate concatenated results of
            ``chunk`` in a tree-reduction. If not provided, defaults to
            ``aggregate``. The input/output requirements should match that of
            ``aggregate`` described above.
        $META
        token : str, optional
            The name to use for the output keys.
        split_every : int, optional
            Group partitions into groups of this size while performing a
            tree-reduction. If set to False, no tree-reduction will be used,
            and all intermediates will be concatenated and passed to
            ``aggregate``. Default is 8.
        chunk_kwargs : dict, optional
            Keyword arguments to pass on to ``chunk`` only.
        aggregate_kwargs : dict, optional
            Keyword arguments to pass on to ``aggregate`` only.
        combine_kwargs : dict, optional
            Keyword arguments to pass on to ``combine`` only.
        kwargs :
            All remaining keywords will be passed to ``chunk``, ``combine``,
            and ``aggregate``.

        Examples
        --------
        >>> import pandas as pd
        >>> import dask.dataframe as dd
        >>> df = pd.DataFrame({'x': range(50), 'y': range(50, 100)})
        >>> ddf = dd.from_pandas(df, npartitions=4)

        Count the number of rows in a DataFrame. To do this, count the number
        of rows in each partition, then sum the results:

        >>> res = ddf.reduction(lambda x: x.count(),
        ...                     aggregate=lambda x: x.sum())
        >>> res.compute()
        x    50
        y    50
        dtype: int64

        Count the number of rows in a Series with elements greater than or
        equal to a value (provided via a keyword).

        >>> def count_greater(x, value=0):
        ...     return (x >= value).sum()
        >>> res = ddf.x.reduction(count_greater, aggregate=lambda x: x.sum(),
        ...                       chunk_kwargs={'value': 25})
        >>> res.compute()
        25

        Aggregate both the sum and count of a Series at the same time:

        >>> def sum_and_count(x):
        ...     return pd.Series({'count': x.count(), 'sum': x.sum()},
        ...                      index=['count', 'sum'])
        >>> res = ddf.x.reduction(sum_and_count, aggregate=lambda x: x.sum())
        >>> res.compute()
        count      50
        sum      1225
        dtype: int64

        Doing the same, but for a DataFrame. Here ``chunk`` returns a
        DataFrame, meaning the input to ``aggregate`` is a DataFrame with an
        index with non-unique entries for both 'x' and 'y'. We groupby the
        index, and sum each group to get the final result.

        >>> def sum_and_count(x):
        ...     return pd.DataFrame({'count': x.count(), 'sum': x.sum()},
        ...                         columns=['count', 'sum'])
        >>> res = ddf.reduction(sum_and_count,
        ...                     aggregate=lambda x: x.groupby(level=0).sum())
        >>> res.compute()
           count   sum
        x     50  1225
        y     50  3725
        """
        if split_every is not None and split_every < 2 and split_every is not False:
            raise ValueError("split_every must be at least 2")

        if combine is None:
            if combine_kwargs:
                raise ValueError("`combine_kwargs` provided with no `combine`")

        chunk_kwargs = chunk_kwargs.copy() if chunk_kwargs else {}
        chunk_kwargs.update(kwargs)
        chunk_kwargs["func"] = chunk

        combine_kwargs = combine_kwargs.copy() if combine_kwargs else {}
        combine_kwargs.update(kwargs)
        combine_kwargs["func"] = combine or aggregate or chunk

        aggregate_kwargs = aggregate_kwargs.copy() if aggregate_kwargs else {}
        aggregate_kwargs.update(kwargs)
        aggregate_kwargs["func"] = aggregate or chunk

        return new_collection(
            CustomReduction(
                self,
                meta,
                chunk_kwargs,
                aggregate_kwargs,
                combine_kwargs,
                split_every,
                token,
            )
        )

    def memory_usage_per_partition(self, index: bool = True, deep: bool = False):
        """Return the memory usage of each partition

        Parameters
        ----------
        index : bool, default True
            Specifies whether to include the memory usage of the index in
            returned Series.
        deep : bool, default False
            If True, introspect the data deeply by interrogating
            ``object`` dtypes for system-level memory consumption, and include
            it in the returned values.

        Returns
        -------
        Series
            A Series whose index is the partition number and whose values
            are the memory usage of each partition in bytes.
        """
        return new_collection(self.expr.memory_usage_per_partition(index, deep))

    @property
    def loc(self):
        """Purely label-location based indexer for selection by label.

        >>> df.loc["b"]  # doctest: +SKIP
        >>> df.loc["b":"d"]  # doctest: +SKIP
        """
        from dask_expr._indexing import LocIndexer

        return LocIndexer(self)

    @derived_from(pd.DataFrame)
    def notnull(self):
        return new_collection(expr.NotNull(self))

    @derived_from(pd.DataFrame)
    def isnull(self):
        return ~self.notnull()

    def compute_current_divisions(self, col=None, set_divisions: bool = False):
        """Compute the current divisions of the DataFrame.

        This method triggers immediate computation. If you find yourself running this command
        repeatedly for the same dataframe, we recommend storing the result
        so you don't have to rerun it.

        If the column or index values overlap between partitions, raises ``ValueError``.
        To prevent this, make sure the data are sorted by the column or index.

        Parameters
        ----------
        col : string, optional
            Calculate the divisions for a non-index column by passing in the name of the column.
            If col is not specified, the index will be used to calculate divisions.
            In this case, if the divisions are already known, they will be returned
            immediately without computing.
        set_divisions : bool, default False
            Whether to set the computed divisions into the DataFrame. If False, the divisions
            of the DataFrame are unchanged.

        Examples
        --------
        >>> import dask
        >>> ddf = dask.datasets.timeseries(start="2021-01-01", end="2021-01-07", freq="1h").clear_divisions()
        >>> divisions = ddf.compute_current_divisions()
        >>> print(divisions)  # doctest: +NORMALIZE_WHITESPACE
        (Timestamp('2021-01-01 00:00:00'),
         Timestamp('2021-01-02 00:00:00'),
         Timestamp('2021-01-03 00:00:00'),
         Timestamp('2021-01-04 00:00:00'),
         Timestamp('2021-01-05 00:00:00'),
         Timestamp('2021-01-06 00:00:00'),
         Timestamp('2021-01-06 23:00:00'))

        >>> ddf.divisions = divisions
        >>> ddf.known_divisions
        True

        >>> ddf = ddf.reset_index().clear_divisions()
        >>> divisions = ddf.compute_current_divisions("timestamp")
        >>> print(divisions)  # doctest: +NORMALIZE_WHITESPACE
        (Timestamp('2021-01-01 00:00:00'),
         Timestamp('2021-01-02 00:00:00'),
         Timestamp('2021-01-03 00:00:00'),
         Timestamp('2021-01-04 00:00:00'),
         Timestamp('2021-01-05 00:00:00'),
         Timestamp('2021-01-06 00:00:00'),
         Timestamp('2021-01-06 23:00:00'))

        >>> ddf = ddf.set_index("timestamp", divisions=divisions, sorted=True)
        """
        if col is None and self.known_divisions:
            if set_divisions:
                return self
            return self.divisions

        if col is not None and set_divisions:
            raise NotImplementedError(
                "Can't set divisions of non-index, call set_index instead."
            )

        if col is not None:
            frame = self[col]
        else:
            frame = self.index

        mins, maxes, lens = _compute_partition_stats(frame, allow_overlap=set_divisions)
        divisions = tuple(mins) + (maxes[-1],)
        if not set_divisions:
            return divisions
        if len(mins) == len(self.divisions) - 1:
            if not any(mins[i] >= maxes[i - 1] for i in range(1, len(mins))):
                return new_collection(expr.SetDivisions(self, divisions))

        return new_collection(expr.ResolveOverlappingDivisions(self, mins, maxes, lens))

    @classmethod
    def from_dict(
        cls, data, *, npartitions=1, orient="columns", dtype=None, columns=None
    ):
        """
        Construct a Dask DataFrame from a Python Dictionary

        See Also
        --------
        dask.dataframe.from_dict
        """
        return from_dict(data, npartitions, orient, dtype=dtype, columns=columns)

    def to_json(self, filename, *args, **kwargs):
        """See dd.to_json docstring for more information"""
        from dask.dataframe.io import to_json

        return to_json(self, filename, *args, **kwargs)

    def to_sql(
        self,
        name: str,
        uri: str,
        schema=None,
        if_exists: str = "fail",
        index: bool = True,
        index_label=None,
        chunksize=None,
        dtype=None,
        method=None,
        compute=True,
        parallel=False,
        engine_kwargs=None,
    ):
        from dask_expr.io.sql import to_sql

        return to_sql(
            self,
            name,
            uri,
            schema=schema,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            chunksize=chunksize,
            dtype=dtype,
            method=method,
            compute=compute,
            parallel=parallel,
            engine_kwargs=engine_kwargs,
        )

    def to_orc(self, path, *args, **kwargs):
        """See dd.to_orc docstring for more information"""
        from dask_expr.io.orc import to_orc

        return to_orc(self, path, *args, **kwargs)

    def to_csv(self, filename, **kwargs):
        """See dd.to_csv docstring for more information"""
        from dask_expr.io.csv import to_csv

        return to_csv(self, filename, **kwargs)

    def to_records(self, index=False, lengths=None):
        from dask_expr.io.records import to_records

        if lengths is True:
            lengths = tuple(self.map_partitions(len).compute())

        frame = self.to_legacy_dataframe()
        records = to_records(frame)

        chunks = frame._validate_chunks(records, lengths)
        records._chunks = (chunks[0],)

        return records

    def to_bag(self, index=False, format="tuple"):
        """Create a Dask Bag from a Series"""
        from dask_expr.io.bag import to_bag

        return to_bag(self, index, format=format)

    def to_hdf(self, path_or_buf, key, mode="a", append=False, **kwargs):
        """See dd.to_hdf docstring for more information"""
        from dask_expr.io.hdf import to_hdf

        return to_hdf(self, path_or_buf, key, mode, append, **kwargs)

    def to_delayed(self, optimize_graph=True):
        """Convert into a list of ``dask.delayed`` objects, one per partition.

        Parameters
        ----------
        optimize_graph : bool, optional
            If True [default], the graph is optimized before converting into
            ``dask.delayed`` objects.

        Examples
        --------
        >>> partitions = df.to_delayed()  # doctest: +SKIP

        See Also
        --------
        dask_expr.from_delayed
        """
        return self.to_legacy_dataframe().to_delayed(optimize_graph=optimize_graph)

    def to_backend(self, backend: str | None = None, **kwargs):
        """Move to a new DataFrame backend

        Parameters
        ----------
        backend : str, Optional
            The name of the new backend to move to. The default
            is the current "dataframe.backend" configuration.

        Returns
        -------
        DataFrame, Series or Index
        """
        from dask_expr._backends import dataframe_creation_dispatch

        # Get desired backend
        backend = backend or dataframe_creation_dispatch.backend
        # Check that "backend" has a registered entrypoint
        backend_entrypoint = dataframe_creation_dispatch.dispatch(backend)
        # Call `DataFrameBackendEntrypoint.to_backend`
        return backend_entrypoint.to_backend(self, **kwargs)

    @derived_from(pd.Series)
    def dot(self, other, meta=no_default):
        if not isinstance(other, FrameBase):
            raise TypeError("The second operand must be a dask dataframe")

        if isinstance(other, DataFrame):
            s = self.map_partitions(M.dot, other, meta=meta)
            return s.groupby(by=s.index).apply(
                lambda x: x.sum(skipna=False), meta=s._meta_nonempty
            )

        return self.map_partitions(_dot_series, other, meta=meta).sum(skipna=False)

    @derived_from(pd.DataFrame)
    def pipe(self, func, *args, **kwargs):
        if isinstance(func, tuple):
            func, target = func
            if target in kwargs:
                raise ValueError(
                    "%s is both the pipe target and a keyword argument" % target
                )
            kwargs[target] = self
            return func(*args, **kwargs)
        else:
            return func(self, *args, **kwargs)

    def sample(self, n=None, frac=None, replace=False, random_state=None):
        """Random sample of items

        Parameters
        ----------
        n : int, optional
            Number of items to return is not supported by dask. Use frac
            instead.
        frac : float, optional
            Approximate fraction of items to return. This sampling fraction is
            applied to all partitions equally. Note that this is an
            **approximate fraction**. You should not expect exactly ``len(df) * frac``
            items to be returned, as the exact number of elements selected will
            depend on how your data is partitioned (but should be pretty close
            in practice).
        replace : boolean, optional
            Sample with or without replacement. Default = False.
        random_state : int or ``np.random.RandomState``
            If an int, we create a new RandomState with this as the seed;
            Otherwise we draw from the passed RandomState.

        See Also
        --------
        DataFrame.random_split
        pandas.DataFrame.sample
        """
        if n is not None:
            msg = (
                "sample does not support the number of sampled items "
                "parameter, 'n'. Please use the 'frac' parameter instead."
            )
            if isinstance(n, Number) and 0 <= n <= 1:
                warnings.warn(msg)
                frac = n
            else:
                raise ValueError(msg)

        if frac is None:
            raise ValueError("frac must not be None")

        if random_state is None:
            random_state = np.random.RandomState()

        state_data = random_state_data(self.npartitions, random_state)
        return new_collection(
            expr.Sample(self, state_data=state_data, frac=frac, replace=replace)
        )

    def _repr_data(self):
        raise NotImplementedError

    @property
    def _repr_divisions(self):
        name = f"npartitions={self.npartitions}"
        if self.known_divisions:
            divisions = pd.Index(self.divisions, name=name)
        else:
            # avoid to be converted to NaN
            divisions = pd.Index([""] * (self.npartitions + 1), name=name)
        return divisions


def _dot_series(*args, **kwargs):
    # .sum() is invoked on each partition before being applied to all
    # partitions. The return type is expected to be a series, not a numpy object
    return meta_series_constructor(args[0])(M.dot(*args, **kwargs))


# Add operator attributes
for op in [
    "__add__",
    "__radd__",
    "__sub__",
    "__rsub__",
    "__mul__",
    "__rmul__",
    "__mod__",
    "__rmod__",
    "__floordiv__",
    "__rfloordiv__",
    "__truediv__",
    "__rtruediv__",
    "__pow__",
    "__rpow__",
    "__lt__",
    "__rlt__",
    "__gt__",
    "__rgt__",
    "__le__",
    "__rle__",
    "__ge__",
    "__rge__",
    "__eq__",
    "__ne__",
    "__and__",
    "__rand__",
    "__or__",
    "__ror__",
    "__xor__",
    "__rxor__",
]:
    setattr(FrameBase, op, functools.partialmethod(_wrap_expr_op, op=op))

for op in [
    "__invert__",
    "__neg__",
    "__pos__",
]:
    setattr(FrameBase, op, functools.partialmethod(_wrap_unary_expr_op, op=op))


[docs]class DataFrame(FrameBase): """DataFrame-like Expr Collection""" _accessors: ClassVar[set[str]] = set() _partition_type = pd.DataFrame @property def shape(self): return self.size / max(len(self.columns), 1), len(self.columns) @property def ndim(self): """Return dimensionality""" return 2 @property def empty(self): # __getattr__ will be called after we raise this, so we'll raise it again from there raise AttributeNotImplementedError( "Checking whether a Dask DataFrame has any rows may be expensive. " "However, checking the number of columns is fast. " "Depending on which of these results you need, use either " "`len(df.index) == 0` or `len(df.columns) == 0`" )
[docs] @derived_from(pd.DataFrame) def items(self): for i, name in enumerate(self.columns): yield (name, self.iloc[:, i])
@property def axes(self): return [self.index, self.columns] def __contains__(self, key): return key in self._meta def __iter__(self): return iter(self._meta)
[docs] @derived_from(pd.DataFrame) def iterrows(self): frame = self.optimize() for i in range(self.npartitions): df = frame.get_partition(i).compute() yield from df.iterrows()
[docs] @derived_from(pd.DataFrame) def itertuples(self, index=True, name="Pandas"): frame = self.optimize() for i in range(self.npartitions): df = frame.get_partition(i).compute() yield from df.itertuples(index=index, name=name)
@property def _elemwise(self): return elemwise def __array_ufunc__(self, numpy_ufunc, method, *inputs, **kwargs): out = kwargs.get("out", ()) for x in inputs + out: # ufuncs work with 0-dimensional NumPy ndarrays # so we don't want to raise NotImplemented if isinstance(x, np.ndarray) and x.shape == (): continue elif not isinstance( x, (Number, Scalar, FrameBase, Array, pd.DataFrame, pd.Series, pd.Index) ): return NotImplemented if method == "__call__": if numpy_ufunc.signature is not None: return NotImplemented if numpy_ufunc.nout > 1: # ufuncs with multiple output values # are not yet supported for frames return NotImplemented else: return elemwise(numpy_ufunc, *inputs, **kwargs) else: # ufunc methods are not yet supported for frames return NotImplemented def __array_wrap__(self, array, context=None): if isinstance(context, tuple) and len(context) > 0: if isinstance(context[1][0], np.ndarray) and context[1][0].shape == (): index = None else: index = context[1][0].index else: try: import inspect method_name = f"`{inspect.stack()[3][3]}`" except IndexError: method_name = "This method" raise NotImplementedError( f"{method_name} is not implemented for `dask.dataframe.DataFrame`." ) return meta_frame_constructor(self)(array, index=index, columns=self.columns) def _ipython_key_completions_(self): return methods.tolist(self.columns) def _repr_html_(self): return self.to_html()
[docs] @derived_from(pd.DataFrame) def assign(self, **pairs): result = self args = [] for k, v in pairs.items(): v = _maybe_from_pandas([v])[0] if not isinstance(k, str): raise TypeError(f"Column name cannot be type {type(k)}") if callable(v): result = new_collection(expr.Assign(result, *args)) args = [] result = new_collection(expr.Assign(result, k, v(result))) continue elif isinstance(v, (Scalar, Series)): if isinstance(v, Series): if not expr.are_co_aligned(result.expr, v.expr): if len(args) > 0: result = expr.Assign(result, *args) args = [] result = new_collection(expr.AssignAlign(result, k, v.expr)) continue elif not isinstance(v, FrameBase) and isinstance(v, Hashable): pass elif isinstance(v, Array): if len(v.shape) > 1: raise ValueError("Array assignment only supports 1-D arrays") if v.npartitions != result.npartitions: raise ValueError( "Number of partitions do not match " f"({v.npartitions} != {result.npartitions})" ) v = from_dask_array( v, index=result.index.to_legacy_dataframe(), meta=result._meta ) else: raise TypeError(f"Column assignment doesn't support type {type(v)}") args.extend([k, v]) if len(args) > 0: result = new_collection(expr.Assign(result, *args)) return result
@derived_from(pd.DataFrame) def clip(self, lower=None, upper=None, axis=None, **kwargs): axis = self._validate_axis(axis) if axis == 1: return self.map_partitions(M.clip, lower, upper, axis=axis) return new_collection(self.expr.clip(lower, upper, axis))
[docs] def merge( self, right, how="inner", on=None, left_on=None, right_on=None, left_index=False, right_index=False, suffixes=("_x", "_y"), indicator=False, shuffle_method=None, npartitions=None, broadcast=None, ): """Merge the DataFrame with another DataFrame This will merge the two datasets, either on the indices, a certain column in each dataset or the index in one dataset and the column in another. Parameters ---------- right: dask.dataframe.DataFrame how : {'left', 'right', 'outer', 'inner', 'leftsemi'}, default: 'inner' How to handle the operation of the two objects: - left: use calling frame's index (or column if on is specified) - right: use other frame's index - outer: form union of calling frame's index (or column if on is specified) with other frame's index, and sort it lexicographically - inner: form intersection of calling frame's index (or column if on is specified) with other frame's index, preserving the order of the calling's one - leftsemi: Choose all rows in left where the join keys can be found in right. Won't duplicate rows if the keys are duplicated in right. Drops all columns from right. on : label or list Column or index level names to join on. These must be found in both DataFrames. If on is None and not merging on indexes then this defaults to the intersection of the columns in both DataFrames. left_on : label or list, or array-like Column to join on in the left DataFrame. Other than in pandas arrays and lists are only support if their length is 1. right_on : label or list, or array-like Column to join on in the right DataFrame. Other than in pandas arrays and lists are only support if their length is 1. left_index : boolean, default False Use the index from the left DataFrame as the join key. right_index : boolean, default False Use the index from the right DataFrame as the join key. suffixes : 2-length sequence (tuple, list, ...) Suffix to apply to overlapping column names in the left and right side, respectively indicator : boolean or string, default False If True, adds a column to output DataFrame called "_merge" with information on the source of each row. If string, column with information on source of each row will be added to output DataFrame, and column will be named value of string. Information column is Categorical-type and takes on a value of "left_only" for observations whose merge key only appears in `left` DataFrame, "right_only" for observations whose merge key only appears in `right` DataFrame, and "both" if the observation’s merge key is found in both. npartitions: int or None, optional The ideal number of output partitions. This is only utilised when performing a hash_join (merging on columns only). If ``None`` then ``npartitions = max(lhs.npartitions, rhs.npartitions)``. Default is ``None``. shuffle_method: {'disk', 'tasks', 'p2p'}, optional Either ``'disk'`` for single-node operation or ``'tasks'`` and ``'p2p'``` for distributed operation. Will be inferred by your current scheduler. broadcast: boolean or float, optional Whether to use a broadcast-based join in lieu of a shuffle-based join for supported cases. By default, a simple heuristic will be used to select the underlying algorithm. If a floating-point value is specified, that number will be used as the ``broadcast_bias`` within the simple heuristic (a large number makes Dask more likely to choose the ``broacast_join`` code path). See ``broadcast_join`` for more information. Notes ----- There are three ways to join dataframes: 1. Joining on indices. In this case the divisions are aligned using the function ``dask.dataframe.multi.align_partitions``. Afterwards, each partition is merged with the pandas merge function. 2. Joining one on index and one on column. In this case the divisions of dataframe merged by index (:math:`d_i`) are used to divide the column merged dataframe (:math:`d_c`) one using ``dask.dataframe.multi.rearrange_by_divisions``. In this case the merged dataframe (:math:`d_m`) has the exact same divisions as (:math:`d_i`). This can lead to issues if you merge multiple rows from (:math:`d_c`) to one row in (:math:`d_i`). 3. Joining both on columns. In this case a hash join is performed using ``dask.dataframe.multi.hash_join``. In some cases, you may see a ``MemoryError`` if the ``merge`` operation requires an internal ``shuffle``, because shuffling places all rows that have the same index in the same partition. To avoid this error, make sure all rows with the same ``on``-column value can fit on a single partition. """ return merge( self, right, how, on, left_on, right_on, left_index, right_index, suffixes, indicator, shuffle_method, npartitions=npartitions, broadcast=broadcast, )
[docs] @derived_from(pd.DataFrame) def join( self, other, on=None, how="left", lsuffix="", rsuffix="", shuffle_method=None, npartitions=None, ): if not isinstance(other, list) and not is_dask_collection(other): other = from_pandas(other, npartitions=1) if ( not isinstance(other, list) and not is_dataframe_like(other._meta) and hasattr(other._meta, "name") ): other = new_collection(expr.ToFrame(other)) if not isinstance(other, FrameBase): if not isinstance(other, list) or not all( isinstance(o, FrameBase) for o in other ): raise ValueError("other must be DataFrame or list of DataFrames") if how not in ("outer", "left"): raise ValueError("merge_multi only supports left or outer joins") other = [ from_pandas(o, npartitions=1) if not is_dask_collection(o) else o for o in other ] return new_collection( JoinRecursive([self.expr] + [o.expr for o in other], how=how) ) return self.merge( right=other, left_index=on is None, right_index=True, left_on=on, how=how, suffixes=(lsuffix, rsuffix), shuffle_method=shuffle_method, npartitions=npartitions, )
[docs] @derived_from(pd.DataFrame) def groupby( self, by, group_keys=True, sort=None, observed=None, dropna=None, **kwargs ): from dask_expr._groupby import GroupBy if isinstance(by, FrameBase) and not isinstance(by, Series): raise ValueError( f"`by` must be a column name or list of columns, got {by}." ) return GroupBy( self, by, group_keys=group_keys, sort=sort, observed=observed, dropna=dropna, **kwargs, )
def __setitem__(self, key, value): if isinstance(key, (tuple, list)) and isinstance(value, DataFrame): out = self.assign(**{k: value[c] for k, c in zip(key, value.columns)}) elif isinstance(key, pd.Index) and not isinstance(value, DataFrame): out = self.assign(**{k: value for k in list(key)}) elif ( is_dataframe_like(key) or is_series_like(key) or isinstance(key, (DataFrame, Series)) ): out = self.where(~key, value) elif not isinstance(key, str): raise NotImplementedError(f"Item assignment with {type(key)} not supported") else: out = self.assign(**{key: value}) self._expr = out._expr def __delitem__(self, key): columns = [c for c in self.columns if c != key] out = self[columns] self._expr = out._expr def __setattr__(self, key, value): try: columns = object.__getattribute__(self, "_expr").columns except AttributeError: columns = () # exclude protected attributes from setitem if key in columns and key not in [ "divisions", "dask", "_name", "_meta", "_expr", ]: self[key] = value else: object.__setattr__(self, key, value) def __getattr__(self, key): try: # Prioritize `DataFrame` attributes return object.__getattribute__(self, key) except AttributeError as err: try: # Check if key is in columns if key # is not a normal attribute if key in self.expr._meta.columns: return new_collection(self.expr[key]) raise err except AttributeError: # Fall back to `BaseFrame.__getattr__` return super().__getattr__(key) def __dir__(self): o = set(dir(type(self))) o.update(self.__dict__) o.update(set(dir(expr.Expr))) o.update(c for c in self.columns if (isinstance(c, str) and c.isidentifier())) return list(o) def map(self, func, na_action=None, meta=None): if not PANDAS_GE_210: raise NotImplementedError( f"DataFrame.map requires pandas>=2.1.0, but pandas={PANDAS_VERSION} is " "installed." ) if meta is None: meta = expr._emulate(M.map, self, func, na_action=na_action, udf=True) warnings.warn(meta_warning(meta)) return new_collection(expr.Map(self, arg=func, na_action=na_action, meta=meta))
[docs] @derived_from(pd.DataFrame) def nlargest(self, n=5, columns=None, split_every=None): return new_collection( NLargest(self, n=n, _columns=columns, split_every=split_every) )
[docs] @derived_from(pd.DataFrame) def nsmallest(self, n=5, columns=None, split_every=None): return new_collection( NSmallest(self, n=n, _columns=columns, split_every=split_every) )
[docs] @derived_from(pd.DataFrame) def memory_usage(self, deep=False, index=True): return new_collection(MemoryUsageFrame(self, deep=deep, _index=index))
@derived_from(pd.DataFrame) def combine(self, other, func, fill_value=None, overwrite=True): other = self._create_alignable_frame(other, "outer") if not expr.are_co_aligned(self.expr, other.expr): return new_collection( expr.CombineFrameAlign(self, other, func, fill_value, overwrite) ) return new_collection( expr.CombineFrame(self, other, func, fill_value, overwrite) )
[docs] @derived_from( pd.DataFrame, inconsistencies="keep=False will raise a ``NotImplementedError``", ) def drop_duplicates( self, subset=None, split_every=None, split_out=True, shuffle_method=None, ignore_index=False, keep="first", ): shuffle_method = _get_shuffle_preferring_order(shuffle_method) if keep is False: raise NotImplementedError("drop_duplicates with keep=False") # Fail early if subset is not valid, e.g. missing columns subset = _convert_to_list(subset) meta_nonempty(self._meta).drop_duplicates(subset=subset, keep=keep) return new_collection( DropDuplicates( self, subset=subset, ignore_index=ignore_index, split_out=split_out, split_every=split_every, shuffle_method=shuffle_method, keep=keep, ) )
[docs] @insert_meta_param_description(pad=12) def apply(self, function, *args, meta=no_default, axis=0, **kwargs): """Parallel version of pandas.DataFrame.apply This mimics the pandas version except for the following: 1. Only ``axis=1`` is supported (and must be specified explicitly). 2. The user should provide output metadata via the `meta` keyword. Parameters ---------- func : function Function to apply to each column/row axis : {0 or 'index', 1 or 'columns'}, default 0 - 0 or 'index': apply function to each column (NOT SUPPORTED) - 1 or 'columns': apply function to each row $META args : tuple Positional arguments to pass to function in addition to the array/series Additional keyword arguments will be passed as keywords to the function Returns ------- applied : Series or DataFrame Examples -------- >>> import pandas as pd >>> import dask.dataframe as dd >>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5], ... 'y': [1., 2., 3., 4., 5.]}) >>> ddf = dd.from_pandas(df, npartitions=2) Apply a function to row-wise passing in extra arguments in ``args`` and ``kwargs``: >>> def myadd(row, a, b=1): ... return row.sum() + a + b >>> res = ddf.apply(myadd, axis=1, args=(2,), b=1.5) # doctest: +SKIP By default, dask tries to infer the output metadata by running your provided function on some fake data. This works well in many cases, but can sometimes be expensive, or even fail. To avoid this, you can manually specify the output metadata with the ``meta`` keyword. This can be specified in many forms, for more information see ``dask.dataframe.utils.make_meta``. Here we specify the output is a Series with name ``'x'``, and dtype ``float64``: >>> res = ddf.apply(myadd, axis=1, args=(2,), b=1.5, meta=('x', 'f8')) In the case where the metadata doesn't change, you can also pass in the object itself directly: >>> res = ddf.apply(lambda row: row + 1, axis=1, meta=ddf) See Also -------- DataFrame.map_partitions """ axis = self._validate_axis(axis) if axis == 0: msg = ( "Dask DataFrame.apply only supports axis=1\n" " Try: df.apply(func, axis=1)" ) raise NotImplementedError(msg) if meta is no_default: meta = expr._emulate( M.apply, self, function, args=args, udf=True, axis=axis, **kwargs ) warnings.warn(meta_warning(meta)) return new_collection( self.expr.apply(function, *args, meta=meta, axis=axis, **kwargs) )
[docs] @derived_from(pd.DataFrame) def dropna(self, how=no_default, subset=None, thresh=no_default): if how is not no_default and thresh is not no_default: raise TypeError( "You cannot set both the how and thresh arguments at the same time." ) subset = _convert_to_list(subset) return new_collection( expr.DropnaFrame(self, how=how, subset=subset, thresh=thresh) )
@classmethod def _validate_axis(cls, axis=0, numeric_axis: bool = True) -> None | Literal[0, 1]: if axis not in (0, 1, "index", "columns", None): raise ValueError(f"No axis named {axis}") if numeric_axis: num_axis: dict[str | None, Literal[0, 1]] = {"index": 0, "columns": 1} return num_axis.get(axis, axis) else: return axis
[docs] @derived_from(pd.DataFrame, ua_args=["index"]) def rename(self, index=None, columns=None): if index is not None: raise ValueError("Cannot rename index.") return new_collection(expr.RenameFrame(self, columns=columns))
[docs] @derived_from(pd.DataFrame) def squeeze(self, axis=None): if axis in [None, 1]: if len(self.columns) == 1: return self[self.columns[0]] else: return self elif axis == 0: raise NotImplementedError( f"{type(self)} does not support squeeze along axis 0" ) else: raise ValueError(f"No axis {axis} for object type {type(self)}")
[docs] @derived_from(pd.DataFrame) def explode(self, column): column = _convert_to_list(column) return new_collection(expr.ExplodeFrame(self, column=column))
[docs] @derived_from(pd.DataFrame) def drop(self, labels=None, axis=0, columns=None, errors="raise"): if columns is None and labels is None: raise TypeError("must either specify 'columns' or 'labels'") axis = _validate_axis(axis) if axis == 1: columns = labels or columns elif axis == 0 and columns is None: raise NotImplementedError( "Drop currently only works for axis=1 or when columns is not None" ) return new_collection(expr.Drop(self, columns=columns, errors=errors))
[docs] def to_parquet(self, path, **kwargs): from dask_expr.io.parquet import to_parquet return to_parquet(self, path, **kwargs)
[docs] @derived_from(pd.DataFrame) def select_dtypes(self, include=None, exclude=None): columns = list( self._meta.select_dtypes(include=include, exclude=exclude).columns ) return new_collection(self.expr[columns])
[docs] @derived_from(pd.DataFrame) def eval(self, expr, **kwargs): if "inplace" in kwargs: raise NotImplementedError("inplace is not supported for eval") return new_collection(Eval(self, _expr=expr, expr_kwargs=kwargs))
[docs] def set_index( self, other, drop=True, sorted=False, npartitions: int | None = None, divisions=None, sort: bool = True, shuffle_method=None, upsample: float = 1.0, partition_size: float = 128e6, append: bool = False, **options, ): """Set the DataFrame index (row labels) using an existing column. If ``sort=False``, this function operates exactly like ``pandas.set_index`` and sets the index on the DataFrame. If ``sort=True`` (default), this function also sorts the DataFrame by the new index. This can have a significant impact on performance, because joins, groupbys, lookups, etc. are all much faster on that column. However, this performance increase comes with a cost, sorting a parallel dataset requires expensive shuffles. Often we ``set_index`` once directly after data ingest and filtering and then perform many cheap computations off of the sorted dataset. With ``sort=True``, this function is much more expensive. Under normal operation this function does an initial pass over the index column to compute approximate quantiles to serve as future divisions. It then passes over the data a second time, splitting up each input partition into several pieces and sharing those pieces to all of the output partitions now in sorted order. In some cases we can alleviate those costs, for example if your dataset is sorted already then we can avoid making many small pieces or if you know good values to split the new index column then we can avoid the initial pass over the data. For example if your new index is a datetime index and your data is already sorted by day then this entire operation can be done for free. You can control these options with the following parameters. Parameters ---------- other: string or Dask Series Column to use as index. drop: boolean, default True Delete column to be used as the new index. sorted: bool, optional If the index column is already sorted in increasing order. Defaults to False npartitions: int, None, or 'auto' The ideal number of output partitions. If None, use the same as the input. If 'auto' then decide by memory use. Only used when ``divisions`` is not given. If ``divisions`` is given, the number of output partitions will be ``len(divisions) - 1``. divisions: list, optional The "dividing lines" used to split the new index into partitions. For ``divisions=[0, 10, 50, 100]``, there would be three output partitions, where the new index contained [0, 10), [10, 50), and [50, 100), respectively. See https://docs.dask.org/en/latest/dataframe-design.html#partitions. If not given (default), good divisions are calculated by immediately computing the data and looking at the distribution of its values. For large datasets, this can be expensive. Note that if ``sorted=True``, specified divisions are assumed to match the existing partitions in the data; if this is untrue you should leave divisions empty and call ``repartition`` after ``set_index``. inplace: bool, optional Modifying the DataFrame in place is not supported by Dask. Defaults to False. sort: bool, optional If ``True``, sort the DataFrame by the new index. Otherwise set the index on the individual existing partitions. Defaults to ``True``. shuffle_method: {'disk', 'tasks', 'p2p'}, optional Either ``'disk'`` for single-node operation or ``'tasks'`` and ``'p2p'`` for distributed operation. Will be inferred by your current scheduler. compute: bool, default False Whether or not to trigger an immediate computation. Defaults to False. Note, that even if you set ``compute=False``, an immediate computation will still be triggered if ``divisions`` is ``None``. partition_size: int, optional Desired size of each partitions in bytes. Only used when ``npartitions='auto'`` Examples -------- >>> import dask >>> ddf = dask.datasets.timeseries(start="2021-01-01", end="2021-01-07", freq="1h").reset_index() >>> ddf2 = ddf.set_index("x") >>> ddf2 = ddf.set_index(ddf.x) >>> ddf2 = ddf.set_index(ddf.timestamp, sorted=True) A common case is when we have a datetime column that we know to be sorted and is cleanly divided by day. We can set this index for free by specifying both that the column is pre-sorted and the particular divisions along which is is separated >>> import pandas as pd >>> divisions = pd.date_range(start="2021-01-01", end="2021-01-07", freq='1D') >>> divisions DatetimeIndex(['2021-01-01', '2021-01-02', '2021-01-03', '2021-01-04', '2021-01-05', '2021-01-06', '2021-01-07'], dtype='datetime64[ns]', freq='D') Note that ``len(divisons)`` is equal to ``npartitions + 1``. This is because ``divisions`` represents the upper and lower bounds of each partition. The first item is the lower bound of the first partition, the second item is the lower bound of the second partition and the upper bound of the first partition, and so on. The second-to-last item is the lower bound of the last partition, and the last (extra) item is the upper bound of the last partition. >>> ddf2 = ddf.set_index("timestamp", sorted=True, divisions=divisions.tolist()) If you'll be running `set_index` on the same (or similar) datasets repeatedly, you could save time by letting Dask calculate good divisions once, then copy-pasting them to reuse. This is especially helpful running in a Jupyter notebook: >>> ddf2 = ddf.set_index("name") # slow, calculates data distribution >>> ddf2.divisions # doctest: +SKIP ["Alice", "Laura", "Ursula", "Zelda"] >>> # ^ Now copy-paste this and edit the line above to: >>> # ddf2 = ddf.set_index("name", divisions=["Alice", "Laura", "Ursula", "Zelda"]) """ if isinstance(other, list) and len(other) == 1: other = other[0] if isinstance(other, list): if any([isinstance(c, FrameBase) for c in other]): raise TypeError("List[FrameBase] not supported by set_index") else: raise NotImplementedError( "Dask dataframe does not yet support multi-indexes.\n" f"You tried to index with this index: {other}\n" "Indexes must be single columns only." ) if isinstance(other, DataFrame): raise NotImplementedError( "Dask dataframe does not yet support multi-indexes.\n" f"You tried to index with a frame with these columns: {list(other.columns)}\n" "Indexes must be single columns only." ) if isinstance(other, Series): if other._name == self.index._name: return self elif other == self.index.name: return self if divisions is not None: check_divisions(divisions) if (sorted or not sort) and npartitions is not None: raise ValueError( "Specifying npartitions with sort=False or sorted=True is not " "supported. Call `repartition` afterwards." ) if sorted: if divisions is not None and len(divisions) - 1 != self.npartitions: msg = ( "When doing `df.set_index(col, sorted=True, divisions=...)`, " "divisions indicates known splits in the index column. In this " "case divisions must be the same length as the existing " "divisions in `df`\n\n" "If the intent is to repartition into new divisions after " "setting the index, you probably want:\n\n" "`df.set_index(col, sorted=True).repartition(divisions=divisions)`" ) raise ValueError(msg) result = new_collection( SetIndexBlockwise( self, other, drop, new_divisions=divisions, append=append ) ) return result.compute_current_divisions(set_divisions=True) elif not sort: return new_collection( SetIndexBlockwise(self, other, drop, None, append=append) ) return new_collection( SetIndex( self, other, drop, user_divisions=divisions, npartitions=npartitions, upsample=upsample, partition_size=partition_size, shuffle_method=get_specified_shuffle(shuffle_method), append=append, options=options, ) )
[docs] def sort_values( self, by: str | list[str], npartitions: int | None = None, ascending: bool | list[bool] = True, na_position: Literal["first"] | Literal["last"] = "last", partition_size: float = 128e6, sort_function: Callable[[pd.DataFrame], pd.DataFrame] | None = None, sort_function_kwargs: Mapping[str, Any] | None = None, upsample: float = 1.0, ignore_index: bool | None = False, shuffle_method: str | None = None, **options, ): """Sort the dataset by a single column. Sorting a parallel dataset requires expensive shuffles and is generally not recommended. See ``set_index`` for implementation details. Parameters ---------- by: str or list[str] Column(s) to sort by. npartitions: int, None, or 'auto' The ideal number of output partitions. If None, use the same as the input. If 'auto' then decide by memory use. ascending: bool, optional Sort ascending vs. descending. Defaults to True. na_position: {'last', 'first'}, optional Puts NaNs at the beginning if 'first', puts NaN at the end if 'last'. Defaults to 'last'. sort_function: function, optional Sorting function to use when sorting underlying partitions. If None, defaults to ``M.sort_values`` (the partition library's implementation of ``sort_values``). sort_function_kwargs: dict, optional Additional keyword arguments to pass to the partition sorting function. By default, ``by``, ``ascending``, and ``na_position`` are provided. Examples -------- >>> df2 = df.sort_values('x') # doctest: +SKIP """ if na_position not in ("first", "last"): raise ValueError("na_position must be either 'first' or 'last'") if not isinstance(by, list): by = [by] if any(not isinstance(b, str) for b in by): raise NotImplementedError( "Dataframes only support sorting by named columns which must be passed as a " "string or a list of strings.\n" "You passed %s" % str(by) ) if not isinstance(ascending, bool) and not len(ascending) == len(by): raise ValueError(f"Length of {ascending=} != length of {by=}") return new_collection( SortValues( self, by, ascending, na_position, npartitions, partition_size, sort_function, sort_function_kwargs, upsample, ignore_index, get_specified_shuffle(shuffle_method), options=options, ) )
[docs] def query(self, expr, **kwargs): """Filter dataframe with complex expression Blocked version of pd.DataFrame.query Parameters ---------- expr: str The query string to evaluate. You can refer to column names that are not valid Python variable names by surrounding them in backticks. Dask does not fully support referring to variables using the '@' character, use f-strings or the ``local_dict`` keyword argument instead. Notes ----- This is like the sequential version except that this will also happen in many threads. This may conflict with ``numexpr`` which will use multiple threads itself. We recommend that you set ``numexpr`` to use a single thread: .. code-block:: python import numexpr numexpr.set_num_threads(1) See also -------- pandas.DataFrame.query pandas.eval Examples -------- >>> import pandas as pd >>> import dask_expr as dd >>> df = pd.DataFrame({'x': [1, 2, 1, 2], ... 'y': [1, 2, 3, 4], ... 'z z': [4, 3, 2, 1]}) >>> ddf = dd.from_pandas(df, npartitions=2) Refer to column names directly: >>> ddf.query('y > x').compute() x y z z 2 1 3 2 3 2 4 1 Refer to column name using backticks: >>> ddf.query('`z z` > x').compute() x y z z 0 1 1 4 1 2 2 3 2 1 3 2 Refer to variable name using f-strings: >>> value = 1 >>> ddf.query(f'x == {value}').compute() x y z z 0 1 1 4 2 1 3 2 Refer to variable name using ``local_dict``: >>> ddf.query('x == @value', local_dict={"value": value}).compute() x y z z 0 1 1 4 2 1 3 2 """ return new_collection(Query(self, expr, kwargs))
[docs] @derived_from(pd.DataFrame) def mode(self, dropna=True, split_every=False, numeric_only=False): modes = [] for _, col in self.items(): if numeric_only and not pd.api.types.is_numeric_dtype(col.dtype): continue modes.append(col.mode(dropna=dropna, split_every=split_every)) return concat(modes, axis=1)
@derived_from(pd.DataFrame) def add_prefix(self, prefix): return new_collection(expr.AddPrefix(self, prefix)) @derived_from(pd.DataFrame) def add_suffix(self, suffix): return new_collection(expr.AddSuffix(self, suffix))
[docs] def pivot_table(self, index, columns, values, aggfunc="mean"): """ Create a spreadsheet-style pivot table as a DataFrame. Target ``columns`` must have category dtype to infer result's ``columns``. ``index``, ``columns``, ``values`` and ``aggfunc`` must be all scalar. Parameters ---------- values : scalar column to aggregate index : scalar column to be index columns : scalar column to be columns aggfunc : {'mean', 'sum', 'count'}, default 'mean' Returns ------- table : DataFrame """ return pivot_table(self, index, columns, values, aggfunc)
@property def iloc(self): """Purely integer-location based indexing for selection by position. Only indexing the column positions is supported. Trying to select row positions will raise a ValueError. See :ref:`dataframe.indexing` for more. Examples -------- >>> df.iloc[:, [2, 0, 1]] # doctest: +SKIP """ from dask_expr._indexing import ILocIndexer return ILocIndexer(self) def _comparison_op(self, expr_cls, other, level, axis): if level is not None: raise NotImplementedError("level must be None") axis = self._validate_axis(axis) return new_collection(expr_cls(self, other, axis))
[docs] def lt(self, other, level=None, axis=0): return self._comparison_op(expr.LTFrame, other, level, axis)
[docs] def le(self, other, level=None, axis=0): return self._comparison_op(expr.LEFrame, other, level, axis)
[docs] def gt(self, other, level=None, axis=0): return self._comparison_op(expr.GTFrame, other, level, axis)
[docs] def ge(self, other, level=None, axis=0): return self._comparison_op(expr.GEFrame, other, level, axis)
[docs] def ne(self, other, level=None, axis=0): return self._comparison_op(expr.NEFrame, other, level, axis)
[docs] def eq(self, other, level=None, axis=0): return self._comparison_op(expr.EQFrame, other, level, axis)
[docs] def categorize(self, columns=None, index=None, split_every=None, **kwargs): """Convert columns of the DataFrame to category dtype. .. warning:: This method eagerly computes the categories of the chosen columns. Parameters ---------- columns : list, optional A list of column names to convert to categoricals. By default any column with an object dtype is converted to a categorical, and any unknown categoricals are made known. index : bool, optional Whether to categorize the index. By default, object indices are converted to categorical, and unknown categorical indices are made known. Set True to always categorize the index, False to never. split_every : int, optional Group partitions into groups of this size while performing a tree-reduction. If set to False, no tree-reduction will be used. kwargs Keyword arguments are passed on to compute. """ df = self meta = df._meta if columns is None: columns = list(meta.select_dtypes(["object", "string", "category"]).columns) elif is_scalar(columns): columns = [columns] # Filter out known categorical columns columns = [ c for c in columns if not (is_categorical_dtype(meta[c]) and has_known_categories(meta[c])) ] if index is not False: if is_categorical_dtype(meta.index): index = not has_known_categories(meta.index) elif index is None: index = str(meta.index.dtype) in ("object", "string") # Nothing to do if not len(columns) and index is False: return df from dask_expr._collection import new_collection # Eagerly compute the categories categories, index = new_collection( GetCategories(self, columns=columns, index=index, split_every=split_every) ).compute() # Some operations like get_dummies() rely on the order of categories categories = {k: v.sort_values() for k, v in categories.items()} # Categorize each partition return new_collection(Categorize(self, categories, index))
@derived_from(pd.DataFrame) def nunique(self, axis=0, dropna=True, split_every=False): if axis == 1: return new_collection(expr.NUniqueColumns(self, axis=axis, dropna=dropna)) else: return concat( [ col.nunique(dropna=dropna, split_every=split_every).to_series(name) for name, col in self.items() ] )
[docs] def quantile(self, q=0.5, axis=0, numeric_only=False, method="default"): """Approximate row-wise and precise column-wise quantiles of DataFrame Parameters ---------- q : list/array of floats, default 0.5 (50%) Iterable of numbers ranging from 0 to 1 for the desired quantiles axis : {0, 1, 'index', 'columns'} (default 0) 0 or 'index' for row-wise, 1 or 'columns' for column-wise method : {'default', 'tdigest', 'dask'}, optional What method to use. By default will use dask's internal custom algorithm (``'dask'``). If set to ``'tdigest'`` will use tdigest for floats and ints and fallback to the ``'dask'`` otherwise. """ allowed_methods = ["default", "dask", "tdigest"] if method not in allowed_methods: raise ValueError("method can only be 'default', 'dask' or 'tdigest'") meta = make_meta( meta_nonempty(self._meta).quantile( q=q, numeric_only=numeric_only, axis=axis ) ) if axis == 1: if isinstance(q, list): # Not supported, the result will have current index as columns raise ValueError("'q' must be scalar when axis=1 is specified") return self.map_partitions( M.quantile, q, axis, enforce_metadata=False, meta=meta, numeric_only=numeric_only, ) if numeric_only: frame = self.select_dtypes( "number", exclude=[np.timedelta64, np.datetime64] ) else: frame = self collections = [] for _, col in frame.items(): collections.append(col.quantile(q=q, method=method)) if len(collections) > 0 and isinstance(collections[0], Scalar): return _from_scalars(collections, meta, frame.expr.columns) return concat(collections, axis=1)
[docs] @derived_from(pd.DataFrame) def median(self, axis=0, numeric_only=False): if axis == 1 or self.npartitions == 1: return self.median_approximate(axis=axis, numeric_only=numeric_only) raise NotImplementedError( "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. " "See the `median_approximate` method instead, which uses an approximate algorithm." )
[docs] def median_approximate(self, axis=0, method="default", numeric_only=False): """Return the approximate median of the values over the requested axis. Parameters ---------- axis : {0, 1, "index", "columns"} (default 0) 0 or ``"index"`` for row-wise, 1 or ``"columns"`` for column-wise method : {'default', 'tdigest', 'dask'}, optional What method to use. By default will use Dask's internal custom algorithm (``"dask"``). If set to ``"tdigest"`` will use tdigest for floats and ints and fallback to the ``"dask"`` otherwise. """ return self.quantile( axis=axis, method=method, numeric_only=numeric_only ).rename(None)
[docs] @derived_from(pd.DataFrame) def describe( self, split_every=False, percentiles=None, percentiles_method="default", include=None, exclude=None, ): # TODO: duplicated columns if include is None and exclude is None: _include = [np.number, np.timedelta64, np.datetime64] columns = self._meta.select_dtypes(include=_include).columns if len(columns) == 0: columns = self._meta.columns elif include == "all": if exclude is not None: raise ValueError("exclude must be None when include is 'all'") columns = self._meta.columns else: columns = self._meta.select_dtypes(include=include, exclude=exclude).columns stats = [ self[col].describe( split_every=split_every, percentiles=percentiles, percentiles_method=percentiles_method, ) for col in columns ] return concat(stats, axis=1)
[docs] @derived_from(pd.DataFrame) def pop(self, item): out = self[item] self._expr = expr.Drop(self, columns=[item]) return out
[docs] def info(self, buf=None, verbose=False, memory_usage=False): """ Concise summary of a Dask DataFrame """ if buf is None: import sys buf = sys.stdout lines = [str(type(self)).replace("._collection", "")] if len(self.columns) == 0: lines.append(f"{type(self.index._meta).__name__}: 0 entries") lines.append(f"Empty {type(self).__name__}") put_lines(buf, lines) return # Group and execute the required computations computations = {} if verbose: computations.update({"index": self.index, "count": self.count()}) if memory_usage: computations["memory_usage"] = self.memory_usage(deep=True, index=True) computations = dict(zip(computations.keys(), compute(*computations.values()))) if verbose: import textwrap index = computations["index"] counts = computations["count"] lines.append(index_summary(index)) lines.append(f"Data columns (total {len(self.columns)} columns):") from pandas.io.formats.printing import pprint_thing space = max(len(pprint_thing(k)) for k in self.columns) + 1 column_width = max(space, 7) header = ( textwrap.dedent( """\ # {{column:<{column_width}}} Non-Null Count Dtype --- {{underl:<{column_width}}} -------------- -----""" ) .format(column_width=column_width) .format(column="Column", underl="------") ) column_template = textwrap.dedent( """\ {{i:^3}} {{name:<{column_width}}} {{count}} non-null {{dtype}}""".format( column_width=column_width ) ) column_info = [ column_template.format( i=pprint_thing(i), name=pprint_thing(name), count=pprint_thing(count), dtype=pprint_thing(dtype), ) for i, (name, count, dtype) in enumerate( # NOTE: Use `counts.values` for cudf support zip(self.columns, counts.values, self.dtypes) ) ] lines.extend(header.split("\n")) else: column_info = [index_summary(self.columns, name="Columns")] lines.extend(column_info) dtype_counts = [ "%s(%d)" % k for k in sorted(self.dtypes.value_counts().items(), key=str) ] lines.append("dtypes: {}".format(", ".join(dtype_counts))) if memory_usage: memory_int = computations["memory_usage"].sum() lines.append(f"memory usage: {memory_repr(memory_int)}\n") put_lines(buf, lines)
[docs] @derived_from(pd.DataFrame) def cov(self, min_periods=None, numeric_only=False, split_every=False): return self._cov(min_periods, numeric_only, split_every)
[docs] @derived_from(pd.DataFrame) def corr( self, method="pearson", min_periods=None, numeric_only=False, split_every=False, ): return self._corr(method, min_periods, numeric_only, split_every)
[docs] @derived_from(pd.DataFrame) def to_string(self, max_rows=5): # option_context doesn't affect return self._repr_data().to_string(max_rows=max_rows, show_dimensions=False)
[docs] @derived_from(pd.DataFrame) def to_html(self, max_rows=5): # pd.Series doesn't have html repr data = self._repr_data().to_html(max_rows=max_rows, show_dimensions=False) n_expr = len({e._name for e in self.walk()}) return get_template("dataframe.html.j2").render( data=data, name=self._name, layers=maybe_pluralize(n_expr, "expression"), )
def _repr_data(self): meta = self._meta index = self._repr_divisions cols = meta.columns if len(cols) == 0: series_df = pd.DataFrame([[]] * len(index), columns=cols, index=index) else: series_df = pd.concat( [_repr_data_series(s, index=index) for _, s in meta.items()], axis=1 ) return series_df
[docs]class Series(FrameBase): """Series-like Expr Collection""" _accessors: ClassVar[set[str]] = set() _partition_type = pd.Series @property def shape(self): """ Return a tuple representing the dimensionality of the DataFrame. The number of rows is a Delayed result. The number of columns is a concrete integer. """ return (self.size,) @property def axes(self): return [self.index] @property def ndim(self): """Return dimensionality""" return 1 @property def _elemwise(self): return elemwise def __dir__(self): o = set(dir(type(self))) o.update(self.__dict__) o.update(set(dir(expr.Expr))) for accessor in ["cat", "str"]: if not hasattr(self._meta, accessor): o.remove(accessor) return list(o) def __contains__(self, item): raise NotImplementedError( "Using 'in' to test for membership is not supported. Use the values instead" ) @derived_from(pd.Series) def __iter__(self): frame = self.optimize() for i in range(self.npartitions): s = frame.get_partition(i).compute() yield from s def __getitem__(self, key): if isinstance(key, Series) or self.npartitions == 1: return super().__getitem__(key) return self.loc[key] @property def name(self): return self.expr._meta.name @name.setter def name(self, name): self._expr = self.rename(index=name)._expr @property def dtype(self): return self.expr._meta.dtype @property def nbytes(self): """Number of bytes""" return new_collection(self.expr.nbytes) def __array_ufunc__(self, numpy_ufunc, method, *inputs, **kwargs): out = kwargs.get("out", ()) for x in inputs + out: # ufuncs work with 0-dimensional NumPy ndarrays # so we don't want to raise NotImplemented if isinstance(x, np.ndarray) and x.shape == (): continue elif not isinstance( x, (Number, Scalar, FrameBase, Array, pd.DataFrame, pd.Series, pd.Index) ): return NotImplemented if method == "__call__": if numpy_ufunc.signature is not None: return NotImplemented if numpy_ufunc.nout > 1: # ufuncs with multiple output values # are not yet supported for frames return NotImplemented else: return elemwise(numpy_ufunc, *inputs, **kwargs) else: # ufunc methods are not yet supported for frames return NotImplemented def __array_wrap__(self, array, context=None): if isinstance(context, tuple) and len(context) > 0: if isinstance(context[1][0], np.ndarray) and context[1][0].shape == (): index = None else: index = context[1][0].index else: try: import inspect method_name = f"`{inspect.stack()[3][3]}`" except IndexError: method_name = "This method" raise NotImplementedError( f"{method_name} is not implemented for `dask.dataframe.Series`." ) return meta_series_constructor(self)(array, index=index, name=self.name)
[docs] @derived_from(pd.Series) def map(self, arg, na_action=None, meta=None): if isinstance(arg, Series): if not expr.are_co_aligned(self.expr, arg.expr): if meta is None: warnings.warn(meta_warning(meta)) return new_collection( expr.MapAlign(self, arg, op=None, na_action=na_action, meta=meta) ) if meta is None: meta = expr._emulate(M.map, self, arg, na_action=na_action, udf=True) warnings.warn(meta_warning(meta)) return new_collection(expr.Map(self, arg=arg, na_action=na_action, meta=meta))
[docs] @derived_from(pd.Series) def clip(self, lower=None, upper=None, axis=None, **kwargs): axis = self._validate_axis(axis) return new_collection(self.expr.clip(lower, upper, axis))
[docs] @derived_from(pd.Series) def to_frame(self, name=no_default): return new_collection(expr.ToFrame(self, name=name))
def _comparison_op(self, expr_cls, other, level, fill_value, axis): if level is not None: raise NotImplementedError("level must be None") self._validate_axis(axis) return new_collection(expr_cls(self, other, fill_value=fill_value))
[docs] def lt(self, other, level=None, fill_value=None, axis=0): return self._comparison_op(expr.LTSeries, other, level, fill_value, axis)
[docs] def le(self, other, level=None, fill_value=None, axis=0): return self._comparison_op(expr.LESeries, other, level, fill_value, axis)
[docs] def gt(self, other, level=None, fill_value=None, axis=0): return self._comparison_op(expr.GTSeries, other, level, fill_value, axis)
[docs] def ge(self, other, level=None, fill_value=None, axis=0): return self._comparison_op(expr.GESeries, other, level, fill_value, axis)
[docs] def ne(self, other, level=None, fill_value=None, axis=0): return self._comparison_op(expr.NESeries, other, level, fill_value, axis)
[docs] def eq(self, other, level=None, fill_value=None, axis=0): return self._comparison_op(expr.EQSeries, other, level, fill_value, axis)
[docs] @derived_from(pd.Series) def value_counts( self, sort=None, ascending=False, dropna=True, normalize=False, split_every=None, split_out=no_default, ): if split_out is no_default: if isinstance(self.dtype, CategoricalDtype): # unobserved categories are a pain split_out = 1 else: split_out = True if split_out == 1 and split_out is not True and sort is None: sort = True length = None if (split_out > 1 or split_out is True) and normalize: frame = self if not dropna else self.dropna() length = Len(frame) return new_collection( ValueCounts( self, sort, ascending, dropna, normalize, split_every, split_out, length ) )
@derived_from(pd.Series) def mode(self, dropna=True, split_every=False): return new_collection(self.expr.mode(dropna, split_every))
[docs] @derived_from(pd.Series) def nlargest(self, n=5, split_every=None): return new_collection(NLargest(self, n=n, split_every=split_every))
[docs] @derived_from(pd.Series) def nsmallest(self, n=5, split_every=None): return new_collection(NSmallest(self, n=n, split_every=split_every))
[docs] @derived_from(pd.Series) def memory_usage(self, deep=False, index=True): return new_collection(MemoryUsageFrame(self, deep=deep, _index=index))
[docs] def unique(self, split_every=None, split_out=True, shuffle_method=None): """ Return Series of unique values in the object. Includes NA values. Returns ------- uniques : Series """ shuffle_method = _get_shuffle_preferring_order(shuffle_method) return new_collection(Unique(self, split_every, split_out, shuffle_method))
[docs] @derived_from(pd.Series) def nunique(self, dropna=True, split_every=False, split_out=True): uniqs = self.drop_duplicates(split_every=split_every, split_out=split_out) if dropna: # count mimics pandas behavior and excludes NA values if isinstance(uniqs, Index): uniqs = uniqs.to_series() return uniqs.count() else: return uniqs.size
[docs] def drop_duplicates( self, ignore_index=False, split_every=None, split_out=True, shuffle_method=None, keep="first", ): shuffle_method = _get_shuffle_preferring_order(shuffle_method) if keep is False: raise NotImplementedError("drop_duplicates with keep=False") return new_collection( DropDuplicates( self, ignore_index=ignore_index, split_out=split_out, split_every=split_every, shuffle_method=shuffle_method, keep=keep, ) )
[docs] @insert_meta_param_description(pad=12) def apply(self, function, *args, meta=no_default, axis=0, **kwargs): """Parallel version of pandas.Series.apply Parameters ---------- func : function Function to apply $META args : tuple Positional arguments to pass to function in addition to the value. Additional keyword arguments will be passed as keywords to the function. Returns ------- applied : Series or DataFrame if func returns a Series. Examples -------- >>> import dask.dataframe as dd >>> s = pd.Series(range(5), name='x') >>> ds = dd.from_pandas(s, npartitions=2) Apply a function elementwise across the Series, passing in extra arguments in ``args`` and ``kwargs``: >>> def myadd(x, a, b=1): ... return x + a + b >>> res = ds.apply(myadd, args=(2,), b=1.5) # doctest: +SKIP By default, dask tries to infer the output metadata by running your provided function on some fake data. This works well in many cases, but can sometimes be expensive, or even fail. To avoid this, you can manually specify the output metadata with the ``meta`` keyword. This can be specified in many forms, for more information see ``dask.dataframe.utils.make_meta``. Here we specify the output is a Series with name ``'x'``, and dtype ``float64``: >>> res = ds.apply(myadd, args=(2,), b=1.5, meta=('x', 'f8')) In the case where the metadata doesn't change, you can also pass in the object itself directly: >>> res = ds.apply(lambda x: x + 1, meta=ds) See Also -------- Series.map_partitions """ self._validate_axis(axis) if meta is no_default: meta = expr._emulate(M.apply, self, function, args=args, udf=True, **kwargs) warnings.warn(meta_warning(meta)) return new_collection(self.expr.apply(function, *args, meta=meta, **kwargs))
@classmethod def _validate_axis(cls, axis=0, numeric_axis: bool = True) -> None | Literal[0, 1]: if axis not in (0, "index", None): raise ValueError(f"No axis named {axis} for Series") if numeric_axis: num_axis: dict[str | None, Literal[0, 1]] = {"index": 0} return num_axis.get(axis, axis) else: return axis @derived_from(pd.Series) def squeeze(self): return self
[docs] @derived_from(pd.Series) def dropna(self): return new_collection(expr.DropnaSeries(self))
[docs] @derived_from(pd.Series) def between(self, left, right, inclusive="both"): return new_collection( expr.Between(self, left=left, right=right, inclusive=inclusive) )
@derived_from(pd.Series) def combine(self, other, func, fill_value=None): other = self._create_alignable_frame(other, "outer") if not expr.are_co_aligned(self.expr, other.expr): return new_collection( expr.CombineSeriesAlign(self, other, func, fill_value) ) return new_collection(expr.CombineSeries(self, other, func, fill_value))
[docs] @derived_from(pd.Series) def explode(self): return new_collection(expr.ExplodeSeries(self))
@derived_from(pd.Series) def add_prefix(self, prefix): return new_collection(expr.AddPrefixSeries(self, prefix)) @derived_from(pd.Series) def add_suffix(self, suffix): return new_collection(expr.AddSuffixSeries(self, suffix)) cat = CachedAccessor("cat", CategoricalAccessor) dt = CachedAccessor("dt", DatetimeAccessor) str = CachedAccessor("str", StringAccessor) def _repartition_quantiles(self, npartitions, upsample=1.0, random_state=None): return new_collection( RepartitionQuantiles(self, npartitions, upsample, random_state) )
[docs] @derived_from(pd.Series) def groupby(self, by, **kwargs): from dask_expr._groupby import SeriesGroupBy return SeriesGroupBy(self, by, **kwargs)
[docs] def rename(self, index, sorted_index=False): """Alter Series index labels or name Function / dict values must be unique (1-to-1). Labels not contained in a dict / Series will be left as-is. Extra labels listed don't throw an error. Alternatively, change ``Series.name`` with a scalar value. Parameters ---------- index : scalar, hashable sequence, dict-like or callable, optional If dict-like or callable, the transformation is applied to the index. Scalar or hashable sequence-like will alter the ``Series.name`` attribute. inplace : boolean, default False Whether to return a new Series or modify this one inplace. sorted_index : bool, default False If true, the output ``Series`` will have known divisions inferred from the input series and the transformation. Ignored for non-callable/dict-like ``index`` or when the input series has unknown divisions. Note that this may only be set to ``True`` if you know that the transformed index is monotonically increasing. Dask will check that transformed divisions are monotonic, but cannot check all the values between divisions, so incorrectly setting this can result in bugs. Returns ------- renamed : Series See Also -------- pandas.Series.rename """ return new_collection(expr.RenameSeries(self, index, sorted_index))
[docs] def quantile(self, q=0.5, method="default"): """Approximate quantiles of Series Parameters ---------- q : list/array of floats, default 0.5 (50%) Iterable of numbers ranging from 0 to 1 for the desired quantiles method : {'default', 'tdigest', 'dask'}, optional What method to use. By default will use dask's internal custom algorithm (``'dask'``). If set to ``'tdigest'`` will use tdigest for floats and ints and fallback to the ``'dask'`` otherwise. """ _raise_if_object_series(self, "quantile") allowed_methods = ["default", "dask", "tdigest"] if method not in allowed_methods: raise ValueError("method can only be 'default', 'dask' or 'tdigest'") return new_collection(SeriesQuantile(self, q, method))
[docs] @derived_from(pd.Series) def median(self): if self.npartitions == 1: return self.median_approximate() raise NotImplementedError( "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. " "See the `median_approximate` method instead, which uses an approximate algorithm." )
[docs] def median_approximate(self, method="default"): """Return the approximate median of the values over the requested axis. Parameters ---------- method : {'default', 'tdigest', 'dask'}, optional What method to use. By default will use Dask's internal custom algorithm (``"dask"``). If set to ``"tdigest"`` will use tdigest for floats and ints and fallback to the ``"dask"`` otherwise. """ return self.quantile(method=method)
[docs] @derived_from(pd.Series) def cov(self, other, min_periods=None, split_every=False): if not isinstance(other, Series): raise TypeError("other must be a dask.dataframe.Series") df = concat([self, other], axis=1) return df._cov(min_periods, True, split_every, scalar=True)
[docs] @derived_from(pd.Series) def corr(self, other, method="pearson", min_periods=None, split_every=False): if not isinstance(other, Series): raise TypeError("other must be a dask.dataframe.Series") df = concat([self, other], axis=1) return df._corr(method, min_periods, True, split_every, scalar=True)
[docs] @derived_from(pd.Series) def autocorr(self, lag=1, split_every=False): if not isinstance(lag, Integral): raise TypeError("lag must be an integer") return self.corr(self if lag == 0 else self.shift(lag), split_every=split_every)
[docs] @derived_from(pd.Series) def describe( self, split_every=False, percentiles=None, percentiles_method="default", include=None, exclude=None, ): if ( is_numeric_dtype(self.dtype) and not is_bool_dtype(self.dtype) or is_timedelta64_dtype(self.dtype) or is_datetime64_any_dtype(self.dtype) ): return new_collection( DescribeNumeric(self, split_every, percentiles, percentiles_method) ) else: return new_collection( DescribeNonNumeric(self, split_every, percentiles, percentiles_method) )
@property @derived_from(pd.Series) def is_monotonic_increasing(self): return new_collection(IsMonotonicIncreasing(self)) @property @derived_from(pd.Series) def is_monotonic_decreasing(self): return new_collection(IsMonotonicDecreasing(self))
[docs] @derived_from(pd.Series) def to_string(self, max_rows=5): # option_context doesn't affect return self._repr_data().to_string(max_rows=max_rows)
def _repr_data(self): return _repr_data_series(self._meta, self._repr_divisions) if PANDAS_GE_220: @derived_from(pd.Series) def case_when(self, caselist): if not isinstance(caselist, list): raise TypeError("The caselist argument should be a list") caselist = list(flatten([[c, v] for c, v in caselist], container=list)) return new_collection(expr.CaseWhen(self, *caselist))
for name in [ "add", "sub", "mul", "div", "divide", "truediv", "floordiv", "mod", "pow", "radd", "rsub", "rmul", "rdiv", "rtruediv", "rfloordiv", "rmod", "rpow", ]: assert not hasattr(DataFrame, name), name setattr(DataFrame, name, _wrap_expr_method_operator(name, DataFrame)) assert not hasattr(Series, name), name setattr(Series, name, _wrap_expr_method_operator(name, Series))
[docs]class Index(Series): """Index-like Expr Collection""" _accessors: ClassVar[set[str]] = set() _partition_type = pd.Index _dt_attributes = { "nanosecond", "microsecond", "millisecond", "dayofyear", "minute", "hour", "day", "dayofweek", "second", "week", "weekday", "weekofyear", "month", "quarter", "year", } _cat_attributes = { "known", "as_known", "as_unknown", "add_categories", "categories", "remove_categories", "reorder_categories", "as_ordered", "codes", "remove_unused_categories", "set_categories", "as_unordered", "ordered", "rename_categories", } def __getattr__(self, key): if ( isinstance(self._meta.dtype, pd.CategoricalDtype) and key in self._cat_attributes ): return getattr(self.cat, key) elif key in self._dt_attributes: return getattr(self.dt, key) if hasattr(super(), key): # Doesn't trigger super().__getattr__ # Not a magic attribute. This is a real method or property of Series that # has been overridden by RaiseAttributeError(). raise AttributeError( f"{self.__class__.__name__!r} object has no attribute {key!r}" ) return super().__getattr__(key) def __repr__(self): return f"<dask_expr.expr.Index: expr={self.expr}>" def __array_wrap__(self, array, context=None): return pd.Index(array, name=self.name)
[docs] @derived_from(pd.Index) def to_series(self, index=None, name=no_default): if index is not None: raise NotImplementedError return new_collection(expr.ToSeriesIndex(self, index=index, name=name))
[docs] @derived_from(pd.Index, ua_args=["index"]) def to_frame(self, index=True, name=no_default): if not index: raise NotImplementedError return new_collection(expr.ToFrameIndex(self, index=index, name=name))
[docs] @derived_from(pd.Index) def memory_usage(self, deep=False): return new_collection(MemoryUsageIndex(self, deep=deep))
[docs] def shift(self, periods=1, freq=None): return new_collection(expr.ShiftIndex(self, periods, freq))
[docs] @derived_from(pd.Index) def map(self, arg, na_action=None, meta=None, is_monotonic=False): """ Note that this method clears any known divisions. If your mapping function is monotonically increasing then use `is_monotonic` to apply the maping function to the old divisions and assign the new divisions to the output. """ if isinstance(arg, Series): if not expr.are_co_aligned(self.expr, arg.expr): if meta is None: warnings.warn(meta_warning(meta)) return new_collection( expr.MapIndexAlign(self, arg, na_action, meta, is_monotonic) ) if meta is None: meta = expr._emulate(M.map, self, arg, na_action=na_action, udf=True) warnings.warn(meta_warning(meta)) return new_collection( expr.Map( self, arg=arg, na_action=na_action, meta=meta, is_monotonic=is_monotonic ) )
def __dir__(self): o = set(dir(type(self))) o.update(self.__dict__) o.update(set(dir(expr.Expr))) o.update(self._dt_attributes) if isinstance(self.dtype, pd.CategoricalDtype): o.update(self._cat_attributes) return list(o) # Methods and properties of Series that are not implemented on Index
[docs] def count(self, split_every=False): return new_collection(IndexCount(self, split_every))
@property def index(self): raise AttributeError("'Index' object has no attribute 'index'") def sum(self, *args, **kwargs): raise AttributeError("'Index' object has no attribute 'sum'") def prod(self, *args, **kwargs): raise AttributeError("'Index' object has no attribute 'prod'") def mean(self, *args, **kwargs): raise AttributeError("'Index' object has no attribute 'mean'") def std(self, *args, **kwargs): raise AttributeError("'Index' object has no attribute 'std'") def var(self, *args, **kwargs): raise AttributeError("'Index' object has no attribute 'var'") def idxmax(self, *args, **kwargs): raise AttributeError("'Index' object has no attribute 'idxmax'") def idxmin(self, *args, **kwargs): raise AttributeError("'Index' object has no attribute 'idxmin'")
class Scalar(FrameBase): """Scalar Expr Collection""" def __repr__(self): return f"<dask_expr.expr.Scalar: expr={self.expr}, dtype={self.dtype}>" def __bool__(self): raise TypeError( f"Trying to convert {self} to a boolean value. Because Dask objects are " "lazily evaluated, they cannot be converted to a boolean value or used " "in boolean conditions like if statements. Try calling .compute() to " "force computation prior to converting to a boolean value or using in " "a conditional statement." ) def __dask_postcompute__(self): return first, () def to_series(self, index=0) -> Series: return new_collection(expr.ScalarToSeries(self, index=index)) def __array__(self): # array interface is required to support pandas instance + Scalar # Otherwise, above op results in pd.Series of Scalar (object dtype) return np.asarray(self.compute()) @property def dtype(self): return self._meta.dtype def new_collection(expr): """Create new collection from an expr""" meta = expr._meta expr._name # Ensure backend is imported return get_collection_type(meta)(expr) def optimize(collection, fuse=True): return new_collection(expr.optimize(collection.expr, fuse=fuse))
[docs]def from_pandas(data, npartitions=None, sort=True, chunksize=None): if chunksize is not None and npartitions is not None: raise ValueError("Exactly one of npartitions and chunksize must be specified.") elif chunksize is None and npartitions is None: npartitions = 1 if not has_parallel_type(data): raise TypeError("Input must be a pandas DataFrame or Series.") if data.index.isna().any() and not _is_any_real_numeric_dtype(data.index): raise NotImplementedError( "Index in passed data is non-numeric and contains nulls, which Dask does not entirely support.\n" "Consider passing `data.loc[~data.isna()]` instead." ) if npartitions is not None and not isinstance(npartitions, int): raise TypeError( "Please provide npartitions as an int, or possibly as None if you specify chunksize." ) elif chunksize is not None and not isinstance(chunksize, int): raise TypeError( "Please provide chunksize as an int, or possibly as None if you specify npartitions." ) from dask_expr.io.io import FromPandas return new_collection( FromPandas( _BackendData(data.copy()), npartitions=npartitions, sort=sort, chunksize=chunksize, pyarrow_strings_enabled=pyarrow_strings_enabled(), ) )
[docs]def from_array(arr, chunksize=50_000, columns=None, meta=None): import dask.array as da if isinstance(arr, da.Array): return from_dask_array(arr, columns=columns, meta=meta) from dask_expr.io.io import FromArray result = FromArray( arr, chunksize=chunksize, original_columns=columns, meta=meta, ) if pyarrow_strings_enabled() and arr.dtype.kind in "OU": result = expr.ArrowStringConversion(result) return new_collection(result)
def from_graph(*args, **kwargs): from dask_expr.io.io import FromGraph return new_collection(FromGraph(*args, **kwargs)) @dataframe_creation_dispatch.register_inplace("pandas") def from_dict( data, npartitions, orient="columns", dtype=None, columns=None, constructor=pd.DataFrame, ): """ Construct a Dask DataFrame from a Python Dictionary Parameters ---------- data : dict Of the form {field : array-like} or {field : dict}. npartitions : int The number of partitions of the index to create. Note that depending on the size and index of the dataframe, the output may have fewer partitions than requested. orient : {'columns', 'index', 'tight'}, default 'columns' The "orientation" of the data. If the keys of the passed dict should be the columns of the resulting DataFrame, pass 'columns' (default). Otherwise if the keys should be rows, pass 'index'. If 'tight', assume a dict with keys ['index', 'columns', 'data', 'index_names', 'column_names']. dtype: bool Data type to force, otherwise infer. columns: string, optional Column labels to use when ``orient='index'``. Raises a ValueError if used with ``orient='columns'`` or ``orient='tight'``. constructor: class, default pd.DataFrame Class with which ``from_dict`` should be called with. Examples -------- >>> import dask.dataframe as dd >>> ddf = dd.from_dict({"num1": [1, 2, 3, 4], "num2": [7, 8, 9, 10]}, npartitions=2) """ collection_types = {type(v) for v in data.values() if is_dask_collection(v)} if collection_types: raise NotImplementedError( "from_dict doesn't currently support Dask collections as inputs. " f"Objects of type {collection_types} were given in the input dict." ) return from_pandas( constructor.from_dict(data, orient, dtype, columns), npartitions, ) def from_dask_dataframe(*args, **kwargs) -> FrameBase: """Create a dask-expr collection from a legacy dask-dataframe collection WARNING: This API is deprecated. Please use `from_legacy_dataframe`. Parameters ---------- optimize Whether to optimize the graph before conversion. """ warnings.warn( "`from_dask_dataframe` is deprecated, please use `from_legacy_dataframe`.", FutureWarning, ) return from_legacy_dataframe(*args, **kwargs)
[docs]def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase: """Create a dask-expr collection from a legacy dask-dataframe collection Parameters ---------- optimize Whether to optimize the graph before conversion. """ graph = ddf.dask if optimize: graph = ddf.__dask_optimize__(graph, ddf.__dask_keys__()) return from_graph( graph, ddf._meta, ddf.divisions, ddf.__dask_keys__(), key_split(ddf._name) )
[docs]def from_dask_array(x, columns=None, index=None, meta=None): from dask.dataframe.io import from_dask_array if isinstance(index, FrameBase): index = index.to_legacy_dataframe() if columns is not None and not len(columns): columns = None df = from_dask_array(x, columns=columns, index=index, meta=meta) return from_legacy_dataframe(df, optimize=True)
[docs]def read_csv( path, *args, header="infer", usecols=None, dtype_backend=None, storage_options=None, **kwargs, ): from dask_expr.io.csv import ReadCSV if not isinstance(path, str): path = stringify_path(path) return new_collection( ReadCSV( path, columns=usecols, dtype_backend=dtype_backend, storage_options=storage_options, kwargs=kwargs, header=header, ) )
[docs]def read_table( path, *args, header="infer", usecols=None, dtype_backend=None, storage_options=None, **kwargs, ): from dask_expr.io.csv import ReadTable if not isinstance(path, str): path = stringify_path(path) return new_collection( ReadTable( path, columns=usecols, dtype_backend=dtype_backend, storage_options=storage_options, kwargs=kwargs, header=header, ) )
[docs]def read_fwf( path, *args, header="infer", usecols=None, dtype_backend=None, storage_options=None, **kwargs, ): from dask_expr.io.csv import ReadFwf if not isinstance(path, str): path = stringify_path(path) return new_collection( ReadFwf( path, columns=usecols, dtype_backend=dtype_backend, storage_options=storage_options, kwargs=kwargs, header=header, ) )
[docs]def read_parquet( path=None, columns=None, filters=None, categories=None, index=None, storage_options=None, dtype_backend=None, calculate_divisions=False, ignore_metadata_file=False, metadata_task_size=None, split_row_groups="infer", blocksize="default", aggregate_files=None, parquet_file_extension=(".parq", ".parquet", ".pq"), filesystem="fsspec", engine=None, arrow_to_pandas=None, **kwargs, ): """ Read a Parquet file into a Dask DataFrame This reads a directory of Parquet data into a Dask.dataframe, one file per partition. It selects the index among the sorted columns if any exist. Parameters ---------- path : str or list Source directory for data, or path(s) to individual parquet files. Prefix with a protocol like ``s3://`` to read from alternative filesystems. To read from multiple files you can pass a globstring or a list of paths, with the caveat that they must all have the same protocol. columns : str or list, default None Field name(s) to read in as columns in the output. By default all non-index fields will be read (as determined by the pandas parquet metadata, if present). Provide a single field name instead of a list to read in the data as a Series. filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``. Using this argument will result in row-wise filtering of the final partitions. Predicates can be expressed in disjunctive normal form (DNF). This means that the inner-most tuple describes a single column predicate. These inner predicates are combined with an AND conjunction into a larger predicate. The outer-most list then combines all of the combined filters with an OR disjunction. Predicates can also be expressed as a ``List[Tuple]``. These are evaluated as an AND conjunction. To express OR in predicates, one must use the (preferred for "pyarrow") ``List[List[Tuple]]`` notation. index : str, list or False, default None Field name(s) to use as the output frame index. By default will be inferred from the pandas parquet file metadata, if present. Use ``False`` to read all fields as columns. categories : list or dict, default None For any fields listed here, if the parquet encoding is Dictionary, the column will be created with dtype category. Use only if it is guaranteed that the column is encoded as dictionary in all row-groups. If a list, assumes up to 2**16-1 labels; if a dict, specify the number of labels expected; if None, will load categories automatically for data written by dask, not otherwise. storage_options : dict, default None Key/value pairs to be passed on to the file-system backend, if any. Note that the default file-system backend can be configured with the ``filesystem`` argument, described below. open_file_options : dict, default None Key/value arguments to be passed along to ``AbstractFileSystem.open`` when each parquet data file is open for reading. Experimental (optimized) "precaching" for remote file systems (e.g. S3, GCS) can be enabled by adding ``{"method": "parquet"}`` under the ``"precache_options"`` key. Also, a custom file-open function can be used (instead of ``AbstractFileSystem.open``), by specifying the desired function under the ``"open_file_func"`` key. dtype_backend : {'numpy_nullable', 'pyarrow'}, defaults to NumPy backed DataFrames Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays, nullable dtypes are used for all dtypes that have a nullable implementation when 'numpy_nullable' is set, pyarrow is used for all dtypes if 'pyarrow' is set. ``dtype_backend="pyarrow"`` requires ``pandas`` 1.5+. calculate_divisions : bool, default False Whether to use min/max statistics from the footer metadata (or global ``_metadata`` file) to calculate divisions for the output DataFrame collection. Divisions will not be calculated if statistics are missing. This option will be ignored if ``index`` is not specified and there is no physical index column specified in the custom "pandas" Parquet metadata. Note that ``calculate_divisions=True`` may be extremely slow when no global ``_metadata`` file is present, especially when reading from remote storage. Set this to ``True`` only when known divisions are needed for your workload (see :ref:`dataframe-design-partitions`). ignore_metadata_file : bool, default False Whether to ignore the global ``_metadata`` file (when one is present). If ``True``, or if the global ``_metadata`` file is missing, the parquet metadata may be gathered and processed in parallel. Parallel metadata processing is currently supported for ``ArrowDatasetEngine`` only. metadata_task_size : int, default configurable If parquet metadata is processed in parallel (see ``ignore_metadata_file`` description above), this argument can be used to specify the number of dataset files to be processed by each task in the Dask graph. If this argument is set to ``0``, parallel metadata processing will be disabled. The default values for local and remote filesystems can be specified with the "metadata-task-size-local" and "metadata-task-size-remote" config fields, respectively (see "dataframe.parquet"). split_row_groups : 'infer', 'adaptive', bool, or int, default 'infer' If True, then each output dataframe partition will correspond to a single parquet-file row-group. If False, each partition will correspond to a complete file. If a positive integer value is given, each dataframe partition will correspond to that number of parquet row-groups (or fewer). If 'adaptive', the metadata of each file will be used to ensure that every partition satisfies ``blocksize``. If 'infer' (the default), the uncompressed storage-size metadata in the first file will be used to automatically set ``split_row_groups`` to either 'adaptive' or ``False``. blocksize : int or str, default 'default' The desired size of each output ``DataFrame`` partition in terms of total (uncompressed) parquet storage space. This argument is currently used to set the default value of ``split_row_groups`` (using row-group metadata from a single file), and will be ignored if ``split_row_groups`` is not set to 'infer' or 'adaptive'. Default is 256 MiB. aggregate_files : bool or str, default None WARNING: Passing a string argument to ``aggregate_files`` will result in experimental behavior. This behavior may change in the future. Whether distinct file paths may be aggregated into the same output partition. This parameter is only used when `split_row_groups` is set to 'infer', 'adaptive' or to an integer >1. A setting of True means that any two file paths may be aggregated into the same output partition, while False means that inter-file aggregation is prohibited. For "hive-partitioned" datasets, a "partition"-column name can also be specified. In this case, we allow the aggregation of any two files sharing a file path up to, and including, the corresponding directory name. For example, if ``aggregate_files`` is set to ``"section"`` for the directory structure below, ``03.parquet`` and ``04.parquet`` may be aggregated together, but ``01.parquet`` and ``02.parquet`` cannot be. If, however, ``aggregate_files`` is set to ``"region"``, ``01.parquet`` may be aggregated with ``02.parquet``, and ``03.parquet`` may be aggregated with ``04.parquet``:: dataset-path/ ├── region=1/ │ ├── section=a/ │ │ └── 01.parquet │ ├── section=b/ │ └── └── 02.parquet └── region=2/ ├── section=a/ │ ├── 03.parquet └── └── 04.parquet Note that the default behavior of ``aggregate_files`` is ``False``. parquet_file_extension: str, tuple[str], or None, default (".parq", ".parquet", ".pq") A file extension or an iterable of extensions to use when discovering parquet files in a directory. Files that don't match these extensions will be ignored. This argument only applies when ``paths`` corresponds to a directory and no ``_metadata`` file is present (or ``ignore_metadata_file=True``). Passing in ``parquet_file_extension=None`` will treat all files in the directory as parquet files. The purpose of this argument is to ensure that the engine will ignore unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files). It may be necessary to change this argument if the data files in your parquet dataset do not end in ".parq", ".parquet", or ".pq". filesystem: "fsspec", "arrow", or fsspec.AbstractFileSystem backend to use. dataset: dict, default None Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` object. These options may include a "filesystem" key to configure the desired file-system backend. However, the top-level ``filesystem`` argument will always take precedence. **Note**: The ``dataset`` options may include a "partitioning" key. However, since ``pyarrow.dataset.Partitioning`` objects cannot be serialized, the value can be a dict of key-word arguments for the ``pyarrow.dataset.partitioning`` API (e.g. ``dataset={"partitioning": {"flavor": "hive", "schema": ...}}``). Note that partitioned columns will not be converted to categorical dtypes when a custom partitioning schema is specified in this way. read: dict, default None Dictionary of options to pass through to ``engine.read_partitions`` using the ``read`` key-word argument. arrow_to_pandas: dict, default None Dictionary of options to use when converting from ``pyarrow.Table`` to a pandas ``DataFrame`` object. Only used by the "arrow" engine. **kwargs: dict (of dicts) Options to pass through to ``engine.read_partitions`` as stand-alone key-word arguments. Note that these options will be ignored by the engines defined in ``dask.dataframe``, but may be used by other custom implementations. Examples -------- >>> df = dd.read_parquet('s3://bucket/my-parquet-data') # doctest: +SKIP See Also -------- to_parquet pyarrow.parquet.ParquetDataset """ from dask_expr.io.parquet import ( ReadParquetFSSpec, ReadParquetPyarrowFS, _set_parquet_engine, ) if not isinstance(path, str): path = stringify_path(path) kwargs["dtype_backend"] = dtype_backend if arrow_to_pandas: kwargs["arrow_to_pandas"] = arrow_to_pandas if filters is not None: for filter in flatten(filters, container=list): col, op, val = filter if op == "in" and not isinstance(val, (set, list, tuple)): raise TypeError("Value of 'in' filter must be a list, set or tuple.") if ( isinstance(filesystem, pa_fs.FileSystem) or isinstance(filesystem, str) and filesystem.lower() in ("arrow", "pyarrow") ): if parse_version(pa.__version__) < parse_version("15.0.0"): raise ValueError( "pyarrow>=15.0.0 is required to use the pyarrow filesystem." ) if metadata_task_size is not None: raise NotImplementedError( "metadata_task_size is not supported when using the pyarrow filesystem." ) if split_row_groups != "infer": raise NotImplementedError( "split_row_groups is not supported when using the pyarrow filesystem." ) if blocksize is not None and blocksize != "default": raise NotImplementedError( "blocksize is not supported when using the pyarrow filesystem." ) if aggregate_files is not None: raise NotImplementedError( "aggregate_files is not supported when using the pyarrow filesystem." ) if parquet_file_extension != (".parq", ".parquet", ".pq"): raise NotImplementedError( "parquet_file_extension is not supported when using the pyarrow filesystem." ) if engine is not None: raise NotImplementedError( "engine is not supported when using the pyarrow filesystem." ) return new_collection( ReadParquetPyarrowFS( path, columns=_convert_to_list(columns), filters=filters, categories=categories, index=index, calculate_divisions=calculate_divisions, storage_options=storage_options, filesystem=filesystem, ignore_metadata_file=ignore_metadata_file, arrow_to_pandas=arrow_to_pandas, pyarrow_strings_enabled=pyarrow_strings_enabled(), kwargs=kwargs, _series=isinstance(columns, str), ) ) return new_collection( ReadParquetFSSpec( path, columns=_convert_to_list(columns), filters=filters, categories=categories, index=index, storage_options=storage_options, calculate_divisions=calculate_divisions, ignore_metadata_file=ignore_metadata_file, metadata_task_size=metadata_task_size, split_row_groups=split_row_groups, blocksize=blocksize, aggregate_files=aggregate_files, parquet_file_extension=parquet_file_extension, filesystem=filesystem, engine=_set_parquet_engine(engine), kwargs=kwargs, _series=isinstance(columns, str), ) )
[docs]def concat( dfs, axis=0, join="outer", ignore_unknown_divisions=False, ignore_order=False, interleave_partitions=False, **kwargs, ): if not isinstance(dfs, list): raise TypeError("dfs must be a list of DataFrames/Series objects") if len(dfs) == 0: raise ValueError("No objects to concatenate") if len(dfs) == 1: if axis == 1 and isinstance(dfs[0], Series): return dfs[0].to_frame() return dfs[0] if join not in ("inner", "outer"): raise ValueError("'join' must be 'inner' or 'outer'") dfs = [from_pandas(df) if not isinstance(df, FrameBase) else df for df in dfs] if axis == 1: dfs = [df for df in dfs if len(df.columns) > 0 or isinstance(df, Series)] return new_collection( Concat( join, ignore_order, kwargs, axis, ignore_unknown_divisions, interleave_partitions, *dfs, ) )
[docs]def merge( left, right, how="inner", on=None, left_on=None, right_on=None, left_index=False, right_index=False, suffixes=("_x", "_y"), indicator=False, shuffle_method=None, npartitions=None, broadcast=None, ): for o in [on, left_on, right_on]: if isinstance(o, FrameBase): raise NotImplementedError() if not on and not left_on and not right_on and not left_index and not right_index: on = [c for c in left.columns if c in right.columns] if not on: left_index = right_index = True if on and not left_on and not right_on: left_on = right_on = on supported_how = ("left", "right", "outer", "inner", "leftsemi") if how not in supported_how: raise ValueError( f"dask.dataframe.merge does not support how='{how}'." f"Options are: {supported_how}." ) if how == "leftsemi": if right_index or any( o not in right.columns for o in _convert_to_list(right_on) ): raise NotImplementedError( "how='leftsemi' does not support right_index=True or on columns from the index" ) else: right = right[_convert_to_list(right_on)].rename( columns=dict(zip(_convert_to_list(right_on), _convert_to_list(left_on))) ) right_on = left_on # Transform pandas objects into dask.dataframe objects if not is_dask_collection(left): if right_index and left_on: # change to join on index left = left.set_index(left[left_on]) left_on = None left_index = True left = from_pandas(left, npartitions=1) if not is_dask_collection(right): if left_index and right_on: # change to join on index right = right.set_index(right[right_on]) right_on = None right_index = True right = from_pandas(right, npartitions=1) assert is_dataframe_like(right._meta) if left_on and right_on: warn_dtype_mismatch(left, right, left_on, right_on) return new_collection( Merge( left, right, how=how, left_on=left_on, right_on=right_on, left_index=left_index, right_index=right_index, suffixes=suffixes, indicator=indicator, shuffle_method=get_specified_shuffle(shuffle_method), _npartitions=npartitions, broadcast=broadcast, ) )
[docs]def merge_asof( left, right, on=None, left_on=None, right_on=None, left_index=False, right_index=False, by=None, left_by=None, right_by=None, suffixes=("_x", "_y"), tolerance=None, allow_exact_matches=True, direction="backward", ): if direction not in ["backward", "forward", "nearest"]: raise ValueError( "Invalid merge_asof direction. Choose from 'backward'" " 'forward', or 'nearest'" ) kwargs = { "on": on, "left_on": left_on, "right_on": right_on, "left_index": left_index, "right_index": right_index, "by": by, "left_by": left_by, "right_by": right_by, "suffixes": suffixes, "tolerance": tolerance, "allow_exact_matches": allow_exact_matches, "direction": direction, } if left is None or right is None: raise ValueError("Cannot merge_asof on None") # if is_dataframe_like(left) and is_dataframe_like(right): if isinstance(left, pd.DataFrame) and isinstance(right, pd.DataFrame): return pd.merge_asof(left, right, **kwargs) if on is not None: if left_on is not None or right_on is not None: raise ValueError( "Can only pass argument 'on' OR 'left_on' and 'right_on', not a " "combination of both." ) left_on = right_on = on kwargs["left_on"] = left_on kwargs["right_on"] = right_on del kwargs["on"] for o in [left_on, right_on]: if isinstance(o, _Frame): raise NotImplementedError( "Dask collections not currently allowed in merge columns" ) if not is_dask_collection(left): left = from_pandas(left, npartitions=1) if not is_dask_collection(right): right = from_pandas(right, npartitions=1) if by is not None: if left_by is not None or right_by is not None: raise ValueError( "Can only pass argument 'by' OR 'left_by' and 'right_by', not a combination of both." ) kwargs["left_by"] = kwargs["right_by"] = by del kwargs["by"] if left_by is None and right_by is not None: raise ValueError("Must specify both left_on and right_on if one is specified.") if left_by is not None and right_by is None: raise ValueError("Must specify both left_on and right_on if one is specified.") from dask_expr._merge_asof import MergeAsof return new_collection(MergeAsof(left, right, **kwargs))
[docs]def from_map( func, *iterables, args=None, meta=no_default, divisions=None, label=None, enforce_metadata=False, **kwargs, ): """Create a dask-expr collection from a custom function map NOTE: The underlying ``Expr`` object produced by this API will support column projection (via ``simplify``) if the ``func`` argument has "columns" in its signature. """ from dask.dataframe.io.utils import DataFrameIOFunction from dask_expr.io import FromMap, FromMapProjectable if "token" in kwargs: # This option doesn't really make sense in dask-expr raise NotImplementedError("dask_expr does not support a token argument.") lengths = set() iterables = list(iterables) for i, iterable in enumerate(iterables): if not isinstance(iterable, Iterable): raise ValueError( f"All elements of `iterables` must be Iterable, got {type(iterable)}" ) try: lengths.add(len(iterable)) except (AttributeError, TypeError): iterables[i] = list(iterable) lengths.add(len(iterables[i])) if len(lengths) == 0: raise ValueError("`from_map` requires at least one Iterable input") elif len(lengths) > 1: raise ValueError("All `iterables` must have the same length") if lengths == {0}: raise ValueError("All `iterables` must have a non-zero length") # Check if `func` supports column projection allow_projection = False columns_arg_required = False if param := inspect.signature(func).parameters.get("columns", None): allow_projection = True columns_arg_required = param.default is param.empty if meta is no_default and columns_arg_required: raise TypeError( "Argument `func` of `from_map` has a required `columns` " " parameter and not `meta` provided." "Either provide `meta` yourself or make `columns` an optional argument." ) elif isinstance(func, DataFrameIOFunction): warnings.warn( "dask_expr does not support the DataFrameIOFunction " "protocol for column projection. To enable column " "projection, please ensure that the signature of `func` " "includes a `columns=` keyword argument instead." ) else: allow_projection = False args = [] if args is None else args kwargs = {} if kwargs is None else kwargs if allow_projection: columns = kwargs.pop("columns", None) result = new_collection( FromMapProjectable( func, iterables, columns, args, kwargs, columns_arg_required, meta, enforce_metadata, divisions, label, ) ) else: result = new_collection( FromMap( func, iterables, args, kwargs, meta, enforce_metadata, divisions, label, ) ) if pyarrow_strings_enabled(): return new_collection(expr.ArrowStringConversion(result)) return result
def repartition(df, divisions, force=False): """Repartition dataframe along new divisions Dask.DataFrame objects are partitioned along their index. Often when multiple dataframes interact we need to align these partitionings. The ``repartition`` function constructs a new DataFrame object holding the same data but partitioned on different values. It does this by performing a sequence of ``loc`` and ``concat`` calls to split and merge the previous generation of partitions. Parameters ---------- divisions : list List of partitions to be used force : bool, default False Allows the expansion of the existing divisions. If False then the new divisions lower and upper bounds must be the same as the old divisions. Examples -------- >>> df = df.repartition([0, 5, 10, 20]) # doctest: +SKIP Also works on Pandas objects >>> ddf = dd.repartition(df, [0, 5, 10, 20]) # doctest: +SKIP """ if isinstance(df, FrameBase): return df.repartition(divisions=divisions, force=force) elif is_dataframe_like(df) or is_series_like(df): return new_collection( FromPandasDivisions( _BackendData(df), divisions=divisions, pyarrow_strings_enabled=pyarrow_strings_enabled(), ) ) else: raise NotImplementedError(f"repartition is not implemented for {type(df)}.")
[docs]def pivot_table(df, index, columns, values, aggfunc="mean"): if not is_scalar(index) or index not in df._meta.columns: raise ValueError("'index' must be the name of an existing column") if not is_scalar(columns) or columns not in df._meta.columns: raise ValueError("'columns' must be the name of an existing column") if not methods.is_categorical_dtype(df._meta[columns]): raise ValueError("'columns' must be category dtype") if not has_known_categories(df._meta[columns]): raise ValueError("'columns' must have known categories") if not ( is_scalar(values) and values in df._meta.columns or not is_scalar(values) and all(is_scalar(x) and x in df._meta.columns for x in values) ): raise ValueError("'values' must refer to an existing column or columns") available_aggfuncs = ["mean", "sum", "count", "first", "last"] if not is_scalar(aggfunc) or aggfunc not in available_aggfuncs: raise ValueError( "aggfunc must be either " + ", ".join(f"'{x}'" for x in available_aggfuncs) ) return new_collection( PivotTable(df, index=index, columns=columns, values=values, aggfunc=aggfunc) )
[docs]def to_numeric(arg, errors="raise", downcast=None, meta=None): """ Return type depends on input. Delayed if scalar, otherwise same as input. For errors, only "raise" and "coerce" are allowed. """ if errors not in ("raise", "coerce"): raise ValueError("invalid error value specified") if pd_is_scalar(arg): if meta is not None: raise KeyError("``meta`` is not allowed when input is a scalar.") return delayed(pd.to_numeric, pure=True)(arg, errors=errors, downcast=downcast) if is_arraylike(arg): return new_collection( ToNumeric( from_array(arg).astype(arg.dtype), errors=errors, downcast=downcast ) ).to_dask_array(meta=meta) if is_series_like(arg): return new_collection( ToNumeric(frame=arg, errors=errors, downcast=downcast, meta=meta) ) raise TypeError( "arg must be a list, tuple, dask.array.Array, or dask.dataframe.Series" )
[docs]def to_datetime(arg, meta=None, **kwargs): tz_kwarg = {"tz": "utc"} if kwargs.get("utc") else {} (arg,) = _maybe_from_pandas([arg]) if meta is None: if isinstance(arg, Index): meta = get_meta_library(arg).DatetimeIndex([], **tz_kwarg) meta.name = arg.name elif not (is_dataframe_like(arg) or is_series_like(arg)): raise NotImplementedError( "dask.dataframe.to_datetime does not support " "non-index-able arguments (like scalars)" ) else: meta = meta_series_constructor(arg)([pd.Timestamp("2000", **tz_kwarg)]) meta.index = meta.index.astype(arg.index.dtype) meta.index.name = arg.index.name kwargs.pop("infer_datetime_format", None) return new_collection(ToDatetime(frame=arg, kwargs=kwargs, meta=meta))
[docs]def to_timedelta(arg, unit=None, errors="raise"): if not isinstance(arg, Series): raise TypeError("arg must be a Series") return new_collection(ToTimedelta(frame=arg, unit=unit, errors=errors))
def _from_scalars(scalars, meta, names): return new_collection(FromScalars(meta, names, *scalars))
[docs]@insert_meta_param_description def map_partitions( func, *args, meta=no_default, enforce_metadata=True, transform_divisions=True, clear_divisions=False, align_dataframes=False, parent_meta=None, **kwargs, ): """Apply Python function on each DataFrame partition. Parameters ---------- func : function Function applied to each partition. args, kwargs : Arguments and keywords to pass to the function. At least one of the args should be a Dask.dataframe. Arguments and keywords may contain ``Scalar``, ``Delayed`` or regular python objects. DataFrame-like args (both dask and pandas) will be repartitioned to align (if necessary) before applying the function (see ``align_dataframes`` to control). enforce_metadata : bool, default True Whether to enforce at runtime that the structure of the DataFrame produced by ``func`` actually matches the structure of ``meta``. This will rename and reorder columns for each partition, and will raise an error if this doesn't work, but it won't raise if dtypes don't match. transform_divisions : bool, default True Whether to apply the function onto the divisions and apply those transformed divisions to the output. align_dataframes : bool, default True Whether to repartition DataFrame- or Series-like args (both dask and pandas) so their divisions align before applying the function. This requires all inputs to have known divisions. Single-partition inputs will be split into multiple partitions. If False, all inputs must have either the same number of partitions or a single partition. Single-partition inputs will be broadcast to every partition of multi-partition inputs. $META """ if align_dataframes: # TODO: Handle alignment? # Perhaps we only handle the case that all `Expr` operands # have the same number of partitions or can be broadcasted # within `MapPartitions`. If so, the `map_partitions` API # will need to call `Repartition` on operands that are not # aligned with `self.expr`. raise NotImplementedError() new_expr = expr.MapPartitions( args[0], func, meta, enforce_metadata, transform_divisions, clear_divisions, align_dataframes, parent_meta, kwargs.pop("token", None), kwargs, *args[1:], ) return new_collection(new_expr)
[docs]def map_overlap( func, df, before, after, *args, meta=no_default, enforce_metadata=True, transform_divisions=True, clear_divisions=False, align_dataframes=False, **kwargs, ): if isinstance(before, str): before = pd.to_timedelta(before) if isinstance(after, str): after = pd.to_timedelta(after) if isinstance(before, datetime.timedelta) or isinstance(after, datetime.timedelta): if isinstance(df, FrameBase): inferred_type = df.index._meta_nonempty.inferred_type else: inferred_type = df.index.inferred_type if not is_datetime64_any_dtype(inferred_type): raise TypeError( "Must have a `DatetimeIndex` when using string offset " "for `before` and `after`" ) elif not ( isinstance(before, Integral) and before >= 0 and isinstance(after, Integral) and after >= 0 ): raise ValueError("before and after must be positive integers") df = _maybe_from_pandas([df])[0] args = _maybe_from_pandas(args) if align_dataframes: dfs = [df] + args dfs = [df for df in dfs if isinstance(df, FrameBase)] if len(dfs) > 1 and not expr.are_co_aligned(*dfs): return new_collection( expr.MapOverlapAlign( df, func, before, after, meta, enforce_metadata, transform_divisions, clear_divisions, align_dataframes, kwargs.pop("token", None), kwargs, *args, ) ) new_expr = expr.MapOverlap( df, func, before, after, meta, enforce_metadata, transform_divisions, clear_divisions, align_dataframes, kwargs.pop("token", None), kwargs, *args, ) return new_collection(new_expr)
def isna(arg): if isinstance(arg, FrameBase): return arg.isna() else: return from_pandas(arg).isna() def elemwise(op, *args, meta=no_default, out=None, transform_divisions=True, **kwargs): """Elementwise operation for Dask dataframes Parameters ---------- op: callable Function to apply across input dataframes *args: DataFrames, Series, Scalars, Arrays, The arguments of the operation meta: pd.DataFrame, pd.Series (optional) Valid metadata for the operation. Will evaluate on a small piece of data if not provided. transform_divisions: boolean If the input is a ``dask.dataframe.Index`` we normally will also apply the function onto the divisions and apply those transformed divisions to the output. You can pass ``transform_divisions=False`` to override this behavior out : ``dask.array`` or ``None`` If out is a dask.DataFrame, dask.Series or dask.Scalar then this overwrites the contents of it with the result **kwargs: scalars Examples -------- >>> elemwise(operator.add, df.x, df.y) # doctest: +SKIP """ args = _maybe_from_pandas(args) dfs = [df for df in args if isinstance(df, FrameBase)] if len(dfs) <= 1 or expr.are_co_aligned(*dfs): result = new_collection( expr.UFuncElemwise(dfs[0], op, meta, transform_divisions, kwargs, *args) ) else: result = new_collection(expr.UFuncAlign(dfs[0], op, meta, kwargs, *args)) return handle_out(out, result) def handle_out(out, result): """Handle out parameters If out is a dask.DataFrame, dask.Series or dask.Scalar then this overwrites the contents of it with the result. The method replaces the expression of the out parameter with the result from this operation to perform something akin to an inplace modification. """ if isinstance(out, tuple): if len(out) == 1: out = out[0] elif len(out) > 1: raise NotImplementedError( "The `out` parameter with length > 1 is not supported" ) else: out = None if out is not None and out.__class__ != result.__class__: raise TypeError( "Mismatched types between result and out parameter. " "out=%s, result=%s" % (str(type(out)), str(type(result))) ) if isinstance(out, DataFrame): if len(out.columns) != len(result.columns): raise ValueError( "Mismatched columns count between result and out parameter. " "out=%s, result=%s" % (str(len(out.columns)), str(len(result.columns))) ) if isinstance(out, (Series, DataFrame, Scalar)): out._expr = result._expr elif out is not None: msg = ( "The out parameter is not fully supported." " Received type %s, expected %s " % ( typename(type(out)), typename(type(result)), ) ) raise NotImplementedError(msg) else: return result def _compute_partition_stats( column: Series, allow_overlap: bool = False ) -> tuple[list, list, list[int]]: """For a given column, compute the min, max, and len of each partition. And make sure that the partitions are sorted relative to each other. NOTE: this does not guarantee that every partition is internally sorted. """ mins = column.map_partitions(M.min, meta=column) maxes = column.map_partitions(M.max, meta=column) lens = column.map_partitions(len, meta=column) mins, maxes, lens = compute(mins, maxes, lens) mins = mins.bfill().tolist() maxes = maxes.bfill().tolist() non_empty_mins = [m for m, length in zip(mins, lens) if length != 0] non_empty_maxes = [m for m, length in zip(maxes, lens) if length != 0] if ( sorted(non_empty_mins) != non_empty_mins or sorted(non_empty_maxes) != non_empty_maxes ): raise ValueError( f"Partitions are not sorted ascending by {column.name or 'the index'}. ", f"In your dataset the (min, max, len) values of {column.name or 'the index'} " f"for each partition are: {list(zip(mins, maxes, lens))}", ) if not allow_overlap and any( a <= b for a, b in zip(non_empty_mins[1:], non_empty_maxes[:-1]) ): warnings.warn( "Partitions have overlapping values, so divisions are non-unique. " "Use `set_index(sorted=True)` with no `divisions` to allow dask to fix the overlap. " f"In your dataset the (min, max, len) values of {column.name or 'the index'} " f"for each partition are : {list(zip(mins, maxes, lens))}", UserWarning, ) lens = methods.tolist(lens) if not allow_overlap: return (mins, maxes, lens) else: return (non_empty_mins, non_empty_maxes, lens)