diff --git a/mixnet-rs/dissemination/src/iteration.rs b/mixnet-rs/dissemination/src/iteration.rs index e6c0757..3b0649e 100644 --- a/mixnet-rs/dissemination/src/iteration.rs +++ b/mixnet-rs/dissemination/src/iteration.rs @@ -1,5 +1,5 @@ use protocol::{ - node::{MessageId, Node, NodeId}, + node::{Node, NodeId}, queue::{Message, QueueConfig}, topology::{build_topology, save_topology}, }; @@ -8,18 +8,21 @@ use rustc_hash::FxHashMap; use crate::paramset::ParamSet; +type MessageId = u32; + // An interval that the sender nodes send (schedule) new messages const MSG_INTERVAL: f32 = 1.0; pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) { // Initialize nodes (not connected with each other yet) - let mut nodes: Vec = Vec::new(); + let mut nodes: Vec> = Vec::new(); let mut queue_seed_rng = StdRng::seed_from_u64(seed); let peering_degrees = paramset.gen_peering_degrees(seed); tracing::debug!("PeeringDegrees initialized."); for node_id in 0..paramset.num_nodes { nodes.push(Node::new( + node_id, QueueConfig { queue_type: paramset.queue_type, seed: queue_seed_rng.next_u64(), @@ -112,7 +115,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology fn send_messages( vtime: f32, sender_ids: &[NodeId], - nodes: &mut [Node], + nodes: &mut [Node], next_msg_id: &mut MessageId, message_tracker: &mut FxHashMap, ) { @@ -125,7 +128,7 @@ fn send_messages( fn relay_messages( vtime: f32, - nodes: &mut [Node], + nodes: &mut [Node], message_tracker: &mut FxHashMap, num_disseminated_msgs: &mut usize, writer: &mut csv::Writer, diff --git a/mixnet-rs/ordering/src/iteration.rs b/mixnet-rs/ordering/src/iteration.rs index 77337c2..6f75d40 100644 --- a/mixnet-rs/ordering/src/iteration.rs +++ b/mixnet-rs/ordering/src/iteration.rs @@ -1,49 +1,30 @@ -use std::{collections::hash_map::Entry, fs::File, path::Path}; +use std::collections::hash_map::Entry; -use csv::Writer; -use protocol::{ - node::{MessageId, Node, NodeId}, - queue::{Message, QueueConfig, QueueType}, - topology::{build_topology, save_topology}, -}; -use rand::{rngs::StdRng, seq::SliceRandom, Rng, RngCore, SeedableRng}; +use protocol::{node::NodeId, queue::Message}; +use rand::{rngs::StdRng, Rng, SeedableRng}; use rustc_hash::FxHashMap; -use crate::{ordercoeff::Sequence, paramset::ParamSet}; +use crate::{ + message::{DataMessage, DataMessageGenerator}, + outputs::Outputs, + paramset::ParamSet, + topology::{build_random_network, build_striped_network, RECEIVER_NODE_ID}, +}; -const RECEIVER_ID: NodeId = NodeId::MAX; - -pub fn run_iteration( - paramset: ParamSet, - seed: u64, - out_latency_path: &str, - out_sent_sequence_path: &str, - out_received_sequence_path_prefix: &str, - out_queue_data_msg_counts_path: &str, - out_ordering_coeff_path: Option, - out_topology_path: &str, -) -> f32 { - // Ensure that all output files do not exist - for path in &[ - out_latency_path, - out_sent_sequence_path, - out_received_sequence_path_prefix, - out_queue_data_msg_counts_path, - out_topology_path, - ] { - assert!(!Path::new(path).exists(), "File already exists: {path}"); - } - if let Some(path) = &out_ordering_coeff_path { - assert!(!Path::new(path).exists(), "File already exists: {path}"); - } - - let (mut mixnodes, sender_peers_list) = if paramset.random_topology { - build_random_network(¶mset, seed, out_topology_path) +pub fn run_iteration(paramset: ParamSet, seed: u64, outputs: &mut Outputs) -> f32 { + let (mut mixnodes, sender_peers_list, receiver_peer_conn_idx) = if paramset.random_topology { + build_random_network(¶mset, seed, outputs) } else { build_striped_network(¶mset, seed) }; + // Check node ID consistency + for (i, node) in mixnodes.iter().enumerate() { + assert_eq!(node.id as usize, i); + } - let mut next_msg_id: MessageId = 0; + // For N senders + 1 mix (all mixnodes will share the same sender ID) + let mut data_msg_gen = DataMessageGenerator::new(paramset.num_senders + 1); + let mix_msg_sender_id = paramset.num_senders; // Virtual discrete time let mut vtime: f32 = 0.0; @@ -51,21 +32,13 @@ pub fn run_iteration( let transmission_interval = 1.0 / paramset.transmission_rate as f32; // Results let mut all_sent_count = 0; // all data + noise sent by all senders - let target_all_sent_count = (paramset.num_sender_msgs as usize) + let all_sent_count_target = (paramset.num_sender_msgs as usize) .checked_mul(paramset.num_senders as usize) .unwrap(); - let mut sent_times: FxHashMap = FxHashMap::default(); - let mut recv_times: FxHashMap = FxHashMap::default(); - let mut latencies: Vec<(MessageId, f32)> = Vec::new(); - let mut sent_sequence = Sequence::new(); - let mut received_sequences: FxHashMap = FxHashMap::default(); - let mut unified_received_sequence = if paramset.random_topology { - Some(Sequence::new()) - } else { - None - }; - let mut queue_data_msg_counts_writer = - new_queue_data_msg_counts_writer(out_queue_data_msg_counts_path, &mixnodes); + let mut sent_data_msgs: FxHashMap = FxHashMap::default(); + let mut recv_data_msgs: FxHashMap = FxHashMap::default(); + + outputs.write_header_queue_data_msg_counts(&mixnodes); let mut data_msg_rng = StdRng::seed_from_u64(seed); loop { @@ -73,254 +46,104 @@ pub fn run_iteration( "VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}", vtime, all_sent_count, - sent_times.len(), - latencies.len() + sent_data_msgs.len(), + recv_data_msgs.len(), ); // All senders emit a message (data or noise) to all of their own adjacent peers. - if all_sent_count < target_all_sent_count { - for sender_peers in sender_peers_list.iter() { + if all_sent_count < all_sent_count_target { + // For each sender + for (sender_idx, sender_peers) in sender_peers_list.iter().enumerate() { if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) { - let msg = next_msg_id; - next_msg_id += 1; + let msg = data_msg_gen.next(sender_idx.try_into().unwrap()); sender_peers.iter().for_each(|peer_id| { mixnodes .get_mut(*peer_id as usize) .unwrap() .receive(msg, None); }); - sent_times.insert(msg, vtime); - sent_sequence.add_message(msg); + sent_data_msgs.insert(msg, vtime); + outputs.add_sent_msg(&msg) } else { // Generate noise and add it to the sequence to calculate ordering coefficients later, // but don't need to send it to the mix nodes // because the mix nodes will anyway drop the noise, // and we don't need to record what the mix nodes receive. - sent_sequence.add_noise(); + outputs.add_sent_noise(sender_idx); } all_sent_count += 1; } } // Each mix node add a new data message to its queue with a certain probability - for node in mixnodes.iter_mut() { - if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) { - node.send(next_msg_id); - next_msg_id += 1; - // Don't put the msg into the sent_sequence - // because sent_sequence is only for recording messages sent by the senders, not the mixnode. + if paramset.mix_data_msg_prob > 0.0 { + for node in mixnodes.iter_mut() { + if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) { + node.send(data_msg_gen.next(mix_msg_sender_id)); + // We don't put the msg into the sent_sequence + // because sent_sequence is only for recording messages sent by the senders, not the mixnode. + } } } // Each mix node relays a message (data or noise) to the next mix node or the receiver. // As the receiver, record the time and order of the received messages. // - // source -> (destination, msg) - let mut all_msgs_to_relay: Vec<(NodeId, Vec<(NodeId, Message)>)> = Vec::new(); - for (node_id, node) in mixnodes.iter_mut().enumerate() { - all_msgs_to_relay.push((node_id.try_into().unwrap(), node.read_queues())); + // relayer_id -> (peer_id, msg) + let mut all_msgs_to_relay: Vec<(NodeId, Vec<(NodeId, Message)>)> = Vec::new(); + for node in mixnodes.iter_mut() { + all_msgs_to_relay.push((node.id, node.read_queues())); } all_msgs_to_relay .into_iter() - .for_each(|(mix_id, msgs_to_relay)| { + .for_each(|(relayer_id, msgs_to_relay)| { msgs_to_relay.into_iter().for_each(|(peer_id, msg)| { - if peer_id == RECEIVER_ID { + if peer_id == RECEIVER_NODE_ID { match msg { Message::Data(msg) => { // If msg was sent by the sender (not by any mix) - if let Some(&sent_time) = sent_times.get(&msg) { - // If this is the first time to see the msg - if let Entry::Vacant(e) = recv_times.entry(msg) { + if let Some(&sent_time) = sent_data_msgs.get(&msg) { + // If this is the first time to see the msg, + // update stats that must ignore duplicate messages. + if let Entry::Vacant(e) = recv_data_msgs.entry(msg) { e.insert(vtime); - latencies.push((msg, vtime - sent_time)); - if let Some(unified_recv_seq) = - &mut unified_received_sequence - { - unified_recv_seq.add_message(msg); - } + outputs.add_latency(&msg, sent_time, vtime); } - received_sequences - .entry(mix_id) - .or_insert(Sequence::new()) - .add_message(msg); } + // Record msg to the sequence + let conn_idx = receiver_peer_conn_idx.get(&relayer_id).unwrap(); + outputs.add_recv_msg(&msg, *conn_idx as usize); } Message::Noise => { - received_sequences - .entry(mix_id) - .or_insert(Sequence::new()) - .add_noise(); + // Record noise to the sequence + let conn_idx = receiver_peer_conn_idx.get(&relayer_id).unwrap(); + outputs.add_recv_noise(*conn_idx as usize); } } } else if let Message::Data(msg) = msg { - mixnodes - .get_mut(peer_id as usize) - .unwrap() - .receive(msg, Some(mix_id)); + let peer = mixnodes.get_mut(peer_id as usize).unwrap(); + assert_eq!(peer.id, peer_id); + peer.receive(msg, Some(relayer_id)); } }); }); // Record the number of data messages in each mix node's queues - append_queue_data_msg_counts(&mixnodes, vtime, &mut queue_data_msg_counts_writer); + outputs.add_queue_data_msg_counts(vtime, &mixnodes); - // If all data messages (that have been sent by the senders) have been received by the receiver, + // If all senders finally emitted all data+noise messages, + // and If all data messages have been received by the receiver, // stop the iteration. - if all_sent_count == target_all_sent_count && sent_times.len() == latencies.len() { + if all_sent_count == all_sent_count_target && sent_data_msgs.len() == recv_data_msgs.len() { break; } vtime += transmission_interval; } - // Save results to CSV files - save_latencies(&latencies, &sent_times, &recv_times, out_latency_path); - save_sequence(&sent_sequence, out_sent_sequence_path); - // Sort received_sequences - let mut node_ids: Vec = received_sequences.keys().cloned().collect(); - node_ids.sort(); - let received_sequences: Vec = node_ids - .iter() - .map(|node_id| received_sequences.remove(node_id).unwrap()) - .collect(); - save_sequences(&received_sequences, out_received_sequence_path_prefix); - if let Some(unified_recv_seq) = &unified_received_sequence { - save_sequence( - unified_recv_seq, - format!("{out_received_sequence_path_prefix}_unified.csv").as_str(), - ); - } - // Calculate ordering coefficients and save them to a CSV file (if enabled) - if let Some(out_ordering_coeff_path) = &out_ordering_coeff_path { - if paramset.queue_type != QueueType::NonMix { - if let Some(unified_recv_seq) = &unified_received_sequence { - let casual = sent_sequence.ordering_coefficient(unified_recv_seq, true); - let weak = sent_sequence.ordering_coefficient(unified_recv_seq, false); - save_ordering_coefficients(&[[casual, weak]], out_ordering_coeff_path); - } else { - let mut coeffs: Vec<[u64; 2]> = Vec::new(); - for recv_seq in received_sequences.iter() { - let casual = sent_sequence.ordering_coefficient(recv_seq, true); - let weak = sent_sequence.ordering_coefficient(recv_seq, false); - coeffs.push([casual, weak]); - } - save_ordering_coefficients(&coeffs, out_ordering_coeff_path); - } - } - } - vtime } -fn build_striped_network(paramset: &ParamSet, seed: u64) -> (Vec, Vec>) { - assert!(!paramset.random_topology); - let mut next_node_id: NodeId = 0; - let mut queue_seed_rng = StdRng::seed_from_u64(seed); - let mut mixnodes: Vec = - Vec::with_capacity(paramset.num_paths as usize * paramset.num_mixes as usize); - let mut paths: Vec> = Vec::with_capacity(paramset.num_paths as usize); - for _ in 0..paramset.num_paths { - let mut ids = Vec::with_capacity(paramset.num_mixes as usize); - for _ in 0..paramset.num_mixes { - let id = next_node_id; - next_node_id += 1; - mixnodes.push(Node::new( - QueueConfig { - queue_type: paramset.queue_type, - seed: queue_seed_rng.next_u64(), - min_queue_size: paramset.min_queue_size, - }, - paramset.peering_degree, - false, // disable cache - )); - ids.push(id); - } - paths.push(ids); - } - - // Connect mix nodes - for path in paths.iter() { - for (i, id) in path.iter().enumerate() { - if i != path.len() - 1 { - let peer_id = path[i + 1]; - mixnodes.get_mut(*id as usize).unwrap().connect(peer_id); - } else { - mixnodes.get_mut(*id as usize).unwrap().connect(RECEIVER_ID); - } - } - } - let sender_peers_list: Vec> = - vec![ - paths.iter().map(|path| *path.first().unwrap()).collect(); - paramset.num_senders as usize - ]; - (mixnodes, sender_peers_list) -} - -fn build_random_network( - paramset: &ParamSet, - seed: u64, - out_topology_path: &str, -) -> (Vec, Vec>) { - assert!(paramset.random_topology); - // Init mix nodes - let mut queue_seed_rng = StdRng::seed_from_u64(seed); - let mut mixnodes: Vec = Vec::with_capacity(paramset.num_mixes as usize); - for _ in 0..paramset.num_mixes { - mixnodes.push(Node::new( - QueueConfig { - queue_type: paramset.queue_type, - seed: queue_seed_rng.next_u64(), - min_queue_size: paramset.min_queue_size, - }, - paramset.peering_degree, - true, // enable cache - )); - } - - // Choose sender's peers and receiver's peers randomly - let mut peers_rng = StdRng::seed_from_u64(seed); - let mut candidates: Vec = (0..paramset.num_mixes).collect(); - assert!(candidates.len() >= paramset.peering_degree as usize); - let mut sender_peers_list: Vec> = Vec::with_capacity(paramset.num_senders as usize); - for _ in 0..paramset.num_senders { - candidates.as_mut_slice().shuffle(&mut peers_rng); - sender_peers_list.push( - candidates - .iter() - .cloned() - .take(paramset.peering_degree as usize) - .collect(), - ); - } - candidates.as_mut_slice().shuffle(&mut peers_rng); - let receiver_peers: Vec = candidates - .iter() - .cloned() - .take(paramset.peering_degree as usize) - .collect(); - - // Connect mix nodes - let topology = build_topology( - paramset.num_mixes, - &vec![paramset.peering_degree; paramset.num_mixes as usize], - seed, - ); - save_topology(&topology, out_topology_path).unwrap(); - for (node_id, peers) in topology.iter().enumerate() { - peers.iter().for_each(|peer_id| { - mixnodes.get_mut(node_id).unwrap().connect(*peer_id); - }); - } - - // Connect the selected mix nodes with the receiver - for id in receiver_peers.iter() { - mixnodes.get_mut(*id as usize).unwrap().connect(RECEIVER_ID); - } - - (mixnodes, sender_peers_list) -} - fn try_probability(rng: &mut StdRng, prob: f32) -> bool { assert!( (0.0..=1.0).contains(&prob), @@ -328,87 +151,3 @@ fn try_probability(rng: &mut StdRng, prob: f32) -> bool { ); rng.gen::() < prob } - -fn save_latencies( - latencies: &[(MessageId, f32)], - sent_times: &FxHashMap, - recv_times: &FxHashMap, - path: &str, -) { - let mut writer = csv::Writer::from_path(path).unwrap(); - writer - .write_record(["msg_id", "latency", "sent_time", "received_time"]) - .unwrap(); - for (msg, latency) in latencies.iter() { - let sent_time = sent_times.get(msg).unwrap(); - let recv_time = recv_times.get(msg).unwrap(); - writer - .write_record(&[ - msg.to_string(), - latency.to_string(), - sent_time.to_string(), - recv_time.to_string(), - ]) - .unwrap(); - } - writer.flush().unwrap(); -} - -fn save_sequence(seq: &Sequence, path: &str) { - let mut writer = csv::Writer::from_path(path).unwrap(); - seq.iter().for_each(|entry| { - writer.write_record([entry.to_string()]).unwrap(); - }); - writer.flush().unwrap(); -} - -fn save_sequences(sequences: &[Sequence], path_prefix: &str) { - sequences.iter().enumerate().for_each(|(i, seq)| { - save_sequence(seq, &format!("{path_prefix}_{i}.csv")); - }); -} - -fn new_queue_data_msg_counts_writer(path: &str, mixnodes: &[Node]) -> Writer { - let mut writer = csv::Writer::from_path(path).unwrap(); - let mut header = vec!["vtime".to_string()]; - mixnodes - .iter() - .map(|node| node.queue_data_msg_counts()) - .enumerate() - .for_each(|(node_id, counts)| { - let num_queues = counts.len(); - (0..num_queues).for_each(|q_idx| { - header.push(format!("node{node_id}_q{q_idx}")); - }); - }); - writer.write_record(header).unwrap(); - writer.flush().unwrap(); - writer -} - -fn append_queue_data_msg_counts(mixnodes: &[Node], vtime: f32, writer: &mut Writer) { - let mut row = vec![vtime.to_string()]; - mixnodes - .iter() - .map(|node| node.queue_data_msg_counts()) - .for_each(|counts| { - row.extend( - counts - .iter() - .map(|count| count.to_string()) - .collect::>(), - ); - }); - writer.write_record(row).unwrap(); -} - -fn save_ordering_coefficients(data: &[[u64; 2]], path: &str) { - let mut writer = csv::Writer::from_path(path).unwrap(); - writer.write_record(["path", "casual", "weak"]).unwrap(); - for (path_idx, [casual, weak]) in data.iter().enumerate() { - writer - .write_record([path_idx.to_string(), casual.to_string(), weak.to_string()]) - .unwrap(); - } - writer.flush().unwrap(); -} diff --git a/mixnet-rs/ordering/src/main.rs b/mixnet-rs/ordering/src/main.rs index 06815f6..a57cc73 100644 --- a/mixnet-rs/ordering/src/main.rs +++ b/mixnet-rs/ordering/src/main.rs @@ -1,6 +1,9 @@ mod iteration; +mod message; mod ordercoeff; +mod outputs; mod paramset; +mod topology; use std::{ error::Error, @@ -11,11 +14,12 @@ use std::{ use chrono::Utc; use clap::Parser; use iteration::run_iteration; +use outputs::Outputs; use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; use protocol::queue::QueueType; #[derive(Debug, Parser)] -#[command(name = "Single Sender Single Mix Measurement")] +#[command(name = "Ordering Measurement")] struct Args { #[arg(short, long)] exp_id: ExperimentId, @@ -25,8 +29,6 @@ struct Args { queue_type: QueueType, #[arg(short, long)] outdir: String, - #[arg(short, long, default_value_t = false)] - skip_coeff_calc: bool, #[arg(short, long)] from_paramset: Option, #[arg(short, long)] @@ -43,7 +45,6 @@ fn main() { session_id, queue_type, outdir, - skip_coeff_calc, from_paramset, to_paramset, } = args; @@ -87,24 +88,28 @@ fn main() { dur_writer.flush().unwrap(); for i in 0..paramset.num_iterations { - let wip_queue_data_msgs_counts_path = - format!("{paramset_dir}/__WIP__iteration_{i}_data_msg_counts.csv"); + let mut outputs = Outputs::new( + format!("{paramset_dir}/__WIP__iteration_{i}_latency.csv"), + (0..paramset.num_senders) + .map(|sender_idx| { + format!("{paramset_dir}/__WIP__iteration_{i}_sent_seq_{sender_idx}.csv") + }) + .collect(), + (0..paramset.peering_degree) + .map(|conn_idx| { + format!("{paramset_dir}/__WIP__iteration_{i}_recv_seq_{conn_idx}.csv") + }) + .collect(), + format!("{paramset_dir}/__WIP__iteration_{i}_data_msg_counts.csv"), + format!("{paramset_dir}/iteration_{i}_topology.csv"), + ); let start_time = SystemTime::now(); - let vtime = run_iteration( - paramset.clone(), - i as u64, - &format!("{paramset_dir}/iteration_{i}_latency.csv"), - &format!("{paramset_dir}/iteration_{i}_sent_seq.csv"), - &format!("{paramset_dir}/iteration_{i}_recv_seq"), - &wip_queue_data_msgs_counts_path, - if !skip_coeff_calc { - Some(format!("{paramset_dir}/iteration_{i}_ordering_coeff.csv")) - } else { - None - }, - &format!("{paramset_dir}/iteration_{i}_topology.csv"), - ); + + let vtime = run_iteration(paramset.clone(), i as u64, &mut outputs); + outputs.close(); + outputs.rename_paths("__WIP__iteration", "iteration"); + let duration = SystemTime::now().duration_since(start_time).unwrap(); let duration_human = format_duration(duration); dur_writer @@ -117,10 +122,6 @@ fn main() { .unwrap(); dur_writer.flush().unwrap(); - let new_queue_data_msgs_counts_path = - wip_queue_data_msgs_counts_path.replace("__WIP__iteration_", "iteration_"); - std::fs::rename(&wip_queue_data_msgs_counts_path, &new_queue_data_msgs_counts_path).expect("Failed to rename {wip_queue_data_msgs_counts_path} -> {new_queue_data_msgs_counts_path}: {e}"); - tracing::info!( "ParamSet:{}, Iteration:{} completed. Duration:{}, vtime:{}", paramset.id, diff --git a/mixnet-rs/ordering/src/message.rs b/mixnet-rs/ordering/src/message.rs new file mode 100644 index 0000000..2eecacf --- /dev/null +++ b/mixnet-rs/ordering/src/message.rs @@ -0,0 +1,31 @@ +use std::fmt::Display; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct DataMessage { + pub sender: u8, + pub msg_id: u32, +} + +impl Display for DataMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}:{}", self.sender, self.msg_id)) + } +} + +pub struct DataMessageGenerator { + next_msg_ids: Vec, +} + +impl DataMessageGenerator { + pub fn new(num_senders: u8) -> Self { + Self { + next_msg_ids: vec![0; num_senders as usize], + } + } + + pub fn next(&mut self, sender: u8) -> DataMessage { + let msg_id = self.next_msg_ids[sender as usize]; + self.next_msg_ids[sender as usize] += 1; + DataMessage { sender, msg_id } + } +} diff --git a/mixnet-rs/ordering/src/ordercoeff.rs b/mixnet-rs/ordering/src/ordercoeff.rs index 873745f..dd3a655 100644 --- a/mixnet-rs/ordering/src/ordercoeff.rs +++ b/mixnet-rs/ordering/src/ordercoeff.rs @@ -1,297 +1,285 @@ -use std::fmt::Display; +use std::fs::File; -use protocol::node::MessageId; +use crate::message::DataMessage; -#[derive(Debug, Clone)] -pub struct Sequence(Vec); - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Entry { - Data(MessageId), - Noise(u32), // the number of consecutive noises +#[derive(Debug)] +pub struct SequenceWriter { + noise_buf: u32, + writer: csv::Writer, } -impl Display for Entry { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let s = match self { - Entry::Data(msg) => msg.to_string(), - Entry::Noise(cnt) => format!("-{cnt}"), - }; - f.write_str(s.as_str()) - } -} - -impl Sequence { - pub fn new() -> Self { - Self(Vec::new()) +impl SequenceWriter { + pub fn new(path: &str) -> Self { + Self { + noise_buf: 0, + writer: csv::Writer::from_path(path).unwrap(), + } } - pub fn add_message(&mut self, msg: MessageId) { - self.0.push(Entry::Data(msg)); + pub fn flush(&mut self) { + self.clear_buf(); + self.writer.flush().unwrap(); + } + + fn clear_buf(&mut self) { + if self.noise_buf > 0 { + self.writer + .write_record(&[format!("-{}", self.noise_buf)]) + .unwrap(); + self.noise_buf = 0; + } + } + + pub fn add_message(&mut self, msg: &DataMessage) { + self.clear_buf(); + self.writer.write_record(&[msg.to_string()]).unwrap(); } pub fn add_noise(&mut self) { - if let Some(last) = self.0.last_mut() { - match last { - Entry::Noise(cnt) => { - *cnt += 1; - } - _ => self.0.push(Entry::Noise(1)), - } - } else { - self.0.push(Entry::Noise(1)) - } - } - - pub fn iter(&self) -> impl Iterator { - self.0.iter() - } - - pub fn len(&self) -> usize { - self.0.len() + self.noise_buf += 1; } } -impl Sequence { - pub fn ordering_coefficient(&self, other: &Sequence, casual: bool) -> u64 { - let mut coeff = 0; - let mut i = 0; +// impl Sequence { +// pub fn ordering_coefficient(&self, other: &Sequence, casual: bool) -> u64 { +// let mut coeff = 0; +// let mut i = 0; - while i < self.0.len() { - if let Entry::Data(_) = &self.0[i] { - let (c, next_i) = self.ordering_coefficient_from(i, other, casual); - coeff += c; +// while i < self.0.len() { +// if let Entry::Data(_) = &self.0[i] { +// let (c, next_i) = self.ordering_coefficient_from(i, other, casual); +// coeff += c; - if next_i != i { - i = next_i; - } else { - i += 1; - } - } else { - i += 1; - } - } +// if next_i != i { +// i = next_i; +// } else { +// i += 1; +// } +// } else { +// i += 1; +// } +// } - coeff - } +// coeff +// } - fn ordering_coefficient_from( - &self, - start_idx: usize, - other: &Sequence, - casual: bool, - ) -> (u64, usize) { - let msg1 = match self.0[start_idx] { - Entry::Data(msg) => msg, - _ => panic!("Entry at {start_idx} must be Message"), - }; +// fn ordering_coefficient_from( +// &self, +// start_idx: usize, +// other: &Sequence, +// casual: bool, +// ) -> (u64, usize) { +// let msg1 = match self.0[start_idx] { +// Entry::Data(msg) => msg, +// _ => panic!("Entry at {start_idx} must be Message"), +// }; - for (j, entry) in other.iter().enumerate() { - if let Entry::Data(msg2) = entry { - if msg1 == *msg2 { - // Found the 1st matching msg. Start finding the next adjacent matching msg. - if casual { - return self.casual_ordering_coefficient_from(start_idx, other, j); - } else { - return self.weak_ordering_coefficient_from(start_idx, other, j); - } - } - } - } - (0, start_idx) - } +// for (j, entry) in other.iter().enumerate() { +// if let Entry::Data(msg2) = entry { +// if msg1 == *msg2 { +// // Found the 1st matching msg. Start finding the next adjacent matching msg. +// if casual { +// return self.casual_ordering_coefficient_from(start_idx, other, j); +// } else { +// return self.weak_ordering_coefficient_from(start_idx, other, j); +// } +// } +// } +// } +// (0, start_idx) +// } - fn casual_ordering_coefficient_from( - &self, - start_idx: usize, - other: &Sequence, - other_start_idx: usize, - ) -> (u64, usize) { - let mut coeff = 0; - let mut i = start_idx + 1; - let mut j = other_start_idx + 1; - while i < self.0.len() && j < other.0.len() { - match (&self.0[i], &other.0[j]) { - (Entry::Noise(cnt1), Entry::Noise(cnt2)) => { - if cnt1 == cnt2 { - i += 1; - j += 1; - } else { - break; - } - } - (Entry::Data(msg1), Entry::Data(msg2)) => { - if msg1 == msg2 { - coeff += 1; - i += 1; - j += 1; - } else { - break; - } - } - _ => break, - } - } - (coeff, i) - } +// fn casual_ordering_coefficient_from( +// &self, +// start_idx: usize, +// other: &Sequence, +// other_start_idx: usize, +// ) -> (u64, usize) { +// let mut coeff = 0; +// let mut i = start_idx + 1; +// let mut j = other_start_idx + 1; +// while i < self.0.len() && j < other.0.len() { +// match (&self.0[i], &other.0[j]) { +// (Entry::Noise(cnt1), Entry::Noise(cnt2)) => { +// if cnt1 == cnt2 { +// i += 1; +// j += 1; +// } else { +// break; +// } +// } +// (Entry::Data(msg1), Entry::Data(msg2)) => { +// if msg1 == msg2 { +// coeff += 1; +// i += 1; +// j += 1; +// } else { +// break; +// } +// } +// _ => break, +// } +// } +// (coeff, i) +// } - fn weak_ordering_coefficient_from( - &self, - start_idx: usize, - other: &Sequence, - other_start_idx: usize, - ) -> (u64, usize) { - let mut coeff = 0; - let mut i = start_idx + 1; - let mut j = other_start_idx + 1; - while i < self.0.len() && j < other.0.len() { - i = self.skip_noise(i); - j = other.skip_noise(j); - if i < self.0.len() && j < other.0.len() && self.0[i] == other.0[j] { - coeff += 1; - i += 1; - j += 1; - } else { - break; - } - } - (coeff, i) - } +// fn weak_ordering_coefficient_from( +// &self, +// start_idx: usize, +// other: &Sequence, +// other_start_idx: usize, +// ) -> (u64, usize) { +// let mut coeff = 0; +// let mut i = start_idx + 1; +// let mut j = other_start_idx + 1; +// while i < self.0.len() && j < other.0.len() { +// i = self.skip_noise(i); +// j = other.skip_noise(j); +// if i < self.0.len() && j < other.0.len() && self.0[i] == other.0[j] { +// coeff += 1; +// i += 1; +// j += 1; +// } else { +// break; +// } +// } +// (coeff, i) +// } - fn skip_noise(&self, mut index: usize) -> usize { - while index < self.0.len() { - if let Entry::Data(_) = self.0[index] { - break; - } - index += 1; - } - index - } -} +// fn skip_noise(&self, mut index: usize) -> usize { +// while index < self.0.len() { +// if let Entry::Data(_) = self.0[index] { +// break; +// } +// index += 1; +// } +// index +// } +// } -#[cfg(test)] -mod tests { - use super::*; +// #[cfg(test)] +// mod tests { +// use super::*; - fn test_ordering_coefficient_common(casual: bool) { - // Case 0: Empty sequences - let seq = Sequence(vec![]); - assert_eq!(seq.ordering_coefficient(&seq, casual), 0); +// fn test_ordering_coefficient_common(casual: bool) { +// // Case 0: Empty sequences +// let seq = Sequence(vec![]); +// assert_eq!(seq.ordering_coefficient(&seq, casual), 0); - // Case 1: Exact one matched pair with no noise - let seq = Sequence(vec![Entry::Data(1), Entry::Data(2)]); - assert_eq!(seq.ordering_coefficient(&seq, casual), 1); +// // Case 1: Exact one matched pair with no noise +// let seq = Sequence(vec![Entry::Data(1), Entry::Data(2)]); +// assert_eq!(seq.ordering_coefficient(&seq, casual), 1); - // Case 2: Exact one matched pair with noise - let seq = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); - assert_eq!(seq.ordering_coefficient(&seq, casual), 1); +// // Case 2: Exact one matched pair with noise +// let seq = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); +// assert_eq!(seq.ordering_coefficient(&seq, casual), 1); - // Case 3: One matched pair with no noise - let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(3)]); - let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(4)]); - assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1); - assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1); +// // Case 3: One matched pair with no noise +// let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(3)]); +// let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(4)]); +// assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1); +// assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1); - // Case 4: One matched pair with noise - let seq1 = Sequence(vec![ - Entry::Data(1), - Entry::Noise(10), - Entry::Data(2), - Entry::Data(3), - ]); - let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); - assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1); - assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1); +// // Case 4: One matched pair with noise +// let seq1 = Sequence(vec![ +// Entry::Data(1), +// Entry::Noise(10), +// Entry::Data(2), +// Entry::Data(3), +// ]); +// let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); +// assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1); +// assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1); - // Case 5: Two matched pairs with noise - let seq1 = Sequence(vec![ - Entry::Data(1), - Entry::Noise(10), - Entry::Data(2), - Entry::Data(3), - ]); - let seq2 = Sequence(vec![ - Entry::Data(1), - Entry::Noise(10), - Entry::Data(2), - Entry::Data(3), - Entry::Data(4), - ]); - assert_eq!(seq1.ordering_coefficient(&seq2, casual), 2); - assert_eq!(seq2.ordering_coefficient(&seq1, casual), 2); +// // Case 5: Two matched pairs with noise +// let seq1 = Sequence(vec![ +// Entry::Data(1), +// Entry::Noise(10), +// Entry::Data(2), +// Entry::Data(3), +// ]); +// let seq2 = Sequence(vec![ +// Entry::Data(1), +// Entry::Noise(10), +// Entry::Data(2), +// Entry::Data(3), +// Entry::Data(4), +// ]); +// assert_eq!(seq1.ordering_coefficient(&seq2, casual), 2); +// assert_eq!(seq2.ordering_coefficient(&seq1, casual), 2); - // Case 6: Only partial match with no noise - let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); - let seq2 = Sequence(vec![Entry::Data(2), Entry::Data(3)]); - assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0); - assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0); +// // Case 6: Only partial match with no noise +// let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); +// let seq2 = Sequence(vec![Entry::Data(2), Entry::Data(3)]); +// assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0); +// assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0); - // Case 7: Only partial match with noise - let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]); - let seq2 = Sequence(vec![Entry::Data(2), Entry::Noise(10), Entry::Data(3)]); - assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0); - assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0); +// // Case 7: Only partial match with noise +// let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]); +// let seq2 = Sequence(vec![Entry::Data(2), Entry::Noise(10), Entry::Data(3)]); +// assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0); +// assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0); - // Case 8: No match at all - let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]); - let seq2 = Sequence(vec![Entry::Data(3), Entry::Noise(10), Entry::Data(4)]); - assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0); - assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0); +// // Case 8: No match at all +// let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]); +// let seq2 = Sequence(vec![Entry::Data(3), Entry::Noise(10), Entry::Data(4)]); +// assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0); +// assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0); - // Case 9: Matches with noise but mixed orders - let seq1 = Sequence(vec![ - Entry::Data(1), - Entry::Data(2), - Entry::Noise(10), - Entry::Data(3), - Entry::Data(4), - Entry::Data(5), - Entry::Data(6), - ]); - let seq2 = Sequence(vec![ - Entry::Data(4), - Entry::Data(5), - Entry::Data(1), - Entry::Data(2), - Entry::Noise(10), - Entry::Data(3), - Entry::Data(6), - ]); - assert_eq!(seq1.ordering_coefficient(&seq2, casual), 3); - assert_eq!(seq2.ordering_coefficient(&seq1, casual), 3); - } +// // Case 9: Matches with noise but mixed orders +// let seq1 = Sequence(vec![ +// Entry::Data(1), +// Entry::Data(2), +// Entry::Noise(10), +// Entry::Data(3), +// Entry::Data(4), +// Entry::Data(5), +// Entry::Data(6), +// ]); +// let seq2 = Sequence(vec![ +// Entry::Data(4), +// Entry::Data(5), +// Entry::Data(1), +// Entry::Data(2), +// Entry::Noise(10), +// Entry::Data(3), +// Entry::Data(6), +// ]); +// assert_eq!(seq1.ordering_coefficient(&seq2, casual), 3); +// assert_eq!(seq2.ordering_coefficient(&seq1, casual), 3); +// } - #[test] - fn test_casual_ordering_coefficient() { - test_ordering_coefficient_common(true); +// #[test] +// fn test_casual_ordering_coefficient() { +// test_ordering_coefficient_common(true); - // Case 0: No match because of noise - let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); - let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); - assert_eq!(seq1.ordering_coefficient(&seq2, true), 0); - assert_eq!(seq2.ordering_coefficient(&seq1, true), 0); +// // Case 0: No match because of noise +// let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); +// let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); +// assert_eq!(seq1.ordering_coefficient(&seq2, true), 0); +// assert_eq!(seq2.ordering_coefficient(&seq1, true), 0); - // Case 1: No match because of different count of noises - let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); - let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]); - assert_eq!(seq1.ordering_coefficient(&seq2, true), 0); - assert_eq!(seq2.ordering_coefficient(&seq1, true), 0); - } +// // Case 1: No match because of different count of noises +// let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); +// let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]); +// assert_eq!(seq1.ordering_coefficient(&seq2, true), 0); +// assert_eq!(seq2.ordering_coefficient(&seq1, true), 0); +// } - #[test] - fn test_weak_ordering_coefficient() { - test_ordering_coefficient_common(false); +// #[test] +// fn test_weak_ordering_coefficient() { +// test_ordering_coefficient_common(false); - // Case 0: Match ignoring noises - let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); - let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); - assert_eq!(seq1.ordering_coefficient(&seq2, false), 1); - assert_eq!(seq2.ordering_coefficient(&seq1, false), 1); +// // Case 0: Match ignoring noises +// let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); +// let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]); +// assert_eq!(seq1.ordering_coefficient(&seq2, false), 1); +// assert_eq!(seq2.ordering_coefficient(&seq1, false), 1); - // Case 1: Match ignoring noise count - let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); - let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]); - assert_eq!(seq1.ordering_coefficient(&seq2, false), 1); - assert_eq!(seq2.ordering_coefficient(&seq1, false), 1); - } -} +// // Case 1: Match ignoring noise count +// let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]); +// let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]); +// assert_eq!(seq1.ordering_coefficient(&seq2, false), 1); +// assert_eq!(seq2.ordering_coefficient(&seq1, false), 1); +// } +// } diff --git a/mixnet-rs/ordering/src/outputs.rs b/mixnet-rs/ordering/src/outputs.rs new file mode 100644 index 0000000..e43b895 --- /dev/null +++ b/mixnet-rs/ordering/src/outputs.rs @@ -0,0 +1,230 @@ +use std::{fs::File, path::Path}; + +use protocol::{ + node::{Node, NodeId}, + topology::Topology, +}; + +use crate::{message::DataMessage, ordercoeff::SequenceWriter}; + +pub struct Outputs { + closed: bool, + // gradual writing + latency_path: String, + latency_writer: csv::Writer, + sent_sequence_paths: Vec, + sent_sequence_writers: Vec, + recv_sequence_paths: Vec, + recv_sequence_writers: Vec, + queue_data_msg_counts_path: String, + queue_data_msg_counts_writer: csv::Writer, + // bulk writing + pub topology_path: String, +} + +impl Outputs { + pub fn new( + latency_path: String, + sent_sequence_paths: Vec, + recv_sequence_paths: Vec, + queue_data_msg_counts_path: String, + topology_path: String, + ) -> Self { + // Ensure that all output files do not exist + for path in [ + latency_path.clone(), + queue_data_msg_counts_path.clone(), + topology_path.clone(), + ] + .iter() + .chain(sent_sequence_paths.iter()) + .chain(recv_sequence_paths.iter()) + { + assert!(!Path::new(path).exists(), "File already exists: {path}"); + } + + // Prepare writers and headers + let mut latency_writer = csv::Writer::from_path(&latency_path).unwrap(); + latency_writer + .write_record(["msg", "latency", "sent_time", "recv_time"]) + .unwrap(); + latency_writer.flush().unwrap(); + let sent_sequence_writers = sent_sequence_paths + .iter() + .map(|path| SequenceWriter::new(path)) + .collect::>(); + let recv_sequence_writers = recv_sequence_paths + .iter() + .map(|path| SequenceWriter::new(path)) + .collect::>(); + let queue_data_msg_counts_writer = + csv::Writer::from_path(&queue_data_msg_counts_path).unwrap(); + + Self { + closed: false, + latency_path, + latency_writer, + sent_sequence_paths, + sent_sequence_writers, + recv_sequence_paths, + recv_sequence_writers, + queue_data_msg_counts_path, + queue_data_msg_counts_writer, + topology_path, + } + } + + pub fn close(&mut self) { + self.latency_writer.flush().unwrap(); + for seq in &mut self.sent_sequence_writers { + seq.flush(); + } + for seq in &mut self.recv_sequence_writers { + seq.flush(); + } + self.queue_data_msg_counts_writer.flush().unwrap(); + + self.closed = true; + } + + pub fn add_latency(&mut self, msg: &DataMessage, sent_time: f32, recv_time: f32) { + self.latency_writer + .write_record(&[ + msg.to_string(), + (recv_time - sent_time).to_string(), + sent_time.to_string(), + recv_time.to_string(), + ]) + .unwrap(); + } + + pub fn add_sent_msg(&mut self, msg: &DataMessage) { + let writer = &mut self.sent_sequence_writers[msg.sender as usize]; + writer.add_message(msg); + } + + pub fn add_sent_noise(&mut self, sender_idx: usize) { + let writer = &mut self.sent_sequence_writers[sender_idx]; + writer.add_noise(); + } + + pub fn add_recv_msg(&mut self, msg: &DataMessage, conn_idx: usize) { + let writer = &mut self.recv_sequence_writers[conn_idx]; + writer.add_message(msg); + } + + pub fn add_recv_noise(&mut self, conn_idx: usize) { + let writer = &mut self.recv_sequence_writers[conn_idx]; + writer.add_noise(); + } + + pub fn write_header_queue_data_msg_counts(&mut self, mixnodes: &[Node]) { + let writer = &mut self.queue_data_msg_counts_writer; + let mut header = vec!["vtime".to_string()]; + mixnodes + .iter() + .map(|node| (node.id, node.queue_data_msg_counts())) + .for_each(|(node_id, counts)| { + let num_queues = counts.len(); + (0..num_queues).for_each(|q_idx| { + header.push(format!("node{}_q{}", node_id, q_idx)); + }); + }); + writer.write_record(header).unwrap(); + writer.flush().unwrap(); + } + + pub fn add_queue_data_msg_counts(&mut self, vtime: f32, mixnodes: &[Node]) { + let writer = &mut self.queue_data_msg_counts_writer; + let mut record = vec![vtime.to_string()]; + mixnodes + .iter() + .map(|node| node.queue_data_msg_counts()) + .for_each(|counts| { + counts.iter().for_each(|count| { + record.push(count.to_string()); + }); + }); + writer.write_record(record).unwrap(); + } + + pub fn write_topology( + &self, + topology: &Topology, + sender_peers_list: &[Vec], + receiver_peers: &[NodeId], + ) { + let mut writer = csv::Writer::from_path(&self.topology_path).unwrap(); + writer.write_record(["node", "num_peers", "peers"]).unwrap(); + + // Write peers of mix nodes + for (node_id, peers) in topology.iter().enumerate() { + writer + .write_record(&[ + node_id.to_string(), + peers.len().to_string(), + format!( + "[{}]", + peers + .iter() + .map(|peer_id| peer_id.to_string()) + .collect::>() + .join(",") + ), + ]) + .unwrap(); + } + + // Write peers of senders + for (sender_idx, peers) in sender_peers_list.iter().enumerate() { + writer + .write_record(&[ + format!("sender-{}", sender_idx), + peers.len().to_string(), + format!( + "[{}]", + peers + .iter() + .map(|peer_id| peer_id.to_string()) + .collect::>() + .join(",") + ), + ]) + .unwrap(); + } + + // Write peers of the receiver + writer + .write_record(&[ + "receiver".to_string(), + receiver_peers.len().to_string(), + format!( + "[{}]", + receiver_peers + .iter() + .map(|peer_id| peer_id.to_string()) + .collect::>() + .join(",") + ), + ]) + .unwrap(); + + writer.flush().unwrap(); + } + + pub fn rename_paths(&self, from: &str, to: &str) { + assert!(self.closed); + + for path in [ + &self.latency_path.clone(), + &self.queue_data_msg_counts_path.clone(), + ] + .into_iter() + .chain(self.sent_sequence_paths.iter()) + .chain(self.recv_sequence_paths.iter()) + { + let new_path = path.replace(from, to); + std::fs::rename(path, new_path).unwrap(); + } + } +} diff --git a/mixnet-rs/ordering/src/paramset.rs b/mixnet-rs/ordering/src/paramset.rs index 36d3491..9626ac2 100644 --- a/mixnet-rs/ordering/src/paramset.rs +++ b/mixnet-rs/ordering/src/paramset.rs @@ -69,7 +69,7 @@ pub struct ParamSet { pub peering_degree: u32, pub min_queue_size: u16, pub transmission_rate: u16, - pub num_senders: u32, + pub num_senders: u8, pub num_sender_msgs: u32, pub sender_data_msg_prob: f32, pub mix_data_msg_prob: f32, @@ -91,7 +91,7 @@ impl ParamSet { fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { let transmission_rate: u16 = 1; let min_queue_size: u16 = 10; - let num_senders: u32 = match exp_id { + let num_senders: u8 = match exp_id { ExperimentId::Experiment3 | ExperimentId::Experiment4 => 2, _ => 1, }; diff --git a/mixnet-rs/ordering/src/topology.rs b/mixnet-rs/ordering/src/topology.rs new file mode 100644 index 0000000..bde0455 --- /dev/null +++ b/mixnet-rs/ordering/src/topology.rs @@ -0,0 +1,146 @@ +use std::{fmt::Debug, hash::Hash}; + +use protocol::{ + node::{Node, NodeId}, + queue::QueueConfig, + topology::build_topology, +}; +use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng}; +use rustc_hash::FxHashMap; + +use crate::{outputs::Outputs, paramset::ParamSet}; + +pub const RECEIVER_NODE_ID: NodeId = NodeId::MAX; + +pub fn build_striped_network( + paramset: &ParamSet, + seed: u64, +) -> (Vec>, Vec>, FxHashMap) { + assert!(!paramset.random_topology); + let mut next_node_id: NodeId = 0; + let mut queue_seed_rng = StdRng::seed_from_u64(seed); + let mut mixnodes: Vec> = + Vec::with_capacity(paramset.num_paths as usize * paramset.num_mixes as usize); + let mut paths: Vec> = Vec::with_capacity(paramset.num_paths as usize); + for _ in 0..paramset.num_paths { + let mut ids = Vec::with_capacity(paramset.num_mixes as usize); + for _ in 0..paramset.num_mixes { + let id = next_node_id; + next_node_id += 1; + mixnodes.push(Node::new( + id, + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + paramset.peering_degree, + false, // disable cache + )); + ids.push(id); + } + paths.push(ids); + } + + // Connect mix nodes + let mut receiver_peer_conn_idx: FxHashMap = FxHashMap::default(); + for path in paths.iter() { + for (i, id) in path.iter().enumerate() { + if i != path.len() - 1 { + let peer_id = path[i + 1]; + let mixnode = mixnodes.get_mut(*id as usize).unwrap(); + assert_eq!(mixnode.id, *id); + mixnode.connect(peer_id); + } else { + let mixnode = mixnodes.get_mut(*id as usize).unwrap(); + assert_eq!(mixnode.id, *id); + mixnode.connect(RECEIVER_NODE_ID); + + receiver_peer_conn_idx + .insert(*id, receiver_peer_conn_idx.len().try_into().unwrap()); + } + } + } + let sender_peers_list: Vec> = + vec![ + paths.iter().map(|path| *path.first().unwrap()).collect(); + paramset.num_senders as usize + ]; + (mixnodes, sender_peers_list, receiver_peer_conn_idx) +} + +pub fn build_random_network( + paramset: &ParamSet, + seed: u64, + outputs: &mut Outputs, +) -> (Vec>, Vec>, FxHashMap) { + assert!(paramset.random_topology); + // Init mix nodes + let mut queue_seed_rng = StdRng::seed_from_u64(seed); + let mut mixnodes: Vec> = Vec::with_capacity(paramset.num_mixes as usize); + for id in 0..paramset.num_mixes { + mixnodes.push(Node::new( + id, + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + paramset.peering_degree, + true, // enable cache + )); + } + + // Choose sender's peers and receiver's peers randomly + let mut peers_rng = StdRng::seed_from_u64(seed); + let mut candidates: Vec = mixnodes.iter().map(|mixnode| mixnode.id).collect(); + assert!(candidates.len() >= paramset.peering_degree as usize); + let mut sender_peers_list: Vec> = Vec::with_capacity(paramset.num_senders as usize); + for _ in 0..paramset.num_senders { + candidates.as_mut_slice().shuffle(&mut peers_rng); + let mut peers: Vec = candidates + .iter() + .cloned() + .take(paramset.peering_degree as usize) + .collect(); + peers.sort(); + sender_peers_list.push(peers); + } + candidates.as_mut_slice().shuffle(&mut peers_rng); + let mut receiver_peers: Vec = candidates + .iter() + .cloned() + .take(paramset.peering_degree as usize) + .collect(); + receiver_peers.sort(); + + // Connect mix nodes + let topology = build_topology( + paramset.num_mixes, + &vec![paramset.peering_degree; paramset.num_mixes as usize], + seed, + ); + for (node_id, peers) in topology.iter().enumerate() { + peers.iter().for_each(|peer_id| { + let mixnode = mixnodes.get_mut(node_id).unwrap(); + assert_eq!(mixnode.id as usize, node_id); + mixnode.connect(*peer_id); + }); + } + + // Connect the selected mix nodes with the receiver + // + // peer_id -> conn_idx + let mut receiver_peer_conn_idx: FxHashMap = FxHashMap::default(); + for (conn_idx, mixnode_id) in receiver_peers.iter().enumerate() { + let mixnode = mixnodes.get_mut(*mixnode_id as usize).unwrap(); + assert_eq!(mixnode.id, *mixnode_id); + mixnode.connect(RECEIVER_NODE_ID); + + receiver_peer_conn_idx.insert(*mixnode_id, conn_idx.try_into().unwrap()); + } + + outputs.write_topology(&topology, &sender_peers_list, &receiver_peers); + + (mixnodes, sender_peers_list, receiver_peer_conn_idx) +} diff --git a/mixnet-rs/protocol/src/node.rs b/mixnet-rs/protocol/src/node.rs index 0495a0f..c5725af 100644 --- a/mixnet-rs/protocol/src/node.rs +++ b/mixnet-rs/protocol/src/node.rs @@ -1,25 +1,39 @@ +use std::{fmt::Debug, hash::Hash}; + use rustc_hash::{FxHashMap, FxHashSet}; use crate::queue::{new_queue, Message, Queue, QueueConfig}; pub type NodeId = u32; -pub type MessageId = u32; -pub struct Node { +pub struct Node +where + M: Debug + Copy + Clone + PartialEq + Eq + Hash, +{ + pub id: NodeId, queue_config: QueueConfig, // To have the deterministic result, we use Vec instead of FxHashMap. // Building `queues` is inefficient, but it's not a problem because it's done only once at the beginning. // Instead, use `connected_peers` to build `queues` efficiently. - queues: Vec<(NodeId, Box>)>, + queues: Vec<(NodeId, Box>)>, connected_peers: FxHashSet, // A cache to avoid relaying the same message multiple times. - received_msgs: Option>, + received_msgs: Option>, peering_degree: u32, } -impl Node { - pub fn new(queue_config: QueueConfig, peering_degree: u32, enable_cache: bool) -> Self { - Node { +impl Node +where + M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash, +{ + pub fn new( + id: NodeId, + queue_config: QueueConfig, + peering_degree: u32, + enable_cache: bool, + ) -> Self { + Node:: { + id, queue_config, queues: Vec::new(), connected_peers: FxHashSet::default(), @@ -39,18 +53,18 @@ impl Node { .binary_search_by(|probe| probe.0.cmp(&peer_id)) .unwrap_or_else(|pos| pos); self.queues - .insert(pos, (peer_id, new_queue(&self.queue_config))); + .insert(pos, (peer_id, new_queue::(&self.queue_config))); } } - pub fn send(&mut self, msg: MessageId) { + pub fn send(&mut self, msg: M) { assert!(self.check_and_update_cache(msg, true)); for (_, queue) in self.queues.iter_mut() { queue.push(msg); } } - pub fn receive(&mut self, msg: MessageId, from: Option) -> bool { + pub fn receive(&mut self, msg: M, from: Option) -> bool { let first_received = self.check_and_update_cache(msg, false); if first_received { for (node_id, queue) in self.queues.iter_mut() { @@ -67,8 +81,8 @@ impl Node { first_received } - pub fn read_queues(&mut self) -> Vec<(NodeId, Message)> { - let mut msgs_to_relay: Vec<(NodeId, Message)> = Vec::new(); + pub fn read_queues(&mut self) -> Vec<(NodeId, Message)> { + let mut msgs_to_relay: Vec<(NodeId, Message)> = Vec::new(); self.queues.iter_mut().for_each(|(node_id, queue)| { msgs_to_relay.push((*node_id, queue.pop())); }); @@ -82,7 +96,7 @@ impl Node { .collect() } - fn check_and_update_cache(&mut self, msg: MessageId, sending: bool) -> bool { + fn check_and_update_cache(&mut self, msg: M, sending: bool) -> bool { if let Some(received_msgs) = &mut self.received_msgs { let first_received = if let Some(count) = received_msgs.get_mut(&msg) { *count += 1; @@ -95,7 +109,7 @@ impl Node { // If the message have been received from all connected peers, remove it from the cache // because there is no possibility that the message will be received again. if received_msgs.get(&msg).unwrap() == &self.peering_degree { - tracing::debug!("Remove message from cache: {}", msg); + tracing::debug!("Remove message from cache: {:?}", msg); received_msgs.remove(&msg); }