Avoid holding locks while calling transaction::advance()

It can call user code due to change notifications, which leads to
deadlocks if that code then tries to add async queries (and advancing is
a potentially expensive operation, so doing it while holding a lock
inhibits parallelism anyway).
This commit is contained in:
Thomas Goyne 2016-02-15 09:26:56 -08:00
parent 8284340729
commit 1e35324d97

View File

@ -405,34 +405,55 @@ void RealmCoordinator::advance_to_ready(Realm& realm)
auto& sg = Realm::Internal::get_shared_group(realm);
auto& history = Realm::Internal::get_history(realm);
{
std::lock_guard<std::mutex> lock(m_query_mutex);
SharedGroup::VersionID version;
auto get_query_version = [&] {
for (auto& query : m_queries) {
version = query->version();
if (version != SharedGroup::VersionID()) {
break;
auto version = query->version();
if (version != SharedGroup::VersionID{}) {
return version;
}
}
return SharedGroup::VersionID{};
};
// no untargeted async queries; just advance to latest
if (version.version == 0) {
transaction::advance(sg, history, realm.m_binding_context.get());
return;
}
// async results are out of date; ignore
else if (version < sg.get_version_of_current_transaction()) {
return;
}
SharedGroup::VersionID version;
{
std::lock_guard<std::mutex> lock(m_query_mutex);
version = get_query_version();
}
// no async queries; just advance to latest
if (version.version == 0) {
transaction::advance(sg, history, realm.m_binding_context.get());
return;
}
// async results are out of date; ignore
if (version < sg.get_version_of_current_transaction()) {
return;
}
while (true) {
// Advance to the ready version without holding any locks because it
// may end up calling user code (in did_change() notifications)
transaction::advance(sg, history, realm.m_binding_context.get(), version);
// Reacquire the lock and recheck the query version, as the queries may
// have advanced to a later version while we didn't hold the lock. If
// so, we need to release the lock and re-advance
std::lock_guard<std::mutex> lock(m_query_mutex);
version = get_query_version();
if (version.version == 0)
return;
if (version != sg.get_version_of_current_transaction())
continue;
// Query version now matches the SG version, so we can deliver them
for (auto& query : m_queries) {
if (query->deliver(sg, m_async_error)) {
queries.push_back(query);
}
}
break;
}
for (auto& query : queries) {