diff options
-rw-r--r-- | lib/channels-lib.js | 162 | ||||
-rw-r--r-- | lib/pull-many-v2.js | 95 | ||||
-rw-r--r-- | lib/serve.js | 57 |
3 files changed, 312 insertions, 2 deletions
diff --git a/lib/channels-lib.js b/lib/channels-lib.js new file mode 100644 index 0000000..f2d8484 --- /dev/null +++ b/lib/channels-lib.js @@ -0,0 +1,162 @@ +var { promisify } = require("util"); +var pull = require("pull-stream"); +var Pushable = require("pull-pushable"); +var many = require("./pull-many-v2"); + +// Change to 'true' to get debugging output +DEBUG = false + +DEFAULT_CHANNEL_OBJECT = { + messages: [] +} +DEFAULT_CHANNEL_OBJECT_SIZE = 15 +FRIENDS_POLL_TIME = 1 // ms +GRAPH_TYPE = "follow" +MAX_HOPS = 1 + +class StreamData { + constructor() { + this.finished = false, + this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER, + this.waitingMessages = [] + + this.markFinished = function() { + this.finished = true, + this.oldestTimestampSeen = 0 + } + } +} + +class StreamController { + constructor() { + this.outputStream = Pushable(), + this.streamData = { + channelStream: new StreamData(), + hashtagStream: new StreamData(), + }, + + this.pushMessage = function(source, newMsg) { + var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream; + streamData.oldestTimestampSeen = newMsg.value.timestamp; + streamData.waitingMessages.push(newMsg); + + this.pushNewlySafeMessages(); + }, + this.pushNewlySafeMessages = function() { + var newlySafeMessages = []; + var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen)); + debug("oldest safe timestamp:" + oldestSafeTimestamp); + + for(var streamDataIndex in this.streamData) { + var streamDatum = this.streamData[streamDataIndex] + debug("length of waiting messages: " + streamDatum.waitingMessages.length); + if(streamDatum.waitingMessages.length) { + debug("timestamp of first waiting message: " + streamDatum.waitingMessages[0].value.timestamp); + } + while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) { + var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message + newlySafeMessages.push(safeMsg); + } + } + + // in general, newlySafeMessages might not be in order, we should sort before appending + newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1)); + for(var msgIndex in newlySafeMessages) { + debug("pushing safe message..."); + this.outputStream.push(newlySafeMessages[msgIndex]); + } + }, + this.finish = function() { + for(var datumIndex in this.streamData) { + this.streamData[datumIndex].oldestTimestampSeen = 0; + } + this.pushNewlySafeMessages(); + } + } +} + +module.exports = { + getMessages: function(client, channelName, preserve, cb, hops=MAX_HOPS) { + client.friends.hops({ + dunbar: Number.MAX_SAFE_INTEGER, + max: hops + }, function(error, friends) { + if(error) { + throw "Couldn't get a list of friends from scuttlebot:\n" + error; + } + + var followedIds = Object.keys(friends).map(id => id.toLowerCase()); + getMessagesFrom(client, channelName, followedIds, preserve, cb); + }); + } +} + +function getMessagesFrom(client, channelName, followedIds, preserve, cb) { + debug("Fetching messages from IDs in " + JSON.stringify(followedIds)); + + var channelTag = "#" + channelName; + var streamController = new StreamController(); + var hashtagStream = createHashtagStream(client, channelName); + var channelStream = createChannelStream(client, channelName); + var stream = many([hashtagStream, channelStream]); + + pull(stream, pull.filter(function(msg) { + return followedIds.includes(msg.data.value.author.toLowerCase()); + }), pull.filter(function(msg) { + var hasHashtag = msg.data.value.content && msg.data.value.content.text && typeof(msg.data.value.content.text) == "string" && msg.data.value.content.text.includes(channelTag); + if(msg.source == "hashtag") { + return hasHashtag; + } + else { + return !hasHashtag; // prevents us from double-counting messages with both the hashtag and the channel header + } + }), pull.drain(function(msg) { + streamController.pushMessage(msg.source, msg.data); + }, function() { + streamController.finish(); + })); + + cb(streamController.outputStream, preserve); +} + +function createHashtagStream(client, channelName) { + var search = client.search && client.search.query; + if(!search) { + console.log("[FATAL] ssb-search plugin must be installed to us channels"); + process.exit(5); + } + + var query = search({ + query: channelName + }); + + query.streamName = "hashtag"; // mark the stream object so we can tell which stream an object came from + + return query; +} + +function createChannelStream(client, channelName) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + content: { + channel: channelName + } + } + } + }], + reverse: true + }); + + query.streamName = "channel"; // mark the stream object so we can tell which stream a message came from later + + return query; +} + +function debug(message) { + if(DEBUG) { + var timestamp = new Date(); + console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message); + } +} diff --git a/lib/pull-many-v2.js b/lib/pull-many-v2.js new file mode 100644 index 0000000..beea621 --- /dev/null +++ b/lib/pull-many-v2.js @@ -0,0 +1,95 @@ +// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js +// Edits have been made to distinguish the original stream that each object came from + +module.exports = function (ary) { + + var capped = !!ary + var inputs = (ary || []).map(create), i = 0, abort, cb + + function create (stream) { + return {ready: false, reading: false, ended: false, read: stream, data: null} + } + + function check () { + if(!cb) return + clean() + var l = inputs.length + var _cb = cb + if(l === 0 && (abort || capped)) { + cb = null; _cb(abort || true) + return + } + + //scan the inputs to check whether there is one we can use. + for(var j = 0; j < l; j++) { + var current = inputs[(i + j) % l] + if(current.ready && !current.ended) { + var data = { // [EDIT] keep track of which exact source the data came from + data: current.data, + source: current.read.streamName + } + current.ready = false + current.data = null + i ++; cb = null + return _cb(null, data) + } + } + } + + function clean () { + var l = inputs.length + //iterate backwards so that we can remove items. + while(l--) { + if(inputs[l].ended) + inputs.splice(l, 1) + } + } + + function next () { + var l = inputs.length + while(l--) + (function (current) { + //read the next item if we aren't already + if(l > inputs.length) throw new Error('this should never happen') + if(current.reading || current.ended || current.ready) return + current.reading = true + var sync = true + current.read(abort, function next (end, data) { + current.data = data + current.ready = true + current.reading = false + + if(end === true || abort) current.ended = true + else if(end) abort = current.ended = end + //check whether we need to abort this stream. + if(abort && !end) current.read(abort, next) + if(!sync) check() + }) + sync = false + })(inputs[l]) + + //scan the feed + check() + } + + function read (_abort, _cb) { + abort = abort || _abort; cb = _cb; next() + } + + read.add = function (stream) { + if(!stream) { + //the stream will now end when all the streams end. + capped = true + //we just changed state, so we may need to cb + return next() + } + inputs.push(create(stream)) + next() + } + + read.cap = function (err) { + read.add(null) + } + + return read +} diff --git a/lib/serve.js b/lib/serve.js index 21f08ac..f7e0620 100644 --- a/lib/serve.js +++ b/lib/serve.js @@ -28,6 +28,7 @@ var Url = require('url') var many = require('pull-many') var merge = require('pull-merge') var pSort = require('pull-sort') +var channels = require("./channels-lib"); module.exports = Serve @@ -520,7 +521,7 @@ Serve.prototype.path = function (url) { switch (m[1]) { case '/new': return this.new(m[2]) case '/public': return this.public(m[2]) - case '/logbook': return this.logbook(m[2]) + case '/logbook': return this.logbook2(m[2]) case '/threads': return this.threads(m[2]) case '/private': return this.private(m[2]) case '/mentions': return this.mentions(m[2]) @@ -579,6 +580,57 @@ Serve.prototype.home = function () { ) } +Serve.prototype.logbook2 = function (ext) { + var q = this.query + var opts = { + reverse: !q.forwards, + //sortByTimestamp: q.sort === 'claimed', + sortByTimestamp: q.sort || 'claimed', + lt: Number(q.lt) || Date.now(), + gt: Number(q.gt) || -Infinity, + filter: q.filter, + } + + channels.getMessages(this.app.sbot, "logbook", this, function(messageStream, serve) { + pull(messageStream, + serve.renderThreadPaginated(opts, null, q), + serve.wrapMessages(), + serve.wrapPublic(), + serve.wrapPage('logbook'), + serve.respondSink(200, { + 'Content-Type': ctype(ext) + }) + //pull.drain(function(msg) { + //console.log(JSON.stringify(msg)); + //}) + ) + } + , hops=3) + + //this.renderThreadPaginated(opts, null, q), + //this.wrapMessages(), + //this.wrapPublic(), + //this.wrapPage('public'), + //this.respondSink(200, { + //'Content-Type': ctype(ext) + //}) + //) + //}); + + //pull( + //this.app.createLogStream(opts), + //pull.filter(msg => { + //return !msg.value.content.vote + //}), + //this.renderThreadPaginated(opts, null, q), + //this.wrapMessages(), + //this.wrapPublic(), + //this.wrapPage('public'), + //this.respondSink(200, { + //'Content-Type': ctype(ext) + //}) + //) +} compareMsgs = function(a, b) { return -(a.value.timestamp - b.value.timestamp) } @@ -622,7 +674,8 @@ Serve.prototype.public = function (ext) { var q = this.query var opts = { reverse: !q.forwards, - sortByTimestamp: q.sort === 'claimed', + //sortByTimestamp: q.sort === 'claimed', + sortByTimestamp: q.sort || 'claimed', lt: Number(q.lt) || Date.now(), gt: Number(q.gt) || -Infinity, filter: q.filter, |