parallelize iterations

Youngjoon Lee 2024-08-25 01:33:17 +09:00
parent 138ae1f5c9
commit 960a184ee6
3 changed files with 207 additions and 194 deletions

Cargo.toml

@ -9,6 +9,7 @@ clap = { version = "4.5.16", features = ["derive"] }
csv = "1.3.0"
protocol = { version = "0.1.0", path = "../protocol" }
rand = "0.8.5"
rayon = "1.10.0"
rustc-hash = "2.0.0"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"

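The only manifest change above is the new rayon dependency. For orientation, here is a minimal sketch (not part of this commit) of the rayon pattern the rest of the diff builds on: parallel iterators run a closure over the elements of a collection on a work-stealing thread pool.

use rayon::prelude::*;

fn main() {
    let mut results = vec![0u64; 8];
    // Each element may be processed on a different worker thread;
    // par_iter_mut gives each closure exclusive mutable access to one element.
    results.par_iter_mut().enumerate().for_each(|(i, r)| {
        *r = (i as u64) * 2;
    });
    assert_eq!(results[3], 6);
}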
iteration.rs

@ -1,4 +1,4 @@
use std::collections::hash_map::Entry;
use std::{collections::hash_map::Entry, time::SystemTime};
use protocol::{
node::{MessagesToRelay, Node, NodeId},
@ -8,145 +8,196 @@ use rand::{rngs::StdRng, Rng, SeedableRng};
use rustc_hash::FxHashMap;
use crate::{
format_duration,
message::{DataMessage, DataMessageGenerator},
outputs::Outputs,
paramset::ParamSet,
topology::{build_random_network, build_striped_network, RECEIVER_NODE_ID},
};
pub fn run_iteration(paramset: ParamSet, seed: u64, outputs: &mut Outputs) -> f32 {
let (mut mixnodes, all_sender_peers, receiver_peers) = if paramset.random_topology {
build_random_network(&paramset, seed, outputs)
} else {
build_striped_network(&paramset, seed)
};
// Check node ID consistency
for (i, node) in mixnodes.iter().enumerate() {
assert_eq!(node.id as usize, i);
}
// One sender ID for each of the N senders, plus one shared by all mixnodes
let mut data_msg_gen = DataMessageGenerator::new(paramset.num_senders + 1);
let mix_msg_sender_id = paramset.num_senders;
// Virtual discrete time
let mut vtime: f32 = 0.0;
// Transmission interval at which each queue must release a message
let transmission_interval = 1.0 / paramset.transmission_rate as f32;
// Results
let mut all_sent_count = 0; // all data + noise sent by all senders
let all_sent_count_target = (paramset.num_sender_msgs as usize)
.checked_mul(paramset.num_senders as usize)
.unwrap();
let mut sent_data_msgs: FxHashMap<DataMessage, f32> = FxHashMap::default();
let mut recv_data_msgs: FxHashMap<DataMessage, f32> = FxHashMap::default();
outputs.write_header_queue_data_msg_counts(&mixnodes);
let mut data_msg_rng = StdRng::seed_from_u64(seed);
loop {
tracing::trace!(
"VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}",
vtime,
all_sent_count,
sent_data_msgs.len(),
recv_data_msgs.len(),
);
// All senders emit a message (data or noise) to all of their own adjacent peers.
if all_sent_count < all_sent_count_target {
// For each sender
for (sender_idx, sender_peers) in all_sender_peers.iter() {
if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) {
let msg = data_msg_gen.next(sender_idx);
sender_peers.iter().for_each(|peer_id| {
mixnodes
.get_mut(*peer_id as usize)
.unwrap()
.receive(msg, None);
});
sent_data_msgs.insert(msg, vtime);
outputs.add_sent_msg(&msg)
} else {
// Generate noise and add it to the sequence, so that ordering coefficients can be calculated later.
// The noise doesn't need to be sent to the mix nodes,
// since they would drop it anyway,
// and we don't record what the mix nodes receive.
outputs.add_sent_noise(sender_idx);
}
all_sent_count += 1;
}
}
// Each mix node adds a new data message to its queue with a certain probability
if paramset.mix_data_msg_prob > 0.0 {
for node in mixnodes.iter_mut() {
if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) {
node.send(data_msg_gen.next(mix_msg_sender_id));
// We don't put the msg into the sent_sequence,
// because it records only messages sent by the senders, not by the mixnodes.
}
}
}
// Each mix node relays a message (data or noise) to the next mix node or to the receiver.
// At the receiver, record the time and order of the received messages.
AllMessagesToRelay::new(&mut mixnodes).into_iter().for_each(
|(relayer_id, msgs_to_relay)| {
msgs_to_relay.into_iter().for_each(|(peer_id, msg)| {
if peer_id == RECEIVER_NODE_ID {
match msg {
Message::Data(msg) => {
// If msg was sent by a sender (not by any mix node)
if let Some(&sent_time) = sent_data_msgs.get(&msg) {
// If this is the first time the msg has been seen,
// update the stats that must ignore duplicate messages.
if let Entry::Vacant(e) = recv_data_msgs.entry(msg) {
e.insert(vtime);
outputs.add_latency(&msg, sent_time, vtime);
}
}
// Record msg to the sequence
let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap();
outputs.add_recv_msg(&msg, conn_idx);
}
Message::Noise => {
// Record noise to the sequence
let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap();
outputs.add_recv_noise(conn_idx);
}
}
} else if let Message::Data(msg) = msg {
let peer = mixnodes.get_mut(peer_id as usize).unwrap();
assert_eq!(peer.id, peer_id);
peer.receive(msg, Some(relayer_id));
}
});
},
);
// Record the number of data messages in each mix node's queues
outputs.add_queue_data_msg_counts(vtime, &mixnodes);
// Once all senders have emitted all of their data+noise messages,
// and the receiver has received all data messages,
// stop the iteration.
if all_sent_count == all_sent_count_target && sent_data_msgs.len() == recv_data_msgs.len() {
break;
}
vtime += transmission_interval;
}
vtime
pub struct Iteration {
pub paramset: ParamSet,
pub iteration_idx: usize,
pub rootdir: String,
}
fn try_probability(rng: &mut StdRng, prob: f32) -> bool {
assert!(
(0.0..=1.0).contains(&prob),
"Probability must be in [0, 1]."
);
rng.gen::<f32>() < prob
impl Iteration {
pub fn start(&mut self) {
let dir = format!(
"{}/iteration_{}__WIP_DUR__",
self.rootdir, self.iteration_idx
);
std::fs::create_dir_all(dir.as_str()).unwrap();
let mut outputs = Outputs::new(
format!("{dir}/latency__WIP__.csv"),
(0..self.paramset.num_senders)
.map(|sender_idx| format!("{dir}/sent_seq_{sender_idx}__WIP__.csv"))
.collect(),
(0..self.paramset.peering_degree)
.map(|conn_idx| format!("{dir}/recv_seq_{conn_idx}__WIP__.csv"))
.collect(),
format!("{dir}/data_msg_counts__WIP__.csv"),
format!("{dir}/topology.csv"),
);
let start_time = SystemTime::now();
let vtime = self.run(self.iteration_idx as u64, &mut outputs);
outputs.close();
outputs.rename_paths("__WIP__.csv", ".csv");
let duration = format_duration(SystemTime::now().duration_since(start_time).unwrap());
let new_dir = dir.replace("__WIP_DUR__", &format!("_{duration}"));
std::fs::rename(dir, new_dir).unwrap();
tracing::info!(
"ParamSet:{}, Iteration:{} completed. Duration:{}, vtime:{}",
self.paramset.id,
self.iteration_idx,
duration,
vtime
);
}
fn run(&mut self, seed: u64, outputs: &mut Outputs) -> f32 {
let paramset = &self.paramset;
let (mut mixnodes, all_sender_peers, receiver_peers) = if paramset.random_topology {
build_random_network(paramset, seed, outputs)
} else {
build_striped_network(paramset, seed)
};
// Check node ID consistency
for (i, node) in mixnodes.iter().enumerate() {
assert_eq!(node.id as usize, i);
}
// One sender ID for each of the N senders, plus one shared by all mixnodes
let mut data_msg_gen = DataMessageGenerator::new(paramset.num_senders + 1);
let mix_msg_sender_id = paramset.num_senders;
// Virtual discrete time
let mut vtime: f32 = 0.0;
// Transmission interval at which each queue must release a message
let transmission_interval = 1.0 / paramset.transmission_rate as f32;
// Results
let mut all_sent_count = 0; // all data + noise sent by all senders
let all_sent_count_target = (paramset.num_sender_msgs as usize)
.checked_mul(paramset.num_senders as usize)
.unwrap();
let mut sent_data_msgs: FxHashMap<DataMessage, f32> = FxHashMap::default();
let mut recv_data_msgs: FxHashMap<DataMessage, f32> = FxHashMap::default();
outputs.write_header_queue_data_msg_counts(&mixnodes);
let mut data_msg_rng = StdRng::seed_from_u64(seed);
loop {
tracing::trace!(
"VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}",
vtime,
all_sent_count,
sent_data_msgs.len(),
recv_data_msgs.len(),
);
// All senders emit a message (data or noise) to all of their own adjacent peers.
if all_sent_count < all_sent_count_target {
// For each sender
for (sender_idx, sender_peers) in all_sender_peers.iter() {
if Self::try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) {
let msg = data_msg_gen.next(sender_idx);
sender_peers.iter().for_each(|peer_id| {
mixnodes
.get_mut(*peer_id as usize)
.unwrap()
.receive(msg, None);
});
sent_data_msgs.insert(msg, vtime);
outputs.add_sent_msg(&msg)
} else {
// Generate noise and add it to the sequence, so that ordering coefficients can be calculated later.
// The noise doesn't need to be sent to the mix nodes,
// since they would drop it anyway,
// and we don't record what the mix nodes receive.
outputs.add_sent_noise(sender_idx);
}
all_sent_count += 1;
}
}
// Each mix node adds a new data message to its queue with a certain probability
if paramset.mix_data_msg_prob > 0.0 {
for node in mixnodes.iter_mut() {
if Self::try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) {
node.send(data_msg_gen.next(mix_msg_sender_id));
// We don't put the msg into the sent_sequence,
// because it records only messages sent by the senders, not by the mixnodes.
}
}
}
// Each mix node relays a message (data or noise) to the next mix node or to the receiver.
// At the receiver, record the time and order of the received messages.
AllMessagesToRelay::new(&mut mixnodes).into_iter().for_each(
|(relayer_id, msgs_to_relay)| {
msgs_to_relay.into_iter().for_each(|(peer_id, msg)| {
if peer_id == RECEIVER_NODE_ID {
match msg {
Message::Data(msg) => {
// If msg was sent by a sender (not by any mix node)
if let Some(&sent_time) = sent_data_msgs.get(&msg) {
// If this is the first time the msg has been seen,
// update the stats that must ignore duplicate messages.
if let Entry::Vacant(e) = recv_data_msgs.entry(msg) {
e.insert(vtime);
outputs.add_latency(&msg, sent_time, vtime);
}
}
// Record msg to the sequence
let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap();
outputs.add_recv_msg(&msg, conn_idx);
}
Message::Noise => {
// Record noise to the sequence
let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap();
outputs.add_recv_noise(conn_idx);
}
}
} else if let Message::Data(msg) = msg {
let peer = mixnodes.get_mut(peer_id as usize).unwrap();
assert_eq!(peer.id, peer_id);
peer.receive(msg, Some(relayer_id));
}
});
},
);
// Record the number of data messages in each mix node's queues
outputs.add_queue_data_msg_counts(vtime, &mixnodes);
// Once all senders have emitted all of their data+noise messages,
// and the receiver has received all data messages,
// stop the iteration.
if all_sent_count == all_sent_count_target
&& sent_data_msgs.len() == recv_data_msgs.len()
{
break;
}
vtime += transmission_interval;
}
vtime
}
fn try_probability(rng: &mut StdRng, prob: f32) -> bool {
assert!(
(0.0..=1.0).contains(&prob),
"Probability must be in [0, 1]."
);
rng.gen::<f32>() < prob
}
}
struct AllMessagesToRelay(Vec<(NodeId, MessagesToRelay<DataMessage>)>);
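The core of the refactor above: the free function run_iteration (and its helper try_probability) moves into an Iteration struct, and the per-iteration output setup, wall-clock timing, and __WIP__-suffix renaming move out of main.rs into Iteration::start. Each Iteration carries everything it needs (its ParamSet, index, and root directory), which is what lets main.rs hand a whole batch of them to rayon. A hypothetical usage sketch, assuming a paramset and a paramset_dir string are in scope:

let mut iterations: Vec<Iteration> = (0..paramset.num_iterations)
    .map(|i| Iteration {
        paramset: paramset.clone(),
        iteration_idx: i,
        rootdir: paramset_dir.clone(),
    })
    .collect();
// Sequential here for simplicity; main.rs makes the same start() calls via rayon.
for iteration in iterations.iter_mut() {
    iteration.start();
}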

main.rs

@ -13,10 +13,10 @@ use std::{
use chrono::Utc;
use clap::Parser;
use iteration::run_iteration;
use outputs::Outputs;
use iteration::Iteration;
use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
use protocol::queue::QueueType;
use rayon::prelude::*;
#[derive(Debug, Parser)]
#[command(name = "Ordering Measurement")]
@ -30,6 +30,8 @@ struct Args {
#[arg(short, long)]
outdir: String,
#[arg(short, long)]
num_threads: usize,
#[arg(short, long)]
from_paramset: Option<u16>,
#[arg(short, long)]
to_paramset: Option<u16>,
@ -45,6 +47,7 @@ fn main() {
session_id,
queue_type,
outdir,
num_threads,
from_paramset,
to_paramset,
} = args;
@ -67,6 +70,7 @@ fn main() {
let session_start_time = SystemTime::now();
let mut iterations: Vec<Iteration> = Vec::new();
for paramset in paramsets {
if paramset.id < from_paramset.unwrap_or(0) {
tracing::info!("ParamSet:{} skipped", paramset.id);
@ -76,73 +80,30 @@ fn main() {
break;
}
let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id);
let paramset_dir = format!("{outdir}/{subdir}/paramset_{}", paramset.id);
std::fs::create_dir_all(paramset_dir.as_str()).unwrap();
save_paramset_info(&paramset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap();
let dur_path = format!("{paramset_dir}/__WIP__durations.csv");
let mut dur_writer = csv::Writer::from_path(&dur_path).unwrap();
dur_writer
.write_record(["iteration", "time_human", "time_sec", "vtime"])
.unwrap();
dur_writer.flush().unwrap();
for i in 0..paramset.num_iterations {
let mut outputs = Outputs::new(
format!("{paramset_dir}/__WIP__iteration_{i}_latency.csv"),
(0..paramset.num_senders)
.map(|sender_idx| {
format!("{paramset_dir}/__WIP__iteration_{i}_sent_seq_{sender_idx}.csv")
})
.collect(),
(0..paramset.peering_degree)
.map(|conn_idx| {
format!("{paramset_dir}/__WIP__iteration_{i}_recv_seq_{conn_idx}.csv")
})
.collect(),
format!("{paramset_dir}/__WIP__iteration_{i}_data_msg_counts.csv"),
format!("{paramset_dir}/iteration_{i}_topology.csv"),
);
let start_time = SystemTime::now();
let vtime = run_iteration(paramset.clone(), i as u64, &mut outputs);
outputs.close();
outputs.rename_paths("__WIP__iteration", "iteration");
let duration = SystemTime::now().duration_since(start_time).unwrap();
let duration_human = format_duration(duration);
dur_writer
.write_record([
i.to_string(),
duration_human.clone(),
duration.as_secs().to_string(),
vtime.to_string(),
])
.unwrap();
dur_writer.flush().unwrap();
tracing::info!(
"ParamSet:{}, Iteration:{} completed. Duration:{}, vtime:{}",
paramset.id,
i,
duration_human,
vtime
);
iterations.push(Iteration {
paramset: paramset.clone(),
iteration_idx: i,
rootdir: paramset_dir.clone(),
});
}
dur_writer.flush().unwrap();
let new_dur_path = dur_path.replace("__WIP__durations", "durations");
std::fs::rename(&dur_path, &new_dur_path)
.expect("Failed to rename: {dur_path} -> {new_dur_dir}: {e}");
let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_");
std::fs::rename(&paramset_dir, &new_paramset_dir)
.expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}");
tracing::info!("ParamSet:{} completed", paramset.id);
}
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.unwrap();
pool.install(|| {
iterations.par_iter_mut().for_each(|iteration| {
iteration.start();
});
});
let session_duration = SystemTime::now()
.duration_since(session_start_time)
.unwrap();
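The scheduling change in main.rs, in short: instead of running each iteration inline inside the per-paramset loop (and writing a durations.csv along the way), the loop now only prepares directories and collects Iteration values, and a rayon pool sized by the new --num-threads flag runs them all in parallel; per-iteration durations are now recorded by Iteration::start (in the directory name and the tracing log) rather than in a CSV. A self-contained sketch (names and values assumed, not from this diff) of the pool-plus-install pattern used above:

use rayon::prelude::*;

fn run_parallel(items: &mut [u64], num_threads: usize) {
    // A dedicated pool, sized explicitly rather than by the CPU count.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .unwrap();
    // install() makes parallel iterators inside the closure use this pool
    // instead of rayon's implicit global pool.
    pool.install(|| {
        items.par_iter_mut().for_each(|x| *x *= 2);
    });
}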