diff options
-rw-r--r-- | lib/channels-lib.js | 84 |
1 files changed, 68 insertions, 16 deletions
diff --git a/lib/channels-lib.js b/lib/channels-lib.js index 7783993..e70bb2b 100644 --- a/lib/channels-lib.js +++ b/lib/channels-lib.js @@ -1,18 +1,21 @@ var pull = require("pull-stream"); var Pushable = require("pull-pushable"); var many = require("./pull-many-v2"); +const fs = require("fs"); +const home = require("os").homedir(); +const path = require("path"); // Change to 'true' to get debugging output DEBUG = false -DEFAULT_CHANNEL_OBJECT = { - messages: [] -} -DEFAULT_CHANNEL_OBJECT_SIZE = 15 FRIENDS_POLL_TIME = 1 // ms GRAPH_TYPE = "follow" MAX_HOPS = 1 +SBOT_ROOT = path.join(home, ".ssb"); +SBOT_BLOB_DIR = path.join(SBOT_ROOT, "blobs"); +SBOT_BLOBS_FILE = path.join(SBOT_BLOB_DIR, "allowed_blobs"); + class StreamData { constructor() { this.finished = false, @@ -27,8 +30,10 @@ class StreamData { } class StreamController { - constructor(opts) { + constructor(client, opts) { + this.client = client, this.outputStream = Pushable(), + this.allowedBlobs = [], this.streamData = { channelStream: new StreamData(), hashtagStream: new StreamData(), @@ -46,14 +51,9 @@ class StreamController { this.pushNewlySafeMessages = function() { var newlySafeMessages = []; var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen)); - debug("oldest safe timestamp:" + oldestSafeTimestamp); for(var streamDataIndex in this.streamData) { var streamDatum = this.streamData[streamDataIndex] - debug("length of waiting messages: " + streamDatum.waitingMessages.length); - if(streamDatum.waitingMessages.length) { - debug("timestamp of first waiting message: " + streamDatum.waitingMessages[0].value.timestamp); - } while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) { var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message newlySafeMessages.push(safeMsg); @@ -63,8 +63,8 @@ class StreamController { // in general, newlySafeMessages might not be in order, we should sort before appending newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1)); for(var msgIndex in newlySafeMessages) { - debug("pushing safe message..."); this.outputStream.push(newlySafeMessages[msgIndex]); + this.requestBlobs(newlySafeMessages[msgIndex]); } }, this.finish = function() { @@ -72,13 +72,51 @@ class StreamController { this.streamData[datumIndex].oldestTimestampSeen = 0; } this.pushNewlySafeMessages(); + + fs.writeFile(SBOT_BLOBS_FILE, this.allowedBlobs.join("\n"), function(err) { + debug("Failed to write allowed blobs to file: \n" + err); + }); + this.outputStream.end(); + }, + this.requestBlobs = function(msg) { + let client = this.client; // did the javascript devs ever consider that you might have a callback inside a member function? no? ok + if(msg.value && msg.value.content) { + if(msg.value.content.mentions) { + for(let mention of msg.value.content.mentions) { + if(mention.type && mention.link) { + this.getBlob(mention.link) + } + } + } + + if(msg.value.content.image) { + this.getBlob(msg.value.content.image); + } + } + } + this.getBlob = function(blobId) { + this.allowedBlobs.push(blobId); + + debug("Ensuring existence of blob with ID " + blobId); + client.blobs.has(blobId, function(err, has) { + if(err) { + debug("[ERROR] ssb.blobs.has failed on the blob with ID " + blobId); + } + if(!err && !has) { + debug("Wanting blob with ID " + blobId); + client.blobs.want(blobId, function() { + debug("Downloaded blob with ID " + blobId); + }); + } + }); } - }a + } } module.exports = { getMessages: function(client, channelName, opts, preserve, cb, hops=MAX_HOPS) { + ensureFiles(); client.friends.hops({ dunbar: Number.MAX_SAFE_INTEGER, max: hops @@ -94,10 +132,8 @@ module.exports = { } function getMessagesFrom(client, channelName, followedIds, opts, preserve, cb) { - debug("Fetching messages from IDs in " + JSON.stringify(followedIds)); - var channelTag = "#" + channelName; - var streamController = new StreamController(opts); + var streamController = new StreamController(client, opts); var hashtagStream = createHashtagStream(client, channelName); var channelStream = createChannelStream(client, channelName); var stream = many([hashtagStream, channelStream]); @@ -156,10 +192,26 @@ function createChannelStream(client, channelName, opts) { return query; } +function ensureFiles() { + if (!fs.existsSync(SBOT_ROOT)) { + debug("no ~/.ssb folder detected, creating it..."); + fs.mkdirSync(SBOT_ROOT); + } + + if (!fs.existsSync(SBOT_BLOB_DIR)) { + debug("no blobs folder detected, creating it..."); + fs.mkdirSync(SBOT_BLOB_DIR); + } + + if (!fs.existsSync(SBOT_BLOBS_FILE)) { + debug("no metadata file found, creating it..."); + fs.writeFileSync(SBOT_BLOBS_FILE, ""); + } +} + function debug(message) { if(DEBUG) { var timestamp = new Date(); console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message); } } - |