feat(realtime): add queue for connect and disconnect queue

Signed-off-by: BoHong Li <a60814billy@gmail.com>
This commit is contained in:
BoHong Li 2019-05-04 02:43:04 +08:00
parent ff72d99269
commit b17f417af4
No known key found for this signature in database
GPG Key ID: 9696D5590D58290F
3 changed files with 209 additions and 0 deletions

75
lib/connectionQueue.js Normal file
View File

@ -0,0 +1,75 @@
'use strict'
const EventEmitter = require('events').EventEmitter
/**
* Queuing Class for connection queuing
*/
const ConnectionQueueEvent = {
Tick: 'Tick'
}
class ConnectionQueue extends EventEmitter {
constructor (maximumLength, triggerTimeInterval = 10) {
super()
this.max = maximumLength
this.triggerTime = triggerTimeInterval
this.queue = []
this.lock = false
this.on(ConnectionQueueEvent.Tick, () => {
if (this.lock) return
setImmediate(() => {
this.process()
})
})
}
start () {
if (this.eventTrigger) return
this.eventTrigger = setInterval(() => {
this.emit(ConnectionQueueEvent.Tick)
}, this.triggerTime)
}
stop () {
if (this.eventTrigger) {
clearInterval(this.eventTrigger)
this.eventTrigger = null
}
}
/**
* push a promisify-task to queue
* @param task {Promise}
* @returns {boolean} if success return true, otherwise flase
*/
push (task) {
if (this.queue.length >= this.max) return false
this.queue.push(task)
this.start()
return true
}
process () {
if (this.lock) return
this.lock = true
if (this.queue.length <= 0) {
this.stop()
this.lock = false
return
}
const task = this.queue.shift()
const finishTask = () => {
this.lock = false
setImmediate(() => {
this.process()
})
}
task().then(finishTask).catch(finishTask)
}
}
exports.ConnectionQueue = ConnectionQueue

View File

@ -0,0 +1,130 @@
/* eslint-env node, mocha */
'use strict'
const assert = require('assert')
const sinon = require('sinon')
const ConnectionQueuing = require('../lib/connectionQueue').ConnectionQueue
describe('ConnectionQueue', function () {
let clock
beforeEach(() => {
clock = sinon.useFakeTimers({
toFake: ['setInterval']
})
})
afterEach(() => {
clock.restore()
sinon.restore()
})
it('should not accept more than maximum task', () => {
const queue = new ConnectionQueuing(2)
const task = async () => {
}
queue.start()
assert(queue.push(task))
assert(queue.push(task))
assert(queue.push(task) === false)
})
it('should run task every interval', (done) => {
const runningClock = []
const queue = new ConnectionQueuing(2)
const task = async () => {
runningClock.push(clock.now)
}
queue.start()
assert(queue.push(task))
assert(queue.push(task))
clock.tick(5)
setTimeout(() => {
clock.tick(5)
}, 1)
setTimeout(() => {
clock.tick(5)
}, 2)
setTimeout(() => {
clock.tick(5)
}, 3)
queue.stop()
setTimeout(() => {
assert(runningClock.length === 2)
done()
}, 10)
})
it('should not crash when repeat stop queue', () => {
const queue = new ConnectionQueuing(2, 10)
try {
queue.stop()
queue.stop()
queue.stop()
assert.ok(true)
} catch (e) {
assert.fail(e)
}
})
it('should run process when queue is empty', (done) => {
const queue = new ConnectionQueuing(2, 100)
const processSpy = sinon.spy(queue, 'process')
queue.start()
clock.tick(100)
setTimeout(() => {
assert(processSpy.called)
done()
}, 1)
})
it('should run process although error occurred', (done) => {
const queue = new ConnectionQueuing(2, 100)
const failedTask = sinon.spy(async () => {
throw new Error('error')
})
const normalTask = sinon.spy(async () => {
})
queue.start()
assert(queue.push(failedTask))
assert(queue.push(normalTask))
clock.tick(100)
setTimeout(() => {
clock.tick(100)
}, 1)
setTimeout(() => {
// assert(queue.queue.length === 0)
assert(failedTask.called)
assert(normalTask.called)
done()
}, 5)
})
it('should ignore trigger when event not complete', (done) => {
const queue = new ConnectionQueuing(2, 10)
const processSpy = sinon.spy(queue, 'process')
const longTask = async () => {
return new Promise((resolve) => {
setInterval(() => {
resolve()
}, 50)
})
}
queue.start()
queue.push(longTask)
clock.tick(10)
setTimeout(() => {
clock.tick(10)
}, 0)
setTimeout(() => {
clock.tick(10)
}, 1)
setTimeout(() => {
assert(processSpy.calledOnce)
done()
}, 2)
})
})

View File

@ -701,5 +701,9 @@ describe('realtime', function () {
})
})
describe('permission', function () {
})
})
})