Often we want to run custom code when we start up or tear down a scheduler or
worker. We might do this manually with functions like
Client.run_on_scheduler, but this is error prone and difficult to automate.
To resolve this, Dask includes a few mechanisms to run arbitrary code around the lifecycle of a Scheduler or Worker.
dask-worker support a
--preload option that
allows custom initialization of each scheduler/worker respectively. A module or
Python file passed as a
--preload value is guaranteed to be imported before
establishing any connection. A
dask_setup(service) function is called if
found, with a
Worker instance as the argument. As the
dask_teardown(service) is called if present.
To support additional configuration, a single
--preload module may register
additional command-line arguments by exposing
dask_setup as a Click
command. This command will be used to parse additional arguments provided to
dask-scheduler and will be called before service
As an example, consider the following file that creates a scheduler plugin and registers it with the scheduler
# scheduler-setup.py import click from distributed.diagnostics.plugin import SchedulerPlugin class MyPlugin(SchedulerPlugin): def __init__(self, print_count): self.print_count = print_count SchedulerPlugin.__init__(self) def add_worker(self, scheduler=None, worker=None, **kwargs): print("Added a new worker at:", worker) if self.print_count and scheduler is not None: print("Total workers:", len(scheduler.workers)) @click.command() @click.option("--print-count/--no-print-count", default=False) def dask_setup(scheduler, print_count): plugin = MyPlugin(print_count) scheduler.add_plugin(plugin)
We can then run this preload script by referring to its filename (or module name if it is on the path) when we start the scheduler:
dask-scheduler --preload scheduler-setup.py --print-count
Worker Lifecycle Plugins¶
You can also create a class with setup and teardown methods, and register that class with the scheduler to give to every worker.