From b920f62ca57f8cbff1adb3fb0523a8d6a66d7bfd Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Wed, 9 Mar 2016 12:05:06 -0800 Subject: [PATCH] Comment and clean up the Notifiers/BackgroundCollection --- src/impl/background_collection.cpp | 13 ++- src/impl/background_collection.hpp | 49 ++++++-- src/impl/list_notifier.cpp | 6 +- src/impl/list_notifier.hpp | 14 ++- src/impl/realm_coordinator.cpp | 174 ++++++++++++++--------------- src/impl/realm_coordinator.hpp | 28 ++--- src/impl/results_notifier.cpp | 63 +++++------ src/impl/results_notifier.hpp | 49 ++++---- src/list.cpp | 2 +- src/results.cpp | 20 ++-- src/results.hpp | 2 +- 11 files changed, 225 insertions(+), 195 deletions(-) diff --git a/src/impl/background_collection.cpp b/src/impl/background_collection.cpp index 5db90c87..09a66133 100644 --- a/src/impl/background_collection.cpp +++ b/src/impl/background_collection.cpp @@ -32,11 +32,9 @@ BackgroundCollection::BackgroundCollection(std::shared_ptr realm) BackgroundCollection::~BackgroundCollection() { - // unregister() may have been called from a different thread than we're being - // destroyed on, so we need to synchronize access to the interesting fields - // modified there - std::lock_guard lock(m_realm_mutex); - m_realm = nullptr; + // Need to do this explicitly to ensure m_realm is destroyed with the mutex + // held to avoid potential double-deletion + unregister(); } size_t BackgroundCollection::add_callback(CollectionChangeCallback callback) @@ -102,6 +100,11 @@ bool BackgroundCollection::is_alive() const noexcept return m_realm != nullptr; } +std::unique_lock BackgroundCollection::lock_target() +{ + return std::unique_lock{m_realm_mutex}; +} + // Recursively add `table` and all tables it links to to `out` static void find_relevant_tables(std::vector& out, Table const& table) { diff --git a/src/impl/background_collection.hpp b/src/impl/background_collection.hpp index 26da3904..2d4ef046 100644 --- a/src/impl/background_collection.hpp +++ b/src/impl/background_collection.hpp @@ -25,7 +25,6 @@ #include #include -#include #include namespace realm { @@ -34,42 +33,72 @@ class Realm; namespace _impl { struct TransactionChangeInfo; +// A base class for a notifier that keeps a collection up to date and/or +// generates detailed change notifications on a background thread. This manages +// most of the lifetime-management issues related to sharing an object between +// the worker thread and the collection on the target thread, along with the +// thread-safe callback collection. class BackgroundCollection { public: BackgroundCollection(std::shared_ptr); virtual ~BackgroundCollection(); + + // ------------------------------------------------------------------------ + // Public API for the collections using this to get notifications: + + // Stop receiving notifications from this background worker + // This must be called in the destructor of the collection void unregister() noexcept; - virtual void release_data() noexcept = 0; - + // Add a callback to be called each time the collection changes + // This can only be called from the target collection's thread + // Returns a token which can be passed to remove_callback() size_t add_callback(CollectionChangeCallback callback); + // Remove a previously added token. The token is no longer valid after + // calling this function and must not be used again. This function can be + // called from any thread. void remove_callback(size_t token); + // ------------------------------------------------------------------------ + // API for RealmCoordinator to manage running things and calling callbacks + + Realm* get_realm() const noexcept { return m_realm.get(); } + + // Get the SharedGroup version which this collection can attach to (if it's + // in handover mode), or can deliver to (if it's been handed over to the BG worker alredad) + SharedGroup::VersionID version() const noexcept { return m_sg_version; } + + // Release references to all core types + // This is called on the worker thread to ensure that non-thread-safe things + // can be destroyed on the correct thread, even if the last reference to the + // BackgroundCollection is released on a different thread + virtual void release_data() noexcept = 0; + + // Call each of the currently registered callbacks, if there have been any + // changes since the last time each of those callbacks was called void call_callbacks(); bool is_alive() const noexcept; - Realm& get_realm() const noexcept { return *m_realm; } - - // Attach the handed-over query to `sg` + // Attach the handed-over query to `sg`. Must not be already attaged to a SharedGroup. void attach_to(SharedGroup& sg); // Create a new query handover object and stop using the previously attached // SharedGroup void detach(); - void add_required_change_info(TransactionChangeInfo&); + // Set `info` as the new ChangeInfo that will be populated by the next + // transaction advance, and register all required information in it + void add_required_change_info(TransactionChangeInfo& info); virtual void run() { } void prepare_handover(); bool deliver(SharedGroup&, std::exception_ptr); - // Get the version of the current handover object - SharedGroup::VersionID version() const noexcept { return m_sg_version; } - protected: bool have_callbacks() const noexcept { return m_have_callbacks; } void add_changes(CollectionChangeIndices change) { m_accumulated_changes.merge(std::move(change)); } void set_table(Table const& table); + std::unique_lock lock_target(); private: virtual void do_attach_to(SharedGroup&) = 0; diff --git a/src/impl/list_notifier.cpp b/src/impl/list_notifier.cpp index 8bdc4142..ef451917 100644 --- a/src/impl/list_notifier.cpp +++ b/src/impl/list_notifier.cpp @@ -1,6 +1,6 @@ //////////////////////////////////////////////////////////////////////////// // -// Copyright 2015 Realm Inc. +// Copyright 2016 Realm Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -26,7 +26,6 @@ using namespace realm; using namespace realm::_impl; - ListNotifier::ListNotifier(LinkViewRef lv, std::shared_ptr realm) : BackgroundCollection(std::move(realm)) , m_prev_size(lv->size()) @@ -45,13 +44,12 @@ ListNotifier::ListNotifier(LinkViewRef lv, std::shared_ptr realm) set_table(lv->get_target_table()); - auto& sg = Realm::Internal::get_shared_group(get_realm()); + auto& sg = Realm::Internal::get_shared_group(*get_realm()); m_lv_handover = sg.export_linkview_for_handover(lv); } void ListNotifier::release_data() noexcept { - // FIXME: does this need a lock? m_lv.reset(); } diff --git a/src/impl/list_notifier.hpp b/src/impl/list_notifier.hpp index 5d961306..4029de85 100644 --- a/src/impl/list_notifier.hpp +++ b/src/impl/list_notifier.hpp @@ -1,6 +1,6 @@ //////////////////////////////////////////////////////////////////////////// // -// Copyright 2015 Realm Inc. +// Copyright 2016 Realm Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -30,12 +30,20 @@ public: ListNotifier(LinkViewRef lv, std::shared_ptr realm); private: + // The linkview, in handover form if this has not been attached to the main + // SharedGroup yet LinkViewRef m_lv; std::unique_ptr> m_lv_handover; - CollectionChangeIndices m_change; + + // The last-seen size of the LinkView so that we can report row deletions + // when the LinkView itself is deleted size_t m_prev_size; + + // The column index of the LinkView size_t m_col_ndx; - std::vector m_relevant_tables; + + // The actual change, calculated in run() and delivered in prepare_handover() + CollectionChangeIndices m_change; TransactionChangeInfo* m_info; void run() override; diff --git a/src/impl/realm_coordinator.cpp b/src/impl/realm_coordinator.cpp index 472daa5b..6d657f1c 100644 --- a/src/impl/realm_coordinator.cpp +++ b/src/impl/realm_coordinator.cpp @@ -269,8 +269,8 @@ void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index) m_advancer_history = nullptr; } } - else if (m_new_queries.empty()) { - // If this is the first query then we don't already have a read transaction + else if (m_new_notifiers.empty()) { + // If this is the first notifier then we don't already have a read transaction m_advancer_sg->begin_read(versionid); } else if (versionid < m_advancer_sg->get_version_of_current_transaction()) { @@ -281,18 +281,18 @@ void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index) } } -void RealmCoordinator::register_query(std::shared_ptr query) +void RealmCoordinator::register_notifier(std::shared_ptr notifier) { - auto version = query->version(); - auto& self = Realm::Internal::get_coordinator(query->get_realm()); + auto version = notifier->version(); + auto& self = Realm::Internal::get_coordinator(*notifier->get_realm()); { - std::lock_guard lock(self.m_query_mutex); + std::lock_guard lock(self.m_notifier_mutex); self.pin_version(version.version, version.index); - self.m_new_queries.push_back(std::move(query)); + self.m_new_notifiers.push_back(std::move(notifier)); } } -void RealmCoordinator::clean_up_dead_queries() +void RealmCoordinator::clean_up_dead_notifiers() { auto swap_remove = [&](auto& container) { bool did_remove = false; @@ -300,8 +300,8 @@ void RealmCoordinator::clean_up_dead_queries() if (container[i]->is_alive()) continue; - // Ensure the query is destroyed here even if there's lingering refs - // to the async query elsewhere + // Ensure the notifier is destroyed here even if there's lingering refs + // to the async notifier elsewhere container[i]->release_data(); if (container.size() > i + 1) @@ -313,16 +313,16 @@ void RealmCoordinator::clean_up_dead_queries() return did_remove; }; - if (swap_remove(m_queries)) { + if (swap_remove(m_notifiers)) { // Make sure we aren't holding on to read versions needlessly if there - // are no queries left, but don't close them entirely as opening shared + // are no notifiers left, but don't close them entirely as opening shared // groups is expensive - if (m_queries.empty() && m_query_sg) { - m_query_sg->end_read(); + if (m_notifiers.empty() && m_notifier_sg) { + m_notifier_sg->end_read(); } } - if (swap_remove(m_new_queries)) { - if (m_new_queries.empty() && m_advancer_sg) { + if (swap_remove(m_new_notifiers)) { + if (m_new_notifiers.empty() && m_advancer_sg) { m_advancer_sg->end_read(); } } @@ -330,7 +330,7 @@ void RealmCoordinator::clean_up_dead_queries() void RealmCoordinator::on_change() { - run_async_queries(); + run_async_notifiers(); std::lock_guard lock(m_realm_mutex); for (auto& realm : m_weak_realm_notifiers) { @@ -338,13 +338,13 @@ void RealmCoordinator::on_change() } } -void RealmCoordinator::run_async_queries() +void RealmCoordinator::run_async_notifiers() { - std::unique_lock lock(m_query_mutex); + std::unique_lock lock(m_notifier_mutex); - clean_up_dead_queries(); + clean_up_dead_notifiers(); - if (m_queries.empty() && m_new_queries.empty()) { + if (m_notifiers.empty() && m_new_notifiers.empty()) { return; } @@ -353,73 +353,73 @@ void RealmCoordinator::run_async_queries() } if (m_async_error) { - std::move(m_new_queries.begin(), m_new_queries.end(), std::back_inserter(m_queries)); - m_new_queries.clear(); + std::move(m_new_notifiers.begin(), m_new_notifiers.end(), std::back_inserter(m_notifiers)); + m_new_notifiers.clear(); return; } std::vector change_info; SharedGroup::VersionID version; - auto new_queries = std::move(m_new_queries); - if (new_queries.empty()) { + auto new_notifiers = std::move(m_new_notifiers); + if (new_notifiers.empty()) { change_info.resize(1); } else { change_info.resize(2); - // Sort newly added queries by their source version so that we can pull them + // Sort newly added notifiers by their source version so that we can pull them // all forward to the latest version in a single pass over the transaction log - std::sort(new_queries.begin(), new_queries.end(), + std::sort(new_notifiers.begin(), new_notifiers.end(), [](auto&& lft, auto&& rgt) { return lft->version() < rgt->version(); }); version = m_advancer_sg->get_version_of_current_transaction(); - REALM_ASSERT(version == new_queries.front()->version()); + REALM_ASSERT(version == new_notifiers.front()->version()); TransactionChangeInfo* info = &change_info.back(); - // Advance each of the new queries to the latest version, attaching them + // Advance each of the new notifiers to the latest version, attaching them // to the SG at their handover version. This requires a unique // TransactionChangeInfo for each source version, so that things don't // see changes from before the version they were handed over from. // Each Info has all of the changes between that source version and the // next source version, and they'll be merged together later after // releasing the lock - for (auto& query : new_queries) { - if (version != query->version()) { - transaction::advance_and_observe_linkviews(*m_advancer_sg, *info, query->version()); + for (auto& notifier : new_notifiers) { + if (version != notifier->version()) { + transaction::advance_and_observe_linkviews(*m_advancer_sg, *info, notifier->version()); change_info.push_back({{}, std::move(info->lists)}); info = &change_info.back(); - version = query->version(); + version = notifier->version(); } - query->attach_to(*m_advancer_sg); - query->add_required_change_info(*info); + notifier->attach_to(*m_advancer_sg); + notifier->add_required_change_info(*info); } transaction::advance_and_observe_linkviews(*m_advancer_sg, *info); - for (auto& query : new_queries) { - query->detach(); + for (auto& notifier : new_notifiers) { + notifier->detach(); } version = m_advancer_sg->get_version_of_current_transaction(); m_advancer_sg->end_read(); } - // Make a copy of the queries vector and then release the lock to avoid - // blocking other threads trying to register or unregister queries while we run them - auto queries = m_queries; + // Make a copy of the notifiers vector and then release the lock to avoid + // blocking other threads trying to register or unregister notifiers while we run them + auto notifiers = m_notifiers; lock.unlock(); - for (auto& query : queries) { - query->add_required_change_info(change_info[0]); + for (auto& notifier : notifiers) { + notifier->add_required_change_info(change_info[0]); } - transaction::advance_and_observe_linkviews(*m_query_sg, change_info[0], version); + transaction::advance_and_observe_linkviews(*m_notifier_sg, change_info[0], version); - // Attach the new queries to the main SG and move them to the main list - for (auto& query : new_queries) { - query->attach_to(*m_query_sg); + // Attach the new notifiers to the main SG and move them to the main list + for (auto& notifier : new_notifiers) { + notifier->attach_to(*m_notifier_sg); } - std::move(new_queries.begin(), new_queries.end(), std::back_inserter(queries)); + std::move(new_notifiers.begin(), new_notifiers.end(), std::back_inserter(notifiers)); for (size_t i = change_info.size() - 1; i > 1; --i) { auto& cur = change_info[i]; @@ -452,57 +452,57 @@ void RealmCoordinator::run_async_queries() } } - for (auto& query : queries) { - query->run(); + for (auto& notifier : notifiers) { + notifier->run(); } // Reacquire the lock while updating the fields that are actually read on // other threads lock.lock(); - for (auto& query : queries) { - query->prepare_handover(); + for (auto& notifier : notifiers) { + notifier->prepare_handover(); } - m_queries = std::move(queries); - clean_up_dead_queries(); + m_notifiers = std::move(notifiers); + clean_up_dead_notifiers(); } void RealmCoordinator::open_helper_shared_group() { - if (!m_query_sg) { + if (!m_notifier_sg) { try { std::unique_ptr read_only_group; - Realm::open_with_config(m_config, m_query_history, m_query_sg, read_only_group); + Realm::open_with_config(m_config, m_notifier_history, m_notifier_sg, read_only_group); REALM_ASSERT(!read_only_group); - m_query_sg->begin_read(); + m_notifier_sg->begin_read(); } catch (...) { - // Store the error to be passed to the async queries + // Store the error to be passed to the async notifiers m_async_error = std::current_exception(); - m_query_sg = nullptr; - m_query_history = nullptr; + m_notifier_sg = nullptr; + m_notifier_history = nullptr; } } - else if (m_queries.empty()) { - m_query_sg->begin_read(); + else if (m_notifiers.empty()) { + m_notifier_sg->begin_read(); } } -void RealmCoordinator::move_new_queries_to_main() +void RealmCoordinator::move_new_notifiers_to_main() { - m_queries.reserve(m_queries.size() + m_new_queries.size()); - std::move(m_new_queries.begin(), m_new_queries.end(), std::back_inserter(m_queries)); - m_new_queries.clear(); + m_notifiers.reserve(m_notifiers.size() + m_new_notifiers.size()); + std::move(m_new_notifiers.begin(), m_new_notifiers.end(), std::back_inserter(m_notifiers)); + m_new_notifiers.clear(); } void RealmCoordinator::advance_to_ready(Realm& realm) { - decltype(m_queries) queries; + decltype(m_notifiers) notifiers; auto& sg = Realm::Internal::get_shared_group(realm); - auto get_query_version = [&] { - for (auto& query : m_queries) { - auto version = query->version(); + auto get_notifier_version = [&] { + for (auto& notifier : m_notifiers) { + auto version = notifier->version(); if (version != SharedGroup::VersionID{}) { return version; } @@ -512,11 +512,11 @@ void RealmCoordinator::advance_to_ready(Realm& realm) SharedGroup::VersionID version; { - std::lock_guard lock(m_query_mutex); - version = get_query_version(); + std::lock_guard lock(m_notifier_mutex); + version = get_notifier_version(); } - // no async queries; just advance to latest + // no async notifiers; just advance to latest if (version.version == std::numeric_limits::max()) { transaction::advance(sg, realm.m_binding_context.get()); return; @@ -532,44 +532,44 @@ void RealmCoordinator::advance_to_ready(Realm& realm) // may end up calling user code (in did_change() notifications) transaction::advance(sg, realm.m_binding_context.get(), version); - // Reacquire the lock and recheck the query version, as the queries may + // Reacquire the lock and recheck the notifier version, as the notifiers may // have advanced to a later version while we didn't hold the lock. If // so, we need to release the lock and re-advance - std::lock_guard lock(m_query_mutex); - version = get_query_version(); + std::lock_guard lock(m_notifier_mutex); + version = get_notifier_version(); if (version.version == std::numeric_limits::max()) return; if (version != sg.get_version_of_current_transaction()) continue; // Query version now matches the SG version, so we can deliver them - for (auto& query : m_queries) { - if (query->deliver(sg, m_async_error)) { - queries.push_back(query); + for (auto& notifier : m_notifiers) { + if (notifier->deliver(sg, m_async_error)) { + notifiers.push_back(notifier); } } break; } - for (auto& query : queries) { - query->call_callbacks(); + for (auto& notifier : notifiers) { + notifier->call_callbacks(); } } void RealmCoordinator::process_available_async(Realm& realm) { auto& sg = Realm::Internal::get_shared_group(realm); - decltype(m_queries) queries; + decltype(m_notifiers) notifiers; { - std::lock_guard lock(m_query_mutex); - for (auto& query : m_queries) { - if (query->deliver(sg, m_async_error)) { - queries.push_back(query); + std::lock_guard lock(m_notifier_mutex); + for (auto& notifier : m_notifiers) { + if (notifier->deliver(sg, m_async_error)) { + notifiers.push_back(notifier); } } } - for (auto& query : queries) { - query->call_callbacks(); + for (auto& notifier : notifiers) { + notifier->call_callbacks(); } } diff --git a/src/impl/realm_coordinator.hpp b/src/impl/realm_coordinator.hpp index 905c17b8..bd404e05 100644 --- a/src/impl/realm_coordinator.hpp +++ b/src/impl/realm_coordinator.hpp @@ -102,7 +102,7 @@ public: // Update the schema in the cached config void update_schema(Schema const& new_schema); - static void register_query(std::shared_ptr query); + static void register_notifier(std::shared_ptr notifier); // Advance the Realm to the most recent transaction version which all async // work is complete for @@ -115,32 +115,32 @@ private: std::mutex m_realm_mutex; std::vector m_weak_realm_notifiers; - std::mutex m_query_mutex; - std::vector> m_new_queries; - std::vector> m_queries; + std::mutex m_notifier_mutex; + std::vector> m_new_notifiers; + std::vector> m_notifiers; - // SharedGroup used for actually running async queries - // Will have a read transaction iff m_queries is non-empty - std::unique_ptr m_query_history; - std::unique_ptr m_query_sg; + // SharedGroup used for actually running async notifiers + // Will have a read transaction iff m_notifiers is non-empty + std::unique_ptr m_notifier_history; + std::unique_ptr m_notifier_sg; - // SharedGroup used to advance queries in m_new_queries to the main shared + // SharedGroup used to advance notifiers in m_new_notifiers to the main shared // group's transaction version - // Will have a read transaction iff m_new_queries is non-empty + // Will have a read transaction iff m_new_notifiers is non-empty std::unique_ptr m_advancer_history; std::unique_ptr m_advancer_sg; std::exception_ptr m_async_error; std::unique_ptr<_impl::ExternalCommitHelper> m_notifier; - // must be called with m_query_mutex locked + // must be called with m_notifier_mutex locked void pin_version(uint_fast64_t version, uint_fast32_t index); - void run_async_queries(); + void run_async_notifiers(); void open_helper_shared_group(); - void move_new_queries_to_main(); + void move_new_notifiers_to_main(); void advance_helper_shared_group_to_latest(); - void clean_up_dead_queries(); + void clean_up_dead_notifiers(); }; } // namespace _impl diff --git a/src/impl/results_notifier.cpp b/src/impl/results_notifier.cpp index 2c79fd28..b40dbefe 100644 --- a/src/impl/results_notifier.cpp +++ b/src/impl/results_notifier.cpp @@ -31,7 +31,7 @@ ResultsNotifier::ResultsNotifier(Results& target) { Query q = target.get_query(); set_table(*q.get_table()); - m_query_handover = Realm::Internal::get_shared_group(get_realm()).export_for_handover(q, MutableSourcePayload::Move); + m_query_handover = Realm::Internal::get_shared_group(*get_realm()).export_for_handover(q, MutableSourcePayload::Move); } void ResultsNotifier::release_data() noexcept @@ -39,30 +39,6 @@ void ResultsNotifier::release_data() noexcept m_query = nullptr; } -// Most of the inter-thread synchronization for run(), prepare_handover(), -// attach_to(), detach(), release_query() and deliver() is done by -// RealmCoordinator external to this code, which has some potentially -// non-obvious results on which members are and are not safe to use without -// holding a lock. -// -// attach_to(), detach(), run(), prepare_handover(), and release_query() are -// all only ever called on a single thread. call_callbacks() and deliver() are -// called on the same thread. Calls to prepare_handover() and deliver() are -// guarded by a lock. -// -// In total, this means that the safe data flow is as follows: -// - prepare_handover(), attach_to(), detach() and release_query() can read -// members written by each other -// - deliver() can read members written to in prepare_handover(), deliver(), -// and call_callbacks() -// - call_callbacks() and read members written to in deliver() -// -// Separately from this data flow for the query results, all uses of -// m_target_results, m_callbacks, and m_callback_index must be done with the -// appropriate mutex held to avoid race conditions when the Results object is -// destroyed while the background work is running, and to allow removing -// callbacks from any thread. - static bool map_moves(size_t& idx, CollectionChangeIndices const& changes) { for (auto&& move : changes.moves) { @@ -74,6 +50,27 @@ static bool map_moves(size_t& idx, CollectionChangeIndices const& changes) return false; } +// Most of the inter-thread synchronization for run(), prepare_handover(), +// attach_to(), detach(), release_data() and deliver() is done by +// RealmCoordinator external to this code, which has some potentially +// non-obvious results on which members are and are not safe to use without +// holding a lock. +// +// add_required_change_info(), attach_to(), detach(), run(), +// prepare_handover(), and release_data() are all only ever called on a single +// background worker thread. call_callbacks() and deliver() are called on the +// target thread. Calls to prepare_handover() and deliver() are guarded by a +// lock. +// +// In total, this means that the safe data flow is as follows: +// - add_Required_change_info(), prepare_handover(), attach_to(), detach() and +// release_data() can read members written by each other +// - deliver() can read members written to in prepare_handover(), deliver(), +// and call_callbacks() +// - call_callbacks() and read members written to in deliver() +// +// Separately from the handover data flow, m_target_results is guarded by the target lock + void ResultsNotifier::do_add_required_change_info(TransactionChangeInfo& info) { REALM_ASSERT(m_query); @@ -83,24 +80,21 @@ void ResultsNotifier::do_add_required_change_info(TransactionChangeInfo& info) void ResultsNotifier::run() { REALM_ASSERT(m_info); + REALM_ASSERT(!m_tv.is_attached()); { - std::lock_guard target_lock(m_target_mutex); + auto lock = lock_target(); // Don't run the query if the results aren't actually going to be used - if (!m_target_results || (!have_callbacks() && !m_target_results->wants_background_updates())) { + if (!get_realm() || (!have_callbacks() && !m_target_results->wants_background_updates())) { return; } } - REALM_ASSERT(!m_tv.is_attached()); - - size_t table_ndx = m_query->get_table()->get_index_in_group(); - // If we've run previously, check if we need to rerun if (m_initial_run_complete) { // Make an empty tableview from the query to get the table version, since // Query doesn't expose it - if (m_query->find_all(0, 0, 0).sync_if_needed() == m_handed_over_table_version) { + if (m_query->find_all(0, 0, 0).sync_if_needed() == m_last_seen_version) { return; } } @@ -109,7 +103,9 @@ void ResultsNotifier::run() if (m_sort) { m_tv.sort(m_sort.column_indices, m_sort.ascending); } + m_last_seen_version = m_tv.sync_if_needed(); + size_t table_ndx = m_query->get_table()->get_index_in_group(); if (m_initial_run_complete) { auto changes = table_ndx < m_info->tables.size() ? &m_info->tables[table_ndx] : nullptr; @@ -156,7 +152,6 @@ void ResultsNotifier::do_prepare_handover(SharedGroup& sg) REALM_ASSERT(m_tv.is_in_sync()); m_initial_run_complete = true; - m_handed_over_table_version = m_tv.sync_if_needed(); m_tv_handover = sg.export_for_handover(m_tv, MutableSourcePayload::Move); add_changes(std::move(m_changes)); @@ -169,7 +164,7 @@ void ResultsNotifier::do_prepare_handover(SharedGroup& sg) bool ResultsNotifier::do_deliver(SharedGroup& sg) { - std::lock_guard target_lock(m_target_mutex); + auto lock = lock_target(); // Target results being null here indicates that it was destroyed while we // were in the process of advancing the Realm version and preparing for diff --git a/src/impl/results_notifier.hpp b/src/impl/results_notifier.hpp index 32ab6963..1e88ed9f 100644 --- a/src/impl/results_notifier.hpp +++ b/src/impl/results_notifier.hpp @@ -24,12 +24,8 @@ #include -#include #include #include -#include -#include -#include namespace realm { namespace _impl { @@ -40,24 +36,8 @@ public: ResultsNotifier(Results& target); private: - // Run/rerun the query if needed - void run() override; - // Prepare the handover object if run() did update the TableView - void do_prepare_handover(SharedGroup&) override; - // Update the target results from the handover - // Returns if any callbacks need to be invoked - bool do_deliver(SharedGroup& sg) override; - - void do_add_required_change_info(TransactionChangeInfo& info) override; - - void release_data() noexcept override; - void do_attach_to(SharedGroup& sg) override; - void do_detach_from(SharedGroup& sg) override; - - bool should_deliver_initial() const noexcept override { return true; } - - // Target Results to update and a mutex which guards it - mutable std::mutex m_target_mutex; + // Target Results to update + // Can only be used with lock_target() held Results* m_target_results; const SortOrder m_sort; @@ -71,14 +51,31 @@ private: TableView m_tv; std::unique_ptr> m_tv_handover; + // The table version from the last time the query was run. Used to avoid + // rerunning the query when there's no chance of it changing. + uint_fast64_t m_last_seen_version = -1; + + // The rows from the previous run of the query, for calculating diffs + std::vector m_previous_rows; + + // The changeset calculated during run() and delivered in do_prepare_handover() CollectionChangeIndices m_changes; TransactionChangeInfo* m_info = nullptr; - uint_fast64_t m_handed_over_table_version = -1; - - std::vector m_previous_rows; - + // Flag for whether or not the query has been run at all, as goofy timing + // can lead to deliver() being called before that bool m_initial_run_complete = false; + + void run() override; + void do_prepare_handover(SharedGroup&) override; + bool do_deliver(SharedGroup& sg) override; + void do_add_required_change_info(TransactionChangeInfo& info) override; + + void release_data() noexcept override; + void do_attach_to(SharedGroup& sg) override; + void do_detach_from(SharedGroup& sg) override; + + bool should_deliver_initial() const noexcept override { return true; } }; } // namespace _impl diff --git a/src/list.cpp b/src/list.cpp index 0d3b9f2e..b4b59aa6 100644 --- a/src/list.cpp +++ b/src/list.cpp @@ -176,7 +176,7 @@ NotificationToken List::add_notification_callback(CollectionChangeCallback cb) verify_attached(); if (!m_notifier) { m_notifier = std::make_shared(m_link_view, m_realm); - RealmCoordinator::register_query(m_notifier); + RealmCoordinator::register_notifier(m_notifier); } return {m_notifier, m_notifier->add_callback(std::move(cb))}; } diff --git a/src/results.cpp b/src/results.cpp index 8a1a6129..acf79aad 100644 --- a/src/results.cpp +++ b/src/results.cpp @@ -59,8 +59,8 @@ Results::Results(SharedRealm r, const ObjectSchema &o, Table& table) Results::~Results() { - if (m_background_query) { - m_background_query->unregister(); + if (m_notifier) { + m_notifier->unregister(); } } @@ -181,9 +181,9 @@ void Results::update_tableview() if (!m_live) { return; } - if (!m_background_query && !m_realm->is_in_transaction() && m_realm->can_deliver_notifications()) { - m_background_query = std::make_shared<_impl::ResultsNotifier>(*this); - _impl::RealmCoordinator::register_query(m_background_query); + if (!m_notifier && !m_realm->is_in_transaction() && m_realm->can_deliver_notifications()) { + m_notifier = std::make_shared<_impl::ResultsNotifier>(*this); + _impl::RealmCoordinator::register_notifier(m_notifier); } m_has_used_table_view = true; m_table_view.sync_if_needed(); @@ -369,9 +369,9 @@ void Results::prepare_async() throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction"); } - if (!m_background_query) { - m_background_query = std::make_shared<_impl::ResultsNotifier>(*this); - _impl::RealmCoordinator::register_query(m_background_query); + if (!m_notifier) { + m_notifier = std::make_shared<_impl::ResultsNotifier>(*this); + _impl::RealmCoordinator::register_notifier(m_notifier); } } @@ -379,13 +379,13 @@ NotificationToken Results::async(std::function target { prepare_async(); auto wrap = [=](CollectionChangeIndices, std::exception_ptr e) { target(e); }; - return {m_background_query, m_background_query->add_callback(wrap)}; + return {m_notifier, m_notifier->add_callback(wrap)}; } NotificationToken Results::add_notification_callback(CollectionChangeCallback cb) { prepare_async(); - return {m_background_query, m_background_query->add_callback(std::move(cb))}; + return {m_notifier, m_notifier->add_callback(std::move(cb))}; } void Results::Internal::set_table_view(Results& results, realm::TableView &&tv) diff --git a/src/results.hpp b/src/results.hpp index 0051df47..7c6f53cf 100644 --- a/src/results.hpp +++ b/src/results.hpp @@ -197,7 +197,7 @@ private: SortOrder m_sort; bool m_live = true; - std::shared_ptr<_impl::ResultsNotifier> m_background_query; + std::shared_ptr<_impl::ResultsNotifier> m_notifier; Mode m_mode = Mode::Empty; bool m_has_used_table_view = false;