aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/channels-lib.js162
-rw-r--r--lib/pull-many-v2.js95
-rw-r--r--lib/serve.js57
3 files changed, 312 insertions, 2 deletions
diff --git a/lib/channels-lib.js b/lib/channels-lib.js
new file mode 100644
index 0000000..f2d8484
--- /dev/null
+++ b/lib/channels-lib.js
@@ -0,0 +1,162 @@
+var { promisify } = require("util");
+var pull = require("pull-stream");
+var Pushable = require("pull-pushable");
+var many = require("./pull-many-v2");
+
+// Change to 'true' to get debugging output
+DEBUG = false
+
+DEFAULT_CHANNEL_OBJECT = {
+ messages: []
+}
+DEFAULT_CHANNEL_OBJECT_SIZE = 15
+FRIENDS_POLL_TIME = 1 // ms
+GRAPH_TYPE = "follow"
+MAX_HOPS = 1
+
+class StreamData {
+ constructor() {
+ this.finished = false,
+ this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER,
+ this.waitingMessages = []
+
+ this.markFinished = function() {
+ this.finished = true,
+ this.oldestTimestampSeen = 0
+ }
+ }
+}
+
+class StreamController {
+ constructor() {
+ this.outputStream = Pushable(),
+ this.streamData = {
+ channelStream: new StreamData(),
+ hashtagStream: new StreamData(),
+ },
+
+ this.pushMessage = function(source, newMsg) {
+ var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
+ streamData.oldestTimestampSeen = newMsg.value.timestamp;
+ streamData.waitingMessages.push(newMsg);
+
+ this.pushNewlySafeMessages();
+ },
+ this.pushNewlySafeMessages = function() {
+ var newlySafeMessages = [];
+ var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen));
+ debug("oldest safe timestamp:" + oldestSafeTimestamp);
+
+ for(var streamDataIndex in this.streamData) {
+ var streamDatum = this.streamData[streamDataIndex]
+ debug("length of waiting messages: " + streamDatum.waitingMessages.length);
+ if(streamDatum.waitingMessages.length) {
+ debug("timestamp of first waiting message: " + streamDatum.waitingMessages[0].value.timestamp);
+ }
+ while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) {
+ var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message
+ newlySafeMessages.push(safeMsg);
+ }
+ }
+
+ // in general, newlySafeMessages might not be in order, we should sort before appending
+ newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
+ for(var msgIndex in newlySafeMessages) {
+ debug("pushing safe message...");
+ this.outputStream.push(newlySafeMessages[msgIndex]);
+ }
+ },
+ this.finish = function() {
+ for(var datumIndex in this.streamData) {
+ this.streamData[datumIndex].oldestTimestampSeen = 0;
+ }
+ this.pushNewlySafeMessages();
+ }
+ }
+}
+
+module.exports = {
+ getMessages: function(client, channelName, preserve, cb, hops=MAX_HOPS) {
+ client.friends.hops({
+ dunbar: Number.MAX_SAFE_INTEGER,
+ max: hops
+ }, function(error, friends) {
+ if(error) {
+ throw "Couldn't get a list of friends from scuttlebot:\n" + error;
+ }
+
+ var followedIds = Object.keys(friends).map(id => id.toLowerCase());
+ getMessagesFrom(client, channelName, followedIds, preserve, cb);
+ });
+ }
+}
+
+function getMessagesFrom(client, channelName, followedIds, preserve, cb) {
+ debug("Fetching messages from IDs in " + JSON.stringify(followedIds));
+
+ var channelTag = "#" + channelName;
+ var streamController = new StreamController();
+ var hashtagStream = createHashtagStream(client, channelName);
+ var channelStream = createChannelStream(client, channelName);
+ var stream = many([hashtagStream, channelStream]);
+
+ pull(stream, pull.filter(function(msg) {
+ return followedIds.includes(msg.data.value.author.toLowerCase());
+ }), pull.filter(function(msg) {
+ var hasHashtag = msg.data.value.content && msg.data.value.content.text && typeof(msg.data.value.content.text) == "string" && msg.data.value.content.text.includes(channelTag);
+ if(msg.source == "hashtag") {
+ return hasHashtag;
+ }
+ else {
+ return !hasHashtag; // prevents us from double-counting messages with both the hashtag and the channel header
+ }
+ }), pull.drain(function(msg) {
+ streamController.pushMessage(msg.source, msg.data);
+ }, function() {
+ streamController.finish();
+ }));
+
+ cb(streamController.outputStream, preserve);
+}
+
+function createHashtagStream(client, channelName) {
+ var search = client.search && client.search.query;
+ if(!search) {
+ console.log("[FATAL] ssb-search plugin must be installed to us channels");
+ process.exit(5);
+ }
+
+ var query = search({
+ query: channelName
+ });
+
+ query.streamName = "hashtag"; // mark the stream object so we can tell which stream an object came from
+
+ return query;
+}
+
+function createChannelStream(client, channelName) {
+ var query = client.query.read({
+ query: [{
+ "$filter": {
+ value: {
+ content: {
+ channel: channelName
+ }
+ }
+ }
+ }],
+ reverse: true
+ });
+
+ query.streamName = "channel"; // mark the stream object so we can tell which stream a message came from later
+
+ return query;
+}
+
+function debug(message) {
+ if(DEBUG) {
+ var timestamp = new Date();
+ console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message);
+ }
+}
diff --git a/lib/pull-many-v2.js b/lib/pull-many-v2.js
new file mode 100644
index 0000000..beea621
--- /dev/null
+++ b/lib/pull-many-v2.js
@@ -0,0 +1,95 @@
+// All credit to https://github.com/pull-stream/pull-many/blob/master/index.js
+// Edits have been made to distinguish the original stream that each object came from
+
+module.exports = function (ary) {
+
+ var capped = !!ary
+ var inputs = (ary || []).map(create), i = 0, abort, cb
+
+ function create (stream) {
+ return {ready: false, reading: false, ended: false, read: stream, data: null}
+ }
+
+ function check () {
+ if(!cb) return
+ clean()
+ var l = inputs.length
+ var _cb = cb
+ if(l === 0 && (abort || capped)) {
+ cb = null; _cb(abort || true)
+ return
+ }
+
+ //scan the inputs to check whether there is one we can use.
+ for(var j = 0; j < l; j++) {
+ var current = inputs[(i + j) % l]
+ if(current.ready && !current.ended) {
+ var data = { // [EDIT] keep track of which exact source the data came from
+ data: current.data,
+ source: current.read.streamName
+ }
+ current.ready = false
+ current.data = null
+ i ++; cb = null
+ return _cb(null, data)
+ }
+ }
+ }
+
+ function clean () {
+ var l = inputs.length
+ //iterate backwards so that we can remove items.
+ while(l--) {
+ if(inputs[l].ended)
+ inputs.splice(l, 1)
+ }
+ }
+
+ function next () {
+ var l = inputs.length
+ while(l--)
+ (function (current) {
+ //read the next item if we aren't already
+ if(l > inputs.length) throw new Error('this should never happen')
+ if(current.reading || current.ended || current.ready) return
+ current.reading = true
+ var sync = true
+ current.read(abort, function next (end, data) {
+ current.data = data
+ current.ready = true
+ current.reading = false
+
+ if(end === true || abort) current.ended = true
+ else if(end) abort = current.ended = end
+ //check whether we need to abort this stream.
+ if(abort && !end) current.read(abort, next)
+ if(!sync) check()
+ })
+ sync = false
+ })(inputs[l])
+
+ //scan the feed
+ check()
+ }
+
+ function read (_abort, _cb) {
+ abort = abort || _abort; cb = _cb; next()
+ }
+
+ read.add = function (stream) {
+ if(!stream) {
+ //the stream will now end when all the streams end.
+ capped = true
+ //we just changed state, so we may need to cb
+ return next()
+ }
+ inputs.push(create(stream))
+ next()
+ }
+
+ read.cap = function (err) {
+ read.add(null)
+ }
+
+ return read
+}
diff --git a/lib/serve.js b/lib/serve.js
index 21f08ac..f7e0620 100644
--- a/lib/serve.js
+++ b/lib/serve.js
@@ -28,6 +28,7 @@ var Url = require('url')
var many = require('pull-many')
var merge = require('pull-merge')
var pSort = require('pull-sort')
+var channels = require("./channels-lib");
module.exports = Serve
@@ -520,7 +521,7 @@ Serve.prototype.path = function (url) {
switch (m[1]) {
case '/new': return this.new(m[2])
case '/public': return this.public(m[2])
- case '/logbook': return this.logbook(m[2])
+ case '/logbook': return this.logbook2(m[2])
case '/threads': return this.threads(m[2])
case '/private': return this.private(m[2])
case '/mentions': return this.mentions(m[2])
@@ -579,6 +580,57 @@ Serve.prototype.home = function () {
)
}
+Serve.prototype.logbook2 = function (ext) {
+ var q = this.query
+ var opts = {
+ reverse: !q.forwards,
+ //sortByTimestamp: q.sort === 'claimed',
+ sortByTimestamp: q.sort || 'claimed',
+ lt: Number(q.lt) || Date.now(),
+ gt: Number(q.gt) || -Infinity,
+ filter: q.filter,
+ }
+
+ channels.getMessages(this.app.sbot, "logbook", this, function(messageStream, serve) {
+ pull(messageStream,
+ serve.renderThreadPaginated(opts, null, q),
+ serve.wrapMessages(),
+ serve.wrapPublic(),
+ serve.wrapPage('logbook'),
+ serve.respondSink(200, {
+ 'Content-Type': ctype(ext)
+ })
+ //pull.drain(function(msg) {
+ //console.log(JSON.stringify(msg));
+ //})
+ )
+ }
+ , hops=3)
+
+ //this.renderThreadPaginated(opts, null, q),
+ //this.wrapMessages(),
+ //this.wrapPublic(),
+ //this.wrapPage('public'),
+ //this.respondSink(200, {
+ //'Content-Type': ctype(ext)
+ //})
+ //)
+ //});
+
+ //pull(
+ //this.app.createLogStream(opts),
+ //pull.filter(msg => {
+ //return !msg.value.content.vote
+ //}),
+ //this.renderThreadPaginated(opts, null, q),
+ //this.wrapMessages(),
+ //this.wrapPublic(),
+ //this.wrapPage('public'),
+ //this.respondSink(200, {
+ //'Content-Type': ctype(ext)
+ //})
+ //)
+}
compareMsgs = function(a, b) {
return -(a.value.timestamp - b.value.timestamp)
}
@@ -622,7 +674,8 @@ Serve.prototype.public = function (ext) {
var q = this.query
var opts = {
reverse: !q.forwards,
- sortByTimestamp: q.sort === 'claimed',
+ //sortByTimestamp: q.sort === 'claimed',
+ sortByTimestamp: q.sort || 'claimed',
lt: Number(q.lt) || Date.now(),
gt: Number(q.gt) || -Infinity,
filter: q.filter,