From d6c5ff20044bf5a22edcc79490845ba4d275de89 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Fri, 20 Dec 2024 22:08:45 +0900 Subject: [PATCH] use temporal processor directly --- simlib/blendnet-sims/src/node/blend/mod.rs | 94 +++++++++++-------- .../blendnet-sims/src/node/blend/scheduler.rs | 10 +- 2 files changed, 58 insertions(+), 46 deletions(-) diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index f1b4d38..51f5a13 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -21,12 +21,11 @@ use netrunner::{ network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, warding::WardCondition, }; +use nomos_blend::message_blend::temporal::{TemporalProcessorExt, TemporalStream}; use nomos_blend::{ cover_traffic::{CoverTraffic, CoverTrafficSettings}, membership::Membership, - message_blend::{ - crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream, - }, + message_blend::{crypto::CryptographicProcessor, MessageBlendSettings}, persistent_transmission::{ PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, }, @@ -35,7 +34,7 @@ use nomos_blend::{ use nomos_blend_message::mock::MockBlendMessage; use rand::SeedableRng; use rand_chacha::ChaCha12Rng; -use scheduler::{Interval, TemporalRelease}; +use scheduler::{Interval, TemporalScheduler}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use state::BlendnodeState; @@ -88,14 +87,10 @@ pub struct BlendNode { Interval, >, crypto_processor: CryptographicProcessor, - blend_sender: channel::Sender>, - blend_update_time_sender: channel::Sender, - blend_messages: MessageBlendStream< - CrossbeamReceiverStream>, - ChaCha12Rng, - MockBlendMessage, - TemporalRelease, - >, + temporal_sender: channel::Sender, + temporal_update_time_sender: channel::Sender, + temporal_processor_messages: + TemporalStream, TemporalScheduler>, epoch_update_sender: channel::Sender, slot_update_sender: channel::Sender, cover_traffic: CoverTraffic, @@ -136,9 +131,7 @@ impl BlendNode { ), ); - // Init Tier-2: message blend - let (blend_sender, blend_receiver) = channel::unbounded(); - let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded(); + // Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor let nodes: Vec< nomos_blend::membership::Node< ::PublicKey, @@ -157,20 +150,17 @@ impl BlendNode { membership.clone(), ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), ); - let temporal_release = TemporalRelease::new( - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - blend_update_time_receiver, - ( - 1, - settings.message_blend.temporal_processor.max_delay_seconds, - ), - ); - let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend( - settings.message_blend.clone(), - membership, - temporal_release, - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - ); + let (temporal_sender, temporal_receiver) = channel::unbounded(); + let (temporal_update_time_sender, temporal_update_time_receiver) = channel::unbounded(); + let temporal_processor_messages = CrossbeamReceiverStream::new(temporal_receiver) + .temporal_stream(TemporalScheduler::new( + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + temporal_update_time_receiver, + ( + 1, + settings.message_blend.temporal_processor.max_delay_seconds, + ), + )); // tier 3 cover traffic let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); @@ -204,9 +194,9 @@ impl BlendNode { persistent_update_time_sender, persistent_transmission_messages, crypto_processor, - blend_sender, - blend_update_time_sender, - blend_messages, + temporal_sender, + temporal_update_time_sender, + temporal_processor_messages, epoch_update_sender, slot_update_sender, cover_traffic, @@ -264,9 +254,29 @@ impl BlendNode { self.persistent_sender.send(message).unwrap(); } - fn schedule_blend(&mut self, message: Vec) { - self.log_message("BlendScheduled", &Self::parse_payload(&message)); - self.blend_sender.send(message).unwrap(); + fn handle_incoming_message(&mut self, message: Vec) { + match self.crypto_processor.unwrap_message(&message) { + Ok((unwrapped_message, fully_unwrapped)) => { + let temporal_message = if fully_unwrapped { + BlendOutgoingMessage::FullyUnwrapped(unwrapped_message) + } else { + BlendOutgoingMessage::Outbound(unwrapped_message) + }; + self.schedule_temporal_processor(temporal_message); + } + Err(e) => { + tracing::error!("Failed to unwrap message: {:?}", e); + } + } + } + + fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessage) { + let payload = match &message { + BlendOutgoingMessage::FullyUnwrapped(payload) => Payload::load(payload.clone()), + BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(&msg), + }; + self.log_message("TemporalProcessorScheduled", &payload); + self.temporal_sender.send(message).unwrap(); } fn parse_payload(message: &[u8]) -> Payload { @@ -278,7 +288,7 @@ impl BlendNode { .send(elapsed) .unwrap(); self.persistent_update_time_sender.send(elapsed).unwrap(); - self.blend_update_time_sender.send(elapsed).unwrap(); + self.temporal_update_time_sender.send(elapsed).unwrap(); self.epoch_update_sender.send(elapsed).unwrap(); self.slot_update_sender.send(elapsed).unwrap(); } @@ -357,19 +367,21 @@ impl Node for BlendNode { Some(network_message.from), None, ); - self.schedule_blend(network_message.into_payload().0); + self.handle_incoming_message(network_message.into_payload().0); } - // Proceed message blend - if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) { + // Proceed temporal processor + if let Poll::Ready(Some(msg)) = + pin!(&mut self.temporal_processor_messages).poll_next(&mut cx) + { match msg { BlendOutgoingMessage::Outbound(msg) => { - self.log_message_released_from("Blend", &Self::parse_payload(&msg)); + self.log_message_released_from("TemporalProcessor", &Self::parse_payload(&msg)); self.schedule_persistent_transmission(msg); } BlendOutgoingMessage::FullyUnwrapped(payload) => { let payload = Payload::load(payload); - self.log_message_released_from("Blend", &payload); + self.log_message_released_from("TemporalProcessor", &payload); self.log_message_fully_unwrapped(&payload); self.state.num_messages_fully_unwrapped += 1; //TODO: create a tracing event diff --git a/simlib/blendnet-sims/src/node/blend/scheduler.rs b/simlib/blendnet-sims/src/node/blend/scheduler.rs index ac403c8..b6a760d 100644 --- a/simlib/blendnet-sims/src/node/blend/scheduler.rs +++ b/simlib/blendnet-sims/src/node/blend/scheduler.rs @@ -44,14 +44,14 @@ impl Stream for Interval { } } -pub struct TemporalRelease { +pub struct TemporalScheduler { random_sleeps: Box + Send + Sync + 'static>, elapsed: Duration, current_sleep: Duration, update_time: channel::Receiver, } -impl TemporalRelease { +impl TemporalScheduler { pub fn new( mut rng: Rng, update_time: channel::Receiver, @@ -80,7 +80,7 @@ impl TemporalRelease { } } -impl Stream for TemporalRelease { +impl Stream for TemporalScheduler { type Item = (); fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { @@ -134,7 +134,7 @@ mod tests { fn temporal_release_update() { let (_tx, rx) = channel::unbounded(); let mut temporal_release = - TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); + TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); assert!(!temporal_release.update(Duration::from_secs(0))); assert!(!temporal_release.update(Duration::from_millis(999))); @@ -149,7 +149,7 @@ mod tests { let (tx, rx) = channel::unbounded(); let mut temporal_release = - TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); + TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); tx.send(Duration::from_secs(0)).unwrap(); assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending);