record hops

This commit is contained in:
Youngjoon Lee 2024-09-27 13:08:20 +09:00
parent 7d29227a65
commit 4d6e2a93ce
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
4 changed files with 61 additions and 6 deletions

View File

@ -42,6 +42,7 @@ fn parse_data_msg(value: &str) -> DataMessage {
DataMessage {
sender: parts[0].parse::<SenderIdx>().unwrap(),
msg_id: parts[1].parse::<u32>().unwrap(),
num_hops_passed: 0,
}
}
@ -526,7 +527,11 @@ mod tests {
}
fn data(msg_id: u32) -> Entry {
Entry::Data(DataMessage { sender: 0, msg_id })
Entry::Data(DataMessage {
sender: 0,
msg_id,
num_hops_passed: 0,
})
}
fn noise(count: u32) -> Entry {

View File

@ -34,6 +34,7 @@ impl Iteration {
let mut outputs = Outputs::new(
format!("{dir}/latency__WIP__.csv"),
format!("{dir}/hops__WIP__.csv"),
(0..self.paramset.num_senders)
.map(|sender_idx| format!("{dir}/sent_seq_{sender_idx}__WIP__.csv"))
.collect(),
@ -187,7 +188,8 @@ impl Iteration {
msgs_to_relay.into_iter().for_each(|(peer_id, msg)| {
if peer_id == RECEIVER_NODE_ID {
match msg {
Message::Data(msg) => {
Message::Data(mut msg) => {
msg.increment_hops();
// If msg was sent by the sender (not by any mix)
if let Some(&sent_time) = sent_data_msgs.get(&msg) {
// If this is the first time to see the msg,
@ -195,6 +197,7 @@ impl Iteration {
if let Entry::Vacant(e) = recv_data_msgs.entry(msg) {
e.insert(vtime);
outputs.add_latency(&msg, sent_time, vtime);
outputs.add_hops(&msg);
}
}
// Record msg to the sequence
@ -207,7 +210,8 @@ impl Iteration {
outputs.add_recv_noise(conn_idx);
}
}
} else if let Message::Data(msg) = msg {
} else if let Message::Data(mut msg) = msg {
msg.increment_hops();
let peer = mixnodes.get_mut(peer_id as usize).unwrap();
assert_eq!(peer.id, peer_id);
peer.receive(msg, Some(relayer_id));

View File

@ -1,11 +1,36 @@
use std::fmt::Display;
use std::{
fmt::Display,
hash::{Hash, Hasher},
};
pub type SenderIdx = u8;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy)]
pub struct DataMessage {
pub sender: SenderIdx,
pub msg_id: u32,
pub num_hops_passed: u32,
}
impl DataMessage {
pub fn increment_hops(&mut self) {
self.num_hops_passed += 1;
}
}
impl PartialEq for DataMessage {
fn eq(&self, other: &Self) -> bool {
self.sender == other.sender && self.msg_id == other.msg_id
}
}
impl Eq for DataMessage {}
impl Hash for DataMessage {
fn hash<H: Hasher>(&self, state: &mut H) {
self.sender.hash(state);
self.msg_id.hash(state);
}
}
impl Display for DataMessage {
@ -28,6 +53,10 @@ impl DataMessageGenerator {
pub fn next(&mut self, sender: SenderIdx) -> DataMessage {
let msg_id = self.next_msg_ids[sender as usize];
self.next_msg_ids[sender as usize] += 1;
DataMessage { sender, msg_id }
DataMessage {
sender,
msg_id,
num_hops_passed: 0,
}
}
}

View File

@ -13,6 +13,8 @@ pub struct Outputs {
// gradual writing
latency_path: String,
latency_writer: csv::Writer<File>,
hops_path: String,
hops_writer: csv::Writer<File>,
sent_sequence_paths: Vec<String>,
sent_sequence_writers: Vec<SequenceWriter>,
recv_sequence_paths: Vec<String>,
@ -26,6 +28,7 @@ pub struct Outputs {
impl Outputs {
pub fn new(
latency_path: String,
hops_path: String,
sent_sequence_paths: Vec<String>,
recv_sequence_paths: Vec<String>,
queue_data_msg_counts_path: String,
@ -34,6 +37,7 @@ impl Outputs {
// Ensure that all output files do not exist
for path in [
latency_path.clone(),
hops_path.clone(),
queue_data_msg_counts_path.clone(),
topology_path.clone(),
]
@ -50,6 +54,9 @@ impl Outputs {
.write_record(["msg", "latency", "sent_time", "recv_time"])
.unwrap();
latency_writer.flush().unwrap();
let mut hops_writer = csv::Writer::from_path(&hops_path).unwrap();
hops_writer.write_record(["msg", "hops"]).unwrap();
hops_writer.flush().unwrap();
let sent_sequence_writers = sent_sequence_paths
.iter()
.map(|path| SequenceWriter::new(path))
@ -65,6 +72,8 @@ impl Outputs {
closed: false,
latency_path,
latency_writer,
hops_path,
hops_writer,
sent_sequence_paths,
sent_sequence_writers,
recv_sequence_paths,
@ -77,6 +86,7 @@ impl Outputs {
pub fn close(&mut self) {
self.latency_writer.flush().unwrap();
self.hops_writer.flush().unwrap();
for seq in &mut self.sent_sequence_writers {
seq.flush();
}
@ -99,6 +109,12 @@ impl Outputs {
.unwrap();
}
pub fn add_hops(&mut self, msg: &DataMessage) {
self.hops_writer
.write_record(&[msg.to_string(), msg.num_hops_passed.to_string()])
.unwrap();
}
pub fn add_sent_msg(&mut self, msg: &DataMessage) {
let writer = &mut self.sent_sequence_writers[msg.sender as usize];
writer.add_message(msg);
@ -218,6 +234,7 @@ impl Outputs {
for path in [
&self.latency_path.clone(),
&self.hops_path.clone(),
&self.queue_data_msg_counts_path.clone(),
]
.into_iter()