aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/logbook-lib.js97
1 files changed, 61 insertions, 36 deletions
diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js
index 90864b7..d09d327 100644
--- a/lib/logbook-lib.js
+++ b/lib/logbook-lib.js
@@ -34,6 +34,8 @@ class ChannelController {
constructor(opts) {
this.opts = opts,
this.outputStream = Pushable(),
+ this.allMessages = {},
+ this.outputQueue = {},
this.idsInChannel = {},
this.streamData = {
channelStream: new StreamData(),
@@ -53,7 +55,7 @@ class ChannelController {
if(msg.data.value && msg.data.value.content && msg.data.value.content.channel && msg.data.value.content.channel == channelName) {
return true;
}
-
+
// prevent ssb-search from grouping messages with #logbook and #logbook2 together, without running a double search
if(msg.data.value && msg.data.value.content && msg.data.value.content.text) {
let acceptableHashtags = [channelTag + "\n", channelTag + " "];
@@ -62,13 +64,13 @@ class ChannelController {
return true
}
}
-
+
return false;
}
}), pull.drain(function(msg) {
self.pushMessage(msg.source, msg.data);
}, function() {
- self.finish();
+ self.finish(followedIds);
}));
cb(this.outputStream, preserve);
@@ -77,7 +79,8 @@ class ChannelController {
this.idsInChannel[newMsg.value.author] = true;
var streamData = msgSource == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
streamData.oldestTimestampSeen = newMsg.value.timestamp;
-
+
+ this.allMessages[newMsg.key] = newMsg; // regardless of time, we want to check for replies to this message
if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) {
streamData.waitingMessages.push(newMsg);
requestBlobs(newMsg);
@@ -103,48 +106,60 @@ class ChannelController {
// in general, newlySafeMessages might not be in order, we should sort before appending
newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
for(var msgIndex in newlySafeMessages) {
- this.outputStream.push(newlySafeMessages[msgIndex]);
+ // this.outputStream.push(newlySafeMessages[msgIndex]);
+ this.outputQueue[newlySafeMessages[msgIndex].key] = newlySafeMessages[msgIndex];
}
},
- /*this.getReplies = function() {
- let replyStreams = [];
+ this.finish = function(followedIds) {
+ for(var datumIndex in this.streamData) {
+ this.streamData[datumIndex].oldestTimestampSeen = 0;
+ }
+ this.pushNewlySafeMessages();
- for(let userId of Object.keys(this.idsInChannel)) {
- replyStreams.push(createUserStream(userId));
+ this.getReplies(followedIds);
+
+ // this.outputStream.end();
+ },
+ this.getReplies = function(followedIds) {
+ var backlinkStreams = [];
+ for(const msgKey in this.allMessages) {
+ backlinkStreams.push(getBacklinkStream(msgKey));
}
- let self = this;
+ var self = this;
+ debug("looking for backlinked messages...");
+ var messagesToPush = [];
pull(
- many(replyStreams),
+ many(backlinkStreams),
pull.filter(function(msg) {
- let messageIsValid = msg.value && msg.value.content && msg.value.content.root;
- return messageIsValid && !(msg.key in self.rootMessages) && (msg.value.content.root in self.rootMessages);
+ return !(msg.data.key in self.outputQueue) && (followedIds.includes(msg.data.value.author.toLowerCase()));
+ }),
+ pull.filter(function(msg) {
+ debug("found backlink message: " + JSON.stringify(msg.data));
+ return (msg.data.value.timestamp > self.opts.gt) && (msg.data.value.timestamp < self.opts.lt);
}),
pull.drain(function(msg) {
- self.pushMessage(msg);
- },
- function() {
- self.sortAndPushAllMessages();
- getProfilePicturesOf(Object.keys(self.idsInChannel));
+ debug("backlink message had correct timestamps: " + JSON.stringify(msg.data));
+ if(!(msg.data.key in self.outputQueue)) {
+ self.outputQueue[msg.data.key] = msg.data;
+ // messagesToPush.push(msg.data);
+ }
+ }, function() {
+ for(const msgKey in self.outputQueue) {
+ debug("pushing message with key " + msgKey);
+ //self.outputStream.push(self.outputQueue[msgKey]);
+ messagesToPush.push(self.outputQueue[msgKey]);
+ }
+
+ messagesToPush.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1));
+
+ for(var msg of messagesToPush) {
+ self.outputStream.push(msg);
+ }
+
+ self.outputStream.end()
})
- );
- },
- this.sortAndPushAllMessages = function() {
- let self = this;
- let messages = Object.keys(this.rootMessages).map(function(msgId) { return self.rootMessages[msgId]; });
- for(let msg of messages.sort((a, b) => b.value.timestamp - a.value.timestamp)) {
- this.outputStream.push(msg);
- }
- this.exit();
- },*/
- this.finish = function() {
- for(var datumIndex in this.streamData) {
- this.streamData[datumIndex].oldestTimestampSeen = 0;
- }
- this.pushNewlySafeMessages();
-
- this.outputStream.end();
- delete this;
+ )
}
}
}
@@ -223,6 +238,16 @@ function debug(message) {
}
}
+function getBacklinkStream(msgId) {
+ var q = {
+ dest: msgId,
+ reverse: true,
+ values: true
+ }
+
+ return client.links(q);
+}
+
function createHashtagStream(channelName) {
var search = client.search && client.search.query;
if(!search) {