Merge pull request #22 from realm/tg-realm-coordinator

Extract cache management and inter-Realm sharing to RealmCoordinator
This commit is contained in:
Thomas Goyne 2016-02-10 10:20:51 -08:00
commit f7957b7633
17 changed files with 821 additions and 302 deletions

View File

@ -1,5 +1,3 @@
include_directories(impl)
set(SOURCES set(SOURCES
index_set.cpp index_set.cpp
list.cpp list.cpp
@ -8,6 +6,7 @@ set(SOURCES
results.cpp results.cpp
schema.cpp schema.cpp
shared_realm.cpp shared_realm.cpp
impl/realm_coordinator.cpp
impl/transact_log_handler.cpp impl/transact_log_handler.cpp
parser/parser.cpp parser/parser.cpp
parser/query_builder.cpp) parser/query_builder.cpp)
@ -20,17 +19,27 @@ set(HEADERS
results.hpp results.hpp
schema.hpp schema.hpp
shared_realm.hpp shared_realm.hpp
impl/cached_realm.hpp
impl/cached_realm_base.hpp
impl/external_commit_helper.hpp
impl/transact_log_handler.hpp impl/transact_log_handler.hpp
parser/parser.hpp parser/parser.hpp
parser/query_builder.hpp) parser/query_builder.hpp)
if(APPLE) if(APPLE)
include_directories(impl/apple)
list(APPEND SOURCES list(APPEND SOURCES
impl/apple/cached_realm.cpp
impl/apple/external_commit_helper.cpp) impl/apple/external_commit_helper.cpp)
list(APPEND HEADERS list(APPEND HEADERS
impl/apple/cached_realm.hpp
impl/apple/external_commit_helper.hpp) impl/apple/external_commit_helper.hpp)
find_library(CF_LIBRARY CoreFoundation) find_library(CF_LIBRARY CoreFoundation)
else()
list(APPEND SOURCES
impl/generic/external_commit_helper.cpp)
list(APPEND HEADERS
impl/generic/cached_realm.hpp
impl/generic/external_commit_helper.hpp)
endif() endif()
add_library(realm-object-store SHARED ${SOURCES} ${HEADERS}) add_library(realm-object-store SHARED ${SOURCES} ${HEADERS})

View File

@ -0,0 +1,94 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/cached_realm.hpp"
#include "shared_realm.hpp"
using namespace realm;
using namespace realm::_impl;
CachedRealm::CachedRealm(const std::shared_ptr<Realm>& realm, bool cache)
: CachedRealmBase(realm, cache)
{
struct RefCountedWeakPointer {
std::weak_ptr<Realm> realm;
std::atomic<size_t> ref_count = {1};
};
CFRunLoopSourceContext ctx{};
ctx.info = new RefCountedWeakPointer{realm};
ctx.perform = [](void* info) {
if (auto realm = static_cast<RefCountedWeakPointer*>(info)->realm.lock()) {
realm->notify();
}
};
ctx.retain = [](const void* info) {
static_cast<RefCountedWeakPointer*>(const_cast<void*>(info))->ref_count.fetch_add(1, std::memory_order_relaxed);
return info;
};
ctx.release = [](const void* info) {
auto ptr = static_cast<RefCountedWeakPointer*>(const_cast<void*>(info));
if (ptr->ref_count.fetch_add(-1, std::memory_order_acq_rel) == 1) {
delete ptr;
}
};
m_runloop = CFRunLoopGetCurrent();
CFRetain(m_runloop);
m_signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx);
CFRunLoopAddSource(m_runloop, m_signal, kCFRunLoopDefaultMode);
}
CachedRealm::CachedRealm(CachedRealm&& rgt)
: CachedRealmBase(std::move(rgt))
, m_runloop(rgt.m_runloop)
, m_signal(rgt.m_signal)
{
rgt.m_runloop = nullptr;
rgt.m_signal = nullptr;
}
CachedRealm& CachedRealm::operator=(CachedRealm&& rgt)
{
CachedRealmBase::operator=(std::move(rgt));
m_runloop = rgt.m_runloop;
m_signal = rgt.m_signal;
rgt.m_runloop = nullptr;
rgt.m_signal = nullptr;
return *this;
}
CachedRealm::~CachedRealm()
{
if (m_signal) {
CFRunLoopSourceInvalidate(m_signal);
CFRelease(m_signal);
CFRelease(m_runloop);
}
}
void CachedRealm::notify()
{
CFRunLoopSourceSignal(m_signal);
// Signalling the source makes it run the next time the runloop gets
// to it, but doesn't make the runloop start if it's currently idle
// waiting for events
CFRunLoopWakeUp(m_runloop);
}

View File

