From c71412c1e2f3eede5320f48b73470ace60807b11 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Wed, 6 Nov 2024 08:45:25 +0100 Subject: [PATCH] Added temporal trigger streams --- .gitignore | 1 + network-runner/src/node/mix/mod.rs | 1 + network-runner/src/node/mix/step_scheduler.rs | 87 +++++++++++++++++++ 3 files changed, 89 insertions(+) create mode 100644 network-runner/src/node/mix/step_scheduler.rs diff --git a/.gitignore b/.gitignore index e18a6c8..d8a5322 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__/ *.so simulation network-runner/target +.idea/ \ No newline at end of file diff --git a/network-runner/src/node/mix/mod.rs b/network-runner/src/node/mix/mod.rs index 2d32508..5a1b879 100644 --- a/network-runner/src/node/mix/mod.rs +++ b/network-runner/src/node/mix/mod.rs @@ -1,4 +1,5 @@ pub mod state; +mod step_scheduler; use super::{Node, NodeId}; use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}; diff --git a/network-runner/src/node/mix/step_scheduler.rs b/network-runner/src/node/mix/step_scheduler.rs new file mode 100644 index 0000000..6e164e6 --- /dev/null +++ b/network-runner/src/node/mix/step_scheduler.rs @@ -0,0 +1,87 @@ +use chrono::format::Item; +use futures::Stream; +use rand::RngCore; +use std::pin::Pin; +use std::sync::mpsc; +use std::task::{Context, Poll}; +use std::time::Duration; + +struct Interval { + duration: Duration, + current_elapsed: Duration, + update_time: mpsc::Receiver, +} + +impl Interval { + pub fn update(&mut self, elapsed: Duration) -> bool { + self.current_elapsed += elapsed; + if self.current_elapsed >= self.duration { + self.current_elapsed = Duration::from_secs(0); + true + } else { + false + } + } +} + +impl Stream for Interval { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Ok(elapsed) = self.update_time.recv() { + if self.update(elapsed) { + return Poll::Ready(Some(())); + } + } + Poll::Pending + } +} + +struct TemporalRelease { + random_sleeps: Box>, + elapsed: Duration, + current_sleep: Duration, + update_time: mpsc::Receiver, +} + +impl TemporalRelease { + pub fn new( + mut rng: Rng, + update_time: mpsc::Receiver, + (min_delay, max_delay): (u64, u64), + ) -> Self { + let mut random_sleeps = Box::new(std::iter::repeat_with(move || { + Duration::from_secs((rng.next_u64() % (max_delay + 1)).max(min_delay)) + })); + let current_sleep = random_sleeps.next().unwrap(); + Self { + random_sleeps, + elapsed: Duration::from_secs(0), + current_sleep, + update_time, + } + } + pub fn update(&mut self, elapsed: Duration) -> bool { + self.elapsed += elapsed; + if self.elapsed >= self.current_sleep { + self.elapsed = Duration::from_secs(0); + self.current_sleep = self.random_sleeps.next().unwrap(); + true + } else { + false + } + } +} + +impl Stream for TemporalRelease { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Ok(elapsed) = self.update_time.recv() { + if self.update(elapsed) { + return Poll::Ready(Some(())); + } + } + Poll::Pending + } +}