diff --git a/lib/connectionQueue.js b/lib/connectionQueue.js new file mode 100644 index 00000000..f2bf9c8c --- /dev/null +++ b/lib/connectionQueue.js @@ -0,0 +1,75 @@ +'use strict' + +const EventEmitter = require('events').EventEmitter + +/** + * Queuing Class for connection queuing + */ + +const ConnectionQueueEvent = { + Tick: 'Tick' +} + +class ConnectionQueue extends EventEmitter { + constructor (maximumLength, triggerTimeInterval = 10) { + super() + this.max = maximumLength + this.triggerTime = triggerTimeInterval + this.queue = [] + this.lock = false + + this.on(ConnectionQueueEvent.Tick, () => { + if (this.lock) return + setImmediate(() => { + this.process() + }) + }) + } + + start () { + if (this.eventTrigger) return + this.eventTrigger = setInterval(() => { + this.emit(ConnectionQueueEvent.Tick) + }, this.triggerTime) + } + + stop () { + if (this.eventTrigger) { + clearInterval(this.eventTrigger) + this.eventTrigger = null + } + } + + /** + * push a promisify-task to queue + * @param task {Promise} + * @returns {boolean} if success return true, otherwise flase + */ + push (task) { + if (this.queue.length >= this.max) return false + this.queue.push(task) + this.start() + return true + } + + process () { + if (this.lock) return + this.lock = true + if (this.queue.length <= 0) { + this.stop() + this.lock = false + return + } + const task = this.queue.shift() + + const finishTask = () => { + this.lock = false + setImmediate(() => { + this.process() + }) + } + task().then(finishTask).catch(finishTask) + } +} + +exports.ConnectionQueue = ConnectionQueue diff --git a/test/connectionQueue.test.js b/test/connectionQueue.test.js new file mode 100644 index 00000000..589d865b --- /dev/null +++ b/test/connectionQueue.test.js @@ -0,0 +1,130 @@ +/* eslint-env node, mocha */ +'use strict' + +const assert = require('assert') +const sinon = require('sinon') + +const ConnectionQueuing = require('../lib/connectionQueue').ConnectionQueue + +describe('ConnectionQueue', function () { + let clock + + beforeEach(() => { + clock = sinon.useFakeTimers({ + toFake: ['setInterval'] + }) + }) + + afterEach(() => { + clock.restore() + sinon.restore() + }) + + it('should not accept more than maximum task', () => { + const queue = new ConnectionQueuing(2) + const task = async () => { + } + + queue.start() + assert(queue.push(task)) + assert(queue.push(task)) + assert(queue.push(task) === false) + }) + + it('should run task every interval', (done) => { + const runningClock = [] + const queue = new ConnectionQueuing(2) + const task = async () => { + runningClock.push(clock.now) + } + queue.start() + assert(queue.push(task)) + assert(queue.push(task)) + clock.tick(5) + setTimeout(() => { + clock.tick(5) + }, 1) + setTimeout(() => { + clock.tick(5) + }, 2) + setTimeout(() => { + clock.tick(5) + }, 3) + queue.stop() + + setTimeout(() => { + assert(runningClock.length === 2) + done() + }, 10) + }) + + it('should not crash when repeat stop queue', () => { + const queue = new ConnectionQueuing(2, 10) + try { + queue.stop() + queue.stop() + queue.stop() + assert.ok(true) + } catch (e) { + assert.fail(e) + } + }) + + it('should run process when queue is empty', (done) => { + const queue = new ConnectionQueuing(2, 100) + const processSpy = sinon.spy(queue, 'process') + queue.start() + clock.tick(100) + setTimeout(() => { + assert(processSpy.called) + done() + }, 1) + }) + + it('should run process although error occurred', (done) => { + const queue = new ConnectionQueuing(2, 100) + const failedTask = sinon.spy(async () => { + throw new Error('error') + }) + const normalTask = sinon.spy(async () => { + }) + queue.start() + assert(queue.push(failedTask)) + assert(queue.push(normalTask)) + clock.tick(100) + setTimeout(() => { + clock.tick(100) + }, 1) + setTimeout(() => { + // assert(queue.queue.length === 0) + assert(failedTask.called) + assert(normalTask.called) + done() + }, 5) + }) + + it('should ignore trigger when event not complete', (done) => { + const queue = new ConnectionQueuing(2, 10) + const processSpy = sinon.spy(queue, 'process') + const longTask = async () => { + return new Promise((resolve) => { + setInterval(() => { + resolve() + }, 50) + }) + } + queue.start() + queue.push(longTask) + clock.tick(10) + setTimeout(() => { + clock.tick(10) + }, 0) + setTimeout(() => { + clock.tick(10) + }, 1) + setTimeout(() => { + assert(processSpy.calledOnce) + done() + }, 2) + }) +}) diff --git a/test/realtime.test.js b/test/realtime.test.js index e3d00f5d..718b4a86 100644 --- a/test/realtime.test.js +++ b/test/realtime.test.js @@ -701,5 +701,9 @@ describe('realtime', function () { }) }) + describe('permission', function () { + + }) + }) })