merge latest object store

This commit is contained in:
Ari Lazier 2016-03-03 14:15:54 -08:00
commit 7802a9e976
58 changed files with 3130 additions and 682 deletions

13
.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
# CMake
.ninja_deps
.ninja_log
CMakeCache.txt
CMakeFiles/
Makefile
build.ninja
cmake_install.cmake
rules.ninja
# Build products
src/librealm-object-store.*
tests/tests

6
.gitmodules vendored Normal file
View File

@ -0,0 +1,6 @@
[submodule "external/pegtl"]
path = external/pegtl
url = https://github.com/ColinH/PEGTL
[submodule "external/catch"]
path = external/catch
url = https://github.com/philsquared/Catch

15
CMake/CompilerFlags.cmake Normal file
View File

@ -0,0 +1,15 @@
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED on)
set(CMAKE_CXX_EXTENSIONS off)
add_compile_options(-Wall -DREALM_HAVE_CONFIG)
add_compile_options("$<$<CONFIG:DEBUG>:-DREALM_DEBUG>")
if(${CMAKE_GENERATOR} STREQUAL "Ninja")
if(${CMAKE_CXX_COMPILER_ID} STREQUAL "Clang")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fcolor-diagnostics")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcolor-diagnostics")
elseif(${CMAKE_CXX_COMPILER_ID} STREQUAL "GNU")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fdiagnostics-color=always")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
endif()
endif()

118
CMake/RealmCore.cmake Normal file
View File

@ -0,0 +1,118 @@
include(ExternalProject)
if(${CMAKE_GENERATOR} STREQUAL "Unix Makefiles")
set(MAKE_EQUAL_MAKE "MAKE=$(MAKE)")
endif()
if (${CMAKE_VERSION} VERSION_GREATER "3.4.0")
set(USES_TERMINAL_BUILD USES_TERMINAL_BUILD 1)
endif()
function(use_realm_core version_or_path_to_source)
if("${version_or_path_to_source}" MATCHES "^[0-9]+(\\.[0-9])+")
if(APPLE)
download_realm_core(${version_or_path_to_source})
else()
clone_and_build_realm_core("v${version_or_path_to_source}")
endif()
else()
build_existing_realm_core(${version_or_path_to_source})
endif()
set(REALM_CORE_INCLUDE_DIR ${REALM_CORE_INCLUDE_DIR} PARENT_SCOPE)
endfunction()
function(download_realm_core core_version)
set(core_url "https://static.realm.io/downloads/core/realm-core-${core_version}.tar.bz2")
set(core_tarball_name "realm-core-${core_version}.tar.bz2")
set(core_temp_tarball "/tmp/${core_tarball_name}")
set(core_directory_parent "${CMAKE_CURRENT_SOURCE_DIR}${CMAKE_FILES_DIRECTORY}")
set(core_directory "${core_directory_parent}/realm-core-${core_version}")
set(core_tarball "${core_directory_parent}/${core_tarball_name}")
if (NOT EXISTS ${core_tarball})
if (NOT EXISTS ${core_temp_tarball})
message("Downloading core ${core_version} from ${core_url}.")
file(DOWNLOAD ${core_url} ${core_temp_tarball}.tmp SHOW_PROGRESS)
file(RENAME ${core_temp_tarball}.tmp ${core_temp_tarball})
endif()
file(COPY ${core_temp_tarball} DESTINATION ${core_directory_parent})
endif()
set(core_library_debug ${core_directory}/librealm-dbg.a)
set(core_library_release ${core_directory}/librealm.a)
set(core_libraries ${core_library_debug} ${core_library_release})
add_custom_command(
COMMENT "Extracting ${core_tarball_name}"
OUTPUT ${core_libraries}
DEPENDS ${core_tarball}
COMMAND ${CMAKE_COMMAND} -E tar xf ${core_tarball}
COMMAND ${CMAKE_COMMAND} -E remove_directory ${core_directory}
COMMAND ${CMAKE_COMMAND} -E rename core ${core_directory}
COMMAND ${CMAKE_COMMAND} -E touch_nocreate ${core_libraries})
add_custom_target(realm-core DEPENDS ${core_libraries})
add_library(realm STATIC IMPORTED)
add_dependencies(realm realm-core)
set_property(TARGET realm PROPERTY IMPORTED_LOCATION_DEBUG ${core_library_debug})
set_property(TARGET realm PROPERTY IMPORTED_LOCATION_RELEASE ${core_library_release})
set_property(TARGET realm PROPERTY IMPORTED_LOCATION ${core_library_release})
set(REALM_CORE_INCLUDE_DIR ${core_directory}/include PARENT_SCOPE)
endfunction()
macro(define_built_realm_core_target core_directory)
set(core_library_debug ${core_directory}/src/realm/librealm-dbg${CMAKE_SHARED_LIBRARY_SUFFIX})
set(core_library_release ${core_directory}/src/realm/librealm${CMAKE_SHARED_LIBRARY_SUFFIX})
set(core_libraries ${core_library_debug} ${core_library_release})
ExternalProject_Add_Step(realm-core ensure-libraries
COMMAND ${CMAKE_COMMAND} -E touch_nocreate ${core_libraries}
OUTPUT ${core_libraries}
DEPENDEES build
)
add_library(realm SHARED IMPORTED)
add_dependencies(realm realm-core)
set_property(TARGET realm PROPERTY IMPORTED_LOCATION_DEBUG ${core_library_debug})
set_property(TARGET realm PROPERTY IMPORTED_LOCATION_RELEASE ${core_library_release})
set_property(TARGET realm PROPERTY IMPORTED_LOCATION ${core_library_release})
set(REALM_CORE_INCLUDE_DIR ${core_directory}/src PARENT_SCOPE)
endmacro()
function(clone_and_build_realm_core branch)
set(core_prefix_directory "${CMAKE_CURRENT_SOURCE_DIR}${CMAKE_FILES_DIRECTORY}/realm-core")
ExternalProject_Add(realm-core
GIT_REPOSITORY "git@github.com:realm/realm-core.git"
GIT_TAG ${branch}
PREFIX ${core_prefix_directory}
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ""
BUILD_COMMAND ${MAKE_EQUAL_MAKE} sh build.sh build
INSTALL_COMMAND ""
${USES_TERMINAL_BUILD}
)
ExternalProject_Get_Property(realm-core SOURCE_DIR)
define_built_realm_core_target(${SOURCE_DIR})
endfunction()
function(build_existing_realm_core core_directory)
get_filename_component(core_directory ${core_directory} ABSOLUTE)
ExternalProject_Add(realm-core
URL ""
PREFIX ${CMAKE_CURRENT_SOURCE_DIR}${CMAKE_FILES_DIRECTORY}/realm-core
SOURCE_DIR ${core_directory}
BUILD_IN_SOURCE 1
BUILD_ALWAYS 1
CONFIGURE_COMMAND ""
BUILD_COMMAND ${MAKE_EQUAL_MAKE} sh build.sh build
INSTALL_COMMAND ""
${USES_TERMINAL_BUILD}
)
define_built_realm_core_target(${core_directory})
endfunction()

16
CMakeLists.txt Normal file
View File

@ -0,0 +1,16 @@
set(CMAKE_BUILD_TYPE Debug CACHE STRING "")
project(realm-object-store)
cmake_minimum_required(VERSION 3.2.0)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake")
include(CompilerFlags)
include(RealmCore)
set(REALM_CORE_VERSION "0.96.2" CACHE STRING "")
use_realm_core(${REALM_CORE_VERSION})
include_directories(${REALM_CORE_INCLUDE_DIR} src external/pegtl)
add_subdirectory(src)
add_subdirectory(tests)

View File

@ -1,6 +1,6 @@
# Realm Object Store
Cross-platform code used accross bindings. Binding developers can choose to use some or all the included functionality
Cross-platform code used accross bindings. Binding developers can choose to use some or all the included functionality:
- `object_store`/`schema`/`object_schema`/`property` - contains the structures and logic used to setup and modify realm files and their schema.
- `shared_realm` - wraps the object_store apis to provide transactions, notifications, realm caching, migrations, and other higher level functionality.
- `object_accessor`/`results`/`list` - accessor classes, object creation/update pipeline, and helpers for creating platform specific property getters and setters.
@ -8,8 +8,36 @@ Cross-platform code used accross bindings. Binding developers can choose to use
## Building
TBD
The object store's build system currently only suports building for OS X. The object store itself can build for all Apple
platforms when integrated into a binding.
1. Install CMake. You can download an installer for OS X from the [CMake download page], or install via [Homebrew](http://brew.sh):
```
brew install cmake
```
2. Generate build files:
```
cmake .
```
3. Build:
```
make
```
If you wish to build against a local version of core you can invoke `cmake` like so:
```
cmake -DREALM_CORE_VERSION=/path/to/realm-core
```
The given core tree will be built as part of the object store build.
## Testing
TBD
```
make run-tests
```

1
external/catch vendored Submodule

@ -0,0 +1 @@
Subproject commit f294c9847272b1b92c5119a6f711e57113b5f231

1
external/pegtl vendored Submodule

@ -0,0 +1 @@
Subproject commit 49a5b0a49e154b362ef9cf1e756dd8673ddd4efe

View File

@ -1,86 +0,0 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "list.hpp"
#include <stdexcept>
using namespace realm;
size_t List::size() {
verify_attached();
return m_link_view->size();
}
Row List::get(std::size_t row_ndx) {
verify_attached();
verify_valid_row(row_ndx);
return m_link_view->get(row_ndx);
}
void List::set(std::size_t row_ndx, std::size_t target_row_ndx) {
verify_attached();
verify_in_tranaction();
verify_valid_row(row_ndx);
m_link_view->set(row_ndx, target_row_ndx);
}
void List::add(std::size_t target_row_ndx) {
verify_attached();
verify_in_tranaction();
m_link_view->add(target_row_ndx);
}
void List::insert(std::size_t row_ndx, std::size_t target_row_ndx) {
verify_attached();
verify_in_tranaction();
verify_valid_row(row_ndx, true);
m_link_view->insert(row_ndx, target_row_ndx);
}
void List::remove(std::size_t row_ndx) {
verify_attached();
verify_in_tranaction();
verify_valid_row(row_ndx);
m_link_view->remove(row_ndx);
}
Query List::get_query() {
verify_attached();
return m_link_view->get_target_table().where(m_link_view);
}
void List::verify_valid_row(std::size_t row_ndx, bool insertion) {
size_t size = m_link_view->size();
if (row_ndx > size || (!insertion && row_ndx == size)) {
throw std::out_of_range(std::string("Index ") + to_string(row_ndx) + " is outside of range 0..." + to_string(size) + ".");
}
}
void List::verify_attached() {
if (!m_link_view->is_attached()) {
throw std::runtime_error("Tableview is not attached");
}
m_link_view->sync_if_needed();
}
void List::verify_in_tranaction() {
if (!m_realm->is_in_transaction()) {
throw std::runtime_error("Can only mutate a list within a transaction.");
}
}

View File

@ -1,63 +0,0 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_LIST_HPP
#define REALM_LIST_HPP
#include "shared_realm.hpp"
#include <realm/link_view.hpp>
namespace realm {
class List {
public:
List(SharedRealm &r, const ObjectSchema &s, LinkViewRef l) : m_realm(r), m_object_schema(&s), m_link_view(l) {}
const ObjectSchema &get_object_schema() const { return *m_object_schema; }
SharedRealm realm() { return m_realm; }
size_t size();
Row get(std::size_t row_ndx);
void set(std::size_t row_ndx, std::size_t target_row_ndx);
void add(size_t target_row_ndx);
void remove(size_t list_ndx);
void insert(size_t list_ndx, size_t target_row_ndx);
template<typename ValueType, typename ContextType>
void add(ContextType ctx, ValueType value);
template<typename ValueType, typename ContextType>
void insert(ContextType ctx, ValueType value, size_t list_ndx);
template<typename ValueType, typename ContextType>
void set(ContextType ctx, ValueType value, size_t list_ndx);
Query get_query();
void verify_valid_row(std::size_t row_ndx, bool insertion = false);
void verify_attached();
void verify_in_tranaction();
private:
SharedRealm m_realm;
const ObjectSchema *m_object_schema;
LinkViewRef m_link_view;
};
}
#endif /* REALM_LIST_HPP */

48
src/CMakeLists.txt Normal file
View File

@ -0,0 +1,48 @@
set(SOURCES
index_set.cpp
list.cpp
object_schema.cpp
object_store.cpp
results.cpp
schema.cpp
shared_realm.cpp
impl/async_query.cpp
impl/realm_coordinator.cpp
impl/transact_log_handler.cpp
parser/parser.cpp
parser/query_builder.cpp)
set(HEADERS
index_set.hpp
list.hpp
object_schema.hpp
object_store.hpp
results.hpp
schema.hpp
shared_realm.hpp
impl/weak_realm_notifier.hpp
impl/weak_realm_notifier_base.hpp
impl/external_commit_helper.hpp
impl/transact_log_handler.hpp
parser/parser.hpp
parser/query_builder.hpp
util/atomic_shared_ptr.hpp)
if(APPLE)
list(APPEND SOURCES
impl/apple/weak_realm_notifier.cpp
impl/apple/external_commit_helper.cpp)
list(APPEND HEADERS
impl/apple/weak_realm_notifier.hpp
impl/apple/external_commit_helper.hpp)
find_library(CF_LIBRARY CoreFoundation)
else()
list(APPEND SOURCES
impl/generic/external_commit_helper.cpp)
list(APPEND HEADERS
impl/generic/weak_realm_notifier.hpp
impl/generic/external_commit_helper.hpp)
endif()
add_library(realm-object-store SHARED ${SOURCES} ${HEADERS})
target_link_libraries(realm-object-store realm ${CF_LIBRARY})

View File

@ -70,6 +70,10 @@ class BindingContext {
public:
virtual ~BindingContext() = default;
// If the user adds a notification handler to the Realm, will it ever
// actually be called?
virtual bool can_deliver_notifications() const noexcept { return true; }
// Called by the Realm when a write transaction is committed to the file by
// a different Realm instance (possibly in a different process)
virtual void changes_available() { }

View File

@ -16,18 +16,19 @@
//
////////////////////////////////////////////////////////////////////////////
#include "external_commit_helper.hpp"
#include "impl/external_commit_helper.hpp"
#include "shared_realm.hpp"
#include "impl/realm_coordinator.hpp"
#include <asl.h>
#include <assert.h>
#include <fcntl.h>
#include <sstream>
#include <sys/event.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <system_error>
#include <fcntl.h>
#include <unistd.h>
#include <sstream>
using namespace realm;
using namespace realm::_impl;
@ -56,11 +57,10 @@ void notify_fd(int fd)
void ExternalCommitHelper::FdHolder::close()
{
if (m_fd != -1) {
::close(m_fd);
}
m_fd = -1;
if (m_fd != -1) {
::close(m_fd);
}
m_fd = -1;
}
// Inter-thread and inter-process notifications of changes are done using a
@ -86,16 +86,15 @@ void ExternalCommitHelper::FdHolder::close()
// signal the runloop source and wake up the target runloop, and when data is
// written to the anonymous pipe the background thread removes the runloop
// source from the runloop and and shuts down.
ExternalCommitHelper::ExternalCommitHelper(Realm* realm)
ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent)
: m_parent(parent)
{
add_realm(realm);
m_kq = kqueue();
if (m_kq == -1) {
throw std::system_error(errno, std::system_category());
}
auto path = realm->config().path + ".note";
auto path = parent.get_path() + ".note";
// Create and open the named pipe
int ret = mkfifo(path.c_str(), 0600);
@ -140,69 +139,33 @@ ExternalCommitHelper::ExternalCommitHelper(Realm* realm)
m_shutdown_read_fd = pipeFd[0];
m_shutdown_write_fd = pipeFd[1];
// Use the minimum allowed stack size, as we need very little in our listener
// https://developer.apple.com/library/ios/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.html#//apple_ref/doc/uid/10000057i-CH15-SW7
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 16 * 1024);
auto fn = [](void *self) -> void * {
static_cast<ExternalCommitHelper *>(self)->listen();
return nullptr;
};
ret = pthread_create(&m_thread, &attr, fn, this);
pthread_attr_destroy(&attr);
if (ret != 0) {
throw std::system_error(errno, std::system_category());
}
m_thread = std::async(std::launch::async, [=] {
try {
listen();
}
catch (std::exception const& e) {
fprintf(stderr, "uncaught exception in notifier thread: %s: %s\n", typeid(e).name(), e.what());
asl_log(nullptr, nullptr, ASL_LEVEL_ERR, "uncaught exception in notifier thread: %s: %s", typeid(e).name(), e.what());
throw;
}
catch (...) {
fprintf(stderr, "uncaught exception in notifier thread\n");
asl_log(nullptr, nullptr, ASL_LEVEL_ERR, "uncaught exception in notifier thread");
throw;
}
});
}
ExternalCommitHelper::~ExternalCommitHelper()
{
REALM_ASSERT_DEBUG(m_realms.empty());
notify_fd(m_shutdown_write_fd);
pthread_join(m_thread, nullptr); // Wait for the thread to exit
}
void ExternalCommitHelper::add_realm(realm::Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realms_mutex);
// Create the runloop source
CFRunLoopSourceContext ctx{};
ctx.info = realm;
ctx.perform = [](void* info) {
static_cast<Realm*>(info)->notify();
};
CFRunLoopRef runloop = CFRunLoopGetCurrent();
CFRetain(runloop);
CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx);
CFRunLoopAddSource(runloop, signal, kCFRunLoopDefaultMode);
m_realms.push_back({realm, runloop, signal});
}
void ExternalCommitHelper::remove_realm(realm::Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realms_mutex);
for (auto it = m_realms.begin(); it != m_realms.end(); ++it) {
if (it->realm == realm) {
CFRunLoopSourceInvalidate(it->signal);
CFRelease(it->signal);
CFRelease(it->runloop);
m_realms.erase(it);
return;
}
}
REALM_TERMINATE("Realm not registered");
m_thread.wait(); // Wait for the thread to exit
}
void ExternalCommitHelper::listen()
{
pthread_setname_np("RLMRealm notification listener");
// Set up the kqueue
// EVFILT_READ indicates that we care about data being available to read
// on the given file descriptor.
@ -233,14 +196,7 @@ void ExternalCommitHelper::listen()
}
assert(event.ident == (uint32_t)m_notify_fd);
std::lock_guard<std::mutex> lock(m_realms_mutex);
for (auto const& realm : m_realms) {
CFRunLoopSourceSignal(realm.signal);
// Signalling the source makes it run the next time the runloop gets
// to it, but doesn't make the runloop start if it's currently idle
// waiting for events
CFRunLoopWakeUp(realm.runloop);
}
m_parent.on_change();
}
}
@ -248,4 +204,3 @@ void ExternalCommitHelper::notify_others()
{
notify_fd(m_notify_fd);
}

