aboutsummaryrefslogtreecommitdiff
path: root/lib/channels-lib.js
diff options
context:
space:
mode:
authorEugeniy E. Mikhailov <evgmik@gmail.com>2021-02-08 18:15:25 -0500
committerEugeniy E. Mikhailov <evgmik@gmail.com>2021-02-08 18:15:25 -0500
commit9816a0870954e623038c7b43acee99cc14b84cb3 (patch)
treef93d64ac192e48dd50cbdedac977f591a960628c /lib/channels-lib.js
parent9b85a6eac776ac3caca9180e3aa78d02470b8e47 (diff)
downloadpatchfoo-9816a0870954e623038c7b43acee99cc14b84cb3.tar.gz
patchfoo-9816a0870954e623038c7b43acee99cc14b84cb3.zip
logbook2 search takes options also fix end of stream marker
Diffstat (limited to 'lib/channels-lib.js')
-rw-r--r--lib/channels-lib.js24
1 files changed, 13 insertions, 11 deletions
diff --git a/lib/channels-lib.js b/lib/channels-lib.js
index 0d8193c..7783993 100644
--- a/lib/channels-lib.js
+++ b/lib/channels-lib.js
@@ -1,4 +1,3 @@
-var { promisify } = require("util");
var pull = require("pull-stream");
var Pushable = require("pull-pushable");
var many = require("./pull-many-v2");
@@ -28,7 +27,7 @@ class StreamData {
}
class StreamController {
- constructor() {
+ constructor(opts) {
this.outputStream = Pushable(),
this.streamData = {
channelStream: new StreamData(),
@@ -38,7 +37,9 @@ class StreamController {
this.pushMessage = function(source, newMsg) {
var streamData = source == "hashtag" ? this.streamData.hashtagStream : this.streamData.channelStream;
streamData.oldestTimestampSeen = newMsg.value.timestamp;
- streamData.waitingMessages.push(newMsg);
+ if(newMsg.value.timestamp > opts.gt && newMsg.value.timestamp < opts.lt) {
+ streamData.waitingMessages.push(newMsg);
+ }
this.pushNewlySafeMessages();
},
@@ -71,13 +72,13 @@ class StreamController {
this.streamData[datumIndex].oldestTimestampSeen = 0;
}
this.pushNewlySafeMessages();
- this.outputStream.end()
+ this.outputStream.end();
}
- }
+ }a
}
module.exports = {
- getMessages: function(client, channelName, preserve, cb, hops=MAX_HOPS) {
+ getMessages: function(client, channelName, opts, preserve, cb, hops=MAX_HOPS) {
client.friends.hops({
dunbar: Number.MAX_SAFE_INTEGER,
max: hops
@@ -87,16 +88,16 @@ module.exports = {
}
var followedIds = Object.keys(friends).map(id => id.toLowerCase());
- getMessagesFrom(client, channelName, followedIds, preserve, cb);
+ getMessagesFrom(client, channelName, followedIds, opts, preserve, cb);
});
}
}
-function getMessagesFrom(client, channelName, followedIds, preserve, cb) {
+function getMessagesFrom(client, channelName, followedIds, opts, preserve, cb) {
debug("Fetching messages from IDs in " + JSON.stringify(followedIds));
var channelTag = "#" + channelName;
- var streamController = new StreamController();
+ var streamController = new StreamController(opts);
var hashtagStream = createHashtagStream(client, channelName);
var channelStream = createChannelStream(client, channelName);
var stream = many([hashtagStream, channelStream]);
@@ -120,7 +121,7 @@ function getMessagesFrom(client, channelName, followedIds, preserve, cb) {
cb(streamController.outputStream, preserve);
}
-function createHashtagStream(client, channelName) {
+function createHashtagStream(client, channelName, opts) {
var search = client.search && client.search.query;
if(!search) {
console.log("[FATAL] ssb-search plugin must be installed to us channels");
@@ -136,7 +137,7 @@ function createHashtagStream(client, channelName) {
return query;
}
-function createChannelStream(client, channelName) {
+function createChannelStream(client, channelName, opts) {
var query = client.query.read({
query: [{
"$filter": {
@@ -161,3 +162,4 @@ function debug(message) {
console.log("[channels-lib] [" + timestamp.toISOString() + "] " + message);
}
}
+