refactor(realtime): disconnect flow

1. use queue for queueing disconnect request

Signed-off-by: BoHong Li <a60814billy@gmail.com>
This commit is contained in:
BoHong Li 2019-05-27 15:19:53 +08:00
parent f892c68e30
commit d8b18ee241
No known key found for this signature in database
GPG Key ID: 9696D5590D58290F
7 changed files with 203 additions and 108 deletions

1
app.js
View File

@ -278,6 +278,7 @@ process.on('uncaughtException', function (err) {
function handleTermSignals () {
logger.info('CodiMD has been killed by signal, try to exit gracefully...')
realtime.maintenance = true
realtime.terminate()
// disconnect all socket.io clients
Object.keys(io.sockets.sockets).forEach(function (key) {
var socket = io.sockets.sockets[key]

View File

@ -6,20 +6,22 @@ const EventEmitter = require('events').EventEmitter
* Queuing Class for connection queuing
*/
const ConnectionQueueEvent = {
const QueueEvent = {
Tick: 'Tick'
}
class ConnectionQueue extends EventEmitter {
class ProcessQueue extends EventEmitter {
constructor (maximumLength, triggerTimeInterval = 10) {
super()
this.max = maximumLength
this.triggerTime = triggerTimeInterval
this.taskMap = new Map()
this.queue = []
this.lock = false
this.on(ConnectionQueueEvent.Tick, () => {
this.on(QueueEvent.Tick, () => {
if (this.lock) return
this.lock = true
setImmediate(() => {
this.process()
})
@ -29,7 +31,7 @@ class ConnectionQueue extends EventEmitter {
start () {
if (this.eventTrigger) return
this.eventTrigger = setInterval(() => {
this.emit(ConnectionQueueEvent.Tick)
this.emit(QueueEvent.Tick)
}, this.triggerTime)
}
@ -40,36 +42,48 @@ class ConnectionQueue extends EventEmitter {
}
}
checkTaskIsInQueue (id) {
return this.taskMap.has(id)
}
/**
* push a promisify-task to queue
* @param task {Promise}
* @returns {boolean} if success return true, otherwise flase
* pushWithKey a promisify-task to queue
* @param id {string}
* @param processingFunc {Function<Promise>}
* @returns {boolean} if success return true, otherwise false
*/
push (task) {
push (id, processingFunc) {
if (this.queue.length >= this.max) return false
if (this.checkTaskIsInQueue(id)) return false
const task = {
id: id,
processingFunc: processingFunc
}
this.taskMap.set(id, true)
this.queue.push(task)
this.start()
this.emit(QueueEvent.Tick)
return true
}
process () {
if (this.lock) return
this.lock = true
if (this.queue.length <= 0) {
this.stop()
this.lock = false
return
}
const task = this.queue.shift()
this.taskMap.delete(task.id)
const finishTask = () => {
this.lock = false
setImmediate(() => {
this.process()
this.emit(QueueEvent.Tick)
})
}
task().then(finishTask).catch(finishTask)
task.processingFunc().then(finishTask).catch(finishTask)
}
}
exports.ConnectionQueue = ConnectionQueue
exports.ProcessQueue = ProcessQueue

View File

@ -21,6 +21,7 @@ const models = require('./models')
// ot
const ot = require('./ot')
const { ProcessQueue } = require('./processQueue')
const { RealtimeClientConnection } = require('./realtimeClientConnection')
const { UpdateDirtyNoteJob } = require('./realtimeUpdateDirtyNoteJob')
@ -69,6 +70,7 @@ function secure (socket, next) {
}
}
// TODO: only use in `updateDirtyNote`
function emitCheck (note) {
var out = {
title: note.title,
@ -85,6 +87,9 @@ function emitCheck (note) {
var users = {}
var notes = {}
const disconnectProcessQueue = new ProcessQueue(2000, 500)
disconnectProcessQueue.start()
const updateDirtyNoteJob = new UpdateDirtyNoteJob(realtime)
updateDirtyNoteJob.start(realtime)
@ -173,8 +178,9 @@ setInterval(function () {
id: key
}
}
disconnectSocketQueue.push(socket)
disconnect(socket)
if (!disconnectProcessQueue.checkTaskIsInQueue(socket.id)) {
exports.queueForDisconnect(socket)
}
}
return callback(null, null)
}, function (err) {
@ -238,8 +244,8 @@ function getStatus (callback) {
distinctOnlineRegisteredUsers: distinctregaddresses.length,
isConnectionBusy: isConnectionBusy,
connectionSocketQueueLength: connectionSocketQueue.length,
isDisconnectBusy: isDisconnectBusy,
disconnectSocketQueueLength: disconnectSocketQueue.length
isDisconnectBusy: disconnectProcessQueue.lock,
disconnectSocketQueueLength: disconnectProcessQueue.queue.length
}) : null
}).catch(function (err) {
return logger.error('count user failed: ' + err)
@ -253,7 +259,7 @@ function isReady () {
return realtime.io &&
Object.keys(notes).length === 0 && Object.keys(users).length === 0 &&
connectionSocketQueue.length === 0 && !isConnectionBusy &&
disconnectSocketQueue.length === 0 && !isDisconnectBusy
disconnectProcessQueue.queue.length === 0 && !disconnectProcessQueue.lock
}
function parseUrl (data) {
@ -416,8 +422,6 @@ function checkViewPermission (req, note) {
var isConnectionBusy = false
var connectionSocketQueue = []
var isDisconnectBusy = false
var disconnectSocketQueue = []
function finishConnection (socket, noteId, socketId) {
// if no valid info provided will drop the client
@ -562,48 +566,36 @@ function failConnection (code, err, socket) {
return socket.disconnect(true)
}
function disconnect (socket) {
if (isDisconnectBusy) return
isDisconnectBusy = true
if (config.debug) {
logger.info('SERVER disconnected a client')
logger.info(JSON.stringify(users[socket.id]))
}
function queueForDisconnect (socket) {
disconnectProcessQueue.push(socket.id, async function () {
if (users[socket.id]) {
delete users[socket.id]
}
var noteId = socket.noteId
var note = notes[noteId]
const noteId = socket.noteId
const note = notes[noteId]
if (note) {
// delete user in users
if (note.users[socket.id]) {
delete note.users[socket.id]
}
// remove sockets in the note socks
let index
do {
var index = note.socks.indexOf(socket)
index = note.socks.indexOf(socket)
if (index !== -1) {
note.socks.splice(index, 1)
}
} while (index !== -1)
// remove note in notes if no user inside
if (Object.keys(note.users).length <= 0) {
if (Object.keys(note.users).length === 0) {
if (note.server.isDirty) {
updateNote(note, function (err, _note) {
exports.updateNote(note, function (err, _note) {
if (err) return logger.error('disconnect note failed: ' + err)
// clear server before delete to avoid memory leaks
note.server.document = ''
note.server.operations = []
delete note.server
delete notes[noteId]
if (config.debug) {
// logger.info(notes);
getStatus(function (data) {
logger.info(JSON.stringify(data))
})
}
})
} else {
delete note.server
@ -611,22 +603,8 @@ function disconnect (socket) {
}
}
}
emitOnlineUsers(socket)
// clear finished socket in queue
clearSocketQueue(disconnectSocketQueue, socket)
// seek for next socket
isDisconnectBusy = false
if (disconnectSocketQueue.length > 0) {
disconnect(disconnectSocketQueue[0])
}
if (config.debug) {
// logger.info(notes);
getStatus(function (data) {
logger.info(JSON.stringify(data))
exports.emitOnlineUsers(socket)
})
}
}
function buildUserOutData (user) {
@ -818,6 +796,11 @@ function connection (socket) {
socketClient.registerEventHandler()
}
function terminate () {
disconnectProcessQueue.stop()
updateDirtyNoteJob.stop()
}
exports = module.exports = realtime
exports.extractNoteIdFromSocket = extractNoteIdFromSocket
exports.parseNoteIdFromSocket = parseNoteIdFromSocket
@ -829,7 +812,6 @@ exports.updateUserData = updateUserData
exports.startConnection = startConnection
exports.emitRefresh = emitRefresh
exports.emitUserStatus = emitUserStatus
exports.disconnect = disconnect
exports.emitOnlineUsers = emitOnlineUsers
exports.checkViewPermission = checkViewPermission
exports.getNoteFromNotePool = getNoteFromNotePool
@ -838,6 +820,9 @@ exports.buildUserOutData = buildUserOutData
exports.getNotePool = getNotePool
exports.emitCheck = emitCheck
exports.disconnectSocketOnNote = disconnectSocketOnNote
exports.queueForDisconnect = queueForDisconnect
exports.terminate = terminate
exports.getUserPool = getUserPool
exports.disconnectProcessQueue = disconnectProcessQueue
exports.notes = notes
exports.users = users
exports.disconnectSocketQueue = disconnectSocketQueue

View File

@ -222,9 +222,10 @@ class RealtimeClientConnection {
}
disconnectEventHandler () {
if (this.realtime.isDuplicatedInSocketQueue(this.realtime.disconnectSocketQueue, this.socket)) return
this.realtime.disconnectSocketQueue.push(this.socket)
this.realtime.disconnect(this.socket)
if (this.realtime.disconnectProcessQueue.checkTaskIsInQueue(this.socket.id)) {
return
}
this.realtime.queueForDisconnect(this.socket)
}
}

View File

@ -4,9 +4,9 @@
const assert = require('assert')
const sinon = require('sinon')
const ConnectionQueuing = require('../lib/connectionQueue').ConnectionQueue
const { ProcessQueue } = require('../lib/processQueue')
describe('ConnectionQueue', function () {
describe('ProcessQueue', function () {
let clock
const waitTimeForCheckResult = 50
@ -22,25 +22,27 @@ describe('ConnectionQueue', function () {
})
it('should not accept more than maximum task', () => {
const queue = new ConnectionQueuing(2)
const task = async () => {
const queue = new ProcessQueue(2)
const task = {
id: 1,
processingFunc: async () => {
}
}
queue.start()
assert(queue.push(task))
assert(queue.push(task))
assert(queue.push(task) === false)
assert(queue.push(1, () => (Promise.resolve())))
assert(queue.push(1, () => (Promise.resolve())) === false)
})
it('should run task every interval', (done) => {
const runningClock = []
const queue = new ConnectionQueuing(2)
const queue = new ProcessQueue(2)
const task = async () => {
runningClock.push(clock.now)
}
queue.start()
assert(queue.push(task))
assert(queue.push(task))
assert(queue.push(1, task))
assert(queue.push(2, task))
clock.tick(5)
setTimeout(() => {
clock.tick(5)
@ -60,7 +62,7 @@ describe('ConnectionQueue', function () {
})
it('should not crash when repeat stop queue', () => {
const queue = new ConnectionQueuing(2, 10)
const queue = new ProcessQueue(2, 10)
try {
queue.stop()
queue.stop()
@ -72,7 +74,7 @@ describe('ConnectionQueue', function () {
})
it('should run process when queue is empty', (done) => {
const queue = new ConnectionQueuing(2, 100)
const queue = new ProcessQueue(2, 100)
const processSpy = sinon.spy(queue, 'process')
queue.start()
clock.tick(100)
@ -83,15 +85,15 @@ describe('ConnectionQueue', function () {
})
it('should run process although error occurred', (done) => {
const queue = new ConnectionQueuing(2, 100)
const queue = new ProcessQueue(2, 100)
const failedTask = sinon.spy(async () => {
throw new Error('error')
})
const normalTask = sinon.spy(async () => {
})
queue.start()
assert(queue.push(failedTask))
assert(queue.push(normalTask))
assert(queue.push(1, failedTask))
assert(queue.push(2, normalTask))
clock.tick(100)
setTimeout(() => {
clock.tick(100)
@ -105,7 +107,7 @@ describe('ConnectionQueue', function () {
})
it('should ignore trigger when event not complete', (done) => {
const queue = new ConnectionQueuing(2, 10)
const queue = new ProcessQueue(2, 10)
const processSpy = sinon.spy(queue, 'process')
const longTask = async () => {
return new Promise((resolve) => {
@ -115,7 +117,7 @@ describe('ConnectionQueue', function () {
})
}
queue.start()
queue.push(longTask)
queue.push(1, longTask)
clock.tick(10)
setTimeout(() => {
clock.tick(10)
@ -124,6 +126,7 @@ describe('ConnectionQueue', function () {
clock.tick(10)
}, 1)
setTimeout(() => {
assert(processSpy.callCount === 1)
assert(processSpy.calledOnce)
done()
}, waitTimeForCheckResult)

View File

@ -0,0 +1,93 @@
/* eslint-env node, mocha */
'use strict'
const assert = require('assert')
const mock = require('mock-require')
const sinon = require('sinon')
const { makeMockSocket, removeModuleFromRequireCache } = require('./utils')
describe('realtime#disconnect', function () {
const noteId = 'note1_id'
let realtime
let updateNoteStub
let emitOnlineUsersStub
let client
beforeEach(() => {
mock('../../lib/logger', {
error: () => {
}
})
mock('../../lib/history', {})
mock('../../lib/models', {
Revision: {
saveAllNotesRevision: () => {
}
}
})
mock('../../lib/config', {})
realtime = require('../../lib/realtime')
updateNoteStub = sinon.stub(realtime, 'updateNote').callsFake((note, callback) => {
callback(null, note)
})
emitOnlineUsersStub = sinon.stub(realtime, 'emitOnlineUsers')
client = makeMockSocket()
client.noteId = noteId
realtime.users[client.id] = {
id: client.id,
color: '#ff0000',
cursor: null,
login: false,
userid: null,
name: null,
idle: false,
type: null
}
realtime.getNotePool()[noteId] = {
id: noteId,
server: {
isDirty: true
},
users: {
[client.id]: realtime.users[client.id]
},
socks: [client]
}
})
afterEach(() => {
removeModuleFromRequireCache('../../lib/realtime')
sinon.restore()
})
it('should disconnect success', function (done) {
realtime.queueForDisconnect(client)
setTimeout(() => {
assert(typeof realtime.users[client.id] === 'undefined')
assert(emitOnlineUsersStub.called)
assert(updateNoteStub.called)
assert(Object.keys(realtime.users).length === 0)
assert(Object.keys(realtime.notes).length === 0)
done()
}, 5)
})
it('should disconnect success when note is not dirty', function (done) {
realtime.notes[noteId].server.isDirty = false
realtime.queueForDisconnect(client)
setTimeout(() => {
assert(typeof realtime.users[client.id] === 'undefined')
assert(emitOnlineUsersStub.called)
assert(updateNoteStub.called === false)
assert(Object.keys(realtime.users).length === 0)
assert(Object.keys(realtime.notes).length === 0)
done()
}, 5)
})
})

View File

@ -135,24 +135,22 @@ describe('realtime#socket event', function () {
userStatusFunc(userData)
assert(emitUserStatusStub.called === false)
})
})
describe('disconnect', function () {
it('should push socket to disconnect queue and call disconnect function', () => {
const disconnectFunc = eventFuncMap.get('disconnect')
const disconnectStub = sinon.stub(realtime, 'disconnect')
const queueForDisconnectStub = sinon.stub(realtime, 'queueForDisconnect')
disconnectFunc()
assert(realtime.disconnectSocketQueue.length === 1)
assert(disconnectStub.calledOnce)
assert(queueForDisconnectStub.calledOnce)
})
it('should quick return when socket is in disconnect queue', () => {
const disconnectFunc = eventFuncMap.get('disconnect')
const disconnectStub = sinon.stub(realtime, 'disconnect')
realtime.disconnectSocketQueue.push(clientSocket)
const queueForDisconnectStub = sinon.stub(realtime, 'queueForDisconnect')
realtime.disconnectProcessQueue.push(clientSocket.id, async () => {})
disconnectFunc()
assert(disconnectStub.called === false)
assert(queueForDisconnectStub.called === false)
})
})