packager: BatchProcessor: use Promise for processBatch()

Reviewed By: cpojer

Differential Revision: D4572495

fbshipit-source-id: 4a18b6ae16ea588104c337f2085707be07609005
This commit is contained in:
Jean Lauliac 2017-02-20 08:36:01 -08:00 committed by Facebook Github Bot
parent bac576c433
commit 564126f2bf
4 changed files with 61 additions and 75 deletions

View File

@@ -151,6 +151,7 @@
"event-target-shim": "^1.0.5", "event-target-shim": "^1.0.5",
"fbjs": "^0.8.9", "fbjs": "^0.8.9",
"fbjs-scripts": "^0.7.0", "fbjs-scripts": "^0.7.0",
"form-data": "^2.1.1",
"fs-extra": "^0.26.2", "fs-extra": "^0.26.2",
"glob": "^5.0.15", "glob": "^5.0.15",
"graceful-fs": "^4.1.3", "graceful-fs": "^4.1.3",

View File

@@ -13,10 +13,7 @@
const invariant = require('fbjs/lib/invariant'); const invariant = require('fbjs/lib/invariant');
type ProcessBatch<TItem, TResult> = ( type ProcessBatch<TItem, TResult> = (batch: Array<TItem>) => Promise<Array<TResult>>;
batch: Array<TItem>,
callback: (error?: Error, orderedResults?: Array<TResult>) => mixed,
) => mixed;
type BatchProcessorOptions = { type BatchProcessorOptions = {
maximumDelayMs: number, maximumDelayMs: number,
@@ -45,10 +42,7 @@ class BatchProcessor<TItem, TResult> {
_queue: Array<QueueItem<TItem, TResult>>; _queue: Array<QueueItem<TItem, TResult>>;
_timeoutHandle: ?number; _timeoutHandle: ?number;
constructor( constructor(options: BatchProcessorOptions, processBatch: ProcessBatch<TItem, TResult>) {
options: BatchProcessorOptions,
processBatch: ProcessBatch<TItem, TResult>,
) {
this._options = options; this._options = options;
this._processBatch = processBatch; this._processBatch = processBatch;
this._queue = []; this._queue = [];
@@ -57,30 +51,36 @@ class BatchProcessor<TItem, TResult> {
(this: any)._processQueue = this._processQueue.bind(this); (this: any)._processQueue = this._processQueue.bind(this);
} }
_onBatchFinished() {
this._currentProcessCount--;
this._processQueueOnceReady();
}
_onBatchResults(jobs: Array<QueueItem<TItem, TResult>>, results: Array<TResult>) {
invariant(results.length === jobs.length, 'Not enough results returned.');
for (let i = 0; i < jobs.length; ++i) {
jobs[i].resolve(results[i]);
}
this._onBatchFinished();
}
_onBatchError(jobs: Array<QueueItem<TItem, TResult>>, error: mixed) {
for (let i = 0; i < jobs.length; ++i) {
jobs[i].reject(error);
}
this._onBatchFinished();
}
_processQueue() { _processQueue() {
this._timeoutHandle = null; this._timeoutHandle = null;
while ( const {concurrency} = this._options;
this._queue.length > 0 && while (this._queue.length > 0 && this._currentProcessCount < concurrency) {
this._currentProcessCount < this._options.concurrency
) {
this._currentProcessCount++; this._currentProcessCount++;
const jobs = this._queue.splice(0, this._options.maximumItems); const jobs = this._queue.splice(0, this._options.maximumItems);
const items = jobs.map(job => job.item); this._processBatch(jobs.map(job => job.item)).then(
this._processBatch(items, (error, results) => { this._onBatchResults.bind(this, jobs),
if (error != null) { this._onBatchError.bind(this, jobs),
for (let i = 0; i < jobs.length; ++i) { );
jobs[i].reject(error);
}
} else {
invariant(results != null, 'Neither results or error were returned.');
invariant(results.length === items.length, 'Not enough results returned.');
for (let i = 0; i < jobs.length; ++i) {
jobs[i].resolve(results[i]);
}
}
this._currentProcessCount--;
this._processQueueOnceReady();
});
} }
} }

View File

@@ -23,15 +23,8 @@ import type {Options as TransformOptions} from '../JSTransformer/worker/worker';
import type {CachedResult, GetTransformCacheKey} from './TransformCache'; import type {CachedResult, GetTransformCacheKey} from './TransformCache';
import type {Reporter} from './reporting'; import type {Reporter} from './reporting';
type FetchResultURIs = ( type FetchResultURIs = (keys: Array<string>) => Promise<Map<string, string>>;
keys: Array<string>, type StoreResults = (resultsByKey: Map<string, CachedResult>) => Promise<void>;
callback: (error?: Error, results?: Map<string, string>) => void,
) => mixed;
type StoreResults = (
resultsByKey: Map<string, CachedResult>,
callback: (error?: Error) => void,
) => mixed;
type FetchProps = { type FetchProps = {
filePath: string, filePath: string,
@@ -60,17 +53,15 @@ class KeyURIFetcher {
* and we proceed as if there were no result for these keys instead. That way * and we proceed as if there were no result for these keys instead. That way
* a build will not fail just because of the cache. * a build will not fail just because of the cache.
*/ */
_processKeys( async _processKeys(keys: Array<string>): Promise<Array<?URI>> {
keys: Array<string>, let URIsByKey;
callback: (error?: Error, keyURIs: Array<?URI>) => mixed, try {
) { URIsByKey = await this._fetchResultURIs(keys);
this._fetchResultURIs(keys, (error, URIsByKey) => { } catch (error) {
if (error != null) { this._processError(error);
this._processError(error); return new Array(keys.length);
} }
const URIs = keys.map(key => URIsByKey && URIsByKey.get(key)); return keys.map(key => URIsByKey.get(key));
callback(undefined, URIs);
});
} }
fetch(key: string, callback: FetchURICallback) { fetch(key: string, callback: FetchURICallback) {
@@ -92,21 +83,17 @@ class KeyURIFetcher {
} }
type KeyedResult = {key: string, result: CachedResult};
class KeyResultStore { class KeyResultStore {
_storeResults: StoreResults; _storeResults: StoreResults;
_batchProcessor: BatchProcessor<{key: string, result: CachedResult}, void>; _batchProcessor: BatchProcessor<KeyedResult, void>;
_processResults( async _processResults(keyResults: Array<KeyedResult>): Promise<Array<void>> {
keyResults: Array<{key: string, result: CachedResult}>, const resultsByKey = new Map(keyResults.map(pair => [pair.key, pair.result]));
callback: (error?: Error) => mixed, await this._storeResults(resultsByKey);
) { return new Array(keyResults.length);
const resultsByKey = new Map(
keyResults.map(pair => [pair.key, pair.result]),
);
this._storeResults(resultsByKey, error => {
callback(error);
});
} }
store(key: string, result: CachedResult) { store(key: string, result: CachedResult) {

View File

@@ -9,7 +9,9 @@
'use strict'; 'use strict';
jest.dontMock('../BatchProcessor'); jest
.useRealTimers()
.dontMock('../BatchProcessor');
const BatchProcessor = require('../BatchProcessor'); const BatchProcessor = require('../BatchProcessor');
@@ -21,29 +23,27 @@ describe('BatchProcessor', () => {
concurrency: 2, concurrency: 2,
}; };
it('aggregate items concurrently', () => { it('aggregate items concurrently', async () => {
const input = [...Array(9).keys()].slice(1); const input = [...Array(9).keys()].slice(1);
const transform = e => e * 10; const transform = e => e * 10;
const batches = []; const batches = [];
let concurrency = 0; let concurrency = 0;
let maxConcurrency = 0; let maxConcurrency = 0;
const bp = new BatchProcessor(options, (items, callback) => { const bp = new BatchProcessor(options, (items) => new Promise(resolve => {
++concurrency; ++concurrency;
expect(concurrency).toBeLessThanOrEqual(options.concurrency); expect(concurrency).toBeLessThanOrEqual(options.concurrency);
maxConcurrency = Math.max(maxConcurrency, concurrency); maxConcurrency = Math.max(maxConcurrency, concurrency);
batches.push(items); batches.push(items);
setTimeout(() => { setTimeout(() => {
callback(null, items.map(transform)); resolve(items.map(transform));
--concurrency; --concurrency;
}, 0); }, 0);
}); }));
const results = []; const results = [];
input.forEach(e => bp.queue(e).then( await Promise.all(input.map(e => bp.queue(e).then(
res => results.push(res), res => results.push(res),
error => process.nextTick(() => { throw error; }), error => process.nextTick(() => { throw error; }),
)); )));
jest.runAllTimers();
jest.runAllTicks();
expect(batches).toEqual([ expect(batches).toEqual([
[1, 2, 3], [1, 2, 3],
[4, 5, 6], [4, 5, 6],
@@ -53,17 +53,15 @@ describe('BatchProcessor', () => {
expect(results).toEqual(input.map(transform)); expect(results).toEqual(input.map(transform));
}); });
it('report errors', () => { it('report errors', async () => {
const error = new Error('oh noes'); const error = new Error('oh noes');
const bp = new BatchProcessor(options, (items, callback) => { const bp = new BatchProcessor(options, (items) => new Promise((_, reject) => {
setTimeout(callback.bind(null, error), 0); setTimeout(reject.bind(null, error), 0);
}); }));
let receivedError; let receivedError;
bp.queue('foo').catch( await bp.queue('foo').catch(
err => { receivedError = err; }, err => { receivedError = err; },
); );
jest.runAllTimers();
jest.runAllTicks();
expect(receivedError).toBe(error); expect(receivedError).toBe(error);
}); });