diff --git a/client.js b/client.js index 49eaad1..42567e6 100644 --- a/client.js +++ b/client.js @@ -14,7 +14,9 @@ var WebSocketTracker = require('./lib/websocket-tracker') inherits(Client, EventEmitter) /** - * A Client manages tracker connections for a torrent. + * BitTorrent tracker client. + * + * Find torrent peers, to help a torrent client participate in a torrent swarm. * * @param {string} peerId peer id * @param {Number} port torrent client listening port diff --git a/lib/parse_http.js b/lib/parse_http.js index 7d06937..6e575f8 100644 --- a/lib/parse_http.js +++ b/lib/parse_http.js @@ -4,18 +4,19 @@ var common = require('./common') var REMOVE_IPV4_MAPPED_IPV6_RE = /^::ffff:/ -function parseHttpRequest (req, options) { - options = options || {} +function parseHttpRequest (req, opts) { + if (!opts) opts = {} var s = req.url.split('?') var params = common.querystringParse(s[1]) - if (options.action === 'announce' || s[0] === '/announce') { + if (opts.action === 'announce' || s[0] === '/announce') { params.action = common.ACTIONS.ANNOUNCE if (typeof params.info_hash !== 'string' || params.info_hash.length !== 20) { throw new Error('invalid info_hash') } params.info_hash = common.binaryToHex(params.info_hash) + if (typeof params.peer_id !== 'string' || params.peer_id.length !== 20) { throw new Error('invalid peer_id') } @@ -24,18 +25,18 @@ function parseHttpRequest (req, options) { params.port = Number(params.port) if (!params.port) throw new Error('invalid port') - params.left = Number(params.left) - params.compact = Number(params.compact) + params.left = Number(params.left) || Infinity + params.compact = Number(params.compact) || 0 params.numwant = Math.min( Number(params.numwant) || common.DEFAULT_ANNOUNCE_PEERS, common.MAX_ANNOUNCE_PEERS ) - params.ip = options.trustProxy + params.ip = opts.trustProxy ? req.headers['x-forwarded-for'] || req.connection.remoteAddress : req.connection.remoteAddress.replace(REMOVE_IPV4_MAPPED_IPV6_RE, '') // force ipv4 params.addr = (common.IPV6_RE.test(params.ip) ? '[' + params.ip + ']' : params.ip) + ':' + params.port - } else if (options.action === 'scrape' || s[0] === '/scrape') { + } else if (opts.action === 'scrape' || s[0] === '/scrape') { params.action = common.ACTIONS.SCRAPE if (typeof params.info_hash === 'string') params.info_hash = [ params.info_hash ] @@ -48,7 +49,7 @@ function parseHttpRequest (req, options) { }) } } else { - throw new Error('Invalid action in HTTP request: ' + params.action) + throw new Error('invalid action in HTTP request: ' + params.action) } return params diff --git a/lib/parse_websocket.js b/lib/parse_websocket.js new file mode 100644 index 0000000..9463e06 --- /dev/null +++ b/lib/parse_websocket.js @@ -0,0 +1,33 @@ +module.exports = parseWebSocketRequest + +var common = require('./common') + +function parseWebSocketRequest (socket, params) { + params = JSON.parse(params) // may throw + + params.action = common.ACTIONS.ANNOUNCE + params.socket = socket + + if (typeof params.info_hash !== 'string' || params.info_hash.length !== 20) { + throw new Error('invalid info_hash') + } + params.info_hash = common.binaryToHex(params.info_hash) + + if (typeof params.peer_id !== 'string' || params.peer_id.length !== 20) { + throw new Error('invalid peer_id') + } + params.peer_id = common.binaryToHex(params.peer_id) + + if (params.answer && + (typeof params.to_peer_id !== 'string' || params.to_peer_id.length !== 20)) { + throw new Error('invalid `to_peer_id` (required with `answer`)') + } + + params.left = Number(params.left) || Infinity + params.numwant = Math.min( + Number(params.offers && params.offers.length) || 0, // no default - explicit only + common.MAX_ANNOUNCE_PEERS + ) + + return params +} diff --git a/lib/swarm.js b/lib/swarm.js index 1affa0e..347b119 100644 --- a/lib/swarm.js +++ b/lib/swarm.js @@ -12,7 +12,7 @@ function Swarm (infoHash, server) { Swarm.prototype.announce = function (params, cb) { var self = this - var peer = self.peers[params.addr] + var peer = self.peers[params.addr || params.peer_id] // Dispatch announce event var fn = '_onAnnounce_' + params.event @@ -24,13 +24,20 @@ Swarm.prototype.announce = function (params, cb) { cb(null, { complete: self.complete, incomplete: self.incomplete, - peers: self._getPeers(params.numwant) + peers: self._getPeers(params.numwant, params.peer_id) }) } else { cb(new Error('invalid event')) } } +Swarm.prototype.scrape = function (params, cb) { + cb(null, { + complete: this.complete, + incomplete: this.incomplete + }) +} + Swarm.prototype._onAnnounce_started = function (params, peer) { if (peer) { debug('unexpected `started` event from peer that is already in swarm') @@ -39,11 +46,12 @@ Swarm.prototype._onAnnounce_started = function (params, peer) { if (params.left === 0) this.complete += 1 else this.incomplete += 1 - peer = this.peers[params.addr] = { - ip: params.ip, - port: params.port, + peer = this.peers[params.addr || params.peer_id] = { + complete: false, + ip: params.ip, // only http+udp peerId: params.peer_id, - complete: false + port: params.port, // only http+udp + socket: params.socket // only websocket } } @@ -55,7 +63,7 @@ Swarm.prototype._onAnnounce_stopped = function (params, peer) { if (peer.complete) this.complete -= 1 else this.incomplete -= 1 - this.peers[params.addr] = null + this.peers[params.addr || params.peer_id] = null } Swarm.prototype._onAnnounce_completed = function (params, peer) { @@ -80,24 +88,16 @@ Swarm.prototype._onAnnounce_update = function (params, peer) { } } -Swarm.prototype._getPeers = function (numwant) { +// TODO: randomize the peers that are given out +Swarm.prototype._getPeers = function (numWant, fromPeerId) { var peers = [] for (var peerId in this.peers) { - if (peers.length >= numwant) break + if (peers.length >= numWant) break + if (peerId === fromPeerId) continue // skip self + var peer = this.peers[peerId] if (!peer) continue // ignore null values - peers.push({ - 'peer id': peer.peerId, - ip: peer.ip, - port: peer.port - }) + peers.push(peer) } return peers } - -Swarm.prototype.scrape = function (params, cb) { - cb(null, { - complete: this.complete, - incomplete: this.incomplete - }) -} diff --git a/package.json b/package.json index 9e9b34b..e4e2666 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "simple-peer": "4.0.4", "simple-websocket": "1.0.4", "string2compact": "^1.1.1", + "ws": "^0.7.1", "xtend": "4.0.0" }, "devDependencies": { diff --git a/server.js b/server.js index f3df23d..6a6f267 100644 --- a/server.js +++ b/server.js @@ -8,34 +8,38 @@ var http = require('http') var inherits = require('inherits') var series = require('run-series') var string2compact = require('string2compact') +var WebSocketServer = require('ws').Server var common = require('./lib/common') var Swarm = require('./lib/swarm') var parseHttpRequest = require('./lib/parse_http') var parseUdpRequest = require('./lib/parse_udp') +var parseWebSocketRequest = require('./lib/parse_websocket') inherits(Server, EventEmitter) /** - * A BitTorrent tracker server. + * BitTorrent tracker server. * - * A "BitTorrent tracker" is an HTTP service which responds to GET requests from - * BitTorrent clients. The requests include metrics from clients that help the tracker - * keep overall statistics about the torrent. The response includes a peer list that - * helps the client participate in the torrent. + * HTTP service which responds to GET requests from torrent clients. Requests include + * metrics from clients that help the tracker keep overall statistics about the torrent. + * Responses include a peer list that helps the client participate in the torrent. * * @param {Object} opts options object * @param {Number} opts.interval tell clients to announce on this interval (ms) * @param {Number} opts.trustProxy trust 'x-forwarded-for' header from reverse proxy * @param {boolean} opts.http start an http server? (default: true) * @param {boolean} opts.udp start a udp server? (default: true) + * @param {boolean} opts.ws start a websocket server? (default: true) * @param {function} opts.filter black/whitelist fn for disallowing/allowing torrents */ function Server (opts) { var self = this if (!(self instanceof Server)) return new Server(opts) EventEmitter.call(self) - opts = opts || {} + if (!opts) opts = {} + + debug('new server %s', JSON.stringify(opts)) self._intervalMs = opts.interval ? opts.interval @@ -47,7 +51,11 @@ function Server (opts) { self.listening = false self.torrents = {} - // default to starting an http server unless the user explictly says no + self.http = null + self.udp = null + self.ws = null + + // start an http tracker unless the user explictly says no if (opts.http !== false) { self.http = http.createServer() self.http.on('request', self.onHttpRequest.bind(self)) @@ -55,7 +63,7 @@ function Server (opts) { self.http.on('listening', onListening) } - // default to starting a udp server unless the user explicitly says no + // start a udp tracker unless the user explicitly says no if (opts.udp !== false) { self.udp = dgram.createSocket('udp4') self.udp.on('message', self.onUdpRequest.bind(self)) @@ -63,11 +71,24 @@ function Server (opts) { self.udp.on('listening', onListening) } - var num = !!self.http + !!self.udp + // start a websocket tracker (for WebTorrent) unless the user explicitly says no + if (opts.ws === true) { + if (!self.http) { + self.http = http.createServer() + self.http.on('error', self._onError.bind(self)) + self.http.on('listening', onListening) + } + self.ws = new WebSocketServer({ server: self.http }) + self.ws.on('error', self._onError.bind(self)) + self.ws.on('connection', self.onWebSocketConnection.bind(self)) + } + + var num = !!(self.http || self.ws) + !!self.udp function onListening () { num -= 1 if (num === 0) { self.listening = true + debug('listening') self.emit('listening') } } @@ -84,10 +105,11 @@ Server.prototype.listen = function (port, onlistening) { onlistening = port port = undefined } - if (self.listening) throw new Error('server already listening') - if (onlistening) self.once('listening', onlistening) - if (!port) port = 0 + if (self.listening) throw new Error('server already listening') + debug('listen %o', port) + + if (onlistening) self.once('listening', onlistening) // ATTENTION: // binding to :: only receives IPv4 connections if the bindv6only @@ -98,40 +120,51 @@ Server.prototype.listen = function (port, onlistening) { Server.prototype.close = function (cb) { var self = this + if (!cb) cb = function () {} + debug('close') + self.listening = false - cb = cb || function () {} + if (self.udp) { - self.udp.close() + try { + self.udp.close() + } catch (err) {} } - if (self.http) { - self.http.close(cb) - } else { - cb(null) + + if (self.ws) { + try { + self.ws.close() + } catch (err) {} } + + if (self.http) self.http.close(cb) + else cb(null) } Server.prototype.getSwarm = function (infoHash, params) { var self = this if (!params) params = {} if (Buffer.isBuffer(infoHash)) infoHash = infoHash.toString('hex') + if (self._filter && !self._filter(infoHash, params)) return null + var swarm = self.torrents[infoHash] - if (!swarm) swarm = self.torrents[infoHash] = new Swarm(infoHash, this) + if (!swarm) swarm = self.torrents[infoHash] = new Swarm(infoHash, self) + return swarm } -Server.prototype.onHttpRequest = function (req, res, options) { +Server.prototype.onHttpRequest = function (req, res, opts) { var self = this - options = options || {} - options.trustProxy = options.trustProxy || self._trustProxy + if (!opts) opts = {} + opts.trustProxy = opts.trustProxy || self._trustProxy var params try { - params = parseHttpRequest(req, options) + params = parseHttpRequest(req, opts) params.httpReq = req params.httpRes = res } catch (err) { - debug('sent error %s', err.message) res.end(bencode.encode({ 'failure reason': err.message })) @@ -139,11 +172,10 @@ Server.prototype.onHttpRequest = function (req, res, options) { // even though it's an error for the client, it's just a warning for the server. // don't crash the server because a client sent bad data :) self.emit('warning', err) - return } - this._onRequest(params, function (err, response) { + self._onRequest(params, function (err, response) { if (err) { self.emit('warning', err) response = { @@ -172,8 +204,7 @@ Server.prototype.onUdpRequest = function (msg, rinfo) { return } - // Handle - this._onRequest(params, function (err, response) { + self._onRequest(params, function (err, response) { if (err) { self.emit('warning', err) response = { @@ -195,13 +226,103 @@ Server.prototype.onUdpRequest = function (msg, rinfo) { }) } +Server.prototype.onWebSocketConnection = function (socket) { + var self = this + socket.peerId = null + socket.infoHashes = [] + socket.onSend = self._onWebSocketSend.bind(self, socket) + socket.on('message', self._onWebSocketRequest.bind(self, socket)) + socket.on('error', self._onWebSocketError.bind(self, socket)) + socket.on('close', self._onWebSocketClose.bind(self, socket)) +} + +Server.prototype._onWebSocketRequest = function (socket, params) { + var self = this + + try { + params = parseWebSocketRequest(socket, params) + } catch (err) { + socket.send(JSON.stringify({ + 'failure reason': err.message, + info_hash: params.info_hash + }), socket.onSend) + + // even though it's an error for the client, it's just a warning for the server. + // don't crash the server because a client sent bad data :) + self.emit('warning', err) + return + } + + if (!socket.peerId) socket.peerId = params.peer_id + + self._onRequest(params, function (err, response) { + if (err) { + self.emit('warning', err) + response = { + 'failure reason': err.message + } + } + if (!self.listening) return + + if (socket.infoHashes.indexOf(params.info_hash) === -1) { + socket.infoHashes.push(params.info_hash) + } + + var peers = response.peers + delete response.peers + + response.interval = self._intervalMs + response.info_hash = params.info_hash + socket.send(JSON.stringify(response), socket.onSend) + + debug('sent response %s to %s', JSON.stringify(response), params.peer_id) + + if (params.numwant) { + debug('got offers %s from %s', JSON.stringify(params.offers), params.peer_id) + debug('got %s peers from swarm %s', peers.length, params.info_hash) + peers.forEach(function (peer, i) { + peer.socket.send(JSON.stringify({ + offer: params.offers[i].offer, + offer_id: params.offers[i].offer_id, + peer_id: params.peer_id, + info_hash: params.info_hash + })) + debug('sent offer to %s from %s', peer.peerId, params.peer_id) + }) + } + + if (params.answer) { + debug('got answer %s from %s', JSON.stringify(params.answer), params.peer_id) + + var swarm = self.getSwarm(params.info_hash, params) + var toPeer = swarm.peers[params.to_peer_id] + if (!toPeer) { + return self.emit('warning', new Error('no peer with that `to_peer_id`')) + } + + toPeer.socket.send(JSON.stringify({ + answer: params.answer, + offer_id: params.offer_id, + peer_id: params.peer_id, + info_hash: params.info_hash + })) + debug('sent answer to %s from %s', toPeer.peerId, params.peer_id) + } + + if (params.action === common.ACTIONS.ANNOUNCE) { + self.emit(common.EVENT_NAMES[params.event], params.addr) + } + }) +} + Server.prototype._onRequest = function (params, cb) { + var self = this if (params && params.action === common.ACTIONS.CONNECT) { cb(null, { action: common.ACTIONS.CONNECT }) } else if (params && params.action === common.ACTIONS.ANNOUNCE) { - this._onAnnounce(params, cb) + self._onAnnounce(params, cb) } else if (params && params.action === common.ACTIONS.SCRAPE) { - this._onScrape(params, cb) + self._onScrape(params, cb) } else { cb(new Error('Invalid action')) } @@ -213,27 +334,37 @@ Server.prototype._onAnnounce = function (params, cb) { if (swarm === null) return cb(new Error('disallowed info_hash')) if (!params.event || params.event === 'empty') params.event = 'update' swarm.announce(params, function (err, response) { - if (response) { - if (!response.action) response.action = common.ACTIONS.ANNOUNCE - if (!response.interval) response.interval = Math.ceil(self._intervalMs / 1000) + if (err) return cb(err) - if (params.compact === 1) { - var peers = response.peers - // Find IPv4 peers - response.peers = string2compact(peers.filter(function (peer) { - return common.IPV4_RE.test(peer.ip) - }).map(function (peer) { - return peer.ip + ':' + peer.port - })) - // Find IPv6 peers - response.peers6 = string2compact(peers.filter(function (peer) { - return common.IPV6_RE.test(peer.ip) - }).map(function (peer) { - return '[' + peer.ip + ']:' + peer.port - })) - } + if (!response.action) response.action = common.ACTIONS.ANNOUNCE + if (!response.interval) response.interval = Math.ceil(self._intervalMs / 1000) + + if (params.compact === 1) { + var peers = response.peers + + // Find IPv4 peers + response.peers = string2compact(peers.filter(function (peer) { + return common.IPV4_RE.test(peer.ip) + }).map(function (peer) { + return peer.ip + ':' + peer.port + })) + // Find IPv6 peers + response.peers6 = string2compact(peers.filter(function (peer) { + return common.IPV6_RE.test(peer.ip) + }).map(function (peer) { + return '[' + peer.ip + ']:' + peer.port + })) + } else if (params.compact === 0) { // IPv6 peers are not separate for non-compact responses - } + response.peers = response.peers.map(function (peer) { + return { + 'peer id': peer.peerId, + ip: peer.ip, + port: peer.port + } + }) + } // else, return full peer objects (used for websocket responses) + cb(err, response) }) } @@ -326,3 +457,32 @@ function makeUdpPacket (params) { } return packet } + +Server.prototype._onWebSocketSend = function (socket, err) { + var self = this + if (err) self._onWebSocketError(socket, err) +} + +Server.prototype._onWebSocketClose = function (socket) { + var self = this + if (!socket.peerId || !socket.infoHashes) return + debug('websocket close') + + socket.infoHashes.forEach(function (infoHash) { + var swarm = self.torrents[infoHash] + if (swarm) { + swarm.announce({ + event: 'stopped', + numwant: 0, + peer_id: socket.peerId + }, function () {}) + } + }) +} + +Server.prototype._onWebSocketError = function (socket, err) { + var self = this + debug('websocket error %s', err.message || err) + self.emit('warning', err) + self._onWebSocketClose(socket) +} diff --git a/test/server.js b/test/server.js index f2aae5e..1e50852 100644 --- a/test/server.js +++ b/test/server.js @@ -57,7 +57,8 @@ function serverTest (t, serverType, serverFamily) { ip: clientIp, port: 6881, peerId: peerId.toString('hex'), - complete: false + complete: false, + socket: undefined }) client.complete()