Add support of saving note revision and improve app start and stop procedure to ensure data integrity

This commit is contained in:
Cheng-Han, Wu 2016-06-17 16:09:33 +08:00
parent 56b4739e6d
commit dbc126b156
7 changed files with 366 additions and 24 deletions

31
app.js
View File

@ -484,16 +484,26 @@ function startListen() {
if (config.usessl) { if (config.usessl) {
server.listen(config.port, function () { server.listen(config.port, function () {
logger.info('HTTPS Server listening at port %d', config.port); logger.info('HTTPS Server listening at port %d', config.port);
config.maintenance = false;
}); });
} else { } else {
server.listen(config.port, function () { server.listen(config.port, function () {
logger.info('HTTP Server listening at port %d', config.port); logger.info('HTTP Server listening at port %d', config.port);
config.maintenance = false;
}); });
} }
} }
// sync db then start listen // sync db then start listen
models.sequelize.sync().then(startListen); models.sequelize.sync().then(function () {
// check if realtime is ready
if (realtime.isReady()) {
models.Revision.checkAllNotesRevision(function (err, notes) {
if (err) return new Error(err);
if (notes.length <= 0) return startListen();
});
}
});
// log uncaught exception // log uncaught exception
process.on('uncaughtException', function (err) { process.on('uncaughtException', function (err) {
@ -510,21 +520,18 @@ process.on('SIGINT', function () {
Object.keys(io.sockets.sockets).forEach(function (key) { Object.keys(io.sockets.sockets).forEach(function (key) {
var socket = io.sockets.sockets[key]; var socket = io.sockets.sockets[key];
// notify client server going into maintenance status // notify client server going into maintenance status
socket.emit('maintenance', config.version); socket.emit('maintenance');
socket.disconnect(true); socket.disconnect(true);
}); });
var checkCleanTimer = setInterval(function () { var checkCleanTimer = setInterval(function () {
var usersCount = Object.keys(realtime.users).length; if (realtime.isReady()) {
var notesCount = Object.keys(realtime.notes).length; models.Revision.checkAllNotesRevision(function (err, notes) {
// check if all users and notes array are empty if (err) return new Error(err);
if (usersCount == 0 && notesCount == 0) { if (notes.length <= 0) {
// close db connection
models.sequelize.close();
clearInterval(checkCleanTimer); clearInterval(checkCleanTimer);
// wait for a while before exit return process.exit(0);
setTimeout(function () { }
process.exit(0); });
}, 100);
} }
}, 100); }, 100);
}); });

View File

@ -78,7 +78,7 @@ function getserverurl() {
} }
var version = '0.4.2'; var version = '0.4.2';
var maintenance = config.maintenance || false; var maintenance = true;
var cwd = path.join(__dirname, '..'); var cwd = path.join(__dirname, '..');
module.exports = { module.exports = {

View File

@ -0,0 +1,24 @@
'use strict';
module.exports = {
up: function (queryInterface, Sequelize) {
queryInterface.addColumn('Notes', 'savedAt', Sequelize.DATE);
queryInterface.createTable('Revisions', {
id: Sequelize.UUID,
noteId: Sequelize.UUID,
patch: Sequelize.TEXT,
lastContent: Sequelize.TEXT,
content: Sequelize.TEXT,
length: Sequelize.INTEGER,
createdAt: Sequelize.DATE,
updatedAt: Sequelize.DATE
});
return;
},
down: function (queryInterface, Sequelize) {
queryInterface.dropTable('Revisions');
queryInterface.removeColumn('Notes', 'savedAt');
return;
}
};

View File

@ -52,6 +52,9 @@ module.exports = function (sequelize, DataTypes) {
}, },
lastchangeAt: { lastchangeAt: {
type: DataTypes.DATE type: DataTypes.DATE
},
savedAt: {
type: DataTypes.DATE
} }
}, { }, {
classMethods: { classMethods: {
@ -66,6 +69,10 @@ module.exports = function (sequelize, DataTypes) {
as: "lastchangeuser", as: "lastchangeuser",
constraints: false constraints: false
}); });
Note.hasMany(models.Revision, {
foreignKey: "noteId",
constraints: false
});
}, },
checkFileExist: function (filePath) { checkFileExist: function (filePath) {
try { try {
@ -100,11 +107,15 @@ module.exports = function (sequelize, DataTypes) {
var dbModifiedTime = moment(note.lastchangeAt || note.createdAt); var dbModifiedTime = moment(note.lastchangeAt || note.createdAt);
if (fsModifiedTime.isAfter(dbModifiedTime)) { if (fsModifiedTime.isAfter(dbModifiedTime)) {
var body = fs.readFileSync(filePath, 'utf8'); var body = fs.readFileSync(filePath, 'utf8');
note.title = LZString.compressToBase64(Note.parseNoteTitle(body)); note.update({
note.content = LZString.compressToBase64(body); title: LZString.compressToBase64(Note.parseNoteTitle(body)),
note.lastchangeAt = fsModifiedTime; content: LZString.compressToBase64(body),
note.save().then(function (note) { lastchangeAt: fsModifiedTime
}).then(function (note) {
sequelize.models.Revision.saveNoteRevision(note, function (err, revision) {
if (err) return _callback(err, null);
return callback(null, note.id); return callback(null, note.id);
});
}).catch(function (err) { }).catch(function (err) {
return _callback(err, null); return _callback(err, null);
}); });
@ -224,6 +235,11 @@ module.exports = function (sequelize, DataTypes) {
} }
} }
return callback(null, note); return callback(null, note);
},
afterCreate: function (note, options, callback) {
sequelize.models.Revision.saveNoteRevision(note, function (err, revision) {
callback(err, note);
});
} }
} }
}); });

