paramset, args, csv

This commit is contained in:
Youngjoon Lee 2024-08-17 00:24:26 +09:00
parent bd00f9dd3a
commit c7b83c813d
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
3 changed files with 100 additions and 154 deletions

View File

@ -4,6 +4,11 @@ version = "0.1.0"
edition = "2021"
[dependencies]
chrono = "0.4.38"
clap = { version = "4.5.16", features = ["derive"] }
csv = "1.3.0"
rand = "0.8.5"
strum = "0.26.3"
strum_macros = "0.26.4"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"

View File

@ -1,164 +1,112 @@
use rand::seq::SliceRandom;
use rand::{rngs::StdRng, SeedableRng};
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::f32::consts::PI;
use tracing::info;
use chrono::Utc;
use clap::Parser;
use std::{error::Error, path::Path, time::SystemTime};
use node::{MessageId, Node, NodeId};
use iteration::run_iteration;
use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
use queue::QueueType;
mod iteration;
mod node;
mod paramset;
mod queue;
mod topology;
// CLI arguments for the dissemination-time measurement binary.
// NOTE: plain `//` comments are used on purpose — clap turns `///` doc
// comments on the struct/fields into `about`/per-arg help text, which would
// change the program's user-visible output.
#[derive(Debug, Parser)]
// Fix: "Dessemination" -> "Dissemination" (user-facing typo in the command name).
#[command(name = "Message Dissemination Time Measurement")]
struct Args {
    // Experiment id; type comes from the paramset module.
    #[arg(short, long)]
    exp_id: ExperimentId,
    // Session id within the experiment; type comes from the paramset module.
    #[arg(short, long)]
    session_id: SessionId,
    // Queue discipline used by simulated nodes (from the queue module).
    #[arg(short, long)]
    queue_type: QueueType,
    // Output directory; main() asserts that it already exists.
    #[arg(short, long)]
    outdir: String,
}
fn main() {
tracing_subscriber::fmt::init();
let seed: u64 = 0;
let gtr: f32 = 10.0;
let num_nodes: u16 = 10;
let peering_degree: u16 = 2;
let num_senders: u16 = 1;
let num_sending_msgs: u16 = 3;
assert!(num_nodes >= 2);
assert!(num_senders <= num_nodes);
let args = Args::parse();
tracing::info!("Arguments: {:?}", args);
let Args {
exp_id,
session_id,
queue_type,
outdir,
} = args;
// Initialize nodes
let mut nodes: HashMap<NodeId, Node> = HashMap::new();
for i in 0..num_nodes {
nodes.insert(i, Node::new(i));
}
// Create a directory and initialize a CSV file only with a header
assert!(
Path::new(&outdir).is_dir(),
"Output directory does not exist: {outdir}"
);
let subdir = format!(
"__WIP__dissemination_e{}s{}_{:?}_{}___DUR__",
exp_id as u8,
session_id as u8,
queue_type,
Utc::now().to_rfc3339()
);
std::fs::create_dir_all(&format!("{outdir}/{subdir}")).unwrap();
// Connect nodes
let topology = build_topology(num_nodes, peering_degree, seed);
for (node_id, peers) in topology.iter() {
peers.iter().for_each(|peer_id| {
nodes.get_mut(node_id).unwrap().connect(*peer_id);
});
}
let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type);
let sender_ids: Vec<NodeId> = (0..num_senders).collect();
let session_start_time = SystemTime::now();
let mut vtime: f32 = 0.0;
let interval: f32 = 1.0 / gtr;
let mut next_msg_id: MessageId = 0;
let mut sent_msgs: HashMap<MessageId, (f32, u16)> = HashMap::new();
let mut disseminated_msgs: HashMap<MessageId, f32> = HashMap::new();
loop {
// Send new messages
assert!(sent_msgs.len() % (num_senders as usize) == 0);
if sent_msgs.len() / (num_senders as usize) < num_sending_msgs as usize {
for sender_id in sender_ids.iter() {
nodes.get_mut(sender_id).unwrap().send(next_msg_id);
sent_msgs.insert(next_msg_id, (vtime, 1));
next_msg_id += 1;
}
for paramset in paramsets {
let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id);
std::fs::create_dir_all(paramset_dir.as_str()).unwrap();
save_paramset_info(&paramset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap();
for i in 0..paramset.num_iterations {
let out_csv_path = format!("{paramset_dir}/__WIP__iteration_{i}.csv");
let topology_path = format!("{paramset_dir}/topology_{i}.csv");
run_iteration(paramset, i as u64, &out_csv_path, &topology_path);
let new_out_csv_path = out_csv_path.replace("__WIP__iteration_", "iteration_");
std::fs::rename(&out_csv_path, &new_out_csv_path)
.expect("Failed to rename: {out_csv_path} -> {new_out_csv_path}");
tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i);
}
// Collect messages to relay
let mut all_msgs_to_relay = Vec::new();
for (node_id, node) in nodes.iter_mut() {
let msgs_to_relay = node.read_queues();
msgs_to_relay.iter().for_each(|(receiver_id, msg)| {
all_msgs_to_relay.push((*receiver_id, *msg, *node_id));
});
}
// Relay the messages
all_msgs_to_relay
.into_iter()
.for_each(|(receiver_id, msg, sender_id)| {
if nodes.get_mut(&receiver_id).unwrap().receive(msg, sender_id) {
let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap();
*num_received_nodes += 1;
if *num_received_nodes == num_nodes {
assert!(!disseminated_msgs.contains_key(&msg));
disseminated_msgs.insert(msg, vtime - *sent_time);
}
}
});
// Check if all messages have been disseminated to all nodes.
if disseminated_msgs.len() == (num_senders * num_sending_msgs) as usize {
info!(
"vtime:{vtime}: All {} messages have been disseminated to all nodes. Exiting...",
disseminated_msgs.len()
);
for (msg, latency) in disseminated_msgs.iter() {
info!(
"Message {} took {} time units to disseminate.",
msg, latency
);
}
break;
} else {
info!(
"vtime:{vtime}: {} messages have been disseminated to all nodes.",
disseminated_msgs.len()
);
}
vtime += interval;
let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_");
std::fs::rename(&paramset_dir, &new_paramset_dir)
.expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}");
tracing::info!("ParamSet:{} completed.", paramset.id);
}
let session_duration = SystemTime::now()
.duration_since(session_start_time)
.unwrap();
// Replace "__WIP__" and "__DUR__" in the subdir string
let new_subdir = subdir
.replace("__WIP__", "")
.replace("__DUR__", &format!("{:?}", session_duration));
let old_path = format!("{}/{}", outdir, subdir);
let new_path = format!("{}/{}", outdir, new_subdir);
assert!(
!Path::new(&new_path).exists(),
"The new directory already exists: {new_path}"
);
std::fs::rename(&old_path, &new_path)
.expect("Failed to rename the directory: {old_path} -> {new_path}");
}
fn build_topology(
num_nodes: u16,
peering_degree: u16,
seed: u64,
) -> HashMap<NodeId, HashSet<NodeId>> {
let mut rng = StdRng::seed_from_u64(seed);
fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box<dyn Error>> {
// Assert that the file does not already exist
assert!(
!Path::new(path).exists(),
"File already exists at path: {path}",
);
loop {
let mut topology: HashMap<NodeId, HashSet<NodeId>> = HashMap::new();
for node in 0..num_nodes {
topology.insert(node, HashSet::new());
}
let mut wtr = csv::Writer::from_path(path)?;
wtr.write_record(PARAMSET_CSV_COLUMNS)?;
wtr.write_record(paramset.as_csv_record())?;
wtr.flush()?;
for node in 0..num_nodes {
let mut others: Vec<NodeId> = Vec::new();
for other in (0..node).chain(node + 1..num_nodes) {
// Check if the other node is not already connected to the current node
// and the other node has not reached the peering degree.
if !topology.get(&node).unwrap().contains(&other)
&& topology.get(&other).unwrap().len() < peering_degree as usize
{
others.push(other);
}
}
// How many more connections the current node needs
let num_needs = peering_degree as usize - topology.get(&node).unwrap().len();
// Sample as many peers as possible and connect them to the current node
let k = min(num_needs, others.len());
others.as_mut_slice().shuffle(&mut rng);
others.into_iter().take(k).for_each(|peer| {
topology.get_mut(&node).unwrap().insert(peer);
topology.get_mut(&peer).unwrap().insert(node);
});
}
if are_all_nodes_connected(&topology) {
return topology;
}
}
}
/// Whether the topology forms a single connected component: a depth-first
/// traversal starting from an arbitrary node must reach every node in the map.
///
/// Panics if `topology` is empty (there is no node to start from).
fn are_all_nodes_connected(topology: &HashMap<NodeId, HashSet<NodeId>>) -> bool {
    let root = *topology.keys().next().unwrap();
    let reached = dfs(topology, root);
    reached.len() == topology.len()
}
/// Iterative depth-first search over an adjacency map, returning the set of
/// nodes reachable from `start_node` (including `start_node` itself).
///
/// Generalized over the key type (any `Copy + Eq + Hash`), which is backward
/// compatible with callers passing the file's `NodeId` alias.
///
/// Fix: the rendered diff spliced a stray `Ok(())` from the adjacent
/// `save_paramset_info` hunk after the `visited` return value, which is
/// invalid for a function returning `HashSet<_>`; it is removed here.
///
/// # Panics
/// Panics if a node reached during traversal (including `start_node`) has no
/// adjacency entry in `topology`.
fn dfs<T: Copy + Eq + std::hash::Hash>(
    topology: &HashMap<T, HashSet<T>>,
    start_node: T,
) -> HashSet<T> {
    let mut visited: HashSet<T> = HashSet::new();
    // Explicit stack instead of recursion avoids overflow on deep graphs.
    let mut stack: Vec<T> = vec![start_node];
    while let Some(node) = stack.pop() {
        visited.insert(node);
        for peer in topology.get(&node).unwrap().iter() {
            if !visited.contains(peer) {
                stack.push(*peer);
            }
        }
    }
    visited
}

View File

@ -4,15 +4,13 @@ pub type NodeId = u16;
pub type MessageId = u32;
pub struct Node {
id: NodeId,
queues: HashMap<NodeId, VecDeque<MessageId>>,
received_msgs: HashSet<MessageId>,
}
impl Node {
pub fn new(id: NodeId) -> Self {
pub fn new() -> Self {
Node {
id,
queues: HashMap::new(),
received_msgs: HashSet::new(),
}
@ -20,7 +18,6 @@ impl Node {
pub fn connect(&mut self, peer_id: NodeId) {
self.queues.entry(peer_id).or_default();
tracing::info!("Node {} connected to {}", self.id, peer_id);
}
pub fn num_queues(&self) -> usize {
@ -29,23 +26,19 @@ impl Node {
pub fn send(&mut self, msg: MessageId) {
assert!(self.received_msgs.insert(msg));
for (peer_id, queue) in self.queues.iter_mut() {
for (_, queue) in self.queues.iter_mut() {
queue.push_back(msg);
tracing::info!("Node {} sent message {} to peer {}", self.id, msg, peer_id);
}
}
pub fn receive(&mut self, msg: MessageId, from: NodeId) -> bool {
let first_received = self.received_msgs.insert(msg);
if first_received {
tracing::info!("Node {} received message {} from {}", self.id, msg, from);
for (node_id, queue) in self.queues.iter_mut() {
if *node_id != from {
queue.push_back(msg);
}
}
} else {
tracing::info!("Node {} ignored message {} from {}", self.id, msg, from);
}
first_received
}