Skip to content

Commit

Permalink
Add Streaming parser (--stream)
Browse files Browse the repository at this point in the history
Streaming means that outputs are produced as soon as possible.  With the
`foreach` syntax one can write programs which reduce portions of the
streaming parse of a large input (reduce into proper JSON values, for
example), and discard the rest, processing incrementally.

This:

    $ jq -c --stream .

should produce the same output as this:

    $ jq -c '. as $dot | path(..) as $p | $dot | getpath($p) | [$p,.]'

The output of `jq --stream .` should be a sequence of`[[<path>],<leaf>]`
and `[[<path>]]` values.  The latter indicate that the array/object at
that path ended.

Scalars and empty arrays and objects are leaf values for this purpose.

For example, a truncated input produces a path as soon as possible, then
later the error:

    $ printf '[0,\n'|./jq -c --stream .
    [[0],0]
    parse error: Unfinished JSON term at EOF at line 3, column 0
    $
  • Loading branch information
nicowilliams committed Dec 27, 2014
1 parent 906d253 commit 5bfb978
Show file tree
Hide file tree
Showing 10 changed files with 480 additions and 80 deletions.
14 changes: 14 additions & 0 deletions builtin.c
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,18 @@ static jv f_modulemeta(jq_state *jq, jv a) {
return load_module_meta(jq, a);
}

static jv f_input(jq_state *jq, jv input) {
jv_free(input);
jq_input_cb cb;
void *data;
jq_get_input_cb(jq, &cb, &data);
jv v = cb(jq, data);
if (jv_is_valid(v) || jv_invalid_has_msg(jv_copy(v)))
return v;
return jv_invalid_with_msg(jv_string("break"));
}



#define LIBM_DD(name) \
{(cfunction_ptr)f_ ## name, "_" #name, 1},
Expand Down Expand Up @@ -912,6 +924,7 @@ static const struct cfunction function_list[] = {
{(cfunction_ptr)f_env, "env", 1},
{(cfunction_ptr)f_match, "_match_impl", 4},
{(cfunction_ptr)f_modulemeta, "modulemeta", 1},
{(cfunction_ptr)f_input, "_input", 1},
};
#undef LIBM_DD

Expand Down Expand Up @@ -1014,6 +1027,7 @@ static const char* const jq_builtins[] = {
"def nulls: select(type == \"null\");",
"def values: select(. != null);",
"def scalars: select(. == null or . == true or . == false or type == \"number\" or type == \"string\");",
"def scalars_or_empty: select(. == null or . == true or . == false or type == \"number\" or type == \"string\" or ((type==\"array\" or type==\"object\") and length==0));",
"def leaf_paths: paths(scalars);",
"def join($x): reduce .[] as $i (\"\"; . + (if . == \"\" then $i else $x + $i end));",
"def flatten: reduce .[] as $i ([]; if $i | type == \"array\" then . + ($i | flatten) else . + [$i] end);",
Expand Down
34 changes: 34 additions & 0 deletions docs/content/3.manual/manual.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ sections:
RS. This more also parses the output of jq without the `--seq`
option.
* `--stream`:
Parse the input in streaming fashion, outputing arrays of path
and leaf values (scalars and empty arrays or empty objects).
For example, `"a"` becomes `[[],"a"]`, and `[[],"a",["b"]]`
becomes `[[0],[]]`, `[[1],"a"]`, and `[[1,0],"b"]`.
This is useful for processing very large inputs. Use this in
conjunction with filtering and the `reduce` and `foreach` syntax
to reduce large inputs incrementally.
* `--slurp`/`-s`:
Instead of running the filter for each JSON object in the
Expand Down Expand Up @@ -2205,6 +2216,29 @@ sections:
input: '1'
output: ['[1,2,4,8,16,32,64]']

- title: 'I/O'
body: |
At this time jq has minimal support for I/O, mostly in the
form of control over when inputs are read. Two builtins functions
are provided for this, `input` and `inputs`, that read from the
same sources (e.g., `stdin`, files named on the command-line) as
jq itself. These two builtins, and jq's own reading actions, can
be interleaved with each other.
- title: "`input`"
body: |
Outputs one new input.
- title: "`inputs`"
body: |
Outputs all remaining inputs, one by one.
This is primarily useful for reductions over a program's
inputs.
- title: Assignment
body: |
Expand Down
12 changes: 12 additions & 0 deletions execute.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ struct jq_state {
int initial_execution;

jv attrs;
jq_input_cb input_cb;
void *input_cb_data;
};

struct closure {
Expand Down Expand Up @@ -1037,3 +1039,13 @@ jv jq_get_attr(jq_state *jq, jv attr) {
void jq_dump_disassembly(jq_state *jq, int indent) {
dump_disassembly(indent, jq->bc);
}

void jq_set_input_cb(jq_state *jq, jq_input_cb cb, void *data) {
jq->input_cb = cb;
jq->input_cb_data = data;
}

void jq_get_input_cb(jq_state *jq, jq_input_cb *cb, void **data) {
*cb = jq->input_cb;
*data = jq->input_cb_data;
}
4 changes: 4 additions & 0 deletions jq.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ void jq_start(jq_state *, jv value, int);
jv jq_next(jq_state *);
void jq_teardown(jq_state **);

typedef jv (*jq_input_cb)(jq_state *, void *);
void jq_set_input_cb(jq_state *, jq_input_cb, void *);
void jq_get_input_cb(jq_state *, jq_input_cb *, void **);

void jq_set_attrs(jq_state *, jv);
jv jq_get_attrs(jq_state *);
jv jq_get_jq_origin(jq_state *);
Expand Down
7 changes: 6 additions & 1 deletion jv.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,11 @@ void jv_dump(jv, int flags);
void jv_show(jv, int flags);
jv jv_dump_string(jv, int flags);

#define JV_PARSE_SEQ 1
enum {
JV_PARSE_SEQ = 1,
JV_PARSE_STREAMING = 2,
JV_PARSE_STREAM_ERRORS = 4,
};

jv jv_parse(const char* string);
jv jv_parse_sized(const char* string, int length);
Expand All @@ -183,6 +187,7 @@ jv jv_load_file(const char *, int);
struct jv_parser;
struct jv_parser* jv_parser_new(int);
void jv_parser_set_buf(struct jv_parser*, const char*, int, int);
int jv_parser_remaining(struct jv_parser*);
jv jv_parser_next(struct jv_parser*);
void jv_parser_free(struct jv_parser*);

Expand Down
Loading

0 comments on commit 5bfb978

Please sign in to comment.