@ -0,0 +1,48 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/cached_realm_base.hpp"
#include <CoreFoundation/CFRunLoop.h>
namespace realm {
class Realm;
namespace _impl {
class CachedRealm : public CachedRealmBase {
public:
CachedRealm(const std::shared_ptr<Realm>& realm, bool cache);
~CachedRealm();
CachedRealm(CachedRealm&&);
CachedRealm& operator=(CachedRealm&&);
CachedRealm(const CachedRealm&) = delete;
CachedRealm& operator=(const CachedRealm&) = delete;
// Asyncronously call notify() on the Realm on the appropriate thread
void notify();
private:
CFRunLoopRef m_runloop;
CFRunLoopSourceRef m_signal;
};
} // namespace _impl
} // namespace realm

View File

@ -16,18 +16,19 @@
// //
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
#include "external_commit_helper.hpp" #include "impl/external_commit_helper.hpp"
#include "shared_realm.hpp" #include "impl/realm_coordinator.hpp"
#include <asl.h>
#include <assert.h> #include <assert.h>
#include <fcntl.h>
#include <sstream>
#include <sys/event.h> #include <sys/event.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/time.h> #include <sys/time.h>
#include <system_error> #include <system_error>
#include <fcntl.h>
#include <unistd.h> #include <unistd.h>
#include <sstream>
using namespace realm; using namespace realm;
using namespace realm::_impl; using namespace realm::_impl;
@ -56,11 +57,10 @@ void notify_fd(int fd)
void ExternalCommitHelper::FdHolder::close() void ExternalCommitHelper::FdHolder::close()
{ {
if (m_fd != -1) { if (m_fd != -1) {
::close(m_fd); ::close(m_fd);
} }
m_fd = -1; m_fd = -1;
} }
// Inter-thread and inter-process notifications of changes are done using a // Inter-thread and inter-process notifications of changes are done using a
@ -86,16 +86,15 @@ void ExternalCommitHelper::FdHolder::close()
// signal the runloop source and wake up the target runloop, and when data is // signal the runloop source and wake up the target runloop, and when data is
// written to the anonymous pipe the background thread removes the runloop // written to the anonymous pipe the background thread removes the runloop
// source from the runloop and and shuts down. // source from the runloop and and shuts down.
ExternalCommitHelper::ExternalCommitHelper(Realm* realm) ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent)
: m_parent(parent)
{ {
add_realm(realm);
m_kq = kqueue(); m_kq = kqueue();
if (m_kq == -1) { if (m_kq == -1) {
throw std::system_error(errno, std::system_category()); throw std::system_error(errno, std::system_category());
} }
auto path = realm->config().path + ".note"; auto path = parent.get_path() + ".note";
// Create and open the named pipe // Create and open the named pipe
int ret = mkfifo(path.c_str(), 0600); int ret = mkfifo(path.c_str(), 0600);
@ -140,69 +139,33 @@ ExternalCommitHelper::ExternalCommitHelper(Realm* realm)
m_shutdown_read_fd = pipeFd[0]; m_shutdown_read_fd = pipeFd[0];
m_shutdown_write_fd = pipeFd[1]; m_shutdown_write_fd = pipeFd[1];
// Use the minimum allowed stack size, as we need very little in our listener m_thread = std::async(std::launch::async, [=] {
// https://developer.apple.com/library/ios/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.html#//apple_ref/doc/uid/10000057i-CH15-SW7 try {
pthread_attr_t attr; listen();
pthread_attr_init(&attr); }
pthread_attr_setstacksize(&attr, 16 * 1024); catch (std::exception const& e) {
fprintf(stderr, "uncaught exception in notifier thread: %s: %s\n", typeid(e).name(), e.what());
auto fn = [](void *self) -> void * { asl_log(nullptr, nullptr, ASL_LEVEL_ERR, "uncaught exception in notifier thread: %s: %s", typeid(e).name(), e.what());
static_cast<ExternalCommitHelper *>(self)->listen(); throw;
return nullptr; }
}; catch (...) {
ret = pthread_create(&m_thread, &attr, fn, this); fprintf(stderr, "uncaught exception in notifier thread\n");
pthread_attr_destroy(&attr); asl_log(nullptr, nullptr, ASL_LEVEL_ERR, "uncaught exception in notifier thread");
if (ret != 0) { throw;
throw std::system_error(errno, std::system_category()); }
} });
} }
ExternalCommitHelper::~ExternalCommitHelper() ExternalCommitHelper::~ExternalCommitHelper()
{ {
REALM_ASSERT_DEBUG(m_realms.empty());
notify_fd(m_shutdown_write_fd); notify_fd(m_shutdown_write_fd);
pthread_join(m_thread, nullptr); // Wait for the thread to exit m_thread.wait(); // Wait for the thread to exit
}
void ExternalCommitHelper::add_realm(realm::Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realms_mutex);
// Create the runloop source
CFRunLoopSourceContext ctx{};
ctx.info = realm;
ctx.perform = [](void* info) {
static_cast<Realm*>(info)->notify();
};
CFRunLoopRef runloop = CFRunLoopGetCurrent();
CFRetain(runloop);
CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx);
CFRunLoopAddSource(runloop, signal, kCFRunLoopDefaultMode);
m_realms.push_back({realm, runloop, signal});
}
void ExternalCommitHelper::remove_realm(realm::Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realms_mutex);
for (auto it = m_realms.begin(); it != m_realms.end(); ++it) {
if (it->realm == realm) {
CFRunLoopSourceInvalidate(it->signal);
CFRelease(it->signal);
CFRelease(it->runloop);
m_realms.erase(it);
return;
}
}
REALM_TERMINATE("Realm not registered");
} }
void ExternalCommitHelper::listen() void ExternalCommitHelper::listen()
{ {
pthread_setname_np("RLMRealm notification listener"); pthread_setname_np("RLMRealm notification listener");
// Set up the kqueue // Set up the kqueue
// EVFILT_READ indicates that we care about data being available to read // EVFILT_READ indicates that we care about data being available to read
// on the given file descriptor. // on the given file descriptor.
@ -233,14 +196,7 @@ void ExternalCommitHelper::listen()
} }
assert(event.ident == (uint32_t)m_notify_fd); assert(event.ident == (uint32_t)m_notify_fd);
std::lock_guard<std::mutex> lock(m_realms_mutex); m_parent.on_change();
for (auto const& realm : m_realms) {
CFRunLoopSourceSignal(realm.signal);
// Signalling the source makes it run the next time the runloop gets
// to it, but doesn't make the runloop start if it's currently idle
// waiting for events
CFRunLoopWakeUp(realm.runloop);
}
} }
} }
@ -248,4 +204,3 @@ void ExternalCommitHelper::notify_others()
{ {
notify_fd(m_notify_fd); notify_fd(m_notify_fd);
} }

