Share ExternalCommitHelpers between Realm instances for a single path

This commit is contained in:
Thomas Goyne 2015-09-03 17:59:20 -07:00
parent f79dec9033
commit 06e0ff8373
4 changed files with 73 additions and 26 deletions

View File

@ -86,10 +86,8 @@ void ExternalCommitHelper::FdHolder::close()
// written to the anonymous pipe the background thread removes the runloop // written to the anonymous pipe the background thread removes the runloop
// source from the runloop and and shuts down. // source from the runloop and and shuts down.
ExternalCommitHelper::ExternalCommitHelper(Realm* realm) ExternalCommitHelper::ExternalCommitHelper(Realm* realm)
: m_realm(realm)
, m_run_loop(CFRunLoopGetCurrent())
{ {
CFRetain(m_run_loop); add_realm(realm);
m_kq = kqueue(); m_kq = kqueue();
if (m_kq == -1) { if (m_kq == -1) {
@ -160,23 +158,49 @@ ExternalCommitHelper::ExternalCommitHelper(Realm* realm)
ExternalCommitHelper::~ExternalCommitHelper() ExternalCommitHelper::~ExternalCommitHelper()
{ {
REALM_ASSERT_DEBUG(m_realms.empty());
notify_fd(m_shutdown_write_fd); notify_fd(m_shutdown_write_fd);
pthread_join(m_thread, nullptr); // Wait for the thread to exit pthread_join(m_thread, nullptr); // Wait for the thread to exit
} }
void ExternalCommitHelper::add_realm(realm::Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realms_mutex);
// Create the runloop source
CFRunLoopSourceContext ctx{};
ctx.info = realm;
ctx.perform = [](void* info) {
static_cast<Realm*>(info)->notify();
};
CFRunLoopRef runloop = CFRunLoopGetCurrent();
CFRetain(runloop);
CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx);
CFRunLoopAddSource(runloop, signal, kCFRunLoopDefaultMode);
m_realms.push_back({realm, runloop, signal});
}
void ExternalCommitHelper::remove_realm(realm::Realm* realm)
{
std::lock_guard<std::mutex> lock(m_realms_mutex);
for (auto it = m_realms.begin(); it != m_realms.end(); ++it) {
if (it->realm == realm) {
CFRunLoopSourceInvalidate(it->signal);
CFRelease(it->signal);
CFRelease(it->runloop);
m_realms.erase(it);
return;
}
}
REALM_TERMINATE("Realm not registered");
}
void ExternalCommitHelper::listen() void ExternalCommitHelper::listen()
{ {
pthread_setname_np("RLMRealm notification listener"); pthread_setname_np("RLMRealm notification listener");
// Create the runloop source
CFRunLoopSourceContext ctx{};
ctx.info = this;
ctx.perform = [](void *info) {
static_cast<ExternalCommitHelper *>(info)->m_realm->notify();
};
CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx);
CFRunLoopAddSource(m_run_loop, signal, kCFRunLoopDefaultMode);
// Set up the kqueue // Set up the kqueue
// EVFILT_READ indicates that we care about data being available to read // EVFILT_READ indicates that we care about data being available to read
@ -204,18 +228,18 @@ void ExternalCommitHelper::listen()
// pipe, then someone called -stop; otherwise it's the named pipe // pipe, then someone called -stop; otherwise it's the named pipe
// and someone committed a write transaction // and someone committed a write transaction
if (event.ident == (uint32_t)m_shutdown_read_fd) { if (event.ident == (uint32_t)m_shutdown_read_fd) {
CFRunLoopSourceInvalidate(signal);
CFRelease(signal);
CFRelease(m_run_loop);
return; return;
} }
assert(event.ident == (uint32_t)m_notify_fd); assert(event.ident == (uint32_t)m_notify_fd);
CFRunLoopSourceSignal(signal); std::lock_guard<std::mutex> lock(m_realms_mutex);
for (auto const& realm : m_realms) {
CFRunLoopSourceSignal(realm.signal);
// Signalling the source makes it run the next time the runloop gets // Signalling the source makes it run the next time the runloop gets
// to it, but doesn't make the runloop start if it's currently idle // to it, but doesn't make the runloop start if it's currently idle
// waiting for events // waiting for events
CFRunLoopWakeUp(m_run_loop); CFRunLoopWakeUp(realm.runloop);
}
} }
} }

