// Source: codimd/lib/models/revision.js (283 lines, 12 KiB, JavaScript)

"use strict";
// external modules
var Sequelize = require("sequelize");
var LZString = require('lz-string');
var async = require('async');
var moment = require('moment');
var childProcess = require('child_process');
var shortId = require('shortid');
// core
var config = require("../config.js");
var logger = require("../logger.js");
// Callbacks of in-flight dmp worker requests, keyed by each request's cacheKey.
var dmpCallbackCache = {};
// Handle of the forked dmp worker; the worker's 'close' handler resets it to null.
var dmpWorker = createDmpWorker();
/**
 * Fork the dmp worker script and attach its 'message' and 'close' handlers.
 *
 * Every worker message must carry `msg` and `cacheKey`. The callback stored in
 * `dmpCallbackCache` under that key is removed and invoked exactly once:
 *   - msg 'error' -> callback(data.error, null)
 *   - msg 'check' -> callback(null, data.result)
 *   - any other   -> callback(Error, null), so the caller is never left waiting
 *
 * A message whose cacheKey has no registered callback (stale or duplicate
 * reply) is logged and ignored instead of throwing inside the event handler.
 *
 * @returns {ChildProcess} the forked worker process
 */
function createDmpWorker() {
  var worker = childProcess.fork("./lib/workers/dmpWorker.js", {
    stdio: 'ignore'
  });
  if (config.debug) logger.info('dmp worker process started');
  worker.on('message', function (data) {
    if (!data || !data.msg || !data.cacheKey) {
      return logger.error('dmp worker error: not enough data on message');
    }
    var cacheKey = data.cacheKey;
    var hasEntry = Object.prototype.hasOwnProperty.call(dmpCallbackCache, cacheKey);
    var callback = hasEntry ? dmpCallbackCache[cacheKey] : null;
    if (typeof callback !== 'function') {
      return logger.error('dmp worker error: no callback registered for cache key ' + cacheKey);
    }
    // release the entry before invoking, so it is not leaked if the callback throws
    delete dmpCallbackCache[cacheKey];
    switch (data.msg) {
      case 'error':
        callback(data.error, null);
        break;
      case 'check':
        callback(null, data.result);
        break;
      default:
        callback(new Error('dmp worker error: unknown message type ' + data.msg), null);
        break;
    }
  });
  worker.on('close', function (code) {
    // drop the handle so the next sendDmpWorker call forks a fresh worker
    dmpWorker = null;
    if (config.debug) logger.info('dmp worker process exited with code ' + code);
  });
  return worker;
}
/**
 * Send a request to the dmp worker and register the callback for its reply.
 *
 * A unique cacheKey is generated per request; the worker is expected to echo
 * it back so the 'message' handler can find the callback again.
 *
 * @param {Object} data request payload (not modified; a copy carrying the cacheKey is sent)
 * @param {Function} callback node-style callback(err, result) invoked when the worker replies
 * @throws rethrows whatever `dmpWorker.send` throws, after unregistering the callback
 */
