packager: BatchProcessor: use Promise for queue()

Reviewed By: cpojer

Differential Revision: D4572181

fbshipit-source-id: 34c3824f05efd93847df9ba5931b0735c6711c28
This commit is contained in:
Jean Lauliac 2017-02-20 08:35:59 -08:00 committed by Facebook Github Bot
parent 1a5b56d070
commit bac576c433
4 changed files with 37 additions and 66 deletions

View File

@ -24,6 +24,12 @@ type BatchProcessorOptions = {
concurrency: number, concurrency: number,
}; };
type QueueItem<TItem, TResult> = {
item: TItem,
reject: (error: mixed) => mixed,
resolve: (result: TResult) => mixed,
};
/** /**
* We batch items together trying to minimize their processing, for example as * We batch items together trying to minimize their processing, for example as
* network queries. For that we wait a small moment before processing a batch. * network queries. For that we wait a small moment before processing a batch.
@ -33,14 +39,11 @@ type BatchProcessorOptions = {
*/ */
class BatchProcessor<TItem, TResult> { class BatchProcessor<TItem, TResult> {
_currentProcessCount: number;
_options: BatchProcessorOptions; _options: BatchProcessorOptions;
_processBatch: ProcessBatch<TItem, TResult>; _processBatch: ProcessBatch<TItem, TResult>;
_queue: Array<{ _queue: Array<QueueItem<TItem, TResult>>;
item: TItem,
callback: (error?: Error, result?: TResult) => mixed,
}>;
_timeoutHandle: ?number; _timeoutHandle: ?number;
_currentProcessCount: number;
constructor( constructor(
options: BatchProcessorOptions, options: BatchProcessorOptions,
@ -64,12 +67,16 @@ class BatchProcessor<TItem, TResult> {
const jobs = this._queue.splice(0, this._options.maximumItems); const jobs = this._queue.splice(0, this._options.maximumItems);
const items = jobs.map(job => job.item); const items = jobs.map(job => job.item);
this._processBatch(items, (error, results) => { this._processBatch(items, (error, results) => {
invariant( if (error != null) {
results == null || results.length === items.length, for (let i = 0; i < jobs.length; ++i) {
'Not enough results returned.', jobs[i].reject(error);
); }
for (let i = 0; i < items.length; ++i) { } else {
jobs[i].callback(error, results && results[i]); invariant(results != null, 'Neither results or error were returned.');
invariant(results.length === items.length, 'Not enough results returned.');
for (let i = 0; i < jobs.length; ++i) {
jobs[i].resolve(results[i]);
}
} }
this._currentProcessCount--; this._currentProcessCount--;
this._processQueueOnceReady(); this._processQueueOnceReady();
@ -91,12 +98,11 @@ class BatchProcessor<TItem, TResult> {
} }
} }
queue( queue(item: TItem): Promise<TResult> {
item: TItem, return new Promise((resolve, reject) => {
callback: (error?: Error, result?: TResult) => mixed, this._queue.push({item, resolve, reject});
) {
this._queue.push({item, callback});
this._processQueueOnceReady(); this._processQueueOnceReady();
});
} }
} }

View File

@ -74,7 +74,10 @@ class KeyURIFetcher {
} }
fetch(key: string, callback: FetchURICallback) { fetch(key: string, callback: FetchURICallback) {
this._batchProcessor.queue(key, callback); this._batchProcessor.queue(key).then(
res => process.nextTick(callback.bind(undefined, undefined, res)),
err => process.nextTick(callback.bind(undefined, err)),
);
} }
constructor(fetchResultURIs: FetchResultURIs, processError: (error: Error) => mixed) { constructor(fetchResultURIs: FetchResultURIs, processError: (error: Error) => mixed) {
@ -107,7 +110,7 @@ class KeyResultStore {
} }
store(key: string, result: CachedResult) { store(key: string, result: CachedResult) {
this._batchProcessor.queue({key, result}, () => {}); this._batchProcessor.queue({key, result});
} }
constructor(storeResults: StoreResults) { constructor(storeResults: StoreResults) {

View File

@ -1,40 +0,0 @@
/**
* Copyright (c) 2015-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
'use strict';
const {EventEmitter} = require('events');
class BatchProcessorMock {
constructor(_, processBatch) {
this._processBatch = processBatch;
this._queue = [];
BatchProcessorMock.mocks.emit('new', this);
}
queue(item, callback) {
this._queue.push([item, callback]);
}
flushMock() {
const {_queue} = this;
this._queue = [];
process.nextTick(() => {
this._processBatch(_queue.map(pair => pair[0]), (error, res) => {
_queue.forEach((pair, i) => pair[1](error, res && res[i]));
});
});
}
}
BatchProcessorMock.mocks = new EventEmitter();
module.exports = BatchProcessorMock;

View File

@ -38,12 +38,12 @@ describe('BatchProcessor', () => {
}, 0); }, 0);
}); });
const results = []; const results = [];
const callback = (error, res) => { input.forEach(e => bp.queue(e).then(
expect(error).toBe(null); res => results.push(res),
results.push(res); error => process.nextTick(() => { throw error; }),
}; ));
input.forEach(e => bp.queue(e, callback));
jest.runAllTimers(); jest.runAllTimers();
jest.runAllTicks();
expect(batches).toEqual([ expect(batches).toEqual([
[1, 2, 3], [1, 2, 3],
[4, 5, 6], [4, 5, 6],
@ -56,10 +56,12 @@ describe('BatchProcessor', () => {
it('report errors', () => { it('report errors', () => {
const error = new Error('oh noes'); const error = new Error('oh noes');
const bp = new BatchProcessor(options, (items, callback) => { const bp = new BatchProcessor(options, (items, callback) => {
process.nextTick(callback.bind(null, error)); setTimeout(callback.bind(null, error), 0);
}); });
let receivedError; let receivedError;
bp.queue('foo', err => { receivedError = err; }); bp.queue('foo').catch(
err => { receivedError = err; },
);
jest.runAllTimers(); jest.runAllTimers();
jest.runAllTicks(); jest.runAllTicks();
expect(receivedError).toBe(error); expect(receivedError).toBe(error);