Single Machine: dask.distributed

The dask.distributed scheduler works well on a single machine. It is sometimes preferred over the default scheduler for the following reasons:

  1. It provides access to an asynchronous API, notably Futures
  2. It provides a diagnostic dashboard that gives valuable insight into performance and progress
  3. It handles data locality with more sophistication, and so can be more efficient than the multiprocessing scheduler on workloads that require multiple processes

You can create a dask.distributed scheduler by importing and creating a Client with no arguments. This overrides whatever default was previously set.

from dask.distributed import Client
client = Client()

You can navigate to http://localhost:8787/status to see the diagnostic dashboard if you have Bokeh installed.
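As a rough illustration of the Futures API mentioned above, the sketch below submits work through a client and gathers one result. The helper names square and sum_of_squares are hypothetical, and the example assumes processes=False and dashboard_address=":0" only to keep it lightweight and to avoid port clashes; a plain Client() should behave the same way when run under an if __name__ == "__main__": guard.

```python
from dask.distributed import Client


def square(x):
    # Ordinary Python function; it runs on a worker when submitted.
    return x ** 2


def sum_of_squares(client, values):
    # client.map returns one Future per input value.
    futures = client.map(square, values)
    # Futures can be passed as arguments to further tasks.
    total = client.submit(sum, futures)
    # .result() blocks until the value is available.
    return total.result()


if __name__ == "__main__":
    # Assumption: in-process workers and a random dashboard port.
    with Client(processes=False, dashboard_address=":0") as client:
        print(client.dashboard_link)  # URL of the diagnostic dashboard
        print(sum_of_squares(client, range(10)))
```

The work is expressed as Futures rather than blocking calls, so intermediate results stay on the workers until total.result() is requested.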

Client

You can trivially set up a local cluster on your machine by instantiating a Dask Client with no arguments:

from dask.distributed import Client
client = Client()

This sets up a scheduler in your local process and several processes running single-threaded Workers.

If you want to run workers in the same process as your client, you can pass the processes=False keyword argument.

client = Client(processes=False)

This is sometimes preferable if you want to avoid inter-worker communication and your computations release the GIL. This is common when primarily using NumPy or Dask Array.
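A minimal sketch of that NumPy-heavy case might look like the following. The function name threaded_mean, the array shape, and the chunk sizes are illustrative assumptions, and dashboard_address=":0" is used only to avoid a port conflict.

```python
import dask.array as da
from dask.distributed import Client


def threaded_mean(shape=(2000, 2000), chunks=(500, 500)):
    # Workers run as threads in this process, so chunks are shared
    # in memory instead of being serialized between processes.
    with Client(processes=False, dashboard_address=":0") as client:
        x = da.ones(shape, chunks=chunks)
        y = (x + x.T).mean()
        # client.compute returns a Future for the collection.
        return float(client.compute(y).result())


if __name__ == "__main__":
    print(threaded_mean())
```

Because NumPy releases the GIL for most array operations, the threads can run in parallel without the cost of moving data between processes.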

LocalCluster

The Client() call described above is shorthand for creating a LocalCluster and then passing that to your client.

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

This is equivalent, but somewhat more explicit. The keyword arguments on LocalCluster show the options available to you, such as controlling the mixture of threads and processes or specifying explicit ports.
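For instance, a cluster with an explicit mix of workers and threads could be sketched as below. The helper name make_cluster and the specific values are assumptions chosen for illustration; the keyword arguments themselves are the ones listed in the LocalCluster signature.

```python
from dask.distributed import Client, LocalCluster


def make_cluster():
    # Two in-process workers with two threads each, on random ports.
    return LocalCluster(
        n_workers=2,
        threads_per_worker=2,
        processes=False,
        scheduler_port=0,
        dashboard_address=":0",
    )


if __name__ == "__main__":
    with make_cluster() as cluster, Client(cluster) as client:
        client.wait_for_workers(2)
        # Maps each worker address to its number of threads.
        print(client.nthreads())
```

Setting processes=True instead would start each worker in its own process; in a script, that variant needs the if __name__ == "__main__": guard.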

class distributed.deploy.local.LocalCluster(n_workers=None, threads_per_worker=None, processes=True, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, **worker_kwargs)

Create local Scheduler and Workers

This creates a “cluster” of a scheduler and workers running on the local machine.

Parameters:
n_workers: int

Number of workers to start

processes: bool

Whether to use processes (True) or threads (False). Defaults to True

threads_per_worker: int

Number of threads per worker

scheduler_port: int

Port of the scheduler. The signature above defaults to 0, which chooses a random port; pass 8786 to use the conventional scheduler port.

silence_logs: logging level

Level of logs to print out to stdout. logging.WARN by default. Use a falsy value like False or None for no change.

host: string

Host address on which the scheduler will listen, defaults to only localhost

ip: string

Deprecated. See host above.

dashboard_address: str

Address on which to listen for the Bokeh diagnostics server like ‘localhost:8787’ or ‘0.0.0.0:8787’. Defaults to ‘:8787’. Set to None to disable the dashboard. Use ‘:0’ for a random port.

diagnostics_port: int

Deprecated. See dashboard_address.

asynchronous: bool (False by default)

Set to True if using this cluster within async/await functions or within Tornado gen.coroutines. This should remain False for normal use.

worker_kwargs: dict

Extra worker arguments, will be passed to the Worker constructor.

blocked_handlers: List[str]

A list of strings specifying a blacklist of handlers to disallow on the Scheduler, like ['feed', 'run_function']

service_kwargs: Dict[str, Dict]

Extra keywords to hand to the running services

security: Security

protocol: str (optional)

Protocol to use, like tcp://, tls://, or inproc://. This defaults to a sensible choice given other keyword arguments like processes and security.

interface: str (optional)

Network interface to use. Defaults to lo/localhost

worker_class: Worker

Worker class used to instantiate workers.

Examples

>>> from dask.distributed import Client, LocalCluster
>>> cluster = LocalCluster()  # Create a local cluster with as many workers as cores
>>> cluster
LocalCluster("127.0.0.1:8786", workers=8, threads=8)
>>> c = Client(cluster)  # connect to local cluster

Scale the cluster to three workers

>>> cluster.scale(3)

Pass extra keyword arguments to Bokeh

>>> LocalCluster(service_kwargs={'bokeh': {'prefix': '/foo'}})
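
Use the cluster from async/await code

The asynchronous parameter described above can be sketched as follows. The function names increment and run_async are hypothetical, and the example assumes a single in-process worker and a random dashboard port to keep it small.

```python
import asyncio

from dask.distributed import Client, LocalCluster


def increment(x):
    return x + 1


async def run_async():
    # Both the cluster and the client are created in asynchronous mode.
    async with LocalCluster(
        n_workers=1,
        processes=False,
        asynchronous=True,
        dashboard_address=":0",
    ) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            future = client.submit(increment, 41)
            # In asynchronous mode a Future is awaited instead of
            # calling .result().
            return await future


if __name__ == "__main__":
    print(asyncio.run(run_async()))
```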