diff --git a/core/conversations/src/service_traits.rs b/core/conversations/src/service_traits.rs index bac2282..87ef40b 100644 --- a/core/conversations/src/service_traits.rs +++ b/core/conversations/src/service_traits.rs @@ -1,7 +1,10 @@ /// Service traits define the functionality which must be externally supplied by /// platform clients. Platforms can alter the behaviour of the chat core by supplying /// different implementations. -use std::{fmt::Debug, fmt::Display}; +use std::{ + fmt::{Debug, Display}, + time::Duration, +}; use crypto::{Ed25519Signature, Ed25519VerifyingKey}; @@ -70,5 +73,5 @@ impl IdentityProvider for &T { } pub trait WakeupService: Debug { - fn wakeup_in(&mut self, secs: u32, convo_id: ConversationId); + fn wakeup_in(&mut self, duration: Duration, convo_id: ConversationId); } diff --git a/core/core_client/src/conversation/group_v2.rs b/core/core_client/src/conversation/group_v2.rs index 16b2b38..f4b8d33 100644 --- a/core/core_client/src/conversation/group_v2.rs +++ b/core/core_client/src/conversation/group_v2.rs @@ -484,9 +484,7 @@ impl GroupV2Convo { // TODO(chat): WakeupService is second-granularity but de-mls // deadlines are sub-second; `as_secs().max(1)` floors them up to 1s, // silently over-waiting. Needs a millisecond-capable wakeup. - service_ctx - .wakeup_service - .wakeup_in(d.as_secs().max(1) as u32, &self.convo_id); + service_ctx.wakeup_service.wakeup_in(d, &self.convo_id); } Ok(()) } diff --git a/core/integration_tests_core/tests/dev_tests.rs b/core/integration_tests_core/tests/dev_tests.rs index 83ba74a..d345c59 100644 --- a/core/integration_tests_core/tests/dev_tests.rs +++ b/core/integration_tests_core/tests/dev_tests.rs @@ -1,6 +1,7 @@ use std::cell::RefCell; use std::ops::{Deref, DerefMut}; use std::rc::Rc; +use std::time::Duration; use tracing::{debug, info, warn}; use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; @@ -70,10 +71,12 @@ impl DerefMut for PollableClient { } } -fn process(clients: &mut Vec, wakeups: &mut Vec, secs: u32) { - for _ in 0..secs { +fn process(clients: &mut Vec, wakeups: &mut Vec, ms: u32) { + info!(ms, "processing"); + let step = 5; + for _ in (0..ms).step_by(step as usize) { for w in wakeups.iter().as_ref() { - w.advance_time(1); + w.advance_time(step as u64); } for client in clients.as_mut_slice() { @@ -82,7 +85,7 @@ fn process(clients: &mut Vec, wakeups: &mut Vec, // de-mls deadlines are real wall-clock; sleep so the millisecond-scale // commit/consensus timers actually elapse between poll cycles. - std::thread::sleep(std::time::Duration::from_millis(60)); + std::thread::sleep(std::time::Duration::from_millis(step)); } } @@ -91,12 +94,12 @@ use std::collections::BinaryHeap; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] struct WakeupRecord { - expiry: u32, + expiry: Duration, convo_id: String, } struct ManualWakeupService { - now: u32, + now: Duration, pub pending: BinaryHeap>, on_wakeup: Box, } @@ -113,14 +116,14 @@ impl std::fmt::Debug for ManualWakeupService { impl ManualWakeupService { pub fn new(on_wakeup: impl Fn(String) + 'static) -> Self { Self { - now: 0, + now: Duration::new(0, 0), pending: BinaryHeap::new(), on_wakeup: Box::new(on_wakeup), } } - pub fn tick(&mut self, secs: u32) -> Vec { - self.now += secs; + pub fn tick(&mut self, ms: u64) -> Vec { + self.now = self.now.checked_add(Duration::from_millis(ms)).unwrap(); let mut fired = vec![]; while self .pending @@ -128,28 +131,29 @@ impl ManualWakeupService { .is_some_and(|Reverse(w)| w.expiry <= self.now) { let Reverse(w) = self.pending.pop().unwrap(); - debug!(now = self.now, w.convo_id, "Popping"); + debug!(now = self.now.as_secs(), w.convo_id, "Popping"); fired.push(w.convo_id); } fired } - pub fn advance_time(&mut self, secs: u32) { - for convo_id in self.tick(secs) { + pub fn advance_time(&mut self, ms: u64) { + for convo_id in self.tick(ms) { (self.on_wakeup)(convo_id); } } - - pub fn next(&self) -> Option { - Some(self.pending.peek()?.0.expiry) - } } impl WakeupService for ManualWakeupService { - fn wakeup_in(&mut self, secs: u32, convo_id: libchat::ConversationId) { - debug!(now = self.now, secs, convo_id, "Pushing"); + fn wakeup_in(&mut self, duration: Duration, convo_id: libchat::ConversationId) { + debug!( + now = self.now.as_secs(), + duration = duration.as_secs(), + convo_id, + "Pushing" + ); self.pending.push(Reverse(WakeupRecord { - expiry: self.now + secs, + expiry: self.now + duration, convo_id: convo_id.to_string(), })); } @@ -205,12 +209,11 @@ impl WakeupProvider { }) } - pub fn advance_time(&self, secs: u32) { + pub fn advance_time(&self, ms: u64) { // borrow_mut must be released before on_wakeup fires — it re-borrows client_slot let fired = { let mut slot = self.client_slot.borrow_mut(); - slot.as_mut() - .map_or(vec![], |client| client.ws().tick(secs)) + slot.as_mut().map_or(vec![], |client| client.ws().tick(ms)) }; for convo_id in fired { if let Some(client) = self.client_slot.borrow().as_ref() { @@ -239,28 +242,30 @@ fn wakup() { let mut w = ManualWakeupService::new(|c| println!("Wakeup: {}. ", c)); println!("STARTing"); - w.wakeup_in(5, "5"); - info!(next = w.next(), all = format!("{:?}", w.pending)); - w.wakeup_in(1, "1"); - info!(next = w.next(), all = format!("{:?}", w.pending)); - w.wakeup_in(2, "2"); - info!(next = w.next(), all = format!("{:?}", w.pending)); + w.wakeup_in(Duration::from_secs(5), "5"); + info!(w = format!("{:?}", w)); + w.wakeup_in(Duration::from_secs(1), "1"); + info!(w = format!("{:?}", w)); + w.wakeup_in(Duration::from_secs(2), "2"); + info!(w = format!("{:?}", w)); println!("GO"); - w.advance_time(1); - info!(next = w.next(), all = format!("{:?}", w.pending)); - w.advance_time(1); - info!(next = w.next(), all = format!("{:?}", w.pending)); - w.advance_time(1); - info!(next = w.next(), all = format!("{:?}", w.pending)); - w.wakeup_in(3, "3"); - w.advance_time(1); + w.advance_time(1000); + info!(w = format!("{:?}", w)); + w.advance_time(1000); + info!(w = format!("{:?}", w)); + w.advance_time(1000); + info!(w = format!("{:?}", w)); + w.wakeup_in(Duration::from_secs(3), "3"); + w.advance_time(1000); - w.advance_time(1); - w.advance_time(1); - w.advance_time(1); - w.advance_time(1); + w.advance_time(1000); + + w.advance_time(1000); + w.advance_time(1000); + w.advance_time(1000); + w.advance_time(1000); println!("DONE"); } @@ -327,6 +332,8 @@ fn core_client() { let s_convo = clients[SARO] .create_group_convo(&[&clients[RAYA].account_id(), &clients[PAX].account_id()]) + let wait_time_ms: u32 = 400; + .unwrap(); // Bounded driver: de-mls reschedules its steward poll every tick, so a @@ -337,8 +344,8 @@ fn core_client() { // welcome to Raya's InboxV2 1-1 channel, and lets her `accept_welcome`. // Run extra cycles afterward so Raya polls her inbox and joins after the // welcome is published. - process(&mut clients, &mut wakeups, 180); - process(&mut clients, &mut wakeups, 180); + process(&mut clients, &mut wakeups, wait_time_ms); + // Raya joined via the invite path. let raya_convos = clients[RAYA].list_conversations().unwrap(); assert!( @@ -350,12 +357,13 @@ fn core_client() { // in the log). info!(target: "chat", "Saro -> sending: HI"); s_convo.send_content(b"HI").unwrap(); - process(&mut clients, &mut wakeups, 120); + process(&mut clients, &mut wakeups, wait_time_ms); + // Raya replies; Saro receives it (look for "Saro received: hi back"). let raya_convo = clients[RAYA] .convo(&raya_convos[0]) .expect("Raya must have a usable conversation handle"); info!(target: "chat", "Raya -> sending: hi back"); raya_convo.send_content(b"hi back").unwrap(); - process(&mut clients, &mut wakeups, 120); + process(&mut clients, &mut wakeups, wait_time_ms); }