From 436e6e2831ca4af007192aa3df835a640e7ba6a9 Mon Sep 17 00:00:00 2001 From: "Eugeniy E. Mikhailov" Date: Thu, 15 Jul 2021 14:15:58 -0400 Subject: Tyler redid channel search in more self contained way --- lib/logbook-lib.js | 176 +++++++++++++++++++++++++++++------------------------ 1 file changed, 96 insertions(+), 80 deletions(-) diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js index 451e61d..602bfe7 100644 --- a/lib/logbook-lib.js +++ b/lib/logbook-lib.js @@ -7,9 +7,6 @@ var many = require("pull-many"); // Change to 'true' to get debugging output DEBUG = !false -// Change to switch the default monitored channel -DEFAULT_CHANNEL_NAME = "logbook" - DEFAULT_OPTS = { lt: Date.now(), gt: -Infinity @@ -18,51 +15,77 @@ GRAPH_TYPE = "follow" MAX_HOPS = 1 MAX_REPLY_RECURSIONS = 10 -// using global variables to this extent is horrifying, but javascript has forced my hand +// only instantiate a ssb-client once, so we don't have to create a new connection every time we load a channel page let client = null; -let outputStream = Pushable(); -let allowedBlobs = []; -let idsInMainChannel = {}; -let allMessages = {}; -let opts = DEFAULT_OPTS; - -function pushMessage(newMsg) { - if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) { - idsInMainChannel[newMsg.value.author] = true; - allMessages[newMsg.key] = newMsg; - requestBlobs(newMsg); - } -} - -function sortAndPushAllMessages() { - let messages = Object.keys(allMessages).map(function(msgId) { return allMessages[msgId]; }); - for(let msg of messages.sort((a, b) => b.value.timestamp - a.value.timestamp)) { - outputStream.push(msg); - } - exit() -} -function getReplies(rootMessages) { - let replyStreams = []; +class ChannelController { + constructor(opts) { + this.opts = opts, + this.outputStream = Pushable(), + this.idsInChannel = {}, + this.rootMessages = {}, // used to check for replies + + this.getMessagesFrom = function(channelName, followedIds, preserve, cb) { + var hashtagStream = createHashtagStream("#" + channelName); + var channelStream = createChannelStream(channelName); + var stream = many([hashtagStream, channelStream]); + + var self = this; + pull(stream, pull.filter(function(msg) { + console.log(msg) + return followedIds.includes(msg.value.author.toLowerCase()); + }), pull.drain(function(msg) { + self.pushMessage(msg); + }, function() { + self.getReplies(); + })); + + cb(this.outputStream, preserve); + }, + this.pushMessage = function(newMsg) { + this.idsInChannel[newMsg.value.author] = true; + if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) { + this.rootMessages[newMsg.key] = newMsg; + requestBlobs(newMsg); + } + }, + this.getReplies = function() { + let replyStreams = []; - for(userId of Object.keys(idsInMainChannel)) { - replyStreams.push(createUserStream(client, userId)); - } + for(let userId of Object.keys(this.idsInChannel)) { + replyStreams.push(createUserStream(userId)); + } - pull( - many(replyStreams), - pull.filter(function(msg) { - let messageIsValid = msg.value && msg.value.content && msg.value.content.root; - return messageIsValid && !(rootMessages.includes(msg.key)) && (rootMessages.includes(msg.value.content.root)); - }), - pull.drain(function(msg) { - pushMessage(msg); + let self = this; + pull( + many(replyStreams), + pull.filter(function(msg) { + let messageIsValid = msg.value && msg.value.content && msg.value.content.root; + return messageIsValid && !(msg.key in self.rootMessages) && (msg.value.content.root in self.rootMessages); + }), + pull.drain(function(msg) { + self.pushMessage(msg); + }, + function() { + self.sortAndPushAllMessages(); + getProfilePicturesOf(Object.keys(self.idsInChannel)); + }) + ); }, - function() { - sortAndPushAllMessages(); - getRelevantProfilePictures(); - }) - ); + this.sortAndPushAllMessages = function() { + let self = this; + let messages = Object.keys(this.rootMessages).map(function(msgId) { return self.rootMessages[msgId]; }); + for(let msg of messages.sort((a, b) => b.value.timestamp - a.value.timestamp)) { + this.outputStream.push(msg); + } + this.exit(); + }, + this.exit = function() { + debug("ending stream") + this.outputStream.end(); + delete this; + } + } } function requestBlobs(msg) { @@ -82,14 +105,13 @@ function requestBlobs(msg) { } } -function getRelevantProfilePictures() { - let relevantIds = Object.keys(idsInMainChannel); +function getProfilePicturesOf(relevantIds) { let profilePictureStreams = [] let profilePictureFound = {}; for(let userId of relevantIds) { profilePictureFound[userId] = false; - profilePictureStreams.push(createMetadataStream(client, userId)); + profilePictureStreams.push(createMetadataStream(userId)); } let collectedStream = many(profilePictureStreams); @@ -121,8 +143,6 @@ function getRelevantProfilePictures() { } function getBlob(blobId) { - allowedBlobs.push(blobId); - debug("Ensuring existence of blob with ID " + blobId); client.blobs.has(blobId, function(err, has) { if(err) { @@ -135,11 +155,6 @@ function getBlob(blobId) { }); } -function exit() { // is called at the end of the getReplies -> getRelevantProfilePictures chain - debug("exiting...") - outputStream.end(); -} - function debug(message) { if(DEBUG) { var timestamp = new Date(); @@ -147,11 +162,10 @@ function debug(message) { } } -function createHashtagStream(client, channelName) { +function createHashtagStream(channelName) { var search = client.search && client.search.query; if(!search) { console.log("[FATAL] ssb-search plugin must be installed to us channels"); - process.exit(5); } var query = search({ @@ -161,7 +175,28 @@ function createHashtagStream(client, channelName) { return query; } -function createChannelStream(client, channelName) { +/* +function createHashtagStream(channelTag) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + content: { + mentions: [ + channelTag + ] + } + } + } + }], + reverse: true + }); + + return query; +} +*/ + +function createChannelStream(channelName) { var query = client.query.read({ query: [{ "$filter": { @@ -178,7 +213,7 @@ function createChannelStream(client, channelName) { return query; } -function createUserStream(client, userId) { +function createUserStream(userId) { var query = client.createUserStream({ id: userId }); @@ -186,7 +221,7 @@ function createUserStream(client, userId) { return query; } -function createMetadataStream(client, userId) { +function createMetadataStream(userId) { var query = client.query.read({ query: [{ "$filter": { @@ -205,25 +240,6 @@ function createMetadataStream(client, userId) { return query; } -function getMessagesFrom(channelName, followedIds, preserve, cb) { - var channelTag = "#" + channelName; - var hashtagStream = createHashtagStream(client, channelName); - var channelStream = createChannelStream(client, channelName); - var stream = many([hashtagStream, channelStream]); - - outputStream = Pushable(); // have to create a new outputStream every time, or all calls to getMessages after the first will return the already empty outputStream, causing patchfoo to return no messages - - pull(stream, pull.filter(function(msg) { - return followedIds.includes(msg.value.author.toLowerCase()); - }), pull.drain(function(msg) { - pushMessage(msg); - }, function() { - getReplies(Object.keys(allMessages)); - })); - - cb(outputStream, preserve); -} - function getConfig() { return { host: config.host || "localhost", @@ -233,8 +249,7 @@ function getConfig() { module.exports = { getMessages: function(ssbClient, channelName, ssbOpts, preserve, cb, hops=MAX_HOPS) { - client = ssbClient; // set global variables - opts = ssbOpts; + client = ssbClient; client.friends.hops({ dunbar: Number.MAX_SAFE_INTEGER, @@ -245,7 +260,8 @@ module.exports = { } var followedIds = Object.keys(friends).map(id => id.toLowerCase()); - getMessagesFrom(channelName, followedIds, preserve, cb); + let controller = new ChannelController(ssbOpts || DEFAULT_OPTS); + controller.getMessagesFrom(channelName, followedIds, preserve, cb); }); } } -- cgit v1.2.3