aboutsummaryrefslogtreecommitdiff
path: root/lib/app.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/app.js')
-rw-r--r--lib/app.js314
1 files changed, 305 insertions, 9 deletions
diff --git a/lib/app.js b/lib/app.js
index 3a41086..5cc4e62 100644
--- a/lib/app.js
+++ b/lib/app.js
@@ -4,13 +4,16 @@ var lru = require('hashlru')
var pkg = require('../package')
var u = require('./util')
var pull = require('pull-stream')
-var hasher = require('pull-hash/ext/ssb')
var multicb = require('multicb')
var paramap = require('pull-paramap')
var Contacts = require('ssb-contact')
var About = require('./about')
var Serve = require('./serve')
var Render = require('./render')
+var Git = require('./git')
+var cat = require('pull-cat')
+var proc = require('child_process')
+var toPull = require('stream-to-pull-stream')
var BoxStream = require('pull-box-stream')
var crypto = require('crypto')
@@ -24,7 +27,7 @@ function App(sbot, config) {
var conf = config.patchfoo || {}
this.port = conf.port || 8027
- this.host = conf.host || '::1'
+ this.host = conf.host || 'localhost'
var base = conf.base || '/'
this.opts = {
@@ -32,6 +35,7 @@ function App(sbot, config) {
blob_base: conf.blob_base || conf.img_base || base,
img_base: conf.img_base || base,
emoji_base: conf.emoji_base || (base + 'emoji/'),
+ encode_msgids: conf.encode_msgids == null ? true : Boolean(conf.encode_msgids),
}
sbot.get = memo({cache: lru(100)}, sbot.get)
@@ -41,20 +45,29 @@ function App(sbot, config) {
this._getAbout.bind(this))
this.unboxContent = memo({cache: lru(100)}, sbot.private.unbox)
this.reverseNameCache = lru(500)
+ this.reverseEmojiNameCache = lru(500)
+ this.getBlobSize = memo({cache: this.blobSizeCache = lru(100)},
+ sbot.blobs.size.bind(sbot.blobs))
this.unboxMsg = this.unboxMsg.bind(this)
this.render = new Render(this, this.opts)
+ this.git = new Git(this)
+
+ this.monitorBlobWants()
}
App.prototype.go = function () {
var self = this
- http.createServer(function (req, res) {
+ var server = http.createServer(function (req, res) {
new Serve(self, req, res).go()
- }).listen(self.port, self.host, function () {
+ })
+ if (self.host === 'localhost') server.listen(self.port, onListening)
+ else server.listen(self.port, self.host, onListening)
+ function onListening() {
var host = /:/.test(self.host) ? '[' + self.host + ']' : self.host
self.log('Listening on http://' + host + ':' + self.port)
- })
+ }
// invalidate cached About info when new About messages come in
pull(
@@ -62,9 +75,12 @@ App.prototype.go = function () {
pull.drain(function (link) {
self.aboutCache.remove(link.dest)
}, function (err) {
- if (err) self.error('about:', err)
+ if (err) throw err
})
)
+
+ // keep alive ssb client connection
+ setInterval(self.sbot.whoami, 10e3)
}
var logPrefix = '[' + pkg.name + ']'
@@ -172,14 +188,27 @@ App.prototype.publish = function (content, cb) {
tryPublish(2)
}
+App.prototype.wantSizeBlob = function (id, cb) {
+ // only want() the blob if we don't already have it
+ var self = this
+ var blobs = this.sbot.blobs
+ blobs.size(id, function (err, size) {
+ if (size != null) return cb(null, size)
+ self.blobWants[id] = true
+ blobs.want(id, function (err) {
+ if (err) return cb(err)
+ blobs.size(id, cb)
+ })
+ })
+}
+
App.prototype.addBlobRaw = function (cb) {
var done = multicb({pluck: 1, spread: true})
var sink = pull(
- hasher(done()),
u.pullLength(done()),
this.sbot.blobs.add(done())
)
- done(function (err, hash, size, _) {
+ done(function (err, size, hash) {
if (err) return cb(err)
cb(null, {link: hash, size: size})
})
@@ -228,11 +257,55 @@ App.prototype.pushBlob = function (id, cb) {
this.sbot.blobs.push(id, cb)
}
+App.prototype.readBlob = function (link) {
+ link = u.toLink(link)
+ return this.sbot.blobs.get({
+ hash: link.link,
+ size: link.size,
+ })
+}
+
+App.prototype.readBlobSlice = function (link, opts) {
+ if (this.sbot.blobs.getSlice) return this.sbot.blobs.getSlice({
+ hash: link.link,
+ size: link.size,
+ start: opts.start,
+ end: opts.end,
+ })
+ return pull(
+ this.readBlob(link),
+ u.pullSlice(opts.start, opts.end)
+ )
+}
+
+App.prototype.ensureHasBlobs = function (links, cb) {
+ var self = this
+ var done = multicb({pluck: 1})
+ links.forEach(function (link) {
+ var cb = done()
+ self.sbot.blobs.size(link.link, function (err, size) {
+ if (err) cb(err)
+ else if (size == null) cb(null, link)
+ else cb()
+ })
+ })
+ done(function (err, missingLinks) {
+ if (err) console.trace(err)
+ missingLinks = missingLinks.filter(Boolean)
+ if (missingLinks.length == 0) return cb()
+ return cb({name: 'BlobNotFoundError', links: missingLinks})
+ })
+}
+
App.prototype.getReverseNameSync = function (name) {
var id = this.reverseNameCache.get(name)
return id
}
+App.prototype.getReverseEmojiNameSync = function (name) {
+ return this.reverseEmojiNameCache.get(name)
+}
+
App.prototype.getNameSync = function (name) {
var about = this.aboutCache.get(name)
return about && about.name
@@ -267,10 +340,22 @@ App.prototype.pullGetMsg = function (id) {
App.prototype.createLogStream = function (opts) {
opts = opts || {}
return opts.sortByTimestamp
- ? this.sbot.createFeedStream(opts)
+ ? this.createFeedStream(opts)
: this.sbot.createLogStream(opts)
}
+App.prototype.createFeedStream = function (opts) {
+ // work around opts.gt being treated as opts.gte sometimes
+ if (opts.gt && opts.limit && !opts.reverse) return pull(
+ this.sbot.createFeedStream(u.mergeOpts(opts, {limit: opts.limit + 1})),
+ pull.filter(function (msg) {
+ return msg && msg.value.timestamp !== opts.gt
+ }),
+ opts.limit && pull.take(opts.limit)
+ )
+ return this.sbot.createFeedStream(opts)
+}
+
var stateVals = {
connected: 3,
connecting: 2,
@@ -372,6 +457,217 @@ App.prototype.createContactStreams = function (id) {
return new Contacts(this.sbot).createContactStreams(id)
}
+function compareVoted(a, b) {
+ return b.value - a.value
+}
+
+App.prototype.getVoted = function (_opts, cb) {
+ if (isNaN(_opts.limit)) return pull.error(new Error('missing limit'))
+ var self = this
+ var opts = {
+ type: 'vote',
+ limit: _opts.limit * 100,
+ reverse: !!_opts.reverse,
+ gt: _opts.gt || undefined,
+ lt: _opts.lt || undefined,
+ }
+
+ var votedObj = {}
+ var votedArray = []
+ var numItems = 0
+ var firstTimestamp, lastTimestamp
+ pull(
+ self.sbot.messagesByType(opts),
+ self.unboxMessages(),
+ pull.take(function () {
+ return numItems < _opts.limit
+ }),
+ pull.drain(function (msg) {
+ if (!firstTimestamp) firstTimestamp = msg.timestamp
+ lastTimestamp = msg.timestamp
+ var vote = msg.value.content.vote
+ if (!vote) return
+ var target = u.linkDest(vote)
+ var votes = votedObj[target]
+ if (!votes) {
+ numItems++
+ votes = {id: target, value: 0, feedsObj: {}, feeds: []}
+ votedObj[target] = votes
+ votedArray.push(votes)
+ }
+ if (msg.value.author in votes.feedsObj) {
+ if (!opts.reverse) return // leave latest vote value as-is
+ // remove old vote value
+ votes.value -= votes.feedsObj[msg.value.author]
+ } else {
+ votes.feeds.push(msg.value.author)
+ }
+ var value = vote.value > 0 ? 1 : vote.value < 0 ? -1 : 0
+ votes.feedsObj[msg.value.author] = value
+ votes.value += value
+ }, function (err) {
+ if (err && err !== true) return cb(err)
+ var items = votedArray
+ if (opts.reverse) items.reverse()
+ items.sort(compareVoted)
+ cb(null, {items: items,
+ firstTimestamp: firstTimestamp,
+ lastTimestamp: lastTimestamp})
+ })
+ )
+}
+
App.prototype.createAboutStreams = function (id) {
return this.about.createAboutStreams(id)
}
+
+App.prototype.streamEmojis = function () {
+ return pull(
+ cat([
+ this.sbot.links({
+ rel: 'mentions',
+ source: this.sbot.id,
+ dest: '&',
+ values: true
+ }),
+ this.sbot.links({rel: 'mentions', dest: '&', values: true})
+ ]),
+ this.unboxMessages(),
+ pull.map(function (msg) { return msg.value.content.mentions }),
+ pull.flatten(),
+ pull.filter('emoji'),
+ pull.unique('link')
+ )
+}
+
+App.prototype.filter = function (plugin, opts, filter) {
+ // work around flumeview-query not picking the best index.
+ // %b+QdyLFQ21UGYwvV3AiD8FEr7mKlB8w9xx3h8WzSUb0=.sha256
+ var index
+ if (plugin === this.sbot.backlinks) {
+ var c = filter && filter.value && filter.value.content
+ var filteringByType = c && c.type
+ if (!filteringByType) index = 'DTS'
+ }
+ // work around flumeview-query not supporting $lt/$gt.
+ // %FCIv0D7JQyERznC18p8Dc1KtN6SLeJAl1sR5DAIr/Ek=.sha256
+ return pull(
+ plugin.read({
+ index: index,
+ reverse: opts.reverse,
+ limit: opts.limit && (opts.limit + 1),
+ query: [{$filter: u.mergeOpts(filter, {
+ timestamp: {
+ $gte: opts.gt,
+ $lte: opts.lt,
+ }
+ })}]
+ }),
+ pull.filter(function (msg) {
+ return msg && msg.timestamp !== opts.lt && msg.timestamp !== opts.gt
+ }),
+ opts.limit && pull.take(opts.limit)
+ )
+}
+
+App.prototype.streamChannel = function (opts) {
+ // prefer ssb-backlinks to ssb-query because it also handles hashtag mentions
+ if (this.sbot.backlinks) return this.filter(this.sbot.backlinks, opts, {
+ dest: '#' + opts.channel,
+ })
+
+ if (this.sbot.query) return this.filter(this.sbot.query, opts, {
+ value: {content: {channel: opts.channel}},
+ })
+
+ return pull.error(new Error(
+ 'Viewing channels/tags requires the ssb-backlinks or ssb-query plugin'))
+}
+
+App.prototype.streamMentions = function (opts) {
+ if (!this.sbot.backlinks) return pull.error(new Error(
+ 'Viewing mentions requires the ssb-backlinks plugin'))
+
+ if (this.sbot.backlinks) return this.filter(this.sbot.backlinks, opts, {
+ dest: this.sbot.id,
+ })
+}
+
+App.prototype.streamPrivate = function (opts) {
+ if (this.sbot.private.read) return this.filter(this.sbot.private, opts, {})
+
+ return pull(
+ this.createLogStream(u.mergeOpts(opts, {limit: null})),
+ pull.filter(u.isMsgEncrypted),
+ this.unboxMessages(),
+ pull.filter(u.isMsgReadable),
+ pull.take(opts.limit)
+ )
+}
+
+App.prototype.blobMentions = function (opts) {
+ if (!this.sbot.links2) return pull.error(new Error(
+ 'missing ssb-links plugin'))
+ var filter = {rel: ['mentions', opts.name]}
+ if (opts.author) filter.source = opts.author
+ return this.sbot.links2.read({
+ query: [
+ {$filter: filter},
+ {$filter: {dest: {$prefix: '&'}}},
+ {$map: {
+ name: ['rel', 1],
+ size: ['rel', 2],
+ link: 'dest',
+ author: 'source',
+ time: 'ts'
+ }}
+ ]
+ })
+}
+
+App.prototype.monitorBlobWants = function () {
+ var self = this
+ self.blobWants = {}
+ pull(
+ this.sbot.blobs.createWants(),
+ pull.drain(function (wants) {
+ for (var id in wants) {
+ if (wants[id] < 0) self.blobWants[id] = true
+ else delete self.blobWants[id]
+ self.blobSizeCache.remove(id)
+ }
+ }, function (err) {
+ if (err) console.trace(err)
+ })
+ )
+}
+
+App.prototype.getBlobState = function (id, cb) {
+ var self = this
+ if (self.blobWants[id]) return cb(null, 'wanted')
+ self.getBlobSize(id, function (err, size) {
+ if (err) return cb(err)
+ cb(null, size != null)
+ })
+}
+
+App.prototype.getNpmReadme = function (tarballId, cb) {
+ var self = this
+ // TODO: make this portable, and handle plaintext readmes
+ var tar = proc.spawn('tar', ['--ignore-case', '-Oxz',
+ 'package/README.md', 'package/readme.markdown', 'package/readme.mkd'])
+ var done = multicb({pluck: 1, spread: true})
+ pull(
+ self.sbot.blobs.get(tarballId),
+ toPull.sink(tar.stdin, done())
+ )
+ pull(
+ toPull.source(tar.stdout),
+ pull.collect(done())
+ )
+ done(function (err, _, bufs) {
+ if (err) return cb(err)
+ var text = Buffer.concat(bufs).toString('utf8')
+ cb(null, text, true)
+ })
+}