From efdfa08524a4351def6534d869f75aa33a4aad63 Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Mon, 31 Aug 2015 11:20:20 -0700 Subject: [PATCH] Port some of the KVO support functionality to the object store --- index_set.cpp | 117 +++++++++++++++++ index_set.hpp | 63 +++++++++ realm_delegate.hpp | 78 +++++++++++ shared_realm.cpp | 313 ++++++++++++++++++++++++++++++++++++++++++--- shared_realm.hpp | 17 --- 5 files changed, 554 insertions(+), 34 deletions(-) create mode 100644 index_set.cpp create mode 100644 index_set.hpp create mode 100644 realm_delegate.hpp diff --git a/index_set.cpp b/index_set.cpp new file mode 100644 index 00000000..d3f70047 --- /dev/null +++ b/index_set.cpp @@ -0,0 +1,117 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "index_set.hpp" + +using namespace realm; + +size_t IndexSet::size() const +{ + size_t size = 0; + for (auto const& range : m_ranges) { + size += range.second - range.first; + } + return size; +} + +std::vector::iterator IndexSet::find(size_t index) +{ + for (auto it = m_ranges.begin(), end = m_ranges.end(); it != end; ++it) { + if (it->second > index) + return it; + } + return m_ranges.end(); +} + +void IndexSet::add(size_t index) +{ + do_add(find(index), index); +} + +void IndexSet::do_add(std::vector::iterator it, size_t index) +{ + bool more_before = it != m_ranges.begin(), valid = it != m_ranges.end(); + if (valid && it->first <= index && it->second > index) { + // index is already in set + } + else if (more_before && (it - 1)->second == index) { + // index is immediate after an existing range + ++(it - 1)->second; + } + else if (more_before && valid && (it - 1)->second == it->first) { + // index joins two existing ranges + (it - 1)->second = it->second; + m_ranges.erase(it); + } + else if (valid && it->first == index + 1) { + // index is immediately before an existing range + --it->first; + } + else { + // index is not next to an existing range + m_ranges.insert(it, {index, index + 1}); + } +} + +void IndexSet::set(size_t len) +{ + m_ranges.clear(); + if (len) { + m_ranges.push_back({0, len}); + } +} + +void IndexSet::insert_at(size_t index) +{ + auto pos = find(index); + if (pos != m_ranges.end()) { + if (pos->first >= index) + ++pos->first; + ++pos->second; + for (auto it = pos + 1; it != m_ranges.end(); ++it) { + ++it->first; + ++it->second; + } + } + do_add(pos, index); +} + +size_t IndexSet::iterator::operator*() const +{ + return m_data->first + m_offset; +} + +IndexSet::iterator& IndexSet::iterator::operator++() +{ + ++m_offset; + if (m_offset + m_data->first == m_data->second) { + ++m_data; + m_offset = 0; + } + return *this; +} + +bool IndexSet::iterator::operator==(iterator other) const +{ + return m_data == other.m_data && m_offset == other.m_offset; +} + +bool IndexSet::iterator::operator!=(iterator other) const +{ + return m_data != other.m_data || m_offset != other.m_offset; +} diff --git a/index_set.hpp b/index_set.hpp new file mode 100644 index 00000000..4e55c40f --- /dev/null +++ b/index_set.hpp @@ -0,0 +1,63 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_INDEX_SET_HPP +#define REALM_INDEX_SET_HPP + +#include + +namespace realm { +class IndexSet { +public: + struct iterator { + size_t operator*() const; + iterator& operator++(); + bool operator==(iterator) const; + bool operator!=(iterator) const; + + iterator(std::pair* data) noexcept : m_data(data) { } + + private: + std::pair* m_data; + size_t m_offset = 0; + }; + + iterator begin() { return iterator(&m_ranges[0]); } + iterator end() { return iterator(&m_ranges[m_ranges.size()]); } + + size_t size() const; + + // Add an index to the set, doing nothing if it's already present + void add(size_t index); + // Set the index set to a single range starting at 0 with length `len` + void set(size_t len); + // Insert an index at the given position, shifting existing indexes back + void insert_at(size_t index); + +private: + using Range = std::pair; + std::vector m_ranges; + + // Find the range which contains the index, or the first one after it if + // none do + std::vector::iterator find(size_t index); + void do_add(std::vector::iterator pos, size_t index); +}; +} // namespace realm + +#endif // REALM_INDEX_SET_HPP diff --git a/realm_delegate.hpp b/realm_delegate.hpp new file mode 100644 index 00000000..2b4f5ba6 --- /dev/null +++ b/realm_delegate.hpp @@ -0,0 +1,78 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_DELEGATE_HPP +#define REALM_DELEGATE_HPP + +#include "index_set.hpp" + +#include +#include + +namespace realm { +class RealmDelegate { +public: + virtual ~RealmDelegate() = default; + + struct ColumnInfo { + bool changed = false; + enum class Kind { + None, + Set, + Insert, + Remove, + SetAll + } kind = Kind::None; + IndexSet indices; + }; + + struct ObserverState { + size_t table_ndx; + size_t row_ndx; + void* info; // opaque user info + std::vector changes; + + // Simple lexographic ordering + friend bool operator<(ObserverState const& lft, ObserverState const& rgt) { + return std::tie(lft.table_ndx, lft.row_ndx) < std::tie(rgt.table_ndx, rgt.row_ndx); + } + }; + + // The Realm has committed a write transaction, and other Realms at the + // same path should be notified + virtual void transaction_committed() = 0; + + // There are now new versions available for the Realm, but it has not + // had its read version advanced + virtual void changes_available() = 0; + + // Called before changing the read transaction. Should return a list of + // ObserverStates for each row for which detailed change information is + // desired. + virtual std::vector get_observed_rows() = 0; + + // The Realm's read version will change + // Only called if get_observed_row() returned a non-empty array. + virtual void will_change(std::vector const&, std::vector const&) = 0; + + // The Realm's read version has changed + virtual void did_change(std::vector const&, std::vector const&) = 0; +}; +} // namespace realm + +#endif /* REALM_DELEGATE_HPP */ diff --git a/shared_realm.cpp b/shared_realm.cpp index 5c2733ed..e2753d85 100644 --- a/shared_realm.cpp +++ b/shared_realm.cpp @@ -18,14 +18,296 @@ #include "shared_realm.hpp" +#include "realm_delegate.hpp" + #include #include #include #include +#include using namespace realm; +namespace { +class TransactLogHandler { + using ColumnInfo = RealmDelegate::ColumnInfo; + using ObserverState = RealmDelegate::ObserverState; + + size_t currentTable = 0; + std::vector observers; + std::vector invalidated; + ColumnInfo *activeLinkList = nullptr; + RealmDelegate* m_delegate; + + // Get the change info for the given column, creating it if needed + static ColumnInfo& get_change(ObserverState& state, size_t i) + { + if (state.changes.size() <= i) { + state.changes.resize(std::max(state.changes.size() * 2, i + 1)); + } + return state.changes[i]; + } + + // Loop over the columns which were changed in an observer state + template + static void for_each(ObserverState& state, Func&& f) + { + for (size_t i = 0; i < state.changes.size(); ++i) { + auto const& change = state.changes[i]; + if (change.changed) { + f(i, change); + } + } + } + + // Mark the given row/col as needing notifications sent + bool mark_dirty(size_t row_ndx, size_t col_ndx) + { + auto it = lower_bound(begin(observers), end(observers), ObserverState{currentTable, row_ndx, nullptr}); + if (it != end(observers) && it->table_ndx == currentTable && it->row_ndx == row_ndx) { + get_change(*it, col_ndx).changed = true; + } + return true; + } + + // Remove the given observer from the list of observed objects and add it + // to the listed of invalidated objects + void invalidate(ObserverState *o) + { + invalidated.push_back(o->info); + observers.erase(observers.begin() + (o - &observers[0])); + } + +public: + template + TransactLogHandler(RealmDelegate* delegate, SharedGroup& sg, Func&& func) + : m_delegate(delegate) + { + if (!delegate) { + func(); + return; + } + + observers = delegate->get_observed_rows(); + if (observers.empty()) { + auto old_version = sg.get_version_of_current_transaction(); + func(); + if (old_version != sg.get_version_of_current_transaction()) { + delegate->did_change({}, {}); + } + return; + } + + func(*this); + delegate->did_change(observers, invalidated); + } + + // Called at the end of the transaction log immediately before the version + // is advanced + void parse_complete() + { + m_delegate->will_change(observers, invalidated); + } + + // These would require having an observer before schema init + // Maybe do something here to throw an error when multiple processes have different schemas? + bool insert_group_level_table(size_t, size_t, StringData) noexcept { return false; } + bool erase_group_level_table(size_t, size_t) noexcept { return false; } + bool rename_group_level_table(size_t, StringData) noexcept { return false; } + bool insert_column(size_t, DataType, StringData, bool) { return false; } + bool insert_link_column(size_t, DataType, StringData, size_t, size_t) { return false; } + bool erase_column(size_t) { return false; } + bool erase_link_column(size_t, size_t, size_t) { return false; } + bool rename_column(size_t, StringData) { return false; } + bool add_search_index(size_t) { return false; } + bool remove_search_index(size_t) { return false; } + bool add_primary_key(size_t) { return false; } + bool remove_primary_key() { return false; } + bool set_link_type(size_t, LinkType) { return false; } + + bool select_table(size_t group_level_ndx, int, const size_t*) noexcept { + currentTable = group_level_ndx; + return true; + } + + bool insert_empty_rows(size_t, size_t, size_t, bool) { + // rows are only inserted at the end, so no need to do anything + return true; + } + + bool erase_rows(size_t row_ndx, size_t, size_t last_row_ndx, bool unordered) noexcept { + for (size_t i = 0; i < observers.size(); ++i) { + auto& o = observers[i]; + if (o.table_ndx == currentTable) { + if (o.row_ndx == row_ndx) { + invalidate(&o); + --i; + } + else if (unordered && o.row_ndx == last_row_ndx) { + o.row_ndx = row_ndx; + } + else if (!unordered && o.row_ndx > row_ndx) { + o.row_ndx -= 1; + } + } + } + return true; + } + + bool clear_table() noexcept { + for (size_t i = 0; i < observers.size(); ) { + auto& o = observers[i]; + if (o.table_ndx == currentTable) { + invalidate(&o); + } + else { + ++i; + } + } + return true; + } + + bool select_link_list(size_t col, size_t row) { + activeLinkList = nullptr; + for (auto& o : observers) { + if (o.table_ndx == currentTable && o.row_ndx == row) { + activeLinkList = &get_change(o, col); + break; + } + } + return true; + } + + void append_link_list_change(ColumnInfo::Kind kind, size_t index) { + ColumnInfo *o = activeLinkList; + if (!o || o->kind == ColumnInfo::Kind::SetAll) { + // Active LinkList isn't observed or already has multiple kinds of changes + return; + } + + if (o->kind == ColumnInfo::Kind::None) { + o->kind = kind; + o->changed = true; + o->indices.add(index); + } + else if (o->kind == kind) { + if (kind == ColumnInfo::Kind::Remove) { + // Shift the index to compensate for already-removed indices + for (auto i : o->indices) { + if (i <= index) + ++index; + else + break; + } + o->indices.add(index); + } + else if (kind == ColumnInfo::Kind::Insert) { + o->indices.insert_at(index); + } + else { + o->indices.add(index); + } + } + else { + // Array KVO can only send a single kind of change at a time, so + // if there's multiple just give up and send "Set" + o->indices.set(0); + o->kind = ColumnInfo::Kind::SetAll; + } + } + + bool link_list_set(size_t index, size_t) { + append_link_list_change(ColumnInfo::Kind::Set, index); + return true; + } + + bool link_list_insert(size_t index, size_t) { + append_link_list_change(ColumnInfo::Kind::Insert, index); + return true; + } + + bool link_list_erase(size_t index) { + append_link_list_change(ColumnInfo::Kind::Remove, index); + return true; + } + + bool link_list_nullify(size_t index) { + append_link_list_change(ColumnInfo::Kind::Remove, index); + return true; + } + + bool link_list_swap(size_t index1, size_t index2) { + append_link_list_change(ColumnInfo::Kind::Set, index1); + append_link_list_change(ColumnInfo::Kind::Set, index2); + return true; + } + + bool link_list_clear(size_t old_size) { + ColumnInfo *o = activeLinkList; + if (!o || o->kind == ColumnInfo::Kind::SetAll) { + return true; + } + + if (o->kind == ColumnInfo::Kind::Remove) + old_size += o->indices.size(); + else if (o->kind == ColumnInfo::Kind::Insert) + old_size -= o->indices.size(); + + o->indices.set(old_size); + + o->kind = ColumnInfo::Kind::Remove; + o->changed = true; + return true; + } + + bool link_list_move(size_t from, size_t to) { + ColumnInfo *o = activeLinkList; + if (!o || o->kind == ColumnInfo::Kind::SetAll) { + return true; + } + if (from > to) { + std::swap(from, to); + } + + if (o->kind == ColumnInfo::Kind::None) { + o->kind = ColumnInfo::Kind::Set; + o->changed = true; + } + if (o->kind == ColumnInfo::Kind::Set) { + for (size_t i = from; i <= to; ++i) + o->indices.add(i); + } + else { + o->indices.set(0); + o->kind = ColumnInfo::Kind::SetAll; + } + return true; + } + + // Things that just mark the field as modified + bool set_int(size_t col, size_t row, int_fast64_t) { return mark_dirty(row, col); } + bool set_bool(size_t col, size_t row, bool) { return mark_dirty(row, col); } + bool set_float(size_t col, size_t row, float) { return mark_dirty(row, col); } + bool set_double(size_t col, size_t row, double) { return mark_dirty(row, col); } + bool set_string(size_t col, size_t row, StringData) { return mark_dirty(row, col); } + bool set_binary(size_t col, size_t row, BinaryData) { return mark_dirty(row, col); } + bool set_date_time(size_t col, size_t row, DateTime) { return mark_dirty(row, col); } + bool set_table(size_t col, size_t row) { return mark_dirty(row, col); } + bool set_mixed(size_t col, size_t row, const Mixed&) { return mark_dirty(row, col); } + bool set_link(size_t col, size_t row, size_t) { return mark_dirty(row, col); } + bool set_null(size_t col, size_t row) { return mark_dirty(row, col); } + bool nullify_link(size_t col, size_t row) { return mark_dirty(row, col); } + + // Things we don't need to do anything for + bool optimize_table() { return false; } + + // Things that we don't do in the binding + bool select_descriptor(int, const size_t*) { return true; } + bool add_int_to_column(size_t, int_fast64_t) { return false; } +}; +} + RealmCache Realm::s_global_cache; Realm::Config::Config(const Config& c) @@ -214,18 +496,13 @@ void Realm::begin_transaction() throw InvalidTransactionException("The Realm is already in a write transaction"); } - // if the upgrade to write will move the transaction forward, announce the change after promoting - bool announce = m_shared_group->has_changed(); - // make sure we have a read transaction read_group(); - LangBindHelper::promote_to_write(*m_shared_group, *m_history); + TransactLogHandler(m_delegate.get(), *m_shared_group, [&](auto&&... args) { + LangBindHelper::promote_to_write(*m_shared_group, *m_history, std::move(args)...); + }); m_in_transaction = true; - - if (announce && m_delegate) { - m_delegate->did_change(); - } } void Realm::commit_transaction() @@ -242,7 +519,6 @@ void Realm::commit_transaction() if (m_delegate) { m_delegate->transaction_committed(); - m_delegate->did_change(); } } @@ -255,7 +531,9 @@ void Realm::cancel_transaction() throw InvalidTransactionException("Can't cancel a non-existing write transaction"); } - LangBindHelper::rollback_and_continue_as_read(*m_shared_group, *m_history); + TransactLogHandler(m_delegate.get(), *m_shared_group, [&](auto&&... args) { + LangBindHelper::rollback_and_continue_as_read(*m_shared_group, *m_history, std::move(args)...); + }); m_in_transaction = false; } @@ -303,10 +581,12 @@ void Realm::notify() if (m_shared_group->has_changed()) { // Throws if (m_auto_refresh) { if (m_group) { - LangBindHelper::advance_read(*m_shared_group, *m_history); + TransactLogHandler(m_delegate.get(), *m_shared_group, [&](auto&&... args) { + LangBindHelper::advance_read(*m_shared_group, *m_history, std::move(args)...); + }); } - if (m_delegate) { - m_delegate->did_change(); + else if (m_delegate) { + m_delegate->did_change({}, {}); } } else if (m_delegate) { @@ -332,16 +612,15 @@ bool Realm::refresh() } if (m_group) { - LangBindHelper::advance_read(*m_shared_group, *m_history); + TransactLogHandler(m_delegate.get(), *m_shared_group, [&](auto&&... args) { + LangBindHelper::advance_read(*m_shared_group, *m_history, std::move(args)...); + }); } else { // Create the read transaction read_group(); } - if (m_delegate) { - m_delegate->did_change(); - } return true; } diff --git a/shared_realm.hpp b/shared_realm.hpp index 0f0598a4..cdbf4d51 100644 --- a/shared_realm.hpp +++ b/shared_realm.hpp @@ -132,23 +132,6 @@ namespace realm { std::mutex m_mutex; }; - class RealmDelegate - { - public: - virtual ~RealmDelegate() = default; - - // The Realm has committed a write transaction, and other Realms at the - // same path should be notified - virtual void transaction_committed() = 0; - - // There are now new versions available for the Realm, but it has not - // had its read version advanced - virtual void changes_available() = 0; - - // The Realm's read version has advanced - virtual void did_change() = 0; - }; - class RealmFileException : public std::runtime_error { public: