diff --git a/.gitignore b/.gitignore index e18a6c8..d8a5322 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__/ *.so simulation network-runner/target +.idea/ \ No newline at end of file diff --git a/network-runner/src/node/mix/mod.rs b/network-runner/src/node/mix/mod.rs index 21330cf..ac07664 100644 --- a/network-runner/src/node/mix/mod.rs +++ b/network-runner/src/node/mix/mod.rs @@ -1,3 +1,5 @@ +mod step_scheduler; + use super::{Node, NodeId}; use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}; use serde::{Deserialize, Serialize}; diff --git a/network-runner/src/node/mix/step_scheduler.rs b/network-runner/src/node/mix/step_scheduler.rs new file mode 100644 index 0000000..6e164e6 --- /dev/null +++ b/network-runner/src/node/mix/step_scheduler.rs @@ -0,0 +1,87 @@ +use chrono::format::Item; +use futures::Stream; +use rand::RngCore; +use std::pin::Pin; +use std::sync::mpsc; +use std::task::{Context, Poll}; +use std::time::Duration; + +struct Interval { + duration: Duration, + current_elapsed: Duration, + update_time: mpsc::Receiver, +} + +impl Interval { + pub fn update(&mut self, elapsed: Duration) -> bool { + self.current_elapsed += elapsed; + if self.current_elapsed >= self.duration { + self.current_elapsed = Duration::from_secs(0); + true + } else { + false + } + } +} + +impl Stream for Interval { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Ok(elapsed) = self.update_time.recv() { + if self.update(elapsed) { + return Poll::Ready(Some(())); + } + } + Poll::Pending + } +} + +struct TemporalRelease { + random_sleeps: Box>, + elapsed: Duration, + current_sleep: Duration, + update_time: mpsc::Receiver, +} + +impl TemporalRelease { + pub fn new( + mut rng: Rng, + update_time: mpsc::Receiver, + (min_delay, max_delay): (u64, u64), + ) -> Self { + let mut random_sleeps = Box::new(std::iter::repeat_with(move || { + Duration::from_secs((rng.next_u64() % (max_delay + 1)).max(min_delay)) + })); + let current_sleep = random_sleeps.next().unwrap(); + Self { + random_sleeps, + elapsed: Duration::from_secs(0), + current_sleep, + update_time, + } + } + pub fn update(&mut self, elapsed: Duration) -> bool { + self.elapsed += elapsed; + if self.elapsed >= self.current_sleep { + self.elapsed = Duration::from_secs(0); + self.current_sleep = self.random_sleeps.next().unwrap(); + true + } else { + false + } + } +} + +impl Stream for TemporalRelease { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Ok(elapsed) = self.update_time.recv() { + if self.update(elapsed) { + return Poll::Ready(Some(())); + } + } + Poll::Pending + } +}