Make migration tests work in Chrome debug mode

All realm operations for the RPC are now done in their own thread so it can be allowed to block on waiting for callback results. The recursive and multi-threaded nature of this makes it pretty tricky, but it works!
This commit is contained in:
Scott Kyle 2016-04-29 13:01:01 -07:00
parent 2b6b0b97f4
commit 32a8a2ed72
11 changed files with 306 additions and 66 deletions

View File

@ -33,6 +33,30 @@ const listenersKey = Symbol();
rpc.registerTypeConverter(objectTypes.LIST, createList); rpc.registerTypeConverter(objectTypes.LIST, createList);
rpc.registerTypeConverter(objectTypes.RESULTS, createResults); rpc.registerTypeConverter(objectTypes.RESULTS, createResults);
rpc.registerTypeConverter(objectTypes.OBJECT, createObject); rpc.registerTypeConverter(objectTypes.OBJECT, createObject);
rpc.registerTypeConverter(objectTypes.REALM, createRealm);
function createRealm(_, info) {
let realm = Object.create(Realm.prototype);
setupRealm(realm, info.id);
return realm;
}
function setupRealm(realm, realmId) {
realm[keys.id] = realmId;
realm[keys.realm] = realmId;
realm[keys.type] = objectTypes.REALM;
realm[listenersKey] = new Set();
[
'path',
'readOnly',
'schema',
'schemaVersion',
].forEach((name) => {
Object.defineProperty(realm, name, {get: util.getterForProperty(name)});
});
}
export default class Realm { export default class Realm {
constructor(config) { constructor(config) {
@ -63,20 +87,7 @@ export default class Realm {
let realmId = rpc.createRealm(Array.from(arguments)); let realmId = rpc.createRealm(Array.from(arguments));
registerConstructors(realmId, constructors); registerConstructors(realmId, constructors);
setupRealm(this, realmId);
this[keys.id] = realmId;
this[keys.realm] = realmId;
this[keys.type] = objectTypes.REALM;
this[listenersKey] = new Set();
[
'path',
'readOnly',
'schema',
'schemaVersion',
].forEach((name) => {
Object.defineProperty(this, name, {get: util.getterForProperty(name)});
});
} }
create(type, ...args) { create(type, ...args) {

View File

@ -22,6 +22,7 @@ import * as base64 from './base64';
import { keys, objectTypes } from './constants'; import { keys, objectTypes } from './constants';
const {id: idKey, realm: realmKey} = keys; const {id: idKey, realm: realmKey} = keys;
const registeredCallbacks = [];
const typeConverters = {}; const typeConverters = {};
let XMLHttpRequest = global.originalXMLHttpRequest || global.XMLHttpRequest; let XMLHttpRequest = global.originalXMLHttpRequest || global.XMLHttpRequest;
@ -92,6 +93,14 @@ export function commitTransaction(realmId) {
export function clearTestState() { export function clearTestState() {
sendRequest('clear_test_state'); sendRequest('clear_test_state');
// Clear all registered callbacks.
registeredCallbacks.length = 0;
}
function registerCallback(callback) {
let key = registeredCallbacks.indexOf(callback);
return key >= 0 ? key : (registeredCallbacks.push(callback) - 1);
} }
function serialize(realmId, value) { function serialize(realmId, value) {
@ -99,7 +108,7 @@ function serialize(realmId, value) {
return {type: objectTypes.UNDEFINED}; return {type: objectTypes.UNDEFINED};
} }
if (typeof value == 'function') { if (typeof value == 'function') {
return {type: objectTypes.FUNCTION}; return {type: objectTypes.FUNCTION, value: registerCallback(value)};
} }
if (!value || typeof value != 'object') { if (!value || typeof value != 'object') {
return {value: value}; return {value: value};
@ -189,5 +198,20 @@ function sendRequest(command, data, host = sessionHost) {
throw new Error(error || `Invalid response for "${command}"`); throw new Error(error || `Invalid response for "${command}"`);
} }
let callback = response.callback;
if (callback != null) {
let result;
let error;
try {
let realmId = data.realmId;
let args = deserialize(realmId, response.arguments);
result = registeredCallbacks[callback].apply(null, args);
result = serialize(realmId, result);
} catch (e) {
error = e.message || ('' + e);
}
return sendRequest('callback_result', {callback, result, error});
}
return response.result; return response.result;
} }

View File

@ -1,7 +1,5 @@
package io.realm.react; package io.realm.react;
import android.os.Handler;
import android.os.Looper;
import android.util.Log; import android.util.Log;
import com.facebook.react.bridge.ReactApplicationContext; import com.facebook.react.bridge.ReactApplicationContext;
@ -19,7 +17,6 @@ import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import fi.iki.elonen.NanoHTTPD; import fi.iki.elonen.NanoHTTPD;
@ -28,7 +25,6 @@ public class RealmReactModule extends ReactContextBaseJavaModule {
private static boolean sentAnalytics = false; private static boolean sentAnalytics = false;
private AndroidWebServer webServer; private AndroidWebServer webServer;
private Handler handler = new Handler(Looper.getMainLooper());
static { static {
SoLoader.loadLibrary("realmreact"); SoLoader.loadLibrary("realmreact");
@ -156,25 +152,11 @@ public class RealmReactModule extends ReactContextBaseJavaModule {
e.printStackTrace(); e.printStackTrace();
} }
final String json = map.get("postData"); final String json = map.get("postData");
final String[] jsonResponse = new String[1]; final String jsonResponse = processChromeDebugCommand(cmdUri, json);
final CountDownLatch latch = new CountDownLatch(1);
// Process the command on the UI thread Response response = newFixedLengthResponse(jsonResponse);
handler.post(new Runnable() { response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081");
@Override return response;
public void run() {
jsonResponse[0] = processChromeDebugCommand(cmdUri, json);
latch.countDown();
}
});
try {
latch.await();
Response response = newFixedLengthResponse(jsonResponse[0]);
response.addHeader("Access-Control-Allow-Origin", "http://localhost:8081");
return response;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
} }
} }

View File

@ -213,23 +213,24 @@ RCT_REMAP_METHOD(emit, emitEvent:(NSString *)eventName withObject:(id)object) {
[_webServer addDefaultHandlerForMethod:@"POST" [_webServer addDefaultHandlerForMethod:@"POST"
requestClass:[GCDWebServerDataRequest class] requestClass:[GCDWebServerDataRequest class]
processBlock:^GCDWebServerResponse *(GCDWebServerRequest* request) { processBlock:^GCDWebServerResponse *(GCDWebServerRequest* request) {
__typeof__(self) self = weakSelf;
RPCServer *rpcServer = self ? self->_rpcServer.get() : nullptr;
GCDWebServerResponse *response; GCDWebServerResponse *response;
try { try {
// perform all realm ops on the main thread NSData *responseData;
__block NSData *responseData;
dispatch_sync(dispatch_get_main_queue(), ^{ if (rpcServer) {
RealmReact *self = weakSelf; json args = json::parse([[(GCDWebServerDataRequest *)request text] UTF8String]);
if (self) { std::string responseText = rpcServer->perform_request(request.path.UTF8String, args).dump();
if (_rpcServer) {
json args = json::parse([[(GCDWebServerDataRequest *)request text] UTF8String]); responseData = [NSData dataWithBytes:responseText.c_str() length:responseText.length()];
std::string responseText = _rpcServer->perform_request(request.path.UTF8String, args).dump(); }
responseData = [NSData dataWithBytes:responseText.c_str() length:responseText.length()]; else {
return;
}
}
// we have been deallocated // we have been deallocated
responseData = [NSData data]; responseData = [NSData data];
}); }
response = [[GCDWebServerDataResponse alloc] initWithData:responseData contentType:@"application/json"]; response = [[GCDWebServerDataResponse alloc] initWithData:responseData contentType:@"application/json"];
} }
catch(std::exception &ex) { catch(std::exception &ex) {

90
src/concurrent_deque.hpp Normal file
View File

@ -0,0 +1,90 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#pragma once
#include <condition_variable>
#include <deque>
#include <exception>
#include <mutex>
namespace realm {
class ConcurrentDequeTimeout : public std::exception {
public:
ConcurrentDequeTimeout() : std::exception() {}
};
template <typename T>
class ConcurrentDeque {
public:
T pop_front(size_t timeout = 0) {
std::unique_lock<std::mutex> lock(m_mutex);
while (m_deque.empty()) {
wait(lock, timeout);
}
T item = std::move(m_deque.front());
m_deque.pop_front();
return item;
}
T pop_back(size_t timeout = 0) {
std::unique_lock<std::mutex> lock(m_mutex);
while (m_deque.empty()) {
wait(lock, timeout);
}
T item = std::move(m_deque.back());
m_deque.pop_back();
return item;
}
void push_front(T&& item) {
std::unique_lock<std::mutex> lock(m_mutex);
m_deque.push_front(std::move(item));
lock.unlock();
m_condition.notify_one();
}
void push_back(T&& item) {
std::unique_lock<std::mutex> lock(m_mutex);
m_deque.push_back(std::move(item));
lock.unlock();
m_condition.notify_one();
}
bool empty() {
std::lock_guard<std::mutex> lock(m_mutex);
return m_deque.empty();
}
void wait(std::unique_lock<std::mutex> &lock, size_t timeout = 0) {
if (!timeout) {
m_condition.wait(lock);
}
else if (m_condition.wait_for(lock, std::chrono::milliseconds(timeout)) == std::cv_status::timeout) {
throw ConcurrentDequeTimeout();
}
}
private:
std::condition_variable m_condition;
std::mutex m_mutex;
std::deque<T> m_deque;
};
} // realm

View File

@ -157,6 +157,7 @@
F60103141CC4CC8C00EC01BA /* jsc_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_return_value.hpp; sourceTree = "<group>"; }; F60103141CC4CC8C00EC01BA /* jsc_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_return_value.hpp; sourceTree = "<group>"; };
F60103151CC4CCFD00EC01BA /* node_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_return_value.hpp; sourceTree = "<group>"; }; F60103151CC4CCFD00EC01BA /* node_return_value.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_return_value.hpp; sourceTree = "<group>"; };
F60103161CC4CD2F00EC01BA /* node_string.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_string.hpp; sourceTree = "<group>"; }; F60103161CC4CD2F00EC01BA /* node_string.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_string.hpp; sourceTree = "<group>"; };
F6079B181CD3EB9000BD2401 /* concurrent_deque.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = concurrent_deque.hpp; sourceTree = "<group>"; };
F61378781C18EAAC008BFC51 /* js */ = {isa = PBXFileReference; lastKnownFileType = folder; path = js; sourceTree = "<group>"; }; F61378781C18EAAC008BFC51 /* js */ = {isa = PBXFileReference; lastKnownFileType = folder; path = js; sourceTree = "<group>"; };
F620F0521CAF0B600082977B /* js_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = js_class.hpp; sourceTree = "<group>"; }; F620F0521CAF0B600082977B /* js_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = js_class.hpp; sourceTree = "<group>"; };
F620F0531CAF2EF70082977B /* jsc_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_class.hpp; sourceTree = "<group>"; }; F620F0531CAF2EF70082977B /* jsc_class.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = jsc_class.hpp; sourceTree = "<group>"; };
@ -259,6 +260,7 @@
029048351C042A3C00ABDED4 /* platform.hpp */, 029048351C042A3C00ABDED4 /* platform.hpp */,
0290480F1C0428DF00ABDED4 /* rpc.cpp */, 0290480F1C0428DF00ABDED4 /* rpc.cpp */,
029048101C0428DF00ABDED4 /* rpc.hpp */, 029048101C0428DF00ABDED4 /* rpc.hpp */,
F6079B181CD3EB9000BD2401 /* concurrent_deque.hpp */,
); );
name = RealmJS; name = RealmJS;
path = ..; path = ..;

View File

@ -253,7 +253,7 @@ struct Exception : public std::runtime_error {
const Protected<ValueType> m_value; const Protected<ValueType> m_value;
Exception(ContextType ctx, const std::string &message) Exception(ContextType ctx, const std::string &message)
: std::runtime_error(message), m_value(value(ctx, message)) {} : std::runtime_error(message), m_value(ctx, value(ctx, message)) {}
Exception(ContextType ctx, const ValueType &val) Exception(ContextType ctx, const ValueType &val)
: std::runtime_error(std::string(Value<T>::to_string(ctx, val))), m_value(ctx, val) {} : std::runtime_error(std::string(Value<T>::to_string(ctx, val))), m_value(ctx, val) {}

View File

@ -22,9 +22,7 @@
#include <string> #include <string>
#include "rpc.hpp" #include "rpc.hpp"
#include "jsc_init.hpp" #include "jsc_init.hpp"
#include "jsc_types.hpp"
#include "base64.hpp" #include "base64.hpp"
#include "object_accessor.hpp" #include "object_accessor.hpp"
@ -43,10 +41,54 @@ static const char * const RealmObjectTypesFunction = "function";
static const char * const RealmObjectTypesList = "list"; static const char * const RealmObjectTypesList = "list";
static const char * const RealmObjectTypesObject = "object"; static const char * const RealmObjectTypesObject = "object";
static const char * const RealmObjectTypesResults = "results"; static const char * const RealmObjectTypesResults = "results";
static const char * const RealmObjectTypesRealm = "realm";
static const char * const RealmObjectTypesUndefined = "undefined"; static const char * const RealmObjectTypesUndefined = "undefined";
static RPCServer*& get_rpc_server(JSGlobalContextRef ctx) {
static std::map<JSGlobalContextRef, RPCServer*> s_map;
return s_map[ctx];
}
RPCWorker::RPCWorker() {
m_thread = std::thread([this]() {
// TODO: Create ALooper/CFRunLoop to support async calls.
while (!m_stop) {
try_run_task();
}
});
}
RPCWorker::~RPCWorker() {
m_stop = true;
m_thread.join();
}
void RPCWorker::add_task(std::function<json()> task) {
m_tasks.push_back(std::packaged_task<json()>(task));
}
json RPCWorker::pop_task_result() {
auto future = m_futures.pop_back();
return future.get();
}
void RPCWorker::try_run_task() {
try {
// Use a 10 millisecond timeout to keep this thread unblocked.
auto task = m_tasks.pop_back(10);
task();
// Since this can be called recursively, it must be pushed to the front of the queue *after* running the task.
m_futures.push_front(task.get_future());
}
catch (ConcurrentDequeTimeout &) {
// We tried.
}
}
RPCServer::RPCServer() { RPCServer::RPCServer() {
m_context = JSGlobalContextCreate(NULL); m_context = JSGlobalContextCreate(NULL);
get_rpc_server(m_context) = this;
// JavaScriptCore crashes when trying to walk up the native stack to print the stacktrace. // JavaScriptCore crashes when trying to walk up the native stack to print the stacktrace.
// FIXME: Avoid having to do this! // FIXME: Avoid having to do this!
@ -173,20 +215,71 @@ RPCServer::~RPCServer() {
JSValueUnprotect(m_context, item.second); JSValueUnprotect(m_context, item.second);
} }
get_rpc_server(m_context) = nullptr;
JSGlobalContextRelease(m_context); JSGlobalContextRelease(m_context);
} }
json RPCServer::perform_request(std::string name, json &args) { void RPCServer::run_callback(JSContextRef ctx, JSObjectRef this_object, size_t argc, const JSValueRef arguments[], jsc::ReturnValue &return_value) {
try { RPCServer* server = get_rpc_server(JSContextGetGlobalContext(ctx));
RPCRequest action = m_requests[name]; if (!server) {
assert(action); return;
}
if (name == "/create_session" || m_session_id == args["sessionId"].get<RPCObjectID>()) { // The first argument was curried to be the callback id.
return action(args); RPCObjectID callback_id = jsc::Value::to_number(ctx, arguments[0]);
JSObjectRef arguments_array = jsc::Object::create_array(ctx, argc - 1, argc == 1 ? nullptr : arguments + 1);
json arguments_json = server->serialize_json_value(arguments_array);
// The next task on the stack will instruct the JS to run this callback.
server->m_worker.add_task([&]() -> json {
return {
{"callback", callback_id},
{"arguments", arguments_json},
};
});
// Wait for the next callback result to come off the result stack.
while (server->m_callback_results.empty()) {
// This may recursively bring us into another callback, hence the callback results being a stack.
server->m_worker.try_run_task();
}
json results = server->m_callback_results.pop_back();
json error = results["error"];
// The callback id should be identical!
assert(callback_id == results["callback"].get<RPCObjectID>());
if (!error.is_null()) {
throw jsc::Exception(ctx, error.get<std::string>());
}
return_value.set(server->deserialize_json_value(results["result"]));
}
json RPCServer::perform_request(std::string name, const json &args) {
std::lock_guard<std::mutex> lock(m_request_mutex);
if (name == "/create_session" || m_session_id == args["sessionId"].get<RPCObjectID>()) {
if (name == "/callback_result") {
json results(args);
m_callback_results.push_back(std::move(results));
} }
else { else {
return {{"error", "Invalid session ID"}}; RPCRequest action = m_requests[name];
assert(action);
m_worker.add_task([=] {
return action(args);
});
} }
}
else {
return {{"error", "Invalid session ID"}};
}
try {
return m_worker.pop_task_result();
} catch (std::exception &exception) { } catch (std::exception &exception) {
return {{"error", exception.what()}}; return {{"error", exception.what()}};
} }
@ -244,6 +337,12 @@ json RPCServer::serialize_json_value(JSValueRef js_value) {
{"schema", serialize_object_schema(results->get_object_schema())} {"schema", serialize_object_schema(results->get_object_schema())}
}; };
} }
else if (jsc::Object::is_instance<js::RealmClass<jsc::Types>>(m_context, js_object)) {
return {
{"type", RealmObjectTypesRealm},
{"id", store_object(js_object)},
};
}
else if (jsc::Value::is_array(m_context, js_object)) { else if (jsc::Value::is_array(m_context, js_object)) {
uint32_t length = jsc::Object::validated_get_length(m_context, js_object); uint32_t length = jsc::Object::validated_get_length(m_context, js_object);
std::vector<json> array; std::vector<json> array;
@ -313,8 +412,12 @@ JSValueRef RPCServer::deserialize_json_value(const json dict) {
std::string type_string = type.get<std::string>(); std::string type_string = type.get<std::string>();
if (type_string == RealmObjectTypesFunction) { if (type_string == RealmObjectTypesFunction) {
// FIXME: Make this actually call the function by its id once we need it to. RPCObjectID callback_id = value.get<RPCObjectID>();
return JSObjectMakeFunction(m_context, NULL, 0, NULL, jsc::String(""), NULL, 1, NULL); JSObjectRef callback = JSObjectMakeFunctionWithCallback(m_context, nullptr, js::wrap<run_callback>);
// Curry the first argument to be the callback id.
JSValueRef bind_args[2] = {jsc::Value::from_null(m_context), jsc::Value::from_number(m_context, callback_id)};
return jsc::Object::call_method(m_context, callback, "bind", 2, bind_args);
} }
else if (type_string == RealmObjectTypesDictionary) { else if (type_string == RealmObjectTypesDictionary) {
JSObjectRef js_object = jsc::Object::create_empty(m_context); JSObjectRef js_object = jsc::Object::create_empty(m_context);

View File

@ -18,6 +18,11 @@
#pragma once #pragma once
#include <functional>
#include <future>
#include <thread>
#include "concurrent_deque.hpp"
#include "json.hpp" #include "json.hpp"
#include "jsc_types.hpp" #include "jsc_types.hpp"
@ -28,20 +33,42 @@ class ObjectSchema;
namespace rpc { namespace rpc {
using json = nlohmann::json; using json = nlohmann::json;
using RPCObjectID = u_int64_t; using RPCObjectID = u_int64_t;
using RPCRequest = std::function<json(const json)>; using RPCRequest = std::function<json(const json)>;
class RPCWorker {
public:
RPCWorker();
~RPCWorker();
void add_task(std::function<json()>);
json pop_task_result();
void try_run_task();
private:
bool m_stop = false;
std::thread m_thread;
ConcurrentDeque<std::packaged_task<json()>> m_tasks;
ConcurrentDeque<std::future<json>> m_futures;
};
class RPCServer { class RPCServer {
public: public:
RPCServer(); RPCServer();
~RPCServer(); ~RPCServer();
json perform_request(std::string name, json &args); json perform_request(std::string name, const json &args);
private: private:
JSGlobalContextRef m_context; JSGlobalContextRef m_context;
std::mutex m_request_mutex;
std::map<std::string, RPCRequest> m_requests; std::map<std::string, RPCRequest> m_requests;
std::map<RPCObjectID, JSObjectRef> m_objects; std::map<RPCObjectID, JSObjectRef> m_objects;
ConcurrentDeque<json> m_callback_results;
RPCObjectID m_session_id; RPCObjectID m_session_id;
RPCWorker m_worker;
static void run_callback(JSContextRef, JSObjectRef, size_t, const JSValueRef[], jsc::ReturnValue &);
RPCObjectID store_object(JSObjectRef object); RPCObjectID store_object(JSObjectRef object);

View File

@ -94,7 +94,7 @@ module.exports = {
} }
catch (e) { catch (e) {
caught = true; caught = true;
if (e != expectedException) { if (e.message != expectedException.message) {
throw new TestFailureError('Expected exception "' + expectedException + '" not thrown - instead caught: "' + e + '"'); throw new TestFailureError('Expected exception "' + expectedException + '" not thrown - instead caught: "' + e + '"');
} }
} }

View File

@ -48,7 +48,7 @@ module.exports = BaseTest.extend({
}); });
// migration function exceptions should propogate // migration function exceptions should propogate
var exception = 'expected exception'; var exception = new Error('expected exception');
realm = undefined; realm = undefined;
TestCase.assertThrowsException(function() { TestCase.assertThrowsException(function() {
realm = new Realm({schema: [], schemaVersion: 3, migration: function() { realm = new Realm({schema: [], schemaVersion: 3, migration: function() {