View File

@ -16,25 +16,20 @@
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_EXTERNAL_COMMIT_HELPER_HPP
#define REALM_EXTERNAL_COMMIT_HELPER_HPP
#include <CoreFoundation/CFRunLoop.h>
#include <mutex>
#include <vector>
#include <future>
namespace realm {
class Realm;
namespace _impl {
class RealmCoordinator;
class ExternalCommitHelper {
public:
ExternalCommitHelper(Realm* realm);
ExternalCommitHelper(RealmCoordinator& parent);
~ExternalCommitHelper();
void notify_others();
void add_realm(Realm* realm);
void remove_realm(Realm* realm);
private:
// A RAII holder for a file descriptor which automatically closes the wrapped
@ -59,23 +54,12 @@ private:
FdHolder(FdHolder const&) = delete;
};
struct PerRealmInfo {
Realm* realm;
CFRunLoopRef runloop;
CFRunLoopSourceRef signal;
};
void listen();
// Currently registered realms and the signal for delivering notifications
// to them
std::vector<PerRealmInfo> m_realms;
// Mutex which guards m_realms
std::mutex m_realms_mutex;
RealmCoordinator& m_parent;
// The listener thread
pthread_t m_thread;
std::future<void> m_thread;
// Read-write file descriptor for the named pipe which is waited on for
// changes and written to when a commit is made
@ -90,5 +74,3 @@ private:
} // namespace _impl
} // namespace realm
#endif /* REALM_EXTERNAL_COMMIT_HELPER_HPP */

View File

@ -0,0 +1,96 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/weak_realm_notifier.hpp"
#include "shared_realm.hpp"
#include <atomic>
using namespace realm;
using namespace realm::_impl;
WeakRealmNotifier::WeakRealmNotifier(const std::shared_ptr<Realm>& realm, bool cache)
: WeakRealmNotifierBase(realm, cache)
{
struct RefCountedWeakPointer {
std::weak_ptr<Realm> realm;
std::atomic<size_t> ref_count;
};
CFRunLoopSourceContext ctx{};
ctx.info = new RefCountedWeakPointer{realm, {1}};
ctx.perform = [](void* info) {
if (auto realm = static_cast<RefCountedWeakPointer*>(info)->realm.lock()) {
realm->notify();
}
};
ctx.retain = [](const void* info) {
static_cast<RefCountedWeakPointer*>(const_cast<void*>(info))->ref_count.fetch_add(1, std::memory_order_relaxed);
return info;
};
ctx.release = [](const void* info) {
auto ptr = static_cast<RefCountedWeakPointer*>(const_cast<void*>(info));
if (ptr->ref_count.fetch_add(-1, std::memory_order_acq_rel) == 1) {
delete ptr;
}
};
m_runloop = CFRunLoopGetCurrent();
CFRetain(m_runloop);
m_signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx);
CFRunLoopAddSource(m_runloop, m_signal, kCFRunLoopDefaultMode);
}
WeakRealmNotifier::WeakRealmNotifier(WeakRealmNotifier&& rgt)
: WeakRealmNotifierBase(std::move(rgt))
, m_runloop(rgt.m_runloop)
, m_signal(rgt.m_signal)
{
rgt.m_runloop = nullptr;
rgt.m_signal = nullptr;
}
WeakRealmNotifier& WeakRealmNotifier::operator=(WeakRealmNotifier&& rgt)
{
WeakRealmNotifierBase::operator=(std::move(rgt));
m_runloop = rgt.m_runloop;
m_signal = rgt.m_signal;
rgt.m_runloop = nullptr;
rgt.m_signal = nullptr;
return *this;
}
WeakRealmNotifier::~WeakRealmNotifier()
{
if (m_signal) {
CFRunLoopSourceInvalidate(m_signal);
CFRelease(m_signal);
CFRelease(m_runloop);
}
}
void WeakRealmNotifier::notify()
{
CFRunLoopSourceSignal(m_signal);
// Signalling the source makes it run the next time the runloop gets
// to it, but doesn't make the runloop start if it's currently idle
// waiting for events
CFRunLoopWakeUp(m_runloop);
}

View File

@ -0,0 +1,48 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/weak_realm_notifier_base.hpp"
#include <CoreFoundation/CFRunLoop.h>
namespace realm {
class Realm;
namespace _impl {
class WeakRealmNotifier : public WeakRealmNotifierBase {
public:
WeakRealmNotifier(const std::shared_ptr<Realm>& realm, bool cache);
~WeakRealmNotifier();
WeakRealmNotifier(WeakRealmNotifier&&);
WeakRealmNotifier& operator=(WeakRealmNotifier&&);
WeakRealmNotifier(const WeakRealmNotifier&) = delete;
WeakRealmNotifier& operator=(const WeakRealmNotifier&) = delete;
// Asynchronously call notify() on the Realm on the appropriate thread
void notify();
private:
CFRunLoopRef m_runloop;
CFRunLoopSourceRef m_signal;
};
} // namespace _impl
} // namespace realm

290
src/impl/async_query.cpp Normal file
View File

@ -0,0 +1,290 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/async_query.hpp"
#include "impl/realm_coordinator.hpp"
#include "results.hpp"
using namespace realm;
using namespace realm::_impl;
AsyncQuery::AsyncQuery(Results& target)
: m_target_results(&target)
, m_realm(target.get_realm())
, m_sort(target.get_sort())
, m_sg_version(Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction())
{
Query q = target.get_query();
m_query_handover = Realm::Internal::get_shared_group(*m_realm).export_for_handover(q, MutableSourcePayload::Move);
}
AsyncQuery::~AsyncQuery()
{
// unregister() may have been called from a different thread than we're being
// destroyed on, so we need to synchronize access to the interesting fields
// modified there
std::lock_guard<std::mutex> lock(m_target_mutex);
m_realm = nullptr;
}
size_t AsyncQuery::add_callback(std::function<void (std::exception_ptr)> callback)
{
m_realm->verify_thread();
auto next_token = [=] {
size_t token = 0;
for (auto& callback : m_callbacks) {
if (token <= callback.token) {
token = callback.token + 1;
}
}
return token;
};
std::lock_guard<std::mutex> lock(m_callback_mutex);
auto token = next_token();
m_callbacks.push_back({std::move(callback), token, -1ULL});
if (m_callback_index == npos) { // Don't need to wake up if we're already sending notifications
Realm::Internal::get_coordinator(*m_realm).send_commit_notifications();
m_have_callbacks = true;
}
return token;
}
void AsyncQuery::remove_callback(size_t token)
{
Callback old;
{
std::lock_guard<std::mutex> lock(m_callback_mutex);
REALM_ASSERT(m_error || m_callbacks.size() > 0);
auto it = find_if(begin(m_callbacks), end(m_callbacks),
[=](const auto& c) { return c.token == token; });
// We should only fail to find the callback if it was removed due to an error
REALM_ASSERT(m_error || it != end(m_callbacks));
if (it == end(m_callbacks)) {
return;
}
size_t idx = distance(begin(m_callbacks), it);
if (m_callback_index != npos && m_callback_index >= idx) {
--m_callback_index;
}
old = std::move(*it);
m_callbacks.erase(it);
m_have_callbacks = !m_callbacks.empty();
}
}
void AsyncQuery::unregister() noexcept
{
std::lock_guard<std::mutex> lock(m_target_mutex);
m_target_results = nullptr;
m_realm = nullptr;
}
void AsyncQuery::release_query() noexcept
{
{
std::lock_guard<std::mutex> lock(m_target_mutex);
REALM_ASSERT(!m_realm && !m_target_results);
}
m_query = nullptr;
}
bool AsyncQuery::is_alive() const noexcept
{
std::lock_guard<std::mutex> lock(m_target_mutex);
return m_target_results != nullptr;
}
// Most of the inter-thread synchronization for run(), prepare_handover(),
// attach_to(), detach(), release_query() and deliver() is done by
// RealmCoordinator external to this code, which has some potentially
// non-obvious results on which members are and are not safe to use without
// holding a lock.
//
// attach_to(), detach(), run(), prepare_handover(), and release_query() are
// all only ever called on a single thread. call_callbacks() and deliver() are
// called on the same thread. Calls to prepare_handover() and deliver() are
// guarded by a lock.
//
// In total, this means that the safe data flow is as follows:
// - prepare_handover(), attach_to(), detach() and release_query() can read
// members written by each other
// - deliver() can read members written to in prepare_handover(), deliver(),
// and call_callbacks()
// - call_callbacks() and read members written to in deliver()
//
// Separately from this data flow for the query results, all uses of
// m_target_results, m_callbacks, and m_callback_index must be done with the
// appropriate mutex held to avoid race conditions when the Results object is
// destroyed while the background work is running, and to allow removing
// callbacks from any thread.
void AsyncQuery::run()
{
REALM_ASSERT(m_sg);
{
std::lock_guard<std::mutex> target_lock(m_target_mutex);
// Don't run the query if the results aren't actually going to be used
if (!m_target_results || (!m_have_callbacks && !m_target_results->wants_background_updates())) {
return;
}
}
REALM_ASSERT(!m_tv.is_attached());
// If we've run previously, check if we need to rerun
if (m_initial_run_complete) {
// Make an empty tableview from the query to get the table version, since
// Query doesn't expose it
if (m_query->find_all(0, 0, 0).outside_version() == m_handed_over_table_version) {
return;
}
}
m_tv = m_query->find_all();
if (m_sort) {
m_tv.sort(m_sort.columnIndices, m_sort.ascending);
}
}
void AsyncQuery::prepare_handover()
{
m_sg_version = m_sg->get_version_of_current_transaction();
if (!m_tv.is_attached()) {
return;
}
REALM_ASSERT(m_tv.is_in_sync());
m_initial_run_complete = true;
m_handed_over_table_version = m_tv.outside_version();
m_tv_handover = m_sg->export_for_handover(m_tv, MutableSourcePayload::Move);
// detach the TableView as we won't need it again and keeping it around
// makes advance_read() much more expensive
m_tv = TableView();
}
bool AsyncQuery::deliver(SharedGroup& sg, std::exception_ptr err)
{
if (!is_for_current_thread()) {
return false;
}
std::lock_guard<std::mutex> target_lock(m_target_mutex);
// Target results being null here indicates that it was destroyed while we
// were in the process of advancing the Realm version and preparing for
// delivery, i.e. it was destroyed from the "wrong" thread
if (!m_target_results) {
return false;
}
// We can get called before the query has actually had the chance to run if
// we're added immediately before a different set of async results are
// delivered
if (!m_initial_run_complete && !err) {
return false;
}
if (err) {
m_error = err;
return m_have_callbacks;
}
REALM_ASSERT(!m_query_handover);
auto realm_sg_version = Realm::Internal::get_shared_group(*m_realm).get_version_of_current_transaction();
if (m_sg_version != realm_sg_version) {
// Realm version can be newer if a commit was made on our thread or the
// user manually called refresh(), or older if a commit was made on a
// different thread and we ran *really* fast in between the check for
// if the shared group has changed and when we pick up async results
return false;
}
if (m_tv_handover) {
m_tv_handover->version = m_sg_version;
Results::Internal::set_table_view(*m_target_results,
std::move(*sg.import_from_handover(std::move(m_tv_handover))));
m_delievered_table_version = m_handed_over_table_version;
}
REALM_ASSERT(!m_tv_handover);
return m_have_callbacks;
}
void AsyncQuery::call_callbacks()
{
REALM_ASSERT(is_for_current_thread());
while (auto fn = next_callback()) {
fn(m_error);
}
if (m_error) {
// Remove all the callbacks as we never need to call anything ever again
// after delivering an error
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
m_callbacks.clear();
}
}
std::function<void (std::exception_ptr)> AsyncQuery::next_callback()
{
std::lock_guard<std::mutex> callback_lock(m_callback_mutex);
for (++m_callback_index; m_callback_index < m_callbacks.size(); ++m_callback_index) {
auto& callback = m_callbacks[m_callback_index];
if (m_error || callback.delivered_version != m_delievered_table_version) {
callback.delivered_version = m_delievered_table_version;
return callback.fn;
}
}
m_callback_index = npos;
return nullptr;
}
void AsyncQuery::attach_to(realm::SharedGroup& sg)
{
REALM_ASSERT(!m_sg);
REALM_ASSERT(m_query_handover);
m_query = sg.import_from_handover(std::move(m_query_handover));
m_sg = &sg;
}
void AsyncQuery::detatch()
{
REALM_ASSERT(m_sg);
REALM_ASSERT(m_query);
REALM_ASSERT(!m_tv.is_attached());
m_query_handover = m_sg->export_for_handover(*m_query, MutableSourcePayload::Move);
m_sg = nullptr;
m_query = nullptr;
}

122
src/impl/async_query.hpp Normal file
View File

