Port some of the KVO support functionality to the object store

This commit is contained in:
Thomas Goyne 2015-08-31 11:20:20 -07:00 committed by Ari Lazier
parent 513d781572
commit fc0de384a6
5 changed files with 554 additions and 34 deletions

117
index_set.cpp Normal file
View File

@ -0,0 +1,117 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "index_set.hpp"
using namespace realm;
size_t IndexSet::size() const
{
size_t size = 0;
for (auto const& range : m_ranges) {
size += range.second - range.first;
}
return size;
}
std::vector<IndexSet::Range>::iterator IndexSet::find(size_t index)
{
for (auto it = m_ranges.begin(), end = m_ranges.end(); it != end; ++it) {
if (it->second > index)
return it;
}
return m_ranges.end();
}
void IndexSet::add(size_t index)
{
do_add(find(index), index);
}
void IndexSet::do_add(std::vector<Range>::iterator it, size_t index)
{
bool more_before = it != m_ranges.begin(), valid = it != m_ranges.end();
if (valid && it->first <= index && it->second > index) {
// index is already in set
}
else if (more_before && (it - 1)->second == index) {
// index is immediate after an existing range
++(it - 1)->second;
}
else if (more_before && valid && (it - 1)->second == it->first) {
// index joins two existing ranges
(it - 1)->second = it->second;
m_ranges.erase(it);
}
else if (valid && it->first == index + 1) {
// index is immediately before an existing range
--it->first;
}
else {
// index is not next to an existing range
m_ranges.insert(it, {index, index + 1});
}
}
void IndexSet::set(size_t len)
{
m_ranges.clear();
if (len) {
m_ranges.push_back({0, len});
}
}
void IndexSet::insert_at(size_t index)
{
auto pos = find(index);
if (pos != m_ranges.end()) {
if (pos->first >= index)
++pos->first;
++pos->second;
for (auto it = pos + 1; it != m_ranges.end(); ++it) {
++it->first;
++it->second;
}
}
do_add(pos, index);
}
size_t IndexSet::iterator::operator*() const
{
return m_data->first + m_offset;
}
IndexSet::iterator& IndexSet::iterator::operator++()
{
++m_offset;
if (m_offset + m_data->first == m_data->second) {
++m_data;
m_offset = 0;
}
return *this;
}
bool IndexSet::iterator::operator==(iterator other) const
{
return m_data == other.m_data && m_offset == other.m_offset;
}
bool IndexSet::iterator::operator!=(iterator other) const
{
return m_data != other.m_data || m_offset != other.m_offset;
}

63
index_set.hpp Normal file
View File

@ -0,0 +1,63 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_INDEX_SET_HPP
#define REALM_INDEX_SET_HPP
#include <vector>
namespace realm {
class IndexSet {
public:
struct iterator {
size_t operator*() const;
iterator& operator++();
bool operator==(iterator) const;
bool operator!=(iterator) const;
iterator(std::pair<size_t, size_t>* data) noexcept : m_data(data) { }
private:
std::pair<size_t, size_t>* m_data;
size_t m_offset = 0;
};
iterator begin() { return iterator(&m_ranges[0]); }
iterator end() { return iterator(&m_ranges[m_ranges.size()]); }
size_t size() const;
// Add an index to the set, doing nothing if it's already present
void add(size_t index);
// Set the index set to a single range starting at 0 with length `len`
void set(size_t len);
// Insert an index at the given position, shifting existing indexes back
void insert_at(size_t index);
private:
using Range = std::pair<size_t, size_t>;
std::vector<Range> m_ranges;
// Find the range which contains the index, or the first one after it if
// none do
std::vector<Range>::iterator find(size_t index);
void do_add(std::vector<Range>::iterator pos, size_t index);
};
} // namespace realm
#endif // REALM_INDEX_SET_HPP

78
realm_delegate.hpp Normal file
View File

