fixes due to bad rebase
This commit is contained in:
parent
2e3102444d
commit
c4ea6abc4b
|
@ -91,7 +91,7 @@ class Engine {
|
|||
}
|
||||
|
||||
processManagerService(_options) {
|
||||
const ProcessManager = require('./processes/process_manager.js');
|
||||
const ProcessManager = require('./processes/processManager.js');
|
||||
this.processManager = new ProcessManager({
|
||||
events: this.events,
|
||||
logger: this.logger,
|
||||
|
@ -99,6 +99,10 @@ class Engine {
|
|||
});
|
||||
}
|
||||
|
||||
graphService(_options) {
|
||||
this.registerModule('graph');
|
||||
}
|
||||
|
||||
pipelineService(_options) {
|
||||
const self = this;
|
||||
this.registerModule('pipeline', {
|
||||
|
|
|
@ -42,6 +42,9 @@ class ProcessLauncher {
|
|||
_subscribeToMessages() {
|
||||
const self = this;
|
||||
this.process.on('message', (msg) => {
|
||||
if (msg.error) {
|
||||
self.logger.error(msg.error);
|
||||
}
|
||||
if (msg.result === constants.process.log) {
|
||||
return self._handleLog(msg);
|
||||
}
|
||||
|
|
|
@ -1,221 +0,0 @@
|
|||
const child_process = require('child_process');
|
||||
const constants = require('../../constants');
|
||||
const path = require('path');
|
||||
const utils = require('../../utils/utils');
|
||||
|
||||
class ProcessLauncher {
|
||||
|
||||
/**
|
||||
* Constructor of ProcessLauncher. Forks the module and sets up the message handling
|
||||
* @param {Object} options Options tp start the process
|
||||
* * modulePath {String} Absolute path to the module to fork
|
||||
* * logger {Object} Logger
|
||||
* * events {Function} Events Emitter instance
|
||||
* @return {ProcessLauncher} The ProcessLauncher instance
|
||||
*/
|
||||
constructor(options) {
|
||||
this.name = path.basename(options.modulePath);
|
||||
this.process = child_process.fork(options.modulePath);
|
||||
this.logger = options.logger;
|
||||
this.events = options.events;
|
||||
this.silent = options.silent;
|
||||
this.exitCallback = options.exitCallback;
|
||||
|
||||
this.subscriptions = {};
|
||||
this._subscribeToMessages();
|
||||
}
|
||||
|
||||
// Subscribes to messages from the child process and delegates to the right methods
|
||||
_subscribeToMessages() {
|
||||
const self = this;
|
||||
this.process.on('message', (msg) => {
|
||||
if (msg.error) {
|
||||
self.logger.error(msg.error);
|
||||
}
|
||||
if (msg.result === constants.process.log) {
|
||||
return self._handleLog(msg);
|
||||
}
|
||||
if (msg.event) {
|
||||
return self._handleEvent(msg);
|
||||
}
|
||||
self._checkSubscriptions(msg);
|
||||
});
|
||||
|
||||
this.process.on('exit', (code) => {
|
||||
if (self.exitCallback) {
|
||||
return self.exitCallback(code);
|
||||
}
|
||||
if (code) {
|
||||
this.logger.info(`Child Process ${this.name} exited with code ${code}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Translates logs from the child process to the logger
|
||||
_handleLog(msg) {
|
||||
if (this.silent && msg.type !== 'error') {
|
||||
return;
|
||||
}
|
||||
if (this.logger[msg.type]) {
|
||||
return this.logger[msg.type](utils.normalizeInput(msg.message));
|
||||
}
|
||||
this.logger.debug(utils.normalizeInput(msg.message));
|
||||
}
|
||||
|
||||
// Handle event calls from the child process
|
||||
_handleEvent(msg) {
|
||||
const self = this;
|
||||
if (!self.events[msg.event]) {
|
||||
self.logger.warn('Unknown event method called: ' + msg.event);
|
||||
return;
|
||||
}
|
||||
if (!msg.args || !Array.isArray(msg.args)) {
|
||||
msg.args = [];
|
||||
}
|
||||
// Add callback in the args
|
||||
msg.args.push((result) => {
|
||||
self.process.send({
|
||||
event: constants.process.events.response,
|
||||
result,
|
||||
eventId: msg.eventId
|
||||
});
|
||||
});
|
||||
self.events[msg.event](msg.requestName, ...msg.args);
|
||||
}
|
||||
|
||||
// Looks at the subscriptions to see if there is a callback to call
|
||||
_checkSubscriptions(msg) {
|
||||
const messageKeys = Object.keys(msg);
|
||||
const subscriptionsKeys = Object.keys(this.subscriptions);
|
||||
let subscriptionsForKey;
|
||||
let messageKey;
|
||||
// Find if the message contains a key that we are subscribed to
|
||||
messageKeys.some(_messageKey => {
|
||||
return subscriptionsKeys.some(subscriptionKey => {
|
||||
if (_messageKey === subscriptionKey) {
|
||||
subscriptionsForKey = this.subscriptions[subscriptionKey];
|
||||
messageKey = _messageKey;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
});
|
||||
|
||||
if (subscriptionsForKey) {
|
||||
// Find if we are subscribed to one of the values
|
||||
let subsIndex = [];
|
||||
const subscriptionsForValue = subscriptionsForKey.filter((sub, index) => {
|
||||
if (msg[messageKey] === sub.value) {
|
||||
subsIndex.push(index);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
|
||||
if (subscriptionsForValue.length) {
|
||||
// We are subscribed to that message, call the callback
|
||||
subscriptionsForValue.forEach((subscription, index) => {
|
||||
subscription.callback(msg);
|
||||
|
||||
if (subscription.once) {
|
||||
// Called only once, we can remove it
|
||||
subscription = null;
|
||||
this.subscriptions[messageKey].splice(subsIndex[index], 1);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to a message using a key-value pair
|
||||
* @param {String} key Message key to subscribe to
|
||||
* @param {String} value Value that the above key must have for the callback to be called
|
||||
* @param {Function} callback callback(response)
|
||||
* @return {void}
|
||||
*/
|
||||
on(key, value, callback) {
|
||||
if (this.subscriptions[key]) {
|
||||
this.subscriptions[key].push({value, callback});
|
||||
return;
|
||||
}
|
||||
this.subscriptions[key] = [{value, callback}];
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as .on, but only triggers once
|
||||
* @param {String} key Message key to subscribe to
|
||||
* @param {String} value Value that the above key must have for the callback to be called
|
||||
* @param {Function} callback callback(response)
|
||||
* @return {void}
|
||||
*/
|
||||
once(key, value, callback) {
|
||||
const obj = {value, callback, once: true};
|
||||
if (this.subscriptions[key]) {
|
||||
this.subscriptions[key].push(obj);
|
||||
return;
|
||||
}
|
||||
this.subscriptions[key] = [obj];
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribes from a previously subscribed key-value pair (or key if no value)
|
||||
* @param {String} key Message key to unsubscribe
|
||||
* @param {String} value [Optional] Value of the key to unsubscribe
|
||||
* If there is no value, unsubscribes from all the values of that key
|
||||
* @return {void}
|
||||
*/
|
||||
unsubscribeTo(key, value) {
|
||||
if (!value) {
|
||||
this.subscriptions[key] = [];
|
||||
}
|
||||
if (this.subscriptions[key]) {
|
||||
this.subscriptions[key].filter((val, index) => {
|
||||
if (val.value === value) {
|
||||
this.subscriptions[key].splice(index, 1);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribes from all subscriptions
|
||||
* @return {void}
|
||||
*/
|
||||
unsubscribeToAll() {
|
||||
this.subscriptions = {};
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message to the child process. Same as ChildProcess.send()
|
||||
* @params {Object} message Message to send
|
||||
* For other parameters, see:
|
||||
* https://nodejs.org/api/child_process.html#child_process_subprocess_send_message_sendhandle_options_callback
|
||||
* @return {void}
|
||||
*/
|
||||
send() {
|
||||
if (!this.process.connected) {
|
||||
return false;
|
||||
}
|
||||
return this.process.send(...arguments);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects the child process. It will exit on its own
|
||||
* @return {void}
|
||||
*/
|
||||
disconnect() {
|
||||
this.process.disconnect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Kills the child process
|
||||
* https://nodejs.org/api/child_process.html#child_process_subprocess_kill_signal
|
||||
* @return {void}
|
||||
*/
|
||||
kill() {
|
||||
this.process.kill(...arguments);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = ProcessLauncher;
|
|
@ -1,93 +0,0 @@
|
|||
process.on('uncaughtException', function(e){
|
||||
process.send({error: e.stack});
|
||||
});
|
||||
|
||||
const constants = require('../../constants');
|
||||
const Events = require('./eventsWrapper');
|
||||
|
||||
// Set PWD to CWD since Windows doesn't have a value for PWD
|
||||
if (!process.env.PWD) {
|
||||
process.env.PWD = process.cwd();
|
||||
}
|
||||
|
||||
class ProcessWrapper {
|
||||
|
||||
/**
|
||||
* Class from which process extend. Should not be instantiated alone.
|
||||
* Manages the log interception so that all console.* get sent back to the parent process
|
||||
* Also creates an Events instance. To use it, just do `this.events.[on|request]`
|
||||
*
|
||||
* @param {Options} _options Nothing for now
|
||||
*/
|
||||
constructor(_options) {
|
||||
this.interceptLogs();
|
||||
this.events = new Events();
|
||||
|
||||
this.pingParent();
|
||||
}
|
||||
|
||||
// Ping parent to see if it is still alive. Otherwise, let's die
|
||||
pingParent() {
|
||||
const self = this;
|
||||
self.retries = 0;
|
||||
function error() {
|
||||
if (self.retries > 2) {
|
||||
self.kill();
|
||||
process.exit();
|
||||
}
|
||||
self.retries++;
|
||||
}
|
||||
setInterval(() => {
|
||||
try {
|
||||
let result = self.send({action: 'ping'});
|
||||
if (!result) {
|
||||
return error();
|
||||
}
|
||||
self.retries = 0;
|
||||
} catch (e) {
|
||||
error();
|
||||
}
|
||||
}, 500);
|
||||
}
|
||||
|
||||
interceptLogs() {
|
||||
const context = {};
|
||||
context.console = console;
|
||||
|
||||
context.console.log = this._log.bind(this, 'log');
|
||||
context.console.warn = this._log.bind(this, 'warn');
|
||||
context.console.error = this._log.bind(this, 'error');
|
||||
context.console.info = this._log.bind(this, 'info');
|
||||
context.console.debug = this._log.bind(this, 'debug');
|
||||
context.console.trace = this._log.bind(this, 'trace');
|
||||
context.console.dir = this._log.bind(this, 'dir');
|
||||
}
|
||||
|
||||
_log(type, ...messages) {
|
||||
const isHardSource = messages.some(message => {
|
||||
return (typeof message === 'string' && message.indexOf('hardsource') > -1);
|
||||
});
|
||||
if (isHardSource) {
|
||||
return;
|
||||
}
|
||||
this.send({result: constants.process.log, message: messages, type});
|
||||
}
|
||||
|
||||
send() {
|
||||
if (!process.connected) {
|
||||
return false;
|
||||
}
|
||||
return process.send(...arguments);
|
||||
}
|
||||
|
||||
kill() {
|
||||
// Should be implemented by derived class
|
||||
console.log('Process killed');
|
||||
}
|
||||
}
|
||||
|
||||
process.on('exit', () => {
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
module.exports = ProcessWrapper;
|
Loading…
Reference in New Issue