# Source code for dask.dataframe.io.csv

from __future__ import annotations

import os
from collections.abc import Mapping
from io import BytesIO
from warnings import catch_warnings, simplefilter, warn

from dask._task_spec import convert_legacy_task

try:
    import psutil
except ImportError:
    psutil = None  # type: ignore

import numpy as np
import pandas as pd
from fsspec.compression import compr
from fsspec.core import get_fs_token_paths
from fsspec.core import open as open_file
from fsspec.core import open_files
from fsspec.utils import infer_compression
from pandas.api.types import (
    CategoricalDtype,
    is_datetime64_any_dtype,
    is_float_dtype,
    is_integer_dtype,
    is_object_dtype,
)

from dask.base import tokenize
from dask.bytes import read_bytes
from dask.core import flatten
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.io.io import from_map
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.dataframe.utils import clear_known_categories
from dask.delayed import delayed
from dask.utils import asciitable, parse_bytes


class CSVFunctionWrapper(DataFrameIOFunction):
    """
    CSV Function-Wrapper Class
    Reads CSV data from disk to produce a partition (given a part descriptor).
    """

    def __init__(
        self,
        full_columns,
        columns,
        colname,
        head,
        header,
        reader,
        dtypes,
        enforce,
        kwargs,
    ):
        self.full_columns = full_columns
        self._columns = columns
        self.colname = colname
        self.head = head
        self.header = header
        self.reader = reader
        self.dtypes = dtypes
        self.enforce = enforce
        self.kwargs = kwargs

    @property
    def columns(self):
        if self._columns is None:
            return self.full_columns
        if self.colname:
            return self._columns + [self.colname]
        return self._columns

    def project_columns(self, columns):
        """Return a new CSVFunctionWrapper object with
        a sub-column projection.
        """
        # Make sure columns is ordered correctly
        columns = [c for c in self.head.columns if c in columns]
        if columns == self.columns:
            return self
        if self.colname and self.colname not in columns:
            # when path-as-column is enabled, the path column must be kept
            # at the IO level, whatever the column selection
            head = self.head[columns + [self.colname]]
        else:
            head = self.head[columns]
        return CSVFunctionWrapper(
            self.full_columns,
            columns,
            self.colname,
            head,
            self.header,
            self.reader,
            {c: self.dtypes[c] for c in columns},
            self.enforce,
            self.kwargs,
        )

    def __call__(self, part):
        # Part will be a 4-element tuple
        block, path, is_first, is_last = part

        # Construct `path_info`
        if path is not None:
            path_info = (
                self.colname,
                path,
                sorted(list(self.head[self.colname].cat.categories)),
            )
        else:
            path_info = None

        # Deal with arguments that are special
        # for the first block of each file
        write_header = False
        rest_kwargs = self.kwargs.copy()
        if not is_first:
            if rest_kwargs.get("names", None) is None:
                write_header = True
            rest_kwargs.pop("skiprows", None)
            if rest_kwargs.get("header", 0) is not None:
                rest_kwargs.pop("header", None)
        if not is_last:
            rest_kwargs.pop("skipfooter", None)

        # Deal with column projection
        columns = self.full_columns
        project_after_read = False
        if self._columns is not None:
            if self.kwargs:
                # To be safe, if any kwargs are defined, avoid
                # changing `usecols` here. Instead, we can just
                # select columns after the read
                project_after_read = True
            else:
                columns = self._columns
                rest_kwargs["usecols"] = columns

        # Call `pandas_read_text`
        df = pandas_read_text(
            self.reader,
            block,
            self.header,
            rest_kwargs,
            self.dtypes,
            columns,
            write_header,
            self.enforce,
            path_info,
        )
        if project_after_read:
            return df[self.columns]
        return df
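
# Illustrative sketch (hypothetical values, not part of the library): column
# projection returns a narrowed wrapper whose ``columns`` reflect the selection.
#
#     >>> head = pd.DataFrame({"a": [1], "b": [2], "c": [3]})
#     >>> wrapper = CSVFunctionWrapper(
#     ...     ["a", "b", "c"], None, None, head, b"a,b,c\n",
#     ...     pd.read_csv, head.dtypes.to_dict(), False, {},
#     ... )
#     >>> wrapper.project_columns(["a"]).columns
#     ['a']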


def pandas_read_text(
    reader,
    b,
    header,
    kwargs,
    dtypes=None,
    columns=None,
    write_header=True,
    enforce=False,
    path=None,
):
    """Convert a block of bytes to a Pandas DataFrame

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    b : bytestring
        The content to be parsed with ``reader``
    header : bytestring
        An optional header to prepend to ``b``
    kwargs : dict
        A dictionary of keyword arguments to be passed to ``reader``
    dtypes : dict
        dtypes to assign to columns
    path : tuple
        A tuple containing path column name, path to file, and an ordered list of paths.

    See Also
    --------
    dask.dataframe.csv.read_pandas_from_bytes
    """
    bio = BytesIO()
    if write_header and not b.startswith(header.rstrip()):
        bio.write(header)
    bio.write(b)
    bio.seek(0)
    df = reader(bio, **kwargs)
    if dtypes:
        coerce_dtypes(df, dtypes)

    if enforce and columns and (list(df.columns) != list(columns)):
        raise ValueError("Columns do not match", df.columns, columns)
    if path:
        colname, path, paths = path
        code = paths.index(path)
        df = df.assign(
            **{colname: pd.Categorical.from_codes(np.full(len(df), code), paths)}
        )
    return df
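
# A minimal sketch of what ``pandas_read_text`` does with a non-first block
# (bytes shown are hypothetical): the shared header line is prepended before
# the bytes are handed to pandas.
#
#     >>> df = pandas_read_text(pd.read_csv, b"1,2\n3,4\n", b"x,y\n", {})
#     >>> list(df.columns)
#     ['x', 'y']
#     >>> len(df)
#     2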


def coerce_dtypes(df, dtypes):
    """Coerce dataframe to dtypes safely

    Operates in place

    Parameters
    ----------
    df: Pandas DataFrame
    dtypes: dict like {'x': float}
    """
    bad_dtypes = []
    bad_dates = []
    errors = []
    for c in df.columns:
        if c in dtypes and df.dtypes[c] != dtypes[c]:
            actual = df.dtypes[c]
            desired = dtypes[c]
            if is_float_dtype(actual) and is_integer_dtype(desired):
                bad_dtypes.append((c, actual, desired))
            elif is_object_dtype(actual) and is_datetime64_any_dtype(desired):
                # This can only occur when parse_dates is specified, but an
                # invalid date is encountered. Pandas then silently falls back
                # to object dtype. Since `object_array.astype(datetime)` will
                # silently overflow, error here and report.
                bad_dates.append(c)
            else:
                try:
                    df[c] = df[c].astype(dtypes[c])
                except Exception as e:
                    bad_dtypes.append((c, actual, desired))
                    errors.append((c, e))

    if bad_dtypes:
        if errors:
            ex = "\n".join(
                f"- {c}\n  {e!r}" for c, e in sorted(errors, key=lambda x: str(x[0]))
            )
            exceptions = (
                "The following columns also raised exceptions on "
                "conversion:\n\n%s\n\n"
            ) % ex
            extra = ""
        else:
            exceptions = ""
            # All mismatches are int->float, also suggest `assume_missing=True`
            extra = (
                "\n\nAlternatively, provide `assume_missing=True` "
                "to interpret\n"
                "all unspecified integer columns as floats."
            )

        bad_dtypes = sorted(bad_dtypes, key=lambda x: str(x[0]))
        table = asciitable(["Column", "Found", "Expected"], bad_dtypes)
        dtype_kw = "dtype={%s}" % ",\n       ".join(
            f"{k!r}: '{v}'" for (k, v, _) in bad_dtypes
        )

        dtype_msg = (
            "{table}\n\n"
            "{exceptions}"
            "Usually this is due to dask's dtype inference failing, and\n"
            "*may* be fixed by specifying dtypes manually by adding:\n\n"
            "{dtype_kw}\n\n"
            "to the call to `read_csv`/`read_table`."
            "{extra}"
        ).format(table=table, exceptions=exceptions, dtype_kw=dtype_kw, extra=extra)
    else:
        dtype_msg = None

    if bad_dates:
        also = " also " if bad_dtypes else " "
        cols = "\n".join("- %s" % c for c in bad_dates)
        date_msg = (
            "The following columns{also}failed to properly parse as dates:\n\n"
            "{cols}\n\n"
            "This is usually due to an invalid value in that column. To\n"
            "diagnose and fix it's recommended to drop these columns from the\n"
            "`parse_dates` keyword, and manually convert them to dates later\n"
            "using `dd.to_datetime`."
        ).format(also=also, cols=cols)
    else:
        date_msg = None

    if bad_dtypes or bad_dates:
        rule = "\n\n%s\n\n" % ("-" * 61)
        msg = "Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\n\n%s" % (
            rule.join(filter(None, [dtype_msg, date_msg]))
        )
        raise ValueError(msg)
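
# A minimal sketch of the failure mode this guards against (values are
# hypothetical): a partition parsed as float (e.g. because it contains a NaN)
# does not match the int64 dtype inferred from the sample, so a descriptive
# error is raised instead of silently casting.
#
#     >>> frame = pd.DataFrame({"x": [1.0, np.nan]})
#     >>> coerce_dtypes(frame, {"x": np.dtype("int64")})  # doctest: +SKIP
#     Traceback (most recent call last):
#         ...
#     ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`. ...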


def text_blocks_to_pandas(
    reader,
    block_lists,
    header,
    head,
    kwargs,
    enforce=False,
    specified_dtypes=None,
    path=None,
    blocksize=None,
    urlpath=None,
):
    """Convert blocks of bytes to a dask.dataframe

    This accepts a list of lists of bytes values, where each inner list
    corresponds to one file and its bytes values concatenate, in order, to
    comprise the entire file.

    Parameters
    ----------
    reader : callable
        ``pd.read_csv`` or ``pd.read_table``.
    block_lists : list of lists of delayed values of bytes
        The lists of bytestrings where each list corresponds to one logical file
    header : bytestring
        The header, found at the front of the first file, to be prepended to
        all blocks
    head : pd.DataFrame
        An example Pandas DataFrame to be used for metadata.
    kwargs : dict
        Keyword arguments to pass down to ``reader``
    path : tuple, optional
        A tuple containing column name for path and the path_converter if provided

    Returns
    -------
    A dask.dataframe
    """
    dtypes = head.dtypes.to_dict()
    # `dtypes` may contain instances of CategoricalDtype, which cause issues
    # in coerce_dtypes when categories are not uniform across partitions.
    # We therefore modify the inferred `dtypes` to
    # 1. keep CategoricalDtype instances for user-provided types
    # 2. use 'category' for inferred categorical columns
    categoricals = head.select_dtypes(include=["category"]).columns

    if isinstance(specified_dtypes, Mapping):
        known_categoricals = [
            k
            for k in categoricals
            if isinstance(specified_dtypes.get(k), CategoricalDtype)
            and specified_dtypes.get(k).categories is not None
        ]
        unknown_categoricals = categoricals.difference(known_categoricals)
    else:
        unknown_categoricals = categoricals

    # Fixup the dtypes
    for k in unknown_categoricals:
        dtypes[k] = "category"

    columns = list(head.columns)

    blocks = tuple(flatten(block_lists))
    # Create mask of first blocks from nested block_lists
    is_first = tuple(block_mask(block_lists))
    is_last = tuple(block_mask_last(block_lists))

    if path:
        colname, path_converter = path
        paths = [b.args[0].path for b in blocks]
        if path_converter:
            paths = [path_converter(p) for p in paths]
        head = head.assign(
            **{
                colname: pd.Categorical.from_codes(
                    np.zeros(len(head), dtype=int), set(paths)
                )
            }
        )
        path = (colname, paths)

    if len(unknown_categoricals):
        head = clear_known_categories(head, cols=unknown_categoricals)

    # Define parts
    parts = []
    colname, paths = path or (None, None)
    for i in range(len(blocks)):
        parts.append([blocks[i], paths[i] if paths else None, is_first[i], is_last[i]])
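    # Illustrative shape of ``parts`` (block objects are hypothetical): two
    # files split into 2 and 1 byte-blocks would produce
    #     [[block0, "a.csv", True, False],
    #      [block1, "a.csv", False, True],
    #      [block2, "b.csv", True, True]]
    # with None in place of the path when include_path_column is not used.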

    # Construct the output collection with from_map
    return from_map(
        CSVFunctionWrapper(
            columns,
            None,
            colname,
            head,
            header,
            reader,
            dtypes,
            enforce,
            kwargs,
        ),
        parts,
        meta=head,
        label="read-csv",
        token=tokenize(reader, urlpath, columns, enforce, head, blocksize),
        enforce_metadata=False,
        produces_tasks=True,
    )


def block_mask(block_lists):
    """
    Yields a flat iterable of booleans to mark the zeroth elements of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask([[1, 2], [3, 4], [5]]))
    [True, False, True, False, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield True
        yield from (False for _ in block[1:])


def block_mask_last(block_lists):
    """
    Yields a flat iterable of booleans to mark the last element of the
    nested input ``block_lists`` in a flattened output.

    >>> list(block_mask_last([[1, 2], [3, 4], [5]]))
    [False, True, False, True, True]
    """
    for block in block_lists:
        if not block:
            continue
        yield from (False for _ in block[:-1])
        yield True
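
# A minimal sketch pairing the two masks: each flattened block gets an
# (is_first, is_last) pair marking the boundaries of its file.
#
#     >>> list(zip(block_mask([[1, 2], [3]]), block_mask_last([[1, 2], [3]])))
#     [(True, False), (False, True), (True, True)]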


def auto_blocksize(total_memory, cpu_count):
    memory_factor = 10
    blocksize = int(total_memory // cpu_count / memory_factor)
    return min(blocksize, int(64e6))
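
# Worked example (hypothetical machine): auto_blocksize(16e9, 8) computes
# int(16e9 // 8 / 10) == 200_000_000 and returns min(200_000_000, int(64e6)),
# so the 64 MB ceiling applies on most modern machines.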


def _infer_block_size():
    default = 2**25
    if psutil is not None:
        with catch_warnings():
            simplefilter("ignore", RuntimeWarning)
            mem = psutil.virtual_memory().total
            cpu = psutil.cpu_count()

        if mem and cpu:
            return auto_blocksize(mem, cpu)

    return default


# Guess the blocksize if psutil is installed, otherwise use an acceptable default
AUTO_BLOCKSIZE = _infer_block_size()


def read_pandas(
    reader,
    urlpath,
    blocksize="default",
    lineterminator=None,
    compression="infer",
    sample=256000,
    sample_rows=10,
    enforce=False,
    assume_missing=False,
    storage_options=None,
    include_path_column=False,
    **kwargs,
):
    reader_name = reader.__name__
    if lineterminator is not None and len(lineterminator) == 1:
        kwargs["lineterminator"] = lineterminator
    else:
        lineterminator = "\n"
    if "encoding" in kwargs:
        b_lineterminator = lineterminator.encode(kwargs["encoding"])
        empty_blob = "".encode(kwargs["encoding"])
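        # For example, with encoding="utf-16", "".encode("utf-16") is b"\xff\xfe"
        # (just the BOM) and "\n".encode("utf-16") is b"\xff\xfe\n\x00"; stripping
        # the BOM leaves b"\n\x00" as the byte delimiter passed to read_bytes.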
        if empty_blob:
            # This encoding starts with a Byte Order Mark (BOM), so strip that from the
            # start of the line terminator, since this value is not a full file.
            b_lineterminator = b_lineterminator[len(empty_blob) :]
    else:
        b_lineterminator = lineterminator.encode()
    if include_path_column and isinstance(include_path_column, bool):
        include_path_column = "path"
    if "index" in kwargs or (
        "index_col" in kwargs and kwargs.get("index_col") is not False
    ):
        raise ValueError(
            "Keywords 'index' and 'index_col' not supported, except for "
            f"'index_col=False'. Use dd.{reader_name}(...).set_index('my-index') instead"
        )
    for kw in ["iterator", "chunksize"]:
        if kw in kwargs:
            raise ValueError(f"{kw} not supported for dd.{reader_name}")
    if kwargs.get("nrows", None):
        raise ValueError(
            "The 'nrows' keyword is not supported by "
            "`dd.{0}`. To achieve the same behavior, it's "
            "recommended to use `dd.{0}(...)."
            "head(n=nrows)`".format(reader_name)
        )
    if isinstance(kwargs.get("skiprows"), int):
        lastskiprow = firstrow = kwargs.get("skiprows")
    elif kwargs.get("skiprows") is None:
        lastskiprow = firstrow = 0
    else:
        # When skiprows is a list, we expect more than max(skiprows) to
        # be included in the sample. This means that [0,2] will work well,
        # but [0, 440] might not work.
        skiprows = set(kwargs.get("skiprows"))
        lastskiprow = max(skiprows)
        # find the firstrow that is not skipped, for use as header
        firstrow = min(set(range(len(skiprows) + 1)) - set(skiprows))
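        # Worked example: skiprows=[0, 2] gives lastskiprow=2 and
        # firstrow=min({0, 1, 2} - {0, 2})=1, so row 1 is the first
        # non-skipped row (used later for the header).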
    if isinstance(kwargs.get("header"), list):
        raise TypeError(f"List of header rows not supported for dd.{reader_name}")
    if isinstance(kwargs.get("converters"), dict) and include_path_column:
        path_converter = kwargs.get("converters").get(include_path_column, None)
    else:
        path_converter = None

    # If compression is "infer", inspect the (first) path suffix and
    # set the proper compression option if the suffix is recognized.
    if compression == "infer":
        # Translate the input urlpath to a simple path list
        paths = get_fs_token_paths(urlpath, mode="rb", storage_options=storage_options)[
            2
        ]

        # Check for at least one valid path
        if len(paths) == 0:
            raise OSError(f"{urlpath} resolved to no files")

        # Infer compression from first path
        compression = infer_compression(paths[0])

    if blocksize == "default":
        blocksize = AUTO_BLOCKSIZE
    if isinstance(blocksize, str):
        blocksize = parse_bytes(blocksize)
    if blocksize and compression:
        # NONE of the compressions should use chunking
        warn(
            "Warning %s compression does not support breaking apart files\n"
            "Please ensure that each individual file can fit in memory and\n"
            "use the keyword ``blocksize=None to remove this message``\n"
            "Setting ``blocksize=None``" % compression
        )
        blocksize = None
    if compression not in compr:
        raise NotImplementedError("Compression format %s not installed" % compression)
    if blocksize and sample and blocksize < sample and lastskiprow != 0:
        warn(
            "Unexpected behavior can result from passing skiprows when\n"
            "blocksize is smaller than sample size.\n"
            "Setting ``sample=blocksize``"
        )
        sample = blocksize
    b_out = read_bytes(
        urlpath,
        delimiter=b_lineterminator,
        blocksize=blocksize,
        sample=sample,
        compression=compression,
        include_path=include_path_column,
        **(storage_options or {}),
    )

    if include_path_column:
        b_sample, values, paths = b_out
        path = (include_path_column, path_converter)
    else:
        b_sample, values = b_out
        path = None

    if not isinstance(values[0], (tuple, list)):
        values = [values]
    # If we have not sampled, then use the first block of the first file
    # as a representative sample.
    if b_sample is False and len(values[0]):
        b_sample = values[0][0].compute()

    # Get header row, and check that sample is long enough. If the file
    # contains a header row, we need at least 2 nonempty rows + the number of
    # rows to skip.
    names = kwargs.get("names", None)
    header = kwargs.get("header", "infer" if names is None else None)
    need = 1 if header is None else 2
    if isinstance(header, int):
        firstrow += header
    if kwargs.get("comment"):
        # if comment is provided, step through lines of b_sample and strip out comments
        parts = []
        for part in b_sample.split(b_lineterminator):
            split_comment = part.decode().split(kwargs.get("comment"))
            if len(split_comment) > 1:
                # if line starts with comment, don't include that line in parts.
                if len(split_comment[0]) > 0:
                    parts.append(split_comment[0].strip().encode())
            else:
                parts.append(part)
            if len(parts) > need:
                break
    else:
        parts = b_sample.split(
            b_lineterminator, max(lastskiprow + need, firstrow + need)
        )

    # If the last partition is empty, don't count it
    nparts = 0 if not parts else len(parts) - int(not parts[-1])

    if sample is not False and nparts < lastskiprow + need and len(b_sample) >= sample:
        raise ValueError(
            "Sample is not large enough to include at least one "
            "row of data. Please increase the number of bytes "
            "in `sample` in the call to `read_csv`/`read_table`"
        )

    header = b"" if header is None else parts[firstrow] + b_lineterminator

    # Use sample to infer dtypes and check for presence of include_path_column
    head_kwargs = kwargs.copy()
    head_kwargs.pop("skipfooter", None)
    if head_kwargs.get("engine") == "pyarrow":
        # Use c engine to infer since Arrow engine does not support nrows
        head_kwargs["engine"] = "c"
    try:
        head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)
    except pd.errors.ParserError as e:
        if "EOF" in str(e):
            raise ValueError(
                "EOF encountered while reading header. \n"
                "Pass argument `sample_rows` and make sure the value of `sample` "
                "is large enough to accommodate that many rows of data"
            ) from e
        raise
    if include_path_column and (include_path_column in head.columns):
        raise ValueError(
            "Files already contain the column name: %s, so the "
            "path column cannot use this name. Please set "
            "`include_path_column` to a unique name." % include_path_column
        )

    specified_dtypes = kwargs.get("dtype", {})
    if specified_dtypes is None:
        specified_dtypes = {}
    # If specified_dtypes is a single type, then all columns were specified
    if assume_missing and isinstance(specified_dtypes, dict):
        # Convert all non-specified integer columns to floats
        for c in head.columns:
            if is_integer_dtype(head[c].dtype) and c not in specified_dtypes:
                head[c] = head[c].astype(float)

    values = [
        [
            [convert_legacy_task(k, ts, {}) for k, ts in dsk.dask.items()]
            for dsk in block
        ]
        for block in values
    ]

    return text_blocks_to_pandas(
        reader,
        values,
        header,
        head,
        kwargs,
        enforce=enforce,
        specified_dtypes=specified_dtypes,
        path=path,
        blocksize=blocksize,
        urlpath=urlpath,
    )


READ_DOC_TEMPLATE = """
Read {file_type} files into a Dask.DataFrame

This parallelizes the :func:`pandas.{reader}` function in the following ways:

- It supports loading many files at once using globstrings:

    >>> df = dd.{reader}('myfiles.*.csv')  # doctest: +SKIP

- In some cases it can break up large files:

    >>> df = dd.{reader}('largefile.csv', blocksize=25e6)  # 25MB chunks  # doctest: +SKIP

- It can read CSV files from external resources (e.g. S3, HDFS) by
  providing a URL:

    >>> df = dd.{reader}('s3://bucket/myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs:///myfiles.*.csv')  # doctest: +SKIP
    >>> df = dd.{reader}('hdfs://namenode.example.com/myfiles.*.csv')  # doctest: +SKIP

Internally ``dd.{reader}`` uses :func:`pandas.{reader}` and supports many of the
same keyword arguments with the same performance guarantees. See the docstring
for :func:`pandas.{reader}` for more information on available keyword arguments.

Parameters
----------
urlpath : string or list
    Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
    to read from alternative filesystems. To read from multiple files you
    can pass a globstring or a list of paths, with the caveat that they
    must all have the same protocol.
blocksize : str, int or None, optional
    Number of bytes by which to cut up larger files. Default value is computed
    based on available physical memory and the number of cores, up to a maximum
    of 64MB. Can be a number like ``64000000`` or a string like ``"64MB"``. If
    ``None``, a single block is used for each file.
sample : int, optional
    Number of bytes to use when determining dtypes
assume_missing : bool, optional
    If True, all integer columns that aren't specified in ``dtype`` are assumed
    to contain missing values, and are converted to floats. Default is False.
storage_options : dict, optional
    Extra options that make sense for a particular storage connection, e.g.
    host, port, username, password, etc.
include_path_column : bool or str, optional
    Whether or not to include the path to each particular file. If True a new
    column is added to the dataframe called ``path``. If str, sets new column
    name. Default is False.
**kwargs
    Extra keyword arguments to forward to :func:`pandas.{reader}`.

Notes
-----
Dask dataframe tries to infer the ``dtype`` of each column by reading a sample
from the start of the file (or of the first file if it's a glob). Usually this
works fine, but if the ``dtype`` is different later in the file (or in other
files) this can cause issues. For example, if all the rows in the sample had
integer dtypes, but later on there was a ``NaN``, then this would error at
compute time. To fix this, you have a few options:

- Provide explicit dtypes for the offending columns using the ``dtype``
  keyword. This is the recommended solution.

- Use the ``assume_missing`` keyword to assume that all columns inferred as
  integers contain missing values, and convert them to floats.

- Increase the size of the sample using the ``sample`` keyword.

It should also be noted that this function may fail if a {file_type} file
includes quoted strings that contain the line terminator. To get around this
you can specify ``blocksize=None`` to not split files into multiple partitions,
at the cost of reduced parallelism.
"""


def make_reader(reader, reader_name, file_type):
    def read(
        urlpath,
        blocksize="default",
        lineterminator=None,
        compression="infer",
        sample=256000,
        sample_rows=10,
        enforce=False,
        assume_missing=False,
        storage_options=None,
        include_path_column=False,
        **kwargs,
    ):
        return read_pandas(
            reader,
            urlpath,
            blocksize=blocksize,
            lineterminator=lineterminator,
            compression=compression,
            sample=sample,
            sample_rows=sample_rows,
            enforce=enforce,
            assume_missing=assume_missing,
            storage_options=storage_options,
            include_path_column=include_path_column,
            **kwargs,
        )

    read.__doc__ = READ_DOC_TEMPLATE.format(reader=reader_name, file_type=file_type)
    read.__name__ = reader_name
    return read


read_csv = dataframe_creation_dispatch.register_inplace(
    backend="pandas",
    name="read_csv",
)(make_reader(pd.read_csv, "read_csv", "CSV"))


read_table = make_reader(pd.read_table, "read_table", "delimited")
read_fwf = make_reader(pd.read_fwf, "read_fwf", "fixed-width")
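
# Hedged usage sketch (file pattern and column names are hypothetical):
#
#     >>> import dask.dataframe as dd
#     >>> ddf = dd.read_csv(
#     ...     "data/2024-*.csv",
#     ...     blocksize="64MB",
#     ...     dtype={"user_id": "int64", "amount": "float64"},
#     ...     include_path_column="source_file",
#     ... )  # doctest: +SKIP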


def _write_csv(df, fil, *, depend_on=None, **kwargs):
    with fil as f:
        df.to_csv(f, **kwargs)
    return os.path.normpath(fil.path)


def to_csv(
    df,
    filename,
    single_file=False,
    encoding="utf-8",
    mode="wt",
    name_function=None,
    compression=None,
    compute=True,
    scheduler=None,
    storage_options=None,
    header_first_partition_only=None,
    compute_kwargs=None,
    **kwargs,
):
    """
    Store Dask DataFrame to CSV files

    One filename per partition will be created. You can specify the
    filenames in a variety of ways.

    Use a globstring::

    >>> df.to_csv('/path/to/data/export-*.csv')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 0, 1, 2, ...

    ::

        /path/to/data/export-0.csv
        /path/to/data/export-1.csv

    Use a globstring and a ``name_function=`` keyword argument.  The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'
    >>> name(15)
    '2015-01-16'

    >>> df.to_csv('/path/to/data/export-*.csv', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/export-2015-01-01.csv
        /path/to/data/export-2015-01-02.csv
        ...

    You can also provide an explicit list of paths::

    >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...]  # doctest: +SKIP
    >>> df.to_csv(paths) # doctest: +SKIP

    You can also provide a directory name:

    >>> df.to_csv('/path/to/data') # doctest: +SKIP

    The files will be numbered 0, 1, 2, (and so on) suffixed with '.part':

    ::

        /path/to/data/0.part
        /path/to/data/1.part

    Parameters
    ----------
    df : dask.DataFrame
        Data to save
    filename : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to save to remote filesystems.
    single_file : bool, default False
        Whether to save everything into a single CSV file. Under the
        single file mode, each partition is appended at the end of the
        specified CSV file.
    encoding : string, default 'utf-8'
        A string representing the encoding to use in the output file.
    mode : str, default 'w'
        Python file mode. The default is 'w' (or 'wt'), for writing
        a new file or overwriting an existing file in text mode. 'a'
        (or 'at') will append to an existing file in text mode or
        create a new file if it does not already exist. See :py:func:`open`.
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions. Not
        supported when ``single_file`` is True.
    compression : string, optional
        A string representing the compression to use in the output file,
        allowed values are 'gzip', 'bz2', 'xz',
        only used when the first argument is a filename.
    compute : bool, default True
        If True, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    storage_options : dict
        Parameters passed on to the backend filesystem class.
    header_first_partition_only : bool, default None
        If set to True, only write the header row in the first output
        file. By default, headers are written to all partitions under
        the multiple file mode (``single_file`` is False) and written
        only once under the single file mode (``single_file`` is True).
        It must be True under the single file mode.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    kwargs : dict, optional
        Additional parameters to pass to :meth:`pandas.DataFrame.to_csv`.

    Returns
    -------
    The names of the files written if they were computed right away.
    If not, the delayed tasks associated with writing the files.

    Raises
    ------
    ValueError
        If ``header_first_partition_only`` is set to False or
        ``name_function`` is specified when ``single_file`` is True.

    See Also
    --------
    fsspec.open_files
    """
    if single_file and name_function is not None:
        raise ValueError("name_function is not supported under the single file mode")
    if header_first_partition_only is None:
        header_first_partition_only = single_file
    elif not header_first_partition_only and single_file:
        raise ValueError(
            "header_first_partition_only cannot be False in the single file mode."
        )
    file_options = dict(
        compression=compression,
        encoding=encoding,
        newline="",
        **(storage_options or {}),
    )
    to_csv_chunk = delayed(_write_csv, pure=False)
    dfs = df.to_delayed()
    if single_file:
        first_file = open_file(filename, mode=mode, **file_options)
        value = to_csv_chunk(dfs[0], first_file, **kwargs)
        append_mode = mode if "a" in mode else mode + "a"
        append_mode = append_mode.replace("w", "").replace("x", "")
        append_file = open_file(filename, mode=append_mode, **file_options)
        kwargs["header"] = False
        for d in dfs[1:]:
            value = to_csv_chunk(d, append_file, depend_on=value, **kwargs)
        values = [value]
        files = [first_file]
    else:
        files = open_files(
            filename,
            mode=mode,
            name_function=name_function,
            num=df.npartitions,
            **file_options,
        )
        values = [to_csv_chunk(dfs[0], files[0], **kwargs)]
        if header_first_partition_only:
            kwargs["header"] = False
        values.extend(
            [to_csv_chunk(d, f, **kwargs) for d, f in zip(dfs[1:], files[1:])]
        )
    if compute:
        if compute_kwargs is None:
            compute_kwargs = dict()

        if scheduler is not None:
            warn(
                "The 'scheduler' keyword argument for `to_csv()` is deprecated and "
                "will be removed in a future version. "
                "Please use the `compute_kwargs` argument instead. "
                f"For example, df.to_csv(..., compute_kwargs={{scheduler: {scheduler}}})",
                FutureWarning,
            )

        if (
            scheduler is not None
            and compute_kwargs.get("scheduler") is not None
            and compute_kwargs.get("scheduler") != scheduler
        ):
            raise ValueError(
                f"Differing values for 'scheduler' have been passed in.\n"
                f"scheduler argument: {scheduler}\n"
                f"via compute_kwargs: {compute_kwargs.get('scheduler')}"
            )

        if scheduler is not None and compute_kwargs.get("scheduler") is None:
            compute_kwargs["scheduler"] = scheduler

        import dask

        return list(dask.compute(*values, **compute_kwargs))
    else:
        return values


from dask.dataframe.core import _Frame

_Frame.to_csv.__doc__ = to_csv.__doc__
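
# Hedged usage sketch (``ddf`` and the output paths are hypothetical): one file
# per partition via a globstring, or a single appended file with
# ``single_file=True``.
#
#     >>> ddf.to_csv("out/export-*.csv")                  # doctest: +SKIP
#     >>> ddf.to_csv("out/export.csv", single_file=True)  # doctest: +SKIP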