diff --git a/mixnet-rs/ordering/src/iteration.rs b/mixnet-rs/ordering/src/iteration.rs index e53b8d2..f4e0165 100644 --- a/mixnet-rs/ordering/src/iteration.rs +++ b/mixnet-rs/ordering/src/iteration.rs @@ -1,7 +1,7 @@ use std::path::Path; use protocol::{ - node::{MessageId, Node}, + node::{MessageId, Node, NodeId}, queue::{Message, QueueConfig, QueueType}, }; use rand::{rngs::StdRng, Rng, SeedableRng}; @@ -9,6 +9,8 @@ use rustc_hash::FxHashMap; use crate::{ordercoeff::Sequence, paramset::ParamSet}; +const RECEIVER_ID: NodeId = NodeId::MAX; + pub fn run_iteration( paramset: ParamSet, seed: u64, @@ -18,27 +20,88 @@ pub fn run_iteration( out_data_msg_counts_path: &str, out_ordering_coeff_path: &str, ) { + if paramset.random_topology { + run_iteration_with_random_topology( + paramset, + seed, + out_latency_path, + out_sent_sequence_path, + out_received_sequence_path, + out_data_msg_counts_path, + out_ordering_coeff_path, + ) + } else { + run_iteration_without_random_topology( + paramset, + seed, + out_latency_path, + out_sent_sequence_path, + out_received_sequence_path, + out_data_msg_counts_path, + out_ordering_coeff_path, + ) + } +} + +fn run_iteration_without_random_topology( + paramset: ParamSet, + seed: u64, + out_latency_path: &str, + out_sent_sequence_path: &str, + out_received_sequence_path_prefix: &str, + out_queue_data_msg_counts_path: &str, + out_ordering_coeff_path: &str, +) { + assert!(!paramset.random_topology); + // Ensure that all output files do not exist for path in &[ out_latency_path, out_sent_sequence_path, - out_received_sequence_path, - out_data_msg_counts_path, + out_received_sequence_path_prefix, + out_queue_data_msg_counts_path, ] { assert!(!Path::new(path).exists(), "File already exists: {path}"); } - // Initialize a mix node - let mut mixnode = Node::new( - QueueConfig { - queue_type: paramset.queue_type, - seed, - min_queue_size: paramset.min_queue_size, - }, - paramset.peering_degree, - false, - ); - mixnode.connect(u32::MAX); // connect to the virtual receiver node + // Initialize mix nodes + let mut next_node_id: NodeId = 0; + let mut mixnodes: FxHashMap = FxHashMap::default(); + let mut paths: Vec> = Vec::with_capacity(paramset.num_paths as usize); + for _ in 0..paramset.num_paths { + let mut ids = Vec::with_capacity(paramset.num_mixes as usize); + for _ in 0..paramset.num_mixes { + let id = next_node_id; + next_node_id += 1; + mixnodes.insert( + id, + Node::new( + QueueConfig { + queue_type: paramset.queue_type, + seed, + min_queue_size: paramset.min_queue_size, + }, + paramset.peering_degree, + paramset.random_topology, // disable cache + ), + ); + ids.push(id); + } + paths.push(ids); + } + + // Connect mix nodes + for path in paths.iter() { + for (i, id) in path.iter().enumerate() { + if i != path.len() - 1 { + let peer_id = path[i + 1]; + mixnodes.get_mut(id).unwrap().connect(peer_id); + } else { + mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID); + } + } + } + let sender_peers: Vec = paths.iter().map(|path| path[0]).collect(); let mut next_msg_id: MessageId = 0; @@ -51,10 +114,10 @@ pub fn run_iteration( let mut sent_times: FxHashMap = FxHashMap::default(); let mut latencies: FxHashMap = FxHashMap::default(); let mut sent_sequence = Sequence::new(); - let mut received_sequence = Sequence::new(); - let mut data_msg_counts_in_queue: Vec = Vec::new(); + let mut received_sequences: FxHashMap = FxHashMap::default(); + let mut queue_data_msg_counts: Vec>> = Vec::new(); - let mut rng = StdRng::seed_from_u64(seed); + let mut data_msg_rng = StdRng::seed_from_u64(seed); loop { tracing::trace!( "VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}", @@ -64,48 +127,81 @@ pub fn run_iteration( latencies.len() ); - // The sender emits a message (data or noise) to the mix node. + // The sender emits a message (data or noise) to all adjacent peers. if all_sent_count < paramset.num_sender_msgs as usize { - if try_probability(&mut rng, paramset.sender_data_msg_prob) { + if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) { let msg = next_msg_id; next_msg_id += 1; - mixnode.receive(msg, None); + sender_peers.iter().for_each(|peer_id| { + mixnodes.get_mut(peer_id).unwrap().receive(msg, None); + }); sent_times.insert(msg, vtime); sent_sequence.add_message(msg); } else { // Generate noise and add it to the sequence to calculate ordering coefficients later, - // but don't need to send it to the mix node - // because the mix node will anyway drop the noise, - // and we don't need to record what the mix node receives. + // but don't need to send it to the mix nodes + // because the mix nodes will anyway drop the noise, + // and we don't need to record what the mix nodes receive. sent_sequence.add_noise(); } all_sent_count += 1; } - // The mix node add a new data message to its queue with a certain probability - if try_probability(&mut rng, paramset.mix_data_msg_prob) { - mixnode.send(next_msg_id); - next_msg_id += 1; - // Don't put the msg into the sent_sequence - // because sent_sequence is only for recording messages sent by the sender, not the mixnode. + // Each mix node add a new data message to its queue with a certain probability + if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) { + for (_, node) in mixnodes.iter_mut() { + node.send(next_msg_id); + next_msg_id += 1; + // Don't put the msg into the sent_sequence + // because sent_sequence is only for recording messages sent by the sender, not the mixnode. + } } - // The mix node emits a message (data or noise) to the receiver. + // Each mix node relays a message (data or noise) to the next mix node or the receiver. // As the receiver, record the time and order of the received messages. - // TODO: handle all queues - match mixnode.read_queues().first().unwrap().1 { - Message::Data(msg) => { - latencies.insert(msg, vtime - sent_times.get(&msg).unwrap()); - received_sequence.add_message(msg); - } - Message::Noise => { - received_sequence.add_noise(); - } + // + // source -> (destination, msg) + let mut all_msgs_to_relay: Vec<(NodeId, Vec<(NodeId, Message)>)> = Vec::new(); + for (node_id, node) in mixnodes.iter_mut() { + all_msgs_to_relay.push((*node_id, node.read_queues())); } + all_msgs_to_relay + .into_iter() + .for_each(|(sender_id, msgs_to_relay)| { + msgs_to_relay.into_iter().for_each(|(peer_id, msg)| { + if peer_id == RECEIVER_ID { + match msg { + Message::Data(msg) => { + latencies + .entry(msg) + .or_insert(vtime - sent_times.get(&msg).unwrap()); + received_sequences + .entry(sender_id) + .or_insert(Sequence::new()) + .add_message(msg); + } + Message::Noise => { + received_sequences + .entry(sender_id) + .or_insert(Sequence::new()) + .add_noise(); + } + } + } else if let Message::Data(msg) = msg { + mixnodes + .get_mut(&peer_id) + .unwrap() + .receive(msg, Some(sender_id)); + } + }); + }); - // Record the number of data messages in the mix node's queue - // TODO: handle all queues - data_msg_counts_in_queue.push(*mixnode.data_count_in_queue().first().unwrap()); + // Record the number of data messages in each mix node's queues + let mut counts: FxHashMap> = FxHashMap::default(); + mixnodes.iter().for_each(|(id, node)| { + counts.insert(*id, node.queue_data_msg_counts()); + }); + queue_data_msg_counts.push(counts); // If all data amessages (that the sender has to send) have been received by the receiver, // stop the iteration. @@ -121,29 +217,37 @@ pub fn run_iteration( // Save results to CSV files save_latencies(&latencies, &sent_times, out_latency_path); save_sequence(&sent_sequence, out_sent_sequence_path); - save_sequence(&received_sequence, out_received_sequence_path); - save_data_msg_counts( - &data_msg_counts_in_queue, + save_sequences(&received_sequences, out_received_sequence_path_prefix); + save_queue_data_msg_counts( + &queue_data_msg_counts, transmission_interval, - out_data_msg_counts_path, + out_queue_data_msg_counts_path, ); // Calculate ordering coefficients and save them to a CSV file. if paramset.queue_type != QueueType::NonMix { - let strong_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, true); - let weak_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, false); - tracing::info!( - "STRONG_COEFF:{}, WEAK_COEFF:{}", - strong_ordering_coeff, - weak_ordering_coeff - ); - save_ordering_coefficients( - strong_ordering_coeff, - weak_ordering_coeff, - out_ordering_coeff_path, - ); + let mut coeffs: Vec<[u64; 2]> = Vec::new(); + for (_, recv_seq) in received_sequences.iter() { + let casual = sent_sequence.ordering_coefficient(recv_seq, true); + let weak = sent_sequence.ordering_coefficient(recv_seq, false); + coeffs.push([casual, weak]); + } + save_ordering_coefficients(&coeffs, out_ordering_coeff_path); } } +fn run_iteration_with_random_topology( + paramset: ParamSet, + seed: u64, + out_latency_path: &str, + out_sent_sequence_path: &str, + out_received_sequence_path: &str, + out_data_msg_counts_path: &str, + out_ordering_coeff_path: &str, +) { + assert!(paramset.random_topology); + todo!() +} + fn try_probability(rng: &mut StdRng, prob: f32) -> bool { assert!( (0.0..=1.0).contains(&prob), @@ -174,42 +278,54 @@ fn save_latencies( writer.flush().unwrap(); } -fn save_sequence(sequence: &Sequence, path: &str) { +fn save_sequence(seq: &Sequence, path: &str) { let mut writer = csv::Writer::from_path(path).unwrap(); - sequence.iter().for_each(|entry| { + seq.iter().for_each(|entry| { writer.write_record([entry.to_string()]).unwrap(); }); writer.flush().unwrap(); } -fn save_data_msg_counts( - data_msg_counts_in_queue: &[usize], - interval: f32, - out_data_msg_counts_path: &str, -) { - let mut writer = csv::Writer::from_path(out_data_msg_counts_path).unwrap(); - writer - .write_record(["vtime", "data_msg_count_in_queue"]) - .unwrap(); - data_msg_counts_in_queue - .iter() - .enumerate() - .for_each(|(i, count)| { - writer - .write_record([(i as f64 * interval as f64).to_string(), count.to_string()]) - .unwrap(); +fn save_sequences(sequences: &FxHashMap, path_prefix: &str) { + sequences.iter().enumerate().for_each(|(i, (_, seq))| { + save_sequence(seq, &format!("{path_prefix}_{i}.csv")); + }); +} + +fn save_queue_data_msg_counts(data: &[FxHashMap>], interval: f32, path: &str) { + let mut writer = csv::Writer::from_path(path).unwrap(); + + let mut header = vec!["vtime".to_string()]; + data[0].iter().for_each(|(node_id, counts)| { + let num_queues = counts.len(); + (0..num_queues).for_each(|q_idx| { + header.push(format!("node{node_id}_q{q_idx}")); }); + }); + writer.write_record(header).unwrap(); + + data.iter().enumerate().for_each(|(i, counts_per_node)| { + let mut row = vec![(i as f64 * interval as f64).to_string()]; + counts_per_node.iter().for_each(|(_, counts)| { + row.extend( + counts + .iter() + .map(|count| count.to_string()) + .collect::>(), + ); + }); + writer.write_record(row).unwrap(); + }); writer.flush().unwrap(); } -fn save_ordering_coefficients(strong_ordering_coeff: u64, weak_ordering_coeff: u64, path: &str) { +fn save_ordering_coefficients(data: &[[u64; 2]], path: &str) { let mut writer = csv::Writer::from_path(path).unwrap(); - writer.write_record(["strong", "weak"]).unwrap(); - writer - .write_record([ - strong_ordering_coeff.to_string(), - weak_ordering_coeff.to_string(), - ]) - .unwrap(); + writer.write_record(["path", "casual", "weak"]).unwrap(); + for (path_idx, [casual, weak]) in data.iter().enumerate() { + writer + .write_record([path_idx.to_string(), casual.to_string(), weak.to_string()]) + .unwrap(); + } writer.flush().unwrap(); } diff --git a/mixnet-rs/ordering/src/ordercoeff.rs b/mixnet-rs/ordering/src/ordercoeff.rs index 7ee1836..873745f 100644 --- a/mixnet-rs/ordering/src/ordercoeff.rs +++ b/mixnet-rs/ordering/src/ordercoeff.rs @@ -2,9 +2,10 @@ use std::fmt::Display; use protocol::node::MessageId; +#[derive(Debug, Clone)] pub struct Sequence(Vec); -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum Entry { Data(MessageId), Noise(u32), // the number of consecutive noises @@ -52,13 +53,13 @@ impl Sequence { } impl Sequence { - pub fn ordering_coefficient(&self, other: &Sequence, strong: bool) -> u64 { + pub fn ordering_coefficient(&self, other: &Sequence, casual: bool) -> u64 { let mut coeff = 0; let mut i = 0; while i < self.0.len() { if let Entry::Data(_) = &self.0[i] { - let (c, next_i) = self.ordering_coefficient_from(i, other, strong); + let (c, next_i) = self.ordering_coefficient_from(i, other, casual); coeff += c; if next_i != i { @@ -78,7 +79,7 @@ impl Sequence { &self, start_idx: usize, other: &Sequence, - strong: bool, + casual: bool, ) -> (u64, usize) { let msg1 = match self.0[start_idx] { Entry::Data(msg) => msg, @@ -89,8 +90,8 @@ impl Sequence { if let Entry::Data(msg2) = entry { if msg1 == *msg2 { // Found the 1st matching msg. Start finding the next adjacent matching msg. - if strong { - return self.strong_ordering_coefficient_from(start_idx, other, j); + if casual { + return self.casual_ordering_coefficient_from(start_idx, other, j); } else { return self.weak_ordering_coefficient_from(start_idx, other, j); } @@ -100,7 +101,7 @@ impl Sequence { (0, start_idx) } - fn strong_ordering_coefficient_from( + fn casual_ordering_coefficient_from( &self, start_idx: usize, other: &Sequence, @@ -172,24 +173,24 @@ impl Sequence { mod tests { use super::*; - fn test_ordering_coefficient_common(strong: bool) { + fn test_ordering_coefficient_common(casual: bool) { // Case 0: Empty sequences let seq = Sequence(vec![]); - assert_eq!(seq.ordering_coefficient(&seq, strong), 0); + assert_eq!(seq.ordering_coefficient(&seq, casual), 0); // Case 1: Exact one matched pair with no noise let seq = Sequence(vec![Entry::Data(1), Entry::Data(2)]); - assert_eq!(seq.ordering_coefficient(&seq, strong), 1); + assert_eq!(seq.ordering_coefficient(&seq, casual), 1); // Case 2: Exact one matched pair with noise let seq = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); - assert_eq!(seq.ordering_coefficient(&seq, strong), 1); + assert_eq!(seq.ordering_coefficient(&seq, casual), 1); // Case 3: One matched pair with no noise let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(3)]); let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(4)]); - assert_eq!(seq1.ordering_coefficient(&seq2, strong), 1); - assert_eq!(seq2.ordering_coefficient(&seq1, strong), 1); + assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1); + assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1); // Case 4: One matched pair with noise let seq1 = Sequence(vec![ @@ -199,8 +200,8 @@ mod tests { Entry::Data(3), ]); let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); - assert_eq!(seq1.ordering_coefficient(&seq2, strong), 1); - assert_eq!(seq2.ordering_coefficient(&seq1, strong), 1); + assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1); + assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1); // Case 5: Two matched pairs with noise let seq1 = Sequence(vec![ @@ -216,26 +217,26 @@ mod tests { Entry::Data(3), Entry::Data(4), ]); - assert_eq!(seq1.ordering_coefficient(&seq2, strong), 2); - assert_eq!(seq2.ordering_coefficient(&seq1, strong), 2); + assert_eq!(seq1.ordering_coefficient(&seq2, casual), 2); + assert_eq!(seq2.ordering_coefficient(&seq1, casual), 2); // Case 6: Only partial match with no noise let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); let seq2 = Sequence(vec![Entry::Data(2), Entry::Data(3)]); - assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0); - assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0); + assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0); + assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0); // Case 7: Only partial match with noise let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]); let seq2 = Sequence(vec![Entry::Data(2), Entry::Noise(10), Entry::Data(3)]); - assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0); - assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0); + assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0); + assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0); // Case 8: No match at all let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]); let seq2 = Sequence(vec![Entry::Data(3), Entry::Noise(10), Entry::Data(4)]); - assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0); - assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0); + assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0); + assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0); // Case 9: Matches with noise but mixed orders let seq1 = Sequence(vec![ @@ -256,12 +257,12 @@ mod tests { Entry::Data(3), Entry::Data(6), ]); - assert_eq!(seq1.ordering_coefficient(&seq2, strong), 3); - assert_eq!(seq2.ordering_coefficient(&seq1, strong), 3); + assert_eq!(seq1.ordering_coefficient(&seq2, casual), 3); + assert_eq!(seq2.ordering_coefficient(&seq1, casual), 3); } #[test] - fn test_strong_ordering_coefficient() { + fn test_casual_ordering_coefficient() { test_ordering_coefficient_common(true); // Case 0: No match because of noise diff --git a/mixnet-rs/protocol/src/node.rs b/mixnet-rs/protocol/src/node.rs index 42f15c2..0495a0f 100644 --- a/mixnet-rs/protocol/src/node.rs +++ b/mixnet-rs/protocol/src/node.rs @@ -75,7 +75,7 @@ impl Node { msgs_to_relay } - pub fn data_count_in_queue(&self) -> Vec { + pub fn queue_data_msg_counts(&self) -> Vec { self.queues .iter() .map(|(_, queue)| queue.data_count())