diff --git a/lib/browser/index.js b/lib/browser/index.js index 583d897c..acf7c26d 100644 --- a/lib/browser/index.js +++ b/lib/browser/index.js @@ -28,11 +28,33 @@ import * as rpc from './rpc'; import * as util from './util'; const {debugHosts, debugPort} = NativeModules.Realm; -const listenersKey = Symbol(); rpc.registerTypeConverter(objectTypes.LIST, createList); rpc.registerTypeConverter(objectTypes.RESULTS, createResults); rpc.registerTypeConverter(objectTypes.OBJECT, createObject); +rpc.registerTypeConverter(objectTypes.REALM, createRealm); + +function createRealm(_, info) { + let realm = Object.create(Realm.prototype); + + setupRealm(realm, info.id); + return realm; +} + +function setupRealm(realm, realmId) { + realm[keys.id] = realmId; + realm[keys.realm] = realmId; + realm[keys.type] = objectTypes.REALM; + + [ + 'path', + 'readOnly', + 'schema', + 'schemaVersion', + ].forEach((name) => { + Object.defineProperty(realm, name, {get: util.getterForProperty(name)}); + }); +} export default class Realm { constructor(config) { @@ -63,20 +85,7 @@ export default class Realm { let realmId = rpc.createRealm(Array.from(arguments)); registerConstructors(realmId, constructors); - - this[keys.id] = realmId; - this[keys.realm] = realmId; - this[keys.type] = objectTypes.REALM; - this[listenersKey] = new Set(); - - [ - 'path', - 'readOnly', - 'schema', - 'schemaVersion', - ].forEach((name) => { - Object.defineProperty(this, name, {get: util.getterForProperty(name)}); - }); + setupRealm(this, realmId); } create(type, ...args) { @@ -96,62 +105,13 @@ export default class Realm { let method = util.createMethod(objectTypes.REALM, 'objects'); return method.apply(this, [type, ...args]); } - - addListener(name, callback) { - if (typeof callback != 'function') { - throw new Error('Realm.addListener must be passed a function!'); - } - if (name != 'change') { - throw new Error("Only 'change' notification is supported."); - } - this[listenersKey].add(callback); - } - - removeListener(name, callback) { - if (typeof callback != 'function') { - throw new Error('Realm.removeListener must be passed a function!'); - } - if (name != 'change') { - throw new Error("Only 'change' notification is supported."); - } - this[listenersKey].delete(callback); - } - - removeAllListeners(name) { - if (name != undefined && name != 'change') { - throw new Error("Only 'change' notification is supported."); - } - this[listenersKey].clear(); - } - - write(callback) { - let realmId = this[keys.realm]; - - if (!realmId) { - throw new TypeError('write method was not called on a Realm object!'); - } - if (typeof callback != 'function') { - throw new TypeError('Realm.write() must be passed a function!'); - } - - rpc.beginTransaction(realmId); - - try { - callback(); - } catch (e) { - rpc.cancelTransaction(realmId); - collections.fireMutationListeners(realmId); - throw e; - } - - rpc.commitTransaction(realmId); - - this[listenersKey].forEach((cb) => cb(this, 'change')); - } } // Non-mutating methods: util.createMethods(Realm.prototype, objectTypes.REALM, [ + 'addListener', + 'removeListener', + 'removeAllListeners', 'close', ]); @@ -159,6 +119,7 @@ util.createMethods(Realm.prototype, objectTypes.REALM, [ util.createMethods(Realm.prototype, objectTypes.REALM, [ 'delete', 'deleteAll', + 'write', ], true); Object.defineProperties(Realm, { diff --git a/lib/browser/rpc.js b/lib/browser/rpc.js index 8e729076..4e1571e0 100644 --- a/lib/browser/rpc.js +++ b/lib/browser/rpc.js @@ -22,6 +22,7 @@ import * as base64 from './base64'; import { keys, objectTypes } from './constants'; const {id: idKey, realm: realmKey} = keys; +const registeredCallbacks = []; const typeConverters = {}; let XMLHttpRequest = global.originalXMLHttpRequest || global.XMLHttpRequest; @@ -78,20 +79,16 @@ export function setProperty(realmId, id, name, value) { sendRequest('set_property', {realmId, id, name, value}); } -export function beginTransaction(realmId) { - sendRequest('begin_transaction', {realmId}); -} - -export function cancelTransaction(realmId) { - sendRequest('cancel_transaction', {realmId}); -} - -export function commitTransaction(realmId) { - sendRequest('commit_transaction', {realmId}); -} - export function clearTestState() { sendRequest('clear_test_state'); + + // Clear all registered callbacks. + registeredCallbacks.length = 0; +} + +function registerCallback(callback) { + let key = registeredCallbacks.indexOf(callback); + return key >= 0 ? key : (registeredCallbacks.push(callback) - 1); } function serialize(realmId, value) { @@ -99,7 +96,7 @@ function serialize(realmId, value) { return {type: objectTypes.UNDEFINED}; } if (typeof value == 'function') { - return {type: objectTypes.FUNCTION}; + return {type: objectTypes.FUNCTION, value: registerCallback(value)}; } if (!value || typeof value != 'object') { return {value: value}; @@ -189,5 +186,20 @@ function sendRequest(command, data, host = sessionHost) { throw new Error(error || `Invalid response for "${command}"`); } + let callback = response.callback; + if (callback != null) { + let result; + let error; + try { + let realmId = data.realmId; + let args = deserialize(realmId, response.arguments); + result = registeredCallbacks[callback].apply(null, args); + result = serialize(realmId, result); + } catch (e) { + error = e.message || ('' + e); + } + return sendRequest('callback_result', {callback, result, error}); + } + return response.result; } diff --git a/lib/browser/util.js b/lib/browser/util.js index 2e89cdf2..6f41bfc2 100644 --- a/lib/browser/util.js +++ b/lib/browser/util.js @@ -46,13 +46,13 @@ export function createMethod(type, name, mutates) { throw new TypeError(name + ' method was called on an object of the wrong type!'); } - let result = rpc.callMethod(realmId, id, name, Array.from(arguments)); - - if (mutates) { - fireMutationListeners(realmId); + try { + return rpc.callMethod(realmId, id, name, Array.from(arguments)); + } finally { + if (mutates) { + fireMutationListeners(realmId); + } } - - return result; }; } diff --git a/react-native/android/src/main/java/io/realm/react/RealmReactModule.java b/react-native/android/src/main/java/io/realm/react/RealmReactModule.java index 4d337213..f09f3dad 100644 --- a/react-native/android/src/main/java/io/realm/react/RealmReactModule.java +++ b/react-native/android/src/main/java/io/realm/react/RealmReactModule.java @@ -1,7 +1,5 @@ package io.realm.react; -import android.os.Handler; -import android.os.Looper; import android.util.Log; import com.facebook.react.bridge.ReactApplicationContext; @@ -19,7 +17,6 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import fi.iki.elonen.NanoHTTPD; @@ -28,7 +25,6 @@ public class RealmReactModule extends ReactContextBaseJavaModule { private static boolean sentAnalytics = false; private AndroidWebServer webServer; - private Handler handler = new Handler(Looper.getMainLooper()); static { SoLoader.loadLibrary("realmreact"); @@ -156,25 +152,11 @@ public class RealmReactModule extends ReactContextBaseJavaModule { e.printStackTrace(); } final String json = map.get("postData"); - final String[] jsonResponse = new String[1]; - final CountDownLatch latch = new CountDownLatch(1); - // Process the command on the UI thread - handler.post(new Runnable() { - @Override - public void run() { - jsonResponse[0] = processChromeDebugCommand(cmdUri, json); - latch.countDown(); - } - }); - try { - latch.await(); - Response response = newFixedLengthResponse(jsonResponse[0]); - response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081"); - return response; - } catch (InterruptedException e) { - e.printStackTrace(); - return null; - } + final String jsonResponse = processChromeDebugCommand(cmdUri, json); + + Response response = newFixedLengthResponse(jsonResponse); + response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081"); + return response; } } diff --git a/react-native/ios/RealmReact/RealmReact.mm b/react-native/ios/RealmReact/RealmReact.mm index b5b850b6..7381bddc 100644 --- a/react-native/ios/RealmReact/RealmReact.mm +++ b/react-native/ios/RealmReact/RealmReact.mm @@ -213,23 +213,24 @@ RCT_REMAP_METHOD(emit, emitEvent:(NSString *)eventName withObject:(id)object) { [_webServer addDefaultHandlerForMethod:@"POST" requestClass:[GCDWebServerDataRequest class] processBlock:^GCDWebServerResponse *(GCDWebServerRequest* request) { + __typeof__(self) self = weakSelf; + RPCServer *rpcServer = self ? self->_rpcServer.get() : nullptr; GCDWebServerResponse *response; + try { - // perform all realm ops on the main thread - __block NSData *responseData; - dispatch_sync(dispatch_get_main_queue(), ^{ - RealmReact *self = weakSelf; - if (self) { - if (_rpcServer) { - json args = json::parse([[(GCDWebServerDataRequest *)request text] UTF8String]); - std::string responseText = _rpcServer->perform_request(request.path.UTF8String, args).dump(); - responseData = [NSData dataWithBytes:responseText.c_str() length:responseText.length()]; - return; - } - } + NSData *responseData; + + if (rpcServer) { + json args = json::parse([[(GCDWebServerDataRequest *)request text] UTF8String]); + std::string responseText = rpcServer->perform_request(request.path.UTF8String, args).dump(); + + responseData = [NSData dataWithBytes:responseText.c_str() length:responseText.length()]; + } + else { // we have been deallocated responseData = [NSData data]; - }); + } + response = [[GCDWebServerDataResponse alloc] initWithData:responseData contentType:@"application/json"]; } catch(std::exception &ex) { diff --git a/src/concurrent_deque.hpp b/src/concurrent_deque.hpp new file mode 100644 index 00000000..4c05b372 --- /dev/null +++ b/src/concurrent_deque.hpp @@ -0,0 +1,90 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2016 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#pragma once + +#include +#include +#include +#include + +namespace realm { + +class ConcurrentDequeTimeout : public std::exception { + public: + ConcurrentDequeTimeout() : std::exception() {} +}; + +template +class ConcurrentDeque { + public: + T pop_front(size_t timeout = 0) { + std::unique_lock lock(m_mutex); + while (m_deque.empty()) { + wait(lock, timeout); + } + T item = std::move(m_deque.front()); + m_deque.pop_front(); + return item; + } + + T pop_back(size_t timeout = 0) { + std::unique_lock lock(m_mutex); + while (m_deque.empty()) { + wait(lock, timeout); + } + T item = std::move(m_deque.back()); + m_deque.pop_back(); + return item; + } + + void push_front(T&& item) { + std::unique_lock lock(m_mutex); + m_deque.push_front(std::move(item)); + lock.unlock(); + m_condition.notify_one(); + } + + void push_back(T&& item) { + std::unique_lock lock(m_mutex); + m_deque.push_back(std::move(item)); + lock.unlock(); + m_condition.notify_one(); + } + + bool empty() { + std::lock_guard lock(m_mutex); + return m_deque.empty(); + } + + void wait(std::unique_lock &lock, size_t timeout = 0) { + if (!timeout) { + m_condition.wait(lock); + } + else if (m_condition.wait_for(lock, std::chrono::milliseconds(timeout)) == std::cv_status::timeout) { + throw ConcurrentDequeTimeout(); + } + } + + private: + std::condition_variable m_condition; + std::mutex m_mutex; + std::deque m_deque; +}; + +} // realm diff --git a/src/ios/RealmJS.xcodeproj/project.pbxproj b/src/ios/RealmJS.xcodeproj/project.pbxproj index 0da03000..338dbdeb 100644 --- a/src/ios/RealmJS.xcodeproj/project.pbxproj +++ b/src/ios/RealmJS.xcodeproj/project.pbxproj @@ -157,6 +157,7 @@ F60103141CC4CC8C00EC01BA /* jsc_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_return_value.hpp; sourceTree = ""; }; F60103151CC4CCFD00EC01BA /* node_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_return_value.hpp; sourceTree = ""; }; F60103161CC4CD2F00EC01BA /* node_string.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_string.hpp; sourceTree = ""; }; + F6079B181CD3EB9000BD2401 /* concurrent_deque.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = concurrent_deque.hpp; sourceTree = ""; }; F61378781C18EAAC008BFC51 /* js */ = {isa = PBXFileReference; lastKnownFileType = folder; path = js; sourceTree = ""; }; F620F0521CAF0B600082977B /* js_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = js_class.hpp; sourceTree = ""; }; F620F0531CAF2EF70082977B /* jsc_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_class.hpp; sourceTree = ""; }; @@ -259,6 +260,7 @@ 029048351C042A3C00ABDED4 /* platform.hpp */, 0290480F1C0428DF00ABDED4 /* rpc.cpp */, 029048101C0428DF00ABDED4 /* rpc.hpp */, + F6079B181CD3EB9000BD2401 /* concurrent_deque.hpp */, ); name = RealmJS; path = ..; diff --git a/src/js_types.hpp b/src/js_types.hpp index 3a3f967d..79b6486d 100644 --- a/src/js_types.hpp +++ b/src/js_types.hpp @@ -253,7 +253,7 @@ struct Exception : public std::runtime_error { const Protected m_value; Exception(ContextType ctx, const std::string &message) - : std::runtime_error(message), m_value(value(ctx, message)) {} + : std::runtime_error(message), m_value(ctx, value(ctx, message)) {} Exception(ContextType ctx, const ValueType &val) : std::runtime_error(std::string(Value::to_string(ctx, val))), m_value(ctx, val) {} diff --git a/src/jsc/jsc_protected.hpp b/src/jsc/jsc_protected.hpp index 39a8e198..6c20d039 100644 --- a/src/jsc/jsc_protected.hpp +++ b/src/jsc/jsc_protected.hpp @@ -28,6 +28,7 @@ class Protected { JSGlobalContextRef m_context; public: + Protected() : m_context(nullptr) {} Protected(const Protected &other) : Protected(other.m_context) {} Protected(Protected &&other) : m_context(other.m_context) { other.m_context = nullptr; @@ -43,6 +44,9 @@ class Protected { operator JSGlobalContextRef() const { return m_context; } + operator bool() const { + return m_context != nullptr; + } }; template<> @@ -51,6 +55,7 @@ class Protected { JSValueRef m_value; public: + Protected() {} Protected(const Protected &other) : Protected(other.m_context, other.m_value) {} Protected(Protected &&other) : m_context(other.m_context), m_value(other.m_value) { other.m_context = nullptr; @@ -67,11 +72,15 @@ class Protected { operator JSValueRef() const { return m_value; } + operator bool() const { + return m_value != nullptr; + } }; template<> class Protected : public Protected { public: + Protected() : Protected() {} Protected(const Protected &other) : Protected(other) {} Protected(Protected &&other) : Protected(std::move(other)) {} Protected(JSContextRef ctx, JSObjectRef value) : Protected(ctx, value) {} diff --git a/src/node/node_protected.hpp b/src/node/node_protected.hpp index 7ae354f1..042a10a1 100644 --- a/src/node/node_protected.hpp +++ b/src/node/node_protected.hpp @@ -29,11 +29,15 @@ class Protected { Nan::Persistent> m_value; public: + Protected() {} Protected(v8::Local value) : m_value(value) {} operator v8::Local() const { return Nan::New(m_value); } + operator bool() const { + return m_value.isEmpty(); + } bool operator==(const v8::Local &other) const { return m_value == other; } @@ -55,6 +59,7 @@ namespace js { template<> class Protected : public node::Protected { public: + Protected() : node::Protected() {} Protected(v8::Local ctx) : node::Protected(ctx) {} operator v8::Isolate*() const { @@ -65,18 +70,21 @@ class Protected : public node::Protected class Protected : public node::Protected { public: + Protected() : node::Protected() {} Protected(v8::Isolate* isolate, v8::Local value) : node::Protected(value) {} }; template<> class Protected : public node::Protected { public: + Protected() : node::Protected() {} Protected(v8::Isolate* isolate, v8::Local object) : node::Protected(object) {} }; template<> class Protected : public node::Protected { public: + Protected() : node::Protected() {} Protected(v8::Isolate* isolate, v8::Local object) : node::Protected(object) {} }; diff --git a/src/rpc.cpp b/src/rpc.cpp index 2599df1d..86a32c85 100644 --- a/src/rpc.cpp +++ b/src/rpc.cpp @@ -22,9 +22,7 @@ #include #include "rpc.hpp" - #include "jsc_init.hpp" -#include "jsc_types.hpp" #include "base64.hpp" #include "object_accessor.hpp" @@ -43,10 +41,57 @@ static const char * const RealmObjectTypesFunction = "function"; static const char * const RealmObjectTypesList = "list"; static const char * const RealmObjectTypesObject = "object"; static const char * const RealmObjectTypesResults = "results"; +static const char * const RealmObjectTypesRealm = "realm"; static const char * const RealmObjectTypesUndefined = "undefined"; +static RPCServer*& get_rpc_server(JSGlobalContextRef ctx) { + static std::map s_map; + return s_map[ctx]; +} + +RPCWorker::RPCWorker() { + m_thread = std::thread([this]() { + // TODO: Create ALooper/CFRunLoop to support async calls. + while (!m_stop) { + try_run_task(); + } + }); +} + +RPCWorker::~RPCWorker() { + m_stop = true; + m_thread.join(); +} + +void RPCWorker::add_task(std::function task) { + m_tasks.push_back(std::packaged_task(task)); +} + +json RPCWorker::pop_task_result() { + // This might block until a future has been added. + auto future = m_futures.pop_back(); + + // This will block until a return value (or exception) is available. + return future.get(); +} + +void RPCWorker::try_run_task() { + try { + // Use a 10 millisecond timeout to keep this thread unblocked. + auto task = m_tasks.pop_back(10); + task(); + + // Since this can be called recursively, it must be pushed to the front of the queue *after* running the task. + m_futures.push_front(task.get_future()); + } + catch (ConcurrentDequeTimeout &) { + // We tried. + } +} + RPCServer::RPCServer() { m_context = JSGlobalContextCreate(NULL); + get_rpc_server(m_context) = this; // JavaScriptCore crashes when trying to walk up the native stack to print the stacktrace. // FIXME: Avoid having to do this! @@ -65,7 +110,7 @@ RPCServer::RPCServer() { return (json){{"result", m_session_id}}; }; m_requests["/create_realm"] = [this](const json dict) { - JSObjectRef realm_constructor = m_session_id ? m_objects[m_session_id] : NULL; + JSObjectRef realm_constructor = m_session_id ? JSObjectRef(m_objects[m_session_id]) : NULL; if (!realm_constructor) { throw std::runtime_error("Realm constructor not found!"); } @@ -82,27 +127,6 @@ RPCServer::RPCServer() { RPCObjectID realm_id = store_object(realm_object); return (json){{"result", realm_id}}; }; - m_requests["/begin_transaction"] = [this](const json dict) { - RPCObjectID realm_id = dict["realmId"].get(); - SharedRealm realm = *jsc::Object::get_internal>(m_objects[realm_id]); - - realm->begin_transaction(); - return json::object(); - }; - m_requests["/cancel_transaction"] = [this](const json dict) { - RPCObjectID realm_id = dict["realmId"].get(); - SharedRealm realm = *jsc::Object::get_internal>(m_objects[realm_id]); - - realm->cancel_transaction(); - return json::object(); - }; - m_requests["/commit_transaction"] = [this](const json dict) { - RPCObjectID realm_id = dict["realmId"].get(); - SharedRealm realm = *jsc::Object::get_internal>(m_objects[realm_id]); - - realm->commit_transaction(); - return json::object(); - }; m_requests["/call_method"] = [this](const json dict) { JSObjectRef object = m_objects[dict["id"].get()]; std::string method_string = dict["name"].get(); @@ -148,20 +172,17 @@ RPCServer::RPCServer() { }; m_requests["/dispose_object"] = [this](const json dict) { RPCObjectID oid = dict["id"].get(); - JSValueUnprotect(m_context, m_objects[oid]); m_objects.erase(oid); return json::object(); }; m_requests["/clear_test_state"] = [this](const json dict) { for (auto object : m_objects) { // The session ID points to the Realm constructor object, which should remain. - if (object.first == m_session_id) { - continue; + if (object.first != m_session_id) { + m_objects.erase(object.first); } - - JSValueUnprotect(m_context, object.second); - m_objects.erase(object.first); } + m_callbacks.clear(); JSGarbageCollect(m_context); js::delete_all_realms(); return json::object(); @@ -169,24 +190,74 @@ RPCServer::RPCServer() { } RPCServer::~RPCServer() { - for (auto item : m_objects) { - JSValueUnprotect(m_context, item.second); - } - + get_rpc_server(m_context) = nullptr; JSGlobalContextRelease(m_context); } -json RPCServer::perform_request(std::string name, json &args) { - try { +void RPCServer::run_callback(JSContextRef ctx, JSObjectRef this_object, size_t argc, const JSValueRef arguments[], jsc::ReturnValue &return_value) { + RPCServer* server = get_rpc_server(JSContextGetGlobalContext(ctx)); + if (!server) { + return; + } + + // The first argument was curried to be the callback id. + RPCObjectID callback_id = jsc::Value::to_number(ctx, arguments[0]); + JSObjectRef arguments_array = jsc::Object::create_array(ctx, uint32_t(argc - 1), argc == 1 ? nullptr : arguments + 1); + json arguments_json = server->serialize_json_value(arguments_array); + + // The next task on the stack will instruct the JS to run this callback. + // This captures references since it will be executed before exiting this function. + server->m_worker.add_task([&]() -> json { + return { + {"callback", callback_id}, + {"arguments", arguments_json}, + }; + }); + + // Wait for the next callback result to come off the result stack. + while (server->m_callback_results.empty()) { + // This may recursively bring us into another callback, hence the callback results being a stack. + server->m_worker.try_run_task(); + } + + json results = server->m_callback_results.pop_back(); + json error = results["error"]; + + // The callback id should be identical! + assert(callback_id == results["callback"].get()); + + if (!error.is_null()) { + throw jsc::Exception(ctx, error.get()); + } + + return_value.set(server->deserialize_json_value(results["result"])); +} + +json RPCServer::perform_request(std::string name, const json &args) { + std::lock_guard lock(m_request_mutex); + + // Only create_session is allowed without the correct session id (since it creates the session id). + if (name != "/create_session" && m_session_id != args["sessionId"].get()) { + return {{"error", "Invalid session ID"}}; + } + + // The callback_result message contains the return value (or exception) of a callback ran by run_callback(). + if (name == "/callback_result") { + json results(args); + m_callback_results.push_back(std::move(results)); + } + else { RPCRequest action = m_requests[name]; assert(action); - if (name == "/create_session" || m_session_id == args["sessionId"].get()) { + m_worker.add_task([=] { return action(args); - } - else { - return {{"error", "Invalid session ID"}}; - } + }); + } + + try { + // This will either be the return value (or exception) of the action perform, OR an instruction to run a callback. + return m_worker.pop_task_result(); } catch (std::exception &exception) { return {{"error", exception.what()}}; } @@ -194,9 +265,9 @@ json RPCServer::perform_request(std::string name, json &args) { RPCObjectID RPCServer::store_object(JSObjectRef object) { static RPCObjectID s_next_id = 1; + RPCObjectID next_id = s_next_id++; - JSValueProtect(m_context, object); - m_objects[next_id] = object; + m_objects.emplace(next_id, js::Protected(m_context, object)); return next_id; } @@ -244,6 +315,12 @@ json RPCServer::serialize_json_value(JSValueRef js_value) { {"schema", serialize_object_schema(results->get_object_schema())} }; } + else if (jsc::Object::is_instance>(m_context, js_object)) { + return { + {"type", RealmObjectTypesRealm}, + {"id", store_object(js_object)}, + }; + } else if (jsc::Value::is_array(m_context, js_object)) { uint32_t length = jsc::Object::validated_get_length(m_context, js_object); std::vector array; @@ -313,8 +390,20 @@ JSValueRef RPCServer::deserialize_json_value(const json dict) { std::string type_string = type.get(); if (type_string == RealmObjectTypesFunction) { - // FIXME: Make this actually call the function by its id once we need it to. - return JSObjectMakeFunction(m_context, NULL, 0, NULL, jsc::String(""), NULL, 1, NULL); + RPCObjectID callback_id = value.get(); + + if (!m_callbacks.count(callback_id)) { + JSObjectRef callback = JSObjectMakeFunctionWithCallback(m_context, nullptr, js::wrap); + + // Curry the first argument to be the callback id. + JSValueRef bind_args[2] = {jsc::Value::from_null(m_context), jsc::Value::from_number(m_context, callback_id)}; + JSValueRef bound_callback = jsc::Object::call_method(m_context, callback, "bind", 2, bind_args); + + callback = jsc::Value::to_function(m_context, bound_callback); + m_callbacks.emplace(callback_id, js::Protected(m_context, callback)); + } + + return m_callbacks.at(callback_id); } else if (type_string == RealmObjectTypesDictionary) { JSObjectRef js_object = jsc::Object::create_empty(m_context); diff --git a/src/rpc.hpp b/src/rpc.hpp index ae015c5c..a74db8d0 100644 --- a/src/rpc.hpp +++ b/src/rpc.hpp @@ -18,8 +18,14 @@ #pragma once +#include +#include +#include + +#include "concurrent_deque.hpp" #include "json.hpp" #include "jsc_types.hpp" +#include "jsc_protected.hpp" namespace realm { @@ -28,20 +34,43 @@ class ObjectSchema; namespace rpc { using json = nlohmann::json; + using RPCObjectID = u_int64_t; using RPCRequest = std::function; +class RPCWorker { + public: + RPCWorker(); + ~RPCWorker(); + + void add_task(std::function); + json pop_task_result(); + void try_run_task(); + + private: + bool m_stop = false; + std::thread m_thread; + ConcurrentDeque> m_tasks; + ConcurrentDeque> m_futures; +}; + class RPCServer { public: RPCServer(); ~RPCServer(); - json perform_request(std::string name, json &args); + json perform_request(std::string name, const json &args); private: JSGlobalContextRef m_context; + std::mutex m_request_mutex; std::map m_requests; - std::map m_objects; + std::map> m_objects; + std::map> m_callbacks; + ConcurrentDeque m_callback_results; RPCObjectID m_session_id; + RPCWorker m_worker; + + static void run_callback(JSContextRef, JSObjectRef, size_t, const JSValueRef[], jsc::ReturnValue &); RPCObjectID store_object(JSObjectRef object); diff --git a/tests/js/asserts.js b/tests/js/asserts.js index a2aece10..75ec6c31 100644 --- a/tests/js/asserts.js +++ b/tests/js/asserts.js @@ -94,7 +94,7 @@ module.exports = { } catch (e) { caught = true; - if (e != expectedException) { + if (e.message != expectedException.message) { throw new TestFailureError('Expected exception "' + expectedException + '" not thrown - instead caught: "' + e + '"'); } } diff --git a/tests/js/migration-tests.js b/tests/js/migration-tests.js index 366ac7f0..58c65535 100644 --- a/tests/js/migration-tests.js +++ b/tests/js/migration-tests.js @@ -48,7 +48,7 @@ module.exports = BaseTest.extend({ }); // migration function exceptions should propogate - var exception = 'expected exception'; + var exception = new Error('expected exception'); realm = undefined; TestCase.assertThrowsException(function() { realm = new Realm({schema: [], schemaVersion: 3, migration: function() {