Python API (advanced)

In some rare cases, experts may want to create Scheduler, Worker, and Nanny objects explicitly in Python. This is often necessary when making tools to automatically deploy Dask in custom settings.

It is more common to create a Local cluster with Client() on a single machine or use the Command Line Interface (CLI). New readers are recommended to start there.

If you do want to start Scheduler and Worker objects yourself you should be a little familiar with async/await style Python syntax. These objects are awaitable and are commonly used within async with context managers. Here are a few examples to show a few ways to start and finish things.

Full Example

Scheduler([loop, delete_interval, ...])

Dynamic distributed task scheduler

Worker([scheduler_ip, scheduler_port, ...])

Worker node in a Dask distributed cluster

Client([address, loop, timeout, ...])

Connect to and submit computation to a Dask cluster

We first start with a comprehensive example of setting up a Scheduler, two Workers, and one Client in the same event loop, running a simple computation, and then cleaning everything up.

import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())

Now we look at simpler examples that build up to this case.

Scheduler

Scheduler([loop, delete_interval, ...])

Dynamic distributed task scheduler

We create scheduler by creating a Scheduler() object, and then await that object to wait for it to start up. We can then wait on the .finished method to wait until it closes. In the meantime the scheduler will be active managing the cluster..

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = Scheduler()        # scheduler created, but not yet running
    s = await s            # the scheduler is running
    await s.finished()     # wait until the scheduler closes

asyncio.get_event_loop().run_until_complete(f())

This program will run forever, or until some external process connects to the scheduler and tells it to stop. If you want to close things yourself you can close any Scheduler, Worker, Nanny, or Client class by awaiting the .close method:

await s.close()

Worker

Worker([scheduler_ip, scheduler_port, ...])

Worker node in a Dask distributed cluster

The worker follows the same API. The only difference is that the worker needs to know the address of the scheduler.

import asyncio
from dask.distributed import Scheduler, Worker

async def f(scheduler_address):
    w = await Worker(scheduler_address)
    await w.finished()

asyncio.get_event_loop().run_until_complete(f("tcp://127.0.0.1:8786"))

Start many in one event loop

Scheduler([loop, delete_interval, ...])

Dynamic distributed task scheduler

Worker([scheduler_ip, scheduler_port, ...])

Worker node in a Dask distributed cluster

We can run as many of these objects as we like in the same event loop.

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = await Scheduler()
    w = await Worker(s.address)
    await w.finished()
    await s.finished()

asyncio.get_event_loop().run_until_complete(f())

Use Context Managers

We can also use async with context managers to make sure that we clean up properly. Here is the same example as from above:

import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w:
            await w.finished()
            await s.finished()

asyncio.get_event_loop().run_until_complete(f())

Alternatively, in the example below we also include a Client, run a small computation, and then allow things to clean up after that computation..

import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())

This is equivalent to creating and awaiting each server, and then calling .close on each as we leave the context. In this example we don’t wait on s.finished(), so this will terminate relatively quickly. You could have called await s.finished() though if you wanted this to run forever.

Nanny

Nanny([scheduler_ip, scheduler_port, ...])

A process to manage worker processes

Alternatively, we can replace Worker with Nanny if we want your workers to be managed in a separate process. The Nanny constructor follows the same API. This allows workers to restart themselves in case of failure. Also, it provides some additional monitoring, and is useful when coordinating many workers that should live in different processes in order to avoid the GIL.

# w = await Worker(s.address)
w = await Nanny(s.address)

API

These classes have a variety of keyword arguments that you can use to control their behavior. See the API documentation below for more information.

Scheduler

class distributed.Scheduler(loop=None, delete_interval='500ms', synchronize_worker_interval='60s', services=None, service_kwargs=None, allowed_failures=None, extensions=None, validate=None, scheduler_file=None, security=None, worker_ttl=None, idle_timeout=None, interface=None, host=None, port=0, protocol=None, dashboard_address=None, dashboard=None, http_prefix='/', preload=None, preload_argv=(), plugins=(), **kwargs)[source]

Dynamic distributed task scheduler

The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever growing dask graph.

All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.

The scheduler communicates with the outside world through Comm objects. It maintains a consistent and valid view of the world even when listening to several clients at once.

