import functools
import math
import warnings
from collections.abc import Callable
import numpy as np
import pandas as pd
from dask import is_dask_collection
from dask._task_spec import Task
from dask.core import flatten
from dask.dataframe.core import (
GROUP_KEYS_DEFAULT,
_concat,
apply_and_enforce,
is_dataframe_like,
is_series_like,
)
from dask.dataframe.dispatch import concat, make_meta, meta_nonempty
from dask.dataframe.groupby import (
_agg_finalize,
_aggregate_docstring,
_apply_chunk,
_build_agg_args,
_cov_agg,
_cov_chunk,
_cum_agg_aligned,
_cum_agg_filled,
_cumcount_aggregate,
_determine_levels,
_groupby_aggregate,
_groupby_aggregate_spec,
_groupby_apply_funcs,
_groupby_get_group,
_groupby_raise_unaligned,
_groupby_slice_apply,
_groupby_slice_shift,
_groupby_slice_transform,
_head_aggregate,
_head_chunk,
_non_agg_chunk,
_normalize_spec,
_nunique_df_chunk,
_nunique_df_combine,
_tail_aggregate,
_tail_chunk,
_unique_aggregate,
_value_counts,
_value_counts_aggregate,
_var_agg,
_var_chunk,
)
from dask.dataframe.utils import insert_meta_param_description
from dask.typing import Key
from dask.utils import M, derived_from, is_index_like
from pandas.core.apply import reconstruct_func, validate_func_kwargs
from dask_expr._collection import FrameBase, Index, Series, new_collection
from dask_expr._expr import (
Assign,
Blockwise,
Expr,
MapPartitions,
Projection,
RenameFrame,
RenameSeries,
ToFrame,
_DeepCopy,
_extract_meta,
are_co_aligned,
determine_column_projection,
no_default,
)
from dask_expr._reductions import ApplyConcatApply, Chunk, Reduction
from dask_expr._shuffle import RearrangeByColumn
from dask_expr._util import (
PANDAS_GE_300,
_convert_to_list,
get_specified_shuffle,
is_scalar,
)
def _as_dict(key, value):
# Utility to convert a single kwarg to a dict.
# The dict will be empty if the value is None
return {} if value is None else {key: value}
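# Hedged sketch of how ``_as_dict`` behaves (values follow directly from the
# code above):
#   _as_dict("observed", None)  -> {}
#   _as_dict("observed", True)  -> {"observed": True}
# This lets optional keywords be forwarded only when they were actually set.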
def _adjust_split_out_for_group_keys(npartitions, by):
if len(by) == 1:
return math.ceil(npartitions / 15)
return math.ceil(npartitions / (10 / (len(by) - 1)))
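# Worked example of the heuristic above (illustrative): a single grouping key
# coarsens the output by a factor of 15; each additional key shrinks that
# factor, e.g.
#   _adjust_split_out_for_group_keys(150, ["a"])      -> ceil(150 / 15) == 10
#   _adjust_split_out_for_group_keys(150, ["a", "b"]) -> ceil(150 / 10) == 15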
class Aggregation:
"""User defined groupby-aggregation.
This class allows users to define their own custom aggregation in terms of
operations on Pandas dataframes in a map-reduce style. You need to specify
what operation to do on each chunk of data, how to combine those chunks of
data together, and then how to finalize the result.
See :ref:`dataframe.groupby.aggregate` for more.
Parameters
----------
name : str
        the name of the aggregation. It should be unique, since intermediate
        results will be identified by this name.
chunk : callable
        a function that will be called with the grouped column of each
        partition; it takes a pandas SeriesGroupBy as input.
        It can return either a single Series or a tuple of Series.
The index has to be equal to the groups.
agg : callable
a function that will be called to aggregate the results of each chunk.
Again the argument(s) will be a Pandas SeriesGroupBy. If ``chunk``
returned a tuple, ``agg`` will be called with all of them as
individual positional arguments.
finalize : callable
an optional finalizer that will be called with the results from the
aggregation.
Examples
--------
We could implement ``sum`` as follows:
>>> custom_sum = dd.Aggregation(
... name='custom_sum',
... chunk=lambda s: s.sum(),
... agg=lambda s0: s0.sum()
... ) # doctest: +SKIP
>>> df.groupby('g').agg(custom_sum) # doctest: +SKIP
We can implement ``mean`` as follows:
>>> custom_mean = dd.Aggregation(
... name='custom_mean',
... chunk=lambda s: (s.count(), s.sum()),
... agg=lambda count, sum: (count.sum(), sum.sum()),
... finalize=lambda count, sum: sum / count,
... ) # doctest: +SKIP
>>> df.groupby('g').agg(custom_mean) # doctest: +SKIP
Though of course, both of these are built-in and so you don't need to
implement them yourself.
"""
    def __init__(self, name, chunk, agg, finalize=None):
self.chunk = chunk
self.agg = agg
self.finalize = finalize
self.__name__ = name
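# Hedged usage sketch (not part of the library): a root-mean-square
# aggregation exercising all three callbacks; ``dd`` and ``df`` are assumed
# to be ``dask.dataframe`` and a dask DataFrame.
#
# >>> import dask.dataframe as dd                       # doctest: +SKIP
# >>> rms = dd.Aggregation(
# ...     name="rms",
# ...     chunk=lambda s: (s.count(), (s**2).sum()),    # per-partition pieces
# ...     agg=lambda n, sq: (n.sum(), sq.sum()),        # combine partitions
# ...     finalize=lambda n, sq: (sq / n) ** 0.5,       # final transformation
# ... )                                                 # doctest: +SKIP
# >>> df.groupby("g").agg(rms)                          # doctest: +SKIP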
###
### Groupby-aggregation expressions
###
class GroupByBase:
@functools.cached_property
def _by_meta(self):
return [meta_nonempty(x._meta) if isinstance(x, Expr) else x for x in self.by]
@functools.cached_property
def _by_columns(self):
return [x for x in self.by if not isinstance(x, Expr)]
@property
def split_by(self):
return list(
flatten(
[[x] if not isinstance(x, Expr) else x.columns for x in self.by],
container=list,
)
)
@functools.cached_property
def by(self):
return self.operands[len(self._parameters) :]
@functools.cached_property
def levels(self):
return _determine_levels(self.by)
@property
def shuffle_by_index(self):
return True
class GroupByChunk(Chunk, GroupByBase):
_preserves_partitioning_information = True
@functools.cached_property
def _args(self) -> list:
return [self.frame] + self.by
@functools.cached_property
def _meta(self):
args = [
meta_nonempty(op._meta) if isinstance(op, Expr) else op for op in self._args
]
return make_meta(self.operation(*args, **self._kwargs))
class GroupByApplyConcatApply(ApplyConcatApply, GroupByBase):
_chunk_cls = GroupByChunk
@functools.cached_property
def _meta_chunk(self):
meta = meta_nonempty(self.frame._meta)
return self.chunk(meta, *self._by_meta, **self.chunk_kwargs)
@property
def _chunk_cls_args(self):
return self.by
@property
def split_out(self):
if self.operand("split_out") is None:
return 1
return super().split_out
@property
def _projection_columns(self):
return self.frame.columns
def _tune_down(self):
if self.operand("split_out") is None:
return self.substitute_parameters(
{
"split_out": functools.partial(
_adjust_split_out_for_group_keys, by=self.by
)
}
)
class SingleAggregation(GroupByApplyConcatApply, GroupByBase):
"""Single groupby aggregation
This is an abstract class. Sub-classes must implement
the following methods:
- `groupby_chunk`: Applied to each group within
the `chunk` method of `GroupByApplyConcatApply`
- `groupby_aggregate`: Applied to each group within
the `aggregate` method of `GroupByApplyConcatApply`
Parameters
----------
frame: Expr
Dataframe- or series-like expression to group.
by: str, list or Series
The key for grouping
observed:
Passed through to dataframe backend.
dropna:
Whether rows with NA values should be dropped.
    chunk_kwargs:
        Keyword arguments to pass to `groupby_chunk`.
    aggregate_kwargs:
        Keyword arguments to pass to `groupby_aggregate`.
"""
_parameters = [
"frame",
"observed",
"dropna",
"chunk_kwargs",
"aggregate_kwargs",
"_slice",
"split_every",
"split_out",
"sort",
"shuffle_method",
]
_defaults = {
"observed": None,
"dropna": None,
"chunk_kwargs": None,
"aggregate_kwargs": None,
"_slice": None,
"split_every": 8,
"split_out": None,
"sort": None,
"shuffle_method": None,
}
groupby_chunk = None
groupby_aggregate = None
@classmethod
def chunk(cls, df, *by, **kwargs):
return _apply_chunk(df, *by, **kwargs)
@classmethod
def aggregate(cls, inputs, **kwargs):
return _groupby_aggregate(_concat(inputs), **kwargs)
@property
def chunk_kwargs(self) -> dict:
chunk_kwargs = self.operand("chunk_kwargs") or {}
columns = self._slice
return {
"chunk": self.groupby_chunk,
"columns": columns,
**_as_dict("observed", self.observed),
**_as_dict("dropna", self.dropna),
**chunk_kwargs,
}
@property
def aggregate_kwargs(self) -> dict:
aggregate_kwargs = self.operand("aggregate_kwargs") or {}
groupby_aggregate = self.groupby_aggregate or self.groupby_chunk
return {
"aggfunc": groupby_aggregate,
"levels": self.levels,
"sort": self.sort,
**_as_dict("observed", self.observed),
**_as_dict("dropna", self.dropna),
**aggregate_kwargs,
}
def _simplify_up(self, parent, dependents):
return groupby_projection(self, parent, dependents)
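# Pandas-level sketch of the pattern SingleAggregation encodes (illustrative
# only): ``chunk`` groups and reduces each partition, the partial results are
# concatenated, and ``aggregate`` reduces again on the group level(s).
#
# >>> parts = [df1.groupby("g").sum(), df2.groupby("g").sum()]  # chunk step
# >>> _concat(parts).groupby(level=0).sum()                     # aggregate step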
class GroupbyAggregationBase(GroupByApplyConcatApply, GroupByBase):
"""Base class for groupby aggregation
This class can be subclassed to perform a general
groupby aggregation by passing in a `str`, `list` or
`dict`-based specification using the `arg` operand.
Parameters
----------
frame: Expr
Dataframe- or series-like expression to group.
by: str, list or Series
The key for grouping
arg: str, list or dict
Aggregation spec defining the specific aggregations
to perform.
observed:
Passed through to dataframe backend.
dropna:
Whether rows with NA values should be dropped.
"""
_parameters = [
"frame",
"arg",
"observed",
"dropna",
"split_every",
"split_out",
"sort",
"shuffle_method",
"_slice",
]
_defaults = {
"observed": None,
"dropna": None,
"split_every": 8,
"split_out": None,
"sort": None,
"shuffle_method": None,
"_slice": None,
}
@functools.cached_property
def spec(self):
# Converts the `arg` operand into specific
# chunk, aggregate, and finalizer functions
if is_dataframe_like(self.frame._meta):
group_columns = self._by_columns
if self._slice:
non_group_columns = self._slice
if is_scalar(non_group_columns):
non_group_columns = [non_group_columns]
else:
non_group_columns = [
col for col in self.frame.columns if col not in group_columns
]
spec = _normalize_spec(self.arg, non_group_columns)
elif is_series_like(self.frame._meta):
if isinstance(self.arg, (list, tuple, dict)):
spec = _normalize_spec({None: self.arg}, [])
spec = [
(result_column, func, input_column)
for ((_, result_column), func, input_column) in spec
]
else:
spec = _normalize_spec({None: self.arg}, [])
spec = [
(self.frame.columns[0], func, input_column)
for (_, func, input_column) in spec
]
else:
raise ValueError(f"aggregate on unknown object {self.frame._meta}")
return spec
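    # Illustrative examples of the normalized spec format, a list of
    # ``(result_column, func, input_column)`` tuples (sketch based on
    # ``_normalize_spec``; exact shapes may vary by version):
    #   _normalize_spec({"x": "sum"}, ["x", "y"])
    #       -> [("x", "sum", "x")]
    #   _normalize_spec({"x": ["sum", "mean"]}, ["x", "y"])
    #       -> [(("x", "sum"), "sum", "x"), (("x", "mean"), "mean", "x")]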
@functools.cached_property
def agg_args(self):
keys = ["chunk_funcs", "aggregate_funcs", "finalizers"]
return dict(zip(keys, _build_agg_args(self.spec)))
def _simplify_down(self):
if not isinstance(self.arg, dict):
return
# Use agg-spec information to add column projection
required_columns = (
set(self._by_columns)
.union(self.arg.keys())
.intersection(self.frame.columns)
)
column_projection = [
column for column in self.frame.columns if column in required_columns
]
if column_projection != self.frame.columns:
return type(self)(self.frame[column_projection], *self.operands[1:])
class GroupbyAggregation(GroupbyAggregationBase):
"""Logical groupby aggregation class
This class lowers itself to concrete implementations for decomposable
or holistic aggregations.
"""
@functools.cached_property
def _meta(self):
return self._lower()._meta
@functools.cached_property
def _is_decomposable(self):
return not any(s[1] in ("median", np.median) for s in self.spec)
def _lower(self):
cls = (
DecomposableGroupbyAggregation
if self._is_decomposable
else HolisticGroupbyAggregation
)
return cls(
self.frame,
self.arg,
self.observed,
self.dropna,
self.split_every,
self.split_out,
self.sort,
self.shuffle_method,
self._slice,
*self.by,
)
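# Illustrative note: ``ddf.groupby("g").agg({"x": "sum"})`` lowers to
# DecomposableGroupbyAggregation (partial results combined in a tree or
# shuffle reduction), whereas a holistic spec such as ``{"x": "median"}``
# needs every row of a group in one place and lowers to
# HolisticGroupbyAggregation.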
class HolisticGroupbyAggregation(GroupbyAggregationBase):
"""Groupby aggregation for both decomposable and non-decomposable aggregates
This class always calculates the aggregates by first collecting all the data for
the groups and then aggregating at once.
    We always shuffle, so ``combine`` is never called.
"""
@functools.cached_property
def _meta(self):
meta = self._meta_chunk
aggregate = self.aggregate or (lambda x: x)
aggregate_kwargs = self.aggregate_kwargs
meta = aggregate([meta], **aggregate_kwargs)
return make_meta(meta)
chunk = staticmethod(_non_agg_chunk)
@property
def should_shuffle(self):
return True
@classmethod
def aggregate(cls, inputs, **kwargs):
return _groupby_aggregate_spec(_concat(inputs), **kwargs)
@property
def chunk_kwargs(self) -> dict:
return {
"by": self._by_columns,
"key": [col for col in self.frame.columns if col not in self._by_columns],
**_as_dict("observed", self.observed),
**_as_dict("dropna", self.dropna),
}
@property
def aggregate_kwargs(self) -> dict:
return {
"spec": self.arg,
"levels": _determine_levels(self.by),
**_as_dict("observed", self.observed),
**_as_dict("dropna", self.dropna),
}
class DecomposableGroupbyAggregation(GroupbyAggregationBase):
"""Groupby aggregation for decomposable aggregates
The results may be calculated via tree or shuffle reduction.
"""
chunk = staticmethod(_groupby_apply_funcs)
@classmethod
def combine(cls, inputs, **kwargs):
return _groupby_apply_funcs(_concat(inputs), **kwargs)
@classmethod
def aggregate(cls, inputs, **kwargs):
return _agg_finalize(_concat(inputs), **kwargs)
@property
def chunk_kwargs(self) -> dict:
return {
"funcs": self.agg_args["chunk_funcs"],
"sort": self.sort,
**_as_dict("observed", self.observed),
**_as_dict("dropna", self.dropna),
}
@property
def combine_kwargs(self) -> dict:
return {
"funcs": self.agg_args["aggregate_funcs"],
"level": self.levels,
"sort": self.sort,
**_as_dict("observed", self.observed),
**_as_dict("dropna", self.dropna),
}
@property
def aggregate_kwargs(self) -> dict:
return {
"aggregate_funcs": self.agg_args["aggregate_funcs"],
"arg": self.arg,
"columns": self._slice,
"finalize_funcs": self.agg_args["finalizers"],
"is_series": self.frame._meta.ndim == 1,
"level": self.levels,
"sort": self.sort,
**_as_dict("observed", self.observed),
**_as_dict("dropna", self.dropna),
}
class Sum(SingleAggregation):
groupby_chunk = M.sum
class Prod(SingleAggregation):
groupby_chunk = M.prod
class Min(SingleAggregation):
groupby_chunk = M.min
class Max(SingleAggregation):
groupby_chunk = M.max
class First(SingleAggregation):
groupby_chunk = M.first
class Last(SingleAggregation):
groupby_chunk = M.last
class Count(SingleAggregation):
groupby_chunk = M.count
groupby_aggregate = M.sum
class Size(SingleAggregation):
groupby_chunk = M.size
groupby_aggregate = M.sum
def _simplify_down(self):
        if (
            self._slice is not None and not isinstance(self._slice, list)
        ) or self.frame.ndim == 1:
            # Scalar slices are allowed and influence the result (e.g., the
            # name of the resulting Series differs), so keep them.
            return
# We can remove every column since pandas reduces to a Series anyway
by_columns = self._by_columns
by_columns = [c for c in by_columns if c in self.frame.columns]
if set(by_columns) == set(self.frame.columns):
return
slice_idx = self._parameters.index("_slice")
ops = [op if i != slice_idx else None for i, op in enumerate(self.operands)]
return type(self)(self.frame[by_columns], *ops[1:])
class IdxMin(SingleAggregation):
groupby_chunk = M.idxmin
groupby_aggregate = M.first
class IdxMax(IdxMin):
groupby_chunk = M.idxmax
groupby_aggregate = M.first
class ValueCounts(SingleAggregation):
groupby_chunk = staticmethod(_value_counts)
groupby_aggregate = staticmethod(_value_counts_aggregate)
class Unique(SingleAggregation):
groupby_chunk = M.unique
groupby_aggregate = staticmethod(_unique_aggregate)
@functools.cached_property
def aggregate_kwargs(self) -> dict:
kwargs = super().aggregate_kwargs
meta = self.frame._meta
if meta.ndim == 1:
name = meta.name
else:
name = meta[self._slice].name
return {**kwargs, "name": name}
class Cov(SingleAggregation):
chunk = staticmethod(_cov_chunk)
std = False
@classmethod
def combine(cls, g, levels):
return _concat(g)
@classmethod
def aggregate(cls, inputs, **kwargs):
return _cov_agg(_concat(inputs), **kwargs)
@property
def chunk_kwargs(self) -> dict:
return self.operand("chunk_kwargs")
@property
def aggregate_kwargs(self) -> dict:
kwargs = self.operand("aggregate_kwargs").copy()
kwargs["sort"] = self.sort
kwargs["std"] = self.std
kwargs["levels"] = self.levels
return kwargs
@property
def combine_kwargs(self) -> dict:
return {"levels": self.levels}
class Corr(Cov):
std = True
class GroupByReduction(Reduction, GroupByBase):
_chunk_cls = GroupByChunk
def _tune_down(self):
if self.operand("split_out") is None:
return self.substitute_parameters(
{
"split_out": functools.partial(
_adjust_split_out_for_group_keys, by=self.by
)
}
)
@property
def split_out(self):
if self.operand("split_out") is None:
return 1
return super().split_out
@property
def _chunk_cls_args(self):
return self.by
@functools.cached_property
def _meta_chunk(self):
meta = meta_nonempty(self.frame._meta)
return self.chunk(meta, *self._by_meta, **self.chunk_kwargs)
def _divisions(self):
if self.sort:
return (None, None)
split_out = self.split_out
if split_out is True:
split_out = self.frame.npartitions
return (None,) * (split_out + 1)
def _simplify_up(self, parent, dependents):
return groupby_projection(self, parent, dependents)
@functools.cached_property
def combine_kwargs(self):
return {"levels": self.levels, "observed": self.observed, "dropna": self.dropna}
@functools.cached_property
def chunk_kwargs(self):
return {"observed": self.observed, "dropna": self.dropna}
@functools.cached_property
def aggregate_kwargs(self):
return {
"levels": self.levels,
"sort": self.sort,
"observed": self.observed,
"dropna": self.dropna,
}
def _var_combine(g, levels, sort=False, observed=False, dropna=True):
return g.groupby(level=levels, sort=sort, observed=observed, dropna=dropna).sum()
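# Hedged sketch of the decomposition behind ``_var_chunk`` / ``_var_combine``
# / ``_var_agg``: each chunk contributes per-group sum(x), sum(x**2) and the
# count n; the combine step adds those component-wise; and the final step
# evaluates the standard formula
#   var = (sum(x**2) - sum(x)**2 / n) / (n - ddof)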
class Var(GroupByReduction):
_parameters = [
"frame",
"ddof",
"numeric_only",
"split_out",
"split_every",
"sort",
"dropna",
"observed",
"shuffle_method",
]
_defaults = {
"split_out": 1,
"sort": None,
"observed": None,
"dropna": None,
"split_every": None,
"shuffle_method": None,
}
reduction_aggregate = staticmethod(_var_agg)
reduction_combine = staticmethod(_var_combine)
chunk = staticmethod(_var_chunk)
@functools.cached_property
def aggregate_kwargs(self):
return {
"ddof": self.ddof,
"numeric_only": self.numeric_only,
**super().aggregate_kwargs,
}
@functools.cached_property
def chunk_kwargs(self):
return {"numeric_only": self.numeric_only, **super().chunk_kwargs}
class Std(Var):
def _lower(self):
v = Var(*self.operands)
return MapPartitions(
v,
func=np.sqrt,
meta=v._meta,
enforce_metadata=True,
transform_divisions=True,
clear_divisions=True,
)
def _mean_chunk(df, *by, observed=None, dropna=None):
if is_series_like(df):
df = df.to_frame()
g = _groupby_raise_unaligned(df, by=by, observed=observed, dropna=dropna)
x = g.sum(numeric_only=True)
n = g[x.columns].count().rename(columns=lambda c: c + "-count")
return concat([x, n], axis=1)
def _mean_combine(g, levels, sort=False, observed=None, dropna=None):
return g.groupby(level=levels, sort=sort, observed=observed, dropna=dropna).sum()
def _mean_agg(g, levels, sort=False, observed=False, dropna=True):
result = g.groupby(level=levels, sort=sort, observed=observed, dropna=dropna).sum()
s = result[result.columns[: len(result.columns) // 2]]
c = result[result.columns[len(result.columns) // 2 :]]
c.columns = s.columns
return s / c
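# Worked sketch of the mean reduction above: ``_mean_chunk`` emits per-group
# sums plus matching "-count" columns, ``_mean_combine`` adds them across
# partitions, and ``_mean_agg`` splits the columns back into sum and count
# halves and divides. E.g. for one column "x" over two partitions:
#   x sums [3.0, 7.0], x-count [2, 2]  ->  (3.0 + 7.0) / (2 + 2) == 2.5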
class Mean(GroupByReduction):
_parameters = SingleAggregation._parameters
_defaults = SingleAggregation._defaults
reduction_aggregate = staticmethod(_mean_agg)
reduction_combine = staticmethod(_mean_combine)
chunk = staticmethod(_mean_chunk)
def nunique_df_combine(dfs, *args, **kwargs):
return _nunique_df_combine(concat(dfs), *args, **kwargs)
def nunique_df_aggregate(dfs, levels, name, sort=False):
df = concat(dfs)
if df.ndim == 1:
        # with split_out, the concatenated chunk results may already be a Series
return df.groupby(level=levels, sort=sort, observed=True).nunique()
else:
return df.groupby(level=levels, sort=sort, observed=True)[name].nunique()
class NUnique(SingleAggregation):
aggregate = staticmethod(nunique_df_aggregate)
combine = staticmethod(nunique_df_combine)
@staticmethod
def chunk(df, *by, **kwargs):
if df.ndim == 1:
df = df.to_frame()
kwargs = dict(name=df.columns[0], levels=_determine_levels(by))
return _nunique_df_chunk(df, *by, **kwargs)
@functools.cached_property
def chunk_kwargs(self) -> dict:
kwargs = super().chunk_kwargs
kwargs["name"] = self._slice
return kwargs
@functools.cached_property
def aggregate_kwargs(self) -> dict:
return {"levels": self.levels, "name": self._slice}
@functools.cached_property
def combine_kwargs(self):
return {"levels": self.levels}
class Head(SingleAggregation):
groupby_chunk = staticmethod(_head_chunk)
groupby_aggregate = staticmethod(_head_aggregate)
@classmethod
def combine(cls, inputs, **kwargs):
return _concat(inputs)
class Tail(Head):
groupby_chunk = staticmethod(_tail_chunk)
groupby_aggregate = staticmethod(_tail_aggregate)
class GroupByApply(Expr, GroupByBase):
_parameters = [
"frame",
"observed",
"dropna",
"_slice",
"group_keys",
"func",
"meta",
"args",
"kwargs",
"shuffle_method",
]
_defaults = {
"observed": None,
"dropna": None,
"_slice": None,
"group_keys": True,
"shuffle_method": None,
}
@functools.cached_property
def grp_func(self):
return functools.partial(groupby_slice_apply, func=self.func)
@functools.cached_property
def _meta(self):
if self.operand("meta") is not no_default:
return make_meta(self.operand("meta"), parent_meta=self.frame._meta)
return _meta_apply_transform(self, self.grp_func)
def _divisions(self):
if self.need_to_shuffle:
return (None,) * (self.frame.npartitions + 1)
return self.frame.divisions
def _shuffle_grp_func(self, shuffled=False):
return self.grp_func
@functools.cached_property
def unique_partition_mapping_columns_from_shuffle(self):
if not self.need_to_shuffle:
return self.frame.unique_partition_mapping_columns_from_shuffle
elif not any(isinstance(b, Expr) for b in self.by):
return {tuple(self._by_columns)}
else:
return set()
@functools.cached_property
def need_to_shuffle(self):
if not any(isinstance(b, Expr) for b in self.by):
if any(
set(self._by_columns) >= set(cols)
for cols in self.frame.unique_partition_mapping_columns_from_shuffle
):
return False
return any(div is None for div in self.frame.divisions) or not any(
_contains_index_name(self.frame._meta.index.name, b) for b in self.by
)
def _lower(self):
df = self.frame
by = self.by
if self.need_to_shuffle:
def get_map_columns(df):
map_columns = {col: str(col) for col in df.columns if col != str(col)}
unmap_columns = {v: k for k, v in map_columns.items()}
return map_columns, unmap_columns
# Map Tuple[str] column names to str before the shuffle
if any(isinstance(b, Expr) for b in self.by):
is_series = df.ndim == 1
if is_series:
df = ToFrame(df)
cols, assign_exprs = [], []
for i, b in enumerate(self.by):
if isinstance(b, Expr):
assign_exprs.extend([f"_by_{i}", b])
cols.append(f"_by_{i}")
if len(assign_exprs):
df = Assign(df, *assign_exprs)
map_columns, unmap_columns = get_map_columns(df)
if map_columns:
df = RenameFrame(df, map_columns)
df = RearrangeByColumn(
df,
[map_columns.get(c, c) for c in cols],
df.npartitions,
method=self.shuffle_method,
)
if unmap_columns:
df = RenameFrame(df, unmap_columns)
                # pandas checks whether the group keys are part of the input
                # DataFrame through reference tracking. This is fine as long
                # as we don't trigger a copy after the Assign above, since
                # the blocks normally stay separate. A disk shuffle does
                # trigger a copy, though, and we shouldn't rely on those
                # pandas internals, so we trigger a deep copy here to clear
                # the references; we know more about the query than pandas.
by = [
(
b
if not isinstance(b, Expr)
else _DeepCopy(
RenameSeries(
Projection(df, f"_by_{i}"), index=self.by[i].columns[0]
)
)
)
for i, b in enumerate(self.by)
]
cols = [col for col in df.columns if col not in cols]
if is_series:
cols = cols[0]
df = Projection(df, cols)
else:
map_columns, unmap_columns = get_map_columns(df)
if map_columns:
df = RenameFrame(df, map_columns)
df = RearrangeByColumn(
df,
map_columns.get(self.by[0], self.by[0]),
self.npartitions,
method=self.shuffle_method,
)
if unmap_columns:
df = RenameFrame(df, unmap_columns)
grp_func = self._shuffle_grp_func(True)
else:
grp_func = self._shuffle_grp_func(False)
return GroupByUDFBlockwise(
df,
self._slice,
self.group_keys,
self.observed,
self.dropna,
self.operand("args"),
self.operand("kwargs"),
grp_func,
self.operand("meta"),
*by,
)
class GroupByTransform(GroupByApply):
@functools.cached_property
def grp_func(self):
return functools.partial(groupby_slice_transform, func=self.func)
def _fillna(group, *, what, **kwargs):
return getattr(group, what)(**kwargs)
class GroupByBFill(GroupByTransform):
func = staticmethod(functools.partial(_fillna, what="bfill"))
def _simplify_up(self, parent, dependents):
if isinstance(parent, Projection):
return groupby_projection(self, parent, dependents)
class GroupByFFill(GroupByBFill):
func = staticmethod(functools.partial(_fillna, what="ffill"))
class GroupByShift(GroupByApply):
_defaults = {
"observed": None,
"dropna": None,
"_slice": None,
"func": None,
"group_keys": True,
}
@functools.cached_property
def grp_func(self):
return functools.partial(groupby_slice_shift, shuffled=False)
def _shuffle_grp_func(self, shuffled=False):
return functools.partial(groupby_slice_shift, shuffled=shuffled)
class Median(GroupByShift):
_parameters = GroupByApply._parameters + ["split_every"]
    _defaults = {**GroupByShift._defaults, "split_every": None}
@functools.cached_property
def grp_func(self):
return functools.partial(_median_groupby_aggregate)
def _shuffle_grp_func(self, shuffled=False):
return self.grp_func
def _simplify_up(self, parent, dependents):
if isinstance(parent, Projection):
return groupby_projection(self, parent, dependents)
@functools.cached_property
def npartitions(self):
npartitions = self.frame.npartitions
if self.split_every is not None:
npartitions = npartitions // self.split_every
return npartitions
def groupby_get_group(df, *by_key, get_key=None, columns=None):
if PANDAS_GE_300 and is_scalar(get_key):
get_key = (get_key,)
return _groupby_get_group(df, list(by_key), get_key, columns)
class GetGroup(Blockwise, GroupByBase):
operation = staticmethod(groupby_get_group)
_parameters = ["frame", "get_key", "columns"]
_keyword_only = ["get_key", "columns"]
@property
def _args(self) -> list:
return [self.frame] + self.by
@property
def _kwargs(self) -> dict:
cols = self.operand("columns")
return {
"get_key": self.get_key,
"columns": cols if cols is not None else self.frame.columns,
}
def _median_groupby_aggregate(
df,
by=None,
key=None,
group_keys=True, # not used
dropna=None,
observed=None,
numeric_only=False,
args=None,
**kwargs,
):
dropna = {"dropna": dropna} if dropna is not None else {}
observed = {"observed": observed} if observed is not None else {}
g = df.groupby(by=by, **observed, **dropna)
if key is not None:
g = g[key]
return g.median(numeric_only=numeric_only)
class GroupByUDFBlockwise(Blockwise, GroupByBase):
_parameters = [
"frame",
"_slice",
"group_keys",
"observed",
"dropna",
"args",
"kwargs",
"dask_func",
"meta",
]
_defaults = {"observed": None, "dropna": None}
_keyword_only = [
"_slice",
"group_keys",
"observed",
"dropna",
"args",
"kwargs",
"dask_func",
]
@property
def _args(self) -> list:
return [self.frame] + self.by
@functools.cached_property
def _meta(self):
if self.operand("meta") is not no_default:
return make_meta(self.operand("meta"), parent_meta=self.frame._meta)
return _meta_apply_transform(self, self.dask_func)
def _task(self, name: Key, index: int) -> Task:
args = [self._blockwise_arg(op, index) for op in self._args]
kwargs = self._kwargs.copy()
kwargs.update(
{
"_func": self.operation,
"_meta": self._meta,
}
)
return Task(name, apply_and_enforce, *args, **kwargs)
@staticmethod
def operation(
frame,
*by,
_slice,
group_keys=None,
observed=None,
dropna=None,
args=None,
kwargs=None,
dask_func=None,
):
if args is None:
args = ()
if kwargs is None:
kwargs = {}
return dask_func(
frame,
list(by),
key=_slice,
group_keys=group_keys,
args=args,
**_as_dict("observed", observed),
**_as_dict("dropna", dropna),
**kwargs,
)
def _contains_index_name(index_name, by):
if index_name is None:
return False
if isinstance(by, Expr):
return False
if not is_scalar(by):
return False
return index_name == by
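# Example behaviour (sketch): ``_contains_index_name`` only matches a scalar,
# non-Expr grouping key against the frame's index name.
#   _contains_index_name("idx", "idx")    -> True
#   _contains_index_name("idx", ["idx"])  -> False (non-scalar ``by``)
#   _contains_index_name(None, "idx")     -> False (unnamed index)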
def groupby_slice_apply(
df,
grouper,
key,
func,
args,
group_keys=GROUP_KEYS_DEFAULT,
dropna=None,
observed=None,
**kwargs,
):
return _groupby_slice_apply(
df,
grouper,
key,
func,
*args,
group_keys=group_keys,
dropna=dropna,
observed=observed,
**kwargs,
)
def groupby_slice_shift(
df,
grouper,
key,
args,
shuffled,
group_keys=GROUP_KEYS_DEFAULT,
dropna=None,
observed=None,
**kwargs,
):
return _groupby_slice_shift(
df, grouper, key, shuffled, group_keys, dropna, observed, **kwargs
)
def groupby_slice_transform(
df,
grouper,
key,
func,
args,
group_keys=GROUP_KEYS_DEFAULT,
dropna=None,
observed=None,
**kwargs,
):
return _groupby_slice_transform(
df,
grouper,
key,
func,
*args,
group_keys=group_keys,
dropna=dropna,
observed=observed,
**kwargs,
)
def _meta_apply_transform(obj, grp_func):
kwargs = obj.operand("kwargs")
by_meta = obj._by_meta
by_meta = [x if is_scalar(x) else meta_nonempty(x) for x in by_meta]
meta_args, meta_kwargs = _extract_meta((obj.operand("args"), kwargs), nonempty=True)
return make_meta(
grp_func(
meta_nonempty(obj.frame._meta),
by_meta,
key=obj._slice,
args=meta_args,
**_as_dict("observed", obj.observed),
**_as_dict("dropna", obj.dropna),
**_as_dict("group_keys", obj.group_keys),
**meta_kwargs,
)
)
def groupby_projection(expr, parent, dependents):
if isinstance(parent, Projection):
columns = determine_column_projection(
expr, parent, dependents, additional_columns=expr._by_columns
)
columns = _convert_to_list(columns)
columns = [col for col in expr.frame.columns if col in columns]
if columns == expr.frame.columns:
return
return type(parent)(
type(expr)(expr.frame[columns], *expr.operands[1:]),
*parent.operands[1:],
)
return
###
### Groupby Collection API
###
def _clean_by_expr(obj, by):
if (
isinstance(by, Series)
and by.name in obj.columns
and obj.ndim == 2
and by._name == obj[by.name]._name
):
return by.name
elif isinstance(by, Index) and by._name == obj.index._name:
return by.expr
elif isinstance(by, (Series, Index)):
if not are_co_aligned(obj.expr, by.expr):
raise NotImplementedError(
"by must be in the DataFrames columns or aligned with the DataFrame."
)
if isinstance(by, Index):
by = by.to_series()
by.index = obj.index
return by.expr
# By is a column name, e.g. str or int
return by
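# Illustrative note on ``_clean_by_expr``: a Series that merely re-selects one
# of the frame's own columns collapses to that column name, the frame's own
# index collapses to its expression, and any other Series/Index must be
# co-aligned with the frame. E.g. (sketch, assuming ``ddf`` has a column "g"):
#   ddf.groupby(ddf["g"])   # reduces to groupby("g")
#   ddf.groupby(ddf.index)  # groups on the index expression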
class GroupByCumulative(Expr, GroupByBase):
_parameters = ["frame", "dropna", "_slice", "numeric_only"]
_defaults = {"numeric_only": None, "dropna": None, "_slice": None}
chunk = None
aggregate = None
initial = 0
@functools.cached_property
def _meta(self):
cols = None if self._slice is None else self._slice
return _apply_chunk(
self.frame._meta,
*self._by_meta,
chunk=self.chunk,
columns=cols,
**self.numeric_only,
)
def _divisions(self):
return self.frame.divisions
@property
def numeric_only(self):
no = self.operand("numeric_only")
return {} if no is None else {"numeric_only": no}
def _lower(self):
meta = self._meta
dropna = {} if self.dropna is None else {"dropna": self.dropna}
columns = meta.name if is_series_like(meta) else meta.columns
frame = MapPartitions(
self.frame,
_apply_chunk,
meta,
True,
True,
False,
True,
None,
None,
{"chunk": self.chunk, "columns": columns, **dropna, **self.numeric_only},
*self.by,
)
cum_raw = frame
if frame.ndim == 1:
frame = ToFrame(frame)
by = self.by.copy()
for i, b in enumerate(by):
if not isinstance(b, Expr):
if b in self.frame.columns:
frame = Assign(frame, f"_by_{b}", self.frame[b])
else:
frame = Assign(frame, f"_by_{b}", self.frame.index)
by[i] = f"_by_{b}"
columns = 0 if columns is None else columns
cum_last = MapPartitions(
frame,
_apply_chunk,
no_default,
True,
True,
False,
True,
None,
None,
{"chunk": M.last, "columns": columns, **dropna},
*by,
)
return GroupByCumulativeFinalizer(
frame,
cum_raw,
cum_last,
meta,
self.aggregate,
self.initial,
columns,
*by,
)
class GroupByCumulativeFinalizer(Expr, GroupByBase):
_parameters = [
"frame",
"cum_raw",
"cum_last",
"meta",
"aggregate",
"initial",
"columns",
]
@functools.cached_property
def _meta(self):
return self.meta
def _divisions(self):
return self.frame.divisions
def _layer(self) -> dict:
dsk = {(self._name, 0): (self.cum_raw._name, 0)}
name_cum = "cum-last" + self._name
for i in range(1, self.frame.npartitions):
            # store each cumulative step in the graph to avoid recomputation
if i == 1:
dsk[(name_cum, i)] = (self.cum_last._name, i - 1)
else:
                # aggregate with the previous cumulative results
dsk[(name_cum, i)] = (
_cum_agg_filled,
(name_cum, i - 1),
(self.cum_last._name, i - 1),
self.aggregate,
self.initial,
)
dsk[(self._name, i)] = (
_cum_agg_aligned,
(self.frame._name, i),
(name_cum, i),
self.by,
self.operand("columns"),
self.aggregate,
self.initial,
)
return dsk
class GroupByCumsum(GroupByCumulative):
chunk = M.cumsum
aggregate = M.add
initial = 0
class GroupByCumprod(GroupByCumulative):
chunk = M.cumprod
aggregate = M.mul
initial = 1
class GroupByCumcount(GroupByCumulative):
chunk = M.cumcount
aggregate = staticmethod(_cumcount_aggregate)
initial = -1
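# Hedged sketch of the cumulative pattern above: each partition first applies
# the local ``chunk`` (e.g. ``cumsum``), then partition i is corrected with
# the aggregated group totals of partitions 0..i-1 via ``aggregate`` (e.g.
# ``add``), seeded with ``initial``. For a cumsum over two partitions of one
# group:
#   part0 x: [1, 2] -> cumsum [1, 3]
#   part1 x: [4]    -> cumsum [4] + last(3) = [7]   (global cumsum [1, 3, 7])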
class GroupBy:
"""Collection container for groupby aggregations
    The purpose of this class is to expose an API similar
    to Pandas' ``GroupBy`` for dask-expr collections.
See Also
--------
SingleAggregation
"""
def __init__(
self,
obj,
by,
group_keys=True,
sort=None,
observed=None,
dropna=None,
slice=None,
):
if isinstance(by, (tuple, list)):
by = [_clean_by_expr(obj, x) for x in by]
else:
by = _clean_by_expr(obj, by)
by_ = by if isinstance(by, (tuple, list)) else [by]
if any(isinstance(key, pd.Grouper) for key in by_):
raise NotImplementedError("pd.Grouper is currently not supported by Dask.")
self._slice = slice
# Check if we can project columns
projection = None
if (
np.isscalar(slice)
or isinstance(slice, (str, list, tuple))
or (
(is_index_like(slice) or is_series_like(slice))
and not is_dask_collection(slice)
)
):
projection = set(by_).union(
{slice} if (np.isscalar(slice) or isinstance(slice, str)) else slice
)
projection = [c for c in obj.columns if c in projection]
self.obj = obj[projection] if projection is not None else obj
self.sort = sort
        self.observed = observed if observed is not None else PANDAS_GE_300
self.dropna = dropna
self.group_keys = group_keys
self.by = (
[by]
if np.isscalar(by) or isinstance(by, Expr) or isinstance(by, Callable)
else list(by)
)
# surface pandas errors
self._meta = self.obj._meta.groupby(
by,
group_keys=group_keys,
sort=sort,
**_as_dict("observed", observed),
**_as_dict("dropna", dropna),
)
if slice is not None:
if isinstance(slice, tuple):
slice = list(slice)
self._meta = self._meta[slice]
def _numeric_only_kwargs(self, numeric_only):
kwargs = {"numeric_only": numeric_only}
return {"chunk_kwargs": kwargs.copy(), "aggregate_kwargs": kwargs.copy()}
def _single_agg(
self,
expr_cls,
split_every=None,
split_out=None,
chunk_kwargs=None,
aggregate_kwargs=None,
shuffle_method=None,
):
if split_every is None:
split_every = 8
return new_collection(
expr_cls(
self.obj.expr,
self.observed,
self.dropna,
chunk_kwargs,
aggregate_kwargs,
self._slice,
split_every,
split_out,
self.sort,
get_specified_shuffle(shuffle_method),
*self.by,
)
)
def __getattr__(self, key):
try:
return self[key]
except KeyError as e:
raise AttributeError(e) from e
def __dir__(self):
return sorted(
set(
dir(type(self))
+ list(self.__dict__)
+ list(filter(M.isidentifier, self.obj.columns))
)
)
def compute(self, **kwargs):
raise NotImplementedError(
"DataFrameGroupBy does not allow compute method."
"Please chain it with an aggregation method (like ``.mean()``) or get a "
"specific group using ``.get_group()`` before calling ``compute()``"
)
def __getitem__(self, key):
if is_scalar(key):
return SeriesGroupBy(
self.obj,
by=self.by,
slice=key,
sort=self.sort,
dropna=self.dropna,
observed=self.observed,
)
g = GroupBy(
self.obj,
by=self.by,
slice=key,
sort=self.sort,
dropna=self.dropna,
observed=self.observed,
group_keys=self.group_keys,
)
return g
    @derived_from(
pd.core.groupby.GroupBy,
inconsistencies="If the group is not present, Dask will return an empty Series/DataFrame.",
)
def get_group(self, key):
return new_collection(GetGroup(self.obj.expr, key, self._slice, *self.by))
    @derived_from(pd.core.groupby.GroupBy)
def count(self, **kwargs):
return self._single_agg(Count, **kwargs)
    @derived_from(pd.core.groupby.GroupBy)
def sum(self, numeric_only=False, min_count=None, **kwargs):
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
result = self._single_agg(Sum, **kwargs, **numeric_kwargs)
if min_count:
return result.where(self.count() >= min_count, other=np.nan)
return result
@derived_from(pd.core.groupby.GroupBy)
def prod(self, numeric_only=False, min_count=None, **kwargs):
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
result = self._single_agg(Prod, **kwargs, **numeric_kwargs)
if min_count:
return result.where(self.count() >= min_count, other=np.nan)
return result
def _cum_agg(self, cls, numeric_only=None):
return new_collection(
cls(
self.obj.expr,
self.dropna,
self._slice,
numeric_only,
*self.by,
)
)
    @derived_from(pd.core.groupby.GroupBy)
def cumsum(self, numeric_only=False):
return self._cum_agg(GroupByCumsum, numeric_only)
    @derived_from(pd.core.groupby.GroupBy)
def cumprod(self, numeric_only=False):
return self._cum_agg(GroupByCumprod, numeric_only)
    @derived_from(pd.core.groupby.GroupBy)
def cumcount(self):
return self._cum_agg(GroupByCumcount)
def _all_numeric(self):
"""Are all columns that we're not grouping on numeric?"""
numerics = self.obj._meta._get_numeric_data()
# This computes a groupby but only on the empty meta
post_group_columns = self._meta.count().columns
return len(set(post_group_columns) - set(numerics.columns)) == 0
    @derived_from(pd.core.groupby.GroupBy)
def mean(self, numeric_only=False, split_out=None, **kwargs):
if not numeric_only and not self._all_numeric():
raise NotImplementedError(
"'numeric_only=False' is not implemented in Dask."
)
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
result = self._single_agg(Mean, split_out=split_out, **kwargs, **numeric_kwargs)
return self._postprocess_series_squeeze(result)
def _postprocess_series_squeeze(self, result):
if (
isinstance(self.obj, Series)
or is_scalar(self._slice)
and self._slice is not None
):
if len(result.columns) < 1:
raise NotImplementedError(
"Cannot call `SeriesGroupBy.var` or `SeriesGroupBy.mean` on the key "
"column. Please use `aggregate` if you really need to do this."
)
result = result[result.columns[0]]
return result
    @derived_from(pd.core.groupby.GroupBy)
def min(self, numeric_only=False, **kwargs):
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
return self._single_agg(Min, **kwargs, **numeric_kwargs)
    @derived_from(pd.core.groupby.GroupBy)
def max(self, numeric_only=False, **kwargs):
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
return self._single_agg(Max, **kwargs, **numeric_kwargs)
    @derived_from(pd.core.groupby.GroupBy)
def first(self, numeric_only=False, sort=None, **kwargs):
if sort:
raise NotImplementedError()
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
return self._single_agg(First, **kwargs, **numeric_kwargs)
    @derived_from(pd.DataFrame)
def cov(
self,
ddof=1,
split_every=None,
split_out=None,
numeric_only=False,
shuffle_method=None,
):
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
return self._single_agg(
Cov,
split_every,
split_out,
chunk_kwargs=numeric_kwargs["chunk_kwargs"],
aggregate_kwargs={"ddof": ddof},
)
    @derived_from(pd.DataFrame)
def corr(
self, split_every=None, split_out=None, numeric_only=False, shuffle_method=None
):
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
return self._single_agg(
Corr,
split_every,
split_out,
chunk_kwargs=numeric_kwargs["chunk_kwargs"],
aggregate_kwargs={"ddof": 1},
)
    @derived_from(pd.core.groupby.GroupBy)
def last(self, numeric_only=False, sort=None, **kwargs):
if sort:
raise NotImplementedError()
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
return self._single_agg(Last, **kwargs, **numeric_kwargs)
    @derived_from(pd.core.groupby.GroupBy)
def ffill(self, limit=None, shuffle_method=None):
return self._transform_like_op(
GroupByFFill, None, limit=limit, shuffle_method=shuffle_method
)
    @derived_from(pd.core.groupby.GroupBy)
def bfill(self, limit=None, shuffle_method=None):
return self._transform_like_op(
GroupByBFill, None, limit=limit, shuffle_method=shuffle_method
)
    @derived_from(pd.core.groupby.GroupBy)
def size(self, **kwargs):
return self._single_agg(Size, **kwargs)
    @derived_from(pd.DataFrame)
def idxmin(
self,
split_every=None,
split_out=None,
skipna=True,
numeric_only=False,
shuffle_method=None,
):
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
numeric_kwargs["chunk_kwargs"]["skipna"] = skipna
return self._single_agg(
IdxMin,
split_every=split_every,
split_out=split_out,
shuffle_method=shuffle_method,
**numeric_kwargs,
)
    @derived_from(pd.DataFrame)
def idxmax(
self,
split_every=None,
split_out=None,
skipna=True,
numeric_only=False,
shuffle_method=None,
):
numeric_kwargs = self._numeric_only_kwargs(numeric_only)
numeric_kwargs["chunk_kwargs"]["skipna"] = skipna
return self._single_agg(
IdxMax,
split_every=split_every,
split_out=split_out,
shuffle_method=shuffle_method,
**numeric_kwargs,
)
@derived_from(pd.core.groupby.SeriesGroupBy)
def head(self, n=5, split_every=None, split_out=None):
chunk_kwargs = {"n": n}
aggregate_kwargs = {
"n": n,
"index_levels": len(self.by) if not isinstance(self.by, Expr) else 1,
}
return self._single_agg(
Head,
split_every=split_every,
split_out=split_out,
chunk_kwargs=chunk_kwargs,
aggregate_kwargs=aggregate_kwargs,
)
@derived_from(pd.core.groupby.SeriesGroupBy)
def tail(self, n=5, split_every=None, split_out=None):
chunk_kwargs = {"n": n}
aggregate_kwargs = {
"n": n,
"index_levels": len(self.by) if not isinstance(self.by, Expr) else 1,
}
return self._single_agg(
Tail,
split_every=split_every,
split_out=split_out,
chunk_kwargs=chunk_kwargs,
aggregate_kwargs=aggregate_kwargs,
)
    @derived_from(pd.core.groupby.GroupBy)
def var(
self,
ddof=1,
split_every=None,
split_out=None,
numeric_only=False,
shuffle_method=None,
):
if not numeric_only and not self._all_numeric():
raise NotImplementedError(
"'numeric_only=False' is not implemented in Dask."
)
result = new_collection(
Var(
self.obj.expr,
ddof,
numeric_only,
split_out,
split_every,
self.sort,
self.dropna,
self.observed,
shuffle_method,
*self.by,
)
)
return self._postprocess_series_squeeze(result)
    @derived_from(pd.core.groupby.GroupBy)
def std(
self,
ddof=1,
split_every=None,
split_out=None,
numeric_only=False,
shuffle_method=None,
):
if not numeric_only and not self._all_numeric():
raise NotImplementedError(
"'numeric_only=False' is not implemented in Dask."
)
result = new_collection(
Std(
self.obj.expr,
ddof,
numeric_only,
split_out,
split_every,
self.sort,
self.dropna,
self.observed,
shuffle_method,
*self.by,
)
)
return self._postprocess_series_squeeze(result)
    @_aggregate_docstring(based_on="pd.core.groupby.DataFrameGroupBy.agg")
def aggregate(
self, arg=None, split_every=8, split_out=None, shuffle_method=None, **kwargs
):
relabeling, order, columns = None, None, None
if arg is None:
if not isinstance(self, SeriesGroupBy):
relabeling, arg, columns, order = reconstruct_func(arg, **kwargs)
            else:
columns, arg = validate_func_kwargs(kwargs)
relabeling = True
if arg == "size":
return self.size()
result = new_collection(
GroupbyAggregation(
self.obj.expr,
arg,
self.observed,
self.dropna,
split_every,
split_out,
self.sort,
shuffle_method,
self._slice,
*self.by,
)
)
if relabeling and result is not None:
if order is not None:
result = result.iloc[:, order]
result.columns = columns
return result
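    # Hedged usage sketch for ``aggregate``/``agg`` (column names assumed):
    # >>> ddf.groupby("g").agg({"x": ["sum", "mean"]})    # doctest: +SKIP
    # >>> ddf.groupby("g").agg(total=("x", "sum"))        # named aggregation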
def agg(self, *args, **kwargs):
return self.aggregate(*args, **kwargs)
def _warn_if_no_meta(self, meta):
if meta is no_default:
msg = (
"`meta` is not specified, inferred from partial data. "
"Please provide `meta` if the result is unexpected.\n"
" Before: .apply(func)\n"
" After: .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result\n"
" or: .apply(func, meta=('x', 'f8')) for series result"
)
warnings.warn(msg, stacklevel=3)
    @insert_meta_param_description(pad=12)
def apply(self, func, *args, meta=no_default, shuffle_method=None, **kwargs):
"""Parallel version of pandas GroupBy.apply
This mimics the pandas version except for the following:
1. If the grouper does not align with the index then this causes a full
shuffle. The order of rows within each group may not be preserved.
2. Dask's GroupBy.apply is not appropriate for aggregations. For custom
aggregations, use :class:`dask.dataframe.groupby.Aggregation`.
.. warning::
           Pandas' groupby-apply can be used to apply arbitrary functions,
           including aggregations that result in one row per group. Dask's
           groupby-apply will apply ``func`` once on each group, doing a shuffle
           if needed, such that each group is contained in one partition.
           When ``func`` is a reduction, you'll end up with one row
           per group. To apply a custom aggregation with Dask,
use :class:`dask.dataframe.groupby.Aggregation`.
Parameters
----------
func: function
Function to apply
args, kwargs : Scalar, Delayed or object
Arguments and keywords to pass to the function.
$META
Returns
-------
applied : Series or DataFrame depending on columns keyword
"""
self._warn_if_no_meta(meta)
return new_collection(
GroupByApply(
self.obj.expr,
self.observed,
self.dropna,
self._slice,
self.group_keys,
func,
meta,
args,
kwargs,
get_specified_shuffle(shuffle_method),
*self.by,
)
)
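    # Hedged usage sketch for ``apply`` (names assumed): demean "x" within
    # each group, providing ``meta`` to skip inference.
    # >>> ddf.groupby("g").apply(
    # ...     lambda pdf: pdf.assign(z=pdf.x - pdf.x.mean()),
    # ...     meta={"g": "object", "x": "f8", "z": "f8"},
    # ... )  # doctest: +SKIP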
def _transform_like_op(
self, expr_cls, func, meta=no_default, shuffle_method=None, *args, **kwargs
):
return new_collection(
expr_cls(
self.obj.expr,
self.observed,
self.dropna,
self._slice,
self.group_keys,
func,
meta,
args,
kwargs,
get_specified_shuffle(shuffle_method),
*self.by,
)
)
@insert_meta_param_description(pad=12)
def shift(self, periods=1, meta=no_default, shuffle_method=None, *args, **kwargs):
"""Parallel version of pandas GroupBy.shift
This mimics the pandas version except for the following:
If the grouper does not align with the index then this causes a full
shuffle. The order of rows within each group may not be preserved.
Parameters
----------
periods : Delayed, Scalar or int, default 1
Number of periods to shift.
freq : Delayed, Scalar or str, optional
Frequency string.
fill_value : Scalar, Delayed or object, optional
The scalar value to use for newly introduced missing values.
$META
Returns
-------
shifted : Series or DataFrame shifted within each group.
Examples
--------
>>> import dask
>>> ddf = dask.datasets.timeseries(freq="1h")
>>> result = ddf.groupby("name").shift(1, meta={"id": int, "x": float, "y": float})
"""
if "axis" in kwargs:
raise TypeError("axis is not supported in shift.")
self._warn_if_no_meta(meta)
kwargs = {"periods": periods, **kwargs}
return self._transform_like_op(
GroupByShift, None, meta, shuffle_method, *args, **kwargs
)
@derived_from(pd.core.groupby.GroupBy)
def median(
self, split_every=None, split_out=True, shuffle_method=None, numeric_only=False
):
result = new_collection(
Median(
self.obj.expr,
self.observed,
self.dropna,
self._slice,
self.group_keys,
None,
no_default,
(),
{"numeric_only": numeric_only},
get_specified_shuffle(shuffle_method),
split_every,
*self.by,
)
)
if split_out is not True:
result = result.repartition(npartitions=split_out)
return result
    def rolling(self, window, min_periods=None, center=False, win_type=None, axis=0):
"""Provides rolling transformations.
.. note::
Since MultiIndexes are not well supported in Dask, this method returns a
dataframe with the same index as the original data. The groupby column is
not added as the first level of the index like pandas does.
This method works differently from other groupby methods. It does a groupby
on each partition (plus some overlap). This means that the output has the
same shape and number of partitions as the original.
Parameters
----------
window : str, offset
Size of the moving window. This is the number of observations used
for calculating the statistic. Data must have a ``DatetimeIndex``
min_periods : int, default None
Minimum number of observations in window required to have a value
(otherwise result is NA).
center : boolean, default False
Set the labels at the center of the window.
win_type : string, default None
Provide a window type. The recognized window types are identical
to pandas.
axis : int, default 0
Returns
-------
a Rolling object on which to call a method to compute a statistic
Examples
--------
>>> import dask
>>> ddf = dask.datasets.timeseries(freq="1h")
>>> result = ddf.groupby("name").x.rolling('1D').max()
"""
from dask_expr._rolling import Rolling
return Rolling(
self.obj,
window,
min_periods=min_periods,
center=center,
win_type=win_type,
groupby_kwargs={
"by": self.by,
"sort": self.sort,
"observed": self.observed,
"dropna": self.dropna,
"group_keys": self.group_keys,
},
groupby_slice=self._slice,
)
class SeriesGroupBy(GroupBy):
def __init__(
self,
obj,
by,
sort=None,
observed=None,
dropna=None,
slice=None,
):
# Raise pandas errors if applicable
if isinstance(obj, Series):
if isinstance(by, FrameBase):
obj._meta.groupby(by._meta, **_as_dict("observed", observed))
elif isinstance(by, (list, tuple)) and any(
isinstance(x, FrameBase) for x in by
):
metas = [x._meta if isinstance(x, FrameBase) else x for x in by]
obj._meta.groupby(metas, **_as_dict("observed", observed))
elif isinstance(by, list):
if len(by) == 0:
raise ValueError("No group keys passed!")
non_series_items = [item for item in by if not isinstance(item, Series)]
obj._meta.groupby(non_series_items, **_as_dict("observed", observed))
else:
obj._meta.groupby(by, **_as_dict("observed", observed))
super().__init__(
obj, by=by, slice=slice, observed=observed, dropna=dropna, sort=sort
)
@derived_from(pd.core.groupby.SeriesGroupBy)
def value_counts(self, **kwargs):
return self._single_agg(ValueCounts, **kwargs)
@derived_from(pd.core.groupby.SeriesGroupBy)
def unique(self, **kwargs):
return self._single_agg(Unique, **kwargs)
    def idxmin(
self,
split_every=None,
split_out=None,
skipna=True,
numeric_only=False,
**kwargs,
):
# pandas doesn't support numeric_only here, which is odd
return self._single_agg(
IdxMin,
            split_every=split_every,
split_out=split_out,
chunk_kwargs=dict(skipna=skipna),
)
    def idxmax(
self,
split_every=None,
split_out=None,
skipna=True,
numeric_only=False,
**kwargs,
):
# pandas doesn't support numeric_only here, which is odd
return self._single_agg(
IdxMax,
split_every=split_every,
split_out=split_out,
chunk_kwargs=dict(skipna=skipna),
)
    @derived_from(pd.core.groupby.SeriesGroupBy)
def nunique(self, split_every=None, split_out=True, shuffle_method=None):
"""
Examples
--------
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
>>> df = pd.DataFrame(data=d)
>>> ddf = dd.from_pandas(df, 2)
>>> ddf.groupby(['col1']).col2.nunique().compute()
"""
slice = self._slice or self.obj.name
return new_collection(
NUnique(
self.obj.expr,
self.observed,
self.dropna,
None,
None,
slice,
split_every,
split_out,
self.sort,
get_specified_shuffle(shuffle_method),
*self.by,
)
)
def cov(self, *args, **kwargs):
raise NotImplementedError("cov is not implemented for SeriesGroupBy objects.")
def corr(self, *args, **kwargs):
raise NotImplementedError("cov is not implemented for SeriesGroupBy objects.")
def _all_numeric(self):
return True