From b36b8ac31602759117dc17dc58e5a151ea375d2e Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:03:40 +0200 Subject: [PATCH] fix: use Vec for deterministic results --- mixnet-rs/ordering/src/iteration.rs | 123 ++++++++++++++-------------- 1 file changed, 63 insertions(+), 60 deletions(-) diff --git a/mixnet-rs/ordering/src/iteration.rs b/mixnet-rs/ordering/src/iteration.rs index ea7e2bf..5ed27d0 100644 --- a/mixnet-rs/ordering/src/iteration.rs +++ b/mixnet-rs/ordering/src/iteration.rs @@ -52,7 +52,8 @@ pub fn run_iteration( .checked_mul(paramset.num_senders as usize) .unwrap(); let mut sent_times: FxHashMap = FxHashMap::default(); - let mut latencies: FxHashMap = FxHashMap::default(); + let mut recv_times: FxHashMap = FxHashMap::default(); + let mut latencies: Vec<(MessageId, f32)> = Vec::new(); let mut sent_sequence = Sequence::new(); let mut received_sequences: FxHashMap = FxHashMap::default(); let mut unified_received_sequence = if paramset.random_topology { @@ -60,7 +61,7 @@ pub fn run_iteration( } else { None }; - let mut queue_data_msg_counts: Vec>> = Vec::new(); + let mut queue_data_msg_counts: Vec>> = Vec::new(); let mut data_msg_rng = StdRng::seed_from_u64(seed); loop { @@ -79,7 +80,10 @@ pub fn run_iteration( let msg = next_msg_id; next_msg_id += 1; sender_peers.iter().for_each(|peer_id| { - mixnodes.get_mut(peer_id).unwrap().receive(msg, None); + mixnodes + .get_mut(*peer_id as usize) + .unwrap() + .receive(msg, None); }); sent_times.insert(msg, vtime); sent_sequence.add_message(msg); @@ -95,7 +99,7 @@ pub fn run_iteration( } // Each mix node add a new data message to its queue with a certain probability - for (_, node) in mixnodes.iter_mut() { + for node in mixnodes.iter_mut() { if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) { node.send(next_msg_id); next_msg_id += 1; @@ -109,8 +113,8 @@ pub fn run_iteration( // // source -> (destination, msg) let mut all_msgs_to_relay: Vec<(NodeId, Vec<(NodeId, Message)>)> = Vec::new(); - for (node_id, node) in mixnodes.iter_mut() { - all_msgs_to_relay.push((*node_id, node.read_queues())); + for (node_id, node) in mixnodes.iter_mut().enumerate() { + all_msgs_to_relay.push((node_id.try_into().unwrap(), node.read_queues())); } all_msgs_to_relay .into_iter() @@ -122,8 +126,9 @@ pub fn run_iteration( // If msg was sent by the sender (not by any mix) if let Some(&sent_time) = sent_times.get(&msg) { // If this is the first time to see the msg - if let Entry::Vacant(e) = latencies.entry(msg) { - e.insert(vtime - sent_time); + if let Entry::Vacant(e) = recv_times.entry(msg) { + e.insert(vtime); + latencies.push((msg, vtime - sent_time)); if let Some(unified_recv_seq) = &mut unified_received_sequence { @@ -145,7 +150,7 @@ pub fn run_iteration( } } else if let Message::Data(msg) = msg { mixnodes - .get_mut(&peer_id) + .get_mut(peer_id as usize) .unwrap() .receive(msg, Some(mix_id)); } @@ -153,9 +158,9 @@ pub fn run_iteration( }); // Record the number of data messages in each mix node's queues - let mut counts: FxHashMap> = FxHashMap::default(); - mixnodes.iter().for_each(|(id, node)| { - counts.insert(*id, node.queue_data_msg_counts()); + let mut counts: Vec> = Vec::with_capacity(mixnodes.len()); + mixnodes.iter().for_each(|node| { + counts.push(node.queue_data_msg_counts()); }); queue_data_msg_counts.push(counts); @@ -169,8 +174,15 @@ pub fn run_iteration( } // Save results to CSV files - save_latencies(&latencies, &sent_times, out_latency_path); + save_latencies(&latencies, &sent_times, &recv_times, out_latency_path); save_sequence(&sent_sequence, out_sent_sequence_path); + // Sort received_sequences + let mut node_ids: Vec = received_sequences.keys().cloned().collect(); + node_ids.sort(); + let received_sequences: Vec = node_ids + .iter() + .map(|node_id| received_sequences.remove(node_id).unwrap()) + .collect(); save_sequences(&received_sequences, out_received_sequence_path_prefix); if let Some(unified_recv_seq) = &unified_received_sequence { save_sequence( @@ -191,7 +203,7 @@ pub fn run_iteration( save_ordering_coefficients(&[[casual, weak]], out_ordering_coeff_path); } else { let mut coeffs: Vec<[u64; 2]> = Vec::new(); - for (_, recv_seq) in received_sequences.iter() { + for recv_seq in received_sequences.iter() { let casual = sent_sequence.ordering_coefficient(recv_seq, true); let weak = sent_sequence.ordering_coefficient(recv_seq, false); coeffs.push([casual, weak]); @@ -201,32 +213,27 @@ pub fn run_iteration( } } -fn build_striped_network( - paramset: &ParamSet, - seed: u64, -) -> (FxHashMap, Vec>) { +fn build_striped_network(paramset: &ParamSet, seed: u64) -> (Vec, Vec>) { assert!(!paramset.random_topology); let mut next_node_id: NodeId = 0; let mut queue_seed_rng = StdRng::seed_from_u64(seed); - let mut mixnodes: FxHashMap = FxHashMap::default(); + let mut mixnodes: Vec = + Vec::with_capacity(paramset.num_paths as usize * paramset.num_mixes as usize); let mut paths: Vec> = Vec::with_capacity(paramset.num_paths as usize); for _ in 0..paramset.num_paths { let mut ids = Vec::with_capacity(paramset.num_mixes as usize); for _ in 0..paramset.num_mixes { let id = next_node_id; next_node_id += 1; - mixnodes.insert( - id, - Node::new( - QueueConfig { - queue_type: paramset.queue_type, - seed: queue_seed_rng.next_u64(), - min_queue_size: paramset.min_queue_size, - }, - paramset.peering_degree, - false, // disable cache - ), - ); + mixnodes.push(Node::new( + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + paramset.peering_degree, + false, // disable cache + )); ids.push(id); } paths.push(ids); @@ -237,9 +244,9 @@ fn build_striped_network( for (i, id) in path.iter().enumerate() { if i != path.len() - 1 { let peer_id = path[i + 1]; - mixnodes.get_mut(id).unwrap().connect(peer_id); + mixnodes.get_mut(*id as usize).unwrap().connect(peer_id); } else { - mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID); + mixnodes.get_mut(*id as usize).unwrap().connect(RECEIVER_ID); } } } @@ -255,24 +262,21 @@ fn build_random_network( paramset: &ParamSet, seed: u64, out_topology_path: &str, -) -> (FxHashMap, Vec>) { +) -> (Vec, Vec>) { assert!(paramset.random_topology); // Init mix nodes let mut queue_seed_rng = StdRng::seed_from_u64(seed); - let mut mixnodes: FxHashMap = FxHashMap::default(); - for id in 0..paramset.num_mixes { - mixnodes.insert( - id, - Node::new( - QueueConfig { - queue_type: paramset.queue_type, - seed: queue_seed_rng.next_u64(), - min_queue_size: paramset.min_queue_size, - }, - paramset.peering_degree, - true, // enable cache - ), - ); + let mut mixnodes: Vec = Vec::with_capacity(paramset.num_mixes as usize); + for _ in 0..paramset.num_mixes { + mixnodes.push(Node::new( + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + paramset.peering_degree, + true, // enable cache + )); } // Choose sender's peers and receiver's peers randomly @@ -306,16 +310,13 @@ fn build_random_network( save_topology(&topology, out_topology_path).unwrap(); for (node_id, peers) in topology.iter().enumerate() { peers.iter().for_each(|peer_id| { - mixnodes - .get_mut(&(node_id.try_into().unwrap())) - .unwrap() - .connect(*peer_id); + mixnodes.get_mut(node_id).unwrap().connect(*peer_id); }); } // Connect the selected mix nodes with the receiver for id in receiver_peers.iter() { - mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID); + mixnodes.get_mut(*id as usize).unwrap().connect(RECEIVER_ID); } (mixnodes, sender_peers_list) @@ -330,8 +331,9 @@ fn try_probability(rng: &mut StdRng, prob: f32) -> bool { } fn save_latencies( - latencies: &FxHashMap, + latencies: &[(MessageId, f32)], sent_times: &FxHashMap, + recv_times: &FxHashMap, path: &str, ) { let mut writer = csv::Writer::from_path(path).unwrap(); @@ -340,12 +342,13 @@ fn save_latencies( .unwrap(); for (msg, latency) in latencies.iter() { let sent_time = sent_times.get(msg).unwrap(); + let recv_time = recv_times.get(msg).unwrap(); writer .write_record(&[ msg.to_string(), latency.to_string(), sent_time.to_string(), - (sent_time + latency).to_string(), + recv_time.to_string(), ]) .unwrap(); } @@ -360,17 +363,17 @@ fn save_sequence(seq: &Sequence, path: &str) { writer.flush().unwrap(); } -fn save_sequences(sequences: &FxHashMap, path_prefix: &str) { - sequences.iter().enumerate().for_each(|(i, (_, seq))| { +fn save_sequences(sequences: &[Sequence], path_prefix: &str) { + sequences.iter().enumerate().for_each(|(i, seq)| { save_sequence(seq, &format!("{path_prefix}_{i}.csv")); }); } -fn save_queue_data_msg_counts(data: &[FxHashMap>], interval: f32, path: &str) { +fn save_queue_data_msg_counts(data: &[Vec>], interval: f32, path: &str) { let mut writer = csv::Writer::from_path(path).unwrap(); let mut header = vec!["vtime".to_string()]; - data[0].iter().for_each(|(node_id, counts)| { + data[0].iter().enumerate().for_each(|(node_id, counts)| { let num_queues = counts.len(); (0..num_queues).for_each(|q_idx| { header.push(format!("node{node_id}_q{q_idx}")); @@ -380,7 +383,7 @@ fn save_queue_data_msg_counts(data: &[FxHashMap>], interval: data.iter().enumerate().for_each(|(i, counts_per_node)| { let mut row = vec![(i as f64 * interval as f64).to_string()]; - counts_per_node.iter().for_each(|(_, counts)| { + counts_per_node.iter().for_each(|counts| { row.extend( counts .iter()