Add fine-grained notifications for Results

This commit is contained in:
Thomas Goyne 2016-01-08 10:13:05 -08:00
parent 6609bcaed7
commit f4aaa7c9de
13 changed files with 803 additions and 87 deletions

View File

@ -311,3 +311,255 @@ void CollectionChangeIndices::verify()
REALM_ASSERT(!modifications.contains(index));
#endif
}
namespace {
struct RowInfo {
size_t shifted_row_index;
size_t prev_tv_index;
size_t tv_index;
};
void calculate_moves_unsorted(std::vector<RowInfo>& new_rows, CollectionChangeIndices& changeset,
std::function<bool (size_t)> row_did_change)
{
std::sort(begin(new_rows), end(new_rows), [](auto& lft, auto& rgt) {
return lft.tv_index < rgt.tv_index;
});
IndexSet::IndexInterator ins = changeset.insertions.begin(), del = changeset.deletions.begin();
int shift = 0;
for (auto& row : new_rows) {
while (del != changeset.deletions.end() && *del <= row.tv_index) {
++del;
++shift;
}
while (ins != changeset.insertions.end() && *ins <= row.tv_index) {
++ins;
--shift;
}
if (row.prev_tv_index == npos)
continue;
// For unsorted, non-LV queries a row can only move to an index before
// its original position due to a move_last_over
if (row.tv_index + shift != row.prev_tv_index) {
--shift;
changeset.moves.push_back({row.prev_tv_index, row.tv_index});
}
// FIXME: currently move implies modification, and so they're mutally exclusive
// this is correct for sorted, but for unsorted a row can move without actually changing
else if (row_did_change(row.shifted_row_index)) {
// FIXME: needlessly quadratic
if (!changeset.insertions.contains(row.tv_index))
changeset.modifications.add(row.tv_index);
}
}
// FIXME: this is required for merge(), but it would be nice if it wasn't
for (auto&& move : changeset.moves) {
changeset.insertions.add(move.to);
changeset.deletions.add(move.from);
}
}
using items = std::vector<std::pair<size_t, size_t>>;
struct Match {
size_t i, j, size;
};
Match find_longest_match(items const& a, items const& b,
size_t begin1, size_t end1, size_t begin2, size_t end2)
{
Match best = {begin1, begin2, 0};
std::vector<size_t> len_from_j;
len_from_j.resize(end2 - begin2, 0);
std::vector<size_t> len_from_j_prev = len_from_j;
for (size_t i = begin1; i < end1; ++i) {
std::fill(begin(len_from_j), end(len_from_j), 0);
size_t ai = a[i].first;
auto it = lower_bound(begin(b), end(b), std::make_pair(size_t(0), ai),
[](auto a, auto b) { return a.second < b.second; });
for (; it != end(b) && it->second == ai; ++it) {
size_t j = it->first;
if (j < begin2)
continue;
if (j >= end2)
break;
size_t off = j - begin2;
size_t size = off == 0 ? 1 : len_from_j_prev[off - 1] + 1;
len_from_j[off] = size;
if (size > best.size) {
best.i = i - size + 1;
best.j = j - size + 1;
best.size = size;
}
}
len_from_j.swap(len_from_j_prev);
}
return best;
}
void find_longest_matches(items const& a, items const& b_ndx,
size_t begin1, size_t end1, size_t begin2, size_t end2, std::vector<Match>& ret)
{
// FIXME: recursion could get too deep here
Match m = find_longest_match(a, b_ndx, begin1, end1, begin2, end2);
if (!m.size)
return;
if (m.i > begin1 && m.j > begin2)
find_longest_matches(a, b_ndx, begin1, m.i, begin2, m.j, ret);
ret.push_back(m);
if (m.i + m.size < end2 && m.j + m.size < end2)
find_longest_matches(a, b_ndx, m.i + m.size, end1, m.j + m.size, end2, ret);
}
void calculate_moves_sorted(std::vector<RowInfo>& new_rows, CollectionChangeIndices& changeset,
std::function<bool (size_t)> row_did_change)
{
std::vector<std::pair<size_t, size_t>> old_candidates;
std::vector<std::pair<size_t, size_t>> new_candidates;
std::sort(begin(new_rows), end(new_rows), [](auto& lft, auto& rgt) {
return lft.tv_index < rgt.tv_index;
});
IndexSet::IndexInterator ins = changeset.insertions.begin(), del = changeset.deletions.begin();
int shift = 0;
for (auto& row : new_rows) {
while (del != changeset.deletions.end() && *del <= row.tv_index) {
++del;
++shift;
}
while (ins != changeset.insertions.end() && *ins <= row.tv_index) {
++ins;
--shift;
}
if (row.prev_tv_index == npos)
continue;
if (row_did_change(row.shifted_row_index)) {
// FIXME: needlessly quadratic
if (!changeset.insertions.contains(row.tv_index))
changeset.modifications.add(row.tv_index);
}
old_candidates.push_back({row.shifted_row_index, row.prev_tv_index});
new_candidates.push_back({row.shifted_row_index, row.tv_index});
// }
}
std::sort(begin(old_candidates), end(old_candidates), [](auto a, auto b) {
if (a.second != b.second)
return a.second < b.second;
return a.first < b.first;
});
// First check if the order of any of the rows actually changed
size_t first_difference = npos;
for (size_t i = 0; i < old_candidates.size(); ++i) {
if (old_candidates[i].first != new_candidates[i].first) {
first_difference = i;
break;
}
}
if (first_difference == npos)
return;
const auto b_ndx = [&]{
std::vector<std::pair<size_t, size_t>> ret;
ret.reserve(new_candidates.size());
for (size_t i = 0; i < new_candidates.size(); ++i)
ret.push_back(std::make_pair(i, new_candidates[i].first));
std::sort(begin(ret), end(ret), [](auto a, auto b) {
if (a.second != b.second)
return a.second < b.second;
return a.first < b.first;
});
return ret;
}();
std::vector<Match> longest_matches;
find_longest_matches(old_candidates, b_ndx,
first_difference, old_candidates.size(),
first_difference, new_candidates.size(),
longest_matches);
longest_matches.push_back({old_candidates.size(), new_candidates.size(), 0});
size_t i = first_difference, j = first_difference;
for (auto match : longest_matches) {
for (; i < match.i; ++i)
changeset.deletions.add(old_candidates[i].second);
for (; j < match.j; ++j)
changeset.insertions.add(new_candidates[j].second);
i += match.size;
j += match.size;
}
// FIXME: needlessly suboptimal
changeset.modifications.remove(changeset.insertions);
}
} // Anonymous namespace
CollectionChangeIndices CollectionChangeIndices::calculate(std::vector<size_t> const& prev_rows,
std::vector<size_t> const& next_rows,
std::function<bool (size_t)> row_did_change,
bool sort)
{
CollectionChangeIndices ret;
std::vector<RowInfo> old_rows;
for (size_t i = 0; i < prev_rows.size(); ++i) {
if (prev_rows[i] == npos)
ret.deletions.add(i);
else
old_rows.push_back({prev_rows[i], npos, i});
}
std::stable_sort(begin(old_rows), end(old_rows), [](auto& lft, auto& rgt) {
return lft.shifted_row_index < rgt.shifted_row_index;
});
std::vector<RowInfo> new_rows;
for (size_t i = 0; i < next_rows.size(); ++i) {
new_rows.push_back({next_rows[i], npos, i});
}
std::stable_sort(begin(new_rows), end(new_rows), [](auto& lft, auto& rgt) {
return lft.shifted_row_index < rgt.shifted_row_index;
});
size_t i = 0, j = 0;
while (i < old_rows.size() && j < new_rows.size()) {
auto old_index = old_rows[i];
auto new_index = new_rows[j];
if (old_index.shifted_row_index == new_index.shifted_row_index) {
new_rows[j].prev_tv_index = old_rows[i].tv_index;
++i;
++j;
}
else if (old_index.shifted_row_index < new_index.shifted_row_index) {
ret.deletions.add(old_index.tv_index);
++i;
}
else {
ret.insertions.add(new_index.tv_index);
++j;
}
}
for (; i < old_rows.size(); ++i)
ret.deletions.add(old_rows[i].tv_index);
for (; j < new_rows.size(); ++j)
ret.insertions.add(new_rows[j].tv_index);
if (sort) {
calculate_moves_sorted(new_rows, ret, row_did_change);
}
else {
calculate_moves_unsorted(new_rows, ret, row_did_change);
}
ret.verify();
return ret;
}

