Merge pull request #469 from realm/sk-object-store-merge

Merge latest object store (includes Android async stuff)
This commit is contained in:
Scott Kyle 2016-06-13 14:38:28 -07:00 committed by GitHub
commit 6aa9825cd1
13 changed files with 517 additions and 9 deletions

View File

@ -37,7 +37,8 @@ LOCAL_SRC_FILES := \
src/object-store/src/impl/realm_coordinator.cpp \
src/object-store/src/impl/results_notifier.cpp \
src/object-store/src/impl/transact_log_handler.cpp \
src/object-store/src/impl/generic/external_commit_helper.cpp \
src/object-store/src/impl/android/external_commit_helper.cpp \
src/object-store/src/impl/android/weak_realm_notifier.cpp \
vendor/base64.cpp
LOCAL_C_INCLUDES := src

View File

@ -9,5 +9,6 @@ cmake_install.cmake
rules.ninja
# Build products
src/librealm-object-store.*
src/librealm-object-store.dylib
src/librealm-object-store-static.a
tests/tests

View File

@ -13,7 +13,7 @@ include(RealmCore)
set(REALM_CORE_VERSION "0.100.1" CACHE STRING "")
use_realm_core(${REALM_CORE_VERSION})
include_directories(${REALM_CORE_INCLUDE_DIR} src external/pegtl)
set(PEGTL_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/external/pegtl)
add_subdirectory(src)
add_subdirectory(tests)

View File