function sendDmpWorker(data, callback) {
  // the worker handle is null after the process closed; fork a new one on demand
  if (!dmpWorker) dmpWorker = createDmpWorker();
  var cacheKey = Date.now() + '_' + shortId.generate();
  // copy instead of mutating the caller's object
  var payload = Object.assign({}, data, {
    cacheKey: cacheKey
  });
  dmpCallbackCache[cacheKey] = callback;
  try {
    dmpWorker.send(payload);
  } catch (err) {
    // no reply will ever arrive for this key, so do not leave the callback behind
    delete dmpCallbackCache[cacheKey];
    throw err;
  }
}
module.exports = function (sequelize, DataTypes) {
var Revision = sequelize.define("Revision", {
id: {
type: DataTypes.UUID,
primaryKey: true,
defaultValue: Sequelize.UUIDV4
},
patch: {
type: DataTypes.TEXT
},
lastContent: {
type: DataTypes.TEXT
},
content: {
type: DataTypes.TEXT
},
length: {
type: DataTypes.INTEGER
},
authorship: {
type: DataTypes.TEXT
}
}, {
classMethods: {
associate: function (models) {
2016-07-30 11:15:44 +08:00
Revision.belongsTo(models.Note, {
foreignKey: "noteId",
as: "note",
constraints: false
});
},
getNoteRevisions: function (note, callback) {
Revision.findAll({
where: {
noteId: note.id
},
order: '"createdAt" DESC'
}).then(function (revisions) {
var data = [];
for (var i = 0, l = revisions.length; i < l; i++) {
var revision = revisions[i];
data.push({
time: moment(revision.createdAt).valueOf(),
length: revision.length
});
}
callback(null, data);
}).catch(function (err) {
callback(err, null);
});
},
getPatchedNoteRevisionByTime: function (note, time, callback) {
// find all revisions to prepare for all possible calculation
Revision.findAll({
where: {
noteId: note.id
},
order: '"createdAt" DESC'
}).then(function (revisions) {
if (revisions.length <= 0) return callback(null, null);
// measure target revision position
Revision.count({
where: {
noteId: note.id,
createdAt: {
$gte: time
}
},
order: '"createdAt" DESC'
}).then(function (count) {
if (count <= 0) return callback(null, null);
sendDmpWorker({
msg: 'get revision',
revisions: revisions,
count: count
}, callback);
}).catch(function (err) {
return callback(err, null);
});
}).catch(function (err) {
return callback(err, null);
});
},
checkAllNotesRevision: function (callback) {
Revision.saveAllNotesRevision(function (err, notes) {
if (err) return callback(err, null);
2016-09-18 16:50:20 +08:00
if (!notes || notes.length <= 0) {
return callback(null, notes);
} else {
Revision.checkAllNotesRevision(callback);
}
});
},
saveAllNotesRevision: function (callback) {
sequelize.models.Note.findAll({
2016-09-18 16:50:20 +08:00
// query all notes that need to save for revision
where: {
$and: [
{
lastchangeAt: {
$or: {
$eq: null,
$and: {
$ne: null,
$gt: sequelize.col('createdAt')
}
}
}
},
{
savedAt: {
$or: {
$eq: null,
$lt: sequelize.col('lastchangeAt')
}
}
}
]
}
}).then(function (notes) {
if (notes.length <= 0) return callback(null, notes);
2016-09-18 16:50:20 +08:00
var savedNotes = [];
async.each(notes, function (note, _callback) {
2016-09-18 16:50:20 +08:00
// revision saving policy: note not been modified for 5 mins or not save for 10 mins
if (note.lastchangeAt && note.savedAt) {
var lastchangeAt = moment(note.lastchangeAt);
var savedAt = moment(note.savedAt);
2016-10-15 13:54:16 +08:00
if (moment().isAfter(lastchangeAt.add(5, 'minutes'))) {
2016-09-18 16:50:20 +08:00
savedNotes.push(note);
Revision.saveNoteRevision(note, _callback);
2016-10-15 13:54:16 +08:00
} else if (lastchangeAt.isAfter(savedAt.add(10, 'minutes'))) {
2016-09-18 16:50:20 +08:00
savedNotes.push(note);
Revision.saveNoteRevision(note, _callback);
} else {
return _callback(null, null);
}
} else {
savedNotes.push(note);
Revision.saveNoteRevision(note, _callback);
}
}, function (err) {
if (err) return callback(err, null);
2016-09-18 16:50:20 +08:00
// return null when no notes need saving at this moment but have delayed tasks to be done
var result = ((savedNotes.length == 0) && (notes.length > savedNotes.length)) ? null : savedNotes;
return callback(null, result);
});
}).catch(function (err) {
return callback(err, null);
});
},
saveNoteRevision: function (note, callback) {
Revision.findAll({
where: {
noteId: note.id
},
order: '"createdAt" DESC'
}).then(function (revisions) {
if (revisions.length <= 0) {
// if no revision available
Revision.create({
noteId: note.id,
lastContent: note.content,
length: LZString.decompressFromBase64(note.content).length,
authorship: note.authorship
}).then(function (revision) {
Revision.finishSaveNoteRevision(note, revision, callback);
}).catch(function (err) {
return callback(err, null);
});
} else {
var latestRevision = revisions[0];
var lastContent = LZString.decompressFromBase64(latestRevision.content || latestRevision.lastContent);
var content = LZString.decompressFromBase64(note.content);
sendDmpWorker({
msg: 'create patch',
lastDoc: lastContent,
currDoc: content,
}, function (err, patch) {
if (err) logger.error('save note revision error', err);
if (!patch) {
// if patch is empty (means no difference) then just update the latest revision updated time
latestRevision.changed('updatedAt', true);
latestRevision.update({
updatedAt: Date.now()
}).then(function (revision) {
Revision.finishSaveNoteRevision(note, revision, callback);
}).catch(function (err) {
return callback(err, null);
});
} else {
Revision.create({
noteId: note.id,
patch: LZString.compressToBase64(patch),
content: note.content,
length: LZString.decompressFromBase64(note.content).length,
authorship: note.authorship
}).then(function (revision) {
// clear last revision content to reduce db size
latestRevision.update({
content: null
}).then(function () {
Revision.finishSaveNoteRevision(note, revision, callback);
}).catch(function (err) {
return callback(err, null);
});
}).catch(function (err) {
return callback(err, null);
});
}
});
}
}).catch(function (err) {
return callback(err, null);
});
},
finishSaveNoteRevision: function (note, revision, callback) {
note.update({
savedAt: revision.updatedAt
}).then(function () {
return callback(null, revision);
}).catch(function (err) {
return callback(err, null);
});
}
}
});
return Revision;
};