Merge pull request #42 from realm/tg/async-query
Add support for async queries
This commit is contained in:
commit
d73c7dba88
|
@ -6,6 +6,7 @@ set(SOURCES
|
||||||
results.cpp
|
results.cpp
|
||||||
schema.cpp
|
schema.cpp
|
||||||
shared_realm.cpp
|
shared_realm.cpp
|
||||||
|
impl/async_query.cpp
|
||||||
impl/realm_coordinator.cpp
|
impl/realm_coordinator.cpp
|
||||||
impl/transact_log_handler.cpp
|
impl/transact_log_handler.cpp
|
||||||
parser/parser.cpp
|
parser/parser.cpp
|
||||||
|
@ -19,26 +20,27 @@ set(HEADERS
|
||||||
results.hpp
|
results.hpp
|
||||||
schema.hpp
|
schema.hpp
|
||||||
shared_realm.hpp
|
shared_realm.hpp
|
||||||
impl/cached_realm.hpp
|
impl/weak_realm_notifier.hpp
|
||||||
impl/cached_realm_base.hpp
|
impl/weak_realm_notifier_base.hpp
|
||||||
impl/external_commit_helper.hpp
|
impl/external_commit_helper.hpp
|
||||||
impl/transact_log_handler.hpp
|
impl/transact_log_handler.hpp
|
||||||
parser/parser.hpp
|
parser/parser.hpp
|
||||||
parser/query_builder.hpp)
|
parser/query_builder.hpp
|
||||||
|
util/atomic_shared_ptr.hpp)
|
||||||
|
|
||||||
if(APPLE)
|
if(APPLE)
|
||||||
list(APPEND SOURCES
|
list(APPEND SOURCES
|
||||||
impl/apple/cached_realm.cpp
|
impl/apple/weak_realm_notifier.cpp
|
||||||
impl/apple/external_commit_helper.cpp)
|
impl/apple/external_commit_helper.cpp)
|
||||||
list(APPEND HEADERS
|
list(APPEND HEADERS
|
||||||
impl/apple/cached_realm.hpp
|
impl/apple/weak_realm_notifier.hpp
|
||||||
impl/apple/external_commit_helper.hpp)
|
impl/apple/external_commit_helper.hpp)
|
||||||
find_library(CF_LIBRARY CoreFoundation)
|
find_library(CF_LIBRARY CoreFoundation)
|
||||||
else()
|
else()
|
||||||
list(APPEND SOURCES
|
list(APPEND SOURCES
|
||||||
impl/generic/external_commit_helper.cpp)
|
impl/generic/external_commit_helper.cpp)
|
||||||
list(APPEND HEADERS
|
list(APPEND HEADERS
|
||||||
impl/generic/cached_realm.hpp
|
impl/generic/weak_realm_notifier.hpp
|
||||||
impl/generic/external_commit_helper.hpp)
|
impl/generic/external_commit_helper.hpp)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
|
|
@ -70,6 +70,10 @@ class BindingContext {
|
||||||
public:
|
public:
|
||||||
virtual ~BindingContext() = default;
|
virtual ~BindingContext() = default;
|
||||||
|
|
||||||
|
// If the user adds a notification handler to the Realm, will it ever
|
||||||
|
// actually be called?
|
||||||
|
virtual bool can_deliver_notifications() const noexcept { return true; }
|
||||||
|
|
||||||
// Called by the Realm when a write transaction is committed to the file by
|
// Called by the Realm when a write transaction is committed to the file by
|
||||||
// a different Realm instance (possibly in a different process)
|
// a different Realm instance (possibly in a different process)
|
||||||
virtual void changes_available() { }
|
virtual void changes_available() { }
|
||||||
|
|
|
@ -16,23 +16,25 @@
|
||||||
//
|
//
|
||||||
////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#include "impl/cached_realm.hpp"
|
#include "impl/weak_realm_notifier.hpp"
|
||||||
|
|
||||||
#include "shared_realm.hpp"
|
#include "shared_realm.hpp"
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
using namespace realm;
|
using namespace realm;
|
||||||
using namespace realm::_impl;
|
using namespace realm::_impl;
|
||||||
|
|
||||||
CachedRealm::CachedRealm(const std::shared_ptr<Realm>& realm, bool cache)
|
WeakRealmNotifier::WeakRealmNotifier(const std::shared_ptr<Realm>& realm, bool cache)
|
||||||
: CachedRealmBase(realm, cache)
|
: WeakRealmNotifierBase(realm, cache)
|
||||||
{
|
{
|
||||||
struct RefCountedWeakPointer {
|
struct RefCountedWeakPointer {
|
||||||
std::weak_ptr<Realm> realm;
|
std::weak_ptr<Realm> realm;
|
||||||
std::atomic<size_t> ref_count = {1};
|
std::atomic<size_t> ref_count;
|
||||||
};
|
};
|
||||||
|
|
||||||
CFRunLoopSourceContext ctx{};
|
CFRunLoopSourceContext ctx{};
|
||||||
ctx.info = new RefCountedWeakPointer{realm};
|
ctx.info = new RefCountedWeakPointer{realm, {1}};
|
||||||
ctx.perform = [](void* info) {
|
ctx.perform = [](void* info) {
|
||||||
if (auto realm = static_cast<RefCountedWeakPointer*>(info)->realm.lock()) {
|
if (auto realm = static_cast<RefCountedWeakPointer*>(info)->realm.lock()) {
|
||||||
realm->notify();
|
realm->notify();
|
||||||
|
@ -55,8 +57,8 @@ CachedRealm::CachedRealm(const std::shared_ptr<Realm>& realm, bool cache)
|
||||||
CFRunLoopAddSource(m_runloop, m_signal, kCFRunLoopDefaultMode);
|
CFRunLoopAddSource(m_runloop, m_signal, kCFRunLoopDefaultMode);
|
||||||
}
|
}
|
||||||
|
|
||||||
CachedRealm::CachedRealm(CachedRealm&& rgt)
|
WeakRealmNotifier::WeakRealmNotifier(WeakRealmNotifier&& rgt)
|
||||||
: CachedRealmBase(std::move(rgt))
|
: WeakRealmNotifierBase(std::move(rgt))
|
||||||
, m_runloop(rgt.m_runloop)
|
, m_runloop(rgt.m_runloop)
|
||||||
, m_signal(rgt.m_signal)
|
, m_signal(rgt.m_signal)
|
||||||
{
|
{
|
||||||
|
@ -64,9 +66,9 @@ CachedRealm::CachedRealm(CachedRealm&& rgt)
|
||||||
rgt.m_signal = nullptr;
|
rgt.m_signal = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
CachedRealm& CachedRealm::operator=(CachedRealm&& rgt)
|
WeakRealmNotifier& WeakRealmNotifier::operator=(WeakRealmNotifier&& rgt)
|
||||||
{
|
{
|
||||||
CachedRealmBase::operator=(std::move(rgt));
|
WeakRealmNotifierBase::operator=(std::move(rgt));
|
||||||
m_runloop = rgt.m_runloop;
|
m_runloop = rgt.m_runloop;
|
||||||
m_signal = rgt.m_signal;
|
m_signal = rgt.m_signal;
|
||||||
rgt.m_runloop = nullptr;
|
rgt.m_runloop = nullptr;
|
||||||
|
@ -75,7 +77,7 @@ CachedRealm& CachedRealm::operator=(CachedRealm&& rgt)
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
CachedRealm::~CachedRealm()
|
WeakRealmNotifier::~WeakRealmNotifier()
|
||||||
{
|
{
|
||||||
if (m_signal) {
|
if (m_signal) {
|
||||||
CFRunLoopSourceInvalidate(m_signal);
|
CFRunLoopSourceInvalidate(m_signal);
|
||||||
|
@ -84,7 +86,7 @@ CachedRealm::~CachedRealm()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CachedRealm::notify()
|
void WeakRealmNotifier::notify()
|
||||||
{
|
{
|
||||||
CFRunLoopSourceSignal(m_signal);
|
CFRunLoopSourceSignal(m_signal);
|
||||||
// Signalling the source makes it run the next time the runloop gets
|
// Signalling the source makes it run the next time the runloop gets
|
|
@ -16,7 +16,7 @@
|
||||||
//
|
//
|
||||||
////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#include "impl/cached_realm_base.hpp"
|
#include "impl/weak_realm_notifier_base.hpp"
|
||||||
|
|
||||||
#include <CoreFoundation/CFRunLoop.h>
|
#include <CoreFoundation/CFRunLoop.h>
|
||||||
|
|
||||||
|
@ -25,18 +25,18 @@ class Realm;
|
||||||
|
|
||||||
namespace _impl {
|
namespace _impl {
|
||||||
|
|
||||||
class CachedRealm : public CachedRealmBase {
|
class WeakRealmNotifier : public WeakRealmNotifierBase {
|
||||||
public:
|
public:
|
||||||
CachedRealm(const std::shared_ptr<Realm>& realm, bool cache);
|
WeakRealmNotifier(const std::shared_ptr<Realm>& realm, bool cache);
|
||||||
~CachedRealm();
|
~WeakRealmNotifier();
|
||||||
|
|
||||||
CachedRealm(CachedRealm&&);
|
WeakRealmNotifier(WeakRealmNotifier&&);
|
||||||
CachedRealm& operator=(CachedRealm&&);
|
WeakRealmNotifier& operator=(WeakRealmNotifier&&);
|
||||||
|
|
||||||
CachedRealm(const CachedRealm&) = delete;
|
WeakRealmNotifier(const WeakRealmNotifier&) = delete;
|
||||||
CachedRealm& operator=(const CachedRealm&) = delete;
|
WeakRealmNotifier& operator=(const WeakRealmNotifier&) = delete;
|
||||||
|
|
||||||
// Asyncronously call notify() on the Realm on the appropriate thread
|
// Asynchronously call notify() on the Realm on the appropriate thread
|
||||||
void notify();
|
void notify();
|
||||||
|
|
||||||
private:
|
private:
|
|
@ -0,0 +1,290 @@
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
//
|
||||||
|
// Copyright 2015 Realm Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#include "impl/async_query.hpp"
|
||||||
|
|
||||||
|
#include "impl/realm_coordinator.hpp"
|
||||||
|
#include "results.hpp"
|
||||||
|
|
||||||
|
using namespace realm;
|
||||||
|
using namespace realm::_impl;
|
||||||
|
|
||||||
|
AsyncQuery::AsyncQuery(Results& target)
|
||||||
|
: m_target_results(&target)
|
||||||
|
, m_realm(target.get_realm())
|
||||||
|
, m_sort(target.get_sort())
|
||||||
|
, m_sg_version(Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction())
|
||||||
|
{
|
||||||
|
Query q = target.get_query();
|
||||||
|
m_query_handover = Realm::Internal::get_shared_group(*m_realm).export_for_handover(q, MutableSourcePayload::Move);
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncQuery::~AsyncQuery()
|
||||||
|
{
|
||||||
|
// unregister() may have been called from a different thread than we're being
|
||||||
|
// destroyed on, so we need to synchronize access to the interesting fields
|
||||||
|
// modified there
|
||||||
|
std::lock_guard<std::mutex> lock(m_target_mutex);
|
||||||
|
m_realm = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t AsyncQuery::add_callback(std::function<void (std::exception_ptr)> callback)
|
||||||
|
{
|
||||||
|
m_realm->verify_thread();
|
||||||
|
|
||||||
|
auto next_token = [=] {
|
||||||
|
size_t token = 0;
|
||||||
|
for (auto& callback : m_callbacks) {
|
||||||
|
if (token <= callback.token) {
|
||||||
|
token = callback.token + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return token;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
||||||
|
auto token = next_token();
|
||||||
|
m_callbacks.push_back({std::move(callback), token, -1ULL});
|
||||||
|
if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications
|
||||||
|
Realm::Internal::get_coordinator(*m_realm).send_commit_notifications();
|
||||||
|
m_have_callbacks = true;
|
||||||
|
}
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::remove_callback(size_t token)
|
||||||
|
{
|
||||||
|
Callback old;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
||||||
|
REALM_ASSERT(m_error || m_callbacks.size() > 0);
|
||||||
|
|
||||||
|
auto it = find_if(begin(m_callbacks), end(m_callbacks),
|
||||||
|
[=](const auto& c) { return c.token == token; });
|
||||||
|
// We should only fail to find the callback if it was removed due to an error
|
||||||
|
REALM_ASSERT(m_error || it != end(m_callbacks));
|
||||||
|
if (it == end(m_callbacks)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t idx = distance(begin(m_callbacks), it);
|
||||||
|
if (m_callback_index != npos && m_callback_index >= idx) {
|
||||||
|
--m_callback_index;
|
||||||
|
}
|
||||||
|
|
||||||
|
old = std::move(*it);
|
||||||
|
m_callbacks.erase(it);
|
||||||
|
|
||||||
|
m_have_callbacks = !m_callbacks.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::unregister() noexcept
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_target_mutex);
|
||||||
|
m_target_results = nullptr;
|
||||||
|
m_realm = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::release_query() noexcept
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_target_mutex);
|
||||||
|
REALM_ASSERT(!m_realm && !m_target_results);
|
||||||
|
}
|
||||||
|
|
||||||
|
m_query = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncQuery::is_alive() const noexcept
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_target_mutex);
|
||||||
|
return m_target_results != nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Most of the inter-thread synchronization for run(), prepare_handover(),
|
||||||
|
// attach_to(), detach(), release_query() and deliver() is done by
|
||||||
|
// RealmCoordinator external to this code, which has some potentially
|
||||||
|
// non-obvious results on which members are and are not safe to use without
|
||||||
|
// holding a lock.
|
||||||
|
//
|
||||||
|
// attach_to(), detach(), run(), prepare_handover(), and release_query() are
|
||||||
|
// all only ever called on a single thread. call_callbacks() and deliver() are
|
||||||
|
// called on the same thread. Calls to prepare_handover() and deliver() are
|
||||||
|
// guarded by a lock.
|
||||||
|
//
|
||||||
|
// In total, this means that the safe data flow is as follows:
|
||||||
|
// - prepare_handover(), attach_to(), detach() and release_query() can read
|
||||||
|
// members written by each other
|
||||||
|
// - deliver() can read members written to in prepare_handover(), deliver(),
|
||||||
|
// and call_callbacks()
|
||||||
|
// - call_callbacks() and read members written to in deliver()
|
||||||
|
//
|
||||||
|
// Separately from this data flow for the query results, all uses of
|
||||||
|
// m_target_results, m_callbacks, and m_callback_index must be done with the
|
||||||
|
// appropriate mutex held to avoid race conditions when the Results object is
|
||||||
|
// destroyed while the background work is running, and to allow removing
|
||||||
|
// callbacks from any thread.
|
||||||
|
|
||||||
|
void AsyncQuery::run()
|
||||||
|
{
|
||||||
|
REALM_ASSERT(m_sg);
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> target_lock(m_target_mutex);
|
||||||
|
// Don't run the query if the results aren't actually going to be used
|
||||||
|
if (!m_target_results || (!m_have_callbacks && !m_target_results->wants_background_updates())) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
REALM_ASSERT(!m_tv.is_attached());
|
||||||
|
|
||||||
|
// If we've run previously, check if we need to rerun
|
||||||
|
if (m_initial_run_complete) {
|
||||||
|
// Make an empty tableview from the query to get the table version, since
|
||||||
|
// Query doesn't expose it
|
||||||
|
if (m_query->find_all(0, 0, 0).outside_version() == m_handed_over_table_version) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m_tv = m_query->find_all();
|
||||||
|
if (m_sort) {
|
||||||
|
m_tv.sort(m_sort.columnIndices, m_sort.ascending);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::prepare_handover()
|
||||||
|
{
|
||||||
|
m_sg_version = m_sg->get_version_of_current_transaction();
|
||||||
|
|
||||||
|
if (!m_tv.is_attached()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
REALM_ASSERT(m_tv.is_in_sync());
|
||||||
|
|
||||||
|
m_initial_run_complete = true;
|
||||||
|
m_handed_over_table_version = m_tv.outside_version();
|
||||||
|
m_tv_handover = m_sg->export_for_handover(m_tv, MutableSourcePayload::Move);
|
||||||
|
|
||||||
|
// detach the TableView as we won't need it again and keeping it around
|
||||||
|
// makes advance_read() much more expensive
|
||||||
|
m_tv = TableView();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err)
|
||||||
|
{
|
||||||
|
if (!is_for_current_thread()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> target_lock(m_target_mutex);
|
||||||
|
|
||||||
|
// Target results being null here indicates that it was destroyed while we
|
||||||
|
// were in the process of advancing the Realm version and preparing for
|
||||||
|
// delivery, i.e. it was destroyed from the "wrong" thread
|
||||||
|
if (!m_target_results) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can get called before the query has actually had the chance to run if
|
||||||
|
// we're added immediately before a different set of async results are
|
||||||
|
// delivered
|
||||||
|
if (!m_initial_run_complete && !err) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (err) {
|
||||||
|
m_error = err;
|
||||||
|
return m_have_callbacks;
|
||||||
|
}
|
||||||
|
|
||||||
|
REALM_ASSERT(!m_query_handover);
|
||||||
|
|
||||||
|
auto realm_sg_version = Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction();
|
||||||
|
if (m_sg_version != realm_sg_version) {
|
||||||
|
// Realm version can be newer if a commit was made on our thread or the
|
||||||
|
// user manually called refresh(), or older if a commit was made on a
|
||||||
|
// different thread and we ran *really* fast in between the check for
|
||||||
|
// if the shared group has changed and when we pick up async results
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_tv_handover) {
|
||||||
|
m_tv_handover->version = m_sg_version;
|
||||||
|
Results::Internal::set_table_view(*m_target_results,
|
||||||
|
std::move(*sg.import_from_handover(std::move(m_tv_handover))));
|
||||||
|
m_delievered_table_version = m_handed_over_table_version;
|
||||||
|
|
||||||
|
}
|
||||||
|
REALM_ASSERT(!m_tv_handover);
|
||||||
|
return m_have_callbacks;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::call_callbacks()
|
||||||
|
{
|
||||||
|
REALM_ASSERT(is_for_current_thread());
|
||||||
|
|
||||||
|
while (auto fn = next_callback()) {
|
||||||
|
fn(m_error);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_error) {
|
||||||
|
// Remove all the callbacks as we never need to call anything ever again
|
||||||
|
// after delivering an error
|
||||||
|
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
|
||||||
|
m_callbacks.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::function<void (std::exception_ptr)> AsyncQuery::next_callback()
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
|
||||||
|
for (++m_callback_index; m_callback_index < m_callbacks.size(); ++m_callback_index) {
|
||||||
|
auto& callback = m_callbacks[m_callback_index];
|
||||||
|
if (m_error || callback.delivered_version != m_delievered_table_version) {
|
||||||
|
callback.delivered_version = m_delievered_table_version;
|
||||||
|
return callback.fn;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m_callback_index = npos;
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::attach_to(realm::SharedGroup& sg)
|
||||||
|
{
|
||||||
|
REALM_ASSERT(!m_sg);
|
||||||
|
REALM_ASSERT(m_query_handover);
|
||||||
|
|
||||||
|
m_query = sg.import_from_handover(std::move(m_query_handover));
|
||||||
|
m_sg = &sg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::detatch()
|
||||||
|
{
|
||||||
|
REALM_ASSERT(m_sg);
|
||||||
|
REALM_ASSERT(m_query);
|
||||||
|
REALM_ASSERT(!m_tv.is_attached());
|
||||||
|
|
||||||
|
m_query_handover = m_sg->export_for_handover(*m_query, MutableSourcePayload::Move);
|
||||||
|
m_sg = nullptr;
|
||||||
|
m_query = nullptr;
|
||||||
|
}
|
|
@ -0,0 +1,122 @@
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
//
|
||||||
|
// Copyright 2015 Realm Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#ifndef REALM_ASYNC_QUERY_HPP
|
||||||
|
#define REALM_ASYNC_QUERY_HPP
|
||||||
|
|
||||||
|
#include "results.hpp"
|
||||||
|
|
||||||
|
#include <realm/group_shared.hpp>
|
||||||
|
|
||||||
|
#include <exception>
|
||||||
|
#include <mutex>
|
||||||
|
#include <functional>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
namespace realm {
|
||||||
|
namespace _impl {
|
||||||
|
class AsyncQuery {
|
||||||
|
public:
|
||||||
|
AsyncQuery(Results& target);
|
||||||
|
~AsyncQuery();
|
||||||
|
|
||||||
|
size_t add_callback(std::function<void (std::exception_ptr)>);
|
||||||
|
void remove_callback(size_t token);
|
||||||
|
|
||||||
|
void unregister() noexcept;
|
||||||
|
void release_query() noexcept;
|
||||||
|
|
||||||
|
// Run/rerun the query if needed
|
||||||
|
void run();
|
||||||
|
// Prepare the handover object if run() did update the TableView
|
||||||
|
void prepare_handover();
|
||||||
|
// Update the target results from the handover
|
||||||
|
// Returns if any callbacks need to be invoked
|
||||||
|
bool deliver(SharedGroup& sg, std::exception_ptr err);
|
||||||
|
void call_callbacks();
|
||||||
|
|
||||||
|
// Attach the handed-over query to `sg`
|
||||||
|
void attach_to(SharedGroup& sg);
|
||||||
|
// Create a new query handover object and stop using the previously attached
|
||||||
|
// SharedGroup
|
||||||
|
void detatch();
|
||||||
|
|
||||||
|
Realm& get_realm() { return *m_target_results->get_realm(); }
|
||||||
|
// Get the version of the current handover object
|
||||||
|
SharedGroup::VersionID version() const noexcept { return m_sg_version; }
|
||||||
|
|
||||||
|
bool is_alive() const noexcept;
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Target Results to update and a mutex which guards it
|
||||||
|
mutable std::mutex m_target_mutex;
|
||||||
|
Results* m_target_results;
|
||||||
|
|
||||||
|
std::shared_ptr<Realm> m_realm;
|
||||||
|
const SortOrder m_sort;
|
||||||
|
const std::thread::id m_thread_id = std::this_thread::get_id();
|
||||||
|
|
||||||
|
// The source Query, in handover form iff m_sg is null
|
||||||
|
std::unique_ptr<SharedGroup::Handover<Query>> m_query_handover;
|
||||||
|
std::unique_ptr<Query> m_query;
|
||||||
|
|
||||||
|
// The TableView resulting from running the query. Will be detached unless
|
||||||
|
// the query was (re)run since the last time the handover object was created
|
||||||
|
TableView m_tv;
|
||||||
|
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
|
||||||
|
SharedGroup::VersionID m_sg_version;
|
||||||
|
std::exception_ptr m_error;
|
||||||
|
|
||||||
|
struct Callback {
|
||||||
|
std::function<void (std::exception_ptr)> fn;
|
||||||
|
size_t token;
|
||||||
|
uint_fast64_t delivered_version;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Currently registered callbacks and a mutex which must always be held
|
||||||
|
// while doing anything with them or m_callback_index
|
||||||
|
std::mutex m_callback_mutex;
|
||||||
|
std::vector<Callback> m_callbacks;
|
||||||
|
|
||||||
|
SharedGroup* m_sg = nullptr;
|
||||||
|
|
||||||
|
uint_fast64_t m_handed_over_table_version = -1;
|
||||||
|
uint_fast64_t m_delievered_table_version = -1;
|
||||||
|
|
||||||
|
// Iteration variable for looping over callbacks
|
||||||
|
// remove_callback() updates this when needed
|
||||||
|
size_t m_callback_index = npos;
|
||||||
|
|
||||||
|
bool m_initial_run_complete = false;
|
||||||
|
|
||||||
|
// Cached value for if m_callbacks is empty, needed to avoid deadlocks in
|
||||||
|
// run() due to lock-order inversion between m_callback_mutex and m_target_mutex
|
||||||
|
// It's okay if this value is stale as at worst it'll result in us doing
|
||||||
|
// some extra work.
|
||||||
|
std::atomic<bool> m_have_callbacks = {false};
|
||||||
|
|
||||||
|
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
|
||||||
|
|
||||||
|
std::function<void (std::exception_ptr)> next_callback();
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace _impl
|
||||||
|
} // namespace realm
|
||||||
|
|
||||||
|
#endif /* REALM_ASYNC_QUERY_HPP */
|
|
@ -16,16 +16,16 @@
|
||||||
//
|
//
|
||||||
////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#include "impl/cached_realm_base.hpp"
|
#include "impl/weak_realm_notifier_base.hpp"
|
||||||
|
|
||||||
namespace realm {
|
namespace realm {
|
||||||
class Realm;
|
class Realm;
|
||||||
|
|
||||||
namespace _impl {
|
namespace _impl {
|
||||||
|
|
||||||
class CachedRealm : public CachedRealmBase {
|
class WeakRealmNotifier : public WeakRealmNotifierBase {
|
||||||
public:
|
public:
|
||||||
using CachedRealmBase::CachedRealmBase;
|
using WeakRealmNotifierBase::WeakRealmNotifierBase;
|
||||||
|
|
||||||
// Do nothing, as this can't be implemented portably
|
// Do nothing, as this can't be implemented portably
|
||||||
void notify() { }
|
void notify() { }
|
|
@ -18,11 +18,20 @@
|
||||||
|
|
||||||
#include "impl/realm_coordinator.hpp"
|
#include "impl/realm_coordinator.hpp"
|
||||||
|
|
||||||
#include "impl/cached_realm.hpp"
|
#include "impl/async_query.hpp"
|
||||||
|
#include "impl/weak_realm_notifier.hpp"
|
||||||
#include "impl/external_commit_helper.hpp"
|
#include "impl/external_commit_helper.hpp"
|
||||||
|
#include "impl/transact_log_handler.hpp"
|
||||||
#include "object_store.hpp"
|
#include "object_store.hpp"
|
||||||
#include "schema.hpp"
|
#include "schema.hpp"
|
||||||
|
|
||||||
|
#include <realm/commit_log.hpp>
|
||||||
|
#include <realm/group_shared.hpp>
|
||||||
|
#include <realm/lang_bind_helper.hpp>
|
||||||
|
#include <realm/query.hpp>
|
||||||
|
#include <realm/table_view.hpp>
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
using namespace realm;
|
using namespace realm;
|
||||||
|
@ -55,11 +64,16 @@ std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(Str
|
||||||
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
|
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(m_realm_mutex);
|
std::lock_guard<std::mutex> lock(m_realm_mutex);
|
||||||
if ((!m_config.read_only && !m_notifier) || (m_config.read_only && m_cached_realms.empty())) {
|
if ((!m_config.read_only && !m_notifier) || (m_config.read_only && m_weak_realm_notifiers.empty())) {
|
||||||
m_config = config;
|
m_config = config;
|
||||||
if (!config.read_only && !m_notifier) {
|
if (!config.read_only && !m_notifier && config.automatic_change_notifications) {
|
||||||
|
try {
|
||||||
m_notifier = std::make_unique<ExternalCommitHelper>(*this);
|
m_notifier = std::make_unique<ExternalCommitHelper>(*this);
|
||||||
}
|
}
|
||||||
|
catch (std::system_error const& ex) {
|
||||||
|
throw RealmFileException(RealmFileException::Kind::AccessError, config.path, ex.code().message());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
if (m_config.read_only != config.read_only) {
|
if (m_config.read_only != config.read_only) {
|
||||||
|
@ -86,7 +100,7 @@ std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.cache) {
|
if (config.cache) {
|
||||||
for (auto& cachedRealm : m_cached_realms) {
|
for (auto& cachedRealm : m_weak_realm_notifiers) {
|
||||||
if (cachedRealm.is_cached_for_current_thread()) {
|
if (cachedRealm.is_cached_for_current_thread()) {
|
||||||
// can be null if we jumped in between ref count hitting zero and
|
// can be null if we jumped in between ref count hitting zero and
|
||||||
// unregister_realm() getting the lock
|
// unregister_realm() getting the lock
|
||||||
|
@ -99,13 +113,18 @@ std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
|
||||||
|
|
||||||
auto realm = std::make_shared<Realm>(std::move(config));
|
auto realm = std::make_shared<Realm>(std::move(config));
|
||||||
realm->init(shared_from_this());
|
realm->init(shared_from_this());
|
||||||
m_cached_realms.emplace_back(realm, m_config.cache);
|
m_weak_realm_notifiers.emplace_back(realm, m_config.cache);
|
||||||
return realm;
|
return realm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<Realm> RealmCoordinator::get_realm()
|
||||||
|
{
|
||||||
|
return get_realm(m_config);
|
||||||
|
}
|
||||||
|
|
||||||
const Schema* RealmCoordinator::get_schema() const noexcept
|
const Schema* RealmCoordinator::get_schema() const noexcept
|
||||||
{
|
{
|
||||||
return m_cached_realms.empty() ? nullptr : m_config.schema.get();
|
return m_weak_realm_notifiers.empty() ? nullptr : m_config.schema.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
void RealmCoordinator::update_schema(Schema const& schema)
|
void RealmCoordinator::update_schema(Schema const& schema)
|
||||||
|
@ -133,16 +152,16 @@ RealmCoordinator::~RealmCoordinator()
|
||||||
void RealmCoordinator::unregister_realm(Realm* realm)
|
void RealmCoordinator::unregister_realm(Realm* realm)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(m_realm_mutex);
|
std::lock_guard<std::mutex> lock(m_realm_mutex);
|
||||||
for (size_t i = 0; i < m_cached_realms.size(); ++i) {
|
for (size_t i = 0; i < m_weak_realm_notifiers.size(); ++i) {
|
||||||
auto& cached_realm = m_cached_realms[i];
|
auto& weak_realm_notifier = m_weak_realm_notifiers[i];
|
||||||
if (!cached_realm.expired() && !cached_realm.is_for_realm(realm)) {
|
if (!weak_realm_notifier.expired() && !weak_realm_notifier.is_for_realm(realm)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i + 1 < m_cached_realms.size()) {
|
if (i + 1 < m_weak_realm_notifiers.size()) {
|
||||||
cached_realm = std::move(m_cached_realms.back());
|
weak_realm_notifier = std::move(m_weak_realm_notifiers.back());
|
||||||
}
|
}
|
||||||
m_cached_realms.pop_back();
|
m_weak_realm_notifiers.pop_back();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,8 +180,8 @@ void RealmCoordinator::clear_cache()
|
||||||
coordinator->m_notifier = nullptr;
|
coordinator->m_notifier = nullptr;
|
||||||
|
|
||||||
// Gather a list of all of the realms which will be removed
|
// Gather a list of all of the realms which will be removed
|
||||||
for (auto& cached_realm : coordinator->m_cached_realms) {
|
for (auto& weak_realm_notifier : coordinator->m_weak_realm_notifiers) {
|
||||||
if (auto realm = cached_realm.realm()) {
|
if (auto realm = weak_realm_notifier.realm()) {
|
||||||
realms_to_close.push_back(realm);
|
realms_to_close.push_back(realm);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -183,13 +202,281 @@ void RealmCoordinator::clear_cache()
|
||||||
void RealmCoordinator::send_commit_notifications()
|
void RealmCoordinator::send_commit_notifications()
|
||||||
{
|
{
|
||||||
REALM_ASSERT(!m_config.read_only);
|
REALM_ASSERT(!m_config.read_only);
|
||||||
|
if (m_notifier) {
|
||||||
m_notifier->notify_others();
|
m_notifier->notify_others();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index)
|
||||||
|
{
|
||||||
|
if (m_async_error) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SharedGroup::VersionID versionid(version, index);
|
||||||
|
if (!m_advancer_sg) {
|
||||||
|
try {
|
||||||
|
std::unique_ptr<Group> read_only_group;
|
||||||
|
Realm::open_with_config(m_config, m_advancer_history, m_advancer_sg, read_only_group);
|
||||||
|
REALM_ASSERT(!read_only_group);
|
||||||
|
m_advancer_sg->begin_read(versionid);
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
m_async_error = std::current_exception();
|
||||||
|
m_advancer_sg = nullptr;
|
||||||
|
m_advancer_history = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (m_new_queries.empty()) {
|
||||||
|
// If this is the first query then we don't already have a read transaction
|
||||||
|
m_advancer_sg->begin_read(versionid);
|
||||||
|
}
|
||||||
|
else if (versionid < m_advancer_sg->get_version_of_current_transaction()) {
|
||||||
|
// Ensure we're holding a readlock on the oldest version we have a
|
||||||
|
// handover object for, as handover objects don't
|
||||||
|
m_advancer_sg->end_read();
|
||||||
|
m_advancer_sg->begin_read(versionid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::register_query(std::shared_ptr<AsyncQuery> query)
|
||||||
|
{
|
||||||
|
auto version = query->version();
|
||||||
|
auto& self = Realm::Internal::get_coordinator(query->get_realm());
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(self.m_query_mutex);
|
||||||
|
self.pin_version(version.version, version.index);
|
||||||
|
self.m_new_queries.push_back(std::move(query));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::clean_up_dead_queries()
|
||||||
|
{
|
||||||
|
auto swap_remove = [&](auto& container) {
|
||||||
|
bool did_remove = false;
|
||||||
|
for (size_t i = 0; i < container.size(); ++i) {
|
||||||
|
if (container[i]->is_alive())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// Ensure the query is destroyed here even if there's lingering refs
|
||||||
|
// to the async query elsewhere
|
||||||
|
container[i]->release_query();
|
||||||
|
|
||||||
|
if (container.size() > i + 1)
|
||||||
|
container[i] = std::move(container.back());
|
||||||
|
container.pop_back();
|
||||||
|
--i;
|
||||||
|
did_remove = true;
|
||||||
|
}
|
||||||
|
return did_remove;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (swap_remove(m_queries)) {
|
||||||
|
// Make sure we aren't holding on to read versions needlessly if there
|
||||||
|
// are no queries left, but don't close them entirely as opening shared
|
||||||
|
// groups is expensive
|
||||||
|
if (m_queries.empty() && m_query_sg) {
|
||||||
|
m_query_sg->end_read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (swap_remove(m_new_queries)) {
|
||||||
|
if (m_new_queries.empty() && m_advancer_sg) {
|
||||||
|
m_advancer_sg->end_read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void RealmCoordinator::on_change()
|
void RealmCoordinator::on_change()
|
||||||
{
|
{
|
||||||
|
run_async_queries();
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(m_realm_mutex);
|
std::lock_guard<std::mutex> lock(m_realm_mutex);
|
||||||
for (auto& realm : m_cached_realms) {
|
for (auto& realm : m_weak_realm_notifiers) {
|
||||||
realm.notify();
|
realm.notify();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::run_async_queries()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(m_query_mutex);
|
||||||
|
|
||||||
|
clean_up_dead_queries();
|
||||||
|
|
||||||
|
if (m_queries.empty() && m_new_queries.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!m_async_error) {
|
||||||
|
open_helper_shared_group();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_async_error) {
|
||||||
|
move_new_queries_to_main();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
advance_helper_shared_group_to_latest();
|
||||||
|
|
||||||
|
// Make a copy of the queries vector so that we can release the lock while
|
||||||
|
// we run the queries
|
||||||
|
auto queries_to_run = m_queries;
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
for (auto& query : queries_to_run) {
|
||||||
|
query->run();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reacquire the lock while updating the fields that are actually read on
|
||||||
|
// other threads
|
||||||
|
{
|
||||||
|
lock.lock();
|
||||||
|
for (auto& query : queries_to_run) {
|
||||||
|
query->prepare_handover();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
clean_up_dead_queries();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::open_helper_shared_group()
|
||||||
|
{
|
||||||
|
if (!m_query_sg) {
|
||||||
|
try {
|
||||||
|
std::unique_ptr<Group> read_only_group;
|
||||||
|
Realm::open_with_config(m_config, m_query_history, m_query_sg, read_only_group);
|
||||||
|
REALM_ASSERT(!read_only_group);
|
||||||
|
m_query_sg->begin_read();
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
// Store the error to be passed to the async queries
|
||||||
|
m_async_error = std::current_exception();
|
||||||
|
m_query_sg = nullptr;
|
||||||
|
m_query_history = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (m_queries.empty()) {
|
||||||
|
m_query_sg->begin_read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::move_new_queries_to_main()
|
||||||
|
{
|
||||||
|
m_queries.reserve(m_queries.size() + m_new_queries.size());
|
||||||
|
std::move(m_new_queries.begin(), m_new_queries.end(), std::back_inserter(m_queries));
|
||||||
|
m_new_queries.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::advance_helper_shared_group_to_latest()
|
||||||
|
{
|
||||||
|
if (m_new_queries.empty()) {
|
||||||
|
LangBindHelper::advance_read(*m_query_sg, *m_query_history);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort newly added queries by their source version so that we can pull them
|
||||||
|
// all forward to the latest version in a single pass over the transaction log
|
||||||
|
std::sort(m_new_queries.begin(), m_new_queries.end(), [](auto const& lft, auto const& rgt) {
|
||||||
|
return lft->version() < rgt->version();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Import all newly added queries to our helper SG
|
||||||
|
for (auto& query : m_new_queries) {
|
||||||
|
LangBindHelper::advance_read(*m_advancer_sg, *m_advancer_history, query->version());
|
||||||
|
query->attach_to(*m_advancer_sg);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Advance both SGs to the newest version
|
||||||
|
LangBindHelper::advance_read(*m_advancer_sg, *m_advancer_history);
|
||||||
|
LangBindHelper::advance_read(*m_query_sg, *m_query_history,
|
||||||
|
m_advancer_sg->get_version_of_current_transaction());
|
||||||
|
|
||||||
|
// Transfer all new queries over to the main SG
|
||||||
|
for (auto& query : m_new_queries) {
|
||||||
|
query->detatch();
|
||||||
|
query->attach_to(*m_query_sg);
|
||||||
|
}
|
||||||
|
|
||||||
|
move_new_queries_to_main();
|
||||||
|
m_advancer_sg->end_read();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::advance_to_ready(Realm& realm)
|
||||||
|
{
|
||||||
|
decltype(m_queries) queries;
|
||||||
|
|
||||||
|
auto& sg = Realm::Internal::get_shared_group(realm);
|
||||||
|
auto& history = Realm::Internal::get_history(realm);
|
||||||
|
|
||||||
|
auto get_query_version = [&] {
|
||||||
|
for (auto& query : m_queries) {
|
||||||
|
auto version = query->version();
|
||||||
|
if (version != SharedGroup::VersionID{}) {
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return SharedGroup::VersionID{};
|
||||||
|
};
|
||||||
|
|
||||||
|
SharedGroup::VersionID version;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||||
|
version = get_query_version();
|
||||||
|
}
|
||||||
|
|
||||||
|
// no async queries; just advance to latest
|
||||||
|
if (version.version == 0) {
|
||||||
|
transaction::advance(sg, history, realm.m_binding_context.get());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// async results are out of date; ignore
|
||||||
|
if (version < sg.get_version_of_current_transaction()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
// Advance to the ready version without holding any locks because it
|
||||||
|
// may end up calling user code (in did_change() notifications)
|
||||||
|
transaction::advance(sg, history, realm.m_binding_context.get(), version);
|
||||||
|
|
||||||
|
// Reacquire the lock and recheck the query version, as the queries may
|
||||||
|
// have advanced to a later version while we didn't hold the lock. If
|
||||||
|
// so, we need to release the lock and re-advance
|
||||||
|
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||||
|
version = get_query_version();
|
||||||
|
if (version.version == 0)
|
||||||
|
return;
|
||||||
|
if (version != sg.get_version_of_current_transaction())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// Query version now matches the SG version, so we can deliver them
|
||||||
|
for (auto& query : m_queries) {
|
||||||
|
if (query->deliver(sg, m_async_error)) {
|
||||||
|
queries.push_back(query);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto& query : queries) {
|
||||||
|
query->call_callbacks();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::process_available_async(Realm& realm)
|
||||||
|
{
|
||||||
|
auto& sg = Realm::Internal::get_shared_group(realm);
|
||||||
|
decltype(m_queries) queries;
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||||
|
for (auto& query : m_queries) {
|
||||||
|
if (query->deliver(sg, m_async_error)) {
|
||||||
|
queries.push_back(query);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto& query : queries) {
|
||||||
|
query->call_callbacks();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -24,10 +24,16 @@
|
||||||
#include <realm/string_data.hpp>
|
#include <realm/string_data.hpp>
|
||||||
|
|
||||||
namespace realm {
|
namespace realm {
|
||||||
|
class AsyncQueryCallback;
|
||||||
|
class ClientHistory;
|
||||||
|
class Results;
|
||||||
class Schema;
|
class Schema;
|
||||||
|
class SharedGroup;
|
||||||
|
struct AsyncQueryCancelationToken;
|
||||||
|
|
||||||
namespace _impl {
|
namespace _impl {
|
||||||
class CachedRealm;
|
class AsyncQuery;
|
||||||
|
class WeakRealmNotifier;
|
||||||
class ExternalCommitHelper;
|
class ExternalCommitHelper;
|
||||||
|
|
||||||
// RealmCoordinator manages the weak cache of Realm instances and communication
|
// RealmCoordinator manages the weak cache of Realm instances and communication
|
||||||
|
@ -43,6 +49,7 @@ public:
|
||||||
// If the Realm is already open on another thread, validates that the given
|
// If the Realm is already open on another thread, validates that the given
|
||||||
// configuration is compatible with the existing one
|
// configuration is compatible with the existing one
|
||||||
std::shared_ptr<Realm> get_realm(Realm::Config config);
|
std::shared_ptr<Realm> get_realm(Realm::Config config);
|
||||||
|
std::shared_ptr<Realm> get_realm();
|
||||||
|
|
||||||
const Schema* get_schema() const noexcept;
|
const Schema* get_schema() const noexcept;
|
||||||
uint64_t get_schema_version() const noexcept { return m_config.schema_version; }
|
uint64_t get_schema_version() const noexcept { return m_config.schema_version; }
|
||||||
|
@ -50,7 +57,7 @@ public:
|
||||||
const std::vector<char>& get_encryption_key() const noexcept { return m_config.encryption_key; }
|
const std::vector<char>& get_encryption_key() const noexcept { return m_config.encryption_key; }
|
||||||
bool is_in_memory() const noexcept { return m_config.in_memory; }
|
bool is_in_memory() const noexcept { return m_config.in_memory; }
|
||||||
|
|
||||||
// Asyncronously call notify() on every Realm instance for this coordinator's
|
// Asynchronously call notify() on every Realm instance for this coordinator's
|
||||||
// path, including those in other processes
|
// path, including those in other processes
|
||||||
void send_commit_notifications();
|
void send_commit_notifications();
|
||||||
|
|
||||||
|
@ -73,13 +80,45 @@ public:
|
||||||
// Update the schema in the cached config
|
// Update the schema in the cached config
|
||||||
void update_schema(Schema const& new_schema);
|
void update_schema(Schema const& new_schema);
|
||||||
|
|
||||||
|
static void register_query(std::shared_ptr<AsyncQuery> query);
|
||||||
|
|
||||||
|
// Advance the Realm to the most recent transaction version which all async
|
||||||
|
// work is complete for
|
||||||
|
void advance_to_ready(Realm& realm);
|
||||||
|
void process_available_async(Realm& realm);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Realm::Config m_config;
|
Realm::Config m_config;
|
||||||
|
|
||||||
std::mutex m_realm_mutex;
|
std::mutex m_realm_mutex;
|
||||||
std::vector<CachedRealm> m_cached_realms;
|
std::vector<WeakRealmNotifier> m_weak_realm_notifiers;
|
||||||
|
|
||||||
|
std::mutex m_query_mutex;
|
||||||
|
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_new_queries;
|
||||||
|
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_queries;
|
||||||
|
|
||||||
|
// SharedGroup used for actually running async queries
|
||||||
|
// Will have a read transaction iff m_queries is non-empty
|
||||||
|
std::unique_ptr<ClientHistory> m_query_history;
|
||||||
|
std::unique_ptr<SharedGroup> m_query_sg;
|
||||||
|
|
||||||
|
// SharedGroup used to advance queries in m_new_queries to the main shared
|
||||||
|
// group's transaction version
|
||||||
|
// Will have a read transaction iff m_new_queries is non-empty
|
||||||
|
std::unique_ptr<ClientHistory> m_advancer_history;
|
||||||
|
std::unique_ptr<SharedGroup> m_advancer_sg;
|
||||||
|
std::exception_ptr m_async_error;
|
||||||
|
|
||||||
std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
|
std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
|
||||||
|
|
||||||
|
// must be called with m_query_mutex locked
|
||||||
|
void pin_version(uint_fast64_t version, uint_fast32_t index);
|
||||||
|
|
||||||
|
void run_async_queries();
|
||||||
|
void open_helper_shared_group();
|
||||||
|
void move_new_queries_to_main();
|
||||||
|
void advance_helper_shared_group_to_latest();
|
||||||
|
void clean_up_dead_queries();
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace _impl
|
} // namespace _impl
|
||||||
|
|
|
@ -432,7 +432,8 @@ public:
|
||||||
namespace realm {
|
namespace realm {
|
||||||
namespace _impl {
|
namespace _impl {
|
||||||
namespace transaction {
|
namespace transaction {
|
||||||
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* context)
|
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* context,
|
||||||
|
SharedGroup::VersionID version)
|
||||||
{
|
{
|
||||||
TransactLogObserver(context, sg, [&](auto&&... args) {
|
TransactLogObserver(context, sg, [&](auto&&... args) {
|
||||||
LangBindHelper::advance_read(sg, history, std::move(args)...);
|
LangBindHelper::advance_read(sg, history, std::move(args)...);
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
#ifndef REALM_TRANSACT_LOG_HANDLER_HPP
|
#ifndef REALM_TRANSACT_LOG_HANDLER_HPP
|
||||||
#define REALM_TRANSACT_LOG_HANDLER_HPP
|
#define REALM_TRANSACT_LOG_HANDLER_HPP
|
||||||
|
|
||||||
|
#include <realm/group_shared.hpp>
|
||||||
|
|
||||||
namespace realm {
|
namespace realm {
|
||||||
class BindingContext;
|
class BindingContext;
|
||||||
class SharedGroup;
|
class SharedGroup;
|
||||||
|
@ -28,7 +30,8 @@ namespace _impl {
|
||||||
namespace transaction {
|
namespace transaction {
|
||||||
// Advance the read transaction version, with change notifications sent to delegate
|
// Advance the read transaction version, with change notifications sent to delegate
|
||||||
// Must not be called from within a write transaction.
|
// Must not be called from within a write transaction.
|
||||||
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context);
|
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context,
|
||||||
|
SharedGroup::VersionID version=SharedGroup::VersionID{});
|
||||||
|
|
||||||
// Begin a write transaction
|
// Begin a write transaction
|
||||||
// If the read transaction version is not up to date, will first advance to the
|
// If the read transaction version is not up to date, will first advance to the
|
||||||
|
|
|
@ -16,15 +16,15 @@
|
||||||
//
|
//
|
||||||
////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#ifndef REALM_CACHED_REALM_HPP
|
#ifndef REALM_WEAK_REALM_NOTIFIER_HPP
|
||||||
#define REALM_CACHED_REALM_HPP
|
#define REALM_WEAK_REALM_NOTIFIER_HPP
|
||||||
|
|
||||||
#include <realm/util/features.h>
|
#include <realm/util/features.h>
|
||||||
|
|
||||||
#if REALM_PLATFORM_APPLE
|
#if REALM_PLATFORM_APPLE
|
||||||
#include "impl/apple/cached_realm.hpp"
|
#include "impl/apple/weak_realm_notifier.hpp"
|
||||||
#else
|
#else
|
||||||
#include "impl/generic/cached_realm.hpp"
|
#include "impl/generic/weak_realm_notifier.hpp"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif // REALM_CACHED_REALM_HPP
|
#endif // REALM_WEAK_REALM_NOTIFIER_HPP
|
|
@ -16,8 +16,8 @@
|
||||||
//
|
//
|
||||||
////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
#ifndef REALM_CACHED_REALM_BASE_HPP
|
#ifndef REALM_WEAK_REALM_NOTIFIER_BASE_HPP
|
||||||
#define REALM_CACHED_REALM_BASE_HPP
|
#define REALM_WEAK_REALM_NOTIFIER_BASE_HPP
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
@ -27,25 +27,25 @@ class Realm;
|
||||||
|
|
||||||
namespace _impl {
|
namespace _impl {
|
||||||
|
|
||||||
// CachedRealm stores a weak reference to a Realm instance, along with all of
|
// WeakRealmNotifierBase stores a weak reference to a Realm instance, along with all of
|
||||||
// the information about a Realm that needs to be accessed from other threads.
|
// the information about a Realm that needs to be accessed from other threads.
|
||||||
// This is needed to avoid forming strong references to the Realm instances on
|
// This is needed to avoid forming strong references to the Realm instances on
|
||||||
// other threads, which can produce deadlocks when the last strong reference to
|
// other threads, which can produce deadlocks when the last strong reference to
|
||||||
// a Realm instance is released from within a function holding the cache lock.
|
// a Realm instance is released from within a function holding the cache lock.
|
||||||
class CachedRealmBase {
|
class WeakRealmNotifierBase {
|
||||||
public:
|
public:
|
||||||
CachedRealmBase(const std::shared_ptr<Realm>& realm, bool cache);
|
WeakRealmNotifierBase(const std::shared_ptr<Realm>& realm, bool cache);
|
||||||
|
|
||||||
// Get a strong reference to the cached realm
|
// Get a strong reference to the cached realm
|
||||||
std::shared_ptr<Realm> realm() const { return m_realm.lock(); }
|
std::shared_ptr<Realm> realm() const { return m_realm.lock(); }
|
||||||
|
|
||||||
// Does this CachedRealmBase store a Realm instance that should be used on the current thread?
|
// Does this WeakRealmNotifierBase store a Realm instance that should be used on the current thread?
|
||||||
bool is_cached_for_current_thread() const { return m_cache && m_thread_id == std::this_thread::get_id(); }
|
bool is_cached_for_current_thread() const { return m_cache && m_thread_id == std::this_thread::get_id(); }
|
||||||
|
|
||||||
// Has the Realm instance been destroyed?
|
// Has the Realm instance been destroyed?
|
||||||
bool expired() const { return m_realm.expired(); }
|
bool expired() const { return m_realm.expired(); }
|
||||||
|
|
||||||
// Is this a CachedRealmBase for the given Realm instance?
|
// Is this a WeakRealmNotifierBase for the given Realm instance?
|
||||||
bool is_for_realm(Realm* realm) const { return realm == m_realm_key; }
|
bool is_for_realm(Realm* realm) const { return realm == m_realm_key; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -55,7 +55,7 @@ private:
|
||||||
bool m_cache = false;
|
bool m_cache = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline CachedRealmBase::CachedRealmBase(const std::shared_ptr<Realm>& realm, bool cache)
|
inline WeakRealmNotifierBase::WeakRealmNotifierBase(const std::shared_ptr<Realm>& realm, bool cache)
|
||||||
: m_realm(realm)
|
: m_realm(realm)
|
||||||
, m_realm_key(realm.get())
|
, m_realm_key(realm.get())
|
||||||
, m_cache(cache)
|
, m_cache(cache)
|
||||||
|
@ -65,4 +65,4 @@ inline CachedRealmBase::CachedRealmBase(const std::shared_ptr<Realm>& realm, boo
|
||||||
} // namespace _impl
|
} // namespace _impl
|
||||||
} // namespace realm
|
} // namespace realm
|
||||||
|
|
||||||
#endif // REALM_CACHED_REALM_BASE_HPP
|
#endif // REALM_WEAK_REALM_NOTIFIER_BASE_HPP
|
|
@ -5,10 +5,13 @@
|
||||||
#ifndef REALM_OBJECT_ACCESSOR_HPP
|
#ifndef REALM_OBJECT_ACCESSOR_HPP
|
||||||
#define REALM_OBJECT_ACCESSOR_HPP
|
#define REALM_OBJECT_ACCESSOR_HPP
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include "shared_realm.hpp"
|
|
||||||
#include "schema.hpp"
|
|
||||||
#include "list.hpp"
|
#include "list.hpp"
|
||||||
|
#include "object_schema.hpp"
|
||||||
|
#include "object_store.hpp"
|
||||||
|
#include "schema.hpp"
|
||||||
|
#include "shared_realm.hpp"
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
namespace realm {
|
namespace realm {
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ class Schema;
|
||||||
namespace parser {
|
namespace parser {
|
||||||
struct Expression
|
struct Expression
|
||||||
{
|
{
|
||||||
enum class Type { None, Number, String, KeyPath, Argument, True, False } type = Type::None;
|
enum class Type { None, Number, String, KeyPath, Argument, True, False } type;
|
||||||
std::string s;
|
std::string s;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ struct Predicate
|
||||||
struct Comparison
|
struct Comparison
|
||||||
{
|
{
|
||||||
Operator op = Operator::None;
|
Operator op = Operator::None;
|
||||||
Expression expr[2];
|
Expression expr[2] = {{Expression::Type::None, ""}, {Expression::Type::None, ""}};
|
||||||
};
|
};
|
||||||
|
|
||||||
struct Compound
|
struct Compound
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
#include "results.hpp"
|
#include "results.hpp"
|
||||||
|
|
||||||
|
#include "impl/async_query.hpp"
|
||||||
|
#include "impl/realm_coordinator.hpp"
|
||||||
#include "object_store.hpp"
|
#include "object_store.hpp"
|
||||||
|
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
@ -54,12 +56,21 @@ Results::Results(SharedRealm r, const ObjectSchema &o, Table& table)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Results::~Results()
|
||||||
|
{
|
||||||
|
if (m_background_query) {
|
||||||
|
m_background_query->unregister();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void Results::validate_read() const
|
void Results::validate_read() const
|
||||||
{
|
{
|
||||||
if (m_realm)
|
if (m_realm)
|
||||||
m_realm->verify_thread();
|
m_realm->verify_thread();
|
||||||
if (m_table && !m_table->is_attached())
|
if (m_table && !m_table->is_attached())
|
||||||
throw InvalidatedException();
|
throw InvalidatedException();
|
||||||
|
if (m_mode == Mode::TableView && !m_table_view.is_attached())
|
||||||
|
throw InvalidatedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Results::validate_write() const
|
void Results::validate_write() const
|
||||||
|
@ -94,6 +105,11 @@ size_t Results::size()
|
||||||
REALM_UNREACHABLE();
|
REALM_UNREACHABLE();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
StringData Results::get_object_type() const noexcept
|
||||||
|
{
|
||||||
|
return get_object_schema().name;
|
||||||
|
}
|
||||||
|
|
||||||
RowExpr Results::get(size_t row_ndx)
|
RowExpr Results::get(size_t row_ndx)
|
||||||
{
|
{
|
||||||
validate_read();
|
validate_read();
|
||||||
|
@ -161,9 +177,15 @@ void Results::update_tableview()
|
||||||
m_mode = Mode::TableView;
|
m_mode = Mode::TableView;
|
||||||
break;
|
break;
|
||||||
case Mode::TableView:
|
case Mode::TableView:
|
||||||
if (m_live) {
|
if (!m_live) {
|
||||||
m_table_view.sync_if_needed();
|
return;
|
||||||
}
|
}
|
||||||
|
if (!m_background_query && !m_realm->is_in_transaction() && m_realm->can_deliver_notifications()) {
|
||||||
|
m_background_query = std::make_shared<_impl::AsyncQuery>(*this);
|
||||||
|
_impl::RealmCoordinator::register_query(m_background_query);
|
||||||
|
}
|
||||||
|
m_has_used_table_view = true;
|
||||||
|
m_table_view.sync_if_needed();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -301,8 +323,9 @@ Query Results::get_query() const
|
||||||
switch (m_mode) {
|
switch (m_mode) {
|
||||||
case Mode::Empty:
|
case Mode::Empty:
|
||||||
case Mode::Query:
|
case Mode::Query:
|
||||||
case Mode::TableView:
|
|
||||||
return m_query;
|
return m_query;
|
||||||
|
case Mode::TableView:
|
||||||
|
return m_table_view.get_query();
|
||||||
case Mode::Table:
|
case Mode::Table:
|
||||||
return m_table->where();
|
return m_table->where();
|
||||||
}
|
}
|
||||||
|
@ -335,6 +358,36 @@ Results Results::filter(Query&& q) const
|
||||||
return Results(m_realm, get_object_schema(), get_query().and_query(std::move(q)), get_sort());
|
return Results(m_realm, get_object_schema(), get_query().and_query(std::move(q)), get_sort());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken Results::async(std::function<void (std::exception_ptr)> target)
|
||||||
|
{
|
||||||
|
if (m_realm->config().read_only) {
|
||||||
|
throw InvalidTransactionException("Cannot create asynchronous query for read-only Realms");
|
||||||
|
}
|
||||||
|
if (m_realm->is_in_transaction()) {
|
||||||
|
throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!m_background_query) {
|
||||||
|
m_background_query = std::make_shared<_impl::AsyncQuery>(*this);
|
||||||
|
_impl::RealmCoordinator::register_query(m_background_query);
|
||||||
|
}
|
||||||
|
return {m_background_query, m_background_query->add_callback(std::move(target))};
|
||||||
|
}
|
||||||
|
|
||||||
|
void Results::Internal::set_table_view(Results& results, realm::TableView &&tv)
|
||||||
|
{
|
||||||
|
// If the previous TableView was never actually used, then stop generating
|
||||||
|
// new ones until the user actually uses the Results object again
|
||||||
|
if (results.m_mode == Mode::TableView) {
|
||||||
|
results.m_wants_background_updates = results.m_has_used_table_view;
|
||||||
|
}
|
||||||
|
|
||||||
|
results.m_table_view = std::move(tv);
|
||||||
|
results.m_mode = Mode::TableView;
|
||||||
|
results.m_has_used_table_view = false;
|
||||||
|
REALM_ASSERT(results.m_table_view.is_in_sync());
|
||||||
|
}
|
||||||
|
|
||||||
Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t column, const Table* table)
|
Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t column, const Table* table)
|
||||||
: std::runtime_error((std::string)"Operation not supported on '" + table->get_column_name(column).data() + "' columns")
|
: std::runtime_error((std::string)"Operation not supported on '" + table->get_column_name(column).data() + "' columns")
|
||||||
, column_index(column)
|
, column_index(column)
|
||||||
|
@ -342,3 +395,35 @@ Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t c
|
||||||
, column_type(table->get_column_type(column))
|
, column_type(table->get_column_type(column))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken::AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> query, size_t token)
|
||||||
|
: m_query(std::move(query)), m_token(token)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken::~AsyncQueryCancelationToken()
|
||||||
|
{
|
||||||
|
// m_query itself (and not just the pointed-to thing) needs to be accessed
|
||||||
|
// atomically to ensure that there are no data races when the token is
|
||||||
|
// destroyed after being modified on a different thread.
|
||||||
|
// This is needed despite the token not being thread-safe in general as
|
||||||
|
// users find it very surpringing for obj-c objects to care about what
|
||||||
|
// thread they are deallocated on.
|
||||||
|
if (auto query = m_query.exchange({})) {
|
||||||
|
query->remove_callback(m_token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken::AsyncQueryCancelationToken(AsyncQueryCancelationToken&& rgt) = default;
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken& AsyncQueryCancelationToken::operator=(realm::AsyncQueryCancelationToken&& rgt)
|
||||||
|
{
|
||||||
|
if (this != &rgt) {
|
||||||
|
if (auto query = m_query.exchange({})) {
|
||||||
|
query->remove_callback(m_token);
|
||||||
|
}
|
||||||
|
m_query = std::move(rgt.m_query);
|
||||||
|
m_token = rgt.m_token;
|
||||||
|
}
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#define REALM_RESULTS_HPP
|
#define REALM_RESULTS_HPP
|
||||||
|
|
||||||
#include "shared_realm.hpp"
|
#include "shared_realm.hpp"
|
||||||
|
#include "util/atomic_shared_ptr.hpp"
|
||||||
|
|
||||||
#include <realm/table_view.hpp>
|
#include <realm/table_view.hpp>
|
||||||
#include <realm/table.hpp>
|
#include <realm/table.hpp>
|
||||||
|
@ -29,6 +30,29 @@ namespace realm {
|
||||||
template<typename T> class BasicRowExpr;
|
template<typename T> class BasicRowExpr;
|
||||||
using RowExpr = BasicRowExpr<Table>;
|
using RowExpr = BasicRowExpr<Table>;
|
||||||
class Mixed;
|
class Mixed;
|
||||||
|
class Results;
|
||||||
|
class ObjectSchema;
|
||||||
|
|
||||||
|
namespace _impl {
|
||||||
|
class AsyncQuery;
|
||||||
|
}
|
||||||
|
|
||||||
|
// A token which keeps an asynchronous query alive
|
||||||
|
struct AsyncQueryCancelationToken {
|
||||||
|
AsyncQueryCancelationToken() = default;
|
||||||
|
AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> query, size_t token);
|
||||||
|
~AsyncQueryCancelationToken();
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken(AsyncQueryCancelationToken&&);
|
||||||
|
AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken&&);
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken(AsyncQueryCancelationToken const&) = delete;
|
||||||
|
AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken const&) = delete;
|
||||||
|
|
||||||
|
private:
|
||||||
|
util::AtomicSharedPtr<_impl::AsyncQuery> m_query;
|
||||||
|
size_t m_token;
|
||||||
|
};
|
||||||
|
|
||||||
struct SortOrder {
|
struct SortOrder {
|
||||||
std::vector<size_t> columnIndices;
|
std::vector<size_t> columnIndices;
|
||||||
|
@ -48,6 +72,7 @@ public:
|
||||||
Results() = default;
|
Results() = default;
|
||||||
Results(SharedRealm r, const ObjectSchema& o, Table& table);
|
Results(SharedRealm r, const ObjectSchema& o, Table& table);
|
||||||
Results(SharedRealm r, const ObjectSchema& o, Query q, SortOrder s = {});
|
Results(SharedRealm r, const ObjectSchema& o, Query q, SortOrder s = {});
|
||||||
|
~Results();
|
||||||
|
|
||||||
// Results is copyable and moveable
|
// Results is copyable and moveable
|
||||||
Results(Results const&) = default;
|
Results(Results const&) = default;
|
||||||
|
@ -72,7 +97,7 @@ public:
|
||||||
TableView get_tableview();
|
TableView get_tableview();
|
||||||
|
|
||||||
// Get the object type which will be returned by get()
|
// Get the object type which will be returned by get()
|
||||||
StringData get_object_type() const noexcept { return get_object_schema().name; }
|
StringData get_object_type() const noexcept;
|
||||||
|
|
||||||
// Set whether the TableView should sync if needed before accessing results
|
// Set whether the TableView should sync if needed before accessing results
|
||||||
void set_live(bool live);
|
void set_live(bool live);
|
||||||
|
@ -165,6 +190,22 @@ public:
|
||||||
UnsupportedColumnTypeException(size_t column, const Table* table);
|
UnsupportedColumnTypeException(size_t column, const Table* table);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void update_tableview();
|
||||||
|
|
||||||
|
// Create an async query from this Results
|
||||||
|
// The query will be run on a background thread and delivered to the callback,
|
||||||
|
// and then rerun after each commit (if needed) and redelivered if it changed
|
||||||
|
AsyncQueryCancelationToken async(std::function<void (std::exception_ptr)> target);
|
||||||
|
|
||||||
|
bool wants_background_updates() const { return m_wants_background_updates; }
|
||||||
|
|
||||||
|
// Helper type to let AsyncQuery update the tableview without giving access
|
||||||
|
// to any other privates or letting anyone else do so
|
||||||
|
class Internal {
|
||||||
|
friend class _impl::AsyncQuery;
|
||||||
|
static void set_table_view(Results& results, TableView&& tv);
|
||||||
|
};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
SharedRealm m_realm;
|
SharedRealm m_realm;
|
||||||
const ObjectSchema *m_object_schema;
|
const ObjectSchema *m_object_schema;
|
||||||
|
@ -174,17 +215,21 @@ private:
|
||||||
SortOrder m_sort;
|
SortOrder m_sort;
|
||||||
bool m_live = true;
|
bool m_live = true;
|
||||||
|
|
||||||
|
std::shared_ptr<_impl::AsyncQuery> m_background_query;
|
||||||
|
|
||||||
Mode m_mode = Mode::Empty;
|
Mode m_mode = Mode::Empty;
|
||||||
|
bool m_has_used_table_view = false;
|
||||||
|
bool m_wants_background_updates = true;
|
||||||
|
|
||||||
void validate_read() const;
|
void validate_read() const;
|
||||||
void validate_write() const;
|
void validate_write() const;
|
||||||
|
|
||||||
void update_tableview();
|
|
||||||
|
|
||||||
template<typename Int, typename Float, typename Double, typename DateTime>
|
template<typename Int, typename Float, typename Double, typename DateTime>
|
||||||
util::Optional<Mixed> aggregate(size_t column, bool return_none_for_empty,
|
util::Optional<Mixed> aggregate(size_t column, bool return_none_for_empty,
|
||||||
Int agg_int, Float agg_float,
|
Int agg_int, Float agg_float,
|
||||||
Double agg_double, DateTime agg_datetime);
|
Double agg_double, DateTime agg_datetime);
|
||||||
|
|
||||||
|
void set_table_view(TableView&& tv);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
#ifndef REALM_SCHEMA_HPP
|
#ifndef REALM_SCHEMA_HPP
|
||||||
#define REALM_SCHEMA_HPP
|
#define REALM_SCHEMA_HPP
|
||||||
|
|
||||||
|
#include "property.hpp"
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
|
|
@ -35,13 +35,14 @@ using namespace realm::_impl;
|
||||||
|
|
||||||
Realm::Config::Config(const Config& c)
|
Realm::Config::Config(const Config& c)
|
||||||
: path(c.path)
|
: path(c.path)
|
||||||
|
, encryption_key(c.encryption_key)
|
||||||
|
, schema_version(c.schema_version)
|
||||||
|
, migration_function(c.migration_function)
|
||||||
, read_only(c.read_only)
|
, read_only(c.read_only)
|
||||||
, in_memory(c.in_memory)
|
, in_memory(c.in_memory)
|
||||||
, cache(c.cache)
|
, cache(c.cache)
|
||||||
, disable_format_upgrade(c.disable_format_upgrade)
|
, disable_format_upgrade(c.disable_format_upgrade)
|
||||||
, encryption_key(c.encryption_key)
|
, automatic_change_notifications(c.automatic_change_notifications)
|
||||||
, schema_version(c.schema_version)
|
|
||||||
, migration_function(c.migration_function)
|
|
||||||
{
|
{
|
||||||
if (c.schema) {
|
if (c.schema) {
|
||||||
schema = std::make_unique<Schema>(*c.schema);
|
schema = std::make_unique<Schema>(*c.schema);
|
||||||
|
@ -63,22 +64,33 @@ Realm::Config& Realm::Config::operator=(realm::Realm::Config const& c)
|
||||||
Realm::Realm(Config config)
|
Realm::Realm(Config config)
|
||||||
: m_config(std::move(config))
|
: m_config(std::move(config))
|
||||||
{
|
{
|
||||||
try {
|
open_with_config(m_config, m_history, m_shared_group, m_read_only_group);
|
||||||
if (m_config.read_only) {
|
|
||||||
m_read_only_group = std::make_unique<Group>(m_config.path, m_config.encryption_key.data(), Group::mode_ReadOnly);
|
if (m_read_only_group) {
|
||||||
m_group = m_read_only_group.get();
|
m_group = m_read_only_group.get();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void Realm::open_with_config(const Config& config,
|
||||||
|
std::unique_ptr<ClientHistory>& history,
|
||||||
|
std::unique_ptr<SharedGroup>& shared_group,
|
||||||
|
std::unique_ptr<Group>& read_only_group)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
if (config.read_only) {
|
||||||
|
read_only_group = std::make_unique<Group>(config.path, config.encryption_key.data(), Group::mode_ReadOnly);
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
m_history = realm::make_client_history(m_config.path, m_config.encryption_key.data());
|
history = realm::make_client_history(config.path, config.encryption_key.data());
|
||||||
SharedGroup::DurabilityLevel durability = m_config.in_memory ? SharedGroup::durability_MemOnly :
|
SharedGroup::DurabilityLevel durability = config.in_memory ? SharedGroup::durability_MemOnly :
|
||||||
SharedGroup::durability_Full;
|
SharedGroup::durability_Full;
|
||||||
m_shared_group = std::make_unique<SharedGroup>(*m_history, durability, m_config.encryption_key.data(), !m_config.disable_format_upgrade);
|
shared_group = std::make_unique<SharedGroup>(*history, durability, config.encryption_key.data(), !config.disable_format_upgrade);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (util::File::PermissionDenied const& ex) {
|
catch (util::File::PermissionDenied const& ex) {
|
||||||
throw RealmFileException(RealmFileException::Kind::PermissionDenied, ex.get_path(),
|
throw RealmFileException(RealmFileException::Kind::PermissionDenied, ex.get_path(),
|
||||||
"Unable to open a realm at path '" + ex.get_path() +
|
"Unable to open a realm at path '" + ex.get_path() +
|
||||||
"'. Please use a path where your app has " + (m_config.read_only ? "read" : "read-write") + " permissions.");
|
"'. Please use a path where your app has " + (config.read_only ? "read" : "read-write") + " permissions.");
|
||||||
}
|
}
|
||||||
catch (util::File::Exists const& ex) {
|
catch (util::File::Exists const& ex) {
|
||||||
throw RealmFileException(RealmFileException::Kind::Exists, ex.get_path(),
|
throw RealmFileException(RealmFileException::Kind::Exists, ex.get_path(),
|
||||||
|
@ -93,12 +105,12 @@ Realm::Realm(Config config)
|
||||||
"Unable to open a realm at path '" + ex.get_path() + "'");
|
"Unable to open a realm at path '" + ex.get_path() + "'");
|
||||||
}
|
}
|
||||||
catch (IncompatibleLockFile const& ex) {
|
catch (IncompatibleLockFile const& ex) {
|
||||||
throw RealmFileException(RealmFileException::Kind::IncompatibleLockFile, m_config.path,
|
throw RealmFileException(RealmFileException::Kind::IncompatibleLockFile, config.path,
|
||||||
"Realm file is currently open in another process "
|
"Realm file is currently open in another process "
|
||||||
"which cannot share access with this process. All processes sharing a single file must be the same architecture.");
|
"which cannot share access with this process. All processes sharing a single file must be the same architecture.");
|
||||||
}
|
}
|
||||||
catch (FileFormatUpgradeRequired const& ex) {
|
catch (FileFormatUpgradeRequired const& ex) {
|
||||||
throw RealmFileException(RealmFileException::Kind::FormatUpgradeRequired, m_config.path,
|
throw RealmFileException(RealmFileException::Kind::FormatUpgradeRequired, config.path,
|
||||||
"The Realm file format must be allowed to be upgraded "
|
"The Realm file format must be allowed to be upgraded "
|
||||||
"in order to proceed.");
|
"in order to proceed.");
|
||||||
}
|
}
|
||||||
|
@ -361,13 +373,16 @@ void Realm::notify()
|
||||||
}
|
}
|
||||||
if (m_auto_refresh) {
|
if (m_auto_refresh) {
|
||||||
if (m_group) {
|
if (m_group) {
|
||||||
transaction::advance(*m_shared_group, *m_history, m_binding_context.get());
|
m_coordinator->advance_to_ready(*this);
|
||||||
}
|
}
|
||||||
else if (m_binding_context) {
|
else if (m_binding_context) {
|
||||||
m_binding_context->did_change({}, {});
|
m_binding_context->did_change({}, {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
m_coordinator->process_available_async(*this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Realm::refresh()
|
bool Realm::refresh()
|
||||||
|
@ -387,6 +402,7 @@ bool Realm::refresh()
|
||||||
|
|
||||||
if (m_group) {
|
if (m_group) {
|
||||||
transaction::advance(*m_shared_group, *m_history, m_binding_context.get());
|
transaction::advance(*m_shared_group, *m_history, m_binding_context.get());
|
||||||
|
m_coordinator->process_available_async(*this);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// Create the read transaction
|
// Create the read transaction
|
||||||
|
@ -396,6 +412,19 @@ bool Realm::refresh()
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Realm::can_deliver_notifications() const noexcept
|
||||||
|
{
|
||||||
|
if (m_config.read_only) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_binding_context && !m_binding_context->can_deliver_notifications()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t Realm::get_schema_version(const realm::Realm::Config &config)
|
uint64_t Realm::get_schema_version(const realm::Realm::Config &config)
|
||||||
{
|
{
|
||||||
auto coordinator = RealmCoordinator::get_existing_coordinator(config.path);
|
auto coordinator = RealmCoordinator::get_existing_coordinator(config.path);
|
||||||
|
|
|
@ -19,9 +19,10 @@
|
||||||
#ifndef REALM_REALM_HPP
|
#ifndef REALM_REALM_HPP
|
||||||
#define REALM_REALM_HPP
|
#define REALM_REALM_HPP
|
||||||
|
|
||||||
#include "object_store.hpp"
|
#include <realm/handover_defs.hpp>
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
@ -38,29 +39,48 @@ namespace realm {
|
||||||
typedef std::weak_ptr<Realm> WeakRealm;
|
typedef std::weak_ptr<Realm> WeakRealm;
|
||||||
|
|
||||||
namespace _impl {
|
namespace _impl {
|
||||||
class ExternalCommitHelper;
|
class AsyncQuery;
|
||||||
class RealmCoordinator;
|
class RealmCoordinator;
|
||||||
}
|
}
|
||||||
|
|
||||||
class Realm : public std::enable_shared_from_this<Realm>
|
class Realm : public std::enable_shared_from_this<Realm> {
|
||||||
{
|
|
||||||
public:
|
public:
|
||||||
typedef std::function<void(SharedRealm old_realm, SharedRealm realm)> MigrationFunction;
|
typedef std::function<void(SharedRealm old_realm, SharedRealm realm)> MigrationFunction;
|
||||||
|
|
||||||
struct Config
|
struct Config {
|
||||||
{
|
|
||||||
std::string path;
|
std::string path;
|
||||||
bool read_only = false;
|
// User-supplied encryption key. Must be either empty or 64 bytes.
|
||||||
bool in_memory = false;
|
|
||||||
bool cache = true;
|
|
||||||
bool disable_format_upgrade = false;
|
|
||||||
std::vector<char> encryption_key;
|
std::vector<char> encryption_key;
|
||||||
|
|
||||||
|
// Optional schema for the file. If nullptr, the existing schema
|
||||||
|
// from the file opened will be used. If present, the file will be
|
||||||
|
// migrated to the schema if needed.
|
||||||
std::unique_ptr<Schema> schema;
|
std::unique_ptr<Schema> schema;
|
||||||
uint64_t schema_version;
|
uint64_t schema_version;
|
||||||
|
|
||||||
MigrationFunction migration_function;
|
MigrationFunction migration_function;
|
||||||
|
|
||||||
|
bool read_only = false;
|
||||||
|
bool in_memory = false;
|
||||||
|
|
||||||
|
// The following are intended for internal/testing purposes and
|
||||||
|
// should not be publically exposed in binding APIs
|
||||||
|
|
||||||
|
// If false, always return a new Realm instance, and don't return
|
||||||
|
// that Realm instance for other requests for a cached Realm. Useful
|
||||||
|
// for dynamic Realms and for tests that need multiple instances on
|
||||||
|
// one thread
|
||||||
|
bool cache = true;
|
||||||
|
// Throw an exception rather than automatically upgrading the file
|
||||||
|
// format. Used by the browser to warn the user that it'll modify
|
||||||
|
// the file.
|
||||||
|
bool disable_format_upgrade = false;
|
||||||
|
// Disable the background worker thread for producing change
|
||||||
|
// notifications. Useful for tests for those notifications so that
|
||||||
|
// everything can be done deterministically on one thread, and
|
||||||
|
// speeds up tests that don't need notifications.
|
||||||
|
bool automatic_change_notifications = true;
|
||||||
|
|
||||||
Config();
|
Config();
|
||||||
Config(Config&&);
|
Config(Config&&);
|
||||||
Config(const Config& c);
|
Config(const Config& c);
|
||||||
|
@ -107,6 +127,8 @@ namespace realm {
|
||||||
void verify_thread() const;
|
void verify_thread() const;
|
||||||
void verify_in_write() const;
|
void verify_in_write() const;
|
||||||
|
|
||||||
|
bool can_deliver_notifications() const noexcept;
|
||||||
|
|
||||||
// Close this Realm and remove it from the cache. Continuing to use a
|
// Close this Realm and remove it from the cache. Continuing to use a
|
||||||
// Realm after closing it will produce undefined behavior.
|
// Realm after closing it will produce undefined behavior.
|
||||||
void close();
|
void close();
|
||||||
|
@ -116,6 +138,28 @@ namespace realm {
|
||||||
void init(std::shared_ptr<_impl::RealmCoordinator> coordinator);
|
void init(std::shared_ptr<_impl::RealmCoordinator> coordinator);
|
||||||
Realm(Config config);
|
Realm(Config config);
|
||||||
|
|
||||||
|
// Expose some internal functionality to other parts of the ObjectStore
|
||||||
|
// without making it public to everyone
|
||||||
|
class Internal {
|
||||||
|
friend class _impl::AsyncQuery;
|
||||||
|
friend class _impl::RealmCoordinator;
|
||||||
|
|
||||||
|
// AsyncQuery needs access to the SharedGroup to be able to call the
|
||||||
|
// handover functions, which are not very wrappable
|
||||||
|
static SharedGroup& get_shared_group(Realm& realm) { return *realm.m_shared_group; }
|
||||||
|
static ClientHistory& get_history(Realm& realm) { return *realm.m_history; }
|
||||||
|
|
||||||
|
// AsyncQuery needs to be able to access the owning coordinator to
|
||||||
|
// wake up the worker thread when a callback is added, and
|
||||||
|
// coordinators need to be able to get themselves from a Realm
|
||||||
|
static _impl::RealmCoordinator& get_coordinator(Realm& realm) { return *realm.m_coordinator; }
|
||||||
|
};
|
||||||
|
|
||||||
|
static void open_with_config(const Config& config,
|
||||||
|
std::unique_ptr<ClientHistory>& history,
|
||||||
|
std::unique_ptr<SharedGroup>& shared_group,
|
||||||
|
std::unique_ptr<Group>& read_only_group);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Config m_config;
|
Config m_config;
|
||||||
std::thread::id m_thread_id = std::this_thread::get_id();
|
std::thread::id m_thread_id = std::this_thread::get_id();
|
||||||
|
|
|
@ -0,0 +1,137 @@
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
//
|
||||||
|
// Copyright 2016 Realm Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#ifndef REALM_ATOMIC_SHARED_PTR_HPP
|
||||||
|
#define REALM_ATOMIC_SHARED_PTR_HPP
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
namespace realm {
|
||||||
|
namespace _impl {
|
||||||
|
|
||||||
|
// Check if std::atomic_load has an overload taking a std::shared_ptr, and set
|
||||||
|
// HasAtomicPtrOps to either true_type or false_type
|
||||||
|
|
||||||
|
template<typename... Ts> struct make_void { typedef void type; };
|
||||||
|
template<typename... Ts> using void_t = typename make_void<Ts...>::type;
|
||||||
|
|
||||||
|
template<typename, typename = void_t<>>
|
||||||
|
struct HasAtomicPtrOps : std::false_type { };
|
||||||
|
|
||||||
|
template<class T>
|
||||||
|
struct HasAtomicPtrOps<T, void_t<decltype(std::atomic_load(std::declval<T*>()))>> : std::true_type { };
|
||||||
|
} // namespace _impl
|
||||||
|
|
||||||
|
namespace util {
|
||||||
|
// A wrapper for std::shared_ptr that enables sharing a shared_ptr instance
|
||||||
|
// (and not just a thing *pointed to* by a shared_ptr) between threads. Is
|
||||||
|
// lock-free iff the underlying shared_ptr implementation supports atomic
|
||||||
|
// operations. Currently the only implemented operation other than copy/move
|
||||||
|
// construction/assignment is exchange().
|
||||||
|
template<typename T, bool = _impl::HasAtomicPtrOps<std::shared_ptr<T>>::value>
|
||||||
|
class AtomicSharedPtr;
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
class AtomicSharedPtr<T, true> {
|
||||||
|
public:
|
||||||
|
AtomicSharedPtr() = default;
|
||||||
|
AtomicSharedPtr(std::shared_ptr<T> ptr) : m_ptr(std::move(ptr)) { }
|
||||||
|
|
||||||
|
AtomicSharedPtr(AtomicSharedPtr const& ptr) : m_ptr(std::atomic_load(&ptr.m_ptr)) { }
|
||||||
|
AtomicSharedPtr(AtomicSharedPtr&& ptr) : m_ptr(std::atomic_exchange(&ptr.m_ptr, {})) { }
|
||||||
|
|
||||||
|
AtomicSharedPtr& operator=(AtomicSharedPtr const& ptr)
|
||||||
|
{
|
||||||
|
if (&ptr != this) {
|
||||||
|
std::atomic_store(&m_ptr, std::atomic_load(&ptr.m_ptr));
|
||||||
|
}
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
AtomicSharedPtr& operator=(AtomicSharedPtr&& ptr)
|
||||||
|
{
|
||||||
|
std::atomic_store(&m_ptr, std::atomic_exchange(&ptr.m_ptr, {}));
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<T> exchange(std::shared_ptr<T> ptr)
|
||||||
|
{
|
||||||
|
return std::atomic_exchange(&m_ptr, std::move(ptr));
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::shared_ptr<T> m_ptr = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
class AtomicSharedPtr<T, false> {
|
||||||
|
public:
|
||||||
|
AtomicSharedPtr() = default;
|
||||||
|
AtomicSharedPtr(std::shared_ptr<T> ptr) : m_ptr(std::move(ptr)) { }
|
||||||
|
|
||||||
|
AtomicSharedPtr(AtomicSharedPtr const& ptr)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ptr.m_mutex);
|
||||||
|
m_ptr = ptr.m_ptr;
|
||||||
|
}
|
||||||
|
AtomicSharedPtr(AtomicSharedPtr&& ptr)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(ptr.m_mutex);
|
||||||
|
m_ptr = std::move(ptr.m_ptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
AtomicSharedPtr& operator=(AtomicSharedPtr const& ptr)
|
||||||
|
{
|
||||||
|
if (&ptr != this) {
|
||||||
|
// std::lock() ensures that these are locked in a consistent order
|
||||||
|
// to avoid deadlock
|
||||||
|
std::lock(m_mutex, ptr.m_mutex);
|
||||||
|
m_ptr = ptr.m_ptr;
|
||||||
|
m_mutex.unlock();
|
||||||
|
ptr.m_mutex.unlock();
|
||||||
|
}
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
AtomicSharedPtr& operator=(AtomicSharedPtr&& ptr)
|
||||||
|
{
|
||||||
|
std::lock(m_mutex, ptr.m_mutex);
|
||||||
|
m_ptr = std::move(ptr.m_ptr);
|
||||||
|
m_mutex.unlock();
|
||||||
|
ptr.m_mutex.unlock();
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<T> exchange(std::shared_ptr<T> ptr)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_mutex);
|
||||||
|
m_ptr.swap(ptr);
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::mutex m_mutex;
|
||||||
|
std::shared_ptr<T> m_ptr = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // REALM_ASYNC_QUERY_HPP
|
|
@ -1,11 +1,18 @@
|
||||||
include_directories(../external/catch/single_include)
|
include_directories(../external/catch/single_include .)
|
||||||
|
|
||||||
|
set(HEADERS
|
||||||
|
util/test_file.hpp
|
||||||
|
)
|
||||||
|
|
||||||
set(SOURCES
|
set(SOURCES
|
||||||
index_set.cpp
|
index_set.cpp
|
||||||
main.cpp
|
main.cpp
|
||||||
parser.cpp)
|
parser.cpp
|
||||||
|
results.cpp
|
||||||
|
util/test_file.cpp
|
||||||
|
)
|
||||||
|
|
||||||
add_executable(tests ${SOURCES})
|
add_executable(tests ${SOURCES} ${HEADERS})
|
||||||
target_link_libraries(tests realm-object-store)
|
target_link_libraries(tests realm-object-store)
|
||||||
|
|
||||||
add_custom_target(run-tests USES_TERMINAL DEPENDS tests COMMAND ./tests)
|
add_custom_target(run-tests USES_TERMINAL DEPENDS tests COMMAND ./tests)
|
||||||
|
|
|
@ -0,0 +1,146 @@
|
||||||
|
#include "catch.hpp"
|
||||||
|
|
||||||
|
#include "util/test_file.hpp"
|
||||||
|
|
||||||
|
#include "impl/realm_coordinator.hpp"
|
||||||
|
#include "object_schema.hpp"
|
||||||
|
#include "property.hpp"
|
||||||
|
#include "results.hpp"
|
||||||
|
#include "schema.hpp"
|
||||||
|
|
||||||
|
#include <realm/commit_log.hpp>
|
||||||
|
#include <realm/group_shared.hpp>
|
||||||
|
#include <realm/link_view.hpp>
|
||||||
|
|
||||||
|
using namespace realm;
|
||||||
|
|
||||||
|
TEST_CASE("Results") {
|
||||||
|
InMemoryTestFile config;
|
||||||
|
config.cache = false;
|
||||||
|
config.automatic_change_notifications = false;
|
||||||
|
config.schema = std::make_unique<Schema>(Schema{
|
||||||
|
{"object", "", {
|
||||||
|
{"value", PropertyTypeInt},
|
||||||
|
{"link", PropertyTypeObject, "linked to object", false, false, true}
|
||||||
|
}},
|
||||||
|
{"other object", "", {
|
||||||
|
{"value", PropertyTypeInt}
|
||||||
|
}},
|
||||||
|
{"linking object", "", {
|
||||||
|
{"link", PropertyTypeObject, "object", false, false, true}
|
||||||
|
}},
|
||||||
|
{"linked to object", "", {
|
||||||
|
{"value", PropertyTypeInt}
|
||||||
|
}}
|
||||||
|
});
|
||||||
|
|
||||||
|
auto r = Realm::get_shared_realm(config);
|
||||||
|
auto coordinator = _impl::RealmCoordinator::get_existing_coordinator(config.path);
|
||||||
|
auto table = r->read_group()->get_table("class_object");
|
||||||
|
|
||||||
|
r->begin_transaction();
|
||||||
|
table->add_empty_row(10);
|
||||||
|
for (int i = 0; i < 10; ++i)
|
||||||
|
table->set_int(0, i, i);
|
||||||
|
r->commit_transaction();
|
||||||
|
|
||||||
|
Results results(r, *config.schema->find("object"), table->where().greater(0, 0).less(0, 5));
|
||||||
|
|
||||||
|
SECTION("notifications") {
|
||||||
|
int notification_calls = 0;
|
||||||
|
auto token = results.async([&](std::exception_ptr err) {
|
||||||
|
REQUIRE_FALSE(err);
|
||||||
|
++notification_calls;
|
||||||
|
});
|
||||||
|
|
||||||
|
coordinator->on_change();
|
||||||
|
r->notify();
|
||||||
|
|
||||||
|
SECTION("initial results are delivered") {
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("modifying the table sends a notification asynchronously") {
|
||||||
|
r->begin_transaction();
|
||||||
|
table->set_int(0, 0, 0);
|
||||||
|
r->commit_transaction();
|
||||||
|
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
coordinator->on_change();
|
||||||
|
r->notify();
|
||||||
|
REQUIRE(notification_calls == 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("modifying a linked-to table send a notification") {
|
||||||
|
r->begin_transaction();
|
||||||
|
r->read_group()->get_table("class_linked to object")->add_empty_row();
|
||||||
|
r->commit_transaction();
|
||||||
|
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
coordinator->on_change();
|
||||||
|
r->notify();
|
||||||
|
REQUIRE(notification_calls == 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("modifying a a linking table sends a notification") {
|
||||||
|
r->begin_transaction();
|
||||||
|
r->read_group()->get_table("class_linking object")->add_empty_row();
|
||||||
|
r->commit_transaction();
|
||||||
|
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
coordinator->on_change();
|
||||||
|
r->notify();
|
||||||
|
REQUIRE(notification_calls == 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("modifying a an unrelated table does not send a notification") {
|
||||||
|
r->begin_transaction();
|
||||||
|
r->read_group()->get_table("class_other object")->add_empty_row();
|
||||||
|
r->commit_transaction();
|
||||||
|
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
coordinator->on_change();
|
||||||
|
r->notify();
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("modifications from multiple transactions are collapsed") {
|
||||||
|
r->begin_transaction();
|
||||||
|
table->set_int(0, 0, 0);
|
||||||
|
r->commit_transaction();
|
||||||
|
|
||||||
|
r->begin_transaction();
|
||||||
|
table->set_int(0, 1, 0);
|
||||||
|
r->commit_transaction();
|
||||||
|
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
coordinator->on_change();
|
||||||
|
r->notify();
|
||||||
|
REQUIRE(notification_calls == 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("notifications are not delivered when the token is destroyed before they are calculated") {
|
||||||
|
r->begin_transaction();
|
||||||
|
table->set_int(0, 0, 0);
|
||||||
|
r->commit_transaction();
|
||||||
|
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
token = {};
|
||||||
|
coordinator->on_change();
|
||||||
|
r->notify();
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
SECTION("notifications are not delivered when the token is destroyed before they are delivered") {
|
||||||
|
r->begin_transaction();
|
||||||
|
table->set_int(0, 0, 0);
|
||||||
|
r->commit_transaction();
|
||||||
|
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
coordinator->on_change();
|
||||||
|
token = {};
|
||||||
|
r->notify();
|
||||||
|
REQUIRE(notification_calls == 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
#include "util/test_file.hpp"
|
||||||
|
|
||||||
|
#include <realm/disable_sync_to_disk.hpp>
|
||||||
|
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
TestFile::TestFile()
|
||||||
|
{
|
||||||
|
static std::string tmpdir = [] {
|
||||||
|
realm::disable_sync_to_disk();
|
||||||
|
|
||||||
|
const char* dir = getenv("TMPDIR");
|
||||||
|
if (dir && *dir)
|
||||||
|
return dir;
|
||||||
|
return "/tmp";
|
||||||
|
}();
|
||||||
|
path = tmpdir + "/realm.XXXXXX";
|
||||||
|
mktemp(&path[0]);
|
||||||
|
unlink(path.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
TestFile::~TestFile()
|
||||||
|
{
|
||||||
|
unlink(path.c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
InMemoryTestFile::InMemoryTestFile()
|
||||||
|
{
|
||||||
|
in_memory = true;
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
#ifndef REALM_TEST_UTIL_TEST_FILE_HPP
|
||||||
|
#define REALM_TEST_UTIL_TEST_FILE_HPP
|
||||||
|
|
||||||
|
#include "shared_realm.hpp"
|
||||||
|
|
||||||
|
struct TestFile : realm::Realm::Config {
|
||||||
|
TestFile();
|
||||||
|
~TestFile();
|
||||||
|
};
|
||||||
|
|
||||||
|
struct InMemoryTestFile : TestFile {
|
||||||
|
InMemoryTestFile();
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif
|
Loading…
Reference in New Issue