Update WakeupService to use Duration

This commit is contained in:
Jazz Turner-Baggs 2026-06-02 21:07:49 -07:00
parent 690f8aa64e
commit b9db79a7c1
No known key found for this signature in database
3 changed files with 58 additions and 49 deletions

View File

@ -1,7 +1,10 @@
/// Service traits define the functionality which must be externally supplied by /// Service traits define the functionality which must be externally supplied by
/// platform clients. Platforms can alter the behaviour of the chat core by supplying /// platform clients. Platforms can alter the behaviour of the chat core by supplying
/// different implementations. /// different implementations.
use std::{fmt::Debug, fmt::Display}; use std::{
fmt::{Debug, Display},
time::Duration,
};
use crypto::{Ed25519Signature, Ed25519VerifyingKey}; use crypto::{Ed25519Signature, Ed25519VerifyingKey};
@ -70,5 +73,5 @@ impl<T: IdentityProvider> IdentityProvider for &T {
} }
pub trait WakeupService: Debug { pub trait WakeupService: Debug {
fn wakeup_in(&mut self, secs: u32, convo_id: ConversationId); fn wakeup_in(&mut self, duration: Duration, convo_id: ConversationId);
} }

View File

@ -484,9 +484,7 @@ impl GroupV2Convo {
// TODO(chat): WakeupService is second-granularity but de-mls // TODO(chat): WakeupService is second-granularity but de-mls
// deadlines are sub-second; `as_secs().max(1)` floors them up to 1s, // deadlines are sub-second; `as_secs().max(1)` floors them up to 1s,
// silently over-waiting. Needs a millisecond-capable wakeup. // silently over-waiting. Needs a millisecond-capable wakeup.
service_ctx service_ctx.wakeup_service.wakeup_in(d, &self.convo_id);
.wakeup_service
.wakeup_in(d.as_secs().max(1) as u32, &self.convo_id);
} }
Ok(()) Ok(())
} }

View File

