diff --git a/mixnet-rs/dissemination/src/iteration.rs b/mixnet-rs/dissemination/src/iteration.rs index 8c7374b..9297395 100644 --- a/mixnet-rs/dissemination/src/iteration.rs +++ b/mixnet-rs/dissemination/src/iteration.rs @@ -6,39 +6,41 @@ use crate::{ node::{MessageId, Node, NodeId}, paramset::ParamSet, queue::QueueConfig, - topology::build_topology, + topology::{build_topology, Topology}, }; pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) { // Initialize nodes - let mut nodes: HashMap = HashMap::new(); + let mut nodes: Vec = Vec::new(); let mut queue_seed_rng = StdRng::seed_from_u64(seed); - for i in 0..paramset.num_nodes { - nodes.insert( - i, - Node::new(QueueConfig { - queue_type: paramset.queue_type, - seed: queue_seed_rng.next_u64(), - min_queue_size: paramset.min_queue_size, - }), - ); + for _ in 0..paramset.num_nodes { + nodes.push(Node::new(QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + })); } // Connect nodes let topology = build_topology(paramset.num_nodes, paramset.peering_degree, seed); save_topology(&topology, topology_path).unwrap(); - for (node_id, peers) in topology.iter() { + for (node_id, peers) in topology.iter().enumerate() { peers.iter().for_each(|peer_id| { - nodes.get_mut(node_id).unwrap().connect(*peer_id); + nodes[node_id].connect(*peer_id); }); } let sender_ids: Vec = (0..paramset.num_senders).collect(); + // Virtual discrete time let mut vtime: f32 = 0.0; + // Increase vtime according to the transmission rate let interval: f32 = 1.0 / paramset.transmission_rate as f32; + // To generate unique message IDs let mut next_msg_id: MessageId = 0; + // To keep track of when each message was sent and how many nodes received it let mut sent_msgs: HashMap = HashMap::new(); + // To keep track of how many messages have been disseminated to all nodes let mut num_disseminated_msgs = 0; let mut writer = csv::Writer::from_path(out_csv_path).unwrap(); @@ -50,8 +52,8 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology // Send new messages assert!(sent_msgs.len() % (paramset.num_senders as usize) == 0); if sent_msgs.len() / (paramset.num_senders as usize) < paramset.num_sent_msgs as usize { - for sender_id in sender_ids.iter() { - nodes.get_mut(sender_id).unwrap().send(next_msg_id); + for &sender_id in sender_ids.iter() { + nodes[sender_id as usize].send(next_msg_id); sent_msgs.insert(next_msg_id, (vtime, 1)); next_msg_id += 1; } @@ -59,10 +61,10 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology // Collect messages to relay let mut all_msgs_to_relay = Vec::new(); - for (node_id, node) in nodes.iter_mut() { + for (node_id, node) in nodes.iter_mut().enumerate() { let msgs_to_relay = node.read_queues(); msgs_to_relay.iter().for_each(|(receiver_id, msg)| { - all_msgs_to_relay.push((*receiver_id, *msg, *node_id)); + all_msgs_to_relay.push((*receiver_id, *msg, node_id as u16)); }); } @@ -70,7 +72,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology all_msgs_to_relay .into_iter() .for_each(|(receiver_id, msg, sender_id)| { - if nodes.get_mut(&receiver_id).unwrap().receive(msg, sender_id) { + if nodes[receiver_id as usize].receive(msg, sender_id) { let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap(); *num_received_nodes += 1; if *num_received_nodes == paramset.num_nodes { @@ -96,22 +98,16 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology } } -fn save_topology( - topology: &HashMap>, - topology_path: &str, -) -> Result<(), Box> { +fn save_topology(topology: &Topology, topology_path: &str) -> Result<(), Box> { let mut wtr = csv::Writer::from_path(topology_path)?; wtr.write_record(["node", "num_peers", "peers"])?; - let mut sorted_keys: Vec<&u16> = topology.keys().collect(); - sorted_keys.sort(); - for &node in &sorted_keys { - let peers = topology.get(node).unwrap(); + for (node, peers) in topology.iter().enumerate() { let peers_str: Vec = peers.iter().map(|peer_id| peer_id.to_string()).collect(); wtr.write_record(&[ node.to_string(), peers.len().to_string(), - format!("\"[{}]\"", peers_str.join(",")), + format!("[{}]", peers_str.join(",")), ])?; } wtr.flush()?; diff --git a/mixnet-rs/dissemination/src/node.rs b/mixnet-rs/dissemination/src/node.rs index 46f693d..c6e6db9 100644 --- a/mixnet-rs/dissemination/src/node.rs +++ b/mixnet-rs/dissemination/src/node.rs @@ -1,4 +1,4 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use crate::queue::{new_queue, Queue, QueueConfig}; @@ -7,7 +7,12 @@ pub type MessageId = u32; pub struct Node { queue_config: QueueConfig, - queues: HashMap>>, + // To have the deterministic result, we use Vec instead of HashMap. + // Building `queues` is inefficient, but it's not a problem because it's done only once at the beginning. + // Instead, use `connected_peers` to build `queues` efficiently. + queues: Vec<(NodeId, Box>)>, + connected_peers: HashSet, + // A cache to avoid relaying the same message multiple times. received_msgs: HashSet, } @@ -15,19 +20,21 @@ impl Node { pub fn new(queue_config: QueueConfig) -> Self { Node { queue_config, - queues: HashMap::new(), + queues: Vec::new(), + connected_peers: HashSet::new(), received_msgs: HashSet::new(), } } pub fn connect(&mut self, peer_id: NodeId) { - self.queues - .entry(peer_id) - .or_insert(new_queue(&self.queue_config)); - } - - pub fn num_queues(&self) -> usize { - self.queues.len() + if self.connected_peers.insert(peer_id) { + let pos = self + .queues + .binary_search_by(|probe| probe.0.cmp(&peer_id)) + .unwrap_or_else(|pos| pos); + self.queues + .insert(pos, (peer_id, new_queue(&self.queue_config))); + } } pub fn send(&mut self, msg: MessageId) { diff --git a/mixnet-rs/dissemination/src/queue.rs b/mixnet-rs/dissemination/src/queue.rs index 2332cc7..d1e96a4 100644 --- a/mixnet-rs/dissemination/src/queue.rs +++ b/mixnet-rs/dissemination/src/queue.rs @@ -87,7 +87,7 @@ impl Queue for NonMixQueue { } struct MixQueue { - queue: Vec>, + queue: Vec>, // None element means noise rng: StdRng, } @@ -301,7 +301,11 @@ mod tests { #[test] fn test_non_mix_queue() { - let mut queue = new_queue(QueueType::NonMix, 0, 0); + let mut queue = new_queue(&QueueConfig { + queue_type: QueueType::NonMix, + seed: 0, + min_queue_size: 0, + }); // Check if None (noise) is returned when queue is empty assert_eq!(queue.pop(), None); @@ -335,7 +339,11 @@ mod tests { } fn test_mix_queue(queue_type: QueueType) { - let mut queue = new_queue(queue_type, 0, 4); + let mut queue = new_queue(&QueueConfig { + queue_type, + seed: 0, + min_queue_size: 4, + }); // Check if None (noise) is returned when queue is empty assert_eq!(queue.pop(), None); diff --git a/mixnet-rs/dissemination/src/topology.rs b/mixnet-rs/dissemination/src/topology.rs index 699ca22..d09e4e8 100644 --- a/mixnet-rs/dissemination/src/topology.rs +++ b/mixnet-rs/dissemination/src/topology.rs @@ -1,20 +1,18 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; use crate::node::NodeId; -pub fn build_topology( - num_nodes: u16, - peering_degree: u16, - seed: u64, -) -> HashMap> { +pub type Topology = Vec>; + +pub fn build_topology(num_nodes: u16, peering_degree: u16, seed: u64) -> Topology { let mut rng = StdRng::seed_from_u64(seed); loop { - let mut topology: HashMap> = HashMap::new(); - for node in 0..num_nodes { - topology.insert(node, HashSet::new()); + let mut topology: Vec> = Vec::new(); + for _ in 0..num_nodes { + topology.push(HashSet::new()); } for node in 0..num_nodes { @@ -22,43 +20,48 @@ pub fn build_topology( for other in (0..node).chain(node + 1..num_nodes) { // Check if the other node is not already connected to the current node // and the other node has not reached the peering degree. - if !topology.get(&node).unwrap().contains(&other) - && topology.get(&other).unwrap().len() < peering_degree as usize + if !topology[node as usize].contains(&other) + && topology[other as usize].len() < peering_degree as usize { others.push(other); } } // How many more connections the current node needs - let num_needs = peering_degree as usize - topology.get(&node).unwrap().len(); + let num_needs = peering_degree as usize - topology[node as usize].len(); // Smaple peers as many as possible and connect them to the current node let k = std::cmp::min(num_needs, others.len()); others.as_mut_slice().shuffle(&mut rng); others.into_iter().take(k).for_each(|peer| { - topology.get_mut(&node).unwrap().insert(peer); - topology.get_mut(&peer).unwrap().insert(node); + topology[node as usize].insert(peer); + topology[peer as usize].insert(node); }); } if are_all_nodes_connected(&topology) { - return topology; + let mut sorted_topology: Vec> = Vec::new(); + for peers in topology.iter() { + let mut sorted_peers: Vec = peers.iter().copied().collect(); + sorted_peers.sort(); + sorted_topology.push(sorted_peers); + } + return sorted_topology; } } } -fn are_all_nodes_connected(topology: &HashMap>) -> bool { - let start_node = topology.keys().next().unwrap(); - let visited = dfs(topology, *start_node); +fn are_all_nodes_connected(topology: &[HashSet]) -> bool { + let visited = dfs(topology, 0); visited.len() == topology.len() } -fn dfs(topology: &HashMap>, start_node: NodeId) -> HashSet { +fn dfs(topology: &[HashSet], start_node: NodeId) -> HashSet { let mut visited: HashSet = HashSet::new(); let mut stack: Vec = Vec::new(); stack.push(start_node); while let Some(node) = stack.pop() { visited.insert(node); - for peer in topology.get(&node).unwrap().iter() { + for peer in topology[node as usize].iter() { if !visited.contains(peer) { stack.push(*peer); }