diff options
author | root <tschroeder@email.wm.edu> | 2021-01-24 17:00:16 -0500 |
---|---|---|
committer | root <tschroeder@email.wm.edu> | 2021-01-24 17:00:16 -0500 |
commit | 7a5961d19c5df9c44edbcbbd6cf344288bbaf2df (patch) | |
tree | fc23540c159eae0cf6675d4ed918ce8464a782d0 | |
parent | c694dacbcfd348b142c26a8b8fb53b8b6684bd62 (diff) | |
download | ssb-logbook-7a5961d19c5df9c44edbcbbd6cf344288bbaf2df.tar.gz ssb-logbook-7a5961d19c5df9c44edbcbbd6cf344288bbaf2df.zip |
Moved from CLI to pull-stream
-rw-r--r-- | channels-lib.js | 181 |
1 files changed, 181 insertions, 0 deletions
diff --git a/channels-lib.js b/channels-lib.js new file mode 100644 index 0000000..2968e45 --- /dev/null +++ b/channels-lib.js @@ -0,0 +1,181 @@ +var { promisify } = require("util"); +var pull = require("pull-stream"); +var Pushable = require("pull-pushable"); +var many = require("./pull-many-v2"); + +// Change to 'true' to get debugging output +DEBUG = false + +DEFAULT_CHANNEL_OBJECT = { + messages: [] +} +DEFAULT_CHANNEL_OBJECT_SIZE = 15 +FRIENDS_POLL_TIME = 1 // ms +GRAPH_TYPE = "follow" +MAX_HOPS = 1 + +class StreamData { + constructor() { + this.finished = false, + this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER, + this.waitingMessages = [] + + this.markFinished = function() { + this.finished = true, + this.oldestTimestampSeen = 0 + } + } +} + +class StreamController { + constructor() { + this.outputStream = Pushable(), + this.streamData = { + channelStream: new StreamData(), + hashtagStream: new StreamData(), + }, + + this.pushMessage = function(source, newMsg) { + var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream; + streamData.oldestTimestampSeen = newMsg.value.timestamp; + streamData.waitingMessages.push(newMsg); + + this.pushNewlySafeMessages(); + }, + this.pushNewlySafeMessages = function() { + var newlySafeMessages = []; + var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen)); + debug("oldest safe timestamp:" + oldestSafeTimestamp); + + for(var streamDataIndex in this.streamData) { + var streamDatum = this.streamData[streamDataIndex] + debug("length of waiting messages: " + streamDatum.waitingMessages.length); + if(streamDatum.waitingMessages.length) { + debug("timestamp of first waiting message: " + streamDatum.waitingMessages[0].value.timestamp); + } + while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) { + var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message + newlySafeMessages.push(safeMsg); + } + } + + // in general, newlySafeMessages might not be in order, we should sort before appending + newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1)); + for(var msgIndex in newlySafeMessages) { + debug("pushing safe message..."); + this.outputStream.push(newlySafeMessages[msgIndex]); + } + }, + this.finish = function() { + for(var datumIndex in this.streamData) { + this.streamData[datumIndex].oldestTimestampSeen = 0; + } + this.pushNewlySafeMessages(); + } + } +} + +function getFriendsSync(client, hops) { + debug("Fetching list of friends..."); + var formattedFriends; + var intervalIterations = 1; + + client.friends.hops({ + dunbar: Number.MAX_SAFE_INTEGER, + max: hops + }, function(error, friends) { + if(error) { + throw "Couldn't get list of friends from scuttlebot: " + error; + } + + formattedFriends = Object.keys(friends).map(id => id.toLowerCase()); + }); + + return waitUntilDefined(formattedFriends); // since this has to be synchronous +} + +module.exports = { + getMessages: function(client, channelName, cb, hops=1) { + client.friends.hops({ + dunbar: Number.MAX_SAFE_INTEGER, + max: hops + }, function(error, friends) { + if(error) { + throw "Couldn't get a list of friends from scuttlebot:\n" + error; + } + + var followedIds = Object.keys(friends).map(id => id.toLowerCase()); + getMessagesFrom(client, channelName, followedIds, cb); + }); + } +} + +function getMessagesFrom(client, channelName, followedIds, cb) { + debug("Fetching messages from IDs in " + JSON.stringify(followedIds)); + + var channelTag = "#" + channelName; + var streamController = new StreamController(); + var hashtagStream = createHashtagStream(client, channelName); + var channelStream = createChannelStream(client, channelName); + var stream = many([hashtagStream, channelStream]); + + pull(stream, pull.filter(function(msg) { + return followedIds.includes(msg.data.value.author.toLowerCase()); + }), pull.filter(function(msg) { + var hasHashtag = msg.data.value.content && msg.data.value.content.text && typeof(msg.data.value.content.text) == "string" && msg.data.value.content.text.includes(channelTag); + if(msg.source == "hashtag") { + return hasHashtag; + } + else { + return !hasHashtag; // prevents us from double-counting messages with both the hashtag and the channel header + } + }), pull.drain(function(msg) { + streamController.pushMessage(msg.source, msg.data); + }, function() { + streamController.finish(); + })); + + cb(streamController.outputStream); +} + +function createHashtagStream(client, channelName) { + var search = client.search && client.search.query; + if(!search) { + console.log("[FATAL] ssb-search plugin must be installed to us channels"); + process.exit(5); + } + + var query = search({ + query: channelName + }); + + query.streamName = "hashtag"; // mark the stream object so we can tell which stream an object came from + + return query; +} + +function createChannelStream(client, channelName) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + content: { + channel: channelName + } + } + } + }], + reverse: true + }); + + query.streamName = "channel"; // mark the stream object so we can tell which stream a message came from later + + return query; +} + +function debug(message) { + if(DEBUG) { + var timestamp = new Date(); + console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message); + } +} |