View File

@ -20,6 +20,8 @@
#define REALM_EXTERNAL_COMMIT_HELPER_HPP #define REALM_EXTERNAL_COMMIT_HELPER_HPP
#include <CoreFoundation/CFRunLoop.h> #include <CoreFoundation/CFRunLoop.h>
#include <mutex>
#include <vector>
namespace realm { namespace realm {
class Realm; class Realm;
@ -30,6 +32,8 @@ public:
~ExternalCommitHelper(); ~ExternalCommitHelper();
void notify_others(); void notify_others();
void add_realm(Realm* realm);
void remove_realm(Realm* realm);
private: private:
// A RAII holder for a file descriptor which automatically closes the wrapped // A RAII holder for a file descriptor which automatically closes the wrapped
@ -54,13 +58,20 @@ private:
FdHolder(FdHolder const&) = delete; FdHolder(FdHolder const&) = delete;
}; };
struct PerRealmInfo {
Realm* realm;
CFRunLoopRef runloop;
CFRunLoopSourceRef signal;
};
void listen(); void listen();
// This is owned by the realm, so it needs to not retain the realm // Currently registered realms and the signal for delivering notifications
Realm *const m_realm; // to them
std::vector<PerRealmInfo> m_realms;
// Runloop which notifications are delivered on // Mutex which guards m_realms
CFRunLoopRef m_run_loop; std::mutex m_realms_mutex;
// The listener thread // The listener thread
pthread_t m_thread; pthread_t m_thread;

View File

@ -55,7 +55,6 @@ Realm::Config& Realm::Config::operator=(realm::Realm::Config const& c)
Realm::Realm(Config config) Realm::Realm(Config config)
: m_config(std::move(config)) : m_config(std::move(config))
, m_notifier(new ExternalCommitHelper(this))
{ {
try { try {
if (m_config.read_only) { if (m_config.read_only) {
@ -85,6 +84,12 @@ Realm::Realm(Config config)
} }
} }
Realm::~Realm() {
if (m_notifier) { // might not exist yet if an error occurred during init
m_notifier->remove_realm(this);
}
}
Group *Realm::read_group() Group *Realm::read_group()
{ {
if (!m_group) { if (!m_group) {
@ -132,8 +137,13 @@ SharedRealm Realm::get_shared_realm(Config config)
// if there is an existing realm at the current path steal its schema/column mapping // if there is an existing realm at the current path steal its schema/column mapping
// FIXME - need to validate that schemas match // FIXME - need to validate that schemas match
realm->m_config.schema = std::make_unique<Schema>(*existing->m_config.schema); realm->m_config.schema = std::make_unique<Schema>(*existing->m_config.schema);
realm->m_notifier = existing->m_notifier;
realm->m_notifier->add_realm(realm.get());
} }
else { else {
realm->m_notifier = std::make_shared<ExternalCommitHelper>(realm.get());
// otherwise get the schema from the group // otherwise get the schema from the group
realm->m_config.schema = std::make_unique<Schema>(ObjectStore::schema_from_group(realm->read_group())); realm->m_config.schema = std::make_unique<Schema>(ObjectStore::schema_from_group(realm->read_group()));

View File

@ -97,6 +97,8 @@ namespace realm {
std::thread::id thread_id() const { return m_thread_id; } std::thread::id thread_id() const { return m_thread_id; }
void verify_thread(); void verify_thread();
~Realm();
private: private:
Realm(Config config); Realm(Config config);
@ -111,7 +113,7 @@ namespace realm {
Group *m_group = nullptr; Group *m_group = nullptr;
std::unique_ptr<ExternalCommitHelper> m_notifier; std::shared_ptr<ExternalCommitHelper> m_notifier;
public: public:
std::unique_ptr<RealmDelegate> m_delegate; std::unique_ptr<RealmDelegate> m_delegate;