Merge pull request #39 from realm/tg/validate-schema-changes

Handle allowed schema changes in the transaction log observer
This commit is contained in:
Thomas Goyne 2016-02-16 10:07:01 -08:00
commit f5ae1bdf80
3 changed files with 180 additions and 64 deletions

View File

@ -24,8 +24,122 @@
#include <realm/group_shared.hpp> #include <realm/group_shared.hpp>
#include <realm/lang_bind_helper.hpp> #include <realm/lang_bind_helper.hpp>
namespace realm { using namespace realm;
class TransactLogHandler {
namespace {
// A transaction log handler that just validates that all operations made are
// ones supported by the object store
class TransactLogValidator {
// Index of currently selected table
size_t m_current_table = 0;
// Tables which were created during the transaction being processed, which
// can have columns inserted without a schema version bump
std::vector<size_t> m_new_tables;
REALM_NORETURN
REALM_NOINLINE
void schema_error()
{
throw std::runtime_error("Schema mismatch detected: another process has modified the Realm file's schema in an incompatible way");
}
// Throw an exception if the currently modified table already existed before
// the current set of modifications
bool schema_error_unless_new_table()
{
if (std::find(begin(m_new_tables), end(m_new_tables), m_current_table) == end(m_new_tables)) {
schema_error();
}
return true;
}
protected:
size_t current_table() const noexcept { return m_current_table; }
public:
// Schema changes which don't involve a change in the schema version are
// allowed
bool add_search_index(size_t) { return true; }
bool remove_search_index(size_t) { return true; }
// Creating entirely new tables without a schema version bump is allowed, so
// we need to track if new columns are being added to a new table or an
// existing one
bool insert_group_level_table(size_t table_ndx, size_t, StringData)
{
// Shift any previously added tables after the new one
for (auto& table : m_new_tables) {
if (table >= table_ndx)
++table;
}
m_new_tables.push_back(table_ndx);
return true;
}
bool insert_column(size_t, DataType, StringData, bool) { return schema_error_unless_new_table(); }
bool insert_link_column(size_t, DataType, StringData, size_t, size_t) { return schema_error_unless_new_table(); }
bool add_primary_key(size_t) { return schema_error_unless_new_table(); }
bool set_link_type(size_t, LinkType) { return schema_error_unless_new_table(); }
// Removing or renaming things while a Realm is open is never supported
bool erase_group_level_table(size_t, size_t) { schema_error(); }
bool rename_group_level_table(size_t, StringData) { schema_error(); }
bool erase_column(size_t) { schema_error(); }
bool erase_link_column(size_t, size_t, size_t) { schema_error(); }
bool rename_column(size_t, StringData) { schema_error(); }
bool remove_primary_key() { schema_error(); }
bool move_column(size_t, size_t) { schema_error(); }
bool move_group_level_table(size_t, size_t) { schema_error(); }
bool select_descriptor(int levels, const size_t*)
{
// subtables not supported
return levels == 0;
}
bool select_table(size_t group_level_ndx, int, const size_t*) noexcept
{
m_current_table = group_level_ndx;
return true;
}
bool select_link_list(size_t, size_t, size_t) { return true; }
// Non-schema changes are all allowed
void parse_complete() { }
bool insert_empty_rows(size_t, size_t, size_t, bool) { return true; }
bool erase_rows(size_t, size_t, size_t, bool) { return true; }
bool swap_rows(size_t, size_t) { return true; }
bool clear_table() noexcept { return true; }
bool link_list_set(size_t, size_t) { return true; }
bool link_list_insert(size_t, size_t) { return true; }
bool link_list_erase(size_t) { return true; }
bool link_list_nullify(size_t) { return true; }
bool link_list_clear(size_t) { return true; }
bool link_list_move(size_t, size_t) { return true; }
bool link_list_swap(size_t, size_t) { return true; }
bool set_int(size_t, size_t, int_fast64_t) { return true; }
bool set_bool(size_t, size_t, bool) { return true; }
bool set_float(size_t, size_t, float) { return true; }
bool set_double(size_t, size_t, double) { return true; }
bool set_string(size_t, size_t, StringData) { return true; }
bool set_binary(size_t, size_t, BinaryData) { return true; }
bool set_date_time(size_t, size_t, DateTime) { return true; }
bool set_table(size_t, size_t) { return true; }
bool set_mixed(size_t, size_t, const Mixed&) { return true; }
bool set_link(size_t, size_t, size_t, size_t) { return true; }
bool set_null(size_t, size_t) { return true; }
bool nullify_link(size_t, size_t, size_t) { return true; }
bool insert_substring(size_t, size_t, size_t, StringData) { return true; }
bool erase_substring(size_t, size_t, size_t, size_t) { return true; }
bool optimize_table() { return true; }
bool set_int_unique(size_t, size_t, int_fast64_t) { return true; }
bool set_string_unique(size_t, size_t, StringData) { return true; }
};
// Extends TransactLogValidator to also track changes and report it to the
// binding context if any properties are being observed
class TransactLogObserver : public TransactLogValidator {
using ColumnInfo = BindingContext::ColumnInfo; using ColumnInfo = BindingContext::ColumnInfo;
using ObserverState = BindingContext::ObserverState; using ObserverState = BindingContext::ObserverState;
@ -34,13 +148,15 @@ class TransactLogHandler {
// Userdata pointers for rows which have been deleted // Userdata pointers for rows which have been deleted
std::vector<void *> invalidated; std::vector<void *> invalidated;
// Delegate to send change information to // Delegate to send change information to
BindingContext* m_binding_context; BindingContext* m_context;
// Index of currently selected table
size_t m_current_table = 0;
// Change information for the currently selected LinkList, if any // Change information for the currently selected LinkList, if any
ColumnInfo* m_active_linklist = nullptr; ColumnInfo* m_active_linklist = nullptr;
// Tables which were created during the transaction being processed, which
// can have columns inserted without a schema version bump
std::vector<size_t> m_new_tables;
// Get the change info for the given column, creating it if needed // Get the change info for the given column, creating it if needed
static ColumnInfo& get_change(ObserverState& state, size_t i) static ColumnInfo& get_change(ObserverState& state, size_t i)
{ {
@ -65,8 +181,8 @@ class TransactLogHandler {
// Mark the given row/col as needing notifications sent // Mark the given row/col as needing notifications sent
bool mark_dirty(size_t row_ndx, size_t col_ndx) bool mark_dirty(size_t row_ndx, size_t col_ndx)
{ {
auto it = lower_bound(begin(m_observers), end(m_observers), ObserverState{m_current_table, row_ndx, nullptr}); auto it = lower_bound(begin(m_observers), end(m_observers), ObserverState{current_table(), row_ndx, nullptr});
if (it != end(m_observers) && it->table_ndx == m_current_table && it->row_ndx == row_ndx) { if (it != end(m_observers) && it->table_ndx == current_table() && it->row_ndx == row_ndx) {
get_change(*it, col_ndx).changed = true; get_change(*it, col_ndx).changed = true;
} }
return true; return true;
@ -82,54 +198,55 @@ class TransactLogHandler {
public: public:
template<typename Func> template<typename Func>
TransactLogHandler(BindingContext* binding_context, SharedGroup& sg, Func&& func) TransactLogObserver(BindingContext* context, SharedGroup& sg, Func&& func, bool validate_schema_changes)
: m_binding_context(binding_context) : m_context(context)
{ {
if (!binding_context) { if (!context) {
func(); if (validate_schema_changes) {
// The handler functions are non-virtual, so the parent class's
// versions are called if we don't need to track changes to observed
// objects
func(static_cast<TransactLogValidator&>(*this));
}
else {
func();
}
return; return;
} }
m_observers = binding_context->get_observed_rows(); m_observers = context->get_observed_rows();
if (m_observers.empty()) { if (m_observers.empty()) {
auto old_version = sg.get_version_of_current_transaction(); auto old_version = sg.get_version_of_current_transaction();
func(); if (validate_schema_changes) {
func(static_cast<TransactLogValidator&>(*this));
}
else {
func();
}
if (old_version != sg.get_version_of_current_transaction()) { if (old_version != sg.get_version_of_current_transaction()) {
binding_context->did_change({}, {}); context->did_change({}, {});
} }
return; return;
} }
func(*this); func(*this);
binding_context->did_change(m_observers, invalidated); context->did_change(m_observers, invalidated);
} }
// Called at the end of the transaction log immediately before the version // Called at the end of the transaction log immediately before the version
// is advanced // is advanced
void parse_complete() void parse_complete()
{ {
m_binding_context->will_change(m_observers, invalidated); m_context->will_change(m_observers, invalidated);
} }
// These would require having an observer before schema init bool insert_group_level_table(size_t table_ndx, size_t prior_size, StringData name)
// Maybe do something here to throw an error when multiple processes have different schemas?
bool insert_group_level_table(size_t, size_t, StringData) { return false; }
bool erase_group_level_table(size_t, size_t) { return false; }
bool rename_group_level_table(size_t, StringData) { return false; }
bool insert_column(size_t, DataType, StringData, bool) { return false; }
bool insert_link_column(size_t, DataType, StringData, size_t, size_t) { return false; }
bool erase_column(size_t) { return false; }
bool erase_link_column(size_t, size_t, size_t) { return false; }
bool rename_column(size_t, StringData) { return false; }
bool add_search_index(size_t) { return false; }
bool remove_search_index(size_t) { return false; }
bool add_primary_key(size_t) { return false; }
bool remove_primary_key() { return false; }
bool set_link_type(size_t, LinkType) { return false; }
bool select_table(size_t group_level_ndx, int, const size_t*) noexcept
{ {
m_current_table = group_level_ndx; for (auto& observer : m_observers) {
if (observer.table_ndx >= table_ndx)
++observer.table_ndx;
}
TransactLogValidator::insert_group_level_table(table_ndx, prior_size, name);
return true; return true;
} }
@ -143,7 +260,7 @@ public:
{ {
for (size_t i = 0; i < m_observers.size(); ++i) { for (size_t i = 0; i < m_observers.size(); ++i) {
auto& o = m_observers[i]; auto& o = m_observers[i];
if (o.table_ndx == m_current_table) { if (o.table_ndx == current_table()) {
if (o.row_ndx == row_ndx) { if (o.row_ndx == row_ndx) {
invalidate(&o); invalidate(&o);
--i; --i;
@ -163,7 +280,7 @@ public:
{ {
for (size_t i = 0; i < m_observers.size(); ) { for (size_t i = 0; i < m_observers.size(); ) {
auto& o = m_observers[i]; auto& o = m_observers[i];
if (o.table_ndx == m_current_table) { if (o.table_ndx == current_table()) {
invalidate(&o); invalidate(&o);
} }
else { else {
@ -177,7 +294,7 @@ public:
{ {
m_active_linklist = nullptr; m_active_linklist = nullptr;
for (auto& o : m_observers) { for (auto& o : m_observers) {
if (o.table_ndx == m_current_table && o.row_ndx == row) { if (o.table_ndx == current_table() && o.row_ndx == row) {
m_active_linklist = &get_change(o, col); m_active_linklist = &get_change(o, col);
break; break;
} }
@ -306,49 +423,43 @@ public:
bool nullify_link(size_t col, size_t row, size_t) { return mark_dirty(row, col); } bool nullify_link(size_t col, size_t row, size_t) { return mark_dirty(row, col); }
bool set_int_unique(size_t col, size_t row, int_fast64_t) { return mark_dirty(row, col); } bool set_int_unique(size_t col, size_t row, int_fast64_t) { return mark_dirty(row, col); }
bool set_string_unique(size_t col, size_t row, StringData) { return mark_dirty(row, col); } bool set_string_unique(size_t col, size_t row, StringData) { return mark_dirty(row, col); }
bool insert_substring(size_t col, size_t row, size_t, StringData) { return mark_dirty(row, col); }
// Doesn't change any data bool erase_substring(size_t col, size_t row, size_t, size_t) { return mark_dirty(row, col); }
bool optimize_table() { return true; }
// Used for subtables, which we currently don't support
bool select_descriptor(int, const size_t*) { return false; }
// Not implemented
bool insert_substring(size_t, size_t, size_t, StringData) { return false; }
bool erase_substring(size_t, size_t, size_t, size_t) { return false; }
bool swap_rows(size_t, size_t) { return false; }
bool move_column(size_t, size_t) { return false; }
bool move_group_level_table(size_t, size_t) { return false; }
}; };
} // anonymous namespace } // anonymous namespace
namespace realm { namespace realm {
namespace _impl { namespace _impl {
namespace transaction { namespace transaction {
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) { void advance(SharedGroup& sg, ClientHistory& history, BindingContext* context)
TransactLogHandler(binding_context, sg, [&](auto&&... args) { {
TransactLogObserver(context, sg, [&](auto&&... args) {
LangBindHelper::advance_read(sg, history, std::move(args)...); LangBindHelper::advance_read(sg, history, std::move(args)...);
}); }, true);
} }
void begin(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) { void begin(SharedGroup& sg, ClientHistory& history, BindingContext* context,
TransactLogHandler(binding_context, sg, [&](auto&&... args) { bool validate_schema_changes)
{
TransactLogObserver(context, sg, [&](auto&&... args) {
LangBindHelper::promote_to_write(sg, history, std::move(args)...); LangBindHelper::promote_to_write(sg, history, std::move(args)...);
}); }, validate_schema_changes);
} }
void commit(SharedGroup& sg, ClientHistory&, BindingContext* binding_context) { void commit(SharedGroup& sg, ClientHistory&, BindingContext* context)
{
LangBindHelper::commit_and_continue_as_read(sg); LangBindHelper::commit_and_continue_as_read(sg);
if (binding_context) { if (context) {
binding_context->did_change({}, {}); context->did_change({}, {});
} }
} }
void cancel(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) { void cancel(SharedGroup& sg, ClientHistory& history, BindingContext* context)
TransactLogHandler(binding_context, sg, [&](auto&&... args) { {
TransactLogObserver(context, sg, [&](auto&&... args) {
LangBindHelper::rollback_and_continue_as_read(sg, history, std::move(args)...); LangBindHelper::rollback_and_continue_as_read(sg, history, std::move(args)...);
}); }, false);
} }
} // namespace transaction } // namespace transaction

View File

@ -33,7 +33,8 @@ void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_co
// Begin a write transaction // Begin a write transaction
// If the read transaction version is not up to date, will first advance to the // If the read transaction version is not up to date, will first advance to the
// most recent read transaction and sent notifications to delegate // most recent read transaction and sent notifications to delegate
void begin(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context); void begin(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context,
bool validate_schema_changes=true);
// Commit a write transaction // Commit a write transaction
void commit(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context); void commit(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context);

View File

@ -194,7 +194,11 @@ void Realm::update_schema(std::unique_ptr<Schema> schema, uint64_t version)
return; return;
} }
begin_transaction(); read_group();
transaction::begin(*m_shared_group, *m_history, m_binding_context.get(),
/* error on schema changes */ false);
m_in_transaction = true;
struct WriteTransactionGuard { struct WriteTransactionGuard {
Realm& realm; Realm& realm;
~WriteTransactionGuard() { ~WriteTransactionGuard() {