Add Wakeups to Client

This commit is contained in:
Jazz Turner-Baggs 2026-06-13 00:14:36 -07:00
parent 4c0cc97cc3
commit 1ec08198a6
No known key found for this signature in database
7 changed files with 176 additions and 13 deletions

1
Cargo.lock generated
View File

@ -1472,6 +1472,7 @@ name = "components"
version = "0.1.0"
dependencies = [
"base64",
"crossbeam-channel",
"crypto",
"hex",
"libchat",

View File

@ -34,7 +34,7 @@ pub(crate) trait Convo<S: ExternalServices> {
}
/// Group-only operations.
pub(crate) trait GroupConvo<S: ExternalServices>: Convo<S> + std::fmt::Debug {
pub(crate) trait GroupConvo<S: ExternalServices>: Convo<S> + std::fmt::Debug + Send {
fn add_member(
&mut self,
cx: &mut ServiceContext<S>,

View File

@ -115,11 +115,11 @@ mod test_support {
pub(crate) struct NoopWakeups;
impl WakeupService for NoopWakeups {
fn wakeup_in(&mut self, duration: std::time::Duration, convo_id: crate::ConversationId) {}
fn wakeup_in(&mut self, _: std::time::Duration, _: crate::ConversationId) {}
}
impl<IP: IdentityProvider, CS: ChatStore>
ServiceContext<(IP, NoopDelivery, NoopRegistration, WS, CS)>
ServiceContext<(IP, NoopDelivery, NoopRegistration, NoopWakeups, CS)>
{
/// Builds a context around a real store, stubbing other services.
pub(crate) fn for_test(ident: IP, store: CS) -> Result<Self, ChatError> {
@ -132,7 +132,7 @@ mod test_support {
mls_provider: MlsEphemeralPqProvider::new().map_err(ChatError::generic)?,
causal: CausalHistoryStore::new(),
identity: Identity::new(name),
wakeup_service: NoopWakeup {},
wakeup_service: NoopWakeups {},
})
}
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use components::EphemeralRegistry;
use components::{EphemeralRegistry, ThreadedWakeupService, WakeupEvent};
use crossbeam_channel::{Receiver, Sender, select};
use libchat::{
ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService, InboxOutcome,
@ -13,7 +13,7 @@ use parking_lot::Mutex;
use crate::errors::ClientError;
use crate::event::Event;
type ClientCore<T, R> = Core<(TestLogosAccount, T, R, ChatStorage)>;
type ClientCore<T, R> = Core<(TestLogosAccount, T, R, ThreadedWakeupService, ChatStorage)>;
/// The transport as the client sees it: a [`DeliveryService`] for outbound
/// publishing plus the inbound payload stream the worker drains. One object owns
@ -52,14 +52,17 @@ impl<T: Transport> ChatClient<T, EphemeralRegistry> {
pub fn new(name: impl Into<String>, mut transport: T) -> (Self, Receiver<Event>) {
let inbound = transport.inbound();
let ident = TestLogosAccount::new(name);
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let core = Core::new_with_name(
ident,
transport,
EphemeralRegistry::new(),
wakeup_service,
ChatStorage::in_memory(),
)
.unwrap();
Self::spawn(core, inbound)
Self::spawn(core, inbound, wakeup_rx)
}
/// Open or create a persistent client backed by `StorageConfig`.
@ -74,8 +77,16 @@ impl<T: Transport> ChatClient<T, EphemeralRegistry> {
let store = ChatStorage::new(config).map_err(ChatError::from)?;
let inbound = transport.inbound();
let ident = TestLogosAccount::new(name);
let core = Core::new_from_store(ident, transport, EphemeralRegistry::new(), store)?;
Ok(Self::spawn(core, inbound))
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let core = Core::new_from_store(
ident,
transport,
EphemeralRegistry::new(),
wakeup_service,
store,
)?;
Ok(Self::spawn(core, inbound, wakeup_rx))
}
}
@ -106,19 +117,25 @@ where
let store = ChatStorage::new(config).map_err(ChatError::from)?;
let inbound = transport.inbound();
let ident = TestLogosAccount::new(name);
let mut core = Core::new_from_store(ident, transport, registry, store)?;
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let mut core = Core::new_from_store(ident, transport, registry, wakeup_service, store)?;
core.register_keypackage()?;
Ok(Self::spawn(core, inbound))
Ok(Self::spawn(core, inbound, wakeup_rx))
}
fn spawn(core: ClientCore<T, R>, inbound: Receiver<Vec<u8>>) -> (Self, Receiver<Event>) {
fn spawn(
core: ClientCore<T, R>,
inbound: Receiver<Vec<u8>>,
wakeup_events: Receiver<WakeupEvent>,
) -> (Self, Receiver<Event>) {
let core = Arc::new(Mutex::new(core));
let (event_tx, event_rx) = crossbeam_channel::unbounded();
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded::<()>(0);
let worker = thread::spawn({
let core = Arc::clone(&core);
move || worker_loop(core, inbound, shutdown_rx, event_tx)
move || worker_loop(core, inbound, wakeup_events, shutdown_rx, event_tx)
});
(
@ -187,6 +204,7 @@ impl<T: DeliveryService, R: RegistrationService> Drop for ChatClient<T, R> {
fn worker_loop<T, R>(
core: Arc<Mutex<ClientCore<T, R>>>,
inbound: Receiver<Vec<u8>>,
wakeup_events: Receiver<WakeupEvent>,
shutdown: Receiver<()>,
event_tx: Sender<Event>,
) where
@ -217,6 +235,14 @@ fn worker_loop<T, R>(
}
}
}
recv(wakeup_events) -> msg => {
let Ok(WakeupEvent { convo_id }) = msg else {
return; // wakeup service's sender dropped
};
if let Err(e) = core.lock().wakeup(&convo_id) {
tracing::warn!("wakeup failed: {e:?}");
}
}
recv(shutdown) -> _ => return,
}
}

View File

@ -11,6 +11,7 @@ storage = { workspace = true }
# External dependencies (sorted)
base64 = "0.22"
crossbeam-channel = { workspace = true }
hex = "0.4.3"
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }

View File

@ -1,8 +1,10 @@
mod contact_registry;
mod delivery;
mod storage;
mod wakeup;
pub use contact_registry::EphemeralRegistry;
pub use contact_registry::http::{HttpRegistry, HttpRegistryError};
pub use delivery::*;
pub use storage::*;
pub use wakeup::*;

View File

@ -0,0 +1,133 @@
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crossbeam_channel::Sender;
use libchat::{ConversationId, WakeupService};
#[derive(Debug, Eq, PartialEq)]
struct WakeupRecord {
expiry: Instant,
convo_id: ConversationId,
}
impl Ord for WakeupRecord {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.expiry.cmp(&other.expiry)
}
}
impl PartialOrd for WakeupRecord {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// Sent to the wakeup queue when a previously registered timer expires.
#[derive(Debug, Clone)]
pub struct WakeupEvent {
pub convo_id: ConversationId,
}
struct Shared {
pending: Mutex<BinaryHeap<Reverse<WakeupRecord>>>,
condvar: Condvar,
running: AtomicBool,
}
/// A [`WakeupService`] backed by a background thread that sleeps until the
/// nearest pending deadline, then emits a [`WakeupEvent`] on `events`.
pub struct ThreadedWakeupService {
shared: Arc<Shared>,
thread: Option<JoinHandle<()>>,
}
impl fmt::Debug for ThreadedWakeupService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ThreadedWakeupService").finish()
}
}
impl ThreadedWakeupService {
pub fn new(events: Sender<WakeupEvent>) -> Self {
let shared = Arc::new(Shared {
pending: Mutex::new(BinaryHeap::new()),
condvar: Condvar::new(),
running: AtomicBool::new(true),
});
let thread = thread::spawn({
let shared = Arc::clone(&shared);
move || run(shared, events)
});
Self {
shared,
thread: Some(thread),
}
}
}
impl WakeupService for ThreadedWakeupService {
fn wakeup_in(&mut self, duration: Duration, convo_id: ConversationId) {
let mut pending = self.shared.pending.lock().unwrap();
pending.push(Reverse(WakeupRecord {
expiry: Instant::now() + duration,
convo_id,
}));
// The worker may be sleeping until a later deadline; wake it so it
// can recompute the time until the new nearest deadline.
self.shared.condvar.notify_one();
}
}
impl Drop for ThreadedWakeupService {
fn drop(&mut self) {
self.shared.running.store(false, Ordering::SeqCst);
self.shared.condvar.notify_one();
if let Some(thread) = self.thread.take() {
let _ = thread.join();
}
}
}
/// Background loop: sleep until the nearest deadline (or forever if the heap
/// is empty), then drain and emit any expired records.
fn run(shared: Arc<Shared>, events: Sender<WakeupEvent>) {
loop {
let mut pending = shared.pending.lock().unwrap();
if !shared.running.load(Ordering::SeqCst) {
return;
}
let Some(Reverse(next)) = pending.peek() else {
// Nothing scheduled: wait until a registration or shutdown wakes us.
drop(shared.condvar.wait(pending).unwrap());
continue;
};
let now = Instant::now();
if next.expiry > now {
let timeout = next.expiry - now;
drop(shared.condvar.wait_timeout(pending, timeout).unwrap());
continue;
}
let Reverse(record) = pending.pop().unwrap();
drop(pending);
if events
.send(WakeupEvent {
convo_id: record.convo_id,
})
.is_err()
{
return;
}
}
}