View File

@ -16,25 +16,20 @@
// //
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
#ifndef REALM_EXTERNAL_COMMIT_HELPER_HPP #include <future>
#define REALM_EXTERNAL_COMMIT_HELPER_HPP
#include <CoreFoundation/CFRunLoop.h>
#include <mutex>
#include <vector>
namespace realm { namespace realm {
class Realm; class Realm;
namespace _impl { namespace _impl {
class RealmCoordinator;
class ExternalCommitHelper { class ExternalCommitHelper {
public: public:
ExternalCommitHelper(Realm* realm); ExternalCommitHelper(RealmCoordinator& parent);
~ExternalCommitHelper(); ~ExternalCommitHelper();
void notify_others(); void notify_others();
void add_realm(Realm* realm);
void remove_realm(Realm* realm);
private: private:
// A RAII holder for a file descriptor which automatically closes the wrapped // A RAII holder for a file descriptor which automatically closes the wrapped
@ -59,23 +54,12 @@ private:
FdHolder(FdHolder const&) = delete; FdHolder(FdHolder const&) = delete;
}; };
struct PerRealmInfo {
Realm* realm;
CFRunLoopRef runloop;
CFRunLoopSourceRef signal;
};
void listen(); void listen();
// Currently registered realms and the signal for delivering notifications RealmCoordinator& m_parent;
// to them
std::vector<PerRealmInfo> m_realms;
// Mutex which guards m_realms
std::mutex m_realms_mutex;
// The listener thread // The listener thread
pthread_t m_thread; std::future<void> m_thread;
// Read-write file descriptor for the named pipe which is waited on for // Read-write file descriptor for the named pipe which is waited on for
// changes and written to when a commit is made // changes and written to when a commit is made
@ -90,5 +74,3 @@ private:
} // namespace _impl } // namespace _impl
} // namespace realm } // namespace realm
#endif /* REALM_EXTERNAL_COMMIT_HELPER_HPP */

30
src/impl/cached_realm.hpp Normal file
View File

@ -0,0 +1,30 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_CACHED_REALM_HPP
#define REALM_CACHED_REALM_HPP
#include <realm/util/features.h>
#if REALM_PLATFORM_APPLE
#include "impl/apple/cached_realm.hpp"
#else
#include "impl/generic/cached_realm.hpp"
#endif
#endif // REALM_CACHED_REALM_HPP

View File

@ -0,0 +1,68 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_CACHED_REALM_BASE_HPP
#define REALM_CACHED_REALM_BASE_HPP
#include <memory>
#include <thread>
namespace realm {
class Realm;
namespace _impl {
// CachedRealm stores a weak reference to a Realm instance, along with all of
// the information about a Realm that needs to be accessed from other threads.
// This is needed to avoid forming strong references to the Realm instances on
// other threads, which can produce deadlocks when the last strong reference to
// a Realm instance is released from within a function holding the cache lock.
class CachedRealmBase {
public:
CachedRealmBase(const std::shared_ptr<Realm>& realm, bool cache);
// Get a strong reference to the cached realm
std::shared_ptr<Realm> realm() const { return m_realm.lock(); }
// Does this CachedRealmBase store a Realm instance that should be used on the current thread?
bool is_cached_for_current_thread() const { return m_cache && m_thread_id == std::this_thread::get_id(); }
// Has the Realm instance been destroyed?
bool expired() const { return m_realm.expired(); }
// Is this a CachedRealmBase for the given Realm instance?
bool is_for_realm(Realm* realm) const { return realm == m_realm_key; }
private:
std::weak_ptr<Realm> m_realm;
std::thread::id m_thread_id = std::this_thread::get_id();
void* m_realm_key;
bool m_cache = false;
};
inline CachedRealmBase::CachedRealmBase(const std::shared_ptr<Realm>& realm, bool cache)
: m_realm(realm)
, m_realm_key(realm.get())
, m_cache(cache)
{
}
} // namespace _impl
} // namespace realm
#endif // REALM_CACHED_REALM_BASE_HPP

View File

@ -0,0 +1,30 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_EXTERNAL_COMMIT_HELPER_HPP
#define REALM_EXTERNAL_COMMIT_HELPER_HPP
#include <realm/util/features.h>
#if REALM_PLATFORM_APPLE
#include "impl/apple/external_commit_helper.hpp"
#else
#include "impl/generic/external_commit_helper.hpp"
#endif
#endif // REALM_EXTERNAL_COMMIT_HELPER_HPP

View File

@ -0,0 +1,36 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/cached_realm_base.hpp"
namespace realm {
class Realm;
namespace _impl {
class CachedRealm : public CachedRealmBase {
public:
using CachedRealmBase::CachedRealmBase;
// Do nothing, as this can't be implemented portably
void notify() { }
};
} // namespace _impl
} // namespace realm

View File

@ -0,0 +1,49 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/external_commit_helper.hpp"
#include "impl/realm_coordinator.hpp"
#include <realm/commit_log.hpp>
#include <realm/replication.hpp>
using namespace realm;
using namespace realm::_impl;
ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent)
: m_parent(parent)
, m_history(realm::make_client_history(parent.get_path(), parent.get_encryption_key().data()))
, m_sg(*m_history, parent.is_in_memory() ? SharedGroup::durability_MemOnly : SharedGroup::durability_Full,
parent.get_encryption_key().data())
, m_thread(std::async(std::launch::async, [=] {
m_sg.begin_read();
while (m_sg.wait_for_change()) {
m_sg.end_read();
m_sg.begin_read();
m_parent.on_change();
}
}))
{
}
ExternalCommitHelper::~ExternalCommitHelper()
{
m_sg.wait_for_change_release();
m_thread.wait(); // Wait for the thread to exit
}

View File

@ -0,0 +1,50 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include <realm/group_shared.hpp>
#include <future>
namespace realm {
class ClientHistory;
namespace _impl {
class RealmCoordinator;
class ExternalCommitHelper {
public:
ExternalCommitHelper(RealmCoordinator& parent);
~ExternalCommitHelper();
// A no-op in this version, but needed for the Apple version
void notify_others() { }
private:
RealmCoordinator& m_parent;
// A shared group used to listen for changes
std::unique_ptr<ClientHistory> m_history;
SharedGroup m_sg;
// The listener thread
std::future<void> m_thread;
};
} // namespace _impl
} // namespace realm

View File

@ -0,0 +1,195 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/realm_coordinator.hpp"
#include "impl/cached_realm.hpp"
#include "impl/external_commit_helper.hpp"
#include "object_store.hpp"
#include "schema.hpp"
#include <unordered_map>
using namespace realm;
using namespace realm::_impl;
static std::mutex s_coordinator_mutex;
static std::unordered_map<std::string, std::weak_ptr<RealmCoordinator>> s_coordinators_per_path;
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
{
std::lock_guard<std::mutex> lock(s_coordinator_mutex);
auto& weak_coordinator = s_coordinators_per_path[path];
if (auto coordinator = weak_coordinator.lock()) {
return coordinator;
}
auto coordinator = std::make_shared<RealmCoordinator>();
weak_coordinator = coordinator;
return coordinator;
}
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
{
std::lock_guard<std::mutex> lock(s_coordinator_mutex);
auto it = s_coordinators_per_path.find(path);
return it == s_coordinators_per_path.end() ? nullptr : it->second.lock();
}
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
if ((!m_config.read_only && !m_notifier) || (m_config.read_only && m_cached_realms.empty())) {
m_config = config;
if (!config.read_only && !m_notifier) {
m_notifier = std::make_unique<ExternalCommitHelper>(*this);
}
}
else {
if (m_config.read_only != config.read_only) {
throw MismatchedConfigException("Realm at path already opened with different read permissions.");
}
if (m_config.in_memory != config.in_memory) {
throw MismatchedConfigException("Realm at path already opened with different inMemory settings.");
}
if (m_config.encryption_key != config.encryption_key) {
throw MismatchedConfigException("Realm at path already opened with a different encryption key.");
}
if (m_config.schema_version != config.schema_version && config.schema_version != ObjectStore::NotVersioned) {
throw MismatchedConfigException("Realm at path already opened with different schema version.");
}
// FIXME: verify that schema is compatible
// Needs to verify that all tables present in both are identical, and
// then updated m_config with any tables present in config but not in
// it
// Public API currently doesn't make it possible to have non-matching
// schemata so it's not a huge issue
if ((false) && m_config.schema != config.schema) {
throw MismatchedConfigException("Realm at path already opened with different schema");
}
}
if (config.cache) {
for (auto& cachedRealm : m_cached_realms) {
if (cachedRealm.is_cached_for_current_thread()) {
// can be null if we jumped in between ref count hitting zero and
// unregister_realm() getting the lock
if (auto realm = cachedRealm.realm()) {
return realm;
}
}
}
}
auto realm = std::make_shared<Realm>(std::move(config));
realm->init(shared_from_this());
m_cached_realms.emplace_back(realm, m_config.cache);
return realm;
}
const Schema* RealmCoordinator::get_schema() const noexcept
{
return m_cached_realms.empty() ? nullptr : m_config.schema.get();
}
void RealmCoordinator::update_schema(Schema const& schema)
{
// FIXME: this should probably be doing some sort of validation and
// notifying all Realm instances of the new schema in some way
m_config.schema = std::make_unique<Schema>(schema);
}
RealmCoordinator::RealmCoordinator() = default;
RealmCoordinator::~RealmCoordinator()
{
std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
for (auto it = s_coordinators_per_path.begin(); it != s_coordinators_per_path.end(); ) {
if (it->second.expired()) {
it = s_coordinators_per_path.erase(it);
}
else {
++it;
}
}
}
void RealmCoordinator::unregister_realm(Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
for (size_t i = 0; i < m_cached_realms.size(); ++i) {
auto& cached_realm = m_cached_realms[i];
if (!cached_realm.expired() && !cached_realm.is_for_realm(realm)) {
continue;
}
if (i + 1 < m_cached_realms.size()) {
cached_realm = std::move(m_cached_realms.back());
}
m_cached_realms.pop_back();
}
}
void RealmCoordinator::clear_cache()
{
std::vector<WeakRealm> realms_to_close;
{
std::lock_guard<std::mutex> lock(s_coordinator_mutex);
for (auto& weak_coordinator : s_coordinators_per_path) {
auto coordinator = weak_coordinator.second.lock();
if (!coordinator) {
continue;
}
coordinator->m_notifier = nullptr;
// Gather a list of all of the realms which will be removed
for (auto& cached_realm : coordinator->m_cached_realms) {
if (auto realm = cached_realm.realm()) {
realms_to_close.push_back(realm);
}
}
}
s_coordinators_per_path.clear();
}
// Close all of the previously cached Realms. This can't be done while
// s_coordinator_mutex is held as it may try to re-lock it.
for (auto& weak_realm : realms_to_close) {
if (auto realm = weak_realm.lock()) {
realm->close();
}
}
}
void RealmCoordinator::send_commit_notifications()
{
REALM_ASSERT(!m_config.read_only);
m_notifier->notify_others();
}
void RealmCoordinator::on_change()
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
for (auto& realm : m_cached_realms) {
realm.notify();
}
}

