Source code for dask.dataframe.io.json

import io

import pandas as pd
from fsspec.core import open_files

from ...base import compute as dask_compute
from ...bytes import read_bytes
from ...core import flatten
from ...delayed import delayed
from ..utils import insert_meta_param_description, make_meta
from .io import from_delayed


def to_json(
    df,
    url_path,
    orient="records",
    lines=None,
    storage_options=None,
    compute=True,
    encoding="utf-8",
    errors="strict",
    compression=None,
    compute_kwargs=None,
    name_function=None,
    **kwargs,
):
    """Write dataframe into JSON text files

    This utilises ``pandas.DataFrame.to_json()``, and most parameters are
    passed through - see its docstring.

    Differences: orient is 'records' by default, with lines=True; this
    produces the kind of JSON output that is most common in big-data
    applications, and which can be chunked when reading (see ``read_json()``).

    Parameters
    ----------
    df: dask.DataFrame
        Data to save
    url_path: str, list of str
        Location to write to. If a string, and there is more than one
        partition in df, it should include a glob character to expand into a
        set of file names, or you should provide a ``name_function=``
        parameter. Supports protocol specifications such as ``"s3://"``.
    encoding, errors:
        The text encoding to implement, e.g., "utf-8", and how to respond
        to errors in the conversion (see ``str.encode()``).
    orient, lines, kwargs
        passed to pandas; if not specified, lines=True when orient='records',
        False otherwise.
    storage_options: dict
        Passed to backend file-system implementation
    compute: bool
        If True, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    compression : string or None
        String like 'gzip' or 'xz'.
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions.
    """
    if lines is None:
        lines = orient == "records"
    if orient != "records" and lines:
        raise ValueError(
            "Line-delimited JSON is only available with "
            'orient="records".'
        )
    kwargs["orient"] = orient
    kwargs["lines"] = lines and orient == "records"
    outfiles = open_files(
        url_path,
        "wt",
        encoding=encoding,
        errors=errors,
        name_function=name_function,
        num=df.npartitions,
        compression=compression,
        **(storage_options or {}),
    )
    parts = [
        delayed(write_json_partition)(d, outfile, kwargs)
        for outfile, d in zip(outfiles, df.to_delayed())
    ]
    if compute:
        if compute_kwargs is None:
            compute_kwargs = dict()
        dask_compute(parts, **compute_kwargs)
        return [f.path for f in outfiles]
    else:
        return parts
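A minimal usage sketch (not part of the module source), showing how ``to_json`` is typically reached through the public ``dask.dataframe`` namespace. The dataframe contents and the ``output/part-*.json`` path are hypothetical, chosen only to illustrate the glob expansion and the ``compute=False`` behaviour described in the docstring above:

    import dask.dataframe as dd
    import pandas as pd

    # Hypothetical two-partition dataframe used only for illustration.
    ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)

    # The '*' expands to one file name per partition (or pass name_function).
    paths = dd.to_json(ddf, "output/part-*.json", orient="records", lines=True)

    # compute=False returns the delayed write tasks instead of writing now.
    delayed_writes = dd.to_json(ddf, "output/part-*.json", compute=False)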
def write_json_partition(df, openfile, kwargs):
    with openfile as f:
        df.to_json(f, **kwargs)
@insert_meta_param_description
def read_json(
    url_path,
    orient="records",
    lines=None,
    storage_options=None,
    blocksize=None,
    sample=2**20,
    encoding="utf-8",
    errors="strict",
    compression="infer",
    meta=None,
    engine=pd.read_json,
    **kwargs,
):
    """Create a dataframe from a set of JSON files

    This utilises ``pandas.read_json()``, and most parameters are
    passed through - see its docstring.

    Differences: orient is 'records' by default, with lines=True; this
    is appropriate for line-delimited "JSON-lines" data, the kind of JSON
    output that is most common in big-data scenarios, and which can be
    chunked when reading (see ``read_json()``). All other options require
    blocksize=None, i.e., one partition per input file.

    Parameters
    ----------
    url_path: str, list of str
        Location to read from. If a string, can include a glob character to
        find a set of file names.
        Supports protocol specifications such as ``"s3://"``.
    encoding, errors:
        The text encoding to implement, e.g., "utf-8", and how to respond
        to errors in the conversion (see ``bytes.decode()``).
    orient, lines, kwargs
        passed to pandas; if not specified, lines=True when orient='records',
        False otherwise.
    storage_options: dict
        Passed to backend file-system implementation
    blocksize: None or int
        If None, files are not blocked, and you get one partition per input
        file. If an int (only allowed for line-delimited JSON files), each
        partition will be approximately this size in bytes, to the nearest
        newline character.
    sample: int
        Number of bytes to pre-load, to provide an empty dataframe
        structure to any blocks without data. Only relevant if using
        blocksize.
    compression : string or None
        String like 'gzip' or 'xz'.
    engine : function object, default ``pd.read_json``
        The underlying function that dask will use to read JSON files. By
        default, this will be the pandas JSON reader (``pd.read_json``).
    $META

    Returns
    -------
    dask.DataFrame

    Examples
    --------
    Load single file

    >>> dd.read_json('myfile.1.json')  # doctest: +SKIP

    Load multiple files

    >>> dd.read_json('myfile.*.json')  # doctest: +SKIP

    >>> dd.read_json(['myfile.1.json', 'myfile.2.json'])  # doctest: +SKIP

    Load large line-delimited JSON files using partitions of approx 256MB size

    >>> dd.read_json('data/file*.json', blocksize=2**28)  # doctest: +SKIP
    """
    if lines is None:
        lines = orient == "records"
    if orient != "records" and lines:
        raise ValueError(
            "Line-delimited JSON is only available with "
            'orient="records".'
        )
    if blocksize and (orient != "records" or not lines):
        raise ValueError(
            "JSON file chunking only allowed for JSON-lines "
            "input (orient='records', lines=True)."
        )
    storage_options = storage_options or {}
    if blocksize:
        first, chunks = read_bytes(
            url_path,
            b"\n",
            blocksize=blocksize,
            sample=sample,
            compression=compression,
            **storage_options,
        )
        chunks = list(flatten(chunks))
        if meta is None:
            meta = read_json_chunk(first, encoding, errors, engine, kwargs)
        meta = make_meta(meta)
        parts = [
            delayed(read_json_chunk)(
                chunk, encoding, errors, engine, kwargs, meta=meta
            )
            for chunk in chunks
        ]
        return from_delayed(parts, meta=meta)
    else:
        files = open_files(
            url_path,
            "rt",
            encoding=encoding,
            errors=errors,
            compression=compression,
            **storage_options,
        )
        parts = [
            delayed(read_json_file)(f, orient, lines, engine, kwargs)
            for f in files
        ]
        return from_delayed(parts, meta=meta)
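A matching read-side sketch, again with hypothetical paths: reading line-delimited files back, first with one partition per input file and then with byte-range chunking, which the checks above restrict to orient='records' with lines=True. The ``meta`` mapping shown last is an assumption used only to illustrate skipping schema inference:

    import dask.dataframe as dd

    # One partition per input file (blocksize=None, the default).
    ddf = dd.read_json("output/part-*.json", orient="records", lines=True)

    # Partitions of roughly 64 MiB, split at newline boundaries.
    ddf_chunked = dd.read_json("output/part-*.json", blocksize=2**26)

    # Passing meta avoids reading a sample chunk just to infer the schema.
    ddf_typed = dd.read_json(
        "output/part-*.json", blocksize=2**26, meta={"x": "int64"}
    )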
def read_json_chunk(chunk, encoding, errors, engine, kwargs, meta=None):
    s = io.StringIO(chunk.decode(encoding, errors))
    s.seek(0)
    df = engine(s, orient="records", lines=True, **kwargs)
    if meta is not None and df.empty:
        return meta
    else:
        return df


def read_json_file(f, orient, lines, engine, kwargs):
    with f as f:
        return engine(f, orient=orient, lines=lines, **kwargs)
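For intuition, a small sketch of what the chunk helper sees at runtime: ``read_bytes`` hands it a raw block of newline-delimited records, which it decodes and parses with the configured engine. The byte string below is made up for illustration:

    import pandas as pd

    chunk = b'{"x": 1}\n{"x": 2}\n'
    df = read_json_chunk(chunk, "utf-8", "strict", pd.read_json, {})
    # df is a two-row pandas DataFrame with a single column "x".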