From df72a0183741afa198642c511ee1257b88ef0da5 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Sat, 17 Aug 2024 09:37:17 +0900 Subject: [PATCH] minimize mem usages --- mixnet-rs/dissemination/src/iteration.rs | 24 ++++++++++------- mixnet-rs/dissemination/src/node.rs | 33 +++++++++++++++++++----- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/mixnet-rs/dissemination/src/iteration.rs b/mixnet-rs/dissemination/src/iteration.rs index 9297395..a7bd364 100644 --- a/mixnet-rs/dissemination/src/iteration.rs +++ b/mixnet-rs/dissemination/src/iteration.rs @@ -14,11 +14,14 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology let mut nodes: Vec = Vec::new(); let mut queue_seed_rng = StdRng::seed_from_u64(seed); for _ in 0..paramset.num_nodes { - nodes.push(Node::new(QueueConfig { - queue_type: paramset.queue_type, - seed: queue_seed_rng.next_u64(), - min_queue_size: paramset.min_queue_size, - })); + nodes.push(Node::new( + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + paramset.peering_degree, + )); } // Connect nodes @@ -39,7 +42,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology // To generate unique message IDs let mut next_msg_id: MessageId = 0; // To keep track of when each message was sent and how many nodes received it - let mut sent_msgs: HashMap = HashMap::new(); + let mut message_tracker: HashMap = HashMap::new(); // To keep track of how many messages have been disseminated to all nodes let mut num_disseminated_msgs = 0; @@ -50,11 +53,10 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology loop { // Send new messages - assert!(sent_msgs.len() % (paramset.num_senders as usize) == 0); - if sent_msgs.len() / (paramset.num_senders as usize) < paramset.num_sent_msgs as usize { + if next_msg_id < (paramset.num_senders * paramset.num_sent_msgs) as MessageId { for &sender_id in sender_ids.iter() { nodes[sender_id as usize].send(next_msg_id); - sent_msgs.insert(next_msg_id, (vtime, 1)); + message_tracker.insert(next_msg_id, (vtime, 1)); next_msg_id += 1; } } @@ -73,7 +75,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology .into_iter() .for_each(|(receiver_id, msg, sender_id)| { if nodes[receiver_id as usize].receive(msg, sender_id) { - let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap(); + let (sent_time, num_received_nodes) = message_tracker.get_mut(&msg).unwrap(); *num_received_nodes += 1; if *num_received_nodes == paramset.num_nodes { let dissemination_time = vtime - *sent_time; @@ -85,6 +87,8 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology ]) .unwrap(); num_disseminated_msgs += 1; + + message_tracker.remove(&msg); } } }); diff --git a/mixnet-rs/dissemination/src/node.rs b/mixnet-rs/dissemination/src/node.rs index c6e6db9..5cfbca4 100644 --- a/mixnet-rs/dissemination/src/node.rs +++ b/mixnet-rs/dissemination/src/node.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use crate::queue::{new_queue, Queue, QueueConfig}; @@ -13,16 +13,18 @@ pub struct Node { queues: Vec<(NodeId, Box>)>, connected_peers: HashSet, // A cache to avoid relaying the same message multiple times. - received_msgs: HashSet, + received_msgs: HashMap, + peering_degree: u16, } impl Node { - pub fn new(queue_config: QueueConfig) -> Self { + pub fn new(queue_config: QueueConfig, peering_degree: u16) -> Self { Node { queue_config, queues: Vec::new(), connected_peers: HashSet::new(), - received_msgs: HashSet::new(), + received_msgs: HashMap::new(), + peering_degree, } } @@ -38,14 +40,14 @@ impl Node { } pub fn send(&mut self, msg: MessageId) { - assert!(self.received_msgs.insert(msg)); + assert!(self.check_and_update_cache(msg, true)); for (_, queue) in self.queues.iter_mut() { queue.push(msg); } } pub fn receive(&mut self, msg: MessageId, from: NodeId) -> bool { - let first_received = self.received_msgs.insert(msg); + let first_received = self.check_and_update_cache(msg, false); if first_received { for (node_id, queue) in self.queues.iter_mut() { if *node_id != from { @@ -65,4 +67,23 @@ impl Node { } msgs_to_relay } + + fn check_and_update_cache(&mut self, msg: MessageId, sending: bool) -> bool { + let first_received = if let Some(count) = self.received_msgs.get_mut(&msg) { + *count += 1; + false + } else { + self.received_msgs.insert(msg, if sending { 0 } else { 1 }); + true + }; + + // If the message have been received from all connected peers, remove it from the cache + // because there is no possibility that the message will be received again. + if self.received_msgs.get(&msg).unwrap() == &self.peering_degree { + tracing::debug!("Remove message from cache: {}", msg); + self.received_msgs.remove(&msg); + } + + first_received + } }