Often we want to run custom code when we start up or tear down a scheduler or
worker. We might do this manually with functions like
Client.run_on_scheduler, but this is error prone and difficult to automate.
To resolve this, Dask includes a few mechanisms to run arbitrary code around the lifecycle of a Scheduler or Worker.
dask-worker support a
--preload option that
allows custom initialization of each scheduler/worker respectively. A module or
Python file passed as a
--preload value is guaranteed to be imported before
establishing any connection. A
dask_setup(service) function is called if
found, with a
Worker instance as the argument. As the
dask_teardown(service) is called if present.
To support additional configuration, a single
--preload module may register
additional command-line arguments by exposing
dask_setup as a Click
command. This command will be used to parse additional arguments provided to
dask-scheduler and will be called before service
As an example, consider the following file that creates a scheduler plugin and registers it with the scheduler
# scheduler-setup.py import click from distributed.diagnostics.plugin import SchedulerPlugin class MyPlugin(SchedulerPlugin): def __init__(self, print_count): self.print_count = print_count SchedulerPlugin.__init__(self) def add_worker(self, scheduler=None, worker=None, **kwargs): print("Added a new worker at:", worker) if self.print_count and scheduler is not None: print("Total workers:", len(scheduler.workers)) @click.command() @click.option("--print-count/--no-print-count", default=False) def dask_setup(scheduler, print_count): plugin = MyPlugin(print_count) scheduler.add_plugin(plugin)
We can then run this preload script by referring to its filename (or module name if it is on the path) when we start the scheduler:
dask-scheduler --preload scheduler-setup.py --print-count
Worker Lifecycle Plugins¶
You can also create a class with
and register that class with the scheduler to give to every worker using the
||Registers a lifecycle worker plugin for all current and future workers.|
Registers a lifecycle worker plugin for all current and future workers.
This registers a new object to handle setup, task state transitions and teardown for workers in this cluster. The plugin will instantiate itself on all currently connected workers. It will also be run on any worker that connects in the future.
The plugin may include methods
transition. See the
dask.distributed.WorkerPluginclass or the examples below for the interface and docstrings. It must be serializable with the pickle or cloudpickle modules.
If the plugin has a
nameattribute, or if the
name=keyword is used then that will control idempotency. A a plugin with that name has already registered then any future plugins will not run.
For alternatives to plugins, you may also wish to look into preload scripts.
- plugin: WorkerPlugin
The plugin object to pass to the workers
- name: str, optional
A name for the plugin. Registering a plugin with the same name will have no effect.
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, **kwargs): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_worker_plugin(plugin)
You can get access to the plugin with the
>>> client.register_worker_plugin(other_plugin, name='my-plugin') >>> def f(): ... worker = get_worker() ... plugin = worker.plugins['my-plugin'] ... return plugin.my_state
>>> future = client.run(f)