From 4266ff445e979f4c03efa6825935f5d935cd67e0 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 22 Aug 2024 23:42:49 +0200 Subject: [PATCH] gradual write for queue_data_msg_counts --- mixnet-rs/ordering/src/iteration.rs | 51 ++++++++++++++--------------- mixnet-rs/ordering/src/main.rs | 11 ++++++- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/mixnet-rs/ordering/src/iteration.rs b/mixnet-rs/ordering/src/iteration.rs index 5ed27d0..5f22f91 100644 --- a/mixnet-rs/ordering/src/iteration.rs +++ b/mixnet-rs/ordering/src/iteration.rs @@ -1,5 +1,6 @@ -use std::{collections::hash_map::Entry, path::Path}; +use std::{collections::hash_map::Entry, fs::File, path::Path}; +use csv::Writer; use protocol::{ node::{MessageId, Node, NodeId}, queue::{Message, QueueConfig, QueueType}, @@ -61,7 +62,8 @@ pub fn run_iteration( } else { None }; - let mut queue_data_msg_counts: Vec>> = Vec::new(); + let mut queue_data_msg_counts_writer = + new_queue_data_msg_counts_writer(out_queue_data_msg_counts_path, &mixnodes); let mut data_msg_rng = StdRng::seed_from_u64(seed); loop { @@ -158,11 +160,7 @@ pub fn run_iteration( }); // Record the number of data messages in each mix node's queues - let mut counts: Vec> = Vec::with_capacity(mixnodes.len()); - mixnodes.iter().for_each(|node| { - counts.push(node.queue_data_msg_counts()); - }); - queue_data_msg_counts.push(counts); + append_queue_data_msg_counts(&mixnodes, vtime, &mut queue_data_msg_counts_writer); // If all data messages (that have been sent by the senders) have been received by the receiver, // stop the iteration. @@ -190,11 +188,6 @@ pub fn run_iteration( format!("{out_received_sequence_path_prefix}_unified.csv").as_str(), ); } - save_queue_data_msg_counts( - &queue_data_msg_counts, - transmission_interval, - out_queue_data_msg_counts_path, - ); // Calculate ordering coefficients and save them to a CSV file. if paramset.queue_type != QueueType::NonMix { if let Some(unified_recv_seq) = &unified_received_sequence { @@ -369,31 +362,37 @@ fn save_sequences(sequences: &[Sequence], path_prefix: &str) { }); } -fn save_queue_data_msg_counts(data: &[Vec>], interval: f32, path: &str) { +fn new_queue_data_msg_counts_writer(path: &str, mixnodes: &[Node]) -> Writer { let mut writer = csv::Writer::from_path(path).unwrap(); - let mut header = vec!["vtime".to_string()]; - data[0].iter().enumerate().for_each(|(node_id, counts)| { - let num_queues = counts.len(); - (0..num_queues).for_each(|q_idx| { - header.push(format!("node{node_id}_q{q_idx}")); + mixnodes + .iter() + .map(|node| node.queue_data_msg_counts()) + .enumerate() + .for_each(|(node_id, counts)| { + let num_queues = counts.len(); + (0..num_queues).for_each(|q_idx| { + header.push(format!("node{node_id}_q{q_idx}")); + }); }); - }); writer.write_record(header).unwrap(); + writer +} - data.iter().enumerate().for_each(|(i, counts_per_node)| { - let mut row = vec![(i as f64 * interval as f64).to_string()]; - counts_per_node.iter().for_each(|counts| { +fn append_queue_data_msg_counts(mixnodes: &[Node], vtime: f32, writer: &mut Writer) { + let mut row = vec![vtime.to_string()]; + mixnodes + .iter() + .map(|node| node.queue_data_msg_counts()) + .for_each(|counts| { row.extend( counts .iter() .map(|count| count.to_string()) - .collect::>(), + .collect::>(), ); }); - writer.write_record(row).unwrap(); - }); - writer.flush().unwrap(); + writer.write_record(row).unwrap(); } fn save_ordering_coefficients(data: &[[u64; 2]], path: &str) { diff --git a/mixnet-rs/ordering/src/main.rs b/mixnet-rs/ordering/src/main.rs index 3265fcd..f411313 100644 --- a/mixnet-rs/ordering/src/main.rs +++ b/mixnet-rs/ordering/src/main.rs @@ -71,18 +71,27 @@ fn main() { save_paramset_info(¶mset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap(); for i in 0..paramset.num_iterations { + let wip_queue_data_msgs_counts_path = + format!("{paramset_dir}/__WIP__iteration_{i}_data_msg_counts.csv"); + run_iteration( paramset.clone(), i as u64, &format!("{paramset_dir}/iteration_{i}_latency.csv"), &format!("{paramset_dir}/iteration_{i}_sent_seq.csv"), &format!("{paramset_dir}/iteration_{i}_recv_seq"), - &format!("{paramset_dir}/iteration_{i}_data_msg_counts.csv"), + &wip_queue_data_msgs_counts_path, &format!("{paramset_dir}/iteration_{i}_ordering_coeff.csv"), &format!("{paramset_dir}/iteration_{i}_topology.csv"), ); + + let new_queue_data_msgs_counts_path = + wip_queue_data_msgs_counts_path.replace("__WIP__iteration_", "iteration_"); + std::fs::rename(&wip_queue_data_msgs_counts_path, &new_queue_data_msgs_counts_path).expect("Failed to rename {wip_queue_data_msgs_counts_path} -> {new_queue_data_msgs_counts_path}: {e}"); + tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i); } + let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_"); std::fs::rename(¶mset_dir, &new_paramset_dir) .expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}");