View File

@ -0,0 +1,88 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_COORDINATOR_HPP
#define REALM_COORDINATOR_HPP
#include "shared_realm.hpp"
#include <realm/string_data.hpp>
namespace realm {
class Schema;
namespace _impl {
class CachedRealm;
class ExternalCommitHelper;
// RealmCoordinator manages the weak cache of Realm instances and communication
// between per-thread Realm instances for a given file
class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {
public:
// Get the coordinator for the given path, creating it if neccesary
static std::shared_ptr<RealmCoordinator> get_coordinator(StringData path);
// Get the coordinator for the given path, or null if there is none
static std::shared_ptr<RealmCoordinator> get_existing_coordinator(StringData path);
// Get a thread-local shared Realm with the given configuration
// If the Realm is already open on another thread, validates that the given
// configuration is compatible with the existing one
std::shared_ptr<Realm> get_realm(Realm::Config config);
const Schema* get_schema() const noexcept;
uint64_t get_schema_version() const noexcept { return m_config.schema_version; }
const std::string& get_path() const noexcept { return m_config.path; }
const std::vector<char>& get_encryption_key() const noexcept { return m_config.encryption_key; }
bool is_in_memory() const noexcept { return m_config.in_memory; }
// Asyncronously call notify() on every Realm instance for this coordinator's
// path, including those in other processes
void send_commit_notifications();
// Clear the weak Realm cache for all paths
// Should only be called in test code, as continuing to use the previously
// cached instances will have odd results
static void clear_cache();
// Explicit constructor/destructor needed for the unique_ptrs to forward-declared types
RealmCoordinator();
~RealmCoordinator();
// Called by Realm's destructor to ensure the cache is cleaned up promptly
// Do not call directly
void unregister_realm(Realm* realm);
// Called by m_notifier when there's a new commit to send notifications for
void on_change();
// Update the schema in the cached config
void update_schema(Schema const& new_schema);
private:
Realm::Config m_config;
std::mutex m_realm_mutex;
std::vector<CachedRealm> m_cached_realms;
std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
};
} // namespace _impl
} // namespace realm
#endif /* REALM_COORDINATOR_HPP */