View File

@ -70,6 +70,11 @@ struct CollectionChangeIndices {
IndexSet modification = {},
std::vector<Move> moves = {});
static CollectionChangeIndices calculate(std::vector<size_t> const& old_rows,
std::vector<size_t> const& new_rows,
std::function<bool (size_t)> row_did_change,
bool sort);
bool empty() const { return deletions.empty() && insertions.empty() && modifications.empty() && moves.empty(); }
void merge(CollectionChangeIndices&&);

View File

@ -30,6 +30,7 @@ AsyncQuery::AsyncQuery(Results& target)
, m_sort(target.get_sort())
{
Query q = target.get_query();
set_table(*q.get_table());
m_query_handover = Realm::Internal::get_shared_group(get_realm()).export_for_handover(q, MutableSourcePayload::Move);
}
@ -62,8 +63,26 @@ void AsyncQuery::release_data() noexcept
// destroyed while the background work is running, and to allow removing
// callbacks from any thread.
static bool map_moves(size_t& idx, CollectionChangeIndices const& changes)
{
for (auto&& move : changes.moves) {
if (move.from == idx) {
idx = move.to;
return true;
}
}
return false;
}
void AsyncQuery::do_add_required_change_info(TransactionChangeInfo& info)
{
REALM_ASSERT(m_query);
m_info = &info;
}
void AsyncQuery::run()
{
REALM_ASSERT(m_info);
m_did_change = false;
{
@ -76,6 +95,8 @@ void AsyncQuery::run()
REALM_ASSERT(!m_tv.is_attached());
size_t table_ndx = m_query->get_table()->get_index_in_group();
// If we've run previously, check if we need to rerun
if (m_initial_run_complete) {
// Make an empty tableview from the query to get the table version, since
@ -90,6 +111,39 @@ void AsyncQuery::run()
m_tv.sort(m_sort.column_indices, m_sort.ascending);
}
if (m_initial_run_complete) {
auto changes = table_ndx < m_info->tables.size() ? &m_info->tables[table_ndx] : nullptr;
std::vector<size_t> next_rows;
next_rows.reserve(m_tv.size());
for (size_t i = 0; i < m_tv.size(); ++i)
next_rows.push_back(m_tv[i].get_index());
if (changes) {
for (auto& idx : m_previous_rows) {
if (changes->deletions.contains(idx))
idx = npos;
else
map_moves(idx, *changes);
REALM_ASSERT_DEBUG(!changes->insertions.contains(idx));
}
}
m_changes = CollectionChangeIndices::calculate(m_previous_rows, next_rows,
[&](size_t row) { return m_info->row_did_change(*m_query->get_table(), row); },
!!m_sort);
m_previous_rows = std::move(next_rows);
if (m_changes.empty()) {
m_tv = {};
return;
}
}
else {
m_previous_rows.resize(m_tv.size());
for (size_t i = 0; i < m_tv.size(); ++i)
m_previous_rows[i] = m_tv[i].get_index();
}
m_did_change = true;
}
@ -105,9 +159,12 @@ bool AsyncQuery::do_prepare_handover(SharedGroup& sg)
m_handed_over_table_version = m_tv.sync_if_needed();
m_tv_handover = sg.export_for_handover(m_tv, MutableSourcePayload::Move);
add_changes(std::move(m_changes));
REALM_ASSERT(m_changes.empty());
// detach the TableView as we won't need it again and keeping it around
// makes advance_read() much more expensive
m_tv = TableView();
m_tv = {};
return m_did_change;
}

