aboutsummaryrefslogtreecommitdiff
path: root/lib/pull-many-v2.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/pull-many-v2.js')
-rw-r--r--lib/pull-many-v2.js95
1 files changed, 95 insertions, 0 deletions
diff --git a/lib/pull-many-v2.js b/lib/pull-many-v2.js
new file mode 100644
index 0000000..beea621
--- /dev/null
+++ b/lib/pull-many-v2.js
@@ -0,0 +1,95 @@
+// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js
+// Edits have been made to distinguish the original stream that each object came from
+
+module.exports = function (ary) {
+
+ var capped = !!ary
+ var inputs = (ary || []).map(create), i = 0, abort, cb
+
+ function create (stream) {
+ return {ready: false, reading: false, ended: false, read: stream, data: null}
+ }
+
+ function check () {
+ if(!cb) return
+ clean()
+ var l = inputs.length
+ var _cb = cb
+ if(l === 0 && (abort || capped)) {
+ cb = null; _cb(abort || true)
+ return
+ }
+
+ //scan the inputs to check whether there is one we can use.
+ for(var j = 0; j < l; j++) {
+ var current = inputs[(i + j) % l]
+ if(current.ready && !current.ended) {
+ var data = { // [EDIT] keep track of which exact source the data came from
+ data: current.data,
+ source: current.read.streamName
+ }
+ current.ready = false
+ current.data = null
+ i ++; cb = null
+ return _cb(null, data)
+ }
+ }
+ }
+
+ function clean () {
+ var l = inputs.length
+ //iterate backwards so that we can remove items.
+ while(l--) {
+ if(inputs[l].ended)
+ inputs.splice(l, 1)
+ }
+ }
+
+ function next () {
+ var l = inputs.length
+ while(l--)
+ (function (current) {
+ //read the next item if we aren't already
+ if(l > inputs.length) throw new Error('this should never happen')
+ if(current.reading || current.ended || current.ready) return
+ current.reading = true
+ var sync = true
+ current.read(abort, function next (end, data) {
+ current.data = data
+ current.ready = true
+ current.reading = false
+
+ if(end === true || abort) current.ended = true
+ else if(end) abort = current.ended = end
+ //check whether we need to abort this stream.
+ if(abort && !end) current.read(abort, next)
+ if(!sync) check()
+ })
+ sync = false
+ })(inputs[l])
+
+ //scan the feed
+ check()
+ }
+
+ function read (_abort, _cb) {
+ abort = abort || _abort; cb = _cb; next()
+ }
+
+ read.add = function (stream) {
+ if(!stream) {
+ //the stream will now end when all the streams end.
+ capped = true
+ //we just changed state, so we may need to cb
+ return next()
+ }
+ inputs.push(create(stream))
+ next()
+ }
+
+ read.cap = function (err) {
+ read.add(null)
+ }
+
+ return read
+}