A Scheduler is typically started either with the dask-scheduler executable:

$ dask-scheduler
Scheduler started at 127.0.0.1:8786

Or within a LocalCluster a Client starts up without connection information:

>>> c = Client()  
>>> c.cluster.scheduler  
Scheduler(...)

Users typically do not interact with the scheduler directly but rather with the client object Client.

State

The scheduler contains the following state variables. Each variable is listed along with what it stores and a brief description.

  • tasks: {task key: TaskState}

    Tasks currently known to the scheduler

  • unrunnable: {TaskState}

    Tasks in the “no-worker” state

  • workers: {worker key: WorkerState}

    Workers currently connected to the scheduler

  • idle: {WorkerState}:

    Set of workers that are not fully utilized

  • saturated: {WorkerState}:

    Set of workers that are not over-utilized

  • host_info: {hostname: dict}:

    Information about each worker host

  • clients: {client key: ClientState}

    Clients currently connected to the scheduler

  • services: {str: port}:

    Other services running on this scheduler, like Bokeh

  • loop: IOLoop:

    The running Tornado IOLoop

  • client_comms: {client key: Comm}

    For each client, a Comm object used to receive task requests and report task status updates.

  • stream_comms: {worker key: Comm}

    For each worker, a Comm object from which we both accept stimuli and report results

  • task_duration: {key-prefix: time}

    Time we expect certain functions to take, e.g. {'sum': 0.25}

adaptive_target(comm=None, target_duration=None)[source]

Desired number of workers based on the current workload

This looks at the current running tasks and memory use, and returns a number of desired workers. This is often used by adaptive scheduling.

Parameters
target_durationstr

A desired duration of time for computations to take. This affects how rapidly the scheduler will ask to scale.

async add_client(comm, client=None, versions=None)[source]

Add client to network

We listen to all future messages from this Comm.

add_keys(comm=None, worker=None, keys=(), stimulus_id=None)[source]

Learn that a worker has certain keys

This should not be used in practice and is mostly here for legacy reasons. However, it is sent by workers from time to time.

add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs)[source]

Add external plugin to scheduler.

See https://distributed.readthedocs.io/en/latest/plugins.html

Parameters
pluginSchedulerPlugin

SchedulerPlugin instance to add

idempotentbool

If true, the plugin is assumed to already exist and no action is taken.

namestr

A name for the plugin, if None, the name attribute is checked on the Plugin instance and generated if not discovered.

**kwargs

Deprecated; additional arguments passed to the plugin class if it is not already an instance

async add_worker(comm=None, *, address: str, status: str, keys=(), nthreads=None, name=None, resolve_address=True, nbytes=None, types=None, now=None, resources=None, host_info=None, memory_limit=None, metrics=None, pid=0, services=None, local_directory=None, versions=None, nanny=None, extra=None)[source]

Add a new worker to the cluster

async broadcast(comm=None, msg=None, workers=None, hosts=None, nanny=False, serializers=None)[source]

Broadcast message to workers, return all results

cancel_key(key, client, retries=5, force=False)[source]

Cancel a particular key and all dependents

client_heartbeat(client=None)[source]

Handle heartbeats from Client

client_releases_keys(keys=None, client=None)[source]

Remove keys from client desired list

client_send(client, msg)[source]

Send message to client

async close(comm=None, fast=False, close_workers=False)[source]

Send cleanup signal to all coroutines then wait until finished

See also

Scheduler.cleanup
async close_worker(comm=None, worker=None, safe=None)[source]

Remove a worker from the cluster

This both removes the worker from our local state and also sends a signal to the worker to shut down. This works regardless of whether or not the worker has a nanny process restarting it

coerce_address(addr, resolve=True)[source]

Coerce possible input addresses to canonical form. resolve can be disabled for testing with fake hostnames.

Handles strings, tuples, or aliases.

async delete_worker_data(worker_address: str, keys: Collection[str]) None[source]

Delete data from a worker and update the corresponding worker/task states

Parameters
worker_address: str

Worker address to delete keys from

keys: list[str]

List of keys to delete on the specified worker

async feed(comm, function=None, setup=None, teardown=None, interval='1s', **kwargs)[source]

Provides a data Comm to external requester

Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics.

async gather(comm=None, keys=None, serializers=None)[source]

