1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js
// Edits have been made to distinguish the original stream that each object came from
module.exports = function (ary) {
var capped = !!ary
var inputs = (ary || []).map(create), i = 0, abort, cb
function create (stream) {
return {ready: false, reading: false, ended: false, read: stream, data: null}
}
function check () {
if(!cb) return
clean()
var l = inputs.length
var _cb = cb
if(l === 0 && (abort || capped)) {
cb = null; _cb(abort || true)
return
}
//scan the inputs to check whether there is one we can use.
for(var j = 0; j < l; j++) {
var current = inputs[(i + j) % l]
if(current.ready && !current.ended) {
var data = { // [EDIT] keep track of which exact source the data came from
data: current.data,
source: current.read.streamName
}
current.ready = false
current.data = null
i ++; cb = null
return _cb(null, data)
}
}
}
function clean () {
var l = inputs.length
//iterate backwards so that we can remove items.
while(l--) {
if(inputs[l].ended)
inputs.splice(l, 1)
}
}
function next () {
var l = inputs.length
while(l--)
(function (current) {
//read the next item if we aren't already
if(l > inputs.length) throw new Error('this should never happen')
if(current.reading || current.ended || current.ready) return
current.reading = true
var sync = true
current.read(abort, function next (end, data) {
current.data = data
current.ready = true
current.reading = false
if(end === true || abort) current.ended = true
else if(end) abort = current.ended = end
//check whether we need to abort this stream.
if(abort && !end) current.read(abort, next)
if(!sync) check()
})
sync = false
})(inputs[l])
//scan the feed
check()
}
function read (_abort, _cb) {
abort = abort || _abort; cb = _cb; next()
}
read.add = function (stream) {
if(!stream) {
//the stream will now end when all the streams end.
capped = true
//we just changed state, so we may need to cb
return next()
}
inputs.push(create(stream))
next()
}
read.cap = function (err) {
read.add(null)
}
return read
}
|