From 8588a40f9419fd1277b410df09e453d0052a254f Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Thu, 28 May 2026 11:56:34 -0700 Subject: [PATCH] Update process to handle next wakeup --- .../integration_tests_core/tests/dev_tests.rs | 105 +++++++++++++++--- 1 file changed, 92 insertions(+), 13 deletions(-) diff --git a/core/integration_tests_core/tests/dev_tests.rs b/core/integration_tests_core/tests/dev_tests.rs index 5b9d461..0b55c5b 100644 --- a/core/integration_tests_core/tests/dev_tests.rs +++ b/core/integration_tests_core/tests/dev_tests.rs @@ -70,6 +70,52 @@ impl DerefMut for PollableClient { } } +fn process_all(clients: &mut Vec, wakeups: &mut Vec) { + info!("Process All"); + while process_next(clients, wakeups) { + info!(" -- process"); + } +} + +fn process_next(clients: &mut Vec, wakeups: &mut Vec) -> bool { + for w in wakeups.iter().as_ref() { + if let Some(client) = w.client_slot.borrow().as_ref() { + dbg!(&client.ws().pending); + } + } + + for w in wakeups.iter().as_ref() { + if let Some(client) = w.client_slot.borrow().as_ref() { + let n = w.next(); + info!(n, "<<<"); + } + } + + let Some(next_wakeup) = wakeups + .iter() + .map(|w| w.next()) + .filter(|x| x.is_some()) + .min() + .flatten() + else { + info!("Nothing to do for process_next"); + return false; + }; + + info!(next = next_wakeup, "Process"); + // }; + + for w in wakeups.iter().as_ref() { + w.advance_time(next_wakeup); + } + + for client in clients.as_mut_slice() { + client.process_messages(); + } + + return true; +} + fn process(clients: &mut Vec, wakeups: &mut Vec, secs: u32) { for _ in 0..secs { for w in wakeups.iter().as_ref() { @@ -93,7 +139,7 @@ struct WakeupRecord { struct ManualWakeupService { now: u32, - pending: BinaryHeap>, + pub pending: BinaryHeap>, on_wakeup: Box, } @@ -126,6 +172,7 @@ impl ManualWakeupService { let Reverse(w) = self.pending.pop().unwrap(); info!(now = self.now, w.convo_id, "Popping"); fired.push(w.convo_id); + dbg!(&self); } fired } @@ -135,11 +182,15 @@ impl ManualWakeupService { (self.on_wakeup)(convo_id); } } + + pub fn next(&self) -> Option { + Some(self.pending.peek()?.0.expiry) + } } impl WakeupService for ManualWakeupService { fn wakeup_in(&mut self, secs: u32, convo_id: libchat::ConversationId) { - info!(now = self.now, convo_id, "Pushing"); + info!(now = self.now, secs, convo_id, "Pushing"); self.pending.push(Reverse(WakeupRecord { expiry: self.now + secs, convo_id: convo_id.to_string(), @@ -173,6 +224,12 @@ struct WakeupProvider { >, } +impl std::fmt::Debug for WakeupProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WakeupProvider").finish() + } +} + impl WakeupProvider { pub fn new() -> Self { Self { @@ -180,6 +237,22 @@ impl WakeupProvider { } } + pub fn has_pending(&self) -> bool { + if let Some(client) = self.client_slot.borrow().as_ref() { + return true; + } + false + } + + pub fn next(&self) -> Option { + if let Some(client) = self.client_slot.borrow().as_ref() { + let ws = client.ws(); + return Some(ws.next()? - ws.now); + } + + None + } + pub fn create_wakeup_service(&self) -> ManualWakeupService { let slot = self.client_slot.clone(); ManualWakeupService::new(move |convo_id| { @@ -197,8 +270,9 @@ impl WakeupProvider { .map_or(vec![], |client| client.ws().tick(secs)) }; for convo_id in fired { + info!("FIRED"); if let Some(client) = self.client_slot.borrow().as_ref() { - client.on_wakeup(&convo_id); + client.on_wakeup(&convo_id).unwrap(); } } } @@ -224,15 +298,20 @@ fn wakup() { println!("STARTing"); w.wakeup_in(5, "5"); - + info!(next = w.next(), all = format!("{:?}", w.pending)); w.wakeup_in(1, "1"); + info!(next = w.next(), all = format!("{:?}", w.pending)); w.wakeup_in(2, "2"); + info!(next = w.next(), all = format!("{:?}", w.pending)); println!("GO"); w.advance_time(1); + info!(next = w.next(), all = format!("{:?}", w.pending)); w.advance_time(1); + info!(next = w.next(), all = format!("{:?}", w.pending)); w.advance_time(1); + info!(next = w.next(), all = format!("{:?}", w.pending)); w.wakeup_in(3, "3"); w.advance_time(1); @@ -291,21 +370,21 @@ fn core_client() { .unwrap(); // Manaully process the DS - process(&mut clients, &mut wakeups, 10); - - s_convo.send_content(b"HI").unwrap(); + process_all(&mut clients, &mut wakeups); + process_all(&mut clients, &mut wakeups); + // s_convo.send_content(b"HI").unwrap(); // Manaully process the DS - process(&mut clients, &mut wakeups, 10); + process_all(&mut clients, &mut wakeups); - // TODO: Needs Invite path working first + // // TODO: Needs Invite path working first // let convo_id = clients[RAYA].list_conversations().unwrap().pop().unwrap(); // let r_convo = clients[RAYA].convo(&convo_id).expect("Convo exists"); - // process(&mut clients); + // process(&mut clients, &mut wakeups, 10); // r_convo.send_content(b"PEW").unwrap(); - // process(&mut clients); + // process(&mut clients, &mut wakeups, 10); // s_convo.send_content(b"SARO again").unwrap(); - // process(&mut clients); - // println!("Hello"); + // process(&mut clients, &mut wakeups, 10); + println!("Hello"); }