Get rid of `async/queue`

Summary:
This replaces `async/queue` with a hand-rolled queue that does not yield to the event loop if a unit of work can be run synchronously.

Anecdotally, this leads to a > 11x speedup for the graph traversal when all data is available synchronously, e.g. from an in-memory cache.

Reviewed By: jeanlauliac

Differential Revision: D5861763

fbshipit-source-id: f7cf5f916a13adf9ca418d7522cd2f19df596fba
This commit is contained in:
David Aurelio 2017-09-19 09:18:54 -07:00 committed by Facebook Github Bot
parent c546349da4
commit 025e1841d9
3 changed files with 205 additions and 139 deletions

View File

@ -10,54 +10,22 @@
*/
'use strict';
const asyncify = require('async/asyncify');
const emptyFunction = require('fbjs/lib/emptyFunction');
const invariant = require('fbjs/lib/invariant');
const memoize = require('async/memoize');
const emptyModule = require('./module').empty;
const nullthrows = require('fbjs/lib/nullthrows');
const queue = require('async/queue');
const seq = require('async/seq');
import type {
Callback,
File,
GraphFn,
LoadFn,
LoadResult,
Module,
ResolveFn,
} from './types.flow';
type Async$Queue<T, C> = {
buffer: number,
concurrency: number,
drain: () => mixed,
empty: () => mixed,
error: (Error, T) => mixed,
idle(): boolean,
kill(): void,
length(): number,
pause(): void,
paused: boolean,
push(T | Array<T>, void | C): void,
resume(): void,
running(): number,
saturated: () => mixed,
started: boolean,
unsaturated: () => mixed,
unshift(T, void | C): void,
workersList(): Array<T>,
};
type LoadQueue =
Async$Queue<{id: string, parent: ?string}, Callback<File, Array<string>>>;
const NO_OPTIONS = {};
exports.create = function create(resolve: ResolveFn, load: LoadFn): GraphFn {
const resolveCallback = asyncify(resolve);
const loadCallback = asyncify(load);
function Graph(entryPoints, platform, options) {
async function Graph(entryPoints, platform, options) {
const {
log = (console: any),
optimize = false,
@ -69,112 +37,169 @@ exports.create = function create(resolve: ResolveFn, load: LoadFn): GraphFn {
return Promise.reject(new Error('The target platform has to be passed'));
}
const loadQueue: LoadQueue = queue(seq(
({id, parent}, cb) => resolveCallback(id, parent, platform, options || NO_OPTIONS, cb),
memoize((file, cb) => loadCallback(file, {log, optimize}, cb)),
({file, dependencies}, cb) => cb(null, file, dependencies),
), Number.MAX_SAFE_INTEGER);
const loadOptions = {log, optimize};
const memoizingLoad = memoizeLoad(load);
const {collect, loadModule} = createGraphHelpers(loadQueue, skip);
const queue: Queue<{id: string, parent: ?string, parentDependencyIndex: number, skip: ?Set<string>}, LoadResult, Map<?string, Module>> = new Queue(
({id, parent}) => memoizingLoad(resolve(id, parent, platform, options || NO_OPTIONS), loadOptions),
onFileLoaded,
new Map([[null, emptyModule()]]),
);
const result = deferred();
const tasks = Array.from(entryPoints, (id, i) => ({
id,
parent: null,
parentDependencyIndex: i,
skip,
}));
loadQueue.drain = () => {
loadQueue.kill();
result.resolve(collect());
};
loadQueue.error = error => {
loadQueue.error = emptyFunction;
loadQueue.kill();
result.reject(error);
};
let i = 0;
for (const entryPoint of entryPoints) {
loadModule(entryPoint, null, i++);
}
if (i === 0) {
if (tasks.length === 0) {
log.error('`Graph` called without any entry points');
loadQueue.kill();
return Promise.reject(new Error('At least one entry point has to be passed.'));
}
return result.promise;
queue.enqueue(...tasks);
return collect(await queue.result);
}
return Graph;
};
function createGraphHelpers(loadQueue, skip) {
const modules = new Map([[null, emptyModule()]]);
class Queue<T, R, A> {
_accumulate: (Queue<T, R, A>, A, R, T) => A;
_pending: Set<T> = new Set();
_queue: Array<T> = [];
_reject: Error => void;
_resolve: A => void;
_result: A;
_runTask: T => R | Promise<R>;
_running: boolean;
result: Promise<A>;
function collect(
path = null,
serialized = {entryModules: [], modules: []},
seen = new Set(),
) {
const module = modules.get(path);
if (module == null || seen.has(path)) {
return serialized;
}
constructor(runTask: T => R | Promise<R>, accumulate: (Queue<T, R, A>, A, R, T) => A, initial: A) {
this._runTask = runTask;
this._accumulate = accumulate;
this._result = initial;
const {dependencies} = module;
if (path === null) {
serialized.entryModules =
dependencies.map(dep => nullthrows(modules.get(dep.path)));
} else {
serialized.modules.push(module);
seen.add(path);
}
for (const dependency of dependencies) {
collect(dependency.path, serialized, seen);
}
return serialized;
const {promise, reject, resolve} = deferred();
this.result = promise;
this._reject = reject;
this._resolve = resolve;
}
function loadModule(id, parent, parentDepIndex) {
loadQueue.push(
{id, parent},
(error, file, dependencyIDs) =>
onFileLoaded(error, file, dependencyIDs, id, parent, parentDepIndex),
);
enqueue(...tasks: Array<T>) {
this._queue.push(...tasks);
this._run();
}
function onFileLoaded(
error,
file,
dependencyIDs,
id,
parent,
parentDependencyIndex,
) {
if (error) {
_onAsyncTaskDone(result: R, task: T) {
this._pending.delete(task);
this._onTaskDone(result, task);
this._run();
}
_onTaskDone(result: R, task: T) {
this._result = this._accumulate(this, this._result, result, task);
}
_run() {
if (this._running) {
return;
}
const {path} = nullthrows(file);
dependencyIDs = nullthrows(dependencyIDs);
this._running = true;
const parentModule = modules.get(parent);
invariant(parentModule, 'Invalid parent module: ' + String(parent));
parentModule.dependencies[parentDependencyIndex] = {id, path};
if ((!skip || !skip.has(path)) && !modules.has(path)) {
const module = {
dependencies: Array(dependencyIDs.length),
file: nullthrows(file),
};
modules.set(path, module);
for (let i = 0; i < dependencyIDs.length; ++i) {
loadModule(dependencyIDs[i], path, i);
const queue = this._queue;
const runTask = this._runTask;
while (queue.length) {
const task = queue.shift();
const result = runTask(task);
if (isPromise(result)) {
this._pending.add(task);
result.then(result => this._onAsyncTaskDone(result, task), this._reject);
} else {
this._onTaskDone(result, task);
}
}
this._running = false;
if (this._pending.size === 0) {
this._resolve(this._result);
}
}
}
function onFileLoaded(queue, modules, {file, dependencies}, {id, parent, parentDependencyIndex, skip}) {
const {path} = file;
const parentModule = modules.get(parent);
invariant(parentModule, 'Invalid parent module: ' + String(parent));
parentModule.dependencies[parentDependencyIndex] = {id, path};
if ((!skip || !skip.has(path)) && !modules.has(path)) {
modules.set(path, {file, dependencies: Array(dependencies.length)});
queue.enqueue(...dependencies.map((id, i) => ({
id,
parent: path,
parentDependencyIndex: i,
skip,
})));
}
return {collect, loadModule};
return modules;
}
function collect(
modules,
path = null,
serialized = {entryModules: [], modules: []},
seen = new Set(),
) {
const module = modules.get(path);
if (module == null || seen.has(path)) {
return serialized;
}
const {dependencies} = module;
if (path === null) {
serialized.entryModules =
dependencies.map(dep => nullthrows(modules.get(dep.path)));
} else {
serialized.modules.push(module);
seen.add(path);
}
for (const dependency of dependencies) {
collect(modules, dependency.path, serialized, seen);
}
return serialized;
}
declare function isPromise(x: mixed): boolean %checks(x instanceof Promise);
function memoizeLoad(load: LoadFn): LoadFn {
const cache = new Map();
return (path, options) => {
const cached = cache.get(path);
if (cached !== undefined) {
return cached;
}
const result = load(path, options);
cache.set(path, result);
if (isPromise(result)) {
result.then(resolved => cache.set(path, resolved));
}
return result;
};
}
// eslint-disable-next-line no-unclear-flowtypes, no-redeclare
function isPromise(x: {then?: ?Function}) {
return x != null && typeof x.then === 'function';
}
function deferred<T>(): {

View File

@ -30,7 +30,10 @@ describe('Graph:', () => {
load = fn();
resolve = fn();
resolve.stub.returns('arbitrary file');
load.stub.returns({file: createFile('arbitrary file'), dependencies: []});
load.stub.returns({
file: createFileFromId('arbitrary file'),
dependencies: [],
});
graph = Graph.create(resolve, load);
});
@ -198,6 +201,40 @@ describe('Graph:', () => {
expect(load).toBeCalledWith(path2, any(Object));
});
it('calls `load` only once for each file', async () => {
load.stub.reset();
resolve.stub.callsFake(idToPath);
load.stub
.withArgs(idToPath('a'))
.returns({file: createFileFromId('a'), dependencies: ['b', 'c']})
.withArgs(idToPath('b'))
.returns({file: createFileFromId('b'), dependencies: ['c']})
.withArgs(idToPath('c'))
.returns({file: createFileFromId('c'), dependencies: []});
await graph(['a'], anyPlatform, noOpts);
expect(load).toHaveBeenCalledTimes(3);
});
it('works when `load` returns promises', async () => {
load.stub.callsFake(path =>
Promise.resolve({
file: createFileFromPath(path),
dependencies: [],
}),
);
resolve.stub.callsFake(idToPath);
const entryPoints = ['a', 'b', 'c', 'd', 'e'];
const expectedModules = entryPoints.map(x => createModule(x));
const result = await graph(entryPoints, anyPlatform, noOpts);
expect(result).toEqual({
entryModules: expectedModules,
modules: expectedModules,
});
});
it('resolves modules in depth-first traversal order, regardless of the order of loading', async () => {
load.stub.reset();
resolve.stub.reset();
@ -208,15 +245,15 @@ describe('Graph:', () => {
resolve.stub.withArgs(id).returns(path);
load.stub
.withArgs(path)
.returns({file: createFile(id), dependencies: []});
.returns({file: createFileFromId(id), dependencies: []});
});
load.stub
.withArgs(idToPath('a'))
.returns({file: createFile('a'), dependencies: ['b', 'e', 'h']});
.returns({file: createFileFromId('a'), dependencies: ['b', 'e', 'h']});
// load certain files later
const b = deferred({file: createFile('b'), dependencies: ['c', 'd']});
const e = deferred({file: createFile('e'), dependencies: ['f', 'g']});
const b = deferred({file: createFileFromId('b'), dependencies: ['c', 'd']});
const e = deferred({file: createFileFromId('e'), dependencies: ['f', 'g']});
load.stub
.withArgs(idToPath('b'))
.returns(b.promise)
@ -229,7 +266,7 @@ describe('Graph:', () => {
// `b` loads after `a`
process.nextTick(b.resolve);
});
return {file: createFile('h'), dependencies: []};
return {file: createFileFromId('h'), dependencies: []};
};
const result = await graph(['a'], anyPlatform, noOpts);
@ -251,16 +288,16 @@ describe('Graph:', () => {
load.stub
.withArgs(idToPath('a'))
.returns({file: createFile('a'), dependencies: ['b']});
.returns({file: createFileFromId('a'), dependencies: ['b']});
load.stub
.withArgs(idToPath('b'))
.returns({file: createFile('b'), dependencies: []});
.returns({file: createFileFromId('b'), dependencies: []});
load.stub
.withArgs(idToPath('c'))
.returns({file: createFile('c'), dependencies: ['d']});
.returns({file: createFileFromId('c'), dependencies: ['d']});
load.stub
.withArgs(idToPath('d'))
.returns({file: createFile('d'), dependencies: []});
.returns({file: createFileFromId('d'), dependencies: []});
'abcd'
.split('')
@ -279,10 +316,10 @@ describe('Graph:', () => {
load.stub
.withArgs(idToPath('a'))
.returns({file: createFile('a'), dependencies: ['b']});
.returns({file: createFileFromId('a'), dependencies: ['b']});
load.stub
.withArgs(idToPath('b'))
.returns({file: createFile('b'), dependencies: []});
.returns({file: createFileFromId('b'), dependencies: []});
'ab'
.split('')
@ -302,12 +339,12 @@ describe('Graph:', () => {
resolve.stub.withArgs(id).returns(path);
load.stub
.withArgs(path)
.returns({file: createFile(id), dependencies: []});
.returns({file: createFileFromId(id), dependencies: []});
});
['a', 'd'].forEach(id =>
load.stub
.withArgs(idToPath(id))
.returns({file: createFile(id), dependencies: ['b', 'c']}),
.returns({file: createFileFromId(id), dependencies: ['b', 'c']}),
);
const result = await graph(['a', 'd', 'b'], anyPlatform, noOpts);
@ -329,11 +366,11 @@ describe('Graph:', () => {
.returns(idToPath('c'));
load.stub
.withArgs(idToPath('a'))
.returns({file: createFile('a'), dependencies: ['b']})
.returns({file: createFileFromId('a'), dependencies: ['b']})
.withArgs(idToPath('b'))
.returns({file: createFile('b'), dependencies: ['c']})
.returns({file: createFileFromId('b'), dependencies: ['c']})
.withArgs(idToPath('c'))
.returns({file: createFile('c'), dependencies: ['a']});
.returns({file: createFileFromId('c'), dependencies: ['a']});
const result = await graph(['a'], anyPlatform, noOpts);
expect(result.modules).toEqual([
@ -349,13 +386,13 @@ describe('Graph:', () => {
);
load.stub
.withArgs(idToPath('a'))
.returns({file: createFile('a'), dependencies: ['b', 'c', 'd']})
.returns({file: createFileFromId('a'), dependencies: ['b', 'c', 'd']})
.withArgs(idToPath('b'))
.returns({file: createFile('b'), dependencies: ['e']});
.returns({file: createFileFromId('b'), dependencies: ['e']});
['c', 'd', 'e'].forEach(id =>
load.stub
.withArgs(idToPath(id))
.returns({file: createFile(id), dependencies: []}),
.returns({file: createFileFromId(id), dependencies: []}),
);
const skip = new Set([idToPath('b'), idToPath('c')]);
@ -371,13 +408,17 @@ function createDependency(id) {
return {id, path: idToPath(id)};
}
function createFile(id) {
return {ast: {}, path: idToPath(id)};
function createFileFromId(id) {
return createFileFromPath(idToPath(id));
}
function createFileFromPath(path) {
return {ast: {}, path};
}
function createModule(id, dependencies = []): Module {
return {
file: createFile(id),
file: createFileFromId(id),
dependencies: dependencies.map(createDependency),
};
}

View File

@ -57,7 +57,7 @@ export type GraphResult = {|
export type IdForPathFn = {path: string} => number;
type LoadResult = {
export type LoadResult = {
file: File,
dependencies: Array<string>,
};