Source code for dask.bag.avro

from __future__ import annotations

import io
import uuid

from fsspec.core import OpenFile, get_fs_token_paths, open_files
from fsspec.utils import read_block
from fsspec.utils import tokenize as fs_tokenize

from dask.highlevelgraph import HighLevelGraph

MAGIC = b"Obj\x01"
SYNC_SIZE = 16


def read_long(fo):
    """variable-length, zig-zag encoding."""
    c = fo.read(1)
    b = ord(c)
    n = b & 0x7F
    shift = 7
    while (b & 0x80) != 0:
        b = ord(fo.read(1))
        n |= (b & 0x7F) << shift
        shift += 7
    return (n >> 1) ^ -(n & 1)


def read_bytes(fo):
    """a long followed by that many bytes of data."""
    size = read_long(fo)
    return fo.read(size)


def read_header(fo):
    """Extract an avro file's header

    fo: file-like
        This should be in bytes mode, e.g., io.BytesIO

    Returns dict representing the header

    Parameters
    ----------
    fo: file-like
    """
    assert fo.read(len(MAGIC)) == MAGIC, "Magic avro bytes missing"
    meta = {}
    out = {"meta": meta}
    while True:
        n_keys = read_long(fo)
        if n_keys == 0:
            break
        for _ in range(n_keys):
            # ignore dtype mapping for bag version
            read_bytes(fo)  # schema keys
            read_bytes(fo)  # schema values
    out["sync"] = fo.read(SYNC_SIZE)
    out["header_size"] = fo.tell()
    fo.seek(0)
    out["head_bytes"] = fo.read(out["header_size"])
    return out


def open_head(fs, path, compression):
    """Open a file just to read its head and size"""
    with OpenFile(fs, path, compression=compression) as f:
        head = read_header(f)
    size = fs.info(path)["size"]
    return head, size


