Decouple Realm instance tracking from interprocess notifications

This commit is contained in:
Thomas Goyne 2015-08-21 12:27:27 -07:00
parent c3a9489b02
commit d6daa052e8
7 changed files with 182 additions and 79 deletions

View File

@ -28,8 +28,10 @@ set(HEADERS
if(APPLE)
include_directories(impl/apple)
list(APPEND SOURCES
impl/apple/cached_realm.cpp
impl/apple/external_commit_helper.cpp)
list(APPEND HEADERS
impl/apple/cached_realm.hpp
impl/apple/external_commit_helper.hpp)
find_library(CF_LIBRARY CoreFoundation)
endif()

View File

@ -0,0 +1,93 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "cached_realm.hpp"
#include "shared_realm.hpp"
using namespace realm;
using namespace realm::_impl;
CachedRealm::CachedRealm(const std::shared_ptr<Realm>& realm, bool cache)
: m_realm(realm)
, m_cache(cache)
{
struct RefCountedWeakPointer {
std::weak_ptr<Realm> realm;
std::atomic<size_t> ref_count = {1};
};
CFRunLoopSourceContext ctx{};
ctx.info = new RefCountedWeakPointer{realm};
ctx.perform = [](void* info) {
if (auto realm = static_cast<RefCountedWeakPointer*>(info)->realm.lock()) {
realm->notify();
}
};
ctx.retain = [](const void* info) {
static_cast<RefCountedWeakPointer*>(const_cast<void*>(info))->ref_count.fetch_add(1, std::memory_order_relaxed);
return info;
};
ctx.release = [](const void* info) {
auto ptr = static_cast<RefCountedWeakPointer*>(const_cast<void*>(info));
if (ptr->ref_count.fetch_add(-1, std::memory_order_acq_rel) == 1) {
delete ptr;
}
};
m_runloop = CFRunLoopGetCurrent();
CFRetain(m_runloop);
m_signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx);
CFRunLoopAddSource(m_runloop, m_signal, kCFRunLoopDefaultMode);
}
CachedRealm::CachedRealm(CachedRealm&& rgt)
{
*this = std::move(rgt);
}
CachedRealm& CachedRealm::operator=(CachedRealm&& rgt)
{
m_realm = std::move(rgt.m_realm);
m_thread_id = rgt.m_thread_id;
m_cache = rgt.m_cache;
m_runloop = rgt.m_runloop;
m_signal = rgt.m_signal;
rgt.m_runloop = nullptr;
rgt.m_signal = nullptr;
return *this;
}
CachedRealm::~CachedRealm()
{
if (m_signal) {
CFRunLoopSourceInvalidate(m_signal);
CFRelease(m_signal);
CFRelease(m_runloop);
}
}
void CachedRealm::notify()
{
CFRunLoopSourceSignal(m_signal);
// Signalling the source makes it run the next time the runloop gets
// to it, but doesn't make the runloop start if it's currently idle
// waiting for events
CFRunLoopWakeUp(m_runloop);
}

View File

@ -0,0 +1,65 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_CACHED_REALM_HPP
#define REALM_CACHED_REALM_HPP
#include <CoreFoundation/CFRunLoop.h>
#include <memory>
#include <thread>
namespace realm {
class Realm;
namespace _impl {
class CachedRealm {
public:
CachedRealm(const std::shared_ptr<Realm>& realm, bool cache);
~CachedRealm();
CachedRealm(CachedRealm&&);
CachedRealm& operator=(CachedRealm&&);
CachedRealm(const CachedRealm&) = delete;
CachedRealm& operator=(const CachedRealm&) = delete;
// Get a strong reference to the cached realm
std::shared_ptr<Realm> realm() const { return m_realm.lock(); }
// Does this CachedRealm store a Realm instance that should be used on the current thread?
bool is_cached_for_current_thread() const { return m_cache && m_thread_id == std::this_thread::get_id(); }
bool expired() const { return m_realm.expired(); }
// Asyncronously call notify() on the Realm on the appropriate thread
void notify();
private:
std::weak_ptr<Realm> m_realm;
std::thread::id m_thread_id = std::this_thread::get_id();
bool m_cache = false;
CFRunLoopRef m_runloop;
CFRunLoopSourceRef m_signal;
};
} // namespace _impl
} // namespace realm
#endif // REALM_CACHED_REALM_HPP

View File

@ -143,45 +143,10 @@ ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent)
ExternalCommitHelper::~ExternalCommitHelper()
{
REALM_ASSERT_DEBUG(m_realms.empty());
notify_fd(m_shutdown_write_fd);
m_thread.wait(); // Wait for the thread to exit
}
void ExternalCommitHelper::add_realm(realm::Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realms_mutex);
// Create the runloop source
CFRunLoopSourceContext ctx{};
ctx.info = realm;
ctx.perform = [](void* info) {
static_cast<Realm*>(info)->notify();
};
CFRunLoopRef runloop = CFRunLoopGetCurrent();
CFRetain(runloop);
CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx);
CFRunLoopAddSource(runloop, signal, kCFRunLoopDefaultMode);
m_realms.push_back({realm, runloop, signal});
}
void ExternalCommitHelper::remove_realm(realm::Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realms_mutex);
for (auto it = m_realms.begin(); it != m_realms.end(); ++it) {
if (it->realm == realm) {
CFRunLoopSourceInvalidate(it->signal);
CFRelease(it->signal);
CFRelease(it->runloop);
m_realms.erase(it);
return;
}
}
REALM_TERMINATE("Realm not registered");
}
void ExternalCommitHelper::listen()
{
pthread_setname_np("RLMRealm notification listener");
@ -216,14 +181,7 @@ void ExternalCommitHelper::listen()
}
assert(event.ident == (uint32_t)m_notify_fd);
std::lock_guard<std::mutex> lock(m_realms_mutex);
for (auto const& realm : m_realms) {
CFRunLoopSourceSignal(realm.signal);
// Signalling the source makes it run the next time the runloop gets
// to it, but doesn't make the runloop start if it's currently idle
// waiting for events
CFRunLoopWakeUp(realm.runloop);
}
m_parent.on_change();
}
}

