Diagnostics (local)
Profiling parallel code can be challenging, but dask.diagnostics provides functionality to aid in profiling and inspecting execution with the local task scheduler.
This page describes the following built-in options:
ProgressBar
Profiler
ResourceProfiler
CacheProfiler
It then explains how to build your own custom diagnostic.
Progress Bar
ProgressBar: A progress bar for dask.
The ProgressBar class builds on the scheduler callbacks described in the Custom Callbacks section below to display a progress bar in the terminal or notebook during computation. This gives useful feedback during long-running graph execution. It can be used as a context manager around calls to get or compute to track the computation:
>>> import dask.array as da
>>> from dask.diagnostics import ProgressBar
>>> a = da.random.normal(size=(10000, 10000), chunks=(1000, 1000))
>>> res = a.dot(a.T).mean(axis=0)
>>> with ProgressBar():
...     out = res.compute()
[########################################] | 100% Completed | 17.1 s
or registered globally using the register method:
>>> pbar = ProgressBar()
>>> pbar.register()
>>> out = res.compute()
[########################################] | 100% Completed | 17.1 s
To unregister from the global callbacks, call the unregister method:
>>> pbar.unregister()
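Because the out parameter accepts any file object able to write str objects (see the API section), the bar can also be captured instead of printed. The following is a minimal sketch, assuming the threaded scheduler and a tiny hand-written graph; the graph and the variable names are illustrative only.

```python
import io
from operator import add, mul

from dask.diagnostics import ProgressBar
from dask.threaded import get

# A tiny illustrative graph: z = (x + 10) * 2
dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}

# Send the bar to an in-memory buffer instead of sys.stdout
buffer = io.StringIO()
pbar = ProgressBar(out=buffer)

with pbar:
    result = get(dsk, 'z')

captured = buffer.getvalue()   # text of the drawn bar
elapsed = pbar.last_duration   # seconds taken by the last computation
```

Writing to a buffer or a log file can be handy in batch jobs where the terminal output is not kept.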
Profiler
Profiler: A profiler for dask execution at the task level.
Dask provides a few tools for profiling execution. As with the ProgressBar, each can be used as a context manager or registered globally.
The Profiler class is used to profile Dask’s execution at the task level. During execution, it records the following information for each task:
Key
Task
Start time in seconds since the epoch
Finish time in seconds since the epoch
Worker id
ResourceProfiler
ResourceProfiler: A profiler for resource use.
The ResourceProfiler class is used to profile Dask’s execution at the resource level. During execution, it records the following information for each timestep:
Time in seconds since the epoch
Memory usage in MB
% CPU usage
The default timestep is 1 second, but it can be set manually using the dt keyword:
>>> from dask.diagnostics import ResourceProfiler
>>> rprof = ResourceProfiler(dt=0.5)
CacheProfiler
CacheProfiler: A profiler for dask execution at the scheduler cache level.
The CacheProfiler class is used to profile Dask’s execution at the scheduler cache level. During execution, it records the following information for each task:
Key
Task
Size metric
Cache entry time in seconds since the epoch
Cache exit time in seconds since the epoch
Here the size metric is the output of a function called on the result of each task. The default metric is to count each task (metric is 1 for all tasks). Other functions may be used as a metric instead through the metric keyword. For example, the nbytes function found in cachey can be used to measure the number of bytes in the scheduler cache:
>>> from dask.diagnostics import CacheProfiler
>>> from cachey import nbytes
>>> cprof = CacheProfiler(metric=nbytes)
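Any callable that maps a task result to a number can serve as the metric. The sketch below is one possible alternative when cachey is not installed: approx_size is a hypothetical helper (not part of Dask) built on sys.getsizeof, which only reports the shallow size of an object, and the graph is a toy example.

```python
import sys
from operator import add, mul

from dask.diagnostics import CacheProfiler
from dask.threaded import get


def approx_size(value):
    """Hypothetical metric: shallow size of a task result in bytes."""
    return sys.getsizeof(value)


# Toy graph: z = (x + 10) * 2
dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}

with CacheProfiler(metric=approx_size, metric_name='bytes') as cprof:
    result = get(dsk, 'z')

# Each record carries the metric computed for that task's result
sizes = {record.key: record.metric for record in cprof.results}
```

For containers or arrays a deeper size estimate such as the nbytes function mentioned above is likely to be more meaningful than the shallow size used here.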
Example
As an example to demonstrate using the diagnostics, we’ll profile some linear algebra done with Dask Array. We’ll create a random array, take its QR decomposition, and then reconstruct the initial array by multiplying the Q and R components together. Note that since the profilers (and all diagnostics) are just context managers, multiple profilers can be used in a with block:
>>> import dask.array as da
>>> from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
>>> a = da.random.random(size=(10000, 1000), chunks=(1000, 1000))
>>> q, r = da.linalg.qr(a)
>>> a2 = q.dot(r)
>>> with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof, \
...         CacheProfiler() as cprof:
...     out = a2.compute()
The results of each profiler are stored in its results attribute as a list of namedtuple objects:
>>> prof.results[0]
TaskData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 5, 0),
task=(qr, (_apply_random, 'random_sample', 1060164455, (1000, 1000), (), {})),
start_time=1454368444.493292,
end_time=1454368444.902987,
worker_id=4466937856)
>>> rprof.results[0]
ResourceData(time=1454368444.078748, mem=74.100736, cpu=0.0)
>>> cprof.results[0]
CacheData(key=('tsqr-8d16e396b237bf7a731333130d310cb9_QR_st1', 7, 0),
task=(qr, (_apply_random, 'random_sample', 1310656009, (1000, 1000), (), {})),
metric=1,
cache_time=1454368444.49662,
free_time=1454368446.769452)
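Because each entry in results is a namedtuple, ordinary Python is enough for a first analysis. The sketch below uses a stand-in TaskData type and made-up timings (both are assumptions for illustration, not output from a real run) to compute per-task durations and per-worker busy time. With a real profiler, prof.results would take the place of the sample list.

```python
from collections import defaultdict, namedtuple

# Stand-in record type with the same fields as the TaskData output shown above
TaskData = namedtuple('TaskData', 'key task start_time end_time worker_id')

# Made-up sample records; in practice this would be prof.results
results = [
    TaskData('a', None, 10.0, 10.5, 1),
    TaskData('b', None, 10.0, 11.0, 2),
    TaskData('c', None, 11.0, 11.25, 1),
]

# Duration of each task in seconds
durations = {r.key: r.end_time - r.start_time for r in results}

# Total busy time per worker
busy = defaultdict(float)
for r in results:
    busy[r.worker_id] += r.end_time - r.start_time

# Key of the longest-running task
slowest = max(durations, key=durations.get)
```

Only differences between start and end times are used, so the analysis does not depend on which clock produced the timestamps.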
These can be analyzed separately or viewed in a bokeh plot using the provided visualize method on each profiler:
>>> prof.visualize()
To view multiple profilers at the same time, the dask.diagnostics.visualize()
function can be used. This takes a list of profilers and creates a vertical
stack of plots aligned along the x-axis:
>>> from dask.diagnostics import visualize
>>> visualize([prof, rprof, cprof])
Looking at the resulting figure, from top to bottom:
The results from the Profiler object: This shows the execution time for each task as a rectangle, organized along the y-axis by worker (in this case threads). Similar tasks are grouped by color and, by hovering over each task, one can see the key and task that each block represents.
The results from the ResourceProfiler object: This shows two lines, one for total CPU percentage used by all the workers, and one for total memory usage.
The results from the CacheProfiler object: This shows a line for each task group, plotting the sum of the current metric in the cache against time. In this case it’s the default metric (count), and the lines represent the number of each object in the cache at a given time. Note that the grouping and coloring is the same as for the Profiler plot, and that the task represented by each line can be found by hovering over the line.
From these plots we can see that the initial tasks (calls to numpy.random.random and numpy.linalg.qr for each chunk) are run concurrently, but only use slightly more than 100% CPU. This is because the call to numpy.linalg.qr currently doesn’t release the Global Interpreter Lock (GIL), so those calls can’t truly be done in parallel. Next, there’s a reduction step where all the blocks are combined. This requires all the results from the first step to be held in memory, as shown by the increased number of results in the cache and the increase in memory usage. Immediately after this task ends, the number of elements in the cache decreases, showing that they were only needed for this step. Finally, there’s an interleaved set of calls to dot and sum. The CPU plot shows that these run both concurrently and in parallel, as the CPU percentage spikes up to around 350%.
Custom Callbacks
Callback: Base class for using the callback mechanism
Schedulers based on dask.local.get_async (currently dask.get, dask.threaded.get, and dask.multiprocessing.get) accept five callbacks, allowing for inspection of scheduler execution.
The callbacks are:
1. start(dsk): Run at the beginning of execution, right before the state is initialized. Receives the Dask graph.
2. start_state(dsk, state): Run at the beginning of execution, right after the state is initialized. Receives the Dask graph and scheduler state.
3. pretask(key, dsk, state): Run every time a new task is started. Receives the key of the task to be run, the Dask graph, and the scheduler state.
4. posttask(key, result, dsk, state, id): Run every time a task is finished. Receives the key of the task that just completed, the result, the Dask graph, the scheduler state, and the id of the worker that ran the task.
5. finish(dsk, state, errored): Run at the end of execution, right before the result is returned. Receives the Dask graph, the scheduler state, and a boolean indicating whether or not the exit was due to an error.
Custom diagnostics can be created either by instantiating the Callback class with some of the above methods as keywords or by subclassing the Callback class.
Here we create a class that prints the name of every key as it’s computed:
from dask.callbacks import Callback

class PrintKeys(Callback):
    def _pretask(self, key, dask, state):
        """Print the key of every task as it's started"""
        print("Computing: {0}!".format(repr(key)))
This can now be used as a context manager during computation:
>>> from operator import add, mul
>>> from dask.threaded import get
>>> dsk = {'a': (add, 1, 2), 'b': (add, 3, 'a'), 'c': (mul, 'a', 'b')}
>>> with PrintKeys():
...     get(dsk, 'c')
Computing: 'a'!
Computing: 'b'!
Computing: 'c'!
18
Alternatively, functions may be passed in as keyword arguments to Callback:
>>> def printkeys(key, dask, state):
...     print("Computing: {0}!".format(repr(key)))
>>> with Callback(pretask=printkeys):
...     get(dsk, 'c')
Computing: 'a'!
Computing: 'b'!
Computing: 'c'!
18
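Several callbacks can be combined in one subclass. The following is a rough sketch, not part of Dask itself: TaskTimer is a hypothetical class that pairs _pretask with _posttask to record how long each key takes. It assumes the threaded scheduler, where both hooks are fired from the scheduling thread, so the measured time includes any queueing overhead rather than pure function runtime.

```python
from operator import add, mul
from time import perf_counter

from dask.callbacks import Callback
from dask.threaded import get


class TaskTimer(Callback):
    """Hypothetical callback recording the wall-clock duration of each task."""

    def _start(self, dsk):
        # Reset state at the beginning of every computation
        self._started = {}
        self.durations = {}

    def _pretask(self, key, dsk, state):
        # Remember when the task was handed to a worker
        self._started[key] = perf_counter()

    def _posttask(self, key, result, dsk, state, worker_id):
        # Store the elapsed time once the task has finished
        self.durations[key] = perf_counter() - self._started.pop(key)


dsk = {'a': (add, 1, 2), 'b': (add, 3, 'a'), 'c': (mul, 'a', 'b')}

with TaskTimer() as timer:
    result = get(dsk, 'c')
```

After the block, timer.durations maps each key to its measured duration in seconds.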
API
CacheProfiler: A profiler for dask execution at the scheduler cache level.
Callback: Base class for using the callback mechanism
Profiler: A profiler for dask execution at the task level.
ProgressBar: A progress bar for dask.
ResourceProfiler: A profiler for resource use.
visualize: Visualize the results of profiling in a bokeh plot.
- dask.diagnostics.ProgressBar(minimum=0, width=40, dt=0.1, out=None)
A progress bar for dask.
- Parameters
- minimum : int, optional
Minimum time threshold in seconds before displaying a progress bar. Default is 0 (always display).
- width : int, optional
Width of the bar.
- dt : float, optional
Update resolution in seconds. Default is 0.1 seconds.
- out : file object, optional
File object to which the progress bar will be written. It can be sys.stdout, sys.stderr, or any other file object able to write str objects. Default is sys.stdout.
Examples
Below we create a progress bar with a minimum threshold of 1 second before displaying. For cheap computations nothing is shown:
>>> with ProgressBar(minimum=1.0):
...     out = some_fast_computation.compute()
But for expensive computations a full progress bar is displayed:
>>> with ProgressBar(minimum=1.0):
...     out = some_slow_computation.compute()
[########################################] | 100% Completed | 10.4 s
The duration of the last computation is available as an attribute:
>>> pbar = ProgressBar()
>>> with pbar:
...     out = some_computation.compute()
[########################################] | 100% Completed | 10.4 s
>>> pbar.last_duration
10.4
You can also register a progress bar so that it displays for all computations:
>>> pbar = ProgressBar()
>>> pbar.register()
>>> some_slow_computation.compute()
[########################################] | 100% Completed | 10.4 s
- dask.diagnostics.Profiler()
A profiler for dask execution at the task level.
- Records the following information for each task:
Key
Task
Start time in seconds since the epoch
Finish time in seconds since the epoch
Worker id
Examples
>>> from operator import add, mul
>>> from dask.threaded import get
>>> from dask.diagnostics import Profiler
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with Profiler() as prof:
...     get(dsk, 'z')
22
>>> prof.results
[TaskData(key='y', task=(add, 'x', 10), start_time=..., end_time=..., worker_id=...),
 TaskData(key='z', task=(mul, 'y', 2), start_time=..., end_time=..., worker_id=...)]
These results can be visualized in a bokeh plot using the visualize method. Note that this requires bokeh to be installed.
>>> prof.visualize()
You can activate the profiler globally:
>>> prof.register()
If you use the profiler globally, you will need to clear out old results manually:
>>> prof.clear()
>>> prof.unregister()
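To illustrate why manual clearing matters for a globally registered profiler, here is a small sketch. It assumes the threaded scheduler and a toy graph; the counter variable names are made up for the example. Records from successive computations pile up in results until clear is called.

```python
from operator import add, mul

from dask.diagnostics import Profiler
from dask.threaded import get

# Toy graph: z = (x + 10) * 2
dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}

prof = Profiler()
prof.register()  # active for every computation from now on
try:
    get(dsk, 'z')
    after_first = len(prof.results)
    get(dsk, 'z')
    after_second = len(prof.results)  # records from both runs
    prof.clear()                      # drop the accumulated records
    after_clear = len(prof.results)
finally:
    prof.unregister()  # always remove the global callback
```

Wrapping the work in try/finally is a design choice of this sketch: it keeps the profiler from staying registered if a computation raises.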
- dask.diagnostics.ResourceProfiler(dt=1)
A profiler for resource use.
- Records the following information for each timestep:
Time in seconds since the epoch
Memory usage in MB
% CPU usage
Examples
>>> from operator import add, mul
>>> from dask.threaded import get
>>> from dask.diagnostics import ResourceProfiler
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with ResourceProfiler() as prof:
...     get(dsk, 'z')
22
These results can be visualized in a bokeh plot using the visualize method. Note that this requires bokeh to be installed.
>>> prof.visualize()
You can activate the profiler globally:
>>> prof.register()
If you use the profiler globally, you will need to clear out old results manually:
>>> prof.clear()
Note that when used as a context manager, data will be collected throughout the duration of the enclosed block. In contrast, when registered globally, data will only be collected while a dask scheduler is active.
>>> prof.unregister()
- dask.diagnostics.CacheProfiler(metric=None, metric_name=None)
A profiler for dask execution at the scheduler cache level.
- Records the following information for each task:
Key
Task
Size metric
Cache entry time in seconds since the epoch
Cache exit time in seconds since the epoch
Examples
>>> from operator import add, mul
>>> from dask.threaded import get
>>> from dask.diagnostics import CacheProfiler
>>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
>>> with CacheProfiler() as prof:
...     get(dsk, 'z')
22
>>> prof.results
[CacheData(key='y', task=(add, 'x', 10), metric=1, cache_time=..., free_time=...),
 CacheData(key='z', task=(mul, 'y', 2), metric=1, cache_time=..., free_time=...)]
The default is to count each task (metric is 1 for all tasks). Other functions may be used as a metric instead through the metric keyword. For example, the nbytes function found in cachey can be used to measure the number of bytes in the cache.
>>> from cachey import nbytes
>>> with CacheProfiler(metric=nbytes) as prof:
...     get(dsk, 'z')
22
The profiling results can be visualized in a bokeh plot using the visualize method. Note that this requires bokeh to be installed.
>>> prof.visualize()
You can activate the profiler globally:
>>> prof.register()
If you use the profiler globally, you will need to clear out old results manually:
>>> prof.clear()
>>> prof.unregister()
- dask.diagnostics.Callback(start=None, start_state=None, pretask=None, posttask=None, finish=None)
Base class for using the callback mechanism
Create a callback with functions of the following signatures:
>>> def start(dsk):
...     pass
>>> def start_state(dsk, state):
...     pass
>>> def pretask(key, dsk, state):
...     pass
>>> def posttask(key, result, dsk, state, worker_id):
...     pass
>>> def finish(dsk, state, failed):
...     pass
You may then construct a callback object with any number of them:
>>> cb = Callback(pretask=pretask, finish=finish)
And use it either as a context manager over a compute/get call:
>>> with cb:
...     x.compute()
Or globally with the register method:
>>> cb.register()
>>> cb.unregister()
Alternatively, subclass the Callback class with your own methods.
>>> class PrintKeys(Callback):
...     def _pretask(self, key, dask, state):
...         print("Computing: {0}!".format(repr(key)))
>>> with PrintKeys():
...     x.compute()
- dask.diagnostics.visualize(profilers, filename='profile.html', show=True, save=None, mode=None, **kwargs)
Visualize the results of profiling in a bokeh plot.
If multiple profilers are passed in, the plots are stacked vertically.
- Parameters
- profilers : profiler or list
Profiler or list of profilers.
- filename : string, optional
Name of the plot output file.
- show : boolean, optional
If True (default), the plot is opened in a browser.
- save : boolean, optional
If True (default when not in notebook), the plot is saved to disk.
- mode : str, optional
Mode passed to bokeh.output_file()
- **kwargs
Other keyword arguments, passed to bokeh.figure. These will override all defaults set by visualize.
- Returns
The completed bokeh plot object.