diff options
author | Eugeniy Mikhailov <evmik@physics.wm.edu> | 2021-06-23 13:41:11 -0400 |
---|---|---|
committer | Eugeniy Mikhailov <evmik@physics.wm.edu> | 2021-06-23 13:41:11 -0400 |
commit | 46f5ca5fe4c4a0f56e3cdcef6b464938836703e1 (patch) | |
tree | 70ec3da88d8cbda02b63d69803f06d4ef5a7f849 /lib/logbook-lib.js | |
parent | 0a374a3dd1008864cb4d6ea0c67a5080f2427b28 (diff) | |
download | patchfoo-46f5ca5fe4c4a0f56e3cdcef6b464938836703e1.tar.gz patchfoo-46f5ca5fe4c4a0f56e3cdcef6b464938836703e1.zip |
channels-lib.js -> logbook-lib.js
Diffstat (limited to 'lib/logbook-lib.js')
-rw-r--r-- | lib/logbook-lib.js | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js new file mode 100644 index 0000000..ae3fab9 --- /dev/null +++ b/lib/logbook-lib.js @@ -0,0 +1,275 @@ +var clientFactory = require("ssb-client"); +var config = require("ssb-config"); +var pull = require("pull-stream"); +var Pushable = require("pull-pushable"); +var many = require("pull-many"); + +// Change to 'true' to get debugging output +DEBUG = false + +// Change to switch the default monitored channel +DEFAULT_CHANNEL_NAME = "logbook" + +DEFAULT_OPTS = { + lt: Date.now(), + gt: -Infinity +} +GRAPH_TYPE = "follow" +MAX_HOPS = 1 +MAX_REPLY_RECURSIONS = 10 + +// using global variables to this extent is horrifying, but javascript has forced my hand +let client = null; +let outputStream = Pushable(); +let allowedBlobs = []; +let idsInMainChannel = {}; +let allMessages = {}; +let messagesByUser = {}; +let opts = DEFAULT_OPTS; + +function pushMessage(newMsg) { + if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) { + idsInMainChannel[newMsg.value.author] = true; + allMessages[newMsg.key] = newMsg; + requestBlobs(newMsg); + } +} + +function sortAndPushAllMessages() { + let messages = Object.keys(allMessages).map(function(msgId) { return allMessages[msgId]; }); + for(let msg of messages.sort((a, b) => b.value.timestamp - a.value.timestamp)) { + outputStream.push(msg); + } +} + +function getReplies(unchckedMessages, recursionCount) { + if(recursionCount == MAX_REPLY_RECURSIONS) { + debug("Maximum recursions reached for reply chain; continuing to fetch profile pictures...\n Messages with unchecked replies start with: \n" + uncheckedMessages); + sortAndPushAllMessages(); + getRelevantProfilePictures(); + } + + let replyStreams = []; + + for(userId of Object.entries(idsInMainChannel)) { + messagesByUser[userId] = []; + replyStreams.push(createUserStream(client, userId)); + } + + let newMessages = {}; + let foundNewMessages = false; + pull( + many(replyStreams), + pull.filter(function(msg) { + userMessages[msg.value.author].push(msg); + let messageIsValid = (value in msg) && (content in msg.value) && (link in msg.value.content); + return messageIsValid && !(msg.key in uncheckedMessages) && (msg.value.content.link in uncheckedMessages); + }), + pull.drain(function(msg) { + pushMessage(msg); + + newMessages[msg.key] = msg; + foundNewMessages = true; + }, + function() { + if(!foundNewMessages) { + sortAndPushAllMessages(); + getRelevantProfilePictures(); + } + else { + getReplies(newMessages, recursionCount + 1); + } + }) + ); +} + +function requestBlobs(msg) { + if(msg.value && msg.value.content) { + if(msg.value.content.mentions && Symbol.iterator in Object(msg.value.content.mentions)) { + for(let mention of msg.value.content.mentions) { + if(mention.type && mention.link) { + if(typeof mention.link == "string") { + getBlob(mention.link) + } + else { + debug("Message has non-string mention.link value: " + JSON.stringify(mention.link)); + } + } + } + } + } +} + +function getRelevantProfilePictures() { + let relevantIds = Object.keys(idsInMainChannel); + let profilePictureStreams = [] + let profilePictureFound = {}; + + for(let userId of relevantIds) { + profilePictureFound[userId] = false; + profilePictureStreams.push(createMetadataStream(client, userId)); + } + let collectedStream = many(profilePictureStreams); + + if(relevantIds.length == 0) { // avoid the edge case where checkIfDone is never called + exit(); + } + + pull( + collectedStream, + pull.drain(function(msg) { + if(!profilePictureFound[msg.value.author] && msg.value.content.image) { + if(typeof msg.value.content.image == "string") { + debug("Getting profile picture at " + msg.value.content.image + " for user " + msg.value.author); + getBlob(msg.value.content.image); + profilePictureFound[msg.value.author] = true; + } + else if(typeof msg.value.content.image == "object" && typeof msg.value.content.image.link == "string") { + debug("Getting profile picture at " + msg.value.content.image.link + " for user " + msg.value.author); + getBlob(msg.value.content.image.link); + profilePictureFound[msg.value.author] = true; + } + else { + debug("Message has unknown msg.value.content.image value: " + JSON.stringify(msg.value.content.image)); + } + } + }, function() { + exit(); + }) + ); +} + +function getBlob(blobId) { + allowedBlobs.push(blobId); + + debug("Ensuring existence of blob with ID " + blobId); + client.blobs.has(blobId, function(err, has) { + if(err) { + debug("[ERROR] ssb.blobs.has failed on the blob with ID " + blobId); + } + if(!err && !has) { + debug("Wanting blob with ID " + blobId); + client.blobs.want(blobId, () => {}); + } + }); +} + +function exit() { // is called at the end of the getReplies -> getRelevantProfilePictures chain + outputStream.end(); +} + +function debug(message) { + if(DEBUG) { + var timestamp = new Date(); + console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message); + } +} + +function createHashtagStream(client, channelName) { + var search = client.search && client.search.query; + if(!search) { + console.log("[FATAL] ssb-search plugin must be installed to us channels"); + process.exit(5); + } + + var query = search({ + query: channelName + }); + + return query; +} + +function createChannelStream(client, channelName) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + content: { + channel: channelName + } + } + } + }], + reverse: true + }); + + return query; +} + +function createUserStream(client, userId) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + author: userId + } + } + }], + reverse: true + }); + + return query; +} + +function createMetadataStream(client, userId) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + author: userId, + content: { + type: "about" + } + } + } + }] + }); + + query.streamName = userId; // mark the stream object so we can tell which stream a message came from later + + return query; +} + +function getMessagesFrom(channelName, followedIds, preserve, cb) { + var channelTag = "#" + channelName; + var hashtagStream = createHashtagStream(client, channelName); + var channelStream = createChannelStream(client, channelName); + var stream = many([hashtagStream, channelStream]); + + pull(stream, pull.filter(function(msg) { + return followedIds.includes(msg.value.author.toLowerCase()); + }), pull.drain(function(msg) { + pushMessage(msg); + }, function() { + getReplies(allMessages, 0); + })); + + cb(outputStream, preserve); +} + +function getConfig() { + return { + host: config.host || "localhost", + port: config.port || 8008 + } +} + +module.exports = { + getMessages: function(ssbClient, channelName, ssbOpts, preserve, cb, hops=MAX_HOPS) { + client = ssbClient; // set global variables + opts = ssbOpts; + + client.friends.hops({ + dunbar: Number.MAX_SAFE_INTEGER, + max: hops + }, function(error, friends) { + if(error) { + throw "Couldn't get a list of friends from scuttlebot:\n" + error; + } + + var followedIds = Object.keys(friends).map(id => id.toLowerCase()); + getMessagesFrom(channelName, followedIds, preserve, cb); + }); + } +} + |