do not use HashMap to make the result 100% deterministic

Youngjoon Lee 2024-08-17 08:39:03 +09:00
parent cd35121d5e
commit 70d85a591f
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
4 changed files with 74 additions and 60 deletions
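For context (not part of the commit): Rust's std HashMap hashes with a per-process random seed, so iterating a HashMap can visit entries in a different order on every run. When that order drives anything stateful in the simulation (queue reads, message relaying, a seeded RNG), results stop being reproducible even with a fixed seed, which is what this commit removes. A minimal standalone sketch of the difference, with made-up node names:

use std::collections::HashMap;

fn main() {
    // HashMap: iteration order depends on a per-process random hash seed,
    // so two runs of this binary may print the keys in a different order.
    let mut by_id: HashMap<u16, &str> = HashMap::new();
    for (id, name) in [(0u16, "a"), (1, "b"), (2, "c"), (3, "d")] {
        by_id.insert(id, name);
    }
    println!("HashMap key order: {:?}", by_id.keys().collect::<Vec<_>>());

    // Vec indexed by node id: the order is always 0, 1, 2, 3, so any
    // processing driven by this iteration is identical run to run.
    let nodes = ["a", "b", "c", "d"];
    for (id, name) in nodes.iter().enumerate() {
        println!("node {id}: {name}");
    }
}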

View File

@ -6,39 +6,41 @@ use crate::{
node::{MessageId, Node, NodeId},
paramset::ParamSet,
queue::QueueConfig,
topology::build_topology,
topology::{build_topology, Topology},
};
pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) {
// Initialize nodes
let mut nodes: HashMap<NodeId, Node> = HashMap::new();
let mut nodes: Vec<Node> = Vec::new();
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
for i in 0..paramset.num_nodes {
nodes.insert(
i,
Node::new(QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
}),
);
for _ in 0..paramset.num_nodes {
nodes.push(Node::new(QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
}));
}
// Connect nodes
let topology = build_topology(paramset.num_nodes, paramset.peering_degree, seed);
save_topology(&topology, topology_path).unwrap();
for (node_id, peers) in topology.iter() {
for (node_id, peers) in topology.iter().enumerate() {
peers.iter().for_each(|peer_id| {
nodes.get_mut(node_id).unwrap().connect(*peer_id);
nodes[node_id].connect(*peer_id);
});
}
let sender_ids: Vec<NodeId> = (0..paramset.num_senders).collect();
// Virtual discrete time
let mut vtime: f32 = 0.0;
// Increase vtime according to the transmission rate
let interval: f32 = 1.0 / paramset.transmission_rate as f32;
// To generate unique message IDs
let mut next_msg_id: MessageId = 0;
// To keep track of when each message was sent and how many nodes received it
let mut sent_msgs: HashMap<MessageId, (f32, u16)> = HashMap::new();
// To keep track of how many messages have been disseminated to all nodes
let mut num_disseminated_msgs = 0;
let mut writer = csv::Writer::from_path(out_csv_path).unwrap();
@ -50,8 +52,8 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
// Send new messages
assert!(sent_msgs.len() % (paramset.num_senders as usize) == 0);
if sent_msgs.len() / (paramset.num_senders as usize) < paramset.num_sent_msgs as usize {
for sender_id in sender_ids.iter() {
nodes.get_mut(sender_id).unwrap().send(next_msg_id);
for &sender_id in sender_ids.iter() {
nodes[sender_id as usize].send(next_msg_id);
sent_msgs.insert(next_msg_id, (vtime, 1));
next_msg_id += 1;
}
@ -59,10 +61,10 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
// Collect messages to relay
let mut all_msgs_to_relay = Vec::new();
for (node_id, node) in nodes.iter_mut() {
for (node_id, node) in nodes.iter_mut().enumerate() {
let msgs_to_relay = node.read_queues();
msgs_to_relay.iter().for_each(|(receiver_id, msg)| {
all_msgs_to_relay.push((*receiver_id, *msg, *node_id));
all_msgs_to_relay.push((*receiver_id, *msg, node_id as u16));
});
}
@ -70,7 +72,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
all_msgs_to_relay
.into_iter()
.for_each(|(receiver_id, msg, sender_id)| {
if nodes.get_mut(&receiver_id).unwrap().receive(msg, sender_id) {
if nodes[receiver_id as usize].receive(msg, sender_id) {
let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap();
*num_received_nodes += 1;
if *num_received_nodes == paramset.num_nodes {
@ -96,22 +98,16 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
}
}
fn save_topology(
topology: &HashMap<u16, std::collections::HashSet<u16>>,
topology_path: &str,
) -> Result<(), Box<dyn Error>> {
fn save_topology(topology: &Topology, topology_path: &str) -> Result<(), Box<dyn Error>> {
let mut wtr = csv::Writer::from_path(topology_path)?;
wtr.write_record(["node", "num_peers", "peers"])?;
let mut sorted_keys: Vec<&u16> = topology.keys().collect();
sorted_keys.sort();
for &node in &sorted_keys {
let peers = topology.get(node).unwrap();
for (node, peers) in topology.iter().enumerate() {
let peers_str: Vec<String> = peers.iter().map(|peer_id| peer_id.to_string()).collect();
wtr.write_record(&[
node.to_string(),
peers.len().to_string(),
format!("\"[{}]\"", peers_str.join(",")),
format!("[{}]", peers_str.join(",")),
])?;
}
wtr.flush()?;

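A note on the save_topology change above (reasoning, not shown in the diff): with its default quoting rules, the csv crate already quotes any field that contains the delimiter, so the hand-written "[...]" wrapper was itself being quoted and escaped a second time. With the plain [{}] format, the peers column now comes out quoted once by the writer, roughly like this illustrative row:

node,num_peers,peers
0,3,"[1,2,3]"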
View File

@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use crate::queue::{new_queue, Queue, QueueConfig};
@ -7,7 +7,12 @@ pub type MessageId = u32;
pub struct Node {
queue_config: QueueConfig,
queues: HashMap<NodeId, Box<dyn Queue<MessageId>>>,
// To keep the result deterministic, we use a Vec instead of a HashMap.
// Inserting into the sorted `queues` Vec is inefficient, but that's not a problem because it happens only once, at startup.
// `connected_peers` is used for cheap membership checks while `queues` is being built.
queues: Vec<(NodeId, Box<dyn Queue<MessageId>>)>,
connected_peers: HashSet<NodeId>,
// A cache to avoid relaying the same message multiple times.
received_msgs: HashSet<MessageId>,
}
@ -15,19 +20,21 @@ impl Node {
pub fn new(queue_config: QueueConfig) -> Self {
Node {
queue_config,
queues: HashMap::new(),
queues: Vec::new(),
connected_peers: HashSet::new(),
received_msgs: HashSet::new(),
}
}
pub fn connect(&mut self, peer_id: NodeId) {
self.queues
.entry(peer_id)
.or_insert(new_queue(&self.queue_config));
}
pub fn num_queues(&self) -> usize {
self.queues.len()
if self.connected_peers.insert(peer_id) {
let pos = self
.queues
.binary_search_by(|probe| probe.0.cmp(&peer_id))
.unwrap_or_else(|pos| pos);
self.queues
.insert(pos, (peer_id, new_queue(&self.queue_config)));
}
}
pub fn send(&mut self, msg: MessageId) {

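The new connect() above keeps queues sorted by peer id and uses connected_peers to skip duplicate connections. A standalone sketch of that pattern (illustrative names, not the project's code), showing that the resulting order does not depend on the order in which peers are connected:

use std::collections::HashSet;

fn insert_sorted(entries: &mut Vec<(u16, String)>, seen: &mut HashSet<u16>, id: u16, value: String) {
    // Only insert the first time we see this id, at its sorted position.
    if seen.insert(id) {
        let pos = entries
            .binary_search_by(|probe| probe.0.cmp(&id))
            .unwrap_or_else(|pos| pos);
        entries.insert(pos, (id, value));
    }
}

fn main() {
    let mut entries = Vec::new();
    let mut seen = HashSet::new();
    for id in [3u16, 1, 2, 1] {
        insert_sorted(&mut entries, &mut seen, id, format!("queue-{id}"));
    }
    // Entries end up ordered by id regardless of connection order.
    assert_eq!(entries.iter().map(|(id, _)| *id).collect::<Vec<u16>>(), vec![1u16, 2, 3]);
}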
View File

@ -87,7 +87,7 @@ impl<T: Copy> Queue<T> for NonMixQueue<T> {
}
struct MixQueue<T: Copy> {
queue: Vec<Option<T>>,
queue: Vec<Option<T>>, // a None element represents noise
rng: StdRng,
}
@ -301,7 +301,11 @@ mod tests {
#[test]
fn test_non_mix_queue() {
let mut queue = new_queue(QueueType::NonMix, 0, 0);
let mut queue = new_queue(&QueueConfig {
queue_type: QueueType::NonMix,
seed: 0,
min_queue_size: 0,
});
// Check if None (noise) is returned when queue is empty
assert_eq!(queue.pop(), None);
@ -335,7 +339,11 @@ mod tests {
}
fn test_mix_queue(queue_type: QueueType) {
let mut queue = new_queue(queue_type, 0, 4);
let mut queue = new_queue(&QueueConfig {
queue_type,
seed: 0,
min_queue_size: 4,
});
// Check if None (noise) is returned when queue is empty
assert_eq!(queue.pop(), None);

View File

@ -1,20 +1,18 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use crate::node::NodeId;
pub fn build_topology(
num_nodes: u16,
peering_degree: u16,
seed: u64,
) -> HashMap<NodeId, HashSet<NodeId>> {
pub type Topology = Vec<Vec<NodeId>>;
pub fn build_topology(num_nodes: u16, peering_degree: u16, seed: u64) -> Topology {
let mut rng = StdRng::seed_from_u64(seed);
loop {
let mut topology: HashMap<NodeId, HashSet<NodeId>> = HashMap::new();
for node in 0..num_nodes {
topology.insert(node, HashSet::new());
let mut topology: Vec<HashSet<NodeId>> = Vec::new();
for _ in 0..num_nodes {
topology.push(HashSet::new());
}
for node in 0..num_nodes {
@ -22,43 +20,48 @@ pub fn build_topology(
for other in (0..node).chain(node + 1..num_nodes) {
// Check if the other node is not already connected to the current node
// and the other node has not reached the peering degree.
if !topology.get(&node).unwrap().contains(&other)
&& topology.get(&other).unwrap().len() < peering_degree as usize
if !topology[node as usize].contains(&other)
&& topology[other as usize].len() < peering_degree as usize
{
others.push(other);
}
}
// How many more connections the current node needs
let num_needs = peering_degree as usize - topology.get(&node).unwrap().len();
let num_needs = peering_degree as usize - topology[node as usize].len();
// Sample as many peers as possible and connect them to the current node
let k = std::cmp::min(num_needs, others.len());
others.as_mut_slice().shuffle(&mut rng);
others.into_iter().take(k).for_each(|peer| {
topology.get_mut(&node).unwrap().insert(peer);
topology.get_mut(&peer).unwrap().insert(node);
topology[node as usize].insert(peer);
topology[peer as usize].insert(node);
});
}
if are_all_nodes_connected(&topology) {
return topology;
let mut sorted_topology: Vec<Vec<NodeId>> = Vec::new();
for peers in topology.iter() {
let mut sorted_peers: Vec<NodeId> = peers.iter().copied().collect();
sorted_peers.sort();
sorted_topology.push(sorted_peers);
}
return sorted_topology;
}
}
}
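Since Topology is now a Vec<Vec<NodeId>> with each peer list sorted, callers can iterate it by index and get the same order on every run with the same seed. A small usage sketch (the node count, degree, and seed are illustrative):

fn main() {
    // Assumes `build_topology` and `Topology` from this module are in scope.
    let topology: Topology = build_topology(4, 2, 42);
    for (node_id, peers) in topology.iter().enumerate() {
        // Peer lists are sorted, so this prints identically across runs
        // that use the same seed.
        println!("node {node_id}: peers {peers:?}");
    }
}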
fn are_all_nodes_connected(topology: &HashMap<NodeId, HashSet<NodeId>>) -> bool {
let start_node = topology.keys().next().unwrap();
let visited = dfs(topology, *start_node);
fn are_all_nodes_connected(topology: &[HashSet<NodeId>]) -> bool {
let visited = dfs(topology, 0);
visited.len() == topology.len()
}
fn dfs(topology: &HashMap<NodeId, HashSet<NodeId>>, start_node: NodeId) -> HashSet<NodeId> {
fn dfs(topology: &[HashSet<NodeId>], start_node: NodeId) -> HashSet<NodeId> {
let mut visited: HashSet<NodeId> = HashSet::new();
let mut stack: Vec<NodeId> = Vec::new();
stack.push(start_node);
while let Some(node) = stack.pop() {
visited.insert(node);
for peer in topology.get(&node).unwrap().iter() {
for peer in topology[node as usize].iter() {
if !visited.contains(peer) {
stack.push(*peer);
}