From c694dacbcfd348b142c26a8b8fb53b8b6684bd62 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 18 Jan 2021 14:58:04 -0500 Subject: Initial commit --- channels-v2.js | 285 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ pull-many-v2.js | 95 +++++++++++++++++++ 2 files changed, 380 insertions(+) create mode 100755 channels-v2.js create mode 100644 pull-many-v2.js diff --git a/channels-v2.js b/channels-v2.js new file mode 100755 index 0000000..5e96694 --- /dev/null +++ b/channels-v2.js @@ -0,0 +1,285 @@ +const fs = require("fs"); +const home = require("os").homedir(); + +var argv = require("minimist")(process.argv.slice(2)); +var clientFactory = require("ssb-client"); +var path = require("path"); +var pull = require("pull-stream"); +var many = require("./pull-many-v2"); + +// Change to 'true' to get debugging output +DEBUG = false + +DEFAULT_CHANNEL_METADATA = { + lastMsgTimestamp: 0 +} +DEFAULT_CHANNEL_OBJECT = { + messages: [] +} +DEFAULT_CHANNEL_OBJECT_SIZE = 15 +GRAPH_TYPE = "follow" +MESSAGE_SEPERATOR = "---------------" +MESSAGE_SEPERATOR_COLOR = "\x1b[33m" +MESSAGE_COLOR = "\x1b[0m" +SBOT_ROOT = path.join(home, ".ssb"); +SBOT_CHANNEL_DIR = path.join(SBOT_ROOT, "channels"); +SBOT_CHANNEL_DATA = path.join(SBOT_CHANNEL_DIR, ".data"); + +class StreamData { + constructor() { + this.finished = false, + this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER, + this.waitingMessages = [] + + this.markFinished = function() { + this.finished = true, + this.oldestTimestampSeen = 0 + } + } +} + +if (!fs.existsSync(SBOT_ROOT)) { + debug("no ~/.ssb folder detected, creating it..."); + fs.mkdirSync(SBOT_ROOT); +} + +if (!fs.existsSync(SBOT_CHANNEL_DIR)) { + debug("no channels folder detected, creating it..."); + fs.mkdirSync(SBOT_CHANNEL_DIR); +} + +if (!fs.existsSync(SBOT_CHANNEL_DATA)) { + debug("no channel metadata file found, creating it..."); + fs.writeFileSync(SBOT_CHANNEL_DATA, "{}"); +} + +debug("Creating client..."); +clientFactory(function (err, client) { + if(err) { + console.log("[FATAL] Error when starting scuttlebot: " + err); + throw err; + } + debug("Successfully created client"); + + + main(client); +}); + +function main(client) { + // var metadata = JSON.parse(fs.readFileSync(SBOT_CHANNEL_DATA)); + + if(!argv._ || argv._.length == 0) { + showHelp(metadata); + client.close(true, () => {}); + process.exit(0); + } + + var channelName = "#" + argv._[0]; + // var trackedChannelData = getChannelData(channelName, metadata); // only get data for the first channel given (easy to change this if needed) + // var trackedChannelMessages = getChannelMessages(channelName); + var trackedChannelMessages = DEFAULT_CHANNEL_OBJECT; + // debug("fetched " + trackedChannelMessages["messages"].length + " messages from cache"); + + client.friends.hops({ + dunbar: Number.MAX_SAFE_INTEGER, + max: 3 + }, + function(err, hops) { + if(err) { + console.log("[FATAL] Could not retreive friends list (is ssb-friends installed?)") + client.close(true, () => {}); + process.exit(4); + } + + fetchChannelMessagesFrom(client, channelName, trackedChannelMessages, Object.keys(hops)); + } + ) +} + +function fetchChannelMessagesFrom(client, channelName, trackedChannelMessages, hops) { + // So it seems like Scuttlebutt IDs are case insensitive + // This means that you can accidentially capitalize/uncapitalize letters when following someone, and the follow will still go through + // However, the typo will remain in your friend graph and ruin any attempt made to query for that ID in its normal capitalization + // To avoid this skullfuckery, we just use a list and map lower() onto it beforehand + var followedIds = hops.map(id => id.toLowerCase()); + + debug("Fetching messages from IDs in " + JSON.stringify(followedIds)); + + var streamController = { + assembledMessages: [], + streamData: { + hashtagStream: new StreamData(), + channelStream: new StreamData() + }, + pushMessage: function(source, newMsg) { + var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream; + streamData.oldestTimestampSeen = newMsg.value.timestamp; + streamData.waitingMessages.push(newMsg); + + this.printNewlySafeMessages(); + }, + printNewlySafeMessages: function() { + var newlySafeMessages = []; + var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen)); + debug("oldest safe timestamp:" + oldestSafeTimestamp); + + for(var streamDataIndex in this.streamData) { + var streamDatum = this.streamData[streamDataIndex] + debug("length of waiting messages: " + streamDatum.waitingMessages.length); + if(streamDatum.waitingMessages.length) { + debug("timestamp of first waiting message: " + streamDatum.waitingMessages[0].value.timestamp); + } + while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) { + var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message + newlySafeMessages.push(safeMsg); + } + } + + // in general, newlySafeMessages might not be in order, we should sort before appending + newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1)); + this.assembledMessages = this.assembledMessages.concat(newlySafeMessages); + for(var msgIndex in newlySafeMessages) { + printMsg(newlySafeMessages[msgIndex]); + } + debug("***************"); + }, + finish: function() { + for(var datumIndex in this.streamData) { + this.streamData[datumIndex].oldestTimestampSeen = 0; + } + this.printNewlySafeMessages(); + } + } + + var hashtagStream = createHashtagStream(client, channelName); + var channelStream = createChannelStream(client, channelName); + var stream = many([hashtagStream, channelStream]); + + pull(stream, pull.filter(function(msg) { + return followedIds.includes(msg.data.value.author.toLowerCase()); + }), pull.filter(function(msg) { + var hasHashtag = msg.data.value.content && msg.data.value.content.text && typeof(msg.data.value.content.text) == "string" && msg.data.value.content.text.includes(channelName); + if(msg.source == "hashtag") { + return hasHashtag; + } + else { + return !hasHashtag; // prevents us from double-counting messages with both the hashtag and the channel header + } + }), pull.drain(function(msg) { + streamController.pushMessage(msg.source, msg.data); + }, function() { + streamController.finish(); + client.close(true, () => {}); + process.exit(0); + })); +} + +function saveMessages(client, taskData, channelName, trackedChannelMessages, metadata) { + debug("found " + taskData.msgsFound + " new messages"); + debug(JSON.stringify(taskData)); + console.log("Found " + trackedChannelMessages["messages"].length + " messages:"); + + updateChannelMessages(channelName, trackedChannelMessages, metadata, taskData.lastTimestamp); + +} + +function showHelp(metadata) { + console.log("No channel provided. To get messages for a channel, run 'node channels.js example'"); + console.log("(note that 'node channels.js #example' will not work, because # is a protected character)"); + + var trackedChannels = getTrackedChannels(metadata); + if(trackedChannels.length) { + console.log("Currently tracked channels are:"); + for(const channelNameIndex in trackedChannels) { + var stats = fs.statSync(path.join(SBOT_CHANNEL_DIR, trackedChannels[channelNameIndex])); + if(stats.size > DEFAULT_CHANNEL_OBJECT_SIZE) { // don't say we're tracking empty channels -- if they were interesting, they'd have stuff in them + console.log(trackedChannels[channelNameIndex]); + } + } + } + else { + console.log("No channels are currently tracked."); + } +} + +function getTrackedChannels(data) { + return Object.keys(data); +} + +function getChannelData(channelName, data) { + if(!data[channelName]) { + data[channelName] = { + lastMsgTimestamp: 0, + } + } + + return data[channelName]; +} + +function getChannelMessages(channelName) { + var messageCachePath = path.join(SBOT_CHANNEL_DIR, channelName); + if (!fs.existsSync(messageCachePath)) { + return DEFAULT_CHANNEL_OBJECT; + } + + var raw = fs.readFileSync(messageCachePath, "utf8"); + return JSON.parse(raw); +} + +function createHashtagStream(client, channelName) { + var search = client.search && client.search.query; + if(!search) { + console.log("[FATAL] ssb-search plugin must be installed to us channels"); + process.exit(5); + } + + var query = search({ + query: channelName + }); + + query.streamName = "hashtag"; // mark the stream object so we can tell which stream an object came from + + return query; +} + +function createChannelStream(client, channelName) { + var query = client.query.read({ + query: [{ + "$filter": { + value: { + content: { + channel: channelName.substring(1) + } + } + } + }], + reverse: true + }); + + query.streamName = "channel"; // mark the stream object so we can tell which stream a message came from later + + return query; +} + +function printMsg(msg) { + console.log(MESSAGE_SEPERATOR_COLOR, MESSAGE_SEPERATOR); + console.log(MESSAGE_COLOR, msg.value.content.text); +} + +function updateChannelMessages(channelName, channelJson, metadata, latestTimestamp) { + fs.writeFileSync(path.join(SBOT_CHANNEL_DIR, channelName), JSON.stringify(channelJson)); + + if(metadata.channelName == undefined) { + metadata[channelName] = DEFAULT_CHANNEL_METADATA; + } + + metadata[channelName].lastMsgTimestamp = latestTimestamp; // once we've successfully updated messages, update the most recent message ID + fs.writeFileSync(SBOT_CHANNEL_DATA, JSON.stringify(metadata)); +} + +function debug(message) { + if(DEBUG) { + var timestamp = new Date(); + console.log("[" + timestamp.toISOString() + "] " + message); + } +} diff --git a/pull-many-v2.js b/pull-many-v2.js new file mode 100644 index 0000000..beea621 --- /dev/null +++ b/pull-many-v2.js @@ -0,0 +1,95 @@ +// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js +// Edits have been made to distinguish the original stream that each object came from + +module.exports = function (ary) { + + var capped = !!ary + var inputs = (ary || []).map(create), i = 0, abort, cb + + function create (stream) { + return {ready: false, reading: false, ended: false, read: stream, data: null} + } + + function check () { + if(!cb) return + clean() + var l = inputs.length + var _cb = cb + if(l === 0 && (abort || capped)) { + cb = null; _cb(abort || true) + return + } + + //scan the inputs to check whether there is one we can use. + for(var j = 0; j < l; j++) { + var current = inputs[(i + j) % l] + if(current.ready && !current.ended) { + var data = { // [EDIT] keep track of which exact source the data came from + data: current.data, + source: current.read.streamName + } + current.ready = false + current.data = null + i ++; cb = null + return _cb(null, data) + } + } + } + + function clean () { + var l = inputs.length + //iterate backwards so that we can remove items. + while(l--) { + if(inputs[l].ended) + inputs.splice(l, 1) + } + } + + function next () { + var l = inputs.length + while(l--) + (function (current) { + //read the next item if we aren't already + if(l > inputs.length) throw new Error('this should never happen') + if(current.reading || current.ended || current.ready) return + current.reading = true + var sync = true + current.read(abort, function next (end, data) { + current.data = data + current.ready = true + current.reading = false + + if(end === true || abort) current.ended = true + else if(end) abort = current.ended = end + //check whether we need to abort this stream. + if(abort && !end) current.read(abort, next) + if(!sync) check() + }) + sync = false + })(inputs[l]) + + //scan the feed + check() + } + + function read (_abort, _cb) { + abort = abort || _abort; cb = _cb; next() + } + + read.add = function (stream) { + if(!stream) { + //the stream will now end when all the streams end. + capped = true + //we just changed state, so we may need to cb + return next() + } + inputs.push(create(stream)) + next() + } + + read.cap = function (err) { + read.add(null) + } + + return read +} -- cgit v1.2.3