276
lib/models/revision.js Normal file
View File

@ -0,0 +1,276 @@
"use strict";
// external modules
var Sequelize = require("sequelize");
var LZString = require('lz-string');
var async = require('async');
var moment = require('moment');
var DiffMatchPatch = require('diff-match-patch');
var dmp = new DiffMatchPatch();
// core
var config = require("../config.js");
var logger = require("../logger.js");
module.exports = function (sequelize, DataTypes) {
var Revision = sequelize.define("Revision", {
id: {
type: DataTypes.UUID,
primaryKey: true,
defaultValue: Sequelize.UUIDV4
},
patch: {
type: DataTypes.TEXT
},
lastContent: {
type: DataTypes.TEXT
},
content: {
type: DataTypes.TEXT
},
length: {
type: DataTypes.INTEGER
}
}, {
classMethods: {
associate: function (models) {
Revision.belongsTo(models.User, {
foreignKey: "noteId",
as: "note",
constraints: false
});
},
createPatch: function (lastDoc, CurrDoc) {
var ms_start = (new Date()).getTime();
var diff = dmp.diff_main(lastDoc, CurrDoc);
dmp.diff_cleanupSemantic(diff);
var patch = dmp.patch_make(lastDoc, diff);
patch = dmp.patch_toText(patch);
var ms_end = (new Date()).getTime();
if (config.debug) {
logger.info(patch);
logger.info((ms_end - ms_start) + 'ms');
}
return patch;
},
getNoteRevisions: function (note, callback) {
Revision.findAll({
where: {
noteId: note.id
},
order: '"createdAt" DESC'
}).then(function (revisions) {
var data = [];
for (var i = 0, l = revisions.length; i < l; i++) {
var revision = revisions[i];
data.push({
time: moment(revision.createdAt).valueOf(),
length: revision.length
});
}
callback(null, data);
}).catch(function (err) {
callback(err, null);
});
},
getPatchedNoteRevisionByTime: function (note, time, callback) {
// find all revisions to prepare for all possible calculation
Revision.findAll({
where: {
noteId: note.id
},
order: '"createdAt" DESC'
}).then(function (revisions) {
if (revisions.length <= 0) return callback(null, null);
// measure target revision position
Revision.count({
where: {
noteId: note.id,
createdAt: {
$gte: time
}
},
order: '"createdAt" DESC'
}).then(function (count) {
if (count <= 0) return callback(null, null);
var ms_start = (new Date()).getTime();
var startContent = null;
var lastPatch = [];
var applyPatches = [];
if (count <= Math.round(revisions.length / 2)) {
// start from top to target
for (var i = 0; i < count; i++) {
var revision = revisions[i];
if (i == 0) {
startContent = LZString.decompressFromBase64(revision.content || revision.lastContent);
}
if (i != count - 1) {
var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch));
applyPatches = applyPatches.concat(patch);
}
lastPatch = revision.patch;
}
// swap DIFF_INSERT and DIFF_DELETE to achieve unpatching
for (var i = 0, l = applyPatches.length; i < l; i++) {
for (var j = 0, m = applyPatches[i].diffs.length; j < m; j++) {
var diff = applyPatches[i].diffs[j];
if (diff[0] == DiffMatchPatch.DIFF_INSERT)
diff[0] = DiffMatchPatch.DIFF_DELETE;
else if (diff[0] == DiffMatchPatch.DIFF_DELETE)
diff[0] = DiffMatchPatch.DIFF_INSERT;
}
}
} else {
// start from bottom to target
var l = revisions.length - 1;
for (var i = l; i >= count - 1; i--) {
var revision = revisions[i];
if (i == l) {
startContent = LZString.decompressFromBase64(revision.lastContent);
}
if (revision.patch) {
var patch = dmp.patch_fromText(LZString.decompressFromBase64(revision.patch));
applyPatches = applyPatches.concat(patch);
}
lastPatch = revision.patch;
}
}
try {
var finalContent = dmp.patch_apply(applyPatches, startContent)[0];
} catch (err) {
return callback(err, null);
}
var data = {
content: finalContent,
patch: dmp.patch_fromText(LZString.decompressFromBase64(lastPatch))
};
var ms_end = (new Date()).getTime();
if (config.debug) {
logger.info((ms_end - ms_start) + 'ms');
}
return callback(null, data);
}).catch(function (err) {
return callback(err, null);
});
}).catch(function (err) {
return callback(err, null);
});
},
checkAllNotesRevision: function (callback) {
Revision.saveAllNotesRevision(function (err, notes) {
if (err) return callback(err, null);
if (notes.length <= 0) {
return callback(null, notes);
} else {
Revision.checkAllNotesRevision(callback);
}
});
},
saveAllNotesRevision: function (callback) {
sequelize.models.Note.findAll({
where: {
$and: [
{
lastchangeAt: {
$or: {
$eq: null,
$and: {
$ne: null,
$gt: sequelize.col('createdAt')
}
}
}
},
{
savedAt: {
$or: {
$eq: null,
$lt: sequelize.col('lastchangeAt')
}
}
}
]
}
}).then(function (notes) {
if (notes.length <= 0) return callback(null, notes);
async.each(notes, function (note, _callback) {
Revision.saveNoteRevision(note, _callback);
}, function (err) {
if (err) return callback(err, null);
return callback(null, notes);
});
}).catch(function (err) {
return callback(err, null);
});
},
saveNoteRevision: function (note, callback) {
Revision.findAll({
where: {
noteId: note.id
},
order: '"createdAt" DESC'
}).then(function (revisions) {
if (revisions.length <= 0) {
// if no revision available
Revision.create({
noteId: note.id,
lastContent: note.content,
length: LZString.decompressFromBase64(note.content).length
}).then(function (revision) {
Revision.finishSaveNoteRevision(note, revision, callback);
}).catch(function (err) {
return callback(err, null);
});
} else {
var latestRevision = revisions[0];
var lastContent = LZString.decompressFromBase64(latestRevision.content || latestRevision.lastContent);
var content = LZString.decompressFromBase64(note.content);
var patch = Revision.createPatch(lastContent, content);
if (!patch) {
// if patch is empty (means no difference) then just update the latest revision updated time
latestRevision.changed('updatedAt', true);
latestRevision.update({
updatedAt: Date.now()
}).then(function (revision) {
Revision.finishSaveNoteRevision(note, revision, callback);
}).catch(function (err) {
return callback(err, null);
});
} else {
Revision.create({
noteId: note.id,
patch: LZString.compressToBase64(patch),
content: note.content,
length: LZString.decompressFromBase64(note.content).length
}).then(function (revision) {
// clear last revision content to reduce db size
latestRevision.update({
content: null
}).then(function () {
Revision.finishSaveNoteRevision(note, revision, callback);
}).catch(function (err) {
return callback(err, null);
});
}).catch(function (err) {
return callback(err, null);
});
}
}
}).catch(function (err) {
return callback(err, null);
});
},
finishSaveNoteRevision: function (note, revision, callback) {
note.update({
savedAt: revision.updatedAt
}).then(function () {
return callback(null, revision);
}).catch(function (err) {
return callback(err, null);
});
}
}
});
return Revision;
};