@ -1,6 +1,7 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
@ -70,10 +71,12 @@ impl DerefMut for PollableClient {
} }
} }
fn process(clients: &mut Vec<PollableClient>, wakeups: &mut Vec<WakeupProvider>, secs: u32) { fn process(clients: &mut Vec<PollableClient>, wakeups: &mut Vec<WakeupProvider>, ms: u32) {
for _ in 0..secs { info!(ms, "processing");
let step = 5;
for _ in (0..ms).step_by(step as usize) {
for w in wakeups.iter().as_ref() { for w in wakeups.iter().as_ref() {
w.advance_time(1); w.advance_time(step as u64);
} }
for client in clients.as_mut_slice() { for client in clients.as_mut_slice() {
@ -82,7 +85,7 @@ fn process(clients: &mut Vec<PollableClient>, wakeups: &mut Vec<WakeupProvider>,
// de-mls deadlines are real wall-clock; sleep so the millisecond-scale // de-mls deadlines are real wall-clock; sleep so the millisecond-scale
// commit/consensus timers actually elapse between poll cycles. // commit/consensus timers actually elapse between poll cycles.
std::thread::sleep(std::time::Duration::from_millis(60)); std::thread::sleep(std::time::Duration::from_millis(step));
} }
} }
@ -91,12 +94,12 @@ use std::collections::BinaryHeap;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] #[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
struct WakeupRecord { struct WakeupRecord {
expiry: u32, expiry: Duration,
convo_id: String, convo_id: String,
} }
struct ManualWakeupService { struct ManualWakeupService {
now: u32, now: Duration,
pub pending: BinaryHeap<Reverse<WakeupRecord>>, pub pending: BinaryHeap<Reverse<WakeupRecord>>,
on_wakeup: Box<dyn Fn(String)>, on_wakeup: Box<dyn Fn(String)>,
} }
@ -113,14 +116,14 @@ impl std::fmt::Debug for ManualWakeupService {
impl ManualWakeupService { impl ManualWakeupService {
pub fn new(on_wakeup: impl Fn(String) + 'static) -> Self { pub fn new(on_wakeup: impl Fn(String) + 'static) -> Self {
Self { Self {
now: 0, now: Duration::new(0, 0),
pending: BinaryHeap::new(), pending: BinaryHeap::new(),
on_wakeup: Box::new(on_wakeup), on_wakeup: Box::new(on_wakeup),
} }
} }
pub fn tick(&mut self, secs: u32) -> Vec<String> { pub fn tick(&mut self, ms: u64) -> Vec<String> {
self.now += secs; self.now = self.now.checked_add(Duration::from_millis(ms)).unwrap();
let mut fired = vec![]; let mut fired = vec![];
while self while self
.pending .pending
@ -128,28 +131,29 @@ impl ManualWakeupService {
.is_some_and(|Reverse(w)| w.expiry <= self.now) .is_some_and(|Reverse(w)| w.expiry <= self.now)
{ {
let Reverse(w) = self.pending.pop().unwrap(); let Reverse(w) = self.pending.pop().unwrap();
debug!(now = self.now, w.convo_id, "Popping"); debug!(now = self.now.as_secs(), w.convo_id, "Popping");
fired.push(w.convo_id); fired.push(w.convo_id);
} }
fired fired
} }
pub fn advance_time(&mut self, secs: u32) { pub fn advance_time(&mut self, ms: u64) {
for convo_id in self.tick(secs) { for convo_id in self.tick(ms) {
(self.on_wakeup)(convo_id); (self.on_wakeup)(convo_id);
} }
} }
pub fn next(&self) -> Option<u32> {
Some(self.pending.peek()?.0.expiry)
}
} }
impl WakeupService for ManualWakeupService { impl WakeupService for ManualWakeupService {
fn wakeup_in(&mut self, secs: u32, convo_id: libchat::ConversationId) { fn wakeup_in(&mut self, duration: Duration, convo_id: libchat::ConversationId) {
debug!(now = self.now, secs, convo_id, "Pushing"); debug!(
now = self.now.as_secs(),
duration = duration.as_secs(),
convo_id,
"Pushing"
);
self.pending.push(Reverse(WakeupRecord { self.pending.push(Reverse(WakeupRecord {
expiry: self.now + secs, expiry: self.now + duration,
convo_id: convo_id.to_string(), convo_id: convo_id.to_string(),
})); }));
} }
@ -205,12 +209,11 @@ impl WakeupProvider {
}) })
} }
pub fn advance_time(&self, secs: u32) { pub fn advance_time(&self, ms: u64) {
// borrow_mut must be released before on_wakeup fires — it re-borrows client_slot // borrow_mut must be released before on_wakeup fires — it re-borrows client_slot
let fired = { let fired = {
let mut slot = self.client_slot.borrow_mut(); let mut slot = self.client_slot.borrow_mut();
slot.as_mut() slot.as_mut().map_or(vec![], |client| client.ws().tick(ms))
.map_or(vec![], |client| client.ws().tick(secs))
}; };
for convo_id in fired { for convo_id in fired {
if let Some(client) = self.client_slot.borrow().as_ref() { if let Some(client) = self.client_slot.borrow().as_ref() {
@ -239,28 +242,30 @@ fn wakup() {
let mut w = ManualWakeupService::new(|c| println!("Wakeup: {}. ", c)); let mut w = ManualWakeupService::new(|c| println!("Wakeup: {}. ", c));
println!("STARTing"); println!("STARTing");
w.wakeup_in(5, "5"); w.wakeup_in(Duration::from_secs(5), "5");
info!(next = w.next(), all = format!("{:?}", w.pending)); info!(w = format!("{:?}", w));
w.wakeup_in(1, "1"); w.wakeup_in(Duration::from_secs(1), "1");
info!(next = w.next(), all = format!("{:?}", w.pending)); info!(w = format!("{:?}", w));
w.wakeup_in(2, "2"); w.wakeup_in(Duration::from_secs(2), "2");
info!(next = w.next(), all = format!("{:?}", w.pending)); info!(w = format!("{:?}", w));
println!("GO"); println!("GO");
w.advance_time(1); w.advance_time(1000);
info!(next = w.next(), all = format!("{:?}", w.pending)); info!(w = format!("{:?}", w));
w.advance_time(1); w.advance_time(1000);
info!(next = w.next(), all = format!("{:?}", w.pending)); info!(w = format!("{:?}", w));
w.advance_time(1); w.advance_time(1000);
info!(next = w.next(), all = format!("{:?}", w.pending)); info!(w = format!("{:?}", w));
w.wakeup_in(3, "3"); w.wakeup_in(Duration::from_secs(3), "3");
w.advance_time(1); w.advance_time(1000);
w.advance_time(1); w.advance_time(1000);
w.advance_time(1);
w.advance_time(1); w.advance_time(1000);
w.advance_time(1); w.advance_time(1000);
w.advance_time(1000);
w.advance_time(1000);
println!("DONE"); println!("DONE");
} }
@ -327,6 +332,8 @@ fn core_client() {
let s_convo = clients[SARO] let s_convo = clients[SARO]
.create_group_convo(&[&clients[RAYA].account_id(), &clients[PAX].account_id()]) .create_group_convo(&[&clients[RAYA].account_id(), &clients[PAX].account_id()])
let wait_time_ms: u32 = 400;
.unwrap(); .unwrap();
// Bounded driver: de-mls reschedules its steward poll every tick, so a // Bounded driver: de-mls reschedules its steward poll every tick, so a
@ -337,8 +344,8 @@ fn core_client() {
// welcome to Raya's InboxV2 1-1 channel, and lets her `accept_welcome`. // welcome to Raya's InboxV2 1-1 channel, and lets her `accept_welcome`.
// Run extra cycles afterward so Raya polls her inbox and joins after the // Run extra cycles afterward so Raya polls her inbox and joins after the
// welcome is published. // welcome is published.
process(&mut clients, &mut wakeups, 180); process(&mut clients, &mut wakeups, wait_time_ms);
process(&mut clients, &mut wakeups, 180);
// Raya joined via the invite path. // Raya joined via the invite path.
let raya_convos = clients[RAYA].list_conversations().unwrap(); let raya_convos = clients[RAYA].list_conversations().unwrap();
assert!( assert!(
@ -350,12 +357,13 @@ fn core_client() {
// in the log). // in the log).
info!(target: "chat", "Saro -> sending: HI"); info!(target: "chat", "Saro -> sending: HI");
s_convo.send_content(b"HI").unwrap(); s_convo.send_content(b"HI").unwrap();
process(&mut clients, &mut wakeups, 120); process(&mut clients, &mut wakeups, wait_time_ms);
// Raya replies; Saro receives it (look for "Saro received: hi back"). // Raya replies; Saro receives it (look for "Saro received: hi back").
let raya_convo = clients[RAYA] let raya_convo = clients[RAYA]
.convo(&raya_convos[0]) .convo(&raya_convos[0])
.expect("Raya must have a usable conversation handle"); .expect("Raya must have a usable conversation handle");
info!(target: "chat", "Raya -> sending: hi back"); info!(target: "chat", "Raya -> sending: hi back");
raya_convo.send_content(b"hi back").unwrap(); raya_convo.send_content(b"hi back").unwrap();
process(&mut clients, &mut wakeups, 120); process(&mut clients, &mut wakeups, wait_time_ms);
} }