@ -0,0 +1,122 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_ASYNC_QUERY_HPP
#define REALM_ASYNC_QUERY_HPP
#include "results.hpp"
#include <realm/group_shared.hpp>
#include <exception>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
namespace realm {
namespace _impl {
class AsyncQuery {
public:
AsyncQuery(Results& target);
~AsyncQuery();
size_t add_callback(std::function<void (std::exception_ptr)>);
void remove_callback(size_t token);
void unregister() noexcept;
void release_query() noexcept;
// Run/rerun the query if needed
void run();
// Prepare the handover object if run() did update the TableView
void prepare_handover();
// Update the target results from the handover
// Returns if any callbacks need to be invoked
bool deliver(SharedGroup& sg, std::exception_ptr err);
void call_callbacks();
// Attach the handed-over query to `sg`
void attach_to(SharedGroup& sg);
// Create a new query handover object and stop using the previously attached
// SharedGroup
void detatch();
Realm& get_realm() { return *m_target_results->get_realm(); }
// Get the version of the current handover object
SharedGroup::VersionID version() const noexcept { return m_sg_version; }
bool is_alive() const noexcept;
private:
// Target Results to update and a mutex which guards it
mutable std::mutex m_target_mutex;
Results* m_target_results;
std::shared_ptr<Realm> m_realm;
const SortOrder m_sort;
const std::thread::id m_thread_id = std::this_thread::get_id();
// The source Query, in handover form iff m_sg is null
std::unique_ptr<SharedGroup::Handover<Query>> m_query_handover;
std::unique_ptr<Query> m_query;
// The TableView resulting from running the query. Will be detached unless
// the query was (re)run since the last time the handover object was created
TableView m_tv;
std::unique_ptr<SharedGroup::Handover<TableView>> m_tv_handover;
SharedGroup::VersionID m_sg_version;
std::exception_ptr m_error;
struct Callback {
std::function<void (std::exception_ptr)> fn;
size_t token;
uint_fast64_t delivered_version;
};
// Currently registered callbacks and a mutex which must always be held
// while doing anything with them or m_callback_index
std::mutex m_callback_mutex;
std::vector<Callback> m_callbacks;
SharedGroup* m_sg = nullptr;
uint_fast64_t m_handed_over_table_version = -1;
uint_fast64_t m_delievered_table_version = -1;
// Iteration variable for looping over callbacks
// remove_callback() updates this when needed
size_t m_callback_index = npos;
bool m_initial_run_complete = false;
// Cached value for if m_callbacks is empty, needed to avoid deadlocks in
// run() due to lock-order inversion between m_callback_mutex and m_target_mutex
// It's okay if this value is stale as at worst it'll result in us doing
// some extra work.
std::atomic<bool> m_have_callbacks = {false};
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
std::function<void (std::exception_ptr)> next_callback();
};
} // namespace _impl
} // namespace realm
#endif /* REALM_ASYNC_QUERY_HPP */

View File

@ -0,0 +1,30 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_EXTERNAL_COMMIT_HELPER_HPP
#define REALM_EXTERNAL_COMMIT_HELPER_HPP
#include <realm/util/features.h>
#if REALM_PLATFORM_APPLE
#include "impl/apple/external_commit_helper.hpp"
#else
#include "impl/generic/external_commit_helper.hpp"
#endif
#endif // REALM_EXTERNAL_COMMIT_HELPER_HPP

View File

@ -0,0 +1,49 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/external_commit_helper.hpp"
#include "impl/realm_coordinator.hpp"
#include <realm/commit_log.hpp>
#include <realm/replication.hpp>
using namespace realm;
using namespace realm::_impl;
ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent)
: m_parent(parent)
, m_history(realm::make_client_history(parent.get_path(), parent.get_encryption_key().data()))
, m_sg(*m_history, parent.is_in_memory() ? SharedGroup::durability_MemOnly : SharedGroup::durability_Full,
parent.get_encryption_key().data())
, m_thread(std::async(std::launch::async, [=] {
m_sg.begin_read();
while (m_sg.wait_for_change()) {
m_sg.end_read();
m_sg.begin_read();
m_parent.on_change();
}
}))
{
}
ExternalCommitHelper::~ExternalCommitHelper()
{
m_sg.wait_for_change_release();
m_thread.wait(); // Wait for the thread to exit
}

View File

@ -0,0 +1,50 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include <realm/group_shared.hpp>
#include <future>
namespace realm {
class ClientHistory;
namespace _impl {
class RealmCoordinator;
class ExternalCommitHelper {
public:
ExternalCommitHelper(RealmCoordinator& parent);
~ExternalCommitHelper();
// A no-op in this version, but needed for the Apple version
void notify_others() { }
private:
RealmCoordinator& m_parent;
// A shared group used to listen for changes
std::unique_ptr<ClientHistory> m_history;
SharedGroup m_sg;
// The listener thread
std::future<void> m_thread;
};
} // namespace _impl
} // namespace realm

View File

@ -0,0 +1,36 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/weak_realm_notifier_base.hpp"
namespace realm {
class Realm;
namespace _impl {
class WeakRealmNotifier : public WeakRealmNotifierBase {
public:
using WeakRealmNotifierBase::WeakRealmNotifierBase;
// Do nothing, as this can't be implemented portably
void notify() { }
};
} // namespace _impl
} // namespace realm

View File

@ -0,0 +1,482 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/realm_coordinator.hpp"
#include "impl/async_query.hpp"
#include "impl/weak_realm_notifier.hpp"
#include "impl/external_commit_helper.hpp"
#include "impl/transact_log_handler.hpp"
#include "object_store.hpp"
#include "schema.hpp"
#include <realm/commit_log.hpp>
#include <realm/group_shared.hpp>
#include <realm/lang_bind_helper.hpp>
#include <realm/query.hpp>
#include <realm/table_view.hpp>
#include <cassert>
#include <unordered_map>
using namespace realm;
using namespace realm::_impl;
static std::mutex s_coordinator_mutex;
static std::unordered_map<std::string, std::weak_ptr<RealmCoordinator>> s_coordinators_per_path;
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
{
std::lock_guard<std::mutex> lock(s_coordinator_mutex);
auto& weak_coordinator = s_coordinators_per_path[path];
if (auto coordinator = weak_coordinator.lock()) {
return coordinator;
}
auto coordinator = std::make_shared<RealmCoordinator>();
weak_coordinator = coordinator;
return coordinator;
}
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
{
std::lock_guard<std::mutex> lock(s_coordinator_mutex);
auto it = s_coordinators_per_path.find(path);
return it == s_coordinators_per_path.end() ? nullptr : it->second.lock();
}
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config)
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
if ((!m_config.read_only && !m_notifier) || (m_config.read_only && m_weak_realm_notifiers.empty())) {
m_config = config;
if (!config.read_only && !m_notifier && config.automatic_change_notifications) {
try {
m_notifier = std::make_unique<ExternalCommitHelper>(*this);
}
catch (std::system_error const& ex) {
throw RealmFileException(RealmFileException::Kind::AccessError, config.path, ex.code().message());
}
}
}
else {
if (m_config.read_only != config.read_only) {
throw MismatchedConfigException("Realm at path already opened with different read permissions.");
}
if (m_config.in_memory != config.in_memory) {
throw MismatchedConfigException("Realm at path already opened with different inMemory settings.");
}
if (m_config.encryption_key != config.encryption_key) {
throw MismatchedConfigException("Realm at path already opened with a different encryption key.");
}
if (m_config.schema_version != config.schema_version && config.schema_version != ObjectStore::NotVersioned) {
throw MismatchedConfigException("Realm at path already opened with different schema version.");
}
// FIXME: verify that schema is compatible
// Needs to verify that all tables present in both are identical, and
// then updated m_config with any tables present in config but not in
// it
// Public API currently doesn't make it possible to have non-matching
// schemata so it's not a huge issue
if ((false) && m_config.schema != config.schema) {
throw MismatchedConfigException("Realm at path already opened with different schema");
}
}
if (config.cache) {
for (auto& cachedRealm : m_weak_realm_notifiers) {
if (cachedRealm.is_cached_for_current_thread()) {
// can be null if we jumped in between ref count hitting zero and
// unregister_realm() getting the lock
if (auto realm = cachedRealm.realm()) {
return realm;
}
}
}
}
auto realm = std::make_shared<Realm>(std::move(config));
realm->init(shared_from_this());
m_weak_realm_notifiers.emplace_back(realm, m_config.cache);
return realm;
}
std::shared_ptr<Realm> RealmCoordinator::get_realm()
{
return get_realm(m_config);
}
const Schema* RealmCoordinator::get_schema() const noexcept
{
return m_weak_realm_notifiers.empty() ? nullptr : m_config.schema.get();
}
void RealmCoordinator::update_schema(Schema const& schema)
{
// FIXME: this should probably be doing some sort of validation and
// notifying all Realm instances of the new schema in some way
m_config.schema = std::make_unique<Schema>(schema);
}
RealmCoordinator::RealmCoordinator() = default;
RealmCoordinator::~RealmCoordinator()
{
std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
for (auto it = s_coordinators_per_path.begin(); it != s_coordinators_per_path.end(); ) {
if (it->second.expired()) {
it = s_coordinators_per_path.erase(it);
}
else {
++it;
}
}
}
void RealmCoordinator::unregister_realm(Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realm_mutex);
for (size_t i = 0; i < m_weak_realm_notifiers.size(); ++i) {
auto& weak_realm_notifier = m_weak_realm_notifiers[i];
if (!weak_realm_notifier.expired() && !weak_realm_notifier.is_for_realm(realm)) {
continue;
}
if (i + 1 < m_weak_realm_notifiers.size()) {
weak_realm_notifier = std::move(m_weak_realm_notifiers.back());
}
m_weak_realm_notifiers.pop_back();
}
}
void RealmCoordinator::clear_cache()
{
std::vector<WeakRealm> realms_to_close;
{
std::lock_guard<std::mutex> lock(s_coordinator_mutex);
for (auto& weak_coordinator : s_coordinators_per_path) {
auto coordinator = weak_coordinator.second.lock();
if (!coordinator) {
continue;
}
coordinator->m_notifier = nullptr;
// Gather a list of all of the realms which will be removed
for (auto& weak_realm_notifier : coordinator->m_weak_realm_notifiers) {
if (auto realm = weak_realm_notifier.realm()) {
realms_to_close.push_back(realm);
}
}
}
s_coordinators_per_path.clear();
}
// Close all of the previously cached Realms. This can't be done while
// s_coordinator_mutex is held as it may try to re-lock it.
for (auto& weak_realm : realms_to_close) {
if (auto realm = weak_realm.lock()) {
realm->close();
}
}
}
void RealmCoordinator::send_commit_notifications()
{
REALM_ASSERT(!m_config.read_only);
if (m_notifier) {
m_notifier->notify_others();
}
}
void RealmCoordinator::pin_version(uint_fast64_t version, uint_fast32_t index)
{
if (m_async_error) {
return;
}
SharedGroup::VersionID versionid(version, index);
if (!m_advancer_sg) {
try {
std::unique_ptr<Group> read_only_group;
Realm::open_with_config(m_config, m_advancer_history, m_advancer_sg, read_only_group);
REALM_ASSERT(!read_only_group);
m_advancer_sg->begin_read(versionid);
}
catch (...) {
m_async_error = std::current_exception();
m_advancer_sg = nullptr;
m_advancer_history = nullptr;
}
}
else if (m_new_queries.empty()) {
// If this is the first query then we don't already have a read transaction
m_advancer_sg->begin_read(versionid);
}
else if (versionid < m_advancer_sg->get_version_of_current_transaction()) {
// Ensure we're holding a readlock on the oldest version we have a
// handover object for, as handover objects don't
m_advancer_sg->end_read();
m_advancer_sg->begin_read(versionid);
}
}
void RealmCoordinator::register_query(std::shared_ptr<AsyncQuery> query)
{
auto version = query->version();
auto& self = Realm::Internal::get_coordinator(query->get_realm());
{
std::lock_guard<std::mutex> lock(self.m_query_mutex);
self.pin_version(version.version, version.index);
self.m_new_queries.push_back(std::move(query));
}
}
void RealmCoordinator::clean_up_dead_queries()
{
auto swap_remove = [&](auto& container) {
bool did_remove = false;
for (size_t i = 0; i < container.size(); ++i) {
if (container[i]->is_alive())
continue;
// Ensure the query is destroyed here even if there's lingering refs
// to the async query elsewhere
container[i]->release_query();
if (container.size() > i + 1)
container[i] = std::move(container.back());
container.pop_back();
--i;
did_remove = true;
}
return did_remove;
};
if (swap_remove(m_queries)) {
// Make sure we aren't holding on to read versions needlessly if there
// are no queries left, but don't close them entirely as opening shared
// groups is expensive
if (m_queries.empty() && m_query_sg) {
m_query_sg->end_read();
}
}
if (swap_remove(m_new_queries)) {
if (m_new_queries.empty() && m_advancer_sg) {
m_advancer_sg->end_read();
}
}
}
void RealmCoordinator::on_change()
{
run_async_queries();
std::lock_guard<std::mutex> lock(m_realm_mutex);
for (auto& realm : m_weak_realm_notifiers) {
realm.notify();
}
}
void RealmCoordinator::run_async_queries()
{
std::unique_lock<std::mutex> lock(m_query_mutex);
clean_up_dead_queries();
if (m_queries.empty() && m_new_queries.empty()) {
return;
}
if (!m_async_error) {
open_helper_shared_group();
}
if (m_async_error) {
move_new_queries_to_main();
return;
}
advance_helper_shared_group_to_latest();
// Make a copy of the queries vector so that we can release the lock while
// we run the queries
auto queries_to_run = m_queries;
lock.unlock();
for (auto& query : queries_to_run) {
query->run();
}
// Reacquire the lock while updating the fields that are actually read on
// other threads
{
lock.lock();
for (auto& query : queries_to_run) {
query->prepare_handover();
}
}
clean_up_dead_queries();
}
void RealmCoordinator::open_helper_shared_group()
{
if (!m_query_sg) {
try {
std::unique_ptr<Group> read_only_group;
Realm::open_with_config(m_config, m_query_history, m_query_sg, read_only_group);
REALM_ASSERT(!read_only_group);
m_query_sg->begin_read();
}
catch (...) {
// Store the error to be passed to the async queries
m_async_error = std::current_exception();
m_query_sg = nullptr;
m_query_history = nullptr;
}
}
else if (m_queries.empty()) {
m_query_sg->begin_read();
}
}
void RealmCoordinator::move_new_queries_to_main()
{
m_queries.reserve(m_queries.size() + m_new_queries.size());
std::move(m_new_queries.begin(), m_new_queries.end(), std::back_inserter(m_queries));
m_new_queries.clear();
}
void RealmCoordinator::advance_helper_shared_group_to_latest()
{
if (m_new_queries.empty()) {
LangBindHelper::advance_read(*m_query_sg, *m_query_history);
return;
}
// Sort newly added queries by their source version so that we can pull them
// all forward to the latest version in a single pass over the transaction log
std::sort(m_new_queries.begin(), m_new_queries.end(), [](auto const& lft, auto const& rgt) {
return lft->version() < rgt->version();
});
// Import all newly added queries to our helper SG
for (auto& query : m_new_queries) {
LangBindHelper::advance_read(*m_advancer_sg, *m_advancer_history, query->version());
query->attach_to(*m_advancer_sg);
}
// Advance both SGs to the newest version
LangBindHelper::advance_read(*m_advancer_sg, *m_advancer_history);
LangBindHelper::advance_read(*m_query_sg, *m_query_history,
m_advancer_sg->get_version_of_current_transaction());
// Transfer all new queries over to the main SG
for (auto& query : m_new_queries) {
query->detatch();
query->attach_to(*m_query_sg);
}
move_new_queries_to_main();
m_advancer_sg->end_read();
}
void RealmCoordinator::advance_to_ready(Realm& realm)
{
decltype(m_queries) queries;
auto& sg = Realm::Internal::get_shared_group(realm);
auto& history = Realm::Internal::get_history(realm);
auto get_query_version = [&] {
for (auto& query : m_queries) {
auto version = query->version();
if (version != SharedGroup::VersionID{}) {
return version;
}
}
return SharedGroup::VersionID{};
};
SharedGroup::VersionID version;
{
std::lock_guard<std::mutex> lock(m_query_mutex);
version = get_query_version();
}
// no async queries; just advance to latest
if (version.version == 0) {
transaction::advance(sg, history, realm.m_binding_context.get());
return;
}
// async results are out of date; ignore
if (version < sg.get_version_of_current_transaction()) {
return;
}
while (true) {
// Advance to the ready version without holding any locks because it
// may end up calling user code (in did_change() notifications)
transaction::advance(sg, history, realm.m_binding_context.get(), version);
// Reacquire the lock and recheck the query version, as the queries may
// have advanced to a later version while we didn't hold the lock. If
// so, we need to release the lock and re-advance
std::lock_guard<std::mutex> lock(m_query_mutex);
version = get_query_version();
if (version.version == 0)
return;
if (version != sg.get_version_of_current_transaction())
continue;
// Query version now matches the SG version, so we can deliver them
for (auto& query : m_queries) {
if (query->deliver(sg, m_async_error)) {
queries.push_back(query);
}
}
break;
}
for (auto& query : queries) {
query->call_callbacks();
}
}
void RealmCoordinator::process_available_async(Realm& realm)
{
auto& sg = Realm::Internal::get_shared_group(realm);
decltype(m_queries) queries;
{
std::lock_guard<std::mutex> lock(m_query_mutex);
for (auto& query : m_queries) {
if (query->deliver(sg, m_async_error)) {
queries.push_back(query);
}
}
}
for (auto& query : queries) {
query->call_callbacks();
}
}

View File

