Python API (advanced)¶
In some rare cases, experts may want to create Scheduler, Worker, and Nanny objects explicitly in Python. This is often necessary when making tools to automatically deploy Dask in custom settings.
It is more common to create a local cluster with Client() on a single machine or to use the command line interface (CLI). New readers are recommended to start there.
If you do want to start Scheduler and Worker objects yourself, you should be a little familiar with async/await style Python syntax. These objects are awaitable and are commonly used within async with context managers. Here are a few examples showing different ways to start and finish things.
Full Example¶
Scheduler([loop, delete_interval, …]) | Dynamic distributed task scheduler
Worker([scheduler_ip, scheduler_port, …]) | Worker node in a Dask distributed cluster
Client([address, loop, timeout, …]) | Connect to and submit computation to a Dask cluster
We first start with a comprehensive example of setting up a Scheduler, two Workers, and one Client in the same event loop, running a simple computation, and then cleaning everything up.
import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())
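On Python 3.7 and later, the same program can also be driven with asyncio.run, which creates and closes the event loop for you (a minor variation on the example above, not a separate API):

```python
import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.run(f())  # creates, runs, and closes the event loop
```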
Now we look at simpler examples that build up to this case.
Scheduler¶
Scheduler([loop, delete_interval, …]) | Dynamic distributed task scheduler
We create a scheduler by instantiating a Scheduler() object, and then await that object to wait for it to start up. We can then await the .finished method to wait until it closes. In the meantime the scheduler will be active, managing the cluster.
import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = Scheduler()     # scheduler created, but not yet running
    s = await s         # the scheduler is running
    await s.finished()  # wait until the scheduler closes

asyncio.get_event_loop().run_until_complete(f())
This program will run forever, or until some external process connects to the scheduler and tells it to stop. If you want to close things yourself you can close any Scheduler, Worker, Nanny, or Client object by awaiting its .close method:

await s.close()
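For example, here is a small sketch (illustrative; in a real deployment some external condition would trigger the shutdown) that runs a scheduler for five seconds and then closes it from a background task:

```python
import asyncio
from dask.distributed import Scheduler

async def f():
    s = await Scheduler()

    # Close the scheduler from a background task after a delay.
    # The five-second delay here is arbitrary.
    async def close_later():
        await asyncio.sleep(5)
        await s.close()

    asyncio.ensure_future(close_later())
    await s.finished()  # returns once close() has completed

asyncio.get_event_loop().run_until_complete(f())
```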
Worker¶
Worker([scheduler_ip, scheduler_port, …]) | Worker node in a Dask distributed cluster
The worker follows the same API. The only difference is that the worker needs to know the address of the scheduler.
import asyncio
from dask.distributed import Scheduler, Worker

async def f(scheduler_address):
    w = await Worker(scheduler_address)
    await w.finished()

asyncio.get_event_loop().run_until_complete(f("tcp://127.0.0.1:8786"))
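The Worker constructor also accepts keyword arguments mirroring the dask-worker CLI options. The names used below (nthreads, name, memory_limit) appear in the Worker signature documented in the API section; this is an illustrative sketch rather than a tested deployment:

```python
import asyncio
from dask.distributed import Worker

async def f(scheduler_address):
    w = await Worker(
        scheduler_address,
        nthreads=2,           # size of this worker's thread pool
        name="worker-1",      # alias reported to the scheduler
        memory_limit="2GB",   # memory budget before spilling to disk
    )
    await w.finished()

asyncio.get_event_loop().run_until_complete(f("tcp://127.0.0.1:8786"))
```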
Start many in one event loop¶
Scheduler([loop, delete_interval, …]) | Dynamic distributed task scheduler
Worker([scheduler_ip, scheduler_port, …]) | Worker node in a Dask distributed cluster
We can run as many of these objects as we like in the same event loop.
import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    s = await Scheduler()
    w = await Worker(s.address)
    await w.finished()
    await s.finished()

asyncio.get_event_loop().run_until_complete(f())
Use Context Managers¶
We can also use async with context managers to make sure that we clean up properly. Here is the same example as above:
import asyncio
from dask.distributed import Scheduler, Worker

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w:
            await w.finished()
            await s.finished()

asyncio.get_event_loop().run_until_complete(f())
Alternatively, in the example below we also include a Client, run a small computation, and then allow things to clean up after that computation.
import asyncio
from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())
This is equivalent to creating and awaiting each server, and then calling .close on each as we leave the context. In this example we don't wait on s.finished(), so this will terminate relatively quickly. You could have called await s.finished() though if you wanted this to run forever.
Nanny¶
Nanny([scheduler_ip, scheduler_port, …]) | A process to manage worker processes
Alternatively, we can replace Worker with Nanny if we want our workers to be managed in a separate process. The Nanny constructor follows the same API. This allows workers to restart themselves in case of failure. It also provides some additional monitoring, and is useful when coordinating many workers that should live in different processes in order to avoid the GIL.
# w = await Worker(s.address)
w = await Nanny(s.address)
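Adapting the full example from above, a sketch of running each worker under a Nanny in its own supervised process might look like this (the computation is illustrative):

```python
import asyncio
from dask.distributed import Scheduler, Nanny, Client

async def f():
    async with Scheduler() as s:
        # Each Nanny starts and supervises a Worker in a child process
        async with Nanny(s.address) as n1, Nanny(s.address) as n2:
            async with Client(s.address, asynchronous=True) as client:
                result = await client.submit(lambda x: x + 1, 10)
                print(result)

asyncio.get_event_loop().run_until_complete(f())
```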
API¶
These classes have a variety of keyword arguments that you can use to control their behavior. See the API documentation below for more information.
Scheduler¶
class distributed.Scheduler(loop=None, delete_interval='500ms', synchronize_worker_interval='60s', services=None, service_kwargs=None, allowed_failures=None, extensions=None, validate=None, scheduler_file=None, security=None, worker_ttl=None, idle_timeout=None, interface=None, host=None, port=0, protocol=None, dashboard_address=None, dashboard=None, http_prefix='/', preload=None, preload_argv=(), plugins=(), **kwargs)[source]¶
Dynamic distributed task scheduler
The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever growing dask graph.
All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.
The scheduler communicates with the outside world through Comm objects. It maintains a consistent and valid view of the world even when listening to several clients at once.
A Scheduler is typically started either with the dask-scheduler executable:

$ dask-scheduler
Scheduler started at 127.0.0.1:8786

Or, within a LocalCluster, a Client starts up without connection information:

>>> c = Client()
>>> c.cluster.scheduler
Scheduler(...)
Users typically do not interact with the scheduler directly but rather with the client object Client.

State
The scheduler contains the following state variables. Each variable is listed along with what it stores and a brief description.
- tasks: {task key: TaskState}
    Tasks currently known to the scheduler
- unrunnable: {TaskState}
    Tasks in the "no-worker" state
- workers: {worker key: WorkerState}
    Workers currently connected to the scheduler
- idle: {WorkerState}
    Set of workers that are not fully utilized
- saturated: {WorkerState}
    Set of workers that are not over-utilized
- host_info: {hostname: dict}
    Information about each worker host
- clients: {client key: ClientState}
    Clients currently connected to the scheduler
- services: {str: port}
    Other services running on this scheduler, like Bokeh
- loop: IOLoop
    The running Tornado IOLoop
- client_comms: {client key: Comm}
    For each client, a Comm object used to receive task requests and report task status updates.
- stream_comms: {worker key: Comm}
    For each worker, a Comm object from which we both accept stimuli and report results
- task_duration: {key-prefix: time}
    Time we expect certain functions to take, e.g. {'sum': 0.25}
adaptive_target(comm=None, target_duration=None)[source]¶
Desired number of workers based on the current workload

This looks at the current running tasks and memory use, and returns a number of desired workers. This is often used by adaptive scheduling.

Parameters:
- target_duration : str
    A desired duration of time for computations to take. This affects how rapidly the scheduler will ask to scale.

add_client(comm, client=None, versions=None)[source]¶
Add client to network

We listen to all future messages from this Comm.

add_keys(comm=None, worker=None, keys=())[source]¶
Learn that a worker has certain keys

This should not be used in practice and is mostly here for legacy reasons. However, it is sent by workers from time to time.

add_plugin(plugin=None, idempotent=False, **kwargs)[source]¶
Add external plugin to scheduler

See https://distributed.readthedocs.io/en/latest/plugins.html
add_worker(comm=None, address=None, keys=(), nthreads=None, name=None, resolve_address=True, nbytes=None, types=None, now=None, resources=None, host_info=None, memory_limit=None, metrics=None, pid=0, services=None, local_directory=None, versions=None, nanny=None, extra=None)[source]¶
Add a new worker to the cluster

broadcast(comm=None, msg=None, workers=None, hosts=None, nanny=False, serializers=None)[source]¶
Broadcast message to workers, return all results

close(comm=None, fast=False, close_workers=False)[source]¶
Send cleanup signal to all coroutines then wait until finished

See also
Scheduler.cleanup

close_worker(comm=None, worker=None, safe=None)[source]¶
Remove a worker from the cluster

This both removes the worker from our local state and also sends a signal to the worker to shut down. This works regardless of whether or not the worker has a nanny process restarting it.

coerce_address(addr, resolve=True)[source]¶
Coerce possible input addresses to canonical form. resolve can be disabled for testing with fake hostnames.

Handles strings, tuples, or aliases.
feed(comm, function=None, setup=None, teardown=None, interval='1s', **kwargs)[source]¶
Provides a data Comm to an external requester

Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics.

get_worker_service_addr(worker, service_name, protocol=False)[source]¶
Get the (host, port) address of the named service on the worker. Returns None if the service doesn't exist.

Parameters:
- worker : address
- service_name : str
    Common services include 'bokeh' and 'nanny'
- protocol : boolean
    Whether or not to include a full address with protocol (True) or just a (host, port) pair

handle_long_running(key=None, worker=None, compute_duration=None)[source]¶
A task has seceded from the thread pool

We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped.
handle_worker(comm=None, worker=None)[source]¶
Listen to responses from a single worker

This is the main loop for scheduler-worker interaction.

See also
Scheduler.handle_client
    Equivalent coroutine for clients

proxy(comm=None, msg=None, worker=None, serializers=None)[source]¶
Proxy a communication through the scheduler to some other worker

rebalance(comm=None, keys=None, workers=None)[source]¶
Rebalance keys so that each worker stores roughly equal bytes

Policy

This orders the workers by what fraction of bytes of the existing keys they have. It walks down this list from most-to-least. At each worker it finds the largest results it can and sends them to the least occupied worker until either the sender or the recipient are at the average expected load.

reevaluate_occupancy(worker_index: ctypes.c_long = 0)[source]¶
Periodically reassess task duration time

The expected duration of a task can change over time. Unfortunately we don't have a good constant-time way to propagate the effects of these changes out to the summaries that they affect, like the total expected runtime of each of the workers, or what tasks are stealable.

In this coroutine we walk through all of the workers and re-align their estimates with the current state of tasks. We do this periodically rather than at every transition, and we only do it if the scheduler process isn't under load (using psutil.Process.cpu_percent()). This lets us avoid this fringe optimization when we have better things to think about.
register_worker_plugin(comm, plugin, name=None)[source]¶
Registers a setup function, and calls it on every worker

remove_worker(comm=None, address=None, safe=False, close=True)[source]¶
Remove worker from cluster

We do this when a worker reports that it plans to leave or when it appears to be unresponsive. This may send its tasks back to a released state.

replicate(comm=None, keys=None, n=None, workers=None, branching_factor=2, delete=True, lock=True)[source]¶
Replicate data throughout cluster

This performs a tree copy of the data throughout the network individually on each piece of data.

Parameters:
- keys: Iterable
    List of keys to replicate
- n: int
    Number of replications we expect to see within the cluster
- branching_factor: int, optional
    The number of workers that can copy data in each generation. The larger the branching factor, the more data we copy in a single step, but the more a given worker risks being swamped by data requests.
report(msg: dict, ts: distributed.scheduler.TaskState = None, client: str = None)[source]¶
Publish updates to all listening Queues and Comms

If the message contains a key then we only send the message to those comms that care about the key.

reschedule(key=None, worker=None)[source]¶
Reschedule a task

Things may have shifted and this task may now be better suited to run elsewhere.
retire_workers(comm=None, workers=None, remove=True, close_workers=False, names=None, lock=True, **kwargs) → dict[source]¶
Gracefully retire workers from cluster

Parameters:
- workers: list (optional)
    List of worker addresses to retire. If not provided we call workers_to_close, which finds a good set.
- names: list (optional)
    List of worker names to retire.
- remove: bool (defaults to True)
    Whether or not to remove the worker metadata immediately or else wait for the worker to contact us
- close_workers: bool (defaults to False)
    Whether or not to actually close the worker explicitly from here. Otherwise we expect some external job scheduler to finish off the worker.
- **kwargs: dict
    Extra options to pass to workers_to_close to determine which workers we should drop

Returns:
- Dictionary mapping worker ID/address to a dictionary of information about that worker, for each retired worker.
run_function(stream, function, args=(), kwargs={}, wait=True)[source]¶
Run a function within this process

scatter(comm=None, data=None, workers=None, client=None, broadcast=False, timeout=2)[source]¶
Send data out to workers

send_task_to_worker(worker, ts: distributed.scheduler.TaskState, duration: ctypes.c_double = -1)[source]¶
Send a single computational task to a worker

start_ipython(comm=None)[source]¶
Start an IPython kernel

Returns Jupyter connection info dictionary.
stimulus_cancel(comm, keys=None, client=None, force=False)[source]¶
Stop execution on a list of keys

stimulus_missing_data(cause=None, key=None, worker=None, ensure=True, **kwargs)[source]¶
Mark that certain keys have gone missing. Recover.

stimulus_task_erred(key=None, worker=None, exception=None, traceback=None, **kwargs)[source]¶
Mark that a task has erred on a particular worker

stimulus_task_finished(key=None, worker=None, **kwargs)[source]¶
Mark that a task has finished execution on a particular worker
transition(key, finish: str, *args, **kwargs)[source]¶
Transition a key from its current state to the finish state

Returns:
- Dictionary of recommendations for future transitions

See also
Scheduler.transitions
    Transitive version of this function

Examples

>>> self.transition('x', 'waiting')
{'x': 'processing'}

transition_story(*keys)¶
Get all transitions that touch one of the input keys

transitions(recommendations: dict)[source]¶
Process transitions until none are left

This includes feedback from previous transitions and continues until we reach a steady state.
update_data(comm=None, who_has=None, nbytes: dict = None, client=None, serializers=None)[source]¶
Learn that new data has entered the network from an external source

See also
Scheduler.mark_key_in_memory

update_graph(client=None, tasks=None, keys=None, dependencies=None, restrictions=None, priority=None, loose_restrictions=None, resources=None, submitting_task=None, retries=None, user_priority=0, actors=None, fifo_timeout=0, annotations=None)[source]¶
Add new computations to the internal dask graph

This happens whenever the Client calls submit, map, get, or compute.

worker_send(worker, msg)[source]¶
Send message to worker

This also handles connection failures by adding a callback to remove the worker on the next cycle.
workers_list(workers)[source]¶
List of qualifying workers

Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match.

workers_to_close(comm=None, memory_ratio=None, n=None, key=None, minimum=None, target=None, attribute='address')[source]¶
Find workers that we can close with low cost

This returns a list of workers that are good candidates to retire. These workers are not running anything and are storing relatively little data relative to their peers. If all workers are idle then we still maintain enough workers to have enough RAM to store our data, with a comfortable buffer.

This is for use with systems like distributed.deploy.adaptive.

Parameters:
- memory_ratio : Number
    Amount of extra space we want to have for our stored data. Defaults to 2, meaning we want to have twice as much memory as we currently have data.
- n : int
    Number of workers to close
- minimum : int
    Minimum number of workers to keep around
- key : Callable(WorkerState)
    An optional callable mapping a WorkerState object to a group affiliation. Groups will be closed together. This is useful when closing workers must be done collectively, such as by hostname.
- target : int
    Target number of workers to have after we close
- attribute : str
    The attribute of the WorkerState object to return, like "address" or "name". Defaults to "address".

Returns:
- to_close: list of worker addresses that are OK to close

Examples

>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

Group workers by hostname prior to closing:

>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

Remove two workers:

>>> scheduler.workers_to_close(n=2)

Keep enough workers to have twice as much memory as we need:

>>> scheduler.workers_to_close(memory_ratio=2)
Worker¶
class distributed.Worker(scheduler_ip=None, scheduler_port=None, scheduler_file=None, ncores=None, nthreads=None, loop=None, local_dir=None, local_directory=None, services=None, service_ports=None, service_kwargs=None, name=None, reconnect=True, memory_limit='auto', executor=None, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, security=None, contact_address=None, memory_monitor_interval='200ms', extensions=None, metrics={}, startup_information={}, data=None, interface=None, host=None, port=None, protocol=None, dashboard_address=None, dashboard=False, http_prefix='/', nanny=None, plugins=(), low_level_profiler=False, validate=None, profile_cycle_interval=None, lifetime=None, lifetime_stagger=None, lifetime_restart=None, **kwargs)[source]¶
Worker node in a Dask distributed cluster
Workers perform two functions:
- Serve data from a local dictionary
- Perform computation on that data and on data from peers
Workers keep the scheduler informed of their data and use that scheduler to gather data from other workers when necessary to perform a computation.
You can start a worker with the dask-worker command line application:

$ dask-worker scheduler-ip:port

Use the --help flag to see more options:

$ dask-worker --help

The rest of this docstring is about the internal state that the worker uses to manage and track internal computations.
State
Informational State
These attributes don’t change significantly during execution.
- nthreads: int
    Number of threads used by this worker process
- executor: concurrent.futures.ThreadPoolExecutor
    Executor used to perform computation. This can also be the string "offload" in which case this uses the same thread pool used for offloading communications. This results in the same thread being used for deserialization and computation.
- local_directory: path
    Path on local machine to store temporary files
- scheduler: rpc
    Location of scheduler. See .ip/.port attributes.
- name: string
    Alias
- services: {str: Server}
    Auxiliary web servers running on this worker
- service_ports: {str: port}
- total_out_connections: int
    The maximum number of concurrent outgoing requests for data
- total_in_connections: int
    The maximum number of concurrent incoming requests for data
- total_comm_nbytes: int
- batched_stream: BatchedSend
    A batched stream along which we communicate to the scheduler
- log: [(message)]
    A structured and queryable log. See Worker.story
Volatile State

These attributes track the progress of tasks that this worker is trying to complete. In the descriptions below a key is the name of a task that we want to compute and dep is the name of a piece of dependent data that we want to collect from others.

- tasks: {key: TaskState}
    The tasks currently executing on this worker (and any dependencies of those tasks)
- data: {key: object}
    Prefer using the host attribute instead of this, unless memory_limit and at least one of memory_target_fraction or memory_spill_fraction values are defined; in that case, this attribute is a zict.Buffer, from which information on the LRU cache can be queried.
- data.memory: {key: object}
    Dictionary mapping keys to actual values stored in memory. Only available if the condition for data being a zict.Buffer is met.
- data.disk: {key: object}
    Dictionary mapping keys to actual values stored on disk. Only available if the condition for data being a zict.Buffer is met.
- data_needed: deque(keys)
    The keys which still require data in order to execute, arranged in a deque
- ready: [keys]
    Keys that are ready to run. Stored in a LIFO stack
- constrained: [keys]
    Keys for which we have the data to run, but are waiting on abstract resources like GPUs. Stored in a FIFO deque
- executing_count: int
    A count of tasks currently executing on this worker
- executed_count: int
    A number of tasks that this worker has run in its lifetime
- long_running: {keys}
    A set of keys of tasks that are running and have started their own long-running clients.
- has_what: {worker: {deps}}
    The data that we care about that we think a worker has
- pending_data_per_worker: {worker: [dep]}
    The data on each worker that we still want, prioritized as a deque
- in_flight_tasks: int
    A count of the number of tasks that are coming to us in current peer-to-peer connections
- in_flight_workers: {worker: {task}}
    The workers from which we are currently gathering data and the dependencies we expect from those connections
- comm_bytes: int
    The total number of bytes in flight
- threads: {key: int}
    The ID of the thread on which the task ran
- active_threads: {int: key}
    The keys currently running on active threads
- waiting_for_data_count: int
    A count of how many tasks are currently waiting for data
Parameters: - scheduler_ip: str
- scheduler_port: int
- ip: str, optional
- data: MutableMapping, type, None
The object to use for storage, builds a disk-backed LRU dict by default
- nthreads: int, optional
- loop: tornado.ioloop.IOLoop
- local_directory: str, optional
Directory where we place local resources
- name: str, optional
- memory_limit: int, float, string
Number of bytes of memory that this worker should use. Set to zero for no limit. Set to ‘auto’ to calculate as system.MEMORY_LIMIT * min(1, nthreads / total_cores) Use strings or numbers like 5GB or 5e9
- memory_target_fraction: float
Fraction of memory to try to stay beneath
- memory_spill_fraction: float
Fraction of memory at which we start spilling to disk
- memory_pause_fraction: float
Fraction of memory at which we stop running new tasks
- executor: concurrent.futures.Executor
- resources: dict
Resources that this worker has like
{'GPU': 2}
- nanny: str
Address on which to contact nanny, if it exists
- lifetime: str
Amount of time like “1 hour” after which we gracefully shut down the worker. This defaults to None, meaning no explicit shutdown time.
- lifetime_stagger: str
    Amount of time like "5 minutes" to stagger the lifetime value. The actual lifetime will be selected uniformly at random between lifetime +/- lifetime_stagger
- lifetime_restart: bool
    Whether or not to restart a worker after it has reached its lifetime. Default False
See also
distributed.scheduler.Scheduler
distributed.nanny.Nanny
Examples
Use the command line to start a worker:

$ dask-scheduler
Start scheduler at 127.0.0.1:8786

$ dask-worker 127.0.0.1:8786
Start worker at:            127.0.0.1:1234
Registered with scheduler at: 127.0.0.1:8786
close_gracefully(restart=None)[source]¶
Gracefully shut down a worker

This first informs the scheduler that we're shutting down, and asks it to move our data elsewhere. Afterwards, we close as normal.

executor_submit(key, function, args=(), kwargs=None, executor=None)[source]¶
Safely run function in thread pool executor

We've run into issues running concurrent.futures futures within tornado. Apparently it's advantageous to use timeouts and periodic callbacks to ensure things run smoothly. This can get tricky, so we pull it off into a separate method.
gather_dep(worker, dep, deps, total_nbytes, cause=None)[source]¶
Gather dependencies for a task from a worker who has them

Parameters:
- worker : str
    Address of worker to gather dependency from
- dep : TaskState
    Task we want to gather dependencies for
- deps : list
    Keys of dependencies to gather from worker – this is not necessarily equivalent to the full list of dependencies of dep, as some dependencies may already be present on this worker.
get_current_task()[source]¶
Get the key of the task we are currently running

This only makes sense to run within a task.

See also
get_worker

Examples

>>> from dask.distributed import get_worker
>>> def f():
...     return get_worker().get_current_task()

>>> future = client.submit(f)  # doctest: +SKIP
>>> future.result()  # doctest: +SKIP
'f-1234'

local_dir¶
For API compatibility with Nanny
memory_monitor()[source]¶
Track this process's memory usage and act accordingly

If we rise above 70% memory use, start dumping data to disk. If we rise above 80% memory use, stop execution of new tasks.

transition_fetch_waiting(ts, runspec)[source]¶
This is a rescheduling transition that occurs after a worker failure. A task was available from another worker but that worker died and the scheduler reassigned the task for computation here.

transition_flight_waiting(ts, runspec)[source]¶
This is a rescheduling transition that occurs after a worker failure. A task was in flight from another worker to this worker when that worker died and the scheduler reassigned the task for computation here.

trigger_profile()[source]¶
Get a frame from all actively computing threads

Merge these frames into existing profile counts.

worker_address¶
For API compatibility with Nanny
Nanny¶
class distributed.Nanny(scheduler_ip=None, scheduler_port=None, scheduler_file=None, worker_port=0, nthreads=None, ncores=None, loop=None, local_dir=None, local_directory=None, services=None, name=None, memory_limit='auto', reconnect=True, validate=False, quiet=False, resources=None, silence_logs=None, death_timeout=None, preload=None, preload_argv=None, preload_nanny=None, preload_nanny_argv=None, security=None, contact_address=None, listen_address=None, worker_class=None, env=None, interface=None, host=None, port=None, protocol=None, config=None, **worker_kwargs)[source]¶
A process to manage worker processes
The nanny spins up Worker processes, watches them, and kills or restarts them as necessary. It is necessary if you want to use the Client.restart method, or to restart the worker automatically if it reaches the terminate fraction of its memory limit.

The parameters for the Nanny are mostly the same as those for the Worker.
close_gracefully(comm=None)[source]¶
A signal that we shouldn't try to restart workers if they go away

This is used as part of the cluster shutdown process.

instantiate(comm=None) → distributed.core.Status[source]¶
Start a local worker process

Blocks until the process is up and the scheduler is properly informed.

kill(comm=None, timeout=2)[source]¶
Kill the local worker process

Blocks until both the process is down and the scheduler is properly informed.

local_dir¶
For API compatibility with Nanny