mirror of https://github.com/status-im/metro.git
Migrate metro-bundler to jest-worker
Summary: This diff migrates Metro Bundler from `worker-farm` to `jest-worker`: * Fully removes the custom patched `worker-farm` fork. * Removes //a lot// of non-clear Flow types used to cast functions from callbacks to promises. * Reduces cyclomatic complexity of the `Transformer` class by using `async`/`await`. * Cleans all additional methods inside `JSTransformer/index.js`, by moving them to a single class and properly scoping them. **Note:** this diff does not still enable the ability to bind files to workers by using `computeWorkerKey`; this will come at a later stage. Reviewed By: davidaurelio Differential Revision: D6247532 fbshipit-source-id: 51259360a5c15117996777a3be74b73b583f595e
This commit is contained in:
parent
81bfef0389
commit
d4dcd4c6fa
|
@ -32,6 +32,7 @@
|
|||
"image-size": "^0.6.0",
|
||||
"jest-docblock": "^21",
|
||||
"jest-haste-map": "^21",
|
||||
"jest-worker": "^21.2.1",
|
||||
"json-stable-stringify": "^1.0.1",
|
||||
"json5": "^0.4.0",
|
||||
"left-pad": "^1.1.3",
|
||||
|
|
|
@ -13,8 +13,7 @@
|
|||
'use strict';
|
||||
|
||||
jest
|
||||
.setMock('worker-farm', () => () => undefined)
|
||||
.setMock('../../worker-farm', () => () => undefined)
|
||||
.setMock('jest-worker', () => ({}))
|
||||
.setMock('../../JSTransformer/worker/minify')
|
||||
.mock('image-size')
|
||||
.mock('fs')
|
||||
|
|
|
@ -203,7 +203,7 @@ class Bundler {
|
|||
stderrChunk: chunk =>
|
||||
opts.reporter.update({type: 'worker_stderr_chunk', chunk}),
|
||||
},
|
||||
opts.workerPath,
|
||||
opts.workerPath || undefined,
|
||||
);
|
||||
|
||||
const getTransformCacheKey = options => {
|
||||
|
|
|
@ -14,16 +14,15 @@
|
|||
jest
|
||||
.mock('fs', () => ({writeFileSync: jest.fn()}))
|
||||
.mock('temp', () => ({path: () => '/arbitrary/path'}))
|
||||
.mock('worker-farm', () => jest.fn())
|
||||
.mock('../../worker-farm', () => jest.fn());
|
||||
.mock('jest-worker', () => ({default: jest.fn()}));
|
||||
|
||||
var Transformer = require('../');
|
||||
const Transformer = require('../');
|
||||
|
||||
const {any} = jasmine;
|
||||
const {Readable} = require('stream');
|
||||
|
||||
describe('Transformer', function() {
|
||||
let workers, Cache;
|
||||
let api, Cache;
|
||||
const fileName = '/an/arbitrary/file.js';
|
||||
const localPath = 'arbitrary/file.js';
|
||||
const transformModulePath = __filename;
|
||||
|
@ -33,70 +32,66 @@ describe('Transformer', function() {
|
|||
Cache.prototype.get = jest.fn((a, b, c) => c());
|
||||
|
||||
const fs = require('fs');
|
||||
const workerFarm = require('../../worker-farm');
|
||||
const jestWorker = require('jest-worker');
|
||||
|
||||
fs.writeFileSync.mockClear();
|
||||
workerFarm.mockClear();
|
||||
workerFarm.mockImplementation((opts, path, methods) => {
|
||||
const api = (workers = {});
|
||||
methods.forEach(method => {
|
||||
|
||||
jestWorker.default.mockClear();
|
||||
jestWorker.default.mockImplementation((workerPath, opts) => {
|
||||
api = {
|
||||
end: jest.fn(),
|
||||
getStdout: () => new Readable({read() {}}),
|
||||
getStderr: () => new Readable({read() {}}),
|
||||
};
|
||||
|
||||
opts.exposedMethods.forEach(method => {
|
||||
api[method] = jest.fn();
|
||||
});
|
||||
return {
|
||||
methods: api,
|
||||
stdout: new Readable({read() {}}),
|
||||
stderr: new Readable({read() {}}),
|
||||
};
|
||||
|
||||
return api;
|
||||
});
|
||||
});
|
||||
|
||||
it(
|
||||
'passes transform module path, file path, source code' +
|
||||
' to the worker farm when transforming',
|
||||
() => {
|
||||
const transformOptions = {arbitrary: 'options'};
|
||||
const code = 'arbitrary(code)';
|
||||
new Transformer(transformModulePath, 4).transformFile(
|
||||
fileName,
|
||||
localPath,
|
||||
code,
|
||||
transformOptions,
|
||||
);
|
||||
expect(workers.transformAndExtractDependencies).toBeCalledWith(
|
||||
transformModulePath,
|
||||
fileName,
|
||||
localPath,
|
||||
code,
|
||||
transformOptions,
|
||||
any(Function),
|
||||
);
|
||||
},
|
||||
);
|
||||
it('passes transform data to the worker farm when transforming', () => {
|
||||
const transformOptions = {arbitrary: 'options'};
|
||||
const code = 'arbitrary(code)';
|
||||
|
||||
it('should add file info to parse errors', function() {
|
||||
const transformer = new Transformer(transformModulePath, 4);
|
||||
var message = 'message';
|
||||
var snippet = 'snippet';
|
||||
|
||||
workers.transformAndExtractDependencies.mockImplementation(function(
|
||||
transformPath,
|
||||
filename,
|
||||
localPth,
|
||||
new Transformer(transformModulePath, 4).transformFile(
|
||||
fileName,
|
||||
localPath,
|
||||
code,
|
||||
opts,
|
||||
callback,
|
||||
) {
|
||||
var babelError = new SyntaxError(message);
|
||||
babelError.type = 'SyntaxError';
|
||||
babelError.description = message;
|
||||
babelError.loc = {
|
||||
line: 2,
|
||||
column: 15,
|
||||
};
|
||||
babelError.codeFrame = snippet;
|
||||
callback(babelError);
|
||||
});
|
||||
transformOptions,
|
||||
);
|
||||
|
||||
expect(api.transformAndExtractDependencies).toBeCalledWith(
|
||||
transformModulePath,
|
||||
fileName,
|
||||
localPath,
|
||||
code,
|
||||
transformOptions,
|
||||
);
|
||||
});
|
||||
|
||||
it('should add file info to parse errors', () => {
|
||||
const transformer = new Transformer(transformModulePath, 4);
|
||||
const message = 'message';
|
||||
const snippet = 'snippet';
|
||||
|
||||
api.transformAndExtractDependencies.mockImplementation(
|
||||
(transformPath, filename, localPth, code, opts) => {
|
||||
const babelError = new SyntaxError(message);
|
||||
|
||||
babelError.type = 'SyntaxError';
|
||||
babelError.description = message;
|
||||
babelError.loc = {line: 2, column: 15};
|
||||
babelError.codeFrame = snippet;
|
||||
|
||||
return Promise.reject(babelError);
|
||||
},
|
||||
);
|
||||
|
||||
expect.assertions(6);
|
||||
|
||||
expect.assertions(7);
|
||||
return transformer
|
||||
.transformFile(fileName, localPath, '', {})
|
||||
.catch(function(error) {
|
||||
|
@ -107,7 +102,6 @@ describe('Transformer', function() {
|
|||
expect(error.lineNumber).toBe(2);
|
||||
expect(error.column).toBe(15);
|
||||
expect(error.filename).toBe(fileName);
|
||||
expect(error.description).toBe(message);
|
||||
expect(error.snippet).toBe(snippet);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -15,229 +15,161 @@
|
|||
const Logger = require('../Logger');
|
||||
|
||||
const debug = require('debug')('Metro:JStransformer');
|
||||
const denodeify: Denodeify = require('denodeify');
|
||||
const invariant = require('fbjs/lib/invariant');
|
||||
const path = require('path');
|
||||
const util = require('util');
|
||||
const workerFarm = require('../worker-farm');
|
||||
const Worker = require('jest-worker').default;
|
||||
|
||||
import type {
|
||||
Data as TransformData,
|
||||
Options as WorkerOptions,
|
||||
TransformedCode,
|
||||
} from './worker';
|
||||
import type {Options, TransformedCode} from './worker';
|
||||
import type {LocalPath} from '../node-haste/lib/toLocalPath';
|
||||
import type {MappingsMap} from '../lib/SourceMap';
|
||||
import type {RawMappings} from '../lib/SourceMap';
|
||||
import type {ResultWithMap} from './worker/minify';
|
||||
|
||||
import typeof {
|
||||
minify as Minify,
|
||||
transformAndExtractDependencies as TransformAndExtractDependencies,
|
||||
} from './worker';
|
||||
|
||||
type CB<T> = (?Error, ?T) => mixed;
|
||||
type Denodeify = (<A, B, C, T>(
|
||||
(A, B, C, CB<T>) => void,
|
||||
) => (A, B, C) => Promise<T>) &
|
||||
(<A, B, C, D, E, T>(
|
||||
(A, B, C, D, E, CB<T>) => void,
|
||||
) => (A, B, C, D, E) => Promise<T>);
|
||||
|
||||
// Avoid memory leaks caused in workers. This number seems to be a good enough number
|
||||
// to avoid any memory leak while not slowing down initial builds.
|
||||
// TODO(amasad): Once we get bundle splitting, we can drive this down a bit more.
|
||||
const MAX_CALLS_PER_WORKER = 600;
|
||||
|
||||
// Worker will timeout if one of the callers timeout.
|
||||
const TRANSFORM_TIMEOUT_INTERVAL = 601000;
|
||||
|
||||
// How may times can we tolerate failures from the worker.
|
||||
const MAX_RETRIES = 2;
|
||||
|
||||
function makeFarm(worker, methods, timeout, maxConcurrentWorkers) {
|
||||
return workerFarm(
|
||||
{
|
||||
autoStart: true,
|
||||
/**
|
||||
* We whitelist only what would work. For example `--inspect` doesn't
|
||||
* work in the workers because it tries to open the same debugging port.
|
||||
* Feel free to add more cases to the RegExp. A whitelist is preferred, to
|
||||
* guarantee robustness when upgrading node, etc.
|
||||
*/
|
||||
execArgv: process.execArgv.filter(
|
||||
arg =>
|
||||
/^--stack[_-]trace[_-]limit=[0-9]+$/.test(arg) ||
|
||||
/^--heap[_-]growing[_-]percent=[0-9]+$/.test(arg) ||
|
||||
/^--max[_-]old[_-]space[_-]size=[0-9]+$/.test(arg),
|
||||
),
|
||||
maxConcurrentCallsPerWorker: 1,
|
||||
maxConcurrentWorkers,
|
||||
maxCallsPerWorker: MAX_CALLS_PER_WORKER,
|
||||
maxCallTime: timeout,
|
||||
maxRetries: MAX_RETRIES,
|
||||
},
|
||||
worker,
|
||||
methods,
|
||||
);
|
||||
}
|
||||
type WorkerInterface = Worker & {
|
||||
minify: Minify,
|
||||
transformAndExtractDependencies: TransformAndExtractDependencies,
|
||||
};
|
||||
|
||||
type Reporters = {
|
||||
+stdoutChunk: (chunk: string) => mixed,
|
||||
+stderrChunk: (chunk: string) => mixed,
|
||||
};
|
||||
|
||||
class Transformer {
|
||||
_workers: {[name: string]: Function};
|
||||
module.exports = class Transformer {
|
||||
_worker: WorkerInterface;
|
||||
_transformModulePath: string;
|
||||
_transform: (
|
||||
transform: string,
|
||||
filename: string,
|
||||
localPath: LocalPath,
|
||||
sourceCode: string,
|
||||
options: WorkerOptions,
|
||||
) => Promise<TransformData>;
|
||||
_usesFarm: boolean;
|
||||
minify: (
|
||||
filename: string,
|
||||
code: string,
|
||||
sourceMap: ?MappingsMap,
|
||||
) => Promise<{code: string, map: ?MappingsMap}>;
|
||||
|
||||
constructor(
|
||||
transformModulePath: string,
|
||||
maxWorkers: number,
|
||||
reporters: Reporters,
|
||||
workerPath: ?string,
|
||||
workerPath: string = require.resolve('./worker'),
|
||||
) {
|
||||
invariant(
|
||||
path.isAbsolute(transformModulePath),
|
||||
'transform module path should be absolute',
|
||||
);
|
||||
if (!workerPath) {
|
||||
workerPath = require.resolve('./worker');
|
||||
}
|
||||
|
||||
this._transformModulePath = transformModulePath;
|
||||
this._usesFarm = false;
|
||||
|
||||
if (maxWorkers > 1) {
|
||||
this._usesFarm = true;
|
||||
const farm = makeFarm(
|
||||
this._worker = this._makeFarm(
|
||||
workerPath,
|
||||
['minify', 'transformAndExtractDependencies'],
|
||||
TRANSFORM_TIMEOUT_INTERVAL,
|
||||
maxWorkers,
|
||||
);
|
||||
farm.stdout.on('data', chunk => {
|
||||
|
||||
this._worker.getStdout().on('data', chunk => {
|
||||
reporters.stdoutChunk(chunk.toString('utf8'));
|
||||
});
|
||||
farm.stderr.on('data', chunk => {
|
||||
|
||||
this._worker.getStderr().on('data', chunk => {
|
||||
reporters.stderrChunk(chunk.toString('utf8'));
|
||||
});
|
||||
|
||||
this._workers = farm.methods;
|
||||
} else {
|
||||
// $FlowFixMe
|
||||
this._workers = require(workerPath);
|
||||
// $FlowFixMe: impossible to type a dynamic require.
|
||||
this._worker = require(workerPath);
|
||||
}
|
||||
this._transform = denodeify(
|
||||
(this._workers
|
||||
.transformAndExtractDependencies: TransformAndExtractDependencies),
|
||||
);
|
||||
this.minify = denodeify((this._workers.minify: Minify));
|
||||
}
|
||||
|
||||
kill() {
|
||||
if (this._usesFarm && this._workers) {
|
||||
workerFarm.end(this._workers, () => {});
|
||||
if (this._worker && typeof this._worker.end === 'function') {
|
||||
this._worker.end();
|
||||
}
|
||||
}
|
||||
|
||||
transformFile(
|
||||
fileName: string,
|
||||
async minify(
|
||||
filename: string,
|
||||
code: string,
|
||||
sourceMap: RawMappings,
|
||||
): Promise<ResultWithMap> {
|
||||
return await this._worker.minify(filename, code, sourceMap);
|
||||
}
|
||||
|
||||
async transformFile(
|
||||
filename: string,
|
||||
localPath: LocalPath,
|
||||
code: string,
|
||||
options: WorkerOptions,
|
||||
options: Options,
|
||||
): Promise<TransformedCode> {
|
||||
if (!this._transform) {
|
||||
return Promise.reject(new Error('No transform module'));
|
||||
try {
|
||||
debug('Started ransforming file', filename);
|
||||
|
||||
const data = await this._worker.transformAndExtractDependencies(
|
||||
this._transformModulePath,
|
||||
filename,
|
||||
localPath,
|
||||
code,
|
||||
options,
|
||||
);
|
||||
|
||||
debug('Done transforming file', filename);
|
||||
|
||||
Logger.log(data.transformFileStartLogEntry);
|
||||
Logger.log(data.transformFileEndLogEntry);
|
||||
|
||||
return data.result;
|
||||
} catch (err) {
|
||||
debug('Failed transformFile file', filename);
|
||||
|
||||
if (err.loc) {
|
||||
throw this._formatBabelError(err, filename);
|
||||
} else {
|
||||
throw this._formatGenericError(err, filename);
|
||||
}
|
||||
}
|
||||
debug('transforming file', fileName);
|
||||
|
||||
return this._transform(
|
||||
this._transformModulePath,
|
||||
fileName,
|
||||
localPath,
|
||||
code,
|
||||
options,
|
||||
)
|
||||
.then(data => {
|
||||
Logger.log(data.transformFileStartLogEntry);
|
||||
Logger.log(data.transformFileEndLogEntry);
|
||||
debug('done transforming file', fileName);
|
||||
return data.result;
|
||||
})
|
||||
.catch(error => {
|
||||
if (error.type === 'TimeoutError') {
|
||||
const timeoutErr = new Error(
|
||||
`TimeoutError: transforming ${fileName} took longer than ` +
|
||||
`${TRANSFORM_TIMEOUT_INTERVAL / 1000} seconds.\n`,
|
||||
);
|
||||
/* $FlowFixMe: monkey-patch Error */
|
||||
timeoutErr.type = 'TimeoutError';
|
||||
throw timeoutErr;
|
||||
} else if (error.type === 'ProcessTerminatedError') {
|
||||
const uncaughtError = new Error(
|
||||
'Uncaught error in the transformer worker: ' +
|
||||
this._transformModulePath,
|
||||
);
|
||||
/* $FlowFixMe: monkey-patch Error */
|
||||
uncaughtError.type = 'ProcessTerminatedError';
|
||||
throw uncaughtError;
|
||||
}
|
||||
|
||||
throw formatError(error, fileName);
|
||||
});
|
||||
}
|
||||
|
||||
static TransformError;
|
||||
}
|
||||
_makeFarm(workerPath, exposedMethods, maxWorkers) {
|
||||
// We whitelist only what would work. For example `--inspect` doesn't work
|
||||
// in the workers because it tries to open the same debugging port. Feel
|
||||
// free to add more cases to the RegExp. A whitelist is preferred, to
|
||||
// guarantee robustness when upgrading node, etc.
|
||||
const execArgv = process.execArgv.filter(
|
||||
arg =>
|
||||
/^--stack[_-]trace[_-]limit=[0-9]+$/.test(arg) ||
|
||||
/^--heap[_-]growing[_-]percent=[0-9]+$/.test(arg) ||
|
||||
/^--max[_-]old[_-]space[_-]size=[0-9]+$/.test(arg),
|
||||
);
|
||||
|
||||
Transformer.TransformError = TransformError;
|
||||
return new Worker(workerPath, {
|
||||
exposedMethods,
|
||||
forkOptions: {execArgv},
|
||||
maxWorkers,
|
||||
});
|
||||
}
|
||||
|
||||
function TransformError() {
|
||||
Error.captureStackTrace && Error.captureStackTrace(this, TransformError);
|
||||
}
|
||||
util.inherits(TransformError, SyntaxError);
|
||||
_formatGenericError(err, filename) {
|
||||
const error = new TransformError(`${filename}: ${err.message}`);
|
||||
|
||||
function formatError(err, filename) {
|
||||
if (err.loc) {
|
||||
return formatBabelError(err, filename);
|
||||
} else {
|
||||
return formatGenericError(err, filename);
|
||||
// $FlowFixMe: extending an error.
|
||||
return Object.assign(error, {
|
||||
stack: (err.stack || '')
|
||||
.split('\n')
|
||||
.slice(0, -1)
|
||||
.join('\n'),
|
||||
lineNumber: 0,
|
||||
});
|
||||
}
|
||||
|
||||
_formatBabelError(err, filename) {
|
||||
const error = new TransformError(
|
||||
`${err.type || 'Error'} in ${filename}: ${err.message}`,
|
||||
);
|
||||
|
||||
// $FlowFixMe: extending an error.
|
||||
return Object.assign(error, {
|
||||
stack: err.stack,
|
||||
snippet: err.codeFrame,
|
||||
lineNumber: err.loc.line,
|
||||
column: err.loc.column,
|
||||
filename,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
class TransformError extends SyntaxError {
|
||||
type: string = 'TransformError';
|
||||
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
Error.captureStackTrace && Error.captureStackTrace(this, TransformError);
|
||||
}
|
||||
}
|
||||
|
||||
function formatGenericError(err, filename) {
|
||||
var msg = 'TransformError: ' + filename + ': ' + err.message;
|
||||
var error = new TransformError();
|
||||
var stack = (err.stack || '').split('\n').slice(0, -1);
|
||||
error.stack = stack.join('\n');
|
||||
error.message = msg;
|
||||
error.type = 'TransformError';
|
||||
error.lineNumber = 0;
|
||||
error.description = '';
|
||||
return error;
|
||||
}
|
||||
|
||||
function formatBabelError(err, filename) {
|
||||
var error = new TransformError();
|
||||
error.type = 'TransformError';
|
||||
error.message = `${err.type || error.type} in ${filename}: ${err.message}`;
|
||||
error.stack = err.stack;
|
||||
error.snippet = err.codeFrame;
|
||||
error.lineNumber = err.loc.line;
|
||||
error.column = err.loc.column;
|
||||
error.filename = filename;
|
||||
error.description = err.message;
|
||||
return error;
|
||||
}
|
||||
|
||||
module.exports = Transformer;
|
||||
|
|
|
@ -42,14 +42,10 @@ describe('code transformation worker:', () => {
|
|||
const localPath = `local/${filename}`;
|
||||
const sourceCode = 'arbitrary(code)';
|
||||
const transformOptions = {arbitrary: 'options'};
|
||||
transformCode(
|
||||
transformer,
|
||||
filename,
|
||||
localPath,
|
||||
sourceCode,
|
||||
{dev: true, transform: transformOptions},
|
||||
() => {},
|
||||
);
|
||||
transformCode(transformer, filename, localPath, sourceCode, {
|
||||
dev: true,
|
||||
transform: transformOptions,
|
||||
});
|
||||
expect(transformer.transform).toBeCalledWith({
|
||||
filename,
|
||||
localPath,
|
||||
|
@ -65,14 +61,7 @@ describe('code transformation worker:', () => {
|
|||
const sourceCode = 'arbitrary(code)';
|
||||
const options = {dev: false, transform: {arbitrary: 'options'}};
|
||||
|
||||
transformCode(
|
||||
transformer,
|
||||
filename,
|
||||
localPath,
|
||||
sourceCode,
|
||||
options,
|
||||
() => {},
|
||||
);
|
||||
transformCode(transformer, filename, localPath, sourceCode, options);
|
||||
|
||||
const plugins = transformer.transform.mock.calls[0][0].plugins;
|
||||
|
||||
|
@ -84,14 +73,8 @@ describe('code transformation worker:', () => {
|
|||
const filename = 'arbitrary/file.json';
|
||||
const localPath = `local/${filename}`;
|
||||
const sourceCode = '{"arbitrary":"property"}';
|
||||
transformCode(
|
||||
transformer,
|
||||
filename,
|
||||
localPath,
|
||||
sourceCode,
|
||||
{dev: true},
|
||||
() => {},
|
||||
);
|
||||
transformCode(transformer, filename, localPath, sourceCode, {dev: true});
|
||||
|
||||
expect(transformer.transform).toBeCalledWith({
|
||||
filename,
|
||||
localPath,
|
||||
|
@ -101,166 +84,137 @@ describe('code transformation worker:', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('calls back with the result of the transform in the cache', done => {
|
||||
it('calls back with the result of the transform in the cache', async () => {
|
||||
const result = {
|
||||
code: 'some.other(code)',
|
||||
map: [],
|
||||
};
|
||||
|
||||
transformCode(
|
||||
const data = await transformCode(
|
||||
transformer,
|
||||
'filename',
|
||||
'local/filename',
|
||||
result.code,
|
||||
{},
|
||||
(error, data) => {
|
||||
expect(error).toBeNull();
|
||||
expect(data.result).toEqual(objectContaining(result));
|
||||
done();
|
||||
},
|
||||
);
|
||||
|
||||
expect(data.result).toEqual(objectContaining(result));
|
||||
});
|
||||
|
||||
it(
|
||||
'removes the leading assignment to `module.exports` before passing ' +
|
||||
'on the result if the file is a JSON file, even if minified',
|
||||
done => {
|
||||
const code = '{a:1,b:2}';
|
||||
const filePath = 'arbitrary/file.json';
|
||||
transformCode(
|
||||
transformer,
|
||||
filePath,
|
||||
filePath,
|
||||
code,
|
||||
{},
|
||||
(error, data) => {
|
||||
expect(error).toBeNull();
|
||||
expect(data.result.code).toEqual(code);
|
||||
done();
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
it('removes the leading `module.exports` before returning if the file is a JSON file, even if minified', async () => {
|
||||
const code = '{a:1,b:2}';
|
||||
const filePath = 'arbitrary/file.json';
|
||||
const data = await transformCode(transformer, filePath, filePath, code, {});
|
||||
|
||||
it('removes shebang when present', done => {
|
||||
expect(data.result.code).toEqual(code);
|
||||
});
|
||||
|
||||
it('removes shebang when present', async () => {
|
||||
const shebang = '#!/usr/bin/env node';
|
||||
const result = {
|
||||
code: `${shebang} \n arbitrary(code)`,
|
||||
};
|
||||
const filePath = 'arbitrary/file.js';
|
||||
transformCode(
|
||||
|
||||
const data = await transformCode(
|
||||
transformer,
|
||||
filePath,
|
||||
filePath,
|
||||
result.code,
|
||||
{},
|
||||
(error, data) => {
|
||||
expect(error).toBeNull();
|
||||
const {code} = data.result;
|
||||
expect(code).not.toContain(shebang);
|
||||
expect(code.split('\n').length).toEqual(result.code.split('\n').length);
|
||||
done();
|
||||
},
|
||||
);
|
||||
|
||||
const {code} = data.result;
|
||||
expect(code).not.toContain(shebang);
|
||||
expect(code.split('\n').length).toEqual(result.code.split('\n').length);
|
||||
});
|
||||
|
||||
it('calls back with any error yielded by the transform', done => {
|
||||
it('calls back with any error yielded by the transform', async () => {
|
||||
const message = 'SyntaxError: this code is broken.';
|
||||
|
||||
transformer.transform.mockImplementation(() => {
|
||||
throw new Error(message);
|
||||
});
|
||||
|
||||
transformCode(
|
||||
transformer,
|
||||
'filename',
|
||||
'local/filename',
|
||||
'code',
|
||||
{},
|
||||
error => {
|
||||
expect(error.message).toBe(message);
|
||||
done();
|
||||
},
|
||||
);
|
||||
});
|
||||
expect.assertions(1);
|
||||
|
||||
describe('dependency extraction', () => {
|
||||
it('passes the transformed code the `extractDependencies`', done => {
|
||||
const code = 'arbitrary(code)';
|
||||
|
||||
transformCode(
|
||||
try {
|
||||
await transformCode(
|
||||
transformer,
|
||||
'filename',
|
||||
'local/filename',
|
||||
code,
|
||||
'code',
|
||||
{},
|
||||
error => {
|
||||
expect(error).toBeNull();
|
||||
expect(extractDependencies).toBeCalledWith(code, 'filename');
|
||||
done();
|
||||
},
|
||||
);
|
||||
} catch (error) {
|
||||
expect(error.message).toBe(message);
|
||||
}
|
||||
});
|
||||
|
||||
describe('dependency extraction', () => {
|
||||
it('passes the transformed code the `extractDependencies`', async () => {
|
||||
const code = 'arbitrary(code)';
|
||||
|
||||
await transformCode(transformer, 'filename', 'local/filename', code, {});
|
||||
|
||||
expect(extractDependencies).toBeCalledWith(code, 'filename');
|
||||
});
|
||||
|
||||
it(
|
||||
'uses `dependencies` and `dependencyOffsets` ' +
|
||||
'provided by `extractDependencies` for the result',
|
||||
done => {
|
||||
const dependencyData = {
|
||||
dependencies: ['arbitrary', 'list', 'of', 'dependencies'],
|
||||
dependencyOffsets: [12, 119, 185, 328, 471],
|
||||
};
|
||||
extractDependencies.mockReturnValue(dependencyData);
|
||||
it('uses `dependencies` and `dependencyOffsets` provided by `extractDependencies` for the result', async () => {
|
||||
const dependencyData = {
|
||||
dependencies: ['arbitrary', 'list', 'of', 'dependencies'],
|
||||
dependencyOffsets: [12, 119, 185, 328, 471],
|
||||
};
|
||||
|
||||
transformCode(
|
||||
transformer,
|
||||
'filename',
|
||||
'local/filename',
|
||||
'code',
|
||||
{},
|
||||
(error, data) => {
|
||||
expect(error).toBeNull();
|
||||
expect(data.result).toEqual(objectContaining(dependencyData));
|
||||
done();
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
extractDependencies.mockReturnValue(dependencyData);
|
||||
|
||||
it('does not extract requires of JSON files', done => {
|
||||
const data = await transformCode(
|
||||
transformer,
|
||||
'filename',
|
||||
'local/filename',
|
||||
'code',
|
||||
{},
|
||||
);
|
||||
|
||||
expect(data.result).toEqual(objectContaining(dependencyData));
|
||||
});
|
||||
|
||||
it('does not extract requires of JSON files', async () => {
|
||||
const jsonStr = '{"arbitrary":"json"}';
|
||||
transformCode(
|
||||
|
||||
const data = await transformCode(
|
||||
transformer,
|
||||
'arbitrary.json',
|
||||
'local/arbitrary.json',
|
||||
jsonStr,
|
||||
{},
|
||||
(error, data) => {
|
||||
expect(error).toBeNull();
|
||||
const {dependencies, dependencyOffsets} = data.result;
|
||||
expect(extractDependencies).not.toBeCalled();
|
||||
expect(dependencies).toEqual([]);
|
||||
expect(dependencyOffsets).toEqual([]);
|
||||
done();
|
||||
},
|
||||
);
|
||||
|
||||
const {dependencies, dependencyOffsets} = data.result;
|
||||
|
||||
expect(extractDependencies).not.toBeCalled();
|
||||
expect(dependencies).toEqual([]);
|
||||
expect(dependencyOffsets).toEqual([]);
|
||||
});
|
||||
|
||||
it('calls back with every error thrown by `extractDependencies`', done => {
|
||||
it('calls back with every error thrown by `extractDependencies`', async () => {
|
||||
const error = new Error('arbitrary');
|
||||
|
||||
extractDependencies.mockImplementation(() => {
|
||||
throw error;
|
||||
});
|
||||
transformCode(
|
||||
transformer,
|
||||
'arbitrary.js',
|
||||
'local/arbitrary.js',
|
||||
'code',
|
||||
{},
|
||||
(e, data) => {
|
||||
expect(e).toBe(error);
|
||||
done();
|
||||
},
|
||||
);
|
||||
|
||||
try {
|
||||
await transformCode(
|
||||
transformer,
|
||||
'arbitrary.js',
|
||||
'local/arbitrary.js',
|
||||
'code',
|
||||
{},
|
||||
);
|
||||
} catch (err) {
|
||||
expect(err).toBe(error);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -27,6 +27,7 @@ import type {
|
|||
RawMappings,
|
||||
} from '../../lib/SourceMap';
|
||||
import type {LocalPath} from '../../node-haste/lib/toLocalPath';
|
||||
import type {ResultWithMap} from './minify';
|
||||
import type {Ast, Plugins as BabelPlugins} from 'babel-core';
|
||||
|
||||
export type TransformedCode = {
|
||||
|
@ -43,11 +44,13 @@ export type TransformArgs<ExtraOptions: {}> = {|
|
|||
plugins?: BabelPlugins,
|
||||
src: string,
|
||||
|};
|
||||
|
||||
export type TransformResults = {
|
||||
ast: ?Ast,
|
||||
code: string,
|
||||
map: ?MappingsMap | RawMappings,
|
||||
};
|
||||
|
||||
export type Transform<ExtraOptions: {}> = (
|
||||
TransformArgs<ExtraOptions>,
|
||||
) => TransformResults;
|
||||
|
@ -90,132 +93,113 @@ export type Data = {
|
|||
transformFileEndLogEntry: LogEntry,
|
||||
};
|
||||
|
||||
type Callback<T> = (error: ?Error, data: ?T) => mixed;
|
||||
type TransformCode = (
|
||||
Transformer<*>,
|
||||
string,
|
||||
LocalPath,
|
||||
string,
|
||||
Options,
|
||||
Callback<Data>,
|
||||
) => void;
|
||||
function transformCode(
|
||||
transformer: Transformer<*>,
|
||||
filename: string,
|
||||
localPath: LocalPath,
|
||||
sourceCode: string,
|
||||
options: Options,
|
||||
): Data {
|
||||
const isJson = filename.endsWith('.json');
|
||||
|
||||
const transformCode: TransformCode = asyncify(
|
||||
(
|
||||
transformer: Transformer<*>,
|
||||
filename: string,
|
||||
localPath: LocalPath,
|
||||
sourceCode: string,
|
||||
options: Options,
|
||||
): Data => {
|
||||
const isJson = filename.endsWith('.json');
|
||||
if (isJson) {
|
||||
sourceCode = 'module.exports=' + sourceCode;
|
||||
if (isJson) {
|
||||
sourceCode = 'module.exports=' + sourceCode;
|
||||
}
|
||||
|
||||
const transformFileStartLogEntry = {
|
||||
action_name: 'Transforming file',
|
||||
action_phase: 'start',
|
||||
file_name: filename,
|
||||
log_entry_label: 'Transforming file',
|
||||
start_timestamp: process.hrtime(),
|
||||
};
|
||||
|
||||
const plugins = options.dev
|
||||
? []
|
||||
: [[inline.plugin, options], [constantFolding.plugin, options]];
|
||||
|
||||
const transformed = transformer.transform({
|
||||
filename,
|
||||
localPath,
|
||||
options: options.transform,
|
||||
plugins,
|
||||
src: sourceCode,
|
||||
});
|
||||
|
||||
// If the transformer returns standard sourcemaps, we need to transform them
|
||||
// to rawMappings so we can process them correctly.
|
||||
const rawMappings =
|
||||
transformed.map && !Array.isArray(transformed.map)
|
||||
? toRawMappings(transformed.map)
|
||||
: transformed.map;
|
||||
|
||||
// Convert the sourcemaps to Compact Raw source maps.
|
||||
const map = rawMappings ? rawMappings.map(compactMapping) : null;
|
||||
|
||||
let code = transformed.code;
|
||||
if (isJson) {
|
||||
code = code.replace(/^\w+\.exports=/, '');
|
||||
} else {
|
||||
// Remove shebang
|
||||
code = code.replace(/^#!.*/, '');
|
||||
}
|
||||
|
||||
const depsResult = isJson
|
||||
? {dependencies: [], dependencyOffsets: []}
|
||||
: extractDependencies(code, filename);
|
||||
|
||||
const timeDelta = process.hrtime(transformFileStartLogEntry.start_timestamp);
|
||||
const duration_ms = Math.round((timeDelta[0] * 1e9 + timeDelta[1]) / 1e6);
|
||||
const transformFileEndLogEntry = {
|
||||
action_name: 'Transforming file',
|
||||
action_phase: 'end',
|
||||
file_name: filename,
|
||||
duration_ms,
|
||||
log_entry_label: 'Transforming file',
|
||||
};
|
||||
|
||||
return {
|
||||
result: {...depsResult, code, map},
|
||||
transformFileStartLogEntry,
|
||||
transformFileEndLogEntry,
|
||||
};
|
||||
}
|
||||
|
||||
exports.minify = async function(
|
||||
filename: string,
|
||||
code: string,
|
||||
sourceMap: RawMappings,
|
||||
): Promise<ResultWithMap> {
|
||||
try {
|
||||
return minify.withSourceMap(code, sourceMap, filename);
|
||||
} catch (error) {
|
||||
if (error.constructor.name === 'JS_Parse_Error') {
|
||||
throw new Error(
|
||||
`${error.message} in file ${filename} at ${error.line}:${error.col}`,
|
||||
);
|
||||
}
|
||||
|
||||
const transformFileStartLogEntry = {
|
||||
action_name: 'Transforming file',
|
||||
action_phase: 'start',
|
||||
file_name: filename,
|
||||
log_entry_label: 'Transforming file',
|
||||
start_timestamp: process.hrtime(),
|
||||
};
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
|
||||
const plugins = options.dev
|
||||
? []
|
||||
: [[inline.plugin, options], [constantFolding.plugin, options]];
|
||||
|
||||
const transformed = transformer.transform({
|
||||
filename,
|
||||
localPath,
|
||||
options: options.transform,
|
||||
plugins,
|
||||
src: sourceCode,
|
||||
});
|
||||
|
||||
// If the transformer returns standard sourcemaps, we need to transform them
|
||||
// to rawMappings so we can process them correctly.
|
||||
const rawMappings =
|
||||
transformed.map && !Array.isArray(transformed.map)
|
||||
? toRawMappings(transformed.map)
|
||||
: transformed.map;
|
||||
|
||||
// Convert the sourcemaps to Compact Raw source maps.
|
||||
const map = rawMappings ? rawMappings.map(compactMapping) : null;
|
||||
|
||||
let code = transformed.code;
|
||||
if (isJson) {
|
||||
code = code.replace(/^\w+\.exports=/, '');
|
||||
} else {
|
||||
// Remove shebang
|
||||
code = code.replace(/^#!.*/, '');
|
||||
}
|
||||
|
||||
const depsResult = isJson
|
||||
? {dependencies: [], dependencyOffsets: []}
|
||||
: extractDependencies(code, filename);
|
||||
|
||||
const timeDelta = process.hrtime(
|
||||
transformFileStartLogEntry.start_timestamp,
|
||||
);
|
||||
const duration_ms = Math.round((timeDelta[0] * 1e9 + timeDelta[1]) / 1e6);
|
||||
const transformFileEndLogEntry = {
|
||||
action_name: 'Transforming file',
|
||||
action_phase: 'end',
|
||||
file_name: filename,
|
||||
duration_ms,
|
||||
log_entry_label: 'Transforming file',
|
||||
};
|
||||
|
||||
return {
|
||||
result: {...depsResult, code, map},
|
||||
transformFileStartLogEntry,
|
||||
transformFileEndLogEntry,
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
exports.transformAndExtractDependencies = (
|
||||
exports.transformAndExtractDependencies = async function(
|
||||
transform: string,
|
||||
filename: string,
|
||||
localPath: LocalPath,
|
||||
sourceCode: string,
|
||||
options: Options,
|
||||
callback: Callback<Data>,
|
||||
) => {
|
||||
/* $FlowFixMe: impossible to type a dynamic require */
|
||||
): Promise<Data> {
|
||||
// $FlowFixMe: impossible to type a dynamic require.
|
||||
const transformModule: Transformer<*> = require(transform);
|
||||
transformCode(
|
||||
|
||||
return transformCode(
|
||||
transformModule,
|
||||
filename,
|
||||
localPath,
|
||||
sourceCode,
|
||||
options,
|
||||
callback,
|
||||
);
|
||||
};
|
||||
|
||||
exports.minify = asyncify(
|
||||
(filename: string, code: string, sourceMap: RawMappings) => {
|
||||
var result;
|
||||
try {
|
||||
result = minify.withSourceMap(code, sourceMap, filename);
|
||||
} catch (error) {
|
||||
if (error.constructor.name === 'JS_Parse_Error') {
|
||||
throw new Error(
|
||||
error.message +
|
||||
' in file "' +
|
||||
filename +
|
||||
'" at line ' +
|
||||
error.line +
|
||||
':' +
|
||||
error.col,
|
||||
);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
return result;
|
||||
},
|
||||
);
|
||||
|
||||
exports.transformCode = transformCode; // for easier testing
|
||||
|
|
|
@ -26,7 +26,7 @@ import type {
|
|||
RawMappings,
|
||||
} from '../../lib/SourceMap';
|
||||
|
||||
type ResultWithMap = {
|
||||
export type ResultWithMap = {
|
||||
code: string,
|
||||
map: MappingsMap,
|
||||
};
|
||||
|
|
|
@ -13,8 +13,7 @@
|
|||
'use strict';
|
||||
|
||||
jest
|
||||
.mock('../../worker-farm', () => () => () => {})
|
||||
.mock('worker-farm', () => () => () => {})
|
||||
.mock('jest-worker', () => ({}))
|
||||
.mock('../../JSTransformer/worker/minify')
|
||||
.mock('crypto')
|
||||
.mock('../symbolicate', () => ({
|
||||
|
|
|
@ -35,46 +35,36 @@ describe('basic_bundle', () => {
|
|||
const polyfill2 = path.join(INPUT_PATH, 'polyfill-2.js');
|
||||
|
||||
beforeEach(() => {
|
||||
// Don't waste time creating a worker-farm from jest-haste-map, use the
|
||||
// function directly instead.
|
||||
jest.mock('worker-farm', () => {
|
||||
function workerFarm(opts, workerPath, methodNames) {
|
||||
return require(workerPath);
|
||||
}
|
||||
workerFarm.end = () => {};
|
||||
return workerFarm;
|
||||
});
|
||||
// We replace the farm by a simple require, so that the worker sources are
|
||||
// transformed and managed by jest.
|
||||
jest.mock('../../worker-farm', () => {
|
||||
let ended = false;
|
||||
|
||||
function workerFarm(opts, workerPath, methodNames) {
|
||||
jest.mock('jest-worker', () => {
|
||||
function Worker(workerPath, opts) {
|
||||
const {Readable} = require('stream');
|
||||
const methods = {};
|
||||
const worker = require(workerPath);
|
||||
const api = {
|
||||
getStdout: () => new Readable({read() {}}),
|
||||
getStderr: () => new Readable({read() {}}),
|
||||
end: () => (ended = true),
|
||||
};
|
||||
|
||||
methodNames.forEach(name => {
|
||||
methods[name] = function() {
|
||||
let ended = false;
|
||||
|
||||
opts.exposedMethods.forEach(name => {
|
||||
api[name] = function() {
|
||||
if (ended) {
|
||||
throw new Error('worker farm was ended');
|
||||
}
|
||||
|
||||
return worker[name].apply(null, arguments);
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
stdout: new Readable({read() {}}),
|
||||
stderr: new Readable({read() {}}),
|
||||
methods,
|
||||
};
|
||||
return api;
|
||||
}
|
||||
|
||||
workerFarm.end = () => {
|
||||
ended = true;
|
||||
return {
|
||||
default: Worker,
|
||||
};
|
||||
|
||||
return workerFarm;
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -23,7 +23,6 @@ const {
|
|||
AmbiguousModuleResolutionError,
|
||||
} = require('../node-haste/DependencyGraph/ResolutionRequest');
|
||||
|
||||
import type Transformer from '../JSTransformer';
|
||||
import type {BundleOptions} from '../Server';
|
||||
import type Terminal from './Terminal';
|
||||
import type {ReportableEvent, GlobalCacheDisabledReason} from './reporting';
|
||||
|
@ -262,7 +261,7 @@ class TerminalReporter {
|
|||
* these are operational errors, not programming errors, and the stacktrace
|
||||
* is not actionable to end users.
|
||||
*/
|
||||
_logBundlingError(error: Error | Transformer.TransformError) {
|
||||
_logBundlingError(error: Error) {
|
||||
if (error instanceof AmbiguousModuleResolutionError) {
|
||||
const he = error.hasteError;
|
||||
const message =
|
||||
|
|
|
@ -1 +0,0 @@
|
|||
node_modules
|
|
@ -1,9 +0,0 @@
|
|||
language: node_js
|
||||
node_js:
|
||||
- "0.10"
|
||||
branches:
|
||||
only:
|
||||
- master
|
||||
notifications:
|
||||
email:
|
||||
- rod@vagg.org
|
|
@ -1,13 +0,0 @@
|
|||
The MIT License (MIT)
|
||||
=====================
|
||||
|
||||
Copyright (c) 2014 LevelUP contributors
|
||||
---------------------------------------
|
||||
|
||||
*LevelUP contributors listed at <https://github.com/rvagg/node-levelup#contributors>*
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
@ -1,143 +0,0 @@
|
|||
# Worker Farm
|
||||
|
||||
NOTE: this was forked from npm module `worker-farm`. Below is the original documentation, that may not be up-to-date.
|
||||
|
||||
|
||||
---
|
||||
|
||||
Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options. *Available in npm as <strong>worker-farm</strong>*.
|
||||
|
||||
## Example
|
||||
|
||||
Given a file, *child.js*:
|
||||
|
||||
```js
|
||||
module.exports = function (inp, callback) {
|
||||
callback(null, inp + ' BAR (' + process.pid + ')')
|
||||
}
|
||||
```
|
||||
|
||||
And a main file:
|
||||
|
||||
```js
|
||||
var workerFarm = require('worker-farm')
|
||||
, workers = workerFarm(require.resolve('./child'))
|
||||
, ret = 0
|
||||
|
||||
for (var i = 0; i < 10; i++) {
|
||||
workers('#' + i + ' FOO', function (err, outp) {
|
||||
console.log(outp)
|
||||
if (++ret == 10)
|
||||
workerFarm.end(workers)
|
||||
})
|
||||
}
|
||||
```
|
||||
|
||||
We'll get an output something like the following:
|
||||
|
||||
```
|
||||
#1 FOO BAR (8546)
|
||||
#0 FOO BAR (8545)
|
||||
#8 FOO BAR (8545)
|
||||
#9 FOO BAR (8546)
|
||||
#2 FOO BAR (8548)
|
||||
#4 FOO BAR (8551)
|
||||
#3 FOO BAR (8549)
|
||||
#6 FOO BAR (8555)
|
||||
#5 FOO BAR (8553)
|
||||
#7 FOO BAR (8557)
|
||||
```
|
||||
|
||||
This example is contained in the *[examples/basic](https://github.com/rvagg/node-worker-farm/tree/master/examples/basic/)* directory.
|
||||
|
||||
### Example #1: Estimating π using child workers
|
||||
|
||||
You will also find a more complex example in *[examples/pi](https://github.com/rvagg/node-worker-farm/tree/master/examples/pi/)* that estimates the value of **π** by using a Monte Carlo *area-under-the-curve* method and compares the speed of doing it all in-process vs using child workers to complete separate portions.
|
||||
|
||||
Running `node examples/pi` will give you something like:
|
||||
|
||||
```
|
||||
Doing it the slow (single-process) way...
|
||||
π ≈ 3.1416269360000006 (0.0000342824102075312 away from actual!)
|
||||
took 8341 milliseconds
|
||||
Doing it the fast (multi-process) way...
|
||||
π ≈ 3.1416233600000036 (0.00003070641021052367 away from actual!)
|
||||
took 1985 milliseconds
|
||||
```
|
||||
|
||||
## Durability
|
||||
|
||||
An important feature of Worker Farm is **call durability**. If a child process dies for any reason during the execution of call(s), those calls will be re-queued and taken care of by other child processes. In this way, when you ask for something to be done, unless there is something *seriously* wrong with what you're doing, you should get a result on your callback function.
|
||||
|
||||
## My use-case
|
||||
|
||||
There are other libraries for managing worker processes available but my use-case was fairly specific: I need to make heavy use of the [node-java](https://github.com/nearinfinity/node-java) library to interact with JVM code. Unfortunately, because the JVM garbage collector is so difficult to interact with, it's prone to killing your Node process when the GC kicks under heavy load. For safety I needed a durable way to make calls so that (1) it wouldn't kill my main process and (2) any calls that weren't successful would be resubmitted for processing.
|
||||
|
||||
Worker Farm allows me to spin up multiple JVMs to be controlled by Node, and have a single, uncomplicated API that acts the same way as an in-process API and the calls will be taken care of by a child process even if an error kills a child process while it is working as the call will simply be passed to a new child process.
|
||||
|
||||
**But**, don't think that Worker Farm is specific to that use-case, it's designed to be very generic and simple to adapt to anything requiring the use of child Node processes.
|
||||
|
||||
## API
|
||||
|
||||
Worker Farm exports a main function an an `end()` method. The main function sets up a "farm" of coordinated child-process workers and it can be used to instantiate multiple farms, all operating independently.
|
||||
|
||||
### workerFarm([options, ]pathToModule[, exportedMethods])
|
||||
|
||||
In its most basic form, you call `workerFarm()` with the path to a module file to be invoked by the child process. You should use an **absolute path** to the module file, the best way to obtain the path is with `require.resolve('./path/to/module')`, this function can be used in exactly the same way as `require('./path/to/module')` but it returns an absolute path.
|
||||
|
||||
#### `exportedMethods`
|
||||
|
||||
If your module exports a single function on `module.exports` then you should omit the final parameter. However, if you are exporting multiple functions on `module.exports` then you should list them in an Array of Strings:
|
||||
|
||||
```js
|
||||
var workers = workerFarm(require.resolve('./mod'), [ 'doSomething', 'doSomethingElse' ])
|
||||
workers.doSomething(function () {})
|
||||
workers.doSomethingElse(function () {})
|
||||
```
|
||||
|
||||
Listing the available methods will instruct Worker Farm what API to provide you with on the returned object. If you don't list a `exportedMethods` Array then you'll get a single callable function to use; but if you list the available methods then you'll get an object with callable functions by those names.
|
||||
|
||||
**It is assumed that each function you call on your child module will take a `callback` function as the last argument.**
|
||||
|
||||
#### `options`
|
||||
|
||||
If you don't provide an `options` object then the following defaults will be used:
|
||||
|
||||
```js
|
||||
{
|
||||
maxCallsPerWorker : Infinity
|
||||
, maxConcurrentWorkers : require('os').cpus().length
|
||||
, maxConcurrentCallsPerWorker : 10
|
||||
, maxConcurrentCalls : Infinity
|
||||
, maxCallTime : Infinity
|
||||
, maxRetries : Infinity
|
||||
, autoStart : false
|
||||
}
|
||||
```
|
||||
|
||||
* **<code>maxCallsPerWorker</code>** allows you to control the lifespan of your child processes. A positive number will indicate that you only want each child to accept that many calls before it is terminated. This may be useful if you need to control memory leaks or similar in child processes.
|
||||
|
||||
* **<code>maxConcurrentWorkers</code>** will set the number of child processes to maintain concurrently. By default it is set to the number of CPUs available on the current system, but it can be any reasonable number, including `1`.
|
||||
|
||||
* **<code>maxConcurrentCallsPerWorker</code>** allows you to control the *concurrency* of individual child processes. Calls are placed into a queue and farmed out to child processes according to the number of calls they are allowed to handle concurrently. It is arbitrarily set to 10 by default so that calls are shared relatively evenly across workers, however if your calls predictably take a similar amount of time then you could set it to `Infinity` and Worker Farm won't queue any calls but spread them evenly across child processes and let them go at it. If your calls aren't I/O bound then it won't matter what value you use here as the individual workers won't be able to execute more than a single call at a time.
|
||||
|
||||
* **<code>maxConcurrentCalls</code>** allows you to control the maximum number of calls in the queue—either actively being processed or waiting for a worker to be processed. `Infinity` indicates no limit but if you have conditions that may endlessly queue jobs and you need to set a limit then provide a `>0` value and any calls that push the limit will return on their callback with a `MaxConcurrentCallsError` error (check `err.type == 'MaxConcurrentCallsError'`).
|
||||
|
||||
* **<code>maxCallTime</code>** *(use with caution, understand what this does before you use it!)* when `!== Infinity`, will cap a time, in milliseconds, that *any single call* can take to execute in a worker. If this time limit is exceeded by just a single call then the worker running that call will be killed and any calls running on that worker will have their callbacks returned with a `TimeoutError` (check `err.type == 'TimeoutError'`). If you are running with `maxConcurrentCallsPerWorker` value greater than `1` then **all calls currently executing** will fail and will be automatically resubmitted uless you've changed the `maxRetries` option. Use this if you have jobs that may potentially end in infinite loops that you can't programatically end with your child code. Preferably run this with a `maxConcurrentCallsPerWorker` so you don't interrupt other calls when you have a timeout. This timeout operates on a per-call basis but will interrupt a whole worker.
|
||||
|
||||
* **<code>maxRetries</code>** allows you to control the max number of call requeues after worker termination (unexpected or timeout). By default this option is set to `Infinity` which means that each call of each terminated worker will always be auto requeued. When the number of retries exceeds `maxRetries` value, the job callback will be executed with a `ProcessTerminatedError`. Note that if you are running with finite `maxCallTime` and `maxConcurrentCallsPerWorkers` greater than `1` then any `TimeoutError` will increase the retries counter *for each* concurrent call of the terminated worker.
|
||||
|
||||
* **<code>autoStart</code>** when set to `true` will start the workers as early as possible. Use this when your workers have to do expensive initialization. That way they'll be ready when the first request comes through.
|
||||
|
||||
### workerFarm.end(farm)
|
||||
|
||||
Child processes stay alive waiting for jobs indefinitely and your farm manager will stay alive managing its workers, so if you need it to stop then you have to do so explicitly. If you send your farm API to `workerFarm.end()` then it'll cleanly end your worker processes. Note though that it's a *soft* ending so it'll wait for child processes to finish what they are working on before asking them to die.
|
||||
|
||||
Any calls that are queued and not yet being handled by a child process will be discarded. `end()` only waits for those currently in progress.
|
||||
|
||||
Once you end a farm, it won't handle any more calls, so don't even try!
|
||||
|
||||
|
||||
## License
|
||||
|
||||
Worker Farm is Copyright (c) 2014 Rod Vagg [@rvagg](https://twitter.com/rvagg) and licensed under the MIT license. All rights not explicitly granted in the MIT license are reserved. See the included LICENSE.md file for more details.
|
|
@ -1,15 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
*/
|
||||
|
||||
/* eslint-disable */
|
||||
module.exports = function(inp, callback) {
|
||||
callback(null, inp + ' BAR (' + process.pid + ')');
|
||||
};
|
|
@ -1,22 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
*/
|
||||
|
||||
/* eslint-disable */
|
||||
var workerFarm = require('../../'),
|
||||
workers = workerFarm(require.resolve('./child')),
|
||||
ret = 0;
|
||||
|
||||
for (var i = 0; i < 10; i++) {
|
||||
workers('#' + i + ' FOO', function(err, outp) {
|
||||
console.log(outp);
|
||||
if (++ret == 10) workerFarm.end(workers);
|
||||
});
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
*/
|
||||
|
||||
/* eslint-disable */
|
||||
/* A simple PI estimation function using a Monte Carlo method
|
||||
* For 0 to `points`, take 2 random numbers < 1, square and add them to
|
||||
* find the area under that point in a 1x1 square. If that area is <= 1
|
||||
* then it's *within* a quarter-circle, otherwise it's outside.
|
||||
* Take the number of points <= 1 and multiply it by 4 and you have an
|
||||
* estimate!
|
||||
* Do this across multiple processes and average the results to
|
||||
* increase accuracy.
|
||||
*/
|
||||
|
||||
module.exports = function(points, callback) {
|
||||
var inside = 0,
|
||||
i = points;
|
||||
|
||||
while (i--)
|
||||
if (Math.pow(Math.random(), 2) + Math.pow(Math.random(), 2) <= 1) inside++;
|
||||
|
||||
callback(null, inside / points * 4);
|
||||
};
|
|
@ -1,55 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
*/
|
||||
|
||||
/* eslint-disable */
|
||||
const CHILDREN = 500,
|
||||
POINTS_PER_CHILD = 1000000,
|
||||
FARM_OPTIONS = {
|
||||
maxConcurrentWorkers: require('os').cpus().length,
|
||||
maxCallsPerWorker: Infinity,
|
||||
maxConcurrentCallsPerWorker: 1,
|
||||
};
|
||||
|
||||
var workerFarm = require('../../'),
|
||||
calcDirect = require('./calc'),
|
||||
calcWorker = workerFarm(FARM_OPTIONS, require.resolve('./calc')),
|
||||
ret,
|
||||
start,
|
||||
tally = function(finish, err, avg) {
|
||||
ret.push(avg);
|
||||
if (ret.length == CHILDREN) {
|
||||
var pi =
|
||||
ret.reduce(function(a, b) {
|
||||
return a + b;
|
||||
}) / ret.length,
|
||||
end = +new Date();
|
||||
console.log(
|
||||
'PI ~=',
|
||||
pi,
|
||||
'\t(' + Math.abs(pi - Math.PI),
|
||||
'away from actual!)',
|
||||
);
|
||||
console.log('took', end - start, 'milliseconds');
|
||||
if (finish) finish();
|
||||
}
|
||||
},
|
||||
calc = function(method, callback) {
|
||||
ret = [];
|
||||
start = +new Date();
|
||||
for (var i = 0; i < CHILDREN; i++)
|
||||
method(POINTS_PER_CHILD, tally.bind(null, callback));
|
||||
};
|
||||
|
||||
console.log('Doing it the slow (single-process) way...');
|
||||
calc(calcDirect, function() {
|
||||
console.log('Doing it the fast (multi-process) way...');
|
||||
calc(calcWorker, process.exit);
|
||||
});
|
|
@ -1,57 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
*/
|
||||
|
||||
/* eslint-disable */
|
||||
var $module;
|
||||
|
||||
/*
|
||||
var contextProto = this.context;
|
||||
while (contextProto = Object.getPrototypeOf(contextProto)) {
|
||||
completionGroups.push(Object.getOwnPropertyNames(contextProto));
|
||||
}
|
||||
*/
|
||||
|
||||
function handle(data) {
|
||||
var idx = data.idx,
|
||||
child = data.child,
|
||||
method = data.method,
|
||||
args = data.args,
|
||||
callback = function() {
|
||||
var _args = Array.prototype.slice.call(arguments);
|
||||
if (_args[0] instanceof Error) {
|
||||
var e = _args[0];
|
||||
_args[0] = {
|
||||
$error: '$error',
|
||||
type: e.constructor.name,
|
||||
message: e.message,
|
||||
stack: e.stack,
|
||||
};
|
||||
Object.keys(e).forEach(function(key) {
|
||||
_args[0][key] = e[key];
|
||||
});
|
||||
}
|
||||
process.send({idx: idx, child: child, args: _args});
|
||||
},
|
||||
exec;
|
||||
|
||||
if (method == null && typeof $module == 'function') exec = $module;
|
||||
else if (typeof $module[method] == 'function') exec = $module[method];
|
||||
|
||||
if (!exec) return console.error('NO SUCH METHOD:', method);
|
||||
|
||||
exec.apply(null, args.concat([callback]));
|
||||
}
|
||||
|
||||
process.on('message', function(data) {
|
||||
if (!$module) return ($module = require(data.module));
|
||||
if (data == 'die') return process.exit(0);
|
||||
handle(data);
|
||||
});
|
|
@ -1,370 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
* @flow
|
||||
*/
|
||||
|
||||
/* eslint-disable */
|
||||
const DEFAULT_OPTIONS = {
|
||||
maxCallsPerWorker: Infinity,
|
||||
maxConcurrentWorkers: require('os').cpus().length,
|
||||
maxConcurrentCallsPerWorker: 10,
|
||||
maxConcurrentCalls: Infinity,
|
||||
maxCallTime: Infinity, // exceed this and the whole worker is terminated
|
||||
maxRetries: Infinity,
|
||||
forcedKillTime: 100,
|
||||
autoStart: false,
|
||||
};
|
||||
|
||||
const extend = require('xtend'),
|
||||
fork = require('./fork'),
|
||||
TimeoutError = require('errno').create('TimeoutError'),
|
||||
ProcessTerminatedError = require('errno').create('ProcessTerminatedError'),
|
||||
MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError');
|
||||
|
||||
const mergeStream = require('merge-stream');
|
||||
|
||||
function Farm(options: {+execArgv: Array<string>}, path: string) {
|
||||
this.options = extend(DEFAULT_OPTIONS, options);
|
||||
this.path = path;
|
||||
this.activeCalls = 0;
|
||||
this.stdout = mergeStream();
|
||||
this.stderr = mergeStream();
|
||||
}
|
||||
|
||||
// make a handle to pass back in the form of an external API
|
||||
Farm.prototype.mkhandle = function(method) {
|
||||
return function() {
|
||||
var args = Array.prototype.slice.call(arguments);
|
||||
if (this.activeCalls >= this.options.maxConcurrentCalls) {
|
||||
var err = new MaxConcurrentCallsError(
|
||||
'Too many concurrent calls (' + this.activeCalls + ')',
|
||||
);
|
||||
if (typeof args[args.length - 1] == 'function')
|
||||
return process.nextTick(args[args.length - 1].bind(null, err));
|
||||
throw err;
|
||||
}
|
||||
this.addCall({
|
||||
method: method,
|
||||
callback: args.pop(),
|
||||
args: args,
|
||||
retries: 0,
|
||||
});
|
||||
}.bind(this);
|
||||
};
|
||||
|
||||
// a constructor of sorts
|
||||
Farm.prototype.setup = function(methods) {
|
||||
var iface;
|
||||
if (!methods) {
|
||||
// single-function export
|
||||
iface = this.mkhandle();
|
||||
} else {
|
||||
// multiple functions on the export
|
||||
iface = {};
|
||||
methods.forEach(
|
||||
function(m) {
|
||||
iface[m] = this.mkhandle(m);
|
||||
}.bind(this),
|
||||
);
|
||||
}
|
||||
|
||||
this.searchStart = -1;
|
||||
this.childId = -1;
|
||||
this.children = {};
|
||||
this.activeChildren = 0;
|
||||
this.callQueue = [];
|
||||
|
||||
if (this.options.autoStart) {
|
||||
while (this.activeChildren < this.options.maxConcurrentWorkers)
|
||||
this.startChild();
|
||||
}
|
||||
|
||||
return iface;
|
||||
};
|
||||
|
||||
// when a child exits, check if there are any outstanding jobs and requeue them
|
||||
Farm.prototype.onExit = function(childId) {
|
||||
// delay this to give any sends a chance to finish
|
||||
setTimeout(
|
||||
function() {
|
||||
var doQueue = false;
|
||||
if (this.children[childId] && this.children[childId].activeCalls) {
|
||||
this.children[childId].calls.forEach(
|
||||
function(call, i) {
|
||||
if (!call) return;
|
||||
else if (call.retries >= this.options.maxRetries) {
|
||||
this.receive({
|
||||
idx: i,
|
||||
child: childId,
|
||||
args: [
|
||||
new ProcessTerminatedError(
|
||||
'cancel after ' + call.retries + ' retries!',
|
||||
),
|
||||
],
|
||||
});
|
||||
} else {
|
||||
call.retries++;
|
||||
this.callQueue.unshift(call);
|
||||
doQueue = true;
|
||||
}
|
||||
}.bind(this),
|
||||
);
|
||||
}
|
||||
this.stopChild(childId);
|
||||
doQueue && this.processQueue();
|
||||
}.bind(this),
|
||||
10,
|
||||
);
|
||||
};
|
||||
|
||||
// start a new worker
|
||||
Farm.prototype.startChild = function() {
|
||||
this.childId++;
|
||||
|
||||
var forked = fork(this.path, {execArgv: this.options.execArgv}),
|
||||
id = this.childId,
|
||||
c = {
|
||||
send: forked.send,
|
||||
child: forked.child,
|
||||
calls: [],
|
||||
activeCalls: 0,
|
||||
exitCode: null,
|
||||
};
|
||||
|
||||
this.stdout.add(forked.child.stdout);
|
||||
this.stderr.add(forked.child.stderr);
|
||||
|
||||
forked.child.on('message', this.receive.bind(this));
|
||||
forked.child.once(
|
||||
'exit',
|
||||
function(code) {
|
||||
c.exitCode = code;
|
||||
this.onExit(id);
|
||||
}.bind(this),
|
||||
);
|
||||
|
||||
this.activeChildren++;
|
||||
this.children[id] = c;
|
||||
};
|
||||
|
||||
// stop a worker, identified by id
|
||||
Farm.prototype.stopChild = function(childId) {
|
||||
var child = this.children[childId];
|
||||
if (child) {
|
||||
child.send('die');
|
||||
setTimeout(function() {
|
||||
if (child.exitCode === null) child.child.kill('SIGKILL');
|
||||
}, this.options.forcedKillTime);
|
||||
delete this.children[childId];
|
||||
this.activeChildren--;
|
||||
}
|
||||
};
|
||||
|
||||
// called from a child process, the data contains information needed to
|
||||
// look up the child and the original call so we can invoke the callback
|
||||
Farm.prototype.receive = function(data) {
|
||||
var idx = data.idx,
|
||||
childId = data.child,
|
||||
args = data.args,
|
||||
child = this.children[childId],
|
||||
call;
|
||||
|
||||
if (!child) {
|
||||
return console.error(
|
||||
'Worker Farm: Received message for unknown child. ' +
|
||||
'This is likely as a result of premature child death, ' +
|
||||
'the operation will have been re-queued.',
|
||||
);
|
||||
}
|
||||
|
||||
call = child.calls[idx];
|
||||
if (!call) {
|
||||
return console.error(
|
||||
'Worker Farm: Received message for unknown index for existing child. ' +
|
||||
'This should not happen!',
|
||||
);
|
||||
}
|
||||
|
||||
if (this.options.maxCallTime !== Infinity) clearTimeout(call.timer);
|
||||
|
||||
if (args[0] && args[0].$error == '$error') {
|
||||
var e = args[0];
|
||||
switch (e.type) {
|
||||
case 'TypeError':
|
||||
args[0] = new TypeError(e.message);
|
||||
break;
|
||||
case 'RangeError':
|
||||
args[0] = new RangeError(e.message);
|
||||
break;
|
||||
case 'EvalError':
|
||||
args[0] = new EvalError(e.message);
|
||||
break;
|
||||
case 'ReferenceError':
|
||||
args[0] = new ReferenceError(e.message);
|
||||
break;
|
||||
case 'SyntaxError':
|
||||
args[0] = new SyntaxError(e.message);
|
||||
break;
|
||||
case 'URIError':
|
||||
args[0] = new URIError(e.message);
|
||||
break;
|
||||
default:
|
||||
args[0] = new Error(e.message);
|
||||
}
|
||||
args[0].type = e.type;
|
||||
args[0].stack = e.stack;
|
||||
|
||||
// Copy any custom properties to pass it on.
|
||||
Object.keys(e).forEach(function(key) {
|
||||
args[0][key] = e[key];
|
||||
});
|
||||
}
|
||||
|
||||
process.nextTick(function() {
|
||||
call.callback.apply(null, args);
|
||||
});
|
||||
delete child.calls[idx];
|
||||
child.activeCalls--;
|
||||
this.activeCalls--;
|
||||
|
||||
if (
|
||||
child.calls.length >= this.options.maxCallsPerWorker &&
|
||||
!Object.keys(child.calls).length
|
||||
) {
|
||||
// this child has finished its run, kill it
|
||||
this.stopChild(childId);
|
||||
}
|
||||
|
||||
// allow any outstanding calls to be processed
|
||||
this.processQueue();
|
||||
};
|
||||
|
||||
Farm.prototype.childTimeout = function(childId) {
|
||||
var child = this.children[childId],
|
||||
i;
|
||||
|
||||
if (!child) return;
|
||||
|
||||
for (i in child.calls) {
|
||||
this.receive({
|
||||
idx: i,
|
||||
child: childId,
|
||||
args: [new TimeoutError('worker call timed out!')],
|
||||
});
|
||||
}
|
||||
this.stopChild(childId);
|
||||
};
|
||||
|
||||
// send a call to a worker, identified by id
|
||||
Farm.prototype.send = function(childId, call) {
|
||||
var child = this.children[childId],
|
||||
idx = child.calls.length;
|
||||
|
||||
child.calls.push(call);
|
||||
child.activeCalls++;
|
||||
this.activeCalls++;
|
||||
|
||||
child.send({
|
||||
idx: idx,
|
||||
child: childId,
|
||||
method: call.method,
|
||||
args: call.args,
|
||||
});
|
||||
|
||||
if (this.options.maxCallTime !== Infinity) {
|
||||
call.timer = setTimeout(
|
||||
this.childTimeout.bind(this, childId),
|
||||
this.options.maxCallTime,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// a list of active worker ids, in order, but the starting offset is
|
||||
// shifted each time this method is called, so we work our way through
|
||||
// all workers when handing out jobs
|
||||
Farm.prototype.childKeys = function() {
|
||||
var cka = Object.keys(this.children),
|
||||
cks;
|
||||
|
||||
if (this.searchStart >= cka.length - 1) this.searchStart = 0;
|
||||
else this.searchStart++;
|
||||
|
||||
cks = cka.splice(0, this.searchStart);
|
||||
|
||||
return cka.concat(cks);
|
||||
};
|
||||
|
||||
// Calls are added to a queue, this processes the queue and is called
|
||||
// whenever there might be a chance to send more calls to the workers.
|
||||
// The various options all impact on when we're able to send calls,
|
||||
// they may need to be kept in a queue until a worker is ready.
|
||||
Farm.prototype.processQueue = function() {
|
||||
var cka,
|
||||
i = 0,
|
||||
childId;
|
||||
|
||||
if (!this.callQueue.length) return this.ending && this.end();
|
||||
|
||||
if (this.activeChildren < this.options.maxConcurrentWorkers)
|
||||
this.startChild();
|
||||
|
||||
for (cka = this.childKeys(); i < cka.length; i++) {
|
||||
childId = +cka[i];
|
||||
if (
|
||||
this.children[childId].activeCalls <
|
||||
this.options.maxConcurrentCallsPerWorker &&
|
||||
this.children[childId].calls.length < this.options.maxCallsPerWorker
|
||||
) {
|
||||
this.send(childId, this.callQueue.shift());
|
||||
if (!this.callQueue.length) return this.ending && this.end();
|
||||
} /*else {
|
||||
console.log(
|
||||
, this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker
|
||||
, this.children[childId].calls.length < this.options.maxCallsPerWorker
|
||||
, this.children[childId].calls.length , this.options.maxCallsPerWorker)
|
||||
}*/
|
||||
}
|
||||
|
||||
if (this.ending) this.end();
|
||||
};
|
||||
|
||||
// add a new call to the call queue, then trigger a process of the queue
|
||||
Farm.prototype.addCall = function(call) {
|
||||
if (this.ending) return this.end(); // don't add anything new to the queue
|
||||
this.callQueue.push(call);
|
||||
this.processQueue();
|
||||
};
|
||||
|
||||
// kills child workers when they're all done
|
||||
Farm.prototype.end = function(callback) {
|
||||
var complete = true;
|
||||
if (this.ending === false) return;
|
||||
if (callback) this.ending = callback;
|
||||
else if (this.ending == null) this.ending = true;
|
||||
Object.keys(this.children).forEach(
|
||||
function(child) {
|
||||
if (!this.children[child]) return;
|
||||
if (!this.children[child].activeCalls) this.stopChild(child);
|
||||
else complete = false;
|
||||
}.bind(this),
|
||||
);
|
||||
|
||||
if (complete && typeof this.ending == 'function') {
|
||||
process.nextTick(
|
||||
function() {
|
||||
this.ending();
|
||||
this.ending = false;
|
||||
}.bind(this),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
module.exports = Farm;
|
||||
module.exports.TimeoutError = TimeoutError;
|
|
@ -1,41 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
* @flow
|
||||
*/
|
||||
|
||||
'use strict';
|
||||
|
||||
const childProcess = require('child_process');
|
||||
const childModule = require.resolve('./child/index');
|
||||
|
||||
function fork(forkModule: string, options: {|+execArgv: Array<string>|}) {
|
||||
const child = childProcess.fork(childModule, {
|
||||
cwd: process.cwd(),
|
||||
env: process.env,
|
||||
execArgv: options.execArgv,
|
||||
silent: true,
|
||||
});
|
||||
|
||||
child.send({module: forkModule});
|
||||
|
||||
// return a send() function for this child
|
||||
return {
|
||||
send(data: {}) {
|
||||
try {
|
||||
child.send(data);
|
||||
} catch (e) {
|
||||
// this *should* be picked up by onExit and the operation requeued
|
||||
}
|
||||
},
|
||||
child,
|
||||
};
|
||||
}
|
||||
|
||||
module.exports = fork;
|
|
@ -1,50 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
* @flow
|
||||
*/
|
||||
|
||||
/* eslint-disable */
|
||||
const Farm = require('./farm');
|
||||
|
||||
import type {Readable} from 'stream';
|
||||
|
||||
var farms = []; // keep record of farms so we can end() them if required
|
||||
|
||||
export type FarmAPI = {|
|
||||
methods: {[name: string]: Function},
|
||||
stdout: Readable,
|
||||
stderr: Readable,
|
||||
|};
|
||||
|
||||
function farm(
|
||||
options: {+execArgv: Array<string>},
|
||||
path: string,
|
||||
methods: Array<string>,
|
||||
): FarmAPI {
|
||||
var f = new Farm(options, path),
|
||||
api = f.setup(methods);
|
||||
|
||||
farms.push({farm: f, api: api});
|
||||
|
||||
// $FlowFixMe: gotta type the Farm class.
|
||||
const {stdout, stderr} = f;
|
||||
|
||||
// return the public API
|
||||
return {methods: (api: any), stdout, stderr};
|
||||
}
|
||||
|
||||
function end(api, callback) {
|
||||
for (var i = 0; i < farms.length; i++)
|
||||
if (farms[i] && farms[i].api === api) return farms[i].farm.end(callback);
|
||||
process.nextTick(callback.bind(null, 'Worker farm not found!'));
|
||||
}
|
||||
|
||||
module.exports = farm;
|
||||
module.exports.end = end;
|
|
@ -1,18 +0,0 @@
|
|||
{
|
||||
"private": true,
|
||||
"authors": [
|
||||
"Rod Vagg @rvagg <rod@vagg.org> (https://github.com/rvagg)"
|
||||
],
|
||||
"main": "./lib/index.js",
|
||||
"dependencies": {
|
||||
"errno": ">=0.1.1 <0.2.0-0",
|
||||
"xtend": ">=4.0.0 <4.1.0-0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"tape": ">=3.0.3 <3.1.0-0"
|
||||
},
|
||||
"scripts": {
|
||||
"test": "node ./tests/"
|
||||
},
|
||||
"license": "MIT"
|
||||
}
|
|
@ -1,78 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
*/
|
||||
|
||||
/* eslint-disable */
|
||||
var fs = require('fs');
|
||||
|
||||
module.exports = function(timeout, callback) {
|
||||
callback = callback.bind(null, null, process.pid, Math.random(), timeout);
|
||||
if (timeout) return setTimeout(callback, timeout);
|
||||
callback();
|
||||
};
|
||||
|
||||
module.exports.run0 = function(callback) {
|
||||
module.exports(0, callback);
|
||||
};
|
||||
|
||||
module.exports.killable = function(id, callback) {
|
||||
if (Math.random() < 0.5) return process.exit(-1);
|
||||
callback(null, id, process.pid);
|
||||
};
|
||||
|
||||
module.exports.err = function(type, message, data, callback) {
|
||||
if (typeof data == 'function') {
|
||||
callback = data;
|
||||
data = null;
|
||||
} else {
|
||||
var err = new Error(message);
|
||||
Object.keys(data).forEach(function(key) {
|
||||
err[key] = data[key];
|
||||
});
|
||||
callback(err);
|
||||
return;
|
||||
}
|
||||
|
||||
if (type == 'TypeError') return callback(new TypeError(message));
|
||||
callback(new Error(message));
|
||||
};
|
||||
|
||||
module.exports.block = function() {
|
||||
while (true);
|
||||
};
|
||||
|
||||
// use provided file path to save retries count among terminated workers
|
||||
module.exports.stubborn = function(path, callback) {
|
||||
function isOutdated(path) {
|
||||
return new Date().getTime() - fs.statSync(path).mtime.getTime() > 2000;
|
||||
}
|
||||
|
||||
// file may not be properly deleted, check if modified no earler than two seconds ago
|
||||
if (!fs.existsSync(path) || isOutdated(path)) {
|
||||
fs.writeFileSync(path, '1');
|
||||
process.exit(-1);
|
||||
}
|
||||
|
||||
var retry = parseInt(fs.readFileSync(path, 'utf8'));
|
||||
if (Number.isNaN(retry))
|
||||
return callback(new Error('file contents is not a number'));
|
||||
|
||||
if (retry > 4) {
|
||||
callback(null, 12);
|
||||
} else {
|
||||
fs.writeFileSync(path, String(retry + 1));
|
||||
process.exit(-1);
|
||||
}
|
||||
};
|
||||
|
||||
var started = Date.now();
|
||||
module.exports.uptime = function(callback) {
|
||||
callback(null, Date.now() - started);
|
||||
};
|
|
@ -1,528 +0,0 @@
|
|||
/**
|
||||
* Copyright (c) 2013-present, Facebook, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This source code is licensed under the BSD-style license found in the
|
||||
* LICENSE file in the root directory of this source tree. An additional grant
|
||||
* of patent rights can be found in the PATENTS file in the same directory.
|
||||
*
|
||||
* @format
|
||||
*/
|
||||
|
||||
/* eslint-disable */
|
||||
var tape = require('tape'),
|
||||
workerFarm = require('../'),
|
||||
childPath = require.resolve('./child'),
|
||||
fs = require('fs'),
|
||||
uniq = function(ar) {
|
||||
var a = [],
|
||||
i,
|
||||
j;
|
||||
o: for (i = 0; i < ar.length; ++i) {
|
||||
for (j = 0; j < a.length; ++j) if (a[j] == ar[i]) continue o;
|
||||
a[a.length] = ar[i];
|
||||
}
|
||||
return a;
|
||||
};
|
||||
|
||||
// a child where module.exports = function ...
|
||||
tape('simple, exports=function test', function(t) {
|
||||
t.plan(4);
|
||||
|
||||
var child = workerFarm(childPath);
|
||||
child(0, function(err, pid, rnd) {
|
||||
t.ok(pid > process.pid, 'pid makes sense');
|
||||
t.ok(pid < process.pid + 500, 'pid makes sense');
|
||||
t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense');
|
||||
});
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
});
|
||||
|
||||
// a child where we have module.exports.fn = function ...
|
||||
tape('simple, exports.fn test', function(t) {
|
||||
t.plan(4);
|
||||
|
||||
var child = workerFarm(childPath, ['run0']);
|
||||
child.run0(function(err, pid, rnd) {
|
||||
t.ok(pid > process.pid, 'pid makes sense');
|
||||
t.ok(pid < process.pid + 500, 'pid makes sense');
|
||||
t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense');
|
||||
});
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
});
|
||||
|
||||
// use the returned pids to check that we're using a single child process
|
||||
// when maxConcurrentWorkers = 1
|
||||
tape('single worker', function(t) {
|
||||
t.plan(2);
|
||||
|
||||
var child = workerFarm({maxConcurrentWorkers: 1}, childPath),
|
||||
pids = [],
|
||||
i = 10;
|
||||
|
||||
while (i--) {
|
||||
child(0, function(err, pid) {
|
||||
pids.push(pid);
|
||||
if (pids.length == 10) {
|
||||
t.equal(1, uniq(pids).length, 'only a single process (by pid)');
|
||||
} else if (pids.length > 10) t.fail('too many callbacks!');
|
||||
});
|
||||
}
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
});
|
||||
|
||||
// use the returned pids to check that we're using two child processes
|
||||
// when maxConcurrentWorkers = 2
|
||||
tape('two workers', function(t) {
|
||||
t.plan(2);
|
||||
|
||||
var child = workerFarm({maxConcurrentWorkers: 2}, childPath),
|
||||
pids = [],
|
||||
i = 10;
|
||||
|
||||
while (i--) {
|
||||
child(0, function(err, pid) {
|
||||
pids.push(pid);
|
||||
if (pids.length == 10) {
|
||||
t.equal(2, uniq(pids).length, 'only two child processes (by pid)');
|
||||
} else if (pids.length > 10) t.fail('too many callbacks!');
|
||||
});
|
||||
}
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
});
|
||||
|
||||
// use the returned pids to check that we're using a child process per
|
||||
// call when maxConcurrentWorkers = 10
|
||||
tape('many workers', function(t) {
|
||||
t.plan(2);
|
||||
|
||||
var child = workerFarm({maxConcurrentWorkers: 10}, childPath),
|
||||
pids = [],
|
||||
i = 10;
|
||||
|
||||
while (i--) {
|
||||
child(1, function(err, pid) {
|
||||
pids.push(pid);
|
||||
if (pids.length == 10) {
|
||||
t.equal(10, uniq(pids).length, 'pids are all the same (by pid)');
|
||||
} else if (pids.length > 10) t.fail('too many callbacks!');
|
||||
});
|
||||
}
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
});
|
||||
|
||||
tape('auto start workers', function(t) {
|
||||
t.plan(4);
|
||||
|
||||
var child = workerFarm(
|
||||
{maxConcurrentWorkers: 3, autoStart: true},
|
||||
childPath,
|
||||
['uptime'],
|
||||
),
|
||||
pids = [],
|
||||
i = 3,
|
||||
delay = 150;
|
||||
|
||||
setTimeout(function() {
|
||||
while (i--)
|
||||
child.uptime(function(err, uptime) {
|
||||
t.ok(uptime > 10, 'child has been up before the request');
|
||||
});
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
}, delay);
|
||||
});
|
||||
|
||||
// use the returned pids to check that we're using a child process per
|
||||
// call when we set maxCallsPerWorker = 1 even when we have maxConcurrentWorkers = 1
|
||||
tape('single call per worker', function(t) {
|
||||
t.plan(2);
|
||||
|
||||
var child = workerFarm(
|
||||
{maxConcurrentWorkers: 1, maxCallsPerWorker: 1},
|
||||
childPath,
|
||||
),
|
||||
pids = [],
|
||||
i = 10;
|
||||
|
||||
while (i--) {
|
||||
child(0, function(err, pid) {
|
||||
pids.push(pid);
|
||||
if (pids.length == 10) {
|
||||
t.equal(10, uniq(pids).length, 'one process for each call (by pid)');
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
} else if (pids.length > 10) t.fail('too many callbacks!');
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// use the returned pids to check that we're using a child process per
|
||||
// two-calls when we set maxCallsPerWorker = 2 even when we have maxConcurrentWorkers = 1
|
||||
tape('two calls per worker', function(t) {
|
||||
t.plan(2);
|
||||
|
||||
var child = workerFarm(
|
||||
{maxConcurrentWorkers: 1, maxCallsPerWorker: 2},
|
||||
childPath,
|
||||
),
|
||||
pids = [],
|
||||
i = 10;
|
||||
|
||||
while (i--) {
|
||||
child(0, function(err, pid) {
|
||||
pids.push(pid);
|
||||
if (pids.length == 10) {
|
||||
t.equal(5, uniq(pids).length, 'one process for each call (by pid)');
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
} else if (pids.length > 10) t.fail('too many callbacks!');
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// use timing to confirm that one worker will process calls sequentially
|
||||
tape('many concurrent calls', function(t) {
|
||||
t.plan(2);
|
||||
|
||||
var child = workerFarm({maxConcurrentWorkers: 1}, childPath),
|
||||
i = 10,
|
||||
cbc = 0,
|
||||
start = Date.now();
|
||||
|
||||
while (i--) {
|
||||
child(100, function() {
|
||||
if (++cbc == 10) {
|
||||
var time = Date.now() - start;
|
||||
t.ok(
|
||||
time > 100 && time < 200,
|
||||
'processed tasks concurrently (' + time + 'ms)',
|
||||
);
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
} else if (cbc > 10) t.fail('too many callbacks!');
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// use timing to confirm that one child processes calls sequentially with
|
||||
// maxConcurrentCallsPerWorker = 1
|
||||
tape('single concurrent call', function(t) {
|
||||
t.plan(2);
|
||||
|
||||
var child = workerFarm(
|
||||
{maxConcurrentWorkers: 1, maxConcurrentCallsPerWorker: 1},
|
||||
childPath,
|
||||
),
|
||||
i = 10,
|
||||
cbc = 0,
|
||||
start = Date.now();
|
||||
|
||||
while (i--) {
|
||||
child(10, function() {
|
||||
if (++cbc == 10) {
|
||||
var time = Date.now() - start;
|
||||
t.ok(
|
||||
time > 100 && time < 190,
|
||||
'processed tasks sequentially (' + time + 'ms)',
|
||||
);
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
} else if (cbc > 10) t.fail('too many callbacks!');
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// use timing to confirm that one child processes *only* 5 calls concurrently
|
||||
tape('multiple concurrent calls', function(t) {
|
||||
t.plan(2);
|
||||
|
||||
var child = workerFarm(
|
||||
{maxConcurrentWorkers: 1, maxConcurrentCallsPerWorker: 5},
|
||||
childPath,
|
||||
),
|
||||
i = 10,
|
||||
cbc = 0,
|
||||
start = Date.now();
|
||||
|
||||
while (i--) {
|
||||
child(50, function() {
|
||||
if (++cbc == 10) {
|
||||
var time = Date.now() - start;
|
||||
t.ok(
|
||||
time > 100 && time < 200,
|
||||
'processed tasks concurrently (' + time + 'ms)',
|
||||
);
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
} else if (cbc > 10) t.fail('too many callbacks!');
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// call a method that will die with a probability of 0.5 but expect that
|
||||
// we'll get results for each of our calls anyway
|
||||
tape('durability', function(t) {
|
||||
t.plan(3);
|
||||
|
||||
var child = workerFarm({maxConcurrentWorkers: 2}, childPath, ['killable']),
|
||||
ids = [],
|
||||
pids = [],
|
||||
i = 10;
|
||||
|
||||
while (i--) {
|
||||
child.killable(i, function(err, id, pid) {
|
||||
ids.push(id);
|
||||
pids.push(pid);
|
||||
if (ids.length == 10) {
|
||||
t.ok(
|
||||
uniq(pids).length > 2,
|
||||
'processed by many (' +
|
||||
uniq(pids).length +
|
||||
') workers, but got there in the end!',
|
||||
);
|
||||
t.ok(
|
||||
uniq(ids).length == 10,
|
||||
'received a single result for each unique call',
|
||||
);
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
} else if (ids.length > 10) t.fail('too many callbacks!');
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// a callback provided to .end() can and will be called (uses "simple, exports=function test" to create a child)
|
||||
tape('simple, end callback', function(t) {
|
||||
t.plan(4);
|
||||
|
||||
var child = workerFarm(childPath);
|
||||
child(0, function(err, pid, rnd) {
|
||||
t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid);
|
||||
t.ok(
|
||||
pid < process.pid + 500,
|
||||
'pid makes sense ' + pid + ' vs ' + process.pid,
|
||||
);
|
||||
t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense');
|
||||
});
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.pass('an .end() callback was successfully called');
|
||||
});
|
||||
});
|
||||
|
||||
tape('call timeout test', function(t) {
|
||||
t.plan(3 + 3 + 4 + 4 + 4 + 3 + 1);
|
||||
|
||||
var child = workerFarm(
|
||||
{maxCallTime: 250, maxConcurrentWorkers: 1},
|
||||
childPath,
|
||||
);
|
||||
|
||||
// should come back ok
|
||||
child(50, function(err, pid, rnd) {
|
||||
t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid);
|
||||
t.ok(
|
||||
pid < process.pid + 500,
|
||||
'pid makes sense ' + pid + ' vs ' + process.pid,
|
||||
);
|
||||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd);
|
||||
});
|
||||
|
||||
// should come back ok
|
||||
child(50, function(err, pid, rnd) {
|
||||
t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid);
|
||||
t.ok(
|
||||
pid < process.pid + 500,
|
||||
'pid makes sense ' + pid + ' vs ' + process.pid,
|
||||
);
|
||||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd);
|
||||
});
|
||||
|
||||
// should die
|
||||
child(500, function(err, pid, rnd) {
|
||||
t.ok(err, 'got an error');
|
||||
t.equal(err.type, 'TimeoutError', 'correct error type');
|
||||
t.ok(pid === undefined, 'no pid');
|
||||
t.ok(rnd === undefined, 'no rnd');
|
||||
});
|
||||
|
||||
// should die
|
||||
child(1000, function(err, pid, rnd) {
|
||||
t.ok(err, 'got an error');
|
||||
t.equal(err.type, 'TimeoutError', 'correct error type');
|
||||
t.ok(pid === undefined, 'no pid');
|
||||
t.ok(rnd === undefined, 'no rnd');
|
||||
});
|
||||
|
||||
// should die even though it is only a 100ms task, it'll get caught up
|
||||
// in a dying worker
|
||||
setTimeout(function() {
|
||||
child(100, function(err, pid, rnd) {
|
||||
t.ok(err, 'got an error');
|
||||
t.equal(err.type, 'TimeoutError', 'correct error type');
|
||||
t.ok(pid === undefined, 'no pid');
|
||||
t.ok(rnd === undefined, 'no rnd');
|
||||
});
|
||||
}, 200);
|
||||
|
||||
// should be ok, new worker
|
||||
setTimeout(function() {
|
||||
child(50, function(err, pid, rnd) {
|
||||
t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid);
|
||||
t.ok(
|
||||
pid < process.pid + 500,
|
||||
'pid makes sense ' + pid + ' vs ' + process.pid,
|
||||
);
|
||||
t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd);
|
||||
});
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
}, 400);
|
||||
});
|
||||
|
||||
tape('test error passing', function(t) {
|
||||
t.plan(10);
|
||||
|
||||
var child = workerFarm(childPath, ['err']);
|
||||
child.err('Error', 'this is an Error', function(err) {
|
||||
t.ok(err instanceof Error, 'is an Error object');
|
||||
t.equal('Error', err.type, 'correct type');
|
||||
t.equal('this is an Error', err.message, 'correct message');
|
||||
});
|
||||
child.err('TypeError', 'this is a TypeError', function(err) {
|
||||
t.ok(err instanceof Error, 'is a TypeError object');
|
||||
t.equal('TypeError', err.type, 'correct type');
|
||||
t.equal('this is a TypeError', err.message, 'correct message');
|
||||
});
|
||||
child.err(
|
||||
'Error',
|
||||
'this is an Error with custom props',
|
||||
{foo: 'bar', baz: 1},
|
||||
function(err) {
|
||||
t.ok(err instanceof Error, 'is an Error object');
|
||||
t.equal(err.foo, 'bar', 'passes data');
|
||||
t.equal(err.baz, 1, 'passes data');
|
||||
},
|
||||
);
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
});
|
||||
|
||||
tape('test maxConcurrentCalls', function(t) {
|
||||
t.plan(10);
|
||||
|
||||
var child = workerFarm({maxConcurrentCalls: 5}, childPath);
|
||||
|
||||
child(50, function(err) {
|
||||
t.notOk(err, 'no error');
|
||||
});
|
||||
child(50, function(err) {
|
||||
t.notOk(err, 'no error');
|
||||
});
|
||||
child(50, function(err) {
|
||||
t.notOk(err, 'no error');
|
||||
});
|
||||
child(50, function(err) {
|
||||
t.notOk(err, 'no error');
|
||||
});
|
||||
child(50, function(err) {
|
||||
t.notOk(err, 'no error');
|
||||
});
|
||||
child(50, function(err) {
|
||||
t.ok(err);
|
||||
t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type');
|
||||
});
|
||||
child(50, function(err) {
|
||||
t.ok(err);
|
||||
t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type');
|
||||
});
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
});
|
||||
|
||||
// this test should not keep the process running! if the test process
|
||||
// doesn't die then the problem is here
|
||||
tape('test timeout kill', function(t) {
|
||||
t.plan(3);
|
||||
|
||||
var child = workerFarm(
|
||||
{maxCallTime: 250, maxConcurrentWorkers: 1},
|
||||
childPath,
|
||||
['block'],
|
||||
);
|
||||
child.block(function(err) {
|
||||
t.ok(err, 'got an error');
|
||||
t.equal(err.type, 'TimeoutError', 'correct error type');
|
||||
});
|
||||
|
||||
workerFarm.end(child, function() {
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
});
|
||||
|
||||
tape('test max retries after process terminate', function(t) {
|
||||
t.plan(7);
|
||||
|
||||
// temporary file is used to store the number of retries among terminating workers
|
||||
var filepath1 = '.retries1';
|
||||
var child1 = workerFarm({maxConcurrentWorkers: 1, maxRetries: 5}, childPath, [
|
||||
'stubborn',
|
||||
]);
|
||||
child1.stubborn(filepath1, function(err, result) {
|
||||
t.notOk(err, 'no error');
|
||||
t.equal(result, 12, 'correct result');
|
||||
});
|
||||
|
||||
workerFarm.end(child1, function() {
|
||||
fs.unlinkSync(filepath1);
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
|
||||
var filepath2 = '.retries2';
|
||||
var child2 = workerFarm({maxConcurrentWorkers: 1, maxRetries: 3}, childPath, [
|
||||
'stubborn',
|
||||
]);
|
||||
child2.stubborn(filepath2, function(err, result) {
|
||||
t.ok(err, 'got an error');
|
||||
t.equal(err.type, 'ProcessTerminatedError', 'correct error type');
|
||||
t.equal(
|
||||
err.message,
|
||||
'cancel after 3 retries!',
|
||||
'correct message and number of retries',
|
||||
);
|
||||
});
|
||||
|
||||
workerFarm.end(child2, function() {
|
||||
fs.unlinkSync(filepath2);
|
||||
t.ok(true, 'workerFarm ended');
|
||||
});
|
||||
});
|
Loading…
Reference in New Issue