From d0fe4abba987ed48d3154ef869bd3297d246597a Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 22 Aug 2024 21:15:49 +0200 Subject: [PATCH] fix: handle multiple senders for exp3~4 --- mixnet-rs/ordering/src/iteration.rs | 97 +++++++++++++++++------------ mixnet-rs/ordering/src/paramset.rs | 9 +++ 2 files changed, 65 insertions(+), 41 deletions(-) diff --git a/mixnet-rs/ordering/src/iteration.rs b/mixnet-rs/ordering/src/iteration.rs index 504145d..a96005a 100644 --- a/mixnet-rs/ordering/src/iteration.rs +++ b/mixnet-rs/ordering/src/iteration.rs @@ -34,7 +34,7 @@ pub fn run_iteration( assert!(!Path::new(path).exists(), "File already exists: {path}"); } - let (mut mixnodes, sender_peers) = if paramset.random_topology { + let (mut mixnodes, sender_peers_list) = if paramset.random_topology { build_random_network(¶mset, seed, out_topology_path) } else { build_striped_network(¶mset, seed) @@ -47,7 +47,10 @@ pub fn run_iteration( // Transmission interval that each queue must release a message let transmission_interval = 1.0 / paramset.transmission_rate as f32; // Results - let mut all_sent_count = 0; // all data + noise sent by the sender + let mut all_sent_count = 0; // all data + noise sent by all senders + let target_all_sent_count = (paramset.num_sender_msgs as usize) + .checked_mul(paramset.num_senders as usize) + .unwrap(); let mut sent_times: FxHashMap = FxHashMap::default(); let mut latencies: FxHashMap = FxHashMap::default(); let mut sent_sequence = Sequence::new(); @@ -65,33 +68,35 @@ pub fn run_iteration( latencies.len() ); - // The sender emits a message (data or noise) to all adjacent peers. - if all_sent_count < paramset.num_sender_msgs as usize { - if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) { - let msg = next_msg_id; - next_msg_id += 1; - sender_peers.iter().for_each(|peer_id| { - mixnodes.get_mut(peer_id).unwrap().receive(msg, None); - }); - sent_times.insert(msg, vtime); - sent_sequence.add_message(msg); - } else { - // Generate noise and add it to the sequence to calculate ordering coefficients later, - // but don't need to send it to the mix nodes - // because the mix nodes will anyway drop the noise, - // and we don't need to record what the mix nodes receive. - sent_sequence.add_noise(); + // All senders emit a message (data or noise) to all of their own adjacent peers. + if all_sent_count < target_all_sent_count { + for sender_peers in sender_peers_list.iter() { + if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) { + let msg = next_msg_id; + next_msg_id += 1; + sender_peers.iter().for_each(|peer_id| { + mixnodes.get_mut(peer_id).unwrap().receive(msg, None); + }); + sent_times.insert(msg, vtime); + sent_sequence.add_message(msg); + } else { + // Generate noise and add it to the sequence to calculate ordering coefficients later, + // but don't need to send it to the mix nodes + // because the mix nodes will anyway drop the noise, + // and we don't need to record what the mix nodes receive. + sent_sequence.add_noise(); + } + all_sent_count += 1; } - all_sent_count += 1; } // Each mix node add a new data message to its queue with a certain probability - if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) { - for (_, node) in mixnodes.iter_mut() { + for (_, node) in mixnodes.iter_mut() { + if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) { node.send(next_msg_id); next_msg_id += 1; // Don't put the msg into the sent_sequence - // because sent_sequence is only for recording messages sent by the sender, not the mixnode. + // because sent_sequence is only for recording messages sent by the senders, not the mixnode. } } @@ -105,7 +110,7 @@ pub fn run_iteration( } all_msgs_to_relay .into_iter() - .for_each(|(sender_id, msgs_to_relay)| { + .for_each(|(mix_id, msgs_to_relay)| { msgs_to_relay.into_iter().for_each(|(peer_id, msg)| { if peer_id == RECEIVER_ID { match msg { @@ -118,14 +123,14 @@ pub fn run_iteration( unified_received_sequence.add_message(msg); } received_sequences - .entry(sender_id) + .entry(mix_id) .or_insert(Sequence::new()) .add_message(msg); } } Message::Noise => { received_sequences - .entry(sender_id) + .entry(mix_id) .or_insert(Sequence::new()) .add_noise(); } @@ -134,7 +139,7 @@ pub fn run_iteration( mixnodes .get_mut(&peer_id) .unwrap() - .receive(msg, Some(sender_id)); + .receive(msg, Some(mix_id)); } }); }); @@ -146,11 +151,9 @@ pub fn run_iteration( }); queue_data_msg_counts.push(counts); - // If all data amessages (that the sender has to send) have been received by the receiver, + // If all data messages (that have been sent by the senders) have been received by the receiver, // stop the iteration. - if all_sent_count == paramset.num_sender_msgs as usize - && sent_times.len() == latencies.len() - { + if all_sent_count == target_all_sent_count && sent_times.len() == latencies.len() { break; } @@ -188,7 +191,10 @@ pub fn run_iteration( } } -fn build_striped_network(paramset: &ParamSet, seed: u64) -> (FxHashMap, Vec) { +fn build_striped_network( + paramset: &ParamSet, + seed: u64, +) -> (FxHashMap, Vec>) { assert!(!paramset.random_topology); let mut next_node_id: NodeId = 0; let mut queue_seed_rng = StdRng::seed_from_u64(seed); @@ -227,15 +233,19 @@ fn build_striped_network(paramset: &ParamSet, seed: u64) -> (FxHashMap = paths.iter().map(|path| *path.first().unwrap()).collect(); - (mixnodes, sender_peers) + let sender_peers_list: Vec> = + vec![ + paths.iter().map(|path| *path.first().unwrap()).collect(); + paramset.num_senders as usize + ]; + (mixnodes, sender_peers_list) } fn build_random_network( paramset: &ParamSet, seed: u64, out_topology_path: &str, -) -> (FxHashMap, Vec) { +) -> (FxHashMap, Vec>) { assert!(paramset.random_topology); // Init mix nodes let mut queue_seed_rng = StdRng::seed_from_u64(seed); @@ -259,12 +269,17 @@ fn build_random_network( let mut peers_rng = StdRng::seed_from_u64(seed); let mut candidates: Vec = (0..paramset.num_mixes).collect(); assert!(candidates.len() >= paramset.peering_degree as usize); - candidates.as_mut_slice().shuffle(&mut peers_rng); - let sender_peers: Vec = candidates - .iter() - .cloned() - .take(paramset.peering_degree as usize) - .collect(); + let mut sender_peers_list: Vec> = Vec::with_capacity(paramset.num_senders as usize); + for _ in 0..paramset.num_senders { + candidates.as_mut_slice().shuffle(&mut peers_rng); + sender_peers_list.push( + candidates + .iter() + .cloned() + .take(paramset.peering_degree as usize) + .collect(), + ); + } candidates.as_mut_slice().shuffle(&mut peers_rng); let receiver_peers: Vec = candidates .iter() @@ -293,7 +308,7 @@ fn build_random_network( mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID); } - (mixnodes, sender_peers) + (mixnodes, sender_peers_list) } fn try_probability(rng: &mut StdRng, prob: f32) -> bool { diff --git a/mixnet-rs/ordering/src/paramset.rs b/mixnet-rs/ordering/src/paramset.rs index e211b38..36d3491 100644 --- a/mixnet-rs/ordering/src/paramset.rs +++ b/mixnet-rs/ordering/src/paramset.rs @@ -52,6 +52,7 @@ pub const PARAMSET_CSV_COLUMNS: &[&str] = &[ "peering_degree", "min_queue_size", "transmission_rate", + "num_senders", "num_sender_msgs", "sender_data_msg_prob", "mix_data_msg_prob", @@ -68,6 +69,7 @@ pub struct ParamSet { pub peering_degree: u32, pub min_queue_size: u16, pub transmission_rate: u16, + pub num_senders: u32, pub num_sender_msgs: u32, pub sender_data_msg_prob: f32, pub mix_data_msg_prob: f32, @@ -89,6 +91,10 @@ impl ParamSet { fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { let transmission_rate: u16 = 1; let min_queue_size: u16 = 10; + let num_senders: u32 = match exp_id { + ExperimentId::Experiment3 | ExperimentId::Experiment4 => 2, + _ => 1, + }; let num_sender_msgs: u32 = 1000000; let sender_data_msg_probs: &[f32] = &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0]; let mix_data_msg_probs: &[f32] = match exp_id { @@ -119,6 +125,7 @@ impl ParamSet { peering_degree: 1, min_queue_size, transmission_rate, + num_senders, num_sender_msgs, sender_data_msg_prob, mix_data_msg_prob, @@ -145,6 +152,7 @@ impl ParamSet { peering_degree, min_queue_size, transmission_rate, + num_senders, num_sender_msgs, sender_data_msg_prob, mix_data_msg_prob, @@ -172,6 +180,7 @@ impl ParamSet { self.peering_degree.to_string(), self.min_queue_size.to_string(), self.transmission_rate.to_string(), + self.num_senders.to_string(), self.num_sender_msgs.to_string(), self.sender_data_msg_prob.to_string(), self.mix_data_msg_prob.to_string(),