Source code for dask.diagnostics.profile_visualize

import random
from bisect import bisect_left
from distutils.version import LooseVersion
from itertools import cycle
from operator import add, itemgetter

from tlz import accumulate, groupby, pluck, unique

from ..core import istask
from ..utils import apply, funcname, import_required

_BOKEH_MISSING_MSG = "Diagnostics plots require `bokeh` to be installed"


def unquote(expr):
    """Rebuild a literal collection from its task form.

    Tasks of the form ``(tuple, [...])``, ``(list, [...])`` and
    ``(set, [...])`` are turned back into the corresponding collection, and
    ``(dict, [[k, v], ...])`` is turned back into a dict.  Items are
    unquoted recursively.  Anything else is returned unchanged.

    Parameters
    ----------
    expr:
        Value taken from a dask graph.

    Returns
    -------
    The rebuilt collection, or ``expr`` itself when it is not one of the
    recognised forms.
    """
    if not istask(expr):
        return expr

    head = expr[0]
    if head in (tuple, list, set):
        return head(map(unquote, expr[1]))

    if head == dict and isinstance(expr[1], list):
        pairs = expr[1]
        # An empty pair list stands for an empty dict.  Check emptiness
        # before peeking at the first pair so ``(dict, [])`` yields ``{}``
        # instead of raising IndexError.
        if not pairs or isinstance(pairs[0], list):
            return dict(map(unquote, pairs))

    return expr


