From 87aa1cc166cfd1d8dcd35182896daa8074571614 Mon Sep 17 00:00:00 2001 From: Jean Lauliac Date: Mon, 21 Aug 2017 04:39:23 -0700 Subject: [PATCH] worker-farm: add @format Reviewed By: davidaurelio Differential Revision: D5658177 fbshipit-source-id: 59565a57243c6f8ce0d9aceaad5dc542992cd45b --- .../src/worker-farm/lib/child/index.js | 67 ++- .../metro-bundler/src/worker-farm/lib/farm.js | 462 ++++++++++-------- .../metro-bundler/src/worker-farm/lib/fork.js | 1 + .../src/worker-farm/lib/index.js | 22 +- 4 files changed, 290 insertions(+), 262 deletions(-) diff --git a/packages/metro-bundler/src/worker-farm/lib/child/index.js b/packages/metro-bundler/src/worker-farm/lib/child/index.js index a20dcc2b..4ae1da8c 100644 --- a/packages/metro-bundler/src/worker-farm/lib/child/index.js +++ b/packages/metro-bundler/src/worker-farm/lib/child/index.js @@ -5,10 +5,12 @@ * This source code is licensed under the BSD-style license found in the * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. + * + * @format */ /* eslint-disable */ -var $module +var $module; /* var contextProto = this.context; @@ -17,42 +19,39 @@ var $module } */ -function handle (data) { - var idx = data.idx - , child = data.child - , method = data.method - , args = data.args - , callback = function () { - var _args = Array.prototype.slice.call(arguments) - if (_args[0] instanceof Error) { - var e = _args[0] - _args[0] = { - '$error' : '$error' - , 'type' : e.constructor.name - , 'message' : e.message - , 'stack' : e.stack - } - Object.keys(e).forEach(function(key) { - _args[0][key] = e[key] - }) - } - process.send({ idx: idx, child: child, args: _args }) +function handle(data) { + var idx = data.idx, + child = data.child, + method = data.method, + args = data.args, + callback = function() { + var _args = Array.prototype.slice.call(arguments); + if (_args[0] instanceof Error) { + var e = _args[0]; + _args[0] = { + $error: '$error', + type: e.constructor.name, + message: e.message, + stack: e.stack, + }; + Object.keys(e).forEach(function(key) { + _args[0][key] = e[key]; + }); } - , exec + process.send({idx: idx, child: child, args: _args}); + }, + exec; - if (method == null && typeof $module == 'function') - exec = $module - else if (typeof $module[method] == 'function') - exec = $module[method] + if (method == null && typeof $module == 'function') exec = $module; + else if (typeof $module[method] == 'function') exec = $module[method]; - if (!exec) - return console.error('NO SUCH METHOD:', method) + if (!exec) return console.error('NO SUCH METHOD:', method); - exec.apply(null, args.concat([ callback ])) + exec.apply(null, args.concat([callback])); } -process.on('message', function (data) { - if (!$module) return $module = require(data.module) - if (data == 'die') return process.exit(0) - handle(data) -}) +process.on('message', function(data) { + if (!$module) return ($module = require(data.module)); + if (data == 'die') return process.exit(0); + handle(data); +}); diff --git a/packages/metro-bundler/src/worker-farm/lib/farm.js b/packages/metro-bundler/src/worker-farm/lib/farm.js index 14c3d12c..0c53448d 100644 --- a/packages/metro-bundler/src/worker-farm/lib/farm.js +++ b/packages/metro-bundler/src/worker-farm/lib/farm.js @@ -6,190 +6,220 @@ * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. * + * @format * @flow */ /* eslint-disable */ const DEFAULT_OPTIONS = { - maxCallsPerWorker : Infinity - , maxConcurrentWorkers : require('os').cpus().length - , maxConcurrentCallsPerWorker : 10 - , maxConcurrentCalls : Infinity - , maxCallTime : Infinity // exceed this and the whole worker is terminated - , maxRetries : Infinity - , forcedKillTime : 100 - , autoStart : false - } + maxCallsPerWorker: Infinity, + maxConcurrentWorkers: require('os').cpus().length, + maxConcurrentCallsPerWorker: 10, + maxConcurrentCalls: Infinity, + maxCallTime: Infinity, // exceed this and the whole worker is terminated + maxRetries: Infinity, + forcedKillTime: 100, + autoStart: false, +}; -const extend = require('xtend') - , fork = require('./fork') - , TimeoutError = require('errno').create('TimeoutError') - , ProcessTerminatedError = require('errno').create('ProcessTerminatedError') - , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError') +const extend = require('xtend'), + fork = require('./fork'), + TimeoutError = require('errno').create('TimeoutError'), + ProcessTerminatedError = require('errno').create('ProcessTerminatedError'), + MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError'); const mergeStream = require('merge-stream'); -function Farm (options: {+execArgv: Array}, path: string) { - this.options = extend(DEFAULT_OPTIONS, options) - this.path = path - this.activeCalls = 0 +function Farm(options: {+execArgv: Array}, path: string) { + this.options = extend(DEFAULT_OPTIONS, options); + this.path = path; + this.activeCalls = 0; this.stdout = mergeStream(); this.stderr = mergeStream(); } // make a handle to pass back in the form of an external API -Farm.prototype.mkhandle = function (method) { - return function () { - var args = Array.prototype.slice.call(arguments) +Farm.prototype.mkhandle = function(method) { + return function() { + var args = Array.prototype.slice.call(arguments); if (this.activeCalls >= this.options.maxConcurrentCalls) { - var err = new MaxConcurrentCallsError('Too many concurrent calls (' + this.activeCalls + ')') + var err = new MaxConcurrentCallsError( + 'Too many concurrent calls (' + this.activeCalls + ')', + ); if (typeof args[args.length - 1] == 'function') - return process.nextTick(args[args.length - 1].bind(null, err)) - throw err + return process.nextTick(args[args.length - 1].bind(null, err)); + throw err; } this.addCall({ - method : method - , callback : args.pop() - , args : args - , retries : 0 - }) - }.bind(this) -} + method: method, + callback: args.pop(), + args: args, + retries: 0, + }); + }.bind(this); +}; // a constructor of sorts -Farm.prototype.setup = function (methods) { - var iface - if (!methods) { // single-function export - iface = this.mkhandle() - } else { // multiple functions on the export - iface = {} - methods.forEach(function (m) { - iface[m] = this.mkhandle(m) - }.bind(this)) +Farm.prototype.setup = function(methods) { + var iface; + if (!methods) { + // single-function export + iface = this.mkhandle(); + } else { + // multiple functions on the export + iface = {}; + methods.forEach( + function(m) { + iface[m] = this.mkhandle(m); + }.bind(this), + ); } - this.searchStart = -1 - this.childId = -1 - this.children = {} - this.activeChildren = 0 - this.callQueue = [] + this.searchStart = -1; + this.childId = -1; + this.children = {}; + this.activeChildren = 0; + this.callQueue = []; if (this.options.autoStart) { while (this.activeChildren < this.options.maxConcurrentWorkers) - this.startChild() + this.startChild(); } - return iface -} + return iface; +}; // when a child exits, check if there are any outstanding jobs and requeue them -Farm.prototype.onExit = function (childId) { +Farm.prototype.onExit = function(childId) { // delay this to give any sends a chance to finish - setTimeout(function () { - var doQueue = false - if (this.children[childId] && this.children[childId].activeCalls) { - this.children[childId].calls.forEach(function (call, i) { - if (!call) return - else if (call.retries >= this.options.maxRetries) { - this.receive({ - idx : i - , child : childId - , args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ] - }) - } else { - call.retries++ - this.callQueue.unshift(call) - doQueue = true - } - }.bind(this)) - } - this.stopChild(childId) - doQueue && this.processQueue() - }.bind(this), 10) -} + setTimeout( + function() { + var doQueue = false; + if (this.children[childId] && this.children[childId].activeCalls) { + this.children[childId].calls.forEach( + function(call, i) { + if (!call) return; + else if (call.retries >= this.options.maxRetries) { + this.receive({ + idx: i, + child: childId, + args: [ + new ProcessTerminatedError( + 'cancel after ' + call.retries + ' retries!', + ), + ], + }); + } else { + call.retries++; + this.callQueue.unshift(call); + doQueue = true; + } + }.bind(this), + ); + } + this.stopChild(childId); + doQueue && this.processQueue(); + }.bind(this), + 10, + ); +}; // start a new worker -Farm.prototype.startChild = function () { - this.childId++ +Farm.prototype.startChild = function() { + this.childId++; - var forked = fork(this.path, {execArgv: this.options.execArgv}) - , id = this.childId - , c = { - send : forked.send - , child : forked.child - , calls : [] - , activeCalls : 0 - , exitCode : null - } + var forked = fork(this.path, {execArgv: this.options.execArgv}), + id = this.childId, + c = { + send: forked.send, + child: forked.child, + calls: [], + activeCalls: 0, + exitCode: null, + }; this.stdout.add(forked.child.stdout); this.stderr.add(forked.child.stderr); - forked.child.on('message', this.receive.bind(this)) - forked.child.once('exit', function (code) { - c.exitCode = code - this.onExit(id) - }.bind(this)) + forked.child.on('message', this.receive.bind(this)); + forked.child.once( + 'exit', + function(code) { + c.exitCode = code; + this.onExit(id); + }.bind(this), + ); - this.activeChildren++ - this.children[id] = c -} + this.activeChildren++; + this.children[id] = c; +}; // stop a worker, identified by id -Farm.prototype.stopChild = function (childId) { - var child = this.children[childId] +Farm.prototype.stopChild = function(childId) { + var child = this.children[childId]; if (child) { - child.send('die') - setTimeout(function () { - if (child.exitCode === null) - child.child.kill('SIGKILL') - }, this.options.forcedKillTime) - ;delete this.children[childId] - this.activeChildren-- + child.send('die'); + setTimeout(function() { + if (child.exitCode === null) child.child.kill('SIGKILL'); + }, this.options.forcedKillTime); + delete this.children[childId]; + this.activeChildren--; } -} +}; // called from a child process, the data contains information needed to // look up the child and the original call so we can invoke the callback -Farm.prototype.receive = function (data) { - var idx = data.idx - , childId = data.child - , args = data.args - , child = this.children[childId] - , call +Farm.prototype.receive = function(data) { + var idx = data.idx, + childId = data.child, + args = data.args, + child = this.children[childId], + call; if (!child) { return console.error( - 'Worker Farm: Received message for unknown child. ' - + 'This is likely as a result of premature child death, ' - + 'the operation will have been re-queued.' - ) + 'Worker Farm: Received message for unknown child. ' + + 'This is likely as a result of premature child death, ' + + 'the operation will have been re-queued.', + ); } - call = child.calls[idx] + call = child.calls[idx]; if (!call) { return console.error( - 'Worker Farm: Received message for unknown index for existing child. ' - + 'This should not happen!' - ) + 'Worker Farm: Received message for unknown index for existing child. ' + + 'This should not happen!', + ); } - if (this.options.maxCallTime !== Infinity) - clearTimeout(call.timer) + if (this.options.maxCallTime !== Infinity) clearTimeout(call.timer); if (args[0] && args[0].$error == '$error') { - var e = args[0] + var e = args[0]; switch (e.type) { - case 'TypeError': args[0] = new TypeError(e.message); break - case 'RangeError': args[0] = new RangeError(e.message); break - case 'EvalError': args[0] = new EvalError(e.message); break - case 'ReferenceError': args[0] = new ReferenceError(e.message); break - case 'SyntaxError': args[0] = new SyntaxError(e.message); break - case 'URIError': args[0] = new URIError(e.message); break - default: args[0] = new Error(e.message) + case 'TypeError': + args[0] = new TypeError(e.message); + break; + case 'RangeError': + args[0] = new RangeError(e.message); + break; + case 'EvalError': + args[0] = new EvalError(e.message); + break; + case 'ReferenceError': + args[0] = new ReferenceError(e.message); + break; + case 'SyntaxError': + args[0] = new SyntaxError(e.message); + break; + case 'URIError': + args[0] = new URIError(e.message); + break; + default: + args[0] = new Error(e.message); } - args[0].type = e.type - args[0].stack = e.stack + args[0].type = e.type; + args[0].stack = e.stack; // Copy any custom properties to pass it on. Object.keys(e).forEach(function(key) { @@ -197,101 +227,103 @@ Farm.prototype.receive = function (data) { }); } - process.nextTick(function () { - call.callback.apply(null, args) - }) + process.nextTick(function() { + call.callback.apply(null, args); + }); + delete child.calls[idx]; + child.activeCalls--; + this.activeCalls--; - ;delete child.calls[idx] - child.activeCalls-- - this.activeCalls-- - - if (child.calls.length >= this.options.maxCallsPerWorker - && !Object.keys(child.calls).length) { + if ( + child.calls.length >= this.options.maxCallsPerWorker && + !Object.keys(child.calls).length + ) { // this child has finished its run, kill it - this.stopChild(childId) + this.stopChild(childId); } // allow any outstanding calls to be processed - this.processQueue() -} + this.processQueue(); +}; -Farm.prototype.childTimeout = function (childId) { - var child = this.children[childId] - , i +Farm.prototype.childTimeout = function(childId) { + var child = this.children[childId], + i; - if (!child) - return + if (!child) return; for (i in child.calls) { this.receive({ - idx : i - , child : childId - , args : [ new TimeoutError('worker call timed out!') ] - }) + idx: i, + child: childId, + args: [new TimeoutError('worker call timed out!')], + }); } - this.stopChild(childId) -} + this.stopChild(childId); +}; // send a call to a worker, identified by id -Farm.prototype.send = function (childId, call) { - var child = this.children[childId] - , idx = child.calls.length +Farm.prototype.send = function(childId, call) { + var child = this.children[childId], + idx = child.calls.length; - child.calls.push(call) - child.activeCalls++ - this.activeCalls++ + child.calls.push(call); + child.activeCalls++; + this.activeCalls++; child.send({ - idx : idx - , child : childId - , method : call.method - , args : call.args - }) + idx: idx, + child: childId, + method: call.method, + args: call.args, + }); if (this.options.maxCallTime !== Infinity) { - call.timer = - setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime) + call.timer = setTimeout( + this.childTimeout.bind(this, childId), + this.options.maxCallTime, + ); } -} +}; // a list of active worker ids, in order, but the starting offset is // shifted each time this method is called, so we work our way through // all workers when handing out jobs -Farm.prototype.childKeys = function () { - var cka = Object.keys(this.children) - , cks +Farm.prototype.childKeys = function() { + var cka = Object.keys(this.children), + cks; - if (this.searchStart >= cka.length - 1) - this.searchStart = 0 - else - this.searchStart++ + if (this.searchStart >= cka.length - 1) this.searchStart = 0; + else this.searchStart++; - cks = cka.splice(0, this.searchStart) + cks = cka.splice(0, this.searchStart); - return cka.concat(cks) -} + return cka.concat(cks); +}; // Calls are added to a queue, this processes the queue and is called // whenever there might be a chance to send more calls to the workers. // The various options all impact on when we're able to send calls, // they may need to be kept in a queue until a worker is ready. -Farm.prototype.processQueue = function () { - var cka, i = 0, childId +Farm.prototype.processQueue = function() { + var cka, + i = 0, + childId; - if (!this.callQueue.length) - return this.ending && this.end() + if (!this.callQueue.length) return this.ending && this.end(); if (this.activeChildren < this.options.maxConcurrentWorkers) - this.startChild() + this.startChild(); for (cka = this.childKeys(); i < cka.length; i++) { - childId = +cka[i] - if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker - && this.children[childId].calls.length < this.options.maxCallsPerWorker) { - - this.send(childId, this.callQueue.shift()) - if (!this.callQueue.length) - return this.ending && this.end() + childId = +cka[i]; + if ( + this.children[childId].activeCalls < + this.options.maxConcurrentCallsPerWorker && + this.children[childId].calls.length < this.options.maxCallsPerWorker + ) { + this.send(childId, this.callQueue.shift()); + if (!this.callQueue.length) return this.ending && this.end(); } /*else { console.log( , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker @@ -300,43 +332,39 @@ Farm.prototype.processQueue = function () { }*/ } - if (this.ending) - this.end() -} + if (this.ending) this.end(); +}; // add a new call to the call queue, then trigger a process of the queue -Farm.prototype.addCall = function (call) { - if (this.ending) - return this.end() // don't add anything new to the queue - this.callQueue.push(call) - this.processQueue() -} +Farm.prototype.addCall = function(call) { + if (this.ending) return this.end(); // don't add anything new to the queue + this.callQueue.push(call); + this.processQueue(); +}; // kills child workers when they're all done -Farm.prototype.end = function (callback) { - var complete = true - if (this.ending === false) - return - if (callback) - this.ending = callback - else if (this.ending == null) - this.ending = true - Object.keys(this.children).forEach(function (child) { - if (!this.children[child]) - return - if (!this.children[child].activeCalls) - this.stopChild(child) - else - complete = false - }.bind(this)) +Farm.prototype.end = function(callback) { + var complete = true; + if (this.ending === false) return; + if (callback) this.ending = callback; + else if (this.ending == null) this.ending = true; + Object.keys(this.children).forEach( + function(child) { + if (!this.children[child]) return; + if (!this.children[child].activeCalls) this.stopChild(child); + else complete = false; + }.bind(this), + ); if (complete && typeof this.ending == 'function') { - process.nextTick(function () { - this.ending() - this.ending = false - }.bind(this)) + process.nextTick( + function() { + this.ending(); + this.ending = false; + }.bind(this), + ); } -} +}; -module.exports = Farm -module.exports.TimeoutError = TimeoutError +module.exports = Farm; +module.exports.TimeoutError = TimeoutError; diff --git a/packages/metro-bundler/src/worker-farm/lib/fork.js b/packages/metro-bundler/src/worker-farm/lib/fork.js index f3ea06c3..5a218a69 100644 --- a/packages/metro-bundler/src/worker-farm/lib/fork.js +++ b/packages/metro-bundler/src/worker-farm/lib/fork.js @@ -6,6 +6,7 @@ * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. * + * @format * @flow */ diff --git a/packages/metro-bundler/src/worker-farm/lib/index.js b/packages/metro-bundler/src/worker-farm/lib/index.js index 8857b6ab..82eaf918 100644 --- a/packages/metro-bundler/src/worker-farm/lib/index.js +++ b/packages/metro-bundler/src/worker-farm/lib/index.js @@ -6,15 +6,16 @@ * LICENSE file in the root directory of this source tree. An additional grant * of patent rights can be found in the PATENTS file in the same directory. * + * @format * @flow */ /* eslint-disable */ -const Farm = require('./farm') +const Farm = require('./farm'); import type {Readable} from 'stream'; -var farms = [] // keep record of farms so we can end() them if required +var farms = []; // keep record of farms so we can end() them if required export type FarmAPI = {| methods: {[name: string]: Function}, @@ -27,10 +28,10 @@ function farm( path: string, methods: Array, ): FarmAPI { - var f = new Farm(options, path) - , api = f.setup(methods) + var f = new Farm(options, path), + api = f.setup(methods); - farms.push({ farm: f, api: api }) + farms.push({farm: f, api: api}); // $FlowFixMe: gotta type the Farm class. const {stdout, stderr} = f; @@ -39,12 +40,11 @@ function farm( return {methods: (api: any), stdout, stderr}; } -function end (api, callback) { +function end(api, callback) { for (var i = 0; i < farms.length; i++) - if (farms[i] && farms[i].api === api) - return farms[i].farm.end(callback) - process.nextTick(callback.bind(null, 'Worker farm not found!')) + if (farms[i] && farms[i].api === api) return farms[i].farm.end(callback); + process.nextTick(callback.bind(null, 'Worker farm not found!')); } -module.exports = farm -module.exports.end = end +module.exports = farm; +module.exports.end = end;