diff options
-rw-r--r-- | lib/channels-lib.js | 185 | ||||
-rw-r--r-- | lib/logbook-lib.js | 275 | ||||
-rw-r--r-- | lib/serve.js | 3 |
3 files changed, 277 insertions, 186 deletions
diff --git a/lib/channels-lib.js b/lib/channels-lib.js deleted file mode 100644 index 327ec38..0000000 --- a/lib/channels-lib.js +++ /dev/null @@ -1,185 +0,0 @@ -var pull = require("pull-stream"); -var Pushable = require("pull-pushable"); -var many = require("./pull-many-v2"); - -// Change to 'true' to get debugging output -DEBUG = false - -FRIENDS_POLL_TIME = 1 // ms -GRAPH_TYPE = "follow" -MAX_HOPS = 1 - -class StreamData { - constructor() { - this.finished = false, - this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER, - this.waitingMessages = [] - - this.markFinished = function() { - this.finished = true, - this.oldestTimestampSeen = 0 - } - } -} - -class StreamController { - constructor(client, opts) { - this.client = client, - this.outputStream = Pushable(), - this.streamData = { - channelStream: new StreamData(), - hashtagStream: new StreamData(), - }, - - this.pushMessage = function(source, newMsg) { - var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream; - streamData.oldestTimestampSeen = newMsg.value.timestamp; - if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) { - streamData.waitingMessages.push(newMsg); - } - - this.pushNewlySafeMessages(); - }, - this.pushNewlySafeMessages = function() { - var newlySafeMessages = []; - var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen)); - - for(var streamDataIndex in this.streamData) { - var streamDatum = this.streamData[streamDataIndex] - while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) { - var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message - newlySafeMessages.push(safeMsg); - } - } - - // in general, newlySafeMessages might not be in order, we should sort before appending - newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1)); - for(var msgIndex in newlySafeMessages) { - this.outputStream.push(newlySafeMessages[msgIndex]); - this.requestBlobs(newlySafeMessages[msgIndex]); - } - }, - this.finish = function() { - for(var datumIndex in this.streamData) { - this.streamData[datumIndex].oldestTimestampSeen = 0; - } - this.pushNewlySafeMessages(); - - this.outputStream.end(); - }, - this.requestBlobs = function(msg) { - let client = this.client; // did the javascript devs ever consider that you might have a callback inside a member function? no? ok - if(msg.value && msg.value.content) { - if(msg.value.content.mentions) { - for(let mention of msg.value.content.mentions) { - if(mention.type && mention.link) { - this.getBlob(mention.link) - } - } - } - - if(msg.value.content.image) { - this.getBlob(msg.value.content.image); - } - } - } - this.getBlob = function(blobId) { - debug("Ensuring existence of blob with ID " + blobId); - client.blobs.has(blobId, function(err, has) { - if(err) { - debug("[ERROR] ssb.blobs.has failed on the blob with ID " + blobId); - } - if(!err && !has) { - debug("Wanting blob with ID " + blobId); - client.blobs.want(blobId, function() { - debug("Downloaded blob with ID " + blobId); - }); - } - }); - } - } -} - -module.exports = { - getMessages: function(client, channelName, opts, preserve, cb, hops=MAX_HOPS) { - client.friends.hops({ - dunbar: Number.MAX_SAFE_INTEGER, - max: hops - }, function(error, friends) { - if(error) { - throw "Couldn't get a list of friends from scuttlebot:\n" + error; - } - - var followedIds = Object.keys(friends).map(id => id.toLowerCase()); - getMessagesFrom(client, channelName, followedIds, opts, preserve, cb); - }); - } -} - -function getMessagesFrom(client, channelName, followedIds, opts, preserve, cb) { - var channelTag = "#" + channelName; - var streamController = new StreamController(client, opts); - var hashtagStream = createHashtagStream(client, channelName); - var channelStream = createChannelStream(client, channelName); - var stream = many([hashtagStream, channelStream]); - - pull(stream, pull.filter(function(msg) { - return followedIds.includes(msg.data.value.author.toLowerCase()); - }), pull.filter(function(msg) { - var hasHashtag = msg.data.value.content && msg.data.value.content.text && typeof(msg.data.value.content.text) == "string" && msg.data.value.content.text.includes(channelTag); - if(msg.source == "hashtag") { - return hasHashtag; - } - else { - return !hasHashtag; // prevents us from double-counting messages with both the hashtag and the channel header - } - }), pull.drain(function(msg) { - streamController.pushMessage(msg.source, msg.data); - }, function() { - streamController.finish(); - })); - - cb(streamController.outputStream, preserve); -} - -function createHashtagStream(client, channelName, opts) { - var search = client.search && client.search.query; - if(!search) { - console.log("[FATAL] ssb-search plugin must be installed to us channels"); - process.exit(5); - } - - var query = search({ - query: channelName - }); - - query.streamName = "hashtag"; // mark the stream object so we can tell which stream an object came from - - return query; -} - -function createChannelStream(client, channelName, opts) { - var query = client.query.read({ - query: [{ - "$filter": { - value: { - content: { - channel: channelName - } - } - } - }], - reverse: true - }); - - query.streamName = "channel"; // mark the stream object so we can tell which stream a message came from later - - return query; -} - -function debug(message) { - if(DEBUG) { - var timestamp = new Date(); - console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message); - } -} diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js new file mode 100644 index 0000000..ae3fab9 --- /dev/null +++ b/lib/logbook-lib.js @@ -0,0 +1,275 @@ +var clientFactory = require("ssb-client"); +var config = require("ssb-config"); +var pull = require("pull-stream"); +var Pushable = require("pull-pushable"); +var many = require("pull-many"); + +// Change to 'true' to get debugging output +DEBUG = false + +// Change to switch the default monitored channel +DEFAULT_CHANNEL_NAME = "logbook" + +DEFAULT_OPTS = { + lt: Date.now(), + gt: -Infinity +} +GRAPH_TYPE = "follow" +MAX_HOPS = 1 +MAX_REPLY_RECURSIONS = 10 + +// using global variables to this extent is horrifying, but javascript has forced my hand +let client = null; +let outputStream = Pushable(); +let allowedBlobs = []; +let idsInMainChannel = {}; +let allMessages = {}; +let messagesByUser = {}; +let opts = DEFAULT_OPTS; + +function pushMessage(newMsg) { + if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) { + idsInMainChannel[newMsg.value.author] = true; + allMessages[newMsg.key] = newMsg; + requestBlobs(newMsg); + } +} + +function sortAndPushAllMessages() { + let messages = Object.keys(allMessages).map(function(msgId) { return allMessages[msgId]; }); + for(let msg of messages.sort((a, b) => b.value.timestamp - a.value.timestamp)) { + outputStream.push(msg); + } +} + +function getReplies(unchckedMessages, recursionCount) { + if(recursionCount == MAX_REPLY_RECURSIONS) { + debug("Maximum recursions reached for reply chain; continuing to fetch profile pictures...\n Messages with unchecked replies start with: \n" + uncheckedMessages); + sortAndPushAllMessages(); + getRelevantProfilePictures(); + } + + let replyStreams = []; + + for(userId of Object.entries(idsInMainChannel)) { + messagesByUser[userId] = []; + replyStreams.push(createUserStream(client, userId)); + } + + let newMessages = {}; + let foundNewMessages = false; + pull( + many(replyStreams), + pull.filter(function(msg) { + userMessages[msg.value.author].push(msg); + let messageIsValid = (value in msg) && (content in msg.value) && (link in msg.value.content); + return messageIsValid && !(msg.key in uncheckedMessages) && (msg.value.content.link in uncheckedMessages); + }), + pull.drain(function(msg) { + pushMessage(msg); + + newMessages[msg.key] = msg; + foundNewMessages = true; + }, + function() { + if(!foundNewMessages) { + sortAndPushAllMessages(); + getRelevantProfilePictures(); + } + else { + getReplies(newMessages, recursionCount + 1); + } + }) + ); +} + +function requestBlobs(msg) { + if(msg.value && msg.value.content) { + if(msg.value.content.mentions && Symbol.iterator in Object(msg.value.content.mentions)) { + for(let mention of msg.value.content.mentions) { + if(mention.type && mention.link) { + if(typeof mention.link == "string") { + getBlob(mention.link) + } + else { + debug("Message has non-string mention.link value: " + JSON.stringify(mention.link)); + } + } + } + } + } +} + +function getRelevantProfilePictures() { + let relevantIds = Object.keys(idsInMainChannel); + let profilePictureStreams = [] + let profilePictureFound = {}; + + for(let userId of relevantIds) { + profilePictureFound[userId] = false; + profilePictureStreams.push(createMetadataStream(client, userId)); + } + let collectedStream = many(profilePictureStreams); + + if(relevantIds.length == 0) { // avoid the edge case where checkIfDone is never called + exit(); + } + + pull( + collectedStream, + pull.drain(function(msg) { + if(!profilePictureFound[msg.value.author] && msg.value.content.image) { + if(typeof msg.value.content.image == "string") { + debug("Getting profile picture at " + msg.value.content.image + " for user " + msg.value.author); + getBlob(msg.value.content.image); + profilePictureFound[msg.value.author] = true; + } + else if(typeof msg.value.content.image == "object" && typeof msg.value.content.image.link == "string") { + debug("Getting profile picture at " + msg.value.content.image.link + " for user " + msg.value.author); + getBlob(msg.value.content.image.link); + profilePictureFound[msg.value.author] = true; + } + else { + debug("Message has unknown msg.value.content.image value: " + JSON.stringify(msg.value.content.image)); + } + } + }, function() { + exit(); + }) + ); +} + +function getBlob(blobId) { + allowedBlobs.push(blobId); + + debug("Ensuring existence of blob with ID " + blobId); + client.blobs.has(blobId, function(err, has) { + if(err) { + debug("[ERROR] ssb.blobs.has failed on the blob with ID " + blobId); + } + if(!err && !has) { + debug("Wanting blob with ID " + blobId); + client.blobs.want(blobId, () => {}); + } + }); +} + +function exit() { // is called at the end of the getReplies -> getRelevantProfilePictures chain + outputStream.end(); +} + +function debug(message) { + if(DEBUG) { + var timestamp = new Date(); + console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message); + } +} + +function createHashtagStream(client, channelName) { + var search = client.search && client.search.query; + if(!search) { + console.log("[FATAL] ssb-search plugin must be installed to us channels"); + process.exit(5); + } + + var query = search({ + query: channelName + }); + + return query; +} + +function createChannelStream(client, channelName) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + content: { + channel: channelName + } + } + } + }], + reverse: true + }); + + return query; +} + +function createUserStream(client, userId) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + author: userId + } + } + }], + reverse: true + }); + + return query; +} + +function createMetadataStream(client, userId) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + author: userId, + content: { + type: "about" + } + } + } + }] + }); + + query.streamName = userId; // mark the stream object so we can tell which stream a message came from later + + return query; +} + +function getMessagesFrom(channelName, followedIds, preserve, cb) { + var channelTag = "#" + channelName; + var hashtagStream = createHashtagStream(client, channelName); + var channelStream = createChannelStream(client, channelName); + var stream = many([hashtagStream, channelStream]); + + pull(stream, pull.filter(function(msg) { + return followedIds.includes(msg.value.author.toLowerCase()); + }), pull.drain(function(msg) { + pushMessage(msg); + }, function() { + getReplies(allMessages, 0); + })); + + cb(outputStream, preserve); +} + +function getConfig() { + return { + host: config.host || "localhost", + port: config.port || 8008 + } +} + +module.exports = { + getMessages: function(ssbClient, channelName, ssbOpts, preserve, cb, hops=MAX_HOPS) { + client = ssbClient; // set global variables + opts = ssbOpts; + + client.friends.hops({ + dunbar: Number.MAX_SAFE_INTEGER, + max: hops + }, function(error, friends) { + if(error) { + throw "Couldn't get a list of friends from scuttlebot:\n" + error; + } + + var followedIds = Object.keys(friends).map(id => id.toLowerCase()); + getMessagesFrom(channelName, followedIds, preserve, cb); + }); + } +} + diff --git a/lib/serve.js b/lib/serve.js index 26637dc..da052bb 100644 --- a/lib/serve.js +++ b/lib/serve.js @@ -28,7 +28,8 @@ var Url = require('url') var many = require('pull-many') var merge = require('pull-merge') var pSort = require('pull-sort') -var channels = require("./channels-lib"); +var channels = require("./logbook-lib"); +// var channels = require("./channels-lib"); module.exports = Serve |