import io
from functools import partial
from fsspec.core import open_files
from tlz import concat
from dask.bag.core import from_delayed
from dask.bytes import read_bytes
from dask.delayed import delayed
from dask.utils import parse_bytes, system_encoding
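
# ``pure=True`` gives deterministic task keys, so identical delayed calls can
# be deduplicated within a graph.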
delayed = delayed(pure=True)


def read_text(
urlpath,
blocksize=None,
compression="infer",
encoding=system_encoding,
errors="strict",
linedelimiter=None,
collection=True,
storage_options=None,
files_per_partition=None,
include_path=False,
):
"""Read lines from text files
Parameters
----------
urlpath : string or list
Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
to read from alternative filesystems. To read from multiple files you
can pass a globstring or a list of paths, with the caveat that they
must all have the same protocol.
blocksize: None, int, or str
Size (in bytes) to cut up larger files. Streams by default.
Can be ``None`` for streaming, an integer number of bytes, or a string
like "128MiB"
compression: string
Compression format like 'gzip' or 'xz'. Defaults to 'infer'
    encoding: string
        Text encoding used to decode the file contents (defaults to the
        system encoding).
    errors: string
        How decoding errors are handled, e.g. 'strict', 'ignore' or 'replace'.
    linedelimiter: string or None
        String used to split lines. The default, ``None``, splits on
        standard newline boundaries.
    collection: bool, optional
        Return a dask.bag.Bag if True, or a list of Delayed values if False.
storage_options: dict
Extra options that make sense to a particular storage connection, e.g.
host, port, username, password, etc.
files_per_partition: None or int
If set, group input files into partitions of the requested size,
instead of one partition per file. Mutually exclusive with blocksize.
include_path: bool
Whether or not to include the path in the bag.
If true, elements are tuples of (line, path).
Default is False.

    Examples
--------
>>> b = read_text('myfiles.1.txt') # doctest: +SKIP
>>> b = read_text('myfiles.*.txt') # doctest: +SKIP
>>> b = read_text('myfiles.*.txt.gz') # doctest: +SKIP
>>> b = read_text('s3://bucket/myfiles.*.txt') # doctest: +SKIP
>>> b = read_text('s3://key:secret@bucket/myfiles.*.txt') # doctest: +SKIP
>>> b = read_text('hdfs://namenode.example.com/myfiles.*.txt') # doctest: +SKIP

    Parallelize a large file by providing the number of uncompressed bytes to
    load into each partition.

    >>> b = read_text('largefile.txt', blocksize='10MB') # doctest: +SKIP
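
    Group many small input files into each partition with files_per_partition
    (the partition size of 10 below is illustrative):

    >>> b = read_text('myfiles.*.txt', files_per_partition=10) # doctest: +SKIP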

    Get file paths of the bag by setting include_path=True

    >>> b = read_text('myfiles.*.txt', include_path=True) # doctest: +SKIP
>>> b.take(1) # doctest: +SKIP
(('first line of the first file', '/home/dask/myfiles.0.txt'),)

    Returns
-------
dask.bag.Bag or list
dask.bag.Bag if collection is True or list of Delayed lists otherwise.

    See Also
--------
from_sequence: Build bag from Python sequence
"""
if blocksize is not None and files_per_partition is not None:
raise ValueError("Only one of blocksize or files_per_partition can be set")
if isinstance(blocksize, str):
blocksize = parse_bytes(blocksize)
if blocksize is None:
if linedelimiter in [None, "", "\n", "\r", "\r\n"]:
newline = linedelimiter
linedelimiter = None
else:
newline = ""
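        # Standard line endings are handled by the text reader via ``newline``;
        # only a custom delimiter is split manually in file_to_blocks below.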
files = open_files(
urlpath,
mode="rt",
encoding=encoding,
errors=errors,
compression=compression,
newline=newline,
**(storage_options or {}),
)
if files_per_partition is None:
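            # One partition per file: each delayed task lazily reads a whole
            # file and returns its lines as a list.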
blocks = [
delayed(list)(
delayed(
partial(file_to_blocks, include_path, delimiter=linedelimiter)
)(fil)
)
for fil in files
]
else:
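            # Group files into chunks of ``files_per_partition``; each chunk is
            # read lazily and concatenated into a single partition.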
blocks = []
for start in range(0, len(files), files_per_partition):
block_files = files[start : (start + files_per_partition)]
block_lines = delayed(concat)(
delayed(map)(
partial(file_to_blocks, include_path, delimiter=linedelimiter),
block_files,
)
)
blocks.append(block_lines)
else:
# special case for linedelimiter=None: we will need to split on an actual bytestring
# and the line reader will then use "universal" mode. Just as well that \r\n and \n
# will both work (thankfully \r for MacOS is no longer a thing)
o = read_bytes(
urlpath,
delimiter=linedelimiter.encode() if linedelimiter is not None else b"\n",
blocksize=blocksize,
sample=False,
compression=compression,
include_path=include_path,
**(storage_options or {}),
)
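        # read_bytes returns (sample, blocks) plus a list of paths when
        # include_path=True; blocks holds one list of delayed byte ranges per
        # input file.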
raw_blocks = o[1]
blocks = [
delayed(decode)(b, encoding, errors, linedelimiter)
for b in concat(raw_blocks)
]
if include_path:
paths = list(
concat([[path] * len(raw_blocks[i]) for i, path in enumerate(o[2])])
)
blocks = [
delayed(attach_path)(entry, path) for entry, path in zip(blocks, paths)
]
if not blocks:
raise ValueError("No files found", urlpath)
if collection:
blocks = from_delayed(blocks)
return blocks


def file_to_blocks(include_path, lazy_file, delimiter=None):
# blocksize is None branch
with lazy_file as f:
if delimiter is not None:
text = f.read()
if not text:
return []
parts = text.split(delimiter)
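            # Re-attach the delimiter to every part except the last so each
            # yielded line keeps its terminator.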
yield from (
(line, lazy_file.path) if include_path else line
for line in [line + delimiter for line in parts[:-1]] + parts[-1:]
)
else:
for line in f:
yield (line, lazy_file.path) if include_path else line


def attach_path(block, path):
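    # Pair each line of a partition with the path of the file it came from
    # (used by the blocksize branch when include_path=True).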
for p in block:
yield (p, path)


def decode(block, encoding, errors, line_delimiter):
# blocksize is not None branch
text = block.decode(encoding, errors)
if line_delimiter in [None, "", "\n", "\r", "\r\n"]:
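        # Iterating a StringIO yields the text line by line; the newline
        # characters stay attached to each returned line.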
lines = io.StringIO(text, newline=line_delimiter)
return list(lines)
else:
if not text:
return []
parts = text.split(line_delimiter)
out = [t + line_delimiter for t in parts[:-1]] + (
parts[-1:] if not text.endswith(line_delimiter) else []
)
return out