From 3d1bd0dc18e2f855c3fc88af11690f6100a426de Mon Sep 17 00:00:00 2001 From: Daniel Sanchez <3danimanimal@gmail.com> Date: Fri, 8 Nov 2024 05:09:46 +0100 Subject: [PATCH] Include cover traffic (#31) * Include cover traffic * Remove branch from cargo * Fix sim configuration * Fix rebase * Clippy happy --- simlib/mixnet-sims/Cargo.toml | 6 +- simlib/mixnet-sims/src/main.rs | 13 ++++- .../src/node/mix/consensus_streams.rs | 4 +- simlib/mixnet-sims/src/node/mix/mod.rs | 57 +++++++++++++++---- simlib/mixnet-sims/src/node/mix/state.rs | 1 + 5 files changed, 62 insertions(+), 19 deletions(-) diff --git a/simlib/mixnet-sims/Cargo.toml b/simlib/mixnet-sims/Cargo.toml index 37dd82b..33a6759 100644 --- a/simlib/mixnet-sims/Cargo.toml +++ b/simlib/mixnet-sims/Cargo.toml @@ -16,9 +16,9 @@ serde_json = "1.0.132" tracing = "0.1.40" tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] } netrunner = { path = "../netrunner" } -nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git", rev = "7b984a4" } -nomos-mix = { git = "https://github.com/logos-co/nomos-node", rev = "7b984a4", package = "nomos-mix" } -nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", rev = "7b984a4", package = "nomos-mix-message" } +nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" } +nomos-mix = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix" } +nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix-message" } futures = "0.3.31" rand_chacha = "0.3" multiaddr = "0.18" diff --git a/simlib/mixnet-sims/src/main.rs b/simlib/mixnet-sims/src/main.rs index c1f37cc..ba384e0 100644 --- a/simlib/mixnet-sims/src/main.rs +++ b/simlib/mixnet-sims/src/main.rs @@ -16,6 +16,7 @@ use netrunner::node::{NodeId, NodeIdExt}; use netrunner::output_processors::Record; use netrunner::runner::{BoxedNode, SimulationRunnerHandle}; use netrunner::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType}; +use nomos_mix::cover_traffic::CoverTrafficSettings; use nomos_mix::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; @@ -102,6 +103,8 @@ impl SimulationApp { data_message_lottery_interval: Duration::from_secs(20), stake_proportion: 1.0 / node_ids.len() as f64, seed: 0, + epoch_duration: Duration::from_secs(86400 * 5), // 5 days seconds + slot_duration: Duration::from_secs(20), persistent_transmission: PersistentTransmissionSettings { max_emission_frequency: 1.0, drop_message_probability: 0.0, @@ -109,12 +112,18 @@ impl SimulationApp { message_blend: MessageBlendSettings { cryptographic_processor: CryptographicProcessorSettings { private_key: node_id.into(), - num_mix_layers: 1, + num_mix_layers: 4, }, temporal_processor: TemporalSchedulerSettings { max_delay_seconds: 10, }, }, + cover_traffic_settings: CoverTrafficSettings { + node_id: node_id.0, + number_of_hops: 4, + slots_per_epoch: 21600, + network_size: node_ids.len(), + }, membership: node_ids.iter().map(|&id| id.into()).collect(), }, ) @@ -232,7 +241,7 @@ fn load_json_from_file(path: &Path) -> anyhow::Result { fn main() -> anyhow::Result<()> { let app: SimulationApp = SimulationApp::parse(); - let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); + let _maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics); if let Err(e) = app.run() { tracing::error!("error: {}", e); diff --git a/simlib/mixnet-sims/src/node/mix/consensus_streams.rs b/simlib/mixnet-sims/src/node/mix/consensus_streams.rs index 8f534a7..b31a41a 100644 --- a/simlib/mixnet-sims/src/node/mix/consensus_streams.rs +++ b/simlib/mixnet-sims/src/node/mix/consensus_streams.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use std::time::Duration; pub struct CounterInterval { - interval: Box + Unpin>, + interval: Box + Unpin + Send + Sync>, } impl CounterInterval { @@ -31,7 +31,7 @@ impl Stream for CounterInterval { pub type Epoch = CounterInterval; pub struct Slot { - interval: Box + Unpin>, + interval: Box + Unpin + Send + Sync>, } impl Slot { diff --git a/simlib/mixnet-sims/src/node/mix/mod.rs b/simlib/mixnet-sims/src/node/mix/mod.rs index e644e1a..6a8fedc 100644 --- a/simlib/mixnet-sims/src/node/mix/mod.rs +++ b/simlib/mixnet-sims/src/node/mix/mod.rs @@ -1,9 +1,10 @@ -mod consensus_streams; -mod lottery; -mod scheduler; +pub mod consensus_streams; +pub mod lottery; +pub mod scheduler; pub mod state; pub mod stream_wrapper; +use crate::node::mix::consensus_streams::{Epoch, Slot}; use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; @@ -14,6 +15,7 @@ use netrunner::{ warding::WardCondition, }; use nomos_mix::{ + cover_traffic::{CoverTraffic, CoverTrafficSettings}, membership::Membership, message_blend::{ crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream, @@ -31,7 +33,12 @@ use serde::Deserialize; use sha2::{Digest, Sha256}; use state::MixnodeState; use std::collections::HashSet; -use std::{pin::pin, task::Poll, time::Duration}; +use std::pin::pin; +use std::{ + pin::{self}, + task::Poll, + time::Duration, +}; use stream_wrapper::CrossbeamReceiverStream; use uuid::Uuid; @@ -44,14 +51,17 @@ impl PayloadSize for MixMessage { } } -#[derive(Clone, Deserialize)] +#[derive(Deserialize)] pub struct MixnodeSettings { pub connected_peers: Vec, pub data_message_lottery_interval: Duration, pub stake_proportion: f64, pub seed: u64, + pub epoch_duration: Duration, + pub slot_duration: Duration, pub persistent_transmission: PersistentTransmissionSettings, pub message_blend: MessageBlendSettings, + pub cover_traffic_settings: CoverTrafficSettings, pub membership: Vec<::PublicKey>, } @@ -86,6 +96,9 @@ pub struct MixNode { MockMixMessage, TemporalRelease, >, + epoch_update_sender: channel::Sender, + slot_update_sender: channel::Sender, + cover_traffic: CoverTraffic, } impl MixNode { @@ -159,6 +172,19 @@ impl MixNode { ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), ); + // tier 3 cover traffic + let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); + let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded(); + let cover_traffic: CoverTraffic = CoverTraffic::new( + settings.cover_traffic_settings, + Epoch::new(settings.epoch_duration, epoch_updater_update_receiver), + Slot::new( + settings.cover_traffic_settings.slots_per_epoch, + settings.slot_duration, + slot_updater_update_receiver, + ), + ); + Self { id, network_interface, @@ -179,6 +205,9 @@ impl MixNode { blend_sender, blend_update_time_sender, blend_messages, + epoch_update_sender, + slot_update_sender, + cover_traffic, } } @@ -214,9 +243,11 @@ impl MixNode { .unwrap(); self.persistent_update_time_sender.send(elapsed).unwrap(); self.blend_update_time_sender.send(elapsed).unwrap(); + self.epoch_update_sender.send(elapsed).unwrap(); + self.slot_update_sender.send(elapsed).unwrap(); } - fn build_message_payload(&self) -> [u8; 16] { + fn build_message_payload() -> [u8; 16] { Uuid::new_v4().into_bytes() } } @@ -236,21 +267,18 @@ impl Node for MixNode { fn step(&mut self, elapsed: Duration) { self.update_time(elapsed); - let waker = futures::task::noop_waker(); let mut cx = futures::task::Context::from_waker(&waker); if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) { if self.data_msg_lottery.run() { - let payload = self.build_message_payload(); + let payload = Self::build_message_payload(); let message = self.crypto_processor.wrap_message(&payload).unwrap(); self.persistent_sender.send(message).unwrap(); } } - - // TODO: Generate cover message with probability - - for message in self.receive() { + let received_messages = self.receive(); + for message in received_messages { // println!(">>>>> Node {}, message: {message:?}", self.id); self.forward(message.clone()); self.blend_sender.send(message.0).unwrap(); @@ -269,6 +297,11 @@ impl Node for MixNode { } } } + if let Poll::Ready(Some(msg)) = pin::pin!(&mut self.cover_traffic).poll_next(&mut cx) { + let message = self.crypto_processor.wrap_message(&msg).unwrap(); + self.persistent_sender.send(message).unwrap(); + } + // Proceed persistent transmission if let Poll::Ready(Some(msg)) = pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) diff --git a/simlib/mixnet-sims/src/node/mix/state.rs b/simlib/mixnet-sims/src/node/mix/state.rs index 644b011..ced1420 100644 --- a/simlib/mixnet-sims/src/node/mix/state.rs +++ b/simlib/mixnet-sims/src/node/mix/state.rs @@ -21,6 +21,7 @@ pub struct MixnodeState { pub enum MixnodeRecord { Runtime(Runtime), Settings(Box), + #[allow(clippy::vec_box)] // we downcast stuff and we need the extra boxing Data(Vec>), }