refactor(realtime): extract "SocketClient" to separate class "RealtimeClientConnection"

Signed-off-by: BoHong Li <a60814billy@gmail.com>
This commit is contained in:
BoHong Li 2019-05-06 17:49:08 +08:00
parent f317171aa0
commit 702fc48fa8
No known key found for this signature in database
GPG Key ID: 9696D5590D58290F
4 changed files with 260 additions and 239 deletions

View File

@ -21,6 +21,8 @@ var models = require('./models')
// ot // ot
var ot = require('./ot') var ot = require('./ot')
const {RealtimeClientConnection} = require('./realtimeClientConnection')
// public // public
var realtime = { var realtime = {
io: null, io: null,
@ -780,239 +782,6 @@ function getNoteFromNotePool (noteId) {
return notes[noteId] return notes[noteId]
} }
class SocketClient {
constructor (socket) {
this.socket = socket
}
registerEventHandler () {
// received client refresh request
this.socket.on('refresh', this.refreshEventHandler.bind(this))
// received user status
this.socket.on('user status', this.userStatusEventHandler.bind(this))
// when a new client disconnect
this.socket.on('disconnect', this.disconnectEventHandler.bind(this))
// received cursor focus
this.socket.on('cursor focus', this.cursorFocusEventHandler.bind(this))
// received cursor activity
this.socket.on('cursor activity', this.cursorActivityEventHandler.bind(this))
// received cursor blur
this.socket.on('cursor blur', this.cursorBlurEventHandlder.bind(this))
// check version
this.socket.on('version', this.checkVersionEventHandler.bind(this))
// received sync of online users request
this.socket.on('online users', this.onlineUsersEventHandler.bind(this))
// reveiced when user logout or changed
this.socket.on('user changed', this.userChangedEventHandler.bind(this))
// delete a note
this.socket.on('delete', this.deleteNoteEventHandler.bind(this))
// received note permission change request
this.socket.on('permission', this.permissionChangeEventHandler.bind(this))
}
isUserLoggedIn () {
return this.socket.request.user && this.socket.request.user.logged_in
}
isNoteAndUserExists () {
const note = getNoteFromNotePool(this.socket.noteId)
const user = getUserFromUserPool(this.socket.id)
return note && user
}
isNoteOwner () {
const note = this.getCurrentNote()
return get(note, 'owner') === this.getCurrentLoggedInUserId()
}
isAnonymousEnable () {
//TODO: move this method to config module
return config.allowAnonymous || config.allowAnonymousEdits
}
disconnectSocketOnNote (note) {
note.socks.forEach((sock) => {
if (sock) {
sock.emit('delete')
setImmediate(() => {
sock.disconnect(true)
})
}
})
}
getCurrentUser () {
if (!this.socket.id) return
return getUserFromUserPool(this.socket.id)
}
getCurrentLoggedInUserId () {
return get(this.socket, 'request.user.id')
}
getCurrentNote () {
if (!this.socket.noteId) return
return getNoteFromNotePool(this.socket.noteId)
}
getNoteChannel () {
return this.socket.broadcast.to(this.socket.noteId)
}
async destroyNote (id) {
return models.Note.destroy({
where: { id: id }
})
}
async changeNotePermission (newPermission) {
const changedRows = await models.Note.update({
permission: newPermission
}, {
where: {
id: this.getCurrentNote().id
}
})
if (changedRows !== 1) {
throw new Error(`update database failed, cannot set permission ${newPermission} to note ${this.getCurrentNote().id}`)
}
}
notifyPermissionChanged () {
realtime.io.to(this.getCurrentNote().id).emit('permission', {
permission: this.getCurrentNote().permission
})
this.getCurrentNote().socks.forEach((sock) => {
if (sock) {
if (!exports.checkViewPermission(sock.request, this.getCurrentNote())) {
sock.emit('info', {
code: 403
})
setTimeout(function () {
sock.disconnect(true)
}, 0)
}
}
})
}
refreshEventHandler () {
exports.emitRefresh(this.socket)
}
checkVersionEventHandler () {
this.socket.emit('version', {
version: config.fullversion,
minimumCompatibleVersion: config.minimumCompatibleVersion
})
}
userStatusEventHandler (data) {
if (!this.isNoteAndUserExists()) return
const user = this.getCurrentUser()
if (config.debug) {
logger.info('SERVER received [' + this.socket.noteId + '] user status from [' + this.socket.id + ']: ' + JSON.stringify(data))
}
if (data) {
user.idle = data.idle
user.type = data.type
}
exports.emitUserStatus(this.socket)
}
userChangedEventHandler () {
logger.info('user changed')
const note = this.getCurrentNote()
if (!note) return
const user = note.users[this.socket.id]
if (!user) return
exports.updateUserData(this.socket, user)
exports.emitOnlineUsers(this.socket)
}
onlineUsersEventHandler () {
if (!this.isNoteAndUserExists()) return
const currentNote = this.getCurrentNote()
const currentNoteOnlineUserList = Object.keys(currentNote.users)
.map(key => buildUserOutData(currentNote.users[key]))
this.socket.emit('online users', {
users: currentNoteOnlineUserList
})
}
cursorFocusEventHandler (data) {
if (!this.isNoteAndUserExists()) return
const user = this.getCurrentUser()
user.cursor = data
const out = buildUserOutData(user)
this.getNoteChannel().emit('cursor focus', out)
}
cursorActivityEventHandler (data) {
if (!this.isNoteAndUserExists()) return
const user = this.getCurrentUser()
user.cursor = data
const out = buildUserOutData(user)
this.getNoteChannel().emit('cursor activity', out)
}
cursorBlurEventHandlder () {
if (!this.isNoteAndUserExists()) return
const user = this.getCurrentUser()
user.cursor = null
this.getNoteChannel().emit('cursor blur', {
id: this.socket.id
})
}
deleteNoteEventHandler () {
// need login to do more actions
if (this.isUserLoggedIn() && this.isNoteAndUserExists()) {
const note = this.getCurrentNote()
// Only owner can delete note
if (note.owner && note.owner === this.getCurrentLoggedInUserId()) {
this.destroyNote(note.id)
.then((successRows) => {
if (!successRows) return
this.disconnectSocketOnNote(note)
})
.catch(function (err) {
return logger.error('delete note failed: ' + err)
})
}
}
}
permissionChangeEventHandler (permission) {
if (!this.isUserLoggedIn()) return
if (!this.isNoteAndUserExists()) return
const note = this.getCurrentNote()
// Only owner can change permission
if (!this.isNoteOwner()) return
if (!this.isAnonymousEnable() && permission === 'freely') return
this.changeNotePermission(permission)
.then(() => {
console.log('---')
note.permission = permission
this.notifyPermissionChanged()
})
.catch(err => logger.error('update note permission failed: ' + err))
}
disconnectEventHandler () {
if (isDuplicatedInSocketQueue(disconnectSocketQueue, this.socket)) return
disconnectSocketQueue.push(this.socket)
exports.disconnect(this.socket)
}
}
function connection (socket) { function connection (socket) {
if (realtime.maintenance) return if (realtime.maintenance) return
exports.parseNoteIdFromSocket(socket, function (err, noteId) { exports.parseNoteIdFromSocket(socket, function (err, noteId) {
@ -1068,7 +837,7 @@ function connection (socket) {
exports.startConnection(socket) exports.startConnection(socket)
}) })
const socketClient = new SocketClient(socket) const socketClient = new RealtimeClientConnection(socket)
socketClient.registerEventHandler() socketClient.registerEventHandler()
} }
@ -1086,6 +855,9 @@ exports.emitUserStatus = emitUserStatus
exports.disconnect = disconnect exports.disconnect = disconnect
exports.emitOnlineUsers = emitOnlineUsers exports.emitOnlineUsers = emitOnlineUsers
exports.checkViewPermission = checkViewPermission exports.checkViewPermission = checkViewPermission
exports.getNoteFromNotePool = getNoteFromNotePool
exports.getUserFromUserPool = getUserFromUserPool
exports.buildUserOutData = buildUserOutData
exports.notes = notes exports.notes = notes
exports.users = users exports.users = users
exports.disconnectSocketQueue = disconnectSocketQueue exports.disconnectSocketQueue = disconnectSocketQueue

View File

@ -0,0 +1,242 @@
'use strict'
const get = require('lodash/get')
const config = require('./config')
const models = require('./models')
const logger = require('./logger')
class RealtimeClientConnection {
constructor (socket) {
this.socket = socket
this.realtime = require('./realtime')
}
registerEventHandler () {
// received client refresh request
this.socket.on('refresh', this.refreshEventHandler.bind(this))
// received user status
this.socket.on('user status', this.userStatusEventHandler.bind(this))
// when a new client disconnect
this.socket.on('disconnect', this.disconnectEventHandler.bind(this))
// received cursor focus
this.socket.on('cursor focus', this.cursorFocusEventHandler.bind(this))
// received cursor activity
this.socket.on('cursor activity', this.cursorActivityEventHandler.bind(this))
// received cursor blur
this.socket.on('cursor blur', this.cursorBlurEventHandler.bind(this))
// check version
this.socket.on('version', this.checkVersionEventHandler.bind(this))
// received sync of online users request
this.socket.on('online users', this.onlineUsersEventHandler.bind(this))
// reveiced when user logout or changed
this.socket.on('user changed', this.userChangedEventHandler.bind(this))
// delete a note
this.socket.on('delete', this.deleteNoteEventHandler.bind(this))
// received note permission change request
this.socket.on('permission', this.permissionChangeEventHandler.bind(this))
}
isUserLoggedIn () {
return this.socket.request.user && this.socket.request.user.logged_in
}
isNoteAndUserExists () {
const note = this.realtime.getNoteFromNotePool(this.socket.noteId)
const user = this.realtime.getUserFromUserPool(this.socket.id)
return note && user
}
isNoteOwner () {
const note = this.getCurrentNote()
return get(note, 'owner') === this.getCurrentLoggedInUserId()
}
isAnonymousEnable () {
// TODO: move this method to config module
return config.allowAnonymous || config.allowAnonymousEdits
}
disconnectSocketOnNote (note) {
note.socks.forEach((sock) => {
if (sock) {
sock.emit('delete')
setImmediate(() => {
sock.disconnect(true)
})
}
})
}
getCurrentUser () {
if (!this.socket.id) return
return this.realtime.getUserFromUserPool(this.socket.id)
}
getCurrentLoggedInUserId () {
return get(this.socket, 'request.user.id')
}
getCurrentNote () {
if (!this.socket.noteId) return
return this.realtime.getNoteFromNotePool(this.socket.noteId)
}
getNoteChannel () {
return this.socket.broadcast.to(this.socket.noteId)
}
async destroyNote (id) {
return models.Note.destroy({
where: { id: id }
})
}
async changeNotePermission (newPermission) {
const changedRows = await models.Note.update({
permission: newPermission
}, {
where: {
id: this.getCurrentNote().id
}
})
if (changedRows !== 1) {
throw new Error(`updated permission failed, cannot set permission ${newPermission} to note ${this.getCurrentNote().id}`)
}
}
notifyPermissionChanged () {
this.realtime.io.to(this.getCurrentNote().id).emit('permission', {
permission: this.getCurrentNote().permission
})
this.getCurrentNote().socks.forEach((sock) => {
if (sock) {
if (!this.realtime.checkViewPermission(sock.request, this.getCurrentNote())) {
sock.emit('info', {
code: 403
})
setTimeout(function () {
sock.disconnect(true)
}, 0)
}
}
})
}
refreshEventHandler () {
this.realtime.emitRefresh(this.socket)
}
checkVersionEventHandler () {
this.socket.emit('version', {
version: config.fullversion,
minimumCompatibleVersion: config.minimumCompatibleVersion
})
}
userStatusEventHandler (data) {
if (!this.isNoteAndUserExists()) return
const user = this.getCurrentUser()
if (config.debug) {
logger.info('SERVER received [' + this.socket.noteId + '] user status from [' + this.socket.id + ']: ' + JSON.stringify(data))
}
if (data) {
user.idle = data.idle
user.type = data.type
}
this.realtime.emitUserStatus(this.socket)
}
userChangedEventHandler () {
logger.info('user changed')
const note = this.getCurrentNote()
if (!note) return
const user = note.users[this.socket.id]
if (!user) return
this.realtime.updateUserData(this.socket, user)
this.realtime.emitOnlineUsers(this.socket)
}
onlineUsersEventHandler () {
if (!this.isNoteAndUserExists()) return
const currentNote = this.getCurrentNote()
const currentNoteOnlineUserList = Object.keys(currentNote.users)
.map(key => this.realtime.buildUserOutData(currentNote.users[key]))
this.socket.emit('online users', {
users: currentNoteOnlineUserList
})
}
cursorFocusEventHandler (data) {
if (!this.isNoteAndUserExists()) return
const user = this.getCurrentUser()
user.cursor = data
const out = this.realtime.buildUserOutData(user)
this.getNoteChannel().emit('cursor focus', out)
}
cursorActivityEventHandler (data) {
if (!this.isNoteAndUserExists()) return
const user = this.getCurrentUser()
user.cursor = data
const out = this.realtime.buildUserOutData(user)
this.getNoteChannel().emit('cursor activity', out)
}
cursorBlurEventHandler () {
if (!this.isNoteAndUserExists()) return
const user = this.getCurrentUser()
user.cursor = null
this.getNoteChannel().emit('cursor blur', {
id: this.socket.id
})
}
deleteNoteEventHandler () {
// need login to do more actions
if (this.isUserLoggedIn() && this.isNoteAndUserExists()) {
const note = this.getCurrentNote()
// Only owner can delete note
if (note.owner && note.owner === this.getCurrentLoggedInUserId()) {
this.destroyNote(note.id)
.then((successRows) => {
if (!successRows) return
this.disconnectSocketOnNote(note)
})
.catch(function (err) {
return logger.error('delete note failed: ' + err)
})
}
}
}
permissionChangeEventHandler (permission) {
if (!this.isUserLoggedIn()) return
if (!this.isNoteAndUserExists()) return
const note = this.getCurrentNote()
// Only owner can change permission
if (!this.isNoteOwner()) return
if (!this.isAnonymousEnable() && permission === 'freely') return
this.changeNotePermission(permission)
.then(() => {
note.permission = permission
this.notifyPermissionChanged()
})
.catch(err => logger.error('update note permission failed: ' + err))
}
disconnectEventHandler () {
if (this.realtime.isDuplicatedInSocketQueue(this.realtime.disconnectSocketQueue, this.socket)) return
this.realtime.disconnectSocketQueue.push(this.socket)
this.realtime.disconnect(this.socket)
}
}
exports.RealtimeClientConnection = RealtimeClientConnection

View File

@ -8,6 +8,7 @@ const ConnectionQueuing = require('../lib/connectionQueue').ConnectionQueue
describe('ConnectionQueue', function () { describe('ConnectionQueue', function () {
let clock let clock
const waitTimeForCheckResult = 50
beforeEach(() => { beforeEach(() => {
clock = sinon.useFakeTimers({ clock = sinon.useFakeTimers({
@ -50,12 +51,12 @@ describe('ConnectionQueue', function () {
setTimeout(() => { setTimeout(() => {
clock.tick(5) clock.tick(5)
}, 3) }, 3)
queue.stop()
setTimeout(() => { setTimeout(() => {
queue.stop()
assert(runningClock.length === 2) assert(runningClock.length === 2)
done() done()
}, 10) }, waitTimeForCheckResult)
}) })
it('should not crash when repeat stop queue', () => { it('should not crash when repeat stop queue', () => {
@ -78,7 +79,7 @@ describe('ConnectionQueue', function () {
setTimeout(() => { setTimeout(() => {
assert(processSpy.called) assert(processSpy.called)
done() done()
}, 10) }, waitTimeForCheckResult)
}) })
it('should run process although error occurred', (done) => { it('should run process although error occurred', (done) => {
@ -100,7 +101,7 @@ describe('ConnectionQueue', function () {
assert(failedTask.called) assert(failedTask.called)
assert(normalTask.called) assert(normalTask.called)
done() done()
}, 10) }, waitTimeForCheckResult)
}) })
it('should ignore trigger when event not complete', (done) => { it('should ignore trigger when event not complete', (done) => {
@ -125,6 +126,6 @@ describe('ConnectionQueue', function () {
setTimeout(() => { setTimeout(() => {
assert(processSpy.calledOnce) assert(processSpy.calledOnce)
done() done()
}, 10) }, waitTimeForCheckResult)
}) })
}) })

View File

@ -14,8 +14,12 @@ describe('realtime#socket event', function () {
let modelsMock let modelsMock
let eventFuncMap let eventFuncMap
let configMock let configMock
let clock
beforeEach(function () { beforeEach(function () {
clock = sinon.useFakeTimers({
toFake: ['setInterval']
})
eventFuncMap = new Map() eventFuncMap = new Map()
modelsMock = { modelsMock = {
Note: { Note: {
@ -62,8 +66,10 @@ describe('realtime#socket event', function () {
afterEach(function () { afterEach(function () {
removeModuleFromRequireCache('../../lib/realtime') removeModuleFromRequireCache('../../lib/realtime')
removeModuleFromRequireCache('../../lib/realtimeClientConnection')
mock.stopAll() mock.stopAll()
sinon.restore() sinon.restore()
clock.restore()
}) })
describe('refresh', function () { describe('refresh', function () {