worker-farm: add @format

Reviewed By: davidaurelio

Differential Revision: D5658177

fbshipit-source-id: 59565a57243c6f8ce0d9aceaad5dc542992cd45b
This commit is contained in:
Jean Lauliac 2017-08-21 04:39:23 -07:00 committed by Facebook Github Bot
parent bf25e49665
commit 87aa1cc166
4 changed files with 290 additions and 262 deletions

View File

@ -5,10 +5,12 @@
* This source code is licensed under the BSD-style license found in the * This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant * LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory. * of patent rights can be found in the PATENTS file in the same directory.
*
* @format
*/ */
/* eslint-disable */ /* eslint-disable */
var $module var $module;
/* /*
var contextProto = this.context; var contextProto = this.context;
@ -17,42 +19,39 @@ var $module
} }
*/ */
function handle (data) { function handle(data) {
var idx = data.idx var idx = data.idx,
, child = data.child child = data.child,
, method = data.method method = data.method,
, args = data.args args = data.args,
, callback = function () { callback = function() {
var _args = Array.prototype.slice.call(arguments) var _args = Array.prototype.slice.call(arguments);
if (_args[0] instanceof Error) { if (_args[0] instanceof Error) {
var e = _args[0] var e = _args[0];
_args[0] = { _args[0] = {
'$error' : '$error' $error: '$error',
, 'type' : e.constructor.name type: e.constructor.name,
, 'message' : e.message message: e.message,
, 'stack' : e.stack stack: e.stack,
} };
Object.keys(e).forEach(function(key) { Object.keys(e).forEach(function(key) {
_args[0][key] = e[key] _args[0][key] = e[key];
}) });
}
process.send({ idx: idx, child: child, args: _args })
} }
, exec process.send({idx: idx, child: child, args: _args});
},
exec;
if (method == null && typeof $module == 'function') if (method == null && typeof $module == 'function') exec = $module;
exec = $module else if (typeof $module[method] == 'function') exec = $module[method];
else if (typeof $module[method] == 'function')
exec = $module[method]
if (!exec) if (!exec) return console.error('NO SUCH METHOD:', method);
return console.error('NO SUCH METHOD:', method)
exec.apply(null, args.concat([ callback ])) exec.apply(null, args.concat([callback]));
} }
process.on('message', function (data) { process.on('message', function(data) {
if (!$module) return $module = require(data.module) if (!$module) return ($module = require(data.module));
if (data == 'die') return process.exit(0) if (data == 'die') return process.exit(0);
handle(data) handle(data);
}) });

View File

