Merge pull request #1488 from realm/fix-realm-open-android-debugger
Fix realm open android debugger
This commit is contained in:
commit
4fd144f982
15
CHANGELOG.md
15
CHANGELOG.md
|
@ -1,3 +1,18 @@
|
|||
X.Y.Z Release notes
|
||||
=============================================================
|
||||
### Breaking changes
|
||||
* None.
|
||||
|
||||
### Enchancements
|
||||
* None
|
||||
|
||||
### Bug fixes
|
||||
* Fixes Realm.open hangs in React Native debugger for iOS and Android
|
||||
|
||||
### Internal
|
||||
* None.
|
||||
|
||||
|
||||
2.0.6 Release notes (2017-11-10)
|
||||
=============================================================
|
||||
### Breaking changes
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
import * as base64 from './base64';
|
||||
import { keys, objectTypes } from './constants';
|
||||
|
||||
const {id: idKey, realm: _realmKey} = keys;
|
||||
const { id: idKey, realm: _realmKey } = keys;
|
||||
let registeredCallbacks = [];
|
||||
const typeConverters = {};
|
||||
|
||||
|
@ -41,8 +41,8 @@ if (XMLHttpRequest.__proto__ != global.XMLHttpRequestEventTarget) {
|
|||
global.XMLHttpRequest = fakeXMLHttpRequest;
|
||||
}
|
||||
|
||||
registerTypeConverter(objectTypes.DATA, (_, {value}) => base64.decode(value));
|
||||
registerTypeConverter(objectTypes.DATE, (_, {value}) => new Date(value));
|
||||
registerTypeConverter(objectTypes.DATA, (_, { value }) => base64.decode(value));
|
||||
registerTypeConverter(objectTypes.DATE, (_, { value }) => new Date(value));
|
||||
registerTypeConverter(objectTypes.DICT, deserializeDict);
|
||||
registerTypeConverter(objectTypes.FUNCTION, deserializeFunction);
|
||||
|
||||
|
@ -54,7 +54,6 @@ export function createSession(refreshAccessToken, host) {
|
|||
refreshAccessToken[persistentCallback] = true;
|
||||
sessionId = sendRequest('create_session', { refreshAccessToken: serialize(undefined, refreshAccessToken) }, host);
|
||||
sessionHost = host;
|
||||
|
||||
return sessionId;
|
||||
}
|
||||
|
||||
|
@ -63,18 +62,18 @@ export function createRealm(args) {
|
|||
args = args.map((arg) => serialize(null, arg));
|
||||
}
|
||||
|
||||
return sendRequest('create_realm', {arguments: args});
|
||||
return sendRequest('create_realm', { arguments: args });
|
||||
}
|
||||
|
||||
export function createUser(args) {
|
||||
args = args.map((arg) => serialize(null, arg));
|
||||
const result = sendRequest('create_user', {arguments: args});
|
||||
const result = sendRequest('create_user', { arguments: args });
|
||||
return deserialize(undefined, result);
|
||||
}
|
||||
|
||||
export function _adminUser(args) {
|
||||
args = args.map((arg) => serialize(null, arg));
|
||||
const result = sendRequest('_adminUser', {arguments: args});
|
||||
const result = sendRequest('_adminUser', { arguments: args });
|
||||
return deserialize(undefined, result);
|
||||
}
|
||||
|
||||
|
@ -83,18 +82,18 @@ export function callMethod(realmId, id, name, args) {
|
|||
args = args.map((arg) => serialize(realmId, arg));
|
||||
}
|
||||
|
||||
let result = sendRequest('call_method', {realmId, id, name, arguments: args});
|
||||
let result = sendRequest('call_method', { realmId, id, name, arguments: args });
|
||||
return deserialize(realmId, result);
|
||||
}
|
||||
|
||||
export function getProperty(realmId, id, name) {
|
||||
let result = sendRequest('get_property', {realmId, id, name});
|
||||
let result = sendRequest('get_property', { realmId, id, name });
|
||||
return deserialize(realmId, result);
|
||||
}
|
||||
|
||||
export function setProperty(realmId, id, name, value) {
|
||||
value = serialize(realmId, value);
|
||||
sendRequest('set_property', {realmId, id, name, value});
|
||||
sendRequest('set_property', { realmId, id, name, value });
|
||||
}
|
||||
|
||||
export function getAllUsers() {
|
||||
|
@ -116,36 +115,36 @@ function registerCallback(callback) {
|
|||
|
||||
function serialize(realmId, value) {
|
||||
if (typeof value == 'undefined') {
|
||||
return {type: objectTypes.UNDEFINED};
|
||||
return { type: objectTypes.UNDEFINED };
|
||||
}
|
||||
if (typeof value == 'function') {
|
||||
return {type: objectTypes.FUNCTION, value: registerCallback(value)};
|
||||
return { type: objectTypes.FUNCTION, value: registerCallback(value) };
|
||||
}
|
||||
if (!value || typeof value != 'object') {
|
||||
return {value: value};
|
||||
return { value: value };
|
||||
}
|
||||
|
||||
let id = value[idKey];
|
||||
if (id) {
|
||||
return {id};
|
||||
return { id };
|
||||
}
|
||||
|
||||
if (value instanceof Date) {
|
||||
return {type: objectTypes.DATE, value: value.getTime()};
|
||||
return { type: objectTypes.DATE, value: value.getTime() };
|
||||
}
|
||||
|
||||
if (Array.isArray(value)) {
|
||||
let array = value.map((item) => serialize(realmId, item));
|
||||
return {value: array};
|
||||
return { value: array };
|
||||
}
|
||||
|
||||
if (value instanceof ArrayBuffer || ArrayBuffer.isView(value)) {
|
||||
return {type: objectTypes.DATA, value: base64.encode(value)};
|
||||
return { type: objectTypes.DATA, value: base64.encode(value) };
|
||||
}
|
||||
|
||||
let keys = Object.keys(value);
|
||||
let values = keys.map((key) => serialize(realmId, value[key]));
|
||||
return {type: objectTypes.DICT, keys, values};
|
||||
return { type: objectTypes.DICT, keys, values };
|
||||
}
|
||||
|
||||
export function deserialize(realmId, info) {
|
||||
|
@ -164,7 +163,7 @@ export function deserialize(realmId, info) {
|
|||
}
|
||||
|
||||
function deserializeDict(realmId, info) {
|
||||
let {keys, values} = info;
|
||||
let { keys, values } = info;
|
||||
let object = {};
|
||||
|
||||
for (let i = 0, len = keys.length; i < len; i++) {
|
||||
|
@ -186,10 +185,10 @@ function makeRequest(url, data) {
|
|||
if (global.__debug__) {
|
||||
let request = global.__debug__.require('sync-request');
|
||||
let response = request('POST', url, {
|
||||
body: JSON.stringify(data),
|
||||
headers: {
|
||||
"Content-Type": "text/plain;charset=UTF-8"
|
||||
}
|
||||
body: JSON.stringify(data),
|
||||
headers: {
|
||||
"Content-Type": "text/plain;charset=UTF-8"
|
||||
}
|
||||
});
|
||||
|
||||
statusCode = response.statusCode;
|
||||
|
@ -212,6 +211,8 @@ function makeRequest(url, data) {
|
|||
return JSON.parse(responseText);
|
||||
}
|
||||
|
||||
let pollTimeoutId;
|
||||
|
||||
//returns an object from rpc serialized json value
|
||||
function deserialize_json_value(value) {
|
||||
let result = {};
|
||||
|
@ -225,58 +226,69 @@ function deserialize_json_value(value) {
|
|||
result[propName] = propValue.value;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
function sendRequest(command, data, host = sessionHost) {
|
||||
if (!host) {
|
||||
throw new Error('Must first create RPC session with a valid host');
|
||||
}
|
||||
|
||||
data = Object.assign({}, data, sessionId ? {sessionId} : null);
|
||||
|
||||
let url = 'http://' + host + '/' + command;
|
||||
let response = makeRequest(url, data);
|
||||
|
||||
if (!response || response.error) {
|
||||
let error = response && response.error;
|
||||
|
||||
// Remove the type prefix from the error message (e.g. "Error: ").
|
||||
if (error && error.replace) {
|
||||
error = error.replace(/^[a-z]+: /i, '');
|
||||
clearTimeout(pollTimeoutId);
|
||||
try {
|
||||
if (!host) {
|
||||
throw new Error('Must first create RPC session with a valid host');
|
||||
}
|
||||
else if (error.type && error.type === 'dict') {
|
||||
const responseError = deserialize_json_value(error);
|
||||
let responeMessage;
|
||||
if (response.message && response.message !== '') {
|
||||
// Remove the type prefix from the error message (e.g. "Error: ").
|
||||
responeMessage = response.message.replace(/^[a-z]+: /i, '');
|
||||
|
||||
data = Object.assign({}, data, sessionId ? { sessionId } : null);
|
||||
|
||||
let url = 'http://' + host + '/' + command;
|
||||
let response = makeRequest(url, data);
|
||||
|
||||
if (!response || response.error) {
|
||||
let error = response && response.error;
|
||||
|
||||
// Remove the type prefix from the error message (e.g. "Error: ").
|
||||
if (error && error.replace) {
|
||||
error = error.replace(/^[a-z]+: /i, '');
|
||||
}
|
||||
else if (error.type && error.type === 'dict') {
|
||||
const responseError = deserialize_json_value(error);
|
||||
let responeMessage;
|
||||
if (response.message && response.message !== '') {
|
||||
// Remove the type prefix from the error message (e.g. "Error: ").
|
||||
responeMessage = response.message.replace(/^[a-z]+: /i, '');
|
||||
}
|
||||
|
||||
const exceptionToReport = new Error(responeMessage);
|
||||
Object.assign(exceptionToReport, responseError);
|
||||
throw exceptionToReport;
|
||||
}
|
||||
|
||||
const exceptionToReport = new Error(responeMessage);
|
||||
Object.assign(exceptionToReport, responseError);
|
||||
throw exceptionToReport;
|
||||
throw new Error(error || `Invalid response for "${command}"`);
|
||||
}
|
||||
let callback = response.callback;
|
||||
if (callback != null) {
|
||||
let result;
|
||||
let error;
|
||||
try {
|
||||
let realmId = data.realmId;
|
||||
let thisObject = deserialize(realmId, response.this);
|
||||
let args = deserialize(realmId, response.arguments);
|
||||
result = registeredCallbacks[callback].apply(thisObject, args);
|
||||
result = serialize(realmId, result);
|
||||
} catch (e) {
|
||||
error = e.message || ('' + e);
|
||||
}
|
||||
|
||||
let callbackCommand = "callback_result";
|
||||
if (command == 'callbacks_poll') {
|
||||
callbackCommand = "callback_poll_result";
|
||||
}
|
||||
|
||||
return sendRequest(callbackCommand, { callback, result, error, "callback_call_counter": response.callback_call_counter });
|
||||
}
|
||||
|
||||
throw new Error(error || `Invalid response for "${command}"`);
|
||||
return response.result;
|
||||
}
|
||||
|
||||
let callback = response.callback;
|
||||
if (callback != null) {
|
||||
let result;
|
||||
let error;
|
||||
try {
|
||||
let realmId = data.realmId;
|
||||
let thisObject = deserialize(realmId, response.this);
|
||||
let args = deserialize(realmId, response.arguments);
|
||||
result = registeredCallbacks[callback].apply(thisObject, args);
|
||||
result = serialize(realmId, result);
|
||||
} catch (e) {
|
||||
error = e.message || ('' + e);
|
||||
}
|
||||
return sendRequest('callback_result', {callback, result, error});
|
||||
finally {
|
||||
pollTimeoutId = setTimeout(() => sendRequest('callbacks_poll'), 100);
|
||||
}
|
||||
|
||||
return response.result;
|
||||
}
|
||||
|
|
|
@ -76,7 +76,6 @@ module.exports = function(realmConstructor) {
|
|||
else {
|
||||
try {
|
||||
let syncedRealm = new realmConstructor(config);
|
||||
//FIXME: RN hangs here. Remove when node's makeCallback alternative is implemented
|
||||
setTimeout(() => { resolve(syncedRealm); }, 1);
|
||||
} catch (e) {
|
||||
reject(e);
|
||||
|
|
|
@ -2,6 +2,9 @@ package io.realm.react;
|
|||
|
||||
import android.content.res.AssetManager;
|
||||
import android.os.Build;
|
||||
import android.os.Handler;
|
||||
import android.os.HandlerThread;
|
||||
import android.os.Looper;
|
||||
import android.util.Log;
|
||||
|
||||
import com.facebook.react.bridge.ReactApplicationContext;
|
||||
|
@ -19,6 +22,9 @@ import java.util.Enumeration;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.RunnableFuture;
|
||||
|
||||
import fi.iki.elonen.NanoHTTPD;
|
||||
|
||||
|
@ -37,6 +43,9 @@ class RealmReactModule extends ReactContextBaseJavaModule {
|
|||
SoLoader.loadLibrary("realmreact");
|
||||
}
|
||||
|
||||
private Handler worker;
|
||||
private HandlerThread workerThread;
|
||||
|
||||
public RealmReactModule(ReactApplicationContext reactContext) {
|
||||
super(reactContext);
|
||||
|
||||
|
@ -121,7 +130,9 @@ class RealmReactModule extends ReactContextBaseJavaModule {
|
|||
|
||||
private void startWebServer() {
|
||||
setupChromeDebugModeRealmJsContext();
|
||||
webServer = new AndroidWebServer(DEFAULT_PORT);
|
||||
startWorker();
|
||||
|
||||
webServer = new AndroidWebServer(DEFAULT_PORT, getReactApplicationContext());
|
||||
try {
|
||||
webServer.start();
|
||||
Log.i("Realm", "Starting the debugging WebServer, Host: " + webServer.getHostname() + " Port: " + webServer.getListeningPort());
|
||||
|
@ -130,20 +141,44 @@ class RealmReactModule extends ReactContextBaseJavaModule {
|
|||
}
|
||||
}
|
||||
|
||||
private void startWorker() {
|
||||
workerThread = new HandlerThread("MyHandlerThread");
|
||||
workerThread.start();
|
||||
worker = new Handler(workerThread.getLooper());
|
||||
worker.postDelayed(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
boolean stop = tryRunTask();
|
||||
if (!stop) {
|
||||
worker.postDelayed(this, 10);
|
||||
}
|
||||
}
|
||||
}, 10);
|
||||
}
|
||||
|
||||
private void stopWebServer() {
|
||||
if (webServer != null) {
|
||||
Log.i("Realm", "Stopping the webserver");
|
||||
webServer.stop();
|
||||
}
|
||||
|
||||
if (workerThread != null) {
|
||||
workerThread.quit();
|
||||
workerThread = null;
|
||||
}
|
||||
}
|
||||
|
||||
class AndroidWebServer extends NanoHTTPD {
|
||||
public AndroidWebServer(int port) {
|
||||
private ReactApplicationContext reactApplicationContext;
|
||||
|
||||
public AndroidWebServer(int port, ReactApplicationContext reactApplicationContext) {
|
||||
super(port);
|
||||
this.reactApplicationContext = reactApplicationContext;
|
||||
}
|
||||
|
||||
public AndroidWebServer(String hostname, int port) {
|
||||
public AndroidWebServer(String hostname, int port, ReactApplicationContext reactApplicationContext) {
|
||||
super(hostname, port);
|
||||
this.reactApplicationContext = reactApplicationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -164,7 +199,7 @@ class RealmReactModule extends ReactContextBaseJavaModule {
|
|||
return response;
|
||||
}
|
||||
final String jsonResponse = processChromeDebugCommand(cmdUri, json);
|
||||
|
||||
|
||||
Response response = newFixedLengthResponse(jsonResponse);
|
||||
response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081");
|
||||
return response;
|
||||
|
@ -185,4 +220,7 @@ class RealmReactModule extends ReactContextBaseJavaModule {
|
|||
|
||||
// this receives one command from Chrome debug then return the processing we should post back
|
||||
private native String processChromeDebugCommand(String cmd, String args);
|
||||
|
||||
// this receives one command from Chrome debug then return the processing we should post back
|
||||
private native boolean tryRunTask();
|
||||
}
|
||||
|
|
|
@ -77,6 +77,13 @@ JNIEXPORT jstring JNICALL Java_io_realm_react_RealmReactModule_processChromeDebu
|
|||
return env->NewStringUTF(response.dump().c_str());
|
||||
}
|
||||
|
||||
JNIEXPORT jboolean JNICALL Java_io_realm_react_RealmReactModule_tryRunTask
|
||||
(JNIEnv *env, jclass)
|
||||
{
|
||||
jboolean result = s_rpc_server->try_run_task();
|
||||
return result;
|
||||
}
|
||||
|
||||
JNIEXPORT jboolean JNICALL Java_io_realm_react_RealmReactModule_isContextInjected
|
||||
(JNIEnv *env, jclass)
|
||||
{
|
||||
|
|
|
@ -29,6 +29,13 @@ JNIEXPORT jlong JNICALL Java_io_realm_react_RealmReactModule_setupChromeDebugMod
|
|||
JNIEXPORT jstring JNICALL Java_io_realm_react_RealmReactModule_processChromeDebugCommand
|
||||
(JNIEnv *, jclass, jstring, jstring);
|
||||
|
||||
/*
|
||||
* Class: io_realm_react_RealmReactModule
|
||||
* Method: tryRunTask
|
||||
*/
|
||||
JNIEXPORT jboolean JNICALL Java_io_realm_react_RealmReactModule_tryRunTask
|
||||
(JNIEnv *, jclass);
|
||||
|
||||
/*
|
||||
* Class: io_realm_react_RealmReactModule
|
||||
* Method: isContextInjected
|
||||
|
|
|
@ -35,6 +35,23 @@ public:
|
|||
return do_pop_back();
|
||||
}
|
||||
|
||||
T pop_if(std::function<bool(const T&)> predicate) {
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
|
||||
for (auto it = m_deque.begin(); it != m_deque.end();) {
|
||||
if (predicate(*it)) {
|
||||
T item = std::move(*it);
|
||||
m_deque.erase(it);
|
||||
return item;
|
||||
}
|
||||
else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
util::Optional<T> try_pop_back(size_t timeout) {
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
m_condition.wait_for(lock, std::chrono::milliseconds(timeout),
|
||||
|
@ -57,7 +74,7 @@ public:
|
|||
}
|
||||
|
||||
bool empty() {
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
std::lock_guard <std::mutex> lock(m_mutex);
|
||||
return m_deque.empty();
|
||||
}
|
||||
|
||||
|
|
|
@ -728,7 +728,7 @@ void RealmClass<T>::wait_for_download_completion(ContextType ctx, ObjectType thi
|
|||
HANDLESCOPE
|
||||
if (!error_code) {
|
||||
//success
|
||||
Function<T>::callback(protected_ctx, protected_callback, protected_this, 0, nullptr);
|
||||
Function<T>::callback(protected_ctx, protected_callback, typename T::Object(), 0, nullptr);
|
||||
}
|
||||
else {
|
||||
//fail
|
||||
|
@ -738,7 +738,8 @@ void RealmClass<T>::wait_for_download_completion(ContextType ctx, ObjectType thi
|
|||
|
||||
ValueType callback_arguments[1];
|
||||
callback_arguments[0] = object;
|
||||
Function<T>::callback(protected_ctx, protected_callback, protected_this, 1, callback_arguments);
|
||||
|
||||
Function<T>::callback(protected_ctx, protected_callback, typename T::Object(), 1, callback_arguments);
|
||||
}
|
||||
|
||||
// We keep our Realm instance alive until the callback has had a chance to open its own instance.
|
||||
|
@ -755,7 +756,7 @@ void RealmClass<T>::wait_for_download_completion(ContextType ctx, ObjectType thi
|
|||
auto syncSession = create_object<T, SessionClass<T>>(ctx, new WeakSession(session));
|
||||
ValueType callback_arguments[1];
|
||||
callback_arguments[0] = syncSession;
|
||||
Function<T>::callback(protected_ctx, session_callback_func, protected_this, 1, callback_arguments);
|
||||
Function<T>::callback(protected_ctx, session_callback_func, typename T::Object(), 1, callback_arguments);
|
||||
}
|
||||
|
||||
session->wait_for_download_completion(std::move(wait_handler));
|
||||
|
|
106
src/rpc.cpp
106
src/rpc.cpp
|
@ -82,13 +82,30 @@ RPCServer*& get_rpc_server(JSGlobalContextRef ctx) {
|
|||
}
|
||||
}
|
||||
|
||||
#ifdef __APPLE__
|
||||
void runLoopFunc(CFRunLoopRef loop, RPCWorker* rpcWorker) {
|
||||
auto m_stop = false;
|
||||
CFRunLoopPerformBlock(loop, kCFRunLoopDefaultMode,
|
||||
^{
|
||||
rpcWorker->try_run_task();
|
||||
if (rpcWorker->should_stop()) {
|
||||
CFRunLoopStop(CFRunLoopGetCurrent());
|
||||
} else {
|
||||
runLoopFunc(loop, rpcWorker);
|
||||
}
|
||||
});
|
||||
CFRunLoopWakeUp(loop);
|
||||
}
|
||||
#endif
|
||||
|
||||
RPCWorker::RPCWorker() {
|
||||
m_thread = std::thread([this]() {
|
||||
// TODO: Create ALooper/CFRunLoop to support async calls.
|
||||
while (!m_stop) {
|
||||
try_run_task();
|
||||
}
|
||||
});
|
||||
#ifdef __APPLE__
|
||||
m_thread = std::thread([this]() {
|
||||
m_loop = CFRunLoopGetCurrent();
|
||||
runLoopFunc(m_loop, this);
|
||||
CFRunLoopRun();
|
||||
});
|
||||
#endif
|
||||
}
|
||||
|
||||
RPCWorker::~RPCWorker() {
|
||||
|
@ -107,29 +124,53 @@ json RPCWorker::pop_task_result() {
|
|||
return future.get();
|
||||
}
|
||||
|
||||
void RPCWorker::try_run_task() {
|
||||
json RPCWorker::try_pop_task_result() {
|
||||
// This might block until a future has been added.
|
||||
auto future = m_futures.try_pop_back(0);
|
||||
if (!future) {
|
||||
return json::object();
|
||||
}
|
||||
// This will block until a return value (or exception) is available.
|
||||
return (*future).get();
|
||||
}
|
||||
|
||||
bool RPCWorker::try_run_task() {
|
||||
if (m_stop) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Use a 10 millisecond timeout to keep this thread unblocked.
|
||||
auto task = m_tasks.try_pop_back(10);
|
||||
if (!task) {
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
(*task)();
|
||||
|
||||
// Since this can be called recursively, it must be pushed to the front of the queue *after* running the task.
|
||||
m_futures.push_front(task->get_future());
|
||||
|
||||
return m_stop;
|
||||
}
|
||||
|
||||
bool RPCWorker::should_stop() {
|
||||
return m_stop;
|
||||
}
|
||||
|
||||
void RPCWorker::stop() {
|
||||
if (!m_stop) {
|
||||
m_stop = true;
|
||||
#if __APPLE__
|
||||
m_thread.join();
|
||||
m_loop = nullptr;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
RPCServer::RPCServer() {
|
||||
m_context = JSGlobalContextCreate(NULL);
|
||||
get_rpc_server(m_context) = this;
|
||||
m_callback_call_counter = 1;
|
||||
|
||||
// JavaScriptCore crashes when trying to walk up the native stack to print the stacktrace.
|
||||
// FIXME: Avoid having to do this!
|
||||
|
@ -307,6 +348,7 @@ void RPCServer::run_callback(JSContextRef ctx, JSObjectRef function, JSObjectRef
|
|||
return;
|
||||
}
|
||||
|
||||
u_int64_t counter = server->m_callback_call_counter++;
|
||||
// The first argument was curried to be the callback id.
|
||||
RPCObjectID callback_id = server->m_callback_ids[function];
|
||||
JSObjectRef arguments_array = jsc::Object::create_array(ctx, uint32_t(argc), arguments);
|
||||
|
@ -320,26 +362,46 @@ void RPCServer::run_callback(JSContextRef ctx, JSObjectRef function, JSObjectRef
|
|||
{"callback", callback_id},
|
||||
{"this", this_json},
|
||||
{"arguments", arguments_json},
|
||||
{"callback_call_counter", counter}
|
||||
};
|
||||
});
|
||||
|
||||
// Wait for the next callback result to come off the result stack.
|
||||
while (server->m_callback_results.empty()) {
|
||||
// This may recursively bring us into another callback, hence the callback results being a stack.
|
||||
server->m_worker.try_run_task();
|
||||
// Wait for this callback call result to come off the result stack.
|
||||
json callbackResult = nullptr;
|
||||
while (callbackResult == nullptr) {
|
||||
callbackResult = server->m_callback_results.pop_if([&](json result) {
|
||||
auto resultCallbackId = result["callback"].get<u_int64_t>();
|
||||
auto resultCallbackCounter = result["callback_call_counter"].get<u_int64_t>();
|
||||
if (resultCallbackId == callback_id && resultCallbackCounter == counter) {
|
||||
return true;
|
||||
}
|
||||
else {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
|
||||
if (callbackResult == nullptr) {
|
||||
server->m_worker.try_run_task();
|
||||
}
|
||||
}
|
||||
|
||||
json results = server->m_callback_results.pop_back();
|
||||
json results = callbackResult;
|
||||
json error = results["error"];
|
||||
|
||||
auto resultCallbackId = results["callback"];
|
||||
if (resultCallbackId.is_null()) {
|
||||
|
||||
}
|
||||
// The callback id should be identical!
|
||||
assert(callback_id == results["callback"].get<RPCObjectID>());
|
||||
assert(callback_id == resultCallbackId.get<RPCObjectID>());
|
||||
|
||||
if (!error.is_null()) {
|
||||
throw jsc::Exception(ctx, error.get<std::string>());
|
||||
}
|
||||
|
||||
|
||||
return_value.set(server->deserialize_json_value(results["result"]));
|
||||
|
||||
}
|
||||
|
||||
json RPCServer::perform_request(std::string name, const json &args) {
|
||||
|
@ -355,6 +417,15 @@ json RPCServer::perform_request(std::string name, const json &args) {
|
|||
json results(args);
|
||||
m_callback_results.push_back(std::move(results));
|
||||
}
|
||||
else if (name == "/callback_poll_result") {
|
||||
json results(args);
|
||||
m_callback_results.push_back(std::move(results));
|
||||
return json::object();
|
||||
}
|
||||
else if (name == "/callbacks_poll") {
|
||||
auto result = m_worker.try_pop_task_result();
|
||||
return result;
|
||||
}
|
||||
else {
|
||||
RPCRequest action = m_requests[name];
|
||||
assert(action);
|
||||
|
@ -382,12 +453,17 @@ json RPCServer::perform_request(std::string name, const json &args) {
|
|||
|
||||
try {
|
||||
// This will either be the return value (or exception) of the action perform, OR an instruction to run a callback.
|
||||
return m_worker.pop_task_result();
|
||||
auto result = m_worker.pop_task_result();
|
||||
return result;
|
||||
} catch (std::exception &exception) {
|
||||
return {{"error", exception.what()}};
|
||||
}
|
||||
}
|
||||
|
||||
bool RPCServer::try_run_task() {
|
||||
return m_worker.try_run_task();
|
||||
}
|
||||
|
||||
RPCObjectID RPCServer::store_object(JSObjectRef object) {
|
||||
static RPCObjectID s_next_id = 1;
|
||||
|
||||
|
|
10
src/rpc.hpp
10
src/rpc.hpp
|
@ -45,12 +45,17 @@ class RPCWorker {
|
|||
|
||||
void add_task(std::function<json()>);
|
||||
json pop_task_result();
|
||||
void try_run_task();
|
||||
bool try_run_task();
|
||||
void stop();
|
||||
json try_pop_task_result();
|
||||
bool should_stop();
|
||||
|
||||
private:
|
||||
bool m_stop = false;
|
||||
#if __APPLE__
|
||||
std::thread m_thread;
|
||||
CFRunLoopRef m_loop;
|
||||
#endif
|
||||
ConcurrentDeque<std::packaged_task<json()>> m_tasks;
|
||||
ConcurrentDeque<std::future<json>> m_futures;
|
||||
};
|
||||
|
@ -60,6 +65,8 @@ class RPCServer {
|
|||
RPCServer();
|
||||
~RPCServer();
|
||||
json perform_request(std::string name, const json &args);
|
||||
bool try_run_task();
|
||||
|
||||
|
||||
private:
|
||||
JSGlobalContextRef m_context;
|
||||
|
@ -74,6 +81,7 @@ class RPCServer {
|
|||
ConcurrentDeque<json> m_callback_results;
|
||||
RPCObjectID m_session_id;
|
||||
RPCWorker m_worker;
|
||||
u_int64_t m_callback_call_counter;
|
||||
|
||||
static void run_callback(JSContextRef, JSObjectRef, JSObjectRef, size_t, const JSValueRef[], jsc::ReturnValue &);
|
||||
|
||||
|
|
Loading…
Reference in New Issue