embark-area-51/lib/core/processes/processLauncher.js

287 lines
8.3 KiB
JavaScript
Raw Normal View History

2018-05-16 20:41:15 +00:00
const child_process = require('child_process');
2018-07-27 21:33:50 +00:00
const constants = require('../../constants');
const path = require('path');
2018-07-27 21:33:50 +00:00
const utils = require('../../utils/utils');
2018-05-16 15:49:38 +00:00
2018-08-23 14:29:39 +00:00
let processCount = 1;
2018-05-16 20:41:15 +00:00
class ProcessLauncher {
2018-05-16 15:49:38 +00:00
/**
* Constructor of ProcessLauncher. Forks the module and sets up the message handling
* @param {Object} options Options tp start the process
* * modulePath {String} Absolute path to the module to fork
* * logger {Object} Logger
* * events {Function} Events Emitter instance
* @return {ProcessLauncher} The ProcessLauncher instance
*/
2018-05-16 20:41:15 +00:00
constructor(options) {
this.name = options.name || path.basename(options.modulePath);
2018-08-23 14:29:39 +00:00
if (this._isDebug()) {
const childOptions = {stdio: 'pipe', execArgv: ['--inspect-brk=' + (60000 + processCount)]};
processCount++;
this.process = child_process.fork(options.modulePath, [], childOptions);
} else {
this.process = child_process.fork(options.modulePath);
}
2018-05-16 20:41:15 +00:00
this.logger = options.logger;
this.events = options.events;
this.silent = options.silent;
this.exitCallback = options.exitCallback;
2018-08-01 15:14:02 +00:00
this.embark = options.embark;
2018-08-02 16:48:41 +00:00
this.logs = [];
2018-05-16 20:41:15 +00:00
this.subscriptions = {};
this._subscribeToMessages();
2018-08-01 15:14:02 +00:00
if (this.embark) {
this._registerAsPlugin();
}
2018-05-16 20:41:15 +00:00
}
2018-08-23 14:29:39 +00:00
_isDebug() {
const argvString= process.execArgv.join();
return argvString.includes('--debug') || argvString.includes('--inspect');
}
2018-05-16 15:49:38 +00:00
// Subscribes to messages from the child process and delegates to the right methods
2018-05-16 20:41:15 +00:00
_subscribeToMessages() {
const self = this;
this.process.on('message', (msg) => {
2018-08-27 20:22:53 +00:00
if (msg.error) {
self.logger.error(msg.error);
2018-08-23 15:36:08 +00:00
}
2018-05-16 20:41:15 +00:00
if (msg.result === constants.process.log) {
2018-05-16 14:19:46 +00:00
return self._handleLog(msg);
}
if (msg.event) {
return self._handleEvent(msg);
2018-05-16 20:41:15 +00:00
}
self._checkSubscriptions(msg);
2018-05-16 14:19:46 +00:00
});
this.process.on('exit', (code) => {
if (self.exitCallback) {
return self.exitCallback(code);
}
if (code) {
this.logger.info(`Child Process ${this.name} exited with code ${code}`);
}
});
2018-05-16 14:19:46 +00:00
}
2018-05-16 20:41:15 +00:00
2018-08-01 15:14:02 +00:00
_registerAsPlugin() {
const self = this;
2018-08-02 16:48:41 +00:00
const apiRoute = '/embark-api/process-logs/' + self.name;
2018-08-01 15:14:02 +00:00
self.embark.registerAPICall(
'ws',
2018-08-02 16:48:41 +00:00
apiRoute,
2018-08-01 15:14:02 +00:00
(ws, _req) => {
2018-08-07 14:09:55 +00:00
self.events.on('process-log-' + self.name, function(logLevel, msg, name, timestamp) {
ws.send(JSON.stringify({msg, msg_clear: msg.stripColors, logLevel, name, timestamp}), () => {});
2018-08-01 15:14:02 +00:00
});
}
);
2018-08-02 16:48:41 +00:00
self.embark.registerAPICall(
'get',
apiRoute,
(req, res) => {
res.send(JSON.stringify(self.logs));
}
);
2018-08-01 15:14:02 +00:00
}
2018-05-16 15:49:38 +00:00
// Translates logs from the child process to the logger
2018-05-16 14:19:46 +00:00
_handleLog(msg) {
// Sometimes messages come in with line breaks, so we need to break them up accordingly.
let processedMessages = [];
// Ensure that `msg.message` is an array, so we process this consistently. Sometimes it
// is an Array, sometimes it is a string.
if(typeof msg.message === 'string') {
processedMessages = [msg.message];
} else {
msg.message.forEach((message) => {
let lines = message.split("\n");
lines.forEach((line) => { processedMessages.push(line); });
});
}
2018-08-07 14:09:55 +00:00
const timestamp = new Date().getTime();
processedMessages.forEach((message) => {
this.events.emit('process-log-' + this.name, msg.type, message, this.name, timestamp);
this.logs.push({
msg: message,
msg_clear: message.stripColors,
logLevel: msg.logLevel,
name: this.name,
timestamp
});
if (this.silent && msg.type !== 'error') {
return;
}
if (this.logger[msg.type]) {
return this.logger[msg.type](utils.normalizeInput(message));
}
this.logger.debug(utils.normalizeInput(message));
2018-08-27 20:22:53 +00:00
});
2018-05-16 14:19:46 +00:00
}
2018-05-16 15:49:38 +00:00
// Handle event calls from the child process
2018-05-16 14:19:46 +00:00
_handleEvent(msg) {
const self = this;
if (!self.events[msg.event]) {
self.logger.warn('Unknown event method called: ' + msg.event);
return;
}
if (!msg.args || !Array.isArray(msg.args)) {
msg.args = [];
}
// Add callback in the args
msg.args.push((result) => {
self.process.send({
event: constants.process.events.response,
result,
eventId: msg.eventId
});
});
self.events[msg.event](msg.requestName, ...msg.args);
2018-05-16 14:19:46 +00:00
}
2018-05-16 20:41:15 +00:00
2018-05-16 15:49:38 +00:00
// Looks at the subscriptions to see if there is a callback to call
2018-05-16 14:19:46 +00:00
_checkSubscriptions(msg) {
const messageKeys = Object.keys(msg);
const subscriptionsKeys = Object.keys(this.subscriptions);
2018-05-18 18:11:29 +00:00
let subscriptionsForKey;
2018-05-16 14:19:46 +00:00
let messageKey;
// Find if the message contains a key that we are subscribed to
messageKeys.some(_messageKey => {
return subscriptionsKeys.some(subscriptionKey => {
if (_messageKey === subscriptionKey) {
2018-05-18 18:11:29 +00:00
subscriptionsForKey = this.subscriptions[subscriptionKey];
2018-05-16 14:19:46 +00:00
messageKey = _messageKey;
return true;
}
return false;
});
});
2018-05-16 20:41:15 +00:00
2018-05-18 18:11:29 +00:00
if (subscriptionsForKey) {
2018-05-16 14:19:46 +00:00
// Find if we are subscribed to one of the values
2018-05-18 18:11:29 +00:00
let subsIndex = [];
const subscriptionsForValue = subscriptionsForKey.filter((sub, index) => {
2018-05-16 14:19:46 +00:00
if (msg[messageKey] === sub.value) {
2018-05-18 18:11:29 +00:00
subsIndex.push(index);
2018-05-16 14:19:46 +00:00
return true;
2018-05-16 20:41:15 +00:00
}
2018-05-16 14:19:46 +00:00
return false;
});
2018-05-18 18:11:29 +00:00
if (subscriptionsForValue.length) {
2018-05-16 14:19:46 +00:00
// We are subscribed to that message, call the callback
2018-05-18 18:11:29 +00:00
subscriptionsForValue.forEach((subscription, index) => {
subscription.callback(msg);
if (subscription.once) {
// Called only once, we can remove it
subscription = null;
this.subscriptions[messageKey].splice(subsIndex[index], 1);
}
});
2018-05-16 20:41:15 +00:00
}
2018-05-16 14:19:46 +00:00
}
2018-05-16 20:41:15 +00:00
}
2018-05-16 15:49:38 +00:00
/**
* Subscribe to a message using a key-value pair
* @param {String} key Message key to subscribe to
* @param {String} value Value that the above key must have for the callback to be called
* @param {Function} callback callback(response)
* @return {void}
*/
2018-05-18 18:11:29 +00:00
on(key, value, callback) {
2018-05-16 20:41:15 +00:00
if (this.subscriptions[key]) {
2018-05-16 18:06:34 +00:00
this.subscriptions[key].push({value, callback});
2018-05-16 20:41:15 +00:00
return;
}
this.subscriptions[key] = [{value, callback}];
}
2018-05-18 18:11:29 +00:00
/**
* Same as .on, but only triggers once
* @param {String} key Message key to subscribe to
* @param {String} value Value that the above key must have for the callback to be called
* @param {Function} callback callback(response)
* @return {void}
*/
once(key, value, callback) {
const obj = {value, callback, once: true};
if (this.subscriptions[key]) {
this.subscriptions[key].push(obj);
return;
}
this.subscriptions[key] = [obj];
}
2018-05-16 15:49:38 +00:00
/**
* Unsubscribes from a previously subscribed key-value pair (or key if no value)
* @param {String} key Message key to unsubscribe
* @param {String} value [Optional] Value of the key to unsubscribe
2018-08-27 20:22:53 +00:00
* If there is no value, unsubscribes from all the values of that key
2018-05-16 15:49:38 +00:00
* @return {void}
*/
2018-05-16 20:41:15 +00:00
unsubscribeTo(key, value) {
if (!value) {
this.subscriptions[key] = [];
}
if (this.subscriptions[key]) {
this.subscriptions[key].filter((val, index) => {
if (val.value === value) {
2018-05-16 18:06:34 +00:00
this.subscriptions[key].splice(index, 1);
2018-05-16 20:41:15 +00:00
}
});
}
}
2018-05-16 15:49:38 +00:00
/**
* Unsubscribes from all subscriptions
* @return {void}
*/
2018-05-16 18:06:34 +00:00
unsubscribeToAll() {
2018-05-16 20:41:15 +00:00
this.subscriptions = {};
}
2018-05-16 15:49:38 +00:00
/**
* Sends a message to the child process. Same as ChildProcess.send()
* @params {Object} message Message to send
* For other parameters, see:
* https://nodejs.org/api/child_process.html#child_process_subprocess_send_message_sendhandle_options_callback
* @return {void}
*/
2018-05-16 20:41:15 +00:00
send() {
if (!this.process.connected) {
return false;
}
return this.process.send(...arguments);
2018-05-16 20:41:15 +00:00
}
2018-05-16 15:49:38 +00:00
/**
* Disconnects the child process. It will exit on its own
* @return {void}
*/
2018-05-16 20:41:15 +00:00
disconnect() {
this.process.disconnect();
}
2018-05-16 15:49:38 +00:00
/**
* Kills the child process
* https://nodejs.org/api/child_process.html#child_process_subprocess_kill_signal
* @return {void}
*/
2018-05-16 20:41:15 +00:00
kill() {
this.process.kill(...arguments);
}
}
module.exports = ProcessLauncher;