# Source code for dask.dataframe.tseries.resample

from __future__ import annotations

import functools
from collections import namedtuple

import numpy as np
import pandas as pd
from pandas.core.resample import Resampler as pd_Resampler

from dask.dataframe import methods
from dask.dataframe.dask_expr._collection import new_collection
from dask.dataframe.dask_expr._expr import (
    Blockwise,
    Expr,
    Projection,
    make_meta,
    plain_column_projection,
)
from dask.dataframe.dask_expr._repartition import Repartition
from dask.dataframe.dispatch import meta_nonempty
from dask.utils import derived_from

# Marker wrapping a per-partition argument: ResampleAggregation._blockwise_arg
# hands element ``i`` of ``iterable`` to the task for partition ``i``.
BlockwiseDep = namedtuple("BlockwiseDep", ["iterable"])  # type: ignore


def _resample_series(
    series,
    start,
    end,
    reindex_closed,
    rule,
    resample_kwargs,
    how,
    fill_value,
    how_args,
    how_kwargs,
):
    """Resample one partition and conform it to the partition's bin labels.

    ``series`` (a pandas partition) is resampled with ``rule`` and
    ``resample_kwargs`` and aggregated by calling the resampler method named
    ``how`` with ``how_args`` / ``how_kwargs``. The result is reindexed onto
    every ``rule``-spaced label from ``start`` to ``end``. Endpoint inclusion
    follows ``reindex_closed``, where ``None`` means both ends. Labels that the
    aggregation did not produce are filled with ``fill_value``.

    Raises
    ------
    ValueError
        If the aggregation produced a label outside the expected label grid.
    """
    aggregate = getattr(series.resample(rule, **resample_kwargs), how)
    result = aggregate(*how_args, **how_kwargs)

    # Build the label grid on naive timestamps, then re-attach the timezone
    # of ``start``; wall times that do not exist (DST gaps) shift forward.
    labels = pd.date_range(
        start.tz_localize(None),
        end.tz_localize(None),
        freq=rule,
        inclusive="both" if reindex_closed is None else reindex_closed,
        name=result.index.name,
        unit=result.index.unit,
    ).tz_localize(start.tz, nonexistent="shift_forward")

    if not result.index.isin(labels).all():
        raise ValueError(
            "Index is not contained within new index. This can often be "
            "resolved by using larger partitions, or unambiguous "
            "frequencies: 'Q', 'A'..."
        )

    return result.reindex(labels, fill_value=fill_value)


def _resample_bin_and_out_divs(divisions, rule, closed="left", label="left"):
    """Compute repartition and output divisions for a resample operation.

    Parameters
    ----------
    divisions : sequence of timestamp-like
        Divisions of the collection being resampled.
    rule : str or DateOffset
        Resampling frequency; normalized with ``to_offset``.
    closed, label : str or None
        Which side of each bin is closed / used as the bin label, as in
        pandas ``resample``.

    Returns
    -------
    tuple[tuple[pd.Timestamp, ...], tuple[pd.Timestamp, ...]]
        ``(newdivs, outdivs)``. ``newdivs`` are the divisions to repartition
        the input onto; they are derived from bin edges so a bin is not
        split between partitions. ``outdivs`` are the divisions of the
        resampled output (bin labels).
    """
    rule = pd.tseries.frequencies.to_offset(rule)
    # ``g`` is never used to group data; it is only read back below
    # (``g.closed`` / ``g.label``) to obtain closed/label as pandas resolves
    # them for this rule.
    g = pd.Grouper(freq=rule, how="count", closed=closed, label=label)

    # Determine bins to apply `how` to. Disregard labeling scheme.
    # Each division is treated as one observation; ``tempdivs`` are the
    # left-labelled bins that contain at least one division.
    divs = pd.Series(range(len(divisions)), index=divisions)
    temp = divs.resample(rule, closed=closed, label="left").count()
    tempdivs = temp.loc[temp > 0].index

    # Cleanup closed == 'right' and label == 'right'
    # ``res`` is a small offset: one nanosecond for fixed-frequency (Tick)
    # rules, one day otherwise.
    res = pd.offsets.Nano() if isinstance(rule, pd.offsets.Tick) else pd.offsets.Day()
    if g.closed == "right":
        # A right-closed bin excludes its left edge, so the partition
        # boundary is moved just past that edge.
        newdivs = tempdivs + res
    else:
        newdivs = tempdivs
    if g.label == "right":
        # Right labels name each bin by its right edge.
        outdivs = tempdivs + rule
    else:
        outdivs = tempdivs

    newdivs = methods.tolist(newdivs)
    outdivs = methods.tolist(outdivs)

    # Adjust ends
    # Do not let the first boundary start before the first input division.
    if newdivs[0] < divisions[0]:
        newdivs[0] = divisions[0]
    # The last bin-aligned boundary stops short of the final division:
    # append a new boundary while there are fewer boundaries than input
    # divisions, otherwise overwrite the last boundary in place.
    if newdivs[-1] < divisions[-1]:
        if len(newdivs) < len(divs):
            setter = lambda a, val: a.append(val)
        else:
            setter = lambda a, val: a.__setitem__(-1, val)
        # Presumably ``+ res`` keeps the final division itself inside the
        # last partition.
        setter(newdivs, divisions[-1] + res)
        # NOTE(review): when ``outdivs[-1] == divisions[-1]`` neither branch
        # below runs, so ``outdivs`` is not extended even if ``newdivs`` was
        # appended to above; confirm the two lengths cannot diverge here.
        if outdivs[-1] > divisions[-1]:
            # Repeat the last label (append) or keep it (overwrite in place).
            setter(outdivs, outdivs[-1])
        elif outdivs[-1] < divisions[-1]:
            # Use the last left label produced by resampling the divisions.
            setter(outdivs, temp.index[-1])

    return tuple(map(pd.Timestamp, newdivs)), tuple(map(pd.Timestamp, outdivs))


