diff --git a/react-packager/src/lib/GlobalTransformCache.js b/react-packager/src/lib/GlobalTransformCache.js index d6de72cc..39bf86e1 100644 --- a/react-packager/src/lib/GlobalTransformCache.js +++ b/react-packager/src/lib/GlobalTransformCache.js @@ -11,8 +11,8 @@ 'use strict'; -const debounce = require('lodash/debounce'); const imurmurhash = require('imurmurhash'); +const invariant = require('invariant'); const jsonStableStringify = require('json-stable-stringify'); const path = require('path'); const request = require('request'); @@ -21,9 +21,6 @@ const toFixedHex = require('./toFixedHex'); import type {Options as TransformOptions} from '../JSTransformer/worker/worker'; import type {CachedResult} from './TransformCache'; -const SINGLE_REQUEST_MAX_KEYS = 100; -const AGGREGATION_DELAY_MS = 100; - type FetchResultURIs = ( keys: Array, callback: (error?: Error, results?: Map) => void, @@ -39,6 +36,94 @@ type FetchProps = { type FetchCallback = (error?: Error, resultURI?: ?CachedResult) => mixed; type FetchURICallback = (error?: Error, resultURI?: ?string) => mixed; +type ProcessBatch = ( + batch: Array, + callback: (error?: Error, orderedResults?: Array) => mixed, +) => mixed; +type BatchProcessorOptions = { + maximumDelayMs: number, + maximumItems: number, + concurrency: number, +}; + +/** + * We batch keys together trying to make a smaller amount of queries. For that + * we wait a small moment before starting to fetch. We limit also the number of + * keys we try to fetch at once, so if we already have that many keys pending, + * we can start fetching right away. + */ +class BatchProcessor { + + _options: BatchProcessorOptions; + _processBatch: ProcessBatch; + _queue: Array<{ + item: TItem, + callback: (error?: Error, result?: TResult) => mixed, + }>; + _timeoutHandle: ?number; + _currentProcessCount: number; + + constructor( + options: BatchProcessorOptions, + processBatch: ProcessBatch, + ) { + this._options = options; + this._processBatch = processBatch; + this._queue = []; + this._timeoutHandle = null; + this._currentProcessCount = 0; + (this: any)._processQueue = this._processQueue.bind(this); + } + + _processQueue() { + this._timeoutHandle = null; + while ( + this._queue.length > 0 && + this._currentProcessCount < this._options.concurrency + ) { + this._currentProcessCount++; + const jobs = this._queue.splice(0, this._options.maximumItems); + const items = jobs.map(job => job.item); + this._processBatch(items, (error, results) => { + invariant( + results == null || results.length === items.length, + 'Not enough results returned.', + ); + for (let i = 0; i < items.length; ++i) { + jobs[i].callback(error, results && results[i]); + } + this._currentProcessCount--; + this._processQueueOnceReady(); + }); + } + } + + _processQueueOnceReady() { + if (this._queue.length >= this._options.maximumItems) { + clearTimeout(this._timeoutHandle); + process.nextTick(this._processQueue); + return; + } + if (this._timeoutHandle == null) { + this._timeoutHandle = setTimeout( + this._processQueue, + this._options.maximumDelayMs, + ); + } + } + + queue( + item: TItem, + callback: (error?: Error, result?: TResult) => mixed, + ) { + this._queue.push({item, callback}); + this._processQueueOnceReady(); + } + +} + +type URI = string; + /** * We aggregate the requests to do a single request for many keys. It also * ensures we do a single request at a time to avoid pressuring the I/O. @@ -46,48 +131,29 @@ type FetchURICallback = (error?: Error, resultURI?: ?string) => mixed; class KeyURIFetcher { _fetchResultURIs: FetchResultURIs; - _pendingQueries: Array<{key: string, callback: FetchURICallback}>; - _isProcessing: boolean; - _processQueriesDebounced: () => void; - _processQueries: () => void; + _batchProcessor: BatchProcessor; - /** - * Fetch the pending keys right now, if any and if we're not already doing - * so in parallel. At the end of the fetch, we trigger a new batch fetching - * recursively. - */ - _processQueries() { - const {_pendingQueries} = this; - if (_pendingQueries.length === 0 || this._isProcessing) { - return; - } - this._isProcessing = true; - const queries = _pendingQueries.splice(0, SINGLE_REQUEST_MAX_KEYS); - const keys = queries.map(query => query.key); - this._fetchResultURIs(keys, (error, results) => { - queries.forEach(query => { - query.callback(error, results && results.get(query.key)); - }); - this._isProcessing = false; - process.nextTick(this._processQueries); + _processKeys( + keys: Array, + callback: (error?: Error, keyURIs: Array) => mixed, + ) { + this._fetchResultURIs(keys, (error, URIsByKey) => { + const URIs = keys.map(key => URIsByKey && URIsByKey.get(key)); + callback(error, URIs); }); } - /** - * Enqueue the fetching of a particular key. - */ fetch(key: string, callback: FetchURICallback) { - this._pendingQueries.push({key, callback}); - this._processQueriesDebounced(); + this._batchProcessor.queue(key, callback); } constructor(fetchResultURIs: FetchResultURIs) { this._fetchResultURIs = fetchResultURIs; - this._pendingQueries = []; - this._isProcessing = false; - this._processQueries = this._processQueries.bind(this); - this._processQueriesDebounced = - debounce(this._processQueries, AGGREGATION_DELAY_MS); + this._batchProcessor = new BatchProcessor({ + maximumDelayMs: 10, + maximumItems: 500, + concurrency: 25, + }, this._processKeys.bind(this)); } }