diff options
Diffstat (limited to 'lib/pull-many-v2.js')
-rw-r--r-- | lib/pull-many-v2.js | 104 |
1 files changed, 0 insertions, 104 deletions
diff --git a/lib/pull-many-v2.js b/lib/pull-many-v2.js deleted file mode 100644 index feb68ce..0000000 --- a/lib/pull-many-v2.js +++ /dev/null @@ -1,104 +0,0 @@ -// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js -// Edits have been made to distinguish the original stream that each object came from, -// and to be able to end each stream independently - -module.exports = function (ary) { - - var capped = !!ary - var inputs = (ary || []).map(create), i = 0, abort, cb - - function create (stream) { - return {ready: false, reading: false, ended: false, read: stream, data: null} - } - - function check () { - if(!cb) return - clean() - var l = inputs.length - var _cb = cb - if(l === 0 && (abort || capped)) { - cb = null; _cb(abort || true) - return - } - - //scan the inputs to check whether there is one we can use. - for(var j = 0; j < l; j++) { - var current = inputs[(i + j) % l] - if(current.ready && !current.ended) { - var data = { // [EDIT] keep track of which exact source the data came from - data: current.data, - source: current.read.streamName - } - current.ready = false - current.data = null - i ++; cb = null - return _cb(null, data) - } - } - } - - function clean () { - var l = inputs.length - //iterate backwards so that we can remove items. - while(l--) { - if(inputs[l].ended) - inputs.splice(l, 1) - } - } - - function next () { - var l = inputs.length - while(l--) - (function (current) { - //read the next item if we aren't already - if(l > inputs.length) throw new Error('this should never happen') - if(current.reading || current.ended || current.ready) return - current.reading = true - var sync = true - current.read(abort, function next (end, data) { - current.data = data - current.ready = true - current.reading = false - - if(end === true || abort) { - current.ended = true - if(current.read.streamName) { - var timestamp = new Date(); - console.log("[pull-many-v2] [" + timestamp.toISOString() + "] " + "reached the end of stream " + current.read.streamName); - } - } - else if(end) abort = current.ended = end - //check whether we need to abort this stream. - if(abort && !end) current.read(abort, next) - if(!sync) check() - }) - sync = false - })(inputs[l]) - - //scan the feed - check() - } - - function read (_abort, _cb) { - abort = abort || _abort; cb = _cb; next() - } - - read.add = function (stream) { - if(!stream) { - //the stream will now end when all the streams end. - capped = true - //we just changed state, so we may need to cb - return next() - } - inputs.push(create(stream)) - next() - } - - read.cap = function (err) { - read.add(null) - } - - read.inputs = inputs - - return read -} |