@ -0,0 +1,78 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_DELEGATE_HPP
#define REALM_DELEGATE_HPP
#include "index_set.hpp"
#include <tuple>
#include <vector>
namespace realm {
class RealmDelegate {
public:
virtual ~RealmDelegate() = default;
struct ColumnInfo {
bool changed = false;
enum class Kind {
None,
Set,
Insert,
Remove,
SetAll
} kind = Kind::None;
IndexSet indices;
};
struct ObserverState {
size_t table_ndx;
size_t row_ndx;
void* info; // opaque user info
std::vector<ColumnInfo> changes;
// Simple lexographic ordering
friend bool operator<(ObserverState const& lft, ObserverState const& rgt) {
return std::tie(lft.table_ndx, lft.row_ndx) < std::tie(rgt.table_ndx, rgt.row_ndx);
}
};
// The Realm has committed a write transaction, and other Realms at the
// same path should be notified
virtual void transaction_committed() = 0;
// There are now new versions available for the Realm, but it has not
// had its read version advanced
virtual void changes_available() = 0;
// Called before changing the read transaction. Should return a list of
// ObserverStates for each row for which detailed change information is
// desired.
virtual std::vector<ObserverState> get_observed_rows() = 0;
// The Realm's read version will change
// Only called if get_observed_row() returned a non-empty array.
virtual void will_change(std::vector<ObserverState> const&, std::vector<void*> const&) = 0;
// The Realm's read version has changed
virtual void did_change(std::vector<ObserverState> const&, std::vector<void*> const&) = 0;
};
} // namespace realm
#endif /* REALM_DELEGATE_HPP */

View File

