Configuration

Taking full advantage of Dask sometimes requires user configuration. This might be to control logging verbosity, specify cluster configuration, provide credentials for security, or any of several other options that arise in production.

Configuration is specified in one of the following ways:

  1. YAML files in ~/.config/dask/ or /etc/dask/

  2. Environment variables like DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=True

  3. Default settings within sub-libraries

This combination makes it easy to specify configuration in a variety of settings, ranging from personal workstations to IT-mandated configuration to Docker images.

Access Configuration

dask.config.get(key[, default, config, ...])

Get elements from global config

Dask’s configuration system is usually accessed using the dask.config.get function. You can use . for nested access, for example:

>>> import dask
>>> import dask.distributed  # populate config with distributed defaults

>>> dask.config.get("distributed.client") # use `.` for nested access
{'heartbeat': '5s', 'scheduler-info-interval': '2s'}

>>> dask.config.get("distributed.scheduler.unknown-task-duration")
'500ms'

You may wish to inspect the dask.config.config dictionary to get a sense for what configuration is being used by your current system.

Note that the get function treats underscores and hyphens identically. For example, dask.config.get("temporary-directory") is equivalent to dask.config.get("temporary_directory").
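A quick check of this equivalence, using the built-in temporary-directory key (the value /tmp/dask-example is arbitrary):

```python
import dask

# Underscores and hyphens are interchangeable in configuration keys,
# so both spellings resolve to the same entry
with dask.config.set({"temporary-directory": "/tmp/dask-example"}):
    a = dask.config.get("temporary-directory")
    b = dask.config.get("temporary_directory")
    print(a == b)  # True
```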

Values like "128 MiB" and "10s" are parsed using the functions in Utilities.
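For instance, dask.utils.parse_bytes and dask.utils.parse_timedelta convert such strings into plain numbers:

```python
from dask.utils import parse_bytes, parse_timedelta

# "128 MiB" becomes a number of bytes; "10s" becomes a number of seconds
nbytes = parse_bytes("128 MiB")
seconds = parse_timedelta("10s")
print(nbytes, seconds)  # 134217728 10
```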

Specify Configuration

YAML files

You can specify configuration values in YAML files. For example:

array:
   chunk-size: 128 MiB

distributed:
   worker:
      memory:
         spill: 0.85  # default: 0.7
         target: 0.75  # default: 0.6
         terminate: 0.98  # default: 0.95

These files can live in any of the following locations:

  1. The ~/.config/dask directory in the user’s home directory

  2. The {sys.prefix}/etc/dask directory local to Python

  3. The root directory (specified by the DASK_ROOT_CONFIG environment variable or /etc/dask/ by default)

Dask searches for all YAML files within each of these directories and merges them together, preferring configuration files closer to the user over system configuration files (preference follows the order in the list above). Additionally, users can specify a path with the DASK_CONFIG environment variable, which takes precedence over all of the locations above.

The contents of these YAML files are merged together, allowing different Dask subprojects like dask-kubernetes or dask-ml to manage configuration files separately, but have them merge into the same global configuration.

Note: for historical reasons we also look in the ``~/.dask`` directory for config files. This is deprecated and will soon be removed.
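As a sketch of the search mechanics, the snippet below writes a YAML file into a temporary directory (standing in for ~/.config/dask/) and collects it explicitly; the file name myconfig.yaml is arbitrary, since Dask reads every YAML file in the directory:

```python
import os
import tempfile

import yaml

import dask.config

# Write a config file into a temporary directory
directory = tempfile.mkdtemp()
with open(os.path.join(directory, "myconfig.yaml"), "w") as f:
    yaml.safe_dump({"array": {"chunk-size": "64 MiB"}}, f)

# Collect configuration from that directory only; env={} ignores
# any DASK_* environment variables for this demonstration
cfg = dask.config.collect(paths=[directory], env={})
print(cfg["array"]["chunk-size"])  # 64 MiB
```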

Environment Variables

You can also specify configuration values with environment variables like the following:

export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=True
export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=5

resulting in configuration values like the following:

{
    'distributed': {
        'scheduler': {
            'work-stealing': True,
            'allowed-failures': 5
        }
    }
}

Dask searches for all environment variables that start with DASK_, then transforms keys by converting to lower case and changing double-underscores to nested structures.

Dask tries to parse all values with ast.literal_eval, letting users pass numeric and boolean values (such as True in the example above) as well as lists, dictionaries, and so on with normal Python syntax.
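The transformation can be seen with dask.config.collect_env, the helper that performs this conversion (shown here with a variable set from within Python rather than exported in the shell):

```python
import os

import dask.config

# Normally exported in the shell before starting Python
os.environ["DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING"] = "True"

# collect_env gathers every DASK_* variable into a nested dict,
# parsing values with ast.literal_eval where possible
env_config = dask.config.collect_env()
value = dask.config.get("distributed.scheduler.work-stealing", config=env_config)
print(value)  # True
```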

Environment variables take precedence over configuration values found in YAML files.

Defaults

Additionally, individual subprojects may add their own default values when they are imported. These are always added with lower priority than the YAML files or environment variables mentioned above:

>>> import dask.config
>>> dask.config.config  # no configuration by default
{}

>>> import dask.distributed
>>> dask.config.config  # New values have been added
{
    'scheduler': ...,
    'worker': ...,
    'tls': ...
}

Directly within Python

dask.config.set([arg, config, lock])

Temporarily set configuration values within a context manager

Configuration is stored within a normal Python dictionary in dask.config.config and can be modified using normal Python operations.

Additionally, you can temporarily set a configuration value using the dask.config.set function. This function accepts a dictionary as an input and interprets "." as nested access:

>>> dask.config.set({'scheduler.work-stealing': True})

This function can also be used as a context manager for consistent cleanup:

with dask.config.set({'scheduler.work-stealing': True}):
    ...

Note that the set function treats underscores and hyphens identically. For example, dask.config.set({'scheduler.work-stealing': True}) is equivalent to dask.config.set({'scheduler.work_stealing': True}).

Distributing configuration

It may also be desirable to package up your whole Dask configuration for use on another machine. This is used in some Dask Distributed libraries to ensure remote components have the same configuration as your local system.

This is typically handled by the downstream libraries which use base64 encoding to pass config via the DASK_INTERNAL_INHERIT_CONFIG environment variable.

dask.config.serialize(data)

Serialize config data into a string.

dask.config.deserialize(data)

De-serialize config data into the original object.
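A round trip through these two functions, using a small hypothetical config fragment:

```python
import dask.config

# The string form is what downstream libraries place in the
# DASK_INTERNAL_INHERIT_CONFIG environment variable
data = {"distributed": {"scheduler": {"allowed-failures": 5}}}
s = dask.config.serialize(data)
restored = dask.config.deserialize(s)
print(restored == data)  # True
```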

Conversion Utility

It is possible to configure Dask inline with dot notation, with YAML, or via environment variables. The interactive utility on the Dask documentation site converts configuration items between these forms.

Warning

This utility is designed to improve understanding of converting between different notations and does not claim to be a perfect implementation. Please use for reference only.


Updating Configuration

Manipulating configuration dictionaries

dask.config.merge(*dicts)

Update a sequence of nested dictionaries

dask.config.update(old, new[, priority])

Update a nested dictionary with values from another

dask.config.expand_environment_variables(config)

Expand environment variables in a nested config dictionary

As described above, configuration can come from many places, including several YAML files, environment variables, and project defaults. Each of these provides a configuration that is possibly nested like the following:

x = {'a': 0, 'c': {'d': 4}}
y = {'a': 1, 'b': 2, 'c': {'e': 5}}

Dask will merge these configurations respecting nested data structures, and respecting order:

>>> dask.config.merge(x, y)
{'a': 1, 'b': 2, 'c': {'d': 4, 'e': 5}}

You can also use the update function to update the existing configuration in place with a new configuration. This can be done with priority being given to either config. This is often used to update the global configuration in dask.config.config:

dask.config.update(dask.config.config, new, priority='new')  # Give priority to new values
dask.config.update(dask.config.config, new, priority='old')  # Give priority to old values

Sometimes it is useful to expand environment variables stored within a configuration. This can be done with the expand_environment_variables function:

dask.config.config = dask.config.expand_environment_variables(dask.config.config)
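For example, with a hypothetical EXAMPLE_USER variable:

```python
import os

import dask.config

# A variable set only for this demonstration
os.environ["EXAMPLE_USER"] = "alice"

# Strings containing $VAR are expanded; other values pass through
cfg = dask.config.expand_environment_variables({"name": "$EXAMPLE_USER", "n": [1, 2]})
print(cfg)  # {'name': 'alice', 'n': [1, 2]}
```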

Refreshing Configuration

dask.config.collect([paths, env])

Collect configuration from paths and environment variables

dask.config.refresh([config, defaults])

Update configuration by re-reading yaml files and env variables

If you change your environment variables or YAML files, Dask will not immediately see the changes. Instead, you can call refresh to go through the configuration collection process and update the default configuration:

>>> dask.config.config
{}

>>> # make some changes to yaml files

>>> dask.config.refresh()
>>> dask.config.config
{...}

This function uses dask.config.collect, which returns the configuration without modifying the global configuration. You might use this to determine the configuration from particular paths that are not yet on the config search path:

>>> dask.config.collect(paths=[...])
{...}

Downstream Libraries

dask.config.ensure_file(source[, ...])

Copy file to default location if it does not already exist

dask.config.update(old, new[, priority])

Update a nested dictionary with values from another

dask.config.update_defaults(new[, config, ...])

Add a new set of defaults to the configuration

Downstream Dask libraries often follow a standard convention to use the central Dask configuration. This section provides recommendations for integration using a fictional project, dask-foo, as an example.

Downstream projects typically adhere to the following conventions:

  1. Maintain default configuration in a YAML file within their source directory:

    setup.py
    dask_foo/__init__.py
    dask_foo/config.py
    dask_foo/core.py
    dask_foo/foo.yaml  # <---
    
  2. Place configuration in that file within a namespace for the project:

    # dask_foo/foo.yaml
    
    foo:
      color: red
      admin:
        a: 1
        b: 2
    
  3. Within a config.py file (or anywhere) load that default config file and update it into the global configuration:

    # dask_foo/config.py
    import os
    import yaml
    
    import dask.config
    
    fn = os.path.join(os.path.dirname(__file__), 'foo.yaml')
    
    with open(fn) as f:
        defaults = yaml.safe_load(f)
    
    dask.config.update_defaults(defaults)
    
  4. Within that same config.py file, copy the 'foo.yaml' file to the user’s configuration directory if it doesn’t already exist.

We also comment out the file to make it easier for us to change defaults in the future.

    # ... continued from above
    
    dask.config.ensure_file(source=fn, comment=True)
    

    The user can investigate ~/.config/dask/*.yaml to see all of the commented out configuration files to which they have access.

  5. Ensure that this file is run on import by including it in __init__.py:

    # dask_foo/__init__.py
    
    from . import config
    
  6. Within dask_foo code, use the dask.config.get function to access configuration values:

    # dask_foo/core.py
    
    def process(fn, color=dask.config.get('foo.color')):
        ...
    
  7. You may also want to ensure that your yaml configuration files are included in your package. This can be accomplished by including the following line in your MANIFEST.in:

    recursive-include <PACKAGE_NAME> *.yaml
    

    and the following in your setup.py setup call:

    from setuptools import setup
    
    setup(...,
          include_package_data=True,
          ...)
    

This process keeps configuration in a central place, but also keeps it safe within namespaces. It places config files in an easy-to-access location by default (~/.config/dask/*.yaml), so that users can easily discover what they can change, but maintains the actual defaults within the source code, so that they more closely track changes in the library.

However, downstream libraries may choose alternative solutions, such as isolating their configuration within their library, rather than using the global dask.config system. All functions in the dask.config module also work with parameters, and do not need to mutate global state.

API

dask.config.get(key, default='__no_default__', config=dask.config.config, override_with=None)[source]

Get elements from global config

If override_with is not None this value will be passed straight back. Useful for getting kwarg defaults from Dask config.

Use ‘.’ for nested access

See also

dask.config.set

Examples

>>> from dask import config
>>> config.get('foo')  
{'x': 1, 'y': 2}
>>> config.get('foo.x')  
1
>>> config.get('foo.x.y', default=123)  
123
>>> config.get('foo.y', override_with=None)  
2
>>> config.get('foo.y', override_with=3)  
3
dask.config.set(arg=None, config=dask.config.config, lock=<threading.Lock>, **kwargs)[source]

Temporarily set configuration values within a context manager

Parameters
arg : mapping or None, optional

A mapping of configuration key-value pairs to set.

**kwargs :

Additional key-value pairs to set. If arg is provided, values set in arg will be applied before those in kwargs. Double-underscores (__) in keyword arguments will be replaced with ., allowing nested values to be easily set.

See also

dask.config.get

Examples

>>> import dask

Set 'foo.bar' in a context, by providing a mapping.

>>> with dask.config.set({'foo.bar': 123}):
...     pass

Set 'foo.bar' in a context, by providing a keyword argument.

>>> with dask.config.set(foo__bar=123):
...     pass

Set 'foo.bar' globally.

>>> dask.config.set(foo__bar=123)  
dask.config.merge(*dicts)[source]

Update a sequence of nested dictionaries

This prefers the values in the latter dictionaries to those in the former

Examples

>>> a = {'x': 1, 'y': {'a': 2}}
>>> b = {'y': {'b': 3}}
>>> merge(a, b)  
{'x': 1, 'y': {'a': 2, 'b': 3}}
dask.config.update(old, new, priority='new')[source]

Update a nested dictionary with values from another

This is like dict.update except that it smoothly merges nested values

This operates in-place and modifies old

Parameters
priority : string {'old', 'new'}

If new (default) then the new dictionary has preference. Otherwise the old dictionary does.

Examples

>>> a = {'x': 1, 'y': {'a': 2}}
>>> b = {'x': 2, 'y': {'b': 3}}
>>> update(a, b)  
{'x': 2, 'y': {'a': 2, 'b': 3}}
>>> a = {'x': 1, 'y': {'a': 2}}
>>> b = {'x': 2, 'y': {'b': 3}}
>>> update(a, b, priority='old')  
{'x': 1, 'y': {'a': 2, 'b': 3}}
dask.config.collect(paths=dask.config.paths, env=None)[source]

Collect configuration from paths and environment variables

Parameters
paths : List[str]

A list of paths to search for yaml config files

env : dict

The system environment variables

Returns
config: dict

See also

dask.config.refresh

collect configuration and update into primary config

dask.config.refresh(config=dask.config.config, defaults=dask.config.defaults, **kwargs)[source]

Update configuration by re-reading yaml files and env variables

This mutates the global dask.config.config, or the config parameter if passed in.

This goes through the following stages:

  1. Clearing out all old configuration

  2. Updating from the stored defaults from downstream libraries (see update_defaults)

  3. Updating from yaml files and environment variables

Note that some functionality only checks configuration once at startup and may not change behavior, even if configuration changes. It is recommended to restart your python process if convenient to ensure that new configuration changes take place.

See also

dask.config.collect

for parameters

dask.config.update_defaults
dask.config.ensure_file(source, destination=None, comment=True)[source]

Copy file to default location if it does not already exist

This tries to move a default configuration file to a default location if it does not already exist. It also comments out that file by default.

This is to be used by downstream modules (like dask.distributed) that may have default configuration files that they wish to include in the default configuration path.

Parameters
source : string, filename

Source configuration file, typically within a source directory.

destination : string, directory

Destination directory. Configurable by DASK_CONFIG environment variable, falling back to ~/.config/dask.

comment : bool, True by default

Whether or not to comment out the config file when copying.

dask.config.expand_environment_variables(config)[source]

Expand environment variables in a nested config dictionary

This function will recursively search through any nested dictionaries and/or lists.

Parameters
config : dict, iterable, or str

Input object to search for environment variables

Returns
config : same type as input

Examples

>>> expand_environment_variables({'x': [1, 2, '$USER']})  
{'x': [1, 2, 'my-username']}

Configuration Reference

Note

It is possible to configure Dask inline with dot notation, with YAML or via environment variables. See the conversion utility for converting the following dot notation to other forms.

Dask

temporary-directory   None

Temporary directory for local disk storage, e.g. /tmp, /scratch, or /local. This directory is used during Dask spill-to-disk operations. When the value is "null" (default), Dask creates a directory in the location from which it was launched: `cwd/dask-worker-space`

tokenize.ensure-deterministic   False

If ``true``, tokenize will error instead of falling back to uuids when a deterministic token cannot be generated. Defaults to ``false``.

dataframe.shuffle-compression   None

Compression algorithm used for on-disk shuffling. Partd, the library used for compression, supports ZLib, BZ2, SNAPPY, and BLOSC

dataframe.parquet.metadata-task-size-local   512

The number of files to handle within each metadata-processing task when reading a parquet dataset from a LOCAL file system. Specifying 0 will result in serial execution on the client.

dataframe.parquet.metadata-task-size-remote   16

The number of files to handle within each metadata-processing task when reading a parquet dataset from a REMOTE file system. Specifying 0 will result in serial execution on the client.

array.svg.size   120

The size of pixels used when displaying a dask array as an SVG image. This is used, for example, for nice rendering in a Jupyter notebook

array.slicing.split-large-chunks   None

How to handle large chunks created when slicing Arrays. By default a warning is produced. Set to ``False`` to silence the warning and allow large output chunks. Set to ``True`` to silence the warning and avoid large output chunks.

optimization.fuse.active   None

Turn task fusion on/off. This option refers to the fusion of a fully-materialized task graph (not a high-level graph). By default (None), the active task-fusion option will be treated as ``False`` for Dask DataFrame collections, and as ``True`` for all other graphs (including Dask Array collections).

optimization.fuse.ave-width   1

Upper limit for width, where width = num_nodes / height, a good measure of parallelizability

optimization.fuse.max-width   None

Don't fuse if total width is greater than this. Set to null to dynamically adjust to 1.5 + ave_width * log(ave_width + 1)

optimization.fuse.max-height   inf

Don't fuse more than this many levels

optimization.fuse.max-depth-new-edges   None

Don't fuse if new dependencies are added after this many levels. Set to null to dynamically adjust to ave_width * 1.5.

optimization.fuse.subgraphs   None

Set to True to fuse multiple tasks into SubgraphCallable objects. Set to None to let the default optimizer of individual dask collections decide. If no collection-specific default exists, None defaults to False.

optimization.fuse.rename-keys   True

Set to true to rename the fused keys with `default_fused_keys_renamer`. Renaming fused keys can keep the graph more understandable and comprehensible, but it comes at the cost of additional processing. If False, then the top-most key will be used. For advanced usage, a function to create the new name is also accepted.

Distributed Client

distributed.client.heartbeat   5s

This value is the time between heartbeats. The client sends a periodic heartbeat message to the scheduler. If the scheduler misses enough of these, it assumes that the client has gone.

distributed.client.scheduler-info-interval   2s

Interval between scheduler-info updates

Distributed Comm

distributed.comm.retry.count   0

The number of times to retry a connection

distributed.comm.retry.delay.min   1s

The first non-zero delay between retry attempts

distributed.comm.retry.delay.max   20s

The maximum delay between retries

distributed.comm.compression   auto

The compression algorithm to use. This could be one of lz4, snappy, zstd, or blosc

distributed.comm.shard   64MiB

The maximum size of a frame to send through a comm. Some network infrastructure doesn't like sending very large messages. Dask comms will cut up these large messages into many small ones. This attribute determines the maximum size of such a shard.

distributed.comm.offload   10MiB

The size of message after which we choose to offload serialization to another thread In some cases, you may also choose to disable this altogether with the value false This is useful if you want to include serialization in profiling data, or if you have data types that are particularly sensitive to deserialization

distributed.comm.default-scheme   tcp

The default protocol to use, like tcp or tls

distributed.comm.socket-backlog   2048

When shuffling data between workers, there can be O(cluster size) connection requests on a single worker socket; make sure the backlog is large enough not to lose any.

distributed.comm.recent-messages-log-length   0

number of messages to keep for debugging

distributed.comm.ucx.cuda_copy   None

Set environment variables to enable CUDA support over UCX. This may be used even if InfiniBand and NVLink are not supported or are disabled, in which case data is transferred over TCP.

distributed.comm.ucx.tcp   None

Set environment variables to enable TCP over UCX, even if InfiniBand and NVLink are not supported or disabled.

distributed.comm.ucx.nvlink   None

Set environment variables to enable UCX over NVLink, implies ``distributed.comm.ucx.tcp=True``.

distributed.comm.ucx.infiniband   None

Set environment variables to enable UCX over InfiniBand, implies ``distributed.comm.ucx.tcp=True``.

distributed.comm.ucx.rdmacm   None

Set environment variables to enable UCX RDMA connection manager support, requires ``distributed.comm.ucx.infiniband=True``.

distributed.comm.ucx.net-devices   None

Interface(s) used by workers for UCX communication. Can be a string (like ``"eth0"`` for NVLink or ``"mlx5_0:1"``/``"ib0"`` for InfiniBand), ``"auto"`` (requires ``distributed.comm.ucx.infiniband=True``) to pick the optimal interface per-worker based on the system's topology, or ``None`` to stay with the default value of ``"all"`` (use all available interfaces). Setting to ``"auto"`` requires UCX-Py to be installed and compiled with hwloc support. Unexpected errors can occur when using ``"auto"`` if any interfaces are disconnected or improperly configured.

distributed.comm.ucx.reuse-endpoints   None

Enable the UCX-Py endpoint-reuse mechanism if ``True``, or if it's not specified and UCX < 1.11 is installed; otherwise disable endpoint reuse. This was primarily introduced to resolve an issue with CUDA IPC that has been fixed in UCX 1.10, but it can cause establishing endpoints to be very slow; this is particularly noticeable in clusters of more than a few dozen workers.

distributed.comm.ucx.create-cuda-context   None

Creates a CUDA context before UCX is initialized. This is necessary to enable UCX to properly identify connectivity of GPUs with specialized networking hardware, such as InfiniBand. This permits UCX to choose transports automatically, without specifying additional variables for each transport, while ensuring optimal connectivity. When ``True``, a CUDA context will be created on the first device listed in ``CUDA_VISIBLE_DEVICES``.

distributed.comm.zstd.level   3

Compression level, between 1 and 22.

distributed.comm.zstd.threads   0

Number of threads to use. 0 for single-threaded, -1 to infer from cpu count.

distributed.comm.timeouts.connect   30s

No Comment

distributed.comm.timeouts.tcp   30s

No Comment

distributed.comm.require-encryption   None

Whether to require encryption on non-local comms

distributed.comm.tls.ciphers   None

No Comment

distributed.comm.tls.ca-file   None

Path to a CA file, in pem format

distributed.comm.tls.scheduler.cert   None

Path to certificate file

distributed.comm.tls.scheduler.key   None

Path to key file. Alternatively, the key can be appended to the cert file above, and this field left blank

distributed.comm.tls.worker.key   None

Path to key file. Alternatively, the key can be appended to the cert file above, and this field left blank

distributed.comm.tls.worker.cert   None

Path to certificate file

distributed.comm.tls.client.key   None

Path to key file. Alternatively, the key can be appended to the cert file above, and this field left blank

distributed.comm.tls.client.cert   None

Path to certificate file

distributed.comm.websockets.shard   8MiB

The maximum size of a websocket frame to send through a comm. This is somewhat duplicative of distributed.comm.shard, but websockets often have much smaller maximum message sizes than other protocols, so this attribute is used to set a smaller default shard size and to allow separate control of websocket message sharding.

Distributed Dashboard

distributed.dashboard.link   {scheme}://{host}:{port}/status

The form for the dashboard links. This is used wherever we print out the link for the dashboard. It is filled in with relevant information like the scheme, host, and port number

distributed.dashboard.export-tool   False

No Comment

distributed.dashboard.graph-max-items   5000

maximum number of tasks to try to plot in "graph" view

distributed.dashboard.prometheus.namespace   dask

Namespace prefix to use for all prometheus metrics.

Distributed Deploy

distributed.deploy.lost-worker-timeout   15s

Interval after which to hard-close a lost worker job. Otherwise we wait for a while to see if the worker will reappear

distributed.deploy.cluster-repr-interval   500ms

Interval between calls to update cluster-repr for the widget

Distributed Scheduler

distributed.scheduler.allowed-failures   3

The number of retries before a task is considered bad. When a worker dies while a task is running, that task is rerun elsewhere. If many workers die while running the same task, then we call the task bad and raise a KilledWorker exception. This is the number of workers that are allowed to die before the task is marked as bad.

distributed.scheduler.bandwidth   100000000

The expected bandwidth between any pair of workers. This is used when making scheduling decisions. The scheduler will use this value as a baseline, but also learn it over time.

distributed.scheduler.blocked-handlers   []

A list of handlers to exclude. The scheduler operates by receiving messages from various workers and clients and then performing operations based on those messages. Each message has an operation like "close-worker" or "task-finished". In some high-security situations administrators may choose to block certain handlers from running. Those handlers can be listed here. For a list of handlers see the `dask.distributed.Scheduler.handlers` attribute.

distributed.scheduler.default-data-size   1kiB

The default size of a piece of data if we don't know anything about it. This is used by the scheduler in some scheduling decisions

distributed.scheduler.events-cleanup-delay   1h

The amount of time to wait until workers or clients are removed from the event log after they have been removed from the scheduler

distributed.scheduler.idle-timeout   None

Shut down the scheduler after this duration if no activity has occurred. This can be helpful to reduce costs and stop zombie processes from roaming the earth.

distributed.scheduler.transition-log-length   100000

How long we keep the transition log. Every time a task transitions states (like "waiting", "processing", "memory", "released") we record that transition in a log. To make sure that we don't run out of memory, we clear out old entries after a certain length. This is that length.

distributed.scheduler.events-log-length   100000

How long we keep the events log. All events (e.g. worker heartbeat) are stored in the events log. To make sure that we don't run out of memory, we clear out old entries after a certain length. This is that length.

distributed.scheduler.work-stealing   True

Whether or not to balance work between workers dynamically. Sometimes one worker has more work than we expected. By default, the scheduler moves these tasks around as necessary. Set this to false to disable this behavior

distributed.scheduler.work-stealing-interval   100ms

How frequently to balance worker loads

distributed.scheduler.worker-ttl   None

Time to live for workers. If we don't receive a heartbeat faster than this then we assume that the worker has died.

distributed.scheduler.pickle   True

Is the scheduler allowed to deserialize arbitrary bytestrings? The scheduler almost never deserializes user data. However there are some cases where the user can submit functions to run directly on the scheduler. This can be convenient for debugging, but also introduces some security risk. By setting this to false we ensure that the user is unable to run arbitrary code on the scheduler.

distributed.scheduler.preload   []

Run custom modules during the lifetime of the scheduler. You can run custom modules when the scheduler starts up and closes down. See https://docs.dask.org/en/latest/setup/custom-startup.html for more information

distributed.scheduler.preload-argv   []

Arguments to pass into the preload scripts described above See https://docs.dask.org/en/latest/setup/custom-startup.html for more information

distributed.scheduler.unknown-task-duration   500ms

Default duration for all tasks with unknown durations. Over time the scheduler learns a duration for tasks. However, when it sees a new type of task for the first time it has to make a guess as to how long it will take. This value is that guess.

distributed.scheduler.default-task-durations.rechunk-split   1us

No Comment

distributed.scheduler.default-task-durations.split-shuffle   1us

No Comment

distributed.scheduler.validate   False

Whether or not to run consistency checks during execution. This is typically only used for debugging.

distributed.scheduler.dashboard.status.task-stream-length   1000

The maximum number of tasks to include in the task stream plot

distributed.scheduler.dashboard.tasks.task-stream-length   100000

The maximum number of tasks to include in the task stream plot

distributed.scheduler.dashboard.tls.ca-file   None

No Comment

distributed.scheduler.dashboard.tls.key   None

No Comment

distributed.scheduler.dashboard.tls.cert   None

No Comment

distributed.scheduler.dashboard.bokeh-application.allow_websocket_origin   ['*']

No Comment

distributed.scheduler.dashboard.bokeh-application.keep_alive_milliseconds   500

No Comment

distributed.scheduler.dashboard.bokeh-application.check_unused_sessions_milliseconds   500

No Comment

distributed.scheduler.locks.lease-validation-interval   10s

The interval at which the scheduler validates the staleness of all acquired leases. Must always be smaller than the lease-timeout itself.

distributed.scheduler.locks.lease-timeout   30s

Maximum interval to wait for a Client refresh before a lease is invalidated and released.

distributed.scheduler.http.routes   ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']

A list of modules like "prometheus" and "health" that can be included or excluded as desired. These modules will have a ``routes`` keyword that gets added to the main HTTP Server. This list can also be extended with user-defined modules.
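
For example, the route list can be trimmed or extended in YAML (``mypackage.routes`` is a hypothetical user-defined module exposing a ``routes`` keyword):

```yaml
distributed:
  scheduler:
    http:
      routes:
        - distributed.http.scheduler.prometheus
        - distributed.http.health
        - mypackage.routes  # hypothetical user-defined module
```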

distributed.scheduler.allowed-imports   ['dask', 'distributed']

A list of trusted root modules the scheduler is allowed to import (incl. submodules). For security reasons, the scheduler does not import arbitrary Python modules.

distributed.scheduler.active-memory-manager.start   False

Set to true to auto-start the Active Memory Manager (AMM) when the scheduler initializes.

distributed.scheduler.active-memory-manager.interval   2s

Time expression, e.g. "2s". The AMM cycle runs once per interval.

distributed.scheduler.active-memory-manager.policies   [{'class': 'distributed.active_memory_manager.ReduceReplicas'}]

No Comment

Distributed Worker

distributed.worker.blocked-handlers   []

A list of handlers to exclude. The worker operates by receiving messages from the scheduler and other workers and then performing operations based on those messages. Each message names an operation. In some high-security situations administrators may choose to block certain handlers from running. Those handlers can be listed here. For a list of handlers see the ``dask.distributed.Worker.handlers`` attribute.

distributed.worker.multiprocessing-method   spawn

How we create new workers; one of "spawn", "forkserver", or "fork". This is passed to the ``multiprocessing.get_context`` function.

distributed.worker.use-file-locking   True

Whether or not to use lock files when creating workers. Workers create a local directory in which to place temporary files. When many workers are created on the same machine at once, they can conflict with each other by all trying to create this directory at the same time. To avoid this, Dask usually uses a file-based lock. However, on some systems file-based locks don't work. This is particularly common on HPC NFS systems, where users may want to set this to false.
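
On such systems the lock can be disabled in YAML (illustrative fragment):

```yaml
distributed:
  worker:
    use-file-locking: false  # e.g. on HPC NFS systems where file locks fail
```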

distributed.worker.connections.outgoing   50

No Comment

distributed.worker.connections.incoming   10

No Comment

distributed.worker.preload   []

Run custom modules during the lifetime of the worker. You can run custom modules when the worker starts up and closes down. See https://docs.dask.org/en/latest/setup/custom-startup.html for more information.

distributed.worker.preload-argv   []

Arguments to pass into the preload scripts described above. See https://docs.dask.org/en/latest/setup/custom-startup.html for more information.

distributed.worker.daemon   True

Whether or not to run our process as a daemon process

distributed.worker.validate   False

Whether or not to run consistency checks during execution. This is typically only used for debugging.

distributed.worker.lifetime.duration   None

The time after creation to close the worker, like "1 hour"

distributed.worker.lifetime.stagger   0 seconds

Random amount by which to stagger lifetimes. If you create many workers at the same time, you may want to avoid having them all kill themselves at the same time. To avoid this you might want to set a stagger time, so that they close themselves with some random variation, like "5 minutes". That way some workers can die, new ones can be brought up, and data can be transferred over smoothly.

distributed.worker.lifetime.restart   False

Do we try to resurrect the worker after the lifetime deadline?
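
Taken together, the lifetime settings above might be combined like this (illustrative values):

```yaml
distributed:
  worker:
    lifetime:
      duration: 1 hour    # close each worker an hour after it starts
      stagger: 5 minutes  # randomize close times to avoid simultaneous shutdown
      restart: true       # start a new worker after the old one closes
```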

distributed.worker.profile.interval   10ms

The time between polling the worker threads, typically short like 10ms

distributed.worker.profile.cycle   1000ms

The time between bundling together this data and sending it to the scheduler. This controls the granularity at which people can query the profile information on the time axis.

distributed.worker.profile.low-level   False

Whether or not to use the libunwind and stacktrace libraries to gather profiling information at a lower level (beneath Python). To get this to work you will need to install the experimental stacktrace library with ``conda install -c numba stacktrace``. See https://github.com/numba/stacktrace

distributed.worker.memory.recent-to-old-time   30s

When there is an increase in process memory (as observed by the operating system) that is not accounted for by the dask keys stored on the worker, ignore it for this long before considering it in non-time-sensitive heuristics. This should be set to be longer than the duration of most dask tasks.

distributed.worker.memory.rebalance.measure   optimistic

Which of the properties of distributed.scheduler.MemoryState should be used for measuring worker memory usage

distributed.worker.memory.rebalance.sender-min   0.3

Fraction of worker process memory at which we start potentially transferring data to other workers.

distributed.worker.memory.rebalance.recipient-max   0.6

Fraction of worker process memory at which we stop potentially receiving data from other workers. Ignored when max_memory is not set.

distributed.worker.memory.rebalance.sender-recipient-gap   0.1

Fraction of worker process memory, around the cluster mean, where a worker is neither a sender nor a recipient of data during a rebalance operation. E.g. if the mean cluster occupation is 50%, sender-recipient-gap=0.1 means that only nodes above 55% will donate data and only nodes below 45% will receive them. This helps avoid data from bouncing around the cluster repeatedly.

distributed.worker.memory.target   0.6

When the process memory (as observed by the operating system) gets above this amount, we start spilling the dask keys holding the largest chunks of data to disk

distributed.worker.memory.spill   0.7

When the process memory (as observed by the operating system) gets above this amount, we spill all data to disk.

distributed.worker.memory.pause   0.8

When the process memory (as observed by the operating system) gets above this amount, we no longer start new tasks on this worker.

distributed.worker.memory.terminate   0.95

When the process memory reaches this level, the nanny process will kill the worker (if a nanny is present).
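
These thresholds are often overridden through environment variables, e.g. in Docker images. As described at the top of this document, a dotted key maps to an environment variable by prefixing ``DASK_``, upper-casing, and replacing ``.`` with ``__`` and ``-`` with ``_``. A minimal sketch of that translation (the ``dask_env_var`` helper is illustrative, not part of Dask's API):

```python
def dask_env_var(key: str) -> str:
    """Translate a dotted Dask config key into the environment-variable
    form Dask reads at startup: DASK_ prefix, '-' -> '_', '.' -> '__',
    then uppercase."""
    return "DASK_" + key.replace("-", "_").replace(".", "__").upper()

print(dask_env_var("distributed.worker.memory.target"))
# DASK_DISTRIBUTED__WORKER__MEMORY__TARGET
print(dask_env_var("distributed.scheduler.work-stealing"))
# DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING
```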

distributed.worker.http.routes   ['distributed.http.worker.prometheus', 'distributed.http.health', 'distributed.http.statics']

A list of modules like "prometheus" and "health" that can be included or excluded as desired. These modules will have a ``routes`` keyword that gets added to the main HTTP Server. This list can also be extended with user-defined modules.

Distributed Nanny

distributed.nanny.preload   []

Run custom modules during the lifetime of the nanny. You can run custom modules when the nanny starts up and closes down. See https://docs.dask.org/en/latest/setup/custom-startup.html for more information.

distributed.nanny.preload-argv   []

Arguments to pass into the preload scripts described above See https://docs.dask.org/en/latest/setup/custom-startup.html for more information

distributed.nanny.environ.MALLOC_TRIM_THRESHOLD_   65536

No Comment

distributed.nanny.environ.OMP_NUM_THREADS   1

No Comment

distributed.nanny.environ.MKL_NUM_THREADS   1

No Comment
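
Additional environment variables for worker processes can be supplied alongside these defaults (``MY_VARIABLE`` is a hypothetical placeholder):

```yaml
distributed:
  nanny:
    environ:
      OMP_NUM_THREADS: 1
      MKL_NUM_THREADS: 1
      MY_VARIABLE: value  # hypothetical; any variable may be set here
```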

Distributed Admin

distributed.admin.tick.interval   20ms

The time between ticks, default 20ms

distributed.admin.tick.limit   3s

The time allowed before triggering a warning

distributed.admin.max-error-length   10000

Maximum length of traceback as text. Some Python tracebacks can be very long (particularly in stack overflow errors). If the traceback is larger than this size (in bytes) then we truncate it.

distributed.admin.log-length   10000

Default length of logs to keep in memory. The scheduler and workers keep the last 10000 or so log entries in memory.

distributed.admin.log-format   %(name)s - %(levelname)s - %(message)s

The log format to emit. See https://docs.python.org/3/library/logging.html#logrecord-attributes
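
For example, a timestamp can be added using the standard ``asctime`` attribute (illustrative fragment):

```yaml
distributed:
  admin:
    log-format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
```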

distributed.admin.pdb-on-err   False

Enter Python Debugger on scheduling error

distributed.admin.system-monitor.interval   500ms

Polling interval for querying CPU/memory statistics; default 500ms

distributed.admin.event-loop   tornado

The event loop to use. Must be one of tornado, asyncio, or uvloop.

Distributed RMM

distributed.rmm.pool-size   None

The size of the memory pool in bytes.