aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/channels-lib.js24
-rw-r--r--lib/serve.js2
2 files changed, 14 insertions, 12 deletions
diff --git a/lib/channels-lib.js b/lib/channels-lib.js
index 0d8193c..7783993 100644
--- a/lib/channels-lib.js
+++ b/lib/channels-lib.js
@@ -1,4 +1,3 @@
-var { promisify } = require("util");
var pull = require("pull-stream");
var Pushable = require("pull-pushable");
var many = require("./pull-many-v2");
@@ -28,7 +27,7 @@ class StreamData {
}
class StreamController {
- constructor() {
+ constructor(opts) {
this.outputStream = Pushable(),
this.streamData = {
channelStream: new StreamData(),
@@ -38,7 +37,9 @@ class StreamController {
this.pushMessage = function(source, newMsg) {
var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
streamData.oldestTimestampSeen = newMsg.value.timestamp;
- streamData.waitingMessages.push(newMsg);
+ if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) {
+ streamData.waitingMessages.push(newMsg);
+ }
this.pushNewlySafeMessages();
},
@@ -71,13 +72,13 @@ class StreamController {
this.streamData[datumIndex].oldestTimestampSeen = 0;
}
this.pushNewlySafeMessages();
- this.outputStream.end()
+ this.outputStream.end();
}
- }
+ }a
}
module.exports = {
- getMessages: function(client, channelName, preserve, cb, hops=MAX_HOPS) {
+ getMessages: function(client, channelName, opts, preserve, cb, hops=MAX_HOPS) {
client.friends.hops({
dunbar: Number.MAX_SAFE_INTEGER,
max: hops
@@ -87,16 +88,16 @@ module.exports = {
}
var followedIds = Object.keys(friends).map(id => id.toLowerCase());
- getMessagesFrom(client, channelName, followedIds, preserve, cb);
+ getMessagesFrom(client, channelName, followedIds, opts, preserve, cb);
});
}
}
-function getMessagesFrom(client, channelName, followedIds, preserve, cb) {
+function getMessagesFrom(client, channelName, followedIds, opts, preserve, cb) {
debug("Fetching messages from IDs in " + JSON.stringify(followedIds));
var channelTag = "#" + channelName;
- var streamController = new StreamController();
+ var streamController = new StreamController(opts);
var hashtagStream = createHashtagStream(client, channelName);
var channelStream = createChannelStream(client, channelName);
var stream = many([hashtagStream, channelStream]);
@@ -120,7 +121,7 @@ function getMessagesFrom(client, channelName, followedIds, preserve, cb) {
cb(streamController.outputStream, preserve);
}
-function createHashtagStream(client, channelName) {
+function createHashtagStream(client, channelName, opts) {
var search = client.search && client.search.query;
if(!search) {
console.log("[FATAL] ssb-search plugin must be installed to us channels");
@@ -136,7 +137,7 @@ function createHashtagStream(client, channelName) {
return query;
}
-function createChannelStream(client, channelName) {
+function createChannelStream(client, channelName, opts) {
var query = client.query.read({
query: [{
"$filter": {
@@ -161,3 +162,4 @@ function debug(message) {
console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message);
}
}
+
diff --git a/lib/serve.js b/lib/serve.js
index f7e0620..fbf8d80 100644
--- a/lib/serve.js
+++ b/lib/serve.js
@@ -591,7 +591,7 @@ Serve.prototype.logbook2 = function (ext) {
filter: q.filter,
}
- channels.getMessages(this.app.sbot, "logbook", this, function(messageStream, serve) {
+ channels.getMessages(this.app.sbot, "logbook", opts, this, function(messageStream, serve) {
pull(messageStream,
serve.renderThreadPaginated(opts, null, q),
serve.wrapMessages(),