diff --git a/lib/swarm.js b/lib/swarm.js new file mode 100644 index 0000000..2413139 --- /dev/null +++ b/lib/swarm.js @@ -0,0 +1,133 @@ +var debug = require('debug')('bittorrent-tracker') +var string2compact = require('string2compact') + +module.exports = Swarm + +// Regard this as the default implementation of an interface that you +// need to support when overriding Server.getSwarm() +function Swarm (infoHash, server) { + this.peers = {} + this.complete = 0 + this.incomplete = 0 + this.emit = server.emit.bind(server) +} + +Swarm.prototype.announce = function (params, cb) { + var self = this + var peer = self.peers[params.addr] + + var start = function () { + if (peer) { + debug('unexpected `started` event from peer that is already in swarm') + return update() // treat as an update + } + if (params.left === 0) self.complete += 1 + else self.incomplete += 1 + peer = self.peers[params.addr] = { + ip: params.ip, + port: params.port, + peerId: params.peer_id + } + self.emit('start', params.addr) + } + + var stop = function () { + if (!peer) { + debug('unexpected `stopped` event from peer that is not in swarm') + return // do nothing + } + if (peer.complete) self.complete -= 1 + else self.incomplete -= 1 + self.peers[params.addr] = null + self.emit('stop', params.addr) + } + + var complete = function () { + if (!peer) { + debug('unexpected `completed` event from peer that is not in swarm') + return start() // treat as a start + } + if (peer.complete) { + debug('unexpected `completed` event from peer that is already marked as completed') + return // do nothing + } + self.complete += 1 + self.incomplete -= 1 + peer.complete = true + self.emit('complete', params.addr) + } + + var update = function () { + if (!peer) { + debug('unexpected `update` event from peer that is not in swarm') + return start() // treat as a start + } + self.emit('update', params.addr) + } + + switch (params.event) { + case 'started': + start() + break + case 'stopped': + stop() + break + case 'completed': + complete() + break + case '': case undefined: case 'empty': case 'update': // update + update() + break + default: + return cb(new Error('invalid event')) // early return + } + + if (params.left === 0 && peer) peer.complete = true + + // send peers + var peers = params.compact === 1 + ? self._getPeersCompact(params.numwant) + : self._getPeers(params.numwant) + + cb(null, { + complete: this.complete, + incomplete: this.incomplete, + peers: peers + }) +} + +Swarm.prototype._getPeers = function (numwant) { + var peers = [] + for (var peerId in this.peers) { + if (peers.length >= numwant) break + var peer = this.peers[peerId] + if (!peer) continue // ignore null values + peers.push({ + 'peer id': peer.peerId, + ip: peer.ip, + port: peer.port + }) + } + return peers +} + +Swarm.prototype._getPeersCompact = function (numwant) { + var peers = [] + + for (var peerId in this.peers) { + if (peers.length >= numwant) break + var peer = this.peers[peerId] + if (!peer) continue // ignore null values + peers.push(peer.ip + ':' + peer.port) + } + + return string2compact(peers) +} + + +Swarm.prototype.scrape = function (infoHash, params, cb) { + cb(null, { + complete: this.complete, + incomplete: this.incomplete + }) +} diff --git a/package.json b/package.json index a83fc5c..e002d8a 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "ip": "^0.3.0", "once": "^1.3.0", "portfinder": "^0.2.1", + "run-series": "^1.0.2", "string2compact": "^1.1.1" }, "devDependencies": { diff --git a/server.js b/server.js index d84de01..241e12c 100644 --- a/server.js +++ b/server.js @@ -1,7 +1,6 @@ module.exports = Server var bencode = require('bencode') -var common = require('./lib/common') var debug = require('debug')('bittorrent-tracker') var dgram = require('dgram') var EventEmitter = require('events').EventEmitter @@ -9,8 +8,10 @@ var http = require('http') var inherits = require('inherits') var ipLib = require('ip') var portfinder = require('portfinder') -var string2compact = require('string2compact') +var series = require('run-series') +var common = require('./lib/common') +var Swarm = require('./lib/swarm') var parseHttpRequest = require('./lib/parse_http') var parseUdpRequest = require('./lib/parse_udp') @@ -119,11 +120,7 @@ Server.prototype.getSwarm = function (binaryInfoHash) { if (Buffer.isBuffer(binaryInfoHash)) binaryInfoHash = binaryInfoHash.toString('binary') var swarm = self.torrents[binaryInfoHash] if (!swarm) { - swarm = self.torrents[binaryInfoHash] = { - complete: 0, - incomplete: 0, - peers: {} - } + swarm = self.torrents[binaryInfoHash] = new Swarm(binaryInfoHash, this) } return swarm } @@ -214,89 +211,13 @@ Server.prototype._onRequest = function (params, cb) { Server.prototype._onAnnounce = function (params, cb) { var self = this - var swarm = self.getSwarm(params.info_hash) - var peer = swarm.peers[params.addr] - - var start = function () { - if (peer) { - debug('unexpected `started` event from peer that is already in swarm') - return update() // treat as an update + swarm.announce(params, function (err, response) { + if (response) { + if (!response.action) response.action = common.ACTIONS.ANNOUNCE + if (!response.intervalMs) response.intervalMs = self._intervalMs } - if (params.left === 0) swarm.complete += 1 - else swarm.incomplete += 1 - peer = swarm.peers[params.addr] = { - ip: params.ip, - port: params.port, - peerId: params.peer_id - } - self.emit('start', params.addr) - } - - var stop = function () { - if (!peer) { - debug('unexpected `stopped` event from peer that is not in swarm') - return // do nothing - } - if (peer.complete) swarm.complete -= 1 - else swarm.incomplete -= 1 - swarm.peers[params.addr] = null - self.emit('stop', params.addr) - } - - var complete = function () { - if (!peer) { - debug('unexpected `completed` event from peer that is not in swarm') - return start() // treat as a start - } - if (peer.complete) { - debug('unexpected `completed` event from peer that is already marked as completed') - return // do nothing - } - swarm.complete += 1 - swarm.incomplete -= 1 - peer.complete = true - self.emit('complete', params.addr) - } - - var update = function () { - if (!peer) { - debug('unexpected `update` event from peer that is not in swarm') - return start() // treat as a start - } - self.emit('update', params.addr) - } - - switch (params.event) { - case 'started': - start() - break - case 'stopped': - stop() - break - case 'completed': - complete() - break - case '': case undefined: case 'empty': case 'update': // update - update() - break - default: - return cb(new Error('invalid event')) // early return - } - - if (params.left === 0 && peer) peer.complete = true - - // send peers - var peers = params.compact === 1 - ? self._getPeersCompact(swarm, params.numwant) - : self._getPeers(swarm, params.numwant) - - cb(null, { - action: common.ACTIONS.ANNOUNCE, - complete: swarm.complete, - incomplete: swarm.incomplete, - peers: peers, - intervalMs: self._intervalMs + cb(err, response) }) } @@ -324,46 +245,31 @@ Server.prototype._onScrape = function (params, cb) { min_request_interval: self._intervalMs } } - - params.info_hash.some(function (infoHash) { + + series(params.info_hash.map(function (infoHash) { var swarm = self.getSwarm(infoHash) - - response.files[infoHash] = { - complete: swarm.complete, - incomplete: swarm.incomplete, - downloaded: swarm.complete // TODO: this only provides a lower-bound + return function (cb) { + swarm.scrape(infoHash, params, function (err, scrapeInfo) { + cb(err, scrapeInfo && { + infoHash: infoHash, + complete: scrapeInfo.complete || 0, + incomplete: scrapeInfo.incomplete || 0 + }) + }) } - }) + }), function (err, results) { + if (err) return cb(err) - cb(null, response) -} - -Server.prototype._getPeers = function (swarm, numwant) { - var peers = [] - for (var peerId in swarm.peers) { - if (peers.length >= numwant) break - var peer = swarm.peers[peerId] - if (!peer) continue // ignore null values - peers.push({ - 'peer id': peer.peerId, - ip: peer.ip, - port: peer.port + results.forEach(function (result) { + response.files[result.infoHash] = { + complete: result.complete, + incomplete: result.incomplete, + downloaded: result.complete // TODO: this only provides a lower-bound + } }) - } - return peers -} -Server.prototype._getPeersCompact = function (swarm, numwant) { - var peers = [] - - for (var peerId in swarm.peers) { - if (peers.length >= numwant) break - var peer = swarm.peers[peerId] - if (!peer) continue // ignore null values - peers.push(peer.ip + ':' + peer.port) - } - - return string2compact(peers) + cb(null, response) + }) }