diff --git a/lib/common.js b/lib/common.js index ccf1bd4..cb83d5b 100644 --- a/lib/common.js +++ b/lib/common.js @@ -7,6 +7,12 @@ var querystring = require('querystring') exports.CONNECTION_ID = Buffer.concat([ toUInt32(0x417), toUInt32(0x27101980) ]) exports.ACTIONS = { CONNECT: 0, ANNOUNCE: 1, SCRAPE: 2, ERROR: 3 } exports.EVENTS = { update: 0, completed: 1, started: 2, stopped: 3 } +exports.EVENT_IDS = { + 0: 'update', + 1: 'completed', + 2: 'started', + 3: 'stopped' +}; function toUInt32 (n) { var buf = new Buffer(4) diff --git a/server.js b/server.js index 26b12e0..ec8790c 100644 --- a/server.js +++ b/server.js @@ -296,75 +296,40 @@ Server.prototype._onHttpRequest = function (req, res) { Server.prototype._onUdpRequest = function (msg, rinfo) { var self = this - if (msg.length < 16) { - return error('received packet is too short') + var params + try { + params = parseUdpRequest(msg, rinfo) + } catch (err) { + console.error(err.stack) + return error(err.message) } - - if (rinfo.family !== 'IPv4') { - return error('udp tracker does not support IPv6') - } - - var connectionId = msg.slice(0, 8) // 64-bit - var action = msg.readUInt32BE(8) - var transactionId = msg.readUInt32BE(12) - - if (!bufferEqual(connectionId, common.CONNECTION_ID)) { - return error('received packet with invalid connection id') - } - + var socket = dgram.createSocket('udp4') - var infoHash, swarm - if (action === common.ACTIONS.CONNECT) { + var swarm + if (params && params.request === 'connect') { send(Buffer.concat([ common.toUInt32(common.ACTIONS.CONNECT), - common.toUInt32(transactionId), - connectionId + common.toUInt32(params.transactionId), + params.connectionId ])) - } else if (action === common.ACTIONS.ANNOUNCE) { - infoHash = msg.slice(16, 36).toString('binary') // 20 bytes - var peerId = msg.slice(36, 56).toString('utf8') // 20 bytes - var downloaded = fromUInt64(msg.slice(56, 64)) // TODO: track this? - var left = fromUInt64(msg.slice(64, 72)) - var uploaded = fromUInt64(msg.slice(72, 80)) // TODO: track this? - var event = msg.readUInt32BE(80) - var ip = msg.readUInt32BE(84) // optional - var key = msg.readUInt32BE(88) // TODO: what is this for? - var numwant = msg.readUInt32BE(92) // optional - var port = msg.readUInt16BE(96) // optional - - if (ip) { - ip = ipLib.toString(ip) - } else { - ip = rinfo.address - } - - if (!port) { - port = rinfo.port - } - - var addr = ip + ':' + port - - swarm = self._getSwarm(infoHash) - var peer = swarm.peers[addr] - - // never send more than MAX_ANNOUNCE_PEERS or else the UDP packet will get bigger than - // 512 bytes which is not safe - numwant = Math.min(numwant || NUM_ANNOUNCE_PEERS, MAX_ANNOUNCE_PEERS) + } else if (params && params.request === 'announce') { + swarm = self._getSwarm(params.info_hash) + var peer = swarm.peers[params.addr] var start = function () { if (peer) { debug('unexpected `started` event from peer that is already in swarm') return update() // treat as an update } - if (left === 0) swarm.complete += 1 + if (params.left === 0) swarm.complete += 1 else swarm.incomplete += 1 - peer = swarm.peers[addr] = { - ip: ip, - port: port, - peerId: peerId + peer = swarm.peers[params.addr] = { + ip: params.ip, + port: params.port, + peerId: params.peer_id } - self.emit('start', addr) + self.emit('start', params.addr) } var stop = function () { @@ -374,8 +339,8 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { } if (peer.complete) swarm.complete -= 1 else swarm.incomplete -= 1 - swarm.peers[addr] = null - self.emit('stop', addr) + swarm.peers[params.addr] = null + self.emit('stop', params.addr) } var complete = function () { @@ -390,7 +355,7 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { swarm.complete += 1 swarm.incomplete -= 1 peer.complete = true - self.emit('complete', addr) + self.emit('complete', params.addr) } var update = function () { @@ -398,53 +363,46 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { debug('unexpected `update` event from peer that is not in swarm') return start() // treat as a start } - self.emit('update', addr) + self.emit('update', params.addr) } - switch (event) { - case common.EVENTS.started: + switch (params.event) { + case 'started': start() break - case common.EVENTS.stopped: + case 'stopped': stop() break - case common.EVENTS.completed: + case 'completed': complete() break - case common.EVENTS.update: // update + case 'update': update() break default: return error('invalid event') // early return } - if (left === 0 && peer) peer.complete = true + if (params.left === 0 && peer) peer.complete = true // send peers - var peers = self._getPeersCompact(swarm, numwant) + var peers = self._getPeersCompact(swarm, params.numwant) send(Buffer.concat([ common.toUInt32(common.ACTIONS.ANNOUNCE), - common.toUInt32(transactionId), + common.toUInt32(params.transactionId), common.toUInt32(self._intervalMs), common.toUInt32(swarm.incomplete), common.toUInt32(swarm.complete), peers ])) - } else if (action === common.ACTIONS.SCRAPE) { // scrape message - infoHash = msg.slice(16, 36).toString('binary') // 20 bytes - - // TODO: support multiple info_hash scrape - if (msg.length > 36) { - error('multiple info_hash scrape not supported') - } - - swarm = self._getSwarm(infoHash) + } else if (params && params.request === 'scrape') { // scrape message + swarm = self._getSwarm(params.info_hash) send(Buffer.concat([ common.toUInt32(common.ACTIONS.SCRAPE), - common.toUInt32(transactionId), + common.toUInt32(params.transactionId), common.toUInt32(swarm.complete), common.toUInt32(swarm.complete), // TODO: this only provides a lower-bound common.toUInt32(swarm.incomplete) @@ -464,7 +422,7 @@ Server.prototype._onUdpRequest = function (msg, rinfo) { debug('sent error %s', message) send(Buffer.concat([ common.toUInt32(common.ACTIONS.ERROR), - common.toUInt32(transactionId || 0), + common.toUInt32(params.transactionId || 0), new Buffer(message, 'utf8') ])) self.emit('warning', new Error(message)) @@ -553,6 +511,65 @@ function parseHttpRequest (req, options) { } } +function parseUdpRequest (msg, rinfo) { + if (msg.length < 16) { + throw new Error('received packet is too short') + } + + if (rinfo.family !== 'IPv4') { + throw new Error('udp tracker does not support IPv6') + } + + var params = { + connectionId: msg.slice(0, 8), // 64-bit + action: msg.readUInt32BE(8), + transactionId: msg.readUInt32BE(12) + } + + // TODO: randomize: + if (!bufferEqual(params.connectionId, common.CONNECTION_ID)) { + throw new Error('received packet with invalid connection id') + } + + if (params.action === common.ACTIONS.CONNECT) { + params.request = 'connect' + } else if (params.action === common.ACTIONS.ANNOUNCE) { + params.request = 'announce' + params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes + params.peer_id = msg.slice(36, 56).toString('utf8') // 20 bytes + params.downloaded = fromUInt64(msg.slice(56, 64)) // TODO: track this? + params.left = fromUInt64(msg.slice(64, 72)) + params.uploaded = fromUInt64(msg.slice(72, 80)) // TODO: track this? + params.event = msg.readUInt32BE(80) + params.event = common.EVENT_IDS[params.event] + if (!params.event) throw new Error('invalid event') // early return + params.ip = msg.readUInt32BE(84) // optional + params.ip = params.ip ? + ipLib.toString(params.ip) : + params.ip = rinfo.address + params.key = msg.readUInt32BE(88) // TODO: what is this for? + params.numwant = msg.readUInt32BE(92) // optional + // never send more than MAX_ANNOUNCE_PEERS or else the UDP packet will get bigger than + // 512 bytes which is not safe + params.numwant = Math.min(params.numwant || NUM_ANNOUNCE_PEERS, MAX_ANNOUNCE_PEERS) + params.port = msg.readUInt16BE(96) || rinfo.port // optional + params.addr = params.ip + ':' + params.port // TODO: ipv6 brackets + + } else if (params.action === common.ACTIONS.SCRAPE) { // scrape message + params.request = 'scrape' + params.info_hash = msg.slice(16, 36).toString('binary') // 20 bytes + + // TODO: support multiple info_hash scrape + if (msg.length > 36) { + throw new Error('multiple info_hash scrape not supported') + } + } else { + return null + } + + return params +} + // HELPER FUNCTIONS