diff --git a/simlib/blendnet-sims/Cargo.toml b/simlib/blendnet-sims/Cargo.toml index 73b7f1e..9729a7a 100644 --- a/simlib/blendnet-sims/Cargo.toml +++ b/simlib/blendnet-sims/Cargo.toml @@ -18,8 +18,8 @@ tracing = "0.1.40" tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] } netrunner = { path = "../netrunner" } nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" } -nomos-mix = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix" } -nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix-message" } +nomos-blend = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend" } +nomos-blend-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-blend-message" } futures = "0.3.31" rand_chacha = "0.3" multiaddr = "0.18" diff --git a/simlib/blendnet-sims/config/blendnet.json b/simlib/blendnet-sims/config/blendnet.json index 6636913..9179c44 100644 --- a/simlib/blendnet-sims/config/blendnet.json +++ b/simlib/blendnet-sims/config/blendnet.json @@ -44,6 +44,6 @@ "max_emission_frequency": 1.0, "drop_message_probability": 0.0 }, - "number_of_mix_layers": 2, + "number_of_blend_layers": 2, "max_delay_seconds": 10 } diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index 1e9eb91..378716b 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -4,8 +4,8 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; // crates -use crate::node::mix::state::{MixnodeRecord, MixnodeState}; -use crate::node::mix::{MixMessage, MixnodeSettings}; +use crate::node::blend::state::{BlendnodeRecord, BlendnodeState}; +use crate::node::blend::{BlendMessage, BlendnodeSettings}; use anyhow::Ok; use clap::Parser; use crossbeam::channel; @@ -16,8 +16,8 @@ use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; -use nomos_mix::cover_traffic::CoverTrafficSettings; -use nomos_mix::message_blend::{ +use nomos_blend::cover_traffic::CoverTrafficSettings; +use nomos_blend::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; use parking_lot::Mutex; @@ -28,7 +28,7 @@ use rand_chacha::ChaCha12Rng; use serde::de::DeserializeOwned; use serde::Serialize; // internal -use crate::node::mix::MixNode; +use crate::node::blend::BlendNode; use crate::settings::SimSettings; use netrunner::{runner::SimulationRunner, settings::SimulationSettings}; @@ -88,19 +88,19 @@ impl SimulationApp { let regions_data = RegionsData::new(regions, behaviours); let ids = node_ids.clone(); - let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); + let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); let nodes: Vec<_> = node_ids .iter() .copied() .map(|node_id| { let mut network = network.lock(); - create_boxed_mixnode( + create_boxed_blendnode( node_id, &mut network, settings.simulation_settings.clone(), no_netcap, - MixnodeSettings { + BlendnodeSettings { connected_peers: ids .iter() .filter(|&id| id != &node_id) @@ -115,7 +115,7 @@ impl SimulationApp { message_blend: MessageBlendSettings { cryptographic_processor: CryptographicProcessorSettings { private_key: node_id.into(), - num_mix_layers: settings.number_of_mix_layers, + num_blend_layers: settings.number_of_blend_layers, }, temporal_processor: TemporalSchedulerSettings { max_delay_seconds: settings.max_delay_seconds, @@ -140,13 +140,13 @@ impl SimulationApp { } } -fn create_boxed_mixnode( +fn create_boxed_blendnode( node_id: NodeId, - network: &mut Network, + network: &mut Network, simulation_settings: SimulationSettings, no_netcap: bool, - mixnode_settings: MixnodeSettings, -) -> BoxedNode { + blendnode_settings: BlendnodeSettings, +) -> BoxedNode { let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded(); let (node_message_sender, node_message_receiver) = channel::unbounded(); // Dividing milliseconds in second by milliseconds in the step. @@ -174,7 +174,7 @@ fn create_boxed_mixnode( node_message_sender, network_message_receiver, ); - Box::new(MixNode::new(node_id, mixnode_settings, network_interface)) + Box::new(BlendNode::new(node_id, blendnode_settings, network_interface)) } fn run( @@ -189,7 +189,7 @@ where T: Serialize + Clone + 'static, { let stream_settings = settings.stream_settings.clone(); - let runner = SimulationRunner::<_, MixnodeRecord, S, T>::new( + let runner = SimulationRunner::<_, BlendnodeRecord, S, T>::new( network, nodes, Default::default(), @@ -199,11 +199,11 @@ where let handle = match stream_type { Some(StreamType::Naive) => { let settings = stream_settings.unwrap_naive(); - runner.simulate_and_subscribe::>(settings)? + runner.simulate_and_subscribe::>(settings)? } Some(StreamType::IO) => { let settings = stream_settings.unwrap_io(); - runner.simulate_and_subscribe::>(settings)? + runner.simulate_and_subscribe::>(settings)? } None => runner.simulate()?, }; diff --git a/simlib/blendnet-sims/src/node/mix/consensus_streams.rs b/simlib/blendnet-sims/src/node/mix/consensus_streams.rs deleted file mode 100644 index ea1edf1..0000000 --- a/simlib/blendnet-sims/src/node/mix/consensus_streams.rs +++ /dev/null @@ -1,107 +0,0 @@ -use crate::node::mix::scheduler::Interval; -use crossbeam::channel; -use futures::stream::iter; -use futures::{Stream, StreamExt}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -pub struct CounterInterval { - interval: Box + Unpin + Send + Sync>, -} - -impl CounterInterval { - pub fn new(duration: Duration, update_receiver: channel::Receiver) -> Self { - let interval = Interval::new(duration, update_receiver) - .zip(iter(0usize..)) - .map(|(_, i)| i); - let interval = Box::new(interval); - Self { interval } - } -} - -impl Stream for CounterInterval { - type Item = usize; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.interval.poll_next_unpin(cx) - } -} - -pub type Epoch = CounterInterval; - -pub struct Slot { - interval: Box + Unpin + Send + Sync>, -} - -impl Slot { - pub fn new( - slots_per_epoch: usize, - slot_duration: Duration, - update_receiver: channel::Receiver, - ) -> Self { - let interval = CounterInterval::new(slot_duration, update_receiver) - .map(move |slot| slot % slots_per_epoch); - let interval = Box::new(interval); - Self { interval } - } -} - -impl Stream for Slot { - type Item = usize; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.interval.poll_next_unpin(cx) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn counter_interval() { - let waker = futures::task::noop_waker(); - let mut cx = Context::from_waker(&waker); - - let (update_sender, update_receiver) = channel::unbounded(); - let mut interval = CounterInterval::new(Duration::from_secs(1), update_receiver); - - update_sender.send(Duration::from_secs(0)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(0))); - update_sender.send(Duration::from_secs(0)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending); - update_sender.send(Duration::from_millis(999)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending); - update_sender.send(Duration::from_millis(1)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); - update_sender.send(Duration::from_secs(1)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(2))); - update_sender.send(Duration::from_secs(3)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(3))); - } - - #[test] - fn slot_interval() { - let waker = futures::task::noop_waker(); - let mut cx = Context::from_waker(&waker); - - let (update_sender, update_receiver) = channel::unbounded(); - let mut slot = Slot::new(3, Duration::from_secs(1), update_receiver); - - update_sender.send(Duration::from_secs(0)).unwrap(); - assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(0))); - update_sender.send(Duration::from_secs(0)).unwrap(); - assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Pending); - update_sender.send(Duration::from_millis(999)).unwrap(); - assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Pending); - update_sender.send(Duration::from_millis(1)).unwrap(); - assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); - update_sender.send(Duration::from_secs(1)).unwrap(); - assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(2))); - update_sender.send(Duration::from_secs(3)).unwrap(); - assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(0))); - update_sender.send(Duration::from_secs(1)).unwrap(); - assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); - } -} diff --git a/simlib/blendnet-sims/src/node/mix/lottery.rs b/simlib/blendnet-sims/src/node/mix/lottery.rs deleted file mode 100644 index d0ed0b9..0000000 --- a/simlib/blendnet-sims/src/node/mix/lottery.rs +++ /dev/null @@ -1,22 +0,0 @@ -use rand::Rng; - -pub struct StakeLottery { - rng: R, - stake_proportion: f64, -} - -impl StakeLottery -where - R: Rng, -{ - pub fn new(rng: R, stake_proportion: f64) -> Self { - Self { - rng, - stake_proportion, - } - } - - pub fn run(&mut self) -> bool { - self.rng.gen_range(0.0..1.0) < self.stake_proportion - } -} diff --git a/simlib/blendnet-sims/src/node/mix/message.rs b/simlib/blendnet-sims/src/node/mix/message.rs deleted file mode 100644 index 4a30a7d..0000000 --- a/simlib/blendnet-sims/src/node/mix/message.rs +++ /dev/null @@ -1,39 +0,0 @@ -use uuid::Uuid; - -pub type PayloadId = String; - -pub struct Payload(Uuid); - -impl Payload { - pub fn new() -> Self { - Self(Uuid::new_v4()) - } - - pub fn id(&self) -> PayloadId { - self.0.to_string() - } - - pub fn as_bytes(&self) -> &[u8] { - self.0.as_bytes() - } - - pub fn load(data: Vec) -> Self { - assert_eq!(data.len(), 16); - Self(data.try_into().unwrap()) - } -} - -#[cfg(test)] -mod tests { - use super::Payload; - - #[test] - fn payload() { - let payload = Payload::new(); - println!("{}", payload.id()); - let bytes = payload.as_bytes(); - assert_eq!(bytes.len(), 16); - let loaded_payload = Payload::load(bytes.to_vec()); - assert_eq!(bytes, loaded_payload.as_bytes()); - } -} diff --git a/simlib/blendnet-sims/src/node/mix/mod.rs b/simlib/blendnet-sims/src/node/mix/mod.rs deleted file mode 100644 index cf5cca4..0000000 --- a/simlib/blendnet-sims/src/node/mix/mod.rs +++ /dev/null @@ -1,402 +0,0 @@ -pub mod consensus_streams; -pub mod lottery; -mod message; -pub mod scheduler; -pub mod state; -pub mod stream_wrapper; - -use crate::node::mix::consensus_streams::{Epoch, Slot}; -use cached::{Cached, TimedCache}; -use crossbeam::channel; -use futures::Stream; -use lottery::StakeLottery; -use message::{Payload, PayloadId}; -use multiaddr::Multiaddr; -use netrunner::network::NetworkMessage; -use netrunner::node::{Node, NodeId, NodeIdExt}; -use netrunner::{ - network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, - warding::WardCondition, -}; -use nomos_mix::{ - cover_traffic::{CoverTraffic, CoverTrafficSettings}, - membership::Membership, - message_blend::{ - crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream, - }, - persistent_transmission::{ - PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, - }, - MixOutgoingMessage, -}; -use nomos_mix_message::mock::MockMixMessage; -use rand::SeedableRng; -use rand_chacha::ChaCha12Rng; -use scheduler::{Interval, TemporalRelease}; -use serde::{Deserialize, Serialize}; -use sha2::{Digest, Sha256}; -use state::MixnodeState; -use std::{pin::pin, task::Poll, time::Duration}; -use stream_wrapper::CrossbeamReceiverStream; - -#[derive(Debug, Clone)] -pub struct MixMessage(Vec); - -impl PayloadSize for MixMessage { - fn size_bytes(&self) -> u32 { - 2208 - } -} - -#[derive(Deserialize)] -pub struct MixnodeSettings { - pub connected_peers: Vec, - pub data_message_lottery_interval: Duration, - pub stake_proportion: f64, - pub seed: u64, - pub epoch_duration: Duration, - pub slot_duration: Duration, - pub persistent_transmission: PersistentTransmissionSettings, - pub message_blend: MessageBlendSettings, - pub cover_traffic_settings: CoverTrafficSettings, - pub membership: Vec<::PublicKey>, -} - -type Sha256Hash = [u8; 32]; - -/// This node implementation only used for testing different streaming implementation purposes. -pub struct MixNode { - id: NodeId, - state: MixnodeState, - settings: MixnodeSettings, - network_interface: InMemoryNetworkInterface, - message_cache: TimedCache, - - data_msg_lottery_update_time_sender: channel::Sender, - data_msg_lottery_interval: Interval, - data_msg_lottery: StakeLottery, - - persistent_sender: channel::Sender>, - persistent_update_time_sender: channel::Sender, - persistent_transmission_messages: PersistentTransmissionStream< - CrossbeamReceiverStream>, - ChaCha12Rng, - MockMixMessage, - Interval, - >, - crypto_processor: CryptographicProcessor, - blend_sender: channel::Sender>, - blend_update_time_sender: channel::Sender, - blend_messages: MessageBlendStream< - CrossbeamReceiverStream>, - ChaCha12Rng, - MockMixMessage, - TemporalRelease, - >, - epoch_update_sender: channel::Sender, - slot_update_sender: channel::Sender, - cover_traffic: CoverTraffic, -} - -impl MixNode { - pub fn new( - id: NodeId, - settings: MixnodeSettings, - network_interface: InMemoryNetworkInterface, - ) -> Self { - let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); - - // Init Interval for data message lottery - let (data_msg_lottery_update_time_sender, data_msg_lottery_update_time_receiver) = - channel::unbounded(); - let data_msg_lottery_interval = Interval::new( - settings.data_message_lottery_interval, - data_msg_lottery_update_time_receiver, - ); - let data_msg_lottery = StakeLottery::new( - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - settings.stake_proportion, - ); - - // Init Tier-1: Persistent transmission - let (persistent_sender, persistent_receiver) = channel::unbounded(); - let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); - let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) - .persistent_transmission( - settings.persistent_transmission, - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - Interval::new( - Duration::from_secs_f64( - 1.0 / settings.persistent_transmission.max_emission_frequency, - ), - persistent_update_time_receiver, - ), - ); - - // Init Tier-2: message blend - let (blend_sender, blend_receiver) = channel::unbounded(); - let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded(); - let nodes: Vec< - nomos_mix::membership::Node< - ::PublicKey, - >, - > = settings - .membership - .iter() - .map(|&public_key| nomos_mix::membership::Node { - address: Multiaddr::empty(), - public_key, - }) - .collect(); - let membership = Membership::::new(nodes, id.into()); - let crypto_processor = CryptographicProcessor::new( - settings.message_blend.cryptographic_processor.clone(), - membership.clone(), - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - ); - let temporal_release = TemporalRelease::new( - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - blend_update_time_receiver, - ( - 1, - settings.message_blend.temporal_processor.max_delay_seconds, - ), - ); - let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend( - settings.message_blend.clone(), - membership, - temporal_release, - ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), - ); - - // tier 3 cover traffic - let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); - let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded(); - let cover_traffic: CoverTraffic = CoverTraffic::new( - settings.cover_traffic_settings, - Epoch::new(settings.epoch_duration, epoch_updater_update_receiver), - Slot::new( - settings.cover_traffic_settings.slots_per_epoch, - settings.slot_duration, - slot_updater_update_receiver, - ), - ); - - Self { - id, - network_interface, - // We're not coupling this lifespan with the steps now, but it's okay - // We expected that a message will be delivered to most of nodes within 60s. - message_cache: TimedCache::with_lifespan(60), - settings, - state: MixnodeState { - node_id: id, - step_id: 0, - num_messages_fully_unwrapped: 0, - }, - data_msg_lottery_update_time_sender, - data_msg_lottery_interval, - data_msg_lottery, - persistent_sender, - persistent_update_time_sender, - persistent_transmission_messages, - crypto_processor, - blend_sender, - blend_update_time_sender, - blend_messages, - epoch_update_sender, - slot_update_sender, - cover_traffic, - } - } - - fn forward( - &mut self, - message: MixMessage, - exclude_node: Option, - log: Option, - ) { - for (i, node_id) in self - .settings - .connected_peers - .iter() - .filter(|&id| Some(*id) != exclude_node) - .enumerate() - { - if i == 0 { - if let Some(log) = &log { - Self::log_emission(log); - } - } - self.network_interface - .send_message(*node_id, message.clone()) - } - self.message_cache.cache_set(Self::sha256(&message.0), ()); - } - - fn receive(&mut self) -> Vec> { - self.network_interface - .receive_messages() - .into_iter() - // Retain only messages that have not been seen before - .filter(|msg| { - self.message_cache - .cache_set(Self::sha256(&msg.payload().0), ()) - .is_none() - }) - .collect() - } - - fn sha256(message: &[u8]) -> Sha256Hash { - let mut hasher = Sha256::new(); - hasher.update(message); - hasher.finalize().into() - } - - fn update_time(&mut self, elapsed: Duration) { - self.data_msg_lottery_update_time_sender - .send(elapsed) - .unwrap(); - self.persistent_update_time_sender.send(elapsed).unwrap(); - self.blend_update_time_sender.send(elapsed).unwrap(); - self.epoch_update_sender.send(elapsed).unwrap(); - self.slot_update_sender.send(elapsed).unwrap(); - } - - fn log_message_generated(&self, msg_type: &str, payload: &Payload) { - self.log_message(format!("{}MessageGenerated", msg_type).as_str(), payload); - } - - fn log_message_fully_unwrapped(&self, payload: &Payload) { - self.log_message("MessageFullyUnwrapped", payload); - } - - fn log_message(&self, tag: &str, payload: &Payload) { - let log = MessageLog { - payload_id: payload.id(), - step_id: self.state.step_id, - node_id: self.id.index(), - }; - tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap()); - } - - fn log_emission(log: &EmissionLog) { - tracing::info!("Emission: {}", serde_json::to_string(log).unwrap()); - } - - fn new_emission_log(&self, emission_type: &str) -> EmissionLog { - EmissionLog { - emission_type: emission_type.to_string(), - step_id: self.state.step_id, - node_id: self.id.index(), - } - } -} - -impl Node for MixNode { - type Settings = MixnodeSettings; - - type State = MixnodeState; - - fn id(&self) -> NodeId { - self.id - } - - fn state(&self) -> &Self::State { - &self.state - } - - fn step(&mut self, elapsed: Duration) { - self.update_time(elapsed); - let waker = futures::task::noop_waker(); - let mut cx = futures::task::Context::from_waker(&waker); - - // Generate a data message probabilistically - if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) { - if self.data_msg_lottery.run() { - let payload = Payload::new(); - self.log_message_generated("Data", &payload); - let message = self - .crypto_processor - .wrap_message(payload.as_bytes()) - .unwrap(); - self.persistent_sender.send(message).unwrap(); - } - } - - // Handle incoming messages - for network_message in self.receive() { - self.forward( - network_message.payload().clone(), - Some(network_message.from), - None, - ); - self.blend_sender - .send(network_message.into_payload().0) - .unwrap(); - } - - // Proceed message blend - if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) { - match msg { - MixOutgoingMessage::Outbound(msg) => { - self.persistent_sender.send(msg).unwrap(); - } - MixOutgoingMessage::FullyUnwrapped(payload) => { - let payload = Payload::load(payload); - self.log_message_fully_unwrapped(&payload); - self.state.num_messages_fully_unwrapped += 1; - //TODO: create a tracing event - } - } - } - - // Generate a cover message probabilistically - if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) { - let payload = Payload::new(); - self.log_message_generated("Cover", &payload); - let message = self - .crypto_processor - .wrap_message(payload.as_bytes()) - .unwrap(); - self.persistent_sender.send(message).unwrap(); - } - - // Proceed persistent transmission - if let Poll::Ready(Some(msg)) = - pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) - { - self.forward( - MixMessage(msg), - None, - Some(self.new_emission_log("FromPersistent")), - ); - } - - self.state.step_id += 1; - } - - fn analyze(&self, ward: &mut WardCondition) -> bool { - match ward { - WardCondition::Max(_) => false, - WardCondition::Sum(condition) => { - *condition.step_result.borrow_mut() += self.state.num_messages_fully_unwrapped; - false - } - } - } -} - -#[derive(Debug, Serialize, Deserialize)] -struct MessageLog { - payload_id: PayloadId, - step_id: usize, - node_id: usize, -} - -#[derive(Debug, Serialize, Deserialize)] -struct EmissionLog { - emission_type: String, - step_id: usize, - node_id: usize, -} diff --git a/simlib/blendnet-sims/src/node/mix/scheduler.rs b/simlib/blendnet-sims/src/node/mix/scheduler.rs deleted file mode 100644 index ac403c8..0000000 --- a/simlib/blendnet-sims/src/node/mix/scheduler.rs +++ /dev/null @@ -1,169 +0,0 @@ -use crossbeam::channel; -use futures::Stream; -use rand::RngCore; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -pub struct Interval { - duration: Duration, - current_elapsed: Duration, - update_time: channel::Receiver, -} - -impl Interval { - pub fn new(duration: Duration, update_time: channel::Receiver) -> Self { - Self { - duration, - current_elapsed: duration, // to immediately release at the interval 0 - update_time, - } - } - - pub fn update(&mut self, elapsed: Duration) -> bool { - self.current_elapsed += elapsed; - if self.current_elapsed >= self.duration { - self.current_elapsed = Duration::from_secs(0); - true - } else { - false - } - } -} - -impl Stream for Interval { - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - if let Ok(elapsed) = self.update_time.recv() { - if self.update(elapsed) { - return Poll::Ready(Some(())); - } - } - Poll::Pending - } -} - -pub struct TemporalRelease { - random_sleeps: Box + Send + Sync + 'static>, - elapsed: Duration, - current_sleep: Duration, - update_time: channel::Receiver, -} - -impl TemporalRelease { - pub fn new( - mut rng: Rng, - update_time: channel::Receiver, - (min_delay, max_delay): (u64, u64), - ) -> Self { - let mut random_sleeps = Box::new(std::iter::repeat_with(move || { - Duration::from_secs((rng.next_u64() % (max_delay + 1)).max(min_delay)) - })); - let current_sleep = random_sleeps.next().unwrap(); - Self { - random_sleeps, - elapsed: Duration::from_secs(0), - current_sleep, - update_time, - } - } - pub fn update(&mut self, elapsed: Duration) -> bool { - self.elapsed += elapsed; - if self.elapsed >= self.current_sleep { - self.elapsed = Duration::from_secs(0); - self.current_sleep = self.random_sleeps.next().unwrap(); - true - } else { - false - } - } -} - -impl Stream for TemporalRelease { - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - if let Ok(elapsed) = self.update_time.recv() { - if self.update(elapsed) { - return Poll::Ready(Some(())); - } - } - Poll::Pending - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures::StreamExt; - use rand_chacha::rand_core::SeedableRng; - - #[test] - fn interval_update() { - let (_tx, rx) = channel::unbounded(); - let mut interval = Interval::new(Duration::from_secs(2), rx); - - assert!(interval.update(Duration::from_secs(0))); - assert!(!interval.update(Duration::from_secs(1))); - assert!(interval.update(Duration::from_secs(1))); - assert!(interval.update(Duration::from_secs(3))); - } - - #[test] - fn interval_polling() { - let waker = futures::task::noop_waker(); - let mut cx = Context::from_waker(&waker); - - let (tx, rx) = channel::unbounded(); - let mut interval = Interval::new(Duration::from_secs(2), rx); - - tx.send(Duration::from_secs(0)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(()))); - tx.send(Duration::from_secs(0)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending); - tx.send(Duration::from_secs(1)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending); - tx.send(Duration::from_secs(1)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(()))); - tx.send(Duration::from_secs(3)).unwrap(); - assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(()))); - } - - #[test] - fn temporal_release_update() { - let (_tx, rx) = channel::unbounded(); - let mut temporal_release = - TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); - - assert!(!temporal_release.update(Duration::from_secs(0))); - assert!(!temporal_release.update(Duration::from_millis(999))); - assert!(temporal_release.update(Duration::from_secs(1))); - assert!(temporal_release.update(Duration::from_secs(3))); - } - - #[test] - fn temporal_release_polling() { - let waker = futures::task::noop_waker(); - let mut cx = Context::from_waker(&waker); - - let (tx, rx) = channel::unbounded(); - let mut temporal_release = - TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); - - tx.send(Duration::from_secs(0)).unwrap(); - assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending); - tx.send(Duration::from_millis(999)).unwrap(); - assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending); - tx.send(Duration::from_secs(1)).unwrap(); - assert_eq!( - temporal_release.poll_next_unpin(&mut cx), - Poll::Ready(Some(())) - ); - tx.send(Duration::from_secs(3)).unwrap(); - assert_eq!( - temporal_release.poll_next_unpin(&mut cx), - Poll::Ready(Some(())) - ); - } -} diff --git a/simlib/blendnet-sims/src/node/mix/state.rs b/simlib/blendnet-sims/src/node/mix/state.rs deleted file mode 100644 index ff0d9c1..0000000 --- a/simlib/blendnet-sims/src/node/mix/state.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::any::Any; - -use serde::Serialize; - -use netrunner::{ - node::{serialize_node_id_as_index, NodeId}, - output_processors::{Record, RecordType, Runtime}, - settings::SimulationSettings, - warding::SimulationState, -}; - -#[derive(Debug, Clone, Serialize)] -pub struct MixnodeState { - #[serde(serialize_with = "serialize_node_id_as_index")] - pub node_id: NodeId, - pub step_id: usize, - pub num_messages_fully_unwrapped: usize, -} - -#[derive(Serialize)] -#[serde(untagged)] -pub enum MixnodeRecord { - Runtime(Runtime), - Settings(Box), - #[allow(clippy::vec_box)] // we downcast stuff and we need the extra boxing - Data(Vec>), -} - -impl From for MixnodeRecord { - fn from(value: Runtime) -> Self { - Self::Runtime(value) - } -} - -impl From for MixnodeRecord { - fn from(value: SimulationSettings) -> Self { - Self::Settings(Box::new(value)) - } -} - -impl Record for MixnodeRecord { - type Data = MixnodeState; - - fn record_type(&self) -> RecordType { - match self { - MixnodeRecord::Runtime(_) => RecordType::Meta, - MixnodeRecord::Settings(_) => RecordType::Settings, - MixnodeRecord::Data(_) => RecordType::Data, - } - } - - fn data(&self) -> Vec<&MixnodeState> { - match self { - MixnodeRecord::Data(d) => d.iter().map(AsRef::as_ref).collect(), - _ => vec![], - } - } -} - -impl TryFrom<&SimulationState> for MixnodeRecord { - type Error = anyhow::Error; - - fn try_from(state: &SimulationState) -> Result { - let Ok(states) = state - .nodes - .read() - .iter() - .map(|n| Box::::downcast(Box::new(n.state().clone()))) - .collect::, _>>() - else { - return Err(anyhow::anyhow!("use carnot record on other node")); - }; - Ok(Self::Data(states)) - } -} diff --git a/simlib/blendnet-sims/src/node/mix/stream_wrapper.rs b/simlib/blendnet-sims/src/node/mix/stream_wrapper.rs deleted file mode 100644 index 7d776e5..0000000 --- a/simlib/blendnet-sims/src/node/mix/stream_wrapper.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -use crossbeam::channel; -use futures::Stream; - -pub struct CrossbeamReceiverStream { - receiver: channel::Receiver, -} - -impl CrossbeamReceiverStream { - pub fn new(receiver: channel::Receiver) -> Self { - Self { receiver } - } -} - -impl Stream for CrossbeamReceiverStream { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - match self.receiver.try_recv() { - Ok(item) => Poll::Ready(Some(item)), - Err(channel::TryRecvError::Empty) => Poll::Pending, - Err(channel::TryRecvError::Disconnected) => Poll::Ready(None), - } - } -} diff --git a/simlib/blendnet-sims/src/node/mod.rs b/simlib/blendnet-sims/src/node/mod.rs index bd1b04a..e54dd7c 100644 --- a/simlib/blendnet-sims/src/node/mod.rs +++ b/simlib/blendnet-sims/src/node/mod.rs @@ -1 +1 @@ -pub mod mix; +pub mod blend; diff --git a/simlib/blendnet-sims/src/settings.rs b/simlib/blendnet-sims/src/settings.rs index ba53b46..ef63637 100644 --- a/simlib/blendnet-sims/src/settings.rs +++ b/simlib/blendnet-sims/src/settings.rs @@ -1,5 +1,5 @@ use netrunner::settings::SimulationSettings; -use nomos_mix::persistent_transmission::PersistentTransmissionSettings; +use nomos_blend::persistent_transmission::PersistentTransmissionSettings; use serde::{Deserialize, Deserializer}; use std::time::Duration; @@ -21,7 +21,7 @@ pub struct SimSettings { // For tier 1 pub persistent_transmission: PersistentTransmissionSettings, // For tier 2 - pub number_of_mix_layers: usize, + pub number_of_blend_layers: usize, pub max_delay_seconds: u64, }