Add more tests and fix bugs
This commit is contained in:
parent
8c4f2a4f30
commit
3e90c30571
|
@ -28,68 +28,100 @@ AsyncQuery::AsyncQuery(Results& target)
|
||||||
: m_target_results(&target)
|
: m_target_results(&target)
|
||||||
, m_realm(target.get_realm())
|
, m_realm(target.get_realm())
|
||||||
, m_sort(target.get_sort())
|
, m_sort(target.get_sort())
|
||||||
, m_version(Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction())
|
, m_sg_version(Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction())
|
||||||
{
|
{
|
||||||
Query q = target.get_query();
|
Query q = target.get_query();
|
||||||
m_query_handover = Realm::Internal::get_shared_group(*m_realm).export_for_handover(q, MutableSourcePayload::Move);
|
m_query_handover = Realm::Internal::get_shared_group(*m_realm).export_for_handover(q, MutableSourcePayload::Move);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AsyncQuery::~AsyncQuery()
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_target_mutex);
|
||||||
|
m_realm = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
size_t AsyncQuery::add_callback(std::function<void (std::exception_ptr)> callback)
|
size_t AsyncQuery::add_callback(std::function<void (std::exception_ptr)> callback)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
m_realm->verify_thread();
|
||||||
|
|
||||||
size_t token = 0;
|
auto next_token = [=] {
|
||||||
for (auto& callback : m_callbacks) {
|
size_t token = 0;
|
||||||
if (token <= callback.token) {
|
for (auto& callback : m_callbacks) {
|
||||||
token = callback.token + 1;
|
if (token <= callback.token) {
|
||||||
|
token = callback.token + 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return token;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
||||||
|
auto token = next_token();
|
||||||
|
m_callbacks.push_back({std::move(callback), token, -1ULL});
|
||||||
|
if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications
|
||||||
|
Realm::Internal::get_coordinator(*m_realm).send_commit_notifications();
|
||||||
|
m_have_callbacks = true;
|
||||||
}
|
}
|
||||||
return token;
|
return token;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncQuery::remove_callback(size_t token)
|
void AsyncQuery::remove_callback(size_t token)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
Callback old;
|
||||||
if (is_for_current_thread() && m_calling_callbacks) {
|
{
|
||||||
// Schedule the removal for after we're done calling callbacks
|
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
||||||
m_callbacks_to_remove.push_back(token);
|
REALM_ASSERT(m_error || m_callbacks.size() > 0);
|
||||||
return;
|
|
||||||
}
|
|
||||||
do_remove_callback(token);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncQuery::do_remove_callback(size_t token) noexcept
|
auto it = find_if(begin(m_callbacks), end(m_callbacks),
|
||||||
{
|
[=](const auto& c) { return c.token == token; });
|
||||||
REALM_ASSERT(m_error || m_callbacks.size() > 0);
|
// We should only fail to find the callback if it was removed due to an error
|
||||||
auto it = find_if(begin(m_callbacks), end(m_callbacks),
|
REALM_ASSERT(m_error || it != end(m_callbacks));
|
||||||
[=](const auto& c) { return c.token == token; });
|
if (it == end(m_callbacks)) {
|
||||||
// We should only fail to find the callback if it was removed due to an error
|
return;
|
||||||
REALM_ASSERT(m_error || it != end(m_callbacks));
|
|
||||||
|
|
||||||
if (it != end(m_callbacks)) {
|
|
||||||
if (it != prev(end(m_callbacks))) {
|
|
||||||
*it = std::move(m_callbacks.back());
|
|
||||||
}
|
}
|
||||||
m_callbacks.pop_back();
|
|
||||||
|
size_t idx = distance(begin(m_callbacks), it);
|
||||||
|
if (m_callback_index != npos && m_callback_index >= idx) {
|
||||||
|
--m_callback_index;
|
||||||
|
}
|
||||||
|
|
||||||
|
old = std::move(*it);
|
||||||
|
m_callbacks.erase(it);
|
||||||
|
|
||||||
|
m_have_callbacks = !m_callbacks.empty();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncQuery::unregister() noexcept
|
void AsyncQuery::unregister() noexcept
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(m_target_mutex);
|
std::lock_guard<std::mutex> lock(m_target_mutex);
|
||||||
RealmCoordinator::unregister_query(*this);
|
|
||||||
m_target_results = nullptr;
|
m_target_results = nullptr;
|
||||||
m_realm = nullptr;
|
m_realm = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::release_query() noexcept
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_target_mutex);
|
||||||
|
REALM_ASSERT(!m_realm && !m_target_results);
|
||||||
|
}
|
||||||
|
|
||||||
|
m_query = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncQuery::is_alive() const noexcept
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_target_mutex);
|
||||||
|
return m_target_results != nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
void AsyncQuery::run()
|
void AsyncQuery::run()
|
||||||
{
|
{
|
||||||
REALM_ASSERT(m_sg);
|
REALM_ASSERT(m_sg);
|
||||||
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
|
|
||||||
std::lock_guard<std::mutex> target_lock(m_target_mutex);
|
std::lock_guard<std::mutex> target_lock(m_target_mutex);
|
||||||
if (!m_target_results || (m_callbacks.empty() && !m_target_results->wants_background_updates())) {
|
// Don't run the query if the results aren't actually going to be used
|
||||||
|
if (!m_target_results || (!m_have_callbacks && !m_target_results->wants_background_updates())) {
|
||||||
m_skipped_running = true;
|
m_skipped_running = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -100,7 +132,7 @@ void AsyncQuery::run()
|
||||||
// may be called concurrently (as it'd be pretty bad for a running query to
|
// may be called concurrently (as it'd be pretty bad for a running query to
|
||||||
// block the main thread trying to pick up the previous results)
|
// block the main thread trying to pick up the previous results)
|
||||||
if (m_tv.is_attached()) {
|
if (m_tv.is_attached()) {
|
||||||
m_did_update = m_tv.sync_if_needed();
|
m_tv.sync_if_needed();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
m_tv = m_query->find_all();
|
m_tv = m_query->find_all();
|
||||||
|
@ -108,107 +140,111 @@ void AsyncQuery::run()
|
||||||
if (m_sort) {
|
if (m_sort) {
|
||||||
m_tv.sort(m_sort.columnIndices, m_sort.ascending);
|
m_tv.sort(m_sort.columnIndices, m_sort.ascending);
|
||||||
}
|
}
|
||||||
m_did_update = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncQuery::prepare_handover()
|
void AsyncQuery::prepare_handover()
|
||||||
{
|
{
|
||||||
if (m_skipped_running) {
|
if (m_skipped_running) {
|
||||||
|
m_sg_version = SharedGroup::VersionID{};
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
REALM_ASSERT(m_tv.is_attached());
|
REALM_ASSERT(m_tv.is_attached());
|
||||||
REALM_ASSERT(m_tv.is_in_sync());
|
REALM_ASSERT(m_tv.is_in_sync());
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
m_sg_version = m_sg->get_version_of_current_transaction();
|
||||||
m_version = m_sg->get_version_of_current_transaction();
|
|
||||||
m_initial_run_complete = true;
|
m_initial_run_complete = true;
|
||||||
|
|
||||||
// Even if the TV didn't change, we need to re-export it if the previous
|
auto table_version = m_tv.outside_version();
|
||||||
// export has not been consumed yet, as the old handover object is no longer
|
if (!m_tv_handover && table_version == m_handed_over_table_version) {
|
||||||
// usable due to the version not matching
|
// We've already delivered the query results since the last time the
|
||||||
if (m_did_update || (m_tv_handover && m_tv_handover->version != m_version)) {
|
// table changed, so no need to do anything
|
||||||
m_tv_handover = m_sg->export_for_handover(m_tv, ConstSourcePayload::Copy);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err)
|
|
||||||
{
|
|
||||||
if (!is_for_current_thread()) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
|
m_tv_handover = m_sg->export_for_handover(m_tv, ConstSourcePayload::Copy);
|
||||||
|
m_handed_over_table_version = table_version;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err)
|
||||||
|
{
|
||||||
|
if (!is_for_current_thread()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> target_lock(m_target_mutex);
|
std::lock_guard<std::mutex> target_lock(m_target_mutex);
|
||||||
|
|
||||||
// Target results being null here indicates that it was destroyed while we
|
// Target results being null here indicates that it was destroyed while we
|
||||||
// were in the process of advancing the Realm version and preparing for
|
// were in the process of advancing the Realm version and preparing for
|
||||||
// delivery, i.e. it was destroyed from the "wrong" thread
|
// delivery, i.e. it was destroyed from the "wrong" thread
|
||||||
if (!m_target_results) {
|
if (!m_target_results) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// We can get called before the query has actually had the chance to run if
|
// We can get called before the query has actually had the chance to run if
|
||||||
// we're added immediately before a different set of async results are
|
// we're added immediately before a different set of async results are
|
||||||
// delivered
|
// delivered
|
||||||
if (!m_initial_run_complete && !err) {
|
if (!m_initial_run_complete && !err) {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tell remove_callback() to defer actually removing them, so that calling it
|
|
||||||
// in the callback block works
|
|
||||||
m_calling_callbacks = true;
|
|
||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
m_error = true;
|
m_error = err;
|
||||||
for (auto& callback : m_callbacks) {
|
return m_have_callbacks;
|
||||||
callback.fn(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove all the callbacks as we never need to call anything ever again
|
|
||||||
// after delivering an error
|
|
||||||
m_callbacks.clear();
|
|
||||||
m_callbacks_to_remove.clear();
|
|
||||||
m_calling_callbacks = false;
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
REALM_ASSERT(!m_query_handover);
|
REALM_ASSERT(!m_query_handover);
|
||||||
|
|
||||||
auto realm_version = Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction();
|
auto realm_sg_version = Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction();
|
||||||
if (m_version != realm_version) {
|
if (m_sg_version != realm_sg_version) {
|
||||||
// Realm version can be newer if a commit was made on our thread or the
|
// Realm version can be newer if a commit was made on our thread or the
|
||||||
// user manually called refresh(), or older if a commit was made on a
|
// user manually called refresh(), or older if a commit was made on a
|
||||||
// different thread and we ran *really* fast in between the check for
|
// different thread and we ran *really* fast in between the check for
|
||||||
// if the shared group has changed and when we pick up async results
|
// if the shared group has changed and when we pick up async results
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Cannot use m_did_update here as it is used unlocked in run()
|
|
||||||
bool did_update = false;
|
|
||||||
if (m_tv_handover) {
|
if (m_tv_handover) {
|
||||||
|
m_tv_handover->version = m_sg_version;
|
||||||
Results::AsyncFriend::set_table_view(*m_target_results,
|
Results::AsyncFriend::set_table_view(*m_target_results,
|
||||||
std::move(*sg.import_from_handover(std::move(m_tv_handover))));
|
std::move(*sg.import_from_handover(std::move(m_tv_handover))));
|
||||||
|
m_delievered_table_version = m_handed_over_table_version;
|
||||||
|
|
||||||
did_update = true;
|
|
||||||
}
|
}
|
||||||
REALM_ASSERT(!m_tv_handover);
|
REALM_ASSERT(!m_tv_handover);
|
||||||
|
return m_have_callbacks;
|
||||||
|
}
|
||||||
|
|
||||||
for (auto& callback : m_callbacks) {
|
void AsyncQuery::call_callbacks()
|
||||||
if (did_update || callback.first_run) {
|
{
|
||||||
callback.fn(nullptr);
|
REALM_ASSERT(is_for_current_thread());
|
||||||
callback.first_run = false;
|
|
||||||
|
while (auto fn = next_callback()) {
|
||||||
|
fn(m_error);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_error) {
|
||||||
|
// Remove all the callbacks as we never need to call anything ever again
|
||||||
|
// after delivering an error
|
||||||
|
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
|
||||||
|
m_callbacks.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::function<void (std::exception_ptr)> AsyncQuery::next_callback()
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
|
||||||
|
for (++m_callback_index; m_callback_index < m_callbacks.size(); ++m_callback_index) {
|
||||||
|
auto& callback = m_callbacks[m_callback_index];
|
||||||
|
if (m_error || callback.delivered_version != m_delievered_table_version) {
|
||||||
|
callback.delivered_version = m_delievered_table_version;
|
||||||
|
return callback.fn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
m_calling_callbacks = false;
|
m_callback_index = npos;
|
||||||
|
return nullptr;
|
||||||
// Actually remove any callbacks whose removal was requested during iteration
|
|
||||||
for (auto token : m_callbacks_to_remove) {
|
|
||||||
do_remove_callback(token);
|
|
||||||
}
|
|
||||||
m_callbacks_to_remove.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void AsyncQuery::attach_to(realm::SharedGroup& sg)
|
void AsyncQuery::attach_to(realm::SharedGroup& sg)
|
||||||
|
|
|
@ -34,18 +34,22 @@ namespace _impl {
|
||||||
class AsyncQuery {
|
class AsyncQuery {
|
||||||
public:
|
public:
|
||||||
AsyncQuery(Results& target);
|
AsyncQuery(Results& target);
|
||||||
|
~AsyncQuery();
|
||||||
|
|
||||||
size_t add_callback(std::function<void (std::exception_ptr)>);
|
size_t add_callback(std::function<void (std::exception_ptr)>);
|
||||||
void remove_callback(size_t token);
|
void remove_callback(size_t token);
|
||||||
|
|
||||||
void unregister() noexcept;
|
void unregister() noexcept;
|
||||||
|
void release_query() noexcept;
|
||||||
|
|
||||||
// Run/rerun the query if needed
|
// Run/rerun the query if needed
|
||||||
void run();
|
void run();
|
||||||
// Prepare the handover object if run() did update the TableView
|
// Prepare the handover object if run() did update the TableView
|
||||||
void prepare_handover();
|
void prepare_handover();
|
||||||
// Update the target results from the handover and call callbacks
|
// Update the target results from the handover
|
||||||
void deliver(SharedGroup& sg, std::exception_ptr err);
|
// Returns if any callbacks need to be invoked
|
||||||
|
bool deliver(SharedGroup& sg, std::exception_ptr err);
|
||||||
|
void call_callbacks();
|
||||||
|
|
||||||
// Attach the handed-over query to `sg`
|
// Attach the handed-over query to `sg`
|
||||||
void attach_to(SharedGroup& sg);
|
void attach_to(SharedGroup& sg);
|
||||||
|
@ -55,11 +59,13 @@ public:
|
||||||
|
|
||||||
Realm& get_realm() { return *m_target_results->get_realm(); }
|
Realm& get_realm() { return *m_target_results->get_realm(); }
|
||||||
// Get the version of the current handover object
|
// Get the version of the current handover object
|
||||||
SharedGroup::VersionID version() const noexcept { return m_version; }
|
SharedGroup::VersionID version() const noexcept { return m_sg_version; }
|
||||||
|
|
||||||
|
bool is_alive() const noexcept;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Target Results to update and a mutex which guards it
|
// Target Results to update and a mutex which guards it
|
||||||
std::mutex m_target_mutex;
|
mutable std::mutex m_target_mutex;
|
||||||
Results* m_target_results;
|
Results* m_target_results;
|
||||||
|
|
||||||
std::shared_ptr<Realm> m_realm;
|
std::shared_ptr<Realm> m_realm;
|
||||||
|
@ -77,13 +83,13 @@ private:
|
||||||
// be non-null
|
// be non-null
|
||||||
TableView m_tv;
|
TableView m_tv;
|
||||||
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
|
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
|
||||||
SharedGroup::VersionID m_version;
|
SharedGroup::VersionID m_sg_version;
|
||||||
|
std::exception_ptr m_error;
|
||||||
|
|
||||||
struct Callback {
|
struct Callback {
|
||||||
std::function<void (std::exception_ptr)> fn;
|
std::function<void (std::exception_ptr)> fn;
|
||||||
std::unique_ptr<SharedGroup::Handover<TableView>> handover;
|
|
||||||
size_t token;
|
size_t token;
|
||||||
bool first_run;
|
uint_fast64_t delivered_version;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Currently registered callbacks and a mutex which must always be held
|
// Currently registered callbacks and a mutex which must always be held
|
||||||
|
@ -91,21 +97,27 @@ private:
|
||||||
std::mutex m_callback_mutex;
|
std::mutex m_callback_mutex;
|
||||||
std::vector<Callback> m_callbacks;
|
std::vector<Callback> m_callbacks;
|
||||||
|
|
||||||
// Callbacks which the user has asked to have removed whose removal has been
|
|
||||||
// deferred until after we're done looping over m_callbacks
|
|
||||||
std::vector<size_t> m_callbacks_to_remove;
|
|
||||||
|
|
||||||
SharedGroup* m_sg = nullptr;
|
SharedGroup* m_sg = nullptr;
|
||||||
|
|
||||||
bool m_did_update = false;
|
uint_fast64_t m_handed_over_table_version = -1;
|
||||||
|
uint_fast64_t m_delievered_table_version = -1;
|
||||||
|
|
||||||
|
// Iteration variable for looping over callbacks
|
||||||
|
// remove_callback() updates this when needed
|
||||||
|
size_t m_callback_index = npos;
|
||||||
|
|
||||||
bool m_skipped_running = false;
|
bool m_skipped_running = false;
|
||||||
bool m_initial_run_complete = false;
|
bool m_initial_run_complete = false;
|
||||||
bool m_calling_callbacks = false;
|
|
||||||
bool m_error = false;
|
|
||||||
|
|
||||||
void do_remove_callback(size_t token) noexcept;
|
// Cached value for if m_callbacks is empty, needed to avoid deadlocks in
|
||||||
|
// run() due to lock-order inversion between m_callback_mutex and m_target_mutex
|
||||||
|
// It's okay if this value is stale as at worst it'll result in us doing
|
||||||
|
// some extra work.
|
||||||
|
std::atomic<bool> m_have_callbacks = {false};
|
||||||
|
|
||||||
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
|
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
|
||||||
|
|
||||||
|
std::function<void (std::exception_ptr)> next_callback();
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace _impl
|
} // namespace _impl
|
||||||
|
|
|
@ -246,37 +246,40 @@ void RealmCoordinator::register_query(std::shared_ptr<AsyncQuery> query)
|
||||||
self.pin_version(version.version, version.index);
|
self.pin_version(version.version, version.index);
|
||||||
self.m_new_queries.push_back(std::move(query));
|
self.m_new_queries.push_back(std::move(query));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wake up the background worker threads by pretending we made a commit
|
|
||||||
self.m_notifier->notify_others();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void RealmCoordinator::unregister_query(AsyncQuery& query)
|
void RealmCoordinator::clean_up_dead_queries()
|
||||||
{
|
{
|
||||||
auto swap_remove = [&](auto& container) {
|
auto swap_remove = [&](auto& container) {
|
||||||
auto it = std::find_if(container.begin(), container.end(),
|
bool did_remove = false;
|
||||||
[&](auto const& ptr) { return ptr.get() == &query; });
|
for (size_t i = 0; i < container.size(); ++i) {
|
||||||
if (it != container.end()) {
|
if (container[i]->is_alive())
|
||||||
std::iter_swap(--container.end(), it);
|
continue;
|
||||||
|
|
||||||
|
// Ensure the query is destroyed here even if there's lingering refs
|
||||||
|
// to the async query elsewhere
|
||||||
|
container[i]->release_query();
|
||||||
|
|
||||||
|
if (container.size() > i + 1)
|
||||||
|
container[i] = std::move(container.back());
|
||||||
container.pop_back();
|
container.pop_back();
|
||||||
return true;
|
--i;
|
||||||
|
did_remove = true;
|
||||||
}
|
}
|
||||||
return false;
|
return did_remove;
|
||||||
};
|
};
|
||||||
|
|
||||||
auto& self = Realm::Internal::get_coordinator(query.get_realm());
|
if (swap_remove(m_queries)) {
|
||||||
std::lock_guard<std::mutex> lock(self.m_query_mutex);
|
|
||||||
if (swap_remove(self.m_queries)) {
|
|
||||||
// Make sure we aren't holding on to read versions needlessly if there
|
// Make sure we aren't holding on to read versions needlessly if there
|
||||||
// are no queries left, but don't close them entirely as opening shared
|
// are no queries left, but don't close them entirely as opening shared
|
||||||
// groups is expensive
|
// groups is expensive
|
||||||
if (!self.m_running_queries && self.m_queries.empty() && self.m_query_sg) {
|
if (m_queries.empty() && m_query_sg) {
|
||||||
self.m_query_sg->end_read();
|
m_query_sg->end_read();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (swap_remove(self.m_new_queries)) {
|
if (swap_remove(m_new_queries)) {
|
||||||
if (self.m_new_queries.empty() && self.m_advancer_sg) {
|
if (m_new_queries.empty() && m_advancer_sg) {
|
||||||
self.m_advancer_sg->end_read();
|
m_advancer_sg->end_read();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -295,6 +298,8 @@ void RealmCoordinator::run_async_queries()
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lock(m_query_mutex);
|
std::unique_lock<std::mutex> lock(m_query_mutex);
|
||||||
|
|
||||||
|
clean_up_dead_queries();
|
||||||
|
|
||||||
if (m_queries.empty() && m_new_queries.empty()) {
|
if (m_queries.empty() && m_new_queries.empty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -310,10 +315,6 @@ void RealmCoordinator::run_async_queries()
|
||||||
|
|
||||||
advance_helper_shared_group_to_latest();
|
advance_helper_shared_group_to_latest();
|
||||||
|
|
||||||
// Tell other threads not to close the shared group as we need it even
|
|
||||||
// though we aren't holding the lock
|
|
||||||
m_running_queries = true;
|
|
||||||
|
|
||||||
// Make a copy of the queries vector so that we can release the lock while
|
// Make a copy of the queries vector so that we can release the lock while
|
||||||
// we run the queries
|
// we run the queries
|
||||||
auto queries_to_run = m_queries;
|
auto queries_to_run = m_queries;
|
||||||
|
@ -326,20 +327,13 @@ void RealmCoordinator::run_async_queries()
|
||||||
// Reacquire the lock while updating the fields that are actually read on
|
// Reacquire the lock while updating the fields that are actually read on
|
||||||
// other threads
|
// other threads
|
||||||
{
|
{
|
||||||
// Make sure we don't change the version while another thread is delivering
|
|
||||||
std::lock_guard<std::mutex> version_lock(m_query_version_mutex);
|
|
||||||
lock.lock();
|
lock.lock();
|
||||||
for (auto& query : queries_to_run) {
|
for (auto& query : queries_to_run) {
|
||||||
query->prepare_handover();
|
query->prepare_handover();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if all queries were removed while we were running them, as if so
|
clean_up_dead_queries();
|
||||||
// the shared group didn't get closed by do_unregister_query()
|
|
||||||
m_running_queries = false;
|
|
||||||
if (m_queries.empty()) {
|
|
||||||
m_query_sg->end_read();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void RealmCoordinator::open_helper_shared_group()
|
void RealmCoordinator::open_helper_shared_group()
|
||||||
|
@ -413,7 +407,6 @@ void RealmCoordinator::advance_to_ready(Realm& realm)
|
||||||
auto& sg = Realm::Internal::get_shared_group(realm);
|
auto& sg = Realm::Internal::get_shared_group(realm);
|
||||||
auto& history = Realm::Internal::get_history(realm);
|
auto& history = Realm::Internal::get_history(realm);
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(m_query_version_mutex);
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(m_query_mutex);
|
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||||
|
|
||||||
|
@ -436,25 +429,33 @@ void RealmCoordinator::advance_to_ready(Realm& realm)
|
||||||
}
|
}
|
||||||
|
|
||||||
transaction::advance(sg, history, realm.m_binding_context.get(), version);
|
transaction::advance(sg, history, realm.m_binding_context.get(), version);
|
||||||
queries = m_queries;
|
|
||||||
|
for (auto& query : m_queries) {
|
||||||
|
if (query->deliver(sg, m_async_error)) {
|
||||||
|
queries.push_back(query);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto& query : queries) {
|
for (auto& query : queries) {
|
||||||
query->deliver(sg, m_async_error);
|
query->call_callbacks();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RealmCoordinator::process_available_async(Realm& realm)
|
void RealmCoordinator::process_available_async(Realm& realm)
|
||||||
{
|
{
|
||||||
|
auto& sg = Realm::Internal::get_shared_group(realm);
|
||||||
decltype(m_queries) queries;
|
decltype(m_queries) queries;
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(m_query_mutex);
|
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||||
queries = m_queries;
|
for (auto& query : m_queries) {
|
||||||
|
if (query->deliver(sg, m_async_error)) {
|
||||||
|
queries.push_back(query);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto& sg = Realm::Internal::get_shared_group(realm);
|
|
||||||
std::lock_guard<std::mutex> lock(m_query_version_mutex);
|
|
||||||
for (auto& query : queries) {
|
for (auto& query : queries) {
|
||||||
query->deliver(sg, m_async_error);
|
query->call_callbacks();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,6 @@ public:
|
||||||
void update_schema(Schema const& new_schema);
|
void update_schema(Schema const& new_schema);
|
||||||
|
|
||||||
static void register_query(std::shared_ptr<AsyncQuery> query);
|
static void register_query(std::shared_ptr<AsyncQuery> query);
|
||||||
static void unregister_query(AsyncQuery& query);
|
|
||||||
|
|
||||||
// Advance the Realm to the most recent transaction version which all async
|
// Advance the Realm to the most recent transaction version which all async
|
||||||
// work is complete for
|
// work is complete for
|
||||||
|
@ -95,7 +94,6 @@ private:
|
||||||
std::vector<CachedRealm> m_cached_realms;
|
std::vector<CachedRealm> m_cached_realms;
|
||||||
|
|
||||||
std::mutex m_query_mutex;
|
std::mutex m_query_mutex;
|
||||||
std::mutex m_query_version_mutex;
|
|
||||||
bool m_running_queries = false;
|
bool m_running_queries = false;
|
||||||
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_new_queries;
|
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_new_queries;
|
||||||
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_queries;
|
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_queries;
|
||||||
|
@ -121,6 +119,7 @@ private:
|
||||||
void open_helper_shared_group();
|
void open_helper_shared_group();
|
||||||
void move_new_queries_to_main();
|
void move_new_queries_to_main();
|
||||||
void advance_helper_shared_group_to_latest();
|
void advance_helper_shared_group_to_latest();
|
||||||
|
void clean_up_dead_queries();
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace _impl
|
} // namespace _impl
|
||||||
|
|
Loading…
Reference in New Issue