packager: BatchProcessor: use Promise for queue()

Reviewed By: cpojer

Differential Revision: D4572181

fbshipit-source-id: 34c3824f05efd93847df9ba5931b0735c6711c28
This commit is contained in:
Jean Lauliac 2017-02-20 08:35:59 -08:00 committed by Facebook Github Bot
parent 306bf14d4a
commit 46d5c2f517
4 changed files with 37 additions and 66 deletions

View File

@ -24,6 +24,12 @@ type BatchProcessorOptions = {
concurrency: number,
};
type QueueItem<TItem, TResult> = {
item: TItem,
reject: (error: mixed) => mixed,
resolve: (result: TResult) => mixed,
};
/**
* We batch items together trying to minimize their processing, for example as
* network queries. For that we wait a small moment before processing a batch.
@ -33,14 +39,11 @@ type BatchProcessorOptions = {
*/
class BatchProcessor<TItem, TResult> {
_currentProcessCount: number;
_options: BatchProcessorOptions;
_processBatch: ProcessBatch<TItem, TResult>;
_queue: Array<{
item: TItem,
callback: (error?: Error, result?: TResult) => mixed,
}>;
_queue: Array<QueueItem<TItem, TResult>>;
_timeoutHandle: ?number;
_currentProcessCount: number;
constructor(
options: BatchProcessorOptions,
@ -64,12 +67,16 @@ class BatchProcessor<TItem, TResult> {
const jobs = this._queue.splice(0, this._options.maximumItems);
const items = jobs.map(job => job.item);
this._processBatch(items, (error, results) => {
invariant(
results == null || results.length === items.length,
'Not enough results returned.',
);
for (let i = 0; i < items.length; ++i) {
jobs[i].callback(error, results && results[i]);
if (error != null) {
for (let i = 0; i < jobs.length; ++i) {
jobs[i].reject(error);
}
} else {
invariant(results != null, 'Neither results or error were returned.');
invariant(results.length === items.length, 'Not enough results returned.');
for (let i = 0; i < jobs.length; ++i) {
jobs[i].resolve(results[i]);
}
}
this._currentProcessCount--;
this._processQueueOnceReady();
@ -91,12 +98,11 @@ class BatchProcessor<TItem, TResult> {
}
}
queue(
item: TItem,
callback: (error?: Error, result?: TResult) => mixed,
) {
this._queue.push({item, callback});
this._processQueueOnceReady();
queue(item: TItem): Promise<TResult> {
return new Promise((resolve, reject) => {
this._queue.push({item, resolve, reject});
this._processQueueOnceReady();
});
}
}

View File

@ -74,7 +74,10 @@ class KeyURIFetcher {
}
fetch(key: string, callback: FetchURICallback) {
this._batchProcessor.queue(key, callback);
this._batchProcessor.queue(key).then(
res => process.nextTick(callback.bind(undefined, undefined, res)),
err => process.nextTick(callback.bind(undefined, err)),
);
}
constructor(fetchResultURIs: FetchResultURIs, processError: (error: Error) => mixed) {
@ -107,7 +110,7 @@ class KeyResultStore {
}
store(key: string, result: CachedResult) {
this._batchProcessor.queue({key, result}, () => {});
this._batchProcessor.queue({key, result});
}
constructor(storeResults: StoreResults) {

View File

@ -1,40 +0,0 @@
/**
* Copyright (c) 2015-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
'use strict';
const {EventEmitter} = require('events');
class BatchProcessorMock {
constructor(_, processBatch) {
this._processBatch = processBatch;
this._queue = [];
BatchProcessorMock.mocks.emit('new', this);
}
queue(item, callback) {
this._queue.push([item, callback]);
}
flushMock() {
const {_queue} = this;
this._queue = [];
process.nextTick(() => {
this._processBatch(_queue.map(pair => pair[0]), (error, res) => {
_queue.forEach((pair, i) => pair[1](error, res && res[i]));
});
});
}
}
BatchProcessorMock.mocks = new EventEmitter();
module.exports = BatchProcessorMock;

View File

@ -38,12 +38,12 @@ describe('BatchProcessor', () => {
}, 0);
});
const results = [];
const callback = (error, res) => {
expect(error).toBe(null);
results.push(res);
};
input.forEach(e => bp.queue(e, callback));
input.forEach(e => bp.queue(e).then(
res => results.push(res),
error => process.nextTick(() => { throw error; }),
));
jest.runAllTimers();
jest.runAllTicks();
expect(batches).toEqual([
[1, 2, 3],
[4, 5, 6],
@ -56,10 +56,12 @@ describe('BatchProcessor', () => {
it('report errors', () => {
const error = new Error('oh noes');
const bp = new BatchProcessor(options, (items, callback) => {
process.nextTick(callback.bind(null, error));
setTimeout(callback.bind(null, error), 0);
});
let receivedError;
bp.queue('foo', err => { receivedError = err; });
bp.queue('foo').catch(
err => { receivedError = err; },
);
jest.runAllTimers();
jest.runAllTicks();
expect(receivedError).toBe(error);