From feaca41db6d500484d3e85d52e74ac1cffa0bcd8 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:50:13 +0900 Subject: [PATCH] add logs in PersistentSender/Receiver and TemporalDelay --- simlib/blendnet-sims/src/node/blend/mod.rs | 29 ++++++------ .../blendnet-sims/src/node/blend/scheduler.rs | 45 +++++++++++++++---- .../src/node/blend/sender_wrapper.rs | 38 ++++++++++++++++ .../src/node/blend/stream_wrapper.rs | 30 +++++++++++-- 4 files changed, 118 insertions(+), 24 deletions(-) create mode 100644 simlib/blendnet-sims/src/node/blend/sender_wrapper.rs diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 6dd5c9e..7b11e19 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -2,6 +2,7 @@ pub mod consensus_streams; pub mod lottery; mod message; pub mod scheduler; +mod sender_wrapper; pub mod state; pub mod stream_wrapper; pub mod topology; @@ -34,6 +35,7 @@ use nomos_blend_message::mock::MockBlendMessage; use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use scheduler::{Interval, TemporalRelease}; +use sender_wrapper::CrossbeamSenderWrapper; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use state::BlendnodeState; @@ -77,7 +79,7 @@ pub struct BlendNode { data_msg_lottery_interval: Interval, data_msg_lottery: StakeLottery, - persistent_sender: channel::Sender>, + persistent_sender: CrossbeamSenderWrapper>, persistent_update_time_sender: channel::Sender, persistent_transmission_messages: PersistentTransmissionStream< CrossbeamReceiverStream>, @@ -122,17 +124,18 @@ impl BlendNode { // Init Tier-1: Persistent transmission let (persistent_sender, persistent_receiver) = channel::unbounded(); let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); - let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) - .persistent_transmission( - settings.persistent_transmission, - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - Interval::new( - Duration::from_secs_f64( - 1.0 / settings.persistent_transmission.max_emission_frequency, + let persistent_transmission_messages = + CrossbeamReceiverStream::new("PersistentReceiver", persistent_receiver) + .persistent_transmission( + settings.persistent_transmission, + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + Interval::new( + Duration::from_secs_f64( + 1.0 / settings.persistent_transmission.max_emission_frequency, + ), + persistent_update_time_receiver, ), - persistent_update_time_receiver, - ), - ); + ); // Init Tier-2: message blend let (blend_sender, blend_receiver) = channel::unbounded(); @@ -163,7 +166,7 @@ impl BlendNode { settings.message_blend.temporal_processor.max_delay_seconds, ), ); - let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend( + let blend_messages = CrossbeamReceiverStream::new("BlendReceiver", blend_receiver).blend( settings.message_blend.clone(), membership, temporal_release, @@ -198,7 +201,7 @@ impl BlendNode { data_msg_lottery_update_time_sender, data_msg_lottery_interval, data_msg_lottery, - persistent_sender, + persistent_sender: CrossbeamSenderWrapper::new("PersistentSender", persistent_sender), persistent_update_time_sender, persistent_transmission_messages, crypto_processor, diff --git a/simlib/blendnet-sims/src/node/blend/scheduler.rs b/simlib/blendnet-sims/src/node/blend/scheduler.rs index ac403c8..67d3214 100644 --- a/simlib/blendnet-sims/src/node/blend/scheduler.rs +++ b/simlib/blendnet-sims/src/node/blend/scheduler.rs @@ -1,6 +1,7 @@ use crossbeam::channel; use futures::Stream; use rand::RngCore; +use serde::{Deserialize, Serialize}; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; @@ -68,16 +69,25 @@ impl TemporalRelease { update_time, } } - pub fn update(&mut self, elapsed: Duration) -> bool { + + fn update(&mut self, elapsed: Duration) -> Option { self.elapsed += elapsed; if self.elapsed >= self.current_sleep { + let temporal_delay = Delay { + expected: self.current_sleep, + actual: self.elapsed, + }; self.elapsed = Duration::from_secs(0); self.current_sleep = self.random_sleeps.next().unwrap(); - true + Some(temporal_delay) } else { - false + None } } + + fn log_delay(delay: &Delay) { + tracing::info!("TemporalDelay: {}", serde_json::to_string(delay).unwrap()); + } } impl Stream for TemporalRelease { @@ -85,7 +95,8 @@ impl Stream for TemporalRelease { fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { if let Ok(elapsed) = self.update_time.recv() { - if self.update(elapsed) { + if let Some(delay) = self.update(elapsed) { + Self::log_delay(&delay); return Poll::Ready(Some(())); } } @@ -93,6 +104,12 @@ impl Stream for TemporalRelease { } } +#[derive(Debug, PartialEq, Serialize, Deserialize)] +struct Delay { + expected: Duration, + actual: Duration, +} + #[cfg(test)] mod tests { use super::*; @@ -136,10 +153,22 @@ mod tests { let mut temporal_release = TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); - assert!(!temporal_release.update(Duration::from_secs(0))); - assert!(!temporal_release.update(Duration::from_millis(999))); - assert!(temporal_release.update(Duration::from_secs(1))); - assert!(temporal_release.update(Duration::from_secs(3))); + assert_eq!(temporal_release.update(Duration::from_secs(0)), None); + assert_eq!(temporal_release.update(Duration::from_millis(999)), None); + assert_eq!( + temporal_release.update(Duration::from_secs(1)), + Some(Delay { + expected: Duration::from_secs(1), + actual: Duration::from_millis(1999), + }) + ); + assert_eq!( + temporal_release.update(Duration::from_secs(3)), + Some(Delay { + expected: Duration::from_secs(1), + actual: Duration::from_secs(3), + }) + ); } #[test] diff --git a/simlib/blendnet-sims/src/node/blend/sender_wrapper.rs b/simlib/blendnet-sims/src/node/blend/sender_wrapper.rs new file mode 100644 index 0000000..fdbef4a --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/sender_wrapper.rs @@ -0,0 +1,38 @@ +use crossbeam::channel; +use serde::{Deserialize, Serialize}; + +pub struct CrossbeamSenderWrapper { + name: String, + sender: channel::Sender, +} + +impl CrossbeamSenderWrapper { + pub fn new(name: &str, sender: channel::Sender) -> Self { + Self { + name: name.to_string(), + sender, + } + } + + pub fn send(&self, item: T) -> Result<(), channel::SendError> { + self.sender.send(item)?; + self.log(); + Ok(()) + } + + fn log(&self) { + tracing::info!( + "{}: {}", + self.name, + serde_json::to_string(&Log { + len: self.sender.len(), + }) + .unwrap() + ); + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct Log { + len: usize, +} diff --git a/simlib/blendnet-sims/src/node/blend/stream_wrapper.rs b/simlib/blendnet-sims/src/node/blend/stream_wrapper.rs index 7d776e5..917c9be 100644 --- a/simlib/blendnet-sims/src/node/blend/stream_wrapper.rs +++ b/simlib/blendnet-sims/src/node/blend/stream_wrapper.rs @@ -5,14 +5,30 @@ use std::{ use crossbeam::channel; use futures::Stream; +use serde::{Deserialize, Serialize}; pub struct CrossbeamReceiverStream { + name: String, receiver: channel::Receiver, } impl CrossbeamReceiverStream { - pub fn new(receiver: channel::Receiver) -> Self { - Self { receiver } + pub fn new(name: &str, receiver: channel::Receiver) -> Self { + Self { + name: name.to_string(), + receiver, + } + } + + fn log(&self) { + tracing::info!( + "{}: {}", + self.name, + serde_json::to_string(&Log { + len: self.receiver.len(), + }) + .unwrap() + ); } } @@ -21,9 +37,17 @@ impl Stream for CrossbeamReceiverStream { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { match self.receiver.try_recv() { - Ok(item) => Poll::Ready(Some(item)), + Ok(item) => { + self.log(); + Poll::Ready(Some(item)) + } Err(channel::TryRecvError::Empty) => Poll::Pending, Err(channel::TryRecvError::Disconnected) => Poll::Ready(None), } } } + +#[derive(Debug, Serialize, Deserialize)] +struct Log { + len: usize, +}