gradual write for queue_data_msg_counts

This commit is contained in:
Youngjoon Lee 2024-08-22 23:42:49 +02:00
parent b36b8ac316
commit 4266ff445e
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
2 changed files with 35 additions and 27 deletions

View File

@ -1,5 +1,6 @@
use std::{collections::hash_map::Entry, path::Path};
use std::{collections::hash_map::Entry, fs::File, path::Path};
use csv::Writer;
use protocol::{
node::{MessageId, Node, NodeId},
queue::{Message, QueueConfig, QueueType},
@ -61,7 +62,8 @@ pub fn run_iteration(
} else {
None
};
let mut queue_data_msg_counts: Vec<Vec<Vec<usize>>> = Vec::new();
let mut queue_data_msg_counts_writer =
new_queue_data_msg_counts_writer(out_queue_data_msg_counts_path, &mixnodes);
let mut data_msg_rng = StdRng::seed_from_u64(seed);
loop {
@ -158,11 +160,7 @@ pub fn run_iteration(
});
// Record the number of data messages in each mix node's queues
let mut counts: Vec<Vec<usize>> = Vec::with_capacity(mixnodes.len());
mixnodes.iter().for_each(|node| {
counts.push(node.queue_data_msg_counts());
});
queue_data_msg_counts.push(counts);
append_queue_data_msg_counts(&mixnodes, vtime, &mut queue_data_msg_counts_writer);
// If all data messages (that have been sent by the senders) have been received by the receiver,
// stop the iteration.
@ -190,11 +188,6 @@ pub fn run_iteration(
format!("{out_received_sequence_path_prefix}_unified.csv").as_str(),
);
}
save_queue_data_msg_counts(
&queue_data_msg_counts,
transmission_interval,
out_queue_data_msg_counts_path,
);
// Calculate ordering coefficients and save them to a CSV file.
if paramset.queue_type != QueueType::NonMix {
if let Some(unified_recv_seq) = &unified_received_sequence {
@ -369,31 +362,37 @@ fn save_sequences(sequences: &[Sequence], path_prefix: &str) {
});
}
fn save_queue_data_msg_counts(data: &[Vec<Vec<usize>>], interval: f32, path: &str) {
fn new_queue_data_msg_counts_writer(path: &str, mixnodes: &[Node]) -> Writer<File> {
let mut writer = csv::Writer::from_path(path).unwrap();
let mut header = vec!["vtime".to_string()];
data[0].iter().enumerate().for_each(|(node_id, counts)| {
let num_queues = counts.len();
(0..num_queues).for_each(|q_idx| {
header.push(format!("node{node_id}_q{q_idx}"));
mixnodes
.iter()
.map(|node| node.queue_data_msg_counts())
.enumerate()
.for_each(|(node_id, counts)| {
let num_queues = counts.len();
(0..num_queues).for_each(|q_idx| {
header.push(format!("node{node_id}_q{q_idx}"));
});
});
});
writer.write_record(header).unwrap();
writer
}
data.iter().enumerate().for_each(|(i, counts_per_node)| {
let mut row = vec![(i as f64 * interval as f64).to_string()];
counts_per_node.iter().for_each(|counts| {
fn append_queue_data_msg_counts(mixnodes: &[Node], vtime: f32, writer: &mut Writer<File>) {
let mut row = vec![vtime.to_string()];
mixnodes
.iter()
.map(|node| node.queue_data_msg_counts())
.for_each(|counts| {
row.extend(
counts
.iter()
.map(|count| count.to_string())
.collect::<Vec<String>>(),
.collect::<Vec<_>>(),
);
});
writer.write_record(row).unwrap();
});
writer.flush().unwrap();
writer.write_record(row).unwrap();
}
fn save_ordering_coefficients(data: &[[u64; 2]], path: &str) {

View File

@ -71,18 +71,27 @@ fn main() {
save_paramset_info(&paramset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap();
for i in 0..paramset.num_iterations {
let wip_queue_data_msgs_counts_path =
format!("{paramset_dir}/__WIP__iteration_{i}_data_msg_counts.csv");
run_iteration(
paramset.clone(),
i as u64,
&format!("{paramset_dir}/iteration_{i}_latency.csv"),
&format!("{paramset_dir}/iteration_{i}_sent_seq.csv"),
&format!("{paramset_dir}/iteration_{i}_recv_seq"),
&format!("{paramset_dir}/iteration_{i}_data_msg_counts.csv"),
&wip_queue_data_msgs_counts_path,
&format!("{paramset_dir}/iteration_{i}_ordering_coeff.csv"),
&format!("{paramset_dir}/iteration_{i}_topology.csv"),
);
let new_queue_data_msgs_counts_path =
wip_queue_data_msgs_counts_path.replace("__WIP__iteration_", "iteration_");
std::fs::rename(&wip_queue_data_msgs_counts_path, &new_queue_data_msgs_counts_path).expect("Failed to rename {wip_queue_data_msgs_counts_path} -> {new_queue_data_msgs_counts_path}: {e}");
tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i);
}
let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_");
std::fs::rename(&paramset_dir, &new_paramset_dir)
.expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}");