From 6be6dd5b44d890f381c18227db0d061e48245710 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Thu, 11 Nov 2021 13:03:58 -0500 Subject: Tyler's changes --- lib/logbook-lib.js | 312 ++++++++++++++++++++++++++++++++++------------------ lib/pull-many-v2.js | 13 ++- 2 files changed, 215 insertions(+), 110 deletions(-) (limited to 'lib') diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js index d09d327..68cbb11 100644 --- a/lib/logbook-lib.js +++ b/lib/logbook-lib.js @@ -1,5 +1,6 @@ var clientFactory = require("ssb-client"); var config = require("ssb-config"); +var fs = require("fs"); var pull = require("pull-stream"); var Pushable = require("pull-pushable"); var many = require("./pull-many-v2"); @@ -14,50 +15,52 @@ DEFAULT_OPTS = { GRAPH_TYPE = "follow" MAX_HOPS = 1 +CACHE_FILEPATH = "./logbook_cache/" + // only instantiate a ssb-client once, so we don't have to create a new connection every time we load a channel page let client = null; - -class StreamData { - constructor() { - this.finished = false, - this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER, - this.waitingMessages = [] - - this.markFinished = function() { - this.finished = true, - this.oldestTimestampSeen = 0 - } - } -} +let memoryCache = {}; +fs.mkdirSync(CACHE_FILEPATH, {recursive: true}); class ChannelController { constructor(opts) { this.opts = opts, this.outputStream = Pushable(), - this.allMessages = {}, - this.outputQueue = {}, this.idsInChannel = {}, - this.streamData = { - channelStream: new StreamData(), - hashtagStream: new StreamData(), + this.streamOpen = { + "channel": true, + "hashtag": true, }, - - this.getMessagesFrom = function(channelName, followedIds, preserve, cb) { - var channelTag = "#" + channelName; + + this.getMessagesFrom = function(mostRecentCachedTimestamp, rootIds, allMessages, channelName, followedIds, preserve, cb) { + this.channelName = channelName; + this.rootIds = rootIds; // don't know whether it's worth constructing a hashmap for this + this.mostRecentCachedTimestamp = mostRecentCachedTimestamp; + this.outputQueue = allMessages; + + if(mostRecentCachedTimestamp == null) { + this.getInitialMessages(followedIds, preserve, cb); + } + else { + this.getNewMessages(followedIds, preserve, cb); + } + }, + this.getInitialMessages = function(followedIds, preserve, cb) { + var channelTag = "#" + this.channelName; var hashtagStream = createHashtagStream(channelTag); - var channelStream = createChannelStream(channelName); + var channelStream = createChannelStream(this.channelName); var stream = many([hashtagStream, channelStream]); var self = this; pull(stream, pull.filter(function(msg) { return followedIds.includes(msg.data.value.author.toLowerCase()); }), pull.filter(function(msg) { - if(msg.data.value && msg.data.value.content && msg.data.value.content.channel && msg.data.value.content.channel == channelName) { + if(msg.data.value && msg.data.value.content && msg.data.value.content.channel && msg.data.value.content.channel == self.channelName) { return true; } - // prevent ssb-search from grouping messages with #logbook and #logbook2 together, without running a double search - if(msg.data.value && msg.data.value.content && msg.data.value.content.text) { + // filter out false positives from ssb-query's loose text matching + if(msg.data.value && msg.data.value.content && msg.data.value.content.text && msg.data.value.content.text.indexOf) { let acceptableHashtags = [channelTag + "\n", channelTag + " "]; for(let hashtag of acceptableHashtags) { if(msg.data.value.content.text.indexOf(hashtag) != -1) { @@ -68,71 +71,124 @@ class ChannelController { return false; } }), pull.drain(function(msg) { + self.rootIds.push(msg.data.key); self.pushMessage(msg.source, msg.data); }, function() { - self.finish(followedIds); + if(self.mostRecentCachedTimestamp) { + self.getReplies(followedIds); + } + else { + pull( // ugly hack to get the most recent message timestamp into mostRecentCachedTimestamp, even though we only look at backlinks here + client.messagesByType({type: "post", reverse: true, limit: 1}), + pull.drain(function (msg) { + self.mostRecentCachedTimestamp = msg.value.timestamp; + }, () => {}) + ); + + self.getRepliesInitial(followedIds); + } })); cb(this.outputStream, preserve); }, this.pushMessage = function(msgSource, newMsg) { this.idsInChannel[newMsg.value.author] = true; - var streamData = msgSource == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream; - streamData.oldestTimestampSeen = newMsg.value.timestamp; - this.allMessages[newMsg.key] = newMsg; // regardless of time, we want to check for replies to this message if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) { - streamData.waitingMessages.push(newMsg); requestBlobs(newMsg); + debug("pushing message with key " + newMsg.key); + this.outputQueue.push(newMsg); } - - this.pushNewlySafeMessages(); }, - this.pushNewlySafeMessages = function() { - var newlySafeMessages = []; - var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen)); - debug("pushing messages from before " + oldestSafeTimestamp + "..."); - - for(var streamDataIndex in this.streamData) { - var streamDatum = this.streamData[streamDataIndex] - while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) { - var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message - debug("pushing safe message with timestamp " + safeMsg.value.timestamp); - newlySafeMessages.push(safeMsg); - } - } - debug("pushed all newly safe messages"); + this.finish = function() { + this.outputQueue.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1)); + debug("done sorting messages"); - // in general, newlySafeMessages might not be in order, we should sort before appending - newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1)); - for(var msgIndex in newlySafeMessages) { - // this.outputStream.push(newlySafeMessages[msgIndex]); - this.outputQueue[newlySafeMessages[msgIndex].key] = newlySafeMessages[msgIndex]; + memoryCache[this.channelName] = this.outputQueue; + for(var msg of this.outputQueue) { + this.outputStream.push(msg); } - }, - this.finish = function(followedIds) { - for(var datumIndex in this.streamData) { - this.streamData[datumIndex].oldestTimestampSeen = 0; - } - this.pushNewlySafeMessages(); - this.getReplies(followedIds); + fs.writeFile(CACHE_FILEPATH + this.channelName + ".txt", JSON.stringify({"root": this.rootIds, "allMessages": memoryCache[this.channelName], "mostRecentCachedTimestamp": this.mostRecentCachedTimestamp}), (error) => { + if(error) { + debug("[ERROR] Failed writing to message cache: " + error); + } + else { + debug("Successfully saved messages to " + CACHE_FILEPATH + this.channelName + ".txt"); + } + }); - // this.outputStream.end(); + this.outputStream.end() + }, + this.getNewMessages = function(followedIds, preserve, cb) { + var stream = client.messagesByType({type: "post", gt: this.mostRecentCachedTimestamp}) + debug("looking for new messages"); + + var self = this; + pull( + stream, + pull.filter(function(msg) { + debug("filtering message " + JSON.stringify(msg)); + self.mostRecentCheckedTimestamp = msg.value.timestamp; + return followedIds.includes(msg.value.author.toLowerCase()); + }), + pull.filter(function(msg) { // assume all messages have msg.value.content, otherwise they wouldn't have post type + if((msg.value.content.text && (msg.value.content.text.indexOf("#" + self.channelName + "\n") != -1 || msg.value.content.text.indexOf("#" + self.channelName + " ") != -1)) || msg.value.content.channelName == self.channelName) { + debug("found new root message: " + JSON.stringify(msg)); + self.rootIds.push(msg.key); + return true; + } + else if(msg.value.content.root && self.rootIds.includes(msg.value.content.root)) { + debug("found new reply: " + JSON.stringify(msg)); + return true; + } + + return false; + }), + pull.drain(function(msg) { + debug("found new message: " + JSON.stringify(msg)); + self.outputQueue.push(msg); + }, self.finish.bind(self)) + ); + + cb(this.outputStream, preserve); }, this.getReplies = function(followedIds) { + var stream = client.messagesByType({type: "post", gt: this.mostRecentCachedTimestamp}) + debug("looking for replies"); + + var self = this; + pull( + stream, + pull.filter(function(msg) { // assume all messages have msg.value.content, otherwise they wouldn't have post type + debug("filtering message " + JSON.stringify(msg)); + self.mostRecentCheckedTimestamp = msg.value.timestamp; + return (!msg.value.content.text || msg.value.content.text.indexOf("#" + self.channelName) == -1) && msg.value.content.channelName != self.channelName && followedIds.includes(msg.value.author.toLowerCase()); + }), + pull.filter(function(msg) { + return msg.value.content.root && self.rootIds.includes(msg.value.content.root); + }), + pull.drain(function(msg) { + debug("found new reply: " + JSON.stringify(msg)); + self.outputQueue.push(msg); + }, self.finish.bind(self)) + ); + }, + this.getRepliesInitial = function(followedIds) { var backlinkStreams = []; - for(const msgKey in this.allMessages) { - backlinkStreams.push(getBacklinkStream(msgKey)); + debug("creating backlink streams"); + for(const key in this.rootIds) { + backlinkStreams.push(getBacklinkStream(key)); } var self = this; + var channelTag = "#" + this.channelName; debug("looking for backlinked messages..."); - var messagesToPush = []; pull( many(backlinkStreams), pull.filter(function(msg) { - return !(msg.data.key in self.outputQueue) && (followedIds.includes(msg.data.value.author.toLowerCase())); + debug("checking backlinked message " + JSON.stringify(msg)); + return msg.data.value && msg.data.value.content && msg.data.value.content.text && (msg.data.value.content.text.indexOf(channelTag) == -1) && !(msg.data.value.content.channelName == self.channelName) && (followedIds.includes(msg.data.value.author.toLowerCase())); }), pull.filter(function(msg) { debug("found backlink message: " + JSON.stringify(msg.data)); @@ -140,26 +196,9 @@ class ChannelController { }), pull.drain(function(msg) { debug("backlink message had correct timestamps: " + JSON.stringify(msg.data)); - if(!(msg.data.key in self.outputQueue)) { - self.outputQueue[msg.data.key] = msg.data; - // messagesToPush.push(msg.data); - } - }, function() { - for(const msgKey in self.outputQueue) { - debug("pushing message with key " + msgKey); - //self.outputStream.push(self.outputQueue[msgKey]); - messagesToPush.push(self.outputQueue[msgKey]); - } - - messagesToPush.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1)); - - for(var msg of messagesToPush) { - self.outputStream.push(msg); - } - - self.outputStream.end() - }) - ) + self.outputQueue.push(msg.data); + }, self.finish.bind(self)) + ); } } } @@ -251,7 +290,7 @@ function getBacklinkStream(msgId) { function createHashtagStream(channelName) { var search = client.search && client.search.query; if(!search) { - console.log("[FATAL] ssb-search plugin must be installed to us channels"); + console.log("[FATAL] ssb-search plugin must be installed to use channels"); } var query = search({ @@ -263,24 +302,22 @@ function createHashtagStream(channelName) { return query; } -/*function createHashtagStream(channelTag) { +/*function createHashtagStream(channelName) { var query = client.query.read({ - query: [{ - "$filter": { - value: { - content: { - mentions: [ - channelTag - ] - } - } - } - }], - reverse: true + "$filter": { + value: { + content: { + text: channelName + } + } + } }); + query.streamName = "hashtag"; + return query; -}*/ +} +*/ function createChannelStream(channelName) { var query = client.query.read({ @@ -301,6 +338,22 @@ function createChannelStream(channelName) { return query; } +function createHashtagStreamOld(channelName) { + var query = client.query.read({ + "$filter": { + value: { + content: { + text: channelName + } + } + } + }); + + query.streamName = "hashtag"; + + return query; +} + function createUserStream(userId) { var query = client.createUserStream({ id: userId @@ -338,18 +391,61 @@ function getConfig() { module.exports = { getMessages: function(ssbClient, channelName, ssbOpts, preserve, cb, hops=MAX_HOPS) { client = ssbClient; + debug("getting messages from #" + channelName); + + if(channelName in memoryCache && 'lt' in ssbOpts) { + debug("already fetched messages from this channel"); + let outputStream = Pushable(); + ssbOpts = ssbOpts || DEFAULT_OPTS + // let limit = ssbOpts.limit || 10 + let limit = 100; // this is the largest possible limit by default; since patchfoo doesn't pass limit to us by default, this is a (relatively) safe option + debug("getting with limit " + limit); + + for(let msg of memoryCache[channelName]) { + if(msg.value.timestamp < ssbOpts.lt) { + outputStream.push(msg); + limit -= 1; + if(limit == 0) { + break; + } + } + if(msg.value.timestamp <= ssbOpts.gt) { // if we found a message that's too old, all the following messages will be too old as well + break; + } + } - client.friends.hops({ - dunbar: Number.MAX_SAFE_INTEGER, - max: hops - }, function(error, friends) { - if(error) { - throw "Couldn't get a list of friends from scuttlebot:\n" + error; + cb(outputStream, preserve); + } + else { + var mostRecentCachedTimestamp = null; + var rootIds = []; + var allMessages = []; + if (fs.existsSync(CACHE_FILEPATH + channelName + ".txt")) { + cache = JSON.parse(fs.readFileSync(CACHE_FILEPATH + channelName + ".txt")); + if (cache.root.length) { + mostRecentCachedTimestamp = cache.mostRecentCachedTimestamp; + rootIds = cache.root; + allMessages = cache.allMessages; + debug("successfully read messages from cache"); + } + else { + debug("[WARNING] Cache was empty!") + } + debug("most recent cached timestamp: " + mostRecentCachedTimestamp); } - var followedIds = Object.keys(friends).map(id => id.toLowerCase()); - let controller = new ChannelController(ssbOpts || DEFAULT_OPTS); - controller.getMessagesFrom(channelName, followedIds, preserve, cb); - }); + client.friends.hops({ + dunbar: Number.MAX_SAFE_INTEGER, + max: hops + }, function(error, friends) { + if(error) { + throw "Couldn't get a list of friends from scuttlebot:\n" + error; + } + + var followedIds = Object.keys(friends).map(id => id.toLowerCase()); + let controller = new ChannelController(ssbOpts || DEFAULT_OPTS); + controller.getMessagesFrom(mostRecentCachedTimestamp, rootIds, allMessages, channelName, followedIds, preserve, cb); + }); + } } } diff --git a/lib/pull-many-v2.js b/lib/pull-many-v2.js index beea621..feb68ce 100644 --- a/lib/pull-many-v2.js +++ b/lib/pull-many-v2.js @@ -1,5 +1,6 @@ // All credit to https://github.com/pull-stream/pull-many/blob/master/index.js -// Edits have been made to distinguish the original stream that each object came from +// Edits have been made to distinguish the original stream that each object came from, +// and to be able to end each stream independently module.exports = function (ary) { @@ -59,7 +60,13 @@ module.exports = function (ary) { current.ready = true current.reading = false - if(end === true || abort) current.ended = true + if(end === true || abort) { + current.ended = true + if(current.read.streamName) { + var timestamp = new Date(); + console.log("[pull-many-v2] [" + timestamp.toISOString() + "] " + "reached the end of stream " + current.read.streamName); + } + } else if(end) abort = current.ended = end //check whether we need to abort this stream. if(abort && !end) current.read(abort, next) @@ -90,6 +97,8 @@ module.exports = function (ary) { read.cap = function (err) { read.add(null) } + + read.inputs = inputs return read } -- cgit v1.2.3