diff options
Diffstat (limited to 'lib/app.js')
-rw-r--r-- | lib/app.js | 314 |
1 files changed, 305 insertions, 9 deletions
@@ -4,13 +4,16 @@ var lru = require('hashlru') var pkg = require('../package') var u = require('./util') var pull = require('pull-stream') -var hasher = require('pull-hash/ext/ssb') var multicb = require('multicb') var paramap = require('pull-paramap') var Contacts = require('ssb-contact') var About = require('./about') var Serve = require('./serve') var Render = require('./render') +var Git = require('./git') +var cat = require('pull-cat') +var proc = require('child_process') +var toPull = require('stream-to-pull-stream') var BoxStream = require('pull-box-stream') var crypto = require('crypto') @@ -24,7 +27,7 @@ function App(sbot, config) { var conf = config.patchfoo || {} this.port = conf.port || 8027 - this.host = conf.host || '::1' + this.host = conf.host || 'localhost' var base = conf.base || '/' this.opts = { @@ -32,6 +35,7 @@ function App(sbot, config) { blob_base: conf.blob_base || conf.img_base || base, img_base: conf.img_base || base, emoji_base: conf.emoji_base || (base + 'emoji/'), + encode_msgids: conf.encode_msgids == null ? true : Boolean(conf.encode_msgids), } sbot.get = memo({cache: lru(100)}, sbot.get) @@ -41,20 +45,29 @@ function App(sbot, config) { this._getAbout.bind(this)) this.unboxContent = memo({cache: lru(100)}, sbot.private.unbox) this.reverseNameCache = lru(500) + this.reverseEmojiNameCache = lru(500) + this.getBlobSize = memo({cache: this.blobSizeCache = lru(100)}, + sbot.blobs.size.bind(sbot.blobs)) this.unboxMsg = this.unboxMsg.bind(this) this.render = new Render(this, this.opts) + this.git = new Git(this) + + this.monitorBlobWants() } App.prototype.go = function () { var self = this - http.createServer(function (req, res) { + var server = http.createServer(function (req, res) { new Serve(self, req, res).go() - }).listen(self.port, self.host, function () { + }) + if (self.host === 'localhost') server.listen(self.port, onListening) + else server.listen(self.port, self.host, onListening) + function onListening() { var host = /:/.test(self.host) ? '[' + self.host + ']' : self.host self.log('Listening on http://' + host + ':' + self.port) - }) + } // invalidate cached About info when new About messages come in pull( @@ -62,9 +75,12 @@ App.prototype.go = function () { pull.drain(function (link) { self.aboutCache.remove(link.dest) }, function (err) { - if (err) self.error('about:', err) + if (err) throw err }) ) + + // keep alive ssb client connection + setInterval(self.sbot.whoami, 10e3) } var logPrefix = '[' + pkg.name + ']' @@ -172,14 +188,27 @@ App.prototype.publish = function (content, cb) { tryPublish(2) } +App.prototype.wantSizeBlob = function (id, cb) { + // only want() the blob if we don't already have it + var self = this + var blobs = this.sbot.blobs + blobs.size(id, function (err, size) { + if (size != null) return cb(null, size) + self.blobWants[id] = true + blobs.want(id, function (err) { + if (err) return cb(err) + blobs.size(id, cb) + }) + }) +} + App.prototype.addBlobRaw = function (cb) { var done = multicb({pluck: 1, spread: true}) var sink = pull( - hasher(done()), u.pullLength(done()), this.sbot.blobs.add(done()) ) - done(function (err, hash, size, _) { + done(function (err, size, hash) { if (err) return cb(err) cb(null, {link: hash, size: size}) }) @@ -228,11 +257,55 @@ App.prototype.pushBlob = function (id, cb) { this.sbot.blobs.push(id, cb) } +App.prototype.readBlob = function (link) { + link = u.toLink(link) + return this.sbot.blobs.get({ + hash: link.link, + size: link.size, + }) +} + +App.prototype.readBlobSlice = function (link, opts) { + if (this.sbot.blobs.getSlice) return this.sbot.blobs.getSlice({ + hash: link.link, + size: link.size, + start: opts.start, + end: opts.end, + }) + return pull( + this.readBlob(link), + u.pullSlice(opts.start, opts.end) + ) +} + +App.prototype.ensureHasBlobs = function (links, cb) { + var self = this + var done = multicb({pluck: 1}) + links.forEach(function (link) { + var cb = done() + self.sbot.blobs.size(link.link, function (err, size) { + if (err) cb(err) + else if (size == null) cb(null, link) + else cb() + }) + }) + done(function (err, missingLinks) { + if (err) console.trace(err) + missingLinks = missingLinks.filter(Boolean) + if (missingLinks.length == 0) return cb() + return cb({name: 'BlobNotFoundError', links: missingLinks}) + }) +} + App.prototype.getReverseNameSync = function (name) { var id = this.reverseNameCache.get(name) return id } +App.prototype.getReverseEmojiNameSync = function (name) { + return this.reverseEmojiNameCache.get(name) +} + App.prototype.getNameSync = function (name) { var about = this.aboutCache.get(name) return about && about.name @@ -267,10 +340,22 @@ App.prototype.pullGetMsg = function (id) { App.prototype.createLogStream = function (opts) { opts = opts || {} return opts.sortByTimestamp - ? this.sbot.createFeedStream(opts) + ? this.createFeedStream(opts) : this.sbot.createLogStream(opts) } +App.prototype.createFeedStream = function (opts) { + // work around opts.gt being treated as opts.gte sometimes + if (opts.gt && opts.limit && !opts.reverse) return pull( + this.sbot.createFeedStream(u.mergeOpts(opts, {limit: opts.limit + 1})), + pull.filter(function (msg) { + return msg && msg.value.timestamp !== opts.gt + }), + opts.limit && pull.take(opts.limit) + ) + return this.sbot.createFeedStream(opts) +} + var stateVals = { connected: 3, connecting: 2, @@ -372,6 +457,217 @@ App.prototype.createContactStreams = function (id) { return new Contacts(this.sbot).createContactStreams(id) } +function compareVoted(a, b) { + return b.value - a.value +} + +App.prototype.getVoted = function (_opts, cb) { + if (isNaN(_opts.limit)) return pull.error(new Error('missing limit')) + var self = this + var opts = { + type: 'vote', + limit: _opts.limit * 100, + reverse: !!_opts.reverse, + gt: _opts.gt || undefined, + lt: _opts.lt || undefined, + } + + var votedObj = {} + var votedArray = [] + var numItems = 0 + var firstTimestamp, lastTimestamp + pull( + self.sbot.messagesByType(opts), + self.unboxMessages(), + pull.take(function () { + return numItems < _opts.limit + }), + pull.drain(function (msg) { + if (!firstTimestamp) firstTimestamp = msg.timestamp + lastTimestamp = msg.timestamp + var vote = msg.value.content.vote + if (!vote) return + var target = u.linkDest(vote) + var votes = votedObj[target] + if (!votes) { + numItems++ + votes = {id: target, value: 0, feedsObj: {}, feeds: []} + votedObj[target] = votes + votedArray.push(votes) + } + if (msg.value.author in votes.feedsObj) { + if (!opts.reverse) return // leave latest vote value as-is + // remove old vote value + votes.value -= votes.feedsObj[msg.value.author] + } else { + votes.feeds.push(msg.value.author) + } + var value = vote.value > 0 ? 1 : vote.value < 0 ? -1 : 0 + votes.feedsObj[msg.value.author] = value + votes.value += value + }, function (err) { + if (err && err !== true) return cb(err) + var items = votedArray + if (opts.reverse) items.reverse() + items.sort(compareVoted) + cb(null, {items: items, + firstTimestamp: firstTimestamp, + lastTimestamp: lastTimestamp}) + }) + ) +} + App.prototype.createAboutStreams = function (id) { return this.about.createAboutStreams(id) } + +App.prototype.streamEmojis = function () { + return pull( + cat([ + this.sbot.links({ + rel: 'mentions', + source: this.sbot.id, + dest: '&', + values: true + }), + this.sbot.links({rel: 'mentions', dest: '&', values: true}) + ]), + this.unboxMessages(), + pull.map(function (msg) { return msg.value.content.mentions }), + pull.flatten(), + pull.filter('emoji'), + pull.unique('link') + ) +} + +App.prototype.filter = function (plugin, opts, filter) { + // work around flumeview-query not picking the best index. + // %b+QdyLFQ21UGYwvV3AiD8FEr7mKlB8w9xx3h8WzSUb0=.sha256 + var index + if (plugin === this.sbot.backlinks) { + var c = filter && filter.value && filter.value.content + var filteringByType = c && c.type + if (!filteringByType) index = 'DTS' + } + // work around flumeview-query not supporting $lt/$gt. + // %FCIv0D7JQyERznC18p8Dc1KtN6SLeJAl1sR5DAIr/Ek=.sha256 + return pull( + plugin.read({ + index: index, + reverse: opts.reverse, + limit: opts.limit && (opts.limit + 1), + query: [{$filter: u.mergeOpts(filter, { + timestamp: { + $gte: opts.gt, + $lte: opts.lt, + } + })}] + }), + pull.filter(function (msg) { + return msg && msg.timestamp !== opts.lt && msg.timestamp !== opts.gt + }), + opts.limit && pull.take(opts.limit) + ) +} + +App.prototype.streamChannel = function (opts) { + // prefer ssb-backlinks to ssb-query because it also handles hashtag mentions + if (this.sbot.backlinks) return this.filter(this.sbot.backlinks, opts, { + dest: '#' + opts.channel, + }) + + if (this.sbot.query) return this.filter(this.sbot.query, opts, { + value: {content: {channel: opts.channel}}, + }) + + return pull.error(new Error( + 'Viewing channels/tags requires the ssb-backlinks or ssb-query plugin')) +} + +App.prototype.streamMentions = function (opts) { + if (!this.sbot.backlinks) return pull.error(new Error( + 'Viewing mentions requires the ssb-backlinks plugin')) + + if (this.sbot.backlinks) return this.filter(this.sbot.backlinks, opts, { + dest: this.sbot.id, + }) +} + +App.prototype.streamPrivate = function (opts) { + if (this.sbot.private.read) return this.filter(this.sbot.private, opts, {}) + + return pull( + this.createLogStream(u.mergeOpts(opts, {limit: null})), + pull.filter(u.isMsgEncrypted), + this.unboxMessages(), + pull.filter(u.isMsgReadable), + pull.take(opts.limit) + ) +} + +App.prototype.blobMentions = function (opts) { + if (!this.sbot.links2) return pull.error(new Error( + 'missing ssb-links plugin')) + var filter = {rel: ['mentions', opts.name]} + if (opts.author) filter.source = opts.author + return this.sbot.links2.read({ + query: [ + {$filter: filter}, + {$filter: {dest: {$prefix: '&'}}}, + {$map: { + name: ['rel', 1], + size: ['rel', 2], + link: 'dest', + author: 'source', + time: 'ts' + }} + ] + }) +} + +App.prototype.monitorBlobWants = function () { + var self = this + self.blobWants = {} + pull( + this.sbot.blobs.createWants(), + pull.drain(function (wants) { + for (var id in wants) { + if (wants[id] < 0) self.blobWants[id] = true + else delete self.blobWants[id] + self.blobSizeCache.remove(id) + } + }, function (err) { + if (err) console.trace(err) + }) + ) +} + +App.prototype.getBlobState = function (id, cb) { + var self = this + if (self.blobWants[id]) return cb(null, 'wanted') + self.getBlobSize(id, function (err, size) { + if (err) return cb(err) + cb(null, size != null) + }) +} + +App.prototype.getNpmReadme = function (tarballId, cb) { + var self = this + // TODO: make this portable, and handle plaintext readmes + var tar = proc.spawn('tar', ['--ignore-case', '-Oxz', + 'package/README.md', 'package/readme.markdown', 'package/readme.mkd']) + var done = multicb({pluck: 1, spread: true}) + pull( + self.sbot.blobs.get(tarballId), + toPull.sink(tar.stdin, done()) + ) + pull( + toPull.source(tar.stdout), + pull.collect(done()) + ) + done(function (err, _, bufs) { + if (err) return cb(err) + var text = Buffer.concat(bufs).toString('utf8') + cb(null, text, true) + }) +} |