dask.bag.Bag.foldby

Bag.foldby(key, binop, initial=_NoDefault.no_default, combine=None, combine_initial=_NoDefault.no_default, split_every=None)

Combined reduction and groupby.

Foldby provides a combined groupby and reduce for efficient parallel split-apply-combine tasks.

The computation

>>> b.foldby(key, binop, init)                        

is equivalent to the following:

>>> from functools import reduce
>>> def reduction(group):
...     return reduce(binop, group, init)
>>> b.groupby(key).map(lambda kv: (kv[0], reduction(kv[1])))

But the foldby version uses minimal communication and so is much faster.

>>> import dask.bag as db
>>> b = db.from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> add = lambda x, y: x + y
>>> dict(b.foldby(iseven, add))
{True: 20, False: 25}

Key Function

The key function determines how to group the elements in your bag. In the common case where your bag holds dictionaries, the key function often extracts one of the fields from each dictionary.

>>> def key(x):
...     return x['name']

This case is so common that it is special-cased: if you provide a key that is not callable, dask.bag will turn it into a key function automatically. The following are equivalent:

>>> b.foldby(lambda x: x['name'], ...)  
>>> b.foldby('name', ...)  
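
As a concrete sketch (the records here are invented for illustration), both spellings produce the same grouping on a small bag of dictionaries; here we count records per name, using operator.add to combine per-partition counts:

>>> import dask.bag as db
>>> from operator import add
>>> b = db.from_sequence([
...     {'name': 'Alice', 'amount': 100},
...     {'name': 'Bob', 'amount': 200},
...     {'name': 'Alice', 'amount': 300},
... ])
>>> count = lambda total, _: total + 1
>>> sorted(b.foldby('name', count, 0, add, 0))
[('Alice', 2), ('Bob', 1)]
>>> sorted(b.foldby(lambda x: x['name'], count, 0, add, 0))
[('Alice', 2), ('Bob', 1)]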

Binops

It can be tricky to construct the right binary operators to perform analytic queries. The foldby method accepts two binary operators, binop and combine. Each operator's output must have the same type as the running total it receives as its first argument.

Binop takes a running total and a new element and produces a new total:

>>> def binop(total, x):
...     return total + x['amount']

Combine takes two totals and combines them:

>>> def combine(total1, total2):
...     return total1 + total2

Each of these binary operators may be given an initial value for the total, used before any other value is seen. For addition operators like those above, this is 0, the identity element of the operation.
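
Putting the pieces together on the bag of dictionaries defined above, a sketch that sums the amount field per name, supplying 0 as the initial value for both operators:

>>> sorted(b.foldby('name', binop, 0, combine, 0))
[('Alice', 400), ('Bob', 200)]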

split_every

Partitions are combined in groups of this size while performing the reduction. Defaults to 8.

>>> b.foldby('name', binop, 0, combine, 0)  
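
As a rough sketch reusing iseven and add from above (b2 is a hypothetical larger bag), a smaller split_every merges fewer intermediate results at each step of the reduction tree:

>>> b2 = db.from_sequence(range(1000), npartitions=64)
>>> dict(b2.foldby(iseven, add, 0, add, 0, split_every=4))
{True: 249500, False: 250000}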

See also

toolz.reduceby
pyspark.combineByKey

Examples

We can compute the maximum of some (key, value) pairs, grouped by the key. (You might be better off converting the Bag to a dask.dataframe and using its groupby).

>>> import random
>>> import dask.bag as db
>>> tokens = list('abcdefg')
>>> values = range(10000)
>>> a = [(random.choice(tokens), random.choice(values))
...       for _ in range(100)]
>>> a[:2]  
[('g', 676), ('a', 871)]
>>> a = db.from_sequence(a)
>>> def binop(t, x):
...     return max((t, x), key=lambda pair: pair[1])
>>> a.foldby(lambda x: x[0], binop).compute()  
[('g', ('g', 984)),
 ('a', ('a', 871)),
 ('b', ('b', 999)),
 ('c', ('c', 765)),
 ('f', ('f', 955)),
 ('e', ('e', 991)),
 ('d', ('d', 854))]
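
If you would rather avoid the nested (key, (key, value)) tuples in the output, one sketch is to fold the value directly by passing initial values and an explicit combine (same random data as above, so the exact numbers and ordering will vary):

>>> def binop2(t, x):
...     return max(t, x[1])
>>> a.foldby(lambda x: x[0], binop2, 0, max, 0).compute()  
[('g', 984),
 ('a', 871),
 ('b', 999),
 ('c', 765),
 ('f', 955),
 ('e', 991),
 ('d', 854)]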