aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEugeniy E. Mikhailov <evgmik@gmail.com>2021-11-18 13:37:56 -0500
committerEugeniy E. Mikhailov <evgmik@gmail.com>2021-11-18 13:37:56 -0500
commite0ca0b8b100b2566d5d104f60ef223d5ea8308f4 (patch)
tree6ddbe4623ffe47507e6d6af35085094d12f23892
parent6be6dd5b44d890f381c18227db0d061e48245710 (diff)
downloadpatchfoo-e0ca0b8b100b2566d5d104f60ef223d5ea8308f4.tar.gz
patchfoo-e0ca0b8b100b2566d5d104f60ef223d5ea8308f4.zip
duplicates fixes
-rw-r--r--lib/logbook-lib.js126
1 files changed, 77 insertions, 49 deletions
diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js
index 68cbb11..5dbe558 100644
--- a/lib/logbook-lib.js
+++ b/lib/logbook-lib.js
@@ -3,7 +3,9 @@ var config = require("ssb-config");
var fs = require("fs");
var pull = require("pull-stream");
var Pushable = require("pull-pushable");
-var many = require("./pull-many-v2");
+var many = require("pull-many");
+var path = require("path");
+const home = require("os").homedir();
// Change to 'true' to get debugging output
DEBUG = !false
@@ -15,12 +17,18 @@ DEFAULT_OPTS = {
GRAPH_TYPE = "follow"
MAX_HOPS = 1
-CACHE_FILEPATH = "./logbook_cache/"
+SBOT_ROOT = path.join(home, ".ssb");
+SBOT_BLOB_DIR = path.join(SBOT_ROOT, "blobs");
+CACHE_FILEPATH = path.join(SBOT_ROOT, "logbook_cache");
+SBOT_BLOBS_FILE = path.join(SBOT_BLOB_DIR, "allowed_blobs");
+PATHS = [SBOT_ROOT, SBOT_BLOB_DIR, CACHE_FILEPATH]
+FILES = [SBOT_BLOBS_FILE]
+ensureFiles();
// only instantiate a ssb-client once, so we don't have to create a new connection every time we load a channel page
let client = null;
let memoryCache = {};
-fs.mkdirSync(CACHE_FILEPATH, {recursive: true});
+let allowedBlobs = readAllowedBlobs();
class ChannelController {
constructor(opts) {
@@ -36,7 +44,11 @@ class ChannelController {
this.channelName = channelName;
this.rootIds = rootIds; // don't know whether it's worth constructing a hashmap for this
this.mostRecentCachedTimestamp = mostRecentCachedTimestamp;
- this.outputQueue = allMessages;
+ this.outputQueue = [];
+
+ for(let msg of allMessages) {
+ this.pushMessage(msg);
+ }
if(mostRecentCachedTimestamp == null) {
this.getInitialMessages(followedIds, preserve, cb);
@@ -53,17 +65,17 @@ class ChannelController {
var self = this;
pull(stream, pull.filter(function(msg) {
- return followedIds.includes(msg.data.value.author.toLowerCase());
+ return followedIds.includes(msg.value.author.toLowerCase());
}), pull.filter(function(msg) {
- if(msg.data.value && msg.data.value.content && msg.data.value.content.channel && msg.data.value.content.channel == self.channelName) {
+ if(msg.value && msg.value.content && msg.value.content.channel && msg.value.content.channel == self.channelName) {
return true;
}
// filter out false positives from ssb-query's loose text matching
- if(msg.data.value && msg.data.value.content && msg.data.value.content.text && msg.data.value.content.text.indexOf) {
+ if(msg.value && msg.value.content && msg.value.content.text && msg.value.content.text.indexOf) {
let acceptableHashtags = [channelTag + "\n", channelTag + " "];
for(let hashtag of acceptableHashtags) {
- if(msg.data.value.content.text.indexOf(hashtag) != -1) {
+ if(msg.value.content.text.indexOf(hashtag) != -1) {
return true
}
}
@@ -71,8 +83,8 @@ class ChannelController {
return false;
}
}), pull.drain(function(msg) {
- self.rootIds.push(msg.data.key);
- self.pushMessage(msg.source, msg.data);
+ self.rootIds.push(msg.key);
+ self.pushMessage(msg);
}, function() {
if(self.mostRecentCachedTimestamp) {
self.getReplies(followedIds);
@@ -82,16 +94,16 @@ class ChannelController {
client.messagesByType({type: "post", reverse: true, limit: 1}),
pull.drain(function (msg) {
self.mostRecentCachedTimestamp = msg.value.timestamp;
- }, () => {})
+ }, () => {self.getRepliesInitial(followedIds)})
);
- self.getRepliesInitial(followedIds);
+ //self.getRepliesInitial(followedIds);
}
}));
cb(this.outputStream, preserve);
},
- this.pushMessage = function(msgSource, newMsg) {
+ this.pushMessage = function(newMsg) {
this.idsInChannel[newMsg.value.author] = true;
if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) {
@@ -109,14 +121,18 @@ class ChannelController {
this.outputStream.push(msg);
}
- fs.writeFile(CACHE_FILEPATH + this.channelName + ".txt", JSON.stringify({"root": this.rootIds, "allMessages": memoryCache[this.channelName], "mostRecentCachedTimestamp": this.mostRecentCachedTimestamp}), (error) => {
- if(error) {
- debug("[ERROR] Failed writing to message cache: " + error);
- }
- else {
- debug("Successfully saved messages to " + CACHE_FILEPATH + this.channelName + ".txt");
- }
- });
+ if(this.outputQueue.length) {
+ fs.writeFile(path.join(CACHE_FILEPATH, this.channelName + ".txt"), JSON.stringify({"root": this.rootIds, "allMessages": memoryCache[this.channelName], "mostRecentCachedTimestamp": this.mostRecentCachedTimestamp}), (error) => {
+ if(error) {
+ debug("[ERROR] Failed writing to message cache: " + error);
+ }
+ else {
+ debug("Successfully saved messages to " + path.join(CACHE_FILEPATH, this.channelName + ".txt"));
+ }
+ });
+ }
+
+ updateBlobsFile();
this.outputStream.end()
},
@@ -133,7 +149,7 @@ class ChannelController {
return followedIds.includes(msg.value.author.toLowerCase());
}),
pull.filter(function(msg) { // assume all messages have msg.value.content, otherwise they wouldn't have post type
- if((msg.value.content.text && (msg.value.content.text.indexOf("#" + self.channelName + "\n") != -1 || msg.value.content.text.indexOf("#" + self.channelName + " ") != -1)) || msg.value.content.channelName == self.channelName) {
+ if((msg.value.content.text && (msg.value.content.text.indexOf("#" + self.channelName + "\n") != -1 || msg.value.content.text.indexOf("#" + self.channelName + " ") != -1)) || msg.value.content.channel == self.channelName) {
debug("found new root message: " + JSON.stringify(msg));
self.rootIds.push(msg.key);
return true;
@@ -147,7 +163,7 @@ class ChannelController {
}),
pull.drain(function(msg) {
debug("found new message: " + JSON.stringify(msg));
- self.outputQueue.push(msg);
+ self.pushMessage(msg);
}, self.finish.bind(self))
);
@@ -188,15 +204,15 @@ class ChannelController {
many(backlinkStreams),
pull.filter(function(msg) {
debug("checking backlinked message " + JSON.stringify(msg));
- return msg.data.value && msg.data.value.content && msg.data.value.content.text && (msg.data.value.content.text.indexOf(channelTag) == -1) && !(msg.data.value.content.channelName == self.channelName) && (followedIds.includes(msg.data.value.author.toLowerCase()));
+ return msg.value && msg.value.content && msg.value.content.text && (msg.value.content.text.indexOf(channelTag) == -1) && !(msg.value.content.channelName == self.channelName) && (followedIds.includes(msg.value.author.toLowerCase()));
}),
pull.filter(function(msg) {
- debug("found backlink message: " + JSON.stringify(msg.data));
- return (msg.data.value.timestamp > self.opts.gt) && (msg.data.value.timestamp < self.opts.lt);
+ debug("found backlink message: " + JSON.stringify(msg));
+ return (msg.value.timestamp > self.opts.gt) && (msg.value.timestamp < self.opts.lt);
}),
pull.drain(function(msg) {
- debug("backlink message had correct timestamps: " + JSON.stringify(msg.data));
- self.outputQueue.push(msg.data);
+ debug("backlink message had correct timestamps: " + JSON.stringify(msg));
+ self.outputQueue.push(msg);
}, self.finish.bind(self))
);
}
@@ -259,6 +275,9 @@ function getProfilePicturesOf(relevantIds) {
function getBlob(blobId) {
debug("Ensuring existence of blob with ID " + blobId);
+ if(!(blobId in allowedBlobs)) {
+ allowedBlobs.push(blobId);
+ }
client.blobs.has(blobId, function(err, has) {
if(err) {
debug("[ERROR] ssb.blobs.has failed on the blob with ID " + blobId);
@@ -289,35 +308,19 @@ function getBacklinkStream(msgId) {
function createHashtagStream(channelName) {
var search = client.search && client.search.query;
+ debug("search: " + client.search);
+ debug("query: " + client.search.query);
if(!search) {
console.log("[FATAL] ssb-search plugin must be installed to use channels");
+ process.exit(5);
}
var query = search({
query: channelName
});
- query.streamName = "hashtag";
-
- return query;
-}
-
-/*function createHashtagStream(channelName) {
- var query = client.query.read({
- "$filter": {
- value: {
- content: {
- text: channelName
- }
- }
- }
- });
-
- query.streamName = "hashtag";
-
return query;
}
-*/
function createChannelStream(channelName) {
var query = client.query.read({
@@ -388,6 +391,31 @@ function getConfig() {
}
}
+function ensureFiles() {
+ for(let path of PATHS) {
+ if (!fs.existsSync(path)) {
+ debug("no" + path + " directory detected, creating it...");
+ fs.mkdirSync(path);
+ }
+ }
+
+ for(let file of FILES) {
+ if (!fs.existsSync(file)) {
+ debug("no " + file + " file detected, creating it...");
+ fs.writeFileSync(file);
+ }
+ }
+}
+
+function readAllowedBlobs() {
+ let contents = fs.readFileSync(SBOT_BLOBS_FILE, "utf8");
+ return contents.split("\n"); // this could cause slowdowns if the metadata file is several gigabytes, but i don't forsee that happening
+}
+
+function updateBlobsFile() {
+ fs.writeFile(SBOT_BLOBS_FILE, allowedBlobs.join("\n"), {encoding: 'utf8'}, () => {});
+}
+
module.exports = {
getMessages: function(ssbClient, channelName, ssbOpts, preserve, cb, hops=MAX_HOPS) {
client = ssbClient;
@@ -420,8 +448,8 @@ module.exports = {
var mostRecentCachedTimestamp = null;
var rootIds = [];
var allMessages = [];
- if (fs.existsSync(CACHE_FILEPATH + channelName + ".txt")) {
- cache = JSON.parse(fs.readFileSync(CACHE_FILEPATH + channelName + ".txt"));
+ if (fs.existsSync(path.join(CACHE_FILEPATH, channelName + ".txt"))) {
+ cache = JSON.parse(fs.readFileSync(path.join(CACHE_FILEPATH, channelName + ".txt")));
if (cache.root.length) {
mostRecentCachedTimestamp = cache.mostRecentCachedTimestamp;
rootIds = cache.root;