const child_process = require('child_process'); const constants = require('../../constants'); const path = require('path'); const utils = require('../../utils/utils'); let processCount = 1; class ProcessLauncher { /** * Constructor of ProcessLauncher. Forks the module and sets up the message handling * @param {Object} options Options tp start the process * * modulePath {String} Absolute path to the module to fork * * logger {Object} Logger * * events {Function} Events Emitter instance * @return {ProcessLauncher} The ProcessLauncher instance */ constructor(options) { this.name = options.name || path.basename(options.modulePath); if (this._isDebug()) { const childOptions = {stdio: 'pipe', execArgv: ['--inspect-brk=' + (60000 + processCount)]}; processCount++; this.process = child_process.fork(options.modulePath, [], childOptions); } else { this.process = child_process.fork(options.modulePath); } this.logger = options.logger; this.events = options.events; this.silent = options.silent; this.exitCallback = options.exitCallback; this.embark = options.embark; this.logs = []; this.subscriptions = {}; this._subscribeToMessages(); if (this.embark) { this._registerAsPlugin(); } } _isDebug() { const argvString= process.execArgv.join(); return argvString.includes('--debug') || argvString.includes('--inspect'); } // Subscribes to messages from the child process and delegates to the right methods _subscribeToMessages() { const self = this; this.process.on('message', (msg) => { if (msg.error) { self.logger.error(msg.error); } if (msg.result === constants.process.log) { return self._handleLog(msg); } if (msg.event) { return self._handleEvent(msg); } self._checkSubscriptions(msg); }); this.process.on('exit', (code) => { if (self.exitCallback) { return self.exitCallback(code); } if (code) { this.logger.info(`Child Process ${this.name} exited with code ${code}`); } }); } _registerAsPlugin() { const self = this; const apiRoute = '/embark-api/process-logs/' + self.name; self.embark.registerAPICall( 'ws', apiRoute, (ws, _req) => { self.events.on('process-log-' + self.name, function(id, log) { ws.send(JSON.stringify(Object.assign(log, {id})), () => {}); }); } ); self.embark.registerAPICall( 'get', apiRoute, (req, res) => { const result = self.logs.map((log, id) => Object.assign(log, {id})).slice(-50); res.send(JSON.stringify(result)); } ); } // Translates logs from the child process to the logger _handleLog(msg) { // Sometimes messages come in with line breaks, so we need to break them up accordingly. let processedMessages = []; // Ensure that `msg.message` is an array, so we process this consistently. Sometimes it // is an Array, sometimes it is a string. if(typeof msg.message === 'string') { processedMessages = [msg.message]; } else { msg.message.forEach(message => { let lines = message.split("\n"); lines.forEach(line => processedMessages.push(line)); }); } const timestamp = new Date().getTime(); processedMessages.forEach((message) => { const log = { msg: message, msg_clear: message.stripColors, logLevel: msg.logLevel, name: this.name, timestamp }; const id = this.logs.push(log) - 1; this.events.emit(`process-log-${this.name}`, id, log); if (this.silent && msg.type !== 'error') { return; } if (this.logger[msg.type]) { return this.logger[msg.type](utils.normalizeInput(message)); } this.logger.debug(utils.normalizeInput(message)); }); } // Handle event calls from the child process _handleEvent(msg) { const self = this; if (!self.events[msg.event]) { self.logger.warn('Unknown event method called: ' + msg.event); return; } if (!msg.args || !Array.isArray(msg.args)) { msg.args = []; } // Add callback in the args msg.args.push((result) => { self.process.send({ event: constants.process.events.response, result, eventId: msg.eventId }); }); self.events[msg.event](msg.requestName, ...msg.args); } // Looks at the subscriptions to see if there is a callback to call _checkSubscriptions(msg) { const messageKeys = Object.keys(msg); const subscriptionsKeys = Object.keys(this.subscriptions); let subscriptionsForKey; let messageKey; // Find if the message contains a key that we are subscribed to messageKeys.some(_messageKey => { return subscriptionsKeys.some(subscriptionKey => { if (_messageKey === subscriptionKey) { subscriptionsForKey = this.subscriptions[subscriptionKey]; messageKey = _messageKey; return true; } return false; }); }); if (subscriptionsForKey) { // Find if we are subscribed to one of the values let subsIndex = []; const subscriptionsForValue = subscriptionsForKey.filter((sub, index) => { if (msg[messageKey] === sub.value) { subsIndex.push(index); return true; } return false; }); if (subscriptionsForValue.length) { // We are subscribed to that message, call the callback subscriptionsForValue.forEach((subscription, index) => { subscription.callback(msg); if (subscription.once) { // Called only once, we can remove it subscription = null; this.subscriptions[messageKey].splice(subsIndex[index], 1); } }); } } } /** * Subscribe to a message using a key-value pair * @param {String} key Message key to subscribe to * @param {String} value Value that the above key must have for the callback to be called * @param {Function} callback callback(response) * @return {void} */ on(key, value, callback) { if (this.subscriptions[key]) { this.subscriptions[key].push({value, callback}); return; } this.subscriptions[key] = [{value, callback}]; } /** * Same as .on, but only triggers once * @param {String} key Message key to subscribe to * @param {String} value Value that the above key must have for the callback to be called * @param {Function} callback callback(response) * @return {void} */ once(key, value, callback) { const obj = {value, callback, once: true}; if (this.subscriptions[key]) { this.subscriptions[key].push(obj); return; } this.subscriptions[key] = [obj]; } /** * Unsubscribes from a previously subscribed key-value pair (or key if no value) * @param {String} key Message key to unsubscribe * @param {String} value [Optional] Value of the key to unsubscribe * If there is no value, unsubscribes from all the values of that key * @return {void} */ unsubscribeTo(key, value) { if (!value) { this.subscriptions[key] = []; } if (this.subscriptions[key]) { this.subscriptions[key].filter((val, index) => { if (val.value === value) { this.subscriptions[key].splice(index, 1); } }); } } /** * Unsubscribes from all subscriptions * @return {void} */ unsubscribeToAll() { this.subscriptions = {}; } /** * Sends a message to the child process. Same as ChildProcess.send() * @params {Object} message Message to send * For other parameters, see: * https://nodejs.org/api/child_process.html#child_process_subprocess_send_message_sendhandle_options_callback * @return {void} */ send() { if (!this.process.connected) { return false; } return this.process.send(...arguments); } /** * Disconnects the child process. It will exit on its own * @return {void} */ disconnect() { this.process.disconnect(); } /** * Kills the child process * https://nodejs.org/api/child_process.html#child_process_subprocess_kill_signal * @return {void} */ kill() { this.process.kill(...arguments); } } module.exports = ProcessLauncher;