class ResampleReduction(Expr):
    """Reduce each resampling bin of ``frame`` with a pandas ``Resampler`` method.

    Operands:

    * ``frame``: the collection to resample.
    * ``rule``: the resampling frequency.
    * ``kwargs``: keyword arguments for ``resample``, e.g. ``closed`` and
      ``label``.
    * ``how_args`` / ``how_kwargs``: arguments for the aggregation method.

    Subclasses set ``how`` to the name of the ``Resampler`` method. They set
    ``fill_value`` to the value used for bins that a partition's aggregation
    does not produce.
    """

    _parameters = [
        "frame",
        "rule",
        "kwargs",
        "how_args",
        "how_kwargs",
    ]
    # ``closed`` / ``label`` are not operands of this expression; they travel
    # inside ``kwargs``, so they need no default of their own here.
    _defaults = {
        "kwargs": None,
        "how_args": (),
        "how_kwargs": None,
    }
    how: str | None = None
    fill_value = np.nan

    @functools.cached_property
    def npartitions(self):
        # NOTE(review): this reports the input's partition count, whereas the
        # lowered expression presumably has ``len(self._resample_divisions[1])
        # - 1`` partitions; the two can differ. Confirm this is intended.
        return self.frame.npartitions

    def _divisions(self):
        # Output divisions are the bin labels computed for the input divisions.
        return self._resample_divisions[1]

    @functools.cached_property
    def _meta(self):
        # Run the same aggregation on non-empty fake data to infer output meta.
        resample = meta_nonempty(self.frame._meta).resample(self.rule, **self.kwargs)
        meta = getattr(resample, self.how)(*self.how_args, **self.how_kwargs)
        return make_meta(meta)

    @functools.cached_property
    def kwargs(self):
        # Normalize a missing ``kwargs`` operand to an empty dict.
        return {} if self.operand("kwargs") is None else self.operand("kwargs")

    @functools.cached_property
    def how_kwargs(self):
        # Normalize a missing ``how_kwargs`` operand to an empty dict.
        return {} if self.operand("how_kwargs") is None else self.operand("how_kwargs")

    @functools.cached_property
    def _resample_divisions(self):
        # ``(new_divisions, output_divisions)`` from _resample_bin_and_out_divs.
        # That helper only accepts ``closed`` / ``label``, so any other
        # resample keyword in ``self.kwargs`` raises TypeError here.
        return _resample_bin_and_out_divs(
            self.frame.divisions, self.rule, **self.kwargs
        )

    def _simplify_up(self, parent, dependents):
        # Column projections are delegated to ``plain_column_projection``
        # (presumably pushing the projection onto ``frame``).
        if isinstance(parent, Projection):
            return plain_column_projection(self, parent, dependents)

    def _lower(self):
        new_divisions, output_divisions = self._resample_divisions
        # Repartition onto bin-aligned boundaries (intended so that every bin
        # is computed from a single partition).
        partitioned = Repartition(
            self.frame, new_divisions=new_divisions, force=True
        )
        n_out = len(output_divisions) - 1
        return ResampleAggregation(
            partitioned,
            BlockwiseDep(output_divisions[:-1]),
            BlockwiseDep(output_divisions[1:]),
            # Each partition's label grid excludes its right endpoint (the
            # start of the next partition); only the last partition uses
            # ``None``, i.e. both endpoints included.
            BlockwiseDep(["left"] * (n_out - 1) + [None]),
            self.rule,
            self.kwargs,
            self.how,
            self.fill_value,
            list(self.how_args),
            self.how_kwargs,
        )


