mirror of https://github.com/status-im/codimd.git
refactor(realtime): extract "save note as revision" to job
Signed-off-by: BoHong Li <a60814billy@gmail.com>
This commit is contained in:
parent
ab37a33e0b
commit
2dedc84e28
|
@ -25,6 +25,7 @@ const { ProcessQueue } = require('./processQueue')
|
|||
const { RealtimeClientConnection } = require('./realtimeClientConnection')
|
||||
const { UpdateDirtyNoteJob } = require('./realtimeUpdateDirtyNoteJob')
|
||||
const { CleanDanglingUserJob } = require('./realtimeCleanDanglingUserJob')
|
||||
const { SaveRevisionJob } = require('./realtimeSaveRevisionJob')
|
||||
|
||||
// public
|
||||
const realtime = {
|
||||
|
@ -38,6 +39,11 @@ const realtime = {
|
|||
maintenance: true
|
||||
}
|
||||
|
||||
const disconnectProcessQueue = new ProcessQueue(2000, 500)
|
||||
const updateDirtyNoteJob = new UpdateDirtyNoteJob(realtime)
|
||||
const cleanDanglingUserJob = new CleanDanglingUserJob(realtime)
|
||||
const saveRevisionJob = new SaveRevisionJob(realtime)
|
||||
|
||||
function onAuthorizeSuccess (data, accept) {
|
||||
accept()
|
||||
}
|
||||
|
@ -88,11 +94,10 @@ function emitCheck (note) {
|
|||
var users = {}
|
||||
var notes = {}
|
||||
|
||||
const disconnectProcessQueue = new ProcessQueue(2000, 500)
|
||||
disconnectProcessQueue.start()
|
||||
|
||||
const updateDirtyNoteJob = new UpdateDirtyNoteJob(realtime)
|
||||
updateDirtyNoteJob.start(realtime)
|
||||
updateDirtyNoteJob.start()
|
||||
cleanDanglingUserJob.start()
|
||||
saveRevisionJob.start()
|
||||
|
||||
function disconnectSocketOnNote (note) {
|
||||
note.socks.forEach((sock) => {
|
||||
|
@ -157,7 +162,7 @@ function finishUpdateNote (note, _note, callback) {
|
|||
lastchangeAt: Date.now()
|
||||
}
|
||||
_note.update(values).then(function (_note) {
|
||||
saverSleep = false
|
||||
saveRevisionJob.setSaverSleep(false)
|
||||
return callback(null, _note)
|
||||
}).catch(function (err) {
|
||||
logger.error(err)
|
||||
|
@ -165,20 +170,6 @@ function finishUpdateNote (note, _note, callback) {
|
|||
})
|
||||
}
|
||||
|
||||
const cleanDanglingUserJob = new CleanDanglingUserJob(realtime)
|
||||
cleanDanglingUserJob.start()
|
||||
|
||||
var saverSleep = false
|
||||
// save note revision in interval
|
||||
setInterval(function () {
|
||||
if (saverSleep) return
|
||||
models.Revision.saveAllNotesRevision(function (err, notes) {
|
||||
if (err) return logger.error('revision saver failed: ' + err)
|
||||
if (notes && notes.length <= 0) {
|
||||
saverSleep = true
|
||||
}
|
||||
})
|
||||
}, 5 * 60 * 1000) // 5 mins
|
||||
|
||||
function getStatus (callback) {
|
||||
models.Note.count().then(function (notecount) {
|
||||
|
@ -806,3 +797,4 @@ exports.getUserPool = getUserPool
|
|||
exports.disconnectProcessQueue = disconnectProcessQueue
|
||||
exports.notes = notes
|
||||
exports.users = users
|
||||
exports.saveRevisionJob = saveRevisionJob
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
'use strict'
|
||||
|
||||
const models = require('./models')
|
||||
const logger = require('./logger')
|
||||
|
||||
/**
|
||||
* clean when user not in any rooms or user not in connected list
|
||||
*/
|
||||
class SaveRevisionJob {
|
||||
constructor (realtime) {
|
||||
this.realtime = realtime
|
||||
this.saverSleep = false
|
||||
}
|
||||
|
||||
start () {
|
||||
if (this.timer) return
|
||||
this.timer = setInterval(this.saveRevision.bind(this), 5 * 60 * 1000)
|
||||
}
|
||||
|
||||
stop () {
|
||||
if (!this.timer) return
|
||||
clearInterval(this.timer)
|
||||
this.timer = undefined
|
||||
}
|
||||
|
||||
saveRevision () {
|
||||
if (this.getSaverSleep()) return
|
||||
models.Revision.saveAllNotesRevision((err, notes) => {
|
||||
if (err) return logger.error('revision saver failed: ' + err)
|
||||
if (notes && notes.length <= 0) {
|
||||
this.setSaverSleep(true)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
getSaverSleep () {
|
||||
return this.saverSleep
|
||||
}
|
||||
|
||||
setSaverSleep (val) {
|
||||
this.saverSleep = val
|
||||
}
|
||||
}
|
||||
|
||||
exports.SaveRevisionJob = SaveRevisionJob
|
|
@ -25,7 +25,8 @@ describe('cleanDanglingUser', function () {
|
|||
mock('../../lib/config', {
|
||||
debug: true
|
||||
})
|
||||
mock('../../lib/realtimeUpdateDirtyNoteJob', require('../testDoubles/realtimeUpdateDirtyNoteJobStub'))
|
||||
mock('../../lib/realtimeUpdateDirtyNoteJob', require('../testDoubles/realtimeJobStub'))
|
||||
mock('../../lib/realtimeSaveRevisionJob', require('../testDoubles/realtimeJobStub'))
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
|
@ -35,7 +36,7 @@ describe('cleanDanglingUser', function () {
|
|||
sinon.restore()
|
||||
})
|
||||
|
||||
it('should ', (done) => {
|
||||
it('should call queueForDisconnectSpy when user is dangling', (done) => {
|
||||
const realtime = require('../../lib/realtime')
|
||||
const queueForDisconnectSpy = sinon.spy(realtime, 'queueForDisconnect')
|
||||
realtime.io = {
|
||||
|
|
|
@ -61,6 +61,7 @@ describe('realtime#disconnect', function () {
|
|||
|
||||
afterEach(() => {
|
||||
removeModuleFromRequireCache('../../lib/realtime')
|
||||
mock.stopAll()
|
||||
sinon.restore()
|
||||
})
|
||||
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
/* eslint-env node, mocha */
|
||||
'use strict'
|
||||
|
||||
const assert = require('assert')
|
||||
const mock = require('mock-require')
|
||||
const sinon = require('sinon')
|
||||
const { removeModuleFromRequireCache, removeLibModuleCache } = require('./utils')
|
||||
|
||||
describe('save revision job', function () {
|
||||
let clock
|
||||
let mockModels
|
||||
let realtime
|
||||
beforeEach(() => {
|
||||
removeLibModuleCache()
|
||||
mockModels = {
|
||||
Revision: {
|
||||
saveAllNotesRevision: sinon.stub()
|
||||
}
|
||||
}
|
||||
clock = sinon.useFakeTimers()
|
||||
mock('../../lib/processQueue', require('../testDoubles/ProcessQueueFake'))
|
||||
mock('../../lib/logger', {
|
||||
error: () => {},
|
||||
info: () => {}
|
||||
})
|
||||
mock('../../lib/history', {})
|
||||
mock('../../lib/models', mockModels)
|
||||
mock('../../lib/config', {
|
||||
debug: true
|
||||
})
|
||||
mock('../../lib/realtimeUpdateDirtyNoteJob', require('../testDoubles/realtimeJobStub'))
|
||||
mock('../../lib/realtimeCleanDanglingUserJob', require('../testDoubles/realtimeJobStub'))
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
clock.restore()
|
||||
removeModuleFromRequireCache('../../lib/realtime')
|
||||
removeModuleFromRequireCache('../../lib/realtimeSaveRevisionJob')
|
||||
mock.stopAll()
|
||||
sinon.restore()
|
||||
})
|
||||
|
||||
it('should execute save revision job every 5 min', (done) => {
|
||||
mockModels.Revision.saveAllNotesRevision.callsFake((callback) => {
|
||||
callback(null, [])
|
||||
})
|
||||
realtime = require('../../lib/realtime')
|
||||
clock.tick(5 * 60 * 1000)
|
||||
clock.restore()
|
||||
setTimeout(() => {
|
||||
assert(mockModels.Revision.saveAllNotesRevision.called)
|
||||
assert(realtime.saveRevisionJob.getSaverSleep() === true)
|
||||
done()
|
||||
}, 50)
|
||||
})
|
||||
|
||||
it('should not set saverSleep when more than 1 note save revision', (done) => {
|
||||
mockModels.Revision.saveAllNotesRevision.callsFake((callback) => {
|
||||
callback(null, [1])
|
||||
})
|
||||
realtime = require('../../lib/realtime')
|
||||
clock.tick(5 * 60 * 1000)
|
||||
clock.restore()
|
||||
setTimeout(() => {
|
||||
assert(mockModels.Revision.saveAllNotesRevision.called)
|
||||
assert(realtime.saveRevisionJob.getSaverSleep() === false)
|
||||
done()
|
||||
}, 50)
|
||||
})
|
||||
})
|
|
@ -1,6 +1,7 @@
|
|||
'use strict'
|
||||
|
||||
const sinon = require('sinon')
|
||||
const path = require('path')
|
||||
|
||||
function makeMockSocket (headers, query) {
|
||||
const broadCastChannelCache = {}
|
||||
|
@ -32,6 +33,15 @@ function makeMockSocket (headers, query) {
|
|||
function removeModuleFromRequireCache (modulePath) {
|
||||
delete require.cache[require.resolve(modulePath)]
|
||||
}
|
||||
function removeLibModuleCache () {
|
||||
const libPath = path.resolve(path.join(__dirname, '../../lib'))
|
||||
Object.keys(require.cache).forEach(key => {
|
||||
if (key.startsWith(libPath)) {
|
||||
delete require.cache[require.resolve(key)]
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
exports.makeMockSocket = makeMockSocket
|
||||
exports.removeModuleFromRequireCache = removeModuleFromRequireCache
|
||||
exports.removeLibModuleCache = removeLibModuleCache
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
'use strict'
|
||||
|
||||
class realtimeJobStub {
|
||||
start() {
|
||||
}
|
||||
|
||||
stop() {
|
||||
}
|
||||
}
|
||||
|
||||
exports.realtimeJobStub = realtimeJobStub
|
||||
exports.UpdateDirtyNoteJob = realtimeJobStub
|
||||
exports.CleanDanglingUserJob = realtimeJobStub
|
||||
exports.SaveRevisionJob = realtimeJobStub
|
|
@ -1,12 +0,0 @@
|
|||
'use strict'
|
||||
|
||||
class UpdateDirtyNoteJobStub {
|
||||
start() {
|
||||
}
|
||||
|
||||
stop() {
|
||||
}
|
||||
}
|
||||
|
||||
exports.UpdateDirtyNoteJobStub = UpdateDirtyNoteJobStub
|
||||
exports.UpdateDirtyNoteJob = UpdateDirtyNoteJobStub
|
Loading…
Reference in New Issue