Skip calling callbacks if two commits cancel each other out when merged

This commit is contained in:
Thomas Goyne 2016-03-08 11:19:13 -08:00 committed by Thomas Goyne
parent a16cd7d42d
commit a428f813d5
7 changed files with 43 additions and 33 deletions

View File

@ -83,7 +83,6 @@ void AsyncQuery::do_add_required_change_info(TransactionChangeInfo& info)
void AsyncQuery::run()
{
REALM_ASSERT(m_info);
m_did_change = false;
{
std::lock_guard<std::mutex> target_lock(m_target_mutex);
@ -145,14 +144,12 @@ void AsyncQuery::run()
for (size_t i = 0; i < m_tv.size(); ++i)
m_previous_rows[i] = m_tv[i].get_index();
}
m_did_change = true;
}
bool AsyncQuery::do_prepare_handover(SharedGroup& sg)
void AsyncQuery::do_prepare_handover(SharedGroup& sg)
{
if (!m_tv.is_attached()) {
return false;
return;
}
REALM_ASSERT(m_tv.is_in_sync());
@ -167,8 +164,6 @@ bool AsyncQuery::do_prepare_handover(SharedGroup& sg)
// detach the TableView as we won't need it again and keeping it around
// makes advance_read() much more expensive
m_tv = {};
return m_did_change;
}
bool AsyncQuery::do_deliver(SharedGroup& sg)
@ -197,7 +192,7 @@ bool AsyncQuery::do_deliver(SharedGroup& sg)
std::move(*sg.import_from_handover(std::move(m_tv_handover))));
}
REALM_ASSERT(!m_tv_handover);
return have_callbacks();
return true;
}
void AsyncQuery::do_attach_to(SharedGroup& sg)

View File

@ -43,7 +43,7 @@ private:
// Run/rerun the query if needed
void run() override;
// Prepare the handover object if run() did update the TableView
bool do_prepare_handover(SharedGroup&) override;
void do_prepare_handover(SharedGroup&) override;
// Update the target results from the handover
// Returns if any callbacks need to be invoked
bool do_deliver(SharedGroup& sg) override;
@ -54,6 +54,8 @@ private:
void do_attach_to(SharedGroup& sg) override;
void do_detach_from(SharedGroup& sg) override;
bool should_deliver_initial() const noexcept override { return true; }
// Target Results to update and a mutex which guards it
mutable std::mutex m_target_mutex;
Results* m_target_results;
@ -73,7 +75,6 @@ private:
TransactionChangeInfo* m_info = nullptr;
uint_fast64_t m_handed_over_table_version = -1;
bool m_did_change = false;
std::vector<size_t> m_previous_rows;

View File

@ -55,7 +55,7 @@ size_t BackgroundCollection::add_callback(CollectionChangeCallback callback)
std::lock_guard<std::mutex> lock(m_callback_mutex);
auto token = next_token();
m_callbacks.push_back({std::move(callback), token, -1ULL});
m_callbacks.push_back({std::move(callback), token, false});
if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications
Realm::Internal::get_coordinator(*m_realm).send_commit_notifications();
m_have_callbacks = true;
@ -138,8 +138,7 @@ void BackgroundCollection::prepare_handover()
{
REALM_ASSERT(m_sg);
m_sg_version = m_sg->get_version_of_current_transaction();
if (do_prepare_handover(*m_sg))
++m_results_version;
do_prepare_handover(*m_sg);
}
bool BackgroundCollection::deliver(SharedGroup& sg, std::exception_ptr err)
@ -162,9 +161,9 @@ bool BackgroundCollection::deliver(SharedGroup& sg, std::exception_ptr err)
return false;
}
bool ret = do_deliver(sg);
bool should_call_callbacks = do_deliver(sg);
m_changes_to_deliver = std::move(m_accumulated_changes);
return ret;
return should_call_callbacks && have_callbacks();
}
void BackgroundCollection::call_callbacks()
@ -184,12 +183,16 @@ void BackgroundCollection::call_callbacks()
CollectionChangeCallback BackgroundCollection::next_callback()
{
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
for (++m_callback_index; m_callback_index < m_callbacks.size(); ++m_callback_index) {
auto& callback = m_callbacks[m_callback_index];
if (m_error || callback.delivered_version != m_results_version) {
callback.delivered_version = m_results_version;
return callback.fn;
bool deliver_initial = !callback.initial_delivered && should_deliver_initial();
if (!m_error && !deliver_initial && m_changes_to_deliver.empty()) {
continue;
}
callback.initial_delivered = true;
return callback.fn;
}
m_callback_index = npos;

View File

@ -68,16 +68,16 @@ public:
protected:
bool have_callbacks() const noexcept { return m_have_callbacks; }
bool have_changes() const noexcept { return !m_accumulated_changes.empty(); }
void add_changes(CollectionChangeIndices change) { m_accumulated_changes.merge(std::move(change)); }
void set_table(Table const& table);
private:
virtual void do_attach_to(SharedGroup&) = 0;
virtual void do_detach_from(SharedGroup&) = 0;
virtual bool do_prepare_handover(SharedGroup&) = 0;
virtual bool do_deliver(SharedGroup&) = 0;
virtual void do_prepare_handover(SharedGroup&) = 0;
virtual bool do_deliver(SharedGroup&) { return true; }
virtual void do_add_required_change_info(TransactionChangeInfo&) { }
virtual bool should_deliver_initial() const noexcept { return false; }
const std::thread::id m_thread_id = std::this_thread::get_id();
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
@ -92,15 +92,13 @@ private:
CollectionChangeIndices m_accumulated_changes;
CollectionChangeIndices m_changes_to_deliver;
uint_fast64_t m_results_version = 0;
// Tables which this collection needs change information for
std::vector<size_t> m_relevant_tables;
struct Callback {
CollectionChangeCallback fn;
size_t token;
uint_fast64_t delivered_version;
bool initial_delivered;
};
// Currently registered callbacks and a mutex which must always be held

View File

@ -112,13 +112,7 @@ void ListNotifier::run()
m_prev_size = m_lv->size();
}
bool ListNotifier::do_prepare_handover(SharedGroup&)
void ListNotifier::do_prepare_handover(SharedGroup&)
{
add_changes(std::move(m_change));
return true;
}
bool ListNotifier::do_deliver(SharedGroup&)
{
return have_callbacks() && have_changes();
}

View File

@ -40,8 +40,7 @@ private:
void run() override;
bool do_deliver(SharedGroup& sg) override;
bool do_prepare_handover(SharedGroup&) override;
void do_prepare_handover(SharedGroup&) override;
void do_attach_to(SharedGroup& sg) override;
void do_detach_from(SharedGroup& sg) override;

View File

@ -235,6 +235,8 @@ TEST_CASE("Results") {
table->set_int(0, 0, 6);
r->commit_transaction();
coordinator->on_change();
r->begin_transaction();
table->set_int(0, 1, 0);
r->commit_transaction();
@ -244,6 +246,24 @@ TEST_CASE("Results") {
r->notify();
REQUIRE(notification_calls == 2);
}
SECTION("notifications are not delivered when collapsing transactions results in no net change") {
r->begin_transaction();
size_t ndx = table->add_empty_row();
table->set_int(0, ndx, 5);
r->commit_transaction();
coordinator->on_change();
r->begin_transaction();
table->move_last_over(ndx);
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 1);
}
}
// Sort in descending order