react-native/Libraries/Utilities/MessageQueue.js

460 lines
16 KiB
JavaScript
Raw Normal View History

2015-01-30 01:10:49 +00:00
/**
* Copyright (c) 2015-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
2015-01-30 01:10:49 +00:00
*
* @providesModule MessageQueue
*/
'use strict';
var ErrorUtils = require('ErrorUtils');
var invariant = require('invariant');
var warning = require('warning');
var JSTimersExecution = require('JSTimersExecution');
var INTERNAL_ERROR = 'Error in MessageQueue implementation';
/**
* So as not to confuse static build system.
*/
var requireFunc = require;
/**
* @param {Object!} module Module instance, must be loaded.
* @param {string} methodName Name of method in `module`.
* @param {array<*>} params Arguments to method.
* @returns {*} Return value of method invocation.
*/
var jsCall = function(module, methodName, params) {
return module[methodName].apply(module, params);
};
/**
* A utility for aggregating "work" to be done, and potentially transferring
* that work to another thread. Each instance of `MessageQueue` has the notion
* of a "target" thread - the thread that the work will be sent to.
*
* TODO: Long running callback results, and streaming callback results (ability
* for a callback to be invoked multiple times).
*
* @param {object} moduleNameToID Used to translate module/method names into
* efficient numeric IDs.
* @class MessageQueue
*/
var MessageQueue = function(remoteModulesConfig, localModulesConfig, customRequire) {
this._requireFunc = customRequire || requireFunc;
this._initBookeeping();
this._initNamingMap(remoteModulesConfig, localModulesConfig);
};
// REQUEST: Parallell arrays:
var REQUEST_MODULE_IDS = 0;
var REQUEST_METHOD_IDS = 1;
var REQUEST_PARAMSS = 2;
// RESPONSE: Parallell arrays:
var RESPONSE_CBIDS = 3;
var RESPONSE_RETURN_VALUES = 4;
/**
* Utility to catch errors and prevent having to bind, or execute a bound
* function, while catching errors in a process and returning a resulting
* return value. This ensures that even if a process fails, we can still return
* *some* values (from `_flushedQueueUnguarded` for example). Glorified
* try/catch/finally that invokes the global `onerror`.
*
* @param {function} operation Function to execute, likely populates the
* message buffer.
* @param {Array<*>} operationArguments Arguments passed to `operation`.
* @param {function} getReturnValue Returns a return value - will be invoked
* even if the `operation` fails half way through completing its task.
* @return {object} Return value returned from `getReturnValue`.
*/
var guardReturn = function(operation, operationArguments, getReturnValue, context) {
if (operation) {
ErrorUtils.applyWithGuard(operation, context, operationArguments);
}
if (getReturnValue) {
return ErrorUtils.applyWithGuard(getReturnValue, context, null);
}
return null;
};
/**
* Bookkeeping logic for callbackIDs. We ensure that success and error
* callbacks are numerically adjacent.
*
* We could have also stored the association between success cbID and errorCBID
* in a map without relying on this adjacency, but the bookkeeping here avoids
* an additional two maps to associate in each direction, and avoids growing
* dictionaries (new fields). Instead, we compute pairs of callback IDs, by
* populating the `res` argument to `allocateCallbackIDs` (in conjunction with
* pooling). Behind this bookeeping API, we ensure that error and success
* callback IDs are always adjacent so that when one is invoked, we always know
* how to free the memory of the other. By using this API, it is impossible to
* create malformed callbackIDs that are not adjacent.
*/
var createBookkeeping = function() {
return {
/**
* Incrementing callback ID. Must start at 1 - otherwise converted null
* values which become zero are not distinguishable from a GUID of zero.
*/
GUID: 1,
errorCallbackIDForSuccessCallbackID: function(successID) {
return successID + 1;
},
successCallbackIDForErrorCallbackID: function(errorID) {
return errorID - 1;
},
allocateCallbackIDs: function(res) {
res.successCallbackID = this.GUID++;
res.errorCallbackID = this.GUID++;
},
isSuccessCallback: function(id) {
return id % 2 === 1;
}
};
};
var MessageQueueMixin = {
/**
* Creates an efficient wire protocol for communicating across a bridge.
* Avoids allocating strings.
*
* @param {object} remoteModulesConfig Configuration of modules and their
* methods.
*/
_initNamingMap: function(remoteModulesConfig, localModulesConfig) {
this._remoteModuleNameToModuleID = {};
this._remoteModuleIDToModuleName = {}; // Reverse
this._remoteModuleNameToMethodNameToID = {};
this._remoteModuleNameToMethodIDToName = {}; // Reverse
this._localModuleNameToModuleID = {};
this._localModuleIDToModuleName = {}; // Reverse
this._localModuleNameToMethodNameToID = {};
this._localModuleNameToMethodIDToName = {}; // Reverse
function fillMappings(
modulesConfig,
moduleNameToModuleID,
moduleIDToModuleName,
moduleNameToMethodNameToID,
moduleNameToMethodIDToName
) {
for (var moduleName in modulesConfig) {
var moduleConfig = modulesConfig[moduleName];
var moduleID = moduleConfig.moduleID;
moduleNameToModuleID[moduleName] = moduleID;
moduleIDToModuleName[moduleID] = moduleName; // Reverse
moduleNameToMethodNameToID[moduleName] = {};
moduleNameToMethodIDToName[moduleName] = {}; // Reverse
var methods = moduleConfig.methods;
for (var methodName in methods) {
var methodID = methods[methodName].methodID;
moduleNameToMethodNameToID[moduleName][methodName] =
methodID;
moduleNameToMethodIDToName[moduleName][methodID] =
methodName; // Reverse
}
}
}
fillMappings(
remoteModulesConfig,
this._remoteModuleNameToModuleID,
this._remoteModuleIDToModuleName,
this._remoteModuleNameToMethodNameToID,
this._remoteModuleNameToMethodIDToName
);
fillMappings(
localModulesConfig,
this._localModuleNameToModuleID,
this._localModuleIDToModuleName,
this._localModuleNameToMethodNameToID,
this._localModuleNameToMethodIDToName
);
},
_initBookeeping: function() {
this._POOLED_CBIDS = {errorCallbackID: null, successCallbackID: null};
this._bookkeeping = createBookkeeping();
/**
* Stores callbacks so that we may simulate asynchronous return values from
* other threads. Remote invocations in other threads can pass return values
* back asynchronously to the requesting thread.
*/
this._threadLocalCallbacksByID = [];
this._threadLocalScopesByID = [];
/**
* Memory efficient parallel arrays. Each index cuts through the three
* arrays and forms a remote invocation of methodName(params) whos return
* value will be reported back to the other thread by way of the
* corresponding id in cbIDs. Each entry (A-D in the graphic below),
* represents a work item of the following form:
* - moduleID: ID of module to invoke method from.
* - methodID: ID of method in module to invoke.
* - params: List of params to pass to method.
* - cbID: ID to respond back to originating thread with.
*
* TODO: We can make this even more efficient (memory) by creating a single
* array, that is always pushed `n` elements as a time.
*/
this._outgoingItems = [
/*REQUEST_MODULE_IDS: */ [/* +-+ +-+ +-+ +-+ */],
/*REQUEST_METHOD_IDS: */ [/* |A| |B| |C| |D| */],
/*REQUEST_PARAMSS: */ [/* |-| |-| |-| |-| */],
/*RESPONSE_CBIDS: */ [/* +-+ +-+ +-+ +-+ */],
/* |E| |F| |G| |H| */
/*RESPONSE_RETURN_VALUES: */ [/* +-+ +-+ +-+ +-+ */]
];
/**
* Used to allow returning the buffer, while at the same time clearing it in
* a memory efficient manner.
*/
this._outgoingItemsSwap = [[], [], [], [], []];
},
invokeCallback: function(cbID, args) {
return guardReturn(this._invokeCallback, [cbID, args], null, this);
},
_invokeCallback: function(cbID, args) {
try {
var cb = this._threadLocalCallbacksByID[cbID];
var scope = this._threadLocalScopesByID[cbID];
warning(
cb,
'Cannot find callback with CBID %s. Native module may have invoked ' +
'both the success callback and the error callback.',
cbID
);
cb.apply(scope, args);
} catch(ie_requires_catch) {
throw ie_requires_catch;
} finally {
// Clear out the memory regardless of success or failure.
this._freeResourcesForCallbackID(cbID);
}
},
invokeCallbackAndReturnFlushedQueue: function(cbID, args) {
if (this._enableLogging) {
this._loggedIncomingItems.push([new Date().getTime(), cbID, args]);
}
return guardReturn(
this._invokeCallback,
[cbID, args],
this._flushedQueueUnguarded,
this
);
},
callFunction: function(moduleID, methodID, params) {
return guardReturn(this._callFunction, [moduleID, methodID, params], null, this);
},
_callFunction: function(moduleID, methodID, params) {
var moduleName = this._localModuleIDToModuleName[moduleID];
var methodName = this._localModuleNameToMethodIDToName[moduleName][methodID];
var ret = jsCall(this._requireFunc(moduleName), methodName, params);
return ret;
},
callFunctionReturnFlushedQueue: function(moduleID, methodID, params) {
if (this._enableLogging) {
this._loggedIncomingItems.push([new Date().getTime(), moduleID, methodID, params]);
}
return guardReturn(
this._callFunction,
[moduleID, methodID, params],
this._flushedQueueUnguarded,
this
);
},
setLoggingEnabled: function(enabled) {
this._enableLogging = enabled;
this._loggedIncomingItems = [];
this._loggedOutgoingItems = [[], [], [], [], []];
},
getLoggedIncomingItems: function() {
return this._loggedIncomingItems;
},
getLoggedOutgoingItems: function() {
return this._loggedOutgoingItems;
},
replayPreviousLog: function(previousLog) {
this._outgoingItems = previousLog;
},
/**
* Simple helpers for clearing the queues. This doesn't handle the fact that
* memory in the current buffer is leaked until the next frame or update - but
* that will typically be on the order of < 500ms.
*/
_swapAndReinitializeBuffer: function() {
// Outgoing requests
var currentOutgoingItems = this._outgoingItems;
var nextOutgoingItems = this._outgoingItemsSwap;
nextOutgoingItems[REQUEST_MODULE_IDS].length = 0;
nextOutgoingItems[REQUEST_METHOD_IDS].length = 0;
nextOutgoingItems[REQUEST_PARAMSS].length = 0;
// Outgoing responses
nextOutgoingItems[RESPONSE_CBIDS].length = 0;
nextOutgoingItems[RESPONSE_RETURN_VALUES].length = 0;
this._outgoingItemsSwap = currentOutgoingItems;
this._outgoingItems = nextOutgoingItems;
},
/**
* @param {string} moduleID JS module name.
* @param {methodName} methodName Method in module to invoke.
* @param {array<*>?} params Array representing arguments to method.
* @param {string} cbID Unique ID to pass back in potential response.
*/
_pushRequestToOutgoingItems: function(moduleID, methodName, params) {
this._outgoingItems[REQUEST_MODULE_IDS].push(moduleID);
this._outgoingItems[REQUEST_METHOD_IDS].push(methodName);
this._outgoingItems[REQUEST_PARAMSS].push(params);
if (this._enableLogging) {
this._loggedOutgoingItems[REQUEST_MODULE_IDS].push(moduleID);
this._loggedOutgoingItems[REQUEST_METHOD_IDS].push(methodName);
this._loggedOutgoingItems[REQUEST_PARAMSS].push(params);
}
},
/**
* @param {string} cbID Unique ID that other side of bridge has remembered.
* @param {*} returnValue Return value to pass to callback on other side of
* bridge.
*/
_pushResponseToOutgoingItems: function(cbID, returnValue) {
this._outgoingItems[RESPONSE_CBIDS].push(cbID);
this._outgoingItems[RESPONSE_RETURN_VALUES].push(returnValue);
},
_freeResourcesForCallbackID: function(cbID) {
var correspondingCBID = this._bookkeeping.isSuccessCallback(cbID) ?
this._bookkeeping.errorCallbackIDForSuccessCallbackID(cbID) :
this._bookkeeping.successCallbackIDForErrorCallbackID(cbID);
this._threadLocalCallbacksByID[cbID] = null;
this._threadLocalScopesByID[cbID] = null;
if (this._threadLocalCallbacksByID[correspondingCBID]) {
this._threadLocalCallbacksByID[correspondingCBID] = null;
this._threadLocalScopesByID[correspondingCBID] = null;
}
},
/**
* @param {Function} onFail Function to store in current thread for later
* lookup, when request fails.
* @param {Function} onSucc Function to store in current thread for later
* lookup, when request succeeds.
* @param {Object?=} scope Scope to invoke `cb` with.
* @param {Object?=} res Resulting callback ids. Use `this._POOLED_CBIDS`.
*/
_storeCallbacksInCurrentThread: function(onFail, onSucc, scope) {
invariant(onFail || onSucc, INTERNAL_ERROR);
this._bookkeeping.allocateCallbackIDs(this._POOLED_CBIDS);
var succCBID = this._POOLED_CBIDS.successCallbackID;
var errorCBID = this._POOLED_CBIDS.errorCallbackID;
this._threadLocalCallbacksByID[errorCBID] = onFail;
this._threadLocalCallbacksByID[succCBID] = onSucc;
this._threadLocalScopesByID[errorCBID] = scope;
this._threadLocalScopesByID[succCBID] = scope;
},
/**
* IMPORTANT: There is possibly a timing issue with this form of flushing. We
* are currently not seeing any problems but the potential issue to look out
* for is:
* - While flushing this._outgoingItems contains the work for the other thread
* to perform.
* - To mitigate this, we never allow enqueueing messages if the queue is
* already reserved - as long as it is reserved, it could be in the midst of
* a flush.
*
* If this ever occurs we can easily eliminate the race condition. We can
* completely solve any ambiguity by sending messages such that we'll never
* try to reserve the queue when already reserved. Here's the pseudocode:
*
* var defensiveCopy = efficientDefensiveCopy(this._outgoingItems);
* this._swapAndReinitializeBuffer();
*/
flushedQueue: function() {
return guardReturn(null, null, this._flushedQueueUnguarded, this);
},
_flushedQueueUnguarded: function() {
// Call the functions registred via setImmediate
JSTimersExecution.callImmediates();
var currentOutgoingItems = this._outgoingItems;
this._swapAndReinitializeBuffer();
var ret = currentOutgoingItems[REQUEST_MODULE_IDS].length ||
currentOutgoingItems[RESPONSE_RETURN_VALUES].length ? currentOutgoingItems : null;
return ret;
},
call: function(moduleName, methodName, params, onFail, onSucc, scope) {
invariant(
(!onFail || typeof onFail === 'function') &&
(!onSucc || typeof onSucc === 'function'),
'Callbacks must be functions'
);
// Store callback _before_ sending the request, just in case the MailBox
// returns the response in a blocking manner.
if (onSucc) {
2015-01-30 01:10:49 +00:00
this._storeCallbacksInCurrentThread(onFail, onSucc, scope, this._POOLED_CBIDS);
onFail && params.push(this._POOLED_CBIDS.errorCallbackID);
2015-01-30 01:10:49 +00:00
params.push(this._POOLED_CBIDS.successCallbackID);
}
var moduleID = this._remoteModuleNameToModuleID[moduleName];
if (moduleID === undefined || moduleID === null) {
throw new Error('Unrecognized module name:' + moduleName);
}
var methodID = this._remoteModuleNameToMethodNameToID[moduleName][methodName];
if (methodID === undefined || moduleID === null) {
throw new Error('Unrecognized method name:' + methodName);
}
this._pushRequestToOutgoingItems(moduleID, methodID, params);
},
__numPendingCallbacksOnlyUseMeInTestCases: function() {
var callbacks = this._threadLocalCallbacksByID;
var total = 0;
for (var i = 0; i < callbacks.length; i++) {
if (callbacks[i]) {
total++;
}
}
return total;
}
};
Object.assign(MessageQueue.prototype, MessageQueueMixin);
module.exports = MessageQueue;