Python API (advanced)¶
In some rare cases, experts may want to create Scheduler, Worker, and Nanny objects explicitly in Python. This is often necessary when making tools to automatically deploy Dask in custom settings.
It is more common to create a Local cluster with Client() on a single machine or use the Command Line Interface (CLI). New readers are recommended to start there.
If you do want to start Scheduler and Worker objects yourself you should be a little familiar with async/await style Python syntax. These objects are awaitable and are commonly used within async with context managers.
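To make the "awaitable and usable within async with" pattern concrete, here is a minimal sketch that uses only the standard library. ToyServer is a hypothetical stand-in written for illustration. It is not part of Dask and does not reflect how Dask implements its servers; it only mirrors the await / finished() / close() / async with surface described above.

```python
import asyncio


class ToyServer:
    """Hypothetical stand-in for a server object; not part of Dask."""

    def __init__(self, name):
        self.name = name
        self.status = "init"
        self._finished = None  # created on start, inside the running loop

    async def _start(self):
        # A real server would open sockets here; we only flip a flag.
        self._finished = asyncio.Event()
        self.status = "running"
        return self

    def __await__(self):
        # `await ToyServer(...)` starts the server and returns it.
        return self._start().__await__()

    async def finished(self):
        # Block until close() has been called.
        await self._finished.wait()

    async def close(self):
        self.status = "closed"
        self._finished.set()

    async def __aenter__(self):
        return await self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()


async def demo():
    statuses = []
    s = ToyServer("explicit")
    statuses.append(s.status)      # created, but not yet running
    s = await s
    statuses.append(s.status)      # running
    await s.close()
    await s.finished()             # returns at once: already closed
    statuses.append(s.status)

    async with ToyServer("managed") as t:
        statuses.append(t.status)  # running inside the block
    statuses.append(t.status)      # closed when the block exits
    return statuses


statuses = asyncio.run(demo())
print(statuses)
```

The same three moves (await to start, await .finished() to wait, await .close() or leave the async with block to stop) are what the Dask examples in this section rely on.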
Here are a few examples to show a few ways to start and finish things.
Full Example¶
| Scheduler | Dynamic distributed task scheduler |
| Worker | Worker node in a Dask distributed cluster |
| Client | Connect to and submit computation to a Dask cluster |
We first start with a comprehensive example of setting up a Scheduler, two Workers, and one Client in the same event loop, running a simple computation, and then cleaning everything up.
import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.run(f())
Now we look at simpler examples that build up to this case.
Scheduler¶
| Scheduler | Dynamic distributed task scheduler |
We create a scheduler by creating a Scheduler() object, and then await that object to wait for it to start up. We can then await the .finished method to wait until it closes. In the meantime the scheduler will be active, managing the cluster.
import asyncio
from dask.distributed import Scheduler

async def f():
    s = Scheduler()        # scheduler created, but not yet running
    s = await s            # the scheduler is running
    await s.finished()     # wait until the scheduler closes

asyncio.run(f())
This program will run forever, or until some external process connects to the scheduler and tells it to stop. If you want to close things yourself you can close any Scheduler, Worker, Nanny, or Client instance by awaiting the .close method:
await s.close()
Worker¶
| Worker | Worker node in a Dask distributed cluster |
The worker follows the same API. The only difference is that the worker needs to know the address of the scheduler.
import asyncio
from dask.distributed import Worker

async def f(scheduler_address):
    w = await Worker(scheduler_address)
    await w.finished()

asyncio.run(f("tcp://127.0.0.1:8786"))
Start many in one event loop¶
| Scheduler | Dynamic distributed task scheduler |
| Worker | Worker node in a Dask distributed cluster |
We can run as many of these objects as we like in the same event loop.
import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = await Scheduler()
    w = await Worker(s.address)
    await w.finished()
    await s.finished()

asyncio.run(f())
Use Context Managers¶
We can also use async with context managers to make sure that we clean up properly. Here is the same example as above:
import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w:
            await w.finished()
            await s.finished()

asyncio.run(f())
Alternatively, in the example below we also include a Client, run a small computation, and then allow things to clean up after that computation.
import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.run(f())
This is equivalent to creating and awaiting each server, and then calling .close on each as we leave the context. In this example we don’t wait on s.finished(), so this will terminate relatively quickly. You could have called await s.finished() though if you wanted this to run forever.
Nanny¶
| Nanny | A process to manage worker processes |
Alternatively, we can replace Worker with Nanny if we want our workers to be managed in a separate process. The Nanny constructor follows the same API. This allows workers to restart themselves in case of failure. It also provides some additional monitoring, and is useful when coordinating many workers that should live in different processes in order to avoid the GIL.
# w = await Worker(s.address)
w = await Nanny(s.address)
API¶
These classes have a variety of keyword arguments that you can use to control their behavior. See the API documentation below for more information.
Scheduler¶
- class distributed.Scheduler(loop=None, services=None, service_kwargs=None, allowed_failures=None, extensions=None, validate=None, scheduler_file=None, security=None, worker_ttl=None, idle_timeout=None, interface=None, host=None, port=0, protocol=None, dashboard_address=None, dashboard=None, http_prefix='/', preload=None, preload_argv=(), plugins=(), contact_address=None, transition_counter_max=False, jupyter=False, **kwargs)[source]¶
Dynamic distributed task scheduler
The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever growing dask graph.
All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.
The scheduler communicates with the outside world through Comm objects. It maintains a consistent and valid view of the world even when listening to several clients at once.
A Scheduler is typically started either with the dask scheduler executable:
$ dask scheduler
Scheduler started at 127.0.0.1:8786
Or within a LocalCluster a Client starts up without connection information:
>>> c = Client()
>>> c.cluster.scheduler
Scheduler(...)
Users typically do not interact with the scheduler directly but rather with the client object Client.
The contact_address parameter allows the scheduler to advertise a specific address to the workers for communication with the scheduler, which is different than the address the scheduler binds to. This is useful when the scheduler listens on a private address, which therefore cannot be used by the workers to contact it.
State
The scheduler contains the following state variables. Each variable is listed along with what it stores and a brief description.
- tasks: {task key: TaskState}
  Tasks currently known to the scheduler
- unrunnable: {TaskState}
  Tasks in the “no-worker” state
- workers: {worker key: WorkerState}
  Workers currently connected to the scheduler
- idle: {WorkerState}
  Set of workers that are not fully utilized
- saturated: {WorkerState}
  Set of workers that are not over-utilized
- host_info: {hostname: dict}
  Information about each worker host
- clients: {client key: ClientState}
  Clients currently connected to the scheduler
- services: {str: port}
  Other services running on this scheduler, like Bokeh
- loop: IOLoop
  The running Tornado IOLoop
- client_comms: {client key: Comm}
  For each client, a Comm object used to receive task requests and report task status updates.
- stream_comms: {worker key: Comm}
  For each worker, a Comm object from which we both accept stimuli and report results
- task_duration: {key-prefix: time}
  Time we expect certain functions to take, e.g. {'sum': 0.25}
- adaptive_target(target_duration=None)[source]¶
Desired number of workers based on the current workload
This looks at the current running tasks and memory use, and returns a number of desired workers. This is often used by adaptive scheduling.
- Parameters
- target_duration: str
A desired duration of time for computations to take. This affects how rapidly the scheduler will ask to scale.
- async add_client(comm: distributed.comm.core.Comm, client: str, versions: dict[str, Any]) None [source]¶
Add client to network
We listen to all future messages from this Comm.
- add_keys(worker: str, keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]] = (), stimulus_id: str | None = None) Literal['OK', 'not found'] [source]¶
Learn that a worker has certain keys
This should not be used in practice and is mostly here for legacy reasons. However, it is sent by workers from time to time.
- add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) None [source]¶
Add external plugin to scheduler.
See https://distributed.readthedocs.io/en/latest/plugins.html
- Parameters
- plugin: SchedulerPlugin
SchedulerPlugin instance to add
- idempotent: bool
If true, the plugin is assumed to already exist and no action is taken.
- name: str
A name for the plugin. If None, the name attribute is checked on the plugin instance and generated if not discovered.
- async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) None [source]¶
Add a new worker to the cluster
- async benchmark_hardware() dict[str, dict[str, float]] [source]¶
Run a benchmark on the workers for memory, disk, and network bandwidths
- Returns
- result: dict
A dictionary mapping the names “disk”, “memory”, and “network” to dictionaries mapping sizes to bandwidths. These bandwidths are averaged over many workers running computations across the cluster.
- async broadcast(*, msg: dict, workers: collections.abc.Collection[str] | None = None, hosts: collections.abc.Collection[str] | None = None, nanny: bool = False, serializers: Any = None, on_error: Literal['raise', 'return', 'return_pickle', 'ignore'] = 'raise') dict[str, Any] [source]¶
Broadcast message to workers, return all results
- client_releases_keys(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, stimulus_id: str | None = None) None [source]¶
Remove keys from client desired list
- async close(fast=None, close_workers=None, reason='unknown')[source]¶
Send cleanup signal to all coroutines then wait until finished
See also
Scheduler.cleanup
- close_worker(worker: str) None [source]¶
Ask a worker to shut itself down. Do not wait for it to take effect. Note that there is no guarantee that the worker will actually accept the command.
Note that remove_worker() sends the same command internally if close=True.
- coerce_address(addr: str | tuple, resolve: bool = True) str [source]¶
Coerce possible input addresses to canonical form. resolve can be disabled for testing with fake hostnames.
Handles strings, tuples, or aliases.
- async delete_worker_data(worker_address: str, keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], stimulus_id: str) None [source]¶
Delete data from a worker and update the corresponding worker/task states
- Parameters
- worker_address: str
Worker address to delete keys from
- keys: list[Key]
List of keys to delete on the specified worker
- async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) None [source]¶
Write a cluster state dump to an fsspec-compatible URL.
- async feed(comm: distributed.comm.core.Comm, function: bytes | None = None, setup: bytes | None = None, teardown: bytes | None = None, interval: str | float = '1s', **kwargs: Any) None [source]¶
Provides a data Comm to external requester
Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics.
- async gather(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], serializers: list[str] | None = None) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object] [source]¶
Collect data from workers to the scheduler
- async gather_on_worker(worker_address: str, who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) set [source]¶
Peer-to-peer copy of keys from multiple workers to a single worker
- Parameters
- worker_address: str
Recipient worker address to copy keys to
- who_has: dict[Key, list[str]]
{key: [sender address, sender address, …], key: …}
- Returns
- returns:
set of keys that failed to be copied
- async get_cluster_state(exclude: collections.abc.Collection[str]) dict [source]¶
Produce the state dict used in a cluster state dump
- async get_story(keys_or_stimuli: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) list[distributed.scheduler.Transition] [source]¶
RPC hook for SchedulerState.story().
Note that the msgpack serialization/deserialization round-trip will transform the Transition namedtuples into regular tuples.
- get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) tuple[str, int] | str | None [source]¶
Get the (host, port) address of the named service on the worker. Returns None if the service doesn’t exist.
- Parameters
- worker: address
- service_name: str
Common services include ‘bokeh’ and ‘nanny’
- protocol: boolean
Whether or not to include a full address with protocol (True) or just a (host, port) pair
- handle_long_running(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], worker: str, compute_duration: float | None, stimulus_id: str) None [source]¶
A task has seceded from the thread pool
We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped.
- handle_request_refresh_who_has(keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], worker: str, stimulus_id: str) None [source]¶
Request from a Worker to refresh the who_has for some keys. Not to be confused with scheduler.who_has, which is a dedicated comm RPC request from a Client.
- async handle_worker(comm: distributed.comm.core.Comm, worker: str) None [source]¶
Listen to responses from a single worker
This is the main loop for scheduler-worker interaction
See also
Scheduler.handle_client
Equivalent coroutine for clients
- log_event(topic: str | collections.abc.Collection[str], msg: Any) None [source]¶
Log an event under a given topic
- Parameters
- topic: str, list[str]
Name of the topic under which to log an event. To log the same event under multiple topics, pass a list of topic names.
- msg
Event message to log. Note this must be msgpack serializable.
- async proxy(msg: dict, worker: str, serializers: Any = None) Any [source]¶
Proxy a communication through the scheduler to some other worker
- async rebalance(keys: collections.abc.Iterable[Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]] | None = None, workers: collections.abc.Iterable[str] | None = None, stimulus_id: str | None = None) dict [source]¶
Rebalance keys so that each worker ends up with roughly the same process memory (managed+unmanaged).
Warning
This operation is generally not well tested against normal operation of the scheduler. It is not recommended to use it while waiting on computations.
Algorithm
1. Find the mean occupancy of the cluster, defined as data managed by dask + unmanaged process memory that has been there for at least 30 seconds (distributed.worker.memory.recent-to-old-time). This lets us ignore temporary spikes caused by task heap usage.
   Alternatively, you may change how memory is measured both for the individual workers as well as to calculate the mean through distributed.worker.memory.rebalance.measure. Namely, this can be useful to disregard inaccurate OS memory measurements.
2. Discard workers whose occupancy is within 5% of the mean cluster occupancy (distributed.worker.memory.rebalance.sender-recipient-gap / 2). This helps avoid data from bouncing around the cluster repeatedly.
3. Workers above the mean are senders; those below are recipients.
4. Discard senders whose absolute occupancy is below 30% (distributed.worker.memory.rebalance.sender-min). In other words, no data is moved regardless of imbalance as long as all workers are below 30%.
5. Discard recipients whose absolute occupancy is above 60% (distributed.worker.memory.rebalance.recipient-max). Note that this threshold by default is the same as distributed.worker.memory.target to prevent workers from accepting data and immediately spilling it out to disk.
6. Iteratively pick the sender and recipient that are farthest from the mean and move the least recently inserted key between the two, until either all senders or all recipients fall within 5% of the mean.
   A recipient will be skipped if it already has a copy of the data. In other words, this method does not degrade replication. A key will be skipped if there are no recipients available with enough memory to accept the key and that don’t already hold a copy.
The least recently inserted (LRI) policy is a greedy choice with the advantage of being O(1), trivial to implement (it relies on python dict insertion-sorting) and hopefully good enough in most cases. Discarded alternative policies were:
- Largest first. O(n*log(n)) save for non-trivial additional data structures and risks causing the largest chunks of data to repeatedly move around the cluster like pinballs.
- Least recently used (LRU). This information is currently available on the workers only and not trivial to replicate on the scheduler; transmitting it over the network would be very expensive. Also, note that dask will go out of its way to minimise the amount of time intermediate keys are held in memory, so in such a case LRI is a close approximation of LRU.
- Parameters
- keys: optional
allowlist of dask keys that should be considered for moving. All other keys will be ignored. Note that this offers no guarantee that a key will actually be moved (e.g. because it is unnecessary or because there are no viable recipient workers for it).
- workers: optional
allowlist of worker addresses to be considered as senders or recipients. All other workers will be ignored. The mean cluster occupancy will be calculated only using the allowed workers.
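The rebalance steps described above can be sketched as a small pure-Python model. This is a toy under stated assumptions, not the scheduler's implementation: the function name rebalance_sketch, the single shared memory limit, and the rule that a key is skipped when it is at least as large as the gap between sender and recipient (added here so the loop always terminates) are all choices made for illustration. The default thresholds mirror the 5%, 30% and 60% figures quoted in the text.

```python
def rebalance_sketch(workers, limit, half_gap=0.05, sender_min=0.30, recipient_max=0.60):
    """Toy model of the rebalance steps; mutates ``workers``.

    workers: {address: {key: nbytes}}; dicts preserve insertion order.
    limit: memory limit in bytes, assumed identical for every worker.
    Returns the list of (key, sender, recipient) moves.
    """

    def used(addr):
        return sum(workers[addr].values())

    mean = sum(used(a) for a in workers) / len(workers)
    gap = half_gap * limit
    # Workers within the gap of the mean are neither senders nor recipients.
    senders = {a for a in workers
               if used(a) > mean + gap and used(a) >= sender_min * limit}
    recipients = {a for a in workers
                  if used(a) < mean - gap and used(a) <= recipient_max * limit}

    moves = []
    while senders and recipients:
        snd = max(senders, key=used)      # farthest above the mean
        rec = min(recipients, key=used)   # farthest below the mean
        room = used(snd) - used(rec)
        # Least recently inserted key that the recipient lacks.
        # Assumption of this sketch: skip keys that would overshoot.
        key = next(
            (k for k, n in workers[snd].items()
             if k not in workers[rec] and n < room),
            None,
        )
        if key is None:
            senders.discard(snd)
            continue
        workers[rec][key] = workers[snd].pop(key)
        moves.append((key, snd, rec))
        if used(snd) <= mean + gap:
            senders.discard(snd)
        if used(rec) >= mean - gap:
            recipients.discard(rec)
    return moves


cluster = {
    "a": {"x1": 20, "x2": 20, "x3": 20, "x4": 20},  # 80% full
    "b": {"y1": 10},                                # 10% full
    "c": {"z1": 40},                                # close to the mean, left alone
}
moves = rebalance_sketch(cluster, limit=100)
print(moves)
```

In this example the two oldest keys on worker a move to worker b, after which both fall within the gap around the mean and the loop stops; worker c is never touched.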
- async register_nanny_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage] [source]¶
Registers a nanny plugin on all running and future nannies
- async register_scheduler_plugin(plugin: bytes | distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None) None [source]¶
Register a plugin on the scheduler.
- async register_worker_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) dict[str, distributed.core.OKMessage] [source]¶
Registers a worker plugin on all running and future workers
- remove_client(client: str, stimulus_id: str | None = None) None [source]¶
Remove client from network
- remove_plugin(name: str | None = None) None [source]¶
Remove external plugin from scheduler
- Parameters
- name: str
Name of the plugin to remove
- remove_worker(address: str, *, stimulus_id: str, expected: bool = False, close: bool = True) Literal['OK', 'already-removed'] [source]¶
Remove worker from cluster.
We do this when a worker reports that it plans to leave or when it appears to be unresponsive. This may send its tasks back to a released state.
- async replicate(comm=None, keys=None, n=None, workers=None, branching_factor=2, delete=True, stimulus_id=None)[source]¶
Replicate data throughout cluster
This performs a tree copy of the data throughout the network individually on each piece of data.
- Parameters
- keys: Iterable
list of keys to replicate
- n: int
Number of replications we expect to see within the cluster
- branching_factor: int, optional
The number of workers that can copy data in each generation. The larger the branching factor, the more data we copy in a single step, but the more a given worker risks being swamped by data requests.
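As a rough illustration of how the branching factor shapes a tree copy, the sketch below counts how many workers hold one piece of data after each generation. The helper name replication_generations and the assumption that every current holder can feed up to branching_factor new workers per generation are a simplified model, not a statement about the scheduler's exact behaviour.

```python
def replication_generations(n, holders=1, branching_factor=2):
    """Return the holder count after each copy generation until n replicas exist."""
    if holders < 1 or branching_factor < 1:
        raise ValueError("need at least one holder and a positive branching factor")
    history = [holders]
    while holders < n:
        # Each current holder may feed up to `branching_factor` new workers,
        # but we never create more copies than are still missing.
        new_copies = min(n - holders, branching_factor * holders)
        holders += new_copies
        history.append(holders)
    return history


default_tree = replication_generations(10)
wide_tree = replication_generations(10, branching_factor=4)
print(default_tree)  # [1, 3, 9, 10]
print(wide_tree)     # [1, 5, 10]
```

A larger branching factor reaches the target in fewer generations, at the price of each holder serving more requests at once.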
- report(msg: dict, ts: distributed.scheduler.TaskState | None = None, client: str | None = None) None [source]¶
Publish updates to all listening Queues and Comms
If the message contains a key then we only send the message to those comms that care about the key.
- request_acquire_replicas(addr: str, keys: collections.abc.Iterable[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None [source]¶
Asynchronously ask a worker to acquire a replica of the listed keys from other workers. This is a fire-and-forget operation which offers no feedback for success or failure, and is intended for housekeeping and not for computation.
- request_remove_replicas(addr: str, keys: list[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) None [source]¶
Asynchronously ask a worker to discard its replica of the listed keys. This must never be used to destroy the last replica of a key. This is a fire-and-forget operation, intended for housekeeping and not for computation.
The replica disappears immediately from TaskState.who_has on the Scheduler side; if the worker refuses to delete, e.g. because the task is a dependency of another task running on it, it will (also asynchronously) inform the scheduler to re-add itself to who_has. If the worker agrees to discard the task, there is no feedback.
- async restart(*, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, stimulus_id: str) None [source]¶
Forget all tasks and call restart_workers on all workers.
- Parameters
- timeout:
See restart_workers
- wait_for_workers:
See restart_workers
- async restart_workers(workers: list[str] | None = None, *, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, on_error: Literal['raise', 'return'] = 'raise', stimulus_id: str) dict[str, Literal['OK', 'removed', 'timed out']] [source]¶
Restart selected workers. Optionally wait for workers to return.
Workers without nannies are shut down, hoping an external deployment system will restart them. Therefore, if not using nannies and your deployment system does not automatically restart workers, restart will just shut down all workers, then time out!
After restart, all connected workers are new, regardless of whether TimeoutError was raised. Any workers that failed to shut down in time are removed, and may or may not shut down on their own in the future.
- Parameters
- workers:
List of worker addresses to restart. If omitted, restart all workers.
- timeout:
How long to wait for workers to shut down and come back, if wait_for_workers is True, otherwise just how long to wait for workers to shut down. Raises asyncio.TimeoutError if this is exceeded.
- wait_for_workers:
Whether to wait for all workers to reconnect, or just for them to shut down (default True). Use restart(wait_for_workers=False) combined with Client.wait_for_workers() for granular control over how many workers to wait for.
- on_error:
If ‘raise’ (the default), raise if any nanny times out while restarting the worker. If ‘return’, return error messages.
- Returns
- {worker address: “OK”, “no nanny”, or “timed out” or error message}
- async retire_workers(workers: list[str], *, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any] [source]¶
- async retire_workers(*, names: list, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None') dict[str, distributed.utils.Any]
- async retire_workers(*, close_workers: bool = 'False', remove: bool = 'True', stimulus_id: str | None = 'None', memory_ratio: int | float | None = 'None', n: int | None = 'None', key: Callable[[WorkerState], Hashable] | bytes | None = 'None', minimum: int | None = 'None', target: int | None = 'None', attribute: str = "'address'") dict[str, distributed.utils.Any]
Gracefully retire workers from cluster. Any key that is in memory exclusively on the retired workers is replicated somewhere else.
- Parameters
- workers: list[str] (optional)
List of worker addresses to retire.
- names: list (optional)
List of worker names to retire. Mutually exclusive with workers. If neither workers nor names are provided, we call workers_to_close which finds a good set.
- close_workers: bool (defaults to False)
Whether to actually close the worker explicitly from here. Otherwise, we expect some external job scheduler to finish off the worker.
- remove: bool (defaults to True)
Whether to remove the worker metadata immediately or else wait for the worker to contact us.
If close_workers=False and remove=False, this method just flushes the tasks in memory out of the workers and then returns. If close_workers=True and remove=False, this method will return while the workers are still in the cluster, although they won’t accept new tasks. If close_workers=False or for whatever reason a worker doesn’t accept the close command, it will be left permanently unable to accept new tasks and it is expected to be closed in some other way.
- **kwargs: dict
Extra options to pass to workers_to_close to determine which workers we should drop. Only accepted if workers and names are omitted.
- Returns
- Dictionary mapping worker ID/address to dictionary of information about that worker for each retired worker.
If there are keys that exist in memory only on the workers being retired and it was impossible to replicate them somewhere else (e.g. because there aren’t any other running workers), the workers holding such keys won’t be retired and won’t appear in the returned dict.
- run_function(comm: distributed.comm.core.Comm, function: collections.abc.Callable, args: tuple = (), kwargs: dict | None = None, wait: bool = True) Any [source]¶
Run a function within this process
- async scatter(comm=None, data=None, workers=None, client=None, broadcast=False, timeout=2)[source]¶
Send data out to workers
- send_all(client_msgs: dict[str, list[dict[str, Any]]], worker_msgs: dict[str, list[dict[str, Any]]]) None [source]¶
Send messages to client and workers
- send_task_to_worker(worker: str, ts: distributed.scheduler.TaskState, duration: float = -1) None [source]¶
Send a single computational task to a worker
- stimulus_cancel(keys: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, force: bool = False) None [source]¶
Stop execution on a list of keys
- stimulus_queue_slots_maybe_opened(*, stimulus_id: str) None [source]¶
Respond to an event which may have opened spots on worker threadpools
Selects the appropriate number of tasks from the front of the queue according to the total number of task slots available on workers (potentially 0), and transitions them to processing.
Notes
Other transitions related to this stimulus should be fully processed beforehand, so any tasks that became runnable are already in processing. Otherwise, overproduction can occur if queued tasks get scheduled before downstream tasks.
Must be called after check_idle_saturated; i.e. idle_task_count must be up to date.
- stimulus_task_erred(key=None, worker=None, exception=None, stimulus_id=None, traceback=None, run_id=None, **kwargs)[source]¶
Mark that a task has erred on a particular worker
- stimulus_task_finished(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], worker: str, stimulus_id: str, run_id: int, **kwargs: Any) tuple[dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]] [source]¶
Mark that a task has finished execution on a particular worker
- transition(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], finish: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], stimulus_id: str, **kwargs: Any) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']] [source]¶
Transition a key from its current state to the finish state
- Returns
- Dictionary of recommendations for future transitions
See also
Scheduler.transitions
transitive version of this function
Examples
>>> self.transition('x', 'waiting')
{'x': 'processing'}
- transitions(recommendations: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], stimulus_id: str) None [source]¶
Process transitions until none are left
This includes feedback from previous transitions and continues until we reach a steady state
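The relationship between a single transition (which returns recommendations) and the loop that drains them until a steady state can be sketched with a toy state machine. Everything below is hypothetical and written for illustration: transitions_sketch, toy_transition, the three-task dependency chain, and the shortcut that a task entering "processing" is immediately recommended to "memory" are not the scheduler's own code.

```python
def transitions_sketch(state, recommendations, transition):
    """Apply recommendations until `transition` stops producing new ones."""
    pending = dict(recommendations)
    log = []
    while pending:
        key, finish = pending.popitem()
        new = transition(state, key, finish)  # may recommend further transitions
        log.append((key, finish))
        pending.update(new)
    return log


# Hypothetical dependency chain: c depends on b, which depends on a.
DEPENDENCIES = {"a": [], "b": ["a"], "c": ["b"]}


def toy_transition(state, key, finish):
    state[key] = finish
    recommendations = {}
    if finish == "processing":
        # Toy shortcut: pretend the task completes instantly.
        recommendations[key] = "memory"
    elif finish == "memory":
        # Dependents whose inputs are all in memory become runnable.
        for child, parents in DEPENDENCIES.items():
            if key in parents and all(state[p] == "memory" for p in parents):
                recommendations[child] = "processing"
    return recommendations


state = {"a": "waiting", "b": "waiting", "c": "waiting"}
log = transitions_sketch(state, {"a": "processing"}, toy_transition)
print(log)
print(state)
```

A single initial recommendation for a cascades through the whole chain, which is the feedback the description above refers to.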
- async unregister_nanny_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage] [source]¶
Unregisters a nanny plugin
- async unregister_worker_plugin(comm: None, name: str) dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage] [source]¶
Unregisters a worker plugin
- update_data(*, who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]], nbytes: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], int], client: str | None = None) None [source]¶
Learn that new data has entered the network from an external source
- worker_send(worker: str, msg: dict[str, Any]) None [source]¶
Send message to worker
This also handles connection failures by adding a callback to remove the worker on the next cycle.
- workers_list(workers: collections.abc.Iterable[str] | None) list[str] [source]¶
List of qualifying workers
Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match
- workers_to_close(memory_ratio: int | float | None = None, n: int | None = None, key: collections.abc.Callable[[distributed.scheduler.WorkerState], collections.abc.Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') list[str] [source]¶
Find workers that we can close with low cost
This returns a list of workers that are good candidates to retire. These workers are not running anything and are storing little data relative to their peers. If all workers are idle then we still maintain enough workers to have enough RAM to store our data, with a comfortable buffer.
This is for use with systems like distributed.deploy.adaptive.
- Parameters
- memory_ratio: Number
Amount of extra space we want to have for our stored data. Defaults to 2, meaning that we want to have twice as much memory as we currently have data.
- n: int
Number of workers to close
- minimum: int
Minimum number of workers to keep around
- key: Callable(WorkerState)
An optional callable mapping a WorkerState object to a group affiliation. Groups will be closed together. This is useful when closing workers must be done collectively, such as by hostname.
- target: int
Target number of workers to have after we close
- attribute: str
The attribute of the WorkerState object to return, like “address” or “name”. Defaults to “address”.
- Returns
- to_close: list of worker addresses that are OK to close
Examples
>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']
Group workers by hostname prior to closing
>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']
Remove two workers
>>> scheduler.workers_to_close(n=2)
Keep enough workers to have twice as much memory as we need.
>>> scheduler.workers_to_close(memory_ratio=2)
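To make the role of memory_ratio concrete, the following is a simplified sketch of the selection idea described above: retire idle workers holding the least data while the remaining memory stays at or above memory_ratio times the stored data. It is not the scheduler's algorithm; `ToyWorker` and `toy_workers_to_close` are hypothetical names, and grouping, `n`, `minimum`, and `target` are ignored.

```python
from dataclasses import dataclass


@dataclass
class ToyWorker:
    # Hypothetical stand-in for the scheduler's WorkerState.
    address: str
    memory_limit: int  # bytes of RAM available to the worker
    nbytes: int  # bytes of data currently stored
    processing: int  # number of tasks currently running


def toy_workers_to_close(workers, memory_ratio=2):
    total_data = sum(w.nbytes for w in workers)
    remaining_limit = sum(w.memory_limit for w in workers)
    to_close = []
    # Consider idle workers only, starting with those holding the least data.
    idle = sorted((w for w in workers if w.processing == 0), key=lambda w: w.nbytes)
    for w in idle:
        # Stop once closing another worker would leave too little RAM.
        if remaining_limit - w.memory_limit < memory_ratio * total_data:
            break
        remaining_limit -= w.memory_limit
        to_close.append(w.address)
    return to_close


cluster = [
    ToyWorker("tcp://192.168.0.1:1234", 8_000, 100, 0),
    ToyWorker("tcp://192.168.0.2:1234", 8_000, 900, 0),
    ToyWorker("tcp://192.168.0.3:1234", 8_000, 2_000, 3),
]
candidates = toy_workers_to_close(cluster, memory_ratio=2)
stricter = toy_workers_to_close(cluster, memory_ratio=4)
```

With a larger ratio the sketch keeps more workers, because more spare memory must remain after closing.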
Worker¶
- class distributed.Worker(scheduler_ip: str | None = None, scheduler_port: int | None = None, *, scheduler_file: str | None = None, nthreads: int | None = None, loop: IOLoop | None = None, local_directory: str | None = None, services: dict | None = None, name: Any | None = None, reconnect: bool | None = None, executor: Executor | dict[str, Executor] | Literal['offload'] | None = None, resources: dict[str, float] | None = None, silence_logs: int | None = None, death_timeout: Any | None = None, preload: list[str] | None = None, preload_argv: list[str] | list[list[str]] | None = None, security: Security | dict[str, Any] | None = None, contact_address: str | None = None, heartbeat_interval: Any = '1s', extensions: dict[str, type] | None = None, metrics: Mapping[str, Callable[[Worker], Any]] = {}, startup_information: Mapping[str, Callable[[Worker], Any]] = {}, interface: str | None = None, host: str | None = None, port: int | str | Collection[int] | None = None, protocol: str | None = None, dashboard_address: str | None = None, dashboard: bool = False, http_prefix: str = '/', nanny: Nanny | None = None, plugins: tuple[WorkerPlugin, ...] = (), low_level_profiler: bool | None = None, validate: bool | None = None, profile_cycle_interval=None, lifetime: Any | None = None, lifetime_stagger: Any | None = None, lifetime_restart: bool | None = None, transition_counter_max: int | Literal[False] = False, memory_limit: str | float = 'auto', data: WorkerDataParameter = None, memory_target_fraction: float | Literal[False] | None = None, memory_spill_fraction: float | Literal[False] | None = None, memory_pause_fraction: float | Literal[False] | None = None, scheduler_sni: str | None = None, **kwargs)[source]¶
Worker node in a Dask distributed cluster
Workers perform two functions:
Serve data from a local dictionary
Perform computation on that data and on data from peers
Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.
You can start a worker with the dask worker command line application:
$ dask worker scheduler-ip:port
Use the --help flag to see more options:
$ dask worker --help
The rest of this docstring is about the internal state that the worker uses to manage and track internal computations.
State
Informational State
These attributes don’t change significantly during execution.
- nthreads: int
Number of threads used by this worker process
- executors: dict[str, concurrent.futures.Executor]
Executors used to perform computation. Always contains the default executor.
- local_directory: path
Path on local machine to store temporary files
- scheduler: PooledRPCCall
Location of scheduler. See .ip/.port attributes.
- name: string
Alias
- services: {str: Server}
Auxiliary web servers running on this worker
- service_ports: {str: port}
- transfer_outgoing_count_limit: int
The maximum number of concurrent outgoing data transfers. See also distributed.worker_state_machine.WorkerState.transfer_incoming_count_limit.
- batched_stream: BatchedSend
A batched stream along which we communicate to the scheduler
- log: [(message)]
A structured and queryable log. See Worker.story
Volatile State
These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.
- threads: {key: int}
The ID of the thread on which the task ran
- active_threads: {int: key}
The keys currently running on active threads
- state: WorkerState
Encapsulated state machine. See BaseWorker and WorkerState
- Parameters
- scheduler_ip: str, optional
- scheduler_port: int, optional
- scheduler_file: str, optional
- host: str, optional
- data: MutableMapping, type, None
The object to use for storage, builds a disk-backed LRU dict by default.
If a callable to construct the storage object is provided, it will receive the worker’s local_directory attribute as an argument if the calling signature has an argument named worker_local_directory.
- nthreads: int, optional
- local_directory: str, optional
Directory where we place local resources
- name: str, optional
- memory_limit: int, float, string
Number of bytes of memory that this worker should use. Set to zero for no limit. Set to ‘auto’ to calculate as system.MEMORY_LIMIT * min(1, nthreads / total_cores). Use strings or numbers like 5GB or 5e9.
- memory_target_fraction: float or False
Fraction of memory to try to stay beneath (default: read from config key distributed.worker.memory.target)
- memory_spill_fraction: float or False
Fraction of memory at which we start spilling to disk (default: read from config key distributed.worker.memory.spill)
- memory_pause_fraction: float or False
Fraction of memory at which we stop running new tasks (default: read from config key distributed.worker.memory.pause)
- max_spill: int, string or False
Limit of number of bytes to be spilled on disk. (default: read from config key distributed.worker.memory.max-spill)
- executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], “offload”
The executor(s) to use. Depending on the type, it has the following meanings:
- Executor instance: The default executor.
- Dict[str, Executor]: mapping names to Executor instances. If the “default” key isn’t in the dict, a “default” executor will be created using ThreadPoolExecutor(nthreads).
- Str: The string “offload”, which refers to the same thread pool used for offloading communications. This results in the same thread being used for deserialization and computation.
- resources: dict
Resources that this worker has like
{'GPU': 2}
- nanny: str
Address on which to contact nanny, if it exists
- lifetime: str
Amount of time like “1 hour” after which we gracefully shut down the worker. This defaults to None, meaning no explicit shutdown time.
- lifetime_stagger: str
Amount of time like “5 minutes” to stagger the lifetime value. The actual lifetime will be selected uniformly at random between lifetime +/- lifetime_stagger.
- lifetime_restart: bool
Whether or not to restart a worker after it has reached its lifetime. Default False.
- kwargs: optional
Additional parameters to ServerNode constructor
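The ‘auto’ rule given for memory_limit above is simple arithmetic, and a worked instance may help. In this sketch the system memory and core count are passed in explicitly as assumptions; how the library detects them is not covered here, and `auto_memory_limit` is a hypothetical helper name.

```python
def auto_memory_limit(system_memory, nthreads, total_cores):
    """Share of system memory for one worker under the 'auto' rule.

    Mirrors the documented formula:
    system.MEMORY_LIMIT * min(1, nthreads / total_cores)
    """
    return int(system_memory * min(1, nthreads / total_cores))


GIB = 2**30

# A worker using 2 of 8 cores on a 16 GiB machine gets a quarter of the memory.
quarter = auto_memory_limit(16 * GIB, nthreads=2, total_cores=8)

# Asking for more threads than cores never yields more than the whole machine.
capped = auto_memory_limit(16 * GIB, nthreads=16, total_cores=8)
```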
Examples
Use the command line to start a worker:
$ dask scheduler
Start scheduler at 127.0.0.1:8786
$ dask worker 127.0.0.1:8786
Start worker at: 127.0.0.1:1234
Registered with scheduler at: 127.0.0.1:8786
- batched_send(msg: dict[str, Any]) None [source]¶
Implements BaseWorker abstract method.
Send a fire-and-forget message to the scheduler through bulk comms.
If we’re not currently connected to the scheduler, the message will be silently dropped!
- async close(timeout: float = 30, executor_wait: bool = True, nanny: bool = True, reason: str = 'worker-close') str | None [source]¶
Close the worker
Close asynchronous operations running on the worker, stop all executors and comms. If requested, this also closes the nanny.
- Parameters
- timeout
Timeout in seconds for shutting down individual instructions
- executor_wait
If True, shut down executors synchronously, otherwise asynchronously
- nanny
If True, close the nanny
- reason
Reason for closing the worker
- Returns
- str | None
None if worker already in closing state or failed, “OK” otherwise
- async close_gracefully(restart=None, reason: str = 'worker-close-gracefully')[source]¶
Gracefully shut down a worker
This first informs the scheduler that we’re shutting down, and asks it to move our data elsewhere. Afterwards, we close as normal.
- property data: collections.abc.MutableMapping[Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object]¶
{task key: task payload} of all completed tasks, whether they were computed on this Worker or computed somewhere else and then transferred here over the network.
When using the default configuration, this is a zict buffer that automatically spills to disk whenever the target threshold is exceeded. If spilling is disabled, it is a plain dict instead. It could also be a user-defined arbitrary dict-like passed when initialising the Worker or the Nanny. Worker logic should treat this opaquely and stick to the MutableMapping API.
Note
This same collection is also available at self.state.data and self.memory_manager.data.
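Because the data collection may be any dict-like object, code that touches it should rely only on the MutableMapping API. The sketch below shows a minimal user-defined mapping of that kind. `CountingStore` and `total_items` are hypothetical names, and the example does not start a Worker; it only illustrates the interface such an object would need.

```python
from collections.abc import MutableMapping


class CountingStore(MutableMapping):
    """Hypothetical dict-like store that counts writes."""

    def __init__(self):
        self._data = {}
        self.writes = 0

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        self.writes += 1
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


def total_items(store: MutableMapping) -> int:
    # Uses only the MutableMapping API, so it works for any backing store.
    return len(store)


store = CountingStore()
store["x-1"] = 123
store.update({"y-2": 456})  # update() is provided by the MutableMapping mixin
```

Only the five abstract methods need to be written; methods such as `update`, `pop`, and `__contains__` come from the base class.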
- digest_metric(name: collections.abc.Hashable, value: float) None [source]¶
Implement BaseWorker.digest_metric by calling Server.digest_metric
- async execute(key: Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
Execute a task. Implements BaseWorker abstract method.
- async gather(who_has: dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) dict[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]], object] [source]¶
Endpoint used by Scheduler.rebalance() and Scheduler.replicate()
- async gather_dep(worker: str, to_gather: collections.abc.Collection[typing.Union[str, bytes, int, float, tuple[typing.Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]], ...]]], total_nbytes: int, *, stimulus_id: str) distributed.worker_state_machine.StateMachineEvent [source]¶
Implements BaseWorker abstract method
- get_current_task() Union[str, bytes, int, float, tuple[ForwardRef('Key'), ...]] [source]¶
Get the key of the task we are currently running
This only makes sense to run within a task
See also
get_worker
Examples
>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()
>>> future = client.submit(f)
>>> future.result()
'f-1234'
- handle_stimulus(*stims: distributed.worker_state_machine.StateMachineEvent) None [source]¶
Override BaseWorker method for added validation
- log_event(topic: str | collections.abc.Collection[str], msg: Any) None [source]¶
Log an event under a given topic
- Parameters
- topic: str, list[str]
Name of the topic under which to log an event. To log the same event under multiple topics, pass a list of topic names.
- msg
Event message to log. Note this must be msgpack serializable.
- async retry_busy_worker_later(worker: str) distributed.worker_state_machine.StateMachineEvent [source]¶
Wait some time, then take a peer worker out of busy state. Implements BaseWorker abstract method.
- async start_unsafe()[source]¶
Attempt to start the server. This is not idempotent and not protected against concurrent startup attempts.
This is intended to be overwritten or called by subclasses. For a safe startup, please use Server.start instead.
If death_timeout is configured, we will require this coroutine to finish before this timeout is reached. If the timeout is reached we will close the instance and raise an asyncio.TimeoutError.
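The death_timeout behaviour described above can be pictured with plain asyncio. This is a toy model under stated assumptions, not the library's Server class: `ToyServer` and its attributes are hypothetical, and the timeout is enforced with `asyncio.wait_for`.

```python
import asyncio


class ToyServer:
    """Hypothetical server whose startup may take too long."""

    def __init__(self, startup_seconds, death_timeout):
        self.startup_seconds = startup_seconds
        self.death_timeout = death_timeout
        self.status = "init"

    async def start_unsafe(self):
        await asyncio.sleep(self.startup_seconds)
        self.status = "running"

    async def close(self):
        self.status = "closed"

    async def start(self):
        try:
            # Require startup to finish before the timeout elapses.
            await asyncio.wait_for(self.start_unsafe(), timeout=self.death_timeout)
        except asyncio.TimeoutError:
            # On timeout, close the instance and re-raise to the caller.
            await self.close()
            raise
        return self


async def demo():
    fast = await ToyServer(0.0, death_timeout=1.0).start()
    slow = ToyServer(1.0, death_timeout=0.05)
    try:
        await slow.start()
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast.status, slow.status, timed_out


result = asyncio.run(demo())
```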
- transfer_outgoing_bytes_total: int¶
Total size of data transfers to other workers (including in-progress and failed transfers)
- transfer_outgoing_count_total: int¶
Total number of data transfers to other workers since the worker was started
- trigger_profile() None [source]¶
Get a frame from all actively computing threads
Merge these frames into existing profile counts
- property worker_address¶
For API compatibility with Nanny
Nanny¶
- class distributed.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port: int | str | collections.abc.Collection[int] | None = 0, nthreads=None, loop=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port: int | str | collections.abc.Collection[int] | None = None, protocol=None, config=None, **worker_kwargs)[source]¶
A process to manage worker processes
The nanny spins up Worker processes, watches them, and kills or restarts them as necessary. It is necessary if you want to use the Client.restart method, or to restart the worker automatically if it gets to the terminate fraction of its memory limit.
The parameters for the Nanny are mostly the same as those for the Worker with exceptions listed below.
- Parameters
- env: dict, optional
Environment variables set at time of Nanny initialization will be ensured to be set in the Worker process as well. This argument allows you to overwrite or otherwise set environment variables for the Worker. It is also possible to set environment variables using the option distributed.nanny.environ. Precedence is as follows:
Nanny arguments
Existing environment variables
Dask configuration
Note
Some environment variables, like OMP_NUM_THREADS, must be set before importing numpy to have effect. Others, like MALLOC_TRIM_THRESHOLD_ (see Memory not released back to the OS), must be set before starting the Linux process. Such variables would be ineffective if set here or in distributed.nanny.environ; they must be set in distributed.nanny.pre-spawn-environ so that they are set before spawning the subprocess, even if this means poisoning the process running the Nanny.
For the same reason, be warned that changing distributed.worker.multiprocessing-method from spawn to fork or forkserver may inhibit some environment variables; if you do, you should set the variables yourself in the shell before you start dask-worker.
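The precedence order for Worker environment variables can be sketched as a layered dictionary merge. This assumes the list above is ordered from highest to lowest precedence; `resolve_worker_env` and the sample variable names are hypothetical and not part of the library.

```python
def resolve_worker_env(config_env, existing_env, nanny_env):
    """Merge environment sources; later updates take precedence."""
    merged = dict(config_env)  # lowest precedence: Dask configuration
    merged.update(existing_env)  # existing variables override configuration
    merged.update(nanny_env)  # highest precedence: Nanny ``env`` argument
    return merged


env = resolve_worker_env(
    config_env={"A": "config", "B": "config", "C": "config"},
    existing_env={"B": "existing", "C": "existing"},
    nanny_env={"C": "nanny"},
)
```

Each variable ends up with the value from the highest-precedence source that defines it.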
- async close(timeout: float = 5, reason: str = 'nanny-close') Literal['OK'] [source]¶
Close the worker process, stop all comms.
- close_gracefully(reason: str = 'nanny-close-gracefully') None [source]¶
A signal that we shouldn’t try to restart workers if they go away
This is used as part of the cluster shutdown process.
- async instantiate() distributed.core.Status [source]¶
Start a local worker process
Blocks until the process is up and the scheduler is properly informed
- async kill(timeout: float = 5, reason: str = 'nanny-kill') None [source]¶
Kill the local worker process
Blocks until both the process is down and the scheduler is properly informed