Process logs API refactor

There are three separate instances of process log APIs: embark logs, blockchain logs (when in standalone mode), and child process logs (storage, communication, blockchain, etc). Each one was repeating the implementation of creating a process log API endpoint. This commit centralises the API declaration by using the class `ProcessLogsApi`.

`ProcessLogsApi` is started for all three components mentioned above: blockchain (in standalone) in the `BlockchainListener` module, embark in the `EmbarkListener` module, and for all child processes in the `ProcessLauncher`.

These listeners have two functions:
1. Create the process logs API endpoints for `get` and `ws`, and
2. Ensure that all logs are logged through the `LogHandler`, which normalises the output of the log and ensures each log has a timestamp and id (used in the cockpit for log ordering).

Also, this commit moved the pipeline in to a module, so that the `embark` object could be passed to the `ProcessLogsApi` (to be used for registering API endpoints).
This commit is contained in:
emizzle 2018-10-24 08:54:09 +03:00 committed by Pascal Precht
parent 3246b62151
commit f5c77b1416
No known key found for this signature in database
GPG Key ID: 0EE28D8D6FD85D7D
11 changed files with 164 additions and 122 deletions

View File

@ -131,7 +131,7 @@ class EmbarkController {
engine.startService("processManager");
engine.startService("coreProcess");
engine.startService("loggerApi");
engine.startService("embarkListener");
engine.startService("blockchainListener");
engine.startService("serviceMonitor");
engine.startService("libraryManager");

View File

@ -78,8 +78,9 @@ class Engine {
"codeCoverage": this.codeCoverageService,
"scaffolding": this.scaffoldingService,
"coreProcess": this.coreProcessService,
"loggerApi": this.loggerApiService,
"blockchainListener": this.blockchainListenerService
"processApi": this.processApiService,
"blockchainListener": this.blockchainListenerService,
"embarkListener": this.embarkListenerService
};
let service = services[serviceName];
@ -93,6 +94,10 @@ class Engine {
return service.apply(this, [options]);
}
embarkListenerService(_options){
this.registerModule('embark_listener');
}
blockchainListenerService(_options){
this.registerModule('blockchain_listener', {
ipc: this.ipc
@ -105,8 +110,8 @@ class Engine {
});
}
loggerApiService(_options){
this.registerModule('logger_api', {
processApiService(_options){
this.registerModule('process_api', {
logger: this.logger
});
}
@ -129,17 +134,10 @@ class Engine {
}
pipelineService(_options) {
const self = this;
this.registerModule('pipeline', {
webpackConfigName: this.webpackConfigName
});
this.events.on('code-generator-ready', function (modifiedAssets) {
self.events.request('code', function (abi, contractsJSON) {
self.events.request('pipeline:build', {abi, contractsJSON, modifiedAssets}, () => {
self.events.emit('outputDone');
});
});
});
this.registerModule('pipeline', {plugins: this.plugins, env: this.env, webpackConfigName: this.webpackConfigName});
const pipeline = new Pipeline({
this.events.on('code-generator-ready', function () {
pipeline.build(abi, contractsJSON, null, () => {
}
serviceMonitor() {

View File

@ -1,8 +1,7 @@
const child_process = require('child_process');
const constants = require('../../constants');
const path = require('path');
const utils = require('../../utils/utils');
const LogHandler = require('../../utils/logHandler');
const ProcessLogsApi = require('../../modules/process_logs_api');
let processCount = 1;
class ProcessLauncher {
@ -31,13 +30,10 @@ class ProcessLauncher {
this.exitCallback = options.exitCallback;
this.embark = options.embark;
this.logs = [];
this.logHandler = new LogHandler({events: this.events, logger: this.logger, processName: this.name, silent: this.silent});
this.processLogsApi = new ProcessLogsApi({embark: this.embark, processName: this.name, silent: this.silent});
this.subscriptions = {};
this._subscribeToMessages();
if (this.embark) {
this._registerAsPlugin();
}
}
_isDebug() {
@ -53,7 +49,7 @@ class ProcessLauncher {
self.logger.error(msg.error);
}
if (msg.result === constants.process.log) {
return self.logHandler.handleLog(msg);
return self.processLogsApi.logHandler.handleLog(msg);
}
if (msg.event) {
return self._handleEvent(msg);
@ -71,30 +67,6 @@ class ProcessLauncher {
});
}
_registerAsPlugin() {
const self = this;
const apiRoute = '/embark-api/process-logs/' + self.name;
self.embark.registerAPICall(
'ws',
apiRoute,
(ws, _req) => {
self.events.on('process-log-' + self.name, function(id, log) {
ws.send(JSON.stringify(Object.assign(log, {id})), () => {});
});
}
);
self.embark.registerAPICall(
'get',
apiRoute,
(req, res) => {
let limit = parseInt(req.query.limit, 10);
if(!Number.isInteger(limit)) limit = 0;
const result = self.logs.map((log, id) => Object.assign(log, {id})).slice(limit);
res.send(JSON.stringify(result));
}
);
}
// Handle event calls from the child process
_handleEvent(msg) {
const self = this;

View File

@ -1,54 +1,45 @@
const LogHandler = require('../../utils/logHandler');
const ProcessLogsApi = require('../../modules/process_logs_api');
const PROCESS_NAME = 'blockchain';
/**
* BlockchainListener has two functions:
* 1. Register API endpoints (HTTP GET and WS) to retrieve blockchain logs
* when in standalone mode (ie `embark blockchain`).
* 2. Listen to log events from the IPC connection (to `embark blockchain`)
* and ensure they are processed through the LogHandler.
*/
class BlockchainListener {
constructor(embark, options) {
/**
* @param {Plugin} embark Embark module plugin object
* @param {Object} options Options object containing:
* - {Ipc} ipc IPC started by embark (in server role) used for communication
* with the standalone blockchain process.
*/
constructor(embark, {ipc}) {
this.embark = embark;
this.events = embark.events;
this.logger = embark.logger;
this.ipc = options.ipc;
this.logHandler = new LogHandler({events: this.events, logger: this.logger, processName: PROCESS_NAME, silent: true});
this.ipc = ipc;
this.processLogsApi = new ProcessLogsApi({embark: this.embark, processName: PROCESS_NAME, silent: true});
if (this.ipc.isServer()) {
this._listenToLogs();
this._listenToBlockchainLogs();
}
this._registerApi();
}
_listenToLogs() {
/**
* Listens to log events emitted by the standalone blockchain and ensures
* they are processed through the LogHandler.
*
* @return {void}
*/
_listenToBlockchainLogs() {
this.ipc.on('blockchain:log', ({logLevel, message}) => {
this.logHandler.handleLog({logLevel, message});
this.processLogsApi.logHandler.handleLog({logLevel, message});
});
}
_registerApi() {
// This route is overriden by `processLauncher` when the blockchain
// process is launched (ie when not in blockchain standalone mode).
// The order is determined by the order in which the engine starts
// it's services, with `blockchain_process` coming before `web3`.
const apiRoute = '/embark-api/process-logs/' + PROCESS_NAME;
this.embark.registerAPICall(
'ws',
apiRoute,
(ws, _req) => {
this.events.on('process-log-' + PROCESS_NAME, function (id, log) {
ws.send(JSON.stringify(Object.assign(log, {id})), () => {});
});
}
);
this.embark.registerAPICall(
'get',
apiRoute,
(req, res) => {
let limit = parseInt(req.query.limit, 10);
if (!Number.isInteger(limit)) limit = 0;
const result = this.logHandler.logs.map((log, id) => Object.assign(log, {id})).slice(limit);
res.send(JSON.stringify(result));
}
);
}
}
module.exports = BlockchainListener;

View File

@ -0,0 +1,37 @@
const ProcessLogsApi = require('../process_logs_api');
const EMBARK_PROCESS_NAME = 'embark';
/**
* EmbarkListener has two functions:
* 1. Register API endpoints (HTTP GET and WS) to retrieve embark logs.
* 2. Listen to log events in Embark and ensure they are processed
* through the LogHandler.
*/
class EmbarkListener {
/**
* @param {Plugin} embark EmbarkListener module plugin object
*/
constructor(embark) {
this.embark = embark;
this.events = embark.events;
this.logger = embark.logger;
this.processLogsApi = new ProcessLogsApi({embark: this.embark, processName: EMBARK_PROCESS_NAME, silent: false});
this._listenToEmbarkLogs();
}
/**
* Listens to log events emitted by the Embark application and ensures
* they are processed through the LogHandler.
*
* @return {void}
*/
_listenToEmbarkLogs() {
this.events.on("log", (logLevel, message) => {
this.processLogsApi.logHandler.handleLog({logLevel, message}, true);
});
}
}
module.exports = EmbarkListener;

View File

@ -1,22 +0,0 @@
class LoggerApi {
constructor(embark) {
this.embark = embark;
this.logger = embark.logger;
this.registerAPICalls();
}
registerAPICalls(){
this.embark.registerAPICall(
'get',
'/embark-api/process-logs/embark',
(req, res) => {
let limit = parseInt(req.query.limit, 10);
if(!Number.isInteger(limit)) limit = 0;
res.send(this.logger.parseLogFile(limit));
}
);
}
}
module.exports = LoggerApi;

View File

@ -8,6 +8,7 @@ const WebpackConfigReader = require('../pipeline/webpackConfigReader');
class Pipeline {
constructor(embark, options) {
this.embark = embark;
this.env = embark.config.env;
this.buildDir = embark.config.buildDir;
this.contractsFiles = embark.config.contractsFiles;
@ -194,6 +195,8 @@ class Pipeline {
});
let built = false;
const webpackProcess = new ProcessLauncher({
embark: self.embark,
plugins: self.plugins,
modulePath: utils.joinPath(__dirname, 'webpackProcess.js'),
logger: self.logger,
events: self.events,

View File

@ -0,0 +1,38 @@
const LogHandler = require('../../utils/logHandler');
class ProcessLogsApi {
constructor({embark, processName, silent}) {
this.embark = embark;
this.processName = processName;
this.logger = this.embark.logger;
this.events = this.embark.events;
this.logHandler = new LogHandler({events: this.events, logger: this.logger, processName: this.processName, silent});
this.registerAPICalls();
}
registerAPICalls() {
const apiRoute = '/embark-api/process-logs/' + this.processName;
this.embark.registerAPICall(
'ws',
apiRoute,
(ws, _req) => {
this.events.on('process-log-' + this.processName, function (log) {
ws.send(JSON.stringify(log), () => {});
});
}
);
this.embark.registerAPICall(
'get',
'/embark-api/process-logs/' + this.processName,
(req, res) => {
let limit = parseInt(req.query.limit, 10);
if (!Number.isInteger(limit)) limit = 0;
const result = this.logHandler.logs.map((log, id) => Object.assign(log, {id})).slice(limit);
res.send(JSON.stringify(result));
}
);
}
}
module.exports = ProcessLogsApi;

View File

@ -4,6 +4,7 @@ let SolcW = require('./solcW.js');
class Solidity {
constructor(embark, options) {
this.embark = embark;
this.logger = embark.logger;
this.events = embark.events;
this.ipc = options.ipc;
@ -68,7 +69,7 @@ class Solidity {
if (self.solcAlreadyLoaded) {
return callback();
}
self.solcW = new SolcW({logger: self.logger, events: self.events, ipc: self.ipc, useDashboard: self.useDashboard});
self.solcW = new SolcW(self.embark, {logger: self.logger, events: self.events, ipc: self.ipc, useDashboard: self.useDashboard});
self.logger.info(__("loading solc compiler") + "..");
self.solcW.load_compiler(function (err) {

View File

@ -6,7 +6,8 @@ const uuid = require('uuid/v1');
class SolcW {
constructor(options) {
constructor(embark, options) {
this.embark = embark;
this.logger = options.logger;
this.events = options.events;
this.ipc = options.ipc;
@ -40,6 +41,7 @@ class SolcW {
return done();
}
this.solcProcess = new ProcessLauncher({
embark: self.embark,
modulePath: utils.joinPath(__dirname, 'solcP.js'),
logger: self.logger,
events: self.events,

View File

@ -4,7 +4,20 @@ const utils = require('./utils');
// to prevent runaway memory leak
const MAX_LOGS = require('../constants').logs.maxLogLength;
/**
* Serves as a central point of log handling.
*/
class LogHandler {
/**
* @param {Object} options Options object containing:
* - {EventEmitter} events Embark events
* - {Logger} logger Embark logger
* - {String} processName Name of the process for which it's logs
* are being handled.
* - {Boolean} silent If true, does not log the message, unless
* it has a logLevel of 'error'.
*/
constructor({events, logger, processName, silent}) {
this.events = events;
this.logger = logger;
@ -12,33 +25,42 @@ class LogHandler {
this.silent = silent;
this.logs = [];
this.removedCount = 0;
this.id = 0;
}
/**
* Servers as an interception of logs, normalises the message output and
* adds metadata (timestamp, id) the data,
* stores the log in memory, then sends it to the logger for output. Max
* number of logs stored in memory is capped by MAX_LOGS
* Servers as an interception of logs, normalises the message output, adds
* metadata (timestamp, id), stores the log in memory, then sends it to the
* logger for output. Max number of logs stored in memory is capped by MAX_LOGS.
*
* @param {Object} msg Object containing the log message (msg.message)
* @param {Boolean} alreadyLogged (optional, default = false) If true, prevents
* the logger from logging the event. Generally used when the log has already
* been logged using the Logger (which emits a "log" event), and is then sent
* to `handleLog` for normalization. If allowed to log again, another event
* would be emitted, and an infinite loop would occur. Setting to true will
* prevent infinite looping.
*
* @returns {void}
*/
handleLog(msg) {
if(!msg) return;
handleLog(msg, alreadyLogged = false) {
if (!msg) return;
// Sometimes messages come in with line breaks, so we need to break them up accordingly.
let processedMessages = [];
// Ensure that `msg.message` is an array, so we process this consistently. Sometimes it
// is an Array, sometimes it is a string.
if(typeof msg.message === 'string') {
if (typeof msg.message === 'string') {
processedMessages = [msg.message];
} else {
} else if (Array.isArray(msg.message)) {
msg.message.forEach(message => {
if (Array.isArray(message)) message = message.join('\n');
let lines = message.split("\n");
lines.forEach(line => processedMessages.push(line));
});
} else if (typeof msg.message === 'object') {
processedMessages.push(JSON.stringify(msg.message));
}
const timestamp = new Date().getTime();
@ -49,15 +71,15 @@ class LogHandler {
msg_clear: message.stripColors,
logLevel: msg.logLevel,
name: this.processName,
timestamp
timestamp,
id: ++this.id
};
if(this.logs.length >= MAX_LOGS){
if (this.logs.length >= MAX_LOGS) {
this.logs.shift();
this.removedCount++;
}
const id = this.logs.push(log) - 1 + this.removedCount;
this.events.emit(`process-log-${this.processName}`, id, log);
if (this.silent && msg.type !== 'error') {
this.logs.push(log);
this.events.emit(`process-log-${this.processName}`, log);
if ((this.silent && msg.type !== 'error') || alreadyLogged) {
return;
}
if (this.logger[msg.type]) {