diff options
Diffstat (limited to 'lib/logbook-lib.js')
-rw-r--r-- | lib/logbook-lib.js | 98 |
1 files changed, 81 insertions, 17 deletions
diff --git a/lib/logbook-lib.js b/lib/logbook-lib.js index 7b821b8..90864b7 100644 --- a/lib/logbook-lib.js +++ b/lib/logbook-lib.js @@ -2,7 +2,7 @@ var clientFactory = require("ssb-client"); var config = require("ssb-config"); var pull = require("pull-stream"); var Pushable = require("pull-pushable"); -var many = require("pull-many"); +var many = require("./pull-many-v2"); // Change to 'true' to get debugging output DEBUG = !false @@ -13,42 +13,100 @@ DEFAULT_OPTS = { } GRAPH_TYPE = "follow" MAX_HOPS = 1 -MAX_REPLY_RECURSIONS = 10 // only instantiate a ssb-client once, so we don't have to create a new connection every time we load a channel page let client = null; +class StreamData { + constructor() { + this.finished = false, + this.oldestTimestampSeen = Number.MAX_SAFE_INTEGER, + this.waitingMessages = [] + + this.markFinished = function() { + this.finished = true, + this.oldestTimestampSeen = 0 + } + } +} + class ChannelController { constructor(opts) { this.opts = opts, this.outputStream = Pushable(), this.idsInChannel = {}, - this.rootMessages = {}, // used to check for replies + this.streamData = { + channelStream: new StreamData(), + hashtagStream: new StreamData(), + }, this.getMessagesFrom = function(channelName, followedIds, preserve, cb) { - var hashtagStream = createHashtagStream("#" + channelName); + var channelTag = "#" + channelName; + var hashtagStream = createHashtagStream(channelTag); var channelStream = createChannelStream(channelName); var stream = many([hashtagStream, channelStream]); var self = this; pull(stream, pull.filter(function(msg) { - return followedIds.includes(msg.value.author.toLowerCase()); + return followedIds.includes(msg.data.value.author.toLowerCase()); + }), pull.filter(function(msg) { + if(msg.data.value && msg.data.value.content && msg.data.value.content.channel && msg.data.value.content.channel == channelName) { + return true; + } + + // prevent ssb-search from grouping messages with #logbook and #logbook2 together, without running a double search + if(msg.data.value && msg.data.value.content && msg.data.value.content.text) { + let acceptableHashtags = [channelTag + "\n", channelTag + " "]; + for(let hashtag of acceptableHashtags) { + if(msg.data.value.content.text.indexOf(hashtag) != -1) { + return true + } + } + + return false; + } }), pull.drain(function(msg) { - self.pushMessage(msg); + self.pushMessage(msg.source, msg.data); }, function() { - self.getReplies(); + self.finish(); })); cb(this.outputStream, preserve); }, - this.pushMessage = function(newMsg) { + this.pushMessage = function(msgSource, newMsg) { this.idsInChannel[newMsg.value.author] = true; + var streamData = msgSource == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream; + streamData.oldestTimestampSeen = newMsg.value.timestamp; + if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) { - this.rootMessages[newMsg.key] = newMsg; + streamData.waitingMessages.push(newMsg); requestBlobs(newMsg); } + + this.pushNewlySafeMessages(); + }, + this.pushNewlySafeMessages = function() { + var newlySafeMessages = []; + var oldestSafeTimestamp = Math.max(...Object.values(this.streamData).map(datum => datum.oldestTimestampSeen)); + debug("pushing messages from before " + oldestSafeTimestamp + "..."); + + for(var streamDataIndex in this.streamData) { + var streamDatum = this.streamData[streamDataIndex] + while(streamDatum.waitingMessages.length && streamDatum.waitingMessages[0].value.timestamp >= oldestSafeTimestamp) { + var safeMsg = streamDatum.waitingMessages.shift(); // pop the newest waiting message + debug("pushing safe message with timestamp " + safeMsg.value.timestamp); + newlySafeMessages.push(safeMsg); + } + } + debug("pushed all newly safe messages"); + + // in general, newlySafeMessages might not be in order, we should sort before appending + newlySafeMessages.sort((msg1, msg2) => (msg1.value.timestamp > msg2.value.timestamp ? -1 : 1)); + for(var msgIndex in newlySafeMessages) { + this.outputStream.push(newlySafeMessages[msgIndex]); + } }, - this.getReplies = function() { + /*this.getReplies = function() { let replyStreams = []; for(let userId of Object.keys(this.idsInChannel)) { @@ -78,9 +136,13 @@ class ChannelController { this.outputStream.push(msg); } this.exit(); - }, - this.exit = function() { - debug("ending stream") + },*/ + this.finish = function() { + for(var datumIndex in this.streamData) { + this.streamData[datumIndex].oldestTimestampSeen = 0; + } + this.pushNewlySafeMessages(); + this.outputStream.end(); delete this; } @@ -171,11 +233,12 @@ function createHashtagStream(channelName) { query: channelName }); + query.streamName = "hashtag"; + return query; } -/* -function createHashtagStream(channelTag) { +/*function createHashtagStream(channelTag) { var query = client.query.read({ query: [{ "$filter": { @@ -192,8 +255,7 @@ function createHashtagStream(channelTag) { }); return query; -} -*/ +}*/ function createChannelStream(channelName) { var query = client.query.read({ @@ -209,6 +271,8 @@ function createChannelStream(channelName) { reverse: true }); + query.streamName = "channel"; + return query; } |