diff --git a/README.md b/README.md index d1e62dd..a968c7c 100644 --- a/README.md +++ b/README.md @@ -25,5 +25,3 @@ Will output: { a: 42 } { hello: 'world' } ``` - -If invalid JSON gets written, it's silently ignored. diff --git a/lib/json-stream.js b/lib/json-stream.js index 269d0a1..890f172 100644 --- a/lib/json-stream.js +++ b/lib/json-stream.js @@ -1,43 +1,62 @@ -var util = require('util'), - TransformStream = require('stream').Transform; +const util = require('util'); +const StringDecoder = require('string_decoder').StringDecoder; +const Transform = require('stream').Transform; +util.inherits(JSONStream, Transform); -module.exports = function (options) { - return new JSONStream(options); -}; - -var JSONStream = module.exports.JSONStream = function (options) { +// Gets \n-delimited JSON string data, and emits the parsed objects +function JSONStream(options) { options = options || {}; - TransformStream.call(this, options); + Transform.call(this, options); this._writableState.objectMode = false; this._readableState.objectMode = true; - this._async = options.async || false; -}; -util.inherits(JSONStream, TransformStream); -JSONStream.prototype._transform = function (data, encoding, callback) { - if (!Buffer.isBuffer(data)) data = new Buffer(data); - if (this._buffer) { - data = Buffer.concat([this._buffer, data]); + this._buffer = ''; + this._decoder = new StringDecoder('utf8'); + this.async = options && options.async? setImmediate: function (fn) { fn(); }; +} + +JSONStream.prototype._transform = function(chunk, encoding, cb) { + this._buffer += this._decoder.write(chunk); + // split on newlines + var lines = this._buffer.split(/\r?\n/); + // keep the last partial line buffered + this._buffer = lines.pop(); + for (var l = 0; l < lines.length; l++) { + var line = lines[l]; + this._pushLine(line); } + cb(); +}; - var ptr = 0, start = 0; - while (++ptr <= data.length) { - if (data[ptr] === 10 || ptr === data.length) { - var line; - try { - line = JSON.parse(data.slice(start, ptr)); - } - catch (ex) { } - if (line) { - this.push(line); - line = null; - } - if (data[ptr] === 10) start = ++ptr; +JSONStream.prototype._pushLine = function(line) { + var self = this; + + if (line.trim()) { + try { + var obj = JSON.parse(line); + } catch (er) { + this.async(function () { + self.emit('error', er); + }); + return; } + // push the parsed object out to the readable consumer + + this.async(function () { + self.push(obj); + }); } +} + +JSONStream.prototype._flush = function(cb) { + // Just handle any leftover + var rem = this._buffer.trim(); + this._pushLine(rem); + this.async(function () { + cb(); + }); +}; - this._buffer = data.slice(start); - return this._async - ? void setImmediate(callback) - : void callback(); +module.exports = function (options) { + return new JSONStream(options); }; diff --git a/package.json b/package.json index 741c563..38af172 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "author": "Maciej Małecki ", "main": "./lib/json-stream", "scripts": { - "test": "node test/pipe-test.js && node test/json-stream-test.js && node test/throw-in-readable-test.js" + "test": "node test/pipe-test.js && node test/json-stream-test.js && node test/throw-in-readable-test.js && node test/json-error-test.js" }, "repository": { "type": "git", diff --git a/test/json-error-test.js b/test/json-error-test.js new file mode 100644 index 0000000..458686f --- /dev/null +++ b/test/json-error-test.js @@ -0,0 +1,30 @@ +var assert = require('assert'); +var JSONStream = require('../'); + +var stream = JSONStream(); + +var objects = []; + +stream.on('data', function (data) { + objects.push(data); +}); + +stream.on('error', function (error) { + objects.push(error); + assert.equal(error.message, 'Unexpected token t'); +}); + +stream.on('end', function () { + assert.equal(objects.length, 3); + assert.deepEqual(objects[0], {"this": "is valid JSON"}); + assert.equal(objects[1].message, 'Unexpected token t'); + assert.deepEqual(objects[2], ["this", "is", "valid", "JSON"]); +}); + +stream.write('{"this": "is valid JSON"}\n'); +try { + stream.write('{this is not valid JSON]\n'); +} catch (e) { +} +stream.write('["this", "is", "valid", "JSON"]\n'); +stream.end(); diff --git a/test/json-stream-test.js b/test/json-stream-test.js index 48cd2c5..11f0a74 100644 --- a/test/json-stream-test.js +++ b/test/json-stream-test.js @@ -47,11 +47,11 @@ write(stream, '{"a":', '42', ',"b": 1337', '}\n{"hel', 'lo": "wor', 'ld"}\n'); stream = JSONStream(); expect(stream, [ { a: 42 }, { hello: 'world' } ]); -write(stream, '{"a":', '42}\n{ blah blah blah }\n{"hel', 'lo": "wor', 'ld"}\n'); +write(stream, '{"a":', '42}\n\n{"hel', 'lo": "wor', 'ld"}\n'); stream = JSONStream(); expect(stream, [ { a: 42 }, { hello: 'world' } ]); -write(stream, '{"a":', '42}\n{ blah blah', 'blah }\n{"hel', 'lo": "wor', 'ld"}\n'); +write(stream, '{"a":', '42}\n{"hel', 'lo": "wor', 'ld"}\n'); stream = JSONStream(); expect(stream, [ { å: '⇢ utf8!', b: 1337 } ]); diff --git a/test/pipe-test.js b/test/pipe-test.js index 040c4ff..7534941 100644 --- a/test/pipe-test.js +++ b/test/pipe-test.js @@ -33,7 +33,7 @@ source._read = function () { }; source.pipe(dest); source.push('{"a": 4'); -source.push('2}\nblah'); +source.push('2}'); source.push('\n{"hello"'); source.push(': "world"}\n'); source.push(null);