dask.bag.Bag.foldby
Bag.foldby(key, binop, initial='__no__default__', combine=None, combine_initial='__no__default__', split_every=None)
Combined reduction and groupby.
Foldby provides a combined groupby and reduce for efficient parallel split-apply-combine tasks.
The computation
>>> b.foldby(key, binop, init)
is equivalent to the following:
>>> from functools import reduce
>>> def reduction(group):
...     return reduce(binop, group, init)
>>> b.groupby(key).map(lambda kv: (kv[0], reduction(kv[1])))
However, foldby uses minimal communication and so is much faster.
>>> import dask.bag as db
>>> b = db.from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> add = lambda x, y: x + y
>>> dict(b.foldby(iseven, add))
{True: 20, False: 25}
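One way to picture why this needs little communication is a two-stage reduction: each partition is folded locally into a small dictionary of per-key totals, and only those totals are then merged. The following is a plain-Python sketch of that idea, not dask's actual implementation; the helper names fold_partition and merge_totals and the two-way split of the data are assumptions made for illustration.

```python
from functools import reduce

def fold_partition(partition, key, binop, initial):
    """Fold one partition into a {key: total} dict using binop."""
    totals = {}
    for item in partition:
        k = key(item)
        totals[k] = binop(totals.get(k, initial), item)
    return totals

def merge_totals(left, right, combine):
    """Merge two per-partition {key: total} dicts using combine."""
    merged = dict(left)
    for k, total in right.items():
        merged[k] = combine(merged[k], total) if k in merged else total
    return merged

# Assumed layout: range(10) split into two partitions.
partitions = [range(0, 5), range(5, 10)]
iseven = lambda x: x % 2 == 0
add = lambda x, y: x + y

# Stage 1: fold each partition independently (no data leaves the partition).
partials = [fold_partition(p, iseven, add, 0) for p in partitions]

# Stage 2: merge only the small per-key totals.
result = reduce(lambda l, r: merge_totals(l, r, add), partials)
print(result)  # {True: 20, False: 25}
```

Only one total per key and partition crosses the partition boundary, rather than every element of every group.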
Key Function
The key function determines how to group the elements in your bag. In the common case where your bag holds dictionaries, the key function often extracts one of the dictionary's values.
>>> def key(x):
...     return x['name']
This case is so common that it is special-cased: if you provide a key that is not callable, dask.bag will turn it into a function automatically. The following are equivalent:
>>> b.foldby(lambda x: x['name'], ...)
>>> b.foldby('name', ...)
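The conversion from a non-callable key to a function can be imagined roughly as follows. This is a minimal sketch assuming plain item lookup; as_key_function is a hypothetical helper, not part of the dask API.

```python
from operator import itemgetter

def as_key_function(key):
    # Hypothetical helper: leave callables alone and turn
    # anything else into an item lookup on each element.
    return key if callable(key) else itemgetter(key)

record = {'name': 'Alice', 'amount': 100}

by_lambda = as_key_function(lambda x: x['name'])
by_string = as_key_function('name')

# Both forms pick out the same grouping key.
print(by_lambda(record), by_string(record))  # Alice Alice
```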
Binops
It can be tricky to construct the right binary operators to perform analytic queries. The foldby method accepts two binary operators, binop and combine. A binary operator's two inputs and its output must have the same type.
Binop takes a running total and a new element and produces a new total:
>>> def binop(total, x):
...     return total + x['amount']
Combine takes two totals and combines them:
>>> def combine(total1, total2):
...     return total1 + total2
Each of these binary operators may have a default first value for total, before any other value is seen. For addition binary operators like above this is often 0 or the identity element for your operation.
split_every
Group partitions into groups of this size while performing reduction. Defaults to 8.
>>> b.foldby('name', binop, 0, combine, 0)
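To illustrate how binop, combine, and their initial values fit together when the running total is not a plain number, here is a plain-Python sketch that computes a per-name mean with a (sum, count) total. The records, the two-partition split, and the fold helper are assumptions made for illustration; the loop only imitates the fold-then-combine pattern and does not call dask.

```python
# Hypothetical records, already split into two partitions.
partitions = [
    [{'name': 'Alice', 'amount': 100}, {'name': 'Bob', 'amount': 200}],
    [{'name': 'Alice', 'amount': 300}, {'name': 'Bob', 'amount': 400},
     {'name': 'Alice', 'amount': 50}],
]

def binop(total, x):
    # total is a (sum, count) pair; x is a single record.
    s, n = total
    return (s + x['amount'], n + 1)

def combine(total1, total2):
    # Both arguments are (sum, count) pairs.
    return (total1[0] + total2[0], total1[1] + total2[1])

initial = (0, 0)  # identity element for both operators

def fold(partition):
    # Fold one partition into {name: (sum, count)} using binop.
    totals = {}
    for x in partition:
        totals[x['name']] = binop(totals.get(x['name'], initial), x)
    return totals

# Merge the per-partition totals using combine.
merged = {}
for partial in map(fold, partitions):
    for name, total in partial.items():
        merged[name] = combine(merged.get(name, initial), total)

means = {name: s / n for name, (s, n) in merged.items()}
print(means)  # {'Alice': 150.0, 'Bob': 300.0}
```

With a bag of such records, the same operators would presumably be passed as b.foldby('name', binop, (0, 0), combine, (0, 0)), with the division done afterwards on the resulting (sum, count) pairs.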
See also
toolz.reduceby
pyspark.combineByKey
Examples
We can compute the maximum of some (key, value) pairs, grouped by the key. (You might be better off converting the Bag to a dask.dataframe and using its groupby).
>>> import random
>>> import dask.bag as db
>>> tokens = list('abcdefg')
>>> values = range(10000)
>>> a = [(random.choice(tokens), random.choice(values))
...      for _ in range(100)]
>>> a[:2]
[('g', 676), ('a', 871)]
>>> a = db.from_sequence(a)
>>> def binop(t, x):
...     return max((t, x), key=lambda x: x[1])
>>> a.foldby(lambda x: x[0], binop).compute()
[('g', ('g', 984)), ('a', ('a', 871)), ('b', ('b', 999)), ('c', ('c', 765)), ('f', ('f', 955)), ('e', ('e', 991)), ('d', ('d', 854))]