class ResampleAggregation(Blockwise):
    """Partition-wise resampling produced by lowering ``ResampleReduction``.

    For partition ``i``, ``operation`` (``_resample_series``) receives:

    * the corresponding partition of ``frame``;
    * element ``i`` of every ``BlockwiseDep`` operand (``divisions_left``,
      ``divisions_right`` and ``closed``);
    * the remaining operands, resolved by ``Blockwise._blockwise_arg``.
    """

    _parameters = [
        "frame",
        "divisions_left",
        "divisions_right",
        "closed",
        "rule",
        "kwargs",
        "how",
        "fill_value",
        "how_args",
        "how_kwargs",
    ]
    operation = staticmethod(_resample_series)

    @functools.cached_property
    def _meta(self):
        # Describe the *aggregated* output, computed the same way as
        # ResampleReduction._meta. The input partition meta is not enough,
        # because aggregations such as ``count`` or ``ohlc`` change dtypes
        # and columns.
        resample = meta_nonempty(self.frame._meta).resample(
            self.rule, **(self.kwargs or {})
        )
        meta = getattr(resample, self.how)(
            *(self.how_args or ()), **(self.how_kwargs or {})
        )
        return make_meta(meta)

    def _divisions(self):
        # Left edges of every output partition plus the final right edge.
        return list(self.divisions_left.iterable) + [self.divisions_right.iterable[-1]]

    def _blockwise_arg(self, arg, i):
        # Per-partition values: partition ``i`` gets element ``i``.
        if isinstance(arg, BlockwiseDep):
            return arg.iterable[i]
        return super()._blockwise_arg(arg, i)


class ResampleCount(ResampleReduction):
    """Per-bin ``count``; bins a partition does not produce are filled with 0."""

    fill_value = 0
    how = "count"


class ResampleSum(ResampleReduction):
    """Per-bin ``sum``; bins a partition does not produce are filled with 0."""

    fill_value = 0
    how = "sum"


class ResampleProd(ResampleReduction):
    """Per-bin ``prod``.

    pandas returns 1 (the empty product) for a bin with no values. Bins that
    a partition's aggregation does not produce are therefore filled with 1
    rather than NaN. This matches pandas and avoids forcing a float dtype.
    """

    how = "prod"
    fill_value = 1


class ResampleMean(ResampleReduction):
    """Per-bin ``mean``; missing bins keep the inherited NaN fill."""

    how = "mean"


class ResampleMin(ResampleReduction):
    """Per-bin ``min``; missing bins keep the inherited NaN fill."""

    how = "min"


class ResampleMax(ResampleReduction):
    """Per-bin ``max``; missing bins keep the inherited NaN fill."""

    how = "max"


class ResampleFirst(ResampleReduction):
    """Per-bin ``first``; missing bins keep the inherited NaN fill."""

    how = "first"


class ResampleLast(ResampleReduction):
    """Per-bin ``last``; missing bins keep the inherited NaN fill."""

    how = "last"


class ResampleVar(ResampleReduction):
    """Per-bin ``var``; missing bins keep the inherited NaN fill."""

    how = "var"


class ResampleStd(ResampleReduction):
    """Per-bin ``std``; missing bins keep the inherited NaN fill."""

    how = "std"


class ResampleSize(ResampleReduction):
    """Per-bin ``size``; bins a partition does not produce are filled with 0."""

    fill_value = 0
    how = "size"


class ResampleNUnique(ResampleReduction):
    """Per-bin ``nunique``; bins a partition does not produce are filled with 0."""

    fill_value = 0
    how = "nunique"


class ResampleMedian(ResampleReduction):
    """Per-bin ``median``; missing bins keep the inherited NaN fill."""

    how = "median"