@ -18,14 +18,296 @@
#include "shared_realm.hpp"
#include "realm_delegate.hpp"
#include <realm/commit_log.hpp>
#include <realm/group_shared.hpp>
#include <realm/lang_bind_helper.hpp>
#include <mutex>
#include <numeric>
using namespace realm;
namespace {
class TransactLogHandler {
using ColumnInfo = RealmDelegate::ColumnInfo;
using ObserverState = RealmDelegate::ObserverState;
size_t currentTable = 0;
std::vector<ObserverState> observers;
std::vector<void *> invalidated;
ColumnInfo *activeLinkList = nullptr;
RealmDelegate* m_delegate;
// Get the change info for the given column, creating it if needed
static ColumnInfo& get_change(ObserverState& state, size_t i)
{
if (state.changes.size() <= i) {
state.changes.resize(std::max(state.changes.size() * 2, i + 1));
}
return state.changes[i];
}
// Loop over the columns which were changed in an observer state
template<typename Func>
static void for_each(ObserverState& state, Func&& f)
{
for (size_t i = 0; i < state.changes.size(); ++i) {
auto const& change = state.changes[i];
if (change.changed) {
f(i, change);
}
}
}
// Mark the given row/col as needing notifications sent
bool mark_dirty(size_t row_ndx, size_t col_ndx)
{
auto it = lower_bound(begin(observers), end(observers), ObserverState{currentTable, row_ndx, nullptr});
if (it != end(observers) && it->table_ndx == currentTable && it->row_ndx == row_ndx) {
get_change(*it, col_ndx).changed = true;
}
return true;
}
// Remove the given observer from the list of observed objects and add it
// to the listed of invalidated objects
void invalidate(ObserverState *o)
{
invalidated.push_back(o->info);
observers.erase(observers.begin() + (o - &observers[0]));
}
public:
template<typename Func>
TransactLogHandler(RealmDelegate* delegate, SharedGroup& sg, Func&& func)
: m_delegate(delegate)
{
if (!delegate) {
func();
return;
}
observers = delegate->get_observed_rows();
if (observers.empty()) {
auto old_version = sg.get_version_of_current_transaction();
func();
if (old_version != sg.get_version_of_current_transaction()) {
delegate->did_change({}, {});
}
return;
}
func(*this);
delegate->did_change(observers, invalidated);
}
// Called at the end of the transaction log immediately before the version
// is advanced
void parse_complete()
{
m_delegate->will_change(observers, invalidated);
}
// These would require having an observer before schema init
// Maybe do something here to throw an error when multiple processes have different schemas?
bool insert_group_level_table(size_t, size_t, StringData) noexcept { return false; }
bool erase_group_level_table(size_t, size_t) noexcept { return false; }
bool rename_group_level_table(size_t, StringData) noexcept { return false; }
bool insert_column(size_t, DataType, StringData, bool) { return false; }
bool insert_link_column(size_t, DataType, StringData, size_t, size_t) { return false; }
bool erase_column(size_t) { return false; }
bool erase_link_column(size_t, size_t, size_t) { return false; }
bool rename_column(size_t, StringData) { return false; }
bool add_search_index(size_t) { return false; }
bool remove_search_index(size_t) { return false; }
bool add_primary_key(size_t) { return false; }
bool remove_primary_key() { return false; }
bool set_link_type(size_t, LinkType) { return false; }
bool select_table(size_t group_level_ndx, int, const size_t*) noexcept {
currentTable = group_level_ndx;
return true;
}
bool insert_empty_rows(size_t, size_t, size_t, bool) {
// rows are only inserted at the end, so no need to do anything
return true;
}
bool erase_rows(size_t row_ndx, size_t, size_t last_row_ndx, bool unordered) noexcept {
for (size_t i = 0; i < observers.size(); ++i) {
auto& o = observers[i];
if (o.table_ndx == currentTable) {
if (o.row_ndx == row_ndx) {
invalidate(&o);
--i;
}
else if (unordered && o.row_ndx == last_row_ndx) {
o.row_ndx = row_ndx;
}
else if (!unordered && o.row_ndx > row_ndx) {
o.row_ndx -= 1;
}
}
}
return true;
}
bool clear_table() noexcept {
for (size_t i = 0; i < observers.size(); ) {
auto& o = observers[i];
if (o.table_ndx == currentTable) {
invalidate(&o);
}
else {
++i;
}
}
return true;
}
bool select_link_list(size_t col, size_t row) {
activeLinkList = nullptr;
for (auto& o : observers) {
if (o.table_ndx == currentTable && o.row_ndx == row) {
activeLinkList = &get_change(o, col);
break;
}
}
return true;
}
void append_link_list_change(ColumnInfo::Kind kind, size_t index) {
ColumnInfo *o = activeLinkList;
if (!o || o->kind == ColumnInfo::Kind::SetAll) {
// Active LinkList isn't observed or already has multiple kinds of changes
return;
}
if (o->kind == ColumnInfo::Kind::None) {
o->kind = kind;
o->changed = true;
o->indices.add(index);
}
else if (o->kind == kind) {
if (kind == ColumnInfo::Kind::Remove) {
// Shift the index to compensate for already-removed indices
for (auto i : o->indices) {
if (i <= index)
++index;
else
break;
}
o->indices.add(index);
}
else if (kind == ColumnInfo::Kind::Insert) {
o->indices.insert_at(index);
}
else {
o->indices.add(index);
}
}
else {
// Array KVO can only send a single kind of change at a time, so
// if there's multiple just give up and send "Set"
o->indices.set(0);
o->kind = ColumnInfo::Kind::SetAll;
}
}
bool link_list_set(size_t index, size_t) {
append_link_list_change(ColumnInfo::Kind::Set, index);
return true;
}
bool link_list_insert(size_t index, size_t) {
append_link_list_change(ColumnInfo::Kind::Insert, index);
return true;
}
bool link_list_erase(size_t index) {
append_link_list_change(ColumnInfo::Kind::Remove, index);
return true;
}
bool link_list_nullify(size_t index) {
append_link_list_change(ColumnInfo::Kind::Remove, index);
return true;
}
bool link_list_swap(size_t index1, size_t index2) {
append_link_list_change(ColumnInfo::Kind::Set, index1);
append_link_list_change(ColumnInfo::Kind::Set, index2);
return true;
}
bool link_list_clear(size_t old_size) {
ColumnInfo *o = activeLinkList;
if (!o || o->kind == ColumnInfo::Kind::SetAll) {
return true;
}
if (o->kind == ColumnInfo::Kind::Remove)
old_size += o->indices.size();
else if (o->kind == ColumnInfo::Kind::Insert)
old_size -= o->indices.size();
o->indices.set(old_size);
o->kind = ColumnInfo::Kind::Remove;
o->changed = true;
return true;
}
bool link_list_move(size_t from, size_t to) {
ColumnInfo *o = activeLinkList;
if (!o || o->kind == ColumnInfo::Kind::SetAll) {
return true;
}
if (from > to) {
std::swap(from, to);
}
if (o->kind == ColumnInfo::Kind::None) {
o->kind = ColumnInfo::Kind::Set;
o->changed = true;
}
if (o->kind == ColumnInfo::Kind::Set) {
for (size_t i = from; i <= to; ++i)
o->indices.add(i);
}
else {
o->indices.set(0);
o->kind = ColumnInfo::Kind::SetAll;
}
return true;
}
// Things that just mark the field as modified
bool set_int(size_t col, size_t row, int_fast64_t) { return mark_dirty(row, col); }
bool set_bool(size_t col, size_t row, bool) { return mark_dirty(row, col); }
bool set_float(size_t col, size_t row, float) { return mark_dirty(row, col); }
bool set_double(size_t col, size_t row, double) { return mark_dirty(row, col); }
bool set_string(size_t col, size_t row, StringData) { return mark_dirty(row, col); }
bool set_binary(size_t col, size_t row, BinaryData) { return mark_dirty(row, col); }
bool set_date_time(size_t col, size_t row, DateTime) { return mark_dirty(row, col); }
bool set_table(size_t col, size_t row) { return mark_dirty(row, col); }
bool set_mixed(size_t col, size_t row, const Mixed&) { return mark_dirty(row, col); }
bool set_link(size_t col, size_t row, size_t) { return mark_dirty(row, col); }
bool set_null(size_t col, size_t row) { return mark_dirty(row, col); }
bool nullify_link(size_t col, size_t row) { return mark_dirty(row, col); }
// Things we don't need to do anything for
bool optimize_table() { return false; }
// Things that we don't do in the binding
bool select_descriptor(int, const size_t*) { return true; }
bool add_int_to_column(size_t, int_fast64_t) { return false; }
};
}
RealmCache Realm::s_global_cache;
Realm::Config::Config(const Config& c)
@ -214,18 +496,13 @@ void Realm::begin_transaction()
throw InvalidTransactionException("The Realm is already in a write transaction");
}
// if the upgrade to write will move the transaction forward, announce the change after promoting
bool announce = m_shared_group->has_changed();
// make sure we have a read transaction
read_group();
LangBindHelper::promote_to_write(*m_shared_group, *m_history);
TransactLogHandler(m_delegate.get(), *m_shared_group, [&](auto&&... args) {
LangBindHelper::promote_to_write(*m_shared_group, *m_history, std::move(args)...);
});
m_in_transaction = true;
if (announce && m_delegate) {
m_delegate->did_change();
}
}
void Realm::commit_transaction()
@ -242,7 +519,6 @@ void Realm::commit_transaction()
if (m_delegate) {
m_delegate->transaction_committed();
m_delegate->did_change();
}
}
@ -255,7 +531,9 @@ void Realm::cancel_transaction()
throw InvalidTransactionException("Can't cancel a non-existing write transaction");
}
LangBindHelper::rollback_and_continue_as_read(*m_shared_group, *m_history);
TransactLogHandler(m_delegate.get(), *m_shared_group, [&](auto&&... args) {
LangBindHelper::rollback_and_continue_as_read(*m_shared_group, *m_history, std::move(args)...);
});
m_in_transaction = false;
}
@ -303,10 +581,12 @@ void Realm::notify()
if (m_shared_group->has_changed()) { // Throws
if (m_auto_refresh) {
if (m_group) {
LangBindHelper::advance_read(*m_shared_group, *m_history);
TransactLogHandler(m_delegate.get(), *m_shared_group, [&](auto&&... args) {
LangBindHelper::advance_read(*m_shared_group, *m_history, std::move(args)...);
});
}
if (m_delegate) {
m_delegate->did_change();
else if (m_delegate) {
m_delegate->did_change({}, {});
}
}
else if (m_delegate) {
@ -332,16 +612,15 @@ bool Realm::refresh()
}
if (m_group) {
LangBindHelper::advance_read(*m_shared_group, *m_history);
TransactLogHandler(m_delegate.get(), *m_shared_group, [&](auto&&... args) {
LangBindHelper::advance_read(*m_shared_group, *m_history, std::move(args)...);
});
}
else {
// Create the read transaction
read_group();
}
if (m_delegate) {
m_delegate->did_change();
}
return true;
}

View File

@ -132,23 +132,6 @@ namespace realm {
std::mutex m_mutex;
};
class RealmDelegate
{
public:
virtual ~RealmDelegate() = default;
// The Realm has committed a write transaction, and other Realms at the
// same path should be notified
virtual void transaction_committed() = 0;
// There are now new versions available for the Realm, but it has not
// had its read version advanced
virtual void changes_available() = 0;
// The Realm's read version has advanced
virtual void did_change() = 0;
};
class RealmFileException : public std::runtime_error
{
public: