High Level Graphs

Dask graphs produced by collections like Arrays, Bags, and DataFrames have high-level structure that can be useful for visualization and high-level optimization. The task graphs produced by these collections encode this structure explicitly as HighLevelGraph objects. This document describes how to work with these in more detail.

Motivation and Example

In full generality, Dask schedulers expect arbitrary task graphs where each node is a single Python function call and each edge is a dependency between two function calls. These are usually stored in flat dictionaries. Here is some simple Dask DataFrame code and the task graph that it might generate:

import dask.dataframe as dd

df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']
{
 ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),
 ('add', 0): (operator.add, ('read-csv', 0), 100),
 ('add', 1): (operator.add, ('read-csv', 1), 100),
 ('add', 2): (operator.add, ('read-csv', 2), 100),
 ('add', 3): (operator.add, ('read-csv', 3), 100),
 ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}

The task graph is a dictionary that stores every Pandas-level function call necessary to compute the final result. We can see that there is some structure to this dictionary if we separate out the tasks associated with each high-level Dask DataFrame operation:

{
 # From the dask.dataframe.read_csv call
 ('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
 ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
 ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
 ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv'),

 # From the df + 100 call
 ('add', 0): (operator.add, ('read-csv', 0), 100),
 ('add', 1): (operator.add, ('read-csv', 1), 100),
 ('add', 2): (operator.add, ('read-csv', 2), 100),
 ('add', 3): (operator.add, ('read-csv', 3), 100),

 # From the df[df.name == 'Alice'] call
 ('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
 ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
 ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
 ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3)),
}

By understanding this high-level structure we can reason about our task graphs more easily (this matters more for larger datasets, when there are thousands of tasks per layer) and see how to perform high-level optimizations. For example, in the case above we may want to automatically rewrite our code to filter our datasets before adding 100:

# Before
df = dd.read_csv('myfile.*.csv')
df = df + 100
df = df[df.name == 'Alice']

# After
df = dd.read_csv('myfile.*.csv')
df = df[df.name == 'Alice']
df = df + 100

Dask’s high level graphs help us to explicitly encode this structure by storing our task graphs in layers with dependencies between layers:

>>> import dask.dataframe as dd

>>> df = dd.read_csv('myfile.*.csv')
>>> df = df + 100
>>> df = df[df.name == 'Alice']

>>> graph = df.__dask_graph__()
>>> graph.layers
{
 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
              ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
              ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
              ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

 'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
         ('add', 1): (operator.add, ('read-csv', 1), 100),
         ('add', 2): (operator.add, ('read-csv', 2), 100),
         ('add', 3): (operator.add, ('read-csv', 3), 100)},

 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
            ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
            ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
            ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}

>>> graph.dependencies
{
 'read-csv': set(),
 'add': {'read-csv'},
 'filter': {'add'}
}

The DataFrame itself points to the output layers on which it directly depends:

>>> df.__dask_layers__()
{'filter'}
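
The layer names are plain strings, so they can be used to index into graph.layers and graph.dependencies. For instance (an illustrative sketch, continuing the example above):

>>> name, = df.__dask_layers__()
>>> graph.dependencies[name]
{'add'}
>>> len(graph.layers[name])
4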

HighLevelGraphs

The HighLevelGraph object is a Mapping composed of sub-Mappings (the layers), along with a high-level dependency mapping between them:

class HighLevelGraph(Mapping):
    layers: Dict[str, Mapping]
    dependencies: Dict[str, Set[str]]

You can construct a HighLevelGraph explicitly by providing both to the constructor:

layers = {
   'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
                ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
                ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
                ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},

   'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
           ('add', 1): (operator.add, ('read-csv', 1), 100),
           ('add', 2): (operator.add, ('read-csv', 2), 100),
           ('add', 3): (operator.add, ('read-csv', 3), 100)},

   'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
              ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
              ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
              ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}

dependencies = {'read-csv': set(),
                'add': {'read-csv'},
                'filter': {'add'}}

graph = HighLevelGraph(layers, dependencies)

This object satisfies the Mapping interface, and so operates as a normal Python dictionary that is the semantic merger of the underlying layers:

>>> len(graph)
12
>>> graph[('read-csv', 0)]
(pandas.read_csv, 'myfile.0.csv')
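
Because a HighLevelGraph is a Mapping of tasks, it can also be handed directly to a Dask scheduler. Below is a small self-contained sketch (it uses toy functions in place of pandas.read_csv, and the single-threaded dask.get scheduler) that builds a two-layer graph by hand and computes one of its keys:

import operator

import dask
from dask.highlevelgraph import HighLevelGraph

# Toy stand-in for a read layer: each task just produces its partition number
layers = {
    'load': {('load', i): (int, i) for i in range(4)},
    'add': {('add', i): (operator.add, ('load', i), 100) for i in range(4)},
}
dependencies = {'load': set(), 'add': {'load'}}

graph = HighLevelGraph(layers, dependencies)

# The graph behaves like a flat dict of tasks, so the synchronous scheduler
# can execute it directly
assert dask.get(graph, ('add', 3)) == 103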

API

class dask.highlevelgraph.HighLevelGraph(layers: Mapping[str, Mapping[Key, Any]], dependencies: Mapping[str, set[str]], key_dependencies: dict[Key, set[Key]] | None = None)

Task graph composed of layers of dependent subgraphs

This object encodes a Dask task graph that is composed of layers of dependent subgraphs, such as commonly occurs when building task graphs using high level collections like Dask array, bag, or dataframe.

Typically each high level array, bag, or dataframe operation takes the task graphs of the input collections, merges them, and then adds one or more new layers of tasks for the new operation. These layers typically have at least as many tasks as there are partitions or chunks in the collection. The HighLevelGraph object stores the subgraph for each operation as a separate layer, and also stores the dependency structure between them.

Parameters
layers : Mapping[str, Mapping]

The subgraph layers, keyed by a unique name

dependencies : Mapping[str, set[str]]

The set of layers on which each layer depends

key_dependencies : dict[Key, set], optional

Mapping (some) keys in the high level graph to their dependencies. If a key is missing, its dependencies will be calculated on-the-fly.

See also

HighLevelGraph.from_collections

typically used by developers to make new HighLevelGraphs

Examples

Here is an idealized example that shows the internal state of a HighLevelGraph:

>>> import dask.dataframe as dd
>>> df = dd.read_csv('myfile.*.csv')  
>>> df = df + 100  
>>> df = df[df.name == 'Alice']  
>>> graph = df.__dask_graph__()  
>>> graph.layers  
{
 'read-csv': {('read-csv', 0): (pandas.read_csv, 'myfile.0.csv'),
              ('read-csv', 1): (pandas.read_csv, 'myfile.1.csv'),
              ('read-csv', 2): (pandas.read_csv, 'myfile.2.csv'),
              ('read-csv', 3): (pandas.read_csv, 'myfile.3.csv')},
 'add': {('add', 0): (operator.add, ('read-csv', 0), 100),
         ('add', 1): (operator.add, ('read-csv', 1), 100),
         ('add', 2): (operator.add, ('read-csv', 2), 100),
         ('add', 3): (operator.add, ('read-csv', 3), 100)},
 'filter': {('filter', 0): (lambda part: part[part.name == 'Alice'], ('add', 0)),
            ('filter', 1): (lambda part: part[part.name == 'Alice'], ('add', 1)),
            ('filter', 2): (lambda part: part[part.name == 'Alice'], ('add', 2)),
            ('filter', 3): (lambda part: part[part.name == 'Alice'], ('add', 3))}
}
>>> graph.dependencies  
{
 'read-csv': set(),
 'add': {'read-csv'},
 'filter': {'add'}
}

cull(keys: Iterable[Key]) -> HighLevelGraph

Return new HighLevelGraph with only the tasks required to calculate keys.

In other words, remove unnecessary tasks from the graph.

Parameters
keys

iterable of keys or nested list of keys such as the output of __dask_keys__()

Returns
hlg: HighLevelGraph

Culled high level graph
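
For instance, continuing the idealized graph above, culling on a single output key keeps only the chain of tasks that produces it (an illustrative sketch):

>>> culled = graph.cull([('filter', 0)])
>>> ('filter', 0) in culled
True
>>> ('read-csv', 3) in culled
False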

cull_layers(layers: Iterable[str]) -> HighLevelGraph

Return a new HighLevelGraph with only the given layers and their dependencies. Internally, layers are not modified.

This is a variant of HighLevelGraph.cull() which is much faster and does not risk creating a collision between two layers with the same name and different content when two culled graphs are merged later on.

Returns
hlg: HighLevelGraph

Culled high level graph
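
For instance, keeping only the 'add' layer also keeps the 'read-csv' layer it depends on, while the 'filter' layer is dropped (an illustrative sketch reusing the graph above):

>>> subgraph = graph.cull_layers(['add'])
>>> sorted(subgraph.layers)
['add', 'read-csv']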

classmethod from_collections(name: str, layer: Mapping[Key, Any], dependencies: Sequence[DaskCollection] = ()) -> HighLevelGraph

Construct a HighLevelGraph from a new layer and a set of collections

This constructs a HighLevelGraph in the common case where we have a single new layer and a set of old collections on which we want to depend.

This pulls out the __dask_layers__() method of the collections if they exist, and adds them to the dependencies for this new layer. It also merges all of the layers from all of the dependent collections together into the new layers for this graph.

Parameters
name : str

The name of the new layer

layer : Mapping

The graph layer itself

dependencies : list of Dask collections

A list of other dask collections (like arrays or dataframes) that have graphs themselves

Examples

In typical usage we make a new task layer, and then pass that layer along with all dependent collections to this method.

>>> def add(self, other):
...     # tokenize is dask.base.tokenize; new_collection stands in for whatever
...     # constructor builds the resulting collection from a name and graph
...     name = 'add-' + tokenize(self, other)
...     layer = {(name, i): (operator.add, input_key, other)
...              for i, input_key in enumerate(self.__dask_keys__())}
...     graph = HighLevelGraph.from_collections(name, layer, dependencies=[self])
...     return new_collection(name, graph)

get(k[, d]) -> D[k] if k in D, else d. d defaults to None.
get_all_dependencies() -> dict[Key, set[Key]]

Get dependencies of all keys

This will in most cases materialize all layers, which makes it an expensive operation.

Returns
map: Mapping

A map that maps each key to its dependencies
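
For instance, on the idealized graph above every task key is mapped to the keys it depends on (an illustrative sketch):

>>> deps = graph.get_all_dependencies()
>>> deps[('add', 0)]
{('read-csv', 0)}
>>> deps[('read-csv', 0)]
set()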

get_all_external_keys() -> set[Key]

Get all output keys of all layers

This will in most cases not materialize any layers, which makes it a relatively cheap operation.

Returns
keys: set

A set of all external keys
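
For instance, on the idealized graph above the output keys of every layer are reported without building the task dictionaries themselves (an illustrative sketch):

>>> external = graph.get_all_external_keys()
>>> ('filter', 0) in external
True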

items() -> a set-like object providing a view on D's items
keys() -> collections.abc.KeysView

Get all keys of all the layers.

This will in many cases materialize layers, which makes it a relatively expensive operation. See get_all_external_keys() for a faster alternative.
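
For instance (an illustrative sketch reusing the graph above):

>>> ('add', 1) in graph.keys()
True
>>> len(graph.keys()) == len(graph)
True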

to_dict() -> dict[Key, Any]

Efficiently convert to plain dict. This method is faster than dict(self).
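
For instance, flattening the idealized graph above gives back a single task dictionary (an illustrative sketch):

>>> flat = graph.to_dict()
>>> flat[('add', 0)]
(operator.add, ('read-csv', 0), 100)
>>> len(flat)
12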

values() -> an object providing a view on D's values