View File

@ -20,9 +20,7 @@
#define REALM_EXTERNAL_COMMIT_HELPER_HPP
#include <CoreFoundation/CFRunLoop.h>
#include <functional>
#include <future>
#include <mutex>
#include <vector>
namespace realm {
@ -31,16 +29,12 @@ class Realm;
namespace _impl {
class RealmCoordinator;
// FIXME: split IPC from the local cross-thread stuff
// both are platform-specific but need to be useable separately
class ExternalCommitHelper {
public:
ExternalCommitHelper(RealmCoordinator& parent);
~ExternalCommitHelper();
void notify_others();
void add_realm(Realm* realm);
void remove_realm(Realm* realm);
private:
// A RAII holder for a file descriptor which automatically closes the wrapped
@ -75,13 +69,6 @@ private:
RealmCoordinator& m_parent;
// Currently registered realms and the signal for delivering notifications
// to them
std::vector<PerRealmInfo> m_realms;
// Mutex which guards m_realms
std::mutex m_realms_mutex;
// The listener thread
std::future<void> m_thread;

View File

@ -18,17 +18,13 @@
#include "realm_coordinator.hpp"
#include "cached_realm.hpp"
#include "external_commit_helper.hpp"
#include "object_store.hpp"
using namespace realm;
using namespace realm::_impl;
struct realm::_impl::CachedRealm {
std::weak_ptr<Realm> realm;
std::thread::id thread_id;
};
static std::mutex s_coordinator_mutex;
static std::map<std::string, std::weak_ptr<RealmCoordinator>> s_coordinators_per_path;
@ -59,9 +55,9 @@ std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(Str
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
if (!m_notifier && m_cached_realms.empty()) {
if ((!m_config.read_only && !m_notifier) || (m_config.read_only && m_cached_realms.empty())) {
m_config = config;
if (!config.read_only) {
if (!config.read_only && !m_notifier) {
m_notifier = std::make_unique<ExternalCommitHelper>(*this);
}
}
@ -89,13 +85,12 @@ std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
}
}
auto thread_id = std::this_thread::get_id();
if (config.cache) {
for (auto& cachedRealm : m_cached_realms) {
if (cachedRealm.thread_id == thread_id) {
if (cachedRealm.is_cached_for_current_thread()) {
// can be null if we jumped in between ref count hitting zero and
// unregister_realm() getting the lock
if (auto realm = cachedRealm.realm.lock()) {
if (auto realm = cachedRealm.realm()) {
return realm;
}
}
@ -104,12 +99,7 @@ std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
auto realm = std::make_shared<Realm>(config);
realm->init(shared_from_this());
if (m_notifier) {
m_notifier->add_realm(realm.get());
}
if (config.cache) {
m_cached_realms.push_back({realm, thread_id});
}
m_cached_realms.emplace_back(realm, m_config.cache);
return realm;
}
@ -133,14 +123,11 @@ RealmCoordinator::~RealmCoordinator()
}
}
void RealmCoordinator::unregister_realm(Realm* realm)
void RealmCoordinator::unregister_realm(Realm*)
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
if (m_notifier) {
m_notifier->remove_realm(realm);
}
for (size_t i = 0; i < m_cached_realms.size(); ++i) {
if (m_cached_realms[i].realm.expired()) {
if (m_cached_realms[i].expired()) {
if (i + 1 < m_cached_realms.size()) {
m_cached_realms[i] = std::move(m_cached_realms.back());
}
@ -164,7 +151,7 @@ void RealmCoordinator::clear_cache()
}
for (auto& cached_realm : coordinator->m_cached_realms) {
if (auto realm = cached_realm.realm.lock()) {
if (auto realm = cached_realm.realm()) {
realms_to_close.push_back(realm);
}
}
@ -185,3 +172,11 @@ void RealmCoordinator::send_commit_notifications()
REALM_ASSERT(!m_config.read_only);
m_notifier->notify_others();
}
void RealmCoordinator::on_change()
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
for (auto& realm : m_cached_realms) {
realm.notify();
}
}

View File

@ -25,8 +25,8 @@
namespace realm {
namespace _impl {
class CachedRealm;
class ExternalCommitHelper;
struct CachedRealm;
// RealmCoordinator manages the weak cache of Realm instances and communication
// between per-thread Realm instances for a given file
@ -63,6 +63,9 @@ public:
// Do not call directly
void unregister_realm(Realm* realm);
// Called by m_notifier when there's a new commit to send notifications for
void on_change();
private:
Realm::Config m_config;