fix: use Vec for deterministic results

Youngjoon Lee 2024-08-22 23:03:40 +02:00
parent efee24d95f
commit b36b8ac316

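The change swaps the FxHashMap-keyed collections in run_iteration and the network builders (mixnodes, latencies, queue counts, received sequences) for Vecs indexed by NodeId or filled in insertion order, so that iteration order, and with it the saved CSV output, no longer depends on hash-map ordering. A minimal sketch of that difference, not taken from this repository (std HashMap and u32 stand in for FxHashMap and NodeId):

use std::collections::HashMap;

fn main() {
    // Keyed by id in a HashMap: iteration follows the hasher, not the key
    // order, so any output produced by walking the map has no fixed order
    // (std's default hasher even changes between runs).
    let mut by_key: HashMap<u32, &str> = HashMap::new();
    by_key.insert(2, "node-2");
    by_key.insert(0, "node-0");
    by_key.insert(1, "node-1");
    for (id, node) in &by_key {
        println!("map: {id} {node}"); // unspecified order
    }

    // Stored in a Vec with id == index: iteration is always 0, 1, 2, ...
    let by_index = vec!["node-0", "node-1", "node-2"];
    for (id, node) in by_index.iter().enumerate() {
        println!("vec: {id} {node}"); // fixed order, id equals the index
    }
}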

@@ -52,7 +52,8 @@ pub fn run_iteration(
.checked_mul(paramset.num_senders as usize)
.unwrap();
let mut sent_times: FxHashMap<MessageId, f32> = FxHashMap::default();
let mut latencies: FxHashMap<MessageId, f32> = FxHashMap::default();
let mut recv_times: FxHashMap<MessageId, f32> = FxHashMap::default();
let mut latencies: Vec<(MessageId, f32)> = Vec::new();
let mut sent_sequence = Sequence::new();
let mut received_sequences: FxHashMap<NodeId, Sequence> = FxHashMap::default();
let mut unified_received_sequence = if paramset.random_topology {
@@ -60,7 +61,7 @@ pub fn run_iteration(
} else {
None
};
let mut queue_data_msg_counts: Vec<FxHashMap<NodeId, Vec<usize>>> = Vec::new();
let mut queue_data_msg_counts: Vec<Vec<Vec<usize>>> = Vec::new();
let mut data_msg_rng = StdRng::seed_from_u64(seed);
loop {
@@ -79,7 +80,10 @@ pub fn run_iteration(
let msg = next_msg_id;
next_msg_id += 1;
sender_peers.iter().for_each(|peer_id| {
mixnodes.get_mut(peer_id).unwrap().receive(msg, None);
mixnodes
.get_mut(*peer_id as usize)
.unwrap()
.receive(msg, None);
});
sent_times.insert(msg, vtime);
sent_sequence.add_message(msg);
@@ -95,7 +99,7 @@ pub fn run_iteration(
}
// Each mix node adds a new data message to its queue with a certain probability
for (_, node) in mixnodes.iter_mut() {
for node in mixnodes.iter_mut() {
if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) {
node.send(next_msg_id);
next_msg_id += 1;
@@ -109,8 +113,8 @@ pub fn run_iteration(
//
// source -> (destination, msg)
let mut all_msgs_to_relay: Vec<(NodeId, Vec<(NodeId, Message<MessageId>)>)> = Vec::new();
for (node_id, node) in mixnodes.iter_mut() {
all_msgs_to_relay.push((*node_id, node.read_queues()));
for (node_id, node) in mixnodes.iter_mut().enumerate() {
all_msgs_to_relay.push((node_id.try_into().unwrap(), node.read_queues()));
}
all_msgs_to_relay
.into_iter()
@@ -122,8 +126,9 @@ pub fn run_iteration(
// If msg was sent by the sender (not by any mix)
if let Some(&sent_time) = sent_times.get(&msg) {
// If this is the first time to see the msg
if let Entry::Vacant(e) = latencies.entry(msg) {
e.insert(vtime - sent_time);
if let Entry::Vacant(e) = recv_times.entry(msg) {
e.insert(vtime);
latencies.push((msg, vtime - sent_time));
if let Some(unified_recv_seq) =
&mut unified_received_sequence
{
@@ -145,7 +150,7 @@ pub fn run_iteration(
}
} else if let Message::Data(msg) = msg {
mixnodes
.get_mut(&peer_id)
.get_mut(peer_id as usize)
.unwrap()
.receive(msg, Some(mix_id));
}
@@ -153,9 +158,9 @@ pub fn run_iteration(
});
// Record the number of data messages in each mix node's queues
let mut counts: FxHashMap<NodeId, Vec<usize>> = FxHashMap::default();
mixnodes.iter().for_each(|(id, node)| {
counts.insert(*id, node.queue_data_msg_counts());
let mut counts: Vec<Vec<usize>> = Vec::with_capacity(mixnodes.len());
mixnodes.iter().for_each(|node| {
counts.push(node.queue_data_msg_counts());
});
queue_data_msg_counts.push(counts);
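With this hunk, queue_data_msg_counts becomes a Vec<Vec<Vec<usize>>>: one outer entry per recorded time step, one middle entry per node (the index doubles as the NodeId), and one inner count per queue. A small sketch of reading that structure back, with made-up numbers:

fn main() {
    // snapshots[step][node_id][queue_idx] = queued data messages.
    let snapshots: Vec<Vec<Vec<usize>>> = vec![
        vec![vec![1, 0], vec![3, 2]], // step 0: node 0 and node 1
        vec![vec![0, 0], vec![4, 1]], // step 1
    ];
    for (step, per_node) in snapshots.iter().enumerate() {
        for (node_id, counts) in per_node.iter().enumerate() {
            for (q_idx, count) in counts.iter().enumerate() {
                println!("step={step} node{node_id}_q{q_idx} = {count}");
            }
        }
    }
}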
@@ -169,8 +174,15 @@ pub fn run_iteration(
}
// Save results to CSV files
save_latencies(&latencies, &sent_times, out_latency_path);
save_latencies(&latencies, &sent_times, &recv_times, out_latency_path);
save_sequence(&sent_sequence, out_sent_sequence_path);
// Sort received_sequences
let mut node_ids: Vec<NodeId> = received_sequences.keys().cloned().collect();
node_ids.sort();
let received_sequences: Vec<Sequence> = node_ids
.iter()
.map(|node_id| received_sequences.remove(node_id).unwrap())
.collect();
save_sequences(&received_sequences, out_received_sequence_path_prefix);
if let Some(unified_recv_seq) = &unified_received_sequence {
save_sequence(
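The added block collects the received_sequences keys, sorts them, and drains the map in ascending NodeId order, so the numbered per-node sequence files are written in the same order on every run. A self-contained sketch of that sort-then-drain pattern (the type aliases are stand-ins for the simulator's NodeId and Sequence):

use std::collections::HashMap;

type NodeId = u32; // stand-in
type Sequence = Vec<u64>; // stand-in

fn into_node_order(mut by_node: HashMap<NodeId, Sequence>) -> Vec<Sequence> {
    let mut node_ids: Vec<NodeId> = by_node.keys().cloned().collect();
    node_ids.sort();
    node_ids
        .iter()
        .map(|id| by_node.remove(id).unwrap())
        .collect()
}

fn main() {
    let mut by_node = HashMap::new();
    by_node.insert(3, vec![30]);
    by_node.insert(1, vec![10]);
    by_node.insert(2, vec![20]);
    // Always [[10], [20], [30]], whatever the map's iteration order was.
    println!("{:?}", into_node_order(by_node));
}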
@@ -191,7 +203,7 @@ pub fn run_iteration(
save_ordering_coefficients(&[[casual, weak]], out_ordering_coeff_path);
} else {
let mut coeffs: Vec<[u64; 2]> = Vec::new();
for (_, recv_seq) in received_sequences.iter() {
for recv_seq in received_sequences.iter() {
let casual = sent_sequence.ordering_coefficient(recv_seq, true);
let weak = sent_sequence.ordering_coefficient(recv_seq, false);
coeffs.push([casual, weak]);
@@ -201,32 +213,27 @@ pub fn run_iteration(
}
}
fn build_striped_network(
paramset: &ParamSet,
seed: u64,
) -> (FxHashMap<NodeId, Node>, Vec<Vec<NodeId>>) {
fn build_striped_network(paramset: &ParamSet, seed: u64) -> (Vec<Node>, Vec<Vec<NodeId>>) {
assert!(!paramset.random_topology);
let mut next_node_id: NodeId = 0;
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
let mut mixnodes: FxHashMap<NodeId, Node> = FxHashMap::default();
let mut mixnodes: Vec<Node> =
Vec::with_capacity(paramset.num_paths as usize * paramset.num_mixes as usize);
let mut paths: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_paths as usize);
for _ in 0..paramset.num_paths {
let mut ids = Vec::with_capacity(paramset.num_mixes as usize);
for _ in 0..paramset.num_mixes {
let id = next_node_id;
next_node_id += 1;
mixnodes.insert(
id,
Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
false, // disable cache
),
);
mixnodes.push(Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
false, // disable cache
));
ids.push(id);
}
paths.push(ids);
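build_striped_network now pushes nodes into a Vec in id order, so a node's NodeId is simply its index into mixnodes; the get_mut(*id as usize) calls in the wiring loop below depend on that invariant. A small sketch of the invariant with a stub node type (not the simulator's Node):

// Stub standing in for the simulator's Node; only the id matters here.
struct StubNode {
    id: u16,
}

fn main() {
    let mut mixnodes: Vec<StubNode> = Vec::new();
    let mut next_node_id: u16 = 0;
    for _ in 0..4 {
        let id = next_node_id;
        next_node_id += 1;
        // Pushed in id order, so index == id by construction.
        mixnodes.push(StubNode { id });
    }
    for (idx, node) in mixnodes.iter().enumerate() {
        assert_eq!(idx, node.id as usize);
    }
    // Index lookup replaces the old FxHashMap::get_mut(&id).
    let target: u16 = 2;
    println!("node {} found at index {}", mixnodes[target as usize].id, target);
}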
@@ -237,9 +244,9 @@ fn build_striped_network(
for (i, id) in path.iter().enumerate() {
if i != path.len() - 1 {
let peer_id = path[i + 1];
mixnodes.get_mut(id).unwrap().connect(peer_id);
mixnodes.get_mut(*id as usize).unwrap().connect(peer_id);
} else {
mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID);
mixnodes.get_mut(*id as usize).unwrap().connect(RECEIVER_ID);
}
}
}
@@ -255,24 +262,21 @@ fn build_random_network(
paramset: &ParamSet,
seed: u64,
out_topology_path: &str,
) -> (FxHashMap<NodeId, Node>, Vec<Vec<NodeId>>) {
) -> (Vec<Node>, Vec<Vec<NodeId>>) {
assert!(paramset.random_topology);
// Init mix nodes
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
let mut mixnodes: FxHashMap<NodeId, Node> = FxHashMap::default();
for id in 0..paramset.num_mixes {
mixnodes.insert(
id,
Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
true, // enable cache
),
);
let mut mixnodes: Vec<Node> = Vec::with_capacity(paramset.num_mixes as usize);
for _ in 0..paramset.num_mixes {
mixnodes.push(Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
true, // enable cache
));
}
// Choose sender's peers and receiver's peers randomly
@@ -306,16 +310,13 @@ fn build_random_network(
save_topology(&topology, out_topology_path).unwrap();
for (node_id, peers) in topology.iter().enumerate() {
peers.iter().for_each(|peer_id| {
mixnodes
.get_mut(&(node_id.try_into().unwrap()))
.unwrap()
.connect(*peer_id);
mixnodes.get_mut(node_id).unwrap().connect(*peer_id);
});
}
// Connect the selected mix nodes with the receiver
for id in receiver_peers.iter() {
mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID);
mixnodes.get_mut(*id as usize).unwrap().connect(RECEIVER_ID);
}
(mixnodes, sender_peers_list)
@@ -330,8 +331,9 @@ fn try_probability(rng: &mut StdRng, prob: f32) -> bool {
}
fn save_latencies(
latencies: &FxHashMap<MessageId, f32>,
latencies: &[(MessageId, f32)],
sent_times: &FxHashMap<MessageId, f32>,
recv_times: &FxHashMap<MessageId, f32>,
path: &str,
) {
let mut writer = csv::Writer::from_path(path).unwrap();
@@ -340,12 +342,13 @@ fn save_latencies(
.unwrap();
for (msg, latency) in latencies.iter() {
let sent_time = sent_times.get(msg).unwrap();
let recv_time = recv_times.get(msg).unwrap();
writer
.write_record(&[
msg.to_string(),
latency.to_string(),
sent_time.to_string(),
(sent_time + latency).to_string(),
recv_time.to_string(),
])
.unwrap();
}
@@ -360,17 +363,17 @@ fn save_sequence(seq: &Sequence, path: &str) {
writer.flush().unwrap();
}
fn save_sequences(sequences: &FxHashMap<NodeId, Sequence>, path_prefix: &str) {
sequences.iter().enumerate().for_each(|(i, (_, seq))| {
fn save_sequences(sequences: &[Sequence], path_prefix: &str) {
sequences.iter().enumerate().for_each(|(i, seq)| {
save_sequence(seq, &format!("{path_prefix}_{i}.csv"));
});
}
fn save_queue_data_msg_counts(data: &[FxHashMap<NodeId, Vec<usize>>], interval: f32, path: &str) {
fn save_queue_data_msg_counts(data: &[Vec<Vec<usize>>], interval: f32, path: &str) {
let mut writer = csv::Writer::from_path(path).unwrap();
let mut header = vec!["vtime".to_string()];
data[0].iter().for_each(|(node_id, counts)| {
data[0].iter().enumerate().for_each(|(node_id, counts)| {
let num_queues = counts.len();
(0..num_queues).for_each(|q_idx| {
header.push(format!("node{node_id}_q{q_idx}"));
@@ -380,7 +383,7 @@ fn save_queue_data_msg_counts(data: &[FxHashMap<NodeId, Vec<usize>>], interval:
data.iter().enumerate().for_each(|(i, counts_per_node)| {
let mut row = vec![(i as f64 * interval as f64).to_string()];
counts_per_node.iter().for_each(|(_, counts)| {
counts_per_node.iter().for_each(|counts| {
row.extend(
counts
.iter()