Wait up to 1s for pending requests before destroy()

If the user calls:

client.stop()
client.destroy()

We should ensure that the final 'stopped' message reaches the tracker
server, even though the client will not get the response (because they
destroyed the client and no more events will be emitted).

If there are pending requests when destroy() is called, then a 1s timer
is set after which point all requests are forcibly cleaned up. If the
requests complete before the 1s timer fires, then cleanup happens right
away (so we're not stuck waiting for the 1s timer).

So, destroy() can happen one of three ways:

- immediately, if no pending requests exist
- after exactly 1s, if pending requests exist and they don't complete
within 1s
- less than 1s, if pending requests exist and they all complete before
the 1s timer fires
This commit is contained in:
Feross Aboukhadijeh 2017-01-20 18:41:28 -08:00
parent 9cf2dffa67
commit 0aadcc1cbb
4 changed files with 179 additions and 69 deletions

View File

@ -28,11 +28,16 @@ function HTTPTracker (client, announceUrl, opts) {
// Determine scrape url (if http tracker supports it) // Determine scrape url (if http tracker supports it)
self.scrapeUrl = null self.scrapeUrl = null
var m
if ((m = self.announceUrl.match(HTTP_SCRAPE_SUPPORT))) { var match = self.announceUrl.match(HTTP_SCRAPE_SUPPORT)
self.scrapeUrl = self.announceUrl.slice(0, m.index) + '/scrape' + if (match) {
self.announceUrl.slice(m.index + 9) var pre = self.announceUrl.slice(0, match.index)
var post = self.announceUrl.slice(match.index + 9)
self.scrapeUrl = pre + '/scrape' + post
} }
self.cleanupFns = []
self.maybeDestroyCleanup = null
} }
HTTPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes HTTPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes
@ -82,22 +87,61 @@ HTTPTracker.prototype.destroy = function (cb) {
self.destroyed = true self.destroyed = true
clearInterval(self.interval) clearInterval(self.interval)
cb(null) // If there are no pending requests, destroy immediately.
if (self.cleanupFns.length === 0) return destroyCleanup()
// Otherwise, wait a short time for pending requests to complete, then force
// destroy them.
var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
// But, if all pending requests complete before the timeout fires, do cleanup
// right away.
self.maybeDestroyCleanup = function () {
if (self.cleanupFns.length === 0) destroyCleanup()
}
function destroyCleanup () {
if (timeout) {
clearTimeout(timeout)
timeout = null
}
self.maybeDestroyCleanup = null
self.cleanupFns.slice(0).forEach(function (cleanup) {
cleanup()
})
self.cleanupFns = []
cb(null)
}
} }
HTTPTracker.prototype._request = function (requestUrl, params, cb) { HTTPTracker.prototype._request = function (requestUrl, params, cb) {
var self = this var self = this
var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') + var u = requestUrl + (requestUrl.indexOf('?') === -1 ? '?' : '&') +
common.querystringStringify(params) common.querystringStringify(params)
var opts = {
self.cleanupFns.push(cleanup)
var request = get.concat({
url: u, url: u,
timeout: common.REQUEST_TIMEOUT,
headers: { headers: {
'user-agent': self.client._userAgent || '' 'user-agent': self.client._userAgent || ''
} }
}, onResponse)
function cleanup () {
if (request) {
self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1)
request.abort()
request = null
}
if (self.maybeDestroyCleanup) self.maybeDestroyCleanup()
} }
get.concat(opts, function (err, res, data) { function onResponse (err, res, data) {
cleanup()
if (self.destroyed) return if (self.destroyed) return
if (err) return self.client.emit('warning', err) if (err) return self.client.emit('warning', err)
if (res.statusCode !== 200) { if (res.statusCode !== 200) {
return self.client.emit('warning', new Error('Non-200 response code ' + return self.client.emit('warning', new Error('Non-200 response code ' +
@ -128,7 +172,7 @@ HTTPTracker.prototype._request = function (requestUrl, params, cb) {
debug('response from ' + requestUrl) debug('response from ' + requestUrl)
cb(data) cb(data)
}) }
} }
HTTPTracker.prototype._onAnnounceResponse = function (data) { HTTPTracker.prototype._onAnnounceResponse = function (data) {

View File

@ -12,8 +12,6 @@ var url = require('url')
var common = require('../common') var common = require('../common')
var Tracker = require('./tracker') var Tracker = require('./tracker')
var TIMEOUT = 15000
inherits(UDPTracker, Tracker) inherits(UDPTracker, Tracker)
/** /**
@ -29,6 +27,7 @@ function UDPTracker (client, announceUrl, opts) {
debug('new udp tracker %s', announceUrl) debug('new udp tracker %s', announceUrl)
self.cleanupFns = [] self.cleanupFns = []
self.maybeDestroyCleanup = null
} }
UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes UDPTracker.prototype.DEFAULT_ANNOUNCE_INTERVAL = 30 * 60 * 1000 // 30 minutes
@ -52,11 +51,31 @@ UDPTracker.prototype.destroy = function (cb) {
self.destroyed = true self.destroyed = true
clearInterval(self.interval) clearInterval(self.interval)
self.cleanupFns.slice(0).forEach(function (cleanup) { // If there are no pending requests, destroy immediately.
cleanup() if (self.cleanupFns.length === 0) return destroyCleanup()
})
self.cleanupFns = [] // Otherwise, wait a short time for pending requests to complete, then force
cb(null) // destroy them.
var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
// But, if all pending requests complete before the timeout fires, do cleanup
// right away.
self.maybeDestroyCleanup = function () {
if (self.cleanupFns.length === 0) destroyCleanup()
}
function destroyCleanup () {
if (timeout) {
clearTimeout(timeout)
timeout = null
}
self.maybeDestroyCleanup = null
self.cleanupFns.slice(0).forEach(function (cleanup) {
cleanup()
})
self.cleanupFns = []
cb(null)
}
} }
UDPTracker.prototype._request = function (opts) { UDPTracker.prototype._request = function (opts) {
@ -66,41 +85,51 @@ UDPTracker.prototype._request = function (opts) {
var transactionId = genTransactionId() var transactionId = genTransactionId()
var socket = dgram.createSocket('udp4') var socket = dgram.createSocket('udp4')
var cleanup = function () {
if (!socket) return
self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1)
if (timeout) {
clearTimeout(timeout)
timeout = null
}
socket.removeListener('error', onError)
socket.removeListener('message', onSocketMessage)
socket.on('error', noop) // ignore all future errors
try { socket.close() } catch (err) {}
socket = null
}
self.cleanupFns.push(cleanup)
// does not matter if `stopped` event arrives, so supress errors & cleanup after timeout
var ms = opts.event === 'stopped' ? TIMEOUT / 10 : TIMEOUT
var timeout = setTimeout(function () { var timeout = setTimeout(function () {
timeout = null // does not matter if `stopped` event arrives, so supress errors
if (opts.event === 'stopped') cleanup() if (opts.event === 'stopped') cleanup()
else onError(new Error('tracker request timed out (' + opts.event + ')')) else onError(new Error('tracker request timed out (' + opts.event + ')'))
}, ms) timeout = null
}, common.REQUEST_TIMEOUT)
if (timeout.unref) timeout.unref() if (timeout.unref) timeout.unref()
self.cleanupFns.push(cleanup)
send(Buffer.concat([ send(Buffer.concat([
common.CONNECTION_ID, common.CONNECTION_ID,
common.toUInt32(common.ACTIONS.CONNECT), common.toUInt32(common.ACTIONS.CONNECT),
transactionId transactionId
])) ]))
socket.on('error', onError) socket.once('error', onError)
socket.on('message', onSocketMessage) socket.on('message', onSocketMessage)
function onSocketMessage (msg) { function cleanup () {
if (timeout) {
clearTimeout(timeout)
timeout = null
}
if (socket) {
self.cleanupFns.splice(self.cleanupFns.indexOf(cleanup), 1)
socket.removeListener('error', onError)
socket.removeListener('message', onSocketMessage)
socket.on('error', noop) // ignore all future errors
try { socket.close() } catch (err) {}
socket = null
}
if (self.maybeDestroyCleanup) self.maybeDestroyCleanup()
}
function onError (err) {
cleanup()
if (self.destroyed) return if (self.destroyed) return
if (err.message) err.message += ' (' + self.announceUrl + ')'
// errors will often happen if a tracker is offline, so don't treat it as fatal
self.client.emit('warning', err)
}
function onSocketMessage (msg) {
if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) { if (msg.length < 8 || msg.readUInt32BE(4) !== transactionId.readUInt32BE(0)) {
return onError(new Error('tracker sent invalid transaction id')) return onError(new Error('tracker sent invalid transaction id'))
} }
@ -109,15 +138,20 @@ UDPTracker.prototype._request = function (opts) {
debug('UDP response %s, action %s', self.announceUrl, action) debug('UDP response %s, action %s', self.announceUrl, action)
switch (action) { switch (action) {
case 0: // handshake case 0: // handshake
// Note: no check for `self.destroyed` so that pending messages to the
// tracker can still be sent/received even after destroy() is called
if (msg.length < 16) return onError(new Error('invalid udp handshake')) if (msg.length < 16) return onError(new Error('invalid udp handshake'))
if (opts._scrape) scrape(msg.slice(8, 16)) if (opts._scrape) scrape(msg.slice(8, 16))
else announce(msg.slice(8, 16), opts) else announce(msg.slice(8, 16), opts)
return break
case 1: // announce case 1: // announce
cleanup() cleanup()
if (self.destroyed) return
if (msg.length < 20) return onError(new Error('invalid announce message')) if (msg.length < 20) return onError(new Error('invalid announce message'))
var interval = msg.readUInt32BE(8) var interval = msg.readUInt32BE(8)
@ -138,10 +172,13 @@ UDPTracker.prototype._request = function (opts) {
addrs.forEach(function (addr) { addrs.forEach(function (addr) {
self.client.emit('peer', addr) self.client.emit('peer', addr)
}) })
break break
case 2: // scrape case 2: // scrape
cleanup() cleanup()
if (self.destroyed) return
if (msg.length < 20 || (msg.length - 8) % 12 !== 0) { if (msg.length < 20 || (msg.length - 8) % 12 !== 0) {
return onError(new Error('invalid scrape message')) return onError(new Error('invalid scrape message'))
} }
@ -158,12 +195,16 @@ UDPTracker.prototype._request = function (opts) {
incomplete: msg.readUInt32BE(16 + (i * 12)) incomplete: msg.readUInt32BE(16 + (i * 12))
}) })
} }
break break
case 3: // error case 3: // error
cleanup() cleanup()
if (self.destroyed) return
if (msg.length < 8) return onError(new Error('invalid error message')) if (msg.length < 8) return onError(new Error('invalid error message'))
self.client.emit('warning', new Error(msg.slice(8).toString())) self.client.emit('warning', new Error(msg.slice(8).toString()))
break break
default: default:
@ -172,14 +213,6 @@ UDPTracker.prototype._request = function (opts) {
} }
} }
function onError (err) {
if (self.destroyed) return
cleanup()
if (err.message) err.message += ' (' + self.announceUrl + ')'
// errors will often happen if a tracker is offline, so don't treat it as fatal
self.client.emit('warning', err)
}
function send (message) { function send (message) {
if (!parsedUrl.port) { if (!parsedUrl.port) {
parsedUrl.port = 80 parsedUrl.port = 80

View File

@ -34,6 +34,10 @@ function WebSocketTracker (client, announceUrl, opts) {
self.retries = 0 self.retries = 0
self.reconnectTimer = null self.reconnectTimer = null
// Simple boolean flag to track whether the socket has received data from
// the websocket server since the last time socket.send() was called.
self.expectingResponse = false
self._openSocket() self._openSocket()
} }
@ -104,18 +108,6 @@ WebSocketTracker.prototype.destroy = function (cb) {
clearInterval(self.interval) clearInterval(self.interval)
clearTimeout(self.reconnectTimer) clearTimeout(self.reconnectTimer)
if (self.socket) {
self.socket.removeListener('connect', self._onSocketConnectBound)
self.socket.removeListener('data', self._onSocketDataBound)
self.socket.removeListener('close', self._onSocketCloseBound)
self.socket.removeListener('error', self._onSocketErrorBound)
}
self._onSocketConnectBound = null
self._onSocketErrorBound = null
self._onSocketDataBound = null
self._onSocketCloseBound = null
// Destroy peers // Destroy peers
for (var peerId in self.peers) { for (var peerId in self.peers) {
var peer = self.peers[peerId] var peer = self.peers[peerId]
@ -124,24 +116,51 @@ WebSocketTracker.prototype.destroy = function (cb) {
} }
self.peers = null self.peers = null
if (self.socket) {
self.socket.removeListener('connect', self._onSocketConnectBound)
self.socket.removeListener('data', self._onSocketDataBound)
self.socket.removeListener('close', self._onSocketCloseBound)
self.socket.removeListener('error', self._onSocketErrorBound)
self.socket = null
}
self._onSocketConnectBound = null
self._onSocketErrorBound = null
self._onSocketDataBound = null
self._onSocketCloseBound = null
if (socketPool[self.announceUrl]) { if (socketPool[self.announceUrl]) {
socketPool[self.announceUrl].consumers -= 1 socketPool[self.announceUrl].consumers -= 1
} }
if (socketPool[self.announceUrl].consumers === 0) { // Other instances are using the socket, so there's nothing left to do here
delete socketPool[self.announceUrl] if (socketPool[self.announceUrl].consumers > 0) return cb()
try { var socket = socketPool[self.announceUrl]
self.socket.on('error', noop) // ignore all future errors delete socketPool[self.announceUrl]
self.socket.destroy(cb) socket.on('error', noop) // ignore all future errors
} catch (err) { socket.once('close', cb)
cb(null)
// If there is no data response expected, destroy immediately.
if (!self.expectingResponse) return destroyCleanup()
// Otherwise, wait a short time for potential responses to come in from the
// server, then force close the socket.
var timeout = setTimeout(destroyCleanup, common.DESTROY_TIMEOUT)
// But, if a response comes from the server before the timeout fires, do cleanup
// right away.
socket.once('data', destroyCleanup)
function destroyCleanup () {
if (timeout) {
clearTimeout(timeout)
timeout = null
} }
} else { socket.removeListener('data', destroyCleanup)
cb(null) socket.destroy()
socket = null
} }
self.socket = null
} }
WebSocketTracker.prototype._openSocket = function () { WebSocketTracker.prototype._openSocket = function () {
@ -192,6 +211,8 @@ WebSocketTracker.prototype._onSocketData = function (data) {
var self = this var self = this
if (self.destroyed) return if (self.destroyed) return
self.expectingResponse = false
try { try {
data = JSON.parse(data) data = JSON.parse(data)
} catch (err) { } catch (err) {
@ -352,7 +373,7 @@ WebSocketTracker.prototype._startReconnectTimer = function () {
WebSocketTracker.prototype._send = function (params) { WebSocketTracker.prototype._send = function (params) {
var self = this var self = this
if (self.destroyed) return if (self.destroyed) return
self.expectingResponse = true
var message = JSON.stringify(params) var message = JSON.stringify(params)
debug('send %s', message) debug('send %s', message)
self.socket.send(message) self.socket.send(message)

View File

@ -26,6 +26,18 @@ exports.EVENT_NAMES = {
stopped: 'stop' stopped: 'stop'
} }
/**
* Client request timeout. How long to wait before considering a request to a
* tracker server to have timed out.
*/
exports.REQUEST_TIMEOUT = 15000
/**
* Client destroy timeout. How long to wait before forcibly cleaning up all
* pending requests, open sockets, etc.
*/
exports.DESTROY_TIMEOUT = 1000
function toUInt32 (n) { function toUInt32 (n) {
var buf = Buffer.allocUnsafe(4) var buf = Buffer.allocUnsafe(4)
buf.writeUInt32BE(n, 0) buf.writeUInt32BE(n, 0)