Custom Initialization

Often we want to run custom code when we start up or tear down a scheduler or worker. We might do this manually with functions like Client.run or Client.run_on_scheduler, but this is error-prone and difficult to automate.

To resolve this, Dask includes a few mechanisms to run arbitrary code around the lifecycle of a Scheduler or Worker.

Both dask-scheduler and dask-worker support a --preload option for custom initialization of the scheduler or worker. A module or Python file passed as a --preload value is guaranteed to be imported before the service establishes any connection. If the module defines a dask_setup(service) function, it is called with the Scheduler or Worker instance as its argument. When the service stops, dask_teardown(service) is called if present.

To support additional configuration, a single --preload module may register additional command-line arguments by exposing dask_setup as a Click command. This command will be used to parse additional arguments provided to dask-worker or dask-scheduler and will be called before service initialization.

As an example, consider the following file, which creates a scheduler plugin and registers it with the scheduler:

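# scheduler-setup.py
import click

from distributed.diagnostics.plugin import SchedulerPlugin


class MyPlugin(SchedulerPlugin):
    def __init__(self, print_count):
        self.print_count = print_count
        super().__init__()

    # Called by the scheduler each time a worker joins the cluster.
    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at:", worker)
        if self.print_count and scheduler is not None:
            print("Total workers:", len(scheduler.workers))


# Exposing dask_setup as a Click command lets it accept extra CLI options.
@click.command()
@click.option("--print-count/--no-print-count", default=False)
def dask_setup(scheduler, print_count):
    plugin = MyPlugin(print_count)
    scheduler.add_plugin(plugin)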

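We can then run this preload script by passing its filename to --preload when we start the scheduler, along with the extra option it defines:

dask-scheduler --preload scheduler-setup.py --print-count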

 Client.register_worker_plugin