packager: worker-farm: isolate stdout/stderr
Reviewed By: davidaurelio Differential Revision: D5011034 fbshipit-source-id: 81d3d49156775f9781427d3c103d768ccc30fe20
This commit is contained in:
parent
5f04678ee8
commit
fbe6d9321e
|
@ -177,6 +177,7 @@
|
|||
"json5": "^0.4.0",
|
||||
"left-pad": "^1.1.3",
|
||||
"lodash": "^4.16.6",
|
||||
"merge-stream": "^1.0.1",
|
||||
"mime": "^1.3.4",
|
||||
"mime-types": "2.1.11",
|
||||
"minimist": "^1.2.0",
|
||||
|
|
|
@ -194,8 +194,15 @@ class Bundler {
|
|||
|
||||
const maxWorkerCount = Bundler.getMaxWorkerCount();
|
||||
|
||||
/* $FlowFixMe: in practice it's always here. */
|
||||
this._transformer = new Transformer(opts.transformModulePath, maxWorkerCount);
|
||||
this._transformer = new Transformer(
|
||||
/* $FlowFixMe: in practice it's always here. */
|
||||
opts.transformModulePath,
|
||||
maxWorkerCount,
|
||||
{
|
||||
stdoutChunk: chunk => opts.reporter.update({type: 'worker_stdout_chunk', chunk}),
|
||||
stderrChunk: chunk => opts.reporter.update({type: 'worker_stderr_chunk', chunk}),
|
||||
}
|
||||
);
|
||||
|
||||
const getTransformCacheKey = (options) => {
|
||||
return transformCacheKey + getCacheKey(options);
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
'use strict';
|
||||
|
||||
jest
|
||||
.unmock('stream')
|
||||
.unmock('imurmurhash')
|
||||
.unmock('../../lib/ModuleTransport')
|
||||
.unmock('../');
|
||||
|
@ -24,6 +25,7 @@ jest.setMock('../../worker-farm', workerFarm);
|
|||
var Transformer = require('../');
|
||||
|
||||
const {any} = jasmine;
|
||||
const {Readable} = require('stream');
|
||||
|
||||
describe('Transformer', function() {
|
||||
let workers, Cache;
|
||||
|
@ -39,7 +41,7 @@ describe('Transformer', function() {
|
|||
workerFarm.mockImplementation((opts, path, methods) => {
|
||||
const api = workers = {};
|
||||
methods.forEach(method => {api[method] = jest.fn();});
|
||||
return api;
|
||||
return {methods: api, stdout: new Readable({read() {}}), stderr: new Readable({read() {}})};
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -50,6 +50,11 @@ function makeFarm(worker, methods, timeout, maxConcurrentWorkers) {
|
|||
);
|
||||
}
|
||||
|
||||
type Reporters = {
|
||||
+stdoutChunk: (chunk: string) => mixed,
|
||||
+stderrChunk: (chunk: string) => mixed,
|
||||
};
|
||||
|
||||
class Transformer {
|
||||
|
||||
_workers: {[name: string]: Function};
|
||||
|
@ -66,16 +71,24 @@ class Transformer {
|
|||
sourceMap: MappingsMap,
|
||||
) => Promise<{code: string, map: MappingsMap}>;
|
||||
|
||||
constructor(transformModulePath: string, maxWorkerCount: number) {
|
||||
constructor(transformModulePath: string, maxWorkerCount: number, reporters: Reporters) {
|
||||
invariant(path.isAbsolute(transformModulePath), 'transform module path should be absolute');
|
||||
this._transformModulePath = transformModulePath;
|
||||
|
||||
this._workers = makeFarm(
|
||||
const farm = makeFarm(
|
||||
require.resolve('./worker'),
|
||||
['minify', 'transformAndExtractDependencies'],
|
||||
TRANSFORM_TIMEOUT_INTERVAL,
|
||||
maxWorkerCount,
|
||||
);
|
||||
farm.stdout.on('data', chunk => {
|
||||
reporters.stdoutChunk(chunk.toString('utf8'));
|
||||
});
|
||||
farm.stderr.on('data', chunk => {
|
||||
reporters.stderrChunk(chunk.toString('utf8'));
|
||||
});
|
||||
|
||||
this._workers = farm.methods;
|
||||
this._transform = denodeify(this._workers.transformAndExtractDependencies);
|
||||
this.minify = denodeify(this._workers.minify);
|
||||
}
|
||||
|
|
|
@ -135,7 +135,6 @@ class TerminalReporter {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
_logPackagerInitializing(port: number, projectRoots: Array<string>) {
|
||||
terminal.log(
|
||||
formatBanner(
|
||||
|
@ -183,7 +182,6 @@ class TerminalReporter {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This function is only concerned with logging and should not do state
|
||||
* or terminal status updates.
|
||||
|
@ -214,9 +212,25 @@ class TerminalReporter {
|
|||
case 'transform_cache_reset':
|
||||
reporting.logWarning(terminal, 'the transform cache was reset.');
|
||||
break;
|
||||
case 'worker_stdout_chunk':
|
||||
this._logWorkerChunk('stdout', event.chunk);
|
||||
break;
|
||||
case 'worker_stderr_chunk':
|
||||
this._logWorkerChunk('stderr', event.chunk);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_logWorkerChunk(origin: 'stdout' | 'stderr', chunk: string) {
|
||||
const lines = chunk.split('\n');
|
||||
if (lines.length >= 1 && lines[lines.length - 1] === '') {
|
||||
lines.splice(lines.length - 1, 1);
|
||||
}
|
||||
lines.forEach(line => {
|
||||
terminal.log(`transform[${origin}]: ${line}`);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* We use Math.pow(ratio, 2) to as a conservative measure of progress because
|
||||
* we know the `totalCount` is going to progressively increase as well. We
|
||||
|
|
|
@ -59,6 +59,12 @@ export type ReportableEvent = {
|
|||
reason: GlobalCacheDisabledReason,
|
||||
} | {
|
||||
type: 'transform_cache_reset',
|
||||
} | {
|
||||
type: 'worker_stdout_chunk',
|
||||
chunk: string,
|
||||
} | {
|
||||
type: 'worker_stderr_chunk',
|
||||
chunk: string,
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -27,10 +27,14 @@ const extend = require('xtend')
|
|||
, ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
|
||||
, MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
|
||||
|
||||
const mergeStream = require('merge-stream');
|
||||
|
||||
function Farm (options: {+execArgv: Array<string>}, path: string) {
|
||||
this.options = extend(DEFAULT_OPTIONS, options)
|
||||
this.path = path
|
||||
this.activeCalls = 0
|
||||
this.stdout = mergeStream();
|
||||
this.stderr = mergeStream();
|
||||
}
|
||||
|
||||
// make a handle to pass back in the form of an external API
|
||||
|
@ -118,6 +122,9 @@ Farm.prototype.startChild = function () {
|
|||
, exitCode : null
|
||||
}
|
||||
|
||||
this.stdout.add(forked.child.stdout);
|
||||
this.stderr.add(forked.child.stderr);
|
||||
|
||||
forked.child.on('message', this.receive.bind(this))
|
||||
forked.child.once('exit', function (code) {
|
||||
c.exitCode = code
|
||||
|
|
|
@ -16,9 +16,10 @@ const childModule = require.resolve('./child/index');
|
|||
|
||||
function fork(forkModule: string, options: {|+execArgv: Array<string>|}) {
|
||||
const child = childProcess.fork(childModule, {
|
||||
env: process.env,
|
||||
cwd: process.cwd(),
|
||||
env: process.env,
|
||||
execArgv: options.execArgv,
|
||||
silent: true,
|
||||
});
|
||||
|
||||
child.send({module: forkModule});
|
||||
|
|
|
@ -12,20 +12,31 @@
|
|||
/* eslint-disable */
|
||||
const Farm = require('./farm')
|
||||
|
||||
import type {Readable} from 'stream';
|
||||
|
||||
var farms = [] // keep record of farms so we can end() them if required
|
||||
|
||||
export type FarmAPI = {|
|
||||
methods: {[name: string]: Function},
|
||||
stdout: Readable,
|
||||
stderr: Readable,
|
||||
|};
|
||||
|
||||
function farm(
|
||||
options: {+execArgv: Array<string>},
|
||||
path: string,
|
||||
methods: Array<string>,
|
||||
): {[name: string]: Function} {
|
||||
): FarmAPI {
|
||||
var f = new Farm(options, path)
|
||||
, api = f.setup(methods)
|
||||
|
||||
farms.push({ farm: f, api: api })
|
||||
|
||||
// $FlowFixMe: gotta type the Farm class.
|
||||
const {stdout, stderr} = f;
|
||||
|
||||
// return the public API
|
||||
return (api: any)
|
||||
return {methods: (api: any), stdout, stderr};
|
||||
}
|
||||
|
||||
function end (api, callback) {
|
||||
|
|
Loading…
Reference in New Issue