aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/channels-lib.js84
1 files changed, 68 insertions, 16 deletions
diff --git a/lib/channels-lib.js b/lib/channels-lib.js
index 7783993..e70bb2b 100644
--- a/lib/channels-lib.js
+++ b/lib/channels-lib.js
@@ -1,18 +1,21 @@
var pull = require("pull-stream");
var Pushable = require("pull-pushable");
var many = require("./pull-many-v2");
+const fs = require("fs");
+const home = require("os").homedir();
+const path = require("path");
// Change to 'true' to get debugging output
DEBUG = false
-DEFAULT_CHANNEL_OBJECT = {
- messages: []
-}
-DEFAULT_CHANNEL_OBJECT_SIZE = 15
FRIENDS_POLL_TIME = 1 // ms
GRAPH_TYPE = "follow"
MAX_HOPS = 1
+SBOT_ROOT = path.join(home, ".ssb");
+SBOT_BLOB_DIR = path.join(SBOT_ROOT, "blobs");
+SBOT_BLOBS_FILE = path.join(SBOT_BLOB_DIR, "allowed_blobs");
+
class StreamData {
constructor() {
this.finished = false,
@@ -27,8 +30,10 @@ class StreamData {
}
class StreamController {
- constructor(opts) {
+ constructor(client, opts) {
+ this.client = client,
this.outputStream = Pushable(),
+ this.allowedBlobs = [],
this.streamData = {
channelStream: new StreamData(),
hashtagStream: new StreamData(),
@@ -46,14 +51,9 @@ class StreamController {
this.pushNewlySafeMessages = function() {
var newlySafeMessages = [];
var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen));
- debug("oldest safe timestamp:" + oldestSafeTimestamp);
for(var streamDataIndex in this.streamData) {
var streamDatum = this.streamData[streamDataIndex]
- debug("length of waiting messages: " + streamDatum.waitingMessages.length);
- if(streamDatum.waitingMessages.length) {
- debug("timestamp of first waiting message: " + streamDatum.waitingMessages[0].value.timestamp);
- }
while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) {
var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message
newlySafeMessages.push(safeMsg);
@@ -63,8 +63,8 @@ class StreamController {
// in general, newlySafeMessages might not be in order, we should sort before appending
newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
for(var msgIndex in newlySafeMessages) {
- debug("pushing safe message...");
this.outputStream.push(newlySafeMessages[msgIndex]);
+ this.requestBlobs(newlySafeMessages[msgIndex]);
}
},
this.finish = function() {
@@ -72,13 +72,51 @@ class StreamController {
this.streamData[datumIndex].oldestTimestampSeen = 0;
}
this.pushNewlySafeMessages();
+
+ fs.writeFile(SBOT_BLOBS_FILE, this.allowedBlobs.join("\n"), function(err) {
+ debug("Failed to write allowed blobs to file: \n" + err);
+ });
+
this.outputStream.end();
+ },
+ this.requestBlobs = function(msg) {
+ let client = this.client; // did the javascript devs ever consider that you might have a callback inside a member function? no? ok
+ if(msg.value && msg.value.content) {
+ if(msg.value.content.mentions) {
+ for(let mention of msg.value.content.mentions) {
+ if(mention.type && mention.link) {
+ this.getBlob(mention.link)
+ }
+ }
+ }
+
+ if(msg.value.content.image) {
+ this.getBlob(msg.value.content.image);
+ }
+ }
+ }
+ this.getBlob = function(blobId) {
+ this.allowedBlobs.push(blobId);
+
+ debug("Ensuring existence of blob with ID " + blobId);
+ client.blobs.has(blobId, function(err, has) {
+ if(err) {
+ debug("[ERROR] ssb.blobs.has failed on the blob with ID " + blobId);
+ }
+ if(!err && !has) {
+ debug("Wanting blob with ID " + blobId);
+ client.blobs.want(blobId, function() {
+ debug("Downloaded blob with ID " + blobId);
+ });
+ }
+ });
}
- }a
+ }
}
module.exports = {
getMessages: function(client, channelName, opts, preserve, cb, hops=MAX_HOPS) {
+ ensureFiles();
client.friends.hops({
dunbar: Number.MAX_SAFE_INTEGER,
max: hops
@@ -94,10 +132,8 @@ module.exports = {
}
function getMessagesFrom(client, channelName, followedIds, opts, preserve, cb) {
- debug("Fetching messages from IDs in " + JSON.stringify(followedIds));
-
var channelTag = "#" + channelName;
- var streamController = new StreamController(opts);
+ var streamController = new StreamController(client, opts);
var hashtagStream = createHashtagStream(client, channelName);
var channelStream = createChannelStream(client, channelName);
var stream = many([hashtagStream, channelStream]);
@@ -156,10 +192,26 @@ function createChannelStream(client, channelName, opts) {
return query;
}
+function ensureFiles() {
+ if (!fs.existsSync(SBOT_ROOT)) {
+ debug("no ~/.ssb folder detected, creating it...");
+ fs.mkdirSync(SBOT_ROOT);
+ }
+
+ if (!fs.existsSync(SBOT_BLOB_DIR)) {
+ debug("no blobs folder detected, creating it...");
+ fs.mkdirSync(SBOT_BLOB_DIR);
+ }
+
+ if (!fs.existsSync(SBOT_BLOBS_FILE)) {
+ debug("no metadata file found, creating it...");
+ fs.writeFileSync(SBOT_BLOBS_FILE, "");
+ }
+}
+
function debug(message) {
if(DEBUG) {
var timestamp = new Date();
console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message);
}
}
-