packager: GlobalTransformCache: better aggregation
Reviewed By: davidaurelio Differential Revision: D4238279 fbshipit-source-id: 3b79a3e87d0eb975efce6adc11b13862c7138e1b
This commit is contained in:
parent
6740207b9f
commit
9b9fd2f2f0
|
@ -11,8 +11,8 @@
|
|||
|
||||
'use strict';
|
||||
|
||||
const debounce = require('lodash/debounce');
|
||||
const imurmurhash = require('imurmurhash');
|
||||
const invariant = require('invariant');
|
||||
const jsonStableStringify = require('json-stable-stringify');
|
||||
const path = require('path');
|
||||
const request = require('request');
|
||||
|
@ -21,9 +21,6 @@ const toFixedHex = require('./toFixedHex');
|
|||
import type {Options as TransformOptions} from '../JSTransformer/worker/worker';
|
||||
import type {CachedResult} from './TransformCache';
|
||||
|
||||
const SINGLE_REQUEST_MAX_KEYS = 100;
|
||||
const AGGREGATION_DELAY_MS = 100;
|
||||
|
||||
type FetchResultURIs = (
|
||||
keys: Array<string>,
|
||||
callback: (error?: Error, results?: Map<string, string>) => void,
|
||||
|
@ -39,6 +36,94 @@ type FetchProps = {
|
|||
type FetchCallback = (error?: Error, resultURI?: ?CachedResult) => mixed;
|
||||
type FetchURICallback = (error?: Error, resultURI?: ?string) => mixed;
|
||||
|
||||
type ProcessBatch<TItem, TResult> = (
|
||||
batch: Array<TItem>,
|
||||
callback: (error?: Error, orderedResults?: Array<TResult>) => mixed,
|
||||
) => mixed;
|
||||
type BatchProcessorOptions = {
|
||||
maximumDelayMs: number,
|
||||
maximumItems: number,
|
||||
concurrency: number,
|
||||
};
|
||||
|
||||
/**
|
||||
* We batch keys together trying to make a smaller amount of queries. For that
|
||||
* we wait a small moment before starting to fetch. We limit also the number of
|
||||
* keys we try to fetch at once, so if we already have that many keys pending,
|
||||
* we can start fetching right away.
|
||||
*/
|
||||
class BatchProcessor<TItem, TResult> {
|
||||
|
||||
_options: BatchProcessorOptions;
|
||||
_processBatch: ProcessBatch<TItem, TResult>;
|
||||
_queue: Array<{
|
||||
item: TItem,
|
||||
callback: (error?: Error, result?: TResult) => mixed,
|
||||
}>;
|
||||
_timeoutHandle: ?number;
|
||||
_currentProcessCount: number;
|
||||
|
||||
constructor(
|
||||
options: BatchProcessorOptions,
|
||||
processBatch: ProcessBatch<TItem, TResult>,
|
||||
) {
|
||||
this._options = options;
|
||||
this._processBatch = processBatch;
|
||||
this._queue = [];
|
||||
this._timeoutHandle = null;
|
||||
this._currentProcessCount = 0;
|
||||
(this: any)._processQueue = this._processQueue.bind(this);
|
||||
}
|
||||
|
||||
_processQueue() {
|
||||
this._timeoutHandle = null;
|
||||
while (
|
||||
this._queue.length > 0 &&
|
||||
this._currentProcessCount < this._options.concurrency
|
||||
) {
|
||||
this._currentProcessCount++;
|
||||
const jobs = this._queue.splice(0, this._options.maximumItems);
|
||||
const items = jobs.map(job => job.item);
|
||||
this._processBatch(items, (error, results) => {
|
||||
invariant(
|
||||
results == null || results.length === items.length,
|
||||
'Not enough results returned.',
|
||||
);
|
||||
for (let i = 0; i < items.length; ++i) {
|
||||
jobs[i].callback(error, results && results[i]);
|
||||
}
|
||||
this._currentProcessCount--;
|
||||
this._processQueueOnceReady();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
_processQueueOnceReady() {
|
||||
if (this._queue.length >= this._options.maximumItems) {
|
||||
clearTimeout(this._timeoutHandle);
|
||||
process.nextTick(this._processQueue);
|
||||
return;
|
||||
}
|
||||
if (this._timeoutHandle == null) {
|
||||
this._timeoutHandle = setTimeout(
|
||||
this._processQueue,
|
||||
this._options.maximumDelayMs,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
queue(
|
||||
item: TItem,
|
||||
callback: (error?: Error, result?: TResult) => mixed,
|
||||
) {
|
||||
this._queue.push({item, callback});
|
||||
this._processQueueOnceReady();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type URI = string;
|
||||
|
||||
/**
|
||||
 * We aggregate the requests to do a single request for many keys. It also
 * caps how many of these requests are pending at once to avoid pressuring
 * the I/O.
|
||||
|
@ -46,48 +131,29 @@ type FetchURICallback = (error?: Error, resultURI?: ?string) => mixed;
|
|||
class KeyURIFetcher {
|
||||
|
||||
_fetchResultURIs: FetchResultURIs;
|
||||
_pendingQueries: Array<{key: string, callback: FetchURICallback}>;
|
||||
_isProcessing: boolean;
|
||||
_processQueriesDebounced: () => void;
|
||||
_processQueries: () => void;
|
||||
_batchProcessor: BatchProcessor<string, ?URI>;
|
||||
|
||||
/**
|
||||
* Fetch the pending keys right now, if any and if we're not already doing
|
||||
* so in parallel. At the end of the fetch, we trigger a new batch fetching
|
||||
* recursively.
|
||||
*/
|
||||
_processQueries() {
|
||||
const {_pendingQueries} = this;
|
||||
if (_pendingQueries.length === 0 || this._isProcessing) {
|
||||
return;
|
||||
}
|
||||
this._isProcessing = true;
|
||||
const queries = _pendingQueries.splice(0, SINGLE_REQUEST_MAX_KEYS);
|
||||
const keys = queries.map(query => query.key);
|
||||
this._fetchResultURIs(keys, (error, results) => {
|
||||
queries.forEach(query => {
|
||||
query.callback(error, results && results.get(query.key));
|
||||
});
|
||||
this._isProcessing = false;
|
||||
process.nextTick(this._processQueries);
|
||||
_processKeys(
|
||||
keys: Array<string>,
|
||||
callback: (error?: Error, keyURIs: Array<?URI>) => mixed,
|
||||
) {
|
||||
this._fetchResultURIs(keys, (error, URIsByKey) => {
|
||||
const URIs = keys.map(key => URIsByKey && URIsByKey.get(key));
|
||||
callback(error, URIs);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueue the fetching of a particular key.
|
||||
*/
|
||||
fetch(key: string, callback: FetchURICallback) {
|
||||
this._pendingQueries.push({key, callback});
|
||||
this._processQueriesDebounced();
|
||||
this._batchProcessor.queue(key, callback);
|
||||
}
|
||||
|
||||
constructor(fetchResultURIs: FetchResultURIs) {
|
||||
this._fetchResultURIs = fetchResultURIs;
|
||||
this._pendingQueries = [];
|
||||
this._isProcessing = false;
|
||||
this._processQueries = this._processQueries.bind(this);
|
||||
this._processQueriesDebounced =
|
||||
debounce(this._processQueries, AGGREGATION_DELAY_MS);
|
||||
this._batchProcessor = new BatchProcessor({
|
||||
maximumDelayMs: 10,
|
||||
maximumItems: 500,
|
||||
concurrency: 25,
|
||||
}, this._processKeys.bind(this));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue