From 3e90c3057118bb83aefd47494c832f383defc132 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Thu, 10 Dec 2015 14:00:02 -0800 Subject: [PATCH] Add more tests and fix bugs --- src/impl/async_query.cpp | 194 +++++++++++++++++++-------------- src/impl/async_query.hpp | 42 ++++--- src/impl/realm_coordinator.cpp | 75 ++++++------- src/impl/realm_coordinator.hpp | 3 +- 4 files changed, 181 insertions(+), 133 deletions(-) diff --git a/src/impl/async_query.cpp b/src/impl/async_query.cpp index 75831434..7d0e8641 100644 --- a/src/impl/async_query.cpp +++ b/src/impl/async_query.cpp @@ -28,68 +28,100 @@ AsyncQuery::AsyncQuery(Results& target) : m_target_results(&target) , m_realm(target.get_realm()) , m_sort(target.get_sort()) -, m_version(Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction()) +, m_sg_version(Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction()) { Query q = target.get_query(); m_query_handover = Realm::Internal::get_shared_group(*m_realm).export_for_handover(q, MutableSourcePayload::Move); } +AsyncQuery::~AsyncQuery() +{ + std::lock_guard lock(m_target_mutex); + m_realm = nullptr; +} + size_t AsyncQuery::add_callback(std::function callback) { - std::lock_guard lock(m_callback_mutex); + m_realm->verify_thread(); - size_t token = 0; - for (auto& callback : m_callbacks) { - if (token <= callback.token) { - token = callback.token + 1; + auto next_token = [=] { + size_t token = 0; + for (auto& callback : m_callbacks) { + if (token <= callback.token) { + token = callback.token + 1; + } } + return token; + }; + + std::lock_guard lock(m_callback_mutex); + auto token = next_token(); + m_callbacks.push_back({std::move(callback), token, -1ULL}); + if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications + Realm::Internal::get_coordinator(*m_realm).send_commit_notifications(); + m_have_callbacks = true; } return token; } void AsyncQuery::remove_callback(size_t token) { - std::lock_guard lock(m_callback_mutex); - if (is_for_current_thread() && m_calling_callbacks) { - // Schedule the removal for after we're done calling callbacks - m_callbacks_to_remove.push_back(token); - return; - } - do_remove_callback(token); -} + Callback old; + { + std::lock_guard lock(m_callback_mutex); + REALM_ASSERT(m_error || m_callbacks.size() > 0); -void AsyncQuery::do_remove_callback(size_t token) noexcept -{ - REALM_ASSERT(m_error || m_callbacks.size() > 0); - auto it = find_if(begin(m_callbacks), end(m_callbacks), - [=](const auto& c) { return c.token == token; }); - // We should only fail to find the callback if it was removed due to an error - REALM_ASSERT(m_error || it != end(m_callbacks)); - - if (it != end(m_callbacks)) { - if (it != prev(end(m_callbacks))) { - *it = std::move(m_callbacks.back()); + auto it = find_if(begin(m_callbacks), end(m_callbacks), + [=](const auto& c) { return c.token == token; }); + // We should only fail to find the callback if it was removed due to an error + REALM_ASSERT(m_error || it != end(m_callbacks)); + if (it == end(m_callbacks)) { + return; } - m_callbacks.pop_back(); + + size_t idx = distance(begin(m_callbacks), it); + if (m_callback_index != npos && m_callback_index >= idx) { + --m_callback_index; + } + + old = std::move(*it); + m_callbacks.erase(it); + + m_have_callbacks = !m_callbacks.empty(); } } void AsyncQuery::unregister() noexcept { std::lock_guard lock(m_target_mutex); - RealmCoordinator::unregister_query(*this); m_target_results = nullptr; m_realm = nullptr; } +void AsyncQuery::release_query() noexcept +{ + { + std::lock_guard lock(m_target_mutex); + REALM_ASSERT(!m_realm && !m_target_results); + } + + m_query = nullptr; +} + +bool AsyncQuery::is_alive() const noexcept +{ + std::lock_guard lock(m_target_mutex); + return m_target_results != nullptr; +} + void AsyncQuery::run() { REALM_ASSERT(m_sg); { - std::lock_guard callback_lock(m_callback_mutex); std::lock_guard target_lock(m_target_mutex); - if (!m_target_results || (m_callbacks.empty() && !m_target_results->wants_background_updates())) { + // Don't run the query if the results aren't actually going to be used + if (!m_target_results || (!m_have_callbacks && !m_target_results->wants_background_updates())) { m_skipped_running = true; return; } @@ -100,7 +132,7 @@ void AsyncQuery::run() // may be called concurrently (as it'd be pretty bad for a running query to // block the main thread trying to pick up the previous results) if (m_tv.is_attached()) { - m_did_update = m_tv.sync_if_needed(); + m_tv.sync_if_needed(); } else { m_tv = m_query->find_all(); @@ -108,107 +140,111 @@ void AsyncQuery::run() if (m_sort) { m_tv.sort(m_sort.columnIndices, m_sort.ascending); } - m_did_update = true; } } void AsyncQuery::prepare_handover() { if (m_skipped_running) { + m_sg_version = SharedGroup::VersionID{}; return; } REALM_ASSERT(m_tv.is_attached()); REALM_ASSERT(m_tv.is_in_sync()); - std::lock_guard lock(m_callback_mutex); - m_version = m_sg->get_version_of_current_transaction(); + m_sg_version = m_sg->get_version_of_current_transaction(); m_initial_run_complete = true; - // Even if the TV didn't change, we need to re-export it if the previous - // export has not been consumed yet, as the old handover object is no longer - // usable due to the version not matching - if (m_did_update || (m_tv_handover && m_tv_handover->version != m_version)) { - m_tv_handover = m_sg->export_for_handover(m_tv, ConstSourcePayload::Copy); - } -} - -void AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err) -{ - if (!is_for_current_thread()) { + auto table_version = m_tv.outside_version(); + if (!m_tv_handover && table_version == m_handed_over_table_version) { + // We've already delivered the query results since the last time the + // table changed, so no need to do anything return; } - std::lock_guard callback_lock(m_callback_mutex); + m_tv_handover = m_sg->export_for_handover(m_tv, ConstSourcePayload::Copy); + m_handed_over_table_version = table_version; +} + +bool AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err) +{ + if (!is_for_current_thread()) { + return false; + } + std::lock_guard target_lock(m_target_mutex); // Target results being null here indicates that it was destroyed while we // were in the process of advancing the Realm version and preparing for // delivery, i.e. it was destroyed from the "wrong" thread if (!m_target_results) { - return; + return false; } // We can get called before the query has actually had the chance to run if // we're added immediately before a different set of async results are // delivered if (!m_initial_run_complete && !err) { - return; + return false; } - // Tell remove_callback() to defer actually removing them, so that calling it - // in the callback block works - m_calling_callbacks = true; - if (err) { - m_error = true; - for (auto& callback : m_callbacks) { - callback.fn(err); - } - - // Remove all the callbacks as we never need to call anything ever again - // after delivering an error - m_callbacks.clear(); - m_callbacks_to_remove.clear(); - m_calling_callbacks = false; - return; + m_error = err; + return m_have_callbacks; } REALM_ASSERT(!m_query_handover); - auto realm_version = Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction(); - if (m_version != realm_version) { + auto realm_sg_version = Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction(); + if (m_sg_version != realm_sg_version) { // Realm version can be newer if a commit was made on our thread or the // user manually called refresh(), or older if a commit was made on a // different thread and we ran *really* fast in between the check for // if the shared group has changed and when we pick up async results - return; + return false; } - // Cannot use m_did_update here as it is used unlocked in run() - bool did_update = false; if (m_tv_handover) { + m_tv_handover->version = m_sg_version; Results::AsyncFriend::set_table_view(*m_target_results, std::move(*sg.import_from_handover(std::move(m_tv_handover)))); + m_delievered_table_version = m_handed_over_table_version; - did_update = true; } REALM_ASSERT(!m_tv_handover); + return m_have_callbacks; +} - for (auto& callback : m_callbacks) { - if (did_update || callback.first_run) { - callback.fn(nullptr); - callback.first_run = false; +void AsyncQuery::call_callbacks() +{ + REALM_ASSERT(is_for_current_thread()); + + while (auto fn = next_callback()) { + fn(m_error); + } + + if (m_error) { + // Remove all the callbacks as we never need to call anything ever again + // after delivering an error + std::lock_guard callback_lock(m_callback_mutex); + m_callbacks.clear(); + } +} + +std::function AsyncQuery::next_callback() +{ + std::lock_guard callback_lock(m_callback_mutex); + for (++m_callback_index; m_callback_index < m_callbacks.size(); ++m_callback_index) { + auto& callback = m_callbacks[m_callback_index]; + if (m_error || callback.delivered_version != m_delievered_table_version) { + callback.delivered_version = m_delievered_table_version; + return callback.fn; } } - m_calling_callbacks = false; - - // Actually remove any callbacks whose removal was requested during iteration - for (auto token : m_callbacks_to_remove) { - do_remove_callback(token); - } - m_callbacks_to_remove.clear(); + m_callback_index = npos; + return nullptr; } void AsyncQuery::attach_to(realm::SharedGroup& sg) diff --git a/src/impl/async_query.hpp b/src/impl/async_query.hpp index 4c3ec0a4..fef16927 100644 --- a/src/impl/async_query.hpp +++ b/src/impl/async_query.hpp @@ -34,18 +34,22 @@ namespace _impl { class AsyncQuery { public: AsyncQuery(Results& target); + ~AsyncQuery(); size_t add_callback(std::function); void remove_callback(size_t token); void unregister() noexcept; + void release_query() noexcept; // Run/rerun the query if needed void run(); // Prepare the handover object if run() did update the TableView void prepare_handover(); - // Update the target results from the handover and call callbacks - void deliver(SharedGroup& sg, std::exception_ptr err); + // Update the target results from the handover + // Returns if any callbacks need to be invoked + bool deliver(SharedGroup& sg, std::exception_ptr err); + void call_callbacks(); // Attach the handed-over query to `sg` void attach_to(SharedGroup& sg); @@ -55,11 +59,13 @@ public: Realm& get_realm() { return *m_target_results->get_realm(); } // Get the version of the current handover object - SharedGroup::VersionID version() const noexcept { return m_version; } + SharedGroup::VersionID version() const noexcept { return m_sg_version; } + + bool is_alive() const noexcept; private: // Target Results to update and a mutex which guards it - std::mutex m_target_mutex; + mutable std::mutex m_target_mutex; Results* m_target_results; std::shared_ptr m_realm; @@ -77,13 +83,13 @@ private: // be non-null TableView m_tv; std::unique_ptr> m_tv_handover; - SharedGroup::VersionID m_version; + SharedGroup::VersionID m_sg_version; + std::exception_ptr m_error; struct Callback { std::function fn; - std::unique_ptr> handover; size_t token; - bool first_run; + uint_fast64_t delivered_version; }; // Currently registered callbacks and a mutex which must always be held @@ -91,21 +97,27 @@ private: std::mutex m_callback_mutex; std::vector m_callbacks; - // Callbacks which the user has asked to have removed whose removal has been - // deferred until after we're done looping over m_callbacks - std::vector m_callbacks_to_remove; - SharedGroup* m_sg = nullptr; - bool m_did_update = false; + uint_fast64_t m_handed_over_table_version = -1; + uint_fast64_t m_delievered_table_version = -1; + + // Iteration variable for looping over callbacks + // remove_callback() updates this when needed + size_t m_callback_index = npos; + bool m_skipped_running = false; bool m_initial_run_complete = false; - bool m_calling_callbacks = false; - bool m_error = false; - void do_remove_callback(size_t token) noexcept; + // Cached value for if m_callbacks is empty, needed to avoid deadlocks in + // run() due to lock-order inversion between m_callback_mutex and m_target_mutex + // It's okay if this value is stale as at worst it'll result in us doing + // some extra work. + std::atomic m_have_callbacks = {false}; bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); } + + std::function next_callback(); }; } // namespace _impl diff --git a/src/impl/realm_coordinator.cpp b/src/impl/realm_coordinator.cpp index 29abf8ee..91259c6d 100644 --- a/src/impl/realm_coordinator.cpp +++ b/src/impl/realm_coordinator.cpp @@ -246,37 +246,40 @@ void RealmCoordinator::register_query(std::shared_ptr query) self.pin_version(version.version, version.index); self.m_new_queries.push_back(std::move(query)); } - - // Wake up the background worker threads by pretending we made a commit - self.m_notifier->notify_others(); } -void RealmCoordinator::unregister_query(AsyncQuery& query) +void RealmCoordinator::clean_up_dead_queries() { auto swap_remove = [&](auto& container) { - auto it = std::find_if(container.begin(), container.end(), - [&](auto const& ptr) { return ptr.get() == &query; }); - if (it != container.end()) { - std::iter_swap(--container.end(), it); + bool did_remove = false; + for (size_t i = 0; i < container.size(); ++i) { + if (container[i]->is_alive()) + continue; + + // Ensure the query is destroyed here even if there's lingering refs + // to the async query elsewhere + container[i]->release_query(); + + if (container.size() > i + 1) + container[i] = std::move(container.back()); container.pop_back(); - return true; + --i; + did_remove = true; } - return false; + return did_remove; }; - auto& self = Realm::Internal::get_coordinator(query.get_realm()); - std::lock_guard lock(self.m_query_mutex); - if (swap_remove(self.m_queries)) { + if (swap_remove(m_queries)) { // Make sure we aren't holding on to read versions needlessly if there // are no queries left, but don't close them entirely as opening shared // groups is expensive - if (!self.m_running_queries && self.m_queries.empty() && self.m_query_sg) { - self.m_query_sg->end_read(); + if (m_queries.empty() && m_query_sg) { + m_query_sg->end_read(); } } - else if (swap_remove(self.m_new_queries)) { - if (self.m_new_queries.empty() && self.m_advancer_sg) { - self.m_advancer_sg->end_read(); + if (swap_remove(m_new_queries)) { + if (m_new_queries.empty() && m_advancer_sg) { + m_advancer_sg->end_read(); } } } @@ -295,6 +298,8 @@ void RealmCoordinator::run_async_queries() { std::unique_lock lock(m_query_mutex); + clean_up_dead_queries(); + if (m_queries.empty() && m_new_queries.empty()) { return; } @@ -310,10 +315,6 @@ void RealmCoordinator::run_async_queries() advance_helper_shared_group_to_latest(); - // Tell other threads not to close the shared group as we need it even - // though we aren't holding the lock - m_running_queries = true; - // Make a copy of the queries vector so that we can release the lock while // we run the queries auto queries_to_run = m_queries; @@ -326,20 +327,13 @@ void RealmCoordinator::run_async_queries() // Reacquire the lock while updating the fields that are actually read on // other threads { - // Make sure we don't change the version while another thread is delivering - std::lock_guard version_lock(m_query_version_mutex); lock.lock(); for (auto& query : queries_to_run) { query->prepare_handover(); } } - // Check if all queries were removed while we were running them, as if so - // the shared group didn't get closed by do_unregister_query() - m_running_queries = false; - if (m_queries.empty()) { - m_query_sg->end_read(); - } + clean_up_dead_queries(); } void RealmCoordinator::open_helper_shared_group() @@ -413,7 +407,6 @@ void RealmCoordinator::advance_to_ready(Realm& realm) auto& sg = Realm::Internal::get_shared_group(realm); auto& history = Realm::Internal::get_history(realm); - std::lock_guard lock(m_query_version_mutex); { std::lock_guard lock(m_query_mutex); @@ -436,25 +429,33 @@ void RealmCoordinator::advance_to_ready(Realm& realm) } transaction::advance(sg, history, realm.m_binding_context.get(), version); - queries = m_queries; + + for (auto& query : m_queries) { + if (query->deliver(sg, m_async_error)) { + queries.push_back(query); + } + } } for (auto& query : queries) { - query->deliver(sg, m_async_error); + query->call_callbacks(); } } void RealmCoordinator::process_available_async(Realm& realm) { + auto& sg = Realm::Internal::get_shared_group(realm); decltype(m_queries) queries; { std::lock_guard lock(m_query_mutex); - queries = m_queries; + for (auto& query : m_queries) { + if (query->deliver(sg, m_async_error)) { + queries.push_back(query); + } + } } - auto& sg = Realm::Internal::get_shared_group(realm); - std::lock_guard lock(m_query_version_mutex); for (auto& query : queries) { - query->deliver(sg, m_async_error); + query->call_callbacks(); } } diff --git a/src/impl/realm_coordinator.hpp b/src/impl/realm_coordinator.hpp index 2a35cd01..3e153aff 100644 --- a/src/impl/realm_coordinator.hpp +++ b/src/impl/realm_coordinator.hpp @@ -81,7 +81,6 @@ public: void update_schema(Schema const& new_schema); static void register_query(std::shared_ptr query); - static void unregister_query(AsyncQuery& query); // Advance the Realm to the most recent transaction version which all async // work is complete for @@ -95,7 +94,6 @@ private: std::vector m_cached_realms; std::mutex m_query_mutex; - std::mutex m_query_version_mutex; bool m_running_queries = false; std::vector> m_new_queries; std::vector> m_queries; @@ -121,6 +119,7 @@ private: void open_helper_shared_group(); void move_new_queries_to_main(); void advance_helper_shared_group_to_latest(); + void clean_up_dead_queries(); }; } // namespace _impl