from fsspec.core import OpenFile, get_fs_token_paths
from fsspec.utils import infer_compression, read_block
from ..base import tokenize
from ..delayed import delayed
from ..utils import is_integer, parse_bytes
"""Given a path or paths, return delayed objects that read from those paths.
The path may be a filename like ``'2015-01-01.csv'`` or a globstring
The path may be preceded by a protocol, like ``s3://`` or ``hdfs://`` if
those libraries are installed.
This cleanly breaks data by a delimiter if given, so that block boundaries
start directly after a delimiter and end on the delimiter.
urlpath : string or list
Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
to read from alternative filesystems. To read from multiple files you
can pass a globstring or a list of paths, with the caveat that they
must all have the same protocol.
delimiter : bytes
An optional delimiter, like ``b'\\n'`` on which to split blocks of
not_zero : bool
Force seek of start-of-file delimiter, discarding header.
blocksize : int, str
Chunk size in bytes, defaults to "128 MiB"
compression : string or None
String like 'gzip' or 'xz'. Must support efficient random access.
sample : int, string, or boolean
Whether or not to return a header sample.
Values can be ``False`` for "no sample requested"
Or an integer or string value like ``2**20`` or ``"1 MiB"``
include_path : bool
Whether or not to include the path with the bytes representing a particular file.
Default is False.
**kwargs : dict
Extra options that make sense to a particular storage connection, e.g.
host, port, username, password, etc.
>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\\n') # doctest: +SKIP
>>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\\n') # doctest: +SKIP
>>> sample, paths, blocks = read_bytes('2015-*-*.csv', include_path=True) # doctest: +SKIP
sample : bytes
The sample header
blocks : list of lists of ``dask.Delayed``
Each list corresponds to a file, and each delayed object computes to a
block of bytes from that file.
paths : list of strings, only included if include_path is True
List of same length as blocks, where each item is the path to the file
represented in the corresponding block.
if not isinstance(urlpath, (str, list, tuple, os.PathLike)):
raise TypeError("Path should be a string, os.PathLike, list or tuple")
fs, fs_token, paths = get_fs_token_paths(urlpath, mode="rb", storage_options=kwargs)
if len(paths) == 0:
raise IOError("%s resolved to no files" % urlpath)
if blocksize is not None:
if isinstance(blocksize, str):
blocksize = parse_bytes(blocksize)
if not is_integer(blocksize):
raise TypeError("blocksize must be an integer")
blocksize = int(blocksize)
if blocksize is None:
offsets = [] * len(paths)
lengths = [[None]] * len(paths)
offsets = 
lengths = 
for path in paths:
if compression == "infer":
comp = infer_compression(path)
comp = compression
if comp is not None:
"Cannot do chunked reads on compressed files. "
"To read, set blocksize=None"
size = fs.info(path)["size"]
if size is None:
"Backing filesystem couldn't determine file size, cannot "
"do chunked reads. To read, set blocksize=None."
off = list(range(0, size, blocksize))
length = [blocksize] * len(off)
off = 1
length -= 1
delayed_read = delayed(read_block_from_file)
out = 
for path, offset, length in zip(paths, offsets, lengths):
token = tokenize(fs_token, delimiter, path, fs.ukey(path), compression, offset)
keys = ["read-block-%s-%s" % (o, token) for o in offset]
values = [
OpenFile(fs, path, compression=compression),
for o, key, l in zip(offset, keys, length)
if sample is True:
sample = "10 kiB" # backwards compatibility
if isinstance(sample, str):
sample = parse_bytes(sample)
with OpenFile(fs, paths, compression=compression) as f:
# read block without seek (because we start at zero)
if delimiter is None:
sample = f.read(sample)
sample_buff = f.read(sample)
new = f.read(sample)
if not new:
if delimiter in new:
sample_buff = (
sample_buff + new.split(delimiter, 1) + delimiter
sample_buff = sample_buff + new
sample = sample_buff
return sample, out, paths
return sample, out
def read_block_from_file(lazy_file, off, bs, delimiter):
with copy.copy(lazy_file) as f:
if off == 0 and bs is None:
return read_block(f, off, bs, delimiter)