From 4d6e2a93ce4d01ef7f63832863fc8c8e40f0ee9b Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Fri, 27 Sep 2024 13:08:20 +0900 Subject: [PATCH] record hops --- mixnet/ordering/src/bin/coeff.rs | 7 ++++++- mixnet/ordering/src/iteration.rs | 8 ++++++-- mixnet/ordering/src/message.rs | 35 +++++++++++++++++++++++++++++--- mixnet/ordering/src/outputs.rs | 17 ++++++++++++++++ 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/mixnet/ordering/src/bin/coeff.rs b/mixnet/ordering/src/bin/coeff.rs index 2b8f0a9..fb5ef75 100644 --- a/mixnet/ordering/src/bin/coeff.rs +++ b/mixnet/ordering/src/bin/coeff.rs @@ -42,6 +42,7 @@ fn parse_data_msg(value: &str) -> DataMessage { DataMessage { sender: parts[0].parse::().unwrap(), msg_id: parts[1].parse::().unwrap(), + num_hops_passed: 0, } } @@ -526,7 +527,11 @@ mod tests { } fn data(msg_id: u32) -> Entry { - Entry::Data(DataMessage { sender: 0, msg_id }) + Entry::Data(DataMessage { + sender: 0, + msg_id, + num_hops_passed: 0, + }) } fn noise(count: u32) -> Entry { diff --git a/mixnet/ordering/src/iteration.rs b/mixnet/ordering/src/iteration.rs index 17eaf5f..d4e404e 100644 --- a/mixnet/ordering/src/iteration.rs +++ b/mixnet/ordering/src/iteration.rs @@ -34,6 +34,7 @@ impl Iteration { let mut outputs = Outputs::new( format!("{dir}/latency__WIP__.csv"), + format!("{dir}/hops__WIP__.csv"), (0..self.paramset.num_senders) .map(|sender_idx| format!("{dir}/sent_seq_{sender_idx}__WIP__.csv")) .collect(), @@ -187,7 +188,8 @@ impl Iteration { msgs_to_relay.into_iter().for_each(|(peer_id, msg)| { if peer_id == RECEIVER_NODE_ID { match msg { - Message::Data(msg) => { + Message::Data(mut msg) => { + msg.increment_hops(); // If msg was sent by the sender (not by any mix) if let Some(&sent_time) = sent_data_msgs.get(&msg) { // If this is the first time to see the msg, @@ -195,6 +197,7 @@ impl Iteration { if let Entry::Vacant(e) = recv_data_msgs.entry(msg) { e.insert(vtime); outputs.add_latency(&msg, sent_time, vtime); + outputs.add_hops(&msg); } } // Record msg to the sequence @@ -207,7 +210,8 @@ impl Iteration { outputs.add_recv_noise(conn_idx); } } - } else if let Message::Data(msg) = msg { + } else if let Message::Data(mut msg) = msg { + msg.increment_hops(); let peer = mixnodes.get_mut(peer_id as usize).unwrap(); assert_eq!(peer.id, peer_id); peer.receive(msg, Some(relayer_id)); diff --git a/mixnet/ordering/src/message.rs b/mixnet/ordering/src/message.rs index f62f8a4..4964667 100644 --- a/mixnet/ordering/src/message.rs +++ b/mixnet/ordering/src/message.rs @@ -1,11 +1,36 @@ -use std::fmt::Display; +use std::{ + fmt::Display, + hash::{Hash, Hasher}, +}; pub type SenderIdx = u8; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy)] pub struct DataMessage { pub sender: SenderIdx, pub msg_id: u32, + pub num_hops_passed: u32, +} + +impl DataMessage { + pub fn increment_hops(&mut self) { + self.num_hops_passed += 1; + } +} + +impl PartialEq for DataMessage { + fn eq(&self, other: &Self) -> bool { + self.sender == other.sender && self.msg_id == other.msg_id + } +} + +impl Eq for DataMessage {} + +impl Hash for DataMessage { + fn hash(&self, state: &mut H) { + self.sender.hash(state); + self.msg_id.hash(state); + } } impl Display for DataMessage { @@ -28,6 +53,10 @@ impl DataMessageGenerator { pub fn next(&mut self, sender: SenderIdx) -> DataMessage { let msg_id = self.next_msg_ids[sender as usize]; self.next_msg_ids[sender as usize] += 1; - DataMessage { sender, msg_id } + DataMessage { + sender, + msg_id, + num_hops_passed: 0, + } } } diff --git a/mixnet/ordering/src/outputs.rs b/mixnet/ordering/src/outputs.rs index 61922ed..6aca0de 100644 --- a/mixnet/ordering/src/outputs.rs +++ b/mixnet/ordering/src/outputs.rs @@ -13,6 +13,8 @@ pub struct Outputs { // gradual writing latency_path: String, latency_writer: csv::Writer, + hops_path: String, + hops_writer: csv::Writer, sent_sequence_paths: Vec, sent_sequence_writers: Vec, recv_sequence_paths: Vec, @@ -26,6 +28,7 @@ pub struct Outputs { impl Outputs { pub fn new( latency_path: String, + hops_path: String, sent_sequence_paths: Vec, recv_sequence_paths: Vec, queue_data_msg_counts_path: String, @@ -34,6 +37,7 @@ impl Outputs { // Ensure that all output files do not exist for path in [ latency_path.clone(), + hops_path.clone(), queue_data_msg_counts_path.clone(), topology_path.clone(), ] @@ -50,6 +54,9 @@ impl Outputs { .write_record(["msg", "latency", "sent_time", "recv_time"]) .unwrap(); latency_writer.flush().unwrap(); + let mut hops_writer = csv::Writer::from_path(&hops_path).unwrap(); + hops_writer.write_record(["msg", "hops"]).unwrap(); + hops_writer.flush().unwrap(); let sent_sequence_writers = sent_sequence_paths .iter() .map(|path| SequenceWriter::new(path)) @@ -65,6 +72,8 @@ impl Outputs { closed: false, latency_path, latency_writer, + hops_path, + hops_writer, sent_sequence_paths, sent_sequence_writers, recv_sequence_paths, @@ -77,6 +86,7 @@ impl Outputs { pub fn close(&mut self) { self.latency_writer.flush().unwrap(); + self.hops_writer.flush().unwrap(); for seq in &mut self.sent_sequence_writers { seq.flush(); } @@ -99,6 +109,12 @@ impl Outputs { .unwrap(); } + pub fn add_hops(&mut self, msg: &DataMessage) { + self.hops_writer + .write_record(&[msg.to_string(), msg.num_hops_passed.to_string()]) + .unwrap(); + } + pub fn add_sent_msg(&mut self, msg: &DataMessage) { let writer = &mut self.sent_sequence_writers[msg.sender as usize]; writer.add_message(msg); @@ -218,6 +234,7 @@ impl Outputs { for path in [ &self.latency_path.clone(), + &self.hops_path.clone(), &self.queue_data_msg_counts_path.clone(), ] .into_iter()