impl exp5~6: random network

This commit is contained in:
Youngjoon Lee 2024-08-22 20:13:44 +02:00
parent 27a8313450
commit 962b72dced
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
7 changed files with 137 additions and 109 deletions

View File

@@ -1,16 +1,12 @@
use std::error::Error;
use protocol::{
node::{MessageId, Node, NodeId},
queue::{Message, QueueConfig},
topology::{build_topology, save_topology},
};
use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng};
use rustc_hash::FxHashMap;
use crate::{
paramset::ParamSet,
topology::{build_topology, Topology},
};
use crate::paramset::ParamSet;
// An interval that the sender nodes send (schedule) new messages
const MSG_INTERVAL: f32 = 1.0;
@@ -172,23 +168,6 @@ fn relay_messages(
});
}
/// Persists the topology as CSV: one row per node with its id, peer count,
/// and the peer list rendered as `[id,id,...]`.
fn save_topology(topology: &Topology, topology_path: &str) -> Result<(), Box<dyn Error>> {
    let mut writer = csv::Writer::from_path(topology_path)?;
    writer.write_record(["node", "num_peers", "peers"])?;
    // The row index is the node id: `topology` is indexed by NodeId.
    for (index, peers) in topology.iter().enumerate() {
        let node_id: NodeId = index.try_into().unwrap();
        let joined = peers
            .iter()
            .map(|peer_id| peer_id.to_string())
            .collect::<Vec<_>>()
            .join(",");
        writer.write_record(&[
            node_id.to_string(),
            peers.len().to_string(),
            format!("[{joined}]"),
        ])?;
    }
    writer.flush()?;
    Ok(())
}
struct SenderSelector {
candidates: Vec<NodeId>,
num_senders: NodeId,

View File

@@ -13,7 +13,6 @@ use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
mod iteration;
mod paramset;
mod topology;
#[derive(Debug, Parser)]
#[command(name = "Message Dessemination Time Measurement")]

View File

@@ -3,8 +3,9 @@ use std::path::Path;
use protocol::{
node::{MessageId, Node, NodeId},
queue::{Message, QueueConfig, QueueType},
topology::{build_topology, save_topology},
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand::{rngs::StdRng, seq::SliceRandom, Rng, RngCore, SeedableRng};
use rustc_hash::FxHashMap;
use crate::{ordercoeff::Sequence, paramset::ParamSet};
@@ -12,38 +13,6 @@ use crate::{ordercoeff::Sequence, paramset::ParamSet};
const RECEIVER_ID: NodeId = NodeId::MAX;
pub fn run_iteration(
paramset: ParamSet,
seed: u64,
out_latency_path: &str,
out_sent_sequence_path: &str,
out_received_sequence_path: &str,
out_data_msg_counts_path: &str,
out_ordering_coeff_path: &str,
) {
if paramset.random_topology {
run_iteration_with_random_topology(
paramset,
seed,
out_latency_path,
out_sent_sequence_path,
out_received_sequence_path,
out_data_msg_counts_path,
out_ordering_coeff_path,
)
} else {
run_iteration_without_random_topology(
paramset,
seed,
out_latency_path,
out_sent_sequence_path,
out_received_sequence_path,
out_data_msg_counts_path,
out_ordering_coeff_path,
)
}
}
fn run_iteration_without_random_topology(
paramset: ParamSet,
seed: u64,
out_latency_path: &str,
@@ -51,9 +20,8 @@ fn run_iteration_without_random_topology(
out_received_sequence_path_prefix: &str,
out_queue_data_msg_counts_path: &str,
out_ordering_coeff_path: &str,
out_topology_path: &str,
) {
assert!(!paramset.random_topology);
// Ensure that all output files do not exist
for path in &[
out_latency_path,
@@ -64,44 +32,11 @@ fn run_iteration_without_random_topology(
assert!(!Path::new(path).exists(), "File already exists: {path}");
}
// Initialize mix nodes
let mut next_node_id: NodeId = 0;
let mut mixnodes: FxHashMap<NodeId, Node> = FxHashMap::default();
let mut paths: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_paths as usize);
for _ in 0..paramset.num_paths {
let mut ids = Vec::with_capacity(paramset.num_mixes as usize);
for _ in 0..paramset.num_mixes {
let id = next_node_id;
next_node_id += 1;
mixnodes.insert(
id,
Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed,
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
paramset.random_topology, // disable cache
),
);
ids.push(id);
}
paths.push(ids);
}
// Connect mix nodes
for path in paths.iter() {
for (i, id) in path.iter().enumerate() {
if i != path.len() - 1 {
let peer_id = path[i + 1];
mixnodes.get_mut(id).unwrap().connect(peer_id);
} else {
mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID);
}
}
}
let sender_peers: Vec<NodeId> = paths.iter().map(|path| path[0]).collect();
let (mut mixnodes, sender_peers) = if paramset.random_topology {
build_random_network(&paramset, seed, out_topology_path)
} else {
build_striped_network(&paramset, seed)
};
let mut next_msg_id: MessageId = 0;
@@ -235,17 +170,112 @@ fn run_iteration_without_random_topology(
}
}
fn run_iteration_with_random_topology(
paramset: ParamSet,
/// Builds `num_paths` disjoint chains ("stripes") of `num_mixes` nodes each.
///
/// Every node's queue gets its own seed drawn from one RNG seeded with `seed`,
/// so a given seed reproduces the same network. Within a path, each node
/// forwards to its successor; the final node forwards to the receiver.
/// Returns the node map and the first node of every path (the sender's peers).
fn build_striped_network(paramset: &ParamSet, seed: u64) -> (FxHashMap<NodeId, Node>, Vec<NodeId>) {
    assert!(!paramset.random_topology);
    let mut queue_seed_rng = StdRng::seed_from_u64(seed);
    let mut mixnodes: FxHashMap<NodeId, Node> = FxHashMap::default();
    let mut paths: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_paths as usize);
    // Assign sequential ids path by path, creating one node per id.
    let mut id_counter: NodeId = 0;
    for _ in 0..paramset.num_paths {
        let mut path_ids = Vec::with_capacity(paramset.num_mixes as usize);
        for _ in 0..paramset.num_mixes {
            let node_id = id_counter;
            id_counter += 1;
            let node = Node::new(
                QueueConfig {
                    queue_type: paramset.queue_type,
                    seed: queue_seed_rng.next_u64(),
                    min_queue_size: paramset.min_queue_size,
                },
                paramset.peering_degree,
                false, // disable cache
            );
            mixnodes.insert(node_id, node);
            path_ids.push(node_id);
        }
        paths.push(path_ids);
    }
    // Wire each path as a chain; the last hop connects to the receiver.
    for path in &paths {
        for (pos, node_id) in path.iter().enumerate() {
            let next_hop = path.get(pos + 1).copied().unwrap_or(RECEIVER_ID);
            mixnodes.get_mut(node_id).unwrap().connect(next_hop);
        }
    }
    let sender_peers: Vec<NodeId> = paths.iter().map(|path| path[0]).collect();
    (mixnodes, sender_peers)
}
// NOTE(review): this span is a diff rendering — the parameter list below
// interleaves the removed stub signature (five `out_*` paths plus a bare
// `) {`, and the stale `todo!()`) with the added one (`out_topology_path`
// plus the return type). Only the added lines are live code.
fn build_random_network(
    paramset: &ParamSet,
    seed: u64,
    // NOTE(review): removed (old) signature lines start here.
    out_latency_path: &str,
    out_sent_sequence_path: &str,
    out_received_sequence_path: &str,
    out_data_msg_counts_path: &str,
    out_ordering_coeff_path: &str,
) {
    // NOTE(review): added (live) signature lines.
    out_topology_path: &str,
) -> (FxHashMap<NodeId, Node>, Vec<NodeId>) {
    // This builder only supports the random-topology mode.
    assert!(paramset.random_topology);
    // NOTE(review): removed line from the old stub implementation.
    todo!()
    // Init mix nodes: one Node per id in 0..num_mixes, each queue seeded from
    // a dedicated RNG so a given `seed` reproduces the same network.
    let mut queue_seed_rng = StdRng::seed_from_u64(seed);
    let mut mixnodes: FxHashMap<NodeId, Node> = FxHashMap::default();
    for id in 0..paramset.num_mixes {
        mixnodes.insert(
            id,
            Node::new(
                QueueConfig {
                    queue_type: paramset.queue_type,
                    seed: queue_seed_rng.next_u64(),
                    min_queue_size: paramset.min_queue_size,
                },
                paramset.peering_degree,
                true, // enable cache
            ),
        );
    }
    // Choose sender's peers and receiver's peers randomly
    // NOTE(review): peers_rng reuses the same `seed` as queue_seed_rng, so the
    // two streams are identical — presumably intentional, but confirm.
    let mut peers_rng = StdRng::seed_from_u64(seed);
    let mut candidates: Vec<NodeId> = (0..paramset.num_mixes).collect();
    assert!(candidates.len() >= paramset.peering_degree as usize);
    candidates.as_mut_slice().shuffle(&mut peers_rng);
    let sender_peers: Vec<NodeId> = candidates
        .iter()
        .cloned()
        .take(paramset.peering_degree as usize)
        .collect();
    // The second shuffle is independent of the first, so receiver peers may
    // overlap with sender peers.
    candidates.as_mut_slice().shuffle(&mut peers_rng);
    let receiver_peers: Vec<NodeId> = candidates
        .iter()
        .cloned()
        .take(paramset.peering_degree as usize)
        .collect();
    // Connect mix nodes
    let topology = build_topology(
        paramset.num_mixes,
        &vec![paramset.peering_degree; paramset.num_mixes as usize],
        seed,
    );
    // Persist the generated adjacency list as CSV for later analysis.
    save_topology(&topology, out_topology_path).unwrap();
    // The topology row index is the node id; connect each node to its peers.
    for (node_id, peers) in topology.iter().enumerate() {
        peers.iter().for_each(|peer_id| {
            mixnodes
                .get_mut(&(node_id.try_into().unwrap()))
                .unwrap()
                .connect(*peer_id);
        });
    }
    // Connect the selected mix nodes with the receiver
    for id in receiver_peers.iter() {
        mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID);
    }
    (mixnodes, sender_peers)
}
fn try_probability(rng: &mut StdRng, prob: f32) -> bool {

View File

@@ -76,9 +76,10 @@ fn main() {
i as u64,
&format!("{paramset_dir}/iteration_{i}_latency.csv"),
&format!("{paramset_dir}/iteration_{i}_sent_seq.csv"),
&format!("{paramset_dir}/iteration_{i}_recv_seq.csv"),
&format!("{paramset_dir}/iteration_{i}_recv_seq"),
&format!("{paramset_dir}/iteration_{i}_data_msg_counts.csv"),
&format!("{paramset_dir}/iteration_{i}_ordering_coeff.csv"),
&format!("{paramset_dir}/iteration_{i}_topology.csv"),
);
tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i);
}

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
csv = "1.3.0"
rand = "0.8.5"
rustc-hash = "2.0.0"
strum = "0.26.3"

