diff --git a/lib/processes/eventsWrapper.js b/lib/processes/eventsWrapper.js new file mode 100644 index 000000000..de1021585 --- /dev/null +++ b/lib/processes/eventsWrapper.js @@ -0,0 +1,55 @@ +const uuid = require('uuid/v1'); +const constants = require('../constants'); + +class Events { + + /** + * Constructs an event wrapper for processes. + * Handles sending an event message to the parent process and waiting for its response + * No need to create an instance of eventsWrapper for your own process, just extend ProcessWrapper + * Then, you an use `this.events.[on|request]` with the usual parameters you would use + */ + constructor() { + this.subscribedEvents = {}; + this.listenToParentProcess(); + } + + listenToParentProcess() { + process.on('message', (msg) => { + if (!msg.event || msg.event !== constants.process.events.response) { + return; + } + if (!this.subscribedEvents[msg.eventId]) { + return; + } + this.subscribedEvents[msg.eventId](msg.result); + }); + } + + sendEvent() { + const eventType = arguments[0]; + const requestName = arguments[1]; + + let args = [].slice.call(arguments, 2); + const eventId = uuid(); + this.subscribedEvents[eventId] = args[args.length - 1]; + args = args.splice(0, args.length - 2); + + process.send({ + event: eventType, + requestName, + args, + eventId: eventId + }); + } + + on() { + this.sendEvent(constants.process.events.on, ...arguments); + } + + request() { + this.sendEvent(constants.process.events.request, ...arguments); + } +} + +module.exports = Events; diff --git a/lib/processes/processLauncher.js b/lib/processes/processLauncher.js new file mode 100644 index 000000000..9c2c1b1d1 --- /dev/null +++ b/lib/processes/processLauncher.js @@ -0,0 +1,218 @@ +const child_process = require('child_process'); +const constants = require('../constants'); +const path = require('path'); +const utils = require('../utils/utils'); + +class ProcessLauncher { + + /** + * Constructor of ProcessLauncher. Forks the module and sets up the message handling + * @param {Object} options Options tp start the process + * * modulePath {String} Absolute path to the module to fork + * * logger {Object} Logger + * * events {Function} Events Emitter instance + * @return {ProcessLauncher} The ProcessLauncher instance + */ + constructor(options) { + this.name = path.basename(options.modulePath); + this.process = child_process.fork(options.modulePath); + this.logger = options.logger; + this.events = options.events; + this.silent = options.silent; + this.exitCallback = options.exitCallback; + + this.subscriptions = {}; + this._subscribeToMessages(); + } + + // Subscribes to messages from the child process and delegates to the right methods + _subscribeToMessages() { + const self = this; + this.process.on('message', (msg) => { + if (msg.result === constants.process.log) { + return self._handleLog(msg); + } + if (msg.event) { + return self._handleEvent(msg); + } + self._checkSubscriptions(msg); + }); + + this.process.on('exit', (code) => { + if (self.exitCallback) { + return self.exitCallback(code); + } + if (code) { + this.logger.info(`Child Process ${this.name} exited with code ${code}`); + } + }); + } + + // Translates logs from the child process to the logger + _handleLog(msg) { + if (this.silent && msg.type !== 'error') { + return; + } + if (this.logger[msg.type]) { + return this.logger[msg.type](utils.normalizeInput(msg.message)); + } + this.logger.debug(utils.normalizeInput(msg.message)); + } + + // Handle event calls from the child process + _handleEvent(msg) { + const self = this; + if (!self.events[msg.event]) { + self.logger.warn('Unknown event method called: ' + msg.event); + return; + } + if (!msg.args || !Array.isArray(msg.args)) { + msg.args = []; + } + // Add callback in the args + msg.args.push((result) => { + self.process.send({ + event: constants.process.events.response, + result, + eventId: msg.eventId + }); + }); + self.events[msg.event](msg.requestName, ...msg.args); + } + + // Looks at the subscriptions to see if there is a callback to call + _checkSubscriptions(msg) { + const messageKeys = Object.keys(msg); + const subscriptionsKeys = Object.keys(this.subscriptions); + let subscriptionsForKey; + let messageKey; + // Find if the message contains a key that we are subscribed to + messageKeys.some(_messageKey => { + return subscriptionsKeys.some(subscriptionKey => { + if (_messageKey === subscriptionKey) { + subscriptionsForKey = this.subscriptions[subscriptionKey]; + messageKey = _messageKey; + return true; + } + return false; + }); + }); + + if (subscriptionsForKey) { + // Find if we are subscribed to one of the values + let subsIndex = []; + const subscriptionsForValue = subscriptionsForKey.filter((sub, index) => { + if (msg[messageKey] === sub.value) { + subsIndex.push(index); + return true; + } + return false; + }); + + if (subscriptionsForValue.length) { + // We are subscribed to that message, call the callback + subscriptionsForValue.forEach((subscription, index) => { + subscription.callback(msg); + + if (subscription.once) { + // Called only once, we can remove it + subscription = null; + this.subscriptions[messageKey].splice(subsIndex[index], 1); + } + }); + } + } + } + + /** + * Subscribe to a message using a key-value pair + * @param {String} key Message key to subscribe to + * @param {String} value Value that the above key must have for the callback to be called + * @param {Function} callback callback(response) + * @return {void} + */ + on(key, value, callback) { + if (this.subscriptions[key]) { + this.subscriptions[key].push({value, callback}); + return; + } + this.subscriptions[key] = [{value, callback}]; + } + + /** + * Same as .on, but only triggers once + * @param {String} key Message key to subscribe to + * @param {String} value Value that the above key must have for the callback to be called + * @param {Function} callback callback(response) + * @return {void} + */ + once(key, value, callback) { + const obj = {value, callback, once: true}; + if (this.subscriptions[key]) { + this.subscriptions[key].push(obj); + return; + } + this.subscriptions[key] = [obj]; + } + + /** + * Unsubscribes from a previously subscribed key-value pair (or key if no value) + * @param {String} key Message key to unsubscribe + * @param {String} value [Optional] Value of the key to unsubscribe + * If there is no value, unsubscribes from all the values of that key + * @return {void} + */ + unsubscribeTo(key, value) { + if (!value) { + this.subscriptions[key] = []; + } + if (this.subscriptions[key]) { + this.subscriptions[key].filter((val, index) => { + if (val.value === value) { + this.subscriptions[key].splice(index, 1); + } + }); + } + } + + /** + * Unsubscribes from all subscriptions + * @return {void} + */ + unsubscribeToAll() { + this.subscriptions = {}; + } + + /** + * Sends a message to the child process. Same as ChildProcess.send() + * @params {Object} message Message to send + * For other parameters, see: + * https://nodejs.org/api/child_process.html#child_process_subprocess_send_message_sendhandle_options_callback + * @return {void} + */ + send() { + if (!this.process.connected) { + return false; + } + return this.process.send(...arguments); + } + + /** + * Disconnects the child process. It will exit on its own + * @return {void} + */ + disconnect() { + this.process.disconnect(); + } + + /** + * Kills the child process + * https://nodejs.org/api/child_process.html#child_process_subprocess_kill_signal + * @return {void} + */ + kill() { + this.process.kill(...arguments); + } +} + +module.exports = ProcessLauncher; diff --git a/lib/processes/processWrapper.js b/lib/processes/processWrapper.js new file mode 100644 index 000000000..30108fe45 --- /dev/null +++ b/lib/processes/processWrapper.js @@ -0,0 +1,93 @@ +const constants = require('../constants'); +const Events = require('./eventsWrapper'); + +// Override process.chdir so that we have a partial-implementation PWD for Windows +const realChdir = process.chdir; +process.chdir = (...args) => { + if (!process.env.PWD) { + process.env.PWD = process.cwd(); + } + realChdir(...args); +}; + +class ProcessWrapper { + + /** + * Class from which process extend. Should not be instantiated alone. + * Manages the log interception so that all console.* get sent back to the parent process + * Also creates an Events instance. To use it, just do `this.events.[on|request]` + * + * @param {Options} _options Nothing for now + */ + constructor(_options) { + this.interceptLogs(); + this.events = new Events(); + + this.pingParent(); + } + + // Ping parent to see if it is still alive. Otherwise, let's die + pingParent() { + const self = this; + self.retries = 0; + function error() { + if (self.retries > 2) { + self.kill(); + process.exit(); + } + self.retries++; + } + setInterval(() => { + try { + let result = self.send({action: 'ping'}); + if (!result) { + return error(); + } + self.retries = 0; + } catch (e) { + error(); + } + }, 500); + } + + interceptLogs() { + const context = {}; + context.console = console; + + context.console.log = this._log.bind(this, 'log'); + context.console.warn = this._log.bind(this, 'warn'); + context.console.error = this._log.bind(this, 'error'); + context.console.info = this._log.bind(this, 'info'); + context.console.debug = this._log.bind(this, 'debug'); + context.console.trace = this._log.bind(this, 'trace'); + context.console.dir = this._log.bind(this, 'dir'); + } + + _log(type, ...messages) { + const isHardSource = messages.some(message => { + return (typeof message === 'string' && message.indexOf('hardsource') > -1); + }); + if (isHardSource) { + return; + } + this.send({result: constants.process.log, message: messages, type}); + } + + send() { + if (!process.connected) { + return false; + } + return process.send(...arguments); + } + + kill() { + // Should be implemented by derived class + console.log('Process killed'); + } +} + +process.on('exit', () => { + process.exit(0); +}); + +module.exports = ProcessWrapper;