@ -11,18 +11,23 @@ Cross-platform code used accross bindings. Binding developers can choose to use
The object store's build system currently only suports building for OS X. The object store itself can build for all Apple
platforms when integrated into a binding.
1. Install CMake. You can download an installer for OS X from the [CMake download page](https://cmake.org/download/), or install via [Homebrew](http://brew.sh):
1. Download PEGTL dependency
```
git submodule update --init
```
2. Install CMake. You can download an installer for OS X from the [CMake download page](https://cmake.org/download/), or install via [Homebrew](http://brew.sh):
```
brew install cmake
```
2. Generate build files:
3. Generate build files:
```
cmake .
```
3. Build:
4. Build:
```
make

View File

@ -55,5 +55,20 @@ else()
impl/generic/external_commit_helper.hpp)
endif()
add_library(realm-object-store SHARED ${SOURCES} ${HEADERS})
target_link_libraries(realm-object-store realm ${CF_LIBRARY})
set(INCLUDE_DIRS ${REALM_CORE_INCLUDE_DIR} ${PEGTL_INCLUDE_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
# An object library to group together the compilation of the source files.
add_library(realm-object-store-objects OBJECT ${SOURCES} ${HEADERS})
add_dependencies(realm-object-store-objects realm)
set_target_properties(realm-object-store-objects PROPERTIES POSITION_INDEPENDENT_CODE 1)
target_include_directories(realm-object-store-objects PUBLIC ${INCLUDE_DIRS})
# A static library, aggregating the prebuilt object files.
add_library(realm-object-store-static STATIC $<TARGET_OBJECTS:realm-object-store-objects>)
target_include_directories(realm-object-store-static PUBLIC ${INCLUDE_DIRS})
target_link_libraries(realm-object-store-static PUBLIC realm ${CF_LIBRARY})
# A dynamic library, linking together the prebuilt object files.
add_library(realm-object-store SHARED $<TARGET_OBJECTS:realm-object-store-objects>)
target_include_directories(realm-object-store PUBLIC ${INCLUDE_DIRS})
target_link_libraries(realm-object-store PRIVATE realm ${CF_LIBRARY})

View File

@ -0,0 +1,201 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/external_commit_helper.hpp"
#include "impl/realm_coordinator.hpp"
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sstream>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <system_error>
#include <unistd.h>
#ifdef __ANDROID__
#include <android/log.h>
#define ANDROID_LOG __android_log_print
#else
#define ANDROID_LOG(...)
#endif
using namespace realm;
using namespace realm::_impl;
#define LOGE(fmt...) do { \
fprintf(stderr, fmt); \
ANDROID_LOG(ANDROID_LOG_ERROR, "REALM", fmt); \
} while (0)
namespace {
// Write a byte to a pipe to notify anyone waiting for data on the pipe
void notify_fd(int fd)
{
while (true) {
char c = 0;
ssize_t ret = write(fd, &c, 1);
if (ret == 1) {
break;
}
// If the pipe's buffer is full, we need to read some of the old data in
// it to make space. We don't just read in the code waiting for
// notifications so that we can notify multiple waiters with a single
// write.
assert(ret == -1 && errno == EAGAIN);
char buff[1024];
read(fd, buff, sizeof buff);
}
}
} // anonymous namespace
void ExternalCommitHelper::FdHolder::close()
{
if (m_fd != -1) {
::close(m_fd);
}
m_fd = -1;
}
ExternalCommitHelper::ExternalCommitHelper(RealmCoordinator& parent)
: m_parent(parent)
{
m_epfd = epoll_create(1);
if (m_epfd == -1) {
throw std::system_error(errno, std::system_category());
}
auto path = parent.get_path() + ".note";
// Create and open the named pipe
int ret = mkfifo(path.c_str(), 0600);
if (ret == -1) {
int err = errno;
if (err == ENOTSUP) {
// Filesystem doesn't support named pipes, so try putting it in tmp instead
// Hash collisions are okay here because they just result in doing
// extra work, as opposed to correctness problems
std::ostringstream ss;
std::string tmp_dir(getenv("TMPDIR"));
ss << tmp_dir;
if (tmp_dir.back() != '/')
ss << '/';
ss << "realm_" << std::hash<std::string>()(path) << ".note";
path = ss.str();
ret = mkfifo(path.c_str(), 0600);
err = errno;
}
// the fifo already existing isn't an error
if (ret == -1 && err != EEXIST) {
throw std::system_error(err, std::system_category());
}
}
m_notify_fd = open(path.c_str(), O_RDWR);
if (m_notify_fd == -1) {
throw std::system_error(errno, std::system_category());
}
// Make writing to the pipe return -1 when the pipe's buffer is full
// rather than blocking until there's space available
ret = fcntl(m_notify_fd, F_SETFL, O_NONBLOCK);
if (ret == -1) {
throw std::system_error(errno, std::system_category());
}
// Create the anonymous pipe
int pipe_fd[2];
ret = pipe(pipe_fd);
if (ret == -1) {
throw std::system_error(errno, std::system_category());
}
m_shutdown_read_fd = pipe_fd[0];
m_shutdown_write_fd = pipe_fd[1];
m_thread = std::thread([=] {
try {
listen();
}
catch (std::exception const& e) {
LOGE("uncaught exception in notifier thread: %s: %s\n", typeid(e).name(), e.what());
throw;
}
catch (...) {
LOGE("uncaught exception in notifier thread\n");
throw;
}
});
}
ExternalCommitHelper::~ExternalCommitHelper()
{
notify_fd(m_shutdown_write_fd);
m_thread.join(); // Wait for the thread to exit
}
void ExternalCommitHelper::listen()
{
pthread_setname_np(pthread_self(), "Realm notification listener");
int ret;
struct epoll_event event[2];
event[0].events = EPOLLIN | EPOLLET;
event[0].data.fd = m_notify_fd;
ret = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_notify_fd, &event[0]);
assert(ret == 0);
event[1].events = EPOLLIN;
event[1].data.fd = m_shutdown_read_fd;
ret = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_shutdown_read_fd, &event[1]);
assert(ret == 0);
while (true) {
struct epoll_event ev;
ret = epoll_wait(m_epfd, &ev, 1, -1);
if (ret == -1 && errno == EINTR) {
// Interrupted system call, try again.
continue;
}
assert(ret >= 0);
if (ret == 0) {
// Spurious wakeup; just wait again
continue;
}
if (ev.data.u32 == (uint32_t)m_shutdown_read_fd) {
return;
}
assert(ev.data.u32 == (uint32_t)m_notify_fd);
m_parent.on_change();
}
}
void ExternalCommitHelper::notify_others()
{
notify_fd(m_notify_fd);
}

View File

@ -0,0 +1,76 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include <thread>
namespace realm {
namespace _impl {
class RealmCoordinator;
class ExternalCommitHelper {
public:
ExternalCommitHelper(RealmCoordinator& parent);
~ExternalCommitHelper();
void notify_others();
private:
// A RAII holder for a file descriptor which automatically closes the wrapped
// fd when it's deallocated
class FdHolder {
public:
FdHolder() = default;
~FdHolder() { close(); }
operator int() const { return m_fd; }
FdHolder& operator=(int new_fd) {
close();
m_fd = new_fd;
return *this;
}
private:
int m_fd = -1;
void close();
FdHolder& operator=(FdHolder const&) = delete;
FdHolder(FdHolder const&) = delete;
};
void listen();
RealmCoordinator& m_parent;
// The listener thread
std::thread m_thread;
// Read-write file descriptor for the named pipe which is waited on for
// changes and written to when a commit is made
FdHolder m_notify_fd;
// File descriptor for epoll
FdHolder m_epfd;
// The two ends of an anonymous pipe used to notify the kqueue() thread that
// it should be shut down.
FdHolder m_shutdown_read_fd;
FdHolder m_shutdown_write_fd;
};
} // namespace _impl
} // namespace realm

View File

@ -0,0 +1,143 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/weak_realm_notifier.hpp"
#include "shared_realm.hpp"
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <android/log.h>
#include <android/looper.h>
#define LOGE(fmt...) do { \
fprintf(stderr, fmt); \
__android_log_print(ANDROID_LOG_ERROR, "REALM", fmt); \
} while (0)
namespace realm {
namespace _impl {
WeakRealmNotifier::WeakRealmNotifier(const std::shared_ptr<Realm>& realm, bool cache)
: WeakRealmNotifierBase(realm, cache)
, m_thread_has_looper(false)
{
ALooper* looper = ALooper_forThread();
if (!looper) {
return;
}
int message_pipe[2];
if (pipe2(message_pipe, O_CLOEXEC | O_NONBLOCK)) {
LOGE("could not create WeakRealmNotifier ALooper message pipe: %s", strerror(errno));
return;
}
if (ALooper_addFd(looper, message_pipe[0], 3 /* LOOPER_ID_USER */, ALOOPER_EVENT_INPUT | ALOOPER_EVENT_HANGUP, &looper_callback, nullptr) != 1) {
LOGE("Error adding WeakRealmNotifier callback to looper.");
::close(message_pipe[0]);
::close(message_pipe[1]);
return;
}
m_message_pipe.read = message_pipe[0];
m_message_pipe.write = message_pipe[1];
m_thread_has_looper = true;
}
WeakRealmNotifier::WeakRealmNotifier(WeakRealmNotifier&& rgt)
: WeakRealmNotifierBase(std::move(rgt))
, m_message_pipe(std::move(rgt.m_message_pipe))
{
bool flag = true;
m_thread_has_looper = rgt.m_thread_has_looper.compare_exchange_strong(flag, false);
}
WeakRealmNotifier& WeakRealmNotifier::operator=(WeakRealmNotifier&& rgt)
{
close();
WeakRealmNotifierBase::operator=(std::move(rgt));
m_message_pipe = std::move(rgt.m_message_pipe);
bool flag = true;
m_thread_has_looper = rgt.m_thread_has_looper.compare_exchange_strong(flag, false);
return *this;
}
void WeakRealmNotifier::close()
{
bool flag = true;
if (m_thread_has_looper.compare_exchange_strong(flag, false)) {
// closing one end of the pipe here will trigger ALOOPER_EVENT_HANGUP in the callback
// which will do the rest of the cleanup
::close(m_message_pipe.write);
}
}
void WeakRealmNotifier::notify()
{
if (m_thread_has_looper && !expired()) {
// we need to pass the weak Realm pointer to the other thread.
// to do so we allocate a weak pointer on the heap and send its address over a pipe.
// the other end of the pipe is read by the realm thread. when it's done with the pointer, it deletes it.
auto realm_ptr = new std::weak_ptr<Realm>(realm());
if (write(m_message_pipe.write, &realm_ptr, sizeof(realm_ptr)) != sizeof(realm_ptr)) {
delete realm_ptr;
LOGE("Buffer overrun when writing to WeakRealmNotifier's ALooper message pipe.");
}
}
}
int WeakRealmNotifier::looper_callback(int fd, int events, void* data)
{
if ((events & ALOOPER_EVENT_INPUT) != 0) {
// this is a pointer to a heap-allocated weak Realm pointer created by the notifiying thread.
// the actual address to the pointer is communicated over a pipe.
// we have to delete it so as to not leak, using the same memory allocation facilities it was allocated with.
std::weak_ptr<Realm>* realm_ptr = nullptr;
while (read(fd, &realm_ptr, sizeof(realm_ptr)) == sizeof(realm_ptr)) {
if (auto realm = realm_ptr->lock()) {
if (!realm->is_closed()) {
realm->notify();
}
}
delete realm_ptr;
}
}
if ((events & ALOOPER_EVENT_HANGUP) != 0) {
// this callback is always invoked on the looper thread so it's fine to get the looper like this
ALooper_removeFd(ALooper_forThread(), fd);
::close(fd);
}
if ((events & ALOOPER_EVENT_ERROR) != 0) {
LOGE("Unexpected error on WeakRealmNotifier's ALooper message pipe.");
}
// return 1 to continue receiving events
return 1;
}
}
}

View File

@ -0,0 +1,58 @@
////////////////////////////////////////////////////////////////////////////
//
// Copyright 2016 Realm Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////
#include "impl/weak_realm_notifier_base.hpp"
#include <atomic>
namespace realm {
class Realm;
namespace _impl {
class WeakRealmNotifier : public WeakRealmNotifierBase {
public:
WeakRealmNotifier(const std::shared_ptr<Realm>& realm, bool cache);
~WeakRealmNotifier() { close(); }
WeakRealmNotifier(WeakRealmNotifier&&);
WeakRealmNotifier& operator=(WeakRealmNotifier&&);
WeakRealmNotifier(const WeakRealmNotifier&) = delete;
WeakRealmNotifier& operator=(const WeakRealmNotifier&) = delete;
// Asyncronously call notify() on the Realm on the appropriate thread
void notify();
private:
void close();
static int looper_callback(int fd, int events, void* data);
std::atomic<bool> m_thread_has_looper;
// pipe file descriptor pair we use to signal ALooper
struct {
int read = -1;
int write = -1;
} m_message_pipe;
};
} // namespace _impl
} // namespace realm

View File

@ -23,6 +23,8 @@
#if REALM_PLATFORM_APPLE
#include "impl/apple/external_commit_helper.hpp"
#elif REALM_ANDROID
#include "impl/android/external_commit_helper.hpp"
#else
#include "impl/generic/external_commit_helper.hpp"
#endif

View File

@ -25,6 +25,8 @@
#include "impl/node/weak_realm_notifier.hpp"
#elif REALM_PLATFORM_APPLE
#include "impl/apple/weak_realm_notifier.hpp"
#elif REALM_ANDROID
#include "impl/android/weak_realm_notifier.hpp"
#else
#include "impl/generic/weak_realm_notifier.hpp"
#endif

View File

@ -40,7 +40,7 @@ public:
std::shared_ptr<Realm> realm() const { return m_realm.lock(); }
// Does this WeakRealmNotifierBase store a Realm instance that should be used on the current thread?
bool is_cached_for_current_thread() const { return m_cache && m_thread_id == std::this_thread::get_id(); }
bool is_cached_for_current_thread() const { return m_cache && is_for_current_thread(); }
// Has the Realm instance been destroyed?
bool expired() const { return m_realm.expired(); }
@ -48,6 +48,8 @@ public:
// Is this a WeakRealmNotifierBase for the given Realm instance?
bool is_for_realm(Realm* realm) const { return realm == m_realm_key; }
bool is_for_current_thread() const { return m_thread_id == std::this_thread::get_id(); }
private:
std::weak_ptr<Realm> m_realm;
std::thread::id m_thread_id = std::this_thread::get_id();

View File

@ -135,6 +135,8 @@ namespace realm {
// Realm after closing it will produce undefined behavior.
void close();
bool is_closed() { return !m_read_only_group && !m_shared_group; }
~Realm();
void init(std::shared_ptr<_impl::RealmCoordinator> coordinator);