## db.from_sequence

You can create a bag from an existing Python iterable:

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5, 6])


You can control the number of partitions into which this data is binned:

>>> b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
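
The effect of npartitions can be checked directly. As a quick sketch (assuming Dask is installed):

```python
import dask.bag as db

# Partition six items into two partitions of three items each
b = db.from_sequence([1, 2, 3, 4, 5, 6], npartitions=2)
print(b.npartitions)  # 2
print(b.compute())    # [1, 2, 3, 4, 5, 6]
```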


This controls the granularity of the parallelism that you expose. By default, Dask tries to partition your data into about 100 partitions.

Note that from_sequence places all of the data in local memory first. For larger datasets, it is often better to create a bag of lightweight tokens, such as filenames, and map a loading function across them:

>>> b = db.from_sequence(['1.dat', '2.dat', ...]).map(load_from_filename)

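The filename-mapping pattern above can be made concrete with a small, self-contained sketch. The load_from_filename function here is a hypothetical loader written for this example (it reads one integer per line); any per-file loading function would work the same way:

```python
import os
import tempfile

import dask.bag as db

# Hypothetical loader for this example: reads integers, one per line
def load_from_filename(path):
    with open(path) as f:
        return [int(line) for line in f]

# Create two small data files to load
tmpdir = tempfile.mkdtemp()
paths = []
for i, rows in enumerate([[1, 2], [3, 4]]):
    path = os.path.join(tmpdir, f"{i}.dat")
    with open(path, "w") as f:
        f.write("\n".join(map(str, rows)))
    paths.append(path)

# The bag holds only filenames; loading happens inside the parallel tasks
b = db.from_sequence(paths).map(load_from_filename)
print(b.compute())  # [[1, 2], [3, 4]]
```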

## db.read_text

Dask Bag can load data directly from text files. You can pass either a single file name, a list of file names, or a globstring. The resulting bag will have one item per line and one file per partition:

>>> b = db.read_text('myfile.txt')
>>> b = db.read_text(['myfile.1.txt', 'myfile.2.txt', ...])


This handles standard compression formats like gzip, bz2, and xz, as well as any easily installed compression library that offers a file-like interface. Compression is inferred from the file name extension, or can be specified explicitly with the compression='gzip' keyword:

>>> b = db.read_text('myfile.*.txt.gz')

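A self-contained sketch of the compression handling (assuming Dask is installed): we write a gzip-compressed text file and let read_text infer the compression from the .gz extension:

```python
import gzip
import os
import tempfile

import dask.bag as db

# Write a small gzip-compressed text file
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "myfile.0.txt.gz")
with gzip.open(path, "wt") as f:
    f.write("alpha\nbeta\n")

# Compression is inferred from the .gz extension
b = db.read_text(path)
lines = [line.strip() for line in b.compute()]
print(lines)  # ['alpha', 'beta']
```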

The resulting items in the bag are strings. If you have encoded data like line-delimited JSON, then you may want to map a decoding or load function across the bag:

>>> import json
>>> b = db.read_text('myfile.*.json').map(json.loads)


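The decoding pattern can be sketched end to end (assuming Dask is installed): write a line-delimited JSON file, read it with read_text, and map json.loads across the lines:

```python
import json
import os
import tempfile

import dask.bag as db

# Write a line-delimited JSON file
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "records.json")
with open(path, "w") as f:
    f.write('{"name": "Alice"}\n{"name": "Bob"}\n')

# Each line is decoded independently, in parallel
b = db.read_text(path).map(json.loads)
print(b.compute())  # [{'name': 'Alice'}, {'name': 'Bob'}]
```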
You can also perform string munging tasks. For convenience, there is a string namespace attached directly to bags, accessed as .str.methodname:

>>> b = db.read_text('myfile.*.csv').str.strip().str.split(',')

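The .str accessor works on any bag of strings, so it can be sketched without files (assuming Dask is installed):

```python
import dask.bag as db

# The .str namespace maps string methods over every element of the bag
b = db.from_sequence(["  a,b  ", "  c,d  "])
result = b.str.strip().str.split(",").compute()
print(result)  # [['a', 'b'], ['c', 'd']]
```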

## db.read_avro

Dask Bag can read binary files in the Avro format if fastavro is installed. A bag can be made from one or more files, with optional chunking within files. The resulting bag will have one item per Avro record, which will be a dictionary of the form given by the Avro schema. There will be at least one partition per input file:

>>> b = db.read_avro('datafile.avro')


By default, Dask will split data files into chunks of approximately blocksize bytes. The actual blocks you get depend on the internal blocking of the file.

For files that are compressed after creation (this is not the same as the internal “codec” used by Avro), no chunking should be used, and there will be exactly one partition per file:

>>> b = db.read_avro('compressed.*.avro.gz', blocksize=None, compression='gzip')


## db.from_delayed

You can construct a Dask bag from dask.delayed values using the db.from_delayed function. For more information, see documentation on using dask.delayed with collections.
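A minimal sketch of the pattern (assuming Dask is installed): each delayed call produces the contents of one partition, and db.from_delayed stitches them into a bag:

```python
import dask
import dask.bag as db

# Each delayed call produces the contents of one partition
@dask.delayed
def load(n):
    return list(range(n))

b = db.from_delayed([load(2), load(3)])
print(b.npartitions)  # 2
print(b.compute())    # [0, 1, 0, 1, 2]
```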

## In Memory

You can convert a Dask bag to a list or Python iterable by calling compute() or by converting the object into a list:

>>> result = b.compute()

or

>>> result = list(b)


## To Text Files

You can convert a Dask bag into a sequence of files on disk by calling the .to_textfiles() method:

dask.bag.core.to_textfiles(b, path, name_function=None, compression='infer', encoding='utf-8', compute=True, storage_options=None, last_endline=False, **kwargs)

Write dask Bag to disk, one filename per partition, one line per element.

Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.

Use a globstring

>>> b.to_textfiles('/path/to/data/*.json.gz')


The * will be replaced by the increasing sequence 0, 1, …

/path/to/data/0.json.gz
/path/to/data/1.json.gz


Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string. Strings produced by name_function must preserve the order of their respective partition indices.

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))

>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'

>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)

/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...


You can also provide an explicit list of paths.

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]
>>> b.to_textfiles(paths)


Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.

Bag Contents: The bag calling to_textfiles must be a bag of text strings. For example, a bag of dictionaries could be written to JSON text files by mapping json.dumps onto the bag first, and then calling to_textfiles:

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")


Last endline: By default the last line does not end with a newline character. Pass last_endline=True to invert the default.
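The whole flow can be sketched end to end (assuming Dask is installed): a two-partition bag written through a globstring produces one numbered file per partition:

```python
import os
import tempfile

import dask.bag as db

tmpdir = tempfile.mkdtemp()
b = db.from_sequence(["alpha", "beta", "gamma", "delta"], npartitions=2)

# One file per partition; * expands to the partition number
b.to_textfiles(os.path.join(tmpdir, "*.txt"))
files = sorted(os.listdir(tmpdir))
print(files)  # ['0.txt', '1.txt']
```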

## To Avro

Dask bags can be written directly to Avro binary format using fastavro. One file will be written per bag partition. This requires the user to provide a fully-specified schema dictionary (see the docstring of the .to_avro() method).

dask.bag.avro.to_avro(b, filename, schema, name_function=None, storage_options=None, codec='null', sync_interval=16000, metadata=None, compute=True, **kwargs)

Write a bag to a set of Avro files.

The schema is a complex dictionary describing the data, see https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema and https://fastavro.readthedocs.io/en/latest/writer.html . Its structure is as follows:

{'name': 'Test',
 'namespace': 'Test',
 'doc': 'Descriptive text',
 'type': 'record',
 'fields': [
     {'name': 'a', 'type': 'int'},
 ]}


where the “name” field is required, but “namespace” and “doc” are optional descriptors; “type” must always be “record”. The list of fields should have an entry for every key of the input records, and the types are like the primitive, complex or logical types of the Avro spec ( https://avro.apache.org/docs/1.8.2/spec.html ).

Results in one avro file per input partition.

Parameters:

b: dask.bag.Bag

filename: list of str or str
    Filenames to write to. If a list, the number must match the number of partitions. If a string, it must include a glob character "*", which will be expanded using name_function.

schema: dict
    Avro schema dictionary, see above.

name_function: None or callable
    Expands integers into strings, see dask.bytes.utils.build_name_function.

storage_options: None or dict
    Extra key/value options to pass to the backend file-system.

codec: 'null', 'deflate', or 'snappy'
    Compression algorithm.

sync_interval: int
    Number of records to include in each block within a file.

metadata: None or dict
    Included in the file header.

compute: bool
    If True, files are written immediately, and the function blocks. If False, returns delayed objects, which can be computed by the user where convenient.

kwargs:
    Passed to compute(), if compute=True.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice', 'value': 100},
...                       {'name': 'Bob', 'value': 200}])
>>> schema = {'name': 'People', 'doc': "Set of people's scores",
...           'type': 'record',
...           'fields': [
...               {'name': 'name', 'type': 'string'},
...               {'name': 'value', 'type': 'int'}]}
>>> b.to_avro('my-data.*.avro', schema)
['my-data.0.avro', 'my-data.1.avro']


## To DataFrames

You can convert a Dask bag into a Dask DataFrame and then use the DataFrame's storage solutions from there.

Bag.to_dataframe(meta=None, columns=None)

Bag should contain tuples, dict records, or scalars.

Index will not be particularly meaningful. Use reindex afterwards if necessary.

Parameters:

meta : pd.DataFrame, dict, iterable, optional
    An empty pd.DataFrame that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided. If not provided or a list, a single element from the first partition will be computed, triggering a potentially expensive call to compute. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta.

columns : sequence, optional
    Column names to use. If the passed data do not have names associated with them, this argument provides names for the columns. Otherwise this argument indicates the order of the columns in the result (any names not found in the data will become all-NA columns). Note that if meta is provided, column names will be taken from there and this parameter is invalid.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()

>>> df.compute()
   balance     name
0      100    Alice
1      200      Bob
0      300  Charlie


## To Delayed Values

You can convert a Dask bag into a list of Dask delayed values and custom storage solutions from there.

Bag.to_delayed(optimize_graph=True)

Convert into a list of dask.delayed objects, one per partition.

Parameters:

optimize_graph : bool, optional
    If True [default], the graph is optimized before converting into dask.delayed objects.
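A minimal sketch (assuming Dask is installed): each returned delayed object computes to the list of elements in its partition:

```python
import dask
import dask.bag as db

b = db.from_sequence([1, 2, 3, 4], npartitions=2)

# One delayed object per partition; each computes to a list
parts = b.to_delayed()
print(len(parts))            # 2
print(dask.compute(*parts))  # ([1, 2], [3, 4])
```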