[docs]def read_avro(urlpath, blocksize=100000000, storage_options=None, compression=None): """Read set of avro files Use this with arbitrary nested avro schemas. Please refer to the fastavro documentation for its capabilities: https://github.com/fastavro/fastavro Parameters ---------- urlpath: string or list Absolute or relative filepath, URL (may include protocols like ``s3://``), or globstring pointing to data. blocksize: int or None Size of chunks in bytes. If None, there will be no chunking and each file will become one partition. storage_options: dict or None passed to backend file-system compression: str or None Compression format of the targe(s), like 'gzip'. Should only be used with blocksize=None. """ from dask import compute, delayed from dask.bag import from_delayed from dask.utils import import_required import_required( "fastavro", "fastavro is a required dependency for using bag.read_avro()." ) storage_options = storage_options or {} if blocksize is not None: fs, fs_token, paths = get_fs_token_paths( urlpath, mode="rb", storage_options=storage_options ) dhead = delayed(open_head) out = compute(*[dhead(fs, path, compression) for path in paths]) heads, sizes = zip(*out) dread = delayed(read_chunk) offsets = [] lengths = [] for size in sizes: off = list(range(0, size, blocksize)) length = [blocksize] * len(off) offsets.append(off) lengths.append(length) out = [] for path, offset, length, head in zip(paths, offsets, lengths, heads): delimiter = head["sync"] f = OpenFile(fs, path, compression=compression) token = fs_tokenize( fs_token, delimiter, path, fs.ukey(path), compression, offset ) keys = [f"read-avro-{o}-{token}" for o in offset] values = [ dread(f, o, l, head, dask_key_name=key) for o, key, l in zip(offset, keys, length) ] out.extend(values) return from_delayed(out) else: files = open_files(urlpath, compression=compression, **storage_options) dread = delayed(read_file) chunks = [dread(fo) for fo in files] return from_delayed(chunks)
def read_chunk(fobj, off, l, head): """Get rows from raw bytes block""" import fastavro if hasattr(fastavro, "iter_avro"): reader = fastavro.iter_avro else: reader = fastavro.reader with fobj as f: chunk = read_block(f, off, l, head["sync"]) head_bytes = head["head_bytes"] if not chunk.startswith(MAGIC): chunk = head_bytes + chunk i = io.BytesIO(chunk) return list(reader(i)) def read_file(fo): """Get rows from file-like""" import fastavro if hasattr(fastavro, "iter_avro"): reader = fastavro.iter_avro else: reader = fastavro.reader with fo as f: return list(reader(f))
[docs]def to_avro( b, filename, schema, name_function=None, storage_options=None, codec="null", sync_interval=16000, metadata=None, compute=True, **kwargs, ): """Write bag to set of avro files The schema is a complex dictionary describing the data, see https://avro.apache.org/docs/1.8.2/gettingstartedpython.html#Defining+a+schema and https://fastavro.readthedocs.io/en/latest/writer.html . It's structure is as follows:: {'name': 'Test', 'namespace': 'Test', 'doc': 'Descriptive text', 'type': 'record', 'fields': [ {'name': 'a', 'type': 'int'}, ]} where the "name" field is required, but "namespace" and "doc" are optional descriptors; "type" must always be "record". The list of fields should have an entry for every key of the input records, and the types are like the primitive, complex or logical types of the Avro spec ( https://avro.apache.org/docs/1.8.2/spec.html ). Results in one avro file per input partition. Parameters ---------- b: dask.bag.Bag filename: list of str or str Filenames to write to. If a list, number must match the number of partitions. If a string, must include a glob character "*", which will be expanded using name_function schema: dict Avro schema dictionary, see above name_function: None or callable Expands integers into strings, see ``dask.bytes.utils.build_name_function`` storage_options: None or dict Extra key/value options to pass to the backend file-system codec: 'null', 'deflate', or 'snappy' Compression algorithm sync_interval: int Number of records to include in each block within a file metadata: None or dict Included in the file header compute: bool If True, files are written immediately, and function blocks. If False, returns delayed objects, which can be computed by the user where convenient. kwargs: passed to compute(), if compute=True Examples -------- >>> import dask.bag as db >>> b = db.from_sequence([{'name': 'Alice', 'value': 100}, ... {'name': 'Bob', 'value': 200}]) >>> schema = {'name': 'People', 'doc': "Set of people's scores", ... 'type': 'record', ... 'fields': [ ... {'name': 'name', 'type': 'string'}, ... {'name': 'value', 'type': 'int'}]} >>> b.to_avro('my-data.*.avro', schema) # doctest: +SKIP ['my-data.0.avro', 'my-data.1.avro'] """ # TODO infer schema from first partition of data from dask.utils import import_required import_required( "fastavro", "fastavro is a required dependency for using bag.to_avro()." ) _verify_schema(schema) storage_options = storage_options or {} files = open_files( filename, "wb", name_function=name_function, num=b.npartitions, **storage_options, ) name = "to-avro-" + uuid.uuid4().hex dsk = { (name, i): ( _write_avro_part, (b.name, i), f, schema, codec, sync_interval, metadata, ) for i, f in enumerate(files) } graph = HighLevelGraph.from_collections(name, dsk, dependencies=[b]) out = type(b)(graph, name, b.npartitions) if compute: out.compute(**kwargs) return [f.path for f in files] else: return out.to_delayed()
def _verify_schema(s): assert isinstance(s, dict), "Schema must be dictionary" for field in ["name", "type", "fields"]: assert field in s, "Schema missing '%s' field" % field assert s["type"] == "record", "Schema must be of type 'record'" assert isinstance(s["fields"], list), "Fields entry must be a list" for f in s["fields"]: assert "name" in f and "type" in f, "Field spec incomplete: %s" % f def _write_avro_part(part, f, schema, codec, sync_interval, metadata): """Create single avro file from list of dictionaries""" import fastavro with f as f: fastavro.writer(f, schema, part, codec, sync_interval, metadata)