Joins

DataFrame joins are a common and expensive computation that benefit from a variety of optimizations in different situations. Understanding how your data is laid out and what you’re trying to accomplish can have a large impact on performance. This documentation page goes through the various different options and their performance impacts.

Large to Large Unsorted Joins

In the worst case scenario you have two large tables with many partitions each and you want to join them both along a column that may not be sorted.

This can be slow. In this case Dask DataFrame will need to move all of your data around so that rows with matching values in the joining columns are in the same partition. This large-scale movement can create communication costs, and can require a large amount of memory. If enough memory can not be found then Dask will have to read and write data to disk, which may cause other performance costs.

These problems are solvable, but will be significantly slower than many other operations. They are best avoided if possible.

Large to Small Joins

Many join or merge computations combine a large table with one small one. If the small table is either a single partition Dask DataFrame or even just a normal Pandas DataFrame then the computation can proceed in an embarrassingly parallel way, where each partition of the large DataFrame is joined against the single small table. This incurs almost no overhead relative to Pandas joins.

If your smaller table can easily fit in memory, then you might want to ensure that it is a single partition with the repartition method.

import dask
large = dask.datasets.timeseries(freq="10s", npartitions=10)
small = dask.datasets.timeseries(freq="1D", dtypes={"z": int})

small = small.repartition(npartitions=1)
result = large.merge(small, how="left", on=["timestamp"])

Sorted Joins

The Pandas merge API supports the left_index= and right_index= options to perform joins on the index. For Dask DataFrames these keyword options hold special significance if the index has known divisions (see Partitions). In this case the DataFrame partitions are aligned along these divisions (which is generally fast) and then an embarrassingly parallel Pandas join happens across partition pairs. This is generally relatively fast.

Sorted or indexed joins are a good solution to the large-large join problem. If you plan to join against a dataset repeatedly then it may be worthwhile to set the index ahead of time, and possibly store the data in a format that maintains that index, like Parquet.

import dask
import dask.dataframe as dd

left = dask.datasets.timeseries(dtypes={"foo": int})

# timeseries returns a dataframe indexed by
# timestamp, we don't need to set_index.

# left.set_index("timestamp")

left.to_parquet("left", overwrite=True)
left = dd.read_parquet("left")

# If the dataframe can fit in RAM, you can also use persist

# left = left.persist()

right_one = dask.datasets.timeseries(dtypes={"bar": int})
right_two = dask.datasets.timeseries(dtypes={"baz": int})

result = left.merge(
    right_one, how="left", left_index=True, right_index=True)
result = result.merge(
    right_two, how="left", left_index=True, right_index=True)