diff --git a/.gitignore b/.gitignore index b88a4cd..03f3409 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,7 @@ simulation simlib/netrunner/target .idea/ target/ -Cargo.lock \ No newline at end of file +Cargo.lock +simlib/**/target +simlib/**/Cargo.lock +simlib/test.json diff --git a/simlib/Cargo.toml b/simlib/Cargo.toml index cee8ef1..322581b 100644 --- a/simlib/Cargo.toml +++ b/simlib/Cargo.toml @@ -4,4 +4,4 @@ members = [ "mixnet-sims" ] -resolver = "2" \ No newline at end of file +resolver = "2" diff --git a/simlib/mixnet-sims/Cargo.toml b/simlib/mixnet-sims/Cargo.toml index a7ca1f0..33a6759 100644 --- a/simlib/mixnet-sims/Cargo.toml +++ b/simlib/mixnet-sims/Cargo.toml @@ -16,10 +16,12 @@ serde_json = "1.0.132" tracing = "0.1.40" tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] } netrunner = { path = "../netrunner" } -nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git", rev = "cc5fef6" } -nomos-mix = { git = "https://github.com/logos-co/nomos-node", rev = "e095964", package = "nomos-mix" } -nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", rev = "e095964", package = "nomos-mix-message" } +nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" } +nomos-mix = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix" } +nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix-message" } futures = "0.3.31" rand_chacha = "0.3" multiaddr = "0.18" - +sha2 = "0.10" +uuid = { version = "1", features = ["fast-rng", "v4"] } +tracing-appender = "0.2" diff --git a/simlib/mixnet-sims/src/log.rs b/simlib/mixnet-sims/src/log.rs index 053b81f..4c75ceb 100644 --- a/simlib/mixnet-sims/src/log.rs +++ b/simlib/mixnet-sims/src/log.rs @@ -3,6 +3,8 @@ use nomos_tracing::{ metrics::otlp::{create_otlp_metrics_layer, OtlpMetricsConfig}, }; use std::{path::PathBuf, str::FromStr}; +use tracing::{level_filters::LevelFilter, Level}; +use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Default, Copy, Clone)] @@ -44,10 +46,14 @@ impl FromStr for LogOutput { } } -pub fn config_tracing(_fmt: LogFormat, log_to: &LogOutput, with_metrics: bool) { +pub fn config_tracing( + _fmt: LogFormat, + log_to: &LogOutput, + with_metrics: bool, +) -> Option { let mut layers: Vec + Send + Sync>> = vec![]; - let (log_layer, _) = match log_to { + let (log_layer, guard) = match log_to { LogOutput::StdOut => create_writer_layer(std::io::stdout()), LogOutput::StdErr => create_writer_layer(std::io::stderr()), LogOutput::File(path) => create_file_layer(nomos_tracing::logging::local::FileConfig { @@ -68,5 +74,11 @@ pub fn config_tracing(_fmt: LogFormat, log_to: &LogOutput, with_metrics: bool) { .unwrap(); layers.push(Box::new(metrics_layer)); } - tracing_subscriber::registry().with(layers).init(); + + tracing_subscriber::registry() + .with(LevelFilter::from(Level::DEBUG)) + .with(layers) + .init(); + + Some(guard) } diff --git a/simlib/mixnet-sims/src/main.rs b/simlib/mixnet-sims/src/main.rs index f2a77e1..ba384e0 100644 --- a/simlib/mixnet-sims/src/main.rs +++ b/simlib/mixnet-sims/src/main.rs @@ -16,6 +16,7 @@ use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; +use nomos_mix::cover_traffic::CoverTrafficSettings; use nomos_mix::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; @@ -102,6 +103,8 @@ impl SimulationApp { data_message_lottery_interval: Duration::from_secs(20), stake_proportion: 1.0 / node_ids.len() as f64, seed: 0, + epoch_duration: Duration::from_secs(86400 * 5), // 5 days seconds + slot_duration: Duration::from_secs(20), persistent_transmission: PersistentTransmissionSettings { max_emission_frequency: 1.0, drop_message_probability: 0.0, @@ -109,12 +112,18 @@ impl SimulationApp { message_blend: MessageBlendSettings { cryptographic_processor: CryptographicProcessorSettings { private_key: node_id.into(), - num_mix_layers: 1, + num_mix_layers: 4, }, temporal_processor: TemporalSchedulerSettings { max_delay_seconds: 10, }, }, + cover_traffic_settings: CoverTrafficSettings { + node_id: node_id.0, + number_of_hops: 4, + slots_per_epoch: 21600, + network_size: node_ids.len(), + }, membership: node_ids.iter().map(|&id| id.into()).collect(), }, ) @@ -232,7 +241,7 @@ fn load_json_from_file(path: &Path) -> anyhow::Result { fn main() -> anyhow::Result<()> { let app: SimulationApp = SimulationApp::parse(); - log::config_tracing(app.log_format, &app.log_to, app.with_metrics); + let _maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); if let Err(e) = app.run() { tracing::error!("error: {}", e); diff --git a/simlib/mixnet-sims/src/node/mix/consensus_streams.rs b/simlib/mixnet-sims/src/node/mix/consensus_streams.rs index c05b5b8..33c52f7 100644 --- a/simlib/mixnet-sims/src/node/mix/consensus_streams.rs +++ b/simlib/mixnet-sims/src/node/mix/consensus_streams.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use std::time::Duration; pub struct CounterInterval { - interval: Box + Unpin>, + interval: Box + Unpin + Send + Sync>, } impl CounterInterval { @@ -31,7 +31,7 @@ impl Stream for CounterInterval { pub type Epoch = CounterInterval; pub struct Slot { - interval: Box + Unpin>, + interval: Box + Unpin + Send + Sync>, } impl Slot { diff --git a/simlib/mixnet-sims/src/node/mix/mod.rs b/simlib/mixnet-sims/src/node/mix/mod.rs index 241d3e5..6a8fedc 100644 --- a/simlib/mixnet-sims/src/node/mix/mod.rs +++ b/simlib/mixnet-sims/src/node/mix/mod.rs @@ -4,6 +4,7 @@ pub mod scheduler; pub mod state; pub mod stream_wrapper; +use crate::node::mix::consensus_streams::{Epoch, Slot}; use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; @@ -14,6 +15,7 @@ use netrunner::{ warding::WardCondition, }; use nomos_mix::{ + cover_traffic::{CoverTraffic, CoverTrafficSettings}, membership::Membership, message_blend::{ crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream, @@ -28,13 +30,17 @@ use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use scheduler::{Interval, TemporalRelease}; use serde::Deserialize; +use sha2::{Digest, Sha256}; use state::MixnodeState; +use std::collections::HashSet; +use std::pin::pin; use std::{ pin::{self}, task::Poll, time::Duration, }; use stream_wrapper::CrossbeamReceiverStream; +use uuid::Uuid; #[derive(Debug, Clone)] pub struct MixMessage(Vec); @@ -45,23 +51,29 @@ impl PayloadSize for MixMessage { } } -#[derive(Clone, Deserialize)] +#[derive(Deserialize)] pub struct MixnodeSettings { pub connected_peers: Vec, pub data_message_lottery_interval: Duration, pub stake_proportion: f64, pub seed: u64, + pub epoch_duration: Duration, + pub slot_duration: Duration, pub persistent_transmission: PersistentTransmissionSettings, pub message_blend: MessageBlendSettings, + pub cover_traffic_settings: CoverTrafficSettings, pub membership: Vec<::PublicKey>, } +type Sha256Hash = [u8; 32]; + /// This node implementation only used for testing different streaming implementation purposes. pub struct MixNode { id: NodeId, state: MixnodeState, settings: MixnodeSettings, network_interface: InMemoryNetworkInterface, + message_cache: HashSet, data_msg_lottery_update_time_sender: channel::Sender, data_msg_lottery_interval: Interval, @@ -84,6 +96,9 @@ pub struct MixNode { MockMixMessage, TemporalRelease, >, + epoch_update_sender: channel::Sender, + slot_update_sender: channel::Sender, + cover_traffic: CoverTraffic, } impl MixNode { @@ -157,13 +172,26 @@ impl MixNode { ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), ); + // tier 3 cover traffic + let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); + let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded(); + let cover_traffic: CoverTraffic = CoverTraffic::new( + settings.cover_traffic_settings, + Epoch::new(settings.epoch_duration, epoch_updater_update_receiver), + Slot::new( + settings.cover_traffic_settings.slots_per_epoch, + settings.slot_duration, + slot_updater_update_receiver, + ), + ); + Self { id, network_interface, + message_cache: HashSet::new(), settings, state: MixnodeState { node_id: id, - mock_counter: 0, step_id: 0, num_messages_broadcasted: 0, }, @@ -177,22 +205,50 @@ impl MixNode { blend_sender, blend_update_time_sender, blend_messages, + epoch_update_sender, + slot_update_sender, + cover_traffic, } } - fn forward(&self, message: MixMessage) { + fn forward(&mut self, message: MixMessage) { + if !self.message_cache.insert(Self::sha256(&message.0)) { + return; + } for node_id in self.settings.connected_peers.iter() { self.network_interface .send_message(*node_id, message.clone()) } } + fn receive(&mut self) -> Vec { + self.network_interface + .receive_messages() + .into_iter() + .map(|msg| msg.into_payload()) + // Retain only messages that have not been seen before + .filter(|msg| self.message_cache.insert(Self::sha256(&msg.0))) + .collect() + } + + fn sha256(message: &[u8]) -> Sha256Hash { + let mut hasher = Sha256::new(); + hasher.update(message); + hasher.finalize().into() + } + fn update_time(&mut self, elapsed: Duration) { self.data_msg_lottery_update_time_sender .send(elapsed) .unwrap(); self.persistent_update_time_sender.send(elapsed).unwrap(); self.blend_update_time_sender.send(elapsed).unwrap(); + self.epoch_update_sender.send(elapsed).unwrap(); + self.slot_update_sender.send(elapsed).unwrap(); + } + + fn build_message_payload() -> [u8; 16] { + Uuid::new_v4().into_bytes() } } @@ -211,65 +267,56 @@ impl Node for MixNode { fn step(&mut self, elapsed: Duration) { self.update_time(elapsed); - - let Self { - data_msg_lottery_interval, - data_msg_lottery, - persistent_sender, - persistent_transmission_messages, - crypto_processor, - blend_sender, - blend_messages, - .. - } = self; let waker = futures::task::noop_waker(); let mut cx = futures::task::Context::from_waker(&waker); - if let Poll::Ready(Some(_)) = pin::pin!(data_msg_lottery_interval).poll_next(&mut cx) { - if data_msg_lottery.run() { - // TODO: Include a meaningful information in the payload (such as, step_id) to - // measure the latency until the message reaches the last mix node. - let message = crypto_processor.wrap_message(&[1u8; 1024]).unwrap(); - persistent_sender.send(message).unwrap(); + if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) { + if self.data_msg_lottery.run() { + let payload = Self::build_message_payload(); + let message = self.crypto_processor.wrap_message(&payload).unwrap(); + self.persistent_sender.send(message).unwrap(); } } - - // TODO: Generate cover message with probability - - let messages = self.network_interface.receive_messages(); - for message in messages { - println!(">>>>> Node {}, message: {message:?}", self.id); - blend_sender.send(message.into_payload().0).unwrap(); + let received_messages = self.receive(); + for message in received_messages { + // println!(">>>>> Node {}, message: {message:?}", self.id); + self.forward(message.clone()); + self.blend_sender.send(message.0).unwrap(); } // Proceed message blend - if let Poll::Ready(Some(msg)) = pin::pin!(blend_messages).poll_next(&mut cx) { + if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) { match msg { MixOutgoingMessage::Outbound(msg) => { - persistent_sender.send(msg).unwrap(); + self.persistent_sender.send(msg).unwrap(); } MixOutgoingMessage::FullyUnwrapped(_) => { + tracing::info!("fully unwrapped message: Node:{}", self.id); self.state.num_messages_broadcasted += 1; //TODO: create a tracing event } } } + if let Poll::Ready(Some(msg)) = pin::pin!(&mut self.cover_traffic).poll_next(&mut cx) { + let message = self.crypto_processor.wrap_message(&msg).unwrap(); + self.persistent_sender.send(message).unwrap(); + } + // Proceed persistent transmission if let Poll::Ready(Some(msg)) = - pin::pin!(persistent_transmission_messages).poll_next(&mut cx) + pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) { self.forward(MixMessage(msg)); } self.state.step_id += 1; - self.state.mock_counter += 1; } fn analyze(&self, ward: &mut WardCondition) -> bool { match ward { - WardCondition::Max(condition) => self.state.mock_counter >= condition.max_count, + WardCondition::Max(_) => false, WardCondition::Sum(condition) => { - *condition.step_result.borrow_mut() += self.state.mock_counter; + *condition.step_result.borrow_mut() += self.state.num_messages_broadcasted; false } } diff --git a/simlib/mixnet-sims/src/node/mix/state.rs b/simlib/mixnet-sims/src/node/mix/state.rs index c1d57f2..ced1420 100644 --- a/simlib/mixnet-sims/src/node/mix/state.rs +++ b/simlib/mixnet-sims/src/node/mix/state.rs @@ -12,7 +12,6 @@ use netrunner::{ #[derive(Debug, Clone, Serialize)] pub struct MixnodeState { pub node_id: NodeId, - pub mock_counter: usize, pub step_id: usize, pub num_messages_broadcasted: usize, } @@ -22,6 +21,7 @@ pub struct MixnodeState { pub enum MixnodeRecord { Runtime(Runtime), Settings(Box), + #[allow(clippy::vec_box)] // we downcast stuff and we need the extra boxing Data(Vec>), }