diff --git a/lib/core/engine.js b/lib/core/engine.js index 35044e82..38d7a72c 100644 --- a/lib/core/engine.js +++ b/lib/core/engine.js @@ -91,7 +91,7 @@ class Engine { } processManagerService(_options) { - const ProcessManager = require('./processes/process_manager.js'); + const ProcessManager = require('./processes/processManager.js'); this.processManager = new ProcessManager({ events: this.events, logger: this.logger, @@ -99,6 +99,10 @@ class Engine { }); } + graphService(_options) { + this.registerModule('graph'); + } + pipelineService(_options) { const self = this; this.registerModule('pipeline', { diff --git a/lib/core/processes/processLauncher.js b/lib/core/processes/processLauncher.js index 88ad8f4c..7ee80fc5 100644 --- a/lib/core/processes/processLauncher.js +++ b/lib/core/processes/processLauncher.js @@ -42,6 +42,9 @@ class ProcessLauncher { _subscribeToMessages() { const self = this; this.process.on('message', (msg) => { + if (msg.error) { + self.logger.error(msg.error); + } if (msg.result === constants.process.log) { return self._handleLog(msg); } diff --git a/lib/core/processes/processLauncher.js~HEAD b/lib/core/processes/processLauncher.js~HEAD deleted file mode 100644 index 1bbf9153..00000000 --- a/lib/core/processes/processLauncher.js~HEAD +++ /dev/null @@ -1,221 +0,0 @@ -const child_process = require('child_process'); -const constants = require('../../constants'); -const path = require('path'); -const utils = require('../../utils/utils'); - -class ProcessLauncher { - - /** - * Constructor of ProcessLauncher. Forks the module and sets up the message handling - * @param {Object} options Options tp start the process - * * modulePath {String} Absolute path to the module to fork - * * logger {Object} Logger - * * events {Function} Events Emitter instance - * @return {ProcessLauncher} The ProcessLauncher instance - */ - constructor(options) { - this.name = path.basename(options.modulePath); - this.process = child_process.fork(options.modulePath); - this.logger = options.logger; - this.events = options.events; - this.silent = options.silent; - this.exitCallback = options.exitCallback; - - this.subscriptions = {}; - this._subscribeToMessages(); - } - - // Subscribes to messages from the child process and delegates to the right methods - _subscribeToMessages() { - const self = this; - this.process.on('message', (msg) => { - if (msg.error) { - self.logger.error(msg.error); - } - if (msg.result === constants.process.log) { - return self._handleLog(msg); - } - if (msg.event) { - return self._handleEvent(msg); - } - self._checkSubscriptions(msg); - }); - - this.process.on('exit', (code) => { - if (self.exitCallback) { - return self.exitCallback(code); - } - if (code) { - this.logger.info(`Child Process ${this.name} exited with code ${code}`); - } - }); - } - - // Translates logs from the child process to the logger - _handleLog(msg) { - if (this.silent && msg.type !== 'error') { - return; - } - if (this.logger[msg.type]) { - return this.logger[msg.type](utils.normalizeInput(msg.message)); - } - this.logger.debug(utils.normalizeInput(msg.message)); - } - - // Handle event calls from the child process - _handleEvent(msg) { - const self = this; - if (!self.events[msg.event]) { - self.logger.warn('Unknown event method called: ' + msg.event); - return; - } - if (!msg.args || !Array.isArray(msg.args)) { - msg.args = []; - } - // Add callback in the args - msg.args.push((result) => { - self.process.send({ - event: constants.process.events.response, - result, - eventId: msg.eventId - }); - }); - self.events[msg.event](msg.requestName, ...msg.args); - } - - // Looks at the subscriptions to see if there is a callback to call - _checkSubscriptions(msg) { - const messageKeys = Object.keys(msg); - const subscriptionsKeys = Object.keys(this.subscriptions); - let subscriptionsForKey; - let messageKey; - // Find if the message contains a key that we are subscribed to - messageKeys.some(_messageKey => { - return subscriptionsKeys.some(subscriptionKey => { - if (_messageKey === subscriptionKey) { - subscriptionsForKey = this.subscriptions[subscriptionKey]; - messageKey = _messageKey; - return true; - } - return false; - }); - }); - - if (subscriptionsForKey) { - // Find if we are subscribed to one of the values - let subsIndex = []; - const subscriptionsForValue = subscriptionsForKey.filter((sub, index) => { - if (msg[messageKey] === sub.value) { - subsIndex.push(index); - return true; - } - return false; - }); - - if (subscriptionsForValue.length) { - // We are subscribed to that message, call the callback - subscriptionsForValue.forEach((subscription, index) => { - subscription.callback(msg); - - if (subscription.once) { - // Called only once, we can remove it - subscription = null; - this.subscriptions[messageKey].splice(subsIndex[index], 1); - } - }); - } - } - } - - /** - * Subscribe to a message using a key-value pair - * @param {String} key Message key to subscribe to - * @param {String} value Value that the above key must have for the callback to be called - * @param {Function} callback callback(response) - * @return {void} - */ - on(key, value, callback) { - if (this.subscriptions[key]) { - this.subscriptions[key].push({value, callback}); - return; - } - this.subscriptions[key] = [{value, callback}]; - } - - /** - * Same as .on, but only triggers once - * @param {String} key Message key to subscribe to - * @param {String} value Value that the above key must have for the callback to be called - * @param {Function} callback callback(response) - * @return {void} - */ - once(key, value, callback) { - const obj = {value, callback, once: true}; - if (this.subscriptions[key]) { - this.subscriptions[key].push(obj); - return; - } - this.subscriptions[key] = [obj]; - } - - /** - * Unsubscribes from a previously subscribed key-value pair (or key if no value) - * @param {String} key Message key to unsubscribe - * @param {String} value [Optional] Value of the key to unsubscribe - * If there is no value, unsubscribes from all the values of that key - * @return {void} - */ - unsubscribeTo(key, value) { - if (!value) { - this.subscriptions[key] = []; - } - if (this.subscriptions[key]) { - this.subscriptions[key].filter((val, index) => { - if (val.value === value) { - this.subscriptions[key].splice(index, 1); - } - }); - } - } - - /** - * Unsubscribes from all subscriptions - * @return {void} - */ - unsubscribeToAll() { - this.subscriptions = {}; - } - - /** - * Sends a message to the child process. Same as ChildProcess.send() - * @params {Object} message Message to send - * For other parameters, see: - * https://nodejs.org/api/child_process.html#child_process_subprocess_send_message_sendhandle_options_callback - * @return {void} - */ - send() { - if (!this.process.connected) { - return false; - } - return this.process.send(...arguments); - } - - /** - * Disconnects the child process. It will exit on its own - * @return {void} - */ - disconnect() { - this.process.disconnect(); - } - - /** - * Kills the child process - * https://nodejs.org/api/child_process.html#child_process_subprocess_kill_signal - * @return {void} - */ - kill() { - this.process.kill(...arguments); - } -} - -module.exports = ProcessLauncher; diff --git a/lib/core/processes/processWrapper.js~HEAD b/lib/core/processes/processWrapper.js~HEAD deleted file mode 100644 index 3b934a10..00000000 --- a/lib/core/processes/processWrapper.js~HEAD +++ /dev/null @@ -1,93 +0,0 @@ -process.on('uncaughtException', function(e){ - process.send({error: e.stack}); -}); - -const constants = require('../../constants'); -const Events = require('./eventsWrapper'); - -// Set PWD to CWD since Windows doesn't have a value for PWD -if (!process.env.PWD) { - process.env.PWD = process.cwd(); -} - -class ProcessWrapper { - - /** - * Class from which process extend. Should not be instantiated alone. - * Manages the log interception so that all console.* get sent back to the parent process - * Also creates an Events instance. To use it, just do `this.events.[on|request]` - * - * @param {Options} _options Nothing for now - */ - constructor(_options) { - this.interceptLogs(); - this.events = new Events(); - - this.pingParent(); - } - - // Ping parent to see if it is still alive. Otherwise, let's die - pingParent() { - const self = this; - self.retries = 0; - function error() { - if (self.retries > 2) { - self.kill(); - process.exit(); - } - self.retries++; - } - setInterval(() => { - try { - let result = self.send({action: 'ping'}); - if (!result) { - return error(); - } - self.retries = 0; - } catch (e) { - error(); - } - }, 500); - } - - interceptLogs() { - const context = {}; - context.console = console; - - context.console.log = this._log.bind(this, 'log'); - context.console.warn = this._log.bind(this, 'warn'); - context.console.error = this._log.bind(this, 'error'); - context.console.info = this._log.bind(this, 'info'); - context.console.debug = this._log.bind(this, 'debug'); - context.console.trace = this._log.bind(this, 'trace'); - context.console.dir = this._log.bind(this, 'dir'); - } - - _log(type, ...messages) { - const isHardSource = messages.some(message => { - return (typeof message === 'string' && message.indexOf('hardsource') > -1); - }); - if (isHardSource) { - return; - } - this.send({result: constants.process.log, message: messages, type}); - } - - send() { - if (!process.connected) { - return false; - } - return process.send(...arguments); - } - - kill() { - // Should be implemented by derived class - console.log('Process killed'); - } -} - -process.on('exit', () => { - process.exit(0); -}); - -module.exports = ProcessWrapper;