From 1ec08198a65010a903f4fdea012174ef23397654 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Sat, 13 Jun 2026 00:14:36 -0700 Subject: [PATCH] Add Wakeups to Client --- Cargo.lock | 1 + core/conversations/src/conversation.rs | 2 +- core/conversations/src/service_context.rs | 6 +- crates/client/src/client.rs | 44 +++++-- extensions/components/Cargo.toml | 1 + extensions/components/src/lib.rs | 2 + extensions/components/src/wakeup.rs | 133 ++++++++++++++++++++++ 7 files changed, 176 insertions(+), 13 deletions(-) create mode 100644 extensions/components/src/wakeup.rs diff --git a/Cargo.lock b/Cargo.lock index 44f5258..a9d362d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1472,6 +1472,7 @@ name = "components" version = "0.1.0" dependencies = [ "base64", + "crossbeam-channel", "crypto", "hex", "libchat", diff --git a/core/conversations/src/conversation.rs b/core/conversations/src/conversation.rs index 78573c2..c8816fa 100644 --- a/core/conversations/src/conversation.rs +++ b/core/conversations/src/conversation.rs @@ -34,7 +34,7 @@ pub(crate) trait Convo { } /// Group-only operations. -pub(crate) trait GroupConvo: Convo + std::fmt::Debug { +pub(crate) trait GroupConvo: Convo + std::fmt::Debug + Send { fn add_member( &mut self, cx: &mut ServiceContext, diff --git a/core/conversations/src/service_context.rs b/core/conversations/src/service_context.rs index 18a3357..eb03210 100644 --- a/core/conversations/src/service_context.rs +++ b/core/conversations/src/service_context.rs @@ -115,11 +115,11 @@ mod test_support { pub(crate) struct NoopWakeups; impl WakeupService for NoopWakeups { - fn wakeup_in(&mut self, duration: std::time::Duration, convo_id: crate::ConversationId) {} + fn wakeup_in(&mut self, _: std::time::Duration, _: crate::ConversationId) {} } impl - ServiceContext<(IP, NoopDelivery, NoopRegistration, WS, CS)> + ServiceContext<(IP, NoopDelivery, NoopRegistration, NoopWakeups, CS)> { /// Builds a context around a real store, stubbing other services. pub(crate) fn for_test(ident: IP, store: CS) -> Result { @@ -132,7 +132,7 @@ mod test_support { mls_provider: MlsEphemeralPqProvider::new().map_err(ChatError::generic)?, causal: CausalHistoryStore::new(), identity: Identity::new(name), - wakeup_service: NoopWakeup {}, + wakeup_service: NoopWakeups {}, }) } } diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index c5c8a4d..17b5439 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use std::thread::{self, JoinHandle}; -use components::EphemeralRegistry; +use components::{EphemeralRegistry, ThreadedWakeupService, WakeupEvent}; use crossbeam_channel::{Receiver, Sender, select}; use libchat::{ ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService, InboxOutcome, @@ -13,7 +13,7 @@ use parking_lot::Mutex; use crate::errors::ClientError; use crate::event::Event; -type ClientCore = Core<(TestLogosAccount, T, R, ChatStorage)>; +type ClientCore = Core<(TestLogosAccount, T, R, ThreadedWakeupService, ChatStorage)>; /// The transport as the client sees it: a [`DeliveryService`] for outbound /// publishing plus the inbound payload stream the worker drains. One object owns @@ -52,14 +52,17 @@ impl ChatClient { pub fn new(name: impl Into, mut transport: T) -> (Self, Receiver) { let inbound = transport.inbound(); let ident = TestLogosAccount::new(name); + let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded(); + let wakeup_service = ThreadedWakeupService::new(wakeup_tx); let core = Core::new_with_name( ident, transport, EphemeralRegistry::new(), + wakeup_service, ChatStorage::in_memory(), ) .unwrap(); - Self::spawn(core, inbound) + Self::spawn(core, inbound, wakeup_rx) } /// Open or create a persistent client backed by `StorageConfig`. @@ -74,8 +77,16 @@ impl ChatClient { let store = ChatStorage::new(config).map_err(ChatError::from)?; let inbound = transport.inbound(); let ident = TestLogosAccount::new(name); - let core = Core::new_from_store(ident, transport, EphemeralRegistry::new(), store)?; - Ok(Self::spawn(core, inbound)) + let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded(); + let wakeup_service = ThreadedWakeupService::new(wakeup_tx); + let core = Core::new_from_store( + ident, + transport, + EphemeralRegistry::new(), + wakeup_service, + store, + )?; + Ok(Self::spawn(core, inbound, wakeup_rx)) } } @@ -106,19 +117,25 @@ where let store = ChatStorage::new(config).map_err(ChatError::from)?; let inbound = transport.inbound(); let ident = TestLogosAccount::new(name); - let mut core = Core::new_from_store(ident, transport, registry, store)?; + let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded(); + let wakeup_service = ThreadedWakeupService::new(wakeup_tx); + let mut core = Core::new_from_store(ident, transport, registry, wakeup_service, store)?; core.register_keypackage()?; - Ok(Self::spawn(core, inbound)) + Ok(Self::spawn(core, inbound, wakeup_rx)) } - fn spawn(core: ClientCore, inbound: Receiver>) -> (Self, Receiver) { + fn spawn( + core: ClientCore, + inbound: Receiver>, + wakeup_events: Receiver, + ) -> (Self, Receiver) { let core = Arc::new(Mutex::new(core)); let (event_tx, event_rx) = crossbeam_channel::unbounded(); let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded::<()>(0); let worker = thread::spawn({ let core = Arc::clone(&core); - move || worker_loop(core, inbound, shutdown_rx, event_tx) + move || worker_loop(core, inbound, wakeup_events, shutdown_rx, event_tx) }); ( @@ -187,6 +204,7 @@ impl Drop for ChatClient { fn worker_loop( core: Arc>>, inbound: Receiver>, + wakeup_events: Receiver, shutdown: Receiver<()>, event_tx: Sender, ) where @@ -217,6 +235,14 @@ fn worker_loop( } } } + recv(wakeup_events) -> msg => { + let Ok(WakeupEvent { convo_id }) = msg else { + return; // wakeup service's sender dropped + }; + if let Err(e) = core.lock().wakeup(&convo_id) { + tracing::warn!("wakeup failed: {e:?}"); + } + } recv(shutdown) -> _ => return, } } diff --git a/extensions/components/Cargo.toml b/extensions/components/Cargo.toml index fb65de6..582c0bc 100644 --- a/extensions/components/Cargo.toml +++ b/extensions/components/Cargo.toml @@ -11,6 +11,7 @@ storage = { workspace = true } # External dependencies (sorted) base64 = "0.22" +crossbeam-channel = { workspace = true } hex = "0.4.3" reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } diff --git a/extensions/components/src/lib.rs b/extensions/components/src/lib.rs index a147b70..25ae825 100644 --- a/extensions/components/src/lib.rs +++ b/extensions/components/src/lib.rs @@ -1,8 +1,10 @@ mod contact_registry; mod delivery; mod storage; +mod wakeup; pub use contact_registry::EphemeralRegistry; pub use contact_registry::http::{HttpRegistry, HttpRegistryError}; pub use delivery::*; pub use storage::*; +pub use wakeup::*; diff --git a/extensions/components/src/wakeup.rs b/extensions/components/src/wakeup.rs new file mode 100644 index 0000000..58905d1 --- /dev/null +++ b/extensions/components/src/wakeup.rs @@ -0,0 +1,133 @@ +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::fmt; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::{Duration, Instant}; + +use crossbeam_channel::Sender; +use libchat::{ConversationId, WakeupService}; + +#[derive(Debug, Eq, PartialEq)] +struct WakeupRecord { + expiry: Instant, + convo_id: ConversationId, +} + +impl Ord for WakeupRecord { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.expiry.cmp(&other.expiry) + } +} + +impl PartialOrd for WakeupRecord { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// Sent to the wakeup queue when a previously registered timer expires. +#[derive(Debug, Clone)] +pub struct WakeupEvent { + pub convo_id: ConversationId, +} + +struct Shared { + pending: Mutex>>, + condvar: Condvar, + running: AtomicBool, +} + +/// A [`WakeupService`] backed by a background thread that sleeps until the +/// nearest pending deadline, then emits a [`WakeupEvent`] on `events`. +pub struct ThreadedWakeupService { + shared: Arc, + thread: Option>, +} + +impl fmt::Debug for ThreadedWakeupService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ThreadedWakeupService").finish() + } +} + +impl ThreadedWakeupService { + pub fn new(events: Sender) -> Self { + let shared = Arc::new(Shared { + pending: Mutex::new(BinaryHeap::new()), + condvar: Condvar::new(), + running: AtomicBool::new(true), + }); + + let thread = thread::spawn({ + let shared = Arc::clone(&shared); + move || run(shared, events) + }); + + Self { + shared, + thread: Some(thread), + } + } +} + +impl WakeupService for ThreadedWakeupService { + fn wakeup_in(&mut self, duration: Duration, convo_id: ConversationId) { + let mut pending = self.shared.pending.lock().unwrap(); + pending.push(Reverse(WakeupRecord { + expiry: Instant::now() + duration, + convo_id, + })); + // The worker may be sleeping until a later deadline; wake it so it + // can recompute the time until the new nearest deadline. + self.shared.condvar.notify_one(); + } +} + +impl Drop for ThreadedWakeupService { + fn drop(&mut self) { + self.shared.running.store(false, Ordering::SeqCst); + self.shared.condvar.notify_one(); + if let Some(thread) = self.thread.take() { + let _ = thread.join(); + } + } +} + +/// Background loop: sleep until the nearest deadline (or forever if the heap +/// is empty), then drain and emit any expired records. +fn run(shared: Arc, events: Sender) { + loop { + let mut pending = shared.pending.lock().unwrap(); + + if !shared.running.load(Ordering::SeqCst) { + return; + } + + let Some(Reverse(next)) = pending.peek() else { + // Nothing scheduled: wait until a registration or shutdown wakes us. + drop(shared.condvar.wait(pending).unwrap()); + continue; + }; + + let now = Instant::now(); + if next.expiry > now { + let timeout = next.expiry - now; + drop(shared.condvar.wait_timeout(pending, timeout).unwrap()); + continue; + } + + let Reverse(record) = pending.pop().unwrap(); + drop(pending); + + if events + .send(WakeupEvent { + convo_id: record.convo_id, + }) + .is_err() + { + return; + } + } +}