packager: extract BatchProcessor module

Reviewed By: davidaurelio

Differential Revision: D4449820

fbshipit-source-id: f54e6dd53ba4d211694e0bdc955af846d25778b8
This commit is contained in:
Jean Lauliac 2017-01-24 03:34:29 -08:00 committed by Facebook Github Bot
parent 83ed3acc58
commit 7eef41bc00
2 changed files with 106 additions and 87 deletions

104
react-packager/src/lib/BatchProcessor.js vendored Normal file
View File

@ -0,0 +1,104 @@
/**
* Copyright (c) 2016-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
* @flow
*/
'use strict';
const invariant = require('fbjs/lib/invariant');
type ProcessBatch<TItem, TResult> = (
batch: Array<TItem>,
callback: (error?: Error, orderedResults?: Array<TResult>) => mixed,
) => mixed;
type BatchProcessorOptions = {
maximumDelayMs: number,
maximumItems: number,
concurrency: number,
};
/**
* We batch items together trying to minimize their processing, for example as
* network queries. For that we wait a small moment before processing a batch.
* We limit also the number of items we try to process in a single batch so that
* if we have many items pending in a short amount of time, we can start
* processing right away.
*/
class BatchProcessor<TItem, TResult> {
_options: BatchProcessorOptions;
_processBatch: ProcessBatch<TItem, TResult>;
_queue: Array<{
item: TItem,
callback: (error?: Error, result?: TResult) => mixed,
}>;
_timeoutHandle: ?number;
_currentProcessCount: number;
constructor(
options: BatchProcessorOptions,
processBatch: ProcessBatch<TItem, TResult>,
) {
this._options = options;
this._processBatch = processBatch;
this._queue = [];
this._timeoutHandle = null;
this._currentProcessCount = 0;
(this: any)._processQueue = this._processQueue.bind(this);
}
_processQueue() {
this._timeoutHandle = null;
while (
this._queue.length > 0 &&
this._currentProcessCount < this._options.concurrency
) {
this._currentProcessCount++;
const jobs = this._queue.splice(0, this._options.maximumItems);
const items = jobs.map(job => job.item);
this._processBatch(items, (error, results) => {
invariant(
results == null || results.length === items.length,
'Not enough results returned.',
);
for (let i = 0; i < items.length; ++i) {
jobs[i].callback(error, results && results[i]);
}
this._currentProcessCount--;
this._processQueueOnceReady();
});
}
}
_processQueueOnceReady() {
if (this._queue.length >= this._options.maximumItems) {
clearTimeout(this._timeoutHandle);
process.nextTick(this._processQueue);
return;
}
if (this._timeoutHandle == null) {
this._timeoutHandle = setTimeout(
this._processQueue,
this._options.maximumDelayMs,
);
}
}
queue(
item: TItem,
callback: (error?: Error, result?: TResult) => mixed,
) {
this._queue.push({item, callback});
this._processQueueOnceReady();
}
}
module.exports = BatchProcessor;

View File

@ -11,9 +11,10 @@
'use strict';
const BatchProcessor = require('./BatchProcessor');
const crypto = require('crypto');
const imurmurhash = require('imurmurhash');
const invariant = require('fbjs/lib/invariant');
const jsonStableStringify = require('json-stable-stringify');
const path = require('path');
const request = require('request');
@ -42,92 +43,6 @@ type FetchProps = {
type FetchCallback = (error?: Error, result?: ?CachedResult) => mixed;
type FetchURICallback = (error?: Error, resultURI?: ?string) => mixed;
type ProcessBatch<TItem, TResult> = (
batch: Array<TItem>,
callback: (error?: Error, orderedResults?: Array<TResult>) => mixed,
) => mixed;
type BatchProcessorOptions = {
maximumDelayMs: number,
maximumItems: number,
concurrency: number,
};
/**
* We batch keys together trying to make a smaller amount of queries. For that
* we wait a small moment before starting to fetch. We limit also the number of
* keys we try to fetch at once, so if we already have that many keys pending,
* we can start fetching right away.
*/
class BatchProcessor<TItem, TResult> {
_options: BatchProcessorOptions;
_processBatch: ProcessBatch<TItem, TResult>;
_queue: Array<{
item: TItem,
callback: (error?: Error, result?: TResult) => mixed,
}>;
_timeoutHandle: ?number;
_currentProcessCount: number;
constructor(
options: BatchProcessorOptions,
processBatch: ProcessBatch<TItem, TResult>,
) {
this._options = options;
this._processBatch = processBatch;
this._queue = [];
this._timeoutHandle = null;
this._currentProcessCount = 0;
(this: any)._processQueue = this._processQueue.bind(this);
}
_processQueue() {
this._timeoutHandle = null;
while (
this._queue.length > 0 &&
this._currentProcessCount < this._options.concurrency
) {
this._currentProcessCount++;
const jobs = this._queue.splice(0, this._options.maximumItems);
const items = jobs.map(job => job.item);
this._processBatch(items, (error, results) => {
invariant(
results == null || results.length === items.length,
'Not enough results returned.',
);
for (let i = 0; i < items.length; ++i) {
jobs[i].callback(error, results && results[i]);
}
this._currentProcessCount--;
this._processQueueOnceReady();
});
}
}
_processQueueOnceReady() {
if (this._queue.length >= this._options.maximumItems) {
clearTimeout(this._timeoutHandle);
process.nextTick(this._processQueue);
return;
}
if (this._timeoutHandle == null) {
this._timeoutHandle = setTimeout(
this._processQueue,
this._options.maximumDelayMs,
);
}
}
queue(
item: TItem,
callback: (error?: Error, result?: TResult) => mixed,
) {
this._queue.push({item, callback});
this._processQueueOnceReady();
}
}
type URI = string;
/**