diff --git a/lib/browser/index.js b/lib/browser/index.js index 583d897c..8248b6ca 100644 --- a/lib/browser/index.js +++ b/lib/browser/index.js @@ -33,6 +33,30 @@ const listenersKey = Symbol(); rpc.registerTypeConverter(objectTypes.LIST, createList); rpc.registerTypeConverter(objectTypes.RESULTS, createResults); rpc.registerTypeConverter(objectTypes.OBJECT, createObject); +rpc.registerTypeConverter(objectTypes.REALM, createRealm); + +function createRealm(_, info) { + let realm = Object.create(Realm.prototype); + + setupRealm(realm, info.id); + return realm; +} + +function setupRealm(realm, realmId) { + realm[keys.id] = realmId; + realm[keys.realm] = realmId; + realm[keys.type] = objectTypes.REALM; + realm[listenersKey] = new Set(); + + [ + 'path', + 'readOnly', + 'schema', + 'schemaVersion', + ].forEach((name) => { + Object.defineProperty(realm, name, {get: util.getterForProperty(name)}); + }); +} export default class Realm { constructor(config) { @@ -63,20 +87,7 @@ export default class Realm { let realmId = rpc.createRealm(Array.from(arguments)); registerConstructors(realmId, constructors); - - this[keys.id] = realmId; - this[keys.realm] = realmId; - this[keys.type] = objectTypes.REALM; - this[listenersKey] = new Set(); - - [ - 'path', - 'readOnly', - 'schema', - 'schemaVersion', - ].forEach((name) => { - Object.defineProperty(this, name, {get: util.getterForProperty(name)}); - }); + setupRealm(this, realmId); } create(type, ...args) { diff --git a/lib/browser/rpc.js b/lib/browser/rpc.js index 8e729076..7e96784b 100644 --- a/lib/browser/rpc.js +++ b/lib/browser/rpc.js @@ -22,6 +22,7 @@ import * as base64 from './base64'; import { keys, objectTypes } from './constants'; const {id: idKey, realm: realmKey} = keys; +const registeredCallbacks = []; const typeConverters = {}; let XMLHttpRequest = global.originalXMLHttpRequest || global.XMLHttpRequest; @@ -92,6 +93,14 @@ export function commitTransaction(realmId) { export function clearTestState() { sendRequest('clear_test_state'); + + // Clear all registered callbacks. + registeredCallbacks.length = 0; +} + +function registerCallback(callback) { + let key = registeredCallbacks.indexOf(callback); + return key >= 0 ? key : (registeredCallbacks.push(callback) - 1); } function serialize(realmId, value) { @@ -99,7 +108,7 @@ function serialize(realmId, value) { return {type: objectTypes.UNDEFINED}; } if (typeof value == 'function') { - return {type: objectTypes.FUNCTION}; + return {type: objectTypes.FUNCTION, value: registerCallback(value)}; } if (!value || typeof value != 'object') { return {value: value}; @@ -189,5 +198,20 @@ function sendRequest(command, data, host = sessionHost) { throw new Error(error || `Invalid response for "${command}"`); } + let callback = response.callback; + if (callback != null) { + let result; + let error; + try { + let realmId = data.realmId; + let args = deserialize(realmId, response.arguments); + result = registeredCallbacks[callback].apply(null, args); + result = serialize(realmId, result); + } catch (e) { + error = e.message || ('' + e); + } + return sendRequest('callback_result', {callback, result, error}); + } + return response.result; } diff --git a/react-native/android/src/main/java/io/realm/react/RealmReactModule.java b/react-native/android/src/main/java/io/realm/react/RealmReactModule.java index 4d337213..f09f3dad 100644 --- a/react-native/android/src/main/java/io/realm/react/RealmReactModule.java +++ b/react-native/android/src/main/java/io/realm/react/RealmReactModule.java @@ -1,7 +1,5 @@ package io.realm.react; -import android.os.Handler; -import android.os.Looper; import android.util.Log; import com.facebook.react.bridge.ReactApplicationContext; @@ -19,7 +17,6 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import fi.iki.elonen.NanoHTTPD; @@ -28,7 +25,6 @@ public class RealmReactModule extends ReactContextBaseJavaModule { private static boolean sentAnalytics = false; private AndroidWebServer webServer; - private Handler handler = new Handler(Looper.getMainLooper()); static { SoLoader.loadLibrary("realmreact"); @@ -156,25 +152,11 @@ public class RealmReactModule extends ReactContextBaseJavaModule { e.printStackTrace(); } final String json = map.get("postData"); - final String[] jsonResponse = new String[1]; - final CountDownLatch latch = new CountDownLatch(1); - // Process the command on the UI thread - handler.post(new Runnable() { - @Override - public void run() { - jsonResponse[0] = processChromeDebugCommand(cmdUri, json); - latch.countDown(); - } - }); - try { - latch.await(); - Response response = newFixedLengthResponse(jsonResponse[0]); - response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081"); - return response; - } catch (InterruptedException e) { - e.printStackTrace(); - return null; - } + final String jsonResponse = processChromeDebugCommand(cmdUri, json); + + Response response = newFixedLengthResponse(jsonResponse); + response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081"); + return response; } } diff --git a/react-native/ios/RealmReact/RealmReact.mm b/react-native/ios/RealmReact/RealmReact.mm index b5b850b6..7381bddc 100644 --- a/react-native/ios/RealmReact/RealmReact.mm +++ b/react-native/ios/RealmReact/RealmReact.mm @@ -213,23 +213,24 @@ RCT_REMAP_METHOD(emit, emitEvent:(NSString *)eventName withObject:(id)object) { [_webServer addDefaultHandlerForMethod:@"POST" requestClass:[GCDWebServerDataRequest class] processBlock:^GCDWebServerResponse *(GCDWebServerRequest* request) { + __typeof__(self) self = weakSelf; + RPCServer *rpcServer = self ? self->_rpcServer.get() : nullptr; GCDWebServerResponse *response; + try { - // perform all realm ops on the main thread - __block NSData *responseData; - dispatch_sync(dispatch_get_main_queue(), ^{ - RealmReact *self = weakSelf; - if (self) { - if (_rpcServer) { - json args = json::parse([[(GCDWebServerDataRequest *)request text] UTF8String]); - std::string responseText = _rpcServer->perform_request(request.path.UTF8String, args).dump(); - responseData = [NSData dataWithBytes:responseText.c_str() length:responseText.length()]; - return; - } - } + NSData *responseData; + + if (rpcServer) { + json args = json::parse([[(GCDWebServerDataRequest *)request text] UTF8String]); + std::string responseText = rpcServer->perform_request(request.path.UTF8String, args).dump(); + + responseData = [NSData dataWithBytes:responseText.c_str() length:responseText.length()]; + } + else { // we have been deallocated responseData = [NSData data]; - }); + } + response = [[GCDWebServerDataResponse alloc] initWithData:responseData contentType:@"application/json"]; } catch(std::exception &ex) { diff --git a/src/concurrent_deque.hpp b/src/concurrent_deque.hpp new file mode 100644 index 00000000..4c05b372 --- /dev/null +++ b/src/concurrent_deque.hpp @@ -0,0 +1,90 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2016 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include +#include +#include +#include + +namespace realm { + +class ConcurrentDequeTimeout : public std::exception { + public: + ConcurrentDequeTimeout() : std::exception() {} +}; + +template +class ConcurrentDeque { + public: + T pop_front(size_t timeout = 0) { + std::unique_lock lock(m_mutex); + while (m_deque.empty()) { + wait(lock, timeout); + } + T item = std::move(m_deque.front()); + m_deque.pop_front(); + return item; + } + + T pop_back(size_t timeout = 0) { + std::unique_lock lock(m_mutex); + while (m_deque.empty()) { + wait(lock, timeout); + } + T item = std::move(m_deque.back()); + m_deque.pop_back(); + return item; + } + + void push_front(T&& item) { + std::unique_lock lock(m_mutex); + m_deque.push_front(std::move(item)); + lock.unlock(); + m_condition.notify_one(); + } + + void push_back(T&& item) { + std::unique_lock lock(m_mutex); + m_deque.push_back(std::move(item)); + lock.unlock(); + m_condition.notify_one(); + } + + bool empty() { + std::lock_guard lock(m_mutex); + return m_deque.empty(); + } + + void wait(std::unique_lock &lock, size_t timeout = 0) { + if (!timeout) { + m_condition.wait(lock); + } + else if (m_condition.wait_for(lock, std::chrono::milliseconds(timeout)) == std::cv_status::timeout) { + throw ConcurrentDequeTimeout(); + } + } + + private: + std::condition_variable m_condition; + std::mutex m_mutex; + std::deque m_deque; +}; + +} // realm diff --git a/src/ios/RealmJS.xcodeproj/project.pbxproj b/src/ios/RealmJS.xcodeproj/project.pbxproj index 0da03000..338dbdeb 100644 --- a/src/ios/RealmJS.xcodeproj/project.pbxproj +++ b/src/ios/RealmJS.xcodeproj/project.pbxproj @@ -157,6 +157,7 @@ F60103141CC4CC8C00EC01BA /* jsc_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_return_value.hpp; sourceTree = ""; }; F60103151CC4CCFD00EC01BA /* node_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_return_value.hpp; sourceTree = ""; }; F60103161CC4CD2F00EC01BA /* node_string.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_string.hpp; sourceTree = ""; }; + F6079B181CD3EB9000BD2401 /* concurrent_deque.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = concurrent_deque.hpp; sourceTree = ""; }; F61378781C18EAAC008BFC51 /* js */ = {isa = PBXFileReference; lastKnownFileType = folder; path = js; sourceTree = ""; }; F620F0521CAF0B600082977B /* js_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = js_class.hpp; sourceTree = ""; }; F620F0531CAF2EF70082977B /* jsc_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_class.hpp; sourceTree = ""; }; @@ -259,6 +260,7 @@ 029048351C042A3C00ABDED4 /* platform.hpp */, 0290480F1C0428DF00ABDED4 /* rpc.cpp */, 029048101C0428DF00ABDED4 /* rpc.hpp */, + F6079B181CD3EB9000BD2401 /* concurrent_deque.hpp */, ); name = RealmJS; path = ..; diff --git a/src/js_types.hpp b/src/js_types.hpp index 3a3f967d..79b6486d 100644 --- a/src/js_types.hpp +++ b/src/js_types.hpp @@ -253,7 +253,7 @@ struct Exception : public std::runtime_error { const Protected m_value; Exception(ContextType ctx, const std::string &message) - : std::runtime_error(message), m_value(value(ctx, message)) {} + : std::runtime_error(message), m_value(ctx, value(ctx, message)) {} Exception(ContextType ctx, const ValueType &val) : std::runtime_error(std::string(Value::to_string(ctx, val))), m_value(ctx, val) {} diff --git a/src/rpc.cpp b/src/rpc.cpp index 2599df1d..28246d98 100644 --- a/src/rpc.cpp +++ b/src/rpc.cpp @@ -22,9 +22,7 @@ #include #include "rpc.hpp" - #include "jsc_init.hpp" -#include "jsc_types.hpp" #include "base64.hpp" #include "object_accessor.hpp" @@ -43,10 +41,54 @@ static const char * const RealmObjectTypesFunction = "function"; static const char * const RealmObjectTypesList = "list"; static const char * const RealmObjectTypesObject = "object"; static const char * const RealmObjectTypesResults = "results"; +static const char * const RealmObjectTypesRealm = "realm"; static const char * const RealmObjectTypesUndefined = "undefined"; +static RPCServer*& get_rpc_server(JSGlobalContextRef ctx) { + static std::map s_map; + return s_map[ctx]; +} + +RPCWorker::RPCWorker() { + m_thread = std::thread([this]() { + // TODO: Create ALooper/CFRunLoop to support async calls. + while (!m_stop) { + try_run_task(); + } + }); +} + +RPCWorker::~RPCWorker() { + m_stop = true; + m_thread.join(); +} + +void RPCWorker::add_task(std::function task) { + m_tasks.push_back(std::packaged_task(task)); +} + +json RPCWorker::pop_task_result() { + auto future = m_futures.pop_back(); + return future.get(); +} + +void RPCWorker::try_run_task() { + try { + // Use a 10 millisecond timeout to keep this thread unblocked. + auto task = m_tasks.pop_back(10); + task(); + + // Since this can be called recursively, it must be pushed to the front of the queue *after* running the task. + m_futures.push_front(task.get_future()); + } + catch (ConcurrentDequeTimeout &) { + // We tried. + } +} + RPCServer::RPCServer() { m_context = JSGlobalContextCreate(NULL); + get_rpc_server(m_context) = this; // JavaScriptCore crashes when trying to walk up the native stack to print the stacktrace. // FIXME: Avoid having to do this! @@ -173,20 +215,71 @@ RPCServer::~RPCServer() { JSValueUnprotect(m_context, item.second); } + get_rpc_server(m_context) = nullptr; JSGlobalContextRelease(m_context); } -json RPCServer::perform_request(std::string name, json &args) { - try { - RPCRequest action = m_requests[name]; - assert(action); +void RPCServer::run_callback(JSContextRef ctx, JSObjectRef this_object, size_t argc, const JSValueRef arguments[], jsc::ReturnValue &return_value) { + RPCServer* server = get_rpc_server(JSContextGetGlobalContext(ctx)); + if (!server) { + return; + } - if (name == "/create_session" || m_session_id == args["sessionId"].get()) { - return action(args); + // The first argument was curried to be the callback id. + RPCObjectID callback_id = jsc::Value::to_number(ctx, arguments[0]); + JSObjectRef arguments_array = jsc::Object::create_array(ctx, argc - 1, argc == 1 ? nullptr : arguments + 1); + json arguments_json = server->serialize_json_value(arguments_array); + + // The next task on the stack will instruct the JS to run this callback. + server->m_worker.add_task([&]() -> json { + return { + {"callback", callback_id}, + {"arguments", arguments_json}, + }; + }); + + // Wait for the next callback result to come off the result stack. + while (server->m_callback_results.empty()) { + // This may recursively bring us into another callback, hence the callback results being a stack. + server->m_worker.try_run_task(); + } + + json results = server->m_callback_results.pop_back(); + json error = results["error"]; + + // The callback id should be identical! + assert(callback_id == results["callback"].get()); + + if (!error.is_null()) { + throw jsc::Exception(ctx, error.get()); + } + + return_value.set(server->deserialize_json_value(results["result"])); +} + +json RPCServer::perform_request(std::string name, const json &args) { + std::lock_guard lock(m_request_mutex); + + if (name == "/create_session" || m_session_id == args["sessionId"].get()) { + if (name == "/callback_result") { + json results(args); + m_callback_results.push_back(std::move(results)); } else { - return {{"error", "Invalid session ID"}}; + RPCRequest action = m_requests[name]; + assert(action); + + m_worker.add_task([=] { + return action(args); + }); } + } + else { + return {{"error", "Invalid session ID"}}; + } + + try { + return m_worker.pop_task_result(); } catch (std::exception &exception) { return {{"error", exception.what()}}; } @@ -244,6 +337,12 @@ json RPCServer::serialize_json_value(JSValueRef js_value) { {"schema", serialize_object_schema(results->get_object_schema())} }; } + else if (jsc::Object::is_instance>(m_context, js_object)) { + return { + {"type", RealmObjectTypesRealm}, + {"id", store_object(js_object)}, + }; + } else if (jsc::Value::is_array(m_context, js_object)) { uint32_t length = jsc::Object::validated_get_length(m_context, js_object); std::vector array; @@ -313,8 +412,12 @@ JSValueRef RPCServer::deserialize_json_value(const json dict) { std::string type_string = type.get(); if (type_string == RealmObjectTypesFunction) { - // FIXME: Make this actually call the function by its id once we need it to. - return JSObjectMakeFunction(m_context, NULL, 0, NULL, jsc::String(""), NULL, 1, NULL); + RPCObjectID callback_id = value.get(); + JSObjectRef callback = JSObjectMakeFunctionWithCallback(m_context, nullptr, js::wrap); + + // Curry the first argument to be the callback id. + JSValueRef bind_args[2] = {jsc::Value::from_null(m_context), jsc::Value::from_number(m_context, callback_id)}; + return jsc::Object::call_method(m_context, callback, "bind", 2, bind_args); } else if (type_string == RealmObjectTypesDictionary) { JSObjectRef js_object = jsc::Object::create_empty(m_context); diff --git a/src/rpc.hpp b/src/rpc.hpp index ae015c5c..260b8e4f 100644 --- a/src/rpc.hpp +++ b/src/rpc.hpp @@ -18,6 +18,11 @@ #pragma once +#include +#include +#include + +#include "concurrent_deque.hpp" #include "json.hpp" #include "jsc_types.hpp" @@ -28,20 +33,42 @@ class ObjectSchema; namespace rpc { using json = nlohmann::json; + using RPCObjectID = u_int64_t; using RPCRequest = std::function; +class RPCWorker { + public: + RPCWorker(); + ~RPCWorker(); + + void add_task(std::function); + json pop_task_result(); + void try_run_task(); + + private: + bool m_stop = false; + std::thread m_thread; + ConcurrentDeque> m_tasks; + ConcurrentDeque> m_futures; +}; + class RPCServer { public: RPCServer(); ~RPCServer(); - json perform_request(std::string name, json &args); + json perform_request(std::string name, const json &args); private: JSGlobalContextRef m_context; + std::mutex m_request_mutex; std::map m_requests; std::map m_objects; + ConcurrentDeque m_callback_results; RPCObjectID m_session_id; + RPCWorker m_worker; + + static void run_callback(JSContextRef, JSObjectRef, size_t, const JSValueRef[], jsc::ReturnValue &); RPCObjectID store_object(JSObjectRef object); diff --git a/tests/js/asserts.js b/tests/js/asserts.js index a2aece10..75ec6c31 100644 --- a/tests/js/asserts.js +++ b/tests/js/asserts.js @@ -94,7 +94,7 @@ module.exports = { } catch (e) { caught = true; - if (e != expectedException) { + if (e.message != expectedException.message) { throw new TestFailureError('Expected exception "' + expectedException + '" not thrown - instead caught: "' + e + '"'); } } diff --git a/tests/js/migration-tests.js b/tests/js/migration-tests.js index 366ac7f0..58c65535 100644 --- a/tests/js/migration-tests.js +++ b/tests/js/migration-tests.js @@ -48,7 +48,7 @@ module.exports = BaseTest.extend({ }); // migration function exceptions should propogate - var exception = 'expected exception'; + var exception = new Error('expected exception'); realm = undefined; TestCase.assertThrowsException(function() { realm = new Realm({schema: [], schemaVersion: 3, migration: function() {