diff --git a/apple/external_commit_helper.cpp b/apple/external_commit_helper.cpp index 2364e970..1444281a 100644 --- a/apple/external_commit_helper.cpp +++ b/apple/external_commit_helper.cpp @@ -86,10 +86,8 @@ void ExternalCommitHelper::FdHolder::close() // written to the anonymous pipe the background thread removes the runloop // source from the runloop and and shuts down. ExternalCommitHelper::ExternalCommitHelper(Realm* realm) -: m_realm(realm) -, m_run_loop(CFRunLoopGetCurrent()) { - CFRetain(m_run_loop); + add_realm(realm); m_kq = kqueue(); if (m_kq == -1) { @@ -160,23 +158,49 @@ ExternalCommitHelper::ExternalCommitHelper(Realm* realm) ExternalCommitHelper::~ExternalCommitHelper() { + REALM_ASSERT_DEBUG(m_realms.empty()); notify_fd(m_shutdown_write_fd); pthread_join(m_thread, nullptr); // Wait for the thread to exit } +void ExternalCommitHelper::add_realm(realm::Realm* realm) +{ + std::lock_guard lock(m_realms_mutex); + + // Create the runloop source + CFRunLoopSourceContext ctx{}; + ctx.info = realm; + ctx.perform = [](void* info) { + static_cast(info)->notify(); + }; + + CFRunLoopRef runloop = CFRunLoopGetCurrent(); + CFRetain(runloop); + CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx); + CFRunLoopAddSource(runloop, signal, kCFRunLoopDefaultMode); + + m_realms.push_back({realm, runloop, signal}); +} + +void ExternalCommitHelper::remove_realm(realm::Realm* realm) +{ + std::lock_guard lock(m_realms_mutex); + for (auto it = m_realms.begin(); it != m_realms.end(); ++it) { + if (it->realm == realm) { + CFRunLoopSourceInvalidate(it->signal); + CFRelease(it->signal); + CFRelease(it->runloop); + m_realms.erase(it); + return; + } + } + REALM_TERMINATE("Realm not registered"); +} + void ExternalCommitHelper::listen() { pthread_setname_np("RLMRealm notification listener"); - // Create the runloop source - CFRunLoopSourceContext ctx{}; - ctx.info = this; - ctx.perform = [](void *info) { - static_cast(info)->m_realm->notify(); - }; - - CFRunLoopSourceRef signal = CFRunLoopSourceCreate(kCFAllocatorDefault, 0, &ctx); - CFRunLoopAddSource(m_run_loop, signal, kCFRunLoopDefaultMode); // Set up the kqueue // EVFILT_READ indicates that we care about data being available to read @@ -204,18 +228,18 @@ void ExternalCommitHelper::listen() // pipe, then someone called -stop; otherwise it's the named pipe // and someone committed a write transaction if (event.ident == (uint32_t)m_shutdown_read_fd) { - CFRunLoopSourceInvalidate(signal); - CFRelease(signal); - CFRelease(m_run_loop); return; } assert(event.ident == (uint32_t)m_notify_fd); - CFRunLoopSourceSignal(signal); - // Signalling the source makes it run the next time the runloop gets - // to it, but doesn't make the runloop start if it's currently idle - // waiting for events - CFRunLoopWakeUp(m_run_loop); + std::lock_guard lock(m_realms_mutex); + for (auto const& realm : m_realms) { + CFRunLoopSourceSignal(realm.signal); + // Signalling the source makes it run the next time the runloop gets + // to it, but doesn't make the runloop start if it's currently idle + // waiting for events + CFRunLoopWakeUp(realm.runloop); + } } } diff --git a/apple/external_commit_helper.hpp b/apple/external_commit_helper.hpp index 2793762d..acf0e1c4 100644 --- a/apple/external_commit_helper.hpp +++ b/apple/external_commit_helper.hpp @@ -20,6 +20,8 @@ #define REALM_EXTERNAL_COMMIT_HELPER_HPP #include +#include +#include namespace realm { class Realm; @@ -30,6 +32,8 @@ public: ~ExternalCommitHelper(); void notify_others(); + void add_realm(Realm* realm); + void remove_realm(Realm* realm); private: // A RAII holder for a file descriptor which automatically closes the wrapped @@ -54,13 +58,20 @@ private: FdHolder(FdHolder const&) = delete; }; + struct PerRealmInfo { + Realm* realm; + CFRunLoopRef runloop; + CFRunLoopSourceRef signal; + }; + void listen(); - // This is owned by the realm, so it needs to not retain the realm - Realm *const m_realm; + // Currently registered realms and the signal for delivering notifications + // to them + std::vector m_realms; - // Runloop which notifications are delivered on - CFRunLoopRef m_run_loop; + // Mutex which guards m_realms + std::mutex m_realms_mutex; // The listener thread pthread_t m_thread; diff --git a/shared_realm.cpp b/shared_realm.cpp index 2143db38..8b1abd84 100644 --- a/shared_realm.cpp +++ b/shared_realm.cpp @@ -55,7 +55,6 @@ Realm::Config& Realm::Config::operator=(realm::Realm::Config const& c) Realm::Realm(Config config) : m_config(std::move(config)) -, m_notifier(new ExternalCommitHelper(this)) { try { if (m_config.read_only) { @@ -85,6 +84,12 @@ Realm::Realm(Config config) } } +Realm::~Realm() { + if (m_notifier) { // might not exist yet if an error occurred during init + m_notifier->remove_realm(this); + } +} + Group *Realm::read_group() { if (!m_group) { @@ -132,8 +137,13 @@ SharedRealm Realm::get_shared_realm(Config config) // if there is an existing realm at the current path steal its schema/column mapping // FIXME - need to validate that schemas match realm->m_config.schema = std::make_unique(*existing->m_config.schema); + + realm->m_notifier = existing->m_notifier; + realm->m_notifier->add_realm(realm.get()); } else { + realm->m_notifier = std::make_shared(realm.get()); + // otherwise get the schema from the group realm->m_config.schema = std::make_unique(ObjectStore::schema_from_group(realm->read_group())); diff --git a/shared_realm.hpp b/shared_realm.hpp index a3070c47..3836c5e9 100644 --- a/shared_realm.hpp +++ b/shared_realm.hpp @@ -97,6 +97,8 @@ namespace realm { std::thread::id thread_id() const { return m_thread_id; } void verify_thread(); + ~Realm(); + private: Realm(Config config); @@ -111,7 +113,7 @@ namespace realm { Group *m_group = nullptr; - std::unique_ptr m_notifier; + std::shared_ptr m_notifier; public: std::unique_ptr m_delegate;