diff --git a/lib/browser/rpc.js b/lib/browser/rpc.js index 37c219f1..5c04897a 100644 --- a/lib/browser/rpc.js +++ b/lib/browser/rpc.js @@ -212,6 +212,8 @@ function makeRequest(url, data) { return JSON.parse(responseText); } +let pollTimeoutId; + //returns an object from rpc serialized json value function deserialize_json_value(value) { let result = {}; @@ -230,17 +232,19 @@ function deserialize_json_value(value) { } function sendRequest(command, data, host = sessionHost) { - if (!host) { - throw new Error('Must first create RPC session with a valid host'); - } + clearTimeout(pollTimeoutId); + try { + if (!host) { + throw new Error('Must first create RPC session with a valid host'); + } - data = Object.assign({}, data, sessionId ? {sessionId} : null); + data = Object.assign({}, data, sessionId ? {sessionId} : null); - let url = 'http://' + host + '/' + command; - let response = makeRequest(url, data); + let url = 'http://' + host + '/' + command; + let response = makeRequest(url, data); - if (!response || response.error) { - let error = response && response.error; + if (!response || response.error) { + let error = response && response.error; // Remove the type prefix from the error message (e.g. "Error: "). if (error && error.replace) { @@ -261,22 +265,25 @@ function sendRequest(command, data, host = sessionHost) { throw new Error(error || `Invalid response for "${command}"`); } - - let callback = response.callback; - if (callback != null) { - let result; - let error; - try { - let realmId = data.realmId; - let thisObject = deserialize(realmId, response.this); - let args = deserialize(realmId, response.arguments); - result = registeredCallbacks[callback].apply(thisObject, args); - result = serialize(realmId, result); - } catch (e) { - error = e.message || ('' + e); + let callback = response.callback; + if (callback != null) { + let result; + let error; + try { + let realmId = data.realmId; + let thisObject = deserialize(realmId, response.this); + let args = deserialize(realmId, response.arguments); + result = registeredCallbacks[callback].apply(thisObject, args); + result = serialize(realmId, result); + } catch (e) { + error = e.message || ('' + e); + } + return sendRequest('callback_result', {callback, result, error}); } - return sendRequest('callback_result', {callback, result, error}); - } - return response.result; + return response.result; + } + finally { + pollTimeoutId = setTimeout(() => sendRequest('callbacks_poll'), 50); + } } diff --git a/react-native/android/src/main/java/io/realm/react/RealmReactModule.java b/react-native/android/src/main/java/io/realm/react/RealmReactModule.java index 18dc5d1f..f09f65ec 100644 --- a/react-native/android/src/main/java/io/realm/react/RealmReactModule.java +++ b/react-native/android/src/main/java/io/realm/react/RealmReactModule.java @@ -2,6 +2,9 @@ package io.realm.react; import android.content.res.AssetManager; import android.os.Build; +import android.os.Handler; +import android.os.HandlerThread; +import android.os.Looper; import android.util.Log; import com.facebook.react.bridge.ReactApplicationContext; @@ -19,6 +22,9 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; import fi.iki.elonen.NanoHTTPD; @@ -37,6 +43,9 @@ class RealmReactModule extends ReactContextBaseJavaModule { SoLoader.loadLibrary("realmreact"); } + private Handler worker; + private HandlerThread workerThread; + public RealmReactModule(ReactApplicationContext reactContext) { super(reactContext); @@ -121,7 +130,9 @@ class RealmReactModule extends ReactContextBaseJavaModule { private void startWebServer() { setupChromeDebugModeRealmJsContext(); - webServer = new AndroidWebServer(DEFAULT_PORT); + startWorker(); + + webServer = new AndroidWebServer(DEFAULT_PORT, getReactApplicationContext()); try { webServer.start(); Log.i("Realm", "Starting the debugging WebServer, Host: " + webServer.getHostname() + " Port: " + webServer.getListeningPort()); @@ -130,20 +141,44 @@ class RealmReactModule extends ReactContextBaseJavaModule { } } + private void startWorker() { + workerThread = new HandlerThread("MyHandlerThread"); + workerThread.start(); + worker = new Handler(workerThread.getLooper()); + worker.postDelayed(new Runnable() { + @Override + public void run() { + boolean stop = tryRunTask(); + if (!stop) { + worker.postDelayed(this, 10); + } + } + }, 10); + } + private void stopWebServer() { if (webServer != null) { Log.i("Realm", "Stopping the webserver"); webServer.stop(); } + + if (workerThread != null) { + workerThread.quit(); + workerThread = null; + } } class AndroidWebServer extends NanoHTTPD { - public AndroidWebServer(int port) { + private ReactApplicationContext reactApplicationContext; + + public AndroidWebServer(int port, ReactApplicationContext reactApplicationContext) { super(port); + this.reactApplicationContext = reactApplicationContext; } - public AndroidWebServer(String hostname, int port) { + public AndroidWebServer(String hostname, int port, ReactApplicationContext reactApplicationContext) { super(hostname, port); + this.reactApplicationContext = reactApplicationContext; } @Override @@ -164,7 +199,7 @@ class RealmReactModule extends ReactContextBaseJavaModule { return response; } final String jsonResponse = processChromeDebugCommand(cmdUri, json); - + Response response = newFixedLengthResponse(jsonResponse); response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081"); return response; @@ -185,4 +220,7 @@ class RealmReactModule extends ReactContextBaseJavaModule { // this receives one command from Chrome debug then return the processing we should post back private native String processChromeDebugCommand(String cmd, String args); + + // this receives one command from Chrome debug then return the processing we should post back + private native boolean tryRunTask(); } diff --git a/src/android/io_realm_react_RealmReactModule.cpp b/src/android/io_realm_react_RealmReactModule.cpp index 8fcd4b21..e776bca0 100644 --- a/src/android/io_realm_react_RealmReactModule.cpp +++ b/src/android/io_realm_react_RealmReactModule.cpp @@ -77,6 +77,13 @@ JNIEXPORT jstring JNICALL Java_io_realm_react_RealmReactModule_processChromeDebu return env->NewStringUTF(response.dump().c_str()); } +JNIEXPORT jboolean JNICALL Java_io_realm_react_RealmReactModule_tryRunTask +(JNIEnv *env, jclass) +{ + jboolean result = s_rpc_server->try_run_task(); + return result; +} + JNIEXPORT jboolean JNICALL Java_io_realm_react_RealmReactModule_isContextInjected (JNIEnv *env, jclass) { diff --git a/src/android/io_realm_react_RealmReactModule.h b/src/android/io_realm_react_RealmReactModule.h index f285d404..763c8086 100644 --- a/src/android/io_realm_react_RealmReactModule.h +++ b/src/android/io_realm_react_RealmReactModule.h @@ -29,6 +29,13 @@ JNIEXPORT jlong JNICALL Java_io_realm_react_RealmReactModule_setupChromeDebugMod JNIEXPORT jstring JNICALL Java_io_realm_react_RealmReactModule_processChromeDebugCommand (JNIEnv *, jclass, jstring, jstring); +/* + * Class: io_realm_react_RealmReactModule + * Method: tryRunTask + */ +JNIEXPORT jboolean JNICALL Java_io_realm_react_RealmReactModule_tryRunTask +(JNIEnv *, jclass); + /* * Class: io_realm_react_RealmReactModule * Method: isContextInjected diff --git a/src/js_realm.hpp b/src/js_realm.hpp index db192cbf..d7584b21 100644 --- a/src/js_realm.hpp +++ b/src/js_realm.hpp @@ -738,6 +738,7 @@ void RealmClass::wait_for_download_completion(ContextType ctx, ObjectType thi ValueType callback_arguments[1]; callback_arguments[0] = object; + Function::callback(protected_ctx, protected_callback, protected_this, 1, callback_arguments); } diff --git a/src/rpc.cpp b/src/rpc.cpp index 5f6b9b2b..5917e7e8 100644 --- a/src/rpc.cpp +++ b/src/rpc.cpp @@ -28,6 +28,7 @@ #include "object_accessor.hpp" #include "shared_realm.hpp" #include "results.hpp" +#include using namespace realm; using namespace realm::rpc; @@ -83,12 +84,14 @@ RPCServer*& get_rpc_server(JSGlobalContextRef ctx) { } RPCWorker::RPCWorker() { - m_thread = std::thread([this]() { + //m_thread = std::thread([this]() { + //m_looper = ALooper_prepare(ALOOPER_PREPARE_ALLOW_NON_CALLBACKS); + // TODO: Create ALooper/CFRunLoop to support async calls. - while (!m_stop) { - try_run_task(); - } - }); + //while (!m_stop) { + //try_run_task(); + //} + //}); } RPCWorker::~RPCWorker() { @@ -107,23 +110,30 @@ json RPCWorker::pop_task_result() { return future.get(); } -void RPCWorker::try_run_task() { +bool RPCWorker::try_run_task() { + if (m_stop) { + return true; + } + // Use a 10 millisecond timeout to keep this thread unblocked. auto task = m_tasks.try_pop_back(10); if (!task) { - return; + return false; } (*task)(); // Since this can be called recursively, it must be pushed to the front of the queue *after* running the task. m_futures.push_front(task->get_future()); + + return m_stop; } void RPCWorker::stop() { if (!m_stop) { m_stop = true; m_thread.join(); + //m_looper = nullptr; } } @@ -288,6 +298,10 @@ RPCServer::RPCServer() { return json::object(); }; + + m_requests["/callbacks_poll"] = [this](const json dict) { + return json::object(); + }; } RPCServer::~RPCServer() { @@ -340,6 +354,7 @@ void RPCServer::run_callback(JSContextRef ctx, JSObjectRef function, JSObjectRef } return_value.set(server->deserialize_json_value(results["result"])); + } json RPCServer::perform_request(std::string name, const json &args) { @@ -388,6 +403,10 @@ json RPCServer::perform_request(std::string name, const json &args) { } } +bool RPCServer::try_run_task() { + return m_worker.try_run_task(); +} + RPCObjectID RPCServer::store_object(JSObjectRef object) { static RPCObjectID s_next_id = 1; diff --git a/src/rpc.hpp b/src/rpc.hpp index ed286d05..38c2a0d3 100644 --- a/src/rpc.hpp +++ b/src/rpc.hpp @@ -26,6 +26,7 @@ #include "json.hpp" #include "jsc_types.hpp" #include "jsc_protected.hpp" +#include namespace realm { @@ -45,11 +46,12 @@ class RPCWorker { void add_task(std::function); json pop_task_result(); - void try_run_task(); + bool try_run_task(); void stop(); private: bool m_stop = false; + //ALooper* m_looper; std::thread m_thread; ConcurrentDeque> m_tasks; ConcurrentDeque> m_futures; @@ -60,6 +62,7 @@ class RPCServer { RPCServer(); ~RPCServer(); json perform_request(std::string name, const json &args); + bool try_run_task(); private: JSGlobalContextRef m_context;