Skip to content

Commit 0fc6631

Browse files
committed
Corrections for caching stream parts.
1 parent 8bdbfb0 commit 0fc6631

File tree

2 files changed

+146
-108
lines changed

2 files changed

+146
-108
lines changed

stream.js

+145-107
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ function stream(source, res) {
173173
log.logres("work.error = " + work.error
174174
+ " and no cached data. Calling finish().",
175175
work.options, "stream")
176+
reqstatus[work.options.logsig].Nc = reqstatus[work.options.logsig].Nc + 1
176177
finish(work,"")
177178
return
178179
}
@@ -192,7 +193,6 @@ function stream(source, res) {
192193
var ps = str.substring(1,1+n-npad-1) + Np
193194

194195
var streamfilepart = streamdir + "/" + ps + "." + work.urlMd5 + ".stream.gz"
195-
196196

197197
if (!fs.existsSync(streamfilepart)) {
198198
log.logres("Stream file part does not exist.", work.options, "stream")
@@ -209,35 +209,59 @@ function stream(source, res) {
209209
createstream(work)
210210
} else {
211211
log.logres("Using cached stream file part.", work.options, "stream")
212-
util.readLockFile(streamfilepart, work, function (success) {
212+
213+
util.readLockFile(streamdir, work, function (success) {
213214
if (!success) {
214-
log.logres("Failed to read lock cached stream file part."
215+
log.logres("Failed to read lock stream file part directory."
215216
+ " Recreating stream.", work.options, "stream")
216217
createstream(work)
217-
return
218+
return
218219
}
219-
if (options.streamGzip == false) {
220-
log.logres("Unzipping it.", work.options, "stream")
221-
var streamer = fs
222-
.createReadStream(streamfilepart)
223-
.pipe(zlib.createGunzip())
224-
} else {
225-
log.logres("Sending raw.", work.options, "stream")
226-
var streamer = fs.createReadStream(streamfilepart)
227-
}
228-
streamer.on('end',function() {
229-
log.logres("Received streamer.on end event.",
230-
work.options, "stream")
231-
util.readUnlockFile(streamfilepart, work, function () {})
232-
})
233-
streamer.on('error',function (err) {
234-
log.logc("streamer error event: "
235-
+ JSON.stringify(err), 160)
236-
util.readUnlockFile(streamfilepart, work, function () {})
220+
util.readLockFile(streamfilepart, work, function (success) {
221+
if (!success) {
222+
log.logres("Failed to read lock cached stream file part."
223+
+ " Recreating stream.", work.options, "stream")
224+
createstream(work)
225+
return
226+
}
227+
if (options.streamGzip == false) {
228+
log.logres("Unzipping it.", work.options, "stream")
229+
var streamer = fs
230+
.createReadStream(streamfilepart)
231+
.pipe(zlib.createGunzip())
232+
} else {
233+
log.logres("Sending raw.", work.options, "stream")
234+
var streamer = fs.createReadStream(streamfilepart)
235+
}
236+
streamer.on('end',function() {
237+
log.logres("Received streamer.on end event.",
238+
work.options, "stream")
239+
util.readUnlockFile(streamfilepart, work, function () {
240+
util.readUnlockFile(streamdir, work, function () {
241+
log.logres("Incremening Nc from "
242+
+ reqstatus[work.options.logsig].Nc + "/" + reqstatus[work.options.logsig].N
243+
+ " to "
244+
+ (reqstatus[work.options.logsig].Nc+1) + "/" + reqstatus[work.options.logsig].N,
245+
work.options, "stream")
246+
reqstatus[work.options.logsig].Nc = reqstatus[work.options.logsig].Nc + 1
247+
if (reqstatus[work.options.logsig].Nc == reqstatus[work.options.logsig].N) {
248+
// Note that this is only executed if no errors
249+
// and no cached files were used as a part of the stream.
250+
catstreamparts()
251+
}
252+
})
253+
})
254+
})
255+
streamer.on('error',function (err) {
256+
log.logc("streamer error event: " + JSON.stringify(err), 160)
257+
util.readUnlockFile(streamfilepart, work, function () {
258+
util.readUnlockFile(streamdir, work, function () {})
259+
})
260+
})
261+
log.logres("Streaming it.", work.options, "stream")
262+
finish(work, streamer)
263+
return
237264
})
238-
log.logres("Streaming it.", work.options, "stream")
239-
finish(work, streamer)
240-
return
241265
})
242266
}
243267

@@ -369,7 +393,6 @@ function stream(source, res) {
369393
if (reqstatus[work.options.logsig].N == reqstatus[work.options.logsig].Nx) {
370394
log.logres("Sending res.end().", work.options, "stream")
371395
res.end()
372-
catstreamparts()
373396
} else {
374397
if (work.options.streamOrder) {
375398
queuecheck(work)
@@ -398,21 +421,22 @@ function stream(source, res) {
398421
}
399422
util.writeLockFile(streamfilepart, work, function (success) {
400423
if (!success) {
401-
log.logres("Could not lock streamfilepart file.", work.options, "stream")
402-
util.writeUnlockFile(streamdir, work, function () {});
424+
log.logres("Could write not lock streamfilepart file.", work.options, "stream")
425+
util.writeUnlockFile(streamdir, work, function () {
426+
})
403427
return
404428
}
405429
fs.writeFile(streamfilepart, data, function (err) {
406-
log.logres("Wrote " + streamfilepart, work.options, "stream")
407-
log.logres("Incremening Nc from "
408-
+ reqstatus[work.options.logsig].Nc + "/" + reqstatus[work.options.logsig].N
409-
+ " to "
410-
+ (reqstatus[work.options.logsig].Nc+1) + "/" + reqstatus[work.options.logsig].N,
411-
work.options, "stream")
412-
reqstatus[work.options.logsig].Nc = reqstatus[work.options.logsig].Nc + 1
413-
414430
util.writeUnlockFile(streamdir, work, function () {
415431
util.writeUnlockFile(streamfilepart, work, function () {
432+
log.logres("Wrote " + streamfilepart, work.options, "stream")
433+
log.logres("Incremening Nc from "
434+
+ reqstatus[work.options.logsig].Nc + "/" + reqstatus[work.options.logsig].N
435+
+ " to "
436+
+ (reqstatus[work.options.logsig].Nc+1) + "/" + reqstatus[work.options.logsig].N,
437+
work.options, "stream")
438+
reqstatus[work.options.logsig].Nc = reqstatus[work.options.logsig].Nc + 1
439+
416440
if (reqstatus[work.options.logsig].Nc == reqstatus[work.options.logsig].N) {
417441
catstreamparts()
418442
}
@@ -426,82 +450,95 @@ function stream(source, res) {
426450

427451
function catstreamparts() {
428452

453+
// TODO: Remove this code and pipe output sent to response to stream
454+
// file. I think this code is here because of bug in early versions
455+
// of node.js that did not properly handle concatenation of multiple
456+
// gzip files.
429457
log.logres("Reading dir " + streamdir, res.options, "stream")
430458

431-
var files = fs.readdirSync(streamdir)
459+
var files = fs.readdir(streamdir, docat)
432460

433-
log.logres("Found " + files.length + " file(s)", res.options, "stream")
461+
function docat(err, files) {
434462

435-
if (files.length != N) {
436-
log.logres("Not creating concatenated gzip stream file."
437-
+ " Did not find " + N + " files."
438-
, res.options, "stream")
439-
return
440-
}
463+
if (err) {
464+
log.logres("Not creating concatenated gzip stream file."
465+
+ " Did not find directory " + streamdir + "."
466+
, res.options, "stream")
467+
return
468+
}
469+
log.logres("Found " + files.length + " file(s)", res.options, "stream")
441470

442-
work.finishQueueCallback = false
443-
work.options.partnum = ""
444-
util.writeLockFile(streamdir, work, function (success) {
445-
if (files.length > 1) {
446-
log.logres("Concatenating "
447-
+ files.length
448-
+ " stream parts into "
449-
+ streamsignature
450-
+ ".stream.gz"
451-
, res.options, "stream")
452-
453-
var com = "cd " + streamdir
454-
+ "; cat "
455-
+ files.join(" ")
456-
+ " > ../"
457-
+ streamsignature
458-
+ ".stream.gz;"
459-
460-
log.logres("Evaluating: " + com, res.options, "stream")
461-
child = exec(com, function (error, stdout, stderr) {
462-
log.logres("Evaluated: " + com, res.options, "stream")
463-
if (error) {
464-
log.logres("Error: " + JSON.stringify(error),
465-
res.options, "stream")
466-
}
467-
if (stderr) {
468-
log.logres("Error: " + JSON.stringify(error),
469-
res.options, "stream")
470-
}
471-
util.writeUnlockFile(streamdir, work, function () {})
472-
})
473-
} else {
474-
fs.exists(streamdir + "/../" + streamsignature + ".stream.gz",
475-
function (exists) {
476-
if (exists) {
477-
log.logres("Symlink from single stream part to cat file exists. Not re-creating", work.options, "stream")
478-
util.writeUnlockFile(streamdir, work, function () {})
479-
return
480-
}
471+
if (files.length != N) {
472+
log.logres("Not creating concatenated gzip stream file."
473+
+ " Did not find " + N + " files."
474+
, res.options, "stream")
475+
return
476+
}
481477

482-
var com = "cd " + streamdir
483-
+ "/.. ; ln -s "
484-
+ streamsignature
485-
+ "/"
486-
+ files[0]
487-
+ " "
488-
+ streamsignature
489-
+ ".stream.gz;"
490-
log.logres("Evaluating " + com, work.options, "stream")
491-
child = exec(com, function (error, stdout, stderr) {
492-
log.logres("Evaluated " + com, work.options, "stream")
493-
if (error) {
494-
log.logc("Error: " + JSON.stringify(error), 160)
495-
}
496-
if (stderr) {
497-
log.logc(stderr, 160)
478+
work.finishQueueCallback = false
479+
work.options.partnum = ""
480+
util.writeLockFile(streamdir, work, function (success) {
481+
if (files.length > 1) {
482+
log.logres("Concatenating "
483+
+ files.length
484+
+ " stream parts into "
485+
+ streamsignature
486+
+ ".stream.gz"
487+
, res.options, "stream")
488+
489+
var com = "cd " + streamdir
490+
+ "; cat "
491+
+ files.join(" ")
492+
+ " > ../"
493+
+ streamsignature
494+
+ ".stream.gz;"
495+
496+
log.logres("Evaluating: " + com, res.options, "stream")
497+
child = exec(com, function (error, stdout, stderr) {
498+
log.logres("Evaluated: " + com, res.options, "stream")
499+
if (error) {
500+
log.logres("Error: " + JSON.stringify(error),
501+
res.options, "stream")
502+
}
503+
if (stderr) {
504+
log.logres("Error: " + JSON.stringify(error),
505+
res.options, "stream")
506+
}
507+
util.writeUnlockFile(streamdir, work, function () {})
508+
})
509+
} else {
510+
fs.exists(streamdir + "/../" + streamsignature + ".stream.gz",
511+
function (exists) {
512+
if (exists) {
513+
log.logres("Symlink from single stream part to cat file exists. Not re-creating", work.options, "stream")
514+
util.writeUnlockFile(streamdir, work, function () {})
515+
return
498516
}
499-
util.writeUnlockFile(streamdir, work, function () {})
500-
})
501-
}
502-
)
503-
}
504-
})
517+
518+
var com = "cd " + streamdir
519+
+ "/.. ; ln -s "
520+
+ streamsignature
521+
+ "/"
522+
+ files[0]
523+
+ " "
524+
+ streamsignature
525+
+ ".stream.gz;"
526+
log.logres("Evaluating " + com, work.options, "stream")
527+
child = exec(com, function (error, stdout, stderr) {
528+
log.logres("Evaluated " + com, work.options, "stream")
529+
if (error) {
530+
log.logc("Error: " + JSON.stringify(error), 160)
531+
}
532+
if (stderr) {
533+
log.logc(stderr, 160)
534+
}
535+
util.writeUnlockFile(streamdir, work, function () {})
536+
})
537+
}
538+
)
539+
}
540+
})
541+
}
505542
}
506543

507544
function createstream(work) {
@@ -827,18 +864,19 @@ function stream(source, res) {
827864

828865
if (work.options.streamFilter === "") {
829866
log.logres("Writing response.", work.options, "stream")
830-
831-
832867
log.logres("Uncompressed buffer has length = " + data.length, work.options, "stream")
833868
if (!work.options.streamGzip) {
834869
log.logres("Calling finish with uncompressed buffer.", work.options, "stream")
835870
//res.write(data)
836871
finish(work, data)
872+
log.logres("Compressing buffer.", work.options, "stream")
837873
zlib.createGzip({level: 1})
838874
zlib.gzip(data, function (err, buffer) {
839875
if (err) {
840876
log.logc("gzip error: " + JSON.stringify(err), 160)
841877
}
878+
log.logres("Done compressing buffer.", work.options, "stream")
879+
log.logres("Calling cachestreampart()", work.options, "stream")
842880
cachestreampart(streamfilepart, buffer)
843881
})
844882
} else {

util.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ var isCached = function isCached(work, callback) {
224224
}
225225
} else {
226226
log.logres("Not doing head check because"
227-
+ "respectHeaders = false && forceUpdate = true", work.options, "util")
227+
+ " respectHeaders = false && forceUpdate = true", work.options, "util")
228228
callback(work)
229229
}
230230
}

0 commit comments

Comments
 (0)