View File

@ -27,11 +27,14 @@
#include <exception>
#include <mutex>
#include <functional>
#include <set>
#include <thread>
#include <vector>
namespace realm {
namespace _impl {
struct TransactionChangeInfo;
class AsyncQuery : public BackgroundCollection {
public:
AsyncQuery(Results& target);
@ -45,6 +48,8 @@ private:
// Returns if any callbacks need to be invoked
bool do_deliver(SharedGroup& sg) override;
void do_add_required_change_info(TransactionChangeInfo& info) override;
void release_data() noexcept override;
void do_attach_to(SharedGroup& sg) override;
void do_detach_from(SharedGroup& sg) override;
@ -64,9 +69,14 @@ private:
TableView m_tv;
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
CollectionChangeIndices m_changes;
TransactionChangeInfo* m_info = nullptr;
uint_fast64_t m_handed_over_table_version = -1;
bool m_did_change = false;
std::vector<size_t> m_previous_rows;
bool m_initial_run_complete = false;
};

View File

@ -102,6 +102,38 @@ bool BackgroundCollection::is_alive() const noexcept
return m_realm != nullptr;
}
// Recursively add `table` and all tables it links to to `out`
static void find_relevant_tables(std::vector<size_t>& out, Table const& table)
{
auto table_ndx = table.get_index_in_group();
if (find(begin(out), end(out), table_ndx) != end(out))
return;
out.push_back(table_ndx);
for (size_t i = 0, count = table.get_column_count(); i != count; ++i) {
if (table.get_column_type(i) == type_Link || table.get_column_type(i) == type_LinkList) {
find_relevant_tables(out, *table.get_link_target(i));
}
}
}
void BackgroundCollection::set_table(Table const& table)
{
find_relevant_tables(m_relevant_tables, table);
}
void BackgroundCollection::add_required_change_info(TransactionChangeInfo& info)
{
auto max = *max_element(begin(m_relevant_tables), end(m_relevant_tables)) + 1;
if (max > info.tables_needed.size())
info.tables_needed.resize(max, false);
for (auto table_ndx : m_relevant_tables) {
info.tables_needed[table_ndx] = true;
}
do_add_required_change_info(info);
}
void BackgroundCollection::prepare_handover()
{
REALM_ASSERT(m_sg);

View File

@ -57,7 +57,7 @@ public:
// SharedGroup
void detach();
virtual void add_required_change_info(TransactionChangeInfo&) { }
void add_required_change_info(TransactionChangeInfo&);
virtual void run() { }
void prepare_handover();
@ -70,12 +70,14 @@ protected:
bool have_callbacks() const noexcept { return m_have_callbacks; }
bool have_changes() const noexcept { return !m_accumulated_changes.empty(); }
void add_changes(CollectionChangeIndices change) { m_accumulated_changes.merge(std::move(change)); }
void set_table(Table const& table);
private:
virtual void do_attach_to(SharedGroup&) = 0;
virtual void do_detach_from(SharedGroup&) = 0;
virtual bool do_prepare_handover(SharedGroup&) = 0;
virtual bool do_deliver(SharedGroup&) = 0;
virtual void do_add_required_change_info(TransactionChangeInfo&) { }
const std::thread::id m_thread_id = std::this_thread::get_id();
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
@ -92,6 +94,9 @@ private:
uint_fast64_t m_results_version = 0;
// Tables which this collection needs change information for
std::vector<size_t> m_relevant_tables;
struct Callback {
CollectionChangeCallback fn;
size_t token;

View File

@ -26,28 +26,11 @@
using namespace realm;
using namespace realm::_impl;
// Recursively add `table` and all tables it links to to `out`
static void find_relevant_tables(std::vector<size_t>& out, Table const& table)
{
auto table_ndx = table.get_index_in_group();
if (find(begin(out), end(out), table_ndx) != end(out))
return;
out.push_back(table_ndx);
for (size_t i = 0, count = table.get_column_count(); i != count; ++i) {
if (table.get_column_type(i) == type_Link || table.get_column_type(i) == type_LinkList) {
find_relevant_tables(out, *table.get_link_target(i));
}
}
}
ListNotifier::ListNotifier(LinkViewRef lv, std::shared_ptr<Realm> realm)
: BackgroundCollection(std::move(realm))
, m_prev_size(lv->size())
{
find_relevant_tables(m_relevant_tables, lv->get_target_table());
// Find the lv's column, since that isn't tracked directly
size_t row_ndx = lv->get_origin_row_index();
m_col_ndx = not_found;
@ -60,6 +43,8 @@ ListNotifier::ListNotifier(LinkViewRef lv, std::shared_ptr<Realm> realm)
}
REALM_ASSERT(m_col_ndx != not_found);
set_table(lv->get_target_table());
auto& sg = Realm::Internal::get_shared_group(get_realm());
m_lv_handover = sg.export_linkview_for_handover(lv);
}
@ -86,7 +71,7 @@ void ListNotifier::do_detach_from(SharedGroup& sg)
}
}
void ListNotifier::add_required_change_info(TransactionChangeInfo& info)
void ListNotifier::do_add_required_change_info(TransactionChangeInfo& info)
{
REALM_ASSERT(!m_lv_handover);
if (!m_lv) {
@ -97,13 +82,6 @@ void ListNotifier::add_required_change_info(TransactionChangeInfo& info)
auto& table = m_lv->get_origin_table();
info.lists.push_back({table.get_index_in_group(), row_ndx, m_col_ndx, &m_change});
auto max = *max_element(begin(m_relevant_tables), end(m_relevant_tables)) + 1;
if (max > info.tables_needed.size())
info.tables_needed.resize(max, false);
for (auto table_ndx : m_relevant_tables) {
info.tables_needed[table_ndx] = true;
}
m_info = &info;
}

View File

@ -47,7 +47,7 @@ private:
void do_detach_from(SharedGroup& sg) override;
void release_data() noexcept override;
void add_required_change_info(TransactionChangeInfo& info) override;
void do_add_required_change_info(TransactionChangeInfo& info) override;
};
}
}

