packager: worker-farm: isolate stdout/stderr

Reviewed By: davidaurelio

Differential Revision: D5011034

fbshipit-source-id: 81d3d49156775f9781427d3c103d768ccc30fe20
Jean Lauliac 2017-05-08 08:16:40 -07:00 committed by Facebook Github Bot
parent a2e4e54245
commit a1d21be8b8
8 changed files with 71 additions and 10 deletions
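Overview of the change: each worker child process is forked with piped stdio, the farm merges every child's stdout/stderr into a single pair of streams, the Transformer forwards those chunks to stdoutChunk/stderrChunk reporter callbacks, and TerminalReporter prints them as prefixed lines. Below is a standalone sketch of that pipeline under the assumption that a PassThrough stream can stand in for the farm's merged stdout; the names `farmStdout` and `logWorkerChunk` are illustrative, not the packager's real objects.

```js
// Illustrative sketch of the stdout pipeline this commit introduces
// (standalone; does not use the packager's actual classes).
const {PassThrough} = require('stream');

// Stand-in for the farm's merged stdout stream (farm.stdout in the real code).
const farmStdout = new PassThrough();

// Stand-in for the reporter side: turn raw chunks into prefixed terminal lines,
// mirroring what TerminalReporter._logWorkerChunk does further down.
function logWorkerChunk(origin, chunk) {
  const lines = chunk.split('\n');
  if (lines.length >= 1 && lines[lines.length - 1] === '') {
    lines.splice(lines.length - 1, 1);
  }
  lines.forEach(line => console.log(`transform[${origin}]: ${line}`));
}

// Wiring equivalent to the Transformer constructor: decode chunks and report them.
farmStdout.on('data', chunk => logWorkerChunk('stdout', chunk.toString('utf8')));

// Simulate a worker writing to its stdout.
farmStdout.write('transforming foo.js\ndone\n');
```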

View File

@@ -194,8 +194,15 @@ class Bundler {
     const maxWorkerCount = Bundler.getMaxWorkerCount();
+    this._transformer = new Transformer(
     /* $FlowFixMe: in practice it's always here. */
-    this._transformer = new Transformer(opts.transformModulePath, maxWorkerCount);
+      opts.transformModulePath,
+      maxWorkerCount,
+      {
+        stdoutChunk: chunk => opts.reporter.update({type: 'worker_stdout_chunk', chunk}),
+        stderrChunk: chunk => opts.reporter.update({type: 'worker_stderr_chunk', chunk}),
+      }
+    );
     const getTransformCacheKey = (options) => {
       return transformCacheKey + getCacheKey(options);

View File

@@ -9,6 +9,7 @@
 'use strict';
 jest
+  .unmock('stream')
   .unmock('imurmurhash')
   .unmock('../../lib/ModuleTransport')
   .unmock('../');
@@ -24,6 +25,7 @@ jest.setMock('../../worker-farm', workerFarm);
 var Transformer = require('../');
 const {any} = jasmine;
+const {Readable} = require('stream');
 describe('Transformer', function() {
   let workers, Cache;
@@ -39,7 +41,7 @@ describe('Transformer', function() {
     workerFarm.mockImplementation((opts, path, methods) => {
       const api = workers = {};
       methods.forEach(method => {api[method] = jest.fn();});
-      return api;
+      return {methods: api, stdout: new Readable({read() {}}), stderr: new Readable({read() {}})};
     });
   });
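The mocked farm now has to expose stdout/stderr streams alongside the method table, since the Transformer subscribes to both. `new Readable({read() {}})` gives a test a push-style stream it can feed by hand; a standalone sketch of that pattern (the variable names are illustrative):

```js
// Why the mock uses Readable({read() {}}): a stream the test can feed manually,
// standing in for a worker child process's stdout.
const {Readable} = require('stream');

const fakeStdout = new Readable({read() {}});
fakeStdout.on('data', chunk => {
  console.log('received:', chunk.toString('utf8'));
});

// Delivered asynchronously to the 'data' listener above.
fakeStdout.push('simulated worker output\n');
```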

View File

@@ -50,6 +50,11 @@ function makeFarm(worker, methods, timeout, maxConcurrentWorkers) {
   );
 }
+type Reporters = {
+  +stdoutChunk: (chunk: string) => mixed,
+  +stderrChunk: (chunk: string) => mixed,
+};
 class Transformer {
   _workers: {[name: string]: Function};
@@ -66,16 +71,24 @@ class Transformer {
     sourceMap: MappingsMap,
   ) => Promise<{code: string, map: MappingsMap}>;
-  constructor(transformModulePath: string, maxWorkerCount: number) {
+  constructor(transformModulePath: string, maxWorkerCount: number, reporters: Reporters) {
     invariant(path.isAbsolute(transformModulePath), 'transform module path should be absolute');
     this._transformModulePath = transformModulePath;
-    this._workers = makeFarm(
+    const farm = makeFarm(
       require.resolve('./worker'),
       ['minify', 'transformAndExtractDependencies'],
       TRANSFORM_TIMEOUT_INTERVAL,
       maxWorkerCount,
     );
+    farm.stdout.on('data', chunk => {
+      reporters.stdoutChunk(chunk.toString('utf8'));
+    });
+    farm.stderr.on('data', chunk => {
+      reporters.stderrChunk(chunk.toString('utf8'));
+    });
+    this._workers = farm.methods;
     this._transform = denodeify(this._workers.transformAndExtractDependencies);
     this.minify = denodeify(this._workers.minify);
   }

View File

@@ -135,7 +135,6 @@ class TerminalReporter {
     }
   }
   _logPackagerInitializing(port: number, projectRoots: Array<string>) {
     terminal.log(
       formatBanner(
@@ -183,7 +182,6 @@ class TerminalReporter {
     }
   }
   /**
    * This function is only concerned with logging and should not do state
    * or terminal status updates.
@@ -214,9 +212,25 @@ class TerminalReporter {
       case 'transform_cache_reset':
         reporting.logWarning(terminal, 'the transform cache was reset.');
         break;
+      case 'worker_stdout_chunk':
+        this._logWorkerChunk('stdout', event.chunk);
+        break;
+      case 'worker_stderr_chunk':
+        this._logWorkerChunk('stderr', event.chunk);
+        break;
     }
   }
+  _logWorkerChunk(origin: 'stdout' | 'stderr', chunk: string) {
+    const lines = chunk.split('\n');
+    if (lines.length >= 1 && lines[lines.length - 1] === '') {
+      lines.splice(lines.length - 1, 1);
+    }
+    lines.forEach(line => {
+      terminal.log(`transform[${origin}]: ${line}`);
+    });
+  }
   /**
    * We use Math.pow(ratio, 2) to as a conservative measure of progress because
    * we know the `totalCount` is going to progressively increase as well. We
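_logWorkerChunk splits a chunk on newlines and drops the trailing empty string that a newline-terminated chunk produces, so no blank `transform[...]` line gets printed. A standalone illustration of just that splitting step (the helper name is made up for the example):

```js
// Standalone illustration of the trailing-newline handling in _logWorkerChunk.
function splitChunkLines(chunk) {
  const lines = chunk.split('\n');
  // "foo\nbar\n" splits into ['foo', 'bar', ''], so drop the final empty entry
  // to avoid logging a blank prefixed line.
  if (lines.length >= 1 && lines[lines.length - 1] === '') {
    lines.splice(lines.length - 1, 1);
  }
  return lines;
}

console.log(splitChunkLines('foo\nbar\n'));   // ['foo', 'bar']
console.log(splitChunkLines('partial line')); // ['partial line']
```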

View File

@@ -59,6 +59,12 @@ export type ReportableEvent = {
   reason: GlobalCacheDisabledReason,
 } | {
   type: 'transform_cache_reset',
+} | {
+  type: 'worker_stdout_chunk',
+  chunk: string,
+} | {
+  type: 'worker_stderr_chunk',
+  chunk: string,
 };
 /**

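With the two new ReportableEvent members, worker output reaches any reporter through the same `update()` channel as the other packager events. A minimal, illustrative consumer (not the packager's real reporter):

```js
// Illustrative reporter that handles the new event shapes.
const reporter = {
  update(event) {
    if (event.type === 'worker_stdout_chunk' || event.type === 'worker_stderr_chunk') {
      process.stdout.write(event.chunk);
    }
  },
};

reporter.update({type: 'worker_stdout_chunk', chunk: 'minifying bundle...\n'});
```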
View File

@@ -27,10 +27,14 @@ const extend = require('xtend')
   , ProcessTerminatedError = require('errno').create('ProcessTerminatedError')
   , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
+const mergeStream = require('merge-stream');
 function Farm (options: {+execArgv: Array<string>}, path: string) {
   this.options = extend(DEFAULT_OPTIONS, options)
   this.path = path
   this.activeCalls = 0
+  this.stdout = mergeStream();
+  this.stderr = mergeStream();
 }
 // make a handle to pass back in the form of an external API
@@ -118,6 +122,9 @@ Farm.prototype.startChild = function () {
       , exitCode : null
       }
+  this.stdout.add(forked.child.stdout);
+  this.stderr.add(forked.child.stderr);
   forked.child.on('message', this.receive.bind(this))
   forked.child.once('exit', function (code) {
     c.exitCode = code
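merge-stream returns a single readable that interleaves every source `add()`-ed to it, which is how one farm.stdout can aggregate the output of workers spawned at different times. A standalone sketch, assuming the merge-stream package is installed (stream names are illustrative):

```js
// Two fake child stdouts funneled into one stream, the way Farm aggregates
// forked.child.stdout for each worker it starts.
const mergeStream = require('merge-stream');
const {PassThrough} = require('stream');

const merged = mergeStream();
const child1Stdout = new PassThrough();
const child2Stdout = new PassThrough();

merged.add(child1Stdout); // sources can be added as workers are spawned
merged.add(child2Stdout);

merged.on('data', chunk => process.stdout.write(`merged: ${chunk}`));

child1Stdout.write('from worker 1\n');
child2Stdout.write('from worker 2\n');
```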

View File

@@ -16,9 +16,10 @@ const childModule = require.resolve('./child/index');
 function fork(forkModule: string, options: {|+execArgv: Array<string>|}) {
   const child = childProcess.fork(childModule, {
-    env: process.env,
     cwd: process.cwd(),
+    env: process.env,
     execArgv: options.execArgv,
+    silent: true,
   });
   child.send({module: forkModule});
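`silent: true` makes child_process.fork pipe the child's stdin/stdout/stderr back to the parent instead of inheriting the parent's terminal, so child.stdout and child.stderr become readable streams the farm can capture (which is also why cwd and env are passed through explicitly). A minimal sketch, assuming a hypothetical ./worker.js module:

```js
// Forking a child with piped stdio (./worker.js is a made-up module path).
const childProcess = require('child_process');

const child = childProcess.fork('./worker.js', [], {
  cwd: process.cwd(),
  env: process.env,
  // With silent: true the child's output is NOT written straight to this
  // process's terminal; it shows up on child.stdout / child.stderr instead.
  silent: true,
});

child.stdout.on('data', chunk => process.stdout.write(`worker[stdout]: ${chunk}`));
child.stderr.on('data', chunk => process.stderr.write(`worker[stderr]: ${chunk}`));
```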

View File

@@ -12,20 +12,31 @@
 /* eslint-disable */
 const Farm = require('./farm')
+import type {Readable} from 'stream';
 var farms = [] // keep record of farms so we can end() them if required
+export type FarmAPI = {|
+  methods: {[name: string]: Function},
+  stdout: Readable,
+  stderr: Readable,
+|};
 function farm(
   options: {+execArgv: Array<string>},
   path: string,
   methods: Array<string>,
-): {[name: string]: Function} {
+): FarmAPI {
   var f = new Farm(options, path)
     , api = f.setup(methods)
   farms.push({ farm: f, api: api })
+  // $FlowFixMe: gotta type the Farm class.
+  const {stdout, stderr} = f;
   // return the public API
-  return (api: any)
+  return {methods: (api: any), stdout, stderr};
 }
 function end (api, callback) {
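Callers of the worker-farm entry point now receive {methods, stdout, stderr} rather than the call API alone. A hypothetical caller, assuming the module's default export is the farm() function above and a made-up worker module exposing a doWork method:

```js
// Hypothetical usage of the new FarmAPI shape.
const workerFarm = require('./worker-farm');

const {methods, stdout, stderr} = workerFarm(
  {execArgv: []},
  require.resolve('./my-worker'), // made-up worker module
  ['doWork'],                     // made-up method name
);

// Forward captured worker output wherever the host process wants it.
stdout.on('data', chunk => process.stdout.write(chunk));
stderr.on('data', chunk => process.stderr.write(chunk));

// Worker methods keep their node-style callback API.
methods.doWork('some input', (error, result) => {
  if (error) { throw error; }
  console.log('worker replied:', result);
});
```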