ts: lib/realtime/realtime.ts

Signed-off-by: Raccoon <raccoon@hackmd.io>
This commit is contained in:
Raccoon 2021-06-12 06:46:32 +08:00
parent 16618e1e61
commit 7cea9a29a9
No known key found for this signature in database
GPG Key ID: 06770355DC9ECD38

View File

@ -1,62 +1,53 @@
'use strict'
// realtime
// external modules
const cookie = require('cookie')
const cookieParser = require('cookie-parser')
const url = require('url')
const randomcolor = require('randomcolor')
const Chance = require('chance')
const chance = new Chance()
const moment = require('moment')
const get = require('lodash/get')
import * as cookie from "cookie";
import * as cookieParser from "cookie-parser";
import * as url from "url";
import * as randomcolor from "randomcolor";
import * as Chance from "chance";
import * as moment from "moment";
import {get} from "lodash";
// core
const config = require('../config')
const logger = require('../logger')
const history = require('../history')
const models = require('../models')
import * as config from "../config";
import * as logger from "../logger";
import * as history from "../history";
import * as models from "../models";
// ot
const ot = require('../ot')
import * as ot from "../ot";
const { ProcessQueue } = require('./processQueue')
const { RealtimeClientConnection } = require('./realtimeClientConnection')
const { UpdateDirtyNoteJob } = require('./realtimeUpdateDirtyNoteJob')
const { CleanDanglingUserJob } = require('./realtimeCleanDanglingUserJob')
const { SaveRevisionJob } = require('./realtimeSaveRevisionJob')
import {ProcessQueue} from "./processQueue";
import {RealtimeClientConnection} from "./realtimeClientConnection";
import {UpdateDirtyNoteJob} from "./realtimeUpdateDirtyNoteJob";
import {CleanDanglingUserJob} from "./realtimeCleanDanglingUserJob";
import {SaveRevisionJob} from "./realtimeSaveRevisionJob";
const chance = new Chance()
export let io = null
export let maintenance = true
// public
const realtime = {
io: null,
onAuthorizeSuccess: onAuthorizeSuccess,
onAuthorizeFail: onAuthorizeFail,
secure: secure,
connection: connection,
getStatus: getStatus,
isReady: isReady,
maintenance: true
}
const connectProcessQueue = new ProcessQueue({})
const disconnectProcessQueue = new ProcessQueue({})
const updateDirtyNoteJob = new UpdateDirtyNoteJob(realtime)
const cleanDanglingUserJob = new CleanDanglingUserJob(realtime)
const saveRevisionJob = new SaveRevisionJob(realtime)
export const disconnectProcessQueue = new ProcessQueue({})
const updateDirtyNoteJob = new UpdateDirtyNoteJob(exports)
const cleanDanglingUserJob = new CleanDanglingUserJob(exports)
export const saveRevisionJob = new SaveRevisionJob(exports)
// TODO: test it
function onAuthorizeSuccess (data, accept) {
export function onAuthorizeSuccess(data, accept) {
accept()
}
// TODO: test it
function onAuthorizeFail (data, message, error, accept) {
export function onAuthorizeFail(data, message, error, accept) {
accept() // accept whether authorize or not to allow anonymous usage
}
// TODO: test it
// secure the origin by the cookie
function secure (socket, next) {
export function secure(socket, next) {
try {
var handshakeData = socket.request
if (handshakeData.headers.cookie) {
@ -82,7 +73,7 @@ function secure (socket, next) {
// TODO: only use in `updateDirtyNote`
// TODO: test it
function emitCheck (note) {
export function emitCheck(note) {
var out = {
title: note.title,
updatetime: note.updatetime,
@ -91,50 +82,50 @@ function emitCheck (note) {
authors: note.authors,
authorship: note.authorship
}
realtime.io.to(note.id).emit('check', out)
io.to(note.id).emit('check', out)
}
// actions
var users = {}
var notes = {}
export var users = {}
export var notes = {}
function getNotePool () {
export function getNotePool() {
return notes
}
function isNoteExistsInPool (noteId) {
export function isNoteExistsInPool(noteId) {
return !!notes[noteId]
}
function addNote (note) {
export function addNote(note) {
if (exports.isNoteExistsInPool(note.id)) return false
notes[note.id] = note
return true
}
function getNotePoolSize () {
export function getNotePoolSize() {
return Object.keys(notes).length
}
function deleteNoteFromPool (noteId) {
export function deleteNoteFromPool(noteId) {
delete notes[noteId]
}
function deleteAllNoteFromPool () {
export function deleteAllNoteFromPool() {
Object.keys(notes).forEach(noteId => {
delete notes[noteId]
})
}
function getNoteFromNotePool (noteId) {
export function getNoteFromNotePool(noteId) {
return notes[noteId]
}
function getUserPool () {
export function getUserPool() {
return users
}
function getUserFromUserPool (userId) {
export function getUserFromUserPool(userId) {
return users[userId]
}
@ -143,7 +134,7 @@ updateDirtyNoteJob.start()
cleanDanglingUserJob.start()
saveRevisionJob.start()
function disconnectSocketOnNote (note) {
export function disconnectSocketOnNote(note) {
note.socks.forEach((sock) => {
if (sock) {
sock.emit('delete')
@ -154,7 +145,7 @@ function disconnectSocketOnNote (note) {
})
}
function updateNote (note, callback) {
export function updateNote(note, callback) {
_updateNoteAsync(note).then(_note => {
callback(null, _note)
}).catch((err) => {
@ -163,7 +154,7 @@ function updateNote (note, callback) {
})
}
function findNoteByIdAsync (id) {
function findNoteByIdAsync(id) {
return models.Note.findOne({
where: {
id: id
@ -171,7 +162,7 @@ function findNoteByIdAsync (id) {
})
}
function updateHistoryForEveryUserCollaborateNote (note) {
function updateHistoryForEveryUserCollaborateNote(note) {
// update history to every user in this note
const tempUsers = Object.assign({}, note.tempUsers)
note.tempUsers = {}
@ -181,7 +172,7 @@ function updateHistoryForEveryUserCollaborateNote (note) {
})
}
async function getUserProfileByIdAsync (id) {
async function getUserProfileByIdAsync(id) {
const user = await models.User.findOne({
where: {
id: id
@ -192,14 +183,14 @@ async function getUserProfileByIdAsync (id) {
}
class UserNotFoundException extends Error {
constructor () {
constructor() {
super('user not found')
this.name = this.constructor.name
Error.captureStackTrace(this, this.constructor)
}
}
async function getLastChangeUserProfileAsync (currentLastChangeUserId, lastChangeUserIdInDatabase, lastChangeUserProfileInDatabase) {
async function getLastChangeUserProfileAsync(currentLastChangeUserId, lastChangeUserIdInDatabase, lastChangeUserProfileInDatabase) {
if (!currentLastChangeUserId) return null
if (currentLastChangeUserId === lastChangeUserIdInDatabase) return lastChangeUserProfileInDatabase
const profile = await getUserProfileByIdAsync(currentLastChangeUserId)
@ -209,7 +200,7 @@ async function getLastChangeUserProfileAsync (currentLastChangeUserId, lastChang
return profile
}
function buildNoteUpdateData (note) {
function buildNoteUpdateData(note) {
const body = note.server.document
const title = note.title = models.Note.parseNoteTitle(body)
return {
@ -221,7 +212,7 @@ function buildNoteUpdateData (note) {
}
}
async function _updateNoteAsync (note) {
async function _updateNoteAsync(note) {
let noteModel = await findNoteByIdAsync(note.id)
if (!noteModel) return null
@ -247,7 +238,7 @@ async function _updateNoteAsync (note) {
}
// TODO: test it
function getStatus () {
export function getStatus() {
return models.Note.count()
.then(function (notecount) {
var distinctaddresses = []
@ -308,14 +299,14 @@ function getStatus () {
}
// TODO: test it
function isReady () {
return realtime.io &&
export function isReady() {
return io &&
Object.keys(notes).length === 0 && Object.keys(users).length === 0 &&
connectProcessQueue.queue.length === 0 && !connectProcessQueue.lock &&
disconnectProcessQueue.queue.length === 0 && !disconnectProcessQueue.lock
}
function parseUrl (data) {
function parseUrl(data) {
try {
if (url.URL) {
return new url.URL(data)
@ -329,8 +320,8 @@ function parseUrl (data) {
return null
}
function extractNoteIdFromSocket (socket) {
function extractNoteIdFromReferer (referer) {
export function extractNoteIdFromSocket(socket) {
function extractNoteIdFromReferer(referer) {
if (referer) {
const hostUrl = parseUrl(referer)
if (!hostUrl) {
@ -362,7 +353,7 @@ function extractNoteIdFromSocket (socket) {
return false
}
async function parseNoteIdFromSocketAsync (socket) {
export async function parseNoteIdFromSocketAsync(socket) {
const noteId = extractNoteIdFromSocket(socket)
if (!noteId) {
return null
@ -382,7 +373,7 @@ async function parseNoteIdFromSocketAsync (socket) {
}
// TODO: test it
function emitOnlineUsers (socket) {
export function emitOnlineUsers(socket) {
var noteId = socket.noteId
if (!noteId || !notes[noteId]) return
var users = []
@ -395,11 +386,11 @@ function emitOnlineUsers (socket) {
var out = {
users: users
}
realtime.io.to(noteId).emit('online users', out)
io.to(noteId).emit('online users', out)
}
// TODO: test it
function emitUserStatus (socket) {
export function emitUserStatus(socket) {
var noteId = socket.noteId
var user = users[socket.id]
if (!noteId || !notes[noteId] || !user) return
@ -408,7 +399,7 @@ function emitUserStatus (socket) {
}
// TODO: test it
function emitRefresh (socket) {
export function emitRefresh(socket) {
var noteId = socket.noteId
if (!noteId || !notes[noteId]) return
var note = notes[noteId]
@ -428,7 +419,7 @@ function emitRefresh (socket) {
socket.emit('refresh', out)
}
function checkViewPermission (req, note) {
export function checkViewPermission(req, note) {
if (note.permission === 'private') {
if (req.user && req.user.logged_in && req.user.id === note.owner) {
return true
@ -447,7 +438,7 @@ function checkViewPermission (req, note) {
}
// TODO: test it
async function fetchFullNoteAsync (noteId) {
async function fetchFullNoteAsync(noteId) {
return models.Note.findOne({
where: {
id: noteId
@ -469,7 +460,7 @@ async function fetchFullNoteAsync (noteId) {
})
}
function buildAuthorProfilesFromNote (noteAuthors) {
function buildAuthorProfilesFromNote(noteAuthors) {
const authors = {}
noteAuthors.forEach((author) => {
const profile = models.User.getProfile(author.user)
@ -485,7 +476,7 @@ function buildAuthorProfilesFromNote (noteAuthors) {
return authors
}
function makeNewServerNote (note) {
function makeNewServerNote(note) {
const authors = buildAuthorProfilesFromNote(note.authors)
return {
@ -509,7 +500,7 @@ function makeNewServerNote (note) {
}
// TODO: test it
function failConnection (code, err, socket) {
export function failConnection(code, err, socket) {
logger.error(err)
// emit error info
socket.emit('info', {
@ -518,7 +509,7 @@ function failConnection (code, err, socket) {
return socket.disconnect(true)
}
function queueForDisconnect (socket) {
export function queueForDisconnect(socket) {
disconnectProcessQueue.push(socket.id, async function () {
if (users[socket.id]) {
delete users[socket.id]
@ -559,7 +550,7 @@ function queueForDisconnect (socket) {
})
}
function buildUserOutData (user) {
export function buildUserOutData(user) {
var out = {
id: user.id,
login: user.login,
@ -575,7 +566,7 @@ function buildUserOutData (user) {
}
// TODO: test it
function updateUserData (socket, user) {
export function updateUserData(socket, user) {
// retrieve user data from passport
if (socket.request.user && socket.request.user.logged_in) {
var profile = models.User.getProfile(socket.request.user)
@ -590,7 +581,7 @@ function updateUserData (socket, user) {
}
}
function canEditNote (notePermission, noteOwnerId, currentUserId) {
function canEditNote(notePermission, noteOwnerId, currentUserId) {
switch (notePermission) {
case 'freely':
return true
@ -606,7 +597,7 @@ function canEditNote (notePermission, noteOwnerId, currentUserId) {
}
}
function ifMayEdit (socket, callback) {
export function ifMayEdit(socket, callback) {
const note = getNoteFromNotePool(socket.noteId)
if (!note) return
const mayEdit = canEditNote(note.permission, note.owner, socket.request.user.id)
@ -623,7 +614,7 @@ function ifMayEdit (socket, callback) {
}
// TODO: test it
function operationCallback (socket, operation) {
function operationCallback(socket, operation) {
var noteId = socket.noteId
if (!noteId || !notes[noteId]) return
var note = notes[noteId]
@ -666,12 +657,12 @@ function operationCallback (socket, operation) {
}
// TODO: test it
function updateHistory (userId, note, time) {
export function updateHistory(userId, note, time) {
var noteId = note.alias ? note.alias : models.Note.encodeNoteId(note.id)
if (note.server) history.updateHistory(userId, noteId, note.server.document, time)
}
function getUniqueColorPerNote (noteId, maxAttempt = 10) {
function getUniqueColorPerNote(noteId, maxAttempt = 10) {
// random color
let color = randomcolor()
if (!notes[noteId]) return color
@ -693,7 +684,7 @@ function getUniqueColorPerNote (noteId, maxAttempt = 10) {
return color
}
function queueForConnect (socket) {
function queueForConnect(socket) {
connectProcessQueue.push(socket.id, async function () {
try {
const noteId = await exports.parseNoteIdFromSocketAsync(socket)
@ -792,47 +783,14 @@ function queueForConnect (socket) {
})
}
function connection (socket) {
if (realtime.maintenance) return
export function connection(socket) {
if (maintenance) return
queueForConnect(socket)
}
// TODO: test it
function terminate () {
export function terminate() {
disconnectProcessQueue.stop()
connectProcessQueue.stop()
updateDirtyNoteJob.stop()
}
exports = module.exports = realtime
exports.extractNoteIdFromSocket = extractNoteIdFromSocket
exports.updateNote = updateNote
exports.failConnection = failConnection
exports.updateUserData = updateUserData
exports.emitRefresh = emitRefresh
exports.emitUserStatus = emitUserStatus
exports.emitOnlineUsers = emitOnlineUsers
exports.checkViewPermission = checkViewPermission
exports.getUserFromUserPool = getUserFromUserPool
exports.buildUserOutData = buildUserOutData
exports.emitCheck = emitCheck
exports.disconnectSocketOnNote = disconnectSocketOnNote
exports.queueForDisconnect = queueForDisconnect
exports.terminate = terminate
exports.updateHistory = updateHistory
exports.ifMayEdit = ifMayEdit
exports.parseNoteIdFromSocketAsync = parseNoteIdFromSocketAsync
exports.disconnectProcessQueue = disconnectProcessQueue
exports.users = users
exports.getUserPool = getUserPool
exports.notes = notes
exports.getNotePool = getNotePool
exports.getNotePoolSize = getNotePoolSize
exports.isNoteExistsInPool = isNoteExistsInPool
exports.addNote = addNote
exports.getNoteFromNotePool = getNoteFromNotePool
exports.deleteNoteFromPool = deleteNoteFromPool
exports.deleteAllNoteFromPool = deleteAllNoteFromPool
exports.saveRevisionJob = saveRevisionJob