Always invoke sync callbacks on the JavaScript thread (#809)

* Always invoke sync callbacks on the JavaScript thread

* Address review comments

* Make Callback::m_state be a shared, not a weak pointer

* Use the protected GlobalContext in the session bind handler

* Only dispatch on the event loop if we’re not on its thread
This commit is contained in:
Yavor Georgiev 2017-01-23 12:37:13 +01:00 committed by GitHub
parent 41869a3f94
commit 5183ff9a8c
3 changed files with 113 additions and 23 deletions

View File

@ -43,6 +43,7 @@
50C671001E1D2D31003CB63C /* thread_safe_reference.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 5D02C7791E1C83650048C13E /* thread_safe_reference.cpp */; };
5D25F5A11D6284FD00EBBB30 /* libz.tbd in Frameworks */ = {isa = PBXBuildFile; fileRef = F63FF3301C16434400B3B8E0 /* libz.tbd */; };
8507156E1E2CFCD000E548DB /* object_notifier.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 8507156B1E2CFC0100E548DB /* object_notifier.cpp */; };
850715AF1E32366B00E548DB /* event_loop_dispatcher.hpp in Sources */ = {isa = PBXBuildFile; fileRef = 850715AE1E32366B00E548DB /* event_loop_dispatcher.hpp */; };
F61378791C18EAC5008BFC51 /* js in Resources */ = {isa = PBXBuildFile; fileRef = F61378781C18EAAC008BFC51 /* js */; };
F63FF2C61C12469E00B3B8E0 /* jsc_init.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 029048011C0428DF00ABDED4 /* jsc_init.cpp */; };
F63FF2C91C12469E00B3B8E0 /* js_realm.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 029048071C0428DF00ABDED4 /* js_realm.cpp */; };
@ -176,6 +177,7 @@
5D02C77A1E1C83650048C13E /* thread_safe_reference.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = thread_safe_reference.hpp; path = src/thread_safe_reference.hpp; sourceTree = "<group>"; };
8507156B1E2CFC0100E548DB /* object_notifier.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = object_notifier.cpp; sourceTree = "<group>"; };
8507156C1E2CFC0100E548DB /* object_notifier.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = object_notifier.hpp; sourceTree = "<group>"; };
850715AE1E32366B00E548DB /* event_loop_dispatcher.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = event_loop_dispatcher.hpp; sourceTree = "<group>"; };
F60102CF1CBB814A00EC01BA /* node_init.hpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.h; path = node_init.hpp; sourceTree = "<group>"; };
F60102D11CBB865A00EC01BA /* jsc_init.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = jsc_init.hpp; sourceTree = "<group>"; };
F60102E31CBBB19700EC01BA /* node_object_accessor.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = node_object_accessor.hpp; sourceTree = "<group>"; };
@ -336,6 +338,7 @@
F6874A441CAD2ACD00EEEE36 /* JSC */,
F62BF9001CAC72C40022BCDC /* Node */,
F62A35141C18E783004A917D /* Object Store */,
850715AE1E32366B00E548DB /* event_loop_dispatcher.hpp */,
0290934A1CEFA9170009769E /* js_observable.hpp */,
F60102F71CBDA6D400EC01BA /* js_collection.hpp */,
029048041C0428DF00ABDED4 /* js_list.hpp */,

View File

