Skip to content

Commit 3e2202b

Browse files
committed
Fix issues with readable string
Fix potential issue with readAsPromised Add workaround for potential empty object in readable object
1 parent 873ad0d commit 3e2202b

File tree

2 files changed

+95
-26
lines changed

2 files changed

+95
-26
lines changed

src/JsonStreamStringify.ts

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -112,19 +112,15 @@ function quoteString(string: string) {
112112

113113
function readAsPromised(stream: Readable, size?) {
114114
const value = stream.read(size);
115-
if (value === null) {
115+
if (value === null && !stream.readableEnded) {
116116
return new Promise((resolve, reject) => {
117-
if (stream.readableEnded) {
118-
resolve(null);
119-
return;
120-
}
121117
const endListener = () => resolve(null);
122118
stream.once('end', endListener);
123119
stream.once('error', reject);
124120
stream.once('readable', () => {
125121
stream.removeListener('end', endListener);
126122
stream.removeListener('error', reject);
127-
resolve(stream.read());
123+
readAsPromised(stream, size).then(resolve, reject);
128124
});
129125
});
130126
}
@@ -137,10 +133,11 @@ interface Item {
137133
value?: any;
138134
indent?: string;
139135
path?: (string | number)[];
136+
type?: string;
140137
}
141138

142139
enum ReadState {
143-
NotReading = 0,
140+
Inactive = 0,
144141
Reading,
145142
ReadMore,
146143
Consumed,
@@ -278,17 +275,18 @@ export class JsonStreamStringify extends Readable {
278275
this.emit('error', new Error('Readable Stream is in flowing mode, data may have been lost. Trying to pause stream.'), input, parent.path);
279276
}
280277
const that = this;
281-
this._push('"');
282-
input.once('end', () => {
283-
this._push('"');
284-
this.item = parent;
285-
this.emit('readable');
286-
});
278+
this.prePush = '"';
287279
this.item = <any>{
288280
type: 'readable string',
289281
async read(size: number) {
290282
try {
291283
const data = await readAsPromised(input, size);
284+
if (data === null) {
285+
that._push('"');
286+
that.item = parent;
287+
that.unvisit(input);
288+
return;
289+
}
292290
if (data) that._push(escapeString(data.toString()));
293291
} catch (err) {
294292
that.emit('error', err);
@@ -328,7 +326,7 @@ export class JsonStreamStringify extends Readable {
328326
if (first) first = false;
329327
else out += ',';
330328
if (that.indent) out += `\n${item.indent}`;
331-
that._push(out);
329+
that.prePush = out;
332330
that.setItem(data, item, i);
333331
i += 1;
334332
} catch (err) {
@@ -448,15 +446,20 @@ export class JsonStreamStringify extends Readable {
448446
this.item = item;
449447
}
450448

451-
prePush?: Function = undefined;
452449
buffer = '';
453450
bufferLength = 0;
454451
pushCalled = false;
455452

456453
readSize = 0;
454+
/** if set, this string will be prepended to the next _push call, if the call output is not empty, and set to undefined */
455+
prePush?: string;
457456
private _push(data) {
458-
this.buffer += (this.objectItem ? this.objectItem.write() : '') + data;
459-
this.prePush = undefined;
457+
const out = (this.objectItem ? this.objectItem.write() : '') + data;
458+
if (this.prePush && out.length) {
459+
this.buffer += this.prePush;
460+
this.prePush = undefined;
461+
}
462+
this.buffer += out;
460463
if (this.buffer.length >= this.bufferSize) {
461464
this.pushCalled = !this.push(this.buffer);
462465
this.buffer = '';
@@ -466,10 +469,10 @@ export class JsonStreamStringify extends Readable {
466469
return true;
467470
}
468471

469-
readState: ReadState = ReadState.NotReading;
470-
async _read(size?: number) {
472+
readState: ReadState = ReadState.Inactive;
473+
async _read(size?: number): Promise<void> {
471474
if (this.readState === ReadState.Consumed) return;
472-
if (this.readState !== ReadState.NotReading) {
475+
if (this.readState !== ReadState.Inactive) {
473476
this.readState = ReadState.ReadMore;
474477
return;
475478
}
@@ -487,12 +490,14 @@ export class JsonStreamStringify extends Readable {
487490
this.push(null);
488491
this.readState = ReadState.Consumed;
489492
this.cleanup();
493+
return;
490494
}
491495
if (this.readState === <any>ReadState.ReadMore) {
492-
this.readState = ReadState.NotReading;
493-
this._read(size);
496+
this.readState = ReadState.Inactive;
497+
await this._read(size);
498+
return;
494499
}
495-
this.readState = ReadState.NotReading;
500+
this.readState = ReadState.Inactive;
496501
}
497502

498503
private cleanup() {

test-src/JsonStreamStringify.spec.ts

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* istanbul ignore file */
22

3-
import { Readable } from 'stream';
3+
import { Readable, PassThrough, Writable } from 'stream';
44
// tslint:disable-next-line:import-name
55
import expect from 'expect.js';
66
import { JsonStreamStringify } from './JsonStreamStringify';
@@ -15,7 +15,7 @@ function emitError(err: Error) {
1515
}
1616

1717
function createTest(input, expected, ...args) {
18-
return () => new Promise((resolve, reject) => {
18+
return () => new Promise<{ jsonStream: InstanceType<typeof JsonStreamStringify> }>((resolve, reject) => {
1919
let str = '';
2020
const jsonStream = new JsonStreamStringify(input, ...args)
2121
.once('end', () => {
@@ -51,7 +51,9 @@ function readableStream(...args) {
5151
return stream;
5252
}
5353

54-
describe('JsonStreamStringify', () => {
54+
describe('JsonStreamStringify', function () {
55+
this.timeout(10000);
56+
5557
after(() => {
5658
// test does not exit cleanly :/
5759
setTimeout(() => process.exit(), 500).unref();
@@ -402,4 +404,66 @@ describe('JsonStreamStringify', () => {
402404
const arr = [a, a];
403405
it('[a, a] should be [{"foo":"bar"},{"foo":"bar"}]', createTest(arr, '[{"foo":"bar"},{"foo":"bar"}]'));
404406
});
407+
408+
it('bad reader', (resolve) => {
409+
const p = new PassThrough({ objectMode: true });
410+
let c = 0;
411+
p.write(c++);
412+
const a = new JsonStreamStringify(p);
413+
let out = '';
414+
a.once('end', () => {
415+
expect(out).to.be('[0,1,2,3]');
416+
resolve();
417+
})
418+
a.on('data', (data) => {
419+
out += data.toString();
420+
}).pause();
421+
read();
422+
function read() {
423+
if (a.readableEnded) return;
424+
for (let i = 0; i < 10; i++) {
425+
a._read(); // simulate bad forced read
426+
a.read(); // legitimate read call
427+
a._read(); // simulate bad forced read
428+
if (!p.writableEnded && i === 8) p.write(c++);
429+
a._read(); // simulate bad forced read
430+
}
431+
if (!p.writableEnded && c > 3) {
432+
p.end();
433+
// p.read();
434+
setTimeout(read, 10);
435+
return;
436+
}
437+
setImmediate(read);
438+
}
439+
});
440+
441+
it('prePush', (cb) => {
442+
const p = new PassThrough({ objectMode: true });
443+
const a = new JsonStreamStringify(p);
444+
(a as any).bufferSize = Infinity;
445+
a.prePush = ',';
446+
(a as any)._push('');
447+
expect(a.buffer).to.be('[');
448+
a.prePush = undefined;
449+
(a as any)._push('"a"');
450+
a.prePush = ',';
451+
(a as any)._push('');
452+
expect(a.buffer).to.be('["a"');
453+
(a as any)._push('"b"');
454+
a.prePush = ',';
455+
(a as any)._push('');
456+
expect(a.buffer).to.be('["a","b"');
457+
a.prePush = '';
458+
p.end(async () => {
459+
try {
460+
await a.item?.read();
461+
expect(a.buffer).to.be('["a","b"]');
462+
cb();
463+
} catch(err) {
464+
cb(err);
465+
}
466+
});
467+
});
468+
405469
});

0 commit comments

Comments
 (0)