Extract cache management and inter-Realm sharing to RealmCoordinator

This commit is contained in:
Thomas Goyne 2015-08-19 12:27:12 -07:00
parent 5e71c4178e
commit e87a507223
8 changed files with 362 additions and 230 deletions

View File

@ -8,6 +8,7 @@ set(SOURCES
results.cpp
schema.cpp
shared_realm.cpp
impl/realm_coordinator.cpp
impl/transact_log_handler.cpp
parser/parser.cpp
parser/query_builder.cpp)

View File

@ -18,16 +18,16 @@
#include "external_commit_helper.hpp"
#include "shared_realm.hpp"
#include "realm_coordinator.hpp"
#include <assert.h>
#include <fcntl.h>
#include <sstream>
#include <sys/event.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <system_error>
#include <fcntl.h>
#include <unistd.h>
#include <sstream>
using namespace realm;
using namespace realm::_impl;
@ -60,7 +60,6 @@ void ExternalCommitHelper::FdHolder::close()
::close(m_fd);
}
m_fd = -1;
}
// Inter-thread and inter-process notifications of changes are done using a
@ -86,16 +85,15 @@ void ExternalCommitHelper::FdHolder::close()
// signal the runloop source and wake up the target runloop, and when data is
// written to the anonymous pipe the background thread removes the runloop
// source from the runloop and and shuts down.
ExternalCommitHelper::ExternalCommitHelper(Realm* realm)
ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent)
: m_parent(parent)
{
add_realm(realm);
m_kq = kqueue();
if (m_kq == -1) {
throw std::system_error(errno, std::system_category());
}
auto path = realm->config().path + ".note";
auto path = parent.get_path() + ".note";
// Create and open the named pipe
int ret = mkfifo(path.c_str(), 0600);
@ -140,28 +138,14 @@ ExternalCommitHelper::ExternalCommitHelper(Realm* realm)
m_shutdown_read_fd = pipeFd[0];
m_shutdown_write_fd = pipeFd[1];
// Use the minimum allowed stack size, as we need very little in our listener
// https://developer.apple.com/library/ios/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.html#//apple_ref/doc/uid/10000057i-CH15-SW7
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 16 * 1024);
auto fn = [](void *self) -> void * {
static_cast<ExternalCommitHelper *>(self)->listen();
return nullptr;
};
ret = pthread_create(&m_thread, &attr, fn, this);
pthread_attr_destroy(&attr);
if (ret != 0) {
throw std::system_error(errno, std::system_category());
}
m_thread = std::async(std::launch::async, [=] { listen(); });
}
ExternalCommitHelper::~ExternalCommitHelper()
{
REALM_ASSERT_DEBUG(m_realms.empty());
notify_fd(m_shutdown_write_fd);
pthread_join(m_thread, nullptr); // Wait for the thread to exit
m_thread.wait(); // Wait for the thread to exit
}
void ExternalCommitHelper::add_realm(realm::Realm* realm)
@ -202,7 +186,6 @@ void ExternalCommitHelper::listen()
{
pthread_setname_np("RLMRealm notification listener");
// Set up the kqueue
// EVFILT_READ indicates that we care about data being available to read
// on the given file descriptor.
@ -248,4 +231,3 @@ void ExternalCommitHelper::notify_others()
{
notify_fd(m_notify_fd);
}

View File

@ -20,6 +20,8 @@
#define REALM_EXTERNAL_COMMIT_HELPER_HPP
#include <CoreFoundation/CFRunLoop.h>
#include <functional>
#include <future>
#include <mutex>
#include <vector>
@ -27,9 +29,13 @@ namespace realm {
class Realm;
namespace _impl {
class RealmCoordinator;
// FIXME: split IPC from the local cross-thread stuff
// both are platform-specific but need to be useable separately
class ExternalCommitHelper {
public:
ExternalCommitHelper(Realm* realm);
ExternalCommitHelper(RealmCoordinator& parent);
~ExternalCommitHelper();
void notify_others();
@ -67,6 +73,8 @@ private:
void listen();
RealmCoordinator& m_parent;
// Currently registered realms and the signal for delivering notifications
// to them
std::vector<PerRealmInfo> m_realms;
@ -75,7 +83,7 @@ private:
std::mutex m_realms_mutex;
// The listener thread
pthread_t m_thread;
std::future<void> m_thread;
// Read-write file descriptor for the named pipe which is waited on for
// changes and written to when a commit is made

View File

@ -0,0 +1,182 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "realm_coordinator.hpp"
#include "external_commit_helper.hpp"
#include "object_store.hpp"
using namespace realm;
using namespace realm::_impl;
static std::mutex s_coordinator_mutex;
static std::map<std::string, std::weak_ptr<RealmCoordinator>> s_coordinators_per_path;
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
{
std::lock_guard<std::mutex> lock(s_coordinator_mutex);
std::shared_ptr<RealmCoordinator> coordinator;
auto it = s_coordinators_per_path.find(path);
if (it != s_coordinators_per_path.end()) {
coordinator = it->second.lock();
}
if (!coordinator) {
s_coordinators_per_path[path] = coordinator = std::make_shared<RealmCoordinator>();
}
return coordinator;
}
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
{
std::lock_guard<std::mutex> lock(s_coordinator_mutex);
auto it = s_coordinators_per_path.find(path);
return it == s_coordinators_per_path.end() ? nullptr : it->second.lock();
}
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
if (!m_notifier && m_cached_realms.empty()) {
m_config = config;
if (!config.read_only) {
m_notifier = std::make_unique<ExternalCommitHelper>(*this);
}
}
else {
if (m_config.read_only != config.read_only) {
throw MismatchedConfigException("Realm at path already opened with different read permissions.");
}
if (m_config.in_memory != config.in_memory) {
throw MismatchedConfigException("Realm at path already opened with different inMemory settings.");
}
if (m_config.encryption_key != config.encryption_key) {
throw MismatchedConfigException("Realm at path already opened with a different encryption key.");
}
if (m_config.schema_version != config.schema_version && config.schema_version != ObjectStore::NotVersioned) {
throw MismatchedConfigException("Realm at path already opened with different schema version.");
}
// FIXME: verify that schema is compatible
// Needs to verify that all tables present in both are identical, and
// then updated m_config with any tables present in config but not in
// it
// Public API currently doesn't make it possible to have non-matching
// schemata so it's not a huge issue
if ((false) && m_config.schema != config.schema) {
throw MismatchedConfigException("Realm at path already opened with different schema");
}
}
auto thread_id = std::this_thread::get_id();
if (config.cache) {
for (auto& weakRealm : m_cached_realms) {
// can be null if we jumped in between ref count hitting zero and
// unregister_realm() getting the lock
if (auto realm = weakRealm.lock()) {
if (realm->thread_id() == thread_id) {
return realm;
}
}
}
}
auto realm = std::make_shared<Realm>(config);
realm->init(shared_from_this());
if (m_notifier) {
m_notifier->add_realm(realm.get());
}
if (config.cache) {
m_cached_realms.push_back(realm);
}
return realm;
}
const Schema* RealmCoordinator::get_schema() const noexcept
{
return m_cached_realms.empty() ? nullptr : m_config.schema.get();
}
RealmCoordinator::RealmCoordinator() = default;
RealmCoordinator::~RealmCoordinator()
{
std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
for (auto it = s_coordinators_per_path.begin(); it != s_coordinators_per_path.end(); ) {
if (it->second.expired()) {
it = s_coordinators_per_path.erase(it);
}
else {
++it;
}
}
}
void RealmCoordinator::unregister_realm(Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
if (m_notifier) {
m_notifier->remove_realm(realm);
}
for (size_t i = 0; i < m_cached_realms.size(); ++i) {
if (m_cached_realms[i].expired()) {
if (i + 1 < m_cached_realms.size()) {
m_cached_realms[i] = std::move(m_cached_realms.back());
}
m_cached_realms.pop_back();
}
}
}
void RealmCoordinator::clear_cache()
{
std::vector<SharedRealm> realms_to_close;
{
std::lock_guard<std::mutex> lock(s_coordinator_mutex);
// Gather a list of all of the realms which will be removed
for (auto& weak_coordinator : s_coordinators_per_path) {
auto coordinator = weak_coordinator.second.lock();
if (!coordinator) {
continue;
}
for (auto& cached_realm : coordinator->m_cached_realms) {
if (auto realm = cached_realm.lock()) {
realms_to_close.push_back(realm);
}
}
}
s_coordinators_per_path.clear();
}
// Close all of the previously cached Realms. This can't be done while
// s_coordinator_mutex is held as it may try to re-lock it.
for (auto& realm : realms_to_close) {
realm->close();
}
}
void RealmCoordinator::send_commit_notifications()
{
REALM_ASSERT(!m_config.read_only);
m_notifier->notify_others();
}

View File

@ -0,0 +1,77 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_COORDINATOR_HPP
#define REALM_COORDINATOR_HPP
#include "shared_realm.hpp"
#include <realm/string_data.hpp>
namespace realm {
namespace _impl {
class ExternalCommitHelper;
// RealmCoordinator manages the weak cache of Realm instances and communication
// between per-thread Realm instances for a given file
class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {
public:
// Get the coordinator for the given path, creating it if neccesary
static std::shared_ptr<RealmCoordinator> get_coordinator(StringData path);
// Get the coordinator for the given path, or null if there is none
static std::shared_ptr<RealmCoordinator> get_existing_coordinator(StringData path);
// Get a thread-local shared Realm with the given configuration
// If the Realm is already open on another thread, validates that the given
// configuration is compatible with the existing one
std::shared_ptr<Realm> get_realm(Realm::Config config);
const Schema* get_schema() const noexcept;
uint64_t get_schema_version() const noexcept { return m_config.schema_version; }
const std::string& get_path() const noexcept { return m_config.path; }
// Asyncronously call notify() on every Realm instance for this coordinator's
// path, including those in other processes
void send_commit_notifications();
// Clear the weak Realm cache for all paths
// Should only be called in test code, as continuing to use the previously
// cached instances will have odd results
static void clear_cache();
// Explicit constructor/destructor needed for the unique_ptrs to forward-declared types
RealmCoordinator();
~RealmCoordinator();
// Called by Realm's destructor to ensure the cache is cleaned up promptly
// Do not call directly
void unregister_realm(Realm* realm);
private:
Realm::Config m_config;
std::mutex m_realm_mutex;
std::vector<std::weak_ptr<Realm>> m_cached_realms;
std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
};
} // namespace _impl
} // namespace realm
#endif /* REALM_COORDINATOR_HPP */

View File

@ -18,6 +18,8 @@
#include "results.hpp"
#include "object_store.hpp"
#include <stdexcept>
using namespace realm;

View File

@ -18,8 +18,10 @@
#include "shared_realm.hpp"
#include "external_commit_helper.hpp"
#include "binding_context.hpp"
#include "external_commit_helper.hpp"
#include "object_store.hpp"
#include "realm_coordinator.hpp"
#include "schema.hpp"
#include "transact_log_handler.hpp"
@ -31,8 +33,6 @@
using namespace realm;
using namespace realm::_impl;
RealmCache Realm::s_global_cache;
Realm::Config::Config(const Config& c)
: path(c.path)
, read_only(c.read_only)
@ -48,7 +48,7 @@ Realm::Config::Config(const Config& c)
}
}
Realm::Config::Config() = default;
Realm::Config::Config() : schema_version(ObjectStore::NotVersioned) { }
Realm::Config::Config(Config&&) = default;
Realm::Config::~Config() = default;
@ -104,9 +104,58 @@ Realm::Realm(Config config)
}
}
Realm::~Realm() {
if (m_notifier) { // might not exist yet if an error occurred during init
m_notifier->remove_realm(this);
void Realm::init(std::shared_ptr<RealmCoordinator> coordinator)
{
m_coordinator = std::move(coordinator);
// if there is an existing realm at the current path steal its schema/column mapping
if (auto existing = m_coordinator->get_schema()) {
m_config.schema = std::make_unique<Schema>(*existing);
return;
}
try {
// otherwise get the schema from the group
auto target_schema = std::move(m_config.schema);
auto target_schema_version = m_config.schema_version;
m_config.schema_version = ObjectStore::get_schema_version(read_group());
m_config.schema = std::make_unique<Schema>(ObjectStore::schema_from_group(read_group()));
// if a target schema is supplied, verify that it matches or migrate to
// it, as neeeded
if (target_schema) {
if (m_config.read_only) {
if (m_config.schema_version == ObjectStore::NotVersioned) {
throw UnitializedRealmException("Can't open an un-initialized Realm without a Schema");
}
target_schema->validate();
ObjectStore::verify_schema(*m_config.schema, *target_schema, true);
m_config.schema = std::move(target_schema);
}
else {
update_schema(std::move(target_schema), target_schema_version);
}
if (!m_config.read_only) {
// End the read transaction created to validation/update the
// schema to avoid pinning the version even if the user never
// actually reads data
invalidate();
}
}
}
catch (...) {
// Trying to unregister from the coordinator before we finish
// construction will result in a deadlock
m_coordinator = nullptr;
throw;
}
}
Realm::~Realm()
{
if (m_coordinator) {
m_coordinator->unregister_realm(this);
}
}
@ -120,85 +169,7 @@ Group *Realm::read_group()
SharedRealm Realm::get_shared_realm(Config config)
{
if (config.cache) {
if (SharedRealm realm = s_global_cache.get_realm(config.path)) {
if (realm->config().read_only != config.read_only) {
throw MismatchedConfigException("Realm at path already opened with different read permissions.");
}
if (realm->config().in_memory != config.in_memory) {
throw MismatchedConfigException("Realm at path already opened with different inMemory settings.");
}
if (realm->config().encryption_key != config.encryption_key) {
throw MismatchedConfigException("Realm at path already opened with a different encryption key.");
}
if (realm->config().schema_version != config.schema_version && config.schema_version != ObjectStore::NotVersioned) {
throw MismatchedConfigException("Realm at path already opened with different schema version.");
}
// FIXME - enable schma comparison
/*if (realm->config().schema != config.schema) {
throw MismatchedConfigException("Realm at path already opened with different schema");
}*/
realm->m_config.migration_function = config.migration_function;
return realm;
}
}
SharedRealm realm(new Realm(std::move(config)));
auto target_schema = std::move(realm->m_config.schema);
auto target_schema_version = realm->m_config.schema_version;
realm->m_config.schema_version = ObjectStore::get_schema_version(realm->read_group());
// we want to ensure we are only initializing a single realm at a time
static std::mutex s_init_mutex;
std::lock_guard<std::mutex> lock(s_init_mutex);
if (auto existing = s_global_cache.get_any_realm(realm->config().path)) {
// if there is an existing realm at the current path steal its schema/column mapping
// FIXME - need to validate that schemas match
realm->m_config.schema = std::make_unique<Schema>(*existing->m_config.schema);
if (!realm->m_config.read_only) {
realm->m_notifier = existing->m_notifier;
realm->m_notifier->add_realm(realm.get());
}
}
else {
if (!realm->m_config.read_only) {
realm->m_notifier = std::make_shared<ExternalCommitHelper>(realm.get());
}
// otherwise get the schema from the group
realm->m_config.schema = std::make_unique<Schema>(ObjectStore::schema_from_group(realm->read_group()));
// if a target schema is supplied, verify that it matches or migrate to
// it, as neeeded
if (target_schema) {
if (realm->m_config.read_only) {
if (realm->m_config.schema_version == ObjectStore::NotVersioned) {
throw UnitializedRealmException("Can't open an un-initialized Realm without a Schema");
}
target_schema->validate();
ObjectStore::verify_schema(*realm->m_config.schema, *target_schema, true);
realm->m_config.schema = std::move(target_schema);
}
else {
realm->update_schema(std::move(target_schema), target_schema_version);
}
if (!realm->m_config.read_only) {
// End the read transaction created to validation/update the
// schema to avoid pinning the version even if the user never
// actually reads data
realm->invalidate();
}
}
}
if (config.cache) {
s_global_cache.cache_realm(realm, realm->m_thread_id);
}
return realm;
return RealmCoordinator::get_coordinator(config.path)->get_realm(config);
}
void Realm::update_schema(std::unique_ptr<Schema> schema, uint64_t version)
@ -322,7 +293,7 @@ void Realm::commit_transaction()
m_in_transaction = false;
transaction::commit(*m_shared_group, *m_history, m_binding_context.get());
m_notifier->notify_others();
m_coordinator->send_commit_notifications();
}
void Realm::cancel_transaction()
@ -392,7 +363,6 @@ void Realm::notify()
}
}
bool Realm::refresh()
{
verify_thread();
@ -421,8 +391,9 @@ bool Realm::refresh()
uint64_t Realm::get_schema_version(const realm::Realm::Config &config)
{
if (auto existing_realm = s_global_cache.get_any_realm(config.path)) {
return existing_realm->config().schema_version;
auto coordinator = RealmCoordinator::get_existing_coordinator(config.path);
if (coordinator) {
return coordinator->get_schema_version();
}
return ObjectStore::get_schema_version(Realm(config).read_group());
@ -430,97 +401,16 @@ uint64_t Realm::get_schema_version(const realm::Realm::Config &config)
void Realm::close()
{
if (m_notifier) {
m_notifier->remove_realm(this);
invalidate();
if (m_coordinator) {
m_coordinator->unregister_realm(this);
}
m_group = nullptr;
m_shared_group = nullptr;
m_history = nullptr;
m_read_only_group = nullptr;
m_notifier = nullptr;
m_binding_context = nullptr;
}
SharedRealm RealmCache::get_realm(const std::string &path, std::thread::id thread_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(path);
if (path_iter == m_cache.end()) {
return SharedRealm();
}
auto thread_iter = path_iter->second.find(thread_id);
if (thread_iter == path_iter->second.end()) {
return SharedRealm();
}
return thread_iter->second.lock();
}
SharedRealm RealmCache::get_any_realm(const std::string &path)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(path);
if (path_iter == m_cache.end()) {
return SharedRealm();
}
auto thread_iter = path_iter->second.begin();
while (thread_iter != path_iter->second.end()) {
if (auto realm = thread_iter->second.lock()) {
return realm;
}
path_iter->second.erase(thread_iter++);
}
return SharedRealm();
}
void RealmCache::remove(const std::string &path, std::thread::id thread_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(path);
if (path_iter == m_cache.end()) {
return;
}
auto thread_iter = path_iter->second.find(thread_id);
if (thread_iter != path_iter->second.end()) {
path_iter->second.erase(thread_iter);
}
if (path_iter->second.size() == 0) {
m_cache.erase(path_iter);
}
}
void RealmCache::cache_realm(SharedRealm &realm, std::thread::id thread_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(realm->config().path);
if (path_iter == m_cache.end()) {
m_cache.emplace(realm->config().path, std::map<std::thread::id, WeakRealm>{{thread_id, realm}});
}
else {
path_iter->second.emplace(thread_id, realm);
}
}
void RealmCache::clear()
{
std::lock_guard<std::mutex> lock(m_mutex);
for (auto const& path : m_cache) {
for (auto const& thread : path.second) {
if (auto realm = thread.second.lock()) {
realm->close();
}
}
}
m_cache.clear();
m_coordinator = nullptr;
}

View File

@ -22,20 +22,24 @@
#include "object_store.hpp"
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
namespace realm {
class ClientHistory;
class Realm;
class RealmCache;
class BindingContext;
class ClientHistory;
class Group;
class Realm;
class RealmDelegate;
class Schema;
class SharedGroup;
typedef std::shared_ptr<Realm> SharedRealm;
typedef std::weak_ptr<Realm> WeakRealm;
namespace _impl {
class ExternalCommitHelper;
class RealmCoordinator;
}
class Realm : public std::enable_shared_from_this<Realm>
@ -53,7 +57,7 @@ namespace realm {
std::vector<char> encryption_key;
std::unique_ptr<Schema> schema;
uint64_t schema_version = ObjectStore::NotVersioned;
uint64_t schema_version;
MigrationFunction migration_function;
@ -109,9 +113,10 @@ namespace realm {
~Realm();
private:
void init(std::shared_ptr<_impl::RealmCoordinator> coordinator);
Realm(Config config);
private:
Config m_config;
std::thread::id m_thread_id = std::this_thread::get_id();
bool m_in_transaction = false;
@ -123,28 +128,13 @@ namespace realm {
Group *m_group = nullptr;
std::shared_ptr<_impl::ExternalCommitHelper> m_notifier;
std::shared_ptr<_impl::RealmCoordinator> m_coordinator;
public:
std::unique_ptr<BindingContext> m_binding_context;
// FIXME private
Group *read_group();
static RealmCache s_global_cache;
};
class RealmCache
{
public:
SharedRealm get_realm(const std::string &path, std::thread::id thread_id = std::this_thread::get_id());
SharedRealm get_any_realm(const std::string &path);
void remove(const std::string &path, std::thread::id thread_id);
void cache_realm(SharedRealm &realm, std::thread::id thread_id = std::this_thread::get_id());
void clear();
private:
std::map<std::string, std::map<std::thread::id, WeakRealm>> m_cache;
std::mutex m_mutex;
};
class RealmFileException : public std::runtime_error {