minimize mem usages

This commit is contained in:
Youngjoon Lee 2024-08-17 09:37:17 +09:00
parent c60bde5a35
commit df72a01837
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
2 changed files with 41 additions and 16 deletions

View File

@ -14,11 +14,14 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
let mut nodes: Vec<Node> = Vec::new();
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
for _ in 0..paramset.num_nodes {
nodes.push(Node::new(QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
}));
nodes.push(Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
));
}
// Connect nodes
@ -39,7 +42,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
// To generate unique message IDs
let mut next_msg_id: MessageId = 0;
// To keep track of when each message was sent and how many nodes received it
let mut sent_msgs: HashMap<MessageId, (f32, u16)> = HashMap::new();
let mut message_tracker: HashMap<MessageId, (f32, u16)> = HashMap::new();
// To keep track of how many messages have been disseminated to all nodes
let mut num_disseminated_msgs = 0;
@ -50,11 +53,10 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
loop {
// Send new messages
assert!(sent_msgs.len() % (paramset.num_senders as usize) == 0);
if sent_msgs.len() / (paramset.num_senders as usize) < paramset.num_sent_msgs as usize {
if next_msg_id < (paramset.num_senders * paramset.num_sent_msgs) as MessageId {
for &sender_id in sender_ids.iter() {
nodes[sender_id as usize].send(next_msg_id);
sent_msgs.insert(next_msg_id, (vtime, 1));
message_tracker.insert(next_msg_id, (vtime, 1));
next_msg_id += 1;
}
}
@ -73,7 +75,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
.into_iter()
.for_each(|(receiver_id, msg, sender_id)| {
if nodes[receiver_id as usize].receive(msg, sender_id) {
let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap();
let (sent_time, num_received_nodes) = message_tracker.get_mut(&msg).unwrap();
*num_received_nodes += 1;
if *num_received_nodes == paramset.num_nodes {
let dissemination_time = vtime - *sent_time;
@ -85,6 +87,8 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
])
.unwrap();
num_disseminated_msgs += 1;
message_tracker.remove(&msg);
}
}
});

View File

@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use crate::queue::{new_queue, Queue, QueueConfig};
@ -13,16 +13,18 @@ pub struct Node {
queues: Vec<(NodeId, Box<dyn Queue<MessageId>>)>,
connected_peers: HashSet<NodeId>,
// A cache to avoid relaying the same message multiple times.
received_msgs: HashSet<MessageId>,
received_msgs: HashMap<MessageId, u16>,
peering_degree: u16,
}
impl Node {
pub fn new(queue_config: QueueConfig) -> Self {
pub fn new(queue_config: QueueConfig, peering_degree: u16) -> Self {
Node {
queue_config,
queues: Vec::new(),
connected_peers: HashSet::new(),
received_msgs: HashSet::new(),
received_msgs: HashMap::new(),
peering_degree,
}
}
@ -38,14 +40,14 @@ impl Node {
}
pub fn send(&mut self, msg: MessageId) {
assert!(self.received_msgs.insert(msg));
assert!(self.check_and_update_cache(msg, true));
for (_, queue) in self.queues.iter_mut() {
queue.push(msg);
}
}
pub fn receive(&mut self, msg: MessageId, from: NodeId) -> bool {
let first_received = self.received_msgs.insert(msg);
let first_received = self.check_and_update_cache(msg, false);
if first_received {
for (node_id, queue) in self.queues.iter_mut() {
if *node_id != from {
@ -65,4 +67,23 @@ impl Node {
}
msgs_to_relay
}
fn check_and_update_cache(&mut self, msg: MessageId, sending: bool) -> bool {
let first_received = if let Some(count) = self.received_msgs.get_mut(&msg) {
*count += 1;
false
} else {
self.received_msgs.insert(msg, if sending { 0 } else { 1 });
true
};
// If the message have been received from all connected peers, remove it from the cache
// because there is no possibility that the message will be received again.
if self.received_msgs.get(&msg).unwrap() == &self.peering_degree {
tracing::debug!("Remove message from cache: {}", msg);
self.received_msgs.remove(&msg);
}
first_received
}
}