Refactor the incremental change tracking for mixed source versions

This commit is contained in:
Thomas Goyne 2016-04-14 11:28:19 -07:00
parent 69cefd052e
commit ea3a2f4711
1 changed files with 105 additions and 84 deletions

View File

@ -302,84 +302,98 @@ void RealmCoordinator::on_change()
} }
} }
static SharedGroup::VersionID advance_mixed_version_notifiers(std::vector<std::shared_ptr<_impl::BackgroundCollection>>& notifiers, namespace {
SharedGroup& sg, class IncrementalChangeInfo {
std::vector<TransactionChangeInfo>& change_info) public:
{ IncrementalChangeInfo(SharedGroup& sg,
if (notifiers.empty()) std::vector<std::shared_ptr<_impl::BackgroundCollection>>& notifiers)
return SharedGroup::VersionID{}; : m_sg(sg)
// Sort the notifiers by their source version so that we can pull them
// all forward to the latest version in a single pass over the transaction log
std::sort(notifiers.begin(), notifiers.end(),
[](auto&& lft, auto&& rgt) { return lft->version() < rgt->version(); });
auto version = sg.get_version_of_current_transaction();
REALM_ASSERT(version == notifiers.front()->version());
// Preallocate the required amount of space in the vector so that we can
// safely give out pointers to within the vector
{ {
if (notifiers.empty())
return;
auto cmp = [&](auto&& lft, auto&& rgt) {
return lft->version() < rgt->version();
};
// Sort the notifiers by their source version so that we can pull them
// all forward to the latest version in a single pass over the transaction log
std::sort(notifiers.begin(), notifiers.end(), cmp);
// Preallocate the required amount of space in the vector so that we can
// safely give out pointers to within the vector
size_t count = 1; size_t count = 1;
for (auto it = notifiers.begin(), next = it + 1; next != notifiers.end(); ++it, ++next) { for (auto it = notifiers.begin(), next = it + 1; next != notifiers.end(); ++it, ++next) {
if ((*it)->version() != (*next)->version()) if (cmp(*it, *next))
++count; ++count;
} }
change_info.reserve(count); m_info.reserve(count);
change_info.resize(1); m_info.resize(1);
m_current = &m_info[0];
} }
TransactionChangeInfo* info = &change_info.back(); TransactionChangeInfo& current() const { return *m_current; }
// Advance each of the new notifiers to the latest version, attaching them bool advance_incremental(SharedGroup::VersionID version)
// to the SG at their handover version. This requires a unique {
// TransactionChangeInfo for each source version, so that things don't if (version != m_sg.get_version_of_current_transaction()) {
// see changes from before the version they were handed over from. transaction::advance(m_sg, *m_current, version);
// Each Info has all of the changes between that source version and the // FIXME: needs to copy tables to watch?
// next source version, and they'll be merged together later after m_info.push_back({{}, {}, std::move(m_current->lists)});
// releasing the lock m_current = &m_info.back();
for (auto& notifier : notifiers) { return true;
if (version != notifier->version()) {
transaction::advance(sg, *info, notifier->version());
change_info.push_back({{}, {}, std::move(info->lists)});
info = &change_info.back();
version = notifier->version();
} }
notifier->attach_to(sg); return false;
notifier->add_required_change_info(*info);
} }
transaction::advance(sg, *info); void advance_to_final(SharedGroup::VersionID version)
{
// We now need to combine the transaction change info objects so that all of if (!m_current) {
// the notifiers see the complete set of changes from their first version to transaction::advance(m_sg, nullptr, version);
// the most recent one return;
for (size_t i = change_info.size() - 1; i > 0; --i) {
auto& cur = change_info[i];
if (cur.tables.empty())
continue;
auto& prev = change_info[i - 1];
if (prev.tables.empty()) {
prev.tables = cur.tables;
continue;
} }
for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) { transaction::advance(m_sg, *m_current, version);
prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]});
// We now need to combine the transaction change info objects so that all of
// the notifiers see the complete set of changes from their first version to
// the most recent one
for (size_t i = m_info.size() - 1; i > 0; --i) {
auto& cur = m_info[i];
if (cur.tables.empty())
continue;
auto& prev = m_info[i - 1];
if (prev.tables.empty()) {
prev.tables = cur.tables;
continue;
}
for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) {
prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]});
}
prev.tables.reserve(cur.tables.size());
while (prev.tables.size() < cur.tables.size()) {
prev.tables.push_back(cur.tables[prev.tables.size()]);
}
} }
prev.tables.reserve(cur.tables.size());
while (prev.tables.size() < cur.tables.size()) { // Copy the list change info if there's multiple LinkViews for the same LinkList
prev.tables.push_back(cur.tables[prev.tables.size()]); auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); };
for (size_t i = 1; i < m_current->lists.size(); ++i) {
for (size_t j = i; j > 0; --j) {
if (id(m_current->lists[i]) == id(m_current->lists[j - 1])) {
m_current->lists[j - 1].changes->merge(CollectionChangeBuilder{*m_current->lists[i].changes});
}
}
} }
} }
for (auto& notifier : notifiers) { private:
notifier->detach(); std::vector<TransactionChangeInfo> m_info;
} TransactionChangeInfo* m_current = nullptr;
version = sg.get_version_of_current_transaction(); SharedGroup& m_sg;
sg.end_read(); };
} // anonymous namespace
return version;
}
void RealmCoordinator::run_async_notifiers() void RealmCoordinator::run_async_notifiers()
{ {
@ -401,11 +415,35 @@ void RealmCoordinator::run_async_notifiers()
return; return;
} }
SharedGroup::VersionID version;
// Advance all of the new notifiers to the most recent version, if any // Advance all of the new notifiers to the most recent version, if any
auto new_notifiers = std::move(m_new_notifiers); auto new_notifiers = std::move(m_new_notifiers);
std::vector<TransactionChangeInfo> incremental_change_info; IncrementalChangeInfo new_notifier_change_info(*m_advancer_sg, new_notifiers);
auto version = advance_mixed_version_notifiers(new_notifiers, *m_advancer_sg,
incremental_change_info); if (!new_notifiers.empty()) {
REALM_ASSERT(m_advancer_sg->get_version_of_current_transaction() == new_notifiers.front()->version());
// Advance each of the new notifiers to the latest version, attaching them
// to the SG at their handover version. This requires a unique
// TransactionChangeInfo for each source version, so that things don't
// see changes from before the version they were handed over from.
// Each Info has all of the changes between that source version and the
// next source version, and they'll be merged together later after
// releasing the lock
for (auto& notifier : new_notifiers) {
new_notifier_change_info.advance_incremental(notifier->version());
notifier->attach_to(*m_advancer_sg);
notifier->add_required_change_info(new_notifier_change_info.current());
}
new_notifier_change_info.advance_to_final(SharedGroup::VersionID{});
for (auto& notifier : new_notifiers) {
notifier->detach();
}
version = m_advancer_sg->get_version_of_current_transaction();
m_advancer_sg->end_read();
}
// Make a copy of the notifiers vector and then release the lock to avoid // Make a copy of the notifiers vector and then release the lock to avoid
// blocking other threads trying to register or unregister notifiers while we run them // blocking other threads trying to register or unregister notifiers while we run them
@ -414,11 +452,11 @@ void RealmCoordinator::run_async_notifiers()
// Advance the non-new notifiers to the same version as we advanced the new // Advance the non-new notifiers to the same version as we advanced the new
// ones to (or the latest if there were no new ones) // ones to (or the latest if there were no new ones)
TransactionChangeInfo change_info; IncrementalChangeInfo change_info(*m_notifier_sg, notifiers);
for (auto& notifier : notifiers) { for (auto& notifier : notifiers) {
notifier->add_required_change_info(change_info); notifier->add_required_change_info(change_info.current());
} }
transaction::advance(*m_notifier_sg, change_info, version); change_info.advance_to_final(version);
// Attach the new notifiers to the main SG and move them to the main list // Attach the new notifiers to the main SG and move them to the main list
for (auto& notifier : new_notifiers) { for (auto& notifier : new_notifiers) {
@ -426,23 +464,6 @@ void RealmCoordinator::run_async_notifiers()
} }
std::move(new_notifiers.begin(), new_notifiers.end(), std::back_inserter(notifiers)); std::move(new_notifiers.begin(), new_notifiers.end(), std::back_inserter(notifiers));
// Copy the list change info if there's multiple LinkViews for the same LinkList
auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); };
auto merge_linkview_info = [&](TransactionChangeInfo& info) {
for (size_t i = 1; i < info.lists.size(); ++i) {
for (size_t j = i; j > 0; --j) {
if (id(info.lists[i]) == id(info.lists[j - 1])) {
info.lists[j - 1].changes->merge(CollectionChangeBuilder{*info.lists[i].changes});
}
}
}
};
merge_linkview_info(change_info);
for (auto& info : incremental_change_info) {
merge_linkview_info(info);
}
// Change info is now all ready, so the notifiers can now perform their // Change info is now all ready, so the notifiers can now perform their
// background work // background work
for (auto& notifier : notifiers) { for (auto& notifier : notifiers) {