aboutsummaryrefslogtreecommitdiff
path: root/channels-lib.js
diff options
context:
space:
mode:
Diffstat (limited to 'channels-lib.js')
-rw-r--r--channels-lib.js20
1 files changed, 11 insertions, 9 deletions
diff --git a/channels-lib.js b/channels-lib.js
index f2d8484..8d057ce 100644
--- a/channels-lib.js
+++ b/channels-lib.js
@@ -1,4 +1,3 @@
-var { promisify } = require("util");
var pull = require("pull-stream");
var Pushable = require("pull-pushable");
var many = require("./pull-many-v2");
@@ -28,7 +27,7 @@ class StreamData {
}
class StreamController {
- constructor() {
+ constructor(opts) {
this.outputStream = Pushable(),
this.streamData = {
channelStream: new StreamData(),
@@ -38,7 +37,9 @@ class StreamController {
this.pushMessage = function(source, newMsg) {
var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
streamData.oldestTimestampSeen = newMsg.value.timestamp;
- streamData.waitingMessages.push(newMsg);
+ if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) {
+ streamData.waitingMessages.push(newMsg);
+ }
this.pushNewlySafeMessages();
},
@@ -76,7 +77,7 @@ class StreamController {
}
module.exports = {
- getMessages: function(client, channelName, preserve, cb, hops=MAX_HOPS) {
+ getMessages: function(client, channelName, opts, preserve, cb, hops=MAX_HOPS) {
client.friends.hops({
dunbar: Number.MAX_SAFE_INTEGER,
max: hops
@@ -86,16 +87,16 @@ module.exports = {
}
var followedIds = Object.keys(friends).map(id => id.toLowerCase());
- getMessagesFrom(client, channelName, followedIds, preserve, cb);
+ getMessagesFrom(client, channelName, followedIds, opts, preserve, cb);
});
}
}
-function getMessagesFrom(client, channelName, followedIds, preserve, cb) {
+function getMessagesFrom(client, channelName, followedIds, opts, preserve, cb) {
debug("Fetching messages from IDs in " + JSON.stringify(followedIds));
var channelTag = "#" + channelName;
- var streamController = new StreamController();
+ var streamController = new StreamController(opts);
var hashtagStream = createHashtagStream(client, channelName);
var channelStream = createChannelStream(client, channelName);
var stream = many([hashtagStream, channelStream]);
@@ -119,7 +120,7 @@ function getMessagesFrom(client, channelName, followedIds, preserve, cb) {
cb(streamController.outputStream, preserve);
}
-function createHashtagStream(client, channelName) {
+function createHashtagStream(client, channelName, opts) {
var search = client.search && client.search.query;
if(!search) {
console.log("[FATAL] ssb-search plugin must be installed to us channels");
@@ -135,7 +136,7 @@ function createHashtagStream(client, channelName) {
return query;
}
-function createChannelStream(client, channelName) {
+function createChannelStream(client, channelName, opts) {
var query = client.query.read({
query: [{
"$filter": {
@@ -160,3 +161,4 @@ function debug(message) {
console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message);
}
}
+s