def pprint_task(task, keys, label_size=60):
    """Render a graph value as a short, human-readable label.

    Tasks are shown as ``func(arg, ...)``, lists as ``[item, ...]``, graph
    keys as ``_`` and every other value as ``*``.  Nested pieces share the
    available ``label_size``; pieces that would get five characters or
    fewer are abbreviated to ``...``.

    Parameters
    ----------
    task:
        Value within dask graph to render as text
    keys: iterable
        List of keys within dask graph
    label_size: int (optional)
        Maximum size of output label, defaults to 60

    Examples
    --------
    >>> from operator import add, mul
    >>> dsk = {'a': 1,
    ...        'b': 2,
    ...        'c': (add, 'a', 'b'),
    ...        'd': (add, (mul, 'a', 'b'), 'c'),
    ...        'e': (sum, ['a', 'b', 5]),
    ...        'f': (add,),
    ...        'g': []}

    >>> pprint_task(dsk['c'], dsk)
    'add(_, _)'
    >>> pprint_task(dsk['d'], dsk)
    'add(mul(_, _), _)'
    >>> pprint_task(dsk['e'], dsk)
    'sum([_, _, *])'
    >>> pprint_task(dsk['f'], dsk)
    'add()'
    >>> pprint_task(dsk['g'], dsk)
    '[]'
    """
    if istask(task):
        func = task[0]
        if func is apply:
            # (apply, func, args, kwargs): the real callable is task[1]
            head = funcname(task[1])
            tail = ")"
            args = unquote(task[2]) if len(task) > 2 else ()
            kwargs = unquote(task[3]) if len(task) > 3 else {}
        else:
            if hasattr(func, "funcs"):
                # Callable exposing several functions via ``funcs``:
                # show them nested as f(g(h(...)))
                head = "(".join(funcname(f) for f in func.funcs)
                tail = ")" * len(func.funcs)
            else:
                head = funcname(task[0])
                tail = ")"
            args = task[1:]
            kwargs = {}

        arg_text = ""
        kwarg_text = ""
        if args or kwargs:
            # Split the remaining room evenly between all arguments
            budget = int(
                (label_size - len(head) - len(tail)) // (len(args) + len(kwargs))
            )
            roomy = budget > 5
            if args:
                if roomy:
                    arg_text = ", ".join(pprint_task(a, keys, budget) for a in args)
                else:
                    arg_text = "..."
            if kwargs:
                if roomy:
                    kwarg_text = ", " + ", ".join(
                        "{0}={1}".format(name, pprint_task(value, keys, budget))
                        for name, value in sorted(kwargs.items())
                    )
                else:
                    kwarg_text = ", ..."
        return "{0}({1}{2}{3}".format(head, arg_text, kwarg_text, tail)

    if isinstance(task, list):
        if not task:
            return "[]"
        if len(task) > 3:
            # Show only the first three items, then an ellipsis
            shown = pprint_task(task[:3], keys, label_size)
            return shown[:-1] + ", ...]"
        budget = int((label_size - 2 - 2 * len(task)) // len(task))
        return "[{0}]".format(
            ", ".join(pprint_task(item, keys, budget) for item in task)
        )

    try:
        return "_" if task in keys else "*"
    except TypeError:
        # The membership test raised TypeError (e.g. unhashable value)
        return "*"


def get_colors(palette, funcs):
    """Get a list of colors from palette, one for each entry of funcs.

    Equal function names always receive the same color.  Colors are reused
    cyclically when there are more unique names than colors available.

    Parameters
    ----------
    palette : string
        Name of the bokeh palette to use, must be a member of
        bokeh.palettes.all_palettes.
    funcs : iterable
        Iterable of function names

    Returns
    -------
    list
        Colors in the same order (and of the same length) as ``funcs``.
    """
    palettes = import_required("bokeh.palettes", _BOKEH_MISSING_MSG)

    # ``funcs`` is traversed twice below (unique names, then the per-entry
    # lookup); materialize it so one-shot iterables are handled correctly.
    funcs = list(funcs)

    unique_funcs = sorted(unique(funcs))
    n_funcs = len(unique_funcs)
    palette_lookup = palettes.all_palettes[palette]
    keys = sorted(palette_lookup)
    # First key that is >= n_funcs, falling back to the last (largest) key
    index = keys[min(bisect_left(keys, n_funcs), len(keys) - 1)]
    # Some bokeh palettes repeat colors, we want just the unique set
    colors = list(unique(palette_lookup[index]))
    if len(colors) > n_funcs:
        # Consistently shuffle palette - prevents just using low-range
        random.Random(42).shuffle(colors)
    color_lookup = dict(zip(unique_funcs, cycle(colors)))
    return [color_lookup[n] for n in funcs]


def visualize(profilers, file_path=None, show=True, save=True, mode=None, **kwargs):
    """Visualize the results of profiling in a bokeh plot.

    If multiple profilers are passed in, the plots are stacked vertically.

    Parameters
    ----------
    profilers : profiler or list
        Profiler or list of profilers.
    file_path : string, optional
        Name of the plot output file.  Outside of a notebook this falls
        back to ``"profile.html"`` when not given.
    show : boolean, optional
        If True (default), the plot is opened in a browser.
    save : boolean, optional
        If True (default), the plot is saved to disk.
    mode : str, optional
        Mode passed to bokeh.output_file()
    **kwargs
        Other keyword arguments, passed to bokeh.figure. These will override
        all defaults set by visualize.

    Returns
    -------
    The completed bokeh plot object.
    """
    bp = import_required("bokeh.plotting", _BOKEH_MISSING_MSG)
    import bokeh

    # The notebook flag is read from a different module depending on the
    # installed bokeh version
    if LooseVersion(bokeh.__version__) >= "0.12.10":
        from bokeh.io import state

        in_notebook = state.curstate().notebook
    else:
        from bokeh.io import _state

        in_notebook = _state._notebook

    if not in_notebook:
        file_path = file_path or "profile.html"
        bp.output_file(file_path, mode=mode)

    if not isinstance(profilers, list):
        profilers = [profilers]
    figs = [prof._plot(**kwargs) for prof in profilers]
    # Stack the plots
    if len(figs) == 1:
        p = figs[0]
    else:
        top = figs[0]
        # Every plot below the first shares its x-range and drops its title
        for f in figs[1:]:
            f.x_range = top.x_range
            f.title = None
            f.min_border_top = 20
            f.plot_height -= 30
        # Every plot above the last drops its x-axis label
        for f in figs[:-1]:
            f.xaxis.axis_label = None
            f.min_border_bottom = 20
            f.plot_height -= 30
        for f in figs:
            f.min_border_left = 75
            f.min_border_right = 75
        p = bp.gridplot([[f] for f in figs])
    if show:
        bp.show(p)
    if file_path and save:
        bp.save(p)
    return p
def _get_figure_keywords():
    """Return the property names of ``bokeh.plotting.Figure`` plus ``"tools"``."""
    bp = import_required("bokeh.plotting", _BOKEH_MISSING_MSG)
    o = bp.Figure.properties()
    o.add("tools")
    return o


def _merge_figure_kwargs(defaults, kwargs):
    """Overlay user-supplied figure options onto ``defaults``.

    ``plot_width``/``plot_height`` are renamed to ``width``/``height`` and
    only names reported by ``_get_figure_keywords`` are copied over.  Both
    dicts are modified in place; ``defaults`` is returned for convenience.
    """
    # Support plot_width and plot_height for backwards compatibility
    if "plot_width" in kwargs:
        kwargs["width"] = kwargs.pop("plot_width")
    if "plot_height" in kwargs:
        kwargs["height"] = kwargs.pop("plot_height")
    if kwargs:
        # Look the accepted names up once rather than once per keyword
        allowed = _get_figure_keywords()
        defaults.update((k, v) for (k, v) in kwargs.items() if k in allowed)
    return defaults


def plot_tasks(results, dsk, palette="Viridis", label_size=60, **kwargs):
    """Visualize the results of profiling in a bokeh plot.

    Each task is drawn as a rectangle spanning its start and end time, on
    the row assigned to the worker id that ran it.

    Parameters
    ----------
    results : sequence
        Output of Profiler.results
    dsk : dict
        The dask graph being profiled.
    palette : string, optional
        Name of the bokeh palette to use, must be a member of
        bokeh.palettes.all_palettes.
    label_size: int (optional)
        Maximum size of output labels in plot, defaults to 60
    **kwargs
        Other keyword arguments, passed to bokeh.figure. These will override
        all defaults set by plot_tasks.

    Returns
    -------
    The completed bokeh plot object.
    """
    bp = import_required("bokeh.plotting", _BOKEH_MISSING_MSG)
    from bokeh.models import HoverTool

    defaults = _merge_figure_kwargs(
        dict(
            title="Profile Results",
            tools="hover,save,reset,xwheel_zoom,xpan",
            toolbar_location="above",
            width=800,
            height=300,
        ),
        kwargs,
    )

    if results:
        keys, tasks, starts, ends, ids = zip(*results)

        # Group by worker id and order the ids by their task durations
        id_group = groupby(itemgetter(4), results)
        timings = {
            k: [i.end_time - i.start_time for i in v] for (k, v) in id_group.items()
        }
        id_lk = {
            t[0]: n
            for (n, t) in enumerate(
                sorted(timings.items(), key=itemgetter(1), reverse=True)
            )
        }

        left = min(starts)
        right = max(ends)

        p = bp.figure(
            y_range=[str(i) for i in range(len(id_lk))],
            x_range=[0, right - left],
            **defaults
        )

        data = {}
        data["width"] = width = [e - s for (s, e) in zip(starts, ends)]
        # Rectangles are positioned by their centre, relative to first start
        data["x"] = [w / 2 + s - left for (w, s) in zip(width, starts)]
        data["y"] = [id_lk[i] + 1 for i in ids]
        data["function"] = funcs = [pprint_task(i, dsk, label_size) for i in tasks]
        data["color"] = get_colors(palette, funcs)
        data["key"] = [str(i) for i in keys]

        source = bp.ColumnDataSource(data=data)

        p.rect(
            source=source,
            x="x",
            y="y",
            height=1,
            width="width",
            color="color",
            line_color="gray",
        )
    else:
        # Nothing was recorded: draw an empty placeholder plot
        p = bp.figure(y_range=[str(i) for i in range(8)], x_range=[0, 10], **defaults)
    p.grid.grid_line_color = None
    p.axis.axis_line_color = None
    p.axis.major_tick_line_color = None
    p.yaxis.axis_label = "Worker ID"
    p.xaxis.axis_label = "Time (s)"

    hover = p.select(HoverTool)
    # HTML tooltip template; @key and @function name columns of ``data``
    hover.tooltips = (
        ' <div>'
        ' <span style="font-size: 14px; font-weight: bold;">Key:</span>&nbsp;'
        ' <span style="font-size: 10px; font-family: Monaco, monospace;">@key</span>'
        ' </div>'
        ' <div>'
        ' <span style="font-size: 14px; font-weight: bold;">Task:</span>&nbsp;'
        ' <span style="font-size: 10px; font-family: Monaco, monospace;">@function</span>'
        ' </div> '
    )
    hover.point_policy = "follow_mouse"
    return p


def plot_resources(results, palette="Viridis", **kwargs):
    """Plot resource usage in a bokeh plot.

    CPU usage is drawn against the left axis and memory usage against an
    additional axis on the right.

    Parameters
    ----------
    results : sequence
        Output of ResourceProfiler.results
    palette : string, optional
        Name of the bokeh palette to use, must be a member of
        bokeh.palettes.all_palettes.
    **kwargs
        Other keyword arguments, passed to bokeh.figure. These will override
        all defaults set by plot_resources.

    Returns
    -------
    The completed bokeh plot object.
    """
    bp = import_required("bokeh.plotting", _BOKEH_MISSING_MSG)
    import bokeh
    from bokeh import palettes
    from bokeh.models import LinearAxis, Range1d

    defaults = _merge_figure_kwargs(
        dict(
            title="Profile Results",
            tools="save,reset,xwheel_zoom,xpan",
            toolbar_location="above",
            width=800,
            height=300,
        ),
        kwargs,
    )

    if results:
        t, mem, cpu = zip(*results)
        left, right = min(t), max(t)
        t = [i - left for i in t]
        p = bp.figure(
            y_range=fix_bounds(0, max(cpu), 100),
            x_range=fix_bounds(0, right - left, 1),
            **defaults
        )
    else:
        t = mem = cpu = []
        p = bp.figure(y_range=(0, 100), x_range=(0, 1), **defaults)
    colors = palettes.all_palettes[palette][6]
    # The legend keyword passed to ``line`` depends on the bokeh version
    legend_kw = (
        "legend_label" if LooseVersion(bokeh.__version__) >= "1.4" else "legend"
    )
    p.line(t, cpu, color=colors[0], line_width=4, **{legend_kw: "% CPU"})
    p.yaxis.axis_label = "% CPU"
    p.extra_y_ranges = {
        "memory": Range1d(
            *fix_bounds(min(mem) if mem else 0, max(mem) if mem else 100, 100)
        )
    }
    p.line(
        t,
        mem,
        color=colors[2],
        y_range_name="memory",
        line_width=4,
        **{legend_kw: "Memory"}
    )
    p.add_layout(LinearAxis(y_range_name="memory", axis_label="Memory (MB)"), "right")
    p.xaxis.axis_label = "Time (s)"
    return p


def fix_bounds(start, end, min_span):
    """Adjust end point to ensure span of at least `min_span`.

    Returns the pair ``(start, end)`` with ``end`` raised to
    ``start + min_span`` when it would otherwise be smaller.
    """
    return start, max(end, start + min_span)


def plot_cache(
    results, dsk, start_time, metric_name, palette="Viridis", label_size=60, **kwargs
):
    """Visualize the results of profiling in a bokeh plot.

    One line is drawn per task label, showing the running total of the
    cache metric held by tasks with that label over time.

    Parameters
    ----------
    results : sequence
        Output of CacheProfiler.results
    dsk : dict
        The dask graph being profiled.
    start_time : float
        Start time of the profile.
    metric_name : string
        Metric used to measure cache size
    palette : string, optional
        Name of the bokeh palette to use, must be a member of
        bokeh.palettes.all_palettes.
    label_size: int (optional)
        Maximum size of output labels in plot, defaults to 60
    **kwargs
        Other keyword arguments, passed to bokeh.figure. These will override
        all defaults set by plot_cache.

    Returns
    -------
    The completed bokeh plot object.
    """
    bp = import_required("bokeh.plotting", _BOKEH_MISSING_MSG)
    from bokeh.models import HoverTool

    defaults = _merge_figure_kwargs(
        dict(
            title="Profile Results",
            tools="hover,save,reset,wheel_zoom,xpan",
            toolbar_location="above",
            width=800,
            height=300,
        ),
        kwargs,
    )

    if results:
        # The last two fields of each result are the cache and free times
        starts, ends = list(zip(*results))[3:]
        tics = sorted(unique(starts + ends))
        groups = groupby(lambda d: pprint_task(d[1], dsk, label_size), results)
        data = {}
        for k, vals in groups.items():
            # Net change of the metric at every tic, then a running total
            cnts = dict.fromkeys(tics, 0)
            for v in vals:
                cnts[v.cache_time] += v.metric
                cnts[v.free_time] -= v.metric
            data[k] = [0] + list(accumulate(add, pluck(1, sorted(cnts.items()))))

        tics = [0] + [i - start_time for i in tics]
        p = bp.figure(x_range=[0, max(tics)], **defaults)

        for (key, val), color in zip(data.items(), get_colors(palette, data.keys())):
            p.line(
                "x",
                "y",
                line_color=color,
                line_width=3,
                source=bp.ColumnDataSource(
                    {"x": tics, "y": val, "label": [key] * len(val)}
                ),
            )
    else:
        # Nothing was recorded: draw an empty placeholder plot
        p = bp.figure(y_range=[0, 10], x_range=[0, 10], **defaults)
    p.yaxis.axis_label = "Cache Size ({0})".format(metric_name)
    p.xaxis.axis_label = "Time (s)"

    hover = p.select(HoverTool)
    # HTML tooltip template; @label names the column set on each line source
    hover.tooltips = (
        ' <div>'
        ' <span style="font-size: 14px; font-weight: bold;">Task:</span>&nbsp;'
        ' <span style="font-size: 10px; font-family: Monaco, monospace;">@label</span>'
        ' </div> '
    )
    return p