aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xchannels-v2.js285
-rw-r--r--pull-many-v2.js95
2 files changed, 380 insertions, 0 deletions
diff --git a/channels-v2.js b/channels-v2.js
new file mode 100755
index 0000000..5e96694
--- /dev/null
+++ b/channels-v2.js
@@ -0,0 +1,285 @@
+const fs = require("fs");
+const home = require("os").homedir();
+
+var argv = require("minimist")(process.argv.slice(2));
+var clientFactory = require("ssb-client");
+var path = require("path");
+var pull = require("pull-stream");
+var many = require("./pull-many-v2");
+
+// Change to 'true' to get debugging output
+DEBUG = false
+
+DEFAULT_CHANNEL_METADATA = {
+ lastMsgTimestamp: 0
+}
+DEFAULT_CHANNEL_OBJECT = {
+ messages: []
+}
+DEFAULT_CHANNEL_OBJECT_SIZE = 15
+GRAPH_TYPE = "follow"
+MESSAGE_SEPERATOR = "---------------"
+MESSAGE_SEPERATOR_COLOR = "\x1b[33m"
+MESSAGE_COLOR = "\x1b[0m"
+SBOT_ROOT = path.join(home, ".ssb");
+SBOT_CHANNEL_DIR = path.join(SBOT_ROOT, "channels");
+SBOT_CHANNEL_DATA = path.join(SBOT_CHANNEL_DIR, ".data");
+
+class StreamData {
+ constructor() {
+ this.finished = false,
+ this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER,
+ this.waitingMessages = []
+
+ this.markFinished = function() {
+ this.finished = true,
+ this.oldestTimestampSeen = 0
+ }
+ }
+}
+
+if (!fs.existsSync(SBOT_ROOT)) {
+ debug("no ~/.ssb folder detected, creating it...");
+ fs.mkdirSync(SBOT_ROOT);
+}
+
+if (!fs.existsSync(SBOT_CHANNEL_DIR)) {
+ debug("no channels folder detected, creating it...");
+ fs.mkdirSync(SBOT_CHANNEL_DIR);
+}
+
+if (!fs.existsSync(SBOT_CHANNEL_DATA)) {
+ debug("no channel metadata file found, creating it...");
+ fs.writeFileSync(SBOT_CHANNEL_DATA, "{}");
+}
+
+debug("Creating client...");
+clientFactory(function (err, client) {
+ if(err) {
+ console.log("[FATAL] Error when starting scuttlebot: " + err);
+ throw err;
+ }
+ debug("Successfully created client");
+
+
+ main(client);
+});
+
+function main(client) {
+ // var metadata = JSON.parse(fs.readFileSync(SBOT_CHANNEL_DATA));
+
+ if(!argv._ || argv._.length == 0) {
+ showHelp(metadata);
+ client.close(true, () => {});
+ process.exit(0);
+ }
+
+ var channelName = "#" + argv._[0];
+ // var trackedChannelData = getChannelData(channelName, metadata); // only get data for the first channel given (easy to change this if needed)
+ // var trackedChannelMessages = getChannelMessages(channelName);
+ var trackedChannelMessages = DEFAULT_CHANNEL_OBJECT;
+ // debug("fetched " + trackedChannelMessages["messages"].length + " messages from cache");
+
+ client.friends.hops({
+ dunbar: Number.MAX_SAFE_INTEGER,
+ max: 3
+ },
+ function(err, hops) {
+ if(err) {
+ console.log("[FATAL] Could not retreive friends list (is ssb-friends installed?)")
+ client.close(true, () => {});
+ process.exit(4);
+ }
+
+ fetchChannelMessagesFrom(client, channelName, trackedChannelMessages, Object.keys(hops));
+ }
+ )
+}
+
+function fetchChannelMessagesFrom(client, channelName, trackedChannelMessages, hops) {
+ // So it seems like Scuttlebutt IDs are case insensitive
+ // This means that you can accidentially capitalize/uncapitalize letters when following someone, and the follow will still go through
+ // However, the typo will remain in your friend graph and ruin any attempt made to query for that ID in its normal capitalization
+ // To avoid this skullfuckery, we just use a list and map lower() onto it beforehand
+ var followedIds = hops.map(id => id.toLowerCase());
+
+ debug("Fetching messages from IDs in " + JSON.stringify(followedIds));
+
+ var streamController = {
+ assembledMessages: [],
+ streamData: {
+ hashtagStream: new StreamData(),
+ channelStream: new StreamData()
+ },
+ pushMessage: function(source, newMsg) {
+ var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
+ streamData.oldestTimestampSeen = newMsg.value.timestamp;
+ streamData.waitingMessages.push(newMsg);
+
+ this.printNewlySafeMessages();
+ },
+ printNewlySafeMessages: function() {
+ var newlySafeMessages = [];
+ var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen));
+ debug("oldest safe timestamp:" + oldestSafeTimestamp);
+
+ for(var streamDataIndex in this.streamData) {
+ var streamDatum = this.streamData[streamDataIndex]
+ debug("length of waiting messages: " + streamDatum.waitingMessages.length);
+ if(streamDatum.waitingMessages.length) {
+ debug("timestamp of first waiting message: " + streamDatum.waitingMessages[0].value.timestamp);
+ }
+ while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) {
+ var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message
+ newlySafeMessages.push(safeMsg);
+ }
+ }
+
+ // in general, newlySafeMessages might not be in order, we should sort before appending
+ newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
+ this.assembledMessages = this.assembledMessages.concat(newlySafeMessages);
+ for(var msgIndex in newlySafeMessages) {
+ printMsg(newlySafeMessages[msgIndex]);
+ }
+ debug("***************");
+ },
+ finish: function() {
+ for(var datumIndex in this.streamData) {
+ this.streamData[datumIndex].oldestTimestampSeen = 0;
+ }
+ this.printNewlySafeMessages();
+ }
+ }
+
+ var hashtagStream = createHashtagStream(client, channelName);
+ var channelStream = createChannelStream(client, channelName);
+ var stream = many([hashtagStream, channelStream]);
+
+ pull(stream, pull.filter(function(msg) {
+ return followedIds.includes(msg.data.value.author.toLowerCase());
+ }), pull.filter(function(msg) {
+ var hasHashtag = msg.data.value.content && msg.data.value.content.text && typeof(msg.data.value.content.text) == "string" && msg.data.value.content.text.includes(channelName);
+ if(msg.source == "hashtag") {
+ return hasHashtag;
+ }
+ else {
+ return !hasHashtag; // prevents us from double-counting messages with both the hashtag and the channel header
+ }
+ }), pull.drain(function(msg) {
+ streamController.pushMessage(msg.source, msg.data);
+ }, function() {
+ streamController.finish();
+ client.close(true, () => {});
+ process.exit(0);
+ }));
+}
+
+function saveMessages(client, taskData, channelName, trackedChannelMessages, metadata) {
+ debug("found " + taskData.msgsFound + " new messages");
+ debug(JSON.stringify(taskData));
+ console.log("Found " + trackedChannelMessages["messages"].length + " messages:");
+
+ updateChannelMessages(channelName, trackedChannelMessages, metadata, taskData.lastTimestamp);
+
+}
+
+function showHelp(metadata) {
+ console.log("No channel provided. To get messages for a channel, run 'node channels.js example'");
+ console.log("(note that 'node channels.js #example' will not work, because # is a protected character)");
+
+ var trackedChannels = getTrackedChannels(metadata);
+ if(trackedChannels.length) {
+ console.log("Currently tracked channels are:");
+ for(const channelNameIndex in trackedChannels) {
+ var stats = fs.statSync(path.join(SBOT_CHANNEL_DIR, trackedChannels[channelNameIndex]));
+ if(stats.size > DEFAULT_CHANNEL_OBJECT_SIZE) { // don't say we're tracking empty channels -- if they were interesting, they'd have stuff in them
+ console.log(trackedChannels[channelNameIndex]);
+ }
+ }
+ }
+ else {
+ console.log("No channels are currently tracked.");
+ }
+}
+
+function getTrackedChannels(data) {
+ return Object.keys(data);
+}
+
+function getChannelData(channelName, data) {
+ if(!data[channelName]) {
+ data[channelName] = {
+ lastMsgTimestamp: 0,
+ }
+ }
+
+ return data[channelName];
+}
+
+function getChannelMessages(channelName) {
+ var messageCachePath = path.join(SBOT_CHANNEL_DIR, channelName);
+ if (!fs.existsSync(messageCachePath)) {
+ return DEFAULT_CHANNEL_OBJECT;
+ }
+
+ var raw = fs.readFileSync(messageCachePath, "utf8");
+ return JSON.parse(raw);
+}
+
+function createHashtagStream(client, channelName) {
+ var search = client.search && client.search.query;
+ if(!search) {
+ console.log("[FATAL] ssb-search plugin must be installed to us channels");
+ process.exit(5);
+ }
+
+ var query = search({
+ query: channelName
+ });
+
+ query.streamName = "hashtag"; // mark the stream object so we can tell which stream an object came from
+
+ return query;
+}
+
+function createChannelStream(client, channelName) {
+ var query = client.query.read({
+ query: [{
+ "$filter": {
+ value: {
+ content: {
+ channel: channelName.substring(1)
+ }
+ }
+ }
+ }],
+ reverse: true
+ });
+
+ query.streamName = "channel"; // mark the stream object so we can tell which stream a message came from later
+
+ return query;
+}
+
+function printMsg(msg) {
+ console.log(MESSAGE_SEPERATOR_COLOR, MESSAGE_SEPERATOR);
+ console.log(MESSAGE_COLOR, msg.value.content.text);
+}
+
+function updateChannelMessages(channelName, channelJson, metadata, latestTimestamp) {
+ fs.writeFileSync(path.join(SBOT_CHANNEL_DIR, channelName), JSON.stringify(channelJson));
+
+ if(metadata.channelName == undefined) {
+ metadata[channelName] = DEFAULT_CHANNEL_METADATA;
+ }
+
+ metadata[channelName].lastMsgTimestamp = latestTimestamp; // once we've successfully updated messages, update the most recent message ID
+ fs.writeFileSync(SBOT_CHANNEL_DATA, JSON.stringify(metadata));
+}
+
+function debug(message) {
+ if(DEBUG) {
+ var timestamp = new Date();
+ console.log("[" + timestamp.toISOString() + "] " + message);
+ }
+}
diff --git a/pull-many-v2.js b/pull-many-v2.js
new file mode 100644
index 0000000..beea621
--- /dev/null
+++ b/pull-many-v2.js
@@ -0,0 +1,95 @@
+// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js
+// Edits have been made to distinguish the original stream that each object came from
+
+module.exports = function (ary) {
+
+ var capped = !!ary
+ var inputs = (ary || []).map(create), i = 0, abort, cb
+
+ function create (stream) {
+ return {ready: false, reading: false, ended: false, read: stream, data: null}
+ }
+
+ function check () {
+ if(!cb) return
+ clean()
+ var l = inputs.length
+ var _cb = cb
+ if(l === 0 && (abort || capped)) {
+ cb = null; _cb(abort || true)
+ return
+ }
+
+ //scan the inputs to check whether there is one we can use.
+ for(var j = 0; j < l; j++) {
+ var current = inputs[(i + j) % l]
+ if(current.ready && !current.ended) {
+ var data = { // [EDIT] keep track of which exact source the data came from
+ data: current.data,
+ source: current.read.streamName
+ }
+ current.ready = false
+ current.data = null
+ i ++; cb = null
+ return _cb(null, data)
+ }
+ }
+ }
+
+ function clean () {
+ var l = inputs.length
+ //iterate backwards so that we can remove items.
+ while(l--) {
+ if(inputs[l].ended)
+ inputs.splice(l, 1)
+ }
+ }
+
+ function next () {
+ var l = inputs.length
+ while(l--)
+ (function (current) {
+ //read the next item if we aren't already
+ if(l > inputs.length) throw new Error('this should never happen')
+ if(current.reading || current.ended || current.ready) return
+ current.reading = true
+ var sync = true
+ current.read(abort, function next (end, data) {
+ current.data = data
+ current.ready = true
+ current.reading = false
+
+ if(end === true || abort) current.ended = true
+ else if(end) abort = current.ended = end
+ //check whether we need to abort this stream.
+ if(abort && !end) current.read(abort, next)
+ if(!sync) check()
+ })
+ sync = false
+ })(inputs[l])
+
+ //scan the feed
+ check()
+ }
+
+ function read (_abort, _cb) {
+ abort = abort || _abort; cb = _cb; next()
+ }
+
+ read.add = function (stream) {
+ if(!stream) {
+ //the stream will now end when all the streams end.
+ capped = true
+ //we just changed state, so we may need to cb
+ return next()
+ }
+ inputs.push(create(stream))
+ next()
+ }
+
+ read.cap = function (err) {
+ read.add(null)
+ }
+
+ return read
+}