View File

@ -16,9 +16,9 @@
// //
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
#include "transact_log_handler.hpp" #include "impl/transact_log_handler.hpp"
#include "../binding_context.hpp" #include "binding_context.hpp"
#include <realm/commit_log.hpp> #include <realm/commit_log.hpp>
#include <realm/group_shared.hpp> #include <realm/group_shared.hpp>

View File

@ -18,6 +18,8 @@
#include "results.hpp" #include "results.hpp"
#include "object_store.hpp"
#include <stdexcept> #include <stdexcept>
using namespace realm; using namespace realm;

View File

@ -18,10 +18,12 @@
#include "shared_realm.hpp" #include "shared_realm.hpp"
#include "external_commit_helper.hpp"
#include "binding_context.hpp" #include "binding_context.hpp"
#include "impl/external_commit_helper.hpp"
#include "impl/realm_coordinator.hpp"
#include "impl/transact_log_handler.hpp"
#include "object_store.hpp"
#include "schema.hpp" #include "schema.hpp"
#include "transact_log_handler.hpp"
#include <realm/commit_log.hpp> #include <realm/commit_log.hpp>
#include <realm/group_shared.hpp> #include <realm/group_shared.hpp>
@ -31,8 +33,6 @@
using namespace realm; using namespace realm;
using namespace realm::_impl; using namespace realm::_impl;
RealmCache Realm::s_global_cache;
Realm::Config::Config(const Config& c) Realm::Config::Config(const Config& c)
: path(c.path) : path(c.path)
, read_only(c.read_only) , read_only(c.read_only)
@ -48,7 +48,7 @@ Realm::Config::Config(const Config& c)
} }
} }
Realm::Config::Config() = default; Realm::Config::Config() : schema_version(ObjectStore::NotVersioned) { }
Realm::Config::Config(Config&&) = default; Realm::Config::Config(Config&&) = default;
Realm::Config::~Config() = default; Realm::Config::~Config() = default;
@ -104,9 +104,58 @@ Realm::Realm(Config config)
} }
} }
Realm::~Realm() { void Realm::init(std::shared_ptr<RealmCoordinator> coordinator)
if (m_notifier) { // might not exist yet if an error occurred during init {
m_notifier->remove_realm(this); m_coordinator = std::move(coordinator);
// if there is an existing realm at the current path steal its schema/column mapping
if (auto existing = m_coordinator->get_schema()) {
m_config.schema = std::make_unique<Schema>(*existing);
return;
}
try {
// otherwise get the schema from the group
auto target_schema = std::move(m_config.schema);
auto target_schema_version = m_config.schema_version;
m_config.schema_version = ObjectStore::get_schema_version(read_group());
m_config.schema = std::make_unique<Schema>(ObjectStore::schema_from_group(read_group()));
// if a target schema is supplied, verify that it matches or migrate to
// it, as neeeded
if (target_schema) {
if (m_config.read_only) {
if (m_config.schema_version == ObjectStore::NotVersioned) {
throw UnitializedRealmException("Can't open an un-initialized Realm without a Schema");
}
target_schema->validate();
ObjectStore::verify_schema(*m_config.schema, *target_schema, true);
m_config.schema = std::move(target_schema);
}
else {
update_schema(std::move(target_schema), target_schema_version);
}
if (!m_config.read_only) {
// End the read transaction created to validation/update the
// schema to avoid pinning the version even if the user never
// actually reads data
invalidate();
}
}
}
catch (...) {
// Trying to unregister from the coordinator before we finish
// construction will result in a deadlock
m_coordinator = nullptr;
throw;
}
}
Realm::~Realm()
{
if (m_coordinator) {
m_coordinator->unregister_realm(this);
} }
} }
@ -120,85 +169,7 @@ Group *Realm::read_group()
SharedRealm Realm::get_shared_realm(Config config) SharedRealm Realm::get_shared_realm(Config config)
{ {
if (config.cache) { return RealmCoordinator::get_coordinator(config.path)->get_realm(std::move(config));
if (SharedRealm realm = s_global_cache.get_realm(config.path)) {
if (realm->config().read_only != config.read_only) {
throw MismatchedConfigException("Realm at path already opened with different read permissions.");
}
if (realm->config().in_memory != config.in_memory) {
throw MismatchedConfigException("Realm at path already opened with different inMemory settings.");
}
if (realm->config().encryption_key != config.encryption_key) {
throw MismatchedConfigException("Realm at path already opened with a different encryption key.");
}
if (realm->config().schema_version != config.schema_version && config.schema_version != ObjectStore::NotVersioned) {
throw MismatchedConfigException("Realm at path already opened with different schema version.");
}
// FIXME - enable schma comparison
/*if (realm->config().schema != config.schema) {
throw MismatchedConfigException("Realm at path already opened with different schema");
}*/
realm->m_config.migration_function = config.migration_function;
return realm;
}
}
SharedRealm realm(new Realm(std::move(config)));
auto target_schema = std::move(realm->m_config.schema);
auto target_schema_version = realm->m_config.schema_version;
realm->m_config.schema_version = ObjectStore::get_schema_version(realm->read_group());
// we want to ensure we are only initializing a single realm at a time
static std::mutex s_init_mutex;
std::lock_guard<std::mutex> lock(s_init_mutex);
if (auto existing = s_global_cache.get_any_realm(realm->config().path)) {
// if there is an existing realm at the current path steal its schema/column mapping
// FIXME - need to validate that schemas match
realm->m_config.schema = std::make_unique<Schema>(*existing->m_config.schema);
if (!realm->m_config.read_only) {
realm->m_notifier = existing->m_notifier;
realm->m_notifier->add_realm(realm.get());
}
}
else {
if (!realm->m_config.read_only) {
realm->m_notifier = std::make_shared<ExternalCommitHelper>(realm.get());
}
// otherwise get the schema from the group
realm->m_config.schema = std::make_unique<Schema>(ObjectStore::schema_from_group(realm->read_group()));
// if a target schema is supplied, verify that it matches or migrate to
// it, as neeeded
if (target_schema) {
if (realm->m_config.read_only) {
if (realm->m_config.schema_version == ObjectStore::NotVersioned) {
throw UnitializedRealmException("Can't open an un-initialized Realm without a Schema");
}
target_schema->validate();
ObjectStore::verify_schema(*realm->m_config.schema, *target_schema, true);
realm->m_config.schema = std::move(target_schema);
}
else {
realm->update_schema(std::move(target_schema), target_schema_version);
}
if (!realm->m_config.read_only) {
// End the read transaction created to validation/update the
// schema to avoid pinning the version even if the user never
// actually reads data
realm->invalidate();
}
}
}
if (config.cache) {
s_global_cache.cache_realm(realm, realm->m_thread_id);
}
return realm;
} }
void Realm::update_schema(std::unique_ptr<Schema> schema, uint64_t version) void Realm::update_schema(std::unique_ptr<Schema> schema, uint64_t version)
@ -215,6 +186,7 @@ void Realm::update_schema(std::unique_ptr<Schema> schema, uint64_t version)
ObjectStore::verify_schema(*m_config.schema, *schema, m_config.read_only); ObjectStore::verify_schema(*m_config.schema, *schema, m_config.read_only);
m_config.schema = std::move(schema); m_config.schema = std::move(schema);
m_config.schema_version = version; m_config.schema_version = version;
m_coordinator->update_schema(*m_config.schema);
return false; return false;
}; };
@ -272,6 +244,8 @@ void Realm::update_schema(std::unique_ptr<Schema> schema, uint64_t version)
m_config.schema_version = old_config.schema_version; m_config.schema_version = old_config.schema_version;
throw; throw;
} }
m_coordinator->update_schema(*m_config.schema);
} }
static void check_read_write(Realm *realm) static void check_read_write(Realm *realm)
@ -322,7 +296,7 @@ void Realm::commit_transaction()
m_in_transaction = false; m_in_transaction = false;
transaction::commit(*m_shared_group, *m_history, m_binding_context.get()); transaction::commit(*m_shared_group, *m_history, m_binding_context.get());
m_notifier->notify_others(); m_coordinator->send_commit_notifications();
} }
void Realm::cancel_transaction() void Realm::cancel_transaction()
@ -392,7 +366,6 @@ void Realm::notify()
} }
} }
bool Realm::refresh() bool Realm::refresh()
{ {
verify_thread(); verify_thread();
@ -421,8 +394,9 @@ bool Realm::refresh()
uint64_t Realm::get_schema_version(const realm::Realm::Config &config) uint64_t Realm::get_schema_version(const realm::Realm::Config &config)
{ {
if (auto existing_realm = s_global_cache.get_any_realm(config.path)) { auto coordinator = RealmCoordinator::get_existing_coordinator(config.path);
return existing_realm->config().schema_version; if (coordinator) {
return coordinator->get_schema_version();
} }
return ObjectStore::get_schema_version(Realm(config).read_group()); return ObjectStore::get_schema_version(Realm(config).read_group());
@ -430,97 +404,16 @@ uint64_t Realm::get_schema_version(const realm::Realm::Config &config)
void Realm::close() void Realm::close()
{ {
if (m_notifier) { invalidate();
m_notifier->remove_realm(this);
if (m_coordinator) {
m_coordinator->unregister_realm(this);
} }
m_group = nullptr; m_group = nullptr;
m_shared_group = nullptr; m_shared_group = nullptr;
m_history = nullptr; m_history = nullptr;
m_read_only_group = nullptr; m_read_only_group = nullptr;
m_notifier = nullptr;
m_binding_context = nullptr; m_binding_context = nullptr;
} m_coordinator = nullptr;
SharedRealm RealmCache::get_realm(const std::string &path, std::thread::id thread_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(path);
if (path_iter == m_cache.end()) {
return SharedRealm();
}
auto thread_iter = path_iter->second.find(thread_id);
if (thread_iter == path_iter->second.end()) {
return SharedRealm();
}
return thread_iter->second.lock();
}
SharedRealm RealmCache::get_any_realm(const std::string &path)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(path);
if (path_iter == m_cache.end()) {
return SharedRealm();
}
auto thread_iter = path_iter->second.begin();
while (thread_iter != path_iter->second.end()) {
if (auto realm = thread_iter->second.lock()) {
return realm;
}
path_iter->second.erase(thread_iter++);
}
return SharedRealm();
}
void RealmCache::remove(const std::string &path, std::thread::id thread_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(path);
if (path_iter == m_cache.end()) {
return;
}
auto thread_iter = path_iter->second.find(thread_id);
if (thread_iter != path_iter->second.end()) {
path_iter->second.erase(thread_iter);
}
if (path_iter->second.size() == 0) {
m_cache.erase(path_iter);
}
}
void RealmCache::cache_realm(SharedRealm &realm, std::thread::id thread_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(realm->config().path);
if (path_iter == m_cache.end()) {
m_cache.emplace(realm->config().path, std::map<std::thread::id, WeakRealm>{{thread_id, realm}});
}
else {
path_iter->second.emplace(thread_id, realm);
}
}
void RealmCache::clear()
{
std::lock_guard<std::mutex> lock(m_mutex);
for (auto const& path : m_cache) {
for (auto const& thread : path.second) {
if (auto realm = thread.second.lock()) {
realm->close();
}
}
}
m_cache.clear();
} }

View File

@ -22,20 +22,24 @@
#include "object_store.hpp" #include "object_store.hpp"
#include <memory> #include <memory>
#include <mutex> #include <string>
#include <thread> #include <thread>
#include <vector> #include <vector>
namespace realm { namespace realm {
class ClientHistory;
class Realm;
class RealmCache;
class BindingContext; class BindingContext;
class ClientHistory;
class Group;
class Realm;
class RealmDelegate;
class Schema;
class SharedGroup;
typedef std::shared_ptr<Realm> SharedRealm; typedef std::shared_ptr<Realm> SharedRealm;
typedef std::weak_ptr<Realm> WeakRealm; typedef std::weak_ptr<Realm> WeakRealm;
namespace _impl { namespace _impl {
class ExternalCommitHelper; class ExternalCommitHelper;
class RealmCoordinator;
} }
class Realm : public std::enable_shared_from_this<Realm> class Realm : public std::enable_shared_from_this<Realm>
@ -53,7 +57,7 @@ namespace realm {
std::vector<char> encryption_key; std::vector<char> encryption_key;
std::unique_ptr<Schema> schema; std::unique_ptr<Schema> schema;
uint64_t schema_version = ObjectStore::NotVersioned; uint64_t schema_version;
MigrationFunction migration_function; MigrationFunction migration_function;
@ -109,9 +113,10 @@ namespace realm {
~Realm(); ~Realm();
private: void init(std::shared_ptr<_impl::RealmCoordinator> coordinator);
Realm(Config config); Realm(Config config);
private:
Config m_config; Config m_config;
std::thread::id m_thread_id = std::this_thread::get_id(); std::thread::id m_thread_id = std::this_thread::get_id();
bool m_in_transaction = false; bool m_in_transaction = false;
@ -123,28 +128,13 @@ namespace realm {
Group *m_group = nullptr; Group *m_group = nullptr;
std::shared_ptr<_impl::ExternalCommitHelper> m_notifier; std::shared_ptr<_impl::RealmCoordinator> m_coordinator;
public: public:
std::unique_ptr<BindingContext> m_binding_context; std::unique_ptr<BindingContext> m_binding_context;
// FIXME private // FIXME private
Group *read_group(); Group *read_group();
static RealmCache s_global_cache;
};
class RealmCache
{
public:
SharedRealm get_realm(const std::string &path, std::thread::id thread_id = std::this_thread::get_id());
SharedRealm get_any_realm(const std::string &path);
void remove(const std::string &path, std::thread::id thread_id);
void cache_realm(SharedRealm &realm, std::thread::id thread_id = std::this_thread::get_id());
void clear();
private:
std::map<std::string, std::map<std::thread::id, WeakRealm>> m_cache;
std::mutex m_mutex;
}; };
class RealmFileException : public std::runtime_error { class RealmFileException : public std::runtime_error {