diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bef8feb4..76233013 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,3 @@ -include_directories(impl) - set(SOURCES index_set.cpp list.cpp @@ -8,6 +6,7 @@ set(SOURCES results.cpp schema.cpp shared_realm.cpp + impl/realm_coordinator.cpp impl/transact_log_handler.cpp parser/parser.cpp parser/query_builder.cpp) @@ -20,17 +19,27 @@ set(HEADERS results.hpp schema.hpp shared_realm.hpp + impl/cached_realm.hpp + impl/cached_realm_base.hpp + impl/external_commit_helper.hpp impl/transact_log_handler.hpp parser/parser.hpp parser/query_builder.hpp) if(APPLE) - include_directories(impl/apple) list(APPEND SOURCES + impl/apple/cached_realm.cpp impl/apple/external_commit_helper.cpp) list(APPEND HEADERS + impl/apple/cached_realm.hpp impl/apple/external_commit_helper.hpp) find_library(CF_LIBRARY CoreFoundation) +else() + list(APPEND SOURCES + impl/generic/external_commit_helper.cpp) + list(APPEND HEADERS + impl/generic/cached_realm.hpp + impl/generic/external_commit_helper.hpp) endif() add_library(realm-object-store SHARED ${SOURCES} ${HEADERS}) diff --git a/src/impl/apple/cached_realm.cpp b/src/impl/apple/cached_realm.cpp new file mode 100644 index 00000000..1468a8fb --- /dev/null +++ b/src/impl/apple/cached_realm.cpp @@ -0,0 +1,94 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "impl/cached_realm.hpp" + +#include "shared_realm.hpp" + +using namespace realm; +using namespace realm::_impl; + +CachedRealm::CachedRealm(const std::shared_ptr& realm, bool cache) +: CachedRealmBase(realm, cache) +{ + struct RefCountedWeakPointer { + std::weak_ptr realm; + std::atomic ref_count = {1}; + }; + + CFRunLoopSourceContext ctx{}; + ctx.info = new RefCountedWeakPointer{realm}; + ctx.perform = [](void* info) { + if (auto realm = static_cast(info)->realm.lock()) { + realm->notify(); + } + }; + ctx.retain = [](const void* info) { + static_cast(const_cast(info))->ref_count.fetch_add(1, std::memory_order_relaxed); + return info; + }; + ctx.release = [](const void* info) { + auto ptr = static_cast(const_cast(info)); + if (ptr->ref_count.fetch_add(-1, std::memory_order_acq_rel) == 1) { + delete ptr; + } + }; + + m_runloop = CFRunLoopGetCurrent(); + CFRetain(m_runloop); + m_signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx); + CFRunLoopAddSource(m_runloop, m_signal, kCFRunLoopDefaultMode); +} + +CachedRealm::CachedRealm(CachedRealm&& rgt) +: CachedRealmBase(std::move(rgt)) +, m_runloop(rgt.m_runloop) +, m_signal(rgt.m_signal) +{ + rgt.m_runloop = nullptr; + rgt.m_signal = nullptr; +} + +CachedRealm& CachedRealm::operator=(CachedRealm&& rgt) +{ + CachedRealmBase::operator=(std::move(rgt)); + m_runloop = rgt.m_runloop; + m_signal = rgt.m_signal; + rgt.m_runloop = nullptr; + rgt.m_signal = nullptr; + + return *this; +} + +CachedRealm::~CachedRealm() +{ + if (m_signal) { + CFRunLoopSourceInvalidate(m_signal); + CFRelease(m_signal); + CFRelease(m_runloop); + } +} + +void CachedRealm::notify() +{ + CFRunLoopSourceSignal(m_signal); + // Signalling the source makes it run the next time the runloop gets + // to it, but doesn't make the runloop start if it's currently idle + // waiting for events + CFRunLoopWakeUp(m_runloop); +} diff --git a/src/impl/apple/cached_realm.hpp b/src/impl/apple/cached_realm.hpp new file mode 100644 index 00000000..5acf874e --- /dev/null +++ b/src/impl/apple/cached_realm.hpp @@ -0,0 +1,48 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "impl/cached_realm_base.hpp" + +#include + +namespace realm { +class Realm; + +namespace _impl { + +class CachedRealm : public CachedRealmBase { +public: + CachedRealm(const std::shared_ptr& realm, bool cache); + ~CachedRealm(); + + CachedRealm(CachedRealm&&); + CachedRealm& operator=(CachedRealm&&); + + CachedRealm(const CachedRealm&) = delete; + CachedRealm& operator=(const CachedRealm&) = delete; + + // Asyncronously call notify() on the Realm on the appropriate thread + void notify(); + +private: + CFRunLoopRef m_runloop; + CFRunLoopSourceRef m_signal; +}; + +} // namespace _impl +} // namespace realm diff --git a/src/impl/apple/external_commit_helper.cpp b/src/impl/apple/external_commit_helper.cpp index 4cf0b523..db04a1de 100644 --- a/src/impl/apple/external_commit_helper.cpp +++ b/src/impl/apple/external_commit_helper.cpp @@ -16,18 +16,19 @@ // //////////////////////////////////////////////////////////////////////////// -#include "external_commit_helper.hpp" +#include "impl/external_commit_helper.hpp" -#include "shared_realm.hpp" +#include "impl/realm_coordinator.hpp" +#include #include +#include +#include #include #include #include #include -#include #include -#include using namespace realm; using namespace realm::_impl; @@ -56,11 +57,10 @@ void notify_fd(int fd) void ExternalCommitHelper::FdHolder::close() { - if (m_fd != -1) { - ::close(m_fd); - } - m_fd = -1; - + if (m_fd != -1) { + ::close(m_fd); + } + m_fd = -1; } // Inter-thread and inter-process notifications of changes are done using a @@ -86,16 +86,15 @@ void ExternalCommitHelper::FdHolder::close() // signal the runloop source and wake up the target runloop, and when data is // written to the anonymous pipe the background thread removes the runloop // source from the runloop and and shuts down. -ExternalCommitHelper::ExternalCommitHelper(Realm* realm) +ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent) +: m_parent(parent) { - add_realm(realm); - m_kq = kqueue(); if (m_kq == -1) { throw std::system_error(errno, std::system_category()); } - auto path = realm->config().path + ".note"; + auto path = parent.get_path() + ".note"; // Create and open the named pipe int ret = mkfifo(path.c_str(), 0600); @@ -140,69 +139,33 @@ ExternalCommitHelper::ExternalCommitHelper(Realm* realm) m_shutdown_read_fd = pipeFd[0]; m_shutdown_write_fd = pipeFd[1]; - // Use the minimum allowed stack size, as we need very little in our listener - // https://developer.apple.com/library/ios/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.html#//apple_ref/doc/uid/10000057i-CH15-SW7 - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setstacksize(&attr, 16 * 1024); - - auto fn = [](void *self) -> void * { - static_cast(self)->listen(); - return nullptr; - }; - ret = pthread_create(&m_thread, &attr, fn, this); - pthread_attr_destroy(&attr); - if (ret != 0) { - throw std::system_error(errno, std::system_category()); - } + m_thread = std::async(std::launch::async, [=] { + try { + listen(); + } + catch (std::exception const& e) { + fprintf(stderr, "uncaught exception in notifier thread: %s: %s\n", typeid(e).name(), e.what()); + asl_log(nullptr, nullptr, ASL_LEVEL_ERR, "uncaught exception in notifier thread: %s: %s", typeid(e).name(), e.what()); + throw; + } + catch (...) { + fprintf(stderr, "uncaught exception in notifier thread\n"); + asl_log(nullptr, nullptr, ASL_LEVEL_ERR, "uncaught exception in notifier thread"); + throw; + } + }); } ExternalCommitHelper::~ExternalCommitHelper() { - REALM_ASSERT_DEBUG(m_realms.empty()); notify_fd(m_shutdown_write_fd); - pthread_join(m_thread, nullptr); // Wait for the thread to exit -} - -void ExternalCommitHelper::add_realm(realm::Realm* realm) -{ - std::lock_guard lock(m_realms_mutex); - - // Create the runloop source - CFRunLoopSourceContext ctx{}; - ctx.info = realm; - ctx.perform = [](void* info) { - static_cast(info)->notify(); - }; - - CFRunLoopRef runloop = CFRunLoopGetCurrent(); - CFRetain(runloop); - CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx); - CFRunLoopAddSource(runloop, signal, kCFRunLoopDefaultMode); - - m_realms.push_back({realm, runloop, signal}); -} - -void ExternalCommitHelper::remove_realm(realm::Realm* realm) -{ - std::lock_guard lock(m_realms_mutex); - for (auto it = m_realms.begin(); it != m_realms.end(); ++it) { - if (it->realm == realm) { - CFRunLoopSourceInvalidate(it->signal); - CFRelease(it->signal); - CFRelease(it->runloop); - m_realms.erase(it); - return; - } - } - REALM_TERMINATE("Realm not registered"); + m_thread.wait(); // Wait for the thread to exit } void ExternalCommitHelper::listen() { pthread_setname_np("RLMRealm notification listener"); - // Set up the kqueue // EVFILT_READ indicates that we care about data being available to read // on the given file descriptor. @@ -233,14 +196,7 @@ void ExternalCommitHelper::listen() } assert(event.ident == (uint32_t)m_notify_fd); - std::lock_guard lock(m_realms_mutex); - for (auto const& realm : m_realms) { - CFRunLoopSourceSignal(realm.signal); - // Signalling the source makes it run the next time the runloop gets - // to it, but doesn't make the runloop start if it's currently idle - // waiting for events - CFRunLoopWakeUp(realm.runloop); - } + m_parent.on_change(); } } @@ -248,4 +204,3 @@ void ExternalCommitHelper::notify_others() { notify_fd(m_notify_fd); } - diff --git a/src/impl/apple/external_commit_helper.hpp b/src/impl/apple/external_commit_helper.hpp index d7acb791..a39876ce 100644 --- a/src/impl/apple/external_commit_helper.hpp +++ b/src/impl/apple/external_commit_helper.hpp @@ -16,25 +16,20 @@ // //////////////////////////////////////////////////////////////////////////// -#ifndef REALM_EXTERNAL_COMMIT_HELPER_HPP -#define REALM_EXTERNAL_COMMIT_HELPER_HPP - -#include -#include -#include +#include namespace realm { class Realm; namespace _impl { +class RealmCoordinator; + class ExternalCommitHelper { public: - ExternalCommitHelper(Realm* realm); + ExternalCommitHelper(RealmCoordinator& parent); ~ExternalCommitHelper(); void notify_others(); - void add_realm(Realm* realm); - void remove_realm(Realm* realm); private: // A RAII holder for a file descriptor which automatically closes the wrapped @@ -59,23 +54,12 @@ private: FdHolder(FdHolder const&) = delete; }; - struct PerRealmInfo { - Realm* realm; - CFRunLoopRef runloop; - CFRunLoopSourceRef signal; - }; - void listen(); - // Currently registered realms and the signal for delivering notifications - // to them - std::vector m_realms; - - // Mutex which guards m_realms - std::mutex m_realms_mutex; + RealmCoordinator& m_parent; // The listener thread - pthread_t m_thread; + std::future m_thread; // Read-write file descriptor for the named pipe which is waited on for // changes and written to when a commit is made @@ -90,5 +74,3 @@ private: } // namespace _impl } // namespace realm - -#endif /* REALM_EXTERNAL_COMMIT_HELPER_HPP */ diff --git a/src/impl/cached_realm.hpp b/src/impl/cached_realm.hpp new file mode 100644 index 00000000..3818ca62 --- /dev/null +++ b/src/impl/cached_realm.hpp @@ -0,0 +1,30 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2016 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_CACHED_REALM_HPP +#define REALM_CACHED_REALM_HPP + +#include + +#if REALM_PLATFORM_APPLE +#include "impl/apple/cached_realm.hpp" +#else +#include "impl/generic/cached_realm.hpp" +#endif + +#endif // REALM_CACHED_REALM_HPP diff --git a/src/impl/cached_realm_base.hpp b/src/impl/cached_realm_base.hpp new file mode 100644 index 00000000..57feceb5 --- /dev/null +++ b/src/impl/cached_realm_base.hpp @@ -0,0 +1,68 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_CACHED_REALM_BASE_HPP +#define REALM_CACHED_REALM_BASE_HPP + +#include +#include + +namespace realm { +class Realm; + +namespace _impl { + +// CachedRealm stores a weak reference to a Realm instance, along with all of +// the information about a Realm that needs to be accessed from other threads. +// This is needed to avoid forming strong references to the Realm instances on +// other threads, which can produce deadlocks when the last strong reference to +// a Realm instance is released from within a function holding the cache lock. +class CachedRealmBase { +public: + CachedRealmBase(const std::shared_ptr& realm, bool cache); + + // Get a strong reference to the cached realm + std::shared_ptr realm() const { return m_realm.lock(); } + + // Does this CachedRealmBase store a Realm instance that should be used on the current thread? + bool is_cached_for_current_thread() const { return m_cache && m_thread_id == std::this_thread::get_id(); } + + // Has the Realm instance been destroyed? + bool expired() const { return m_realm.expired(); } + + // Is this a CachedRealmBase for the given Realm instance? + bool is_for_realm(Realm* realm) const { return realm == m_realm_key; } + +private: + std::weak_ptr m_realm; + std::thread::id m_thread_id = std::this_thread::get_id(); + void* m_realm_key; + bool m_cache = false; +}; + +inline CachedRealmBase::CachedRealmBase(const std::shared_ptr& realm, bool cache) +: m_realm(realm) +, m_realm_key(realm.get()) +, m_cache(cache) +{ +} + +} // namespace _impl +} // namespace realm + +#endif // REALM_CACHED_REALM_BASE_HPP diff --git a/src/impl/external_commit_helper.hpp b/src/impl/external_commit_helper.hpp new file mode 100644 index 00000000..c467a4b9 --- /dev/null +++ b/src/impl/external_commit_helper.hpp @@ -0,0 +1,30 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2016 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_EXTERNAL_COMMIT_HELPER_HPP +#define REALM_EXTERNAL_COMMIT_HELPER_HPP + +#include + +#if REALM_PLATFORM_APPLE +#include "impl/apple/external_commit_helper.hpp" +#else +#include "impl/generic/external_commit_helper.hpp" +#endif + +#endif // REALM_EXTERNAL_COMMIT_HELPER_HPP diff --git a/src/impl/generic/cached_realm.hpp b/src/impl/generic/cached_realm.hpp new file mode 100644 index 00000000..23a7be11 --- /dev/null +++ b/src/impl/generic/cached_realm.hpp @@ -0,0 +1,36 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "impl/cached_realm_base.hpp" + +namespace realm { +class Realm; + +namespace _impl { + +class CachedRealm : public CachedRealmBase { +public: + using CachedRealmBase::CachedRealmBase; + + // Do nothing, as this can't be implemented portably + void notify() { } +}; + +} // namespace _impl +} // namespace realm + diff --git a/src/impl/generic/external_commit_helper.cpp b/src/impl/generic/external_commit_helper.cpp new file mode 100644 index 00000000..5071304b --- /dev/null +++ b/src/impl/generic/external_commit_helper.cpp @@ -0,0 +1,49 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "impl/external_commit_helper.hpp" + +#include "impl/realm_coordinator.hpp" + +#include +#include + +using namespace realm; +using namespace realm::_impl; + +ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent) +: m_parent(parent) +, m_history(realm::make_client_history(parent.get_path(), parent.get_encryption_key().data())) +, m_sg(*m_history, parent.is_in_memory() ? SharedGroup::durability_MemOnly : SharedGroup::durability_Full, + parent.get_encryption_key().data()) +, m_thread(std::async(std::launch::async, [=] { + m_sg.begin_read(); + while (m_sg.wait_for_change()) { + m_sg.end_read(); + m_sg.begin_read(); + m_parent.on_change(); + } +})) +{ +} + +ExternalCommitHelper::~ExternalCommitHelper() +{ + m_sg.wait_for_change_release(); + m_thread.wait(); // Wait for the thread to exit +} diff --git a/src/impl/generic/external_commit_helper.hpp b/src/impl/generic/external_commit_helper.hpp new file mode 100644 index 00000000..cc7fe5eb --- /dev/null +++ b/src/impl/generic/external_commit_helper.hpp @@ -0,0 +1,50 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include + +#include + +namespace realm { +class ClientHistory; + +namespace _impl { +class RealmCoordinator; + +class ExternalCommitHelper { +public: + ExternalCommitHelper(RealmCoordinator& parent); + ~ExternalCommitHelper(); + + // A no-op in this version, but needed for the Apple version + void notify_others() { } + +private: + RealmCoordinator& m_parent; + + // A shared group used to listen for changes + std::unique_ptr m_history; + SharedGroup m_sg; + + // The listener thread + std::future m_thread; +}; + +} // namespace _impl +} // namespace realm + diff --git a/src/impl/realm_coordinator.cpp b/src/impl/realm_coordinator.cpp new file mode 100644 index 00000000..d59cad08 --- /dev/null +++ b/src/impl/realm_coordinator.cpp @@ -0,0 +1,195 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "impl/realm_coordinator.hpp" + +#include "impl/cached_realm.hpp" +#include "impl/external_commit_helper.hpp" +#include "object_store.hpp" +#include "schema.hpp" + +#include + +using namespace realm; +using namespace realm::_impl; + +static std::mutex s_coordinator_mutex; +static std::unordered_map> s_coordinators_per_path; + +std::shared_ptr RealmCoordinator::get_coordinator(StringData path) +{ + std::lock_guard lock(s_coordinator_mutex); + + auto& weak_coordinator = s_coordinators_per_path[path]; + if (auto coordinator = weak_coordinator.lock()) { + return coordinator; + } + + auto coordinator = std::make_shared(); + weak_coordinator = coordinator; + return coordinator; +} + +std::shared_ptr RealmCoordinator::get_existing_coordinator(StringData path) +{ + std::lock_guard lock(s_coordinator_mutex); + auto it = s_coordinators_per_path.find(path); + return it == s_coordinators_per_path.end() ? nullptr : it->second.lock(); +} + +std::shared_ptr RealmCoordinator::get_realm(Realm::Config config) +{ + std::lock_guard lock(m_realm_mutex); + if ((!m_config.read_only && !m_notifier) || (m_config.read_only && m_cached_realms.empty())) { + m_config = config; + if (!config.read_only && !m_notifier) { + m_notifier = std::make_unique(*this); + } + } + else { + if (m_config.read_only != config.read_only) { + throw MismatchedConfigException("Realm at path already opened with different read permissions."); + } + if (m_config.in_memory != config.in_memory) { + throw MismatchedConfigException("Realm at path already opened with different inMemory settings."); + } + if (m_config.encryption_key != config.encryption_key) { + throw MismatchedConfigException("Realm at path already opened with a different encryption key."); + } + if (m_config.schema_version != config.schema_version && config.schema_version != ObjectStore::NotVersioned) { + throw MismatchedConfigException("Realm at path already opened with different schema version."); + } + // FIXME: verify that schema is compatible + // Needs to verify that all tables present in both are identical, and + // then updated m_config with any tables present in config but not in + // it + // Public API currently doesn't make it possible to have non-matching + // schemata so it's not a huge issue + if ((false) && m_config.schema != config.schema) { + throw MismatchedConfigException("Realm at path already opened with different schema"); + } + } + + if (config.cache) { + for (auto& cachedRealm : m_cached_realms) { + if (cachedRealm.is_cached_for_current_thread()) { + // can be null if we jumped in between ref count hitting zero and + // unregister_realm() getting the lock + if (auto realm = cachedRealm.realm()) { + return realm; + } + } + } + } + + auto realm = std::make_shared(std::move(config)); + realm->init(shared_from_this()); + m_cached_realms.emplace_back(realm, m_config.cache); + return realm; +} + +const Schema* RealmCoordinator::get_schema() const noexcept +{ + return m_cached_realms.empty() ? nullptr : m_config.schema.get(); +} + +void RealmCoordinator::update_schema(Schema const& schema) +{ + // FIXME: this should probably be doing some sort of validation and + // notifying all Realm instances of the new schema in some way + m_config.schema = std::make_unique(schema); +} + +RealmCoordinator::RealmCoordinator() = default; + +RealmCoordinator::~RealmCoordinator() +{ + std::lock_guard coordinator_lock(s_coordinator_mutex); + for (auto it = s_coordinators_per_path.begin(); it != s_coordinators_per_path.end(); ) { + if (it->second.expired()) { + it = s_coordinators_per_path.erase(it); + } + else { + ++it; + } + } +} + +void RealmCoordinator::unregister_realm(Realm* realm) +{ + std::lock_guard lock(m_realm_mutex); + for (size_t i = 0; i < m_cached_realms.size(); ++i) { + auto& cached_realm = m_cached_realms[i]; + if (!cached_realm.expired() && !cached_realm.is_for_realm(realm)) { + continue; + } + + if (i + 1 < m_cached_realms.size()) { + cached_realm = std::move(m_cached_realms.back()); + } + m_cached_realms.pop_back(); + } +} + +void RealmCoordinator::clear_cache() +{ + std::vector realms_to_close; + { + std::lock_guard lock(s_coordinator_mutex); + + for (auto& weak_coordinator : s_coordinators_per_path) { + auto coordinator = weak_coordinator.second.lock(); + if (!coordinator) { + continue; + } + + coordinator->m_notifier = nullptr; + + // Gather a list of all of the realms which will be removed + for (auto& cached_realm : coordinator->m_cached_realms) { + if (auto realm = cached_realm.realm()) { + realms_to_close.push_back(realm); + } + } + } + + s_coordinators_per_path.clear(); + } + + // Close all of the previously cached Realms. This can't be done while + // s_coordinator_mutex is held as it may try to re-lock it. + for (auto& weak_realm : realms_to_close) { + if (auto realm = weak_realm.lock()) { + realm->close(); + } + } +} + +void RealmCoordinator::send_commit_notifications() +{ + REALM_ASSERT(!m_config.read_only); + m_notifier->notify_others(); +} + +void RealmCoordinator::on_change() +{ + std::lock_guard lock(m_realm_mutex); + for (auto& realm : m_cached_realms) { + realm.notify(); + } +} diff --git a/src/impl/realm_coordinator.hpp b/src/impl/realm_coordinator.hpp new file mode 100644 index 00000000..ee1d748b --- /dev/null +++ b/src/impl/realm_coordinator.hpp @@ -0,0 +1,88 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_COORDINATOR_HPP +#define REALM_COORDINATOR_HPP + +#include "shared_realm.hpp" + +#include + +namespace realm { +class Schema; + +namespace _impl { +class CachedRealm; +class ExternalCommitHelper; + +// RealmCoordinator manages the weak cache of Realm instances and communication +// between per-thread Realm instances for a given file +class RealmCoordinator : public std::enable_shared_from_this { +public: + // Get the coordinator for the given path, creating it if neccesary + static std::shared_ptr get_coordinator(StringData path); + // Get the coordinator for the given path, or null if there is none + static std::shared_ptr get_existing_coordinator(StringData path); + + // Get a thread-local shared Realm with the given configuration + // If the Realm is already open on another thread, validates that the given + // configuration is compatible with the existing one + std::shared_ptr get_realm(Realm::Config config); + + const Schema* get_schema() const noexcept; + uint64_t get_schema_version() const noexcept { return m_config.schema_version; } + const std::string& get_path() const noexcept { return m_config.path; } + const std::vector& get_encryption_key() const noexcept { return m_config.encryption_key; } + bool is_in_memory() const noexcept { return m_config.in_memory; } + + // Asyncronously call notify() on every Realm instance for this coordinator's + // path, including those in other processes + void send_commit_notifications(); + + // Clear the weak Realm cache for all paths + // Should only be called in test code, as continuing to use the previously + // cached instances will have odd results + static void clear_cache(); + + // Explicit constructor/destructor needed for the unique_ptrs to forward-declared types + RealmCoordinator(); + ~RealmCoordinator(); + + // Called by Realm's destructor to ensure the cache is cleaned up promptly + // Do not call directly + void unregister_realm(Realm* realm); + + // Called by m_notifier when there's a new commit to send notifications for + void on_change(); + + // Update the schema in the cached config + void update_schema(Schema const& new_schema); + +private: + Realm::Config m_config; + + std::mutex m_realm_mutex; + std::vector m_cached_realms; + + std::unique_ptr<_impl::ExternalCommitHelper> m_notifier; +}; + +} // namespace _impl +} // namespace realm + +#endif /* REALM_COORDINATOR_HPP */ diff --git a/src/impl/transact_log_handler.cpp b/src/impl/transact_log_handler.cpp index ed1630ca..72b2a200 100644 --- a/src/impl/transact_log_handler.cpp +++ b/src/impl/transact_log_handler.cpp @@ -16,9 +16,9 @@ // //////////////////////////////////////////////////////////////////////////// -#include "transact_log_handler.hpp" +#include "impl/transact_log_handler.hpp" -#include "../binding_context.hpp" +#include "binding_context.hpp" #include #include diff --git a/src/results.cpp b/src/results.cpp index 91a4cc22..a49559ca 100644 --- a/src/results.cpp +++ b/src/results.cpp @@ -18,6 +18,8 @@ #include "results.hpp" +#include "object_store.hpp" + #include using namespace realm; diff --git a/src/shared_realm.cpp b/src/shared_realm.cpp index 30e8f928..5c5b818a 100644 --- a/src/shared_realm.cpp +++ b/src/shared_realm.cpp @@ -18,10 +18,12 @@ #include "shared_realm.hpp" -#include "external_commit_helper.hpp" #include "binding_context.hpp" +#include "impl/external_commit_helper.hpp" +#include "impl/realm_coordinator.hpp" +#include "impl/transact_log_handler.hpp" +#include "object_store.hpp" #include "schema.hpp" -#include "transact_log_handler.hpp" #include #include @@ -31,8 +33,6 @@ using namespace realm; using namespace realm::_impl; -RealmCache Realm::s_global_cache; - Realm::Config::Config(const Config& c) : path(c.path) , read_only(c.read_only) @@ -48,7 +48,7 @@ Realm::Config::Config(const Config& c) } } -Realm::Config::Config() = default; +Realm::Config::Config() : schema_version(ObjectStore::NotVersioned) { } Realm::Config::Config(Config&&) = default; Realm::Config::~Config() = default; @@ -104,9 +104,58 @@ Realm::Realm(Config config) } } -Realm::~Realm() { - if (m_notifier) { // might not exist yet if an error occurred during init - m_notifier->remove_realm(this); +void Realm::init(std::shared_ptr coordinator) +{ + m_coordinator = std::move(coordinator); + + // if there is an existing realm at the current path steal its schema/column mapping + if (auto existing = m_coordinator->get_schema()) { + m_config.schema = std::make_unique(*existing); + return; + } + + try { + // otherwise get the schema from the group + auto target_schema = std::move(m_config.schema); + auto target_schema_version = m_config.schema_version; + m_config.schema_version = ObjectStore::get_schema_version(read_group()); + m_config.schema = std::make_unique(ObjectStore::schema_from_group(read_group())); + + // if a target schema is supplied, verify that it matches or migrate to + // it, as neeeded + if (target_schema) { + if (m_config.read_only) { + if (m_config.schema_version == ObjectStore::NotVersioned) { + throw UnitializedRealmException("Can't open an un-initialized Realm without a Schema"); + } + target_schema->validate(); + ObjectStore::verify_schema(*m_config.schema, *target_schema, true); + m_config.schema = std::move(target_schema); + } + else { + update_schema(std::move(target_schema), target_schema_version); + } + + if (!m_config.read_only) { + // End the read transaction created to validation/update the + // schema to avoid pinning the version even if the user never + // actually reads data + invalidate(); + } + } + } + catch (...) { + // Trying to unregister from the coordinator before we finish + // construction will result in a deadlock + m_coordinator = nullptr; + throw; + } +} + +Realm::~Realm() +{ + if (m_coordinator) { + m_coordinator->unregister_realm(this); } } @@ -120,85 +169,7 @@ Group *Realm::read_group() SharedRealm Realm::get_shared_realm(Config config) { - if (config.cache) { - if (SharedRealm realm = s_global_cache.get_realm(config.path)) { - if (realm->config().read_only != config.read_only) { - throw MismatchedConfigException("Realm at path already opened with different read permissions."); - } - if (realm->config().in_memory != config.in_memory) { - throw MismatchedConfigException("Realm at path already opened with different inMemory settings."); - } - if (realm->config().encryption_key != config.encryption_key) { - throw MismatchedConfigException("Realm at path already opened with a different encryption key."); - } - if (realm->config().schema_version != config.schema_version && config.schema_version != ObjectStore::NotVersioned) { - throw MismatchedConfigException("Realm at path already opened with different schema version."); - } - // FIXME - enable schma comparison - /*if (realm->config().schema != config.schema) { - throw MismatchedConfigException("Realm at path already opened with different schema"); - }*/ - realm->m_config.migration_function = config.migration_function; - - return realm; - } - } - - SharedRealm realm(new Realm(std::move(config))); - - auto target_schema = std::move(realm->m_config.schema); - auto target_schema_version = realm->m_config.schema_version; - realm->m_config.schema_version = ObjectStore::get_schema_version(realm->read_group()); - - // we want to ensure we are only initializing a single realm at a time - static std::mutex s_init_mutex; - std::lock_guard lock(s_init_mutex); - if (auto existing = s_global_cache.get_any_realm(realm->config().path)) { - // if there is an existing realm at the current path steal its schema/column mapping - // FIXME - need to validate that schemas match - realm->m_config.schema = std::make_unique(*existing->m_config.schema); - - if (!realm->m_config.read_only) { - realm->m_notifier = existing->m_notifier; - realm->m_notifier->add_realm(realm.get()); - } - } - else { - if (!realm->m_config.read_only) { - realm->m_notifier = std::make_shared(realm.get()); - } - - // otherwise get the schema from the group - realm->m_config.schema = std::make_unique(ObjectStore::schema_from_group(realm->read_group())); - - // if a target schema is supplied, verify that it matches or migrate to - // it, as neeeded - if (target_schema) { - if (realm->m_config.read_only) { - if (realm->m_config.schema_version == ObjectStore::NotVersioned) { - throw UnitializedRealmException("Can't open an un-initialized Realm without a Schema"); - } - target_schema->validate(); - ObjectStore::verify_schema(*realm->m_config.schema, *target_schema, true); - realm->m_config.schema = std::move(target_schema); - } - else { - realm->update_schema(std::move(target_schema), target_schema_version); - } - - if (!realm->m_config.read_only) { - // End the read transaction created to validation/update the - // schema to avoid pinning the version even if the user never - // actually reads data - realm->invalidate(); - } - } - } - - if (config.cache) { - s_global_cache.cache_realm(realm, realm->m_thread_id); - } - return realm; + return RealmCoordinator::get_coordinator(config.path)->get_realm(std::move(config)); } void Realm::update_schema(std::unique_ptr schema, uint64_t version) @@ -215,6 +186,7 @@ void Realm::update_schema(std::unique_ptr schema, uint64_t version) ObjectStore::verify_schema(*m_config.schema, *schema, m_config.read_only); m_config.schema = std::move(schema); m_config.schema_version = version; + m_coordinator->update_schema(*m_config.schema); return false; }; @@ -272,6 +244,8 @@ void Realm::update_schema(std::unique_ptr schema, uint64_t version) m_config.schema_version = old_config.schema_version; throw; } + + m_coordinator->update_schema(*m_config.schema); } static void check_read_write(Realm *realm) @@ -322,7 +296,7 @@ void Realm::commit_transaction() m_in_transaction = false; transaction::commit(*m_shared_group, *m_history, m_binding_context.get()); - m_notifier->notify_others(); + m_coordinator->send_commit_notifications(); } void Realm::cancel_transaction() @@ -392,7 +366,6 @@ void Realm::notify() } } - bool Realm::refresh() { verify_thread(); @@ -421,8 +394,9 @@ bool Realm::refresh() uint64_t Realm::get_schema_version(const realm::Realm::Config &config) { - if (auto existing_realm = s_global_cache.get_any_realm(config.path)) { - return existing_realm->config().schema_version; + auto coordinator = RealmCoordinator::get_existing_coordinator(config.path); + if (coordinator) { + return coordinator->get_schema_version(); } return ObjectStore::get_schema_version(Realm(config).read_group()); @@ -430,97 +404,16 @@ uint64_t Realm::get_schema_version(const realm::Realm::Config &config) void Realm::close() { - if (m_notifier) { - m_notifier->remove_realm(this); + invalidate(); + + if (m_coordinator) { + m_coordinator->unregister_realm(this); } m_group = nullptr; m_shared_group = nullptr; m_history = nullptr; m_read_only_group = nullptr; - m_notifier = nullptr; m_binding_context = nullptr; -} - -SharedRealm RealmCache::get_realm(const std::string &path, std::thread::id thread_id) -{ - std::lock_guard lock(m_mutex); - - auto path_iter = m_cache.find(path); - if (path_iter == m_cache.end()) { - return SharedRealm(); - } - - auto thread_iter = path_iter->second.find(thread_id); - if (thread_iter == path_iter->second.end()) { - return SharedRealm(); - } - - return thread_iter->second.lock(); -} - -SharedRealm RealmCache::get_any_realm(const std::string &path) -{ - std::lock_guard lock(m_mutex); - - auto path_iter = m_cache.find(path); - if (path_iter == m_cache.end()) { - return SharedRealm(); - } - - auto thread_iter = path_iter->second.begin(); - while (thread_iter != path_iter->second.end()) { - if (auto realm = thread_iter->second.lock()) { - return realm; - } - path_iter->second.erase(thread_iter++); - } - - return SharedRealm(); -} - -void RealmCache::remove(const std::string &path, std::thread::id thread_id) -{ - std::lock_guard lock(m_mutex); - - auto path_iter = m_cache.find(path); - if (path_iter == m_cache.end()) { - return; - } - - auto thread_iter = path_iter->second.find(thread_id); - if (thread_iter != path_iter->second.end()) { - path_iter->second.erase(thread_iter); - } - - if (path_iter->second.size() == 0) { - m_cache.erase(path_iter); - } -} - -void RealmCache::cache_realm(SharedRealm &realm, std::thread::id thread_id) -{ - std::lock_guard lock(m_mutex); - - auto path_iter = m_cache.find(realm->config().path); - if (path_iter == m_cache.end()) { - m_cache.emplace(realm->config().path, std::map{{thread_id, realm}}); - } - else { - path_iter->second.emplace(thread_id, realm); - } -} - -void RealmCache::clear() -{ - std::lock_guard lock(m_mutex); - for (auto const& path : m_cache) { - for (auto const& thread : path.second) { - if (auto realm = thread.second.lock()) { - realm->close(); - } - } - } - - m_cache.clear(); + m_coordinator = nullptr; } diff --git a/src/shared_realm.hpp b/src/shared_realm.hpp index ddc625dd..79f55496 100644 --- a/src/shared_realm.hpp +++ b/src/shared_realm.hpp @@ -22,20 +22,24 @@ #include "object_store.hpp" #include -#include +#include #include #include namespace realm { - class ClientHistory; - class Realm; - class RealmCache; class BindingContext; + class ClientHistory; + class Group; + class Realm; + class RealmDelegate; + class Schema; + class SharedGroup; typedef std::shared_ptr SharedRealm; typedef std::weak_ptr WeakRealm; namespace _impl { class ExternalCommitHelper; + class RealmCoordinator; } class Realm : public std::enable_shared_from_this @@ -53,7 +57,7 @@ namespace realm { std::vector encryption_key; std::unique_ptr schema; - uint64_t schema_version = ObjectStore::NotVersioned; + uint64_t schema_version; MigrationFunction migration_function; @@ -109,9 +113,10 @@ namespace realm { ~Realm(); - private: + void init(std::shared_ptr<_impl::RealmCoordinator> coordinator); Realm(Config config); + private: Config m_config; std::thread::id m_thread_id = std::this_thread::get_id(); bool m_in_transaction = false; @@ -123,28 +128,13 @@ namespace realm { Group *m_group = nullptr; - std::shared_ptr<_impl::ExternalCommitHelper> m_notifier; + std::shared_ptr<_impl::RealmCoordinator> m_coordinator; public: std::unique_ptr m_binding_context; // FIXME private Group *read_group(); - static RealmCache s_global_cache; - }; - - class RealmCache - { - public: - SharedRealm get_realm(const std::string &path, std::thread::id thread_id = std::this_thread::get_id()); - SharedRealm get_any_realm(const std::string &path); - void remove(const std::string &path, std::thread::id thread_id); - void cache_realm(SharedRealm &realm, std::thread::id thread_id = std::this_thread::get_id()); - void clear(); - - private: - std::map> m_cache; - std::mutex m_mutex; }; class RealmFileException : public std::runtime_error {