diff --git a/network-runner/src/bin/app/main.rs b/network-runner/src/bin/app/main.rs index 82fedf5..d29456c 100644 --- a/network-runner/src/bin/app/main.rs +++ b/network-runner/src/bin/app/main.rs @@ -101,6 +101,8 @@ impl SimulationApp { .filter(|&id| id != &node_id) .copied() .choose_multiple(&mut rng, 3), + data_message_lottery_interval: Duration::from_secs(20), + stake_proportion: 1.0 / node_ids.len() as f64, seed: 0, persistent_transmission: PersistentTransmissionSettings { max_emission_frequency: 1.0, diff --git a/network-runner/src/node/mix/lottery.rs b/network-runner/src/node/mix/lottery.rs new file mode 100644 index 0000000..d0ed0b9 --- /dev/null +++ b/network-runner/src/node/mix/lottery.rs @@ -0,0 +1,22 @@ +use rand::Rng; + +pub struct StakeLottery { + rng: R, + stake_proportion: f64, +} + +impl StakeLottery +where + R: Rng, +{ + pub fn new(rng: R, stake_proportion: f64) -> Self { + Self { + rng, + stake_proportion, + } + } + + pub fn run(&mut self) -> bool { + self.rng.gen_range(0.0..1.0) < self.stake_proportion + } +} diff --git a/network-runner/src/node/mix/mod.rs b/network-runner/src/node/mix/mod.rs index ac68231..1b7b129 100644 --- a/network-runner/src/node/mix/mod.rs +++ b/network-runner/src/node/mix/mod.rs @@ -1,3 +1,4 @@ +mod lottery; mod consensus_streams; mod scheduler; pub mod state; @@ -7,6 +8,7 @@ use super::{Node, NodeId}; use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}; use crossbeam::channel; use futures::Stream; +use lottery::StakeLottery; use multiaddr::Multiaddr; use nomos_mix::{ membership::Membership, @@ -19,7 +21,7 @@ use nomos_mix::{ MixOutgoingMessage, }; use nomos_mix_message::mock::MockMixMessage; -use rand::{Rng, RngCore, SeedableRng}; +use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use scheduler::{Interval, TemporalRelease}; use serde::Deserialize; @@ -43,6 +45,8 @@ impl PayloadSize for MixMessage { #[derive(Clone, Deserialize)] pub struct MixnodeSettings { pub connected_peers: Vec, + pub data_message_lottery_interval: Duration, + pub stake_proportion: f64, pub seed: u64, pub persistent_transmission: PersistentTransmissionSettings, pub message_blend: MessageBlendSettings, @@ -56,7 +60,9 @@ pub struct MixNode { settings: MixnodeSettings, network_interface: InMemoryNetworkInterface, - msg_gen_rng: ChaCha12Rng, + data_msg_lottery_update_time_sender: channel::Sender, + data_msg_lottery_interval: Interval, + data_msg_lottery: StakeLottery, persistent_sender: channel::Sender>, persistent_update_time_sender: channel::Sender, @@ -85,6 +91,18 @@ impl MixNode { ) -> Self { let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); + // Init Interval for data message lottery + let (data_msg_lottery_update_time_sender, data_msg_lottery_update_time_receiver) = + channel::unbounded(); + let data_msg_lottery_interval = Interval::new( + settings.data_message_lottery_interval, + data_msg_lottery_update_time_receiver, + ); + let data_msg_lottery = StakeLottery::new( + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + settings.stake_proportion, + ); + // Init Tier-1: Persistent transmission let (persistent_sender, persistent_receiver) = channel::unbounded(); let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); @@ -146,7 +164,9 @@ impl MixNode { step_id: 0, num_messages_broadcasted: 0, }, - msg_gen_rng: ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + data_msg_lottery_update_time_sender, + data_msg_lottery_interval, + data_msg_lottery, persistent_sender, persistent_update_time_sender, persistent_transmission_messages, @@ -165,6 +185,9 @@ impl MixNode { } fn update_time(&mut self, elapsed: Duration) { + self.data_msg_lottery_update_time_sender + .send(elapsed) + .unwrap(); self.persistent_update_time_sender.send(elapsed).unwrap(); self.blend_update_time_sender.send(elapsed).unwrap(); } @@ -187,6 +210,8 @@ impl Node for MixNode { self.update_time(elapsed); let Self { + data_msg_lottery_interval, + data_msg_lottery, persistent_sender, persistent_transmission_messages, crypto_processor, @@ -194,24 +219,26 @@ impl Node for MixNode { blend_messages, .. } = self; + let waker = futures::task::noop_waker(); + let mut cx = futures::task::Context::from_waker(&waker); - // Generate a message probabilistically (1 % chance) - // TODO: Replace this with the actual cover message generation - if self.msg_gen_rng.gen_range(0..100) == 0 { - let mut payload = [0u8; 1024]; - self.msg_gen_rng.fill_bytes(&mut payload); - let message = crypto_processor.wrap_message(&payload).unwrap(); - persistent_sender.send(message).unwrap(); + if let Poll::Ready(Some(_)) = pin::pin!(data_msg_lottery_interval).poll_next(&mut cx) { + if data_msg_lottery.run() { + // TODO: Include a meaningful information in the payload (such as, step_id) to + // measure the latency until the message reaches the last mix node. + let message = crypto_processor.wrap_message(&[1u8; 1024]).unwrap(); + persistent_sender.send(message).unwrap(); + } } + // TODO: Generate cover message with probability + let messages = self.network_interface.receive_messages(); for message in messages { println!(">>>>> Node {}, message: {message:?}", self.id); blend_sender.send(message.into_payload().0).unwrap(); } - let waker = futures::task::noop_waker(); - let mut cx = futures::task::Context::from_waker(&waker); // Proceed message blend if let Poll::Ready(Some(msg)) = pin::pin!(blend_messages).poll_next(&mut cx) { match msg {