diff --git a/lib/client/http-tracker.js b/lib/client/http-tracker.js index 570a910..0398773 100644 --- a/lib/client/http-tracker.js +++ b/lib/client/http-tracker.js @@ -28,11 +28,16 @@ function HTTPTracker (client, announceUrl, opts) { // Determine scrape url (if http tracker supports it) self.scrapeUrl = null - var m - if ((m = self.announceUrl.match(HTTP_SCRAPE_SUPPORT))) { - self.scrapeUrl = self.announceUrl.slice(0, m.index) + '/scrape' + - self.announceUrl.slice(m.index + 9) + + var match = self.announceUrl.match(HTTP_SCRAPE_SUPPORT) + if (match) { + var pre = self.announceUrl.slice(0, match.index) + var post = self.announceUrl.slice(match.index + 9) + self.scrapeUrl = pre + '/scrape' + post } + + self.cleanupFns = [] + self.maybeDestroyCleanup = null } HTTPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes @@ -82,22 +87,61 @@ HTTPTracker.prototype.destroy = function (cb) { self.destroyed = true clearInterval(self.interval) - cb(null) + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = function () { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(function (cleanup) { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } } HTTPTracker.prototype._request = function (requestUrl, params, cb) { var self = this var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') + common.querystringStringify(params) - var opts = { + + self.cleanupFns.push(cleanup) + + var request = get.concat({ url: u, + timeout: common.REQUEST_TIMEOUT, headers: { 'user-agent': self.client._userAgent || '' } + }, onResponse) + + function cleanup () { + if (request) { + self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) + request.abort() + request = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() } - get.concat(opts, function (err, res, data) { + function onResponse (err, res, data) { + cleanup() if (self.destroyed) return + if (err) return self.client.emit('warning', err) if (res.statusCode !== 200) { return self.client.emit('warning', new Error('Non-200 response code ' + @@ -128,7 +172,7 @@ HTTPTracker.prototype._request = function (requestUrl, params, cb) { debug('response from ' + requestUrl) cb(data) - }) + } } HTTPTracker.prototype._onAnnounceResponse = function (data) { diff --git a/lib/client/udp-tracker.js b/lib/client/udp-tracker.js index b9ccbb2..ae6e06c 100644 --- a/lib/client/udp-tracker.js +++ b/lib/client/udp-tracker.js @@ -12,8 +12,6 @@ var url = require('url') var common = require('../common') var Tracker = require('./tracker') -var TIMEOUT = 15000 - inherits(UDPTracker, Tracker) /** @@ -29,6 +27,7 @@ function UDPTracker (client, announceUrl, opts) { debug('new udp tracker %s', announceUrl) self.cleanupFns = [] + self.maybeDestroyCleanup = null } UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes @@ -52,11 +51,31 @@ UDPTracker.prototype.destroy = function (cb) { self.destroyed = true clearInterval(self.interval) - self.cleanupFns.slice(0).forEach(function (cleanup) { - cleanup() - }) - self.cleanupFns = [] - cb(null) + // If there are no pending requests, destroy immediately. + if (self.cleanupFns.length === 0) return destroyCleanup() + + // Otherwise, wait a short time for pending requests to complete, then force + // destroy them. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if all pending requests complete before the timeout fires, do cleanup + // right away. + self.maybeDestroyCleanup = function () { + if (self.cleanupFns.length === 0) destroyCleanup() + } + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + self.maybeDestroyCleanup = null + self.cleanupFns.slice(0).forEach(function (cleanup) { + cleanup() + }) + self.cleanupFns = [] + cb(null) + } } UDPTracker.prototype._request = function (opts) { @@ -66,41 +85,51 @@ UDPTracker.prototype._request = function (opts) { var transactionId = genTransactionId() var socket = dgram.createSocket('udp4') - var cleanup = function () { - if (!socket) return - self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) - if (timeout) { - clearTimeout(timeout) - timeout = null - } - socket.removeListener('error', onError) - socket.removeListener('message', onSocketMessage) - socket.on('error', noop) // ignore all future errors - try { socket.close() } catch (err) {} - socket = null - } - self.cleanupFns.push(cleanup) - - // does not matter if `stopped` event arrives, so supress errors & cleanup after timeout - var ms = opts.event === 'stopped' ? TIMEOUT / 10 : TIMEOUT var timeout = setTimeout(function () { - timeout = null + // does not matter if `stopped` event arrives, so supress errors if (opts.event === 'stopped') cleanup() else onError(new Error('tracker request timed out (' + opts.event + ')')) - }, ms) + timeout = null + }, common.REQUEST_TIMEOUT) if (timeout.unref) timeout.unref() + self.cleanupFns.push(cleanup) + send(Buffer.concat([ common.CONNECTION_ID, common.toUInt32(common.ACTIONS.CONNECT), transactionId ])) - socket.on('error', onError) + socket.once('error', onError) socket.on('message', onSocketMessage) - function onSocketMessage (msg) { + function cleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null + } + if (socket) { + self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1) + socket.removeListener('error', onError) + socket.removeListener('message', onSocketMessage) + socket.on('error', noop) // ignore all future errors + try { socket.close() } catch (err) {} + socket = null + } + if (self.maybeDestroyCleanup) self.maybeDestroyCleanup() + } + + function onError (err) { + cleanup() if (self.destroyed) return + + if (err.message) err.message += ' (' + self.announceUrl + ')' + // errors will often happen if a tracker is offline, so don't treat it as fatal + self.client.emit('warning', err) + } + + function onSocketMessage (msg) { if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { return onError(new Error('tracker sent invalid transaction id')) } @@ -109,15 +138,20 @@ UDPTracker.prototype._request = function (opts) { debug('UDP response %s, action %s', self.announceUrl, action) switch (action) { case 0: // handshake + // Note: no check for `self.destroyed` so that pending messages to the + // tracker can still be sent/received even after destroy() is called + if (msg.length < 16) return onError(new Error('invalid udp handshake')) if (opts._scrape) scrape(msg.slice(8, 16)) else announce(msg.slice(8, 16), opts) - return + break case 1: // announce cleanup() + if (self.destroyed) return + if (msg.length < 20) return onError(new Error('invalid announce message')) var interval = msg.readUInt32BE(8) @@ -138,10 +172,13 @@ UDPTracker.prototype._request = function (opts) { addrs.forEach(function (addr) { self.client.emit('peer', addr) }) + break case 2: // scrape cleanup() + if (self.destroyed) return + if (msg.length < 20 || (msg.length - 8) % 12 !== 0) { return onError(new Error('invalid scrape message')) } @@ -158,12 +195,16 @@ UDPTracker.prototype._request = function (opts) { incomplete: msg.readUInt32BE(16 + (i * 12)) }) } + break case 3: // error cleanup() + if (self.destroyed) return + if (msg.length < 8) return onError(new Error('invalid error message')) self.client.emit('warning', new Error(msg.slice(8).toString())) + break default: @@ -172,14 +213,6 @@ UDPTracker.prototype._request = function (opts) { } } - function onError (err) { - if (self.destroyed) return - cleanup() - if (err.message) err.message += ' (' + self.announceUrl + ')' - // errors will often happen if a tracker is offline, so don't treat it as fatal - self.client.emit('warning', err) - } - function send (message) { if (!parsedUrl.port) { parsedUrl.port = 80 diff --git a/lib/client/websocket-tracker.js b/lib/client/websocket-tracker.js index 3cd7d1f..0e7844b 100644 --- a/lib/client/websocket-tracker.js +++ b/lib/client/websocket-tracker.js @@ -34,6 +34,10 @@ function WebSocketTracker (client, announceUrl, opts) { self.retries = 0 self.reconnectTimer = null + // Simple boolean flag to track whether the socket has received data from + // the websocket server since the last time socket.send() was called. + self.expectingResponse = false + self._openSocket() } @@ -104,18 +108,6 @@ WebSocketTracker.prototype.destroy = function (cb) { clearInterval(self.interval) clearTimeout(self.reconnectTimer) - if (self.socket) { - self.socket.removeListener('connect', self._onSocketConnectBound) - self.socket.removeListener('data', self._onSocketDataBound) - self.socket.removeListener('close', self._onSocketCloseBound) - self.socket.removeListener('error', self._onSocketErrorBound) - } - - self._onSocketConnectBound = null - self._onSocketErrorBound = null - self._onSocketDataBound = null - self._onSocketCloseBound = null - // Destroy peers for (var peerId in self.peers) { var peer = self.peers[peerId] @@ -124,24 +116,51 @@ WebSocketTracker.prototype.destroy = function (cb) { } self.peers = null + if (self.socket) { + self.socket.removeListener('connect', self._onSocketConnectBound) + self.socket.removeListener('data', self._onSocketDataBound) + self.socket.removeListener('close', self._onSocketCloseBound) + self.socket.removeListener('error', self._onSocketErrorBound) + self.socket = null + } + + self._onSocketConnectBound = null + self._onSocketErrorBound = null + self._onSocketDataBound = null + self._onSocketCloseBound = null + if (socketPool[self.announceUrl]) { socketPool[self.announceUrl].consumers -= 1 } - if (socketPool[self.announceUrl].consumers === 0) { - delete socketPool[self.announceUrl] + // Other instances are using the socket, so there's nothing left to do here + if (socketPool[self.announceUrl].consumers > 0) return cb() - try { - self.socket.on('error', noop) // ignore all future errors - self.socket.destroy(cb) - } catch (err) { - cb(null) + var socket = socketPool[self.announceUrl] + delete socketPool[self.announceUrl] + socket.on('error', noop) // ignore all future errors + socket.once('close', cb) + + // If there is no data response expected, destroy immediately. + if (!self.expectingResponse) return destroyCleanup() + + // Otherwise, wait a short time for potential responses to come in from the + // server, then force close the socket. + var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT) + + // But, if a response comes from the server before the timeout fires, do cleanup + // right away. + socket.once('data', destroyCleanup) + + function destroyCleanup () { + if (timeout) { + clearTimeout(timeout) + timeout = null } - } else { - cb(null) + socket.removeListener('data', destroyCleanup) + socket.destroy() + socket = null } - - self.socket = null } WebSocketTracker.prototype._openSocket = function () { @@ -192,6 +211,8 @@ WebSocketTracker.prototype._onSocketData = function (data) { var self = this if (self.destroyed) return + self.expectingResponse = false + try { data = JSON.parse(data) } catch (err) { @@ -352,7 +373,7 @@ WebSocketTracker.prototype._startReconnectTimer = function () { WebSocketTracker.prototype._send = function (params) { var self = this if (self.destroyed) return - + self.expectingResponse = true var message = JSON.stringify(params) debug('send %s', message) self.socket.send(message) diff --git a/lib/common-node.js b/lib/common-node.js index 5cc1c0f..55ef8eb 100644 --- a/lib/common-node.js +++ b/lib/common-node.js @@ -26,6 +26,18 @@ exports.EVENT_NAMES = { stopped: 'stop' } +/** + * Client request timeout. How long to wait before considering a request to a + * tracker server to have timed out. + */ +exports.REQUEST_TIMEOUT = 15000 + +/** + * Client destroy timeout. How long to wait before forcibly cleaning up all + * pending requests, open sockets, etc. + */ +exports.DESTROY_TIMEOUT = 1000 + function toUInt32 (n) { var buf = Buffer.allocUnsafe(4) buf.writeUInt32BE(n, 0)