author    Eugeniy E. Mikhailov <evgmik@gmail.com>  2021-11-11 13:03:58 -0500
committer Eugeniy E. Mikhailov <evgmik@gmail.com>  2021-11-11 13:03:58 -0500
commit    6be6dd5b44d890f381c18227db0d061e48245710 (patch)
tree      5350070d13a156ee94a2756cb61b3b44492d7d55 /lib
parent    ca3b8e0465605759d512b0852813e599b94874e4 (diff)
Tyler's changes
Diffstat (limited to 'lib')
-rw-r--r--  lib/logbook-lib.js   | 312
-rw-r--r--  lib/pull-many-v2.js  |  13
2 files changed, 215 insertions(+), 110 deletions(-)
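
In brief, this commit adds a per-channel message cache: an in-memory memoryCache object plus one JSON file per channel under ./logbook_cache/, each holding the channel's root message IDs, all collected messages, and the most recent cached timestamp, so a later page load only has to fetch messages newer than that timestamp. As a reading aid, here is a minimal sketch of the cache file shape as written by this.finish() and read back by getMessages() below; the readChannelCache helper is hypothetical and not part of the patch.

// Sketch only: the JSON layout written to CACHE_FILEPATH + channelName + ".txt".
var fs = require("fs");

var CACHE_FILEPATH = "./logbook_cache/";

function readChannelCache(channelName) {
    var path = CACHE_FILEPATH + channelName + ".txt";
    if (!fs.existsSync(path)) {
        return null; // no cache yet; the caller falls back to a full initial fetch
    }
    var cache = JSON.parse(fs.readFileSync(path));
    // expected shape, matching this.finish() below:
    // { "root": [msgKey, ...], "allMessages": [msg, ...], "mostRecentCachedTimestamp": <number> }
    return cache.root.length ? cache : null; // an empty cache is treated as no cache
}

When the cache exists, getMessages passes mostRecentCachedTimestamp, root, and allMessages into ChannelController.getMessagesFrom, which then takes the incremental getNewMessages path instead of the full getInitialMessages/getRepliesInitial path.
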
diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js
index d09d327..68cbb11 100644
--- a/lib/logbook-lib.js
+++ b/lib/logbook-lib.js
@@ -1,5 +1,6 @@
var clientFactory = require("ssb-client");
var config = require("ssb-config");
+var fs = require("fs");
var pull = require("pull-stream");
var Pushable = require("pull-pushable");
var many = require("./pull-many-v2");
@@ -14,50 +15,52 @@ DEFAULT_OPTS = {
GRAPH_TYPE = "follow"
MAX_HOPS = 1
+CACHE_FILEPATH = "./logbook_cache/"
+
// only instantiate a ssb-client once, so we don't have to create a new connection every time we load a channel page
let client = null;
-
-class StreamData {
- constructor() {
- this.finished = false,
- this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER,
- this.waitingMessages = []
-
- this.markFinished = function() {
- this.finished = true,
- this.oldestTimestampSeen = 0
- }
- }
-}
+let memoryCache = {};
+fs.mkdirSync(CACHE_FILEPATH, {recursive: true});
class ChannelController {
constructor(opts) {
this.opts = opts,
this.outputStream = Pushable(),
- this.allMessages = {},
- this.outputQueue = {},
this.idsInChannel = {},
- this.streamData = {
- channelStream: new StreamData(),
- hashtagStream: new StreamData(),
+ this.streamOpen = {
+ "channel": true,
+ "hashtag": true,
},
-
- this.getMessagesFrom = function(channelName, followedIds, preserve, cb) {
- var channelTag = "#" + channelName;
+
+ this.getMessagesFrom = function(mostRecentCachedTimestamp, rootIds, allMessages, channelName, followedIds, preserve, cb) {
+ this.channelName = channelName;
+ this.rootIds = rootIds; // don't know whether it's worth constructing a hashmap for this
+ this.mostRecentCachedTimestamp = mostRecentCachedTimestamp;
+ this.outputQueue = allMessages;
+
+ if(mostRecentCachedTimestamp == null) {
+ this.getInitialMessages(followedIds, preserve, cb);
+ }
+ else {
+ this.getNewMessages(followedIds, preserve, cb);
+ }
+ },
+ this.getInitialMessages = function(followedIds, preserve, cb) {
+ var channelTag = "#" + this.channelName;
var hashtagStream = createHashtagStream(channelTag);
- var channelStream = createChannelStream(channelName);
+ var channelStream = createChannelStream(this.channelName);
var stream = many([hashtagStream, channelStream]);
var self = this;
pull(stream, pull.filter(function(msg) {
return followedIds.includes(msg.data.value.author.toLowerCase());
}), pull.filter(function(msg) {
- if(msg.data.value && msg.data.value.content && msg.data.value.content.channel && msg.data.value.content.channel == channelName) {
+ if(msg.data.value && msg.data.value.content && msg.data.value.content.channel && msg.data.value.content.channel == self.channelName) {
return true;
}
- // prevent ssb-search from grouping messages with #logbook and #logbook2 together, without running a double search
- if(msg.data.value && msg.data.value.content && msg.data.value.content.text) {
+ // filter out false positives from ssb-query's loose text matching
+ if(msg.data.value && msg.data.value.content && msg.data.value.content.text && msg.data.value.content.text.indexOf) {
let acceptableHashtags = [channelTag + "\n", channelTag + " "];
for(let hashtag of acceptableHashtags) {
if(msg.data.value.content.text.indexOf(hashtag) != -1) {
@@ -68,71 +71,124 @@ class ChannelController {
return false;
}
}), pull.drain(function(msg) {
+ self.rootIds.push(msg.data.key);
self.pushMessage(msg.source, msg.data);
}, function() {
- self.finish(followedIds);
+ if(self.mostRecentCachedTimestamp) {
+ self.getReplies(followedIds);
+ }
+ else {
+ pull( // ugly hack to get the most recent message timestamp into mostRecentCachedTimestamp, even though we only look at backlinks here
+ client.messagesByType({type: "post", reverse: true, limit: 1}),
+ pull.drain(function (msg) {
+ self.mostRecentCachedTimestamp = msg.value.timestamp;
+ }, () => {})
+ );
+
+ self.getRepliesInitial(followedIds);
+ }
}));
cb(this.outputStream, preserve);
},
this.pushMessage = function(msgSource, newMsg) {
this.idsInChannel[newMsg.value.author] = true;
- var streamData = msgSource == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
- streamData.oldestTimestampSeen = newMsg.value.timestamp;
- this.allMessages[newMsg.key] = newMsg; // regardless of time, we want to check for replies to this message
if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) {
- streamData.waitingMessages.push(newMsg);
requestBlobs(newMsg);
+ debug("pushing message with key " + newMsg.key);
+ this.outputQueue.push(newMsg);
}
-
- this.pushNewlySafeMessages();
},
- this.pushNewlySafeMessages = function() {
- var newlySafeMessages = [];
- var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen));
- debug("pushing messages from before " + oldestSafeTimestamp + "...");
-
- for(var streamDataIndex in this.streamData) {
- var streamDatum = this.streamData[streamDataIndex]
- while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) {
- var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message
- debug("pushing safe message with timestamp " + safeMsg.value.timestamp);
- newlySafeMessages.push(safeMsg);
- }
- }
- debug("pushed all newly safe messages");
+ this.finish = function() {
+ this.outputQueue.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
+ debug("done sorting messages");
- // in general, newlySafeMessages might not be in order, we should sort before appending
- newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
- for(var msgIndex in newlySafeMessages) {
- // this.outputStream.push(newlySafeMessages[msgIndex]);
- this.outputQueue[newlySafeMessages[msgIndex].key] = newlySafeMessages[msgIndex];
+ memoryCache[this.channelName] = this.outputQueue;
+ for(var msg of this.outputQueue) {
+ this.outputStream.push(msg);
}
- },
- this.finish = function(followedIds) {
- for(var datumIndex in this.streamData) {
- this.streamData[datumIndex].oldestTimestampSeen = 0;
- }
- this.pushNewlySafeMessages();
- this.getReplies(followedIds);
+ fs.writeFile(CACHE_FILEPATH + this.channelName + ".txt", JSON.stringify({"root": this.rootIds, "allMessages": memoryCache[this.channelName], "mostRecentCachedTimestamp": this.mostRecentCachedTimestamp}), (error) => {
+ if(error) {
+ debug("[ERROR] Failed writing to message cache: " + error);
+ }
+ else {
+ debug("Successfully saved messages to " + CACHE_FILEPATH + this.channelName + ".txt");
+ }
+ });
- // this.outputStream.end();
+ this.outputStream.end()
+ },
+ this.getNewMessages = function(followedIds, preserve, cb) {
+ var stream = client.messagesByType({type: "post", gt: this.mostRecentCachedTimestamp})
+ debug("looking for new messages");
+
+ var self = this;
+ pull(
+ stream,
+ pull.filter(function(msg) {
+ debug("filtering message " + JSON.stringify(msg));
+ self.mostRecentCheckedTimestamp = msg.value.timestamp;
+ return followedIds.includes(msg.value.author.toLowerCase());
+ }),
+ pull.filter(function(msg) { // assume all messages have msg.value.content, otherwise they wouldn't have post type
+ if((msg.value.content.text && (msg.value.content.text.indexOf("#" + self.channelName + "\n") != -1 || msg.value.content.text.indexOf("#" + self.channelName + " ") != -1)) || msg.value.content.channelName == self.channelName) {
+ debug("found new root message: " + JSON.stringify(msg));
+ self.rootIds.push(msg.key);
+ return true;
+ }
+ else if(msg.value.content.root && self.rootIds.includes(msg.value.content.root)) {
+ debug("found new reply: " + JSON.stringify(msg));
+ return true;
+ }
+
+ return false;
+ }),
+ pull.drain(function(msg) {
+ debug("found new message: " + JSON.stringify(msg));
+ self.outputQueue.push(msg);
+ }, self.finish.bind(self))
+ );
+
+ cb(this.outputStream, preserve);
},
this.getReplies = function(followedIds) {
+ var stream = client.messagesByType({type: "post", gt: this.mostRecentCachedTimestamp})
+ debug("looking for replies");
+
+ var self = this;
+ pull(
+ stream,
+ pull.filter(function(msg) { // assume all messages have msg.value.content, otherwise they wouldn't have post type
+ debug("filtering message " + JSON.stringify(msg));
+ self.mostRecentCheckedTimestamp = msg.value.timestamp;
+ return (!msg.value.content.text || msg.value.content.text.indexOf("#" + self.channelName) == -1) && msg.value.content.channelName != self.channelName && followedIds.includes(msg.value.author.toLowerCase());
+ }),
+ pull.filter(function(msg) {
+ return msg.value.content.root && self.rootIds.includes(msg.value.content.root);
+ }),
+ pull.drain(function(msg) {
+ debug("found new reply: " + JSON.stringify(msg));
+ self.outputQueue.push(msg);
+ }, self.finish.bind(self))
+ );
+ },
+ this.getRepliesInitial = function(followedIds) {
var backlinkStreams = [];
- for(const msgKey in this.allMessages) {
- backlinkStreams.push(getBacklinkStream(msgKey));
+ debug("creating backlink streams");
+ for(const key in this.rootIds) {
+ backlinkStreams.push(getBacklinkStream(key));
}
var self = this;
+ var channelTag = "#" + this.channelName;
debug("looking for backlinked messages...");
- var messagesToPush = [];
pull(
many(backlinkStreams),
pull.filter(function(msg) {
- return !(msg.data.key in self.outputQueue) && (followedIds.includes(msg.data.value.author.toLowerCase()));
+ debug("checking backlinked message " + JSON.stringify(msg));
+ return msg.data.value && msg.data.value.content && msg.data.value.content.text && (msg.data.value.content.text.indexOf(channelTag) == -1) && !(msg.data.value.content.channelName == self.channelName) && (followedIds.includes(msg.data.value.author.toLowerCase()));
}),
pull.filter(function(msg) {
debug("found backlink message: " + JSON.stringify(msg.data));
@@ -140,26 +196,9 @@ class ChannelController {
}),
pull.drain(function(msg) {
debug("backlink message had correct timestamps: " + JSON.stringify(msg.data));
- if(!(msg.data.key in self.outputQueue)) {
- self.outputQueue[msg.data.key] = msg.data;
- // messagesToPush.push(msg.data);
- }
- }, function() {
- for(const msgKey in self.outputQueue) {
- debug("pushing message with key " + msgKey);
- //self.outputStream.push(self.outputQueue[msgKey]);
- messagesToPush.push(self.outputQueue[msgKey]);
- }
-
- messagesToPush.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
-
- for(var msg of messagesToPush) {
- self.outputStream.push(msg);
- }
-
- self.outputStream.end()
- })
- )
+ self.outputQueue.push(msg.data);
+ }, self.finish.bind(self))
+ );
}
}
}
@@ -251,7 +290,7 @@ function getBacklinkStream(msgId) {
function createHashtagStream(channelName) {
var search = client.search && client.search.query;
if(!search) {
- console.log("[FATAL] ssb-search plugin must be installed to us channels");
+ console.log("[FATAL] ssb-search plugin must be installed to use channels");
}
var query = search({
@@ -263,24 +302,22 @@ function createHashtagStream(channelName) {
return query;
}
-/*function createHashtagStream(channelTag) {
+/*function createHashtagStream(channelName) {
var query = client.query.read({
- query: [{
- "$filter": {
- value: {
- content: {
- mentions: [
- channelTag
- ]
- }
- }
- }
- }],
- reverse: true
+ "$filter": {
+ value: {
+ content: {
+ text: channelName
+ }
+ }
+ }
});
+ query.streamName = "hashtag";
+
return query;
-}*/
+}
+*/
function createChannelStream(channelName) {
var query = client.query.read({
@@ -301,6 +338,22 @@ function createChannelStream(channelName) {
return query;
}
+function createHashtagStreamOld(channelName) {
+ var query = client.query.read({
+ "$filter": {
+ value: {
+ content: {
+ text: channelName
+ }
+ }
+ }
+ });
+
+ query.streamName = "hashtag";
+
+ return query;
+}
+
function createUserStream(userId) {
var query = client.createUserStream({
id: userId
@@ -338,18 +391,61 @@ function getConfig() {
module.exports = {
getMessages: function(ssbClient, channelName, ssbOpts, preserve, cb, hops=MAX_HOPS) {
client = ssbClient;
+ debug("getting messages from #" + channelName);
+
+ if(channelName in memoryCache && 'lt' in ssbOpts) {
+ debug("already fetched messages from this channel");
+ let outputStream = Pushable();
+ ssbOpts = ssbOpts || DEFAULT_OPTS
+ // let limit = ssbOpts.limit || 10
+ let limit = 100; // this is the largest possible limit by default; since patchfoo doesn't pass limit to us by default, this is a (relatively) safe option
+ debug("getting with limit " + limit);
+
+ for(let msg of memoryCache[channelName]) {
+ if(msg.value.timestamp < ssbOpts.lt) {
+ outputStream.push(msg);
+ limit -= 1;
+ if(limit == 0) {
+ break;
+ }
+ }
+ if(msg.value.timestamp <= ssbOpts.gt) { // if we found a message that's too old, all the following messages will be too old as well
+ break;
+ }
+ }
- client.friends.hops({
- dunbar: Number.MAX_SAFE_INTEGER,
- max: hops
- }, function(error, friends) {
- if(error) {
- throw "Couldn't get a list of friends from scuttlebot:\n" + error;
+ cb(outputStream, preserve);
+ }
+ else {
+ var mostRecentCachedTimestamp = null;
+ var rootIds = [];
+ var allMessages = [];
+ if (fs.existsSync(CACHE_FILEPATH + channelName + ".txt")) {
+ cache = JSON.parse(fs.readFileSync(CACHE_FILEPATH + channelName + ".txt"));
+ if (cache.root.length) {
+ mostRecentCachedTimestamp = cache.mostRecentCachedTimestamp;
+ rootIds = cache.root;
+ allMessages = cache.allMessages;
+ debug("successfully read messages from cache");
+ }
+ else {
+ debug("[WARNING] Cache was empty!")
+ }
+ debug("most recent cached timestamp: " + mostRecentCachedTimestamp);
}
- var followedIds = Object.keys(friends).map(id => id.toLowerCase());
- let controller = new ChannelController(ssbOpts || DEFAULT_OPTS);
- controller.getMessagesFrom(channelName, followedIds, preserve, cb);
- });
+ client.friends.hops({
+ dunbar: Number.MAX_SAFE_INTEGER,
+ max: hops
+ }, function(error, friends) {
+ if(error) {
+ throw "Couldn't get a list of friends from scuttlebot:\n" + error;
+ }
+
+ var followedIds = Object.keys(friends).map(id => id.toLowerCase());
+ let controller = new ChannelController(ssbOpts || DEFAULT_OPTS);
+ controller.getMessagesFrom(mostRecentCachedTimestamp, rootIds, allMessages, channelName, followedIds, preserve, cb);
+ });
+ }
}
}
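
That ends the logbook-lib.js changes. The exported getMessages entry point now has two paths: when the channel is already in memoryCache and ssbOpts carries an lt bound, up to 100 cached messages are pushed straight onto a fresh Pushable stream; otherwise the on-disk cache (if any) is loaded and handed to a new ChannelController along with the followed IDs from client.friends.hops. A rough caller sketch, assuming an already-connected ssb-client instance in ssbClient; the channel name and option values are illustrative, not taken from patchfoo:

var pull = require("pull-stream");
var logbook = require("./lib/logbook-lib");

// ssbClient is assumed to come from clientFactory(config, ...) as at the top of logbook-lib.js
logbook.getMessages(ssbClient, "logbook", { gt: 0, lt: Date.now(), limit: 10 }, null,
    function (outputStream, preserve) {
        pull(outputStream, pull.drain(function (msg) {
            // messages arrive newest-first, as sorted in this.finish()
            console.log(msg.key, new Date(msg.value.timestamp).toISOString());
        }));
    });
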
diff --git a/lib/pull-many-v2.js b/lib/pull-many-v2.js
index beea621..feb68ce 100644
--- a/lib/pull-many-v2.js
+++ b/lib/pull-many-v2.js
@@ -1,5 +1,6 @@
// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js
-// Edits have been made to distinguish the original stream that each object came from
+// Edits have been made to distinguish the original stream that each object came from,
+// and to be able to end each stream independently
module.exports = function (ary) {
@@ -59,7 +60,13 @@ module.exports = function (ary) {
current.ready = true
current.reading = false
- if(end === true || abort) current.ended = true
+ if(end === true || abort) {
+ current.ended = true
+ if(current.read.streamName) {
+ var timestamp = new Date();
+ console.log("[pull-many-v2] [" + timestamp.toISOString() + "] " + "reached the end of stream " + current.read.streamName);
+ }
+ }
else if(end) abort = current.ended = end
//check whether we need to abort this stream.
if(abort && !end) current.read(abort, next)
@@ -90,6 +97,8 @@ module.exports = function (ary) {
read.cap = function (err) {
read.add(null)
}
+
+ read.inputs = inputs
return read
}
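
The pull-many-v2 change is smaller: when an input stream that carries a streamName property reaches its end, that is now logged with a timestamp, and the internal input list is exposed as read.inputs so a caller can inspect (and, per the updated header comment, eventually end) individual streams. A small sketch of how a source gets tagged so the new log line fires; the pull.values source and the "demo" name are illustrative only:

var pull = require("pull-stream");
var many = require("./lib/pull-many-v2");

var source = pull.values(["a", "b", "c"]);
source.streamName = "demo"; // read by the new end-of-stream console.log above

var merged = many([source]);
console.log(merged.inputs.length); // the new read.inputs handle: one entry per input stream

pull(merged, pull.drain(function (item) {
    // per the header comment, items are tagged with the stream they came from,
    // which is why logbook-lib.js reads msg.source and msg.data
}, function () {
    // by now "[pull-many-v2] [...] reached the end of stream demo" has been logged
}));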