realm-js/impl/transact_log_handler.cpp
2015-11-13 18:20:40 -08:00

355 lines
12 KiB
C++

////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "transact_log_handler.hpp"
#include "../binding_context.hpp"
#include <realm/commit_log.hpp>
#include <realm/group_shared.hpp>
#include <realm/lang_bind_helper.hpp>
namespace realm {
class TransactLogHandler {
using ColumnInfo = BindingContext::ColumnInfo;
using ObserverState = BindingContext::ObserverState;
// Observed table rows which need change information
std::vector<ObserverState> m_observers;
// Userdata pointers for rows which have been deleted
std::vector<void *> invalidated;
// Delegate to send change information to
BindingContext* m_binding_context;
// Index of currently selected table
size_t m_current_table = 0;
// Change information for the currently selected LinkList, if any
ColumnInfo* m_active_linklist = nullptr;
// Get the change info for the given column, creating it if needed
static ColumnInfo& get_change(ObserverState& state, size_t i)
{
if (state.changes.size() <= i) {
state.changes.resize(std::max(state.changes.size() * 2, i + 1));
}
return state.changes[i];
}
// Loop over the columns which were changed in an observer state
template<typename Func>
static void for_each(ObserverState& state, Func&& f)
{
for (size_t i = 0; i < state.changes.size(); ++i) {
auto const& change = state.changes[i];
if (change.changed) {
f(i, change);
}
}
}
// Mark the given row/col as needing notifications sent
bool mark_dirty(size_t row_ndx, size_t col_ndx)
{
auto it = lower_bound(begin(m_observers), end(m_observers), ObserverState{m_current_table, row_ndx, nullptr});
if (it != end(m_observers) && it->table_ndx == m_current_table && it->row_ndx == row_ndx) {
get_change(*it, col_ndx).changed = true;
}
return true;
}
// Remove the given observer from the list of observed objects and add it
// to the listed of invalidated objects
void invalidate(ObserverState *o)
{
invalidated.push_back(o->info);
m_observers.erase(m_observers.begin() + (o - &m_observers[0]));
}
public:
template<typename Func>
TransactLogHandler(BindingContext* binding_context, SharedGroup& sg, Func&& func)
: m_binding_context(binding_context)
{
if (!binding_context) {
func();
return;
}
m_observers = binding_context->get_observed_rows();
if (m_observers.empty()) {
auto old_version = sg.get_version_of_current_transaction();
func();
if (old_version != sg.get_version_of_current_transaction()) {
binding_context->did_change({}, {});
}
return;
}
func(*this);
binding_context->did_change(m_observers, invalidated);
}
// Called at the end of the transaction log immediately before the version
// is advanced
void parse_complete()
{
m_binding_context->will_change(m_observers, invalidated);
}
// These would require having an observer before schema init
// Maybe do something here to throw an error when multiple processes have different schemas?
bool insert_group_level_table(size_t, size_t, StringData) { return false; }
bool erase_group_level_table(size_t, size_t) { return false; }
bool rename_group_level_table(size_t, StringData) { return false; }
bool insert_column(size_t, DataType, StringData, bool) { return false; }
bool insert_link_column(size_t, DataType, StringData, size_t, size_t) { return false; }
bool erase_column(size_t) { return false; }
bool erase_link_column(size_t, size_t, size_t) { return false; }
bool rename_column(size_t, StringData) { return false; }
bool add_search_index(size_t) { return false; }
bool remove_search_index(size_t) { return false; }
bool add_primary_key(size_t) { return false; }
bool remove_primary_key() { return false; }
bool set_link_type(size_t, LinkType) { return false; }
bool select_table(size_t group_level_ndx, int, const size_t*) noexcept
{
m_current_table = group_level_ndx;
return true;
}
bool insert_empty_rows(size_t, size_t, size_t, bool)
{
// rows are only inserted at the end, so no need to do anything
return true;
}
bool erase_rows(size_t row_ndx, size_t, size_t last_row_ndx, bool unordered)
{
for (size_t i = 0; i < m_observers.size(); ++i) {
auto& o = m_observers[i];
if (o.table_ndx == m_current_table) {
if (o.row_ndx == row_ndx) {
invalidate(&o);
--i;
}
else if (unordered && o.row_ndx == last_row_ndx) {
o.row_ndx = row_ndx;
}
else if (!unordered && o.row_ndx > row_ndx) {
o.row_ndx -= 1;
}
}
}
return true;
}
bool clear_table()
{
for (size_t i = 0; i < m_observers.size(); ) {
auto& o = m_observers[i];
if (o.table_ndx == m_current_table) {
invalidate(&o);
}
else {
++i;
}
}
return true;
}
bool select_link_list(size_t col, size_t row, size_t)
{
m_active_linklist = nullptr;
for (auto& o : m_observers) {
if (o.table_ndx == m_current_table && o.row_ndx == row) {
m_active_linklist = &get_change(o, col);
break;
}
}
return true;
}
void append_link_list_change(ColumnInfo::Kind kind, size_t index) {
ColumnInfo *o = m_active_linklist;
if (!o || o->kind == ColumnInfo::Kind::SetAll) {
// Active LinkList isn't observed or already has multiple kinds of changes
return;
}
if (o->kind == ColumnInfo::Kind::None) {
o->kind = kind;
o->changed = true;
o->indices.add(index);
}
else if (o->kind == kind) {
if (kind == ColumnInfo::Kind::Remove) {
o->indices.add_shifted(index);
}
else if (kind == ColumnInfo::Kind::Insert) {
o->indices.insert_at(index);
}
else {
o->indices.add(index);
}
}
else {
// Array KVO can only send a single kind of change at a time, so
// if there's multiple just give up and send "Set"
o->indices.set(0);
o->kind = ColumnInfo::Kind::SetAll;
}
}
bool link_list_set(size_t index, size_t)
{
append_link_list_change(ColumnInfo::Kind::Set, index);
return true;
}
bool link_list_insert(size_t index, size_t)
{
append_link_list_change(ColumnInfo::Kind::Insert, index);
return true;
}
bool link_list_erase(size_t index)
{
append_link_list_change(ColumnInfo::Kind::Remove, index);
return true;
}
bool link_list_nullify(size_t index)
{
append_link_list_change(ColumnInfo::Kind::Remove, index);
return true;
}
bool link_list_swap(size_t index1, size_t index2)
{
append_link_list_change(ColumnInfo::Kind::Set, index1);
append_link_list_change(ColumnInfo::Kind::Set, index2);
return true;
}
bool link_list_clear(size_t old_size)
{
ColumnInfo *o = m_active_linklist;
if (!o || o->kind == ColumnInfo::Kind::SetAll) {
return true;
}
if (o->kind == ColumnInfo::Kind::Remove)
old_size += o->indices.size();
else if (o->kind == ColumnInfo::Kind::Insert)
old_size -= o->indices.size();
o->indices.set(old_size);
o->kind = ColumnInfo::Kind::Remove;
o->changed = true;
return true;
}
bool link_list_move(size_t from, size_t to)
{
ColumnInfo *o = m_active_linklist;
if (!o || o->kind == ColumnInfo::Kind::SetAll) {
return true;
}
if (from > to) {
std::swap(from, to);
}
if (o->kind == ColumnInfo::Kind::None) {
o->kind = ColumnInfo::Kind::Set;
o->changed = true;
}
if (o->kind == ColumnInfo::Kind::Set) {
for (size_t i = from; i <= to; ++i)
o->indices.add(i);
}
else {
o->indices.set(0);
o->kind = ColumnInfo::Kind::SetAll;
}
return true;
}
// Things that just mark the field as modified
bool set_int(size_t col, size_t row, int_fast64_t) { return mark_dirty(row, col); }
bool set_bool(size_t col, size_t row, bool) { return mark_dirty(row, col); }
bool set_float(size_t col, size_t row, float) { return mark_dirty(row, col); }
bool set_double(size_t col, size_t row, double) { return mark_dirty(row, col); }
bool set_string(size_t col, size_t row, StringData) { return mark_dirty(row, col); }
bool set_binary(size_t col, size_t row, BinaryData) { return mark_dirty(row, col); }
bool set_date_time(size_t col, size_t row, DateTime) { return mark_dirty(row, col); }
bool set_table(size_t col, size_t row) { return mark_dirty(row, col); }
bool set_mixed(size_t col, size_t row, const Mixed&) { return mark_dirty(row, col); }
bool set_link(size_t col, size_t row, size_t, size_t) { return mark_dirty(row, col); }
bool set_null(size_t col, size_t row) { return mark_dirty(row, col); }
bool nullify_link(size_t col, size_t row, size_t) { return mark_dirty(row, col); }
// Doesn't change any data
bool optimize_table() { return true; }
// Used for subtables, which we currently don't support
bool select_descriptor(int, const size_t*) { return false; }
// Not implemented
bool insert_substring(size_t, size_t, size_t, StringData) { return false; }
bool erase_substring(size_t, size_t, size_t, size_t) { return false; }
bool swap_rows(size_t, size_t) { return false; }
bool move_column(size_t, size_t) { return false; }
bool move_group_level_table(size_t, size_t) { return false; }
};
} // anonymous namespace
namespace realm {
namespace _impl {
namespace transaction {
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) {
TransactLogHandler(binding_context, sg, [&](auto&&... args) {
LangBindHelper::advance_read(sg, history, std::move(args)...);
});
}
void begin(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) {
TransactLogHandler(binding_context, sg, [&](auto&&... args) {
LangBindHelper::promote_to_write(sg, history, std::move(args)...);
});
}
void commit(SharedGroup& sg, ClientHistory&, BindingContext* binding_context) {
LangBindHelper::commit_and_continue_as_read(sg);
if (binding_context) {
binding_context->did_change({}, {});
}
}
void cancel(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) {
TransactLogHandler(binding_context, sg, [&](auto&&... args) {
LangBindHelper::rollback_and_continue_as_read(sg, history, std::move(args)...);
});
}
} // namespace transaction
} // namespace _impl
} // namespace realm