diff --git a/src/impl/realm_coordinator.cpp b/src/impl/realm_coordinator.cpp index a9866899..5e7314d3 100644 --- a/src/impl/realm_coordinator.cpp +++ b/src/impl/realm_coordinator.cpp @@ -302,84 +302,98 @@ void RealmCoordinator::on_change() } } -static SharedGroup::VersionID advance_mixed_version_notifiers(std::vector>& notifiers, - SharedGroup& sg, - std::vector& change_info) -{ - if (notifiers.empty()) - return SharedGroup::VersionID{}; - - // Sort the notifiers by their source version so that we can pull them - // all forward to the latest version in a single pass over the transaction log - std::sort(notifiers.begin(), notifiers.end(), - [](auto&& lft, auto&& rgt) { return lft->version() < rgt->version(); }); - auto version = sg.get_version_of_current_transaction(); - REALM_ASSERT(version == notifiers.front()->version()); - - // Preallocate the required amount of space in the vector so that we can - // safely give out pointers to within the vector +namespace { +class IncrementalChangeInfo { +public: + IncrementalChangeInfo(SharedGroup& sg, + std::vector>& notifiers) + : m_sg(sg) { + if (notifiers.empty()) + return; + + auto cmp = [&](auto&& lft, auto&& rgt) { + return lft->version() < rgt->version(); + }; + + // Sort the notifiers by their source version so that we can pull them + // all forward to the latest version in a single pass over the transaction log + std::sort(notifiers.begin(), notifiers.end(), cmp); + + // Preallocate the required amount of space in the vector so that we can + // safely give out pointers to within the vector size_t count = 1; for (auto it = notifiers.begin(), next = it + 1; next != notifiers.end(); ++it, ++next) { - if ((*it)->version() != (*next)->version()) + if (cmp(*it, *next)) ++count; } - change_info.reserve(count); - change_info.resize(1); + m_info.reserve(count); + m_info.resize(1); + m_current = &m_info[0]; } - TransactionChangeInfo* info = &change_info.back(); + TransactionChangeInfo& current() const { return *m_current; } - // Advance each of the new notifiers to the latest version, attaching them - // to the SG at their handover version. This requires a unique - // TransactionChangeInfo for each source version, so that things don't - // see changes from before the version they were handed over from. - // Each Info has all of the changes between that source version and the - // next source version, and they'll be merged together later after - // releasing the lock - for (auto& notifier : notifiers) { - if (version != notifier->version()) { - transaction::advance(sg, *info, notifier->version()); - change_info.push_back({{}, {}, std::move(info->lists)}); - info = &change_info.back(); - version = notifier->version(); + bool advance_incremental(SharedGroup::VersionID version) + { + if (version != m_sg.get_version_of_current_transaction()) { + transaction::advance(m_sg, *m_current, version); + // FIXME: needs to copy tables to watch? + m_info.push_back({{}, {}, std::move(m_current->lists)}); + m_current = &m_info.back(); + return true; } - notifier->attach_to(sg); - notifier->add_required_change_info(*info); + return false; } - transaction::advance(sg, *info); - - // We now need to combine the transaction change info objects so that all of - // the notifiers see the complete set of changes from their first version to - // the most recent one - for (size_t i = change_info.size() - 1; i > 0; --i) { - auto& cur = change_info[i]; - if (cur.tables.empty()) - continue; - auto& prev = change_info[i - 1]; - if (prev.tables.empty()) { - prev.tables = cur.tables; - continue; + void advance_to_final(SharedGroup::VersionID version) + { + if (!m_current) { + transaction::advance(m_sg, nullptr, version); + return; } - for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) { - prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]}); + transaction::advance(m_sg, *m_current, version); + + // We now need to combine the transaction change info objects so that all of + // the notifiers see the complete set of changes from their first version to + // the most recent one + for (size_t i = m_info.size() - 1; i > 0; --i) { + auto& cur = m_info[i]; + if (cur.tables.empty()) + continue; + auto& prev = m_info[i - 1]; + if (prev.tables.empty()) { + prev.tables = cur.tables; + continue; + } + + for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) { + prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]}); + } + prev.tables.reserve(cur.tables.size()); + while (prev.tables.size() < cur.tables.size()) { + prev.tables.push_back(cur.tables[prev.tables.size()]); + } } - prev.tables.reserve(cur.tables.size()); - while (prev.tables.size() < cur.tables.size()) { - prev.tables.push_back(cur.tables[prev.tables.size()]); + + // Copy the list change info if there's multiple LinkViews for the same LinkList + auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); }; + for (size_t i = 1; i < m_current->lists.size(); ++i) { + for (size_t j = i; j > 0; --j) { + if (id(m_current->lists[i]) == id(m_current->lists[j - 1])) { + m_current->lists[j - 1].changes->merge(CollectionChangeBuilder{*m_current->lists[i].changes}); + } + } } } - for (auto& notifier : notifiers) { - notifier->detach(); - } - version = sg.get_version_of_current_transaction(); - sg.end_read(); - - return version; -} +private: + std::vector m_info; + TransactionChangeInfo* m_current = nullptr; + SharedGroup& m_sg; +}; +} // anonymous namespace void RealmCoordinator::run_async_notifiers() { @@ -401,11 +415,35 @@ void RealmCoordinator::run_async_notifiers() return; } + SharedGroup::VersionID version; + // Advance all of the new notifiers to the most recent version, if any auto new_notifiers = std::move(m_new_notifiers); - std::vector incremental_change_info; - auto version = advance_mixed_version_notifiers(new_notifiers, *m_advancer_sg, - incremental_change_info); + IncrementalChangeInfo new_notifier_change_info(*m_advancer_sg, new_notifiers); + + if (!new_notifiers.empty()) { + REALM_ASSERT(m_advancer_sg->get_version_of_current_transaction() == new_notifiers.front()->version()); + + // Advance each of the new notifiers to the latest version, attaching them + // to the SG at their handover version. This requires a unique + // TransactionChangeInfo for each source version, so that things don't + // see changes from before the version they were handed over from. + // Each Info has all of the changes between that source version and the + // next source version, and they'll be merged together later after + // releasing the lock + for (auto& notifier : new_notifiers) { + new_notifier_change_info.advance_incremental(notifier->version()); + notifier->attach_to(*m_advancer_sg); + notifier->add_required_change_info(new_notifier_change_info.current()); + } + new_notifier_change_info.advance_to_final(SharedGroup::VersionID{}); + + for (auto& notifier : new_notifiers) { + notifier->detach(); + } + version = m_advancer_sg->get_version_of_current_transaction(); + m_advancer_sg->end_read(); + } // Make a copy of the notifiers vector and then release the lock to avoid // blocking other threads trying to register or unregister notifiers while we run them @@ -414,11 +452,11 @@ void RealmCoordinator::run_async_notifiers() // Advance the non-new notifiers to the same version as we advanced the new // ones to (or the latest if there were no new ones) - TransactionChangeInfo change_info; + IncrementalChangeInfo change_info(*m_notifier_sg, notifiers); for (auto& notifier : notifiers) { - notifier->add_required_change_info(change_info); + notifier->add_required_change_info(change_info.current()); } - transaction::advance(*m_notifier_sg, change_info, version); + change_info.advance_to_final(version); // Attach the new notifiers to the main SG and move them to the main list for (auto& notifier : new_notifiers) { @@ -426,23 +464,6 @@ void RealmCoordinator::run_async_notifiers() } std::move(new_notifiers.begin(), new_notifiers.end(), std::back_inserter(notifiers)); - // Copy the list change info if there's multiple LinkViews for the same LinkList - auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); }; - auto merge_linkview_info = [&](TransactionChangeInfo& info) { - for (size_t i = 1; i < info.lists.size(); ++i) { - for (size_t j = i; j > 0; --j) { - if (id(info.lists[i]) == id(info.lists[j - 1])) { - info.lists[j - 1].changes->merge(CollectionChangeBuilder{*info.lists[i].changes}); - } - } - } - }; - - merge_linkview_info(change_info); - for (auto& info : incremental_change_info) { - merge_linkview_info(info); - } - // Change info is now all ready, so the notifiers can now perform their // background work for (auto& notifier : notifiers) {