View File

@@ -1,2 +1,3 @@
pub mod node;
pub mod queue;
pub mod topology;

View File

@@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::{collections::HashSet, error::Error};
use protocol::node::NodeId;
use crate::node::NodeId;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
pub type Topology = Vec<Vec<NodeId>>;
@@ -69,3 +69,20 @@ fn dfs(topology: &[HashSet<NodeId>], start_node: NodeId) -> HashSet<NodeId> {
}
visited
}
/// Writes `topology` as a CSV file at `path` with the columns
/// `node,num_peers,peers`, where `peers` is rendered as `[id,id,...]`.
///
/// # Errors
/// Returns any error raised while creating, writing, or flushing the file.
pub fn save_topology(topology: &Topology, path: &str) -> Result<(), Box<dyn Error>> {
    let mut wtr = csv::Writer::from_path(path)?;
    wtr.write_record(["node", "num_peers", "peers"])?;
    // The row index doubles as the node id: `topology` is indexed by NodeId.
    for (node, peers) in topology.iter().enumerate() {
        let node: NodeId = node.try_into().unwrap();
        let peers_str: Vec<String> = peers.iter().map(|peer_id| peer_id.to_string()).collect();
        wtr.write_record(&[
            node.to_string(),
            peers.len().to_string(),
            format!("[{}]", peers_str.join(",")),
        ])?;
    }
    wtr.flush()?;
    Ok(())
}