View File

@ -26,8 +26,7 @@ var realtime = {
secure: secure, secure: secure,
connection: connection, connection: connection,
getStatus: getStatus, getStatus: getStatus,
users: users, isReady: isReady
notes: notes
}; };
function onAuthorizeSuccess(data, accept) { function onAuthorizeSuccess(data, accept) {
@ -72,9 +71,8 @@ function emitCheck(note) {
} }
//actions //actions
var users, notes; var users = {};
realtime.users = users = {}; var notes = {};
realtime.notes = notes = {};
//update when the note is dirty //update when the note is dirty
var updater = setInterval(function () { var updater = setInterval(function () {
async.each(Object.keys(notes), function (key, callback) { async.each(Object.keys(notes), function (key, callback) {
@ -152,6 +150,7 @@ function finishUpdateNote(note, _note, callback) {
lastchangeAt: Date.now() lastchangeAt: Date.now()
}; };
_note.update(values).then(function (_note) { _note.update(values).then(function (_note) {
saverSleep = false;
return callback(null, _note); return callback(null, _note);
}).catch(function (err) { }).catch(function (err) {
logger.error(err); logger.error(err);
@ -179,6 +178,18 @@ var cleaner = setInterval(function () {
if (err) return logger.error('cleaner error', err); if (err) return logger.error('cleaner error', err);
}); });
}, 60000); }, 60000);
var saverSleep = true;
// save note revision in interval
var saver = setInterval(function () {
if (saverSleep) return;
models.Revision.saveAllNotesRevision(function (err, notes) {
if (err) return logger.error('revision saver failed: ' + err);
if (notes.length <= 0) {
saverSleep = true;
return;
}
});
}, 60000 * 5);
function getStatus(callback) { function getStatus(callback) {
models.Note.count().then(function (notecount) { models.Note.count().then(function (notecount) {
@ -233,6 +244,13 @@ function getStatus(callback) {
}); });
} }
function isReady() {
return realtime.io
&& Object.keys(notes).length == 0 && Object.keys(users).length == 0
&& connectionSocketQueue.length == 0 && !isConnectionBusy
&& disconnectSocketQueue.length == 0 && !isDisconnectBusy;
}
function extractNoteIdFromSocket(socket) { function extractNoteIdFromSocket(socket) {
if (!socket || !socket.handshake || !socket.handshake.headers) { if (!socket || !socket.handshake || !socket.handshake.headers) {
return false; return false;

View File

@ -15,6 +15,7 @@
"cookie": "0.2.3", "cookie": "0.2.3",
"cookie-parser": "1.4.1", "cookie-parser": "1.4.1",
"ejs": "^2.4.1", "ejs": "^2.4.1",
"diff-match-patch": "^1.0.0",
"emojify.js": "^1.1.0", "emojify.js": "^1.1.0",
"express": ">=4.13", "express": ">=4.13",
"express-session": "^1.13.0", "express-session": "^1.13.0",