mirror of
https://github.com/status-im/realm-js.git
synced 2025-01-11 14:54:33 +00:00
Move the interprocess notification functionality to the object store
This commit is contained in:
parent
b129ebe8c1
commit
e4f29fe221
226
apple/external_commit_helper.cpp
Normal file
226
apple/external_commit_helper.cpp
Normal file
@ -0,0 +1,226 @@
|
|||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
//
|
||||||
|
// Copyright 2015 Realm Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#include "external_commit_helper.hpp"
|
||||||
|
|
||||||
|
#include "shared_realm.hpp"
|
||||||
|
|
||||||
|
#include <assert.h>
|
||||||
|
#include <sys/event.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
#include <system_error>
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <sstream>
|
||||||
|
|
||||||
|
using namespace realm;
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
// Write a byte to a pipe to notify anyone waiting for data on the pipe
|
||||||
|
void notify_fd(int fd)
|
||||||
|
{
|
||||||
|
while (true) {
|
||||||
|
char c = 0;
|
||||||
|
ssize_t ret = write(fd, &c, 1);
|
||||||
|
if (ret == 1) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the pipe's buffer is full, we need to read some of the old data in
|
||||||
|
// it to make space. We don't just read in the code waiting for
|
||||||
|
// notifications so that we can notify multiple waiters with a single
|
||||||
|
// write.
|
||||||
|
assert(ret == -1 && errno == EAGAIN);
|
||||||
|
char buff[1024];
|
||||||
|
read(fd, buff, sizeof buff);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // anonymous namespace
|
||||||
|
|
||||||
|
void ExternalCommitHelper::FdHolder::close()
|
||||||
|
{
|
||||||
|
if (m_fd != -1) {
|
||||||
|
::close(m_fd);
|
||||||
|
}
|
||||||
|
m_fd = -1;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inter-thread and inter-process notifications of changes are done using a
|
||||||
|
// named pipe in the filesystem next to the Realm file. Everyone who wants to be
|
||||||
|
// notified of commits waits for data to become available on the pipe, and anyone
|
||||||
|
// who commits a write transaction writes data to the pipe after releasing the
|
||||||
|
// write lock. Note that no one ever actually *reads* from the pipe: the data
|
||||||
|
// actually written is meaningless, and trying to read from a pipe from multiple
|
||||||
|
// processes at once is fraught with race conditions.
|
||||||
|
|
||||||
|
// When a RLMRealm instance is created, we add a CFRunLoopSource to the current
|
||||||
|
// thread's runloop. On each cycle of the run loop, the run loop checks each of
|
||||||
|
// its sources for work to do, which in the case of CFRunLoopSource is just
|
||||||
|
// checking if CFRunLoopSourceSignal has been called since the last time it ran,
|
||||||
|
// and if so invokes the function pointer supplied when the source is created,
|
||||||
|
// which in our case just invokes `[realm handleExternalChange]`.
|
||||||
|
|
||||||
|
// Listening for external changes is done using kqueue() on a background thread.
|
||||||
|
// kqueue() lets us efficiently wait until the amount of data which can be read
|
||||||
|
// from one or more file descriptors has changed, and tells us which of the file
|
||||||
|
// descriptors it was that changed. We use this to wait on both the shared named
|
||||||
|
// pipe, and a local anonymous pipe. When data is written to the named pipe, we
|
||||||
|
// signal the runloop source and wake up the target runloop, and when data is
|
||||||
|
// written to the anonymous pipe the background thread removes the runloop
|
||||||
|
// source from the runloop and and shuts down.
|
||||||
|
ExternalCommitHelper::ExternalCommitHelper(Realm* realm)
|
||||||
|
: m_realm(realm)
|
||||||
|
, m_run_loop(CFRunLoopGetCurrent())
|
||||||
|
{
|
||||||
|
CFRetain(m_run_loop);
|
||||||
|
|
||||||
|
m_kq = kqueue();
|
||||||
|
if (m_kq == -1) {
|
||||||
|
throw std::system_error(errno, std::system_category());
|
||||||
|
}
|
||||||
|
|
||||||
|
auto path = realm->config().path + ".note";
|
||||||
|
|
||||||
|
// Create and open the named pipe
|
||||||
|
int ret = mkfifo(path.c_str(), 0600);
|
||||||
|
if (ret == -1) {
|
||||||
|
int err = errno;
|
||||||
|
if (err == ENOTSUP) {
|
||||||
|
// Filesystem doesn't support named pipes, so try putting it in tmp instead
|
||||||
|
// Hash collisions are okay here because they just result in doing
|
||||||
|
// extra work, as opposed to correctness problems
|
||||||
|
std::ostringstream ss;
|
||||||
|
ss << getenv("TMPDIR");
|
||||||
|
ss << "realm_" << std::hash<std::string>()(path) << ".note";
|
||||||
|
path = ss.str();
|
||||||
|
ret = mkfifo(path.c_str(), 0600);
|
||||||
|
err = errno;
|
||||||
|
}
|
||||||
|
// the fifo already existing isn't an error
|
||||||
|
if (ret == -1 && err != EEXIST) {
|
||||||
|
throw std::system_error(err, std::system_category());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m_notify_fd = open(path.c_str(), O_RDWR);
|
||||||
|
if (m_notify_fd == -1) {
|
||||||
|
throw std::system_error(errno, std::system_category());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make writing to the pipe return -1 when the pipe's buffer is full
|
||||||
|
// rather than blocking until there's space available
|
||||||
|
ret = fcntl(m_notify_fd, F_SETFL, O_NONBLOCK);
|
||||||
|
if (ret == -1) {
|
||||||
|
throw std::system_error(errno, std::system_category());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the anonymous pipe
|
||||||
|
int pipeFd[2];
|
||||||
|
ret = pipe(pipeFd);
|
||||||
|
if (ret == -1) {
|
||||||
|
throw std::system_error(errno, std::system_category());
|
||||||
|
}
|
||||||
|
|
||||||
|
m_shutdown_read_fd = pipeFd[0];
|
||||||
|
m_shutdown_write_fd = pipeFd[1];
|
||||||
|
|
||||||
|
// Use the minimum allowed stack size, as we need very little in our listener
|
||||||
|
// https://developer.apple.com/library/ios/documentation/Cocoa/Conceptual/Multithreading/CreatingThreads/CreatingThreads.html#//apple_ref/doc/uid/10000057i-CH15-SW7
|
||||||
|
pthread_attr_t attr;
|
||||||
|
pthread_attr_init(&attr);
|
||||||
|
pthread_attr_setstacksize(&attr, 16 * 1024);
|
||||||
|
|
||||||
|
auto fn = [](void *self) -> void * {
|
||||||
|
static_cast<ExternalCommitHelper *>(self)->listen();
|
||||||
|
return nullptr;
|
||||||
|
};
|
||||||
|
ret = pthread_create(&m_thread, &attr, fn, this);
|
||||||
|
pthread_attr_destroy(&attr);
|
||||||
|
if (ret != 0) {
|
||||||
|
throw std::system_error(errno, std::system_category());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ExternalCommitHelper::~ExternalCommitHelper()
|
||||||
|
{
|
||||||
|
notify_fd(m_shutdown_write_fd);
|
||||||
|
pthread_join(m_thread, nullptr); // Wait for the thread to exit
|
||||||
|
}
|
||||||
|
|
||||||
|
void ExternalCommitHelper::listen()
|
||||||
|
{
|
||||||
|
pthread_setname_np("RLMRealm notification listener");
|
||||||
|
|
||||||
|
// Create the runloop source
|
||||||
|
CFRunLoopSourceContext ctx{};
|
||||||
|
ctx.info = this;
|
||||||
|
ctx.perform = [](void *info) {
|
||||||
|
static_cast<ExternalCommitHelper *>(info)->m_realm->notify();
|
||||||
|
};
|
||||||
|
|
||||||
|
CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx);
|
||||||
|
CFRunLoopAddSource(m_run_loop, signal, kCFRunLoopDefaultMode);
|
||||||
|
|
||||||
|
// Set up the kqueue
|
||||||
|
// EVFILT_READ indicates that we care about data being available to read
|
||||||
|
// on the given file descriptor.
|
||||||
|
// EV_CLEAR makes it wait for the amount of data available to be read to
|
||||||
|
// change rather than just returning when there is any data to read.
|
||||||
|
struct kevent ke[2];
|
||||||
|
EV_SET(&ke[0], m_notify_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0);
|
||||||
|
EV_SET(&ke[1], m_shutdown_read_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0);
|
||||||
|
int ret = kevent(m_kq, ke, 2, nullptr, 0, nullptr);
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
struct kevent event;
|
||||||
|
// Wait for data to become on either fd
|
||||||
|
// Return code is number of bytes available or -1 on error
|
||||||
|
ret = kevent(m_kq, nullptr, 0, &event, 1, nullptr);
|
||||||
|
assert(ret >= 0);
|
||||||
|
if (ret == 0) {
|
||||||
|
// Spurious wakeup; just wait again
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check which file descriptor had activity: if it's the shutdown
|
||||||
|
// pipe, then someone called -stop; otherwise it's the named pipe
|
||||||
|
// and someone committed a write transaction
|
||||||
|
if (event.ident == (uint32_t)m_shutdown_read_fd) {
|
||||||
|
CFRunLoopSourceInvalidate(signal);
|
||||||
|
CFRelease(signal);
|
||||||
|
CFRelease(m_run_loop);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
assert(event.ident == (uint32_t)m_notify_fd);
|
||||||
|
|
||||||
|
CFRunLoopSourceSignal(signal);
|
||||||
|
// Signalling the source makes it run the next time the runloop gets
|
||||||
|
// to it, but doesn't make the runloop start if it's currently idle
|
||||||
|
// waiting for events
|
||||||
|
CFRunLoopWakeUp(m_run_loop);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ExternalCommitHelper::notify_others()
|
||||||
|
{
|
||||||
|
notify_fd(m_notify_fd);
|
||||||
|
}
|
||||||
|
|
81
apple/external_commit_helper.hpp
Normal file
81
apple/external_commit_helper.hpp
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
//
|
||||||
|
// Copyright 2015 Realm Inc.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#ifndef REALM_EXTERNAL_COMMIT_HELPER_HPP
|
||||||
|
#define REALM_EXTERNAL_COMMIT_HELPER_HPP
|
||||||
|
|
||||||
|
#include <CoreFoundation/CFRunLoop.h>
|
||||||
|
|
||||||
|
namespace realm {
|
||||||
|
class Realm;
|
||||||
|
|
||||||
|
class ExternalCommitHelper {
|
||||||
|
public:
|
||||||
|
ExternalCommitHelper(Realm* realm);
|
||||||
|
~ExternalCommitHelper();
|
||||||
|
|
||||||
|
void notify_others();
|
||||||
|
|
||||||
|
private:
|
||||||
|
// A RAII holder for a file descriptor which automatically closes the wrapped
|
||||||
|
// fd when it's deallocated
|
||||||
|
class FdHolder {
|
||||||
|
public:
|
||||||
|
FdHolder() = default;
|
||||||
|
~FdHolder() { close(); }
|
||||||
|
operator int() const { return m_fd; }
|
||||||
|
|
||||||
|
FdHolder& operator=(int newFd) {
|
||||||
|
close();
|
||||||
|
m_fd = newFd;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
int m_fd = -1;
|
||||||
|
void close();
|
||||||
|
|
||||||
|
FdHolder& operator=(FdHolder const&) = delete;
|
||||||
|
FdHolder(FdHolder const&) = delete;
|
||||||
|
};
|
||||||
|
|
||||||
|
void listen();
|
||||||
|
|
||||||
|
// This is owned by the realm, so it needs to not retain the realm
|
||||||
|
Realm *const m_realm;
|
||||||
|
|
||||||
|
// Runloop which notifications are delivered on
|
||||||
|
CFRunLoopRef m_run_loop;
|
||||||
|
|
||||||
|
// The listener thread
|
||||||
|
pthread_t m_thread;
|
||||||
|
|
||||||
|
// Read-write file descriptor for the named pipe which is waited on for
|
||||||
|
// changes and written to when a commit is made
|
||||||
|
FdHolder m_notify_fd;
|
||||||
|
// File descriptor for the kqueue
|
||||||
|
FdHolder m_kq;
|
||||||
|
// The two ends of an anonymous pipe used to notify the kqueue() thread that
|
||||||
|
// it should be shut down.
|
||||||
|
FdHolder m_shutdown_read_fd;
|
||||||
|
FdHolder m_shutdown_write_fd;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace realm
|
||||||
|
|
||||||
|
#endif /* REALM_EXTERNAL_COMMIT_HELPER_HPP */
|
@ -63,10 +63,6 @@ public:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// The Realm has committed a write transaction, and other Realms at the
|
|
||||||
// same path should be notified
|
|
||||||
virtual void transaction_committed() = 0;
|
|
||||||
|
|
||||||
// There are now new versions available for the Realm, but it has not
|
// There are now new versions available for the Realm, but it has not
|
||||||
// had its read version advanced
|
// had its read version advanced
|
||||||
virtual void changes_available() = 0;
|
virtual void changes_available() = 0;
|
||||||
@ -86,6 +82,8 @@ public:
|
|||||||
// The Realm's read version has changed
|
// The Realm's read version has changed
|
||||||
// observers is the vector returned from get_observed_rows()
|
// observers is the vector returned from get_observed_rows()
|
||||||
// invalidated is the `info` pointers for each observed object which was deleted
|
// invalidated is the `info` pointers for each observed object which was deleted
|
||||||
|
// Is called with empty change information following a local commit or if
|
||||||
|
// the Realm isn't in a read transaction yet
|
||||||
virtual void did_change(std::vector<ObserverState> const& observers,
|
virtual void did_change(std::vector<ObserverState> const& observers,
|
||||||
std::vector<void*> const& invalidated) = 0;
|
std::vector<void*> const& invalidated) = 0;
|
||||||
};
|
};
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
#include "shared_realm.hpp"
|
#include "shared_realm.hpp"
|
||||||
|
|
||||||
|
#include "external_commit_helper.hpp"
|
||||||
#include "realm_delegate.hpp"
|
#include "realm_delegate.hpp"
|
||||||
#include "transact_log_handler.hpp"
|
#include "transact_log_handler.hpp"
|
||||||
|
|
||||||
@ -52,7 +53,9 @@ Realm::Config& Realm::Config::operator=(realm::Realm::Config const& c)
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
Realm::Realm(Config config) : m_config(std::move(config))
|
Realm::Realm(Config config)
|
||||||
|
: m_config(std::move(config))
|
||||||
|
, m_notifier(new ExternalCommitHelper(this))
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
if (m_config.read_only) {
|
if (m_config.read_only) {
|
||||||
@ -234,6 +237,7 @@ void Realm::commit_transaction()
|
|||||||
|
|
||||||
m_in_transaction = false;
|
m_in_transaction = false;
|
||||||
transaction::commit(*m_shared_group, *m_history, m_delegate.get());
|
transaction::commit(*m_shared_group, *m_history, m_delegate.get());
|
||||||
|
m_notifier->notify_others();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Realm::cancel_transaction()
|
void Realm::cancel_transaction()
|
||||||
@ -416,4 +420,3 @@ void RealmCache::clear()
|
|||||||
|
|
||||||
m_cache.clear();
|
m_cache.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
|
|
||||||
namespace realm {
|
namespace realm {
|
||||||
class ClientHistory;
|
class ClientHistory;
|
||||||
|
class ExternalCommitHelper;
|
||||||
class Realm;
|
class Realm;
|
||||||
class RealmCache;
|
class RealmCache;
|
||||||
class RealmDelegate;
|
class RealmDelegate;
|
||||||
@ -110,6 +111,8 @@ namespace realm {
|
|||||||
|
|
||||||
Group *m_group = nullptr;
|
Group *m_group = nullptr;
|
||||||
|
|
||||||
|
std::unique_ptr<ExternalCommitHelper> m_notifier;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
std::unique_ptr<RealmDelegate> m_delegate;
|
std::unique_ptr<RealmDelegate> m_delegate;
|
||||||
|
|
||||||
|
@ -333,7 +333,7 @@ void commit(SharedGroup& sg, ClientHistory&, RealmDelegate* delegate) {
|
|||||||
LangBindHelper::commit_and_continue_as_read(sg);
|
LangBindHelper::commit_and_continue_as_read(sg);
|
||||||
|
|
||||||
if (delegate) {
|
if (delegate) {
|
||||||
delegate->transaction_committed();
|
delegate->did_change({}, {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user