From c7b83c813da82989a065516ac828756f6577706d Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Sat, 17 Aug 2024 00:24:26 +0900 Subject: [PATCH] paramset, args, csv --- mixnet-rs/dissemination/Cargo.toml | 5 + mixnet-rs/dissemination/src/main.rs | 238 +++++++++++----------------- mixnet-rs/dissemination/src/node.rs | 11 +- 3 files changed, 100 insertions(+), 154 deletions(-) diff --git a/mixnet-rs/dissemination/Cargo.toml b/mixnet-rs/dissemination/Cargo.toml index 44657c0..61c8c78 100644 --- a/mixnet-rs/dissemination/Cargo.toml +++ b/mixnet-rs/dissemination/Cargo.toml @@ -4,6 +4,11 @@ version = "0.1.0" edition = "2021" [dependencies] +chrono = "0.4.38" +clap = { version = "4.5.16", features = ["derive"] } +csv = "1.3.0" rand = "0.8.5" +strum = "0.26.3" +strum_macros = "0.26.4" tracing = "0.1.40" tracing-subscriber = "0.3.18" diff --git a/mixnet-rs/dissemination/src/main.rs b/mixnet-rs/dissemination/src/main.rs index cb78806..2cec3fc 100644 --- a/mixnet-rs/dissemination/src/main.rs +++ b/mixnet-rs/dissemination/src/main.rs @@ -1,164 +1,112 @@ -use rand::seq::SliceRandom; -use rand::{rngs::StdRng, SeedableRng}; -use std::cmp::min; -use std::collections::{HashMap, HashSet}; -use std::f32::consts::PI; -use tracing::info; +use chrono::Utc; +use clap::Parser; +use std::{error::Error, path::Path, time::SystemTime}; -use node::{MessageId, Node, NodeId}; +use iteration::run_iteration; +use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; +use queue::QueueType; +mod iteration; mod node; +mod paramset; +mod queue; +mod topology; + +#[derive(Debug, Parser)] +#[command(name = "Message Dessemination Time Measurement")] +struct Args { + #[arg(short, long)] + exp_id: ExperimentId, + #[arg(short, long)] + session_id: SessionId, + #[arg(short, long)] + queue_type: QueueType, + #[arg(short, long)] + outdir: String, +} fn main() { tracing_subscriber::fmt::init(); - let seed: u64 = 0; - let gtr: f32 = 10.0; - let num_nodes: u16 = 10; - let peering_degree: u16 = 2; - let num_senders: u16 = 1; - let num_sending_msgs: u16 = 3; - assert!(num_nodes >= 2); - assert!(num_senders <= num_nodes); + let args = Args::parse(); + tracing::info!("Arguments: {:?}", args); + let Args { + exp_id, + session_id, + queue_type, + outdir, + } = args; - // Initialize nodes - let mut nodes: HashMap = HashMap::new(); - for i in 0..num_nodes { - nodes.insert(i, Node::new(i)); - } + // Create a directory and initialize a CSV file only with a header + assert!( + Path::new(&outdir).is_dir(), + "Output directory does not exist: {outdir}" + ); + let subdir = format!( + "__WIP__dissemination_e{}s{}_{:?}_{}___DUR__", + exp_id as u8, + session_id as u8, + queue_type, + Utc::now().to_rfc3339() + ); + std::fs::create_dir_all(&format!("{outdir}/{subdir}")).unwrap(); - // Connect nodes - let topology = build_topology(num_nodes, peering_degree, seed); - for (node_id, peers) in topology.iter() { - peers.iter().for_each(|peer_id| { - nodes.get_mut(node_id).unwrap().connect(*peer_id); - }); - } + let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type); - let sender_ids: Vec = (0..num_senders).collect(); + let session_start_time = SystemTime::now(); - let mut vtime: f32 = 0.0; - let interval: f32 = 1.0 / gtr; - let mut next_msg_id: MessageId = 0; - let mut sent_msgs: HashMap = HashMap::new(); - let mut disseminated_msgs: HashMap = HashMap::new(); - loop { - // Send new messages - assert!(sent_msgs.len() % (num_senders as usize) == 0); - if sent_msgs.len() / (num_senders as usize) < num_sending_msgs as usize { - for sender_id in sender_ids.iter() { - nodes.get_mut(sender_id).unwrap().send(next_msg_id); - sent_msgs.insert(next_msg_id, (vtime, 1)); - next_msg_id += 1; - } + for paramset in paramsets { + let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id); + std::fs::create_dir_all(paramset_dir.as_str()).unwrap(); + save_paramset_info(¶mset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap(); + + for i in 0..paramset.num_iterations { + let out_csv_path = format!("{paramset_dir}/__WIP__iteration_{i}.csv"); + let topology_path = format!("{paramset_dir}/topology_{i}.csv"); + run_iteration(paramset, i as u64, &out_csv_path, &topology_path); + + let new_out_csv_path = out_csv_path.replace("__WIP__iteration_", "iteration_"); + std::fs::rename(&out_csv_path, &new_out_csv_path) + .expect("Failed to rename: {out_csv_path} -> {new_out_csv_path}"); + + tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i); } - // Collect messages to relay - let mut all_msgs_to_relay = Vec::new(); - for (node_id, node) in nodes.iter_mut() { - let msgs_to_relay = node.read_queues(); - msgs_to_relay.iter().for_each(|(receiver_id, msg)| { - all_msgs_to_relay.push((*receiver_id, *msg, *node_id)); - }); - } - - // Relay the messages - all_msgs_to_relay - .into_iter() - .for_each(|(receiver_id, msg, sender_id)| { - if nodes.get_mut(&receiver_id).unwrap().receive(msg, sender_id) { - let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap(); - *num_received_nodes += 1; - if *num_received_nodes == num_nodes { - assert!(!disseminated_msgs.contains_key(&msg)); - disseminated_msgs.insert(msg, vtime - *sent_time); - } - } - }); - - // Check if all messages have been disseminated to all nodes. - if disseminated_msgs.len() == (num_senders * num_sending_msgs) as usize { - info!( - "vtime:{vtime}: All {} messages have been disseminated to all nodes. Exiting...", - disseminated_msgs.len() - ); - for (msg, latency) in disseminated_msgs.iter() { - info!( - "Message {} took {} time units to disseminate.", - msg, latency - ); - } - break; - } else { - info!( - "vtime:{vtime}: {} messages have been disseminated to all nodes.", - disseminated_msgs.len() - ); - } - - vtime += interval; + let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_"); + std::fs::rename(¶mset_dir, &new_paramset_dir) + .expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}"); + tracing::info!("ParamSet:{} completed.", paramset.id); } + + let session_duration = SystemTime::now() + .duration_since(session_start_time) + .unwrap(); + + // Replace "__WIP__" and "__DUR__" in the subdir string + let new_subdir = subdir + .replace("__WIP__", "") + .replace("__DUR__", &format!("{:?}", session_duration)); + let old_path = format!("{}/{}", outdir, subdir); + let new_path = format!("{}/{}", outdir, new_subdir); + assert!( + !Path::new(&new_path).exists(), + "The new directory already exists: {new_path}" + ); + std::fs::rename(&old_path, &new_path) + .expect("Failed to rename the directory: {old_path} -> {new_path}"); } -fn build_topology( - num_nodes: u16, - peering_degree: u16, - seed: u64, -) -> HashMap> { - let mut rng = StdRng::seed_from_u64(seed); +fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box> { + // Assert that the file does not already exist + assert!( + !Path::new(path).exists(), + "File already exists at path: {path}", + ); - loop { - let mut topology: HashMap> = HashMap::new(); - for node in 0..num_nodes { - topology.insert(node, HashSet::new()); - } + let mut wtr = csv::Writer::from_path(path)?; + wtr.write_record(PARAMSET_CSV_COLUMNS)?; + wtr.write_record(paramset.as_csv_record())?; + wtr.flush()?; - for node in 0..num_nodes { - let mut others: Vec = Vec::new(); - for other in (0..node).chain(node + 1..num_nodes) { - // Check if the other node is not already connected to the current node - // and the other node has not reached the peering degree. - if !topology.get(&node).unwrap().contains(&other) - && topology.get(&other).unwrap().len() < peering_degree as usize - { - others.push(other); - } - } - - // How many more connections the current node needs - let num_needs = peering_degree as usize - topology.get(&node).unwrap().len(); - // Smaple peers as many as possible and connect them to the current node - let k = min(num_needs, others.len()); - others.as_mut_slice().shuffle(&mut rng); - others.into_iter().take(k).for_each(|peer| { - topology.get_mut(&node).unwrap().insert(peer); - topology.get_mut(&peer).unwrap().insert(node); - }); - } - - if are_all_nodes_connected(&topology) { - return topology; - } - } -} - -fn are_all_nodes_connected(topology: &HashMap>) -> bool { - let start_node = topology.keys().next().unwrap(); - let visited = dfs(topology, *start_node); - visited.len() == topology.len() -} - -fn dfs(topology: &HashMap>, start_node: NodeId) -> HashSet { - let mut visited: HashSet = HashSet::new(); - let mut stack: Vec = Vec::new(); - stack.push(start_node); - while let Some(node) = stack.pop() { - visited.insert(node); - for peer in topology.get(&node).unwrap().iter() { - if !visited.contains(peer) { - stack.push(*peer); - } - } - } - visited + Ok(()) } diff --git a/mixnet-rs/dissemination/src/node.rs b/mixnet-rs/dissemination/src/node.rs index 1c2d46a..f0cd822 100644 --- a/mixnet-rs/dissemination/src/node.rs +++ b/mixnet-rs/dissemination/src/node.rs @@ -4,15 +4,13 @@ pub type NodeId = u16; pub type MessageId = u32; pub struct Node { - id: NodeId, queues: HashMap>, received_msgs: HashSet, } impl Node { - pub fn new(id: NodeId) -> Self { + pub fn new() -> Self { Node { - id, queues: HashMap::new(), received_msgs: HashSet::new(), } @@ -20,7 +18,6 @@ impl Node { pub fn connect(&mut self, peer_id: NodeId) { self.queues.entry(peer_id).or_default(); - tracing::info!("Node {} connected to {}", self.id, peer_id); } pub fn num_queues(&self) -> usize { @@ -29,23 +26,19 @@ impl Node { pub fn send(&mut self, msg: MessageId) { assert!(self.received_msgs.insert(msg)); - for (peer_id, queue) in self.queues.iter_mut() { + for (_, queue) in self.queues.iter_mut() { queue.push_back(msg); - tracing::info!("Node {} sent message {} to peer {}", self.id, msg, peer_id); } } pub fn receive(&mut self, msg: MessageId, from: NodeId) -> bool { let first_received = self.received_msgs.insert(msg); if first_received { - tracing::info!("Node {} received message {} from {}", self.id, msg, from); for (node_id, queue) in self.queues.iter_mut() { if *node_id != from { queue.push_back(msg); } } - } else { - tracing::info!("Node {} ignored message {} from {}", self.id, msg, from); } first_received }