Source code for dask.dataframe.io.orc.core

import copy

from fsspec.core import get_fs_token_paths
from fsspec.utils import stringify_path
from packaging.version import parse as parse_version

from ....base import compute_as_if_collection, tokenize
from ....highlevelgraph import HighLevelGraph
from ....layers import DataFrameIOLayer
from ....utils import apply
from ...core import DataFrame, Scalar, new_dd_object
from .utils import ORCEngine


class ORCFunctionWrapper:
    """
    ORC Function-Wrapper Class
    Reads ORC data from disk to produce a partition.
    """

    def __init__(self, fs, columns, schema, engine, index):
        self.fs = fs
        self.columns = columns
        self.schema = schema
        self.engine = engine
        self.index = index

    def project_columns(self, columns):
        """Return a new ORCFunctionWrapper object with
        a sub-column projection.
        """
        if columns == self.columns:
            return self
        func = copy.deepcopy(self)
        func.columns = columns
        return func

    def __call__(self, parts):
        _df = self.engine.read_partition(
            self.fs,
            parts,
            self.schema,
            self.columns,
        )
        if self.index:
            _df.set_index(self.index, inplace=True)
        return _df

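# Illustrative sketch (not part of the module): each output partition of the
# collection is produced by calling an ORCFunctionWrapper instance on one
# element of the ``parts`` list returned by the engine's ``read_metadata``.
# The ``fs``, ``schema``, ``engine``, and ``parts`` names below are
# placeholders for objects produced elsewhere in this module:
#
#   wrapper = ORCFunctionWrapper(fs, ["a", "b"], schema, engine, index=None)
#   wrapper = wrapper.project_columns(["a"])  # column-projection pushdown
#   pdf = wrapper(parts[0])                   # pandas DataFrame for partition 0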

def _get_engine(engine, write=False):
    # Get engine
    if engine == "pyarrow":
        import pyarrow as pa

        from .arrow import ArrowORCEngine

        if write and parse_version(pa.__version__) < parse_version("4.0.0"):
            raise ValueError("to_orc is not supported for pyarrow<4.0.0")

        return ArrowORCEngine
    elif not isinstance(engine, ORCEngine):
        raise TypeError("engine must be 'pyarrow', or an ORCEngine object")
    return engine

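# Illustrative sketch of engine resolution (``MyORCEngine`` is a hypothetical
# ORCEngine subclass, not provided by dask):
#
#   engine = _get_engine("pyarrow")              # -> ArrowORCEngine
#   engine = _get_engine("pyarrow", write=True)  # raises for pyarrow < 4.0.0
#   engine = _get_engine(MyORCEngine())          # custom engines pass through
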

def read_orc(
    path,
    engine="pyarrow",
    columns=None,
    index=None,
    split_stripes=1,
    aggregate_files=None,
    storage_options=None,
):
    """Read dataframe from ORC file(s)

    Parameters
    ----------
    path: str or list(str)
        Location of file(s), which can be a full URL with protocol
        specifier, and may include glob character if a single string.
    engine: 'pyarrow' or ORCEngine
        Backend ORC engine to use for IO. Default is "pyarrow".
    columns: None or list(str)
        Columns to load. If None, loads all.
    index: str
        Column name to set as index.
    split_stripes: int or False
        Maximum number of ORC stripes to include in each output-DataFrame
        partition. Use False to specify a 1-to-1 mapping between files
        and partitions. Default is 1.
    aggregate_files : bool, default False
        Whether distinct file paths may be aggregated into the same
        output partition. A setting of True means that any two file
        paths may be aggregated into the same output partition, while
        False means that inter-file aggregation is prohibited.
    storage_options: None or dict
        Further parameters to pass to the bytes backend.

    Returns
    -------
    Dask.DataFrame (even if there is only one column)

    Examples
    --------
    >>> df = dd.read_orc('https://github.com/apache/orc/raw/'
    ...                  'master/examples/demo-11-zlib.orc')  # doctest: +SKIP
    """
    # Get engine
    engine = _get_engine(engine)

    # Process file path(s)
    storage_options = storage_options or {}
    fs, fs_token, paths = get_fs_token_paths(
        path, mode="rb", storage_options=storage_options
    )

    # Let backend engine generate a list of parts
    # from the ORC metadata. The backend should also
    # return the schema and DataFrame-collection metadata
    parts, schema, meta = engine.read_metadata(
        fs,
        paths,
        columns,
        index,
        split_stripes,
        aggregate_files,
    )

    # Construct and return a Blockwise layer
    label = "read-orc-"
    output_name = label + tokenize(fs_token, path, columns)
    layer = DataFrameIOLayer(
        output_name,
        columns,
        parts,
        ORCFunctionWrapper(fs, columns, schema, engine, index),
        label=label,
    )
    graph = HighLevelGraph({output_name: layer}, {output_name: set()})
    return new_dd_object(graph, output_name, meta, [None] * (len(parts) + 1))
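
# Illustrative read_orc sketch (``data/*.orc`` and the column names are
# hypothetical; any local or remote ORC dataset would do):
#
#   import dask.dataframe as dd
#   df = dd.read_orc(
#       "data/*.orc",
#       columns=["name", "amount"],
#       index="name",
#       split_stripes=2,        # at most two ORC stripes per output partition
#       aggregate_files=True,   # allow small files to share a partition
#   )
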

def to_orc(
    df,
    path,
    engine="pyarrow",
    write_index=True,
    storage_options=None,
    compute=True,
    compute_kwargs=None,
):
    """Store Dask.dataframe to ORC files

    Notes
    -----
    Each partition will be written to a separate file.

    Parameters
    ----------
    df : dask.dataframe.DataFrame
    path : string or pathlib.Path
        Destination directory for data. Prepend with protocol like
        ``s3://`` or ``hdfs://`` for remote data.
    engine : 'pyarrow' or ORCEngine
        Backend ORC engine to use for IO. Must be "pyarrow" or an
        ORCEngine object. Default is "pyarrow".
    write_index : boolean, default True
        Whether or not to write the index.
    storage_options : dict, default None
        Key/value pairs to be passed on to the file-system backend, if any.
    compute : bool, default True
        If True (default) then the result is computed immediately. If False
        then a ``dask.delayed`` object is returned for future computation.
    compute_kwargs : dict, default None
        Options to be passed in to the compute method.

    Examples
    --------
    >>> df = dd.read_csv(...)  # doctest: +SKIP
    >>> df.to_orc('/path/to/output/', ...)  # doctest: +SKIP

    See Also
    --------
    read_orc: Read ORC data to dask.dataframe
    """
    # Get engine
    engine = _get_engine(engine, write=True)

    if hasattr(path, "name"):
        path = stringify_path(path)
    fs, _, _ = get_fs_token_paths(path, mode="wb", storage_options=storage_options)
    # Trim any protocol information from the path before forwarding
    path = fs._strip_protocol(path)

    if not write_index:
        # Not writing index - might as well drop it
        df = df.reset_index(drop=True)

    # Use df.npartitions to define file-name list
    fs.mkdirs(path, exist_ok=True)
    filenames = [f"part.{i}.orc" for i in range(df.npartitions)]

    # Construct IO graph
    dsk = {}
    name = "to-orc-" + tokenize(
        df,
        fs,
        path,
        engine,
        write_index,
        storage_options,
    )
    final_name = name + "-final"
    for d, filename in enumerate(filenames):
        dsk[(name, d)] = (
            apply,
            engine.write_partition,
            [
                (df._name, d),
                path,
                fs,
                filename,
            ],
        )
    part_tasks = list(dsk.keys())
    dsk[(final_name, 0)] = (lambda x: None, part_tasks)
    graph = HighLevelGraph.from_collections((final_name, 0), dsk, dependencies=[df])

    # Compute or return future
    if compute:
        if compute_kwargs is None:
            compute_kwargs = dict()
        return compute_as_if_collection(DataFrame, graph, part_tasks, **compute_kwargs)
    return Scalar(graph, final_name, "")
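
# Illustrative to_orc sketch (``s3://bucket/output/`` is a hypothetical
# destination; the default pyarrow engine requires pyarrow >= 4.0.0):
#
#   out = to_orc(df, "s3://bucket/output/", write_index=False, compute=False)
#   out.compute()  # writes one "part.<i>.orc" file per partition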