aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorroot <tschroeder@email.wm.edu>2021-01-24 17:00:16 -0500
committerroot <tschroeder@email.wm.edu>2021-01-24 17:00:16 -0500
commit7a5961d19c5df9c44edbcbbd6cf344288bbaf2df (patch)
treefc23540c159eae0cf6675d4ed918ce8464a782d0
parentc694dacbcfd348b142c26a8b8fb53b8b6684bd62 (diff)
downloadssb-logbook-7a5961d19c5df9c44edbcbbd6cf344288bbaf2df.tar.gz
ssb-logbook-7a5961d19c5df9c44edbcbbd6cf344288bbaf2df.zip
Moved from CLI to pull-stream
-rw-r--r--channels-lib.js181
1 files changed, 181 insertions, 0 deletions
diff --git a/channels-lib.js b/channels-lib.js
new file mode 100644
index 0000000..2968e45
--- /dev/null
+++ b/channels-lib.js
@@ -0,0 +1,181 @@
+var { promisify } = require("util");
+var pull = require("pull-stream");
+var Pushable = require("pull-pushable");
+var many = require("./pull-many-v2");
+
+// Change to 'true' to get debugging output
+DEBUG = false
+
+DEFAULT_CHANNEL_OBJECT = {
+ messages: []
+}
+DEFAULT_CHANNEL_OBJECT_SIZE = 15
+FRIENDS_POLL_TIME = 1 // ms
+GRAPH_TYPE = "follow"
+MAX_HOPS = 1
+
+class StreamData {
+ constructor() {
+ this.finished = false,
+ this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER,
+ this.waitingMessages = []
+
+ this.markFinished = function() {
+ this.finished = true,
+ this.oldestTimestampSeen = 0
+ }
+ }
+}
+
+class StreamController {
+ constructor() {
+ this.outputStream = Pushable(),
+ this.streamData = {
+ channelStream: new StreamData(),
+ hashtagStream: new StreamData(),
+ },
+
+ this.pushMessage = function(source, newMsg) {
+ var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
+ streamData.oldestTimestampSeen = newMsg.value.timestamp;
+ streamData.waitingMessages.push(newMsg);
+
+ this.pushNewlySafeMessages();
+ },
+ this.pushNewlySafeMessages = function() {
+ var newlySafeMessages = [];
+ var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen));
+ debug("oldest safe timestamp:" + oldestSafeTimestamp);
+
+ for(var streamDataIndex in this.streamData) {
+ var streamDatum = this.streamData[streamDataIndex]
+ debug("length of waiting messages: " + streamDatum.waitingMessages.length);
+ if(streamDatum.waitingMessages.length) {
+ debug("timestamp of first waiting message: " + streamDatum.waitingMessages[0].value.timestamp);
+ }
+ while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) {
+ var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message
+ newlySafeMessages.push(safeMsg);
+ }
+ }
+
+ // in general, newlySafeMessages might not be in order, we should sort before appending
+ newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
+ for(var msgIndex in newlySafeMessages) {
+ debug("pushing safe message...");
+ this.outputStream.push(newlySafeMessages[msgIndex]);
+ }
+ },
+ this.finish = function() {
+ for(var datumIndex in this.streamData) {
+ this.streamData[datumIndex].oldestTimestampSeen = 0;
+ }
+ this.pushNewlySafeMessages();
+ }
+ }
+}
+
+function getFriendsSync(client, hops) {
+ debug("Fetching list of friends...");
+ var formattedFriends;
+ var intervalIterations = 1;
+
+ client.friends.hops({
+ dunbar: Number.MAX_SAFE_INTEGER,
+ max: hops
+ }, function(error, friends) {
+ if(error) {
+ throw "Couldn't get list of friends from scuttlebot: " + error;
+ }
+
+ formattedFriends = Object.keys(friends).map(id => id.toLowerCase());
+ });
+
+ return waitUntilDefined(formattedFriends); // since this has to be synchronous
+}
+
+module.exports = {
+ getMessages: function(client, channelName, cb, hops=1) {
+ client.friends.hops({
+ dunbar: Number.MAX_SAFE_INTEGER,
+ max: hops
+ }, function(error, friends) {
+ if(error) {
+ throw "Couldn't get a list of friends from scuttlebot:\n" + error;
+ }
+
+ var followedIds = Object.keys(friends).map(id => id.toLowerCase());
+ getMessagesFrom(client, channelName, followedIds, cb);
+ });
+ }
+}
+
+function getMessagesFrom(client, channelName, followedIds, cb) {
+ debug("Fetching messages from IDs in " + JSON.stringify(followedIds));
+
+ var channelTag = "#" + channelName;
+ var streamController = new StreamController();
+ var hashtagStream = createHashtagStream(client, channelName);
+ var channelStream = createChannelStream(client, channelName);
+ var stream = many([hashtagStream, channelStream]);
+
+ pull(stream, pull.filter(function(msg) {
+ return followedIds.includes(msg.data.value.author.toLowerCase());
+ }), pull.filter(function(msg) {
+ var hasHashtag = msg.data.value.content && msg.data.value.content.text && typeof(msg.data.value.content.text) == "string" && msg.data.value.content.text.includes(channelTag);
+ if(msg.source == "hashtag") {
+ return hasHashtag;
+ }
+ else {
+ return !hasHashtag; // prevents us from double-counting messages with both the hashtag and the channel header
+ }
+ }), pull.drain(function(msg) {
+ streamController.pushMessage(msg.source, msg.data);
+ }, function() {
+ streamController.finish();
+ }));
+
+ cb(streamController.outputStream);
+}
+
+function createHashtagStream(client, channelName) {
+ var search = client.search && client.search.query;
+ if(!search) {
+ console.log("[FATAL] ssb-search plugin must be installed to us channels");
+ process.exit(5);
+ }
+
+ var query = search({
+ query: channelName
+ });
+
+ query.streamName = "hashtag"; // mark the stream object so we can tell which stream an object came from
+
+ return query;
+}
+
+function createChannelStream(client, channelName) {
+ var query = client.query.read({
+ query: [{
+ "$filter": {
+ value: {
+ content: {
+ channel: channelName
+ }
+ }
+ }
+ }],
+ reverse: true
+ });
+
+ query.streamName = "channel"; // mark the stream object so we can tell which stream a message came from later
+
+ return query;
+}
+
+function debug(message) {
+ if(DEBUG) {
+ var timestamp = new Date();
+ console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message);
+ }
+}