Update process to handle next wakeup

This commit is contained in:
Jazz Turner-Baggs 2026-05-28 11:56:34 -07:00
parent b9a628535f
commit 8588a40f94
No known key found for this signature in database

View File

@ -70,6 +70,52 @@ impl DerefMut for PollableClient {
}
}
fn process_all(clients: &mut Vec<PollableClient>, wakeups: &mut Vec<WakeupProvider>) {
info!("Process All");
while process_next(clients, wakeups) {
info!(" -- process");
}
}
fn process_next(clients: &mut Vec<PollableClient>, wakeups: &mut Vec<WakeupProvider>) -> bool {
for w in wakeups.iter().as_ref() {
if let Some(client) = w.client_slot.borrow().as_ref() {
dbg!(&client.ws().pending);
}
}
for w in wakeups.iter().as_ref() {
if let Some(client) = w.client_slot.borrow().as_ref() {
let n = w.next();
info!(n, "<<<");
}
}
let Some(next_wakeup) = wakeups
.iter()
.map(|w| w.next())
.filter(|x| x.is_some())
.min()
.flatten()
else {
info!("Nothing to do for process_next");
return false;
};
info!(next = next_wakeup, "Process");
// };
for w in wakeups.iter().as_ref() {
w.advance_time(next_wakeup);
}
for client in clients.as_mut_slice() {
client.process_messages();
}
return true;
}
fn process(clients: &mut Vec<PollableClient>, wakeups: &mut Vec<WakeupProvider>, secs: u32) {
for _ in 0..secs {
for w in wakeups.iter().as_ref() {
@ -93,7 +139,7 @@ struct WakeupRecord {
struct ManualWakeupService {
now: u32,
pending: BinaryHeap<Reverse<WakeupRecord>>,
pub pending: BinaryHeap<Reverse<WakeupRecord>>,
on_wakeup: Box<dyn Fn(String)>,
}
@ -126,6 +172,7 @@ impl ManualWakeupService {
let Reverse(w) = self.pending.pop().unwrap();
info!(now = self.now, w.convo_id, "Popping");
fired.push(w.convo_id);
dbg!(&self);
}
fired
}
@ -135,11 +182,15 @@ impl ManualWakeupService {
(self.on_wakeup)(convo_id);
}
}
pub fn next(&self) -> Option<u32> {
Some(self.pending.peek()?.0.expiry)
}
}
impl WakeupService for ManualWakeupService {
fn wakeup_in(&mut self, secs: u32, convo_id: libchat::ConversationId) {
info!(now = self.now, convo_id, "Pushing");
info!(now = self.now, secs, convo_id, "Pushing");
self.pending.push(Reverse(WakeupRecord {
expiry: self.now + secs,
convo_id: convo_id.to_string(),
@ -173,6 +224,12 @@ struct WakeupProvider {
>,
}
impl std::fmt::Debug for WakeupProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WakeupProvider").finish()
}
}
impl WakeupProvider {
pub fn new() -> Self {
Self {
@ -180,6 +237,22 @@ impl WakeupProvider {
}
}
pub fn has_pending(&self) -> bool {
if let Some(client) = self.client_slot.borrow().as_ref() {
return true;
}
false
}
pub fn next(&self) -> Option<u32> {
if let Some(client) = self.client_slot.borrow().as_ref() {
let ws = client.ws();
return Some(ws.next()? - ws.now);
}
None
}
pub fn create_wakeup_service(&self) -> ManualWakeupService {
let slot = self.client_slot.clone();
ManualWakeupService::new(move |convo_id| {
@ -197,8 +270,9 @@ impl WakeupProvider {
.map_or(vec![], |client| client.ws().tick(secs))
};
for convo_id in fired {
info!("FIRED");
if let Some(client) = self.client_slot.borrow().as_ref() {
client.on_wakeup(&convo_id);
client.on_wakeup(&convo_id).unwrap();
}
}
}
@ -224,15 +298,20 @@ fn wakup() {
println!("STARTing");
w.wakeup_in(5, "5");
info!(next = w.next(), all = format!("{:?}", w.pending));
w.wakeup_in(1, "1");
info!(next = w.next(), all = format!("{:?}", w.pending));
w.wakeup_in(2, "2");
info!(next = w.next(), all = format!("{:?}", w.pending));
println!("GO");
w.advance_time(1);
info!(next = w.next(), all = format!("{:?}", w.pending));
w.advance_time(1);
info!(next = w.next(), all = format!("{:?}", w.pending));
w.advance_time(1);
info!(next = w.next(), all = format!("{:?}", w.pending));
w.wakeup_in(3, "3");
w.advance_time(1);
@ -291,21 +370,21 @@ fn core_client() {
.unwrap();
// Manaully process the DS
process(&mut clients, &mut wakeups, 10);
s_convo.send_content(b"HI").unwrap();
process_all(&mut clients, &mut wakeups);
process_all(&mut clients, &mut wakeups);
// s_convo.send_content(b"HI").unwrap();
// Manaully process the DS
process(&mut clients, &mut wakeups, 10);
process_all(&mut clients, &mut wakeups);
// TODO: Needs Invite path working first
// // TODO: Needs Invite path working first
// let convo_id = clients[RAYA].list_conversations().unwrap().pop().unwrap();
// let r_convo = clients[RAYA].convo(&convo_id).expect("Convo exists");
// process(&mut clients);
// process(&mut clients, &mut wakeups, 10);
// r_convo.send_content(b"PEW").unwrap();
// process(&mut clients);
// process(&mut clients, &mut wakeups, 10);
// s_convo.send_content(b"SARO again").unwrap();
// process(&mut clients);
// println!("Hello");
// process(&mut clients, &mut wakeups, 10);
println!("Hello");
}