aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/channels-lib.js185
-rw-r--r--lib/logbook-lib.js275
-rw-r--r--lib/serve.js3
3 files changed, 277 insertions, 186 deletions
diff --git a/lib/channels-lib.js b/lib/channels-lib.js
deleted file mode 100644
index 327ec38..0000000
--- a/lib/channels-lib.js
+++ /dev/null
@@ -1,185 +0,0 @@
-var pull = require("pull-stream");
-var Pushable = require("pull-pushable");
-var many = require("./pull-many-v2");
-
-// Change to 'true' to get debugging output
-DEBUG = false
-
-FRIENDS_POLL_TIME = 1 // ms
-GRAPH_TYPE = "follow"
-MAX_HOPS = 1
-
-class StreamData {
- constructor() {
- this.finished = false,
- this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER,
- this.waitingMessages = []
-
- this.markFinished = function() {
- this.finished = true,
- this.oldestTimestampSeen = 0
- }
- }
-}
-
-class StreamController {
- constructor(client, opts) {
- this.client = client,
- this.outputStream = Pushable(),
- this.streamData = {
- channelStream: new StreamData(),
- hashtagStream: new StreamData(),
- },
-
- this.pushMessage = function(source, newMsg) {
- var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
- streamData.oldestTimestampSeen = newMsg.value.timestamp;
- if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) {
- streamData.waitingMessages.push(newMsg);
- }
-
- this.pushNewlySafeMessages();
- },
- this.pushNewlySafeMessages = function() {
- var newlySafeMessages = [];
- var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen));
-
- for(var streamDataIndex in this.streamData) {
- var streamDatum = this.streamData[streamDataIndex]
- while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) {
- var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message
- newlySafeMessages.push(safeMsg);
- }
- }
-
- // in general, newlySafeMessages might not be in order, we should sort before appending
- newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
- for(var msgIndex in newlySafeMessages) {
- this.outputStream.push(newlySafeMessages[msgIndex]);
- this.requestBlobs(newlySafeMessages[msgIndex]);
- }
- },
- this.finish = function() {
- for(var datumIndex in this.streamData) {
- this.streamData[datumIndex].oldestTimestampSeen = 0;
- }
- this.pushNewlySafeMessages();
-
- this.outputStream.end();
- },
- this.requestBlobs = function(msg) {
- let client = this.client; // did the javascript devs ever consider that you might have a callback inside a member function? no? ok
- if(msg.value && msg.value.content) {
- if(msg.value.content.mentions) {
- for(let mention of msg.value.content.mentions) {
- if(mention.type && mention.link) {
- this.getBlob(mention.link)
- }
- }
- }
-
- if(msg.value.content.image) {
- this.getBlob(msg.value.content.image);
- }
- }
- }
- this.getBlob = function(blobId) {
- debug("Ensuring existence of blob with ID " + blobId);
- client.blobs.has(blobId, function(err, has) {
- if(err) {
- debug("[ERROR] ssb.blobs.has failed on the blob with ID " + blobId);
- }
- if(!err && !has) {
- debug("Wanting blob with ID " + blobId);
- client.blobs.want(blobId, function() {
- debug("Downloaded blob with ID " + blobId);
- });
- }
- });
- }
- }
-}
-
-module.exports = {
- getMessages: function(client, channelName, opts, preserve, cb, hops=MAX_HOPS) {
- client.friends.hops({
- dunbar: Number.MAX_SAFE_INTEGER,
- max: hops
- }, function(error, friends) {
- if(error) {
- throw "Couldn't get a list of friends from scuttlebot:\n" + error;
- }
-
- var followedIds = Object.keys(friends).map(id => id.toLowerCase());
- getMessagesFrom(client, channelName, followedIds, opts, preserve, cb);
- });
- }
-}
-
-function getMessagesFrom(client, channelName, followedIds, opts, preserve, cb) {
- var channelTag = "#" + channelName;
- var streamController = new StreamController(client, opts);
- var hashtagStream = createHashtagStream(client, channelName);
- var channelStream = createChannelStream(client, channelName);
- var stream = many([hashtagStream, channelStream]);
-
- pull(stream, pull.filter(function(msg) {
- return followedIds.includes(msg.data.value.author.toLowerCase());
- }), pull.filter(function(msg) {
- var hasHashtag = msg.data.value.content && msg.data.value.content.text && typeof(msg.data.value.content.text) == "string" && msg.data.value.content.text.includes(channelTag);
- if(msg.source == "hashtag") {
- return hasHashtag;
- }
- else {
- return !hasHashtag; // prevents us from double-counting messages with both the hashtag and the channel header
- }
- }), pull.drain(function(msg) {
- streamController.pushMessage(msg.source, msg.data);
- }, function() {
- streamController.finish();
- }));
-
- cb(streamController.outputStream, preserve);
-}
-
-function createHashtagStream(client, channelName, opts) {
- var search = client.search && client.search.query;
- if(!search) {
- console.log("[FATAL] ssb-search plugin must be installed to us channels");
- process.exit(5);
- }
-
- var query = search({
- query: channelName
- });
-
- query.streamName = "hashtag"; // mark the stream object so we can tell which stream an object came from
-
- return query;
-}
-
-function createChannelStream(client, channelName, opts) {
- var query = client.query.read({
- query: [{
- "$filter": {
- value: {
- content: {
- channel: channelName
- }
- }
- }
- }],
- reverse: true
- });
-
- query.streamName = "channel"; // mark the stream object so we can tell which stream a message came from later
-
- return query;
-}
-
-function debug(message) {
- if(DEBUG) {
- var timestamp = new Date();
- console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message);
- }
-}
diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js
new file mode 100644
index 0000000..ae3fab9
--- /dev/null
+++ b/lib/logbook-lib.js
@@ -0,0 +1,275 @@
+var clientFactory = require("ssb-client");
+var config = require("ssb-config");
+var pull = require("pull-stream");
+var Pushable = require("pull-pushable");
+var many = require("pull-many");
+
+// Change to 'true' to get debugging output
+DEBUG = false
+
+// Change to switch the default monitored channel
+DEFAULT_CHANNEL_NAME = "logbook"
+
+DEFAULT_OPTS = {
+ lt: Date.now(),
+ gt: -Infinity
+}
+GRAPH_TYPE = "follow"
+MAX_HOPS = 1
+MAX_REPLY_RECURSIONS = 10
+
+// using global variables to this extent is horrifying, but javascript has forced my hand
+let client = null;
+let outputStream = Pushable();
+let allowedBlobs = [];
+let idsInMainChannel = {};
+let allMessages = {};
+let messagesByUser = {};
+let opts = DEFAULT_OPTS;
+
+function pushMessage(newMsg) {
+ if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) {
+ idsInMainChannel[newMsg.value.author] = true;
+ allMessages[newMsg.key] = newMsg;
+ requestBlobs(newMsg);
+ }
+}
+
+function sortAndPushAllMessages() {
+ let messages = Object.keys(allMessages).map(function(msgId) { return allMessages[msgId]; });
+ for(let msg of messages.sort((a, b) => b.value.timestamp - a.value.timestamp)) {
+ outputStream.push(msg);
+ }
+}
+
+function getReplies(unchckedMessages, recursionCount) {
+ if(recursionCount == MAX_REPLY_RECURSIONS) {
+ debug("Maximum recursions reached for reply chain; continuing to fetch profile pictures...\n Messages with unchecked replies start with: \n" + uncheckedMessages);
+ sortAndPushAllMessages();
+ getRelevantProfilePictures();
+ }
+
+ let replyStreams = [];
+
+ for(userId of Object.entries(idsInMainChannel)) {
+ messagesByUser[userId] = [];
+ replyStreams.push(createUserStream(client, userId));
+ }
+
+ let newMessages = {};
+ let foundNewMessages = false;
+ pull(
+ many(replyStreams),
+ pull.filter(function(msg) {
+ userMessages[msg.value.author].push(msg);
+ let messageIsValid = (value in msg) && (content in msg.value) && (link in msg.value.content);
+ return messageIsValid && !(msg.key in uncheckedMessages) && (msg.value.content.link in uncheckedMessages);
+ }),
+ pull.drain(function(msg) {
+ pushMessage(msg);
+
+ newMessages[msg.key] = msg;
+ foundNewMessages = true;
+ },
+ function() {
+ if(!foundNewMessages) {
+ sortAndPushAllMessages();
+ getRelevantProfilePictures();
+ }
+ else {
+ getReplies(newMessages, recursionCount + 1);
+ }
+ })
+ );
+}
+
+function requestBlobs(msg) {
+ if(msg.value && msg.value.content) {
+ if(msg.value.content.mentions && Symbol.iterator in Object(msg.value.content.mentions)) {
+ for(let mention of msg.value.content.mentions) {
+ if(mention.type && mention.link) {
+ if(typeof mention.link == "string") {
+ getBlob(mention.link)
+ }
+ else {
+ debug("Message has non-string mention.link value: " + JSON.stringify(mention.link));
+ }
+ }
+ }
+ }
+ }
+}
+
+function getRelevantProfilePictures() {
+ let relevantIds = Object.keys(idsInMainChannel);
+ let profilePictureStreams = []
+ let profilePictureFound = {};
+
+ for(let userId of relevantIds) {
+ profilePictureFound[userId] = false;
+ profilePictureStreams.push(createMetadataStream(client, userId));
+ }
+ let collectedStream = many(profilePictureStreams);
+
+ if(relevantIds.length == 0) { // avoid the edge case where checkIfDone is never called
+ exit();
+ }
+
+ pull(
+ collectedStream,
+ pull.drain(function(msg) {
+ if(!profilePictureFound[msg.value.author] && msg.value.content.image) {
+ if(typeof msg.value.content.image == "string") {
+ debug("Getting profile picture at " + msg.value.content.image + " for user " + msg.value.author);
+ getBlob(msg.value.content.image);
+ profilePictureFound[msg.value.author] = true;
+ }
+ else if(typeof msg.value.content.image == "object" && typeof msg.value.content.image.link == "string") {
+ debug("Getting profile picture at " + msg.value.content.image.link + " for user " + msg.value.author);
+ getBlob(msg.value.content.image.link);
+ profilePictureFound[msg.value.author] = true;
+ }
+ else {
+ debug("Message has unknown msg.value.content.image value: " + JSON.stringify(msg.value.content.image));
+ }
+ }
+ }, function() {
+ exit();
+ })
+ );
+}
+
+function getBlob(blobId) {
+ allowedBlobs.push(blobId);
+
+ debug("Ensuring existence of blob with ID " + blobId);
+ client.blobs.has(blobId, function(err, has) {
+ if(err) {
+ debug("[ERROR] ssb.blobs.has failed on the blob with ID " + blobId);
+ }
+ if(!err && !has) {
+ debug("Wanting blob with ID " + blobId);
+ client.blobs.want(blobId, () => {});
+ }
+ });
+}
+
+function exit() { // is called at the end of the getReplies -> getRelevantProfilePictures chain
+ outputStream.end();
+}
+
+function debug(message) {
+ if(DEBUG) {
+ var timestamp = new Date();
+ console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message);
+ }
+}
+
+function createHashtagStream(client, channelName) {
+ var search = client.search && client.search.query;
+ if(!search) {
+ console.log("[FATAL] ssb-search plugin must be installed to us channels");
+ process.exit(5);
+ }
+
+ var query = search({
+ query: channelName
+ });
+
+ return query;
+}
+
+function createChannelStream(client, channelName) {
+ var query = client.query.read({
+ query: [{
+ "$filter": {
+ value: {
+ content: {
+ channel: channelName
+ }
+ }
+ }
+ }],
+ reverse: true
+ });
+
+ return query;
+}
+
+function createUserStream(client, userId) {
+ var query = client.query.read({
+ query: [{
+ "$filter": {
+ value: {
+ author: userId
+ }
+ }
+ }],
+ reverse: true
+ });
+
+ return query;
+}
+
+function createMetadataStream(client, userId) {
+ var query = client.query.read({
+ query: [{
+ "$filter": {
+ value: {
+ author: userId,
+ content: {
+ type: "about"
+ }
+ }
+ }
+ }]
+ });
+
+ query.streamName = userId; // mark the stream object so we can tell which stream a message came from later
+
+ return query;
+}
+
+function getMessagesFrom(channelName, followedIds, preserve, cb) {
+ var channelTag = "#" + channelName;
+ var hashtagStream = createHashtagStream(client, channelName);
+ var channelStream = createChannelStream(client, channelName);
+ var stream = many([hashtagStream, channelStream]);
+
+ pull(stream, pull.filter(function(msg) {
+ return followedIds.includes(msg.value.author.toLowerCase());
+ }), pull.drain(function(msg) {
+ pushMessage(msg);
+ }, function() {
+ getReplies(allMessages, 0);
+ }));
+
+ cb(outputStream, preserve);
+}
+
+function getConfig() {
+ return {
+ host: config.host || "localhost",
+ port: config.port || 8008
+ }
+}
+
+module.exports = {
+ getMessages: function(ssbClient, channelName, ssbOpts, preserve, cb, hops=MAX_HOPS) {
+ client = ssbClient; // set global variables
+ opts = ssbOpts;
+
+ client.friends.hops({
+ dunbar: Number.MAX_SAFE_INTEGER,
+ max: hops
+ }, function(error, friends) {
+ if(error) {
+ throw "Couldn't get a list of friends from scuttlebot:\n" + error;
+ }
+
+ var followedIds = Object.keys(friends).map(id => id.toLowerCase());
+ getMessagesFrom(channelName, followedIds, preserve, cb);
+ });
+ }
+}
+
diff --git a/lib/serve.js b/lib/serve.js
index 26637dc..da052bb 100644
--- a/lib/serve.js
+++ b/lib/serve.js
@@ -28,7 +28,8 @@ var Url = require('url')
var many = require('pull-many')
var merge = require('pull-merge')
var pSort = require('pull-sort')
-var channels = require("./channels-lib");
+var channels = require("./logbook-lib");
+// var channels = require("./channels-lib");
module.exports = Serve