@ -0,0 +1,98 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#pragma once
#include <queue>
#include <tuple>
#include <mutex>
#include "util/event_loop_signal.hpp"
// FIXME: remove once we switch to C++ 17 where we can use std::apply
namespace _apply_polyfill {
template <class Tuple, class F, size_t... Is>
constexpr auto apply_impl(Tuple t, F f, std::index_sequence<Is...>) {
return f(std::get<Is>(t)...);
}
template <class Tuple, class F>
constexpr auto apply(Tuple t, F f) {
return apply_impl(t, f, std::make_index_sequence<std::tuple_size<Tuple>{}>{});
}
}
namespace realm {
template <class F>
class EventLoopDispatcher;
template <typename... Args>
class EventLoopDispatcher<void(Args...)> {
using Tuple = std::tuple<typename std::remove_reference<Args>::type...>;
private:
struct State {
public:
State(std::function<void(Args...)> func) : m_func(func) { }
const std::function<void(Args...)> m_func;
std::queue<Tuple> m_invocations;
std::mutex m_mutex;
};
const std::shared_ptr<State> m_state;
struct Callback {
std::shared_ptr<State> m_state;
public:
void operator()()
{
std::unique_lock<std::mutex> lock(m_state->m_mutex);
while (!m_state->m_invocations.empty()) {
auto& tuple = m_state->m_invocations.front();
::_apply_polyfill::apply(tuple, m_state->m_func);
m_state->m_invocations.pop();
}
}
};
const std::shared_ptr<EventLoopSignal<Callback>> m_signal;
const std::thread::id m_thread = std::this_thread::get_id();
public:
EventLoopDispatcher(std::function<void(Args...)> func)
: m_state(std::make_shared<State>(func))
, m_signal(std::make_shared<EventLoopSignal<Callback>>(Callback{m_state}))
{
}
void operator()(Args... args)
{
if (m_thread == std::this_thread::get_id()) {
m_state->m_func(args...);
return;
}
{
std::unique_lock<std::mutex> lock(m_state->m_mutex);
m_state->m_invocations.push(std::make_tuple(args...));
}
m_signal->notify();
}
};
}

View File

@ -22,6 +22,7 @@
#include <map>
#include <set>
#include "event_loop_dispatcher.hpp"
#include "platform.hpp"
#include "js_class.hpp"
#include "js_collection.hpp"
@ -264,30 +265,18 @@ void SyncClass<T>::populate_sync_config(ContextType ctx, ObjectType realm_constr
Protected<ObjectType> protected_sync(ctx, sync_constructor);
Protected<typename T::GlobalContext> protected_ctx(Context<T>::get_global_context(ctx));
auto handler = [=](const std::string& path, const realm::SyncConfig& config, std::shared_ptr<SyncSession>) {
EventLoopDispatcher<SyncBindSessionHandler> bind([=](const std::string& path, const realm::SyncConfig& config, std::shared_ptr<SyncSession>) {
HANDLESCOPE
if (config.user->is_admin()) {
// FIXME: This log-in callback is called while the object store still holds some sync-related locks.
// Notify the object store of the access token asynchronously to avoid the deadlock that would result
// from reentering the object store here.
auto thread = std::thread([path, config]{
auto session = SyncManager::shared().get_existing_active_session(path);
session->refresh_access_token(config.user->refresh_token(), config.realm_url);
});
thread.detach();
}
else {
ObjectType user_constructor = Object::validated_get_object(ctx, protected_sync, std::string("User"));
FunctionType authenticate = Object::validated_get_function(ctx, user_constructor, std::string("_authenticateRealm"));
ObjectType user_constructor = Object::validated_get_object(protected_ctx, protected_sync, std::string("User"));
FunctionType authenticate = Object::validated_get_function(protected_ctx, user_constructor, std::string("_authenticateRealm"));
ValueType arguments[3];
arguments[0] = Value::from_string(protected_ctx, path.c_str());
arguments[1] = Value::from_string(protected_ctx, config.realm_url.c_str());
arguments[2] = refresh;
ObjectType user = create_object<T, UserClass<T>>(ctx, new SharedUser(config.user));
Function::call(protected_ctx, authenticate, user, 3, arguments);
}
};
ValueType arguments[3];
arguments[0] = Value::from_string(protected_ctx, path.c_str());
arguments[1] = Value::from_string(protected_ctx, config.realm_url.c_str());
arguments[2] = refresh;
ObjectType user = create_object<T, UserClass<T>>(protected_ctx, new SharedUser(config.user));
Function::call(protected_ctx, authenticate, user, 3, arguments);
});
ObjectType user = Object::validated_get_object(ctx, sync_config_object, "user");
SharedUser shared_user = *get_internal<T, UserClass<T>>(user);
@ -300,7 +289,7 @@ void SyncClass<T>::populate_sync_config(ContextType ctx, ObjectType realm_constr
// FIXME - use make_shared
config.sync_config = std::shared_ptr<SyncConfig>(new SyncConfig{shared_user, raw_realm_url,
SyncSessionStopPolicy::AfterChangesUploaded,
handler, [=](auto, SyncError) {}});
std::move(bind), [=](auto, SyncError) {}});
config.schema_mode = SchemaMode::Additive;
config.path = realm::SyncManager::shared().path_for_realm(shared_user->identity(), raw_realm_url);
}