add session 3

This commit is contained in:
Youngjoon Lee 2024-08-18 12:46:42 +09:00
parent 0d9cabc329
commit 8e06695aa4
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
5 changed files with 175 additions and 51 deletions

View File

@ -17,25 +17,32 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
// Initialize nodes (not connected with each other yet)
let mut nodes: Vec<Node> = Vec::new();
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
for _ in 0..paramset.num_nodes {
let peering_degrees = paramset.gen_peering_degrees(seed);
tracing::debug!("PeeringDegrees initialized.");
for node_id in 0..paramset.num_nodes {
nodes.push(Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
peering_degrees[node_id as usize],
));
}
tracing::debug!("Nodes initialized.");
// Build a random topology, and connect nodes with each other
let topology = build_topology(paramset.num_nodes, paramset.peering_degree, seed);
let topology = build_topology(paramset.num_nodes, &peering_degrees, seed);
tracing::debug!("Topology built.");
save_topology(&topology, topology_path).unwrap();
tracing::debug!("Topology saved.");
for (node_id, peers) in topology.iter().enumerate() {
peers.iter().for_each(|peer_id| {
nodes[node_id].connect(*peer_id);
});
}
tracing::debug!("Nodes connected");
let mut sender_selector = SenderSelector::new(
(0..paramset.num_nodes).collect(),
@ -46,8 +53,8 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
// To generate unique message IDs
let mut next_msg_id: MessageId = 0;
let total_num_msgs: u32 = paramset.num_senders as u32 * paramset.num_sent_msgs as u32;
// To keep track of when each message was sent and how many nodes received it
let total_num_msgs = paramset.total_num_messages();
// msg_id -> (sent_vtime, num_received_nodes)
let mut message_tracker: FxHashMap<MessageId, (f32, u16)> = FxHashMap::default();
// To keep track of how many messages have been disseminated to all nodes
let mut num_disseminated_msgs = 0;
@ -135,8 +142,9 @@ fn relay_messages(
.into_iter()
.enumerate()
.for_each(|(sender_id, msgs_to_relay)| {
let sender_id: NodeId = sender_id.try_into().unwrap();
msgs_to_relay.into_iter().for_each(|(receiver_id, msg)| {
if nodes[receiver_id as usize].receive(msg, sender_id as NodeId) {
if nodes[receiver_id as usize].receive(msg, sender_id) {
let (sent_time, num_received_nodes) = message_tracker.get_mut(&msg).unwrap();
*num_received_nodes += 1;
if *num_received_nodes as usize == nodes.len() {
@ -163,6 +171,7 @@ fn save_topology(topology: &Topology, topology_path: &str) -> Result<(), Box<dyn
wtr.write_record(["node", "num_peers", "peers"])?;
for (node, peers) in topology.iter().enumerate() {
let node: NodeId = node.try_into().unwrap();
let peers_str: Vec<String> = peers.iter().map(|peer_id| peer_id.to_string()).collect();
wtr.write_record(&[
node.to_string(),
@ -176,7 +185,7 @@ fn save_topology(topology: &Topology, topology_path: &str) -> Result<(), Box<dyn
struct SenderSelector {
candidates: Vec<NodeId>,
num_senders: usize,
num_senders: NodeId,
random_senders_every_time: bool,
rng: StdRng,
}
@ -184,14 +193,14 @@ struct SenderSelector {
impl SenderSelector {
fn new(
candidates: Vec<NodeId>,
num_senders: u16,
num_senders: u32,
random_senders_every_time: bool,
seed: u64,
) -> Self {
assert!(candidates.len() >= num_senders as usize);
Self {
candidates,
num_senders: num_senders as usize,
num_senders,
random_senders_every_time,
rng: StdRng::seed_from_u64(seed),
}
@ -201,10 +210,10 @@ impl SenderSelector {
if !self.random_senders_every_time {
// It's okay to pick the first `num_senders` nodes
// because the topology is randomly generated.
&self.candidates[..self.num_senders]
&self.candidates[..self.num_senders as usize]
} else {
self.candidates.shuffle(&mut self.rng);
&self.candidates[..self.num_senders]
&self.candidates[..self.num_senders as usize]
}
}
}

View File

@ -86,7 +86,7 @@ fn main() {
let out_csv_path = format!("{paramset_dir}/__WIP__iteration_{i}.csv");
let topology_path = format!("{paramset_dir}/topology_{i}.csv");
run_iteration(*paramset, i as u64, &out_csv_path, &topology_path);
run_iteration(paramset.clone(), i as u64, &out_csv_path, &topology_path);
let new_out_csv_path = out_csv_path.replace("__WIP__iteration_", "iteration_");
std::fs::rename(&out_csv_path, &new_out_csv_path)

View File

@ -2,7 +2,7 @@ use rustc_hash::{FxHashMap, FxHashSet};
use crate::queue::{new_queue, Queue, QueueConfig};
pub type NodeId = u16;
pub type NodeId = u32;
pub type MessageId = u32;
pub struct Node {
@ -13,12 +13,12 @@ pub struct Node {
queues: Vec<(NodeId, Box<dyn Queue<MessageId>>)>,
connected_peers: FxHashSet<NodeId>,
// A cache to avoid relaying the same message multiple times.
received_msgs: FxHashMap<MessageId, u16>,
peering_degree: u16,
received_msgs: FxHashMap<MessageId, u32>,
peering_degree: u32,
}
impl Node {
pub fn new(queue_config: QueueConfig, peering_degree: u16) -> Self {
pub fn new(queue_config: QueueConfig, peering_degree: u32) -> Self {
Node {
queue_config,
queues: Vec::new(),

View File

@ -1,3 +1,5 @@
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use crate::queue::QueueType;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@ -31,6 +33,7 @@ pub enum SessionId {
Session1 = 1,
Session2 = 2,
Session2_1 = 21,
Session3 = 3,
}
impl std::str::FromStr for SessionId {
@ -41,6 +44,7 @@ impl std::str::FromStr for SessionId {
"1" | "Session1" => Ok(SessionId::Session1),
"2" | "Session2" => Ok(SessionId::Session2),
"2.1" | "Session21" => Ok(SessionId::Session2_1),
"3" | "Session3" => Ok(SessionId::Session3),
_ => Err(format!("Invalid session ID: {}", s)),
}
}
@ -59,20 +63,24 @@ pub const PARAMSET_CSV_COLUMNS: &[&str] = &[
"num_iterations",
];
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq)]
pub struct ParamSet {
pub id: u16,
pub num_nodes: u16,
pub peering_degree: u16,
pub num_nodes: u32,
pub peering_degree_rates: PeeringDegreeRates,
pub min_queue_size: u16,
pub transmission_rate: u16,
pub num_sent_msgs: u16,
pub num_senders: u16,
pub num_sent_msgs: u32,
pub num_senders: u32,
pub random_senders_every_time: bool,
pub queue_type: QueueType,
pub num_iterations: u16,
pub num_iterations: usize,
}
// peering_degree -> rate
// Use Vec instead of HashMap to avoid unexpected undeterministic behavior
type PeeringDegreeRates = Vec<(u32, f32)>;
impl ParamSet {
pub fn new_all_paramsets(
exp_id: ExperimentId,
@ -83,16 +91,29 @@ impl ParamSet {
SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type),
SessionId::Session2 => Self::new_session2_paramsets(exp_id, queue_type),
SessionId::Session2_1 => Self::new_session2_1_paramsets(exp_id, queue_type),
SessionId::Session3 => Self::new_session3_paramsets(exp_id, queue_type),
}
}
fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let mut start_id: u16 = 1;
let mut paramsets: Vec<ParamSet> = Vec::new();
for &num_nodes in &[20, 40, 80] {
let peering_degree_list = &[num_nodes / 5, num_nodes / 4, num_nodes / 2];
let min_queue_size_list = &[num_nodes / 2, num_nodes, num_nodes * 2];
let transmission_rate_list = &[num_nodes / 2, num_nodes, num_nodes * 2];
for &num_nodes in &[20u32, 40u32, 80u32] {
let peering_degrees_list = &[
vec![(num_nodes.checked_div(5).unwrap(), 1.0)],
vec![(num_nodes.checked_div(4).unwrap(), 1.0)],
vec![(num_nodes.checked_div(2).unwrap(), 1.0)],
];
let min_queue_size_list = &[
num_nodes.checked_div(2).unwrap().try_into().unwrap(),
num_nodes.try_into().unwrap(),
num_nodes.checked_mul(2).unwrap().try_into().unwrap(),
];
let transmission_rate_list = &[
num_nodes.checked_div(2).unwrap().try_into().unwrap(),
num_nodes.try_into().unwrap(),
num_nodes.checked_mul(2).unwrap().try_into().unwrap(),
];
let num_sent_msgs_list = |_| match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
ExperimentId::Experiment2
@ -104,16 +125,20 @@ impl ParamSet {
ExperimentId::Experiment3
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => {
vec![num_nodes / 10, num_nodes / 5, num_nodes / 2]
vec![
num_nodes.checked_div(10).unwrap(),
num_nodes.checked_div(5).unwrap(),
num_nodes.checked_div(2).unwrap(),
]
}
};
let random_senders_every_time = exp_id == ExperimentId::Experiment5;
let num_iterations = num_nodes / 2;
let num_iterations = num_nodes.checked_div(2).unwrap().try_into().unwrap();
let (mut new_paramsets, next_start_id) = Self::new_paramsets(
start_id,
num_nodes,
peering_degree_list,
peering_degrees_list,
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
@ -131,16 +156,20 @@ impl ParamSet {
fn new_session2_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let mut start_id: u16 = 1;
let mut paramsets: Vec<ParamSet> = Vec::new();
for &num_nodes in &[100, 1000, 10000] {
let peering_degree_list = &[4, 8, 16];
for &num_nodes in &[100u32, 1000u32, 10000u32] {
let peering_degrees_list = &[vec![(4, 1.0)], vec![(8, 1.0)], vec![(16, 1.0)]];
let min_queue_size_list = &[10, 50, 100];
let transmission_rate_list = &[1, 10, 100];
let num_sent_msgs_list = |min_queue_size| match exp_id {
let num_sent_msgs_list = |min_queue_size: u16| match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
ExperimentId::Experiment2
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => {
vec![min_queue_size / 2, min_queue_size, min_queue_size * 2]
vec![
min_queue_size.checked_div(2).unwrap().into(),
min_queue_size.into(),
min_queue_size.checked_mul(2).unwrap().into(),
]
}
};
let num_senders_list = match exp_id {
@ -148,7 +177,11 @@ impl ParamSet {
ExperimentId::Experiment3
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => {
vec![num_nodes / 10, num_nodes / 5, num_nodes / 2]
vec![
num_nodes.checked_div(10).unwrap(),
num_nodes.checked_div(5).unwrap(),
num_nodes.checked_div(2).unwrap(),
]
}
};
let random_senders_every_time = exp_id == ExperimentId::Experiment5;
@ -157,7 +190,7 @@ impl ParamSet {
let (mut new_paramsets, next_start_id) = Self::new_paramsets(
start_id,
num_nodes,
peering_degree_list,
peering_degrees_list,
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
@ -175,8 +208,8 @@ impl ParamSet {
fn new_session2_1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let mut start_id: u16 = 1;
let mut paramsets: Vec<ParamSet> = Vec::new();
for &num_nodes in &[20, 200, 2000] {
let peering_degree_list = &[4, 6, 8];
for &num_nodes in &[20u32, 200u32, 2000u32] {
let peering_degrees_list = &[vec![(4, 1.0)], vec![(6, 1.0)], vec![(8, 1.0)]];
let min_queue_size_list = &[10, 50, 100];
let transmission_rate_list = &[1];
let num_sent_msgs_list = |_| match exp_id {
@ -190,7 +223,11 @@ impl ParamSet {
ExperimentId::Experiment3
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => {
vec![num_nodes / 10, num_nodes / 5, num_nodes / 2]
vec![
num_nodes.checked_div(10).unwrap(),
num_nodes.checked_div(5).unwrap(),
num_nodes.checked_div(2).unwrap(),
]
}
};
let random_senders_every_time = exp_id == ExperimentId::Experiment5;
@ -199,7 +236,7 @@ impl ParamSet {
let (mut new_paramsets, next_start_id) = Self::new_paramsets(
start_id,
num_nodes,
peering_degree_list,
peering_degrees_list,
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
@ -214,22 +251,67 @@ impl ParamSet {
paramsets
}
fn new_session3_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let start_id: u16 = 1;
let num_nodes: u32 = 100000;
let peering_degrees = vec![(4, 0.87), (129, 0.123), (500, 0.07)];
let min_queue_size_list = &[10, 50, 100];
let transmission_rate_list = &[1];
let num_sent_msgs_list = |min_queue_size: u16| match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
ExperimentId::Experiment2 | ExperimentId::Experiment4 | ExperimentId::Experiment5 => {
vec![
min_queue_size.checked_div(2).unwrap().into(),
min_queue_size.into(),
min_queue_size.checked_mul(2).unwrap().into(),
]
}
};
let num_senders_list = match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1],
ExperimentId::Experiment3 | ExperimentId::Experiment4 | ExperimentId::Experiment5 => {
vec![
num_nodes.checked_div(10).unwrap(),
num_nodes.checked_div(5).unwrap(),
num_nodes.checked_div(2).unwrap(),
]
}
};
let random_senders_every_time = exp_id == ExperimentId::Experiment5;
let num_iterations = 100;
let (paramsets, _) = Self::new_paramsets(
start_id,
num_nodes,
&[peering_degrees],
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
num_senders_list.as_slice(),
random_senders_every_time,
queue_type,
num_iterations,
);
paramsets
}
#[allow(clippy::too_many_arguments)]
fn new_paramsets(
start_id: u16,
num_nodes: u16,
peering_degree_list: &[u16],
num_nodes: u32,
peering_degrees_list: &[PeeringDegreeRates],
min_queue_size_list: &[u16],
transmission_rate_list: &[u16],
num_sent_msgs_list: impl Fn(u16) -> Vec<u16>,
num_senders_list: &[u16],
num_sent_msgs_list: impl Fn(u16) -> Vec<u32>,
num_senders_list: &[u32],
random_senders_every_time: bool,
queue_type: QueueType,
num_iterations: u16,
num_iterations: usize,
) -> (Vec<ParamSet>, u16) {
let mut id = start_id;
let mut paramsets: Vec<ParamSet> = Vec::new();
for &peering_degree in peering_degree_list {
for peering_degrees in peering_degrees_list {
for &min_queue_size in min_queue_size_list {
for &transmission_rate in transmission_rate_list {
for &num_sent_msgs in num_sent_msgs_list(min_queue_size).iter() {
@ -243,7 +325,7 @@ impl ParamSet {
paramsets.push(ParamSet {
id,
num_nodes,
peering_degree,
peering_degree_rates: peering_degrees.clone(),
min_queue_size,
transmission_rate,
num_sent_msgs,
@ -270,11 +352,21 @@ impl ParamSet {
)
}
pub fn total_num_messages(&self) -> u32 {
self.num_sent_msgs.checked_mul(self.num_senders).unwrap()
}
pub fn as_csv_record(&self) -> Vec<String> {
let peering_degrees = self
.peering_degree_rates
.iter()
.map(|(degree, rate)| format!("({degree}:{rate})"))
.collect::<Vec<String>>()
.join(",");
vec![
self.id.to_string(),
self.num_nodes.to_string(),
self.peering_degree.to_string(),
format!("[{peering_degrees}]"),
self.min_queue_size.to_string(),
self.transmission_rate.to_string(),
self.num_sent_msgs.to_string(),
@ -284,6 +376,20 @@ impl ParamSet {
self.num_iterations.to_string(),
]
}
pub fn gen_peering_degrees(&self, seed: u64) -> Vec<u32> {
let mut vec = Vec::with_capacity(self.num_nodes as usize);
self.peering_degree_rates.iter().for_each(|(degree, rate)| {
let num_nodes = std::cmp::min(
(self.num_nodes as f32 * rate).round() as u32,
self.num_nodes - vec.len() as u32,
);
vec.extend(std::iter::repeat(*degree).take(num_nodes as usize));
});
assert_eq!(vec.len(), self.num_nodes as usize);
vec.shuffle(&mut StdRng::seed_from_u64(seed));
vec
}
}
#[cfg(test)]
@ -342,6 +448,10 @@ mod tests {
(ExperimentId::Experiment5, SessionId::Session2_1),
3u32.pow(4),
),
(
(ExperimentId::Experiment5, SessionId::Session3),
3u32.pow(3),
),
];
for queue_type in QueueType::iter() {
@ -355,7 +465,10 @@ mod tests {
assert_eq!(paramsets.len(), expected_cnt as usize);
// Check if all parameter sets are unique
let unique_paramsets: HashSet<ParamSet> = paramsets.clone().into_iter().collect();
let unique_paramsets: HashSet<Vec<String>> = paramsets
.iter()
.map(|paramset| paramset.as_csv_record())
.collect();
assert_eq!(unique_paramsets.len(), paramsets.len());
// Check if paramset IDs are correct.
@ -382,6 +495,7 @@ mod tests {
(ExperimentId::Experiment1, SessionId::Session2_1),
(ExperimentId::Experiment4, SessionId::Session2_1),
(ExperimentId::Experiment5, SessionId::Session2_1),
(ExperimentId::Experiment5, SessionId::Session3),
];
for (exp_id, session_id) in cases.into_iter() {

View File

@ -6,7 +6,8 @@ use crate::node::NodeId;
pub type Topology = Vec<Vec<NodeId>>;
pub fn build_topology(num_nodes: u16, peering_degree: u16, seed: u64) -> Topology {
pub fn build_topology(num_nodes: u32, peering_degrees: &[u32], seed: u64) -> Topology {
assert_eq!(num_nodes as usize, peering_degrees.len());
let mut rng = StdRng::seed_from_u64(seed);
loop {
@ -21,14 +22,14 @@ pub fn build_topology(num_nodes: u16, peering_degree: u16, seed: u64) -> Topolog
// Check if the other node is not already connected to the current node
// and the other node has not reached the peering degree.
if !topology[node as usize].contains(&other)
&& topology[other as usize].len() < peering_degree as usize
&& topology[other as usize].len() < peering_degrees[other as usize] as usize
{
others.push(other);
}
}
// How many more connections the current node needs
let num_needs = peering_degree as usize - topology[node as usize].len();
let num_needs = peering_degrees[node as usize] as usize - topology[node as usize].len();
// Smaple peers as many as possible and connect them to the current node
let k = std::cmp::min(num_needs, others.len());
others.as_mut_slice().shuffle(&mut rng);