@ -0,0 +1,127 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_COORDINATOR_HPP
#define REALM_COORDINATOR_HPP
#include "shared_realm.hpp"
#include <realm/string_data.hpp>
namespace realm {
class AsyncQueryCallback;
class ClientHistory;
class Results;
class Schema;
class SharedGroup;
struct AsyncQueryCancelationToken;
namespace _impl {
class AsyncQuery;
class WeakRealmNotifier;
class ExternalCommitHelper;
// RealmCoordinator manages the weak cache of Realm instances and communication
// between per-thread Realm instances for a given file
class RealmCoordinator : public std::enable_shared_from_this<RealmCoordinator> {
public:
// Get the coordinator for the given path, creating it if neccesary
static std::shared_ptr<RealmCoordinator> get_coordinator(StringData path);
// Get the coordinator for the given path, or null if there is none
static std::shared_ptr<RealmCoordinator> get_existing_coordinator(StringData path);
// Get a thread-local shared Realm with the given configuration
// If the Realm is already open on another thread, validates that the given
// configuration is compatible with the existing one
std::shared_ptr<Realm> get_realm(Realm::Config config);
std::shared_ptr<Realm> get_realm();
const Schema* get_schema() const noexcept;
uint64_t get_schema_version() const noexcept { return m_config.schema_version; }
const std::string& get_path() const noexcept { return m_config.path; }
const std::vector<char>& get_encryption_key() const noexcept { return m_config.encryption_key; }
bool is_in_memory() const noexcept { return m_config.in_memory; }
// Asynchronously call notify() on every Realm instance for this coordinator's
// path, including those in other processes
void send_commit_notifications();
// Clear the weak Realm cache for all paths
// Should only be called in test code, as continuing to use the previously
// cached instances will have odd results
static void clear_cache();
// Explicit constructor/destructor needed for the unique_ptrs to forward-declared types
RealmCoordinator();
~RealmCoordinator();
// Called by Realm's destructor to ensure the cache is cleaned up promptly
// Do not call directly
void unregister_realm(Realm* realm);
// Called by m_notifier when there's a new commit to send notifications for
void on_change();
// Update the schema in the cached config
void update_schema(Schema const& new_schema);
static void register_query(std::shared_ptr<AsyncQuery> query);
// Advance the Realm to the most recent transaction version which all async
// work is complete for
void advance_to_ready(Realm& realm);
void process_available_async(Realm& realm);
private:
Realm::Config m_config;
std::mutex m_realm_mutex;
std::vector<WeakRealmNotifier> m_weak_realm_notifiers;
std::mutex m_query_mutex;
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_new_queries;
std::vector<std::shared_ptr<_impl::AsyncQuery>> m_queries;
// SharedGroup used for actually running async queries
// Will have a read transaction iff m_queries is non-empty
std::unique_ptr<ClientHistory> m_query_history;
std::unique_ptr<SharedGroup> m_query_sg;
// SharedGroup used to advance queries in m_new_queries to the main shared
// group's transaction version
// Will have a read transaction iff m_new_queries is non-empty
std::unique_ptr<ClientHistory> m_advancer_history;
std::unique_ptr<SharedGroup> m_advancer_sg;
std::exception_ptr m_async_error;
std::unique_ptr<_impl::ExternalCommitHelper> m_notifier;
// must be called with m_query_mutex locked
void pin_version(uint_fast64_t version, uint_fast32_t index);
void run_async_queries();
void open_helper_shared_group();
void move_new_queries_to_main();
void advance_helper_shared_group_to_latest();
void clean_up_dead_queries();
};
} // namespace _impl
} // namespace realm
#endif /* REALM_COORDINATOR_HPP */

View File

