diff options
Diffstat (limited to 'pull-many-v2.js')
-rw-r--r-- | pull-many-v2.js | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/pull-many-v2.js b/pull-many-v2.js new file mode 100644 index 0000000..beea621 --- /dev/null +++ b/pull-many-v2.js @@ -0,0 +1,95 @@ +// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js +// Edits have been made to distinguish the original stream that each object came from + +module.exports = function (ary) { + + var capped = !!ary + var inputs = (ary || []).map(create), i = 0, abort, cb + + function create (stream) { + return {ready: false, reading: false, ended: false, read: stream, data: null} + } + + function check () { + if(!cb) return + clean() + var l = inputs.length + var _cb = cb + if(l === 0 && (abort || capped)) { + cb = null; _cb(abort || true) + return + } + + //scan the inputs to check whether there is one we can use. + for(var j = 0; j < l; j++) { + var current = inputs[(i + j) % l] + if(current.ready && !current.ended) { + var data = { // [EDIT] keep track of which exact source the data came from + data: current.data, + source: current.read.streamName + } + current.ready = false + current.data = null + i ++; cb = null + return _cb(null, data) + } + } + } + + function clean () { + var l = inputs.length + //iterate backwards so that we can remove items. + while(l--) { + if(inputs[l].ended) + inputs.splice(l, 1) + } + } + + function next () { + var l = inputs.length + while(l--) + (function (current) { + //read the next item if we aren't already + if(l > inputs.length) throw new Error('this should never happen') + if(current.reading || current.ended || current.ready) return + current.reading = true + var sync = true + current.read(abort, function next (end, data) { + current.data = data + current.ready = true + current.reading = false + + if(end === true || abort) current.ended = true + else if(end) abort = current.ended = end + //check whether we need to abort this stream. + if(abort && !end) current.read(abort, next) + if(!sync) check() + }) + sync = false + })(inputs[l]) + + //scan the feed + check() + } + + function read (_abort, _cb) { + abort = abort || _abort; cb = _cb; next() + } + + read.add = function (stream) { + if(!stream) { + //the stream will now end when all the streams end. + capped = true + //we just changed state, so we may need to cb + return next() + } + inputs.push(create(stream)) + next() + } + + read.cap = function (err) { + read.add(null) + } + + return read +} |