aboutsummaryrefslogtreecommitdiff
path: root/lib/pull-many-v2.js
blob: beea621575fe6e235a1e0285869c7427bf607249 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js
// Edits have been made to distinguish the original stream that each object came from

module.exports = function (ary) {

  var capped = !!ary
  var inputs = (ary || []).map(create), i = 0, abort, cb

  function create (stream) {
    return {ready: false, reading: false, ended: false, read: stream, data: null}
  }

  function check () {
    if(!cb) return
    clean()
    var l = inputs.length
    var _cb = cb
    if(l === 0 && (abort || capped)) {
      cb = null; _cb(abort ||  true)
      return
    }

    //scan the inputs to check whether there is one we can use.
    for(var j = 0; j < l; j++) {
      var current = inputs[(i + j) % l]
      if(current.ready && !current.ended) {
        var data = { // [EDIT] keep track of which exact source the data came from
		data: current.data,
		source: current.read.streamName
	}
        current.ready = false
        current.data = null
        i ++; cb = null
        return _cb(null, data)
      }
    }
  }

  function clean () {
    var l = inputs.length
    //iterate backwards so that we can remove items.
    while(l--) {
      if(inputs[l].ended)
        inputs.splice(l, 1)
    }
  }

  function next () {
    var l = inputs.length
    while(l--)
      (function (current) {
        //read the next item if we aren't already
        if(l > inputs.length) throw new Error('this should never happen')
        if(current.reading || current.ended || current.ready) return
        current.reading = true
        var sync = true
        current.read(abort, function next (end, data) {
          current.data = data
          current.ready = true
          current.reading = false

          if(end === true || abort) current.ended = true
          else if(end) abort = current.ended = end
          //check whether we need to abort this stream.
          if(abort && !end) current.read(abort, next)
          if(!sync) check()
        })
        sync = false
      })(inputs[l])

    //scan the feed
    check()
  }

  function read (_abort, _cb) {
    abort = abort || _abort; cb = _cb; next()
  }

  read.add = function (stream) {
    if(!stream) {
      //the stream will now end when all the streams end.
      capped = true
      //we just changed state, so we may need to cb
      return next()
    }
    inputs.push(create(stream))
    next()
  }

  read.cap = function (err) {
    read.add(null)
  }

  return read
}