From 00c9177b56b620c30b890c2a11a52eeda203a2ea Mon Sep 17 00:00:00 2001 From: Thomas Goyne Date: Wed, 2 Sep 2015 17:53:21 -0700 Subject: [PATCH] Move the interprocess notification functionality to the object store --- apple/external_commit_helper.cpp | 226 ++++++++++++++++++++++++++++++ apple/external_commit_helper.hpp | 81 +++++++++++ realm_delegate.hpp | 6 +- src/object-store/shared_realm.cpp | 7 +- src/object-store/shared_realm.hpp | 3 + transact_log_handler.cpp | 2 +- 6 files changed, 318 insertions(+), 7 deletions(-) create mode 100644 apple/external_commit_helper.cpp create mode 100644 apple/external_commit_helper.hpp diff --git a/apple/external_commit_helper.cpp b/apple/external_commit_helper.cpp new file mode 100644 index 00000000..2364e970 --- /dev/null +++ b/apple/external_commit_helper.cpp @@ -0,0 +1,226 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#include "external_commit_helper.hpp" + +#include "shared_realm.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace realm; + +namespace { +// Write a byte to a pipe to notify anyone waiting for data on the pipe +void notify_fd(int fd) +{ + while (true) { + char c = 0; + ssize_t ret = write(fd, &c, 1); + if (ret == 1) { + break; + } + + // If the pipe's buffer is full, we need to read some of the old data in + // it to make space. We don't just read in the code waiting for + // notifications so that we can notify multiple waiters with a single + // write. + assert(ret == -1 && errno == EAGAIN); + char buff[1024]; + read(fd, buff, sizeof buff); + } +} +} // anonymous namespace + +void ExternalCommitHelper::FdHolder::close() +{ + if (m_fd != -1) { + ::close(m_fd); + } + m_fd = -1; + +} + +// Inter-thread and inter-process notifications of changes are done using a +// named pipe in the filesystem next to the Realm file. Everyone who wants to be +// notified of commits waits for data to become available on the pipe, and anyone +// who commits a write transaction writes data to the pipe after releasing the +// write lock. Note that no one ever actually *reads* from the pipe: the data +// actually written is meaningless, and trying to read from a pipe from multiple +// processes at once is fraught with race conditions. + +// When a RLMRealm instance is created, we add a CFRunLoopSource to the current +// thread's runloop. On each cycle of the run loop, the run loop checks each of +// its sources for work to do, which in the case of CFRunLoopSource is just +// checking if CFRunLoopSourceSignal has been called since the last time it ran, +// and if so invokes the function pointer supplied when the source is created, +// which in our case just invokes `[realm handleExternalChange]`. + +// Listening for external changes is done using kqueue() on a background thread. +// kqueue() lets us efficiently wait until the amount of data which can be read +// from one or more file descriptors has changed, and tells us which of the file +// descriptors it was that changed. We use this to wait on both the shared named +// pipe, and a local anonymous pipe. When data is written to the named pipe, we +// signal the runloop source and wake up the target runloop, and when data is +// written to the anonymous pipe the background thread removes the runloop +// source from the runloop and and shuts down. +ExternalCommitHelper::ExternalCommitHelper(Realm* realm) +: m_realm(realm) +, m_run_loop(CFRunLoopGetCurrent()) +{ + CFRetain(m_run_loop); + + m_kq = kqueue(); + if (m_kq == -1) { + throw std::system_error(errno, std::system_category()); + } + + auto path = realm->config().path + ".note"; + + // Create and open the named pipe + int ret = mkfifo(path.c_str(), 0600); + if (ret == -1) { + int err = errno; + if (err == ENOTSUP) { + // Filesystem doesn't support named pipes, so try putting it in tmp instead + // Hash collisions are okay here because they just result in doing + // extra work, as opposed to correctness problems + std::ostringstream ss; + ss << getenv("TMPDIR"); + ss << "realm_" << std::hash()(path) << ".note"; + path = ss.str(); + ret = mkfifo(path.c_str(), 0600); + err = errno; + } + // the fifo already existing isn't an error + if (ret == -1 && err != EEXIST) { + throw std::system_error(err, std::system_category()); + } + } + + m_notify_fd = open(path.c_str(), O_RDWR); + if (m_notify_fd == -1) { + throw std::system_error(errno, std::system_category()); + } + + // Make writing to the pipe return -1 when the pipe's buffer is full + // rather than blocking until there's space available + ret = fcntl(m_notify_fd, F_SETFL, O_NONBLOCK); + if (ret == -1) { + throw std::system_error(errno, std::system_category()); + } + + // Create the anonymous pipe + int pipeFd[2]; + ret = pipe(pipeFd); + if (ret == -1) { + throw std::system_error(errno, std::system_category()); + } + + m_shutdown_read_fd = pipeFd[0]; + m_shutdown_write_fd = pipeFd[1]; + + // Use the minimum allowed stack size, as we need very little in our listener + // https://developer.apple.com/library/ios/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.html#//apple_ref/doc/uid/10000057i-CH15-SW7 + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, 16 * 1024); + + auto fn = [](void *self) -> void * { + static_cast(self)->listen(); + return nullptr; + }; + ret = pthread_create(&m_thread, &attr, fn, this); + pthread_attr_destroy(&attr); + if (ret != 0) { + throw std::system_error(errno, std::system_category()); + } +} + +ExternalCommitHelper::~ExternalCommitHelper() +{ + notify_fd(m_shutdown_write_fd); + pthread_join(m_thread, nullptr); // Wait for the thread to exit +} + +void ExternalCommitHelper::listen() +{ + pthread_setname_np("RLMRealm notification listener"); + + // Create the runloop source + CFRunLoopSourceContext ctx{}; + ctx.info = this; + ctx.perform = [](void *info) { + static_cast(info)->m_realm->notify(); + }; + + CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx); + CFRunLoopAddSource(m_run_loop, signal, kCFRunLoopDefaultMode); + + // Set up the kqueue + // EVFILT_READ indicates that we care about data being available to read + // on the given file descriptor. + // EV_CLEAR makes it wait for the amount of data available to be read to + // change rather than just returning when there is any data to read. + struct kevent ke[2]; + EV_SET(&ke[0], m_notify_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0); + EV_SET(&ke[1], m_shutdown_read_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0); + int ret = kevent(m_kq, ke, 2, nullptr, 0, nullptr); + assert(ret == 0); + + while (true) { + struct kevent event; + // Wait for data to become on either fd + // Return code is number of bytes available or -1 on error + ret = kevent(m_kq, nullptr, 0, &event, 1, nullptr); + assert(ret >= 0); + if (ret == 0) { + // Spurious wakeup; just wait again + continue; + } + + // Check which file descriptor had activity: if it's the shutdown + // pipe, then someone called -stop; otherwise it's the named pipe + // and someone committed a write transaction + if (event.ident == (uint32_t)m_shutdown_read_fd) { + CFRunLoopSourceInvalidate(signal); + CFRelease(signal); + CFRelease(m_run_loop); + return; + } + assert(event.ident == (uint32_t)m_notify_fd); + + CFRunLoopSourceSignal(signal); + // Signalling the source makes it run the next time the runloop gets + // to it, but doesn't make the runloop start if it's currently idle + // waiting for events + CFRunLoopWakeUp(m_run_loop); + } +} + +void ExternalCommitHelper::notify_others() +{ + notify_fd(m_notify_fd); +} + diff --git a/apple/external_commit_helper.hpp b/apple/external_commit_helper.hpp new file mode 100644 index 00000000..2793762d --- /dev/null +++ b/apple/external_commit_helper.hpp @@ -0,0 +1,81 @@ +//////////////////////////////////////////////////////////////////////////// +// +// Copyright 2015 Realm Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +//////////////////////////////////////////////////////////////////////////// + +#ifndef REALM_EXTERNAL_COMMIT_HELPER_HPP +#define REALM_EXTERNAL_COMMIT_HELPER_HPP + +#include + +namespace realm { +class Realm; + +class ExternalCommitHelper { +public: + ExternalCommitHelper(Realm* realm); + ~ExternalCommitHelper(); + + void notify_others(); + +private: + // A RAII holder for a file descriptor which automatically closes the wrapped + // fd when it's deallocated + class FdHolder { + public: + FdHolder() = default; + ~FdHolder() { close(); } + operator int() const { return m_fd; } + + FdHolder& operator=(int newFd) { + close(); + m_fd = newFd; + return *this; + } + + private: + int m_fd = -1; + void close(); + + FdHolder& operator=(FdHolder const&) = delete; + FdHolder(FdHolder const&) = delete; + }; + + void listen(); + + // This is owned by the realm, so it needs to not retain the realm + Realm *const m_realm; + + // Runloop which notifications are delivered on + CFRunLoopRef m_run_loop; + + // The listener thread + pthread_t m_thread; + + // Read-write file descriptor for the named pipe which is waited on for + // changes and written to when a commit is made + FdHolder m_notify_fd; + // File descriptor for the kqueue + FdHolder m_kq; + // The two ends of an anonymous pipe used to notify the kqueue() thread that + // it should be shut down. + FdHolder m_shutdown_read_fd; + FdHolder m_shutdown_write_fd; +}; + +} // namespace realm + +#endif /* REALM_EXTERNAL_COMMIT_HELPER_HPP */ diff --git a/realm_delegate.hpp b/realm_delegate.hpp index c3a41d04..0941b2dc 100644 --- a/realm_delegate.hpp +++ b/realm_delegate.hpp @@ -63,10 +63,6 @@ public: } }; - // The Realm has committed a write transaction, and other Realms at the - // same path should be notified - virtual void transaction_committed() = 0; - // There are now new versions available for the Realm, but it has not // had its read version advanced virtual void changes_available() = 0; @@ -86,6 +82,8 @@ public: // The Realm's read version has changed // observers is the vector returned from get_observed_rows() // invalidated is the `info` pointers for each observed object which was deleted + // Is called with empty change information following a local commit or if + // the Realm isn't in a read transaction yet virtual void did_change(std::vector const& observers, std::vector const& invalidated) = 0; }; diff --git a/src/object-store/shared_realm.cpp b/src/object-store/shared_realm.cpp index 7d270cc2..b201574c 100644 --- a/src/object-store/shared_realm.cpp +++ b/src/object-store/shared_realm.cpp @@ -18,6 +18,7 @@ #include "shared_realm.hpp" +#include "external_commit_helper.hpp" #include "realm_delegate.hpp" #include "transact_log_handler.hpp" @@ -52,7 +53,9 @@ Realm::Config& Realm::Config::operator=(realm::Realm::Config const& c) return *this; } -Realm::Realm(Config config) : m_config(std::move(config)) +Realm::Realm(Config config) +: m_config(std::move(config)) +, m_notifier(new ExternalCommitHelper(this)) { try { if (m_config.read_only) { @@ -234,6 +237,7 @@ void Realm::commit_transaction() m_in_transaction = false; transaction::commit(*m_shared_group, *m_history, m_delegate.get()); + m_notifier->notify_others(); } void Realm::cancel_transaction() @@ -416,4 +420,3 @@ void RealmCache::clear() m_cache.clear(); } - diff --git a/src/object-store/shared_realm.hpp b/src/object-store/shared_realm.hpp index 3ef4bdf1..eb9ee424 100644 --- a/src/object-store/shared_realm.hpp +++ b/src/object-store/shared_realm.hpp @@ -28,6 +28,7 @@ namespace realm { class ClientHistory; + class ExternalCommitHelper; class Realm; class RealmCache; class RealmDelegate; @@ -110,6 +111,8 @@ namespace realm { Group *m_group = nullptr; + std::unique_ptr m_notifier; + public: std::unique_ptr m_delegate; diff --git a/transact_log_handler.cpp b/transact_log_handler.cpp index b02fdcbd..3f833b19 100644 --- a/transact_log_handler.cpp +++ b/transact_log_handler.cpp @@ -333,7 +333,7 @@ void commit(SharedGroup& sg, ClientHistory&, RealmDelegate* delegate) { LangBindHelper::commit_and_continue_as_read(sg); if (delegate) { - delegate->transaction_committed(); + delegate->did_change({}, {}); } }