aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/logbook-lib.js98
1 files changed, 81 insertions, 17 deletions
diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js
index 7b821b8..90864b7 100644
--- a/lib/logbook-lib.js
+++ b/lib/logbook-lib.js
@@ -2,7 +2,7 @@ var clientFactory = require("ssb-client");
var config = require("ssb-config");
var pull = require("pull-stream");
var Pushable = require("pull-pushable");
-var many = require("pull-many");
+var many = require("./pull-many-v2");
// Change to 'true' to get debugging output
DEBUG = !false
@@ -13,42 +13,100 @@ DEFAULT_OPTS = {
}
GRAPH_TYPE = "follow"
MAX_HOPS = 1
-MAX_REPLY_RECURSIONS = 10
// only instantiate a ssb-client once, so we don't have to create a new connection every time we load a channel page
let client = null;
+class StreamData {
+ constructor() {
+ this.finished = false,
+ this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER,
+ this.waitingMessages = []
+
+ this.markFinished = function() {
+ this.finished = true,
+ this.oldestTimestampSeen = 0
+ }
+ }
+}
+
class ChannelController {
constructor(opts) {
this.opts = opts,
this.outputStream = Pushable(),
this.idsInChannel = {},
- this.rootMessages = {}, // used to check for replies
+ this.streamData = {
+ channelStream: new StreamData(),
+ hashtagStream: new StreamData(),
+ },
this.getMessagesFrom = function(channelName, followedIds, preserve, cb) {
- var hashtagStream = createHashtagStream("#" + channelName);
+ var channelTag = "#" + channelName;
+ var hashtagStream = createHashtagStream(channelTag);
var channelStream = createChannelStream(channelName);
var stream = many([hashtagStream, channelStream]);
var self = this;
pull(stream, pull.filter(function(msg) {
- return followedIds.includes(msg.value.author.toLowerCase());
+ return followedIds.includes(msg.data.value.author.toLowerCase());
+ }), pull.filter(function(msg) {
+ if(msg.data.value && msg.data.value.content && msg.data.value.content.channel && msg.data.value.content.channel == channelName) {
+ return true;
+ }
+
+ // prevent ssb-search from grouping messages with #logbook and #logbook2 together, without running a double search
+ if(msg.data.value && msg.data.value.content && msg.data.value.content.text) {
+ let acceptableHashtags = [channelTag + "\n", channelTag + " "];
+ for(let hashtag of acceptableHashtags) {
+ if(msg.data.value.content.text.indexOf(hashtag) != -1) {
+ return true
+ }
+ }
+
+ return false;
+ }
}), pull.drain(function(msg) {
- self.pushMessage(msg);
+ self.pushMessage(msg.source, msg.data);
}, function() {
- self.getReplies();
+ self.finish();
}));
cb(this.outputStream, preserve);
},
- this.pushMessage = function(newMsg) {
+ this.pushMessage = function(msgSource, newMsg) {
this.idsInChannel[newMsg.value.author] = true;
+ var streamData = msgSource == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
+ streamData.oldestTimestampSeen = newMsg.value.timestamp;
+
if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) {
- this.rootMessages[newMsg.key] = newMsg;
+ streamData.waitingMessages.push(newMsg);
requestBlobs(newMsg);
}
+
+ this.pushNewlySafeMessages();
+ },
+ this.pushNewlySafeMessages = function() {
+ var newlySafeMessages = [];
+ var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen));
+ debug("pushing messages from before " + oldestSafeTimestamp + "...");
+
+ for(var streamDataIndex in this.streamData) {
+ var streamDatum = this.streamData[streamDataIndex]
+ while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) {
+ var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message
+ debug("pushing safe message with timestamp " + safeMsg.value.timestamp);
+ newlySafeMessages.push(safeMsg);
+ }
+ }
+ debug("pushed all newly safe messages");
+
+ // in general, newlySafeMessages might not be in order, we should sort before appending
+ newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
+ for(var msgIndex in newlySafeMessages) {
+ this.outputStream.push(newlySafeMessages[msgIndex]);
+ }
},
- this.getReplies = function() {
+ /*this.getReplies = function() {
let replyStreams = [];
for(let userId of Object.keys(this.idsInChannel)) {
@@ -78,9 +136,13 @@ class ChannelController {
this.outputStream.push(msg);
}
this.exit();
- },
- this.exit = function() {
- debug("ending stream")
+ },*/
+ this.finish = function() {
+ for(var datumIndex in this.streamData) {
+ this.streamData[datumIndex].oldestTimestampSeen = 0;
+ }
+ this.pushNewlySafeMessages();
+
this.outputStream.end();
delete this;
}
@@ -171,11 +233,12 @@ function createHashtagStream(channelName) {
query: channelName
});
+ query.streamName = "hashtag";
+
return query;
}
-/*
-function createHashtagStream(channelTag) {
+/*function createHashtagStream(channelTag) {
var query = client.query.read({
query: [{
"$filter": {
@@ -192,8 +255,7 @@ function createHashtagStream(channelTag) {
});
return query;
-}
-*/
+}*/
function createChannelStream(channelName) {
var query = client.query.read({
@@ -209,6 +271,8 @@ function createChannelStream(channelName) {
reverse: true
});
+ query.streamName = "channel";
+
return query;
}