@ -16,16 +16,131 @@
//
////////////////////////////////////////////////////////////////////////////
#include "transact_log_handler.hpp"
#include "impl/transact_log_handler.hpp"
#include "../binding_context.hpp"
#include "binding_context.hpp"
#include <realm/commit_log.hpp>
#include <realm/group_shared.hpp>
#include <realm/lang_bind_helper.hpp>
namespace realm {
class TransactLogHandler {
using namespace realm;
namespace {
// A transaction log handler that just validates that all operations made are
// ones supported by the object store
class TransactLogValidator {
// Index of currently selected table
size_t m_current_table = 0;
// Tables which were created during the transaction being processed, which
// can have columns inserted without a schema version bump
std::vector<size_t> m_new_tables;
REALM_NORETURN
REALM_NOINLINE
void schema_error()
{
throw std::runtime_error("Schema mismatch detected: another process has modified the Realm file's schema in an incompatible way");
}
// Throw an exception if the currently modified table already existed before
// the current set of modifications
bool schema_error_unless_new_table()
{
if (std::find(begin(m_new_tables), end(m_new_tables), m_current_table) == end(m_new_tables)) {
schema_error();
}
return true;
}
protected:
size_t current_table() const noexcept { return m_current_table; }
public:
// Schema changes which don't involve a change in the schema version are
// allowed
bool add_search_index(size_t) { return true; }
bool remove_search_index(size_t) { return true; }
// Creating entirely new tables without a schema version bump is allowed, so
// we need to track if new columns are being added to a new table or an
// existing one
bool insert_group_level_table(size_t table_ndx, size_t, StringData)
{
// Shift any previously added tables after the new one
for (auto& table : m_new_tables) {
if (table >= table_ndx)
++table;
}
m_new_tables.push_back(table_ndx);
return true;
}
bool insert_column(size_t, DataType, StringData, bool) { return schema_error_unless_new_table(); }
bool insert_link_column(size_t, DataType, StringData, size_t, size_t) { return schema_error_unless_new_table(); }
bool add_primary_key(size_t) { return schema_error_unless_new_table(); }
bool set_link_type(size_t, LinkType) { return schema_error_unless_new_table(); }
// Removing or renaming things while a Realm is open is never supported
bool erase_group_level_table(size_t, size_t) { schema_error(); }
bool rename_group_level_table(size_t, StringData) { schema_error(); }
bool erase_column(size_t) { schema_error(); }
bool erase_link_column(size_t, size_t, size_t) { schema_error(); }
bool rename_column(size_t, StringData) { schema_error(); }
bool remove_primary_key() { schema_error(); }
bool move_column(size_t, size_t) { schema_error(); }
bool move_group_level_table(size_t, size_t) { schema_error(); }
bool select_descriptor(int levels, const size_t*)
{
// subtables not supported
return levels == 0;
}
bool select_table(size_t group_level_ndx, int, const size_t*) noexcept
{
m_current_table = group_level_ndx;
return true;
}
bool select_link_list(size_t, size_t, size_t) { return true; }
// Non-schema changes are all allowed
void parse_complete() { }
bool insert_empty_rows(size_t, size_t, size_t, bool) { return true; }
bool erase_rows(size_t, size_t, size_t, bool) { return true; }
bool swap_rows(size_t, size_t) { return true; }
bool clear_table() noexcept { return true; }
bool link_list_set(size_t, size_t) { return true; }
bool link_list_insert(size_t, size_t) { return true; }
bool link_list_erase(size_t) { return true; }
bool link_list_nullify(size_t) { return true; }
bool link_list_clear(size_t) { return true; }
bool link_list_move(size_t, size_t) { return true; }
bool link_list_swap(size_t, size_t) { return true; }
bool set_int(size_t, size_t, int_fast64_t) { return true; }
bool set_bool(size_t, size_t, bool) { return true; }
bool set_float(size_t, size_t, float) { return true; }
bool set_double(size_t, size_t, double) { return true; }
bool set_string(size_t, size_t, StringData) { return true; }
bool set_binary(size_t, size_t, BinaryData) { return true; }
bool set_date_time(size_t, size_t, DateTime) { return true; }
bool set_table(size_t, size_t) { return true; }
bool set_mixed(size_t, size_t, const Mixed&) { return true; }
bool set_link(size_t, size_t, size_t, size_t) { return true; }
bool set_null(size_t, size_t) { return true; }
bool nullify_link(size_t, size_t, size_t) { return true; }
bool insert_substring(size_t, size_t, size_t, StringData) { return true; }
bool erase_substring(size_t, size_t, size_t, size_t) { return true; }
bool optimize_table() { return true; }
bool set_int_unique(size_t, size_t, size_t, int_fast64_t) { return true; }
bool set_string_unique(size_t, size_t, size_t, StringData) { return true; }
bool change_link_targets(size_t, size_t) { return true; }
};
// Extends TransactLogValidator to also track changes and report it to the
// binding context if any properties are being observed
class TransactLogObserver : public TransactLogValidator {
using ColumnInfo = BindingContext::ColumnInfo;
using ObserverState = BindingContext::ObserverState;
@ -34,13 +149,15 @@ class TransactLogHandler {
// Userdata pointers for rows which have been deleted
std::vector<void *> invalidated;
// Delegate to send change information to
BindingContext* m_binding_context;
BindingContext* m_context;
// Index of currently selected table
size_t m_current_table = 0;
// Change information for the currently selected LinkList, if any
ColumnInfo* m_active_linklist = nullptr;
// Tables which were created during the transaction being processed, which
// can have columns inserted without a schema version bump
std::vector<size_t> m_new_tables;
// Get the change info for the given column, creating it if needed
static ColumnInfo& get_change(ObserverState& state, size_t i)
{
@ -65,8 +182,8 @@ class TransactLogHandler {
// Mark the given row/col as needing notifications sent
bool mark_dirty(size_t row_ndx, size_t col_ndx)
{
auto it = lower_bound(begin(m_observers), end(m_observers), ObserverState{m_current_table, row_ndx, nullptr});
if (it != end(m_observers) && it->table_ndx == m_current_table && it->row_ndx == row_ndx) {
auto it = lower_bound(begin(m_observers), end(m_observers), ObserverState{current_table(), row_ndx, nullptr});
if (it != end(m_observers) && it->table_ndx == current_table() && it->row_ndx == row_ndx) {
get_change(*it, col_ndx).changed = true;
}
return true;
@ -82,54 +199,55 @@ class TransactLogHandler {
public:
template<typename Func>
TransactLogHandler(BindingContext* binding_context, SharedGroup& sg, Func&& func)
: m_binding_context(binding_context)
TransactLogObserver(BindingContext* context, SharedGroup& sg, Func&& func, bool validate_schema_changes)
: m_context(context)
{
if (!binding_context) {
func();
if (!context) {
if (validate_schema_changes) {
// The handler functions are non-virtual, so the parent class's
// versions are called if we don't need to track changes to observed
// objects
func(static_cast<TransactLogValidator&>(*this));
}
else {
func();
}
return;
}
m_observers = binding_context->get_observed_rows();
m_observers = context->get_observed_rows();
if (m_observers.empty()) {
auto old_version = sg.get_version_of_current_transaction();
func();
if (validate_schema_changes) {
func(static_cast<TransactLogValidator&>(*this));
}
else {
func();
}
if (old_version != sg.get_version_of_current_transaction()) {
binding_context->did_change({}, {});
context->did_change({}, {});
}
return;
}
func(*this);
binding_context->did_change(m_observers, invalidated);
context->did_change(m_observers, invalidated);
}
// Called at the end of the transaction log immediately before the version
// is advanced
void parse_complete()
{
m_binding_context->will_change(m_observers, invalidated);
m_context->will_change(m_observers, invalidated);
}
// These would require having an observer before schema init
// Maybe do something here to throw an error when multiple processes have different schemas?
bool insert_group_level_table(size_t, size_t, StringData) { return false; }
bool erase_group_level_table(size_t, size_t) { return false; }
bool rename_group_level_table(size_t, StringData) { return false; }
bool insert_column(size_t, DataType, StringData, bool) { return false; }
bool insert_link_column(size_t, DataType, StringData, size_t, size_t) { return false; }
bool erase_column(size_t) { return false; }
bool erase_link_column(size_t, size_t, size_t) { return false; }
bool rename_column(size_t, StringData) { return false; }
bool add_search_index(size_t) { return false; }
bool remove_search_index(size_t) { return false; }
bool add_primary_key(size_t) { return false; }
bool remove_primary_key() { return false; }
bool set_link_type(size_t, LinkType) { return false; }
bool select_table(size_t group_level_ndx, int, const size_t*) noexcept
bool insert_group_level_table(size_t table_ndx, size_t prior_size, StringData name)
{
m_current_table = group_level_ndx;
for (auto& observer : m_observers) {
if (observer.table_ndx >= table_ndx)
++observer.table_ndx;
}
TransactLogValidator::insert_group_level_table(table_ndx, prior_size, name);
return true;
}
@ -143,7 +261,7 @@ public:
{
for (size_t i = 0; i < m_observers.size(); ++i) {
auto& o = m_observers[i];
if (o.table_ndx == m_current_table) {
if (o.table_ndx == current_table()) {
if (o.row_ndx == row_ndx) {
invalidate(&o);
--i;
@ -163,7 +281,7 @@ public:
{
for (size_t i = 0; i < m_observers.size(); ) {
auto& o = m_observers[i];
if (o.table_ndx == m_current_table) {
if (o.table_ndx == current_table()) {
invalidate(&o);
}
else {
@ -177,7 +295,7 @@ public:
{
m_active_linklist = nullptr;
for (auto& o : m_observers) {
if (o.table_ndx == m_current_table && o.row_ndx == row) {
if (o.table_ndx == current_table() && o.row_ndx == row) {
m_active_linklist = &get_change(o, col);
break;
}
@ -293,12 +411,10 @@ public:
// Things that just mark the field as modified
bool set_int(size_t col, size_t row, int_fast64_t) { return mark_dirty(row, col); }
bool set_int_unique(size_t col, size_t row, int_fast64_t) { return mark_dirty(row, col); }
bool set_bool(size_t col, size_t row, bool) { return mark_dirty(row, col); }
bool set_float(size_t col, size_t row, float) { return mark_dirty(row, col); }
bool set_double(size_t col, size_t row, double) { return mark_dirty(row, col); }
bool set_string(size_t col, size_t row, StringData) { return mark_dirty(row, col); }
bool set_string_unique(size_t col, size_t row, StringData) { return mark_dirty(row, col); }
bool set_binary(size_t col, size_t row, BinaryData) { return mark_dirty(row, col); }
bool set_date_time(size_t col, size_t row, DateTime) { return mark_dirty(row, col); }
bool set_table(size_t col, size_t row) { return mark_dirty(row, col); }
@ -306,49 +422,46 @@ public:
bool set_link(size_t col, size_t row, size_t, size_t) { return mark_dirty(row, col); }
bool set_null(size_t col, size_t row) { return mark_dirty(row, col); }
bool nullify_link(size_t col, size_t row, size_t) { return mark_dirty(row, col); }
// Doesn't change any data
bool optimize_table() { return true; }
// Used for subtables, which we currently don't support
bool select_descriptor(int, const size_t*) { return false; }
// Not implemented
bool insert_substring(size_t, size_t, size_t, StringData) { return false; }
bool erase_substring(size_t, size_t, size_t, size_t) { return false; }
bool swap_rows(size_t, size_t) { return false; }
bool move_column(size_t, size_t) { return false; }
bool move_group_level_table(size_t, size_t) { return false; }
bool set_int_unique(size_t col, size_t row, size_t, int_fast64_t) { return mark_dirty(row, col); }
bool set_string_unique(size_t col, size_t row, size_t, StringData) { return mark_dirty(row, col); }
bool insert_substring(size_t col, size_t row, size_t, StringData) { return mark_dirty(row, col); }
bool erase_substring(size_t col, size_t row, size_t, size_t) { return mark_dirty(row, col); }
};
} // anonymous namespace
namespace realm {
namespace _impl {
namespace transaction {
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) {
TransactLogHandler(binding_context, sg, [&](auto&&... args) {
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* context,
SharedGroup::VersionID version)
{
TransactLogObserver(context, sg, [&](auto&&... args) {
LangBindHelper::advance_read(sg, history, std::move(args)...);
});
}, true);
}
void begin(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) {
TransactLogHandler(binding_context, sg, [&](auto&&... args) {
void begin(SharedGroup& sg, ClientHistory& history, BindingContext* context,
bool validate_schema_changes)
{
TransactLogObserver(context, sg, [&](auto&&... args) {
LangBindHelper::promote_to_write(sg, history, std::move(args)...);
});
}, validate_schema_changes);
}
void commit(SharedGroup& sg, ClientHistory&, BindingContext* binding_context) {
void commit(SharedGroup& sg, ClientHistory&, BindingContext* context)
{
LangBindHelper::commit_and_continue_as_read(sg);
if (binding_context) {
binding_context->did_change({}, {});
if (context) {
context->did_change({}, {});
}
}
void cancel(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context) {
TransactLogHandler(binding_context, sg, [&](auto&&... args) {
void cancel(SharedGroup& sg, ClientHistory& history, BindingContext* context)
{
TransactLogObserver(context, sg, [&](auto&&... args) {
LangBindHelper::rollback_and_continue_as_read(sg, history, std::move(args)...);
});
}, false);
}
} // namespace transaction

View File

@ -19,6 +19,8 @@
#ifndef REALM_TRANSACT_LOG_HANDLER_HPP
#define REALM_TRANSACT_LOG_HANDLER_HPP
#include <realm/group_shared.hpp>
namespace realm {
class BindingContext;
class SharedGroup;
@ -28,12 +30,14 @@ namespace _impl {
namespace transaction {
// Advance the read transaction version, with change notifications sent to delegate
// Must not be called from within a write transaction.
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context);
void advance(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context,
SharedGroup::VersionID version=SharedGroup::VersionID{});
// Begin a write transaction
// If the read transaction version is not up to date, will first advance to the
// most recent read transaction and sent notifications to delegate
void begin(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context);
void begin(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context,
bool validate_schema_changes=true);
// Commit a write transaction
void commit(SharedGroup& sg, ClientHistory& history, BindingContext* binding_context);

View File

@ -0,0 +1,30 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_WEAK_REALM_NOTIFIER_HPP
#define REALM_WEAK_REALM_NOTIFIER_HPP
#include <realm/util/features.h>
#if REALM_PLATFORM_APPLE
#include "impl/apple/weak_realm_notifier.hpp"
#else
#include "impl/generic/weak_realm_notifier.hpp"
#endif
#endif // REALM_WEAK_REALM_NOTIFIER_HPP

View File

@ -0,0 +1,68 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_WEAK_REALM_NOTIFIER_BASE_HPP
#define REALM_WEAK_REALM_NOTIFIER_BASE_HPP
#include <memory>
#include <thread>
namespace realm {
class Realm;
namespace _impl {
// WeakRealmNotifierBase stores a weak reference to a Realm instance, along with all of
// the information about a Realm that needs to be accessed from other threads.
// This is needed to avoid forming strong references to the Realm instances on
// other threads, which can produce deadlocks when the last strong reference to
// a Realm instance is released from within a function holding the cache lock.
class WeakRealmNotifierBase {
public:
WeakRealmNotifierBase(const std::shared_ptr<Realm>& realm, bool cache);
// Get a strong reference to the cached realm
std::shared_ptr<Realm> realm() const { return m_realm.lock(); }
// Does this WeakRealmNotifierBase store a Realm instance that should be used on the current thread?
bool is_cached_for_current_thread() const { return m_cache && m_thread_id == std::this_thread::get_id(); }
// Has the Realm instance been destroyed?
bool expired() const { return m_realm.expired(); }
// Is this a WeakRealmNotifierBase for the given Realm instance?
bool is_for_realm(Realm* realm) const { return realm == m_realm_key; }
private:
std::weak_ptr<Realm> m_realm;
std::thread::id m_thread_id = std::this_thread::get_id();
void* m_realm_key;
bool m_cache = false;
};
inline WeakRealmNotifierBase::WeakRealmNotifierBase(const std::shared_ptr<Realm>& realm, bool cache)
: m_realm(realm)
, m_realm_key(realm.get())
, m_cache(cache)
{
}
} // namespace _impl
} // namespace realm
#endif // REALM_WEAK_REALM_NOTIFIER_BASE_HPP

View File

@ -43,11 +43,12 @@ void IndexSet::do_add(iterator it, size_t index)
else if (more_before && (it - 1)->second == index) {
// index is immediately after an existing range
++(it - 1)->second;
}
else if (more_before && valid && (it - 1)->second == it->first) {
// index joins two existing ranges
(it - 1)->second = it->second;
m_ranges.erase(it);
if (valid && (it - 1)->second == it->first) {
// index joins two existing ranges
(it - 1)->second = it->second;
m_ranges.erase(it);
}
}
else if (valid && it->first == index + 1) {
// index is immediately before an existing range

169
src/list.cpp Normal file
View File

@ -0,0 +1,169 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "list.hpp"
#include "results.hpp"
#include "shared_realm.hpp"
#include <stdexcept>
using namespace realm;
List::List() noexcept = default;
List::~List() = default;
List::List(std::shared_ptr<Realm> r, const ObjectSchema& s, LinkViewRef l) noexcept
: m_realm(std::move(r))
, m_object_schema(&s)
, m_link_view(std::move(l))
{
}
Query List::get_query() const
{
verify_attached();
return m_link_view->get_target_table().where(m_link_view);
}
void List::verify_valid_row(size_t row_ndx, bool insertion) const
{
size_t size = m_link_view->size();
if (row_ndx > size || (!insertion && row_ndx == size)) {
throw std::out_of_range("Index " + std::to_string(row_ndx) + " is outside of range 0..." +
std::to_string(size) + ".");
}
}
bool List::is_valid() const
{
m_realm->verify_thread();
return m_link_view && m_link_view->is_attached();
}
void List::verify_attached() const
{
if (!is_valid()) {
throw std::runtime_error("LinkView is not attached");
}
}
void List::verify_in_transaction() const
{
verify_attached();
if (!m_realm->is_in_transaction()) {
throw InvalidTransactionException("Must be in a write transaction");
}
}
size_t List::size() const
{
verify_attached();
return m_link_view->size();
}
RowExpr List::get(size_t row_ndx) const
{
verify_attached();
verify_valid_row(row_ndx);
return m_link_view->get(row_ndx);
}
size_t List::find(ConstRow const& row) const
{
verify_attached();
if (!row.is_attached() || row.get_table() != &m_link_view->get_target_table()) {
return not_found;
}
return m_link_view->find(row.get_index());
}
void List::add(size_t target_row_ndx)
{
verify_in_transaction();
m_link_view->add(target_row_ndx);
}
void List::insert(size_t row_ndx, size_t target_row_ndx)
{
verify_in_transaction();
verify_valid_row(row_ndx, true);
m_link_view->insert(row_ndx, target_row_ndx);
}
void List::move(size_t source_ndx, size_t dest_ndx)
{
verify_in_transaction();
verify_valid_row(source_ndx);
verify_valid_row(dest_ndx); // Can't be one past end due to removing one earlier
m_link_view->move(source_ndx, dest_ndx);
}
void List::remove(size_t row_ndx)
{
verify_in_transaction();
verify_valid_row(row_ndx);
m_link_view->remove(row_ndx);
}
void List::remove_all()
{
verify_in_transaction();
m_link_view->clear();
}
void List::set(size_t row_ndx, size_t target_row_ndx)
{
verify_in_transaction();
verify_valid_row(row_ndx);
m_link_view->set(row_ndx, target_row_ndx);
}
void List::swap(size_t ndx1, size_t ndx2)
{
verify_in_transaction();
verify_valid_row(ndx1);
verify_valid_row(ndx2);
m_link_view->swap(ndx1, ndx2);
}
void List::delete_all()
{
verify_in_transaction();
m_link_view->remove_all_target_rows();
}
Results List::sort(SortOrder order)
{
return Results(m_realm, *m_object_schema, get_query(), std::move(order));
}
// These definitions rely on that LinkViews are interned by core
bool List::operator==(List const& rgt) const noexcept
{
return m_link_view.get() == rgt.m_link_view.get();
}
namespace std {
size_t hash<realm::List>::operator()(realm::List const& list) const
{
return std::hash<void*>()(list.m_link_view.get());
}
}

94
src/list.hpp Normal file
View File

@ -0,0 +1,94 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2015 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_LIST_HPP
#define REALM_LIST_HPP
#include <realm/link_view.hpp>
#include <memory>
namespace realm {
template<typename T> class BasicRowExpr;
using RowExpr = BasicRowExpr<Table>;
class ObjectSchema;
class Realm;
class Results;
struct SortOrder;
class List {
public:
List() noexcept;
List(std::shared_ptr<Realm> r, const ObjectSchema& s, LinkViewRef l) noexcept;
~List();
const std::shared_ptr<Realm>& get_realm() const { return m_realm; }
Query get_query() const;
const ObjectSchema& get_object_schema() const { return *m_object_schema; }
bool is_valid() const;
void verify_attached() const;
void verify_in_transaction() const;
size_t size() const;
RowExpr get(size_t row_ndx) const;
size_t find(ConstRow const& row) const;
void add(size_t target_row_ndx);
void insert(size_t list_ndx, size_t target_row_ndx);
void move(size_t source_ndx, size_t dest_ndx);
void remove(size_t list_ndx);
void remove_all();
void set(size_t row_ndx, size_t target_row_ndx);
void swap(size_t ndx1, size_t ndx2);
void delete_all();
Results sort(SortOrder order);
bool operator==(List const& rgt) const noexcept;
// These are implemented in object_accessor.hpp
template <typename ValueType, typename ContextType>
void add(ContextType ctx, ValueType value);
template <typename ValueType, typename ContextType>
void insert(ContextType ctx, ValueType value, size_t list_ndx);
template <typename ValueType, typename ContextType>
void set(ContextType ctx, ValueType value, size_t list_ndx);
private:
std::shared_ptr<Realm> m_realm;
const ObjectSchema* m_object_schema;
LinkViewRef m_link_view;
void verify_valid_row(size_t row_ndx, bool insertion = false) const;
friend struct std::hash<List>;
};
} // namespace realm
namespace std {
template<> struct hash<realm::List> {
size_t operator()(realm::List const&) const;
};
}
#endif /* REALM_LIST_HPP */

View File

@ -19,10 +19,13 @@
#ifndef REALM_OBJECT_ACCESSOR_HPP
#define REALM_OBJECT_ACCESSOR_HPP
#include <string>
#include "shared_realm.hpp"
#include "schema.hpp"
#include "list.hpp"
#include "object_schema.hpp"
#include "object_store.hpp"
#include "schema.hpp"
#include "shared_realm.hpp"
#include <string>
namespace realm {

View File

@ -28,6 +28,14 @@ using namespace realm;
ObjectSchema::~ObjectSchema() = default;
ObjectSchema::ObjectSchema(std::string name, std::string primary_key, std::initializer_list<Property> properties)
: name(std::move(name))
, properties(properties)
, primary_key(std::move(primary_key))
{
set_primary_key_property();
}
ObjectSchema::ObjectSchema(const Group *group, const std::string &name) : name(name) {
ConstTableRef table = ObjectStore::table_for_object_type(group, name);
@ -50,13 +58,7 @@ ObjectSchema::ObjectSchema(const Group *group, const std::string &name) : name(n
}
primary_key = realm::ObjectStore::get_primary_key_for_object(group, name);
if (primary_key.length()) {
auto primary_key_prop = primary_key_property();
if (!primary_key_prop) {
throw InvalidPrimaryKeyException(name, primary_key);
}
primary_key_prop->is_primary = true;
}
set_primary_key_property();
}
Property *ObjectSchema::property_for_name(StringData name) {
@ -71,3 +73,14 @@ Property *ObjectSchema::property_for_name(StringData name) {
const Property *ObjectSchema::property_for_name(StringData name) const {
return const_cast<ObjectSchema *>(this)->property_for_name(name);
}
void ObjectSchema::set_primary_key_property()
{
if (primary_key.length()) {
auto primary_key_prop = primary_key_property();
if (!primary_key_prop) {
throw InvalidPrimaryKeyException(name, primary_key);
}
primary_key_prop->is_primary = true;
}
}

View File

@ -25,13 +25,13 @@
#include <vector>
namespace realm {
class Property;
class Group;
struct Property;
class ObjectSchema {
public:
ObjectSchema() = default;
ObjectSchema(std::string name, std::string primary_key, std::initializer_list<Property> properties);
~ObjectSchema();
// create object schema from existing table
@ -50,6 +50,9 @@ namespace realm {
const Property *primary_key_property() const {
return property_for_name(primary_key);
}
private:
void set_primary_key_property();
};
}

View File

@ -52,13 +52,11 @@ bool ObjectStore::has_metadata_tables(const Group *group) {
return group->get_table(c_primaryKeyTableName) && group->get_table(c_metadataTableName);
}
bool ObjectStore::create_metadata_tables(Group *group) {
bool changed = false;
void ObjectStore::create_metadata_tables(Group *group) {
TableRef table = group->get_or_add_table(c_primaryKeyTableName);
if (table->get_column_count() == 0) {
table->add_column(type_String, c_primaryKeyObjectClassColumnName);
table->add_column(type_String, c_primaryKeyPropertyNameColumnName);
changed = true;
}
table = group->get_or_add_table(c_metadataTableName);
@ -68,10 +66,7 @@ bool ObjectStore::create_metadata_tables(Group *group) {
// set initial version
table->add_empty_row();
table->set_int(c_versionColumnIndex, c_zeroRowIndex, ObjectStore::NotVersioned);
changed = true;
}
return changed;
}
uint64_t ObjectStore::get_schema_version(const Group *group) {
@ -255,9 +250,7 @@ static void copy_property_values(const Property& source, const Property& destina
// set references to tables on targetSchema and create/update any missing or out-of-date tables
// if update existing is true, updates existing tables, otherwise validates existing tables
// NOTE: must be called from within write transaction
bool ObjectStore::create_tables(Group *group, Schema &target_schema, bool update_existing) {
bool changed = false;
void ObjectStore::create_tables(Group *group, Schema &target_schema, bool update_existing) {
// first pass to create missing tables
std::vector<ObjectSchema *> to_update;
for (auto& object_schema : target_schema) {
@ -267,7 +260,6 @@ bool ObjectStore::create_tables(Group *group, Schema &target_schema, bool update
// we will modify tables for any new objectSchema (table was created) or for all if update_existing is true
if (update_existing || created) {
to_update.push_back(&object_schema);
changed = true;
}
}
@ -291,7 +283,6 @@ bool ObjectStore::create_tables(Group *group, Schema &target_schema, bool update
table->remove_column(current_prop.table_column);
current_prop.table_column = target_prop->table_column;
changed = true;
}
bool inserted_placeholder_column = false;
@ -314,7 +305,6 @@ bool ObjectStore::create_tables(Group *group, Schema &target_schema, bool update
table->remove_column(current_prop.table_column);
++deleted;
current_prop.table_column = npos;
changed = true;
}
}
@ -339,8 +329,6 @@ bool ObjectStore::create_tables(Group *group, Schema &target_schema, bool update
target_prop.is_nullable);
break;
}
changed = true;
}
else {
target_prop.table_column = current_prop->table_column;
@ -361,16 +349,13 @@ bool ObjectStore::create_tables(Group *group, Schema &target_schema, bool update
// if there is a primary key set, check if it is the same as the old key
if (current_schema.primary_key != target_object_schema->primary_key) {
set_primary_key_for_object(group, target_object_schema->name, target_object_schema->primary_key);
changed = true;
}
}
else if (current_schema.primary_key.length()) {
// there is no primary key, so if there was one nil out
set_primary_key_for_object(group, target_object_schema->name, "");
changed = true;
}
}
return changed;
}
bool ObjectStore::is_schema_at_version(const Group *group, uint64_t version) {
@ -405,7 +390,7 @@ bool ObjectStore::needs_update(Schema const& old_schema, Schema const& schema) {
return false;
}
bool ObjectStore::update_realm_with_schema(Group *group, Schema const& old_schema,
void ObjectStore::update_realm_with_schema(Group *group, Schema const& old_schema,
uint64_t version, Schema &schema,
MigrationFunction migration) {
// Recheck the schema version after beginning the write transaction as
@ -414,8 +399,8 @@ bool ObjectStore::update_realm_with_schema(Group *group, Schema const& old_schem
bool migrating = !is_schema_at_version(group, version);
// create tables
bool changed = create_metadata_tables(group);
changed = create_tables(group, schema, migrating) || changed;
create_metadata_tables(group);
create_tables(group, schema, migrating);
if (!migrating) {
// If we aren't migrating, then verify that all of the tables which
@ -423,10 +408,10 @@ bool ObjectStore::update_realm_with_schema(Group *group, Schema const& old_schem
verify_schema(old_schema, schema, true);
}
changed = update_indexes(group, schema) || changed;
update_indexes(group, schema);
if (!migrating) {
return changed;
return;
}
// apply the migration block if provided and there's any old data
@ -437,7 +422,6 @@ bool ObjectStore::update_realm_with_schema(Group *group, Schema const& old_schem
}
set_schema_version(group, version);
return true;
}
Schema ObjectStore::schema_from_group(const Group *group) {

View File

@ -54,12 +54,11 @@ namespace realm {
static bool needs_update(Schema const& old_schema, Schema const& schema);
// updates a Realm from old_schema to the given target schema, creating and updating tables as needed
// returns if any changes were made
// passed in target schema is updated with the correct column mapping
// optionally runs migration function if schema is out of date
// NOTE: must be performed within a write transaction
typedef std::function<void(Group *, Schema &)> MigrationFunction;
static bool update_realm_with_schema(Group *group, Schema const& old_schema, uint64_t version,
static void update_realm_with_schema(Group *group, Schema const& old_schema, uint64_t version,
Schema &schema, MigrationFunction migration);
// get a table for an object type
@ -88,11 +87,11 @@ namespace realm {
// create any metadata tables that don't already exist
// must be in write transaction to set
// returns true if it actually did anything
static bool create_metadata_tables(Group *group);
static void create_metadata_tables(Group *group);
// set references to tables on targetSchema and create/update any missing or out-of-date tables
// if update existing is true, updates existing tables, otherwise only adds and initializes new tables
static bool create_tables(realm::Group *group, Schema &target_schema, bool update_existing);
static void create_tables(realm::Group *group, Schema &target_schema, bool update_existing);
// verify a target schema against an expected schema, setting the table_column property on each schema object
// updates the column mapping on the target_schema

View File

@ -331,7 +331,7 @@ Predicate parse(const std::string &query)
if (out_predicate.type == Predicate::Type::And && out_predicate.cpnd.sub_predicates.size() == 1) {
return std::move(out_predicate.cpnd.sub_predicates.back());
}
return std::move(out_predicate);
return out_predicate;
}
void analyze_grammar()

View File

@ -28,7 +28,7 @@ class Schema;
namespace parser {
struct Expression
{
enum class Type { None, Number, String, KeyPath, Argument, True, False } type = Type::None;
enum class Type { None, Number, String, KeyPath, Argument, True, False } type;
std::string s;
Expression(Type t = Type::None, std::string s = "") : type(t), s(s) {}
};
@ -61,7 +61,7 @@ struct Predicate
struct Comparison
{
Operator op = Operator::None;
Expression expr[2];
Expression expr[2] = {{Expression::Type::None, ""}, {Expression::Type::None, ""}};
};
struct Compound

View File

@ -53,17 +53,24 @@ class TrueExpression : public realm::Expression {
if (start != end)
return start;
return not_found;
return realm::not_found;
}
void set_base_table(const Table*) override {}
const Table* get_base_table() const override { return nullptr; }
std::unique_ptr<Expression> clone(QueryNodeHandoverPatches*) const override
{
return std::unique_ptr<Expression>(new TrueExpression(*this));
}
virtual void set_table(const Table* table) {}
virtual const Table* get_table() const { return nullptr; }
};
class FalseExpression : public realm::Expression {
public:
virtual size_t find_first(size_t, size_t) const { return not_found; }
virtual void set_table(const Table* table) {}
virtual const Table* get_table() const { return nullptr; }
struct FalseExpression : realm::Expression {
size_t find_first(size_t, size_t) const override { return realm::not_found; }
void set_base_table(const Table*) override {}
const Table* get_base_table() const override { return nullptr; }
std::unique_ptr<Expression> clone(QueryNodeHandoverPatches*) const override
{
return std::unique_ptr<Expression>(new FalseExpression(*this));
}
};
using KeyPath = std::vector<std::string>;
@ -249,9 +256,11 @@ void add_link_constraint_to_query(realm::Query &query,
switch (op) {
case Predicate::Operator::NotEqual:
query.Not();
case Predicate::Operator::Equal:
query.links_to(prop_expr.prop->table_column, row_index);
case Predicate::Operator::Equal: {
size_t col = prop_expr.prop->table_column;
query.links_to(col, query.get_table()->get_link_target(col)->get(row_index));
break;
}
default:
throw std::runtime_error("Only 'equal' and 'not equal' operators supported for object comparison.");
}
@ -462,7 +471,7 @@ void update_query_with_predicate(Query &query, const Predicate &pred, Arguments
update_query_with_predicate(query, sub, arguments, schema, type);
}
if (!pred.cpnd.sub_predicates.size()) {
query.and_query(new TrueExpression);
query.and_query(std::unique_ptr<realm::Expression>(new TrueExpression));
}
query.end_group();
break;
@ -474,7 +483,7 @@ void update_query_with_predicate(Query &query, const Predicate &pred, Arguments
update_query_with_predicate(query, sub, arguments, schema, type);
}
if (!pred.cpnd.sub_predicates.size()) {
query.and_query(new FalseExpression);
query.and_query(std::unique_ptr<realm::Expression>(new FalseExpression));
}
query.end_group();
break;
@ -484,16 +493,15 @@ void update_query_with_predicate(Query &query, const Predicate &pred, Arguments
break;
}
case Predicate::Type::True:
query.and_query(new TrueExpression);
query.and_query(std::unique_ptr<realm::Expression>(new TrueExpression));
break;
case Predicate::Type::False:
query.and_query(new FalseExpression);
query.and_query(std::unique_ptr<realm::Expression>(new FalseExpression));
break;
default:
throw std::runtime_error("Invalid predicate type");
break;
}
}

View File

@ -53,7 +53,7 @@ namespace realm {
bool is_indexed = false;
bool is_nullable = false;
size_t table_column;
size_t table_column = -1;
bool requires_index() const { return is_primary || is_indexed; }
};

View File

@ -18,6 +18,10 @@
#include "results.hpp"
#include "impl/async_query.hpp"
#include "impl/realm_coordinator.hpp"
#include "object_store.hpp"
#include <stdexcept>
using namespace realm;
@ -52,12 +56,21 @@ Results::Results(SharedRealm r, const ObjectSchema &o, Table& table)
{
}
Results::~Results()
{
if (m_background_query) {
m_background_query->unregister();
}
}
void Results::validate_read() const
{
if (m_realm)
m_realm->verify_thread();
if (m_table && !m_table->is_attached())
throw InvalidatedException();
if (m_mode == Mode::TableView && !m_table_view.is_attached())
throw InvalidatedException();
}
void Results::validate_write() const
@ -92,6 +105,11 @@ size_t Results::size()
REALM_UNREACHABLE();
}
StringData Results::get_object_type() const noexcept
{
return get_object_schema().name;
}
RowExpr Results::get(size_t row_ndx)
{
validate_read();
@ -159,9 +177,15 @@ void Results::update_tableview()
m_mode = Mode::TableView;
break;
case Mode::TableView:
if (m_live) {
m_table_view.sync_if_needed();
if (!m_live) {
return;
}
if (!m_background_query && !m_realm->is_in_transaction() && m_realm->can_deliver_notifications()) {
m_background_query = std::make_shared<_impl::AsyncQuery>(*this);
_impl::RealmCoordinator::register_query(m_background_query);
}
m_has_used_table_view = true;
m_table_view.sync_if_needed();
break;
}
}
@ -190,9 +214,6 @@ size_t Results::index_of(size_t row_ndx)
case Mode::Table:
return row_ndx;
case Mode::Query:
if (!m_sort)
return m_query.count(row_ndx, row_ndx + 1) ? m_query.count(0, row_ndx) : not_found;
REALM_FALLTHROUGH;
case Mode::TableView:
update_tableview();
return m_table_view.find_by_source_ndx(row_ndx);
@ -302,8 +323,9 @@ Query Results::get_query() const
switch (m_mode) {
case Mode::Empty:
case Mode::Query:
case Mode::TableView:
return m_query;
case Mode::TableView:
return m_table_view.get_query();
case Mode::Table:
return m_table->where();
}
@ -336,6 +358,36 @@ Results Results::filter(Query&& q) const
return Results(m_realm, get_object_schema(), get_query().and_query(std::move(q)), get_sort());
}
AsyncQueryCancelationToken Results::async(std::function<void (std::exception_ptr)> target)
{
if (m_realm->config().read_only) {
throw InvalidTransactionException("Cannot create asynchronous query for read-only Realms");
}
if (m_realm->is_in_transaction()) {
throw InvalidTransactionException("Cannot create asynchronous query while in a write transaction");
}
if (!m_background_query) {
m_background_query = std::make_shared<_impl::AsyncQuery>(*this);
_impl::RealmCoordinator::register_query(m_background_query);
}
return {m_background_query, m_background_query->add_callback(std::move(target))};
}
void Results::Internal::set_table_view(Results& results, realm::TableView &&tv)
{
// If the previous TableView was never actually used, then stop generating
// new ones until the user actually uses the Results object again
if (results.m_mode == Mode::TableView) {
results.m_wants_background_updates = results.m_has_used_table_view;
}
results.m_table_view = std::move(tv);
results.m_mode = Mode::TableView;
results.m_has_used_table_view = false;
REALM_ASSERT(results.m_table_view.is_in_sync());
}
Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t column, const Table* table)
: std::runtime_error((std::string)"Operation not supported on '" + table->get_column_name(column).data() + "' columns")
, column_index(column)
@ -343,3 +395,35 @@ Results::UnsupportedColumnTypeException::UnsupportedColumnTypeException(size_t c
, column_type(table->get_column_type(column))
{
}
AsyncQueryCancelationToken::AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> query, size_t token)
: m_query(std::move(query)), m_token(token)
{
}
AsyncQueryCancelationToken::~AsyncQueryCancelationToken()
{
// m_query itself (and not just the pointed-to thing) needs to be accessed
// atomically to ensure that there are no data races when the token is
// destroyed after being modified on a different thread.
// This is needed despite the token not being thread-safe in general as
// users find it very surpringing for obj-c objects to care about what
// thread they are deallocated on.
if (auto query = m_query.exchange({})) {
query->remove_callback(m_token);
}
}
AsyncQueryCancelationToken::AsyncQueryCancelationToken(AsyncQueryCancelationToken&& rgt) = default;
AsyncQueryCancelationToken& AsyncQueryCancelationToken::operator=(realm::AsyncQueryCancelationToken&& rgt)
{
if (this != &rgt) {
if (auto query = m_query.exchange({})) {
query->remove_callback(m_token);
}
m_query = std::move(rgt.m_query);
m_token = rgt.m_token;
}
return *this;
}

View File

@ -20,6 +20,7 @@
#define REALM_RESULTS_HPP
#include "shared_realm.hpp"
#include "util/atomic_shared_ptr.hpp"
#include <realm/table_view.hpp>
#include <realm/table.hpp>
@ -29,6 +30,29 @@ namespace realm {
template<typename T> class BasicRowExpr;
using RowExpr = BasicRowExpr<Table>;
class Mixed;
class Results;
class ObjectSchema;
namespace _impl {
class AsyncQuery;
}
// A token which keeps an asynchronous query alive
struct AsyncQueryCancelationToken {
AsyncQueryCancelationToken() = default;
AsyncQueryCancelationToken(std::shared_ptr<_impl::AsyncQuery> query, size_t token);
~AsyncQueryCancelationToken();
AsyncQueryCancelationToken(AsyncQueryCancelationToken&&);
AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken&&);
AsyncQueryCancelationToken(AsyncQueryCancelationToken const&) = delete;
AsyncQueryCancelationToken& operator=(AsyncQueryCancelationToken const&) = delete;
private:
util::AtomicSharedPtr<_impl::AsyncQuery> m_query;
size_t m_token;
};
struct SortOrder {
std::vector<size_t> columnIndices;
@ -48,6 +72,7 @@ public:
Results() = default;
Results(SharedRealm r, const ObjectSchema& o, Table& table);
Results(SharedRealm r, const ObjectSchema& o, Query q, SortOrder s = {});
~Results();
// Results is copyable and moveable
Results(Results const&) = default;
@ -72,7 +97,7 @@ public:
TableView get_tableview();
// Get the object type which will be returned by get()
StringData get_object_type() const noexcept { return get_object_schema().name; }
StringData get_object_type() const noexcept;
// Set whether the TableView should sync if needed before accessing results
void set_live(bool live);
@ -165,6 +190,22 @@ public:
UnsupportedColumnTypeException(size_t column, const Table* table);
};
void update_tableview();
// Create an async query from this Results
// The query will be run on a background thread and delivered to the callback,
// and then rerun after each commit (if needed) and redelivered if it changed
AsyncQueryCancelationToken async(std::function<void (std::exception_ptr)> target);
bool wants_background_updates() const { return m_wants_background_updates; }
// Helper type to let AsyncQuery update the tableview without giving access
// to any other privates or letting anyone else do so
class Internal {
friend class _impl::AsyncQuery;
static void set_table_view(Results& results, TableView&& tv);
};
private:
SharedRealm m_realm;
const ObjectSchema *m_object_schema;
@ -174,17 +215,21 @@ private:
SortOrder m_sort;
bool m_live = true;
std::shared_ptr<_impl::AsyncQuery> m_background_query;
Mode m_mode = Mode::Empty;
bool m_has_used_table_view = false;
bool m_wants_background_updates = true;
void validate_read() const;
void validate_write() const;
void update_tableview();
template<typename Int, typename Float, typename Double, typename DateTime>
util::Optional<Mixed> aggregate(size_t column, bool return_none_for_empty,
Int agg_int, Float agg_float,
Double agg_double, DateTime agg_datetime);
void set_table_view(TableView&& tv);
};
}

View File

@ -19,6 +19,8 @@
#ifndef REALM_SCHEMA_HPP
#define REALM_SCHEMA_HPP
#include "property.hpp"
#include <string>
#include <vector>
#include <string>
@ -32,6 +34,7 @@ private:
public:
// Create a schema from a vector of ObjectSchema
Schema(base types);
Schema(std::initializer_list<ObjectSchema> types) : Schema(base(types)) { }
// find an ObjectSchema by name
iterator find(std::string const& name);

View File

@ -18,13 +18,12 @@
#include "shared_realm.hpp"
#if __APPLE__
#include "external_commit_helper.hpp"
#endif
#include "binding_context.hpp"
#include "schema.hpp"
#include "impl/external_commit_helper.hpp"
#include "impl/realm_coordinator.hpp"
#include "impl/transact_log_handler.hpp"
#include "object_store.hpp"
#include "schema.hpp"
#include <realm/commit_log.hpp>
#include <realm/group_shared.hpp>
@ -34,24 +33,23 @@
using namespace realm;
using namespace realm::_impl;
RealmCache Realm::s_global_cache;
Realm::Config::Config(const Config& c)
: path(c.path)
, encryption_key(c.encryption_key)
, schema_version(c.schema_version)
, migration_function(c.migration_function)
, read_only(c.read_only)
, in_memory(c.in_memory)
, cache(c.cache)
, disable_format_upgrade(c.disable_format_upgrade)
, encryption_key(c.encryption_key)
, schema_version(c.schema_version)
, migration_function(c.migration_function)
, automatic_change_notifications(c.automatic_change_notifications)
{
if (c.schema) {
schema = std::make_unique<Schema>(*c.schema);
}
}
Realm::Config::Config() = default;
Realm::Config::Config() : schema_version(ObjectStore::NotVersioned) { }
Realm::Config::Config(Config&&) = default;
Realm::Config::~Config() = default;
@ -65,26 +63,37 @@ Realm::Config& Realm::Config::operator=(realm::Realm::Config const& c)
Realm::Realm(Config config)
: m_config(std::move(config))
{
open_with_config(m_config, m_history, m_shared_group, m_read_only_group);
if (m_read_only_group) {
m_group = m_read_only_group.get();
}
}
void Realm::open_with_config(const Config& config,
std::unique_ptr<ClientHistory>& history,
std::unique_ptr<SharedGroup>& shared_group,
std::unique_ptr<Group>& read_only_group)
{
try {
if (m_config.read_only) {
m_read_only_group = std::make_unique<Group>(m_config.path, m_config.encryption_key.data(), Group::mode_ReadOnly);
m_group = m_read_only_group.get();
if (config.read_only) {
read_only_group = std::make_unique<Group>(config.path, config.encryption_key.data(), Group::mode_ReadOnly);
}
else {
if (m_config.encryption_key.data() && m_config.encryption_key.size() != 64) {
if (config.encryption_key.data() && config.encryption_key.size() != 64) {
throw InvalidEncryptionKeyException();
}
m_history = realm::make_client_history(m_config.path, m_config.encryption_key.data());
SharedGroup::DurabilityLevel durability = m_config.in_memory ? SharedGroup::durability_MemOnly :
history = realm::make_client_history(config.path, config.encryption_key.data());
SharedGroup::DurabilityLevel durability = config.in_memory ? SharedGroup::durability_MemOnly :
SharedGroup::durability_Full;
m_shared_group = std::make_unique<SharedGroup>(*m_history, durability, m_config.encryption_key.data(), !m_config.disable_format_upgrade);
shared_group = std::make_unique<SharedGroup>(*history, durability, config.encryption_key.data(), !config.disable_format_upgrade);
}
}
catch (util::File::PermissionDenied const& ex) {
throw RealmFileException(RealmFileException::Kind::PermissionDenied, ex.get_path(),
"Unable to open a realm at path '" + ex.get_path() +
"'. Please use a path where your app has " + (m_config.read_only ? "read" : "read-write") + " permissions.");
"'. Please use a path where your app has " + (config.read_only ? "read" : "read-write") + " permissions.");
}
catch (util::File::Exists const& ex) {
throw RealmFileException(RealmFileException::Kind::Exists, ex.get_path(),
@ -99,23 +108,70 @@ Realm::Realm(Config config)
"Unable to open a realm at path '" + ex.get_path() + "'");
}
catch (IncompatibleLockFile const& ex) {
throw RealmFileException(RealmFileException::Kind::IncompatibleLockFile, m_config.path,
throw RealmFileException(RealmFileException::Kind::IncompatibleLockFile, config.path,
"Realm file is currently open in another process "
"which cannot share access with this process. All processes sharing a single file must be the same architecture.");
}
catch (FileFormatUpgradeRequired const& ex) {
throw RealmFileException(RealmFileException::Kind::FormatUpgradeRequired, m_config.path,
throw RealmFileException(RealmFileException::Kind::FormatUpgradeRequired, config.path,
"The Realm file format must be allowed to be upgraded "
"in order to proceed.");
}
}
Realm::~Realm() {
#if __APPLE__
if (m_notifier) { // might not exist yet if an error occurred during init
m_notifier->remove_realm(this);
void Realm::init(std::shared_ptr<RealmCoordinator> coordinator)
{
m_coordinator = std::move(coordinator);
// if there is an existing realm at the current path steal its schema/column mapping
if (auto existing = m_coordinator->get_schema()) {
m_config.schema = std::make_unique<Schema>(*existing);
return;
}
try {
// otherwise get the schema from the group
auto target_schema = std::move(m_config.schema);
auto target_schema_version = m_config.schema_version;
m_config.schema_version = ObjectStore::get_schema_version(read_group());
m_config.schema = std::make_unique<Schema>(ObjectStore::schema_from_group(read_group()));
// if a target schema is supplied, verify that it matches or migrate to
// it, as neeeded
if (target_schema) {
if (m_config.read_only) {
if (m_config.schema_version == ObjectStore::NotVersioned) {
throw UnitializedRealmException("Can't open an un-initialized Realm without a Schema");
}
target_schema->validate();
ObjectStore::verify_schema(*m_config.schema, *target_schema, true);
m_config.schema = std::move(target_schema);
}
else {
update_schema(std::move(target_schema), target_schema_version);
}
if (!m_config.read_only) {
// End the read transaction created to validation/update the
// schema to avoid pinning the version even if the user never
// actually reads data
invalidate();
}
}
}
catch (...) {
// Trying to unregister from the coordinator before we finish
// construction will result in a deadlock
m_coordinator = nullptr;
throw;
}
}
Realm::~Realm()
{
if (m_coordinator) {
m_coordinator->unregister_realm(this);
}
#endif
}
Group *Realm::read_group()
@ -128,134 +184,87 @@ Group *Realm::read_group()
SharedRealm Realm::get_shared_realm(Config config)
{
if (config.cache) {
if (SharedRealm realm = s_global_cache.get_realm(config.path)) {
if (realm->config().read_only != config.read_only) {
throw MismatchedConfigException("Realm at path already opened with different read permissions.");
}
if (realm->config().in_memory != config.in_memory) {
throw MismatchedConfigException("Realm at path already opened with different inMemory settings.");
}
if (realm->config().encryption_key != config.encryption_key) {
throw MismatchedConfigException("Realm at path already opened with a different encryption key.");
}
if (realm->config().schema_version != config.schema_version && config.schema_version != ObjectStore::NotVersioned) {
throw MismatchedConfigException("Realm at path already opened with different schema version.");
}
// FIXME - enable schma comparison
/*if (realm->config().schema != config.schema) {
throw MismatchedConfigException("Realm at path already opened with different schema");
}*/
realm->m_config.migration_function = config.migration_function;
return realm;
}
}
SharedRealm realm(new Realm(std::move(config)));
auto target_schema = std::move(realm->m_config.schema);
auto target_schema_version = realm->m_config.schema_version;
realm->m_config.schema_version = ObjectStore::get_schema_version(realm->read_group());
// we want to ensure we are only initializing a single realm at a time
static std::mutex s_init_mutex;
std::lock_guard<std::mutex> lock(s_init_mutex);
if (auto existing = s_global_cache.get_any_realm(realm->config().path)) {
// if there is an existing realm at the current path steal its schema/column mapping
// FIXME - need to validate that schemas match
realm->m_config.schema = std::make_unique<Schema>(*existing->m_config.schema);
#if __APPLE__
realm->m_notifier = existing->m_notifier;
realm->m_notifier->add_realm(realm.get());
#endif
}
else {
#if __APPLE__
realm->m_notifier = std::make_shared<ExternalCommitHelper>(realm.get());
#endif
// otherwise get the schema from the group
realm->m_config.schema = std::make_unique<Schema>(ObjectStore::schema_from_group(realm->read_group()));
// if a target schema is supplied, verify that it matches or migrate to
// it, as neeeded
if (target_schema) {
if (realm->m_config.read_only) {
if (realm->m_config.schema_version == ObjectStore::NotVersioned) {
throw UnitializedRealmException("Can't open an un-initialized Realm without a Schema");
}
target_schema->validate();
ObjectStore::verify_schema(*realm->m_config.schema, *target_schema, true);
realm->m_config.schema = std::move(target_schema);
}
else {
realm->update_schema(std::move(target_schema), target_schema_version);
}
if (!realm->m_config.read_only) {
// End the read transaction created to validation/update the
// schema to avoid pinning the version even if the user never
// actually reads data
realm->invalidate();
}
}
}
if (config.cache) {
s_global_cache.cache_realm(realm, realm->m_thread_id);
}
return realm;
return RealmCoordinator::get_coordinator(config.path)->get_realm(std::move(config));
}
bool Realm::update_schema(std::unique_ptr<Schema> schema, uint64_t version)
void Realm::update_schema(std::unique_ptr<Schema> schema, uint64_t version)
{
schema->validate();
bool needs_update = !m_config.read_only && (m_config.schema_version != version || ObjectStore::needs_update(*m_config.schema, *schema));
if (!needs_update) {
auto needs_update = [&] {
// If the schema version matches, just verify that the schema itself also matches
bool needs_write = !m_config.read_only && (m_config.schema_version != version || ObjectStore::needs_update(*m_config.schema, *schema));
if (needs_write) {
return true;
}
ObjectStore::verify_schema(*m_config.schema, *schema, m_config.read_only);
m_config.schema = std::move(schema);
m_config.schema_version = version;
m_coordinator->update_schema(*m_config.schema);
return false;
};
if (!needs_update()) {
return;
}
read_group();
transaction::begin(*m_shared_group, *m_history, m_binding_context.get(),
/* error on schema changes */ false);
m_in_transaction = true;
struct WriteTransactionGuard {
Realm& realm;
~WriteTransactionGuard() {
if (realm.is_in_transaction()) {
realm.cancel_transaction();
}
}
} write_transaction_guard{*this};
// Recheck the schema version after beginning the write transaction
// If it changed then someone else initialized the schema and we need to
// recheck everything
auto current_schema_version = ObjectStore::get_schema_version(read_group());
if (current_schema_version != m_config.schema_version) {
m_config.schema_version = current_schema_version;
*m_config.schema = ObjectStore::schema_from_group(read_group());
if (!needs_update()) {
cancel_transaction();
return;
}
}
// Store the old config/schema for the migration function, and update
// our schema to the new one
auto old_schema = std::move(m_config.schema);
Config old_config(m_config);
old_config.read_only = true;
old_config.schema = std::move(old_schema);
m_config.schema = std::move(schema);
m_config.schema_version = version;
auto migration_function = [&](Group*, Schema&) {
SharedRealm old_realm(new Realm(old_config));
auto updated_realm = shared_from_this();
// Need to open in read-write mode so that it uses a SharedGroup, but
// users shouldn't actually be able to write via the old realm
old_realm->m_config.read_only = true;
if (m_config.migration_function) {
m_config.migration_function(old_realm, updated_realm);
m_config.migration_function(old_realm, shared_from_this());
}
};
try {
// update and migrate
begin_transaction();
bool changed = ObjectStore::update_realm_with_schema(read_group(), *old_config.schema,
version, *m_config.schema,
migration_function);
m_config.schema = std::move(schema);
m_config.schema_version = version;
ObjectStore::update_realm_with_schema(read_group(), *old_config.schema,
version, *m_config.schema,
migration_function);
commit_transaction();
return changed;
}
catch (...) {
if (is_in_transaction()) {
cancel_transaction();
}
m_config.schema_version = old_config.schema_version;
m_config.schema = std::move(old_config.schema);
m_config.schema_version = old_config.schema_version;
throw;
}
m_coordinator->update_schema(*m_config.schema);
}
static void check_read_write(Realm *realm)
@ -306,9 +315,7 @@ void Realm::commit_transaction()
m_in_transaction = false;
transaction::commit(*m_shared_group, *m_history, m_binding_context.get());
#if __APPLE__
m_notifier->notify_others();
#endif
m_coordinator->send_commit_notifications();
}
void Realm::cancel_transaction()
@ -369,16 +376,18 @@ void Realm::notify()
}
if (m_auto_refresh) {
if (m_group) {
transaction::advance(*m_shared_group, *m_history, m_binding_context.get());
m_coordinator->advance_to_ready(*this);
}
else if (m_binding_context) {
m_binding_context->did_change({}, {});
}
}
}
else {
m_coordinator->process_available_async(*this);
}
}
bool Realm::refresh()
{
verify_thread();
@ -396,6 +405,7 @@ bool Realm::refresh()
if (m_group) {
transaction::advance(*m_shared_group, *m_history, m_binding_context.get());
m_coordinator->process_available_async(*this);
}
else {
// Create the read transaction
@ -405,10 +415,24 @@ bool Realm::refresh()
return true;
}
bool Realm::can_deliver_notifications() const noexcept
{
if (m_config.read_only) {
return false;
}
if (m_binding_context && !m_binding_context->can_deliver_notifications()) {
return false;
}
return true;
}
uint64_t Realm::get_schema_version(const realm::Realm::Config &config)
{
if (auto existing_realm = s_global_cache.get_any_realm(config.path)) {
return existing_realm->config().schema_version;
auto coordinator = RealmCoordinator::get_existing_coordinator(config.path);
if (coordinator) {
return coordinator->get_schema_version();
}
return ObjectStore::get_schema_version(Realm(config).read_group());
@ -416,99 +440,16 @@ uint64_t Realm::get_schema_version(const realm::Realm::Config &config)
void Realm::close()
{
#if __APPLE__
if (m_notifier) {
m_notifier->remove_realm(this);
invalidate();
if (m_coordinator) {
m_coordinator->unregister_realm(this);
}
m_notifier = nullptr;
#endif
m_group = nullptr;
m_shared_group = nullptr;
m_history = nullptr;
m_read_only_group = nullptr;
m_binding_context = nullptr;
}
SharedRealm RealmCache::get_realm(const std::string &path, std::thread::id thread_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(path);
if (path_iter == m_cache.end()) {
return SharedRealm();
}
auto thread_iter = path_iter->second.find(thread_id);
if (thread_iter == path_iter->second.end()) {
return SharedRealm();
}
return thread_iter->second.lock();
}
SharedRealm RealmCache::get_any_realm(const std::string &path)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(path);
if (path_iter == m_cache.end()) {
return SharedRealm();
}
auto thread_iter = path_iter->second.begin();
while (thread_iter != path_iter->second.end()) {
if (auto realm = thread_iter->second.lock()) {
return realm;
}
path_iter->second.erase(thread_iter++);
}
return SharedRealm();
}
void RealmCache::remove(const std::string &path, std::thread::id thread_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(path);
if (path_iter == m_cache.end()) {
return;
}
auto thread_iter = path_iter->second.find(thread_id);
if (thread_iter != path_iter->second.end()) {
path_iter->second.erase(thread_iter);
}
if (path_iter->second.size() == 0) {
m_cache.erase(path_iter);
}
}
void RealmCache::cache_realm(SharedRealm &realm, std::thread::id thread_id)
{
std::lock_guard<std::mutex> lock(m_mutex);
auto path_iter = m_cache.find(realm->config().path);
if (path_iter == m_cache.end()) {
m_cache.emplace(realm->config().path, std::map<std::thread::id, WeakRealm>{{thread_id, realm}});
}
else {
path_iter->second.emplace(thread_id, realm);
}
}
void RealmCache::clear()
{
std::lock_guard<std::mutex> lock(m_mutex);
for (auto const& path : m_cache) {
for (auto const& thread : path.second) {
if (auto realm = thread.second.lock()) {
realm->close();
}
}
}
m_cache.clear();
m_coordinator = nullptr;
}

View File

@ -19,45 +19,69 @@
#ifndef REALM_REALM_HPP
#define REALM_REALM_HPP
#include "object_store.hpp"
#include <realm/handover_defs.hpp>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <mutex>
namespace realm {
class ClientHistory;
class Realm;
class RealmCache;
class BindingContext;
class ClientHistory;
class Group;
class Realm;
class RealmDelegate;
class Schema;
class SharedGroup;
typedef std::shared_ptr<Realm> SharedRealm;
typedef std::weak_ptr<Realm> WeakRealm;
namespace _impl {
class ExternalCommitHelper;
class AsyncQuery;
class RealmCoordinator;
}
class Realm : public std::enable_shared_from_this<Realm>
{
class Realm : public std::enable_shared_from_this<Realm> {
public:
typedef std::function<void(SharedRealm old_realm, SharedRealm realm)> MigrationFunction;
struct Config
{
struct Config {
std::string path;
bool read_only = false;
bool in_memory = false;
bool cache = true;
bool disable_format_upgrade = false;
// User-supplied encryption key. Must be either empty or 64 bytes.
std::vector<char> encryption_key;
// Optional schema for the file. If nullptr, the existing schema
// from the file opened will be used. If present, the file will be
// migrated to the schema if needed.
std::unique_ptr<Schema> schema;
uint64_t schema_version = ObjectStore::NotVersioned;
uint64_t schema_version;
MigrationFunction migration_function;
bool read_only = false;
bool in_memory = false;
// The following are intended for internal/testing purposes and
// should not be publically exposed in binding APIs
// If false, always return a new Realm instance, and don't return
// that Realm instance for other requests for a cached Realm. Useful
// for dynamic Realms and for tests that need multiple instances on
// one thread
bool cache = true;
// Throw an exception rather than automatically upgrading the file
// format. Used by the browser to warn the user that it'll modify
// the file.
bool disable_format_upgrade = false;
// Disable the background worker thread for producing change
// notifications. Useful for tests for those notifications so that
// everything can be done deterministically on one thread, and
// speeds up tests that don't need notifications.
bool automatic_change_notifications = true;
Config();
Config(Config&&);
Config(const Config& c);
@ -80,8 +104,7 @@ namespace realm {
// updating indexes as necessary. Uses the existing migration function
// on the Config, and the resulting Schema and version with updated
// column mappings are set on the realms config upon success.
// returns if any changes were made
bool update_schema(std::unique_ptr<Schema> schema, uint64_t version);
void update_schema(std::unique_ptr<Schema> schema, uint64_t version);
static uint64_t get_schema_version(Config const& config);
@ -105,15 +128,40 @@ namespace realm {
void verify_thread() const;
void verify_in_write() const;
bool can_deliver_notifications() const noexcept;
// Close this Realm and remove it from the cache. Continuing to use a
// Realm after closing it will produce undefined behavior.
void close();
~Realm();
private:
void init(std::shared_ptr<_impl::RealmCoordinator> coordinator);
Realm(Config config);
// Expose some internal functionality to other parts of the ObjectStore
// without making it public to everyone
class Internal {
friend class _impl::AsyncQuery;
friend class _impl::RealmCoordinator;
// AsyncQuery needs access to the SharedGroup to be able to call the
// handover functions, which are not very wrappable
static SharedGroup& get_shared_group(Realm& realm) { return *realm.m_shared_group; }
static ClientHistory& get_history(Realm& realm) { return *realm.m_history; }
// AsyncQuery needs to be able to access the owning coordinator to
// wake up the worker thread when a callback is added, and
// coordinators need to be able to get themselves from a Realm
static _impl::RealmCoordinator& get_coordinator(Realm& realm) { return *realm.m_coordinator; }
};
static void open_with_config(const Config& config,
std::unique_ptr<ClientHistory>& history,
std::unique_ptr<SharedGroup>& shared_group,
std::unique_ptr<Group>& read_only_group);
private:
Config m_config;
std::thread::id m_thread_id = std::this_thread::get_id();
bool m_in_transaction = false;
@ -125,30 +173,13 @@ namespace realm {
Group *m_group = nullptr;
#if __APPLE__
std::shared_ptr<_impl::ExternalCommitHelper> m_notifier;
#endif
std::shared_ptr<_impl::RealmCoordinator> m_coordinator;
public:
std::unique_ptr<BindingContext> m_binding_context;
// FIXME private
Group *read_group();
static RealmCache s_global_cache;
};
class RealmCache
{
public:
SharedRealm get_realm(const std::string &path, std::thread::id thread_id = std::this_thread::get_id());
SharedRealm get_any_realm(const std::string &path);
void remove(const std::string &path, std::thread::id thread_id);
void cache_realm(SharedRealm &realm, std::thread::id thread_id = std::this_thread::get_id());
void clear();
private:
std::map<std::string, std::map<std::thread::id, WeakRealm>> m_cache;
std::mutex m_mutex;
};
class RealmFileException : public std::runtime_error {

View File

@ -0,0 +1,137 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#ifndef REALM_ATOMIC_SHARED_PTR_HPP
#define REALM_ATOMIC_SHARED_PTR_HPP
#include <atomic>
#include <memory>
#include <mutex>
namespace realm {
namespace _impl {
// Check if std::atomic_load has an overload taking a std::shared_ptr, and set
// HasAtomicPtrOps to either true_type or false_type
template<typename... Ts> struct make_void { typedef void type; };
template<typename... Ts> using void_t = typename make_void<Ts...>::type;
template<typename, typename = void_t<>>
struct HasAtomicPtrOps : std::false_type { };
template<class T>
struct HasAtomicPtrOps<T, void_t<decltype(std::atomic_load(std::declval<T*>()))>> : std::true_type { };
} // namespace _impl
namespace util {
// A wrapper for std::shared_ptr that enables sharing a shared_ptr instance
// (and not just a thing *pointed to* by a shared_ptr) between threads. Is
// lock-free iff the underlying shared_ptr implementation supports atomic
// operations. Currently the only implemented operation other than copy/move
// construction/assignment is exchange().
template<typename T, bool = _impl::HasAtomicPtrOps<std::shared_ptr<T>>::value>
class AtomicSharedPtr;
template<typename T>
class AtomicSharedPtr<T, true> {
public:
AtomicSharedPtr() = default;
AtomicSharedPtr(std::shared_ptr<T> ptr) : m_ptr(std::move(ptr)) { }
AtomicSharedPtr(AtomicSharedPtr const& ptr) : m_ptr(std::atomic_load(&ptr.m_ptr)) { }
AtomicSharedPtr(AtomicSharedPtr&& ptr) : m_ptr(std::atomic_exchange(&ptr.m_ptr, {})) { }
AtomicSharedPtr& operator=(AtomicSharedPtr const& ptr)
{
if (&ptr != this) {
std::atomic_store(&m_ptr, std::atomic_load(&ptr.m_ptr));
}
return *this;
}
AtomicSharedPtr& operator=(AtomicSharedPtr&& ptr)
{
std::atomic_store(&m_ptr, std::atomic_exchange(&ptr.m_ptr, {}));
return *this;
}
std::shared_ptr<T> exchange(std::shared_ptr<T> ptr)
{
return std::atomic_exchange(&m_ptr, std::move(ptr));
}
private:
std::shared_ptr<T> m_ptr = nullptr;
};
template<typename T>
class AtomicSharedPtr<T, false> {
public:
AtomicSharedPtr() = default;
AtomicSharedPtr(std::shared_ptr<T> ptr) : m_ptr(std::move(ptr)) { }
AtomicSharedPtr(AtomicSharedPtr const& ptr)
{
std::lock_guard<std::mutex> lock(ptr.m_mutex);
m_ptr = ptr.m_ptr;
}
AtomicSharedPtr(AtomicSharedPtr&& ptr)
{
std::lock_guard<std::mutex> lock(ptr.m_mutex);
m_ptr = std::move(ptr.m_ptr);
}
AtomicSharedPtr& operator=(AtomicSharedPtr const& ptr)
{
if (&ptr != this) {
// std::lock() ensures that these are locked in a consistent order
// to avoid deadlock
std::lock(m_mutex, ptr.m_mutex);
m_ptr = ptr.m_ptr;
m_mutex.unlock();
ptr.m_mutex.unlock();
}
return *this;
}
AtomicSharedPtr& operator=(AtomicSharedPtr&& ptr)
{
std::lock(m_mutex, ptr.m_mutex);
m_ptr = std::move(ptr.m_ptr);
m_mutex.unlock();
ptr.m_mutex.unlock();
return *this;
}
std::shared_ptr<T> exchange(std::shared_ptr<T> ptr)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_ptr.swap(ptr);
return ptr;
}
private:
std::mutex m_mutex;
std::shared_ptr<T> m_ptr = nullptr;
};
}
}
#endif // REALM_ASYNC_QUERY_HPP

18
tests/CMakeLists.txt Normal file
View File

@ -0,0 +1,18 @@
include_directories(../external/catch/single_include .)
set(HEADERS
util/test_file.hpp
)
set(SOURCES
index_set.cpp
main.cpp
parser.cpp
results.cpp
util/test_file.cpp
)
add_executable(tests ${SOURCES} ${HEADERS})
target_link_libraries(tests realm-object-store)
add_custom_target(run-tests USES_TERMINAL DEPENDS tests COMMAND ./tests)

153
tests/index_set.cpp Normal file
View File

@ -0,0 +1,153 @@
#include "index_set.hpp"
#include <string>
// Catch doesn't have an overload for std::pair, so define one ourselves
// The declaration needs to be before catch.hpp is included for it to be used,
// but the definition needs to be after since it uses Catch's toString()
namespace Catch {
template<typename T, typename U>
std::string toString(std::pair<T, U> const& value);
}
#include "catch.hpp"
namespace Catch {
template<typename T, typename U>
std::string toString(std::pair<T, U> const& value) {
return "{" + toString(value.first) + ", " + toString(value.second) + "}";
}
}
#define REQUIRE_RANGES(index_set, ...) do { \
std::initializer_list<std::pair<size_t, size_t>> expected = {__VA_ARGS__}; \
REQUIRE(index_set.size() == expected.size()); \
auto begin = index_set.begin(), end = index_set.end(); \
for (auto range : expected) { \
REQUIRE(*begin++ == range); \
} \
} while (0)
TEST_CASE("index set") {
realm::IndexSet set;
SECTION("add() extends existing ranges") {
set.add(1);
REQUIRE_RANGES(set, {1, 2});
set.add(2);
REQUIRE_RANGES(set, {1, 3});
set.add(0);
REQUIRE_RANGES(set, {0, 3});
}
SECTION("add() with gaps") {
set.add(0);
REQUIRE_RANGES(set, {0, 1});
set.add(2);
REQUIRE_RANGES(set, {0, 1}, {2, 3});
}
SECTION("add() is idempotent") {
set.add(0);
set.add(0);
REQUIRE_RANGES(set, {0, 1});
}
SECTION("add() merges existing ranges") {
set.add(0);
set.add(2);
set.add(4);
set.add(1);
REQUIRE_RANGES(set, {0, 3}, {4, 5});
}
SECTION("set() from empty") {
set.set(5);
REQUIRE_RANGES(set, {0, 5});
}
SECTION("set() discards existing data") {
set.add(8);
set.add(9);
set.set(5);
REQUIRE_RANGES(set, {0, 5});
}
SECTION("insert_at() extends ranges containing the target index") {
set.add(5);
set.add(6);
set.insert_at(5);
REQUIRE_RANGES(set, {5, 8});
set.insert_at(4);
REQUIRE_RANGES(set, {4, 5}, {6, 9});
set.insert_at(9);
REQUIRE_RANGES(set, {4, 5}, {6, 10});
}
SECTION("insert_at() does not modify ranges entirely before it") {
set.add(5);
set.add(6);
set.insert_at(8);
REQUIRE_RANGES(set, {5, 7}, {8, 9});
}
SECTION("insert_at() shifts ranges after it") {
set.add(5);
set.add(6);
set.insert_at(3);
REQUIRE_RANGES(set, {3, 4}, {6, 8});
}
SECTION("insert_at() cannot join ranges") {
set.add(5);
set.add(7);
set.insert_at(6);
REQUIRE_RANGES(set, {5, 7}, {8, 9});
}
SECTION("add_shifted() on an empty set is just add()") {
set.add_shifted(5);
REQUIRE_RANGES(set, {5, 6});
}
SECTION("add_shifted() before the first range is just add()") {
set.add(10);
set.add_shifted(5);
REQUIRE_RANGES(set, {5, 6}, {10, 11});
}
SECTION("add_shifted() on first index of range extends range") {
set.add(5);
set.add_shifted(5);
REQUIRE_RANGES(set, {5, 7});
set.add_shifted(5);
REQUIRE_RANGES(set, {5, 8});
set.add_shifted(6);
REQUIRE_RANGES(set, {5, 8}, {9, 10});
}
SECTION("add_shifted() after ranges shifts by the size of those ranges") {
set.add(5);
set.add_shifted(6);
REQUIRE_RANGES(set, {5, 6}, {7, 8});
set.add_shifted(6); // bumped into second range
REQUIRE_RANGES(set, {5, 6}, {7, 9});
set.add_shifted(8);
REQUIRE_RANGES(set, {5, 6}, {7, 9}, {11, 12});
}
}

2
tests/main.cpp Normal file
View File

@ -0,0 +1,2 @@
#define CATCH_CONFIG_MAIN
#include "catch.hpp"

View File

@ -1,10 +1,8 @@
#include "parser.hpp"
#include "catch.hpp"
#include "parser/parser.hpp"
#include <vector>
#include <string>
#include <exception>
#include <iostream>
static std::vector<std::string> valid_queries = {
// true/false predicates
@ -51,6 +49,13 @@ static std::vector<std::string> valid_queries = {
"0 contains 0",
"0 BeGiNsWiTh 0",
"0 ENDSWITH 0",
"contains contains 'contains'",
"beginswith beginswith 'beginswith'",
"endswith endswith 'endswith'",
"NOT NOT != 'NOT'",
"AND == 'AND' AND OR == 'OR'",
// FIXME - bug
// "truepredicate == 'falsepredicate' && truepredicate",
// atoms/groups
"(0=0)",
@ -131,36 +136,16 @@ static std::vector<std::string> invalid_queries = {
"truepredicate & truepredicate",
};
namespace realm {
namespace parser {
bool test_grammar()
{
bool success = true;
for (auto &query : valid_queries) {
std::cout << "valid query: " << query << std::endl;
try {
realm::parser::parse(query);
} catch (std::exception &ex) {
std::cout << "FAILURE - " << ex.what() << std::endl;
success = false;
}
TEST_CASE("valid queries") {
for (auto& query : valid_queries) {
INFO("query: " << query);
CHECK_NOTHROW(realm::parser::parse(query));
}
}
for (auto &query : invalid_queries) {
std::cout << "invalid query: " << query << std::endl;
try {
realm::parser::parse(query);
} catch (std::exception &ex) {
// std::cout << "message: " << ex.what() << std::endl;
continue;
}
std::cout << "FAILURE - query should throw an exception" << std::endl;
success = false;
TEST_CASE("invalid queries") {
for (auto& query : invalid_queries) {
INFO("query: " << query);
CHECK_THROWS(realm::parser::parse(query));
}
return success;
}
}
}

146
tests/results.cpp Normal file
View File

@ -0,0 +1,146 @@
#include "catch.hpp"
#include "util/test_file.hpp"
#include "impl/realm_coordinator.hpp"
#include "object_schema.hpp"
#include "property.hpp"
#include "results.hpp"
#include "schema.hpp"
#include <realm/commit_log.hpp>
#include <realm/group_shared.hpp>
#include <realm/link_view.hpp>
using namespace realm;
TEST_CASE("Results") {
InMemoryTestFile config;
config.cache = false;
config.automatic_change_notifications = false;
config.schema = std::make_unique<Schema>(Schema{
{"object", "", {
{"value", PropertyTypeInt},
{"link", PropertyTypeObject, "linked to object", false, false, true}
}},
{"other object", "", {
{"value", PropertyTypeInt}
}},
{"linking object", "", {
{"link", PropertyTypeObject, "object", false, false, true}
}},
{"linked to object", "", {
{"value", PropertyTypeInt}
}}
});
auto r = Realm::get_shared_realm(config);
auto coordinator = _impl::RealmCoordinator::get_existing_coordinator(config.path);
auto table = r->read_group()->get_table("class_object");
r->begin_transaction();
table->add_empty_row(10);
for (int i = 0; i < 10; ++i)
table->set_int(0, i, i);
r->commit_transaction();
Results results(r, *config.schema->find("object"), table->where().greater(0, 0).less(0, 5));
SECTION("notifications") {
int notification_calls = 0;
auto token = results.async([&](std::exception_ptr err) {
REQUIRE_FALSE(err);
++notification_calls;
});
coordinator->on_change();
r->notify();
SECTION("initial results are delivered") {
REQUIRE(notification_calls == 1);
}
SECTION("modifying the table sends a notification asynchronously") {
r->begin_transaction();
table->set_int(0, 0, 0);
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 2);
}
SECTION("modifying a linked-to table send a notification") {
r->begin_transaction();
r->read_group()->get_table("class_linked to object")->add_empty_row();
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 2);
}
SECTION("modifying a a linking table sends a notification") {
r->begin_transaction();
r->read_group()->get_table("class_linking object")->add_empty_row();
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 2);
}
SECTION("modifying a an unrelated table does not send a notification") {
r->begin_transaction();
r->read_group()->get_table("class_other object")->add_empty_row();
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 1);
}
SECTION("modifications from multiple transactions are collapsed") {
r->begin_transaction();
table->set_int(0, 0, 0);
r->commit_transaction();
r->begin_transaction();
table->set_int(0, 1, 0);
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 2);
}
SECTION("notifications are not delivered when the token is destroyed before they are calculated") {
r->begin_transaction();
table->set_int(0, 0, 0);
r->commit_transaction();
REQUIRE(notification_calls == 1);
token = {};
coordinator->on_change();
r->notify();
REQUIRE(notification_calls == 1);
}
SECTION("notifications are not delivered when the token is destroyed before they are delivered") {
r->begin_transaction();
table->set_int(0, 0, 0);
r->commit_transaction();
REQUIRE(notification_calls == 1);
coordinator->on_change();
token = {};
r->notify();
REQUIRE(notification_calls == 1);
}
}
}

31
tests/util/test_file.cpp Normal file
View File

@ -0,0 +1,31 @@
#include "util/test_file.hpp"
#include <realm/disable_sync_to_disk.hpp>
#include <cstdlib>
#include <unistd.h>
TestFile::TestFile()
{
static std::string tmpdir = [] {
realm::disable_sync_to_disk();
const char* dir = getenv("TMPDIR");
if (dir && *dir)
return dir;
return "/tmp";
}();
path = tmpdir + "/realm.XXXXXX";
mktemp(&path[0]);
unlink(path.c_str());
}
TestFile::~TestFile()
{
unlink(path.c_str());
}
InMemoryTestFile::InMemoryTestFile()
{
in_memory = true;
}

15
tests/util/test_file.hpp Normal file
View File

@ -0,0 +1,15 @@
#ifndef REALM_TEST_UTIL_TEST_FILE_HPP
#define REALM_TEST_UTIL_TEST_FILE_HPP
#include "shared_realm.hpp"
struct TestFile : realm::Realm::Config {
TestFile();
~TestFile();
};
struct InMemoryTestFile : TestFile {
InMemoryTestFile();
};
#endif