diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 76233013..76b8a328 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -6,6 +6,7 @@ set(SOURCES results.cpp schema.cpp shared_realm.cpp + impl/async_query.cpp impl/realm_coordinator.cpp impl/transact_log_handler.cpp parser/parser.cpp diff --git a/src/impl/apple/cached_realm.hpp b/src/impl/apple/cached_realm.hpp index 5acf874e..f67c160c 100644 --- a/src/impl/apple/cached_realm.hpp +++ b/src/impl/apple/cached_realm.hpp @@ -36,7 +36,7 @@ public: CachedRealm(const CachedRealm&) = delete; CachedRealm& operator=(const CachedRealm&) = delete; - // Asyncronously call notify() on the Realm on the appropriate thread + // Asynchronously call notify() on the Realm on the appropriate thread void notify(); private: diff --git a/src/impl/async_query.cpp b/src/impl/async_query.cpp new file mode 100644 index 00000000..5f131941 --- /dev/null +++ b/src/impl/async_query.cpp @@ -0,0 +1,130 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "impl/async_query.hpp" + +#include "impl/realm_coordinator.hpp" + +using namespace realm; +using namespace realm::_impl; + +AsyncQuery::AsyncQuery(SortOrder sort, + std::unique_ptr> handover, + std::unique_ptr callback, + RealmCoordinator& parent) +: parent(parent.shared_from_this()) +, m_sort(std::move(sort)) +, m_query_handover(std::move(handover)) +, m_callback(std::move(callback)) +{ +} + +void AsyncQuery::get_results(const SharedRealm& realm, SharedGroup& sg, std::vector>& ret) +{ + if (!m_callback->is_for_current_thread()) { + return; + } + + if (m_error) { + ret.emplace_back([self = shared_from_this()] { + self->m_callback->error(self->m_error); + RealmCoordinator::unregister_query(*self); + }); + return; + } + + if (!m_tv_handover) { + return; + } + if (m_tv_handover->version < sg.get_version_of_current_transaction()) { + // async results are stale; ignore + return; + } +// auto r = Results(realm, +// m_sort, +// std::move(*sg.import_from_handover(std::move(m_tv_handover)))); + Results r; + auto version = sg.get_version_of_current_transaction(); + ret.emplace_back([r = std::move(r), version, &sg, self = shared_from_this()] { + if (sg.get_version_of_current_transaction() == version) { + self->m_callback->deliver(std::move(r)); + } + }); +} + +void AsyncQuery::prepare_update() +{ + // This function must not touch m_tv_handover as it is called without the + // relevant lock held (so that another thread can consume m_tv_handover + // while this is running) + + REALM_ASSERT(m_sg); + + if (m_tv.is_attached()) { + m_did_update = m_tv.sync_if_needed(); + } + else { + m_tv = m_query->find_all(); + if (m_sort) { + m_tv.sort(m_sort.columnIndices, m_sort.ascending); + } + m_did_update = true; + } +} + +void AsyncQuery::prepare_handover() +{ + // Even if the TV didn't change, we need to re-export it if the previous + // export has not been consumed yet, as the old handover object is no longer + // usable due to the version not matching + if (m_did_update || (m_tv_handover && m_tv_handover->version != m_sg->get_version_of_current_transaction())) { + m_tv_handover = m_sg->export_for_handover(m_tv, ConstSourcePayload::Copy); + } +} + +void AsyncQuery::set_error(std::exception_ptr err) +{ + if (!m_error) { + m_error = err; + } +} + +SharedGroup::VersionID AsyncQuery::version() const noexcept +{ + if (m_tv_handover) + return m_tv_handover->version; + if (m_query_handover) + return m_query_handover->version; + return SharedGroup::VersionID{}; +} + +void AsyncQuery::attach_to(realm::SharedGroup& sg) +{ + REALM_ASSERT(!m_sg); + + m_query = sg.import_from_handover(std::move(m_query_handover)); + m_sg = &sg; +} + +void AsyncQuery::detatch() +{ + REALM_ASSERT(m_sg); + + m_query_handover = m_sg->export_for_handover(*m_query, MutableSourcePayload::Move); + m_sg = nullptr; +} diff --git a/src/impl/async_query.hpp b/src/impl/async_query.hpp new file mode 100644 index 00000000..1671e646 --- /dev/null +++ b/src/impl/async_query.hpp @@ -0,0 +1,75 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_ASYNC_QUERY_HPP +#define REALM_ASYNC_QUERY_HPP + +#include "results.hpp" + +#include + +#include + +namespace realm { +namespace _impl { +class AsyncQuery : public std::enable_shared_from_this { +public: + AsyncQuery(SortOrder sort, + std::unique_ptr> handover, + std::unique_ptr callback, + RealmCoordinator& parent); + + void get_results(const SharedRealm& realm, SharedGroup& sg, std::vector>& ret); + + void set_error(std::exception_ptr err); + + // Run/rerun the query if needed + void prepare_update(); + // Update the handover object with the new data produced in prepare_update() + void prepare_handover(); + + // Get the version of the current handover object + SharedGroup::VersionID version() const noexcept; + + void attach_to(SharedGroup& sg); + void detatch(); + + std::shared_ptr parent; + +private: + const SortOrder m_sort; + + std::unique_ptr> m_query_handover; + std::unique_ptr m_query; + + std::unique_ptr> m_tv_handover; + TableView m_tv; + + const std::unique_ptr m_callback; + + SharedGroup* m_sg = nullptr; + + std::exception_ptr m_error; + + bool m_did_update = false; +}; + +} // namespace _impl +} // namespace realm + +#endif /* REALM_ASYNC_QUERY_HPP */ diff --git a/src/impl/realm_coordinator.cpp b/src/impl/realm_coordinator.cpp index d59cad08..8c0dbc71 100644 --- a/src/impl/realm_coordinator.cpp +++ b/src/impl/realm_coordinator.cpp @@ -18,11 +18,20 @@ #include "impl/realm_coordinator.hpp" +#include "impl/async_query.hpp" #include "impl/cached_realm.hpp" #include "impl/external_commit_helper.hpp" +#include "impl/transact_log_handler.hpp" #include "object_store.hpp" #include "schema.hpp" +#include +#include +#include +#include +#include + +#include #include using namespace realm; @@ -58,7 +67,12 @@ std::shared_ptr RealmCoordinator::get_realm(Realm::Config config) if ((!m_config.read_only && !m_notifier) || (m_config.read_only && m_cached_realms.empty())) { m_config = config; if (!config.read_only && !m_notifier) { - m_notifier = std::make_unique(*this); + try { + m_notifier = std::make_unique(*this); + } + catch (std::system_error const& ex) { + throw RealmFileException(RealmFileException::Kind::AccessError, config.path, ex.code().message()); + } } } else { @@ -103,6 +117,11 @@ std::shared_ptr RealmCoordinator::get_realm(Realm::Config config) return realm; } +std::shared_ptr RealmCoordinator::get_realm() +{ + return get_realm(m_config); +} + const Schema* RealmCoordinator::get_schema() const noexcept { return m_cached_realms.empty() ? nullptr : m_config.schema.get(); @@ -186,10 +205,279 @@ void RealmCoordinator::send_commit_notifications() m_notifier->notify_others(); } +void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index) +{ + if (m_async_error) { + return; + } + + SharedGroup::VersionID versionid(version, index); + if (!m_advancer_sg) { + try { + // Use a temporary Realm instance to open the shared group to reuse + // the error handling there + Realm tmp(m_config); + m_advancer_history = std::move(tmp.m_history); + m_advancer_sg = std::move(tmp.m_shared_group); + m_advancer_sg->begin_read(versionid); + } + catch (...) { + m_async_error = std::current_exception(); + m_advancer_sg = nullptr; + m_advancer_history = nullptr; + } + } + else if (m_new_queries.empty()) { + // If this is the first query then we don't already have a read transaction + m_advancer_sg->begin_read(versionid); + } + else if (versionid < m_advancer_sg->get_version_of_current_transaction()) { + // Ensure we're holding a readlock on the oldest version we have a + // handover object for, as handover objects don't + m_advancer_sg->end_read(); + m_advancer_sg->begin_read(versionid); + } +} + +AsyncQueryCancelationToken RealmCoordinator::register_query(const Results& r, std::unique_ptr target) +{ + return r.get_realm()->m_coordinator->do_register_query(r, std::move(target)); +} + +AsyncQueryCancelationToken RealmCoordinator::do_register_query(const Results& r, std::unique_ptr target) +{ + if (m_config.read_only) { + throw InvalidTransactionException("Cannot create asynchronous query for read-only Realms"); + } + if (r.get_realm()->is_in_transaction()) { + throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction"); + } + + auto handover = r.get_realm()->m_shared_group->export_for_handover(r.get_query(), ConstSourcePayload::Copy); + auto version = handover->version; + auto query = std::make_shared(r.get_sort(), + std::move(handover), + std::move(target), + *this); + + { + std::lock_guard lock(m_query_mutex); + pin_version(version.version, version.index); + m_new_queries.push_back(query); + } + + // Wake up the background worker threads by pretending we made a commit + m_notifier->notify_others(); + return query; +} + +void RealmCoordinator::unregister_query(AsyncQuery& registration) +{ + registration.parent->do_unregister_query(registration); +} + +void RealmCoordinator::do_unregister_query(AsyncQuery& registration) +{ + auto swap_remove = [&](auto& container) { + auto it = std::find_if(container.begin(), container.end(), + [&](auto const& ptr) { return ptr.get() == ®istration; }); + if (it != container.end()) { + std::iter_swap(--container.end(), it); + container.pop_back(); + return true; + } + return false; + }; + + std::lock_guard lock(m_query_mutex); + if (swap_remove(m_queries)) { + // Make sure we aren't holding on to read versions needlessly if there + // are no queries left, but don't close them entirely as opening shared + // groups is expensive + if (!m_running_queries && m_queries.empty() && m_query_sg) { + m_query_sg->end_read(); + } + } + else if (swap_remove(m_new_queries)) { + if (m_new_queries.empty() && m_advancer_sg) { + m_advancer_sg->end_read(); + } + } +} + void RealmCoordinator::on_change() { + run_async_queries(); + std::lock_guard lock(m_realm_mutex); for (auto& realm : m_cached_realms) { realm.notify(); } } + +void RealmCoordinator::run_async_queries() +{ + std::unique_lock lock(m_query_mutex); + + if (m_queries.empty() && m_new_queries.empty()) { + return; + } + + if (!m_async_error) { + open_helper_shared_group(); + } + + if (m_async_error) { + move_new_queries_to_main(); + for (auto& query : m_queries) { + query->set_error(m_async_error); + } + return; + } + + advance_helper_shared_group_to_latest(); + + // Tell other threads not to close the shared group as we need it even + // though we aren't holding the lock + m_running_queries = true; + + // Make a copy of the queries vector so that we can release the lock while + // we run the queries + auto queries_to_run = m_queries; + lock.unlock(); + + for (auto& query : queries_to_run) { + query->prepare_update(); + } + + // Reacquire the lock while updating the fields that are actually read on + // other threads + lock.lock(); + for (auto& query : queries_to_run) { + query->prepare_handover(); + } + + // Check if all queries were removed while we were running them, as if so + // the shared group didn't get closed by do_unregister_query() + m_running_queries = false; + if (m_queries.empty()) { + m_query_sg->end_read(); + } +} + +void RealmCoordinator::open_helper_shared_group() +{ + if (!m_query_sg) { + try { + Realm tmp(m_config); + m_query_history = std::move(tmp.m_history); + m_query_sg = std::move(tmp.m_shared_group); + m_query_sg->begin_read(); + } + catch (...) { + // Store the error to be passed to the async queries + m_async_error = std::current_exception(); + m_query_sg = nullptr; + m_query_history = nullptr; + } + } + else if (m_queries.empty()) { + m_query_sg->begin_read(); + } +} + +void RealmCoordinator::move_new_queries_to_main() +{ + m_queries.reserve(m_queries.size() + m_new_queries.size()); + std::move(m_new_queries.begin(), m_new_queries.end(), std::back_inserter(m_queries)); + m_new_queries.clear(); +} + +void RealmCoordinator::advance_helper_shared_group_to_latest() +{ + if (m_new_queries.empty()) { + LangBindHelper::advance_read(*m_query_sg, *m_query_history); + return; + } + + // Sort newly added queries by their source version so that we can pull them + // all forward to the latest version in a single pass over the transaction log + std::sort(m_new_queries.begin(), m_new_queries.end(), [](auto const& lft, auto const& rgt) { + return lft->version() < rgt->version(); + }); + + // Import all newly added queries to our helper SG + for (auto& query : m_new_queries) { + LangBindHelper::advance_read(*m_advancer_sg, *m_advancer_history, query->version()); + query->attach_to(*m_advancer_sg); + } + + // Advance both SGs to the newest version + LangBindHelper::advance_read(*m_advancer_sg, *m_advancer_history); + LangBindHelper::advance_read(*m_query_sg, *m_query_history, + m_advancer_sg->get_version_of_current_transaction()); + + // Transfer all new queries over to the main SG + for (auto& query : m_new_queries) { + query->detatch(); + query->attach_to(*m_query_sg); + } + + if (!m_new_queries.empty()) { + move_new_queries_to_main(); + m_advancer_sg->end_read(); + } +} + +void RealmCoordinator::advance_to_ready(Realm& realm) +{ + std::vector> async_results; + + { + std::lock_guard lock(m_query_mutex); + + SharedGroup::VersionID version; + for (auto& query : m_queries) { + version = query->version(); + if (version != SharedGroup::VersionID()) { + break; + } + } + + // no untargeted async queries; just advance to latest + if (version.version == 0) { + transaction::advance(*realm.m_shared_group, *realm.m_history, realm.m_binding_context.get()); + return; + } + // async results are out of date; ignore + else if (version < realm.m_shared_group->get_version_of_current_transaction()) { + return; + } + + transaction::advance(*realm.m_shared_group, *realm.m_history, realm.m_binding_context.get(), version); + + for (auto& query : m_queries) { + query->get_results(realm.shared_from_this(), *realm.m_shared_group, async_results); + } + } + + for (auto& results : async_results) { + results(); + } +} + +void RealmCoordinator::process_available_async(Realm& realm) +{ + std::vector> async_results; + + { + std::lock_guard lock(m_query_mutex); + for (auto& query : m_queries) { + query->get_results(realm.shared_from_this(), *realm.m_shared_group, async_results); + } + } + + for (auto& results : async_results) { + results(); + } +} diff --git a/src/impl/realm_coordinator.hpp b/src/impl/realm_coordinator.hpp index ee1d748b..0c8b120b 100644 --- a/src/impl/realm_coordinator.hpp +++ b/src/impl/realm_coordinator.hpp @@ -24,9 +24,15 @@ #include namespace realm { +class AsyncQueryCallback; +class ClientHistory; +class Results; class Schema; +class SharedGroup; +struct AsyncQueryCancelationToken; namespace _impl { +class AsyncQuery; class CachedRealm; class ExternalCommitHelper; @@ -43,6 +49,7 @@ public: // If the Realm is already open on another thread, validates that the given // configuration is compatible with the existing one std::shared_ptr get_realm(Realm::Config config); + std::shared_ptr get_realm(); const Schema* get_schema() const noexcept; uint64_t get_schema_version() const noexcept { return m_config.schema_version; } @@ -50,7 +57,7 @@ public: const std::vector& get_encryption_key() const noexcept { return m_config.encryption_key; } bool is_in_memory() const noexcept { return m_config.in_memory; } - // Asyncronously call notify() on every Realm instance for this coordinator's + // Asynchronously call notify() on every Realm instance for this coordinator's // path, including those in other processes void send_commit_notifications(); @@ -73,13 +80,49 @@ public: // Update the schema in the cached config void update_schema(Schema const& new_schema); + static AsyncQueryCancelationToken register_query(const Results& r, std::unique_ptr); + static void unregister_query(AsyncQuery& registration); + + // Advance the Realm to the most recent transaction version which all async + // work is complete for + void advance_to_ready(Realm& realm); + void process_available_async(Realm& realm); + private: Realm::Config m_config; std::mutex m_realm_mutex; std::vector m_cached_realms; + std::mutex m_query_mutex; + bool m_running_queries = false; + std::vector> m_new_queries; + std::vector> m_queries; + + // SharedGroup used for actually running async queries + // Will have a read transaction iff m_queries is non-empty + std::unique_ptr m_query_history; + std::unique_ptr m_query_sg; + + // SharedGroup used to advance queries in m_new_queries to the main shared + // group's transaction version + // Will have a read transaction iff m_new_queries is non-empty + std::unique_ptr m_advancer_history; + std::unique_ptr m_advancer_sg; + std::exception_ptr m_async_error; + std::unique_ptr<_impl::ExternalCommitHelper> m_notifier; + + AsyncQueryCancelationToken do_register_query(const Results& r, std::unique_ptr); + void do_unregister_query(AsyncQuery& registration); + + // must be called with m_query_mutex locked + void pin_version(uint_fast64_t version, uint_fast32_t index); + + void run_async_queries(); + void open_helper_shared_group(); + void move_new_queries_to_main(); + void advance_helper_shared_group_to_latest(); }; } // namespace _impl diff --git a/src/impl/transact_log_handler.cpp b/src/impl/transact_log_handler.cpp index cd9a8f9f..44544d17 100644 --- a/src/impl/transact_log_handler.cpp +++ b/src/impl/transact_log_handler.cpp @@ -432,7 +432,8 @@ public: namespace realm { namespace _impl { namespace transaction { -void advance(SharedGroup& sg, ClientHistory& history, BindingContext* context) +void advance(SharedGroup& sg, ClientHistory& history, BindingContext* context, + SharedGroup::VersionID version) { TransactLogObserver(context, sg, [&](auto&&... args) { LangBindHelper::advance_read(sg, history, std::move(args)...); diff --git a/src/impl/transact_log_handler.hpp b/src/impl/transact_log_handler.hpp index f5d3d58a..b68845fb 100644 --- a/src/impl/transact_log_handler.hpp +++ b/src/impl/transact_log_handler.hpp @@ -19,6 +19,8 @@ #ifndef REALM_TRANSACT_LOG_HANDLER_HPP #define REALM_TRANSACT_LOG_HANDLER_HPP +#include + namespace realm { class BindingContext; class SharedGroup; @@ -28,7 +30,8 @@ namespace _impl { namespace transaction { // Advance the read transaction version, with change notifications sent to delegate // Must not be called from within a write transaction. -void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context); +void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context, + SharedGroup::VersionID version=SharedGroup::VersionID{}); // Begin a write transaction // If the read transaction version is not up to date, will first advance to the diff --git a/src/object_accessor.hpp b/src/object_accessor.hpp index bad38f9e..1e95638b 100644 --- a/src/object_accessor.hpp +++ b/src/object_accessor.hpp @@ -5,10 +5,13 @@ #ifndef REALM_OBJECT_ACCESSOR_HPP #define REALM_OBJECT_ACCESSOR_HPP -#include -#include "shared_realm.hpp" -#include "schema.hpp" #include "list.hpp" +#include "object_schema.hpp" +#include "object_store.hpp" +#include "schema.hpp" +#include "shared_realm.hpp" + +#include namespace realm { diff --git a/src/results.cpp b/src/results.cpp index dc029157..b3f59945 100644 --- a/src/results.cpp +++ b/src/results.cpp @@ -19,6 +19,7 @@ #include "results.hpp" #include "object_store.hpp" +#include "impl/realm_coordinator.hpp" #include @@ -60,6 +61,8 @@ void Results::validate_read() const m_realm->verify_thread(); if (m_table && !m_table->is_attached()) throw InvalidatedException(); + if (m_mode == Mode::TableView && !m_table_view.is_attached()) + throw InvalidatedException(); } void Results::validate_write() const @@ -94,6 +97,11 @@ size_t Results::size() REALM_UNREACHABLE(); } +StringData Results::get_object_type() const noexcept +{ + return get_object_schema().name; +} + RowExpr Results::get(size_t row_ndx) { validate_read(); @@ -301,8 +309,9 @@ Query Results::get_query() const switch (m_mode) { case Mode::Empty: case Mode::Query: - case Mode::TableView: return m_query; + case Mode::TableView: + return m_table_view.get_query(); case Mode::Table: return m_table->where(); } @@ -342,3 +351,31 @@ Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t c , column_type(table->get_column_type(column)) { } + +AsyncQueryCancelationToken Results::async(std::unique_ptr target) +{ + return _impl::RealmCoordinator::register_query(*this, std::move(target)); +} + +AsyncQueryCancelationToken::~AsyncQueryCancelationToken() +{ + if (m_registration) { + _impl::RealmCoordinator::unregister_query(*m_registration); + } +} + +AsyncQueryCancelationToken::AsyncQueryCancelationToken(AsyncQueryCancelationToken&& rgt) +: m_registration(std::move(rgt.m_registration)) +{ +} + +AsyncQueryCancelationToken& AsyncQueryCancelationToken::operator=(realm::AsyncQueryCancelationToken&& rgt) +{ + if (this != &rgt) { + if (m_registration) { + _impl::RealmCoordinator::unregister_query(*m_registration); + } + m_registration = std::move(rgt.m_registration); + } + return *this; +} diff --git a/src/results.hpp b/src/results.hpp index 27127e8d..01e0368b 100644 --- a/src/results.hpp +++ b/src/results.hpp @@ -29,6 +29,46 @@ namespace realm { template class BasicRowExpr; using RowExpr = BasicRowExpr; class Mixed; +class Results; +class ObjectSchema; + +namespace _impl { + class AsyncQuery; +} + +// A token which keeps an asynchronous query alive +struct AsyncQueryCancelationToken { + AsyncQueryCancelationToken() = default; + AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> registration) : m_registration(std::move(registration)) { } + ~AsyncQueryCancelationToken(); + + AsyncQueryCancelationToken(AsyncQueryCancelationToken&&); + AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken&&); + + AsyncQueryCancelationToken(AsyncQueryCancelationToken const&) = delete; + AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken const&) = delete; + +private: + std::shared_ptr<_impl::AsyncQuery> m_registration; +}; + +// Subclass to get notifications about async query things +class AsyncQueryCallback { +public: + virtual ~AsyncQueryCallback() = default; + + // Called with the Results object generated by the query on a thread where + // is_for_current_thread() returned true + virtual void deliver(Results) = 0; + + // If an error occured while running the query on the worker thread, this is + // called with an exception on a thread where is_for_current_thread() + // returned true + virtual void error(std::exception_ptr) = 0; + + // Return whether or not this query is associated with the current thread + virtual bool is_for_current_thread() { return true; } +}; struct SortOrder { std::vector columnIndices; @@ -72,7 +112,7 @@ public: TableView get_tableview(); // Get the object type which will be returned by get() - StringData get_object_type() const noexcept { return get_object_schema().name; } + StringData get_object_type() const noexcept; // Set whether the TableView should sync if needed before accessing results void set_live(bool live); @@ -165,6 +205,13 @@ public: UnsupportedColumnTypeException(size_t column, const Table* table); }; + void update_tableview(); + + // Create an async query from this Results + // The query will be run on a background thread and delivered to the callback, + // and then rerun after each commit (if needed) and redelivered if it changed + AsyncQueryCancelationToken async(std::unique_ptr target); + private: SharedRealm m_realm; const ObjectSchema *m_object_schema; @@ -179,8 +226,6 @@ private: void validate_read() const; void validate_write() const; - void update_tableview(); - template util::Optional aggregate(size_t column, bool return_none_for_empty, Int agg_int, Float agg_float, diff --git a/src/schema.hpp b/src/schema.hpp index 2df1ec9e..3f10129f 100644 --- a/src/schema.hpp +++ b/src/schema.hpp @@ -19,6 +19,8 @@ #ifndef REALM_SCHEMA_HPP #define REALM_SCHEMA_HPP +#include "property.hpp" + #include #include diff --git a/src/shared_realm.cpp b/src/shared_realm.cpp index 69d48818..71e9538e 100644 --- a/src/shared_realm.cpp +++ b/src/shared_realm.cpp @@ -361,13 +361,16 @@ void Realm::notify() } if (m_auto_refresh) { if (m_group) { - transaction::advance(*m_shared_group, *m_history, m_binding_context.get()); + m_coordinator->advance_to_ready(*this); } else if (m_binding_context) { m_binding_context->did_change({}, {}); } } } + else { + m_coordinator->process_available_async(*this); + } } bool Realm::refresh() @@ -387,6 +390,7 @@ bool Realm::refresh() if (m_group) { transaction::advance(*m_shared_group, *m_history, m_binding_context.get()); + m_coordinator->process_available_async(*this); } else { // Create the read transaction diff --git a/src/shared_realm.hpp b/src/shared_realm.hpp index 79f55496..9d7fe302 100644 --- a/src/shared_realm.hpp +++ b/src/shared_realm.hpp @@ -19,9 +19,10 @@ #ifndef REALM_REALM_HPP #define REALM_REALM_HPP -#include "object_store.hpp" +#include #include +#include #include #include #include @@ -42,8 +43,7 @@ namespace realm { class RealmCoordinator; } - class Realm : public std::enable_shared_from_this - { + class Realm : public std::enable_shared_from_this { public: typedef std::function MigrationFunction; @@ -135,6 +135,8 @@ namespace realm { // FIXME private Group *read_group(); + + friend class _impl::RealmCoordinator; }; class RealmFileException : public std::runtime_error {