diff --git a/src/impl/realm_coordinator.cpp b/src/impl/realm_coordinator.cpp index 1fd0cb2e..a9866899 100644 --- a/src/impl/realm_coordinator.cpp +++ b/src/impl/realm_coordinator.cpp @@ -302,6 +302,85 @@ void RealmCoordinator::on_change() } } +static SharedGroup::VersionID advance_mixed_version_notifiers(std::vector>& notifiers, + SharedGroup& sg, + std::vector& change_info) +{ + if (notifiers.empty()) + return SharedGroup::VersionID{}; + + // Sort the notifiers by their source version so that we can pull them + // all forward to the latest version in a single pass over the transaction log + std::sort(notifiers.begin(), notifiers.end(), + [](auto&& lft, auto&& rgt) { return lft->version() < rgt->version(); }); + auto version = sg.get_version_of_current_transaction(); + REALM_ASSERT(version == notifiers.front()->version()); + + // Preallocate the required amount of space in the vector so that we can + // safely give out pointers to within the vector + { + size_t count = 1; + for (auto it = notifiers.begin(), next = it + 1; next != notifiers.end(); ++it, ++next) { + if ((*it)->version() != (*next)->version()) + ++count; + } + change_info.reserve(count); + change_info.resize(1); + } + + TransactionChangeInfo* info = &change_info.back(); + + // Advance each of the new notifiers to the latest version, attaching them + // to the SG at their handover version. This requires a unique + // TransactionChangeInfo for each source version, so that things don't + // see changes from before the version they were handed over from. + // Each Info has all of the changes between that source version and the + // next source version, and they'll be merged together later after + // releasing the lock + for (auto& notifier : notifiers) { + if (version != notifier->version()) { + transaction::advance(sg, *info, notifier->version()); + change_info.push_back({{}, {}, std::move(info->lists)}); + info = &change_info.back(); + version = notifier->version(); + } + notifier->attach_to(sg); + notifier->add_required_change_info(*info); + } + + transaction::advance(sg, *info); + + // We now need to combine the transaction change info objects so that all of + // the notifiers see the complete set of changes from their first version to + // the most recent one + for (size_t i = change_info.size() - 1; i > 0; --i) { + auto& cur = change_info[i]; + if (cur.tables.empty()) + continue; + auto& prev = change_info[i - 1]; + if (prev.tables.empty()) { + prev.tables = cur.tables; + continue; + } + + for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) { + prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]}); + } + prev.tables.reserve(cur.tables.size()); + while (prev.tables.size() < cur.tables.size()) { + prev.tables.push_back(cur.tables[prev.tables.size()]); + } + } + + for (auto& notifier : notifiers) { + notifier->detach(); + } + version = sg.get_version_of_current_transaction(); + sg.end_read(); + + return version; +} + void RealmCoordinator::run_async_notifiers() { std::unique_lock lock(m_notifier_mutex); @@ -322,72 +401,24 @@ void RealmCoordinator::run_async_notifiers() return; } - std::vector change_info; - SharedGroup::VersionID version; - + // Advance all of the new notifiers to the most recent version, if any auto new_notifiers = std::move(m_new_notifiers); - if (new_notifiers.empty()) { - change_info.resize(1); - } - else { - // Sort newly added notifiers by their source version so that we can pull them - // all forward to the latest version in a single pass over the transaction log - std::sort(new_notifiers.begin(), new_notifiers.end(), - [](auto&& lft, auto&& rgt) { return lft->version() < rgt->version(); }); - version = m_advancer_sg->get_version_of_current_transaction(); - REALM_ASSERT(version == new_notifiers.front()->version()); - - // Preallocate the required amount of space in the vector so that we can - // safely give out pointers to within the vector - { - size_t count = 2; - for (auto it = new_notifiers.begin(), next = it + 1; next != new_notifiers.end(); ++it, ++next) { - if ((*it)->version() != (*next)->version()) - ++count; - } - change_info.reserve(count); - change_info.resize(2); - } - - TransactionChangeInfo* info = &change_info.back(); - - // Advance each of the new notifiers to the latest version, attaching them - // to the SG at their handover version. This requires a unique - // TransactionChangeInfo for each source version, so that things don't - // see changes from before the version they were handed over from. - // Each Info has all of the changes between that source version and the - // next source version, and they'll be merged together later after - // releasing the lock - for (auto& notifier : new_notifiers) { - if (version != notifier->version()) { - transaction::advance(*m_advancer_sg, *info, notifier->version()); - change_info.push_back({{}, {}, std::move(info->lists)}); - info = &change_info.back(); - version = notifier->version(); - } - notifier->attach_to(*m_advancer_sg); - notifier->add_required_change_info(*info); - } - - transaction::advance(*m_advancer_sg, *info); - - for (auto& notifier : new_notifiers) { - notifier->detach(); - } - version = m_advancer_sg->get_version_of_current_transaction(); - m_advancer_sg->end_read(); - } + std::vector incremental_change_info; + auto version = advance_mixed_version_notifiers(new_notifiers, *m_advancer_sg, + incremental_change_info); // Make a copy of the notifiers vector and then release the lock to avoid // blocking other threads trying to register or unregister notifiers while we run them auto notifiers = m_notifiers; lock.unlock(); + // Advance the non-new notifiers to the same version as we advanced the new + // ones to (or the latest if there were no new ones) + TransactionChangeInfo change_info; for (auto& notifier : notifiers) { - notifier->add_required_change_info(change_info[0]); + notifier->add_required_change_info(change_info); } - - transaction::advance(*m_notifier_sg, change_info[0], version); + transaction::advance(*m_notifier_sg, change_info, version); // Attach the new notifiers to the main SG and move them to the main list for (auto& notifier : new_notifiers) { @@ -395,28 +426,9 @@ void RealmCoordinator::run_async_notifiers() } std::move(new_notifiers.begin(), new_notifiers.end(), std::back_inserter(notifiers)); - for (size_t i = change_info.size() - 1; i > 1; --i) { - auto& cur = change_info[i]; - if (cur.tables.empty()) - continue; - auto& prev = change_info[i - 1]; - if (prev.tables.empty()) { - prev.tables = cur.tables; - continue; - } - - for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) { - prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]}); - } - prev.tables.reserve(cur.tables.size()); - while (prev.tables.size() < cur.tables.size()) { - prev.tables.push_back(cur.tables[prev.tables.size()]); - } - } - // Copy the list change info if there's multiple LinkViews for the same LinkList auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); }; - for (auto& info : change_info) { + auto merge_linkview_info = [&](TransactionChangeInfo& info) { for (size_t i = 1; i < info.lists.size(); ++i) { for (size_t j = i; j > 0; --j) { if (id(info.lists[i]) == id(info.lists[j - 1])) { @@ -424,8 +436,15 @@ void RealmCoordinator::run_async_notifiers() } } } + }; + + merge_linkview_info(change_info); + for (auto& info : incremental_change_info) { + merge_linkview_info(info); } + // Change info is now all ready, so the notifiers can now perform their + // background work for (auto& notifier : notifiers) { notifier->run(); }