diff --git a/src/impl/async_query.cpp b/src/impl/async_query.cpp index 5f131941..d3dd4787 100644 --- a/src/impl/async_query.cpp +++ b/src/impl/async_query.cpp @@ -19,59 +19,74 @@ #include "impl/async_query.hpp" #include "impl/realm_coordinator.hpp" +#include "results.hpp" using namespace realm; using namespace realm::_impl; -AsyncQuery::AsyncQuery(SortOrder sort, - std::unique_ptr> handover, - std::unique_ptr callback, - RealmCoordinator& parent) -: parent(parent.shared_from_this()) -, m_sort(std::move(sort)) -, m_query_handover(std::move(handover)) -, m_callback(std::move(callback)) +AsyncQuery::AsyncQuery(Results& target) +: m_target_results(&target) +, m_realm(target.get_realm()) +, m_sort(target.get_sort()) +, m_version(m_realm->m_shared_group->get_version_of_current_transaction()) { + Query q = target.get_query(); + m_query_handover = m_realm->m_shared_group->export_for_handover(q, MutableSourcePayload::Move); } -void AsyncQuery::get_results(const SharedRealm& realm, SharedGroup& sg, std::vector>& ret) +size_t AsyncQuery::add_callback(std::function callback) { - if (!m_callback->is_for_current_thread()) { - return; - } + std::lock_guard lock(m_callback_mutex); - if (m_error) { - ret.emplace_back([self = shared_from_this()] { - self->m_callback->error(self->m_error); - RealmCoordinator::unregister_query(*self); - }); - return; - } - - if (!m_tv_handover) { - return; - } - if (m_tv_handover->version < sg.get_version_of_current_transaction()) { - // async results are stale; ignore - return; - } -// auto r = Results(realm, -// m_sort, -// std::move(*sg.import_from_handover(std::move(m_tv_handover)))); - Results r; - auto version = sg.get_version_of_current_transaction(); - ret.emplace_back([r = std::move(r), version, &sg, self = shared_from_this()] { - if (sg.get_version_of_current_transaction() == version) { - self->m_callback->deliver(std::move(r)); + size_t token = 0; + for (auto& callback : m_callbacks) { + if (token <= callback.token) { + token = callback.token + 1; } - }); + } + return token; } -void AsyncQuery::prepare_update() +void AsyncQuery::remove_callback(size_t token) { - // This function must not touch m_tv_handover as it is called without the - // relevant lock held (so that another thread can consume m_tv_handover - // while this is running) + std::lock_guard lock(m_callback_mutex); + if (is_for_current_thread() && m_calling_callbacks) { + // Schedule the removal for after we're done calling callbacks + m_callbacks_to_remove.push_back(token); + return; + } + do_remove_callback(token); +} + +void AsyncQuery::do_remove_callback(size_t token) noexcept +{ + REALM_ASSERT(m_error || m_callbacks.size() > 0); + auto it = find_if(begin(m_callbacks), end(m_callbacks), + [=](const auto& c) { return c.token == token; }); + // We should only fail to find the callback if it was removed due to an error + REALM_ASSERT(m_error || it != end(m_callbacks)); + + if (it != end(m_callbacks)) { + if (it != prev(end(m_callbacks))) { + *it = std::move(m_callbacks.back()); + } + m_callbacks.pop_back(); + } +} + +void AsyncQuery::unregister() noexcept +{ + std::lock_guard lock(m_target_mutex); + RealmCoordinator::unregister_query(*this); + m_target_results = nullptr; + m_realm = nullptr; +} + +void AsyncQuery::run() +{ + // This function must not touch any members touched in deliver(), as they + // may be called concurrently (as it'd be pretty bad for a running query to + // block the main thread trying to pick up the previous results) REALM_ASSERT(m_sg); @@ -80,6 +95,7 @@ void AsyncQuery::prepare_update() } else { m_tv = m_query->find_all(); + m_query = nullptr; if (m_sort) { m_tv.sort(m_sort.columnIndices, m_sort.ascending); } @@ -89,33 +105,103 @@ void AsyncQuery::prepare_update() void AsyncQuery::prepare_handover() { + std::lock_guard lock(m_callback_mutex); + + REALM_ASSERT(m_tv.is_in_sync()); + + m_version = m_sg->get_version_of_current_transaction(); + m_initial_run_complete = true; + // Even if the TV didn't change, we need to re-export it if the previous // export has not been consumed yet, as the old handover object is no longer // usable due to the version not matching - if (m_did_update || (m_tv_handover && m_tv_handover->version != m_sg->get_version_of_current_transaction())) { + if (m_did_update || (m_tv_handover && m_tv_handover->version != m_version)) { m_tv_handover = m_sg->export_for_handover(m_tv, ConstSourcePayload::Copy); } } -void AsyncQuery::set_error(std::exception_ptr err) +void AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err) { - if (!m_error) { - m_error = err; + if (!is_for_current_thread()) { + return; } -} -SharedGroup::VersionID AsyncQuery::version() const noexcept -{ - if (m_tv_handover) - return m_tv_handover->version; - if (m_query_handover) - return m_query_handover->version; - return SharedGroup::VersionID{}; + std::lock_guard callback_lock(m_callback_mutex); + std::lock_guard target_lock(m_target_mutex); + + // Target results being null here indicates that it was destroyed while we + // were in the process of advancing the Realm version and preparing for + // delivery, i.e. it was destroyed from the "wrong" thread + if (!m_target_results) { + return; + } + + // We can get called before the query has actually had the chance to run if + // we're added immediately before a different set of async results are + // delivered + if (!m_initial_run_complete && !err) { + return; + } + + // Tell remove_callback() to defer actually removing them, so that calling it + // in the callback block works + m_calling_callbacks = true; + + if (err) { + m_error = true; + for (auto& callback : m_callbacks) { + callback.fn(err); + } + + // Remove all the callbacks as we never need to call anything ever again + // after delivering an error + m_callbacks.clear(); + m_callbacks_to_remove.clear(); + m_calling_callbacks = false; + return; + } + + REALM_ASSERT(!m_query_handover); + + auto realm_version = m_realm->m_shared_group->get_version_of_current_transaction(); + if (m_version != realm_version) { + // Realm version can be newer if a commit was made on our thread or the + // user manually called refresh(), or older if a commit was made on a + // different thread and we ran *really* fast in between the check for + // if the shared group has changed and when we pick up async results + return; + } + + // Cannot use m_did_update here as it is used unlocked in run() + bool did_update = false; + if (m_tv_handover) { + Results::AsyncFriend::set_table_view(*m_target_results, + std::move(*sg.import_from_handover(std::move(m_tv_handover)))); + + did_update = true; + } + REALM_ASSERT(!m_tv_handover); + + for (auto& callback : m_callbacks) { + if (did_update || callback.first_run) { + callback.fn(nullptr); + callback.first_run = false; + } + } + + m_calling_callbacks = false; + + // Actually remove any callbacks whose removal was requested during iteration + for (auto token : m_callbacks_to_remove) { + do_remove_callback(token); + } + m_callbacks_to_remove.clear(); } void AsyncQuery::attach_to(realm::SharedGroup& sg) { REALM_ASSERT(!m_sg); + REALM_ASSERT(m_query_handover); m_query = sg.import_from_handover(std::move(m_query_handover)); m_sg = &sg; @@ -124,7 +210,10 @@ void AsyncQuery::attach_to(realm::SharedGroup& sg) void AsyncQuery::detatch() { REALM_ASSERT(m_sg); + REALM_ASSERT(m_query); + REALM_ASSERT(!m_tv.is_attached()); m_query_handover = m_sg->export_for_handover(*m_query, MutableSourcePayload::Move); m_sg = nullptr; + m_query = nullptr; } diff --git a/src/impl/async_query.hpp b/src/impl/async_query.hpp index 1671e646..04f350a9 100644 --- a/src/impl/async_query.hpp +++ b/src/impl/async_query.hpp @@ -23,50 +23,88 @@ #include +#include +#include #include +#include +#include namespace realm { namespace _impl { -class AsyncQuery : public std::enable_shared_from_this { +class AsyncQuery { public: - AsyncQuery(SortOrder sort, - std::unique_ptr> handover, - std::unique_ptr callback, - RealmCoordinator& parent); + AsyncQuery(Results& target); - void get_results(const SharedRealm& realm, SharedGroup& sg, std::vector>& ret); + size_t add_callback(std::function); + void remove_callback(size_t token); - void set_error(std::exception_ptr err); + void unregister() noexcept; // Run/rerun the query if needed - void prepare_update(); - // Update the handover object with the new data produced in prepare_update() + void run(); + // Prepare the handover object if run() did update the TableView void prepare_handover(); + // Update the target results from the handover and call callbacks + void deliver(SharedGroup& sg, std::exception_ptr err); - // Get the version of the current handover object - SharedGroup::VersionID version() const noexcept; - + // Attach the handed-over query to `sg` void attach_to(SharedGroup& sg); + // Create a new query handover object and stop using the previously attached + // SharedGroup void detatch(); - std::shared_ptr parent; + Realm& get_realm() { return *m_target_results->get_realm(); } + // Get the version of the current handover object + SharedGroup::VersionID version() const noexcept { return m_version; } private: - const SortOrder m_sort; + // Target Results to update and a mutex which guards it + std::mutex m_target_mutex; + Results* m_target_results; + std::shared_ptr m_realm; + const SortOrder m_sort; + const std::thread::id m_thread_id = std::this_thread::get_id(); + + // The source Query, in handover from iff m_sg is null + // Only used until the first time the query is actually run, after which + // both will be null std::unique_ptr> m_query_handover; std::unique_ptr m_query; - std::unique_ptr> m_tv_handover; + // The TableView resulting from running the query. Will be detached if the + // Query has not yet been run, in which case m_query or m_query_handover will + // be non-null TableView m_tv; + std::unique_ptr> m_tv_handover; + SharedGroup::VersionID m_version; - const std::unique_ptr m_callback; + struct Callback { + std::function fn; + std::unique_ptr> handover; + size_t token; + bool first_run; + }; + + // Currently registered callbacks and a mutex which must always be held + // while doing anything with them + std::mutex m_callback_mutex; + std::vector m_callbacks; + + // Callbacks which the user has asked to have removed whose removal has been + // deferred until after we're done looping over m_callbacks + std::vector m_callbacks_to_remove; SharedGroup* m_sg = nullptr; - std::exception_ptr m_error; - bool m_did_update = false; + bool m_initial_run_complete = false; + bool m_calling_callbacks = false; + bool m_error = false; + + void do_remove_callback(size_t token) noexcept; + + bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); } }; } // namespace _impl diff --git a/src/impl/realm_coordinator.cpp b/src/impl/realm_coordinator.cpp index 8c0dbc71..1644465f 100644 --- a/src/impl/realm_coordinator.cpp +++ b/src/impl/realm_coordinator.cpp @@ -239,48 +239,25 @@ void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index) } } -AsyncQueryCancelationToken RealmCoordinator::register_query(const Results& r, std::unique_ptr target) +void RealmCoordinator::register_query(std::shared_ptr query) { - return r.get_realm()->m_coordinator->do_register_query(r, std::move(target)); -} - -AsyncQueryCancelationToken RealmCoordinator::do_register_query(const Results& r, std::unique_ptr target) -{ - if (m_config.read_only) { - throw InvalidTransactionException("Cannot create asynchronous query for read-only Realms"); - } - if (r.get_realm()->is_in_transaction()) { - throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction"); - } - - auto handover = r.get_realm()->m_shared_group->export_for_handover(r.get_query(), ConstSourcePayload::Copy); - auto version = handover->version; - auto query = std::make_shared(r.get_sort(), - std::move(handover), - std::move(target), - *this); - + auto version = query->version(); + auto& self = *query->get_realm().m_coordinator; { - std::lock_guard lock(m_query_mutex); - pin_version(version.version, version.index); - m_new_queries.push_back(query); + std::lock_guard lock(self.m_query_mutex); + self.pin_version(version.version, version.index); + self.m_new_queries.push_back(std::move(query)); } // Wake up the background worker threads by pretending we made a commit - m_notifier->notify_others(); - return query; + self.m_notifier->notify_others(); } -void RealmCoordinator::unregister_query(AsyncQuery& registration) -{ - registration.parent->do_unregister_query(registration); -} - -void RealmCoordinator::do_unregister_query(AsyncQuery& registration) +void RealmCoordinator::unregister_query(AsyncQuery& query) { auto swap_remove = [&](auto& container) { auto it = std::find_if(container.begin(), container.end(), - [&](auto const& ptr) { return ptr.get() == ®istration; }); + [&](auto const& ptr) { return ptr.get() == &query; }); if (it != container.end()) { std::iter_swap(--container.end(), it); container.pop_back(); @@ -289,18 +266,19 @@ void RealmCoordinator::do_unregister_query(AsyncQuery& registration) return false; }; - std::lock_guard lock(m_query_mutex); - if (swap_remove(m_queries)) { + auto& self = *query.get_realm().m_coordinator; + std::lock_guard lock(self.m_query_mutex); + if (swap_remove(self.m_queries)) { // Make sure we aren't holding on to read versions needlessly if there // are no queries left, but don't close them entirely as opening shared // groups is expensive - if (!m_running_queries && m_queries.empty() && m_query_sg) { - m_query_sg->end_read(); + if (!self.m_running_queries && self.m_queries.empty() && self.m_query_sg) { + self.m_query_sg->end_read(); } } - else if (swap_remove(m_new_queries)) { - if (m_new_queries.empty() && m_advancer_sg) { - m_advancer_sg->end_read(); + else if (swap_remove(self.m_new_queries)) { + if (self.m_new_queries.empty() && self.m_advancer_sg) { + self.m_advancer_sg->end_read(); } } } @@ -329,9 +307,6 @@ void RealmCoordinator::run_async_queries() if (m_async_error) { move_new_queries_to_main(); - for (auto& query : m_queries) { - query->set_error(m_async_error); - } return; } @@ -347,14 +322,18 @@ void RealmCoordinator::run_async_queries() lock.unlock(); for (auto& query : queries_to_run) { - query->prepare_update(); + query->run(); } // Reacquire the lock while updating the fields that are actually read on // other threads - lock.lock(); - for (auto& query : queries_to_run) { - query->prepare_handover(); + { + // Make sure we don't change the version while another thread is delivering + std::lock_guard version_lock(m_query_version_mutex); + lock.lock(); + for (auto& query : queries_to_run) { + query->prepare_handover(); + } } // Check if all queries were removed while we were running them, as if so @@ -431,8 +410,9 @@ void RealmCoordinator::advance_helper_shared_group_to_latest() void RealmCoordinator::advance_to_ready(Realm& realm) { - std::vector> async_results; + decltype(m_queries) queries; + std::lock_guard lock(m_query_version_mutex); { std::lock_guard lock(m_query_mutex); @@ -455,29 +435,24 @@ void RealmCoordinator::advance_to_ready(Realm& realm) } transaction::advance(*realm.m_shared_group, *realm.m_history, realm.m_binding_context.get(), version); - - for (auto& query : m_queries) { - query->get_results(realm.shared_from_this(), *realm.m_shared_group, async_results); - } + queries = m_queries; } - for (auto& results : async_results) { - results(); + for (auto& query : queries) { + query->deliver(*realm.m_shared_group, m_async_error); } } void RealmCoordinator::process_available_async(Realm& realm) { - std::vector> async_results; - + decltype(m_queries) queries; { std::lock_guard lock(m_query_mutex); - for (auto& query : m_queries) { - query->get_results(realm.shared_from_this(), *realm.m_shared_group, async_results); - } + queries = m_queries; } - for (auto& results : async_results) { - results(); + std::lock_guard lock(m_query_version_mutex); + for (auto& query : queries) { + query->deliver(*realm.m_shared_group, m_async_error); } } diff --git a/src/impl/realm_coordinator.hpp b/src/impl/realm_coordinator.hpp index 0c8b120b..2a35cd01 100644 --- a/src/impl/realm_coordinator.hpp +++ b/src/impl/realm_coordinator.hpp @@ -80,8 +80,8 @@ public: // Update the schema in the cached config void update_schema(Schema const& new_schema); - static AsyncQueryCancelationToken register_query(const Results& r, std::unique_ptr); - static void unregister_query(AsyncQuery& registration); + static void register_query(std::shared_ptr query); + static void unregister_query(AsyncQuery& query); // Advance the Realm to the most recent transaction version which all async // work is complete for @@ -95,6 +95,7 @@ private: std::vector m_cached_realms; std::mutex m_query_mutex; + std::mutex m_query_version_mutex; bool m_running_queries = false; std::vector> m_new_queries; std::vector> m_queries; @@ -113,9 +114,6 @@ private: std::unique_ptr<_impl::ExternalCommitHelper> m_notifier; - AsyncQueryCancelationToken do_register_query(const Results& r, std::unique_ptr); - void do_unregister_query(AsyncQuery& registration); - // must be called with m_query_mutex locked void pin_version(uint_fast64_t version, uint_fast32_t index); diff --git a/src/results.cpp b/src/results.cpp index b3f59945..f19afe1d 100644 --- a/src/results.cpp +++ b/src/results.cpp @@ -18,8 +18,9 @@ #include "results.hpp" -#include "object_store.hpp" +#include "impl/async_query.hpp" #include "impl/realm_coordinator.hpp" +#include "object_store.hpp" #include @@ -55,6 +56,13 @@ Results::Results(SharedRealm r, const ObjectSchema &o, Table& table) { } +Results::~Results() +{ + if (m_background_query) { + m_background_query->unregister(); + } +} + void Results::validate_read() const { if (m_realm) @@ -344,6 +352,29 @@ Results Results::filter(Query&& q) const return Results(m_realm, get_object_schema(), get_query().and_query(std::move(q)), get_sort()); } +AsyncQueryCancelationToken Results::async(std::function target) +{ + if (m_realm->config().read_only) { + throw InvalidTransactionException("Cannot create asynchronous query for read-only Realms"); + } + if (m_realm->is_in_transaction()) { + throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction"); + } + + if (!m_background_query) { + m_background_query = std::make_shared<_impl::AsyncQuery>(*this); + _impl::RealmCoordinator::register_query(m_background_query); + } + return {m_background_query, m_background_query->add_callback(std::move(target))}; +} + +void Results::AsyncFriend::set_table_view(Results& results, realm::TableView &&tv) +{ + results.m_table_view = std::move(tv); + results.m_mode = Mode::TableView; + REALM_ASSERT(results.m_table_view.is_in_sync()); +} + Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t column, const Table* table) : std::runtime_error((std::string)"Operation not supported on '" + table->get_column_name(column).data() + "' columns") , column_index(column) @@ -352,30 +383,37 @@ Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t c { } -AsyncQueryCancelationToken Results::async(std::unique_ptr target) +AsyncQueryCancelationToken::AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> query, size_t token) +: m_query(std::move(query)), m_token(token) { - return _impl::RealmCoordinator::register_query(*this, std::move(target)); } AsyncQueryCancelationToken::~AsyncQueryCancelationToken() { - if (m_registration) { - _impl::RealmCoordinator::unregister_query(*m_registration); + // m_query itself (and not just the pointed-to thing) needs to be accessed + // atomically to ensure that there are no data races when the token is + // destroyed after being modified on a different thread. + // This is needed despite the token not being thread-safe in general as + // users find it very surpringing for obj-c objects to care about what + // thread they are deallocated on. + if (auto query = std::atomic_load(&m_query)) { + query->remove_callback(m_token); } } AsyncQueryCancelationToken::AsyncQueryCancelationToken(AsyncQueryCancelationToken&& rgt) -: m_registration(std::move(rgt.m_registration)) +: m_query(std::atomic_exchange(&rgt.m_query, {})), m_token(rgt.m_token) { } AsyncQueryCancelationToken& AsyncQueryCancelationToken::operator=(realm::AsyncQueryCancelationToken&& rgt) { if (this != &rgt) { - if (m_registration) { - _impl::RealmCoordinator::unregister_query(*m_registration); + if (auto query = std::atomic_load(&m_query)) { + query->remove_callback(m_token); } - m_registration = std::move(rgt.m_registration); + std::atomic_store(&m_query, std::atomic_exchange(&rgt.m_query, {})); + m_token = rgt.m_token; } return *this; } diff --git a/src/results.hpp b/src/results.hpp index 01e0368b..8e0a6ba1 100644 --- a/src/results.hpp +++ b/src/results.hpp @@ -39,7 +39,7 @@ namespace _impl { // A token which keeps an asynchronous query alive struct AsyncQueryCancelationToken { AsyncQueryCancelationToken() = default; - AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> registration) : m_registration(std::move(registration)) { } + AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> query, size_t token); ~AsyncQueryCancelationToken(); AsyncQueryCancelationToken(AsyncQueryCancelationToken&&); @@ -49,25 +49,8 @@ struct AsyncQueryCancelationToken { AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken const&) = delete; private: - std::shared_ptr<_impl::AsyncQuery> m_registration; -}; - -// Subclass to get notifications about async query things -class AsyncQueryCallback { -public: - virtual ~AsyncQueryCallback() = default; - - // Called with the Results object generated by the query on a thread where - // is_for_current_thread() returned true - virtual void deliver(Results) = 0; - - // If an error occured while running the query on the worker thread, this is - // called with an exception on a thread where is_for_current_thread() - // returned true - virtual void error(std::exception_ptr) = 0; - - // Return whether or not this query is associated with the current thread - virtual bool is_for_current_thread() { return true; } + std::shared_ptr<_impl::AsyncQuery> m_query; + size_t m_token; }; struct SortOrder { @@ -88,6 +71,7 @@ public: Results() = default; Results(SharedRealm r, const ObjectSchema& o, Table& table); Results(SharedRealm r, const ObjectSchema& o, Query q, SortOrder s = {}); + ~Results(); // Results is copyable and moveable Results(Results const&) = default; @@ -210,7 +194,14 @@ public: // Create an async query from this Results // The query will be run on a background thread and delivered to the callback, // and then rerun after each commit (if needed) and redelivered if it changed - AsyncQueryCancelationToken async(std::unique_ptr target); + AsyncQueryCancelationToken async(std::function target); + + // Helper type to let AsyncQuery update the tableview without giving access + // to any other privates or letting anyone else do so + class AsyncFriend { + friend class _impl::AsyncQuery; + static void set_table_view(Results& results, TableView&& tv); + }; private: SharedRealm m_realm; @@ -221,6 +212,8 @@ private: SortOrder m_sort; bool m_live = true; + std::shared_ptr<_impl::AsyncQuery> m_background_query; + Mode m_mode = Mode::Empty; void validate_read() const; @@ -230,6 +223,8 @@ private: util::Optional aggregate(size_t column, bool return_none_for_empty, Int agg_int, Float agg_float, Double agg_double, DateTime agg_datetime); + + void set_table_view(TableView&& tv); }; } diff --git a/src/shared_realm.hpp b/src/shared_realm.hpp index 9d7fe302..fdf064b8 100644 --- a/src/shared_realm.hpp +++ b/src/shared_realm.hpp @@ -39,7 +39,7 @@ namespace realm { typedef std::weak_ptr WeakRealm; namespace _impl { - class ExternalCommitHelper; + class AsyncQuery; class RealmCoordinator; } @@ -136,6 +136,7 @@ namespace realm { // FIXME private Group *read_group(); + friend class _impl::AsyncQuery; friend class _impl::RealmCoordinator; };