View File

@ -51,11 +51,10 @@ bool TransactionChangeInfo::row_did_change(Table const& table, size_t idx, int d
for (size_t i = 0, count = table.get_column_count(); i < count; ++i) {
auto type = table.get_column_type(i);
if (type == type_Link) {
auto& target = *table.get_link_target(i);
if (target.is_null_link(i, idx))
if (table.is_null_link(i, idx))
continue;
auto dst = table.get_link(i, idx);
return row_did_change(target, dst, depth + 1);
return row_did_change(*table.get_link_target(i), dst, depth + 1);
}
if (type != type_LinkList)
continue;

View File

@ -46,6 +46,7 @@ Results::Results(SharedRealm r, const ObjectSchema &o, Query q, SortOrder s)
, m_sort(std::move(s))
, m_mode(Mode::Query)
{
REALM_ASSERT(m_sort.column_indices.size() == m_sort.ascending.size());
}
Results::Results(SharedRealm r, const ObjectSchema &o, Table& table)
@ -350,6 +351,7 @@ TableView Results::get_tableview()
Results Results::sort(realm::SortOrder&& sort) const
{
REALM_ASSERT(sort.column_indices.size() == sort.ascending.size());
return Results(m_realm, get_object_schema(), get_query(), std::move(sort));
}
@ -380,6 +382,12 @@ NotificationToken Results::async(std::function<void (std::exception_ptr)> target
return {m_background_query, m_background_query->add_callback(wrap)};
}
NotificationToken Results::add_notification_callback(CollectionChangeCallback cb)
{
prepare_async();
return {m_background_query, m_background_query->add_callback(std::move(cb))};
}
void Results::Internal::set_table_view(Results& results, realm::TableView &&tv)
{
// If the previous TableView was never actually used, then stop generating

View File

@ -177,6 +177,7 @@ public:
// The query will be run on a background thread and delivered to the callback,
// and then rerun after each commit (if needed) and redelivered if it changed
NotificationToken async(std::function<void (std::exception_ptr)> target);
NotificationToken add_notification_callback(CollectionChangeCallback cb);
bool wants_background_updates() const { return m_wants_background_updates; }

View File

@ -177,6 +177,71 @@ TEST_CASE("collection change indices") {
}
}
SECTION("calculate") {
auto all_modified = [](size_t) { return true; };
auto none_modified = [](size_t) { return false; };
SECTION("no changes") {
c = CollectionChangeIndices::calculate({1, 2, 3}, {1, 2, 3}, none_modified, false);
REQUIRE(c.empty());
}
SECTION("inserting from empty") {
c = CollectionChangeIndices::calculate({}, {1, 2, 3}, all_modified, false);
REQUIRE_INDICES(c.insertions, 0, 1, 2);
}
SECTION("deleting all existing") {
c = CollectionChangeIndices::calculate({1, 2, 3}, {}, all_modified, false);
REQUIRE_INDICES(c.deletions, 0, 1, 2);
}
SECTION("all rows modified without changing order") {
c = CollectionChangeIndices::calculate({1, 2, 3}, {1, 2, 3}, all_modified, false);
REQUIRE_INDICES(c.modifications, 0, 1, 2);
}
SECTION("single insertion in middle") {
c = CollectionChangeIndices::calculate({1, 3}, {1, 2, 3}, all_modified, false);
REQUIRE_INDICES(c.insertions, 1);
}
SECTION("single deletion in middle") {
c = CollectionChangeIndices::calculate({1, 2, 3}, {1, 3}, all_modified, false);
REQUIRE_INDICES(c.deletions, 1);
}
SECTION("unsorted reordering") {
auto calc = [&](std::vector<size_t> values) {
return CollectionChangeIndices::calculate({1, 2, 3}, values, none_modified, false);
};
// The commented-out permutations are not possible with
// move_last_over() and so are unhandled by unsorted mode
REQUIRE(calc({1, 2, 3}).empty());
REQUIRE_MOVES(calc({1, 3, 2}), {2, 1});
// REQUIRE_MOVES(calc({2, 1, 3}), {1, 0});
// REQUIRE_MOVES(calc({2, 3, 1}), {1, 0}, {2, 1});
REQUIRE_MOVES(calc({3, 1, 2}), {2, 0});
REQUIRE_MOVES(calc({3, 2, 1}), {2, 0}, {1, 1});
}
SECTION("sorted reordering") {
auto calc = [&](std::vector<size_t> values) {
return CollectionChangeIndices::calculate({1, 2, 3}, values, all_modified, true);
};
REQUIRE(calc({1, 2, 3}).moves.empty());
return;
// none of these actually work since it just does insert+delete
REQUIRE_MOVES(calc({1, 3, 2}), {2, 1});
REQUIRE_MOVES(calc({2, 1, 3}), {1, 0});
REQUIRE_MOVES(calc({2, 3, 1}), {1, 0}, {2, 1});
REQUIRE_MOVES(calc({3, 1, 2}), {2, 0});
REQUIRE_MOVES(calc({3, 2, 1}), {2, 0}, {1, 1});
}
}
SECTION("merge") {
SECTION("deletions are shifted by previous deletions") {
c = {{5}, {}, {}, {}};

View File

@ -1,5 +1,6 @@
#include "catch.hpp"
#include "util/index_helpers.hpp"
#include "util/test_file.hpp"
#include "impl/realm_coordinator.hpp"
@ -12,6 +13,8 @@
#include <realm/group_shared.hpp>
#include <realm/link_view.hpp>
#include <unistd.h>
using namespace realm;
TEST_CASE("Results") {
@ -41,76 +44,38 @@ TEST_CASE("Results") {
r->begin_transaction();
table->add_empty_row(10);
for (int i = 0; i < 10; ++i)
table->set_int(0, i, i);
table->set_int(0, i, i * 2);
r->commit_transaction();
Results results(r, *config.schema->find("object"), table->where().greater(0, 0).less(0, 5));
Results results(r, *config.schema->find("object"), table->where().greater(0, 0).less(0, 10));
SECTION("notifications") {
SECTION("unsorted notifications") {
int notification_calls = 0;
auto token = results.async([&](std::exception_ptr err) {
CollectionChangeIndices change;
auto token = results.add_notification_callback([&](CollectionChangeIndices c, std::exception_ptr err) {
REQUIRE_FALSE(err);
change = c;
++notification_calls;
});
coordinator->on_change();
r->notify();
auto write = [&](auto&& f) {
r->begin_transaction();
f();
r->commit_transaction();
coordinator->on_change();
r->notify();
};
SECTION("initial results are delivered") {
REQUIRE(notification_calls == 1);
}
SECTION("modifying the table sends a notification asynchronously") {
SECTION("notifications are sent asynchronously") {
r->begin_transaction();
table->set_int(0, 0, 0);
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 2);
}
SECTION("modifying a linked-to table send a notification") {
r->begin_transaction();
r->read_group()->get_table("class_linked to object")->add_empty_row();
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 2);
}
SECTION("modifying a a linking table sends a notification") {
r->begin_transaction();
r->read_group()->get_table("class_linking object")->add_empty_row();
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 2);
}
SECTION("modifying a an unrelated table does not send a notification") {
r->begin_transaction();
r->read_group()->get_table("class_other object")->add_empty_row();
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 1);
}
SECTION("modifications from multiple transactions are collapsed") {
r->begin_transaction();
table->set_int(0, 0, 0);
r->commit_transaction();
r->begin_transaction();
table->set_int(0, 1, 0);
table->set_int(0, 0, 4);
r->commit_transaction();
REQUIRE(notification_calls == 1);
@ -121,7 +86,7 @@ TEST_CASE("Results") {
SECTION("notifications are not delivered when the token is destroyed before they are calculated") {
r->begin_transaction();
table->set_int(0, 0, 0);
table->set_int(0, 0, 4);
r->commit_transaction();
REQUIRE(notification_calls == 1);
@ -133,7 +98,7 @@ TEST_CASE("Results") {
SECTION("notifications are not delivered when the token is destroyed before they are delivered") {
r->begin_transaction();
table->set_int(0, 0, 0);
table->set_int(0, 0, 4);
r->commit_transaction();
REQUIRE(notification_calls == 1);
@ -142,5 +107,344 @@ TEST_CASE("Results") {
r->notify();
REQUIRE(notification_calls == 1);
}
SECTION("notifications are delivered when a new callback is added from within a callback") {
NotificationToken token2, token3;
bool called = false;
token2 = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr) {
token3 = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr) {
called = true;
});
});
coordinator->on_change();
r->notify();
REQUIRE(called);
}
SECTION("notifications are not delivered when a callback is removed from within a callback") {
NotificationToken token2, token3;
token2 = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr) {
token3 = {};
});
token3 = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr) {
REQUIRE(false);
});
coordinator->on_change();
r->notify();
}
SECTION("removing the current callback does not stop later ones from being called") {
NotificationToken token2, token3;
bool called = false;
token2 = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr) {
token2 = {};
});
token3 = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr) {
called = true;
});
coordinator->on_change();
r->notify();
REQUIRE(called);
}
SECTION("modifications to unrelated tables do not send notifications") {
write([&] {
r->read_group()->get_table("class_other object")->add_empty_row();
});
REQUIRE(notification_calls == 1);
}
SECTION("irrelevant modifications to linked tables do not send notifications") {
write([&] {
r->read_group()->get_table("class_linked to object")->add_empty_row();
});
REQUIRE(notification_calls == 1);
}
SECTION("irrelevant modifications to linking tables do not send notifications") {
write([&] {
r->read_group()->get_table("class_linking object")->add_empty_row();
});
REQUIRE(notification_calls == 1);
}
SECTION("modifications that leave a non-matching row non-matching do not send notifications") {
write([&] {
table->set_int(0, 6, 13);
});
REQUIRE(notification_calls == 1);
}
SECTION("deleting non-matching rows does not send a notification") {
write([&] {
table->move_last_over(0);
table->move_last_over(6);
});
REQUIRE(notification_calls == 1);
}
SECTION("modifying a matching row and leaving it matching marks that row as modified") {
write([&] {
table->set_int(0, 1, 3);
});
REQUIRE(notification_calls == 2);
REQUIRE_INDICES(change.modifications, 0);
}
SECTION("modifying a matching row to no longer match marks that row as deleted") {
write([&] {
table->set_int(0, 2, 0);
});
REQUIRE(notification_calls == 2);
REQUIRE_INDICES(change.deletions, 1);
}
SECTION("modifying a non-matching row to match marks that row as inserted") {
write([&] {
table->set_int(0, 7, 3);
});
REQUIRE(notification_calls == 2);
REQUIRE_INDICES(change.insertions, 4);
}
SECTION("deleting a matching row marks that row as deleted") {
write([&] {
table->move_last_over(3);
});
REQUIRE(notification_calls == 2);
REQUIRE_INDICES(change.deletions, 2);
}
SECTION("moving a matching row via deletion marks that row as moved") {
write([&] {
table->where().greater_equal(0, 10).find_all().clear(RemoveMode::unordered);
table->move_last_over(0);
});
REQUIRE(notification_calls == 2);
REQUIRE_MOVES(change, {3, 0});
}
SECTION("modifications from multiple transactions are collapsed") {
r->begin_transaction();
table->set_int(0, 0, 6);
r->commit_transaction();
r->begin_transaction();
table->set_int(0, 1, 0);
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 2);
}
}
// Sort in descending order
results = results.sort({{0}, {false}});
SECTION("sorted notifications") {
int notification_calls = 0;
CollectionChangeIndices change;
auto token = results.add_notification_callback([&](CollectionChangeIndices c, std::exception_ptr err) {
REQUIRE_FALSE(err);
change = c;
++notification_calls;
});
coordinator->on_change();
r->notify();
auto write = [&](auto&& f) {
r->begin_transaction();
f();
r->commit_transaction();
coordinator->on_change();
r->notify();
};
SECTION("modifications that leave a non-matching row non-matching do not send notifications") {
write([&] {
table->set_int(0, 6, 13);
});
REQUIRE(notification_calls == 1);
}
SECTION("deleting non-matching rows does not send a notification") {
write([&] {
table->move_last_over(0);
table->move_last_over(6);
});
REQUIRE(notification_calls == 1);
}
SECTION("modifying a matching row and leaving it matching marks that row as modified") {
write([&] {
table->set_int(0, 1, 3);
});
REQUIRE(notification_calls == 2);
REQUIRE_INDICES(change.modifications, 3);
}
SECTION("modifying a matching row to no longer match marks that row as deleted") {
write([&] {
table->set_int(0, 2, 0);
});
REQUIRE(notification_calls == 2);
REQUIRE_INDICES(change.deletions, 2);
}
SECTION("modifying a non-matching row to match marks that row as inserted") {
write([&] {
table->set_int(0, 7, 3);
});
REQUIRE(notification_calls == 2);
REQUIRE_INDICES(change.insertions, 3);
}
SECTION("deleting a matching row marks that row as deleted") {
write([&] {
table->move_last_over(3);
});
REQUIRE(notification_calls == 2);
REQUIRE_INDICES(change.deletions, 1);
}
SECTION("moving a matching row via deletion does not send a notification") {
write([&] {
table->where().greater_equal(0, 10).find_all().clear(RemoveMode::unordered);
table->move_last_over(0);
});
REQUIRE(notification_calls == 1);
}
SECTION("modifying a matching row to change its position sends insert+delete") {
write([&] {
table->set_int(0, 2, 9);
});
REQUIRE(notification_calls == 2);
REQUIRE_INDICES(change.deletions, 2);
REQUIRE_INDICES(change.insertions, 0);
}
SECTION("modifications from multiple transactions are collapsed") {
r->begin_transaction();
table->set_int(0, 0, 5);
r->commit_transaction();
r->begin_transaction();
table->set_int(0, 1, 0);
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 2);
}
}
}
TEST_CASE("Async Results error handling") {
InMemoryTestFile config;
config.cache = false;
config.automatic_change_notifications = false;
config.schema = std::make_unique<Schema>(Schema{
{"object", "", {
{"value", PropertyTypeInt},
}},
});
auto r = Realm::get_shared_realm(config);
auto coordinator = _impl::RealmCoordinator::get_existing_coordinator(config.path);
Results results(r, *config.schema->find("object"), *r->read_group()->get_table("class_object"));
SECTION("error when opening the advancer SG") {
unlink(config.path.c_str());
SECTION("error is delivered asynchronously") {
bool called = false;
auto token = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr err) {
REQUIRE(err);
called = true;
});
REQUIRE(!called);
coordinator->on_change();
REQUIRE(!called);
r->notify();
REQUIRE(called);
}
SECTION("adding another callback does not send the error again") {
bool called = false;
auto token = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr err) {
REQUIRE(err);
REQUIRE_FALSE(called);
called = true;
});
coordinator->on_change();
r->notify();
bool called2 = false;
auto token2 = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr err) {
REQUIRE(err);
REQUIRE_FALSE(called2);
called2 = true;
});
coordinator->on_change();
r->notify();
REQUIRE(called2);
}
}
SECTION("error when opening the executor SG") {
SECTION("error is delivered asynchronously") {
bool called = false;
auto token = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr err) {
REQUIRE(err);
called = true;
});
unlink(config.path.c_str());
REQUIRE(!called);
coordinator->on_change();
REQUIRE(!called);
r->notify();
REQUIRE(called);
}
SECTION("adding another callback does not send the error again") {
bool called = false;
auto token = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr err) {
REQUIRE(err);
REQUIRE_FALSE(called);
called = true;
});
unlink(config.path.c_str());
coordinator->on_change();
r->notify();
bool called2 = false;
auto token2 = results.add_notification_callback([&](CollectionChangeIndices, std::exception_ptr err) {
REQUIRE(err);
REQUIRE_FALSE(called2);
called2 = true;
});
coordinator->on_change();
r->notify();
REQUIRE(called2);
}
}
}