//////////////////////////////////////////////////////////////////////////// // // Copyright 2015 Realm Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // //////////////////////////////////////////////////////////////////////////// #include "transact_log_handler.hpp" #include "../binding_context.hpp" #include #include #include namespace realm { class TransactLogHandler { using ColumnInfo = BindingContext::ColumnInfo; using ObserverState = BindingContext::ObserverState; // Observed table rows which need change information std::vector m_observers; // Userdata pointers for rows which have been deleted std::vector invalidated; // Delegate to send change information to BindingContext* m_binding_context; // Index of currently selected table size_t m_current_table = 0; // Change information for the currently selected LinkList, if any ColumnInfo* m_active_linklist = nullptr; // Get the change info for the given column, creating it if needed static ColumnInfo& get_change(ObserverState& state, size_t i) { if (state.changes.size() <= i) { state.changes.resize(std::max(state.changes.size() * 2, i + 1)); } return state.changes[i]; } // Loop over the columns which were changed in an observer state template static void for_each(ObserverState& state, Func&& f) { for (size_t i = 0; i < state.changes.size(); ++i) { auto const& change = state.changes[i]; if (change.changed) { f(i, change); } } } // Mark the given row/col as needing notifications sent bool mark_dirty(size_t row_ndx, size_t col_ndx) { auto it = lower_bound(begin(m_observers), end(m_observers), ObserverState{m_current_table, row_ndx, nullptr}); if (it != end(m_observers) && it->table_ndx == m_current_table && it->row_ndx == row_ndx) { get_change(*it, col_ndx).changed = true; } return true; } // Remove the given observer from the list of observed objects and add it // to the listed of invalidated objects void invalidate(ObserverState *o) { invalidated.push_back(o->info); m_observers.erase(m_observers.begin() + (o - &m_observers[0])); } public: template TransactLogHandler(BindingContext* binding_context, SharedGroup& sg, Func&& func) : m_binding_context(binding_context) { if (!binding_context) { func(); return; } m_observers = binding_context->get_observed_rows(); if (m_observers.empty()) { auto old_version = sg.get_version_of_current_transaction(); func(); if (old_version != sg.get_version_of_current_transaction()) { binding_context->did_change({}, {}); } return; } func(*this); binding_context->did_change(m_observers, invalidated); } // Called at the end of the transaction log immediately before the version // is advanced void parse_complete() { m_binding_context->will_change(m_observers, invalidated); } // These would require having an observer before schema init // Maybe do something here to throw an error when multiple processes have different schemas? bool insert_group_level_table(size_t, size_t, StringData) { return false; } bool erase_group_level_table(size_t, size_t) { return false; } bool rename_group_level_table(size_t, StringData) { return false; } bool insert_column(size_t, DataType, StringData, bool) { return false; } bool insert_link_column(size_t, DataType, StringData, size_t, size_t) { return false; } bool erase_column(size_t) { return false; } bool erase_link_column(size_t, size_t, size_t) { return false; } bool rename_column(size_t, StringData) { return false; } bool add_search_index(size_t) { return false; } bool remove_search_index(size_t) { return false; } bool add_primary_key(size_t) { return false; } bool remove_primary_key() { return false; } bool set_link_type(size_t, LinkType) { return false; } bool select_table(size_t group_level_ndx, int, const size_t*) noexcept { m_current_table = group_level_ndx; return true; } bool insert_empty_rows(size_t, size_t, size_t, bool) { // rows are only inserted at the end, so no need to do anything return true; } bool erase_rows(size_t row_ndx, size_t, size_t last_row_ndx, bool unordered) { for (size_t i = 0; i < m_observers.size(); ++i) { auto& o = m_observers[i]; if (o.table_ndx == m_current_table) { if (o.row_ndx == row_ndx) { invalidate(&o); --i; } else if (unordered && o.row_ndx == last_row_ndx) { o.row_ndx = row_ndx; } else if (!unordered && o.row_ndx > row_ndx) { o.row_ndx -= 1; } } } return true; } bool clear_table() { for (size_t i = 0; i < m_observers.size(); ) { auto& o = m_observers[i]; if (o.table_ndx == m_current_table) { invalidate(&o); } else { ++i; } } return true; } bool select_link_list(size_t col, size_t row, size_t) { m_active_linklist = nullptr; for (auto& o : m_observers) { if (o.table_ndx == m_current_table && o.row_ndx == row) { m_active_linklist = &get_change(o, col); break; } } return true; } void append_link_list_change(ColumnInfo::Kind kind, size_t index) { ColumnInfo *o = m_active_linklist; if (!o || o->kind == ColumnInfo::Kind::SetAll) { // Active LinkList isn't observed or already has multiple kinds of changes return; } if (o->kind == ColumnInfo::Kind::None) { o->kind = kind; o->changed = true; o->indices.add(index); } else if (o->kind == kind) { if (kind == ColumnInfo::Kind::Remove) { o->indices.add_shifted(index); } else if (kind == ColumnInfo::Kind::Insert) { o->indices.insert_at(index); } else { o->indices.add(index); } } else { // Array KVO can only send a single kind of change at a time, so // if there's multiple just give up and send "Set" o->indices.set(0); o->kind = ColumnInfo::Kind::SetAll; } } bool link_list_set(size_t index, size_t) { append_link_list_change(ColumnInfo::Kind::Set, index); return true; } bool link_list_insert(size_t index, size_t) { append_link_list_change(ColumnInfo::Kind::Insert, index); return true; } bool link_list_erase(size_t index) { append_link_list_change(ColumnInfo::Kind::Remove, index); return true; } bool link_list_nullify(size_t index) { append_link_list_change(ColumnInfo::Kind::Remove, index); return true; } bool link_list_swap(size_t index1, size_t index2) { append_link_list_change(ColumnInfo::Kind::Set, index1); append_link_list_change(ColumnInfo::Kind::Set, index2); return true; } bool link_list_clear(size_t old_size) { ColumnInfo *o = m_active_linklist; if (!o || o->kind == ColumnInfo::Kind::SetAll) { return true; } if (o->kind == ColumnInfo::Kind::Remove) old_size += o->indices.size(); else if (o->kind == ColumnInfo::Kind::Insert) old_size -= o->indices.size(); o->indices.set(old_size); o->kind = ColumnInfo::Kind::Remove; o->changed = true; return true; } bool link_list_move(size_t from, size_t to) { ColumnInfo *o = m_active_linklist; if (!o || o->kind == ColumnInfo::Kind::SetAll) { return true; } if (from > to) { std::swap(from, to); } if (o->kind == ColumnInfo::Kind::None) { o->kind = ColumnInfo::Kind::Set; o->changed = true; } if (o->kind == ColumnInfo::Kind::Set) { for (size_t i = from; i <= to; ++i) o->indices.add(i); } else { o->indices.set(0); o->kind = ColumnInfo::Kind::SetAll; } return true; } // Things that just mark the field as modified bool set_int(size_t col, size_t row, int_fast64_t) { return mark_dirty(row, col); } bool set_bool(size_t col, size_t row, bool) { return mark_dirty(row, col); } bool set_float(size_t col, size_t row, float) { return mark_dirty(row, col); } bool set_double(size_t col, size_t row, double) { return mark_dirty(row, col); } bool set_string(size_t col, size_t row, StringData) { return mark_dirty(row, col); } bool set_binary(size_t col, size_t row, BinaryData) { return mark_dirty(row, col); } bool set_date_time(size_t col, size_t row, DateTime) { return mark_dirty(row, col); } bool set_table(size_t col, size_t row) { return mark_dirty(row, col); } bool set_mixed(size_t col, size_t row, const Mixed&) { return mark_dirty(row, col); } bool set_link(size_t col, size_t row, size_t, size_t) { return mark_dirty(row, col); } bool set_null(size_t col, size_t row) { return mark_dirty(row, col); } bool nullify_link(size_t col, size_t row, size_t) { return mark_dirty(row, col); } // Doesn't change any data bool optimize_table() { return true; } // Used for subtables, which we currently don't support bool select_descriptor(int, const size_t*) { return false; } // Not implemented bool insert_substring(size_t, size_t, size_t, StringData) { return false; } bool erase_substring(size_t, size_t, size_t, size_t) { return false; } bool swap_rows(size_t, size_t) { return false; } bool move_column(size_t, size_t) { return false; } bool move_group_level_table(size_t, size_t) { return false; } }; } // anonymous namespace namespace realm { namespace _impl { namespace transaction { void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) { TransactLogHandler(binding_context, sg, [&](auto&&... args) { LangBindHelper::advance_read(sg, history, std::move(args)...); }); } void begin(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) { TransactLogHandler(binding_context, sg, [&](auto&&... args) { LangBindHelper::promote_to_write(sg, history, std::move(args)...); }); } void commit(SharedGroup& sg, ClientHistory&, BindingContext* binding_context) { LangBindHelper::commit_and_continue_as_read(sg); if (binding_context) { binding_context->did_change({}, {}); } } void cancel(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) { TransactLogHandler(binding_context, sg, [&](auto&&... args) { LangBindHelper::rollback_and_continue_as_read(sg, history, std::move(args)...); }); } } // namespace transaction } // namespace _impl } // namespace realm