@ -6,190 +6,220 @@
* LICENSE file in the root directory of this source tree. An additional grant * LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory. * of patent rights can be found in the PATENTS file in the same directory.
* *
* @format
* @flow * @flow
*/ */
/* eslint-disable */ /* eslint-disable */
const DEFAULT_OPTIONS = { const DEFAULT_OPTIONS = {
maxCallsPerWorker : Infinity maxCallsPerWorker: Infinity,
, maxConcurrentWorkers : require('os').cpus().length maxConcurrentWorkers: require('os').cpus().length,
, maxConcurrentCallsPerWorker : 10 maxConcurrentCallsPerWorker: 10,
, maxConcurrentCalls : Infinity maxConcurrentCalls: Infinity,
, maxCallTime : Infinity // exceed this and the whole worker is terminated maxCallTime: Infinity, // exceed this and the whole worker is terminated
, maxRetries : Infinity maxRetries: Infinity,
, forcedKillTime : 100 forcedKillTime: 100,
, autoStart : false autoStart: false,
} };
const extend = require('xtend') const extend = require('xtend'),
, fork = require('./fork') fork = require('./fork'),
, TimeoutError = require('errno').create('TimeoutError') TimeoutError = require('errno').create('TimeoutError'),
, ProcessTerminatedError = require('errno').create('ProcessTerminatedError') ProcessTerminatedError = require('errno').create('ProcessTerminatedError'),
, MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError') MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError');
const mergeStream = require('merge-stream'); const mergeStream = require('merge-stream');
function Farm (options: {+execArgv: Array<string>}, path: string) { function Farm(options: {+execArgv: Array<string>}, path: string) {
this.options = extend(DEFAULT_OPTIONS, options) this.options = extend(DEFAULT_OPTIONS, options);
this.path = path this.path = path;
this.activeCalls = 0 this.activeCalls = 0;
this.stdout = mergeStream(); this.stdout = mergeStream();
this.stderr = mergeStream(); this.stderr = mergeStream();
} }
// make a handle to pass back in the form of an external API // make a handle to pass back in the form of an external API
Farm.prototype.mkhandle = function (method) { Farm.prototype.mkhandle = function(method) {
return function () { return function() {
var args = Array.prototype.slice.call(arguments) var args = Array.prototype.slice.call(arguments);
if (this.activeCalls >= this.options.maxConcurrentCalls) { if (this.activeCalls >= this.options.maxConcurrentCalls) {
var err = new MaxConcurrentCallsError('Too many concurrent calls (' + this.activeCalls + ')') var err = new MaxConcurrentCallsError(
'Too many concurrent calls (' + this.activeCalls + ')',
);
if (typeof args[args.length - 1] == 'function') if (typeof args[args.length - 1] == 'function')
return process.nextTick(args[args.length - 1].bind(null, err)) return process.nextTick(args[args.length - 1].bind(null, err));
throw err throw err;
} }
this.addCall({ this.addCall({
method : method method: method,
, callback : args.pop() callback: args.pop(),
, args : args args: args,
, retries : 0 retries: 0,
}) });
}.bind(this) }.bind(this);
} };
// a constructor of sorts // a constructor of sorts
Farm.prototype.setup = function (methods) { Farm.prototype.setup = function(methods) {
var iface var iface;
if (!methods) { // single-function export if (!methods) {
iface = this.mkhandle() // single-function export
} else { // multiple functions on the export iface = this.mkhandle();
iface = {} } else {
methods.forEach(function (m) { // multiple functions on the export
iface[m] = this.mkhandle(m) iface = {};
}.bind(this)) methods.forEach(
function(m) {
iface[m] = this.mkhandle(m);
}.bind(this),
);
} }
this.searchStart = -1 this.searchStart = -1;
this.childId = -1 this.childId = -1;
this.children = {} this.children = {};
this.activeChildren = 0 this.activeChildren = 0;
this.callQueue = [] this.callQueue = [];
if (this.options.autoStart) { if (this.options.autoStart) {
while (this.activeChildren < this.options.maxConcurrentWorkers) while (this.activeChildren < this.options.maxConcurrentWorkers)
this.startChild() this.startChild();
} }
return iface return iface;
} };
// when a child exits, check if there are any outstanding jobs and requeue them // when a child exits, check if there are any outstanding jobs and requeue them
Farm.prototype.onExit = function (childId) { Farm.prototype.onExit = function(childId) {
// delay this to give any sends a chance to finish // delay this to give any sends a chance to finish
setTimeout(function () { setTimeout(
var doQueue = false function() {
if (this.children[childId] && this.children[childId].activeCalls) { var doQueue = false;
this.children[childId].calls.forEach(function (call, i) { if (this.children[childId] && this.children[childId].activeCalls) {
if (!call) return this.children[childId].calls.forEach(
else if (call.retries >= this.options.maxRetries) { function(call, i) {
this.receive({ if (!call) return;
idx : i else if (call.retries >= this.options.maxRetries) {
, child : childId this.receive({
, args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ] idx: i,
}) child: childId,
} else { args: [
call.retries++ new ProcessTerminatedError(
this.callQueue.unshift(call) 'cancel after ' + call.retries + ' retries!',
doQueue = true ),
} ],
}.bind(this)) });
} } else {
this.stopChild(childId) call.retries++;
doQueue && this.processQueue() this.callQueue.unshift(call);
}.bind(this), 10) doQueue = true;
} }
}.bind(this),
);
}
this.stopChild(childId);
doQueue && this.processQueue();
}.bind(this),
10,
);
};
// start a new worker // start a new worker
Farm.prototype.startChild = function () { Farm.prototype.startChild = function() {
this.childId++ this.childId++;
var forked = fork(this.path, {execArgv: this.options.execArgv}) var forked = fork(this.path, {execArgv: this.options.execArgv}),
, id = this.childId id = this.childId,
, c = { c = {
send : forked.send send: forked.send,
, child : forked.child child: forked.child,
, calls : [] calls: [],
, activeCalls : 0 activeCalls: 0,
, exitCode : null exitCode: null,
} };
this.stdout.add(forked.child.stdout); this.stdout.add(forked.child.stdout);
this.stderr.add(forked.child.stderr); this.stderr.add(forked.child.stderr);
forked.child.on('message', this.receive.bind(this)) forked.child.on('message', this.receive.bind(this));
forked.child.once('exit', function (code) { forked.child.once(
c.exitCode = code 'exit',
this.onExit(id) function(code) {
}.bind(this)) c.exitCode = code;
this.onExit(id);
}.bind(this),
);
this.activeChildren++ this.activeChildren++;
this.children[id] = c this.children[id] = c;
} };
// stop a worker, identified by id // stop a worker, identified by id
Farm.prototype.stopChild = function (childId) { Farm.prototype.stopChild = function(childId) {
var child = this.children[childId] var child = this.children[childId];
if (child) { if (child) {
child.send('die') child.send('die');
setTimeout(function () { setTimeout(function() {
if (child.exitCode === null) if (child.exitCode === null) child.child.kill('SIGKILL');
child.child.kill('SIGKILL') }, this.options.forcedKillTime);
}, this.options.forcedKillTime) delete this.children[childId];
;delete this.children[childId] this.activeChildren--;
this.activeChildren--
} }
} };
// called from a child process, the data contains information needed to // called from a child process, the data contains information needed to
// look up the child and the original call so we can invoke the callback // look up the child and the original call so we can invoke the callback
Farm.prototype.receive = function (data) { Farm.prototype.receive = function(data) {
var idx = data.idx var idx = data.idx,
, childId = data.child childId = data.child,
, args = data.args args = data.args,
, child = this.children[childId] child = this.children[childId],
, call call;
if (!child) { if (!child) {
return console.error( return console.error(
'Worker Farm: Received message for unknown child. ' 'Worker Farm: Received message for unknown child. ' +
+ 'This is likely as a result of premature child death, ' 'This is likely as a result of premature child death, ' +
+ 'the operation will have been re-queued.' 'the operation will have been re-queued.',
) );
} }
call = child.calls[idx] call = child.calls[idx];
if (!call) { if (!call) {
return console.error( return console.error(
'Worker Farm: Received message for unknown index for existing child. ' 'Worker Farm: Received message for unknown index for existing child. ' +
+ 'This should not happen!' 'This should not happen!',
) );
} }
if (this.options.maxCallTime !== Infinity) if (this.options.maxCallTime !== Infinity) clearTimeout(call.timer);
clearTimeout(call.timer)
if (args[0] && args[0].$error == '$error') { if (args[0] && args[0].$error == '$error') {
var e = args[0] var e = args[0];
switch (e.type) { switch (e.type) {
case 'TypeError': args[0] = new TypeError(e.message); break case 'TypeError':
case 'RangeError': args[0] = new RangeError(e.message); break args[0] = new TypeError(e.message);
case 'EvalError': args[0] = new EvalError(e.message); break break;
case 'ReferenceError': args[0] = new ReferenceError(e.message); break case 'RangeError':
case 'SyntaxError': args[0] = new SyntaxError(e.message); break args[0] = new RangeError(e.message);
case 'URIError': args[0] = new URIError(e.message); break break;
default: args[0] = new Error(e.message) case 'EvalError':
args[0] = new EvalError(e.message);
break;
case 'ReferenceError':
args[0] = new ReferenceError(e.message);
break;
case 'SyntaxError':
args[0] = new SyntaxError(e.message);
break;
case 'URIError':
args[0] = new URIError(e.message);
break;
default:
args[0] = new Error(e.message);
} }
args[0].type = e.type args[0].type = e.type;
args[0].stack = e.stack args[0].stack = e.stack;
// Copy any custom properties to pass it on. // Copy any custom properties to pass it on.
Object.keys(e).forEach(function(key) { Object.keys(e).forEach(function(key) {
@ -197,101 +227,103 @@ Farm.prototype.receive = function (data) {
}); });
} }
process.nextTick(function () { process.nextTick(function() {
call.callback.apply(null, args) call.callback.apply(null, args);
}) });
delete child.calls[idx];
child.activeCalls--;
this.activeCalls--;
;delete child.calls[idx] if (
child.activeCalls-- child.calls.length >= this.options.maxCallsPerWorker &&
this.activeCalls-- !Object.keys(child.calls).length
) {
if (child.calls.length >= this.options.maxCallsPerWorker
&& !Object.keys(child.calls).length) {
// this child has finished its run, kill it // this child has finished its run, kill it
this.stopChild(childId) this.stopChild(childId);
} }
// allow any outstanding calls to be processed // allow any outstanding calls to be processed
this.processQueue() this.processQueue();
} };
Farm.prototype.childTimeout = function (childId) { Farm.prototype.childTimeout = function(childId) {
var child = this.children[childId] var child = this.children[childId],
, i i;
if (!child) if (!child) return;
return
for (i in child.calls) { for (i in child.calls) {
this.receive({ this.receive({
idx : i idx: i,
, child : childId child: childId,
, args : [ new TimeoutError('worker call timed out!') ] args: [new TimeoutError('worker call timed out!')],
}) });
} }
this.stopChild(childId) this.stopChild(childId);
} };
// send a call to a worker, identified by id // send a call to a worker, identified by id
Farm.prototype.send = function (childId, call) { Farm.prototype.send = function(childId, call) {
var child = this.children[childId] var child = this.children[childId],
, idx = child.calls.length idx = child.calls.length;
child.calls.push(call) child.calls.push(call);
child.activeCalls++ child.activeCalls++;
this.activeCalls++ this.activeCalls++;
child.send({ child.send({
idx : idx idx: idx,
, child : childId child: childId,
, method : call.method method: call.method,
, args : call.args args: call.args,
}) });
if (this.options.maxCallTime !== Infinity) { if (this.options.maxCallTime !== Infinity) {
call.timer = call.timer = setTimeout(
setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime) this.childTimeout.bind(this, childId),
this.options.maxCallTime,
);
} }
} };
// a list of active worker ids, in order, but the starting offset is // a list of active worker ids, in order, but the starting offset is
// shifted each time this method is called, so we work our way through // shifted each time this method is called, so we work our way through
// all workers when handing out jobs // all workers when handing out jobs
Farm.prototype.childKeys = function () { Farm.prototype.childKeys = function() {
var cka = Object.keys(this.children) var cka = Object.keys(this.children),
, cks cks;
if (this.searchStart >= cka.length - 1) if (this.searchStart >= cka.length - 1) this.searchStart = 0;
this.searchStart = 0 else this.searchStart++;
else
this.searchStart++
cks = cka.splice(0, this.searchStart) cks = cka.splice(0, this.searchStart);
return cka.concat(cks) return cka.concat(cks);
} };
// Calls are added to a queue, this processes the queue and is called // Calls are added to a queue, this processes the queue and is called
// whenever there might be a chance to send more calls to the workers. // whenever there might be a chance to send more calls to the workers.
// The various options all impact on when we're able to send calls, // The various options all impact on when we're able to send calls,
// they may need to be kept in a queue until a worker is ready. // they may need to be kept in a queue until a worker is ready.
Farm.prototype.processQueue = function () { Farm.prototype.processQueue = function() {
var cka, i = 0, childId var cka,
i = 0,
childId;
if (!this.callQueue.length) if (!this.callQueue.length) return this.ending && this.end();
return this.ending && this.end()
if (this.activeChildren < this.options.maxConcurrentWorkers) if (this.activeChildren < this.options.maxConcurrentWorkers)
this.startChild() this.startChild();
for (cka = this.childKeys(); i < cka.length; i++) { for (cka = this.childKeys(); i < cka.length; i++) {
childId = +cka[i] childId = +cka[i];
if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker if (
&& this.children[childId].calls.length < this.options.maxCallsPerWorker) { this.children[childId].activeCalls <
this.options.maxConcurrentCallsPerWorker &&
this.send(childId, this.callQueue.shift()) this.children[childId].calls.length < this.options.maxCallsPerWorker
if (!this.callQueue.length) ) {
return this.ending && this.end() this.send(childId, this.callQueue.shift());
if (!this.callQueue.length) return this.ending && this.end();
} /*else { } /*else {
console.log( console.log(
, this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
@ -300,43 +332,39 @@ Farm.prototype.processQueue = function () {
}*/ }*/
} }
if (this.ending) if (this.ending) this.end();
this.end() };
}
// add a new call to the call queue, then trigger a process of the queue // add a new call to the call queue, then trigger a process of the queue
Farm.prototype.addCall = function (call) { Farm.prototype.addCall = function(call) {
if (this.ending) if (this.ending) return this.end(); // don't add anything new to the queue
return this.end() // don't add anything new to the queue this.callQueue.push(call);
this.callQueue.push(call) this.processQueue();
this.processQueue() };
}
// kills child workers when they're all done // kills child workers when they're all done
Farm.prototype.end = function (callback) { Farm.prototype.end = function(callback) {
var complete = true var complete = true;
if (this.ending === false) if (this.ending === false) return;
return if (callback) this.ending = callback;
if (callback) else if (this.ending == null) this.ending = true;
this.ending = callback Object.keys(this.children).forEach(
else if (this.ending == null) function(child) {
this.ending = true if (!this.children[child]) return;
Object.keys(this.children).forEach(function (child) { if (!this.children[child].activeCalls) this.stopChild(child);
if (!this.children[child]) else complete = false;
return }.bind(this),
if (!this.children[child].activeCalls) );
this.stopChild(child)
else
complete = false
}.bind(this))
if (complete && typeof this.ending == 'function') { if (complete && typeof this.ending == 'function') {
process.nextTick(function () { process.nextTick(
this.ending() function() {
this.ending = false this.ending();
}.bind(this)) this.ending = false;
}.bind(this),
);
} }
} };
module.exports = Farm module.exports = Farm;
module.exports.TimeoutError = TimeoutError module.exports.TimeoutError = TimeoutError;