Collect data from workers to the scheduler

async gather_on_worker(worker_address: str, who_has: dict[str, list[str]]) set[source]

Peer-to-peer copy of keys from multiple workers to a single worker

Parameters
worker_address: str

Recipient worker address to copy keys to

who_has: dict[Hashable, list[str]]

{key: [sender address, sender address, …], key: …}

Returns
returns:

set of keys that failed to be copied

get_worker_service_addr(worker, service_name, protocol=False)[source]

Get the (host, port) address of the named service on the worker. Returns None if the service doesn’t exist.

Parameters
workeraddress
service_namestr

Common services include ‘bokeh’ and ‘nanny’

protocolboolean

Whether or not to include a full address with protocol (True) or just a (host, port) pair

handle_long_running(key=None, worker=None, compute_duration=None)[source]

A task has seceded from the thread pool

We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped.

async handle_worker(comm=None, worker=None)[source]

Listen to responses from a single worker

This is the main loop for scheduler-worker interaction

See also

Scheduler.handle_client

Equivalent coroutine for clients

identity(comm=None)[source]

Basic information about ourselves and our cluster

async proxy(comm=None, msg=None, worker=None, serializers=None)[source]

Proxy a communication through the scheduler to some other worker

async rebalance(comm=None, keys: Iterable[Hashable] = None, workers: Iterable[str] = None) dict[source]

Rebalance keys so that each worker ends up with roughly the same process memory (managed+unmanaged).

Warning

This operation is generally not well tested against normal operation of the scheduler. It is not recommended to use it while waiting on computations.

Algorithm

  1. Find the mean occupancy of the cluster, defined as data managed by dask + unmanaged process memory that has been there for at least 30 seconds (distributed.worker.memory.recent-to-old-time). This lets us ignore temporary spikes caused by task heap usage.

    Alternatively, you may change how memory is measured both for the individual workers as well as to calculate the mean through distributed.worker.memory.rebalance.measure. Namely, this can be useful to disregard inaccurate OS memory measurements.

  2. Discard workers whose occupancy is within 5% of the mean cluster occupancy (distributed.worker.memory.rebalance.sender-recipient-gap / 2). This helps avoid data from bouncing around the cluster repeatedly.

  3. Workers above the mean are senders; those below are recipients.

  4. Discard senders whose absolute occupancy is below 30% (distributed.worker.memory.rebalance.sender-min). In other words, no data is moved regardless of imbalancing as long as all workers are below 30%.

  5. Discard recipients whose absolute occupancy is above 60% (distributed.worker.memory.rebalance.recipient-max). Note that this threshold by default is the same as distributed.worker.memory.target to prevent workers from accepting data and immediately spilling it out to disk.

  6. Iteratively pick the sender and recipient that are farthest from the mean and move the least recently inserted key between the two, until either all senders or all recipients fall within 5% of the mean.

    A recipient will be skipped if it already has a copy of the data. In other words, this method does not degrade replication. A key will be skipped if there are no recipients available with enough memory to accept the key and that don’t already hold a copy.

The least recently insertd (LRI) policy is a greedy choice with the advantage of being O(1), trivial to implement (it relies on python dict insertion-sorting) and hopefully good enough in most cases. Discarded alternative policies were:

  • Largest first. O(n*log(n)) save for non-trivial additional data structures and risks causing the largest chunks of data to repeatedly move around the cluster like pinballs.

  • Least recently used (LRU). This information is currently available on the workers only and not trivial to replicate on the scheduler; transmitting it over the network would be very expensive. Also, note that dask will go out of its way to minimise the amount of time intermediate keys are held in memory, so in such a case LRI is a close approximation of LRU.

Parameters
keys: optional

whitelist of dask keys that should be considered for moving. All other keys will be ignored. Note that this offers no guarantee that a key will actually be moved (e.g. because it is unnecessary or because there are no viable recipient workers for it).

workers: optional

whitelist of workers addresses to be considered as senders or recipients. All other workers will be ignored. The mean cluster occupancy will be calculated only using the whitelisted workers.

reevaluate_occupancy(worker_index: ctypes.c_long = 0)[source]

Periodically reassess task duration time

