Support async createSwarm() and getSwarm()

Fix #95.

Make server.getSwarm() and server.createSwarm() into async functions
that take a callback.
This commit is contained in:
Feross Aboukhadijeh 2016-01-03 19:50:23 +01:00
parent 9d4c404dc6
commit b5096e91c3
3 changed files with 118 additions and 90 deletions

View File

@ -4,7 +4,7 @@ var debug = require('debug')('bittorrent-tracker')
var randomIterate = require('random-iterate') var randomIterate = require('random-iterate')
// Regard this as the default implementation of an interface that you // Regard this as the default implementation of an interface that you
// need to support when overriding Server.getSwarm() // need to support when overriding Server.createSwarm() and Server.getSwarm()
function Swarm (infoHash, server) { function Swarm (infoHash, server) {
this.peers = {} this.peers = {}
this.complete = 0 this.complete = 0

110
server.js
View File

@ -196,17 +196,23 @@ Server.prototype.close = function (cb) {
else cb(null) else cb(null)
} }
Server.prototype.createSwarm = function (infoHash) { Server.prototype.createSwarm = function (infoHash, cb) {
var self = this var self = this
if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex')
var swarm = self.torrents[infoHash] = new Swarm(infoHash, self)
return swarm process.nextTick(function () {
var swarm = self.torrents[infoHash] = new Swarm(infoHash, self)
cb(null, swarm)
})
} }
Server.prototype.getSwarm = function (infoHash) { Server.prototype.getSwarm = function (infoHash, cb) {
var self = this var self = this
if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex')
return self.torrents[infoHash]
process.nextTick(function () {
cb(null, self.torrents[infoHash])
})
} }
Server.prototype.onHttpRequest = function (req, res, opts) { Server.prototype.onHttpRequest = function (req, res, opts) {
@ -358,26 +364,35 @@ Server.prototype._onWebSocketRequest = function (socket, params) {
if (params.answer) { if (params.answer) {
debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id) debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id)
var swarm = self.getSwarm(params.info_hash) self.getSwarm(params.info_hash, function (err, swarm) {
if (!swarm) { if (err) return self.emit('warning', err)
return self.emit('warning', new Error('no swarm with that `info_hash`')) if (!swarm) {
} return self.emit('warning', new Error('no swarm with that `info_hash`'))
var toPeer = swarm.peers[params.to_peer_id] }
if (!toPeer) { var toPeer = swarm.peers[params.to_peer_id]
return self.emit('warning', new Error('no peer with that `to_peer_id`')) if (!toPeer) {
} return self.emit('warning', new Error('no peer with that `to_peer_id`'))
}
toPeer.socket.send(JSON.stringify({ toPeer.socket.send(JSON.stringify({
answer: params.answer, answer: params.answer,
offer_id: params.offer_id, offer_id: params.offer_id,
peer_id: common.hexToBinary(params.peer_id), peer_id: common.hexToBinary(params.peer_id),
info_hash: common.hexToBinary(params.info_hash) info_hash: common.hexToBinary(params.info_hash)
}), toPeer.socket.onSend) }), toPeer.socket.onSend)
debug('sent answer to %s from %s', toPeer.peerId, params.peer_id) debug('sent answer to %s from %s', toPeer.peerId, params.peer_id)
done()
})
} else {
done()
} }
if (params.action === common.ACTIONS.ANNOUNCE) { function done () {
self.emit(common.EVENT_NAMES[params.event], params.peer_id, params) // emit event once the announce is fully "processed"
if (params.action === common.ACTIONS.ANNOUNCE) {
self.emit(common.EVENT_NAMES[params.event], params.peer_id, params)
}
} }
}) })
} }
@ -398,9 +413,14 @@ Server.prototype._onRequest = function (params, cb) {
Server.prototype._onAnnounce = function (params, cb) { Server.prototype._onAnnounce = function (params, cb) {
var self = this var self = this
var swarm = self.getSwarm(params.info_hash) self.getSwarm(params.info_hash, function (err, swarm) {
if (swarm) announce() if (err) return cb(err)
else createSwarm() if (swarm) {
announce(swarm)
} else {
createSwarm()
}
})
function createSwarm () { function createSwarm () {
if (self._filter) { if (self._filter) {
@ -410,17 +430,21 @@ Server.prototype._onAnnounce = function (params, cb) {
} else if (!allowed) { } else if (!allowed) {
cb(new Error('disallowed info_hash')) cb(new Error('disallowed info_hash'))
} else { } else {
swarm = self.createSwarm(params.info_hash) self.createSwarm(params.info_hash, function (err, swarm) {
announce() if (err) return cb(err)
announce(swarm)
})
} }
}) })
} else { } else {
swarm = self.createSwarm(params.info_hash) self.createSwarm(params.info_hash, function (err, swarm) {
announce() if (err) return cb(err)
announce(swarm)
})
} }
} }
function announce () { function announce (swarm) {
if (!params.event || params.event === 'empty') params.event = 'update' if (!params.event || params.event === 'empty') params.event = 'update'
swarm.announce(params, function (err, response) { swarm.announce(params, function (err, response) {
if (err) return cb(err) if (err) return cb(err)
@ -470,19 +494,21 @@ Server.prototype._onScrape = function (params, cb) {
series(params.info_hash.map(function (infoHash) { series(params.info_hash.map(function (infoHash) {
return function (cb) { return function (cb) {
var swarm = self.getSwarm(infoHash) self.getSwarm(infoHash, function (err, swarm) {
if (swarm) { if (err) return cb(err)
swarm.scrape(params, function (err, scrapeInfo) { if (swarm) {
if (err) return cb(err) swarm.scrape(params, function (err, scrapeInfo) {
cb(null, { if (err) return cb(err)
infoHash: infoHash, cb(null, {
complete: (scrapeInfo && scrapeInfo.complete) || 0, infoHash: infoHash,
incomplete: (scrapeInfo && scrapeInfo.incomplete) || 0 complete: (scrapeInfo && scrapeInfo.complete) || 0,
incomplete: (scrapeInfo && scrapeInfo.incomplete) || 0
})
}) })
}) } else {
} else { cb(null, { infoHash: infoHash, complete: 0, incomplete: 0 })
cb(null, { infoHash: infoHash, complete: 0, incomplete: 0 }) }
} })
} }
}), function (err, results) { }), function (err, results) {
if (err) return cb(err) if (err) return cb(err)

View File

@ -8,7 +8,7 @@ var peerId2 = new Buffer('12345678901234567890')
var torrentLength = 50000 var torrentLength = 50000
function serverTest (t, serverType, serverFamily) { function serverTest (t, serverType, serverFamily) {
t.plan(25) t.plan(26)
var opts = serverType === 'http' ? { udp: false, ws: false } : { http: false, ws: false } var opts = serverType === 'http' ? { udp: false, ws: false } : { http: false, ws: false }
var server = new Server(opts) var server = new Server(opts)
@ -49,65 +49,67 @@ function serverTest (t, serverType, serverFamily) {
t.equal(data.complete, 0) t.equal(data.complete, 0)
t.equal(data.incomplete, 1) t.equal(data.incomplete, 1)
var swarm = server.getSwarm(infoHash) server.getSwarm(infoHash, function (err, swarm) {
t.error(err)
t.equal(Object.keys(server.torrents).length, 1) t.equal(Object.keys(server.torrents).length, 1)
t.equal(swarm.complete, 0) t.equal(swarm.complete, 0)
t.equal(swarm.incomplete, 1) t.equal(swarm.incomplete, 1)
t.equal(Object.keys(swarm.peers).length, 1) t.equal(Object.keys(swarm.peers).length, 1)
t.deepEqual(swarm.peers[clientAddr + ':6881'], { t.deepEqual(swarm.peers[clientAddr + ':6881'], {
ip: clientIp, ip: clientIp,
port: 6881, port: 6881,
peerId: peerId.toString('hex'), peerId: peerId.toString('hex'),
complete: false, complete: false,
socket: undefined socket: undefined
}) })
client1.complete() client1.complete()
client1.once('update', function (data) { client1.once('update', function (data) {
t.equal(data.announce, announceUrl)
t.equal(data.complete, 1)
t.equal(data.incomplete, 0)
client1.scrape()
client1.once('scrape', function (data) {
t.equal(data.announce, announceUrl) t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number') t.equal(data.complete, 1)
t.equal(typeof data.incomplete, 'number') t.equal(data.incomplete, 0)
t.equal(typeof data.downloaded, 'number')
var client2 = new Client(peerId2, 6882, { client1.scrape()
infoHash: infoHash,
length: torrentLength,
announce: [ announceUrl ]
})
client2.start() client1.once('scrape', function (data) {
t.equal(data.announce, announceUrl)
t.equal(typeof data.complete, 'number')
t.equal(typeof data.incomplete, 'number')
t.equal(typeof data.downloaded, 'number')
server.once('start', function () { var client2 = new Client(peerId2, 6882, {
t.pass('got start message from client2') infoHash: infoHash,
}) length: torrentLength,
announce: [ announceUrl ]
})
client2.once('peer', function (addr) { client2.start()
t.ok(addr === clientAddr + ':6881' || addr === clientAddr + ':6882')
client2.stop() server.once('start', function () {
client2.once('update', function (data) { t.pass('got start message from client2')
t.equal(data.announce, announceUrl) })
t.equal(data.complete, 1)
t.equal(data.incomplete, 0)
client2.destroy()
client1.stop() client2.once('peer', function (addr) {
client1.once('update', function (data) { t.ok(addr === clientAddr + ':6881' || addr === clientAddr + ':6882')
client2.stop()
client2.once('update', function (data) {
t.equal(data.announce, announceUrl) t.equal(data.announce, announceUrl)
t.equal(data.complete, 0) t.equal(data.complete, 1)
t.equal(data.incomplete, 0) t.equal(data.incomplete, 0)
client2.destroy()
client1.destroy() client1.stop()
server.close() client1.once('update', function (data) {
t.equal(data.announce, announceUrl)
t.equal(data.complete, 0)
t.equal(data.incomplete, 0)
client1.destroy()
server.close()
})
}) })
}) })
}) })