API Reference

Dask APIs generally follow their upstream counterparts (for example, dask.array follows NumPy and dask.dataframe follows pandas).

Additionally, Dask has its own functions to start computations, persist data in memory, check progress, and so forth that complement the APIs above. These more general Dask functions are described below:

compute(*args[, traverse, optimize_graph, ...])

Compute several dask collections at once.

is_dask_collection(x)

Returns True if x is a dask collection

optimize(*args[, traverse])

Optimize several dask collections at once.

persist(*args[, traverse, optimize_graph, ...])

Persist multiple Dask collections into memory

visualize(*args[, filename, traverse, ...])

Visualize several dask graphs simultaneously.

These functions work with any scheduler. More advanced operations are available when using the distributed scheduler via a dask.distributed.Client (which, despite its name, runs nicely on a single machine). This API lets you submit, cancel, and track work asynchronously, and includes many functions for complex inter-task workflows. These are not necessary for normal operation, but can be useful for real-time or advanced workloads.

This more advanced API is described in the Dask distributed documentation.
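As a brief illustration of the futures interface mentioned above (a minimal sketch, assuming the optional distributed package is installed; the function name `inc` is just an example):

```python
from dask.distributed import Client

def inc(x):
    return x + 1

# processes=False keeps everything inside the local process, which is
# enough to demonstrate the API on a single machine.
client = Client(processes=False)
future = client.submit(inc, 41)   # submit work asynchronously
result = future.result()          # block until the result is ready
client.close()
# result == 42
```

The same `submit`/`result` pattern scales unchanged from a laptop to a cluster; only the `Client` constructor arguments differ.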

dask.annotate(**annotations)[source]

Context Manager for setting HighLevelGraph Layer annotations.

Annotations are metadata or soft constraints associated with tasks that dask schedulers may choose to respect: They signal intent without enforcing hard constraints. As such, they are primarily designed for use with the distributed scheduler.

Almost any object can serve as an annotation, but small Python objects are preferred, while large objects such as NumPy arrays are discouraged.

Callables supplied as an annotation should take a single key argument and produce the appropriate annotation. Individual task keys in the annotated collection are supplied to the callable.

Parameters
**annotations : key-value pairs

Examples

All tasks within array A should have priority 100 and be retried 3 times on failure.

>>> import dask
>>> import dask.array as da
>>> with dask.annotate(priority=100, retries=3):
...     A = da.ones((10000, 10000))

Prioritise tasks within Array A on flattened block ID.

>>> nblocks = (10, 10)
>>> with dask.annotate(priority=lambda k: k[1]*nblocks[1] + k[2]):
...     A = da.ones((1000, 1000), chunks=(100, 100))

Annotations may be nested.

>>> with dask.annotate(priority=1):
...     with dask.annotate(retries=3):
...         A = da.ones((1000, 1000))
...     B = A + 1
dask.compute(*args, traverse=True, optimize_graph=True, scheduler=None, get=None, **kwargs)[source]

Compute several dask collections at once.

Parameters
args : object

Any number of objects. If it is a dask object, it’s computed and the result is returned. By default, python builtin collections are also traversed to look for dask objects (for more information see the traverse keyword). Non-dask arguments are passed through unchanged.

traverse : bool, optional

By default dask traverses builtin python collections looking for dask objects passed to compute. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.

scheduler : string, optional

Which scheduler to use like “threads”, “synchronous” or “processes”. If not provided, the default is to check the global settings first, and then fall back to the collection defaults.

optimize_graph : bool, optional

If True [default], the optimizations for each collection are applied before computation. Otherwise the graph is run as is. This can be useful for debugging.

get : None

Should be left as None; the get= keyword has been removed.

**kwargs

Extra keywords to forward to the scheduler function.

Examples

>>> import dask as d
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> d.compute(a, b)
(45, 4.5)

By default, dask objects inside python collections will also be computed:

>>> d.compute({'a': a, 'b': b, 'c': 1})
({'a': 45, 'b': 4.5, 'c': 1},)
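As a sketch of the scheduler keyword described above, forcing the single-threaded scheduler (handy when stepping through with pdb):

```python
import dask
import dask.array as da

a = da.arange(10, chunks=2).sum()
b = da.arange(10, chunks=2).mean()

# "synchronous" runs every task in the calling thread, so tracebacks
# and debuggers behave as they would for ordinary Python code.
results = dask.compute(a, b, scheduler="synchronous")
# results == (45, 4.5)
```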
dask.is_dask_collection(x)[source]

Returns True if x is a dask collection
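A quick illustrative check (the values here are arbitrary):

```python
import dask
import dask.array as da

x = da.ones(10, chunks=5)

dask.is_dask_collection(x)          # True: a dask array is a collection
dask.is_dask_collection([x])        # False: a list holding one is not
dask.is_dask_collection(42)         # False: plain Python objects are not
```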

dask.optimize(*args, traverse=True, **kwargs)[source]

Optimize several dask collections at once.

Returns equivalent dask collections that all share the same merged and optimized underlying graph. This can be useful if converting multiple collections to delayed objects, or to manually apply the optimizations at strategic points.

Note that in most cases you shouldn’t need to call this method directly.

Parameters
*args : objects

Any number of objects. If a dask object, its graph is optimized and merged with all those of all other dask objects before returning an equivalent dask collection. Non-dask arguments are passed through unchanged.

traverse : bool, optional

By default dask traverses builtin python collections looking for dask objects passed to optimize. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.

optimizations : list of callables, optional

Additional optimization passes to perform.

**kwargs

Extra keyword arguments to forward to the optimization passes.

Examples

>>> import dask as d
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> a2, b2 = d.optimize(a, b)
>>> a2.compute() == a.compute()
True
>>> b2.compute() == b.compute()
True
dask.persist(*args, traverse=True, optimize_graph=True, scheduler=None, **kwargs)[source]

Persist multiple Dask collections into memory

This turns lazy Dask collections into Dask collections with the same metadata, but now with their results fully computed or actively computing in the background.

For example, a lazy dask.array built up from many lazy calls will now be a dask.array of the same shape, dtype, chunks, etc., but with all of those previously lazy tasks either computed in memory as many small numpy arrays (in the single-machine case) or asynchronously running in the background on a cluster (in the distributed case).

This function operates differently if a dask.distributed.Client exists and is connected to a distributed scheduler. In this case this function will return as soon as the task graph has been submitted to the cluster, but before the computations have completed. Computations will continue asynchronously in the background. When using this function with the single machine scheduler it blocks until the computations have finished.

When using Dask on a single machine you should ensure that the dataset fits entirely within memory.

Parameters
*args: Dask collections
scheduler : string, optional

Which scheduler to use like “threads”, “synchronous” or “processes”. If not provided, the default is to check the global settings first, and then fall back to the collection defaults.

traverse : bool, optional

By default dask traverses builtin python collections looking for dask objects passed to persist. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.

optimize_graph : bool, optional

If True [default], the graph is optimized before computation. Otherwise the graph is run as is. This can be useful for debugging.

**kwargs

Extra keywords to forward to the scheduler function.

Returns
New dask collections backed by in-memory data

Examples

>>> df = dd.read_csv('/path/to/*.csv')  
>>> df = df[df.name == 'Alice']  
>>> df['in-debt'] = df.balance < 0  
>>> df = df.persist()  # triggers computation  
>>> df.value().min()  # future computations are now fast  
-10
>>> df.value().max()  
100
>>> from dask import persist  # use persist function on multiple collections
>>> a, b = persist(a, b)  
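A runnable single-machine sketch of the blocking behaviour described above (the array contents are arbitrary):

```python
import dask
import dask.array as da

x = da.random.random((100, 100), chunks=(50, 50))
y = (x + 1).sum(axis=0)

# On the single-machine scheduler this call blocks until every chunk
# of ``y`` has been computed and is held in memory as a numpy array.
(y_mem,) = dask.persist(y)

# y_mem keeps the same metadata as the lazy collection ...
assert y_mem.shape == (100,)
# ... and is still a dask collection, so further operations stay lazy
assert dask.is_dask_collection(y_mem)
```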
dask.visualize(*args, filename='mydask', traverse=True, optimize_graph=False, maxval=None, **kwargs)[source]

Visualize several dask graphs simultaneously.

Requires graphviz to be installed. All options that are not the dask graph(s) should be passed as keyword arguments.

Parameters
args : object

Any number of objects. If it is a dask object, its associated graph will be included in the output of visualize. By default, python builtin collections are also traversed to look for dask objects (for more information see the traverse keyword). Arguments lacking an associated graph will be ignored.

filename : str or None, optional

The name of the file to write to disk. If the provided filename doesn’t include an extension, ‘.png’ will be used by default. If filename is None, no file will be written, and we communicate with dot using only pipes.

format : {‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’}, optional

Format in which to write output file. Default is ‘png’.

traverse : bool, optional

By default, dask traverses builtin python collections looking for dask objects passed to visualize. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.

optimize_graph : bool, optional

If True, the graph is optimized before rendering. Otherwise, the graph is displayed as is. Default is False.

color : {None, ‘order’, ‘ages’, ‘freed’, ‘memoryincreases’, ‘memorydecreases’, ‘memorypressure’}, optional

Options for coloring nodes:

  • None, the default, no colors.

  • ‘order’, colors the nodes’ border based on the order they appear in the graph.

  • ‘ages’, how long the data of a node is held.

  • ‘freed’, the number of dependencies released after running a node.

  • ‘memoryincreases’, how many more outputs are held after the lifetime of a node. Large values may indicate nodes that should have run later.

  • ‘memorydecreases’, how many fewer outputs are held after the lifetime of a node. Large values may indicate nodes that should have run sooner.

  • ‘memorypressure’, the number of data held when the node is run (circle), or the data is released (rectangle).

maxval : int or float, optional

Maximum value for the colormap, used to normalize from 0 to 1.0. If None (the default), the maximum value found in the graph is used.

collapse_outputs : bool, optional

Whether to collapse output boxes, which often have empty labels. Default is False.

verbose : bool, optional

Whether to label output and input boxes even if the data aren’t chunked. Beware: these labels can get very long. Default is False.

**kwargs

Additional keyword arguments to forward to to_graphviz.

Returns
result : IPython.display.Image, IPython.display.SVG, or None

See dask.dot.dot_graph for more information.

See also

dask.dot.dot_graph

Notes

For more information on optimization see here:

https://docs.dask.org/en/latest/optimize.html

Examples

>>> x.visualize(filename='dask.pdf')  
>>> x.visualize(filename='dask.pdf', color='order')  

Datasets

Dask has a few helpers for generating demo datasets

dask.datasets.make_people(npartitions=10, records_per_partition=1000, seed=None, locale='en')[source]

Make a dataset of random people

This makes a Dask Bag with dictionary records of randomly generated people. This requires the optional library mimesis to generate records.

Parameters
npartitions : int

Number of partitions

records_per_partition : int

Number of records in each partition

seed : int, optional

Random seed

locale : str

Language locale, like ‘en’, ‘fr’, ‘zh’, or ‘ru’

Returns
b: Dask Bag
dask.datasets.timeseries(start='2000-01-01', end='2000-01-31', freq='1s', partition_freq='1d', dtypes={'id': <class 'int'>, 'name': <class 'str'>, 'x': <class 'float'>, 'y': <class 'float'>}, seed=None, **kwargs)[source]

Create timeseries dataframe with random data

Parameters
start : datetime (or datetime-like string)

Start of time series

end : datetime (or datetime-like string)

End of time series

dtypes : dict

Mapping of column names to types. Valid types include {float, int, str, ‘category’}

freq : string

String like ‘2s’ or ‘1H’ or ‘12W’ for the time series frequency

partition_freq : string

String like ‘1M’ or ‘2Y’ to divide the dataframe into partitions

seed : int, optional

Random state seed

**kwargs

Keywords to pass down to individual column creation functions. Keywords should be prefixed by the column name and then an underscore.

Examples

>>> import dask
>>> df = dask.datasets.timeseries()
>>> df.head()  
          timestamp    id     name         x         y
2000-01-01 00:00:00   967    Jerry -0.031348 -0.040633
2000-01-01 00:00:01  1066  Michael -0.262136  0.307107
2000-01-01 00:00:02   988    Wendy -0.526331  0.128641
2000-01-01 00:00:03  1016   Yvonne  0.620456  0.767270
2000-01-01 00:00:04   998   Ursula  0.684902 -0.463278
>>> df = dask.datasets.timeseries(
...     '2000', '2010',
...     freq='2H', partition_freq='1D', seed=1,  # data frequency
...     dtypes={'value': float, 'name': str, 'id': int},  # data types
...     id_lam=1000  # control number of items in id column
... )

Utilities

Dask has some public utility methods. These are primarily used for parsing configuration values.

dask.utils.format_bytes(n: int) -> str[source]

Format bytes as text

>>> from dask.utils import format_bytes
>>> format_bytes(1)
'1 B'
>>> format_bytes(1234)
'1.21 kiB'
>>> format_bytes(12345678)
'11.77 MiB'
>>> format_bytes(1234567890)
'1.15 GiB'
>>> format_bytes(1234567890000)
'1.12 TiB'
>>> format_bytes(1234567890000000)
'1.10 PiB'

For all values < 2**60, the output is always <= 10 characters.

dask.utils.format_time(n)[source]

Format a number of seconds as a human-readable time string

>>> from dask.utils import format_time
>>> format_time(1)
'1.00 s'
>>> format_time(0.001234)
'1.23 ms'
>>> format_time(0.00012345)
'123.45 us'
>>> format_time(123.456)
'123.46 s'
dask.utils.parse_bytes(s)[source]

Parse byte string to numbers

>>> from dask.utils import parse_bytes
>>> parse_bytes('100')
100
>>> parse_bytes('100 MB')
100000000
>>> parse_bytes('100M')
100000000
>>> parse_bytes('5kB')
5000
>>> parse_bytes('5.4 kB')
5400
>>> parse_bytes('1kiB')
1024
>>> parse_bytes('1e6')
1000000
>>> parse_bytes('1e6 kB')
1000000000
>>> parse_bytes('MB')
1000000
>>> parse_bytes(123)
123
>>> parse_bytes('5 foos')
Traceback (most recent call last):
    ...
ValueError: Could not interpret 'foos' as a byte unit
dask.utils.parse_timedelta(s, default='seconds')[source]

Parse timedelta string to number of seconds

Examples

>>> from datetime import timedelta
>>> from dask.utils import parse_timedelta
>>> parse_timedelta('3s')
3
>>> parse_timedelta('3.5 seconds')
3.5
>>> parse_timedelta('300ms')
0.3
>>> parse_timedelta(timedelta(seconds=3))  # also supports timedeltas
3