diff --git a/mixnet/ordering/src/iteration.rs b/mixnet/ordering/src/iteration.rs index 637b0ce..d8c4356 100644 --- a/mixnet/ordering/src/iteration.rs +++ b/mixnet/ordering/src/iteration.rs @@ -81,6 +81,7 @@ impl Iteration { // Virtual discrete time let mut vtime: f32 = 0.0; + let mut recent_vtime_sent_data_msg_by_sender: f32 = 0.0; let mut recent_vtime_queue_data_msg_count_measured: f32 = 0.0; // Transmission interval that each queue must release a message let transmission_interval = 1.0 / paramset.transmission_rate as f32; @@ -89,6 +90,9 @@ impl Iteration { let all_sent_count_target = (paramset.num_sender_msgs as usize) .checked_mul(paramset.num_senders as usize) .unwrap(); + let data_sent_count_target = paramset + .num_sender_data_msgs + .map(|target| target.checked_mul(paramset.num_senders as u32).unwrap()); let mut sent_data_msgs: FxHashMap = FxHashMap::default(); let mut recv_data_msgs: FxHashMap = FxHashMap::default(); @@ -105,10 +109,19 @@ impl Iteration { ); // All senders emit a message (data or noise) to all of their own adjacent peers. - if all_sent_count < all_sent_count_target { + let need_to_send_more_by_sender = match data_sent_count_target { + Some(target) => sent_data_msgs.len() < target as usize, + None => all_sent_count < all_sent_count_target, + }; + if need_to_send_more_by_sender { // For each sender for (sender_idx, sender_peers) in all_sender_peers.iter() { - if Self::try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) { + if Self::decide_to_send_data_msg_by_sender( + vtime, + recent_vtime_sent_data_msg_by_sender, + paramset, + &mut data_msg_rng, + ) { let msg = data_msg_gen.next(sender_idx); sender_peers.iter().for_each(|peer_id| { mixnodes @@ -117,7 +130,8 @@ impl Iteration { .receive(msg, None); }); sent_data_msgs.insert(msg, vtime); - outputs.add_sent_msg(&msg) + outputs.add_sent_msg(&msg); + recent_vtime_sent_data_msg_by_sender = vtime; } else { // Generate noise and add it to the sequence to calculate ordering coefficients later, // but don't need to send it to the mix nodes @@ -241,6 +255,21 @@ impl Iteration { ); rng.gen::() < prob } + + fn decide_to_send_data_msg_by_sender( + vtime: f32, + recent_vtime_sent_data_msg_by_sender: f32, + paramset: &ParamSet, + rng: &mut StdRng, + ) -> bool { + match paramset.sender_data_msg_interval { + Some(interval) => { + vtime - recent_vtime_sent_data_msg_by_sender >= interval + && Self::try_probability(rng, paramset.sender_data_msg_prob) + } + None => Self::try_probability(rng, paramset.sender_data_msg_prob), + } + } } struct AllMessagesToRelay(Vec<(NodeId, MessagesToRelay)>); diff --git a/mixnet/ordering/src/paramset.rs b/mixnet/ordering/src/paramset.rs index 3c81820..c390715 100644 --- a/mixnet/ordering/src/paramset.rs +++ b/mixnet/ordering/src/paramset.rs @@ -64,7 +64,9 @@ pub const PARAMSET_CSV_COLUMNS: &[&str] = &[ "transmission_rate", "num_senders", "num_sender_msgs", + "num_sender_data_msgs", "sender_data_msg_prob", + "sender_data_msg_interval", "mix_data_msg_prob", "num_mixes_sending_data", "queue_type", @@ -82,7 +84,9 @@ pub struct ParamSet { pub transmission_rate: u16, pub num_senders: u8, pub num_sender_msgs: u32, + pub num_sender_data_msgs: Option, pub sender_data_msg_prob: f32, + pub sender_data_msg_interval: Option, pub mix_data_msg_prob: f32, pub num_mixes_sending_data: u32, pub queue_type: QueueType, @@ -168,7 +172,9 @@ impl ParamSet { transmission_rate, num_senders, num_sender_msgs, + num_sender_data_msgs: None, sender_data_msg_prob, + sender_data_msg_interval: None, mix_data_msg_prob, num_mixes_sending_data: num_mixes, queue_type, @@ -196,7 +202,9 @@ impl ParamSet { transmission_rate, num_senders, num_sender_msgs, + num_sender_data_msgs: None, sender_data_msg_prob, + sender_data_msg_interval: None, mix_data_msg_prob, num_mixes_sending_data: num_mixes, queue_type, @@ -241,7 +249,9 @@ impl ParamSet { transmission_rate, num_senders, num_sender_msgs, + num_sender_data_msgs: None, sender_data_msg_prob, + sender_data_msg_interval: None, mix_data_msg_prob: 1.0, num_mixes_sending_data, queue_type, @@ -307,7 +317,9 @@ impl ParamSet { ExperimentId::Experiment6 => 10000, _ => 1000000, }, + num_sender_data_msgs: None, sender_data_msg_prob, + sender_data_msg_interval: None, mix_data_msg_prob, num_mixes_sending_data: num_mixes, queue_type, @@ -331,29 +343,29 @@ impl ParamSet { let mut paramsets: Vec = Vec::new(); match exp_id { ExperimentId::Experiment6 => { - for num_mixes in [1000, 10000, 100000] { + for num_mixes in [100, 1000, 10000, 100000] { for peering_degree in [10, 9, 8, 7, 6, 5, 4, 3] { - for transmission_rate in [100, 50, 10] { - for mix_data_msg_prob in [0.01] { - let paramset = ParamSet { - id, - num_mixes, - num_paths: 0, // since we're gonna build random topology - random_topology: true, - peering_degree: PeeringDegree::Fixed(peering_degree), - min_queue_size: 10, - transmission_rate, - num_senders: 1, - num_sender_msgs: 10000, - sender_data_msg_prob: 0.01, - mix_data_msg_prob, - num_mixes_sending_data: num_mixes, // All mixes try to send data msg following mix_data_msg_prob - queue_type, - num_iterations: 10, - }; - id += 1; - paramsets.push(paramset); - } + for transmission_rate in [100, 70, 40, 10] { + let paramset = ParamSet { + id, + num_mixes, + num_paths: 0, // since we're gonna build random topology + random_topology: true, + peering_degree: PeeringDegree::Fixed(peering_degree), + min_queue_size: 10, + transmission_rate, + num_senders: 1, + num_sender_msgs: 0, + num_sender_data_msgs: Some(100), + sender_data_msg_prob: 1.0, + sender_data_msg_interval: Some(20.0), + mix_data_msg_prob: 1.0 / transmission_rate as f32, // to let mix send data_msg every 1s approx. + num_mixes_sending_data: num_mixes, // All mixes try to send data msg following mix_data_msg_prob + queue_type, + num_iterations: 10, + }; + id += 1; + paramsets.push(paramset); } } } @@ -390,7 +402,15 @@ impl ParamSet { self.transmission_rate.to_string(), self.num_senders.to_string(), self.num_sender_msgs.to_string(), + match self.num_sender_data_msgs { + Some(v) => v.to_string(), + None => "0".to_string(), + }, self.sender_data_msg_prob.to_string(), + match self.sender_data_msg_interval { + Some(v) => v.to_string(), + None => "0".to_string(), + }, self.mix_data_msg_prob.to_string(), self.num_mixes_sending_data.to_string(), format!("{:?}", self.queue_type), @@ -434,7 +454,7 @@ mod tests { ((ExperimentId::Experiment6, SessionId::Session3), 3 * 3), ( (ExperimentId::Experiment6, SessionId::Session100), - 1 * 10 * 6 * 7, + 4 * 8 * 4, ), ];