class ResampleQuantile(ResampleReduction):
    """Per-bin ``quantile``; missing bins keep the inherited NaN fill."""

    how = "quantile"


class ResampleOhlc(ResampleReduction):
    """Per-bin open/high/low/close; missing bins keep the inherited NaN fill."""

    how = "ohlc"


class ResampleSem(ResampleReduction):
    """Per-bin ``sem``; missing bins keep the inherited NaN fill."""

    how = "sem"


class ResampleAgg(ResampleReduction):
    """Per-bin ``agg`` with a user-supplied function passed via ``how_args``."""

    how = "agg"

    def _simplify_up(self, parent, dependents):
        # Never push a parent projection through ``agg``: the user function
        # may read columns that the projection would drop.
        return None


class Resampler:
    """Aggregate using one or more operations

    The purpose of this class is to expose an API similar
    to Pandas' `Resampler` for dask-expr.

    Parameters
    ----------
    obj
        Collection to resample; its ``divisions`` must be known.
    rule : str or DateOffset
        Resampling frequency.
    **kwargs
        Forwarded to pandas ``resample`` (e.g. ``closed``, ``label``).
    """

    def __init__(self, obj, rule, **kwargs):
        if obj.divisions[0] is None:
            msg = (
                "Can only resample dataframes with known divisions\n"
                "See https://docs.dask.org/en/latest/dataframe-design.html#partitions\n"
                "for more information."
            )
            raise ValueError(msg)
        self.obj = obj
        self.rule = rule
        self.kwargs = kwargs

    def _single_agg(self, expr_cls, how_args=(), how_kwargs=None):
        # Wrap an ``expr_cls`` expression over this resampler's object, rule
        # and resample kwargs in a new collection.
        return new_collection(
            expr_cls(
                self.obj,
                self.rule,
                self.kwargs,
                how_args=how_args,
                how_kwargs=how_kwargs,
            )
        )

    @derived_from(pd_Resampler)
    def count(self):
        return self._single_agg(ResampleCount)

    @derived_from(pd_Resampler)
    def sum(self):
        return self._single_agg(ResampleSum)

    @derived_from(pd_Resampler)
    def prod(self):
        return self._single_agg(ResampleProd)

    @derived_from(pd_Resampler)
    def mean(self):
        return self._single_agg(ResampleMean)

    @derived_from(pd_Resampler)
    def min(self):
        return self._single_agg(ResampleMin)

    @derived_from(pd_Resampler)
    def max(self):
        return self._single_agg(ResampleMax)

    @derived_from(pd_Resampler)
    def first(self):
        return self._single_agg(ResampleFirst)

    @derived_from(pd_Resampler)
    def last(self):
        return self._single_agg(ResampleLast)

    @derived_from(pd_Resampler)
    def var(self, ddof=1):
        # ``ddof`` defaults to pandas' default of 1.
        return self._single_agg(ResampleVar, how_kwargs={"ddof": ddof})

    @derived_from(pd_Resampler)
    def std(self, ddof=1):
        return self._single_agg(ResampleStd, how_kwargs={"ddof": ddof})

    @derived_from(pd_Resampler)
    def size(self):
        return self._single_agg(ResampleSize)

    @derived_from(pd_Resampler)
    def nunique(self):
        return self._single_agg(ResampleNUnique)

    @derived_from(pd_Resampler)
    def median(self):
        return self._single_agg(ResampleMedian)

    @derived_from(pd_Resampler)
    def quantile(self, q=0.5):
        # Each partition's result is reindexed onto a flat DatetimeIndex of
        # bin labels, which a list of quantiles (one row per bin and q)
        # cannot satisfy, so only a scalar ``q`` is accepted.
        if pd.api.types.is_list_like(q):
            raise NotImplementedError(
                "Resampler.quantile only supports a scalar `q`"
            )
        return self._single_agg(ResampleQuantile, how_kwargs={"q": q})

    @derived_from(pd_Resampler)
    def ohlc(self):
        return self._single_agg(ResampleOhlc)

    @derived_from(pd_Resampler)
    def sem(self, ddof=1):
        return self._single_agg(ResampleSem, how_kwargs={"ddof": ddof})

    @derived_from(pd_Resampler)
    def agg(self, func, *args, **kwargs):
        return self._single_agg(ResampleAgg, how_args=(func, *args), how_kwargs=kwargs)