Extract out the parts of AsyncQuery not directly related to query running

Thomas Goyne 2016-01-26 18:08:23 -08:00
parent deea1e8f5f
commit 6380335fc3
8 changed files with 343 additions and 225 deletions
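
For orientation, the sketch below condenses what the split looks like after this commit: a BackgroundCollection base that owns callback registration, Realm/SharedGroup lifetime, version bookkeeping and thread checks, and an AsyncQuery subclass that only overrides the virtual hooks for query-specific work. The class and method names come from the headers in this diff, but the SharedGroup stand-in, the stub bodies and the tiny main() are invented, so treat it as an orientation sketch rather than library code.

// Illustration sketch (not part of this commit): condensed shape of the new
// class split. SharedGroup is a stand-in and all bodies are stubs.
#include <cstddef>
#include <exception>
#include <functional>

struct SharedGroup {};  // stand-in for realm::SharedGroup

using CollectionChangeCallback = std::function<void (std::exception_ptr)>;

// Generic machinery moved out of AsyncQuery: callbacks, lifetime, versions.
class BackgroundCollection {
public:
    virtual ~BackgroundCollection() = default;

    std::size_t add_callback(CollectionChangeCallback) { return 0; }
    void remove_callback(std::size_t) {}
    void call_callbacks() {}
    void unregister() noexcept {}
    bool is_alive() const noexcept { return true; }

    // Non-virtual entry points that forward to the subclass hooks; the real
    // class also does the locking and version bookkeeping here.
    void attach_to(SharedGroup& sg) { m_sg = &sg; do_attach_to(sg); }
    void detach() { do_detach_from(*m_sg); m_sg = nullptr; }
    virtual void run() {}
    void prepare_handover() { do_prepare_handover(*m_sg); }
    bool deliver(SharedGroup& sg, std::exception_ptr) { return do_deliver(sg); }

    virtual void release_data() noexcept = 0;

private:
    virtual void do_attach_to(SharedGroup&) = 0;
    virtual void do_detach_from(SharedGroup&) = 0;
    virtual bool do_prepare_handover(SharedGroup&) = 0;
    virtual bool do_deliver(SharedGroup&) = 0;

    SharedGroup* m_sg = nullptr;
};

// What stays behind: only the Query/TableView handover work.
class AsyncQuery : public BackgroundCollection {
public:
    void run() override {}                            // (re)run the query
    void release_data() noexcept override {}
private:
    void do_attach_to(SharedGroup&) override {}       // import the handed-over Query
    void do_detach_from(SharedGroup&) override {}     // export it for the next SharedGroup
    bool do_prepare_handover(SharedGroup&) override { return false; }
    bool do_deliver(SharedGroup&) override { return false; }
};

int main()
{
    SharedGroup sg;
    AsyncQuery query;
    query.attach_to(sg);        // the non-virtual entry point forwards to do_attach_to()
    query.deliver(sg, nullptr);
}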

View File

@@ -7,6 +7,7 @@ set(SOURCES
schema.cpp
shared_realm.cpp
impl/async_query.cpp
impl/background_collection.cpp
impl/realm_coordinator.cpp
impl/transact_log_handler.cpp
parser/parser.cpp
@@ -20,10 +21,11 @@ set(HEADERS
results.hpp
schema.hpp
shared_realm.hpp
impl/weak_realm_notifier.hpp
impl/weak_realm_notifier_base.hpp
impl/background_collection.hpp
impl/external_commit_helper.hpp
impl/transact_log_handler.hpp
impl/weak_realm_notifier.hpp
impl/weak_realm_notifier_base.hpp
parser/parser.hpp
parser/query_builder.hpp
util/atomic_shared_ptr.hpp)

View File

@@ -25,98 +25,19 @@ using namespace realm;
using namespace realm::_impl;
AsyncQuery::AsyncQuery(Results& target)
: m_target_results(&target)
, m_realm(target.get_realm())
: BackgroundCollection(target.get_realm())
, m_target_results(&target)
, m_sort(target.get_sort())
, m_sg_version(Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction())
{
Query q = target.get_query();
m_query_handover = Realm::Internal::get_shared_group(*m_realm).export_for_handover(q, MutableSourcePayload::Move);
m_query_handover = Realm::Internal::get_shared_group(get_realm()).export_for_handover(q, MutableSourcePayload::Move);
}
AsyncQuery::~AsyncQuery()
void AsyncQuery::release_data() noexcept
{
// unregister() may have been called from a different thread than we're being
// destroyed on, so we need to synchronize access to the interesting fields
// modified there
std::lock_guard<std::mutex> lock(m_target_mutex);
m_realm = nullptr;
}
size_t AsyncQuery::add_callback(std::function<void (std::exception_ptr)> callback)
{
m_realm->verify_thread();
auto next_token = [=] {
size_t token = 0;
for (auto& callback : m_callbacks) {
if (token <= callback.token) {
token = callback.token + 1;
}
}
return token;
};
std::lock_guard<std::mutex> lock(m_callback_mutex);
auto token = next_token();
m_callbacks.push_back({std::move(callback), token, -1ULL});
if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications
Realm::Internal::get_coordinator(*m_realm).send_commit_notifications();
m_have_callbacks = true;
}
return token;
}
void AsyncQuery::remove_callback(size_t token)
{
Callback old;
{
std::lock_guard<std::mutex> lock(m_callback_mutex);
REALM_ASSERT(m_error || m_callbacks.size() > 0);
auto it = find_if(begin(m_callbacks), end(m_callbacks),
[=](const auto& c) { return c.token == token; });
// We should only fail to find the callback if it was removed due to an error
REALM_ASSERT(m_error || it != end(m_callbacks));
if (it == end(m_callbacks)) {
return;
}
size_t idx = distance(begin(m_callbacks), it);
if (m_callback_index != npos && m_callback_index >= idx) {
--m_callback_index;
}
old = std::move(*it);
m_callbacks.erase(it);
m_have_callbacks = !m_callbacks.empty();
}
}
void AsyncQuery::unregister() noexcept
{
std::lock_guard<std::mutex> lock(m_target_mutex);
m_target_results = nullptr;
m_realm = nullptr;
}
void AsyncQuery::release_query() noexcept
{
{
std::lock_guard<std::mutex> lock(m_target_mutex);
REALM_ASSERT(!m_realm && !m_target_results);
}
m_query = nullptr;
}
bool AsyncQuery::is_alive() const noexcept
{
std::lock_guard<std::mutex> lock(m_target_mutex);
return m_target_results != nullptr;
}
// Most of the inter-thread synchronization for run(), prepare_handover(),
// attach_to(), detach(), release_query() and deliver() is done by
// RealmCoordinator external to this code, which has some potentially
@@ -143,12 +64,12 @@ bool AsyncQuery::is_alive() const noexcept
void AsyncQuery::run()
{
REALM_ASSERT(m_sg);
m_did_change = false;
{
std::lock_guard<std::mutex> target_lock(m_target_mutex);
// Don't run the query if the results aren't actually going to be used
if (!m_target_results || (!m_have_callbacks && !m_target_results->wants_background_updates())) {
if (!m_target_results || (!have_callbacks() && !m_target_results->wants_background_updates())) {
return;
}
}
@@ -168,33 +89,31 @@ void AsyncQuery::run()
if (m_sort) {
m_tv.sort(m_sort.columnIndices, m_sort.ascending);
}
m_did_change = true;
}
void AsyncQuery::prepare_handover()
bool AsyncQuery::do_prepare_handover(SharedGroup& sg)
{
m_sg_version = m_sg->get_version_of_current_transaction();
if (!m_tv.is_attached()) {
return;
return false;
}
REALM_ASSERT(m_tv.is_in_sync());
m_initial_run_complete = true;
m_handed_over_table_version = m_tv.sync_if_needed();
m_tv_handover = m_sg->export_for_handover(m_tv, MutableSourcePayload::Move);
m_tv_handover = sg.export_for_handover(m_tv, MutableSourcePayload::Move);
// detach the TableView as we won't need it again and keeping it around
// makes advance_read() much more expensive
m_tv = TableView();
return m_did_change;
}
bool AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err)
bool AsyncQuery::do_deliver(SharedGroup& sg)
{
if (!is_for_current_thread()) {
return false;
}
std::lock_guard<std::mutex> target_lock(m_target_mutex);
// Target results being null here indicates that it was destroyed while we
@@ -207,84 +126,32 @@ bool AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err)
// We can get called before the query has actually had the chance to run if
// we're added immediately before a different set of async results are
// delivered
if (!m_initial_run_complete && !err) {
if (!m_initial_run_complete) {
return false;
}
if (err) {
m_error = err;
return m_have_callbacks;
}
REALM_ASSERT(!m_query_handover);
auto realm_sg_version = Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction();
if (m_sg_version != realm_sg_version) {
// Realm version can be newer if a commit was made on our thread or the
// user manually called refresh(), or older if a commit was made on a
// different thread and we ran *really* fast in between the check for
// if the shared group has changed and when we pick up async results
return false;
}
if (m_tv_handover) {
m_tv_handover->version = m_sg_version;
m_tv_handover->version = version();
Results::Internal::set_table_view(*m_target_results,
std::move(*sg.import_from_handover(std::move(m_tv_handover))));
m_delivered_table_version = m_handed_over_table_version;
}
REALM_ASSERT(!m_tv_handover);
return m_have_callbacks;
return have_callbacks();
}
void AsyncQuery::call_callbacks()
void AsyncQuery::do_attach_to(SharedGroup& sg)
{
REALM_ASSERT(is_for_current_thread());
while (auto fn = next_callback()) {
fn(m_error);
}
if (m_error) {
// Remove all the callbacks as we never need to call anything ever again
// after delivering an error
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
m_callbacks.clear();
}
}
std::function<void (std::exception_ptr)> AsyncQuery::next_callback()
{
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
for (++m_callback_index; m_callback_index < m_callbacks.size(); ++m_callback_index) {
auto& callback = m_callbacks[m_callback_index];
if (m_error || callback.delivered_version != m_delivered_table_version) {
callback.delivered_version = m_delivered_table_version;
return callback.fn;
}
}
m_callback_index = npos;
return nullptr;
}
void AsyncQuery::attach_to(realm::SharedGroup& sg)
{
REALM_ASSERT(!m_sg);
REALM_ASSERT(m_query_handover);
m_query = sg.import_from_handover(std::move(m_query_handover));
m_sg = &sg;
}
void AsyncQuery::detatch()
void AsyncQuery::do_detach_from(SharedGroup& sg)
{
REALM_ASSERT(m_sg);
REALM_ASSERT(m_query);
REALM_ASSERT(!m_tv.is_attached());
m_query_handover = m_sg->export_for_handover(*m_query, MutableSourcePayload::Move);
m_sg = nullptr;
m_query_handover = sg.export_for_handover(*m_query, MutableSourcePayload::Move);
m_query = nullptr;
}
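
The comment near the top of run() above points out that the inter-thread synchronization for run(), prepare_handover(), attach_to(), detach() and deliver() lives in RealmCoordinator, outside this class. The stand-alone toy below only demonstrates the order in which those entry points are exercised; the ToyCollection class and the driver are invented for illustration, and all of the coordinator's real locking and threading is omitted.

// Illustration sketch (not part of this commit): the call order of the
// coordinator-driven lifecycle, with logging instead of real work.
#include <cstdio>
#include <exception>

struct SharedGroup {};  // stand-in

class ToyCollection {
public:
    // Worker-thread side, driven by the coordinator's background machinery.
    void attach_to(SharedGroup&) { std::puts("attach_to: import the handed-over query"); }
    void run()                   { std::puts("run: rerun the query if anyone is listening"); }
    void prepare_handover()      { std::puts("prepare_handover: export the TableView"); }
    void detach()                { std::puts("detach: re-export the query, drop the SharedGroup"); }

    // Target-thread side.
    bool deliver(SharedGroup&, std::exception_ptr)
    {
        std::puts("deliver: import the TableView if the versions line up");
        return true;  // "some callbacks need to be invoked"
    }
    void call_callbacks() { std::puts("call_callbacks: invoke the registered callbacks"); }
};

int main()
{
    SharedGroup worker_sg, target_sg;
    ToyCollection c;

    // Background thread: produce results against the worker SharedGroup.
    c.attach_to(worker_sg);
    c.run();
    c.prepare_handover();

    // Collection's own thread: hand the results over and notify.
    if (c.deliver(target_sg, nullptr))
        c.call_callbacks();

    // detach() is used when the collection is moved to a different SharedGroup
    // (see advance_helper_shared_group_to_latest() in realm_coordinator.cpp).
    c.detach();
}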

View File

@@ -19,6 +19,7 @@
#ifndef REALM_ASYNC_QUERY_HPP
#define REALM_ASYNC_QUERY_HPP
#include "background_collection.hpp"
#include "results.hpp"
#include <realm/group_shared.hpp>
@@ -31,46 +32,28 @@
namespace realm {
namespace _impl {
class AsyncQuery {
class AsyncQuery : public BackgroundCollection {
public:
AsyncQuery(Results& target);
~AsyncQuery();
size_t add_callback(std::function<void (std::exception_ptr)>);
void remove_callback(size_t token);
void unregister() noexcept;
void release_query() noexcept;
// Run/rerun the query if needed
void run();
// Prepare the handover object if run() did update the TableView
void prepare_handover();
// Update the target results from the handover
// Returns if any callbacks need to be invoked
bool deliver(SharedGroup& sg, std::exception_ptr err);
void call_callbacks();
// Attach the handed-over query to `sg`
void attach_to(SharedGroup& sg);
// Create a new query handover object and stop using the previously attached
// SharedGroup
void detatch();
Realm& get_realm() { return *m_target_results->get_realm(); }
// Get the version of the current handover object
SharedGroup::VersionID version() const noexcept { return m_sg_version; }
bool is_alive() const noexcept;
private:
// Run/rerun the query if needed
void run() override;
// Prepare the handover object if run() did update the TableView
bool do_prepare_handover(SharedGroup&) override;
// Update the target results from the handover
// Returns if any callbacks need to be invoked
bool do_deliver(SharedGroup& sg) override;
void release_data() noexcept override;
void do_attach_to(SharedGroup& sg) override;
void do_detach_from(SharedGroup& sg) override;
// Target Results to update and a mutex which guards it
mutable std::mutex m_target_mutex;
Results* m_target_results;
std::shared_ptr<Realm> m_realm;
const SortOrder m_sort;
const std::thread::id m_thread_id = std::this_thread::get_id();
// The source Query, in handover form iff m_sg is null
std::unique_ptr<SharedGroup::Handover<Query>> m_query_handover;
@@ -80,40 +63,11 @@ private:
// the query was (re)run since the last time the handover object was created
TableView m_tv;
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
SharedGroup::VersionID m_sg_version;
std::exception_ptr m_error;
struct Callback {
std::function<void (std::exception_ptr)> fn;
size_t token;
uint_fast64_t delivered_version;
};
// Currently registered callbacks and a mutex which must always be held
// while doing anything with them or m_callback_index
std::mutex m_callback_mutex;
std::vector<Callback> m_callbacks;
SharedGroup* m_sg = nullptr;
uint_fast64_t m_handed_over_table_version = -1;
uint_fast64_t m_delivered_table_version = -1;
// Iteration variable for looping over callbacks
// remove_callback() updates this when needed
size_t m_callback_index = npos;
bool m_did_change = false;
bool m_initial_run_complete = false;
// Cached value for if m_callbacks is empty, needed to avoid deadlocks in
// run() due to lock-order inversion between m_callback_mutex and m_target_mutex
// It's okay if this value is stale as at worst it'll result in us doing
// some extra work.
std::atomic<bool> m_have_callbacks = {false};
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
std::function<void (std::exception_ptr)> next_callback();
};
} // namespace _impl

View File

@@ -0,0 +1,179 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/background_collection.hpp"
#include "impl/realm_coordinator.hpp"
#include "shared_realm.hpp"
using namespace realm;
using namespace realm::_impl;
BackgroundCollection::BackgroundCollection(std::shared_ptr<Realm> realm)
: m_realm(std::move(realm))
, m_sg_version(Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction())
{
}
BackgroundCollection::~BackgroundCollection()
{
// unregister() may have been called from a different thread than we're being
// destroyed on, so we need to synchronize access to the interesting fields
// modified there
std::lock_guard<std::mutex> lock(m_realm_mutex);
m_realm = nullptr;
}
size_t BackgroundCollection::add_callback(CollectionChangeCallback callback)
{
m_realm->verify_thread();
auto next_token = [=] {
size_t token = 0;
for (auto& callback : m_callbacks) {
if (token <= callback.token) {
token = callback.token + 1;
}
}
return token;
};
std::lock_guard<std::mutex> lock(m_callback_mutex);
auto token = next_token();
m_callbacks.push_back({std::move(callback), token, -1ULL});
if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications
Realm::Internal::get_coordinator(*m_realm).send_commit_notifications();
m_have_callbacks = true;
}
return token;
}
void BackgroundCollection::remove_callback(size_t token)
{
Callback old;
{
std::lock_guard<std::mutex> lock(m_callback_mutex);
REALM_ASSERT(m_error || m_callbacks.size() > 0);
auto it = find_if(begin(m_callbacks), end(m_callbacks),
[=](const auto& c) { return c.token == token; });
// We should only fail to find the callback if it was removed due to an error
REALM_ASSERT(m_error || it != end(m_callbacks));
if (it == end(m_callbacks)) {
return;
}
size_t idx = distance(begin(m_callbacks), it);
if (m_callback_index != npos && m_callback_index >= idx) {
--m_callback_index;
}
old = std::move(*it);
m_callbacks.erase(it);
m_have_callbacks = !m_callbacks.empty();
}
}
void BackgroundCollection::unregister() noexcept
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
m_realm = nullptr;
}
bool BackgroundCollection::is_alive() const noexcept
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
return m_realm != nullptr;
}
void BackgroundCollection::prepare_handover()
{
REALM_ASSERT(m_sg);
m_sg_version = m_sg->get_version_of_current_transaction();
if (do_prepare_handover(*m_sg))
++m_results_version;
}
bool BackgroundCollection::deliver(SharedGroup& sg,
std::exception_ptr err)
{
if (!is_for_current_thread()) {
return false;
}
if (err) {
m_error = err;
return have_callbacks();
}
auto realm_sg_version = sg.get_version_of_current_transaction();
if (version() != realm_sg_version) {
// Realm version can be newer if a commit was made on our thread or the
// user manually called refresh(), or older if a commit was made on a
// different thread and we ran *really* fast in between the check for
// if the shared group has changed and when we pick up async results
return false;
}
return do_deliver(sg);
}
void BackgroundCollection::call_callbacks()
{
while (auto fn = next_callback()) {
fn(m_error);
}
if (m_error) {
// Remove all the callbacks as we never need to call anything ever again
// after delivering an error
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
m_callbacks.clear();
}
}
CollectionChangeCallback BackgroundCollection::next_callback()
{
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
for (++m_callback_index; m_callback_index < m_callbacks.size(); ++m_callback_index) {
auto& callback = m_callbacks[m_callback_index];
if (m_error || callback.delivered_version != m_results_version) {
callback.delivered_version = m_results_version;
return callback.fn;
}
}
m_callback_index = npos;
return nullptr;
}
void BackgroundCollection::attach_to(SharedGroup& sg)
{
REALM_ASSERT(!m_sg);
m_sg = &sg;
do_attach_to(sg);
}
void BackgroundCollection::detach()
{
REALM_ASSERT(m_sg);
do_detach_from(*m_sg);
m_sg = nullptr;
}
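
add_callback() above allocates tokens by scanning the live callbacks for the largest token and adding one, and remove_callback() nudges m_callback_index so that erasing a callback during delivery neither skips nor repeats one. The snippet below reproduces only the allocation policy with plain data, as a self-contained check of its behaviour; the Callback struct and next_token here are simplified stand-ins for the ones above.

// Illustration sketch (not part of this commit): the token policy used by
// add_callback()/remove_callback(), with plain data instead of the real types.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct Callback { std::size_t token; };

std::size_t next_token(const std::vector<Callback>& callbacks)
{
    // Same policy as the lambda in add_callback(): one past the largest live token.
    std::size_t token = 0;
    for (auto& cb : callbacks)
        if (token <= cb.token)
            token = cb.token + 1;
    return token;
}

int main()
{
    std::vector<Callback> callbacks;
    callbacks.push_back({next_token(callbacks)});   // token 0
    callbacks.push_back({next_token(callbacks)});   // token 1
    callbacks.push_back({next_token(callbacks)});   // token 2

    // Removing a middle callback does not free its token for reuse while a
    // larger token is still registered...
    callbacks.erase(std::find_if(callbacks.begin(), callbacks.end(),
                                 [](const Callback& c) { return c.token == 1; }));
    assert(next_token(callbacks) == 3);

    // ...but a token value can come back once every larger token is gone.
    callbacks.pop_back();                           // removes token 2
    assert(next_token(callbacks) == 1);
    return 0;
}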

View File

@@ -0,0 +1,114 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_BACKGROUND_COLLECTION_HPP
#define REALM_BACKGROUND_COLLECTION_HPP
#include <realm/group_shared.hpp>
#include <mutex>
#include <functional>
#include <set>
#include <thread>
namespace realm {
class Realm;
using CollectionChangeCallback = std::function<void (std::exception_ptr)>;
namespace _impl {
class BackgroundCollection {
public:
BackgroundCollection(std::shared_ptr<Realm>);
virtual ~BackgroundCollection();
void unregister() noexcept;
virtual void release_data() noexcept = 0;
size_t add_callback(CollectionChangeCallback callback);
void remove_callback(size_t token);
void call_callbacks();
bool is_alive() const noexcept;
Realm& get_realm() const noexcept { return *m_realm; }
// Attach the handed-over query to `sg`
void attach_to(SharedGroup& sg);
// Create a new query handover object and stop using the previously attached
// SharedGroup
void detach();
virtual void run() { }
void prepare_handover();
bool deliver(SharedGroup&, std::exception_ptr);
// Get the version of the current handover object
SharedGroup::VersionID version() const noexcept { return m_sg_version; }
protected:
bool have_callbacks() const noexcept { return m_have_callbacks; }
private:
virtual void do_attach_to(SharedGroup&) = 0;
virtual void do_detach_from(SharedGroup&) = 0;
virtual bool do_prepare_handover(SharedGroup&) = 0;
virtual bool do_deliver(SharedGroup&) = 0;
const std::thread::id m_thread_id = std::this_thread::get_id();
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
mutable std::mutex m_realm_mutex;
std::shared_ptr<Realm> m_realm;
SharedGroup::VersionID m_sg_version;
SharedGroup* m_sg = nullptr;
std::exception_ptr m_error;
uint_fast64_t m_results_version = 0;
struct Callback {
CollectionChangeCallback fn;
size_t token;
uint_fast64_t delivered_version;
};
// Currently registered callbacks and a mutex which must always be held
// while doing anything with them or m_callback_index
std::mutex m_callback_mutex;
std::vector<Callback> m_callbacks;
// Cached value for if m_callbacks is empty, needed to avoid deadlocks in
// run() due to lock-order inversion between m_callback_mutex and m_target_mutex
// It's okay if this value is stale as at worst it'll result in us doing
// some extra work.
std::atomic<bool> m_have_callbacks = {false};
// Iteration variable for looping over callbacks
// remove_callback() updates this when needed
size_t m_callback_index = npos;
CollectionChangeCallback next_callback();
};
} // namespace _impl
} // namespace realm
#endif /* REALM_BACKGROUND_COLLECTION_HPP */
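
The m_have_callbacks comment above records a small but deliberate design decision: run() should never need m_callback_mutex while the subclass is holding m_target_mutex, so whether any callbacks exist is cached in an atomic that run() can read without locking, and a stale value at worst costs some redundant work. Below is a generic sketch of that pattern with an invented Notifier class; it is not the real class and keeps nothing but the flag.

// Illustration sketch (not part of this commit): caching "are there callbacks"
// in an atomic so run() never takes the callback mutex under the target mutex.
#include <atomic>
#include <functional>
#include <mutex>
#include <vector>

class Notifier {
public:
    void add_callback(std::function<void()> fn)
    {
        std::lock_guard<std::mutex> lock(m_callback_mutex);
        m_callbacks.push_back(std::move(fn));
        m_have_callbacks = true;             // cheap to read without the mutex
    }

    void run()
    {
        std::lock_guard<std::mutex> lock(m_target_mutex);
        // Reading the atomic here avoids locking m_callback_mutex while
        // m_target_mutex is held. The flag may be stale; per the comment
        // above, the worst case is some redundant work.
        if (!m_have_callbacks)
            return;
        // ... produce results under m_target_mutex ...
    }

private:
    std::mutex m_target_mutex;
    std::mutex m_callback_mutex;
    std::vector<std::function<void()>> m_callbacks;
    std::atomic<bool> m_have_callbacks{false};
};

int main()
{
    Notifier n;
    n.run();                // no callbacks yet: returns early
    n.add_callback([] {});
    n.run();                // now does the work
}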

View File

@@ -268,7 +268,7 @@ void RealmCoordinator::clean_up_dead_queries()
// Ensure the query is destroyed here even if there's lingering refs
// to the async query elsewhere
container[i]->release_query();
container[i]->release_data();
if (container.size() > i + 1)
container[i] = std::move(container.back());
@@ -399,7 +399,7 @@ void RealmCoordinator::advance_helper_shared_group_to_latest()
// Transfer all new queries over to the main SG
for (auto& query : m_new_queries) {
query->detatch();
query->detach();
query->attach_to(*m_query_sg);
}

View File

@@ -32,9 +32,9 @@ class SharedGroup;
struct AsyncQueryCancelationToken;
namespace _impl {
class AsyncQuery;
class WeakRealmNotifier;
class BackgroundCollection;
class ExternalCommitHelper;
class WeakRealmNotifier;
// RealmCoordinator manages the weak cache of Realm instances and communication
// between per-thread Realm instances for a given file
@@ -97,8 +97,8 @@ private:
std::vector<WeakRealmNotifier> m_weak_realm_notifiers;
std::mutex m_query_mutex;
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_new_queries;
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_queries;
std::vector<std::shared_ptr<_impl::BackgroundCollection>> m_new_queries;
std::vector<std::shared_ptr<_impl::BackgroundCollection>> m_queries;
// SharedGroup used for actually running async queries
// Will have a read transaction iff m_queries is non-empty

View File

@@ -42,6 +42,7 @@ namespace realm {
namespace _impl {
class AsyncQuery;
class BackgroundCollection;
class RealmCoordinator;
}
@@ -144,15 +145,16 @@ namespace realm {
// without making it public to everyone
class Internal {
friend class _impl::AsyncQuery;
friend class _impl::BackgroundCollection;
friend class _impl::RealmCoordinator;
// AsyncQuery needs access to the SharedGroup to be able to call the
// handover functions, which are not very wrappable
static SharedGroup& get_shared_group(Realm& realm) { return *realm.m_shared_group; }
// AsyncQuery needs to be able to access the owning coordinator to
// wake up the worker thread when a callback is added, and
// coordinators need to be able to get themselves from a Realm
// BackgroundCollection needs to be able to access the owning
// coordinator to wake up the worker thread when a callback is
// added, and coordinators need to be able to get themselves from a Realm
static _impl::RealmCoordinator& get_coordinator(Realm& realm) { return *realm.m_coordinator; }
};
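
The Internal class at the end of this hunk is a scoped-friendship idiom: the interesting members of Realm stay private, a nested Internal class (which, being a member, can see them) wraps them in private static accessors, and only the listed _impl classes are befriended by Internal, so they gain access to exactly those accessors and nothing else. A stripped-down, self-contained illustration with invented names (Engine, impl::Worker), not the real classes:

// Illustration sketch (not part of this commit): the nested-Internal idiom.
#include <iostream>

namespace impl { class Worker; }

class Engine {
public:
    // Exposes selected internals to specific impl classes without making the
    // members public to everyone.
    class Internal {
        friend class impl::Worker;
        static int& state(Engine& e) { return e.m_state; }  // nested class sees privates
    };

private:
    int m_state = 42;
};

namespace impl {
class Worker {
public:
    static void poke(Engine& e)
    {
        // Only the befriended impl classes can reach the private static accessor.
        Engine::Internal::state(e) += 1;
        std::cout << Engine::Internal::state(e) << '\n';
    }
};
}

int main()
{
    Engine e;
    impl::Worker::poke(e);   // prints 43
}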