The expected duration of a task can change over time. Unfortunately we don’t have a good constant-time way to propagate the effects of these changes out to the summaries that they affect, like the total expected runtime of each of the workers, or what tasks are stealable.

In this coroutine we walk through all of the workers and re-align their estimates with the current state of tasks. We do this periodically rather than at every transition, and we only do it if the scheduler process isn’t under load (using psutil.Process.cpu_percent()). This lets us avoid this fringe optimization when we have better things to think about.

async register_nanny_plugin(comm, plugin, name=None)[source]

Registers a setup function, and call it on every worker

async register_scheduler_plugin(comm=None, plugin=None, name=None)[source]

Register a plugin on the scheduler.

async register_worker_plugin(comm, plugin, name=None)[source]

Registers a worker plugin on all running and future workers

remove_client(client=None)[source]

Remove client from network

remove_plugin(name: str | None = None, plugin: SchedulerPlugin | None = None) None[source]

Remove external plugin from scheduler

Parameters
namestr

Name of the plugin to remove

pluginSchedulerPlugin

Deprecated; use name argument instead. Instance of a SchedulerPlugin class to remove;

async remove_worker(comm=None, address=None, safe=False, close=True)[source]

Remove worker from cluster

We do this when a worker reports that it plans to leave or when it appears to be unresponsive. This may send its tasks back to a released state.

async replicate(comm=None, keys=None, n=None, workers=None, branching_factor=2, delete=True, lock=True)[source]

Replicate data throughout cluster

This performs a tree copy of the data throughout the network individually on each piece of data.

Parameters
keys: Iterable

list of keys to replicate

n: int

Number of replications we expect to see within the cluster

branching_factor: int, optional

The number of workers that can copy data in each generation. The larger the branching factor, the more data we copy in a single step, but the more a given worker risks being swamped by data requests.

report(msg: dict, ts: Optional[distributed.scheduler.TaskState] = None, client: Optional[str] = None)[source]

Publish updates to all listening Queues and Comms

If the message contains a key then we only send the message to those comms that care about the key.

reschedule(key=None, worker=None)[source]

Reschedule a task

Things may have shifted and this task may now be better suited to run elsewhere

async restart(client=None, timeout=30)[source]

Restart all workers. Reset local state.

async retire_workers(comm=None, workers=None, remove=True, close_workers=False, names=None, lock=True, **kwargs) dict[source]

Gracefully retire workers from cluster

Parameters
workers: list (optional)

List of worker addresses to retire. If not provided we call workers_to_close which finds a good set

names: list (optional)

List of worker names to retire.

remove: bool (defaults to True)

Whether or not to remove the worker metadata immediately or else wait for the worker to contact us

close_workers: bool (defaults to False)

Whether or not to actually close the worker explicitly from here. Otherwise we expect some external job scheduler to finish off the worker.

**kwargs: dict

Extra options to pass to workers_to_close to determine which workers we should drop

Returns
Dictionary mapping worker ID/address to dictionary of information about
that worker for each retired worker.
run_function(stream, function, args=(), kwargs={}, wait=True)[source]

Run a function within this process

async scatter(comm=None, data=None, workers=None, client=None, broadcast=False, timeout=2)[source]

Send data out to workers

send_all(client_msgs: dict, worker_msgs: dict)[source]

Send messages to client and workers

send_task_to_worker(worker, ts: distributed.scheduler.TaskState, duration: ctypes.c_double = - 1)[source]

Send a single computational task to a worker

async start()[source]

Clear out old state and restart all running coroutines

start_ipython(comm=None)[source]

Start an IPython kernel

Returns Jupyter connection info dictionary.

stimulus_cancel(comm, keys=None, client=None, force=False)[source]

Stop execution on a list of keys

stimulus_task_erred(key=None, worker=None, exception=None, traceback=None, **kwargs)[source]

Mark that a task has erred on a particular worker

stimulus_task_finished(key=None, worker=None, **kwargs)[source]

Mark that a task has finished execution on a particular worker

story(*keys)[source]

Get all transitions that touch one of the input keys

transition(key, finish: str, *args, **kwargs)[source]

Transition a key from its current state to the finish state

Returns
Dictionary of recommendations for future transitions

See also

Scheduler.transitions

transitive version of this function

Examples

>>> self.transition('x', 'waiting')
{'x': 'processing'}
transition_story(*keys)

