From bac576c433ac3b85dc50df567fe008589af99091 Mon Sep 17 00:00:00 2001 From: Jean Lauliac Date: Mon, 20 Feb 2017 08:35:59 -0800 Subject: [PATCH] packager: BatchProcessor: use Promise for queue() Reviewed By: cpojer Differential Revision: D4572181 fbshipit-source-id: 34c3824f05efd93847df9ba5931b0735c6711c28 --- packager/src/lib/BatchProcessor.js | 40 +++++++++++-------- packager/src/lib/GlobalTransformCache.js | 7 +++- packager/src/lib/__mocks__/BatchProcessor.js | 40 ------------------- .../src/lib/__tests__/BatchProcessor-test.js | 16 ++++---- 4 files changed, 37 insertions(+), 66 deletions(-) delete mode 100644 packager/src/lib/__mocks__/BatchProcessor.js diff --git a/packager/src/lib/BatchProcessor.js b/packager/src/lib/BatchProcessor.js index 0464534d7..586fd3970 100644 --- a/packager/src/lib/BatchProcessor.js +++ b/packager/src/lib/BatchProcessor.js @@ -24,6 +24,12 @@ type BatchProcessorOptions = { concurrency: number, }; +type QueueItem = { + item: TItem, + reject: (error: mixed) => mixed, + resolve: (result: TResult) => mixed, +}; + /** * We batch items together trying to minimize their processing, for example as * network queries. For that we wait a small moment before processing a batch. @@ -33,14 +39,11 @@ type BatchProcessorOptions = { */ class BatchProcessor { + _currentProcessCount: number; _options: BatchProcessorOptions; _processBatch: ProcessBatch; - _queue: Array<{ - item: TItem, - callback: (error?: Error, result?: TResult) => mixed, - }>; + _queue: Array>; _timeoutHandle: ?number; - _currentProcessCount: number; constructor( options: BatchProcessorOptions, @@ -64,12 +67,16 @@ class BatchProcessor { const jobs = this._queue.splice(0, this._options.maximumItems); const items = jobs.map(job => job.item); this._processBatch(items, (error, results) => { - invariant( - results == null || results.length === items.length, - 'Not enough results returned.', - ); - for (let i = 0; i < items.length; ++i) { - jobs[i].callback(error, results && results[i]); + if (error != null) { + for (let i = 0; i < jobs.length; ++i) { + jobs[i].reject(error); + } + } else { + invariant(results != null, 'Neither results or error were returned.'); + invariant(results.length === items.length, 'Not enough results returned.'); + for (let i = 0; i < jobs.length; ++i) { + jobs[i].resolve(results[i]); + } } this._currentProcessCount--; this._processQueueOnceReady(); @@ -91,12 +98,11 @@ class BatchProcessor { } } - queue( - item: TItem, - callback: (error?: Error, result?: TResult) => mixed, - ) { - this._queue.push({item, callback}); - this._processQueueOnceReady(); + queue(item: TItem): Promise { + return new Promise((resolve, reject) => { + this._queue.push({item, resolve, reject}); + this._processQueueOnceReady(); + }); } } diff --git a/packager/src/lib/GlobalTransformCache.js b/packager/src/lib/GlobalTransformCache.js index 6ad265a6b..2aa831bcb 100644 --- a/packager/src/lib/GlobalTransformCache.js +++ b/packager/src/lib/GlobalTransformCache.js @@ -74,7 +74,10 @@ class KeyURIFetcher { } fetch(key: string, callback: FetchURICallback) { - this._batchProcessor.queue(key, callback); + this._batchProcessor.queue(key).then( + res => process.nextTick(callback.bind(undefined, undefined, res)), + err => process.nextTick(callback.bind(undefined, err)), + ); } constructor(fetchResultURIs: FetchResultURIs, processError: (error: Error) => mixed) { @@ -107,7 +110,7 @@ class KeyResultStore { } store(key: string, result: CachedResult) { - this._batchProcessor.queue({key, result}, () => {}); + this._batchProcessor.queue({key, result}); } constructor(storeResults: StoreResults) { diff --git a/packager/src/lib/__mocks__/BatchProcessor.js b/packager/src/lib/__mocks__/BatchProcessor.js deleted file mode 100644 index 5f914808d..000000000 --- a/packager/src/lib/__mocks__/BatchProcessor.js +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright (c) 2015-present, Facebook, Inc. - * All rights reserved. - * - * This source code is licensed under the BSD-style license found in the - * LICENSE file in the root directory of this source tree. An additional grant - * of patent rights can be found in the PATENTS file in the same directory. - */ - -'use strict'; - -const {EventEmitter} = require('events'); - -class BatchProcessorMock { - - constructor(_, processBatch) { - this._processBatch = processBatch; - this._queue = []; - BatchProcessorMock.mocks.emit('new', this); - } - - queue(item, callback) { - this._queue.push([item, callback]); - } - - flushMock() { - const {_queue} = this; - this._queue = []; - process.nextTick(() => { - this._processBatch(_queue.map(pair => pair[0]), (error, res) => { - _queue.forEach((pair, i) => pair[1](error, res && res[i])); - }); - }); - } - -} - -BatchProcessorMock.mocks = new EventEmitter(); - -module.exports = BatchProcessorMock; diff --git a/packager/src/lib/__tests__/BatchProcessor-test.js b/packager/src/lib/__tests__/BatchProcessor-test.js index dabdfc0eb..facebd143 100644 --- a/packager/src/lib/__tests__/BatchProcessor-test.js +++ b/packager/src/lib/__tests__/BatchProcessor-test.js @@ -38,12 +38,12 @@ describe('BatchProcessor', () => { }, 0); }); const results = []; - const callback = (error, res) => { - expect(error).toBe(null); - results.push(res); - }; - input.forEach(e => bp.queue(e, callback)); + input.forEach(e => bp.queue(e).then( + res => results.push(res), + error => process.nextTick(() => { throw error; }), + )); jest.runAllTimers(); + jest.runAllTicks(); expect(batches).toEqual([ [1, 2, 3], [4, 5, 6], @@ -56,10 +56,12 @@ describe('BatchProcessor', () => { it('report errors', () => { const error = new Error('oh noes'); const bp = new BatchProcessor(options, (items, callback) => { - process.nextTick(callback.bind(null, error)); + setTimeout(callback.bind(null, error), 0); }); let receivedError; - bp.queue('foo', err => { receivedError = err; }); + bp.queue('foo').catch( + err => { receivedError = err; }, + ); jest.runAllTimers(); jest.runAllTicks(); expect(receivedError).toBe(error);