have msg_interval separate with transmission_interval

This commit is contained in:
Youngjoon Lee 2024-08-17 19:23:22 +09:00
parent df72a01837
commit a7ed744783
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C

View File

@ -9,8 +9,11 @@ use crate::{
topology::{build_topology, Topology},
};
// An interval that the sender nodes send (schedule) new messages
const MSG_INTERVAL: f32 = 1.0;
pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) {
// Initialize nodes
// Initialize nodes (not connected with each other yet)
let mut nodes: Vec<Node> = Vec::new();
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
for _ in 0..paramset.num_nodes {
@ -24,7 +27,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
));
}
// Connect nodes
// Build a random topology, and connect nodes with each other
let topology = build_topology(paramset.num_nodes, paramset.peering_degree, seed);
save_topology(&topology, topology_path).unwrap();
for (node_id, peers) in topology.iter().enumerate() {
@ -33,14 +36,13 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
});
}
// It's okay to choose the first `num_senders` nodes as senders
// because the topology is randomly generated.
let sender_ids: Vec<NodeId> = (0..paramset.num_senders).collect();
// Virtual discrete time
let mut vtime: f32 = 0.0;
// Increase vtime according to the transmission rate
let interval: f32 = 1.0 / paramset.transmission_rate as f32;
// To generate unique message IDs
let mut next_msg_id: MessageId = 0;
let total_num_msgs: u32 = paramset.num_senders as u32 * paramset.num_sent_msgs as u32;
// To keep track of when each message was sent and how many nodes received it
let mut message_tracker: HashMap<MessageId, (f32, u16)> = HashMap::new();
// To keep track of how many messages have been disseminated to all nodes
@ -51,33 +53,88 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
.write_record(["dissemination_time", "sent_time", "all_received_time"])
.unwrap();
// Virtual discrete time
let mut vtime: f32;
// Transmission interval that each queue must release a message
let transmission_interval = 1.0 / paramset.transmission_rate as f32;
// Jump `vtime` to one of the following two vtimes.
// 1. The next time to send (schedule) a message. Increased by `MSG_INTERVAL`.
let mut next_messaging_vtime: f32 = 0.0;
// 2. The next time to release a message from each queue and relay them. Increased by `transmission_interval`.
let mut next_transmission_vtime: f32 = 0.0;
loop {
// Send new messages
if next_msg_id < (paramset.num_senders * paramset.num_sent_msgs) as MessageId {
for &sender_id in sender_ids.iter() {
nodes[sender_id as usize].send(next_msg_id);
message_tracker.insert(next_msg_id, (vtime, 1));
next_msg_id += 1;
// If there are still messages to be sent (scheduled),
// and if the next time to send a message is earlier than the next time to relay messages.
if next_msg_id < total_num_msgs && next_messaging_vtime <= next_transmission_vtime {
// Send new messages
vtime = next_messaging_vtime;
next_messaging_vtime += MSG_INTERVAL;
send_messages(
vtime,
&sender_ids,
&mut nodes,
&mut next_msg_id,
&mut message_tracker,
);
} else {
// Release a message from each queue and relay all of them
vtime = next_transmission_vtime;
next_transmission_vtime += transmission_interval;
relay_messages(
vtime,
&mut nodes,
&mut message_tracker,
&mut num_disseminated_msgs,
&mut writer,
);
// Check if all messages have been disseminated to all nodes.
if num_disseminated_msgs == total_num_msgs as usize {
break;
}
}
}
}
// Collect messages to relay
let mut all_msgs_to_relay = Vec::new();
for (node_id, node) in nodes.iter_mut().enumerate() {
let msgs_to_relay = node.read_queues();
msgs_to_relay.iter().for_each(|(receiver_id, msg)| {
all_msgs_to_relay.push((*receiver_id, *msg, node_id as u16));
});
}
fn send_messages(
vtime: f32,
sender_ids: &[NodeId],
nodes: &mut [Node],
next_msg_id: &mut MessageId,
message_tracker: &mut HashMap<MessageId, (f32, u16)>,
) {
for &sender_id in sender_ids.iter() {
nodes[sender_id as usize].send(*next_msg_id);
message_tracker.insert(*next_msg_id, (vtime, 1));
*next_msg_id += 1;
}
}
// Relay the messages
all_msgs_to_relay
.into_iter()
.for_each(|(receiver_id, msg, sender_id)| {
if nodes[receiver_id as usize].receive(msg, sender_id) {
fn relay_messages(
vtime: f32,
nodes: &mut [Node],
message_tracker: &mut HashMap<MessageId, (f32, u16)>,
num_disseminated_msgs: &mut usize,
writer: &mut csv::Writer<std::fs::File>,
) {
// Collect messages to relay
let mut all_msgs_to_relay: Vec<Vec<(NodeId, MessageId)>> = Vec::new();
for node in nodes.iter_mut() {
all_msgs_to_relay.push(node.read_queues());
}
// Relay the messages
all_msgs_to_relay
.into_iter()
.enumerate()
.for_each(|(sender_id, msgs_to_relay)| {
msgs_to_relay.into_iter().for_each(|(receiver_id, msg)| {
if nodes[receiver_id as usize].receive(msg, sender_id as NodeId) {
let (sent_time, num_received_nodes) = message_tracker.get_mut(&msg).unwrap();
*num_received_nodes += 1;
if *num_received_nodes == paramset.num_nodes {
if *num_received_nodes as usize == nodes.len() {
let dissemination_time = vtime - *sent_time;
writer
.write_record(&[
@ -86,20 +143,13 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
vtime.to_string(),
])
.unwrap();
num_disseminated_msgs += 1;
*num_disseminated_msgs += 1;
message_tracker.remove(&msg);
}
}
});
// Check if all messages have been disseminated to all nodes.
if num_disseminated_msgs == (paramset.num_senders * paramset.num_sent_msgs) as usize {
break;
}
vtime += interval;
}
})
});
}
fn save_topology(topology: &Topology, topology_path: &str) -> Result<(), Box<dyn Error>> {