Get all transitions that touch one of the input keys

transitions(recommendations: dict)[source]

Process transitions until none are left

This includes feedback from previous transitions and continues until we reach a steady state

async unregister_nanny_plugin(comm, name)[source]

Unregisters a worker plugin

async unregister_worker_plugin(comm, name)[source]

Unregisters a worker plugin

update_data(comm=None, *, who_has: dict, nbytes: dict, client=None, serializers=None)[source]

Learn that new data has entered the network from an external source

See also

Scheduler.mark_key_in_memory
update_graph(client=None, tasks=None, keys=None, dependencies=None, restrictions=None, priority=None, loose_restrictions=None, resources=None, submitting_task=None, retries=None, user_priority=0, actors=None, fifo_timeout=0, annotations=None, code=None)[source]

Add new computations to the internal dask graph

This happens whenever the Client calls submit, map, get, or compute.

worker_send(worker, msg)[source]

Send message to worker

This also handles connection failures by adding a callback to remove the worker on the next cycle.

workers_list(workers)[source]

List of qualifying workers

Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match

workers_to_close(comm=None, memory_ratio: int | float | None = None, n: int | None = None, key: Callable[[WorkerState], Hashable] | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') list[str][source]

Find workers that we can close with low cost

This returns a list of workers that are good candidates to retire. These workers are not running anything and are storing relatively little data relative to their peers. If all workers are idle then we still maintain enough workers to have enough RAM to store our data, with a comfortable buffer.

This is for use with systems like distributed.deploy.adaptive.

Parameters
memory_ratioNumber

Amount of extra space we want to have for our stored data. Defaults to 2, or that we want to have twice as much memory as we currently have data.

nint

Number of workers to close

minimumint

Minimum number of workers to keep around

keyCallable(WorkerState)

An optional callable mapping a WorkerState object to a group affiliation. Groups will be closed together. This is useful when closing workers must be done collectively, such as by hostname.

targetint

Target number of workers to have after we close

attributestr

The attribute of the WorkerState object to return, like “address” or “name”. Defaults to “address”.

Returns
to_close: list of worker addresses that are OK to close

Examples

>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

Group workers by hostname prior to closing

>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

Remove two workers

>>> scheduler.workers_to_close(n=2)

Keep enough workers to have twice as much memory as we we need.

>>> scheduler.workers_to_close(memory_ratio=2)

Worker

class distributed.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, ncores: None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_dir: None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool = True, memory_limit: str | float = 'auto', executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, memory_monitor_interval: Any = '200ms', memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, extensions: list[type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, data: MutableMapping[str, Any] | Callable[[], MutableMapping[str, Any]] | tuple[Callable[..., MutableMapping[str, Any]], dict[str, Any]] | None = None, interface: str | None = None, host: str | None = None, port: int | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, **kwargs)[source]

Worker node in a Dask distributed cluster

Workers perform two functions:

  1. Serve data from a local dictionary

  2. Perform computation on that data and on data from peers

Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.

You can start a worker with the dask-worker command line application:

$ dask-worker scheduler-ip:port

Use the --help flag to see more options:

$ dask-worker --help

The rest of this docstring is about the internal state the the worker uses to manage and track internal computations.

State

Informational State

These attributes don’t change significantly during execution.

  • nthreads: int:

    Number of nthreads used by this worker process

  • executors: dict[str, concurrent.futures.Executor]:

    Executors used to perform computation. Always contains the default executor.

  • local_directory: path:

    Path on local machine to store temporary files

  • scheduler: rpc:

    Location of scheduler. See .ip/.port attributes.

  • name: string:

    Alias

  • services: {str: Server}:

    Auxiliary web servers running on this worker

  • service_ports: {str: port}:

  • total_out_connections: int

    The maximum number of concurrent outgoing requests for data

  • total_in_connections: int

    The maximum number of concurrent incoming requests for data

  • comm_threshold_bytes: int

    As long as the total number of bytes in flight is below this threshold we will not limit the number of outgoing connections for a single tasks dependency fetch.

  • batched_stream: BatchedSend

    A batched stream along which we communicate to the scheduler

  • log: [(message)]

    A structured and queryable log. See Worker.story

Volatile State

These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.

  • tasks: {key: TaskState}

    The tasks currently executing on this worker (and any dependencies of those tasks)

  • data: {key: object}:

    Prefer using the host attribute instead of this, unless memory_limit and at least one of memory_target_fraction or memory_spill_fraction values are defined, in that case, this attribute is a zict.Buffer, from which information on LRU cache can be queried.

  • data.memory: {key: object}:

    Dictionary mapping keys to actual values stored in memory. Only available if condition for data being a zict.Buffer is met.

  • data.disk: {key: object}:

    Dictionary mapping keys to actual values stored on disk. Only available if condition for data being a zict.Buffer is met.

  • data_needed: deque(keys)

    The keys which still require data in order to execute, arranged in a deque

  • ready: [keys]

    Keys that are ready to run. Stored in a LIFO stack

  • constrained: [keys]

    Keys for which we have the data to run, but are waiting on abstract resources like GPUs. Stored in a FIFO deque

  • executing_count: int

    A count of tasks currently executing on this worker

  • executed_count: int

    A number of tasks that this worker has run in its lifetime

  • long_running: {keys}

    A set of keys of tasks that are running and have started their own long-running clients.

  • has_what: {worker: {deps}}

    The data that we care about that we think a worker has

  • pending_data_per_worker: {worker: [dep]}

    The data on each worker that we still want, prioritized as a deque

  • in_flight_tasks: int

    A count of the number of tasks that are coming to us in current peer-to-peer connections

  • in_flight_workers: {worker: {task}}

    The workers from which we are currently gathering data and the dependencies we expect from those connections

  • comm_bytes: int

    The total number of bytes in flight

  • threads: {key: int}

    The ID of the thread on which the task ran

  • active_threads: {int: key}

    The keys currently running on active threads

  • waiting_for_data_count: int

    A count of how many tasks are currently waiting for data

Parameters
scheduler_ip: str, optional
scheduler_port: int, optional
scheduler_file: str, optional
ip: str, optional
data: MutableMapping, type, None

The object to use for storage, builds a disk-backed LRU dict by default

nthreads: int, optional
loop: tornado.ioloop.IOLoop
local_directory: str, optional

Directory where we place local resources

name: str, optional
memory_limit: int, float, string

Number of bytes of memory that this worker should use. Set to zero for no limit. Set to ‘auto’ to calculate as system.MEMORY_LIMIT * min(1, nthreads / total_cores) Use strings or numbers like 5GB or 5e9

memory_target_fraction: float or False

Fraction of memory to try to stay beneath (default: read from config key distributed.worker.memory.target)

memory_spill_fraction: float or false

Fraction of memory at which we start spilling to disk (default: read from config key distributed.worker.memory.spill)

memory_pause_fraction: float or False

Fraction of memory at which we stop running new tasks (default: read from config key distributed.worker.memory.pause)

executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
The executor(s) to use. Depending on the type, it has the following meanings:
  • Executor instance: The default executor.

  • Dict[str, Executor]: mapping names to Executor instances. If the “default” key isn’t in the dict, a “default” executor will be created using ThreadPoolExecutor(nthreads).

  • Str: The string “offload”, which refer to the same thread pool used for offloading communications. This results in the same thread being used for deserialization and computation.

resources: dict

Resources that this worker has like {'GPU': 2}

nanny: str

Address on which to contact nanny, if it exists

lifetime: str

Amount of time like “1 hour” after which we gracefully shut down the worker. This defaults to None, meaning no explicit shutdown time.

lifetime_stagger: str

Amount of time like “5 minutes” to stagger the lifetime value The actual lifetime will be selected uniformly at random between lifetime +/- lifetime_stagger

lifetime_restart: bool

Whether or not to restart a worker after it has reached its lifetime Default False

kwargs: optional

Additional parameters to ServerNode constructor

Examples

Use the command line to start a worker:

$ dask-scheduler
Start scheduler at 127.0.0.1:8786

$ dask-worker 127.0.0.1:8786
Start worker at:               127.0.0.1:1234
Registered with scheduler at:  127.0.0.1:8786
async close_gracefully(restart=None)[source]

Gracefully shut down a worker

This first informs the scheduler that we’re shutting down, and asks it to move our data elsewhere. Afterwards, we close as normal

async gather_dep(worker: str, to_gather: Iterable[str], total_nbytes: int, *, stimulus_id)[source]

Gather dependencies for a task from a worker who has them

Parameters
workerstr

Address of worker to gather dependencies from

to_gatherlist

Keys of dependencies to gather from worker – this is not necessarily equivalent to the full list of dependencies of dep as some dependencies may already be present on this worker.

total_nbytesint

Total number of bytes for all the dependencies in to_gather combined

get_current_task() str[source]

Get the key of the task we are currently running

This only makes sense to run within a task

See also

get_worker

Examples

>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()
>>> future = client.submit(f)  
>>> future.result()  
'f-1234'
handle_cancel_compute(key, reason)[source]

Cancel a task on a best effort basis. This is only possible while a task is in state waiting or ready. Nothing will happen otherwise.

handle_free_keys(comm=None, keys=None, stimulus_id=None)[source]

Handler to be called by the scheduler.

The given keys are no longer referred to and required by the scheduler. The worker is now allowed to release the key, if applicable.

This does not guarantee that the memory is released since the worker may still decide to hold on to the data and task since it is required by an upstream dependency.

handle_remove_replicas(keys, stimulus_id)[source]

Stream handler notifying the worker that it might be holding unreferenced, superfluous data.

This should not actually happen during ordinary operations and is only intended to correct any erroneous state. An example where this is necessary is if a worker fetches data for a downstream task but that task is released before the data arrives. In this case, the scheduler will notify the worker that it may be holding this unnecessary data, if the worker hasn’t released the data itself, already.

This handler does not guarantee the task nor the data to be actually released but only asks the worker to release the data on a best effort guarantee. This protects from race conditions where the given keys may already have been rescheduled for compute in which case the compute would win and this handler is ignored.

For stronger guarantees, see handler free_keys

property local_dir

For API compatibility with Nanny

async memory_monitor()[source]

Track this process’s memory usage and act accordingly

If we rise above 70% memory use, start dumping data to disk.

If we rise above 80% memory use, stop execution of new tasks

start_ipython(comm)[source]

Start an IPython kernel

Returns Jupyter connection info dictionary.

transition(ts, finish: str, *, stimulus_id, **kwargs)[source]

Transition a key from its current state to the finish state

Returns
Dictionary of recommendations for future transitions

See also

Scheduler.transitions

transitive version of this function

Examples

>>> self.transition('x', 'waiting')
{'x': 'processing'}
transitions(recommendations: dict, *, stimulus_id)[source]

Process transitions until none are left

This includes feedback from previous transitions and continues until we reach a steady state

trigger_profile()[source]

Get a frame from all actively computing threads

Merge these frames into existing profile counts

property worker_address

For API compatibility with Nanny

Nanny

class distributed.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port=0, nthreads=None, ncores=None, loop=None, local_dir=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port=None, protocol=None, config=None, **worker_kwargs)[source]