View File

@ -6,6 +6,7 @@
* LICENSE file in the root directory of this source tree. An additional grant * LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory. * of patent rights can be found in the PATENTS file in the same directory.
* *
* @format
* @flow * @flow
*/ */

View File

@ -6,15 +6,16 @@
* LICENSE file in the root directory of this source tree. An additional grant * LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory. * of patent rights can be found in the PATENTS file in the same directory.
* *
* @format
* @flow * @flow
*/ */
/* eslint-disable */ /* eslint-disable */
const Farm = require('./farm') const Farm = require('./farm');
import type {Readable} from 'stream'; import type {Readable} from 'stream';
var farms = [] // keep record of farms so we can end() them if required var farms = []; // keep record of farms so we can end() them if required
export type FarmAPI = {| export type FarmAPI = {|
methods: {[name: string]: Function}, methods: {[name: string]: Function},
@ -27,10 +28,10 @@ function farm(
path: string, path: string,
methods: Array<string>, methods: Array<string>,
): FarmAPI { ): FarmAPI {
var f = new Farm(options, path) var f = new Farm(options, path),
, api = f.setup(methods) api = f.setup(methods);
farms.push({ farm: f, api: api }) farms.push({farm: f, api: api});
// $FlowFixMe: gotta type the Farm class. // $FlowFixMe: gotta type the Farm class.
const {stdout, stderr} = f; const {stdout, stderr} = f;
@ -39,12 +40,11 @@ function farm(
return {methods: (api: any), stdout, stderr}; return {methods: (api: any), stdout, stderr};
} }
function end (api, callback) { function end(api, callback) {
for (var i = 0; i < farms.length; i++) for (var i = 0; i < farms.length; i++)
if (farms[i] && farms[i].api === api) if (farms[i] && farms[i].api === api) return farms[i].farm.end(callback);
return farms[i].farm.end(callback) process.nextTick(callback.bind(null, 'Worker farm not found!'));
process.nextTick(callback.bind(null, 'Worker farm not found!'))
} }
module.exports = farm module.exports = farm;
module.exports.end = end module.exports.end = end;