Create Dask Bags

There are several ways to create Dask bags around your data:

db.from_sequence

You can create a bag from an existing Python iterable:

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])

You can control the number of partitions into which this data is binned:

>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)

This controls the granularity of the parallelism that you expose. By default, Dask will try to partition your data into about 100 partitions.
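As a small sketch of how partitioning can be inspected, the example below builds the same data with an explicit partition count and with an explicit partition size, then counts the items in each partition. The `scheduler="synchronous"` argument is only an assumption made here to keep the example in a single process; it is not required.

```python
import dask.bag as db

data = list(range(10))

# Ask for an explicit number of partitions...
b_n = db.from_sequence(data, npartitions=2)
# ...or for a fixed number of items per partition.
b_size = db.from_sequence(data, partition_size=5)

print(b_n.npartitions, b_size.npartitions)

# map_partitions passes each whole partition to the function,
# so this returns one item count per partition.
sizes = b_n.map_partitions(lambda part: [len(list(part))]).compute(
    scheduler="synchronous"
)
print(sizes)
```

Fewer, larger partitions reduce scheduling overhead; more, smaller partitions expose more parallelism.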

IMPORTANT: do not load your data into Python and then load that data into a Dask bag. Instead, use Dask Bag to load your data. This parallelizes the loading step and reduces inter-worker communication:

>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)
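The pattern above can be sketched end to end as follows. The `load_from_filename` function here is a hypothetical stand-in (the original does not define it), and the `.dat` files are temporary files created only for this illustration.

```python
import os
import tempfile

import dask.bag as db


def load_from_filename(path):
    """Hypothetical loader: read one file and return its stripped contents."""
    with open(path) as f:
        return f.read().strip()


# Create a few small files so the example is self-contained.
tmpdir = tempfile.mkdtemp()
filenames = []
for i in range(3):
    path = os.path.join(tmpdir, f"{i}.dat")
    with open(path, "w") as f:
        f.write(f"record {i}\n")
    filenames.append(path)

# Only the short list of file names is placed in the bag;
# the file contents are read inside the tasks themselves.
b = db.from_sequence(filenames).map(load_from_filename)

# scheduler="synchronous" is an assumption to keep the sketch single-process.
contents = b.compute(scheduler="synchronous")
print(contents)
```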

db.read_text

Dask Bag can load data directly from text files. You can pass either a single file name, a list of file names, or a globstring. The resulting bag will have one item per line and one file per partition:

>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])
>>> b = db.read_text('myfile.*.txt')

This handles standard compression libraries like gzip, bz2, xz, or any easily installed compression library that has a file-like object. Compression will be inferred by the file name extension, or by using the compression='gzip' keyword:

>>> b = db.read_text('myfile.*.txt.gz')
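To illustrate both ways of selecting compression, the sketch below writes a small gzip file with the standard library and reads it back, once relying on the `.gz` extension and once passing `compression='gzip'` explicitly. The file name and contents are made up for the example.

```python
import gzip
import os
import tempfile

import dask.bag as db

# Write a tiny gzip-compressed text file (illustrative data only).
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "log.0.txt.gz")
with gzip.open(path, "wt") as f:
    f.write("first\nsecond\n")

# Compression inferred from the ".gz" extension.
inferred = db.read_text(path).compute(scheduler="synchronous")

# Compression stated explicitly with the keyword.
explicit = db.read_text(path, compression="gzip").compute(scheduler="synchronous")

print(inferred)
```

Note that each item keeps its trailing newline, which is why later examples strip or decode the lines.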

The resulting items in the bag are strings. If you have encoded data like line-delimited JSON, then you may want to map a decoding or load function across the bag:

>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)
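A self-contained version of the line-delimited JSON pattern might look like the following. The file names and record fields are assumptions chosen for the illustration.

```python
import json
import os
import tempfile

import dask.bag as db

# Write two small line-delimited JSON files (illustrative data only).
tmpdir = tempfile.mkdtemp()
for i in range(2):
    with open(os.path.join(tmpdir, f"records.{i}.json"), "w") as f:
        f.write(json.dumps({"file": i, "row": 0}) + "\n")
        f.write(json.dumps({"file": i, "row": 1}) + "\n")

# One partition per file; each item is one line, decoded by json.loads.
b = db.read_text(os.path.join(tmpdir, "records.*.json")).map(json.loads)

# scheduler="synchronous" is an assumption to keep the sketch single-process.
records = b.compute(scheduler="synchronous")
print(b.npartitions, records[0])
```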

Or do string munging tasks. For convenience, there is a string namespace attached directly to bags with .str.methodname:

>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')
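The `.str` namespace can be tried without any files. In this sketch the lines are supplied in memory, and the same result is produced with an explicit `map` for comparison.

```python
import dask.bag as db

# Lines as read_text would yield them, including trailing newlines.
lines = db.from_sequence(["Alice,100\n", "Bob,200\n"])

# Chain string methods through the .str accessor.
rows = lines.str.strip().str.split(",")
result = rows.compute(scheduler="synchronous")

# The equivalent transformation written with map.
same = lines.map(lambda s: s.strip().split(",")).compute(scheduler="synchronous")

print(result)
```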

db.read_avro

Dask Bag can read binary files in the Avro format if fastavro is installed. A bag can be made from one or more files, with optional chunking within files. The resulting bag will have one item per Avro record, which will be a dictionary of the form given by the Avro schema. There will be at least one partition per input file:

>>> b = db.read_avro('datafile.avro')
>>> b = db.read_avro('data.*.avro')

By default, Dask will split data files into chunks of approximately blocksize bytes in size. The actual blocks you would get depend on the internal blocking of the file.

For files that are compressed after creation (this is not the same as the internal “codec” used by Avro), no chunking should be used, and there will be exactly one partition per file:

>>> b = db.read_avro('compressed.*.avro.gz', blocksize=None, compression='gzip')

db.from_delayed

You can construct a Dask bag from dask.delayed values using the db.from_delayed function. For more information, see documentation on using dask.delayed with collections.
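A minimal sketch of `db.from_delayed` follows. Each delayed value should evaluate to an iterable, and each one becomes a partition of the bag. The `load_chunk` function is hypothetical and simply fabricates a range of integers.

```python
import dask
import dask.bag as db


@dask.delayed
def load_chunk(start, stop):
    """Hypothetical loader: return one partition's worth of items."""
    return list(range(start, stop))


# Two delayed values, each of which evaluates to a list.
chunks = [load_chunk(0, 3), load_chunk(3, 6)]

# Each delayed value becomes one partition.
b = db.from_delayed(chunks)

# scheduler="synchronous" is an assumption to keep the sketch single-process.
total = b.sum().compute(scheduler="synchronous")
print(b.npartitions, total)
```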

Store Dask Bags

In Memory

You can convert a Dask bag to a list or Python iterable by calling compute() or by converting the object into a list:

>>> result = b.compute()
or
>>> result = list(b)
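Both approaches give the same items, as this small sketch shows. The `dask.config.set(scheduler="synchronous")` context is an assumption used here so that the example runs in a single process; by default a bag uses a multiprocessing scheduler.

```python
import dask
import dask.bag as db

b = db.from_sequence(range(6), npartitions=2).map(lambda x: x * 10)

with dask.config.set(scheduler="synchronous"):
    as_list = b.compute()  # explicit computation, returns a list
    via_iter = list(b)     # iterating the bag also triggers computation

print(as_list)
```

Either call pulls the whole bag into the memory of the calling process, so it is usually best applied after filtering or reducing the data.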

To Text Files

You can convert a Dask bag into a sequence of files on disk by calling the .to_textfiles() method:

dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True, storage_options=None, last_endline=False, **kwargs)

Write dask Bag to disk, one filename per partition, one line per element.

Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.

Use a globstring

>>> b.to_textfiles('/path/to/data/*.json.gz')  

The * will be replaced by the increasing sequence 0, 1, …

/path/to/data/0.json.gz
/path/to/data/1.json.gz

Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string. Strings produced by name_function must preserve the order of their respective partition indices.

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))
>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'
>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)  
/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...

You can also provide an explicit list of paths.

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]  
>>> b.to_textfiles(paths) 
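The naming options can be checked against a temporary directory. This sketch reuses the `name` function from the example above and lists the files that were created; the bag contents and the `.txt` extension are arbitrary choices for the illustration.

```python
import os
import tempfile
from datetime import date, timedelta

import dask.bag as db


def name(i):
    """Map a partition index to an ISO date string (sorts in index order)."""
    return str(date(2015, 1, 1) + i * timedelta(days=1))


tmpdir = tempfile.mkdtemp()
b = db.from_sequence(["a", "b", "c", "d"], npartitions=2)

# Extra keyword arguments are forwarded to compute(); scheduler="synchronous"
# is an assumption to keep the sketch single-process.
written = b.to_textfiles(
    os.path.join(tmpdir, "*.txt"), name_function=name, scheduler="synchronous"
)

names = sorted(os.listdir(tmpdir))
print(names)
```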

Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.

Bag Contents: The bag calling to_textfiles must be a bag of text strings. For example, a bag of dictionaries could be written to JSON text files by mapping json.dumps onto the bag first, and then calling to_textfiles:

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")  

Last endline: By default the last line does not end with a newline character. Pass last_endline=True to invert the default.
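The following sketch combines the two points above: it serializes a bag of dictionaries with `json.dumps`, writes it with `last_endline=True`, and reads it back. The records are made up for the example.

```python
import json
import os
import tempfile

import dask.bag as db

tmpdir = tempfile.mkdtemp()
b_dict = db.from_sequence(
    [{"name": "Alice", "value": 100}, {"name": "Bob", "value": 200}],
    npartitions=1,
)

# Convert each dict to a JSON string, then write one line per element.
# scheduler="synchronous" is an assumption to keep the sketch single-process.
b_dict.map(json.dumps).to_textfiles(
    os.path.join(tmpdir, "*.json"), last_endline=True, scheduler="synchronous"
)

# Inspect the single output file directly.
files = sorted(os.listdir(tmpdir))
with open(os.path.join(tmpdir, files[0])) as f:
    text = f.read()

# Read the data back into a bag of dictionaries.
round_trip = (
    db.read_text(os.path.join(tmpdir, "*.json"))
    .map(json.loads)
    .compute(scheduler="synchronous")
)
print(round_trip)
```

With `last_endline=True` the file ends with a newline, which some line-oriented tools expect.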

To Avro

Dask bags can be written directly to Avro binary format using fastavro. One file will be written per bag partition. This requires the user to provide a fully-specified schema dictionary (see the docstring of the .to_avro() method).

dask.bag.avro.to_avro(b, filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)

Write bag to set of avro files

The schema is a complex dictionary describing the data; see https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema and https://fastavro.readthedocs.io/en/latest/writer.html . Its structure is as follows:

{'name': 'Test',
 'namespace': 'Test',
 'doc': 'Descriptive text',
 'type': 'record',
 'fields': [
    {'name': 'a', 'type': 'int'},
 ]}

where the “name” field is required, but “namespace” and “doc” are optional descriptors; “type” must always be “record”. The list of fields should have an entry for every key of the input records, and the types are like the primitive, complex or logical types of the Avro spec ( https://avro.apache.org/docs/1.8.2/spec.html ).

Results in one avro file per input partition.

Parameters
b: dask.bag.Bag
filename: list of str or str

Filenames to write to. If a list, number must match the number of partitions. If a string, must include a glob character “*”, which will be expanded using name_function

schema: dict

Avro schema dictionary, see above

name_function: None or callable

Expands integers into strings, see dask.bytes.utils.build_name_function

storage_options: None or dict

Extra key/value options to pass to the backend file-system

codec: ‘null’, ‘deflate’, or ‘snappy’

Compression algorithm

sync_interval: int

Number of records to include in each block within a file

metadata: None or dict

Included in the file header

compute: bool

If True, files are written immediately, and function blocks. If False, returns delayed objects, which can be computed by the user where convenient.

kwargs: passed to compute(), if compute=True

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
...                       {'name': 'Bob', 'value': 200}])
>>> schema = {'name': 'People', 'doc': "Set of people's scores",
...           'type': 'record',
...           'fields': [
...               {'name': 'name', 'type': 'string'},
...               {'name': 'value', 'type': 'int'}]}
>>> b.to_avro('my-data.*.avro', schema)  
['my-data.0.avro', 'my-data.1.avro']

To DataFrames

You can convert a Dask bag into a Dask DataFrame and use the storage solutions available for DataFrames.

Bag.to_dataframe(meta=None, columns=None, optimize_graph=True)

Create Dask Dataframe from a Dask Bag.

Bag should contain tuples, dict records, or scalars.

Index will not be particularly meaningful. Use reindex afterwards if necessary.

Parameters
meta: pd.DataFrame, dict, iterable, optional

An empty pd.DataFrame that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided. If not provided or a list, a single element from the first partition will be computed, triggering a potentially expensive call to compute. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.

columns: sequence, optional

Column names to use. If the passed data do not have names associated with them, this argument provides names for the columns. Otherwise this argument indicates the order of the columns in the result (any names not found in the data will become all-NA columns). Note that if meta is provided, column names will be taken from there and this parameter is invalid.

optimize_graph: bool, optional

If True [default], the graph is optimized before converting into dask.dataframe.DataFrame.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()
>>> df.compute()
      name  balance
0    Alice      100
1      Bob      200
0  Charlie      300

To Delayed Values

You can convert a Dask bag into a list of Dask delayed values and build custom storage solutions from there.

Bag.to_delayed(optimize_graph=True)

Convert into a list of dask.delayed objects, one per partition.

Parameters
optimize_graph: bool, optional

If True [default], the graph is optimized before converting into dask.delayed objects.
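As one possible sketch of a custom storage solution, the example below converts a bag to delayed partitions and passes each one to a delayed writer. The `save_partition` function and the `part-N.txt` naming are hypothetical; any per-partition writer could take their place.

```python
import os
import tempfile

import dask
import dask.bag as db


@dask.delayed
def save_partition(items, path):
    """Hypothetical writer: one item per line; returns the number written."""
    items = list(items)
    with open(path, "w") as f:
        for item in items:
            f.write(f"{item}\n")
    return len(items)


tmpdir = tempfile.mkdtemp()
b = db.from_sequence(range(6), npartitions=3)

# One delayed object per partition.
parts = b.to_delayed()

# Build one write task per partition; nothing runs until compute is called.
writes = [
    save_partition(part, os.path.join(tmpdir, f"part-{i}.txt"))
    for i, part in enumerate(parts)
]

# scheduler="synchronous" is an assumption to keep the sketch single-process.
counts = dask.compute(*writes, scheduler="synchronous")
print(counts)
```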