Comment and clean up the Notifiers/BackgroundCollection

This commit is contained in:
Thomas Goyne 2016-03-09 12:05:06 -08:00
parent 4ec1090c05
commit b920f62ca5
11 changed files with 225 additions and 195 deletions

View File

@ -32,11 +32,9 @@ BackgroundCollection::BackgroundCollection(std::shared_ptr<Realm> realm)
BackgroundCollection::~BackgroundCollection() BackgroundCollection::~BackgroundCollection()
{ {
// unregister() may have been called from a different thread than we're being // Need to do this explicitly to ensure m_realm is destroyed with the mutex
// destroyed on, so we need to synchronize access to the interesting fields // held to avoid potential double-deletion
// modified there unregister();
std::lock_guard<std::mutex> lock(m_realm_mutex);
m_realm = nullptr;
} }
size_t BackgroundCollection::add_callback(CollectionChangeCallback callback) size_t BackgroundCollection::add_callback(CollectionChangeCallback callback)
@ -102,6 +100,11 @@ bool BackgroundCollection::is_alive() const noexcept
return m_realm != nullptr; return m_realm != nullptr;
} }
std::unique_lock<std::mutex> BackgroundCollection::lock_target()
{
return std::unique_lock<std::mutex>{m_realm_mutex};
}
// Recursively add `table` and all tables it links to to `out` // Recursively add `table` and all tables it links to to `out`
static void find_relevant_tables(std::vector<size_t>& out, Table const& table) static void find_relevant_tables(std::vector<size_t>& out, Table const& table)
{ {

View File

@ -25,7 +25,6 @@
#include <mutex> #include <mutex>
#include <functional> #include <functional>
#include <set>
#include <thread> #include <thread>
namespace realm { namespace realm {
@ -34,42 +33,72 @@ class Realm;
namespace _impl { namespace _impl {
struct TransactionChangeInfo; struct TransactionChangeInfo;
// A base class for a notifier that keeps a collection up to date and/or
// generates detailed change notifications on a background thread. This manages
// most of the lifetime-management issues related to sharing an object between
// the worker thread and the collection on the target thread, along with the
// thread-safe callback collection.
class BackgroundCollection { class BackgroundCollection {
public: public:
BackgroundCollection(std::shared_ptr<Realm>); BackgroundCollection(std::shared_ptr<Realm>);
virtual ~BackgroundCollection(); virtual ~BackgroundCollection();
// ------------------------------------------------------------------------
// Public API for the collections using this to get notifications:
// Stop receiving notifications from this background worker
// This must be called in the destructor of the collection
void unregister() noexcept; void unregister() noexcept;
virtual void release_data() noexcept = 0; // Add a callback to be called each time the collection changes
// This can only be called from the target collection's thread
// Returns a token which can be passed to remove_callback()
size_t add_callback(CollectionChangeCallback callback); size_t add_callback(CollectionChangeCallback callback);
// Remove a previously added token. The token is no longer valid after
// calling this function and must not be used again. This function can be
// called from any thread.
void remove_callback(size_t token); void remove_callback(size_t token);
// ------------------------------------------------------------------------
// API for RealmCoordinator to manage running things and calling callbacks
Realm* get_realm() const noexcept { return m_realm.get(); }
// Get the SharedGroup version which this collection can attach to (if it's
// in handover mode), or can deliver to (if it's been handed over to the BG worker alredad)
SharedGroup::VersionID version() const noexcept { return m_sg_version; }
// Release references to all core types
// This is called on the worker thread to ensure that non-thread-safe things
// can be destroyed on the correct thread, even if the last reference to the
// BackgroundCollection is released on a different thread
virtual void release_data() noexcept = 0;
// Call each of the currently registered callbacks, if there have been any
// changes since the last time each of those callbacks was called
void call_callbacks(); void call_callbacks();
bool is_alive() const noexcept; bool is_alive() const noexcept;
Realm& get_realm() const noexcept { return *m_realm; } // Attach the handed-over query to `sg`. Must not be already attaged to a SharedGroup.
// Attach the handed-over query to `sg`
void attach_to(SharedGroup& sg); void attach_to(SharedGroup& sg);
// Create a new query handover object and stop using the previously attached // Create a new query handover object and stop using the previously attached
// SharedGroup // SharedGroup
void detach(); void detach();
void add_required_change_info(TransactionChangeInfo&); // Set `info` as the new ChangeInfo that will be populated by the next
// transaction advance, and register all required information in it
void add_required_change_info(TransactionChangeInfo& info);
virtual void run() { } virtual void run() { }
void prepare_handover(); void prepare_handover();
bool deliver(SharedGroup&, std::exception_ptr); bool deliver(SharedGroup&, std::exception_ptr);
// Get the version of the current handover object
SharedGroup::VersionID version() const noexcept { return m_sg_version; }
protected: protected:
bool have_callbacks() const noexcept { return m_have_callbacks; } bool have_callbacks() const noexcept { return m_have_callbacks; }
void add_changes(CollectionChangeIndices change) { m_accumulated_changes.merge(std::move(change)); } void add_changes(CollectionChangeIndices change) { m_accumulated_changes.merge(std::move(change)); }
void set_table(Table const& table); void set_table(Table const& table);
std::unique_lock<std::mutex> lock_target();
private: private:
virtual void do_attach_to(SharedGroup&) = 0; virtual void do_attach_to(SharedGroup&) = 0;

View File

@ -1,6 +1,6 @@
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
// //
// Copyright 2015 Realm Inc. // Copyright 2016 Realm Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -26,7 +26,6 @@
using namespace realm; using namespace realm;
using namespace realm::_impl; using namespace realm::_impl;
ListNotifier::ListNotifier(LinkViewRef lv, std::shared_ptr<Realm> realm) ListNotifier::ListNotifier(LinkViewRef lv, std::shared_ptr<Realm> realm)
: BackgroundCollection(std::move(realm)) : BackgroundCollection(std::move(realm))
, m_prev_size(lv->size()) , m_prev_size(lv->size())
@ -45,13 +44,12 @@ ListNotifier::ListNotifier(LinkViewRef lv, std::shared_ptr<Realm> realm)
set_table(lv->get_target_table()); set_table(lv->get_target_table());
auto& sg = Realm::Internal::get_shared_group(get_realm()); auto& sg = Realm::Internal::get_shared_group(*get_realm());
m_lv_handover = sg.export_linkview_for_handover(lv); m_lv_handover = sg.export_linkview_for_handover(lv);
} }
void ListNotifier::release_data() noexcept void ListNotifier::release_data() noexcept
{ {
// FIXME: does this need a lock?
m_lv.reset(); m_lv.reset();
} }

View File

@ -1,6 +1,6 @@
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
// //
// Copyright 2015 Realm Inc. // Copyright 2016 Realm Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -30,12 +30,20 @@ public:
ListNotifier(LinkViewRef lv, std::shared_ptr<Realm> realm); ListNotifier(LinkViewRef lv, std::shared_ptr<Realm> realm);
private: private:
// The linkview, in handover form if this has not been attached to the main
// SharedGroup yet
LinkViewRef m_lv; LinkViewRef m_lv;
std::unique_ptr<SharedGroup::Handover<LinkView>> m_lv_handover; std::unique_ptr<SharedGroup::Handover<LinkView>> m_lv_handover;
CollectionChangeIndices m_change;
// The last-seen size of the LinkView so that we can report row deletions
// when the LinkView itself is deleted
size_t m_prev_size; size_t m_prev_size;
// The column index of the LinkView
size_t m_col_ndx; size_t m_col_ndx;
std::vector<size_t> m_relevant_tables;
// The actual change, calculated in run() and delivered in prepare_handover()
CollectionChangeIndices m_change;
TransactionChangeInfo* m_info; TransactionChangeInfo* m_info;
void run() override; void run() override;

View File

@ -269,8 +269,8 @@ void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index)
m_advancer_history = nullptr; m_advancer_history = nullptr;
} }
} }
else if (m_new_queries.empty()) { else if (m_new_notifiers.empty()) {
// If this is the first query then we don't already have a read transaction // If this is the first notifier then we don't already have a read transaction
m_advancer_sg->begin_read(versionid); m_advancer_sg->begin_read(versionid);
} }
else if (versionid < m_advancer_sg->get_version_of_current_transaction()) { else if (versionid < m_advancer_sg->get_version_of_current_transaction()) {
@ -281,18 +281,18 @@ void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index)
} }
} }
void RealmCoordinator::register_query(std::shared_ptr<BackgroundCollection> query) void RealmCoordinator::register_notifier(std::shared_ptr<BackgroundCollection> notifier)
{ {
auto version = query->version(); auto version = notifier->version();
auto& self = Realm::Internal::get_coordinator(query->get_realm()); auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
{ {
std::lock_guard<std::mutex> lock(self.m_query_mutex); std::lock_guard<std::mutex> lock(self.m_notifier_mutex);
self.pin_version(version.version, version.index); self.pin_version(version.version, version.index);
self.m_new_queries.push_back(std::move(query)); self.m_new_notifiers.push_back(std::move(notifier));
} }
} }
void RealmCoordinator::clean_up_dead_queries() void RealmCoordinator::clean_up_dead_notifiers()
{ {
auto swap_remove = [&](auto& container) { auto swap_remove = [&](auto& container) {
bool did_remove = false; bool did_remove = false;
@ -300,8 +300,8 @@ void RealmCoordinator::clean_up_dead_queries()
if (container[i]->is_alive()) if (container[i]->is_alive())
continue; continue;
// Ensure the query is destroyed here even if there's lingering refs // Ensure the notifier is destroyed here even if there's lingering refs
// to the async query elsewhere // to the async notifier elsewhere
container[i]->release_data(); container[i]->release_data();
if (container.size() > i + 1) if (container.size() > i + 1)
@ -313,16 +313,16 @@ void RealmCoordinator::clean_up_dead_queries()
return did_remove; return did_remove;
}; };
if (swap_remove(m_queries)) { if (swap_remove(m_notifiers)) {
// Make sure we aren't holding on to read versions needlessly if there // Make sure we aren't holding on to read versions needlessly if there
// are no queries left, but don't close them entirely as opening shared // are no notifiers left, but don't close them entirely as opening shared
// groups is expensive // groups is expensive
if (m_queries.empty() && m_query_sg) { if (m_notifiers.empty() && m_notifier_sg) {
m_query_sg->end_read(); m_notifier_sg->end_read();
} }
} }
if (swap_remove(m_new_queries)) { if (swap_remove(m_new_notifiers)) {
if (m_new_queries.empty() && m_advancer_sg) { if (m_new_notifiers.empty() && m_advancer_sg) {
m_advancer_sg->end_read(); m_advancer_sg->end_read();
} }
} }
@ -330,7 +330,7 @@ void RealmCoordinator::clean_up_dead_queries()
void RealmCoordinator::on_change() void RealmCoordinator::on_change()
{ {
run_async_queries(); run_async_notifiers();
std::lock_guard<std::mutex> lock(m_realm_mutex); std::lock_guard<std::mutex> lock(m_realm_mutex);
for (auto& realm : m_weak_realm_notifiers) { for (auto& realm : m_weak_realm_notifiers) {
@ -338,13 +338,13 @@ void RealmCoordinator::on_change()
} }
} }
void RealmCoordinator::run_async_queries() void RealmCoordinator::run_async_notifiers()
{ {
std::unique_lock<std::mutex> lock(m_query_mutex); std::unique_lock<std::mutex> lock(m_notifier_mutex);
clean_up_dead_queries(); clean_up_dead_notifiers();
if (m_queries.empty() && m_new_queries.empty()) { if (m_notifiers.empty() && m_new_notifiers.empty()) {
return; return;
} }
@ -353,73 +353,73 @@ void RealmCoordinator::run_async_queries()
} }
if (m_async_error) { if (m_async_error) {
std::move(m_new_queries.begin(), m_new_queries.end(), std::back_inserter(m_queries)); std::move(m_new_notifiers.begin(), m_new_notifiers.end(), std::back_inserter(m_notifiers));
m_new_queries.clear(); m_new_notifiers.clear();
return; return;
} }
std::vector<TransactionChangeInfo> change_info; std::vector<TransactionChangeInfo> change_info;
SharedGroup::VersionID version; SharedGroup::VersionID version;
auto new_queries = std::move(m_new_queries); auto new_notifiers = std::move(m_new_notifiers);
if (new_queries.empty()) { if (new_notifiers.empty()) {
change_info.resize(1); change_info.resize(1);
} }
else { else {
change_info.resize(2); change_info.resize(2);
// Sort newly added queries by their source version so that we can pull them // Sort newly added notifiers by their source version so that we can pull them
// all forward to the latest version in a single pass over the transaction log // all forward to the latest version in a single pass over the transaction log
std::sort(new_queries.begin(), new_queries.end(), std::sort(new_notifiers.begin(), new_notifiers.end(),
[](auto&& lft, auto&& rgt) { return lft->version() < rgt->version(); }); [](auto&& lft, auto&& rgt) { return lft->version() < rgt->version(); });
version = m_advancer_sg->get_version_of_current_transaction(); version = m_advancer_sg->get_version_of_current_transaction();
REALM_ASSERT(version == new_queries.front()->version()); REALM_ASSERT(version == new_notifiers.front()->version());
TransactionChangeInfo* info = &change_info.back(); TransactionChangeInfo* info = &change_info.back();
// Advance each of the new queries to the latest version, attaching them // Advance each of the new notifiers to the latest version, attaching them
// to the SG at their handover version. This requires a unique // to the SG at their handover version. This requires a unique
// TransactionChangeInfo for each source version, so that things don't // TransactionChangeInfo for each source version, so that things don't
// see changes from before the version they were handed over from. // see changes from before the version they were handed over from.
// Each Info has all of the changes between that source version and the // Each Info has all of the changes between that source version and the
// next source version, and they'll be merged together later after // next source version, and they'll be merged together later after
// releasing the lock // releasing the lock
for (auto& query : new_queries) { for (auto& notifier : new_notifiers) {
if (version != query->version()) { if (version != notifier->version()) {
transaction::advance_and_observe_linkviews(*m_advancer_sg, *info, query->version()); transaction::advance_and_observe_linkviews(*m_advancer_sg, *info, notifier->version());
change_info.push_back({{}, std::move(info->lists)}); change_info.push_back({{}, std::move(info->lists)});
info = &change_info.back(); info = &change_info.back();
version = query->version(); version = notifier->version();
} }
query->attach_to(*m_advancer_sg); notifier->attach_to(*m_advancer_sg);
query->add_required_change_info(*info); notifier->add_required_change_info(*info);
} }
transaction::advance_and_observe_linkviews(*m_advancer_sg, *info); transaction::advance_and_observe_linkviews(*m_advancer_sg, *info);
for (auto& query : new_queries) { for (auto& notifier : new_notifiers) {
query->detach(); notifier->detach();
} }
version = m_advancer_sg->get_version_of_current_transaction(); version = m_advancer_sg->get_version_of_current_transaction();
m_advancer_sg->end_read(); m_advancer_sg->end_read();
} }
// Make a copy of the queries vector and then release the lock to avoid // Make a copy of the notifiers vector and then release the lock to avoid
// blocking other threads trying to register or unregister queries while we run them // blocking other threads trying to register or unregister notifiers while we run them
auto queries = m_queries; auto notifiers = m_notifiers;
lock.unlock(); lock.unlock();
for (auto& query : queries) { for (auto& notifier : notifiers) {
query->add_required_change_info(change_info[0]); notifier->add_required_change_info(change_info[0]);
} }
transaction::advance_and_observe_linkviews(*m_query_sg, change_info[0], version); transaction::advance_and_observe_linkviews(*m_notifier_sg, change_info[0], version);
// Attach the new queries to the main SG and move them to the main list // Attach the new notifiers to the main SG and move them to the main list
for (auto& query : new_queries) { for (auto& notifier : new_notifiers) {
query->attach_to(*m_query_sg); notifier->attach_to(*m_notifier_sg);
} }
std::move(new_queries.begin(), new_queries.end(), std::back_inserter(queries)); std::move(new_notifiers.begin(), new_notifiers.end(), std::back_inserter(notifiers));
for (size_t i = change_info.size() - 1; i > 1; --i) { for (size_t i = change_info.size() - 1; i > 1; --i) {
auto& cur = change_info[i]; auto& cur = change_info[i];
@ -452,57 +452,57 @@ void RealmCoordinator::run_async_queries()
} }
} }
for (auto& query : queries) { for (auto& notifier : notifiers) {
query->run(); notifier->run();
} }
// Reacquire the lock while updating the fields that are actually read on // Reacquire the lock while updating the fields that are actually read on
// other threads // other threads
lock.lock(); lock.lock();
for (auto& query : queries) { for (auto& notifier : notifiers) {
query->prepare_handover(); notifier->prepare_handover();
} }
m_queries = std::move(queries); m_notifiers = std::move(notifiers);
clean_up_dead_queries(); clean_up_dead_notifiers();
} }
void RealmCoordinator::open_helper_shared_group() void RealmCoordinator::open_helper_shared_group()
{ {
if (!m_query_sg) { if (!m_notifier_sg) {
try { try {
std::unique_ptr<Group> read_only_group; std::unique_ptr<Group> read_only_group;
Realm::open_with_config(m_config, m_query_history, m_query_sg, read_only_group); Realm::open_with_config(m_config, m_notifier_history, m_notifier_sg, read_only_group);
REALM_ASSERT(!read_only_group); REALM_ASSERT(!read_only_group);
m_query_sg->begin_read(); m_notifier_sg->begin_read();
} }
catch (...) { catch (...) {
// Store the error to be passed to the async queries // Store the error to be passed to the async notifiers
m_async_error = std::current_exception(); m_async_error = std::current_exception();
m_query_sg = nullptr; m_notifier_sg = nullptr;
m_query_history = nullptr; m_notifier_history = nullptr;
} }
} }
else if (m_queries.empty()) { else if (m_notifiers.empty()) {
m_query_sg->begin_read(); m_notifier_sg->begin_read();
} }
} }
void RealmCoordinator::move_new_queries_to_main() void RealmCoordinator::move_new_notifiers_to_main()
{ {
m_queries.reserve(m_queries.size() + m_new_queries.size()); m_notifiers.reserve(m_notifiers.size() + m_new_notifiers.size());
std::move(m_new_queries.begin(), m_new_queries.end(), std::back_inserter(m_queries)); std::move(m_new_notifiers.begin(), m_new_notifiers.end(), std::back_inserter(m_notifiers));
m_new_queries.clear(); m_new_notifiers.clear();
} }
void RealmCoordinator::advance_to_ready(Realm& realm) void RealmCoordinator::advance_to_ready(Realm& realm)
{ {
decltype(m_queries) queries; decltype(m_notifiers) notifiers;
auto& sg = Realm::Internal::get_shared_group(realm); auto& sg = Realm::Internal::get_shared_group(realm);
auto get_query_version = [&] { auto get_notifier_version = [&] {
for (auto& query : m_queries) { for (auto& notifier : m_notifiers) {
auto version = query->version(); auto version = notifier->version();
if (version != SharedGroup::VersionID{}) { if (version != SharedGroup::VersionID{}) {
return version; return version;
} }
@ -512,11 +512,11 @@ void RealmCoordinator::advance_to_ready(Realm& realm)
SharedGroup::VersionID version; SharedGroup::VersionID version;
{ {
std::lock_guard<std::mutex> lock(m_query_mutex); std::lock_guard<std::mutex> lock(m_notifier_mutex);
version = get_query_version(); version = get_notifier_version();
} }
// no async queries; just advance to latest // no async notifiers; just advance to latest
if (version.version == std::numeric_limits<uint_fast64_t>::max()) { if (version.version == std::numeric_limits<uint_fast64_t>::max()) {
transaction::advance(sg, realm.m_binding_context.get()); transaction::advance(sg, realm.m_binding_context.get());
return; return;
@ -532,44 +532,44 @@ void RealmCoordinator::advance_to_ready(Realm& realm)
// may end up calling user code (in did_change() notifications) // may end up calling user code (in did_change() notifications)
transaction::advance(sg, realm.m_binding_context.get(), version); transaction::advance(sg, realm.m_binding_context.get(), version);
// Reacquire the lock and recheck the query version, as the queries may // Reacquire the lock and recheck the notifier version, as the notifiers may
// have advanced to a later version while we didn't hold the lock. If // have advanced to a later version while we didn't hold the lock. If
// so, we need to release the lock and re-advance // so, we need to release the lock and re-advance
std::lock_guard<std::mutex> lock(m_query_mutex); std::lock_guard<std::mutex> lock(m_notifier_mutex);
version = get_query_version(); version = get_notifier_version();
if (version.version == std::numeric_limits<uint_fast64_t>::max()) if (version.version == std::numeric_limits<uint_fast64_t>::max())
return; return;
if (version != sg.get_version_of_current_transaction()) if (version != sg.get_version_of_current_transaction())
continue; continue;
// Query version now matches the SG version, so we can deliver them // Query version now matches the SG version, so we can deliver them
for (auto& query : m_queries) { for (auto& notifier : m_notifiers) {
if (query->deliver(sg, m_async_error)) { if (notifier->deliver(sg, m_async_error)) {
queries.push_back(query); notifiers.push_back(notifier);
} }
} }
break; break;
} }
for (auto& query : queries) { for (auto& notifier : notifiers) {
query->call_callbacks(); notifier->call_callbacks();
} }
} }
void RealmCoordinator::process_available_async(Realm& realm) void RealmCoordinator::process_available_async(Realm& realm)
{ {
auto& sg = Realm::Internal::get_shared_group(realm); auto& sg = Realm::Internal::get_shared_group(realm);
decltype(m_queries) queries; decltype(m_notifiers) notifiers;
{ {
std::lock_guard<std::mutex> lock(m_query_mutex); std::lock_guard<std::mutex> lock(m_notifier_mutex);
for (auto& query : m_queries) { for (auto& notifier : m_notifiers) {
if (query->deliver(sg, m_async_error)) { if (notifier->deliver(sg, m_async_error)) {
queries.push_back(query); notifiers.push_back(notifier);
} }
} }
} }
for (auto& query : queries) { for (auto& notifier : notifiers) {
query->call_callbacks(); notifier->call_callbacks();
} }
} }

View File

@ -102,7 +102,7 @@ public:
// Update the schema in the cached config // Update the schema in the cached config
void update_schema(Schema const& new_schema); void update_schema(Schema const& new_schema);
static void register_query(std::shared_ptr<BackgroundCollection> query); static void register_notifier(std::shared_ptr<BackgroundCollection> notifier);
// Advance the Realm to the most recent transaction version which all async // Advance the Realm to the most recent transaction version which all async
// work is complete for // work is complete for
@ -115,32 +115,32 @@ private:
std::mutex m_realm_mutex; std::mutex m_realm_mutex;
std::vector<WeakRealmNotifier> m_weak_realm_notifiers; std::vector<WeakRealmNotifier> m_weak_realm_notifiers;
std::mutex m_query_mutex; std::mutex m_notifier_mutex;
std::vector<std::shared_ptr<_impl::BackgroundCollection>> m_new_queries; std::vector<std::shared_ptr<_impl::BackgroundCollection>> m_new_notifiers;
std::vector<std::shared_ptr<_impl::BackgroundCollection>> m_queries; std::vector<std::shared_ptr<_impl::BackgroundCollection>> m_notifiers;
// SharedGroup used for actually running async queries // SharedGroup used for actually running async notifiers
// Will have a read transaction iff m_queries is non-empty // Will have a read transaction iff m_notifiers is non-empty
std::unique_ptr<Replication> m_query_history; std::unique_ptr<Replication> m_notifier_history;
std::unique_ptr<SharedGroup> m_query_sg; std::unique_ptr<SharedGroup> m_notifier_sg;
// SharedGroup used to advance queries in m_new_queries to the main shared // SharedGroup used to advance notifiers in m_new_notifiers to the main shared
// group's transaction version // group's transaction version
// Will have a read transaction iff m_new_queries is non-empty // Will have a read transaction iff m_new_notifiers is non-empty
std::unique_ptr<Replication> m_advancer_history; std::unique_ptr<Replication> m_advancer_history;
std::unique_ptr<SharedGroup> m_advancer_sg; std::unique_ptr<SharedGroup> m_advancer_sg;
std::exception_ptr m_async_error; std::exception_ptr m_async_error;
std::unique_ptr<_impl::ExternalCommitHelper> m_notifier; std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
// must be called with m_query_mutex locked // must be called with m_notifier_mutex locked
void pin_version(uint_fast64_t version, uint_fast32_t index); void pin_version(uint_fast64_t version, uint_fast32_t index);
void run_async_queries(); void run_async_notifiers();
void open_helper_shared_group(); void open_helper_shared_group();
void move_new_queries_to_main(); void move_new_notifiers_to_main();
void advance_helper_shared_group_to_latest(); void advance_helper_shared_group_to_latest();
void clean_up_dead_queries(); void clean_up_dead_notifiers();
}; };
} // namespace _impl } // namespace _impl

View File

@ -31,7 +31,7 @@ ResultsNotifier::ResultsNotifier(Results& target)
{ {
Query q = target.get_query(); Query q = target.get_query();
set_table(*q.get_table()); set_table(*q.get_table());
m_query_handover = Realm::Internal::get_shared_group(get_realm()).export_for_handover(q, MutableSourcePayload::Move); m_query_handover = Realm::Internal::get_shared_group(*get_realm()).export_for_handover(q, MutableSourcePayload::Move);
} }
void ResultsNotifier::release_data() noexcept void ResultsNotifier::release_data() noexcept
@ -39,30 +39,6 @@ void ResultsNotifier::release_data() noexcept
m_query = nullptr; m_query = nullptr;
} }
// Most of the inter-thread synchronization for run(), prepare_handover(),
// attach_to(), detach(), release_query() and deliver() is done by
// RealmCoordinator external to this code, which has some potentially
// non-obvious results on which members are and are not safe to use without
// holding a lock.
//
// attach_to(), detach(), run(), prepare_handover(), and release_query() are
// all only ever called on a single thread. call_callbacks() and deliver() are
// called on the same thread. Calls to prepare_handover() and deliver() are
// guarded by a lock.
//
// In total, this means that the safe data flow is as follows:
// - prepare_handover(), attach_to(), detach() and release_query() can read
// members written by each other
// - deliver() can read members written to in prepare_handover(), deliver(),
// and call_callbacks()
// - call_callbacks() and read members written to in deliver()
//
// Separately from this data flow for the query results, all uses of
// m_target_results, m_callbacks, and m_callback_index must be done with the
// appropriate mutex held to avoid race conditions when the Results object is
// destroyed while the background work is running, and to allow removing
// callbacks from any thread.
static bool map_moves(size_t& idx, CollectionChangeIndices const& changes) static bool map_moves(size_t& idx, CollectionChangeIndices const& changes)
{ {
for (auto&& move : changes.moves) { for (auto&& move : changes.moves) {
@ -74,6 +50,27 @@ static bool map_moves(size_t& idx, CollectionChangeIndices const& changes)
return false; return false;
} }
// Most of the inter-thread synchronization for run(), prepare_handover(),
// attach_to(), detach(), release_data() and deliver() is done by
// RealmCoordinator external to this code, which has some potentially
// non-obvious results on which members are and are not safe to use without
// holding a lock.
//
// add_required_change_info(), attach_to(), detach(), run(),
// prepare_handover(), and release_data() are all only ever called on a single
// background worker thread. call_callbacks() and deliver() are called on the
// target thread. Calls to prepare_handover() and deliver() are guarded by a
// lock.
//
// In total, this means that the safe data flow is as follows:
// - add_Required_change_info(), prepare_handover(), attach_to(), detach() and
// release_data() can read members written by each other
// - deliver() can read members written to in prepare_handover(), deliver(),
// and call_callbacks()
// - call_callbacks() and read members written to in deliver()
//
// Separately from the handover data flow, m_target_results is guarded by the target lock
void ResultsNotifier::do_add_required_change_info(TransactionChangeInfo& info) void ResultsNotifier::do_add_required_change_info(TransactionChangeInfo& info)
{ {
REALM_ASSERT(m_query); REALM_ASSERT(m_query);
@ -83,24 +80,21 @@ void ResultsNotifier::do_add_required_change_info(TransactionChangeInfo& info)
void ResultsNotifier::run() void ResultsNotifier::run()
{ {
REALM_ASSERT(m_info); REALM_ASSERT(m_info);
REALM_ASSERT(!m_tv.is_attached());
{ {
std::lock_guard<std::mutex> target_lock(m_target_mutex); auto lock = lock_target();
// Don't run the query if the results aren't actually going to be used // Don't run the query if the results aren't actually going to be used
if (!m_target_results || (!have_callbacks() && !m_target_results->wants_background_updates())) { if (!get_realm() || (!have_callbacks() && !m_target_results->wants_background_updates())) {
return; return;
} }
} }
REALM_ASSERT(!m_tv.is_attached());
size_t table_ndx = m_query->get_table()->get_index_in_group();
// If we've run previously, check if we need to rerun // If we've run previously, check if we need to rerun
if (m_initial_run_complete) { if (m_initial_run_complete) {
// Make an empty tableview from the query to get the table version, since // Make an empty tableview from the query to get the table version, since
// Query doesn't expose it // Query doesn't expose it
if (m_query->find_all(0, 0, 0).sync_if_needed() == m_handed_over_table_version) { if (m_query->find_all(0, 0, 0).sync_if_needed() == m_last_seen_version) {
return; return;
} }
} }
@ -109,7 +103,9 @@ void ResultsNotifier::run()
if (m_sort) { if (m_sort) {
m_tv.sort(m_sort.column_indices, m_sort.ascending); m_tv.sort(m_sort.column_indices, m_sort.ascending);
} }
m_last_seen_version = m_tv.sync_if_needed();
size_t table_ndx = m_query->get_table()->get_index_in_group();
if (m_initial_run_complete) { if (m_initial_run_complete) {
auto changes = table_ndx < m_info->tables.size() ? &m_info->tables[table_ndx] : nullptr; auto changes = table_ndx < m_info->tables.size() ? &m_info->tables[table_ndx] : nullptr;
@ -156,7 +152,6 @@ void ResultsNotifier::do_prepare_handover(SharedGroup& sg)
REALM_ASSERT(m_tv.is_in_sync()); REALM_ASSERT(m_tv.is_in_sync());
m_initial_run_complete = true; m_initial_run_complete = true;
m_handed_over_table_version = m_tv.sync_if_needed();
m_tv_handover = sg.export_for_handover(m_tv, MutableSourcePayload::Move); m_tv_handover = sg.export_for_handover(m_tv, MutableSourcePayload::Move);
add_changes(std::move(m_changes)); add_changes(std::move(m_changes));
@ -169,7 +164,7 @@ void ResultsNotifier::do_prepare_handover(SharedGroup& sg)
bool ResultsNotifier::do_deliver(SharedGroup& sg) bool ResultsNotifier::do_deliver(SharedGroup& sg)
{ {
std::lock_guard<std::mutex> target_lock(m_target_mutex); auto lock = lock_target();
// Target results being null here indicates that it was destroyed while we // Target results being null here indicates that it was destroyed while we
// were in the process of advancing the Realm version and preparing for // were in the process of advancing the Realm version and preparing for

View File

@ -24,12 +24,8 @@
#include <realm/group_shared.hpp> #include <realm/group_shared.hpp>
#include <exception>
#include <mutex> #include <mutex>
#include <functional> #include <functional>
#include <set>
#include <thread>
#include <vector>
namespace realm { namespace realm {
namespace _impl { namespace _impl {
@ -40,24 +36,8 @@ public:
ResultsNotifier(Results& target); ResultsNotifier(Results& target);
private: private:
// Run/rerun the query if needed // Target Results to update
void run() override; // Can only be used with lock_target() held
// Prepare the handover object if run() did update the TableView
void do_prepare_handover(SharedGroup&) override;
// Update the target results from the handover
// Returns if any callbacks need to be invoked
bool do_deliver(SharedGroup& sg) override;
void do_add_required_change_info(TransactionChangeInfo& info) override;
void release_data() noexcept override;
void do_attach_to(SharedGroup& sg) override;
void do_detach_from(SharedGroup& sg) override;
bool should_deliver_initial() const noexcept override { return true; }
// Target Results to update and a mutex which guards it
mutable std::mutex m_target_mutex;
Results* m_target_results; Results* m_target_results;
const SortOrder m_sort; const SortOrder m_sort;
@ -71,14 +51,31 @@ private:
TableView m_tv; TableView m_tv;
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover; std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
// The table version from the last time the query was run. Used to avoid
// rerunning the query when there's no chance of it changing.
uint_fast64_t m_last_seen_version = -1;
// The rows from the previous run of the query, for calculating diffs
std::vector<size_t> m_previous_rows;
// The changeset calculated during run() and delivered in do_prepare_handover()
CollectionChangeIndices m_changes; CollectionChangeIndices m_changes;
TransactionChangeInfo* m_info = nullptr; TransactionChangeInfo* m_info = nullptr;
uint_fast64_t m_handed_over_table_version = -1; // Flag for whether or not the query has been run at all, as goofy timing
// can lead to deliver() being called before that
std::vector<size_t> m_previous_rows;
bool m_initial_run_complete = false; bool m_initial_run_complete = false;
void run() override;
void do_prepare_handover(SharedGroup&) override;
bool do_deliver(SharedGroup& sg) override;
void do_add_required_change_info(TransactionChangeInfo& info) override;
void release_data() noexcept override;
void do_attach_to(SharedGroup& sg) override;
void do_detach_from(SharedGroup& sg) override;
bool should_deliver_initial() const noexcept override { return true; }
}; };
} // namespace _impl } // namespace _impl

View File

@ -176,7 +176,7 @@ NotificationToken List::add_notification_callback(CollectionChangeCallback cb)
verify_attached(); verify_attached();
if (!m_notifier) { if (!m_notifier) {
m_notifier = std::make_shared<ListNotifier>(m_link_view, m_realm); m_notifier = std::make_shared<ListNotifier>(m_link_view, m_realm);
RealmCoordinator::register_query(m_notifier); RealmCoordinator::register_notifier(m_notifier);
} }
return {m_notifier, m_notifier->add_callback(std::move(cb))}; return {m_notifier, m_notifier->add_callback(std::move(cb))};
} }

View File

@ -59,8 +59,8 @@ Results::Results(SharedRealm r, const ObjectSchema &o, Table& table)
Results::~Results() Results::~Results()
{ {
if (m_background_query) { if (m_notifier) {
m_background_query->unregister(); m_notifier->unregister();
} }
} }
@ -181,9 +181,9 @@ void Results::update_tableview()
if (!m_live) { if (!m_live) {
return; return;
} }
if (!m_background_query && !m_realm->is_in_transaction() && m_realm->can_deliver_notifications()) { if (!m_notifier && !m_realm->is_in_transaction() && m_realm->can_deliver_notifications()) {
m_background_query = std::make_shared<_impl::ResultsNotifier>(*this); m_notifier = std::make_shared<_impl::ResultsNotifier>(*this);
_impl::RealmCoordinator::register_query(m_background_query); _impl::RealmCoordinator::register_notifier(m_notifier);
} }
m_has_used_table_view = true; m_has_used_table_view = true;
m_table_view.sync_if_needed(); m_table_view.sync_if_needed();
@ -369,9 +369,9 @@ void Results::prepare_async()
throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction"); throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction");
} }
if (!m_background_query) { if (!m_notifier) {
m_background_query = std::make_shared<_impl::ResultsNotifier>(*this); m_notifier = std::make_shared<_impl::ResultsNotifier>(*this);
_impl::RealmCoordinator::register_query(m_background_query); _impl::RealmCoordinator::register_notifier(m_notifier);
} }
} }
@ -379,13 +379,13 @@ NotificationToken Results::async(std::function<void (std::exception_ptr)> target
{ {
prepare_async(); prepare_async();
auto wrap = [=](CollectionChangeIndices, std::exception_ptr e) { target(e); }; auto wrap = [=](CollectionChangeIndices, std::exception_ptr e) { target(e); };
return {m_background_query, m_background_query->add_callback(wrap)}; return {m_notifier, m_notifier->add_callback(wrap)};
} }
NotificationToken Results::add_notification_callback(CollectionChangeCallback cb) NotificationToken Results::add_notification_callback(CollectionChangeCallback cb)
{ {
prepare_async(); prepare_async();
return {m_background_query, m_background_query->add_callback(std::move(cb))}; return {m_notifier, m_notifier->add_callback(std::move(cb))};
} }
void Results::Internal::set_table_view(Results& results, realm::TableView &&tv) void Results::Internal::set_table_view(Results& results, realm::TableView &&tv)

View File

@ -197,7 +197,7 @@ private:
SortOrder m_sort; SortOrder m_sort;
bool m_live = true; bool m_live = true;
std::shared_ptr<_impl::ResultsNotifier> m_background_query; std::shared_ptr<_impl::ResultsNotifier> m_notifier;
Mode m_mode = Mode::Empty; Mode m_mode = Mode::Empty;
bool m_has_used_table_view = false; bool m_has_used_table_view = false;