mirror of
https://github.com/status-im/realm-js.git
synced 2025-02-05 19:24:31 +00:00
Use a single AsyncQuery per Results regardless of number of callbacks added
This commit is contained in:
parent
d165458601
commit
8f668fdf09
@ -19,59 +19,74 @@
|
||||
#include "impl/async_query.hpp"
|
||||
|
||||
#include "impl/realm_coordinator.hpp"
|
||||
#include "results.hpp"
|
||||
|
||||
using namespace realm;
|
||||
using namespace realm::_impl;
|
||||
|
||||
AsyncQuery::AsyncQuery(SortOrder sort,
|
||||
std::unique_ptr<SharedGroup::Handover<Query>> handover,
|
||||
std::unique_ptr<AsyncQueryCallback> callback,
|
||||
RealmCoordinator& parent)
|
||||
: parent(parent.shared_from_this())
|
||||
, m_sort(std::move(sort))
|
||||
, m_query_handover(std::move(handover))
|
||||
, m_callback(std::move(callback))
|
||||
AsyncQuery::AsyncQuery(Results& target)
|
||||
: m_target_results(&target)
|
||||
, m_realm(target.get_realm())
|
||||
, m_sort(target.get_sort())
|
||||
, m_version(m_realm->m_shared_group->get_version_of_current_transaction())
|
||||
{
|
||||
Query q = target.get_query();
|
||||
m_query_handover = m_realm->m_shared_group->export_for_handover(q, MutableSourcePayload::Move);
|
||||
}
|
||||
|
||||
void AsyncQuery::get_results(const SharedRealm& realm, SharedGroup& sg, std::vector<std::function<void()>>& ret)
|
||||
size_t AsyncQuery::add_callback(std::function<void (std::exception_ptr)> callback)
|
||||
{
|
||||
if (!m_callback->is_for_current_thread()) {
|
||||
return;
|
||||
}
|
||||
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
||||
|
||||
if (m_error) {
|
||||
ret.emplace_back([self = shared_from_this()] {
|
||||
self->m_callback->error(self->m_error);
|
||||
RealmCoordinator::unregister_query(*self);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (!m_tv_handover) {
|
||||
return;
|
||||
}
|
||||
if (m_tv_handover->version < sg.get_version_of_current_transaction()) {
|
||||
// async results are stale; ignore
|
||||
return;
|
||||
}
|
||||
// auto r = Results(realm,
|
||||
// m_sort,
|
||||
// std::move(*sg.import_from_handover(std::move(m_tv_handover))));
|
||||
Results r;
|
||||
auto version = sg.get_version_of_current_transaction();
|
||||
ret.emplace_back([r = std::move(r), version, &sg, self = shared_from_this()] {
|
||||
if (sg.get_version_of_current_transaction() == version) {
|
||||
self->m_callback->deliver(std::move(r));
|
||||
size_t token = 0;
|
||||
for (auto& callback : m_callbacks) {
|
||||
if (token <= callback.token) {
|
||||
token = callback.token + 1;
|
||||
}
|
||||
});
|
||||
}
|
||||
return token;
|
||||
}
|
||||
|
||||
void AsyncQuery::prepare_update()
|
||||
void AsyncQuery::remove_callback(size_t token)
|
||||
{
|
||||
// This function must not touch m_tv_handover as it is called without the
|
||||
// relevant lock held (so that another thread can consume m_tv_handover
|
||||
// while this is running)
|
||||
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
||||
if (is_for_current_thread() && m_calling_callbacks) {
|
||||
// Schedule the removal for after we're done calling callbacks
|
||||
m_callbacks_to_remove.push_back(token);
|
||||
return;
|
||||
}
|
||||
do_remove_callback(token);
|
||||
}
|
||||
|
||||
void AsyncQuery::do_remove_callback(size_t token) noexcept
|
||||
{
|
||||
REALM_ASSERT(m_error || m_callbacks.size() > 0);
|
||||
auto it = find_if(begin(m_callbacks), end(m_callbacks),
|
||||
[=](const auto& c) { return c.token == token; });
|
||||
// We should only fail to find the callback if it was removed due to an error
|
||||
REALM_ASSERT(m_error || it != end(m_callbacks));
|
||||
|
||||
if (it != end(m_callbacks)) {
|
||||
if (it != prev(end(m_callbacks))) {
|
||||
*it = std::move(m_callbacks.back());
|
||||
}
|
||||
m_callbacks.pop_back();
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncQuery::unregister() noexcept
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_target_mutex);
|
||||
RealmCoordinator::unregister_query(*this);
|
||||
m_target_results = nullptr;
|
||||
m_realm = nullptr;
|
||||
}
|
||||
|
||||
void AsyncQuery::run()
|
||||
{
|
||||
// This function must not touch any members touched in deliver(), as they
|
||||
// may be called concurrently (as it'd be pretty bad for a running query to
|
||||
// block the main thread trying to pick up the previous results)
|
||||
|
||||
REALM_ASSERT(m_sg);
|
||||
|
||||
@ -80,6 +95,7 @@ void AsyncQuery::prepare_update()
|
||||
}
|
||||
else {
|
||||
m_tv = m_query->find_all();
|
||||
m_query = nullptr;
|
||||
if (m_sort) {
|
||||
m_tv.sort(m_sort.columnIndices, m_sort.ascending);
|
||||
}
|
||||
@ -89,33 +105,103 @@ void AsyncQuery::prepare_update()
|
||||
|
||||
void AsyncQuery::prepare_handover()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_callback_mutex);
|
||||
|
||||
REALM_ASSERT(m_tv.is_in_sync());
|
||||
|
||||
m_version = m_sg->get_version_of_current_transaction();
|
||||
m_initial_run_complete = true;
|
||||
|
||||
// Even if the TV didn't change, we need to re-export it if the previous
|
||||
// export has not been consumed yet, as the old handover object is no longer
|
||||
// usable due to the version not matching
|
||||
if (m_did_update || (m_tv_handover && m_tv_handover->version != m_sg->get_version_of_current_transaction())) {
|
||||
if (m_did_update || (m_tv_handover && m_tv_handover->version != m_version)) {
|
||||
m_tv_handover = m_sg->export_for_handover(m_tv, ConstSourcePayload::Copy);
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncQuery::set_error(std::exception_ptr err)
|
||||
void AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err)
|
||||
{
|
||||
if (!m_error) {
|
||||
m_error = err;
|
||||
if (!is_for_current_thread()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
SharedGroup::VersionID AsyncQuery::version() const noexcept
|
||||
{
|
||||
if (m_tv_handover)
|
||||
return m_tv_handover->version;
|
||||
if (m_query_handover)
|
||||
return m_query_handover->version;
|
||||
return SharedGroup::VersionID{};
|
||||
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
|
||||
std::lock_guard<std::mutex> target_lock(m_target_mutex);
|
||||
|
||||
// Target results being null here indicates that it was destroyed while we
|
||||
// were in the process of advancing the Realm version and preparing for
|
||||
// delivery, i.e. it was destroyed from the "wrong" thread
|
||||
if (!m_target_results) {
|
||||
return;
|
||||
}
|
||||
|
||||
// We can get called before the query has actually had the chance to run if
|
||||
// we're added immediately before a different set of async results are
|
||||
// delivered
|
||||
if (!m_initial_run_complete && !err) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Tell remove_callback() to defer actually removing them, so that calling it
|
||||
// in the callback block works
|
||||
m_calling_callbacks = true;
|
||||
|
||||
if (err) {
|
||||
m_error = true;
|
||||
for (auto& callback : m_callbacks) {
|
||||
callback.fn(err);
|
||||
}
|
||||
|
||||
// Remove all the callbacks as we never need to call anything ever again
|
||||
// after delivering an error
|
||||
m_callbacks.clear();
|
||||
m_callbacks_to_remove.clear();
|
||||
m_calling_callbacks = false;
|
||||
return;
|
||||
}
|
||||
|
||||
REALM_ASSERT(!m_query_handover);
|
||||
|
||||
auto realm_version = m_realm->m_shared_group->get_version_of_current_transaction();
|
||||
if (m_version != realm_version) {
|
||||
// Realm version can be newer if a commit was made on our thread or the
|
||||
// user manually called refresh(), or older if a commit was made on a
|
||||
// different thread and we ran *really* fast in between the check for
|
||||
// if the shared group has changed and when we pick up async results
|
||||
return;
|
||||
}
|
||||
|
||||
// Cannot use m_did_update here as it is used unlocked in run()
|
||||
bool did_update = false;
|
||||
if (m_tv_handover) {
|
||||
Results::AsyncFriend::set_table_view(*m_target_results,
|
||||
std::move(*sg.import_from_handover(std::move(m_tv_handover))));
|
||||
|
||||
did_update = true;
|
||||
}
|
||||
REALM_ASSERT(!m_tv_handover);
|
||||
|
||||
for (auto& callback : m_callbacks) {
|
||||
if (did_update || callback.first_run) {
|
||||
callback.fn(nullptr);
|
||||
callback.first_run = false;
|
||||
}
|
||||
}
|
||||
|
||||
m_calling_callbacks = false;
|
||||
|
||||
// Actually remove any callbacks whose removal was requested during iteration
|
||||
for (auto token : m_callbacks_to_remove) {
|
||||
do_remove_callback(token);
|
||||
}
|
||||
m_callbacks_to_remove.clear();
|
||||
}
|
||||
|
||||
void AsyncQuery::attach_to(realm::SharedGroup& sg)
|
||||
{
|
||||
REALM_ASSERT(!m_sg);
|
||||
REALM_ASSERT(m_query_handover);
|
||||
|
||||
m_query = sg.import_from_handover(std::move(m_query_handover));
|
||||
m_sg = &sg;
|
||||
@ -124,7 +210,10 @@ void AsyncQuery::attach_to(realm::SharedGroup& sg)
|
||||
void AsyncQuery::detatch()
|
||||
{
|
||||
REALM_ASSERT(m_sg);
|
||||
REALM_ASSERT(m_query);
|
||||
REALM_ASSERT(!m_tv.is_attached());
|
||||
|
||||
m_query_handover = m_sg->export_for_handover(*m_query, MutableSourcePayload::Move);
|
||||
m_sg = nullptr;
|
||||
m_query = nullptr;
|
||||
}
|
||||
|
@ -23,50 +23,88 @@
|
||||
|
||||
#include <realm/group_shared.hpp>
|
||||
|
||||
#include <exception>
|
||||
#include <mutex>
|
||||
#include <functional>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
namespace realm {
|
||||
namespace _impl {
|
||||
class AsyncQuery : public std::enable_shared_from_this<AsyncQuery> {
|
||||
class AsyncQuery {
|
||||
public:
|
||||
AsyncQuery(SortOrder sort,
|
||||
std::unique_ptr<SharedGroup::Handover<Query>> handover,
|
||||
std::unique_ptr<AsyncQueryCallback> callback,
|
||||
RealmCoordinator& parent);
|
||||
AsyncQuery(Results& target);
|
||||
|
||||
void get_results(const SharedRealm& realm, SharedGroup& sg, std::vector<std::function<void()>>& ret);
|
||||
size_t add_callback(std::function<void (std::exception_ptr)>);
|
||||
void remove_callback(size_t token);
|
||||
|
||||
void set_error(std::exception_ptr err);
|
||||
void unregister() noexcept;
|
||||
|
||||
// Run/rerun the query if needed
|
||||
void prepare_update();
|
||||
// Update the handover object with the new data produced in prepare_update()
|
||||
void run();
|
||||
// Prepare the handover object if run() did update the TableView
|
||||
void prepare_handover();
|
||||
// Update the target results from the handover and call callbacks
|
||||
void deliver(SharedGroup& sg, std::exception_ptr err);
|
||||
|
||||
// Get the version of the current handover object
|
||||
SharedGroup::VersionID version() const noexcept;
|
||||
|
||||
// Attach the handed-over query to `sg`
|
||||
void attach_to(SharedGroup& sg);
|
||||
// Create a new query handover object and stop using the previously attached
|
||||
// SharedGroup
|
||||
void detatch();
|
||||
|
||||
std::shared_ptr<RealmCoordinator> parent;
|
||||
Realm& get_realm() { return *m_target_results->get_realm(); }
|
||||
// Get the version of the current handover object
|
||||
SharedGroup::VersionID version() const noexcept { return m_version; }
|
||||
|
||||
private:
|
||||
const SortOrder m_sort;
|
||||
// Target Results to update and a mutex which guards it
|
||||
std::mutex m_target_mutex;
|
||||
Results* m_target_results;
|
||||
|
||||
std::shared_ptr<Realm> m_realm;
|
||||
const SortOrder m_sort;
|
||||
const std::thread::id m_thread_id = std::this_thread::get_id();
|
||||
|
||||
// The source Query, in handover from iff m_sg is null
|
||||
// Only used until the first time the query is actually run, after which
|
||||
// both will be null
|
||||
std::unique_ptr<SharedGroup::Handover<Query>> m_query_handover;
|
||||
std::unique_ptr<Query> m_query;
|
||||
|
||||
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
|
||||
// The TableView resulting from running the query. Will be detached if the
|
||||
// Query has not yet been run, in which case m_query or m_query_handover will
|
||||
// be non-null
|
||||
TableView m_tv;
|
||||
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
|
||||
SharedGroup::VersionID m_version;
|
||||
|
||||
const std::unique_ptr<AsyncQueryCallback> m_callback;
|
||||
struct Callback {
|
||||
std::function<void (std::exception_ptr)> fn;
|
||||
std::unique_ptr<SharedGroup::Handover<TableView>> handover;
|
||||
size_t token;
|
||||
bool first_run;
|
||||
};
|
||||
|
||||
// Currently registered callbacks and a mutex which must always be held
|
||||
// while doing anything with them
|
||||
std::mutex m_callback_mutex;
|
||||
std::vector<Callback> m_callbacks;
|
||||
|
||||
// Callbacks which the user has asked to have removed whose removal has been
|
||||
// deferred until after we're done looping over m_callbacks
|
||||
std::vector<size_t> m_callbacks_to_remove;
|
||||
|
||||
SharedGroup* m_sg = nullptr;
|
||||
|
||||
std::exception_ptr m_error;
|
||||
|
||||
bool m_did_update = false;
|
||||
bool m_initial_run_complete = false;
|
||||
bool m_calling_callbacks = false;
|
||||
bool m_error = false;
|
||||
|
||||
void do_remove_callback(size_t token) noexcept;
|
||||
|
||||
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
|
||||
};
|
||||
|
||||
} // namespace _impl
|
||||
|
@ -239,48 +239,25 @@ void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index)
|
||||
}
|
||||
}
|
||||
|
||||
AsyncQueryCancelationToken RealmCoordinator::register_query(const Results& r, std::unique_ptr<AsyncQueryCallback> target)
|
||||
void RealmCoordinator::register_query(std::shared_ptr<AsyncQuery> query)
|
||||
{
|
||||
return r.get_realm()->m_coordinator->do_register_query(r, std::move(target));
|
||||
}
|
||||
|
||||
AsyncQueryCancelationToken RealmCoordinator::do_register_query(const Results& r, std::unique_ptr<AsyncQueryCallback> target)
|
||||
{
|
||||
if (m_config.read_only) {
|
||||
throw InvalidTransactionException("Cannot create asynchronous query for read-only Realms");
|
||||
}
|
||||
if (r.get_realm()->is_in_transaction()) {
|
||||
throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction");
|
||||
}
|
||||
|
||||
auto handover = r.get_realm()->m_shared_group->export_for_handover(r.get_query(), ConstSourcePayload::Copy);
|
||||
auto version = handover->version;
|
||||
auto query = std::make_shared<AsyncQuery>(r.get_sort(),
|
||||
std::move(handover),
|
||||
std::move(target),
|
||||
*this);
|
||||
|
||||
auto version = query->version();
|
||||
auto& self = *query->get_realm().m_coordinator;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||
pin_version(version.version, version.index);
|
||||
m_new_queries.push_back(query);
|
||||
std::lock_guard<std::mutex> lock(self.m_query_mutex);
|
||||
self.pin_version(version.version, version.index);
|
||||
self.m_new_queries.push_back(std::move(query));
|
||||
}
|
||||
|
||||
// Wake up the background worker threads by pretending we made a commit
|
||||
m_notifier->notify_others();
|
||||
return query;
|
||||
self.m_notifier->notify_others();
|
||||
}
|
||||
|
||||
void RealmCoordinator::unregister_query(AsyncQuery& registration)
|
||||
{
|
||||
registration.parent->do_unregister_query(registration);
|
||||
}
|
||||
|
||||
void RealmCoordinator::do_unregister_query(AsyncQuery& registration)
|
||||
void RealmCoordinator::unregister_query(AsyncQuery& query)
|
||||
{
|
||||
auto swap_remove = [&](auto& container) {
|
||||
auto it = std::find_if(container.begin(), container.end(),
|
||||
[&](auto const& ptr) { return ptr.get() == ®istration; });
|
||||
[&](auto const& ptr) { return ptr.get() == &query; });
|
||||
if (it != container.end()) {
|
||||
std::iter_swap(--container.end(), it);
|
||||
container.pop_back();
|
||||
@ -289,18 +266,19 @@ void RealmCoordinator::do_unregister_query(AsyncQuery& registration)
|
||||
return false;
|
||||
};
|
||||
|
||||
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||
if (swap_remove(m_queries)) {
|
||||
auto& self = *query.get_realm().m_coordinator;
|
||||
std::lock_guard<std::mutex> lock(self.m_query_mutex);
|
||||
if (swap_remove(self.m_queries)) {
|
||||
// Make sure we aren't holding on to read versions needlessly if there
|
||||
// are no queries left, but don't close them entirely as opening shared
|
||||
// groups is expensive
|
||||
if (!m_running_queries && m_queries.empty() && m_query_sg) {
|
||||
m_query_sg->end_read();
|
||||
if (!self.m_running_queries && self.m_queries.empty() && self.m_query_sg) {
|
||||
self.m_query_sg->end_read();
|
||||
}
|
||||
}
|
||||
else if (swap_remove(m_new_queries)) {
|
||||
if (m_new_queries.empty() && m_advancer_sg) {
|
||||
m_advancer_sg->end_read();
|
||||
else if (swap_remove(self.m_new_queries)) {
|
||||
if (self.m_new_queries.empty() && self.m_advancer_sg) {
|
||||
self.m_advancer_sg->end_read();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -329,9 +307,6 @@ void RealmCoordinator::run_async_queries()
|
||||
|
||||
if (m_async_error) {
|
||||
move_new_queries_to_main();
|
||||
for (auto& query : m_queries) {
|
||||
query->set_error(m_async_error);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -347,14 +322,18 @@ void RealmCoordinator::run_async_queries()
|
||||
lock.unlock();
|
||||
|
||||
for (auto& query : queries_to_run) {
|
||||
query->prepare_update();
|
||||
query->run();
|
||||
}
|
||||
|
||||
// Reacquire the lock while updating the fields that are actually read on
|
||||
// other threads
|
||||
lock.lock();
|
||||
for (auto& query : queries_to_run) {
|
||||
query->prepare_handover();
|
||||
{
|
||||
// Make sure we don't change the version while another thread is delivering
|
||||
std::lock_guard<std::mutex> version_lock(m_query_version_mutex);
|
||||
lock.lock();
|
||||
for (auto& query : queries_to_run) {
|
||||
query->prepare_handover();
|
||||
}
|
||||
}
|
||||
|
||||
// Check if all queries were removed while we were running them, as if so
|
||||
@ -431,8 +410,9 @@ void RealmCoordinator::advance_helper_shared_group_to_latest()
|
||||
|
||||
void RealmCoordinator::advance_to_ready(Realm& realm)
|
||||
{
|
||||
std::vector<std::function<void()>> async_results;
|
||||
decltype(m_queries) queries;
|
||||
|
||||
std::lock_guard<std::mutex> lock(m_query_version_mutex);
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||
|
||||
@ -455,29 +435,24 @@ void RealmCoordinator::advance_to_ready(Realm& realm)
|
||||
}
|
||||
|
||||
transaction::advance(*realm.m_shared_group, *realm.m_history, realm.m_binding_context.get(), version);
|
||||
|
||||
for (auto& query : m_queries) {
|
||||
query->get_results(realm.shared_from_this(), *realm.m_shared_group, async_results);
|
||||
}
|
||||
queries = m_queries;
|
||||
}
|
||||
|
||||
for (auto& results : async_results) {
|
||||
results();
|
||||
for (auto& query : queries) {
|
||||
query->deliver(*realm.m_shared_group, m_async_error);
|
||||
}
|
||||
}
|
||||
|
||||
void RealmCoordinator::process_available_async(Realm& realm)
|
||||
{
|
||||
std::vector<std::function<void()>> async_results;
|
||||
|
||||
decltype(m_queries) queries;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||
for (auto& query : m_queries) {
|
||||
query->get_results(realm.shared_from_this(), *realm.m_shared_group, async_results);
|
||||
}
|
||||
queries = m_queries;
|
||||
}
|
||||
|
||||
for (auto& results : async_results) {
|
||||
results();
|
||||
std::lock_guard<std::mutex> lock(m_query_version_mutex);
|
||||
for (auto& query : queries) {
|
||||
query->deliver(*realm.m_shared_group, m_async_error);
|
||||
}
|
||||
}
|
||||
|
@ -80,8 +80,8 @@ public:
|
||||
// Update the schema in the cached config
|
||||
void update_schema(Schema const& new_schema);
|
||||
|
||||
static AsyncQueryCancelationToken register_query(const Results& r, std::unique_ptr<AsyncQueryCallback>);
|
||||
static void unregister_query(AsyncQuery& registration);
|
||||
static void register_query(std::shared_ptr<AsyncQuery> query);
|
||||
static void unregister_query(AsyncQuery& query);
|
||||
|
||||
// Advance the Realm to the most recent transaction version which all async
|
||||
// work is complete for
|
||||
@ -95,6 +95,7 @@ private:
|
||||
std::vector<CachedRealm> m_cached_realms;
|
||||
|
||||
std::mutex m_query_mutex;
|
||||
std::mutex m_query_version_mutex;
|
||||
bool m_running_queries = false;
|
||||
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_new_queries;
|
||||
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_queries;
|
||||
@ -113,9 +114,6 @@ private:
|
||||
|
||||
std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
|
||||
|
||||
AsyncQueryCancelationToken do_register_query(const Results& r, std::unique_ptr<AsyncQueryCallback>);
|
||||
void do_unregister_query(AsyncQuery& registration);
|
||||
|
||||
// must be called with m_query_mutex locked
|
||||
void pin_version(uint_fast64_t version, uint_fast32_t index);
|
||||
|
||||
|
@ -18,8 +18,9 @@
|
||||
|
||||
#include "results.hpp"
|
||||
|
||||
#include "object_store.hpp"
|
||||
#include "impl/async_query.hpp"
|
||||
#include "impl/realm_coordinator.hpp"
|
||||
#include "object_store.hpp"
|
||||
|
||||
#include <stdexcept>
|
||||
|
||||
@ -55,6 +56,13 @@ Results::Results(SharedRealm r, const ObjectSchema &o, Table& table)
|
||||
{
|
||||
}
|
||||
|
||||
Results::~Results()
|
||||
{
|
||||
if (m_background_query) {
|
||||
m_background_query->unregister();
|
||||
}
|
||||
}
|
||||
|
||||
void Results::validate_read() const
|
||||
{
|
||||
if (m_realm)
|
||||
@ -344,6 +352,29 @@ Results Results::filter(Query&& q) const
|
||||
return Results(m_realm, get_object_schema(), get_query().and_query(std::move(q)), get_sort());
|
||||
}
|
||||
|
||||
AsyncQueryCancelationToken Results::async(std::function<void (std::exception_ptr)> target)
|
||||
{
|
||||
if (m_realm->config().read_only) {
|
||||
throw InvalidTransactionException("Cannot create asynchronous query for read-only Realms");
|
||||
}
|
||||
if (m_realm->is_in_transaction()) {
|
||||
throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction");
|
||||
}
|
||||
|
||||
if (!m_background_query) {
|
||||
m_background_query = std::make_shared<_impl::AsyncQuery>(*this);
|
||||
_impl::RealmCoordinator::register_query(m_background_query);
|
||||
}
|
||||
return {m_background_query, m_background_query->add_callback(std::move(target))};
|
||||
}
|
||||
|
||||
void Results::AsyncFriend::set_table_view(Results& results, realm::TableView &&tv)
|
||||
{
|
||||
results.m_table_view = std::move(tv);
|
||||
results.m_mode = Mode::TableView;
|
||||
REALM_ASSERT(results.m_table_view.is_in_sync());
|
||||
}
|
||||
|
||||
Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t column, const Table* table)
|
||||
: std::runtime_error((std::string)"Operation not supported on '" + table->get_column_name(column).data() + "' columns")
|
||||
, column_index(column)
|
||||
@ -352,30 +383,37 @@ Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t c
|
||||
{
|
||||
}
|
||||
|
||||
AsyncQueryCancelationToken Results::async(std::unique_ptr<AsyncQueryCallback> target)
|
||||
AsyncQueryCancelationToken::AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> query, size_t token)
|
||||
: m_query(std::move(query)), m_token(token)
|
||||
{
|
||||
return _impl::RealmCoordinator::register_query(*this, std::move(target));
|
||||
}
|
||||
|
||||
AsyncQueryCancelationToken::~AsyncQueryCancelationToken()
|
||||
{
|
||||
if (m_registration) {
|
||||
_impl::RealmCoordinator::unregister_query(*m_registration);
|
||||
// m_query itself (and not just the pointed-to thing) needs to be accessed
|
||||
// atomically to ensure that there are no data races when the token is
|
||||
// destroyed after being modified on a different thread.
|
||||
// This is needed despite the token not being thread-safe in general as
|
||||
// users find it very surpringing for obj-c objects to care about what
|
||||
// thread they are deallocated on.
|
||||
if (auto query = std::atomic_load(&m_query)) {
|
||||
query->remove_callback(m_token);
|
||||
}
|
||||
}
|
||||
|
||||
AsyncQueryCancelationToken::AsyncQueryCancelationToken(AsyncQueryCancelationToken&& rgt)
|
||||
: m_registration(std::move(rgt.m_registration))
|
||||
: m_query(std::atomic_exchange(&rgt.m_query, {})), m_token(rgt.m_token)
|
||||
{
|
||||
}
|
||||
|
||||
AsyncQueryCancelationToken& AsyncQueryCancelationToken::operator=(realm::AsyncQueryCancelationToken&& rgt)
|
||||
{
|
||||
if (this != &rgt) {
|
||||
if (m_registration) {
|
||||
_impl::RealmCoordinator::unregister_query(*m_registration);
|
||||
if (auto query = std::atomic_load(&m_query)) {
|
||||
query->remove_callback(m_token);
|
||||
}
|
||||
m_registration = std::move(rgt.m_registration);
|
||||
std::atomic_store(&m_query, std::atomic_exchange(&rgt.m_query, {}));
|
||||
m_token = rgt.m_token;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ namespace _impl {
|
||||
// A token which keeps an asynchronous query alive
|
||||
struct AsyncQueryCancelationToken {
|
||||
AsyncQueryCancelationToken() = default;
|
||||
AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> registration) : m_registration(std::move(registration)) { }
|
||||
AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> query, size_t token);
|
||||
~AsyncQueryCancelationToken();
|
||||
|
||||
AsyncQueryCancelationToken(AsyncQueryCancelationToken&&);
|
||||
@ -49,25 +49,8 @@ struct AsyncQueryCancelationToken {
|
||||
AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken const&) = delete;
|
||||
|
||||
private:
|
||||
std::shared_ptr<_impl::AsyncQuery> m_registration;
|
||||
};
|
||||
|
||||
// Subclass to get notifications about async query things
|
||||
class AsyncQueryCallback {
|
||||
public:
|
||||
virtual ~AsyncQueryCallback() = default;
|
||||
|
||||
// Called with the Results object generated by the query on a thread where
|
||||
// is_for_current_thread() returned true
|
||||
virtual void deliver(Results) = 0;
|
||||
|
||||
// If an error occured while running the query on the worker thread, this is
|
||||
// called with an exception on a thread where is_for_current_thread()
|
||||
// returned true
|
||||
virtual void error(std::exception_ptr) = 0;
|
||||
|
||||
// Return whether or not this query is associated with the current thread
|
||||
virtual bool is_for_current_thread() { return true; }
|
||||
std::shared_ptr<_impl::AsyncQuery> m_query;
|
||||
size_t m_token;
|
||||
};
|
||||
|
||||
struct SortOrder {
|
||||
@ -88,6 +71,7 @@ public:
|
||||
Results() = default;
|
||||
Results(SharedRealm r, const ObjectSchema& o, Table& table);
|
||||
Results(SharedRealm r, const ObjectSchema& o, Query q, SortOrder s = {});
|
||||
~Results();
|
||||
|
||||
// Results is copyable and moveable
|
||||
Results(Results const&) = default;
|
||||
@ -210,7 +194,14 @@ public:
|
||||
// Create an async query from this Results
|
||||
// The query will be run on a background thread and delivered to the callback,
|
||||
// and then rerun after each commit (if needed) and redelivered if it changed
|
||||
AsyncQueryCancelationToken async(std::unique_ptr<AsyncQueryCallback> target);
|
||||
AsyncQueryCancelationToken async(std::function<void (std::exception_ptr)> target);
|
||||
|
||||
// Helper type to let AsyncQuery update the tableview without giving access
|
||||
// to any other privates or letting anyone else do so
|
||||
class AsyncFriend {
|
||||
friend class _impl::AsyncQuery;
|
||||
static void set_table_view(Results& results, TableView&& tv);
|
||||
};
|
||||
|
||||
private:
|
||||
SharedRealm m_realm;
|
||||
@ -221,6 +212,8 @@ private:
|
||||
SortOrder m_sort;
|
||||
bool m_live = true;
|
||||
|
||||
std::shared_ptr<_impl::AsyncQuery> m_background_query;
|
||||
|
||||
Mode m_mode = Mode::Empty;
|
||||
|
||||
void validate_read() const;
|
||||
@ -230,6 +223,8 @@ private:
|
||||
util::Optional<Mixed> aggregate(size_t column, bool return_none_for_empty,
|
||||
Int agg_int, Float agg_float,
|
||||
Double agg_double, DateTime agg_datetime);
|
||||
|
||||
void set_table_view(TableView&& tv);
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,7 @@ namespace realm {
|
||||
typedef std::weak_ptr<Realm> WeakRealm;
|
||||
|
||||
namespace _impl {
|
||||
class ExternalCommitHelper;
|
||||
class AsyncQuery;
|
||||
class RealmCoordinator;
|
||||
}
|
||||
|
||||
@ -136,6 +136,7 @@ namespace realm {
|
||||
// FIXME private
|
||||
Group *read_group();
|
||||
|
||||
friend class _impl::AsyncQuery;
|
||||
friend class _impl::RealmCoordinator;
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user