Add support for running queries asynchronously
This commit is contained in:
parent
565e39a287
commit
d165458601
|
@ -6,6 +6,7 @@ set(SOURCES
|
||||||
results.cpp
|
results.cpp
|
||||||
schema.cpp
|
schema.cpp
|
||||||
shared_realm.cpp
|
shared_realm.cpp
|
||||||
|
impl/async_query.cpp
|
||||||
impl/realm_coordinator.cpp
|
impl/realm_coordinator.cpp
|
||||||
impl/transact_log_handler.cpp
|
impl/transact_log_handler.cpp
|
||||||
parser/parser.cpp
|
parser/parser.cpp
|
||||||
|
|
|
@ -36,7 +36,7 @@ public:
|
||||||
CachedRealm(const CachedRealm&) = delete;
|
CachedRealm(const CachedRealm&) = delete;
|
||||||
CachedRealm& operator=(const CachedRealm&) = delete;
|
CachedRealm& operator=(const CachedRealm&) = delete;
|
||||||
|
|
||||||
// Asyncronously call notify() on the Realm on the appropriate thread
|
// Asynchronously call notify() on the Realm on the appropriate thread
|
||||||
void notify();
|
void notify();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
//
|
||||||
|
// Copyright 2015 Realm Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#include "impl/async_query.hpp"
|
||||||
|
|
||||||
|
#include "impl/realm_coordinator.hpp"
|
||||||
|
|
||||||
|
using namespace realm;
|
||||||
|
using namespace realm::_impl;
|
||||||
|
|
||||||
|
AsyncQuery::AsyncQuery(SortOrder sort,
|
||||||
|
std::unique_ptr<SharedGroup::Handover<Query>> handover,
|
||||||
|
std::unique_ptr<AsyncQueryCallback> callback,
|
||||||
|
RealmCoordinator& parent)
|
||||||
|
: parent(parent.shared_from_this())
|
||||||
|
, m_sort(std::move(sort))
|
||||||
|
, m_query_handover(std::move(handover))
|
||||||
|
, m_callback(std::move(callback))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::get_results(const SharedRealm& realm, SharedGroup& sg, std::vector<std::function<void()>>& ret)
|
||||||
|
{
|
||||||
|
if (!m_callback->is_for_current_thread()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_error) {
|
||||||
|
ret.emplace_back([self = shared_from_this()] {
|
||||||
|
self->m_callback->error(self->m_error);
|
||||||
|
RealmCoordinator::unregister_query(*self);
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!m_tv_handover) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (m_tv_handover->version < sg.get_version_of_current_transaction()) {
|
||||||
|
// async results are stale; ignore
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// auto r = Results(realm,
|
||||||
|
// m_sort,
|
||||||
|
// std::move(*sg.import_from_handover(std::move(m_tv_handover))));
|
||||||
|
Results r;
|
||||||
|
auto version = sg.get_version_of_current_transaction();
|
||||||
|
ret.emplace_back([r = std::move(r), version, &sg, self = shared_from_this()] {
|
||||||
|
if (sg.get_version_of_current_transaction() == version) {
|
||||||
|
self->m_callback->deliver(std::move(r));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::prepare_update()
|
||||||
|
{
|
||||||
|
// This function must not touch m_tv_handover as it is called without the
|
||||||
|
// relevant lock held (so that another thread can consume m_tv_handover
|
||||||
|
// while this is running)
|
||||||
|
|
||||||
|
REALM_ASSERT(m_sg);
|
||||||
|
|
||||||
|
if (m_tv.is_attached()) {
|
||||||
|
m_did_update = m_tv.sync_if_needed();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
m_tv = m_query->find_all();
|
||||||
|
if (m_sort) {
|
||||||
|
m_tv.sort(m_sort.columnIndices, m_sort.ascending);
|
||||||
|
}
|
||||||
|
m_did_update = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::prepare_handover()
|
||||||
|
{
|
||||||
|
// Even if the TV didn't change, we need to re-export it if the previous
|
||||||
|
// export has not been consumed yet, as the old handover object is no longer
|
||||||
|
// usable due to the version not matching
|
||||||
|
if (m_did_update || (m_tv_handover && m_tv_handover->version != m_sg->get_version_of_current_transaction())) {
|
||||||
|
m_tv_handover = m_sg->export_for_handover(m_tv, ConstSourcePayload::Copy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::set_error(std::exception_ptr err)
|
||||||
|
{
|
||||||
|
if (!m_error) {
|
||||||
|
m_error = err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SharedGroup::VersionID AsyncQuery::version() const noexcept
|
||||||
|
{
|
||||||
|
if (m_tv_handover)
|
||||||
|
return m_tv_handover->version;
|
||||||
|
if (m_query_handover)
|
||||||
|
return m_query_handover->version;
|
||||||
|
return SharedGroup::VersionID{};
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::attach_to(realm::SharedGroup& sg)
|
||||||
|
{
|
||||||
|
REALM_ASSERT(!m_sg);
|
||||||
|
|
||||||
|
m_query = sg.import_from_handover(std::move(m_query_handover));
|
||||||
|
m_sg = &sg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncQuery::detatch()
|
||||||
|
{
|
||||||
|
REALM_ASSERT(m_sg);
|
||||||
|
|
||||||
|
m_query_handover = m_sg->export_for_handover(*m_query, MutableSourcePayload::Move);
|
||||||
|
m_sg = nullptr;
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
//
|
||||||
|
// Copyright 2015 Realm Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#ifndef REALM_ASYNC_QUERY_HPP
|
||||||
|
#define REALM_ASYNC_QUERY_HPP
|
||||||
|
|
||||||
|
#include "results.hpp"
|
||||||
|
|
||||||
|
#include <realm/group_shared.hpp>
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
|
namespace realm {
|
||||||
|
namespace _impl {
|
||||||
|
class AsyncQuery : public std::enable_shared_from_this<AsyncQuery> {
|
||||||
|
public:
|
||||||
|
AsyncQuery(SortOrder sort,
|
||||||
|
std::unique_ptr<SharedGroup::Handover<Query>> handover,
|
||||||
|
std::unique_ptr<AsyncQueryCallback> callback,
|
||||||
|
RealmCoordinator& parent);
|
||||||
|
|
||||||
|
void get_results(const SharedRealm& realm, SharedGroup& sg, std::vector<std::function<void()>>& ret);
|
||||||
|
|
||||||
|
void set_error(std::exception_ptr err);
|
||||||
|
|
||||||
|
// Run/rerun the query if needed
|
||||||
|
void prepare_update();
|
||||||
|
// Update the handover object with the new data produced in prepare_update()
|
||||||
|
void prepare_handover();
|
||||||
|
|
||||||
|
// Get the version of the current handover object
|
||||||
|
SharedGroup::VersionID version() const noexcept;
|
||||||
|
|
||||||
|
void attach_to(SharedGroup& sg);
|
||||||
|
void detatch();
|
||||||
|
|
||||||
|
std::shared_ptr<RealmCoordinator> parent;
|
||||||
|
|
||||||
|
private:
|
||||||
|
const SortOrder m_sort;
|
||||||
|
|
||||||
|
std::unique_ptr<SharedGroup::Handover<Query>> m_query_handover;
|
||||||
|
std::unique_ptr<Query> m_query;
|
||||||
|
|
||||||
|
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
|
||||||
|
TableView m_tv;
|
||||||
|
|
||||||
|
const std::unique_ptr<AsyncQueryCallback> m_callback;
|
||||||
|
|
||||||
|
SharedGroup* m_sg = nullptr;
|
||||||
|
|
||||||
|
std::exception_ptr m_error;
|
||||||
|
|
||||||
|
bool m_did_update = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace _impl
|
||||||
|
} // namespace realm
|
||||||
|
|
||||||
|
#endif /* REALM_ASYNC_QUERY_HPP */
|
|
@ -18,11 +18,20 @@
|
||||||
|
|
||||||
#include "impl/realm_coordinator.hpp"
|
#include "impl/realm_coordinator.hpp"
|
||||||
|
|
||||||
|
#include "impl/async_query.hpp"
|
||||||
#include "impl/cached_realm.hpp"
|
#include "impl/cached_realm.hpp"
|
||||||
#include "impl/external_commit_helper.hpp"
|
#include "impl/external_commit_helper.hpp"
|
||||||
|
#include "impl/transact_log_handler.hpp"
|
||||||
#include "object_store.hpp"
|
#include "object_store.hpp"
|
||||||
#include "schema.hpp"
|
#include "schema.hpp"
|
||||||
|
|
||||||
|
#include <realm/commit_log.hpp>
|
||||||
|
#include <realm/group_shared.hpp>
|
||||||
|
#include <realm/lang_bind_helper.hpp>
|
||||||
|
#include <realm/query.hpp>
|
||||||
|
#include <realm/table_view.hpp>
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
using namespace realm;
|
using namespace realm;
|
||||||
|
@ -58,7 +67,12 @@ std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
|
||||||
if ((!m_config.read_only && !m_notifier) || (m_config.read_only && m_cached_realms.empty())) {
|
if ((!m_config.read_only && !m_notifier) || (m_config.read_only && m_cached_realms.empty())) {
|
||||||
m_config = config;
|
m_config = config;
|
||||||
if (!config.read_only && !m_notifier) {
|
if (!config.read_only && !m_notifier) {
|
||||||
m_notifier = std::make_unique<ExternalCommitHelper>(*this);
|
try {
|
||||||
|
m_notifier = std::make_unique<ExternalCommitHelper>(*this);
|
||||||
|
}
|
||||||
|
catch (std::system_error const& ex) {
|
||||||
|
throw RealmFileException(RealmFileException::Kind::AccessError, config.path, ex.code().message());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -103,6 +117,11 @@ std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
|
||||||
return realm;
|
return realm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<Realm> RealmCoordinator::get_realm()
|
||||||
|
{
|
||||||
|
return get_realm(m_config);
|
||||||
|
}
|
||||||
|
|
||||||
const Schema* RealmCoordinator::get_schema() const noexcept
|
const Schema* RealmCoordinator::get_schema() const noexcept
|
||||||
{
|
{
|
||||||
return m_cached_realms.empty() ? nullptr : m_config.schema.get();
|
return m_cached_realms.empty() ? nullptr : m_config.schema.get();
|
||||||
|
@ -186,10 +205,279 @@ void RealmCoordinator::send_commit_notifications()
|
||||||
m_notifier->notify_others();
|
m_notifier->notify_others();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index)
|
||||||
|
{
|
||||||
|
if (m_async_error) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SharedGroup::VersionID versionid(version, index);
|
||||||
|
if (!m_advancer_sg) {
|
||||||
|
try {
|
||||||
|
// Use a temporary Realm instance to open the shared group to reuse
|
||||||
|
// the error handling there
|
||||||
|
Realm tmp(m_config);
|
||||||
|
m_advancer_history = std::move(tmp.m_history);
|
||||||
|
m_advancer_sg = std::move(tmp.m_shared_group);
|
||||||
|
m_advancer_sg->begin_read(versionid);
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
m_async_error = std::current_exception();
|
||||||
|
m_advancer_sg = nullptr;
|
||||||
|
m_advancer_history = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (m_new_queries.empty()) {
|
||||||
|
// If this is the first query then we don't already have a read transaction
|
||||||
|
m_advancer_sg->begin_read(versionid);
|
||||||
|
}
|
||||||
|
else if (versionid < m_advancer_sg->get_version_of_current_transaction()) {
|
||||||
|
// Ensure we're holding a readlock on the oldest version we have a
|
||||||
|
// handover object for, as handover objects don't
|
||||||
|
m_advancer_sg->end_read();
|
||||||
|
m_advancer_sg->begin_read(versionid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken RealmCoordinator::register_query(const Results& r, std::unique_ptr<AsyncQueryCallback> target)
|
||||||
|
{
|
||||||
|
return r.get_realm()->m_coordinator->do_register_query(r, std::move(target));
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken RealmCoordinator::do_register_query(const Results& r, std::unique_ptr<AsyncQueryCallback> target)
|
||||||
|
{
|
||||||
|
if (m_config.read_only) {
|
||||||
|
throw InvalidTransactionException("Cannot create asynchronous query for read-only Realms");
|
||||||
|
}
|
||||||
|
if (r.get_realm()->is_in_transaction()) {
|
||||||
|
throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction");
|
||||||
|
}
|
||||||
|
|
||||||
|
auto handover = r.get_realm()->m_shared_group->export_for_handover(r.get_query(), ConstSourcePayload::Copy);
|
||||||
|
auto version = handover->version;
|
||||||
|
auto query = std::make_shared<AsyncQuery>(r.get_sort(),
|
||||||
|
std::move(handover),
|
||||||
|
std::move(target),
|
||||||
|
*this);
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||||
|
pin_version(version.version, version.index);
|
||||||
|
m_new_queries.push_back(query);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wake up the background worker threads by pretending we made a commit
|
||||||
|
m_notifier->notify_others();
|
||||||
|
return query;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::unregister_query(AsyncQuery& registration)
|
||||||
|
{
|
||||||
|
registration.parent->do_unregister_query(registration);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::do_unregister_query(AsyncQuery& registration)
|
||||||
|
{
|
||||||
|
auto swap_remove = [&](auto& container) {
|
||||||
|
auto it = std::find_if(container.begin(), container.end(),
|
||||||
|
[&](auto const& ptr) { return ptr.get() == ®istration; });
|
||||||
|
if (it != container.end()) {
|
||||||
|
std::iter_swap(--container.end(), it);
|
||||||
|
container.pop_back();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||||
|
if (swap_remove(m_queries)) {
|
||||||
|
// Make sure we aren't holding on to read versions needlessly if there
|
||||||
|
// are no queries left, but don't close them entirely as opening shared
|
||||||
|
// groups is expensive
|
||||||
|
if (!m_running_queries && m_queries.empty() && m_query_sg) {
|
||||||
|
m_query_sg->end_read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (swap_remove(m_new_queries)) {
|
||||||
|
if (m_new_queries.empty() && m_advancer_sg) {
|
||||||
|
m_advancer_sg->end_read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void RealmCoordinator::on_change()
|
void RealmCoordinator::on_change()
|
||||||
{
|
{
|
||||||
|
run_async_queries();
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(m_realm_mutex);
|
std::lock_guard<std::mutex> lock(m_realm_mutex);
|
||||||
for (auto& realm : m_cached_realms) {
|
for (auto& realm : m_cached_realms) {
|
||||||
realm.notify();
|
realm.notify();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::run_async_queries()
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(m_query_mutex);
|
||||||
|
|
||||||
|
if (m_queries.empty() && m_new_queries.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!m_async_error) {
|
||||||
|
open_helper_shared_group();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_async_error) {
|
||||||
|
move_new_queries_to_main();
|
||||||
|
for (auto& query : m_queries) {
|
||||||
|
query->set_error(m_async_error);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
advance_helper_shared_group_to_latest();
|
||||||
|
|
||||||
|
// Tell other threads not to close the shared group as we need it even
|
||||||
|
// though we aren't holding the lock
|
||||||
|
m_running_queries = true;
|
||||||
|
|
||||||
|
// Make a copy of the queries vector so that we can release the lock while
|
||||||
|
// we run the queries
|
||||||
|
auto queries_to_run = m_queries;
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
for (auto& query : queries_to_run) {
|
||||||
|
query->prepare_update();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reacquire the lock while updating the fields that are actually read on
|
||||||
|
// other threads
|
||||||
|
lock.lock();
|
||||||
|
for (auto& query : queries_to_run) {
|
||||||
|
query->prepare_handover();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if all queries were removed while we were running them, as if so
|
||||||
|
// the shared group didn't get closed by do_unregister_query()
|
||||||
|
m_running_queries = false;
|
||||||
|
if (m_queries.empty()) {
|
||||||
|
m_query_sg->end_read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::open_helper_shared_group()
|
||||||
|
{
|
||||||
|
if (!m_query_sg) {
|
||||||
|
try {
|
||||||
|
Realm tmp(m_config);
|
||||||
|
m_query_history = std::move(tmp.m_history);
|
||||||
|
m_query_sg = std::move(tmp.m_shared_group);
|
||||||
|
m_query_sg->begin_read();
|
||||||
|
}
|
||||||
|
catch (...) {
|
||||||
|
// Store the error to be passed to the async queries
|
||||||
|
m_async_error = std::current_exception();
|
||||||
|
m_query_sg = nullptr;
|
||||||
|
m_query_history = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (m_queries.empty()) {
|
||||||
|
m_query_sg->begin_read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::move_new_queries_to_main()
|
||||||
|
{
|
||||||
|
m_queries.reserve(m_queries.size() + m_new_queries.size());
|
||||||
|
std::move(m_new_queries.begin(), m_new_queries.end(), std::back_inserter(m_queries));
|
||||||
|
m_new_queries.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::advance_helper_shared_group_to_latest()
|
||||||
|
{
|
||||||
|
if (m_new_queries.empty()) {
|
||||||
|
LangBindHelper::advance_read(*m_query_sg, *m_query_history);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort newly added queries by their source version so that we can pull them
|
||||||
|
// all forward to the latest version in a single pass over the transaction log
|
||||||
|
std::sort(m_new_queries.begin(), m_new_queries.end(), [](auto const& lft, auto const& rgt) {
|
||||||
|
return lft->version() < rgt->version();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Import all newly added queries to our helper SG
|
||||||
|
for (auto& query : m_new_queries) {
|
||||||
|
LangBindHelper::advance_read(*m_advancer_sg, *m_advancer_history, query->version());
|
||||||
|
query->attach_to(*m_advancer_sg);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Advance both SGs to the newest version
|
||||||
|
LangBindHelper::advance_read(*m_advancer_sg, *m_advancer_history);
|
||||||
|
LangBindHelper::advance_read(*m_query_sg, *m_query_history,
|
||||||
|
m_advancer_sg->get_version_of_current_transaction());
|
||||||
|
|
||||||
|
// Transfer all new queries over to the main SG
|
||||||
|
for (auto& query : m_new_queries) {
|
||||||
|
query->detatch();
|
||||||
|
query->attach_to(*m_query_sg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!m_new_queries.empty()) {
|
||||||
|
move_new_queries_to_main();
|
||||||
|
m_advancer_sg->end_read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::advance_to_ready(Realm& realm)
|
||||||
|
{
|
||||||
|
std::vector<std::function<void()>> async_results;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||||
|
|
||||||
|
SharedGroup::VersionID version;
|
||||||
|
for (auto& query : m_queries) {
|
||||||
|
version = query->version();
|
||||||
|
if (version != SharedGroup::VersionID()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// no untargeted async queries; just advance to latest
|
||||||
|
if (version.version == 0) {
|
||||||
|
transaction::advance(*realm.m_shared_group, *realm.m_history, realm.m_binding_context.get());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// async results are out of date; ignore
|
||||||
|
else if (version < realm.m_shared_group->get_version_of_current_transaction()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
transaction::advance(*realm.m_shared_group, *realm.m_history, realm.m_binding_context.get(), version);
|
||||||
|
|
||||||
|
for (auto& query : m_queries) {
|
||||||
|
query->get_results(realm.shared_from_this(), *realm.m_shared_group, async_results);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto& results : async_results) {
|
||||||
|
results();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RealmCoordinator::process_available_async(Realm& realm)
|
||||||
|
{
|
||||||
|
std::vector<std::function<void()>> async_results;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(m_query_mutex);
|
||||||
|
for (auto& query : m_queries) {
|
||||||
|
query->get_results(realm.shared_from_this(), *realm.m_shared_group, async_results);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto& results : async_results) {
|
||||||
|
results();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -24,9 +24,15 @@
|
||||||
#include <realm/string_data.hpp>
|
#include <realm/string_data.hpp>
|
||||||
|
|
||||||
namespace realm {
|
namespace realm {
|
||||||
|
class AsyncQueryCallback;
|
||||||
|
class ClientHistory;
|
||||||
|
class Results;
|
||||||
class Schema;
|
class Schema;
|
||||||
|
class SharedGroup;
|
||||||
|
struct AsyncQueryCancelationToken;
|
||||||
|
|
||||||
namespace _impl {
|
namespace _impl {
|
||||||
|
class AsyncQuery;
|
||||||
class CachedRealm;
|
class CachedRealm;
|
||||||
class ExternalCommitHelper;
|
class ExternalCommitHelper;
|
||||||
|
|
||||||
|
@ -43,6 +49,7 @@ public:
|
||||||
// If the Realm is already open on another thread, validates that the given
|
// If the Realm is already open on another thread, validates that the given
|
||||||
// configuration is compatible with the existing one
|
// configuration is compatible with the existing one
|
||||||
std::shared_ptr<Realm> get_realm(Realm::Config config);
|
std::shared_ptr<Realm> get_realm(Realm::Config config);
|
||||||
|
std::shared_ptr<Realm> get_realm();
|
||||||
|
|
||||||
const Schema* get_schema() const noexcept;
|
const Schema* get_schema() const noexcept;
|
||||||
uint64_t get_schema_version() const noexcept { return m_config.schema_version; }
|
uint64_t get_schema_version() const noexcept { return m_config.schema_version; }
|
||||||
|
@ -50,7 +57,7 @@ public:
|
||||||
const std::vector<char>& get_encryption_key() const noexcept { return m_config.encryption_key; }
|
const std::vector<char>& get_encryption_key() const noexcept { return m_config.encryption_key; }
|
||||||
bool is_in_memory() const noexcept { return m_config.in_memory; }
|
bool is_in_memory() const noexcept { return m_config.in_memory; }
|
||||||
|
|
||||||
// Asyncronously call notify() on every Realm instance for this coordinator's
|
// Asynchronously call notify() on every Realm instance for this coordinator's
|
||||||
// path, including those in other processes
|
// path, including those in other processes
|
||||||
void send_commit_notifications();
|
void send_commit_notifications();
|
||||||
|
|
||||||
|
@ -73,13 +80,49 @@ public:
|
||||||
// Update the schema in the cached config
|
// Update the schema in the cached config
|
||||||
void update_schema(Schema const& new_schema);
|
void update_schema(Schema const& new_schema);
|
||||||
|
|
||||||
|
static AsyncQueryCancelationToken register_query(const Results& r, std::unique_ptr<AsyncQueryCallback>);
|
||||||
|
static void unregister_query(AsyncQuery& registration);
|
||||||
|
|
||||||
|
// Advance the Realm to the most recent transaction version which all async
|
||||||
|
// work is complete for
|
||||||
|
void advance_to_ready(Realm& realm);
|
||||||
|
void process_available_async(Realm& realm);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Realm::Config m_config;
|
Realm::Config m_config;
|
||||||
|
|
||||||
std::mutex m_realm_mutex;
|
std::mutex m_realm_mutex;
|
||||||
std::vector<CachedRealm> m_cached_realms;
|
std::vector<CachedRealm> m_cached_realms;
|
||||||
|
|
||||||
|
std::mutex m_query_mutex;
|
||||||
|
bool m_running_queries = false;
|
||||||
|
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_new_queries;
|
||||||
|
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_queries;
|
||||||
|
|
||||||
|
// SharedGroup used for actually running async queries
|
||||||
|
// Will have a read transaction iff m_queries is non-empty
|
||||||
|
std::unique_ptr<ClientHistory> m_query_history;
|
||||||
|
std::unique_ptr<SharedGroup> m_query_sg;
|
||||||
|
|
||||||
|
// SharedGroup used to advance queries in m_new_queries to the main shared
|
||||||
|
// group's transaction version
|
||||||
|
// Will have a read transaction iff m_new_queries is non-empty
|
||||||
|
std::unique_ptr<ClientHistory> m_advancer_history;
|
||||||
|
std::unique_ptr<SharedGroup> m_advancer_sg;
|
||||||
|
std::exception_ptr m_async_error;
|
||||||
|
|
||||||
std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
|
std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken do_register_query(const Results& r, std::unique_ptr<AsyncQueryCallback>);
|
||||||
|
void do_unregister_query(AsyncQuery& registration);
|
||||||
|
|
||||||
|
// must be called with m_query_mutex locked
|
||||||
|
void pin_version(uint_fast64_t version, uint_fast32_t index);
|
||||||
|
|
||||||
|
void run_async_queries();
|
||||||
|
void open_helper_shared_group();
|
||||||
|
void move_new_queries_to_main();
|
||||||
|
void advance_helper_shared_group_to_latest();
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace _impl
|
} // namespace _impl
|
||||||
|
|
|
@ -432,7 +432,8 @@ public:
|
||||||
namespace realm {
|
namespace realm {
|
||||||
namespace _impl {
|
namespace _impl {
|
||||||
namespace transaction {
|
namespace transaction {
|
||||||
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* context)
|
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* context,
|
||||||
|
SharedGroup::VersionID version)
|
||||||
{
|
{
|
||||||
TransactLogObserver(context, sg, [&](auto&&... args) {
|
TransactLogObserver(context, sg, [&](auto&&... args) {
|
||||||
LangBindHelper::advance_read(sg, history, std::move(args)...);
|
LangBindHelper::advance_read(sg, history, std::move(args)...);
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
#ifndef REALM_TRANSACT_LOG_HANDLER_HPP
|
#ifndef REALM_TRANSACT_LOG_HANDLER_HPP
|
||||||
#define REALM_TRANSACT_LOG_HANDLER_HPP
|
#define REALM_TRANSACT_LOG_HANDLER_HPP
|
||||||
|
|
||||||
|
#include <realm/group_shared.hpp>
|
||||||
|
|
||||||
namespace realm {
|
namespace realm {
|
||||||
class BindingContext;
|
class BindingContext;
|
||||||
class SharedGroup;
|
class SharedGroup;
|
||||||
|
@ -28,7 +30,8 @@ namespace _impl {
|
||||||
namespace transaction {
|
namespace transaction {
|
||||||
// Advance the read transaction version, with change notifications sent to delegate
|
// Advance the read transaction version, with change notifications sent to delegate
|
||||||
// Must not be called from within a write transaction.
|
// Must not be called from within a write transaction.
|
||||||
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context);
|
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context,
|
||||||
|
SharedGroup::VersionID version=SharedGroup::VersionID{});
|
||||||
|
|
||||||
// Begin a write transaction
|
// Begin a write transaction
|
||||||
// If the read transaction version is not up to date, will first advance to the
|
// If the read transaction version is not up to date, will first advance to the
|
||||||
|
|
|
@ -5,10 +5,13 @@
|
||||||
#ifndef REALM_OBJECT_ACCESSOR_HPP
|
#ifndef REALM_OBJECT_ACCESSOR_HPP
|
||||||
#define REALM_OBJECT_ACCESSOR_HPP
|
#define REALM_OBJECT_ACCESSOR_HPP
|
||||||
|
|
||||||
#include <string>
|
|
||||||
#include "shared_realm.hpp"
|
|
||||||
#include "schema.hpp"
|
|
||||||
#include "list.hpp"
|
#include "list.hpp"
|
||||||
|
#include "object_schema.hpp"
|
||||||
|
#include "object_store.hpp"
|
||||||
|
#include "schema.hpp"
|
||||||
|
#include "shared_realm.hpp"
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
namespace realm {
|
namespace realm {
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "results.hpp"
|
#include "results.hpp"
|
||||||
|
|
||||||
#include "object_store.hpp"
|
#include "object_store.hpp"
|
||||||
|
#include "impl/realm_coordinator.hpp"
|
||||||
|
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
|
||||||
|
@ -60,6 +61,8 @@ void Results::validate_read() const
|
||||||
m_realm->verify_thread();
|
m_realm->verify_thread();
|
||||||
if (m_table && !m_table->is_attached())
|
if (m_table && !m_table->is_attached())
|
||||||
throw InvalidatedException();
|
throw InvalidatedException();
|
||||||
|
if (m_mode == Mode::TableView && !m_table_view.is_attached())
|
||||||
|
throw InvalidatedException();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Results::validate_write() const
|
void Results::validate_write() const
|
||||||
|
@ -94,6 +97,11 @@ size_t Results::size()
|
||||||
REALM_UNREACHABLE();
|
REALM_UNREACHABLE();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
StringData Results::get_object_type() const noexcept
|
||||||
|
{
|
||||||
|
return get_object_schema().name;
|
||||||
|
}
|
||||||
|
|
||||||
RowExpr Results::get(size_t row_ndx)
|
RowExpr Results::get(size_t row_ndx)
|
||||||
{
|
{
|
||||||
validate_read();
|
validate_read();
|
||||||
|
@ -301,8 +309,9 @@ Query Results::get_query() const
|
||||||
switch (m_mode) {
|
switch (m_mode) {
|
||||||
case Mode::Empty:
|
case Mode::Empty:
|
||||||
case Mode::Query:
|
case Mode::Query:
|
||||||
case Mode::TableView:
|
|
||||||
return m_query;
|
return m_query;
|
||||||
|
case Mode::TableView:
|
||||||
|
return m_table_view.get_query();
|
||||||
case Mode::Table:
|
case Mode::Table:
|
||||||
return m_table->where();
|
return m_table->where();
|
||||||
}
|
}
|
||||||
|
@ -342,3 +351,31 @@ Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t c
|
||||||
, column_type(table->get_column_type(column))
|
, column_type(table->get_column_type(column))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken Results::async(std::unique_ptr<AsyncQueryCallback> target)
|
||||||
|
{
|
||||||
|
return _impl::RealmCoordinator::register_query(*this, std::move(target));
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken::~AsyncQueryCancelationToken()
|
||||||
|
{
|
||||||
|
if (m_registration) {
|
||||||
|
_impl::RealmCoordinator::unregister_query(*m_registration);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken::AsyncQueryCancelationToken(AsyncQueryCancelationToken&& rgt)
|
||||||
|
: m_registration(std::move(rgt.m_registration))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken& AsyncQueryCancelationToken::operator=(realm::AsyncQueryCancelationToken&& rgt)
|
||||||
|
{
|
||||||
|
if (this != &rgt) {
|
||||||
|
if (m_registration) {
|
||||||
|
_impl::RealmCoordinator::unregister_query(*m_registration);
|
||||||
|
}
|
||||||
|
m_registration = std::move(rgt.m_registration);
|
||||||
|
}
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
|
@ -29,6 +29,46 @@ namespace realm {
|
||||||
template<typename T> class BasicRowExpr;
|
template<typename T> class BasicRowExpr;
|
||||||
using RowExpr = BasicRowExpr<Table>;
|
using RowExpr = BasicRowExpr<Table>;
|
||||||
class Mixed;
|
class Mixed;
|
||||||
|
class Results;
|
||||||
|
class ObjectSchema;
|
||||||
|
|
||||||
|
namespace _impl {
|
||||||
|
class AsyncQuery;
|
||||||
|
}
|
||||||
|
|
||||||
|
// A token which keeps an asynchronous query alive
|
||||||
|
struct AsyncQueryCancelationToken {
|
||||||
|
AsyncQueryCancelationToken() = default;
|
||||||
|
AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> registration) : m_registration(std::move(registration)) { }
|
||||||
|
~AsyncQueryCancelationToken();
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken(AsyncQueryCancelationToken&&);
|
||||||
|
AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken&&);
|
||||||
|
|
||||||
|
AsyncQueryCancelationToken(AsyncQueryCancelationToken const&) = delete;
|
||||||
|
AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken const&) = delete;
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::shared_ptr<_impl::AsyncQuery> m_registration;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Subclass to get notifications about async query things
|
||||||
|
class AsyncQueryCallback {
|
||||||
|
public:
|
||||||
|
virtual ~AsyncQueryCallback() = default;
|
||||||
|
|
||||||
|
// Called with the Results object generated by the query on a thread where
|
||||||
|
// is_for_current_thread() returned true
|
||||||
|
virtual void deliver(Results) = 0;
|
||||||
|
|
||||||
|
// If an error occured while running the query on the worker thread, this is
|
||||||
|
// called with an exception on a thread where is_for_current_thread()
|
||||||
|
// returned true
|
||||||
|
virtual void error(std::exception_ptr) = 0;
|
||||||
|
|
||||||
|
// Return whether or not this query is associated with the current thread
|
||||||
|
virtual bool is_for_current_thread() { return true; }
|
||||||
|
};
|
||||||
|
|
||||||
struct SortOrder {
|
struct SortOrder {
|
||||||
std::vector<size_t> columnIndices;
|
std::vector<size_t> columnIndices;
|
||||||
|
@ -72,7 +112,7 @@ public:
|
||||||
TableView get_tableview();
|
TableView get_tableview();
|
||||||
|
|
||||||
// Get the object type which will be returned by get()
|
// Get the object type which will be returned by get()
|
||||||
StringData get_object_type() const noexcept { return get_object_schema().name; }
|
StringData get_object_type() const noexcept;
|
||||||
|
|
||||||
// Set whether the TableView should sync if needed before accessing results
|
// Set whether the TableView should sync if needed before accessing results
|
||||||
void set_live(bool live);
|
void set_live(bool live);
|
||||||
|
@ -165,6 +205,13 @@ public:
|
||||||
UnsupportedColumnTypeException(size_t column, const Table* table);
|
UnsupportedColumnTypeException(size_t column, const Table* table);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void update_tableview();
|
||||||
|
|
||||||
|
// Create an async query from this Results
|
||||||
|
// The query will be run on a background thread and delivered to the callback,
|
||||||
|
// and then rerun after each commit (if needed) and redelivered if it changed
|
||||||
|
AsyncQueryCancelationToken async(std::unique_ptr<AsyncQueryCallback> target);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
SharedRealm m_realm;
|
SharedRealm m_realm;
|
||||||
const ObjectSchema *m_object_schema;
|
const ObjectSchema *m_object_schema;
|
||||||
|
@ -179,8 +226,6 @@ private:
|
||||||
void validate_read() const;
|
void validate_read() const;
|
||||||
void validate_write() const;
|
void validate_write() const;
|
||||||
|
|
||||||
void update_tableview();
|
|
||||||
|
|
||||||
template<typename Int, typename Float, typename Double, typename DateTime>
|
template<typename Int, typename Float, typename Double, typename DateTime>
|
||||||
util::Optional<Mixed> aggregate(size_t column, bool return_none_for_empty,
|
util::Optional<Mixed> aggregate(size_t column, bool return_none_for_empty,
|
||||||
Int agg_int, Float agg_float,
|
Int agg_int, Float agg_float,
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
#ifndef REALM_SCHEMA_HPP
|
#ifndef REALM_SCHEMA_HPP
|
||||||
#define REALM_SCHEMA_HPP
|
#define REALM_SCHEMA_HPP
|
||||||
|
|
||||||
|
#include "property.hpp"
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
|
|
@ -361,13 +361,16 @@ void Realm::notify()
|
||||||
}
|
}
|
||||||
if (m_auto_refresh) {
|
if (m_auto_refresh) {
|
||||||
if (m_group) {
|
if (m_group) {
|
||||||
transaction::advance(*m_shared_group, *m_history, m_binding_context.get());
|
m_coordinator->advance_to_ready(*this);
|
||||||
}
|
}
|
||||||
else if (m_binding_context) {
|
else if (m_binding_context) {
|
||||||
m_binding_context->did_change({}, {});
|
m_binding_context->did_change({}, {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
m_coordinator->process_available_async(*this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Realm::refresh()
|
bool Realm::refresh()
|
||||||
|
@ -387,6 +390,7 @@ bool Realm::refresh()
|
||||||
|
|
||||||
if (m_group) {
|
if (m_group) {
|
||||||
transaction::advance(*m_shared_group, *m_history, m_binding_context.get());
|
transaction::advance(*m_shared_group, *m_history, m_binding_context.get());
|
||||||
|
m_coordinator->process_available_async(*this);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// Create the read transaction
|
// Create the read transaction
|
||||||
|
|
|
@ -19,9 +19,10 @@
|
||||||
#ifndef REALM_REALM_HPP
|
#ifndef REALM_REALM_HPP
|
||||||
#define REALM_REALM_HPP
|
#define REALM_REALM_HPP
|
||||||
|
|
||||||
#include "object_store.hpp"
|
#include <realm/handover_defs.hpp>
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
@ -42,8 +43,7 @@ namespace realm {
|
||||||
class RealmCoordinator;
|
class RealmCoordinator;
|
||||||
}
|
}
|
||||||
|
|
||||||
class Realm : public std::enable_shared_from_this<Realm>
|
class Realm : public std::enable_shared_from_this<Realm> {
|
||||||
{
|
|
||||||
public:
|
public:
|
||||||
typedef std::function<void(SharedRealm old_realm, SharedRealm realm)> MigrationFunction;
|
typedef std::function<void(SharedRealm old_realm, SharedRealm realm)> MigrationFunction;
|
||||||
|
|
||||||
|
@ -135,6 +135,8 @@ namespace realm {
|
||||||
|
|
||||||
// FIXME private
|
// FIXME private
|
||||||
Group *read_group();
|
Group *read_group();
|
||||||
|
|
||||||
|
friend class _impl::RealmCoordinator;
|
||||||
};
|
};
|
||||||
|
|
||||||
class RealmFileException : public std::runtime_error {
|
class RealmFileException : public std::runtime_error {
|
||||||
|
|
Loading…
Reference in New Issue