A process to manage worker processes

The nanny spins up Worker processes, watches then, and kills or restarts them as necessary. It is necessary if you want to use the Client.restart method, or to restart the worker automatically if it gets to the terminate fraction of its memory limit.

The parameters for the Nanny are mostly the same as those for the Worker with exceptions listed below.

Parameters
env: dict, optional

Environment variables set at time of Nanny initialization will be ensured to be set in the Worker process as well. This argument allows to overwrite or otherwise set environment variables for the Worker. It is also possible to set environment variables using the option distributed.nanny.environ. Precedence as follows

  1. Nanny arguments

  2. Existing environment variables

  3. Dask configuration

See also

Worker
async close(comm=None, timeout=5, report=None)[source]

Close the worker process, stop all comms.

close_gracefully(comm=None)[source]

A signal that we shouldn’t try to restart workers if they go away

This is used as part of the cluster shutdown process.

async instantiate(comm=None) distributed.core.Status[source]

Start a local worker process

Blocks until the process is up and the scheduler is properly informed

async kill(comm=None, timeout=2)[source]

Kill the local worker process

Blocks until both the process is down and the scheduler is properly informed

property local_dir

For API compatibility with Nanny

memory_monitor()[source]

Track worker’s memory. Restart if it goes above terminate fraction

async start()[source]

Start nanny, start local process, start watching