From 32a8a2ed72d7bcdbe2f9fdccf7bad78e51c05bf1 Mon Sep 17 00:00:00 2001 From: Scott Kyle Date: Fri, 29 Apr 2016 13:01:01 -0700 Subject: [PATCH] Make migration tests work in Chrome debug mode All realm operations for the RPC are now done in their own thread so it can be allowed to block on waiting for callback results. The recursive and multi-threaded nature of this makes it pretty tricky, but it works! --- lib/browser/index.js | 39 ++++-- lib/browser/rpc.js | 26 +++- .../java/io/realm/react/RealmReactModule.java | 28 +--- react-native/ios/RealmReact/RealmReact.mm | 27 ++-- src/concurrent_deque.hpp | 90 +++++++++++++ src/ios/RealmJS.xcodeproj/project.pbxproj | 2 + src/js_types.hpp | 2 +- src/rpc.cpp | 125 ++++++++++++++++-- src/rpc.hpp | 29 +++- tests/js/asserts.js | 2 +- tests/js/migration-tests.js | 2 +- 11 files changed, 306 insertions(+), 66 deletions(-) create mode 100644 src/concurrent_deque.hpp diff --git a/lib/browser/index.js b/lib/browser/index.js index 583d897c..8248b6ca 100644 --- a/lib/browser/index.js +++ b/lib/browser/index.js @@ -33,6 +33,30 @@ const listenersKey = Symbol(); rpc.registerTypeConverter(objectTypes.LIST, createList); rpc.registerTypeConverter(objectTypes.RESULTS, createResults); rpc.registerTypeConverter(objectTypes.OBJECT, createObject); +rpc.registerTypeConverter(objectTypes.REALM, createRealm); + +function createRealm(_, info) { + let realm = Object.create(Realm.prototype); + + setupRealm(realm, info.id); + return realm; +} + +function setupRealm(realm, realmId) { + realm[keys.id] = realmId; + realm[keys.realm] = realmId; + realm[keys.type] = objectTypes.REALM; + realm[listenersKey] = new Set(); + + [ + 'path', + 'readOnly', + 'schema', + 'schemaVersion', + ].forEach((name) => { + Object.defineProperty(realm, name, {get: util.getterForProperty(name)}); + }); +} export default class Realm { constructor(config) { @@ -63,20 +87,7 @@ export default class Realm { let realmId = rpc.createRealm(Array.from(arguments)); registerConstructors(realmId, constructors); - - this[keys.id] = realmId; - this[keys.realm] = realmId; - this[keys.type] = objectTypes.REALM; - this[listenersKey] = new Set(); - - [ - 'path', - 'readOnly', - 'schema', - 'schemaVersion', - ].forEach((name) => { - Object.defineProperty(this, name, {get: util.getterForProperty(name)}); - }); + setupRealm(this, realmId); } create(type, ...args) { diff --git a/lib/browser/rpc.js b/lib/browser/rpc.js index 8e729076..7e96784b 100644 --- a/lib/browser/rpc.js +++ b/lib/browser/rpc.js @@ -22,6 +22,7 @@ import * as base64 from './base64'; import { keys, objectTypes } from './constants'; const {id: idKey, realm: realmKey} = keys; +const registeredCallbacks = []; const typeConverters = {}; let XMLHttpRequest = global.originalXMLHttpRequest || global.XMLHttpRequest; @@ -92,6 +93,14 @@ export function commitTransaction(realmId) { export function clearTestState() { sendRequest('clear_test_state'); + + // Clear all registered callbacks. + registeredCallbacks.length = 0; +} + +function registerCallback(callback) { + let key = registeredCallbacks.indexOf(callback); + return key >= 0 ? key : (registeredCallbacks.push(callback) - 1); } function serialize(realmId, value) { @@ -99,7 +108,7 @@ function serialize(realmId, value) { return {type: objectTypes.UNDEFINED}; } if (typeof value == 'function') { - return {type: objectTypes.FUNCTION}; + return {type: objectTypes.FUNCTION, value: registerCallback(value)}; } if (!value || typeof value != 'object') { return {value: value}; @@ -189,5 +198,20 @@ function sendRequest(command, data, host = sessionHost) { throw new Error(error || `Invalid response for "${command}"`); } + let callback = response.callback; + if (callback != null) { + let result; + let error; + try { + let realmId = data.realmId; + let args = deserialize(realmId, response.arguments); + result = registeredCallbacks[callback].apply(null, args); + result = serialize(realmId, result); + } catch (e) { + error = e.message || ('' + e); + } + return sendRequest('callback_result', {callback, result, error}); + } + return response.result; } diff --git a/react-native/android/src/main/java/io/realm/react/RealmReactModule.java b/react-native/android/src/main/java/io/realm/react/RealmReactModule.java index 4d337213..f09f3dad 100644 --- a/react-native/android/src/main/java/io/realm/react/RealmReactModule.java +++ b/react-native/android/src/main/java/io/realm/react/RealmReactModule.java @@ -1,7 +1,5 @@ package io.realm.react; -import android.os.Handler; -import android.os.Looper; import android.util.Log; import com.facebook.react.bridge.ReactApplicationContext; @@ -19,7 +17,6 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import fi.iki.elonen.NanoHTTPD; @@ -28,7 +25,6 @@ public class RealmReactModule extends ReactContextBaseJavaModule { private static boolean sentAnalytics = false; private AndroidWebServer webServer; - private Handler handler = new Handler(Looper.getMainLooper()); static { SoLoader.loadLibrary("realmreact"); @@ -156,25 +152,11 @@ public class RealmReactModule extends ReactContextBaseJavaModule { e.printStackTrace(); } final String json = map.get("postData"); - final String[] jsonResponse = new String[1]; - final CountDownLatch latch = new CountDownLatch(1); - // Process the command on the UI thread - handler.post(new Runnable() { - @Override - public void run() { - jsonResponse[0] = processChromeDebugCommand(cmdUri, json); - latch.countDown(); - } - }); - try { - latch.await(); - Response response = newFixedLengthResponse(jsonResponse[0]); - response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081"); - return response; - } catch (InterruptedException e) { - e.printStackTrace(); - return null; - } + final String jsonResponse = processChromeDebugCommand(cmdUri, json); + + Response response = newFixedLengthResponse(jsonResponse); + response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081"); + return response; } } diff --git a/react-native/ios/RealmReact/RealmReact.mm b/react-native/ios/RealmReact/RealmReact.mm index b5b850b6..7381bddc 100644 --- a/react-native/ios/RealmReact/RealmReact.mm +++ b/react-native/ios/RealmReact/RealmReact.mm @@ -213,23 +213,24 @@ RCT_REMAP_METHOD(emit, emitEvent:(NSString *)eventName withObject:(id)object) { [_webServer addDefaultHandlerForMethod:@"POST" requestClass:[GCDWebServerDataRequest class] processBlock:^GCDWebServerResponse *(GCDWebServerRequest* request) { + __typeof__(self) self = weakSelf; + RPCServer *rpcServer = self ? self->_rpcServer.get() : nullptr; GCDWebServerResponse *response; + try { - // perform all realm ops on the main thread - __block NSData *responseData; - dispatch_sync(dispatch_get_main_queue(), ^{ - RealmReact *self = weakSelf; - if (self) { - if (_rpcServer) { - json args = json::parse([[(GCDWebServerDataRequest *)request text] UTF8String]); - std::string responseText = _rpcServer->perform_request(request.path.UTF8String, args).dump(); - responseData = [NSData dataWithBytes:responseText.c_str() length:responseText.length()]; - return; - } - } + NSData *responseData; + + if (rpcServer) { + json args = json::parse([[(GCDWebServerDataRequest *)request text] UTF8String]); + std::string responseText = rpcServer->perform_request(request.path.UTF8String, args).dump(); + + responseData = [NSData dataWithBytes:responseText.c_str() length:responseText.length()]; + } + else { // we have been deallocated responseData = [NSData data]; - }); + } + response = [[GCDWebServerDataResponse alloc] initWithData:responseData contentType:@"application/json"]; } catch(std::exception &ex) { diff --git a/src/concurrent_deque.hpp b/src/concurrent_deque.hpp new file mode 100644 index 00000000..4c05b372 --- /dev/null +++ b/src/concurrent_deque.hpp @@ -0,0 +1,90 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2016 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include +#include +#include +#include + +namespace realm { + +class ConcurrentDequeTimeout : public std::exception { + public: + ConcurrentDequeTimeout() : std::exception() {} +}; + +template +class ConcurrentDeque { + public: + T pop_front(size_t timeout = 0) { + std::unique_lock lock(m_mutex); + while (m_deque.empty()) { + wait(lock, timeout); + } + T item = std::move(m_deque.front()); + m_deque.pop_front(); + return item; + } + + T pop_back(size_t timeout = 0) { + std::unique_lock lock(m_mutex); + while (m_deque.empty()) { + wait(lock, timeout); + } + T item = std::move(m_deque.back()); + m_deque.pop_back(); + return item; + } + + void push_front(T&& item) { + std::unique_lock lock(m_mutex); + m_deque.push_front(std::move(item)); + lock.unlock(); + m_condition.notify_one(); + } + + void push_back(T&& item) { + std::unique_lock lock(m_mutex); + m_deque.push_back(std::move(item)); + lock.unlock(); + m_condition.notify_one(); + } + + bool empty() { + std::lock_guard lock(m_mutex); + return m_deque.empty(); + } + + void wait(std::unique_lock &lock, size_t timeout = 0) { + if (!timeout) { + m_condition.wait(lock); + } + else if (m_condition.wait_for(lock, std::chrono::milliseconds(timeout)) == std::cv_status::timeout) { + throw ConcurrentDequeTimeout(); + } + } + + private: + std::condition_variable m_condition; + std::mutex m_mutex; + std::deque m_deque; +}; + +} // realm diff --git a/src/ios/RealmJS.xcodeproj/project.pbxproj b/src/ios/RealmJS.xcodeproj/project.pbxproj index 0da03000..338dbdeb 100644 --- a/src/ios/RealmJS.xcodeproj/project.pbxproj +++ b/src/ios/RealmJS.xcodeproj/project.pbxproj @@ -157,6 +157,7 @@ F60103141CC4CC8C00EC01BA /* jsc_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_return_value.hpp; sourceTree = ""; }; F60103151CC4CCFD00EC01BA /* node_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_return_value.hpp; sourceTree = ""; }; F60103161CC4CD2F00EC01BA /* node_string.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_string.hpp; sourceTree = ""; }; + F6079B181CD3EB9000BD2401 /* concurrent_deque.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = concurrent_deque.hpp; sourceTree = ""; }; F61378781C18EAAC008BFC51 /* js */ = {isa = PBXFileReference; lastKnownFileType = folder; path = js; sourceTree = ""; }; F620F0521CAF0B600082977B /* js_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = js_class.hpp; sourceTree = ""; }; F620F0531CAF2EF70082977B /* jsc_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_class.hpp; sourceTree = ""; }; @@ -259,6 +260,7 @@ 029048351C042A3C00ABDED4 /* platform.hpp */, 0290480F1C0428DF00ABDED4 /* rpc.cpp */, 029048101C0428DF00ABDED4 /* rpc.hpp */, + F6079B181CD3EB9000BD2401 /* concurrent_deque.hpp */, ); name = RealmJS; path = ..; diff --git a/src/js_types.hpp b/src/js_types.hpp index 3a3f967d..79b6486d 100644 --- a/src/js_types.hpp +++ b/src/js_types.hpp @@ -253,7 +253,7 @@ struct Exception : public std::runtime_error { const Protected m_value; Exception(ContextType ctx, const std::string &message) - : std::runtime_error(message), m_value(value(ctx, message)) {} + : std::runtime_error(message), m_value(ctx, value(ctx, message)) {} Exception(ContextType ctx, const ValueType &val) : std::runtime_error(std::string(Value::to_string(ctx, val))), m_value(ctx, val) {} diff --git a/src/rpc.cpp b/src/rpc.cpp index 2599df1d..28246d98 100644 --- a/src/rpc.cpp +++ b/src/rpc.cpp @@ -22,9 +22,7 @@ #include #include "rpc.hpp" - #include "jsc_init.hpp" -#include "jsc_types.hpp" #include "base64.hpp" #include "object_accessor.hpp" @@ -43,10 +41,54 @@ static const char * const RealmObjectTypesFunction = "function"; static const char * const RealmObjectTypesList = "list"; static const char * const RealmObjectTypesObject = "object"; static const char * const RealmObjectTypesResults = "results"; +static const char * const RealmObjectTypesRealm = "realm"; static const char * const RealmObjectTypesUndefined = "undefined"; +static RPCServer*& get_rpc_server(JSGlobalContextRef ctx) { + static std::map s_map; + return s_map[ctx]; +} + +RPCWorker::RPCWorker() { + m_thread = std::thread([this]() { + // TODO: Create ALooper/CFRunLoop to support async calls. + while (!m_stop) { + try_run_task(); + } + }); +} + +RPCWorker::~RPCWorker() { + m_stop = true; + m_thread.join(); +} + +void RPCWorker::add_task(std::function task) { + m_tasks.push_back(std::packaged_task(task)); +} + +json RPCWorker::pop_task_result() { + auto future = m_futures.pop_back(); + return future.get(); +} + +void RPCWorker::try_run_task() { + try { + // Use a 10 millisecond timeout to keep this thread unblocked. + auto task = m_tasks.pop_back(10); + task(); + + // Since this can be called recursively, it must be pushed to the front of the queue *after* running the task. + m_futures.push_front(task.get_future()); + } + catch (ConcurrentDequeTimeout &) { + // We tried. + } +} + RPCServer::RPCServer() { m_context = JSGlobalContextCreate(NULL); + get_rpc_server(m_context) = this; // JavaScriptCore crashes when trying to walk up the native stack to print the stacktrace. // FIXME: Avoid having to do this! @@ -173,20 +215,71 @@ RPCServer::~RPCServer() { JSValueUnprotect(m_context, item.second); } + get_rpc_server(m_context) = nullptr; JSGlobalContextRelease(m_context); } -json RPCServer::perform_request(std::string name, json &args) { - try { - RPCRequest action = m_requests[name]; - assert(action); +void RPCServer::run_callback(JSContextRef ctx, JSObjectRef this_object, size_t argc, const JSValueRef arguments[], jsc::ReturnValue &return_value) { + RPCServer* server = get_rpc_server(JSContextGetGlobalContext(ctx)); + if (!server) { + return; + } - if (name == "/create_session" || m_session_id == args["sessionId"].get()) { - return action(args); + // The first argument was curried to be the callback id. + RPCObjectID callback_id = jsc::Value::to_number(ctx, arguments[0]); + JSObjectRef arguments_array = jsc::Object::create_array(ctx, argc - 1, argc == 1 ? nullptr : arguments + 1); + json arguments_json = server->serialize_json_value(arguments_array); + + // The next task on the stack will instruct the JS to run this callback. + server->m_worker.add_task([&]() -> json { + return { + {"callback", callback_id}, + {"arguments", arguments_json}, + }; + }); + + // Wait for the next callback result to come off the result stack. + while (server->m_callback_results.empty()) { + // This may recursively bring us into another callback, hence the callback results being a stack. + server->m_worker.try_run_task(); + } + + json results = server->m_callback_results.pop_back(); + json error = results["error"]; + + // The callback id should be identical! + assert(callback_id == results["callback"].get()); + + if (!error.is_null()) { + throw jsc::Exception(ctx, error.get()); + } + + return_value.set(server->deserialize_json_value(results["result"])); +} + +json RPCServer::perform_request(std::string name, const json &args) { + std::lock_guard lock(m_request_mutex); + + if (name == "/create_session" || m_session_id == args["sessionId"].get()) { + if (name == "/callback_result") { + json results(args); + m_callback_results.push_back(std::move(results)); } else { - return {{"error", "Invalid session ID"}}; + RPCRequest action = m_requests[name]; + assert(action); + + m_worker.add_task([=] { + return action(args); + }); } + } + else { + return {{"error", "Invalid session ID"}}; + } + + try { + return m_worker.pop_task_result(); } catch (std::exception &exception) { return {{"error", exception.what()}}; } @@ -244,6 +337,12 @@ json RPCServer::serialize_json_value(JSValueRef js_value) { {"schema", serialize_object_schema(results->get_object_schema())} }; } + else if (jsc::Object::is_instance>(m_context, js_object)) { + return { + {"type", RealmObjectTypesRealm}, + {"id", store_object(js_object)}, + }; + } else if (jsc::Value::is_array(m_context, js_object)) { uint32_t length = jsc::Object::validated_get_length(m_context, js_object); std::vector array; @@ -313,8 +412,12 @@ JSValueRef RPCServer::deserialize_json_value(const json dict) { std::string type_string = type.get(); if (type_string == RealmObjectTypesFunction) { - // FIXME: Make this actually call the function by its id once we need it to. - return JSObjectMakeFunction(m_context, NULL, 0, NULL, jsc::String(""), NULL, 1, NULL); + RPCObjectID callback_id = value.get(); + JSObjectRef callback = JSObjectMakeFunctionWithCallback(m_context, nullptr, js::wrap); + + // Curry the first argument to be the callback id. + JSValueRef bind_args[2] = {jsc::Value::from_null(m_context), jsc::Value::from_number(m_context, callback_id)}; + return jsc::Object::call_method(m_context, callback, "bind", 2, bind_args); } else if (type_string == RealmObjectTypesDictionary) { JSObjectRef js_object = jsc::Object::create_empty(m_context); diff --git a/src/rpc.hpp b/src/rpc.hpp index ae015c5c..260b8e4f 100644 --- a/src/rpc.hpp +++ b/src/rpc.hpp @@ -18,6 +18,11 @@ #pragma once +#include +#include +#include + +#include "concurrent_deque.hpp" #include "json.hpp" #include "jsc_types.hpp" @@ -28,20 +33,42 @@ class ObjectSchema; namespace rpc { using json = nlohmann::json; + using RPCObjectID = u_int64_t; using RPCRequest = std::function; +class RPCWorker { + public: + RPCWorker(); + ~RPCWorker(); + + void add_task(std::function); + json pop_task_result(); + void try_run_task(); + + private: + bool m_stop = false; + std::thread m_thread; + ConcurrentDeque> m_tasks; + ConcurrentDeque> m_futures; +}; + class RPCServer { public: RPCServer(); ~RPCServer(); - json perform_request(std::string name, json &args); + json perform_request(std::string name, const json &args); private: JSGlobalContextRef m_context; + std::mutex m_request_mutex; std::map m_requests; std::map m_objects; + ConcurrentDeque m_callback_results; RPCObjectID m_session_id; + RPCWorker m_worker; + + static void run_callback(JSContextRef, JSObjectRef, size_t, const JSValueRef[], jsc::ReturnValue &); RPCObjectID store_object(JSObjectRef object); diff --git a/tests/js/asserts.js b/tests/js/asserts.js index a2aece10..75ec6c31 100644 --- a/tests/js/asserts.js +++ b/tests/js/asserts.js @@ -94,7 +94,7 @@ module.exports = { } catch (e) { caught = true; - if (e != expectedException) { + if (e.message != expectedException.message) { throw new TestFailureError('Expected exception "' + expectedException + '" not thrown - instead caught: "' + e + '"'); } } diff --git a/tests/js/migration-tests.js b/tests/js/migration-tests.js index 366ac7f0..58c65535 100644 --- a/tests/js/migration-tests.js +++ b/tests/js/migration-tests.js @@ -48,7 +48,7 @@ module.exports = BaseTest.extend({ }); // migration function exceptions should propogate - var exception = 'expected exception'; + var exception = new Error('expected exception'); realm = undefined; TestCase.assertThrowsException(function() { realm = new Realm({schema: [], schemaVersion: 3, migration: function() {