Make RealmCoordinator::run_async_notifiers a bit less gross
This commit is contained in:
parent
32b05314f5
commit
83b4d8ded2
|
@ -302,6 +302,85 @@ void RealmCoordinator::on_change()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SharedGroup::VersionID advance_mixed_version_notifiers(std::vector<std::shared_ptr<_impl::BackgroundCollection>>& notifiers,
|
||||||
|
SharedGroup& sg,
|
||||||
|
std::vector<TransactionChangeInfo>& change_info)
|
||||||
|
{
|
||||||
|
if (notifiers.empty())
|
||||||
|
return SharedGroup::VersionID{};
|
||||||
|
|
||||||
|
// Sort the notifiers by their source version so that we can pull them
|
||||||
|
// all forward to the latest version in a single pass over the transaction log
|
||||||
|
std::sort(notifiers.begin(), notifiers.end(),
|
||||||
|
[](auto&& lft, auto&& rgt) { return lft->version() < rgt->version(); });
|
||||||
|
auto version = sg.get_version_of_current_transaction();
|
||||||
|
REALM_ASSERT(version == notifiers.front()->version());
|
||||||
|
|
||||||
|
// Preallocate the required amount of space in the vector so that we can
|
||||||
|
// safely give out pointers to within the vector
|
||||||
|
{
|
||||||
|
size_t count = 1;
|
||||||
|
for (auto it = notifiers.begin(), next = it + 1; next != notifiers.end(); ++it, ++next) {
|
||||||
|
if ((*it)->version() != (*next)->version())
|
||||||
|
++count;
|
||||||
|
}
|
||||||
|
change_info.reserve(count);
|
||||||
|
change_info.resize(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
TransactionChangeInfo* info = &change_info.back();
|
||||||
|
|
||||||
|
// Advance each of the new notifiers to the latest version, attaching them
|
||||||
|
// to the SG at their handover version. This requires a unique
|
||||||
|
// TransactionChangeInfo for each source version, so that things don't
|
||||||
|
// see changes from before the version they were handed over from.
|
||||||
|
// Each Info has all of the changes between that source version and the
|
||||||
|
// next source version, and they'll be merged together later after
|
||||||
|
// releasing the lock
|
||||||
|
for (auto& notifier : notifiers) {
|
||||||
|
if (version != notifier->version()) {
|
||||||
|
transaction::advance(sg, *info, notifier->version());
|
||||||
|
change_info.push_back({{}, {}, std::move(info->lists)});
|
||||||
|
info = &change_info.back();
|
||||||
|
version = notifier->version();
|
||||||
|
}
|
||||||
|
notifier->attach_to(sg);
|
||||||
|
notifier->add_required_change_info(*info);
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction::advance(sg, *info);
|
||||||
|
|
||||||
|
// We now need to combine the transaction change info objects so that all of
|
||||||
|
// the notifiers see the complete set of changes from their first version to
|
||||||
|
// the most recent one
|
||||||
|
for (size_t i = change_info.size() - 1; i > 0; --i) {
|
||||||
|
auto& cur = change_info[i];
|
||||||
|
if (cur.tables.empty())
|
||||||
|
continue;
|
||||||
|
auto& prev = change_info[i - 1];
|
||||||
|
if (prev.tables.empty()) {
|
||||||
|
prev.tables = cur.tables;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) {
|
||||||
|
prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]});
|
||||||
|
}
|
||||||
|
prev.tables.reserve(cur.tables.size());
|
||||||
|
while (prev.tables.size() < cur.tables.size()) {
|
||||||
|
prev.tables.push_back(cur.tables[prev.tables.size()]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto& notifier : notifiers) {
|
||||||
|
notifier->detach();
|
||||||
|
}
|
||||||
|
version = sg.get_version_of_current_transaction();
|
||||||
|
sg.end_read();
|
||||||
|
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
void RealmCoordinator::run_async_notifiers()
|
void RealmCoordinator::run_async_notifiers()
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lock(m_notifier_mutex);
|
std::unique_lock<std::mutex> lock(m_notifier_mutex);
|
||||||
|
@ -322,72 +401,24 @@ void RealmCoordinator::run_async_notifiers()
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<TransactionChangeInfo> change_info;
|
// Advance all of the new notifiers to the most recent version, if any
|
||||||
SharedGroup::VersionID version;
|
|
||||||
|
|
||||||
auto new_notifiers = std::move(m_new_notifiers);
|
auto new_notifiers = std::move(m_new_notifiers);
|
||||||
if (new_notifiers.empty()) {
|
std::vector<TransactionChangeInfo> incremental_change_info;
|
||||||
change_info.resize(1);
|
auto version = advance_mixed_version_notifiers(new_notifiers, *m_advancer_sg,
|
||||||
}
|
incremental_change_info);
|
||||||
else {
|
|
||||||
// Sort newly added notifiers by their source version so that we can pull them
|
|
||||||
// all forward to the latest version in a single pass over the transaction log
|
|
||||||
std::sort(new_notifiers.begin(), new_notifiers.end(),
|
|
||||||
[](auto&& lft, auto&& rgt) { return lft->version() < rgt->version(); });
|
|
||||||
version = m_advancer_sg->get_version_of_current_transaction();
|
|
||||||
REALM_ASSERT(version == new_notifiers.front()->version());
|
|
||||||
|
|
||||||
// Preallocate the required amount of space in the vector so that we can
|
|
||||||
// safely give out pointers to within the vector
|
|
||||||
{
|
|
||||||
size_t count = 2;
|
|
||||||
for (auto it = new_notifiers.begin(), next = it + 1; next != new_notifiers.end(); ++it, ++next) {
|
|
||||||
if ((*it)->version() != (*next)->version())
|
|
||||||
++count;
|
|
||||||
}
|
|
||||||
change_info.reserve(count);
|
|
||||||
change_info.resize(2);
|
|
||||||
}
|
|
||||||
|
|
||||||
TransactionChangeInfo* info = &change_info.back();
|
|
||||||
|
|
||||||
// Advance each of the new notifiers to the latest version, attaching them
|
|
||||||
// to the SG at their handover version. This requires a unique
|
|
||||||
// TransactionChangeInfo for each source version, so that things don't
|
|
||||||
// see changes from before the version they were handed over from.
|
|
||||||
// Each Info has all of the changes between that source version and the
|
|
||||||
// next source version, and they'll be merged together later after
|
|
||||||
// releasing the lock
|
|
||||||
for (auto& notifier : new_notifiers) {
|
|
||||||
if (version != notifier->version()) {
|
|
||||||
transaction::advance(*m_advancer_sg, *info, notifier->version());
|
|
||||||
change_info.push_back({{}, {}, std::move(info->lists)});
|
|
||||||
info = &change_info.back();
|
|
||||||
version = notifier->version();
|
|
||||||
}
|
|
||||||
notifier->attach_to(*m_advancer_sg);
|
|
||||||
notifier->add_required_change_info(*info);
|
|
||||||
}
|
|
||||||
|
|
||||||
transaction::advance(*m_advancer_sg, *info);
|
|
||||||
|
|
||||||
for (auto& notifier : new_notifiers) {
|
|
||||||
notifier->detach();
|
|
||||||
}
|
|
||||||
version = m_advancer_sg->get_version_of_current_transaction();
|
|
||||||
m_advancer_sg->end_read();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make a copy of the notifiers vector and then release the lock to avoid
|
// Make a copy of the notifiers vector and then release the lock to avoid
|
||||||
// blocking other threads trying to register or unregister notifiers while we run them
|
// blocking other threads trying to register or unregister notifiers while we run them
|
||||||
auto notifiers = m_notifiers;
|
auto notifiers = m_notifiers;
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
||||||
|
// Advance the non-new notifiers to the same version as we advanced the new
|
||||||
|
// ones to (or the latest if there were no new ones)
|
||||||
|
TransactionChangeInfo change_info;
|
||||||
for (auto& notifier : notifiers) {
|
for (auto& notifier : notifiers) {
|
||||||
notifier->add_required_change_info(change_info[0]);
|
notifier->add_required_change_info(change_info);
|
||||||
}
|
}
|
||||||
|
transaction::advance(*m_notifier_sg, change_info, version);
|
||||||
transaction::advance(*m_notifier_sg, change_info[0], version);
|
|
||||||
|
|
||||||
// Attach the new notifiers to the main SG and move them to the main list
|
// Attach the new notifiers to the main SG and move them to the main list
|
||||||
for (auto& notifier : new_notifiers) {
|
for (auto& notifier : new_notifiers) {
|
||||||
|
@ -395,28 +426,9 @@ void RealmCoordinator::run_async_notifiers()
|
||||||
}
|
}
|
||||||
std::move(new_notifiers.begin(), new_notifiers.end(), std::back_inserter(notifiers));
|
std::move(new_notifiers.begin(), new_notifiers.end(), std::back_inserter(notifiers));
|
||||||
|
|
||||||
for (size_t i = change_info.size() - 1; i > 1; --i) {
|
|
||||||
auto& cur = change_info[i];
|
|
||||||
if (cur.tables.empty())
|
|
||||||
continue;
|
|
||||||
auto& prev = change_info[i - 1];
|
|
||||||
if (prev.tables.empty()) {
|
|
||||||
prev.tables = cur.tables;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t j = 0; j < prev.tables.size() && j < cur.tables.size(); ++j) {
|
|
||||||
prev.tables[j].merge(CollectionChangeBuilder{cur.tables[j]});
|
|
||||||
}
|
|
||||||
prev.tables.reserve(cur.tables.size());
|
|
||||||
while (prev.tables.size() < cur.tables.size()) {
|
|
||||||
prev.tables.push_back(cur.tables[prev.tables.size()]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Copy the list change info if there's multiple LinkViews for the same LinkList
|
// Copy the list change info if there's multiple LinkViews for the same LinkList
|
||||||
auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); };
|
auto id = [](auto const& list) { return std::tie(list.table_ndx, list.col_ndx, list.row_ndx); };
|
||||||
for (auto& info : change_info) {
|
auto merge_linkview_info = [&](TransactionChangeInfo& info) {
|
||||||
for (size_t i = 1; i < info.lists.size(); ++i) {
|
for (size_t i = 1; i < info.lists.size(); ++i) {
|
||||||
for (size_t j = i; j > 0; --j) {
|
for (size_t j = i; j > 0; --j) {
|
||||||
if (id(info.lists[i]) == id(info.lists[j - 1])) {
|
if (id(info.lists[i]) == id(info.lists[j - 1])) {
|
||||||
|
@ -424,8 +436,15 @@ void RealmCoordinator::run_async_notifiers()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
merge_linkview_info(change_info);
|
||||||
|
for (auto& info : incremental_change_info) {
|
||||||
|
merge_linkview_info(info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Change info is now all ready, so the notifiers can now perform their
|
||||||
|
// background work
|
||||||
for (auto& notifier : notifiers) {
|
for (auto& notifier : notifiers) {
|
||||||
notifier->run();
|
notifier->run();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue