From a7ed744783f482831fc5e6a255d7d05823070983 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Sat, 17 Aug 2024 19:23:22 +0900 Subject: [PATCH] have msg_interval separate with transmission_interval --- mixnet-rs/dissemination/src/iteration.rs | 122 ++++++++++++++++------- 1 file changed, 86 insertions(+), 36 deletions(-) diff --git a/mixnet-rs/dissemination/src/iteration.rs b/mixnet-rs/dissemination/src/iteration.rs index a7bd364..bdf3b6b 100644 --- a/mixnet-rs/dissemination/src/iteration.rs +++ b/mixnet-rs/dissemination/src/iteration.rs @@ -9,8 +9,11 @@ use crate::{ topology::{build_topology, Topology}, }; +// An interval that the sender nodes send (schedule) new messages +const MSG_INTERVAL: f32 = 1.0; + pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) { - // Initialize nodes + // Initialize nodes (not connected with each other yet) let mut nodes: Vec = Vec::new(); let mut queue_seed_rng = StdRng::seed_from_u64(seed); for _ in 0..paramset.num_nodes { @@ -24,7 +27,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology )); } - // Connect nodes + // Build a random topology, and connect nodes with each other let topology = build_topology(paramset.num_nodes, paramset.peering_degree, seed); save_topology(&topology, topology_path).unwrap(); for (node_id, peers) in topology.iter().enumerate() { @@ -33,14 +36,13 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology }); } + // It's okay to choose the first `num_senders` nodes as senders + // because the topology is randomly generated. let sender_ids: Vec = (0..paramset.num_senders).collect(); - // Virtual discrete time - let mut vtime: f32 = 0.0; - // Increase vtime according to the transmission rate - let interval: f32 = 1.0 / paramset.transmission_rate as f32; // To generate unique message IDs let mut next_msg_id: MessageId = 0; + let total_num_msgs: u32 = paramset.num_senders as u32 * paramset.num_sent_msgs as u32; // To keep track of when each message was sent and how many nodes received it let mut message_tracker: HashMap = HashMap::new(); // To keep track of how many messages have been disseminated to all nodes @@ -51,33 +53,88 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology .write_record(["dissemination_time", "sent_time", "all_received_time"]) .unwrap(); + // Virtual discrete time + let mut vtime: f32; + // Transmission interval that each queue must release a message + let transmission_interval = 1.0 / paramset.transmission_rate as f32; + // Jump `vtime` to one of the following two vtimes. + // 1. The next time to send (schedule) a message. Increased by `MSG_INTERVAL`. + let mut next_messaging_vtime: f32 = 0.0; + // 2. The next time to release a message from each queue and relay them. Increased by `transmission_interval`. + let mut next_transmission_vtime: f32 = 0.0; loop { - // Send new messages - if next_msg_id < (paramset.num_senders * paramset.num_sent_msgs) as MessageId { - for &sender_id in sender_ids.iter() { - nodes[sender_id as usize].send(next_msg_id); - message_tracker.insert(next_msg_id, (vtime, 1)); - next_msg_id += 1; + // If there are still messages to be sent (scheduled), + // and if the next time to send a message is earlier than the next time to relay messages. + if next_msg_id < total_num_msgs && next_messaging_vtime <= next_transmission_vtime { + // Send new messages + vtime = next_messaging_vtime; + next_messaging_vtime += MSG_INTERVAL; + + send_messages( + vtime, + &sender_ids, + &mut nodes, + &mut next_msg_id, + &mut message_tracker, + ); + } else { + // Release a message from each queue and relay all of them + vtime = next_transmission_vtime; + next_transmission_vtime += transmission_interval; + + relay_messages( + vtime, + &mut nodes, + &mut message_tracker, + &mut num_disseminated_msgs, + &mut writer, + ); + + // Check if all messages have been disseminated to all nodes. + if num_disseminated_msgs == total_num_msgs as usize { + break; } } + } +} - // Collect messages to relay - let mut all_msgs_to_relay = Vec::new(); - for (node_id, node) in nodes.iter_mut().enumerate() { - let msgs_to_relay = node.read_queues(); - msgs_to_relay.iter().for_each(|(receiver_id, msg)| { - all_msgs_to_relay.push((*receiver_id, *msg, node_id as u16)); - }); - } +fn send_messages( + vtime: f32, + sender_ids: &[NodeId], + nodes: &mut [Node], + next_msg_id: &mut MessageId, + message_tracker: &mut HashMap, +) { + for &sender_id in sender_ids.iter() { + nodes[sender_id as usize].send(*next_msg_id); + message_tracker.insert(*next_msg_id, (vtime, 1)); + *next_msg_id += 1; + } +} - // Relay the messages - all_msgs_to_relay - .into_iter() - .for_each(|(receiver_id, msg, sender_id)| { - if nodes[receiver_id as usize].receive(msg, sender_id) { +fn relay_messages( + vtime: f32, + nodes: &mut [Node], + message_tracker: &mut HashMap, + num_disseminated_msgs: &mut usize, + writer: &mut csv::Writer, +) { + // Collect messages to relay + let mut all_msgs_to_relay: Vec> = Vec::new(); + for node in nodes.iter_mut() { + all_msgs_to_relay.push(node.read_queues()); + } + + // Relay the messages + all_msgs_to_relay + .into_iter() + .enumerate() + .for_each(|(sender_id, msgs_to_relay)| { + msgs_to_relay.into_iter().for_each(|(receiver_id, msg)| { + if nodes[receiver_id as usize].receive(msg, sender_id as NodeId) { let (sent_time, num_received_nodes) = message_tracker.get_mut(&msg).unwrap(); *num_received_nodes += 1; - if *num_received_nodes == paramset.num_nodes { + if *num_received_nodes as usize == nodes.len() { let dissemination_time = vtime - *sent_time; writer .write_record(&[ @@ -86,20 +143,13 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology vtime.to_string(), ]) .unwrap(); - num_disseminated_msgs += 1; + *num_disseminated_msgs += 1; message_tracker.remove(&msg); } } - }); - - // Check if all messages have been disseminated to all nodes. - if num_disseminated_msgs == (paramset.num_senders * paramset.num_sent_msgs) as usize { - break; - } - - vtime += interval; - } + }) + }); } fn save_topology(topology: &Topology, topology_path: &str) -> Result<(), Box> {