codimd/lib/realtime/processQueue.ts
Raccoon f21b2342de
ts: lib/realtime/processQueue.js
Signed-off-by: Raccoon <raccoon@hackmd.io>
2021-06-16 18:35:23 +08:00

111 lines
2.4 KiB
TypeScript

import {EventEmitter} from "events";
/**
* Queuing Class for connection queuing
*/
const QueueEvent = {
Tick: 'Tick',
Push: 'Push',
Finish: 'Finish'
}
export class ProcessQueue extends EventEmitter {
private max: number;
private triggerTime: number;
private taskMap: Map<string, boolean>;
public queue: any[];
public lock: boolean;
private eventTrigger: NodeJS.Timeout;
constructor({
maximumLength = 500,
triggerTimeInterval = 5000,
// execute on push
proactiveMode = true,
// execute next work on finish
continuousMode = true
}) {
super()
this.max = maximumLength
this.triggerTime = triggerTimeInterval
this.taskMap = new Map()
this.queue = []
this.lock = false
this.on(QueueEvent.Tick, this.onEventProcessFunc.bind(this))
if (proactiveMode) {
this.on(QueueEvent.Push, this.onEventProcessFunc.bind(this))
}
if (continuousMode) {
this.on(QueueEvent.Finish, this.onEventProcessFunc.bind(this))
}
}
onEventProcessFunc() {
if (this.lock) return
this.lock = true
setImmediate(() => {
this.process()
})
}
start() {
if (this.eventTrigger) return
this.eventTrigger = setInterval(() => {
this.emit(QueueEvent.Tick)
}, this.triggerTime)
}
stop() {
if (this.eventTrigger) {
clearInterval(this.eventTrigger)
this.eventTrigger = null
}
}
checkTaskIsInQueue(id) {
return this.taskMap.has(id)
}
/**
* pushWithKey a promisify-task to queue
* @param id {string}
* @param processingFunc {Function<Promise>}
* @returns {boolean} if success return true, otherwise false
*/
push(id: string, processingFunc) {
if (this.queue.length >= this.max) return false
if (this.checkTaskIsInQueue(id)) return false
const task = {
id: id,
processingFunc: processingFunc
}
this.taskMap.set(id, true)
this.queue.push(task)
this.start()
this.emit(QueueEvent.Push)
return true
}
process() {
if (this.queue.length <= 0) {
this.stop()
this.lock = false
return
}
const task = this.queue.shift()
this.taskMap.delete(task.id)
const finishTask = () => {
this.lock = false
setImmediate(() => {
this.emit(QueueEvent.Finish)
})
}
task.processingFunc().then(finishTask).catch(finishTask)
}
}