diff --git a/simlib/mixnet-sims/Cargo.toml b/simlib/mixnet-sims/Cargo.toml index 37dd82b..743c1b4 100644 --- a/simlib/mixnet-sims/Cargo.toml +++ b/simlib/mixnet-sims/Cargo.toml @@ -16,9 +16,9 @@ serde_json = "1.0.132" tracing = "0.1.40" tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] } netrunner = { path = "../netrunner" } -nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git", rev = "7b984a4" } -nomos-mix = { git = "https://github.com/logos-co/nomos-node", rev = "7b984a4", package = "nomos-mix" } -nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", rev = "7b984a4", package = "nomos-mix-message" } +nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" } +nomos-mix = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix", branch = "missing-covertraffic-bounds" } +nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix-message", branch = "missing-covertraffic-bounds" } futures = "0.3.31" rand_chacha = "0.3" multiaddr = "0.18" diff --git a/simlib/mixnet-sims/src/main.rs b/simlib/mixnet-sims/src/main.rs index c1f37cc..bf066aa 100644 --- a/simlib/mixnet-sims/src/main.rs +++ b/simlib/mixnet-sims/src/main.rs @@ -16,6 +16,7 @@ use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; +use nomos_mix::cover_traffic::CoverTrafficSettings; use nomos_mix::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; @@ -102,6 +103,8 @@ impl SimulationApp { data_message_lottery_interval: Duration::from_secs(20), stake_proportion: 1.0 / node_ids.len() as f64, seed: 0, + epoch_duration: Duration::from_secs(86400 * 5), // 5 days seconds + slot_duration: Duration::from_secs(20), persistent_transmission: PersistentTransmissionSettings { max_emission_frequency: 1.0, drop_message_probability: 0.0, @@ -115,6 +118,12 @@ impl SimulationApp { max_delay_seconds: 10, }, }, + cover_traffic_settings: CoverTrafficSettings { + node_id: node_id.0, + number_of_hops: 4, + slots_per_epoch: 200, + network_size: node_ids.len(), + }, membership: node_ids.iter().map(|&id| id.into()).collect(), }, ) diff --git a/simlib/mixnet-sims/src/node/mix/consensus_streams.rs b/simlib/mixnet-sims/src/node/mix/consensus_streams.rs index 8f534a7..b31a41a 100644 --- a/simlib/mixnet-sims/src/node/mix/consensus_streams.rs +++ b/simlib/mixnet-sims/src/node/mix/consensus_streams.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use std::time::Duration; pub struct CounterInterval { - interval: Box + Unpin>, + interval: Box + Unpin + Send + Sync>, } impl CounterInterval { @@ -31,7 +31,7 @@ impl Stream for CounterInterval { pub type Epoch = CounterInterval; pub struct Slot { - interval: Box + Unpin>, + interval: Box + Unpin + Send + Sync>, } impl Slot { diff --git a/simlib/mixnet-sims/src/node/mix/mod.rs b/simlib/mixnet-sims/src/node/mix/mod.rs index e644e1a..9175366 100644 --- a/simlib/mixnet-sims/src/node/mix/mod.rs +++ b/simlib/mixnet-sims/src/node/mix/mod.rs @@ -1,9 +1,10 @@ -mod consensus_streams; -mod lottery; -mod scheduler; +pub mod consensus_streams; +pub mod lottery; +pub mod scheduler; pub mod state; pub mod stream_wrapper; +use crate::node::mix::consensus_streams::{Epoch, Slot}; use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; @@ -14,6 +15,7 @@ use netrunner::{ warding::WardCondition, }; use nomos_mix::{ + cover_traffic::{CoverTraffic, CoverTrafficSettings}, membership::Membership, message_blend::{ crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream, @@ -30,8 +32,11 @@ use scheduler::{Interval, TemporalRelease}; use serde::Deserialize; use sha2::{Digest, Sha256}; use state::MixnodeState; -use std::collections::HashSet; -use std::{pin::pin, task::Poll, time::Duration}; +use std::{ + pin::{self}, + task::Poll, + time::Duration, +}; use stream_wrapper::CrossbeamReceiverStream; use uuid::Uuid; @@ -44,14 +49,17 @@ impl PayloadSize for MixMessage { } } -#[derive(Clone, Deserialize)] +#[derive(Deserialize)] pub struct MixnodeSettings { pub connected_peers: Vec, pub data_message_lottery_interval: Duration, pub stake_proportion: f64, pub seed: u64, + pub epoch_duration: Duration, + pub slot_duration: Duration, pub persistent_transmission: PersistentTransmissionSettings, pub message_blend: MessageBlendSettings, + pub cover_traffic_settings: CoverTrafficSettings, pub membership: Vec<::PublicKey>, } @@ -86,6 +94,9 @@ pub struct MixNode { MockMixMessage, TemporalRelease, >, + epoch_update_sender: channel::Sender, + slot_update_sender: channel::Sender, + cover_traffic: CoverTraffic, } impl MixNode { @@ -159,6 +170,19 @@ impl MixNode { ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), ); + // tier 3 cover traffic + let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); + let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded(); + let cover_traffic: CoverTraffic = CoverTraffic::new( + settings.cover_traffic_settings, + Epoch::new(settings.epoch_duration, epoch_updater_update_receiver), + Slot::new( + settings.cover_traffic_settings.slots_per_epoch, + settings.slot_duration, + slot_updater_update_receiver, + ), + ); + Self { id, network_interface, @@ -179,6 +203,9 @@ impl MixNode { blend_sender, blend_update_time_sender, blend_messages, + epoch_update_sender, + slot_update_sender, + cover_traffic, } } @@ -214,6 +241,8 @@ impl MixNode { .unwrap(); self.persistent_update_time_sender.send(elapsed).unwrap(); self.blend_update_time_sender.send(elapsed).unwrap(); + self.epoch_update_sender.send(elapsed).unwrap(); + self.slot_update_sender.send(elapsed).unwrap(); } fn build_message_payload(&self) -> [u8; 16] { @@ -237,6 +266,17 @@ impl Node for MixNode { fn step(&mut self, elapsed: Duration) { self.update_time(elapsed); + let Self { + data_msg_lottery_interval, + data_msg_lottery, + persistent_sender, + persistent_transmission_messages, + crypto_processor, + blend_sender, + blend_messages, + cover_traffic, + .. + } = self; let waker = futures::task::noop_waker(); let mut cx = futures::task::Context::from_waker(&waker); @@ -269,6 +309,11 @@ impl Node for MixNode { } } } + if let Poll::Ready(Some(msg)) = pin::pin!(cover_traffic).poll_next(&mut cx) { + let message = crypto_processor.wrap_message(&msg).unwrap(); + persistent_sender.send(message).unwrap(); + } + // Proceed persistent transmission if let Poll::Ready(Some(msg)) = pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)