add logs in PersistentSender/Receiver and TemporalDelay

This commit is contained in:
Youngjoon Lee 2024-12-19 09:50:13 +09:00
parent 0ef5c3cfdb
commit feaca41db6
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
4 changed files with 118 additions and 24 deletions

View File

@ -2,6 +2,7 @@ pub mod consensus_streams;
pub mod lottery;
mod message;
pub mod scheduler;
mod sender_wrapper;
pub mod state;
pub mod stream_wrapper;
pub mod topology;
@ -34,6 +35,7 @@ use nomos_blend_message::mock::MockBlendMessage;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use scheduler::{Interval, TemporalRelease};
use sender_wrapper::CrossbeamSenderWrapper;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use state::BlendnodeState;
@ -77,7 +79,7 @@ pub struct BlendNode {
data_msg_lottery_interval: Interval,
data_msg_lottery: StakeLottery<ChaCha12Rng>,
persistent_sender: channel::Sender<Vec<u8>>,
persistent_sender: CrossbeamSenderWrapper<Vec<u8>>,
persistent_update_time_sender: channel::Sender<Duration>,
persistent_transmission_messages: PersistentTransmissionStream<
CrossbeamReceiverStream<Vec<u8>>,
@ -122,17 +124,18 @@ impl BlendNode {
// Init Tier-1: Persistent transmission
let (persistent_sender, persistent_receiver) = channel::unbounded();
let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded();
let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver)
.persistent_transmission(
settings.persistent_transmission,
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
Interval::new(
Duration::from_secs_f64(
1.0 / settings.persistent_transmission.max_emission_frequency,
let persistent_transmission_messages =
CrossbeamReceiverStream::new("PersistentReceiver", persistent_receiver)
.persistent_transmission(
settings.persistent_transmission,
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
Interval::new(
Duration::from_secs_f64(
1.0 / settings.persistent_transmission.max_emission_frequency,
),
persistent_update_time_receiver,
),
persistent_update_time_receiver,
),
);
);
// Init Tier-2: message blend
let (blend_sender, blend_receiver) = channel::unbounded();
@ -163,7 +166,7 @@ impl BlendNode {
settings.message_blend.temporal_processor.max_delay_seconds,
),
);
let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend(
let blend_messages = CrossbeamReceiverStream::new("BlendReceiver", blend_receiver).blend(
settings.message_blend.clone(),
membership,
temporal_release,
@ -198,7 +201,7 @@ impl BlendNode {
data_msg_lottery_update_time_sender,
data_msg_lottery_interval,
data_msg_lottery,
persistent_sender,
persistent_sender: CrossbeamSenderWrapper::new("PersistentSender", persistent_sender),
persistent_update_time_sender,
persistent_transmission_messages,
crypto_processor,

View File

@ -1,6 +1,7 @@
use crossbeam::channel;
use futures::Stream;
use rand::RngCore;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
@ -68,16 +69,25 @@ impl TemporalRelease {
update_time,
}
}
pub fn update(&mut self, elapsed: Duration) -> bool {
fn update(&mut self, elapsed: Duration) -> Option<Delay> {
self.elapsed += elapsed;
if self.elapsed >= self.current_sleep {
let temporal_delay = Delay {
expected: self.current_sleep,
actual: self.elapsed,
};
self.elapsed = Duration::from_secs(0);
self.current_sleep = self.random_sleeps.next().unwrap();
true
Some(temporal_delay)
} else {
false
None
}
}
fn log_delay(delay: &Delay) {
tracing::info!("TemporalDelay: {}", serde_json::to_string(delay).unwrap());
}
}
impl Stream for TemporalRelease {
@ -85,7 +95,8 @@ impl Stream for TemporalRelease {
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Ok(elapsed) = self.update_time.recv() {
if self.update(elapsed) {
if let Some(delay) = self.update(elapsed) {
Self::log_delay(&delay);
return Poll::Ready(Some(()));
}
}
@ -93,6 +104,12 @@ impl Stream for TemporalRelease {
}
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Delay {
expected: Duration,
actual: Duration,
}
#[cfg(test)]
mod tests {
use super::*;
@ -136,10 +153,22 @@ mod tests {
let mut temporal_release =
TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
assert!(!temporal_release.update(Duration::from_secs(0)));
assert!(!temporal_release.update(Duration::from_millis(999)));
assert!(temporal_release.update(Duration::from_secs(1)));
assert!(temporal_release.update(Duration::from_secs(3)));
assert_eq!(temporal_release.update(Duration::from_secs(0)), None);
assert_eq!(temporal_release.update(Duration::from_millis(999)), None);
assert_eq!(
temporal_release.update(Duration::from_secs(1)),
Some(Delay {
expected: Duration::from_secs(1),
actual: Duration::from_millis(1999),
})
);
assert_eq!(
temporal_release.update(Duration::from_secs(3)),
Some(Delay {
expected: Duration::from_secs(1),
actual: Duration::from_secs(3),
})
);
}
#[test]

View File

@ -0,0 +1,38 @@
use crossbeam::channel;
use serde::{Deserialize, Serialize};
pub struct CrossbeamSenderWrapper<T> {
name: String,
sender: channel::Sender<T>,
}
impl<T> CrossbeamSenderWrapper<T> {
pub fn new(name: &str, sender: channel::Sender<T>) -> Self {
Self {
name: name.to_string(),
sender,
}
}
pub fn send(&self, item: T) -> Result<(), channel::SendError<T>> {
self.sender.send(item)?;
self.log();
Ok(())
}
fn log(&self) {
tracing::info!(
"{}: {}",
self.name,
serde_json::to_string(&Log {
len: self.sender.len(),
})
.unwrap()
);
}
}
#[derive(Debug, Serialize, Deserialize)]
struct Log {
len: usize,
}

View File

@ -5,14 +5,30 @@ use std::{
use crossbeam::channel;
use futures::Stream;
use serde::{Deserialize, Serialize};
pub struct CrossbeamReceiverStream<T> {
name: String,
receiver: channel::Receiver<T>,
}
impl<T> CrossbeamReceiverStream<T> {
pub fn new(receiver: channel::Receiver<T>) -> Self {
Self { receiver }
pub fn new(name: &str, receiver: channel::Receiver<T>) -> Self {
Self {
name: name.to_string(),
receiver,
}
}
fn log(&self) {
tracing::info!(
"{}: {}",
self.name,
serde_json::to_string(&Log {
len: self.receiver.len(),
})
.unwrap()
);
}
}
@ -21,9 +37,17 @@ impl<T> Stream for CrossbeamReceiverStream<T> {
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.receiver.try_recv() {
Ok(item) => Poll::Ready(Some(item)),
Ok(item) => {
self.log();
Poll::Ready(Some(item))
}
Err(channel::TryRecvError::Empty) => Poll::Pending,
Err(channel::TryRecvError::Disconnected) => Poll::Ready(None),
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct Log {
len: usize,
}