random peering degrees

This commit is contained in:
Youngjoon Lee 2024-09-12 10:51:21 +09:00
parent 7a1282372d
commit daf637d2e6
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
5 changed files with 163 additions and 22 deletions

View File

@ -36,7 +36,7 @@ impl Iteration {
(0..self.paramset.num_senders)
.map(|sender_idx| format!("{dir}/sent_seq_{sender_idx}__WIP__.csv"))
.collect(),
(0..self.paramset.num_receiver_connections())
(0..self.paramset.num_sender_or_receiver_conns())
.map(|conn_idx| format!("{dir}/recv_seq_{conn_idx}__WIP__.csv"))
.collect(),
format!("{dir}/data_msg_counts__WIP__.csv"),

View File

@ -1,3 +1,5 @@
use std::fmt::Display;
use protocol::queue::QueueType;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@ -16,12 +18,12 @@ impl std::str::FromStr for ExperimentId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1" | "Experiment1" => Ok(ExperimentId::Experiment1),
"2" | "Experiment2" => Ok(ExperimentId::Experiment2),
"3" | "Experiment3" => Ok(ExperimentId::Experiment3),
"4" | "Experiment4" => Ok(ExperimentId::Experiment4),
"5" | "Experiment5" => Ok(ExperimentId::Experiment5),
"6" | "Experiment6" => Ok(ExperimentId::Experiment6),
"1" => Ok(ExperimentId::Experiment1),
"2" => Ok(ExperimentId::Experiment2),
"3" => Ok(ExperimentId::Experiment3),
"4" => Ok(ExperimentId::Experiment4),
"5" => Ok(ExperimentId::Experiment5),
"6" => Ok(ExperimentId::Experiment6),
_ => Err(format!("Invalid experiment ID: {}", s)),
}
}
@ -31,6 +33,7 @@ impl std::str::FromStr for ExperimentId {
#[repr(u8)]
pub enum SessionId {
Session1 = 1,
Session3 = 3,
}
impl std::str::FromStr for SessionId {
@ -38,7 +41,8 @@ impl std::str::FromStr for SessionId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1" | "Session1" => Ok(SessionId::Session1),
"1" => Ok(SessionId::Session1),
"3" => Ok(SessionId::Session3),
_ => Err(format!("Invalid session ID: {}", s)),
}
}
@ -66,7 +70,7 @@ pub struct ParamSet {
pub num_mixes: u32,
pub num_paths: u16,
pub random_topology: bool,
pub peering_degree: u32,
pub peering_degree: PeeringDegree,
pub min_queue_size: u16,
pub transmission_rate: u16,
pub num_senders: u8,
@ -77,6 +81,21 @@ pub struct ParamSet {
pub num_iterations: usize,
}
#[derive(Debug, Clone, PartialEq)]
pub enum PeeringDegree {
Fixed(u32),
Random(Vec<(u32, f32)>),
}
impl Display for PeeringDegree {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PeeringDegree::Fixed(c) => write!(f, "{c}"),
PeeringDegree::Random(c_probs) => write!(f, "{c_probs:?}"),
}
}
}
impl ParamSet {
pub fn new_all_paramsets(
exp_id: ExperimentId,
@ -85,6 +104,7 @@ impl ParamSet {
) -> Vec<Self> {
match session_id {
SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type),
SessionId::Session3 => Self::new_session3_paramsets(exp_id, queue_type),
}
}
@ -130,7 +150,7 @@ impl ParamSet {
num_mixes,
num_paths,
random_topology: false,
peering_degree: 1,
peering_degree: PeeringDegree::Fixed(1),
min_queue_size,
transmission_rate,
num_senders,
@ -157,7 +177,7 @@ impl ParamSet {
num_mixes,
num_paths: 0, // since we're gonna build random topology
random_topology: true,
peering_degree,
peering_degree: PeeringDegree::Fixed(peering_degree),
min_queue_size,
transmission_rate,
num_senders,
@ -179,9 +199,77 @@ impl ParamSet {
paramsets
}
pub fn num_receiver_connections(&self) -> usize {
fn new_session3_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let sender_data_msg_probs: &[f32] = match exp_id {
ExperimentId::Experiment5 => &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0],
ExperimentId::Experiment6 => &[0.01, 0.1, 0.5],
_ => {
panic!("Only Experiment5 and Experiment6 are supported for Session3");
}
};
let mix_data_msg_probs = |num_mixes: u32| match exp_id {
ExperimentId::Experiment5 => {
vec![0.0]
}
ExperimentId::Experiment6 => {
let g: f32 = num_mixes as f32;
vec![1.0 / (2.0 * g), 1.0 / g, 2.0 / g]
}
_ => {
panic!("Only Experiment5 and Experiment6 are supported for Session3");
}
};
let mut id: u16 = 1;
let mut paramsets: Vec<ParamSet> = Vec::new();
match exp_id {
ExperimentId::Experiment5 | ExperimentId::Experiment6 => {
let num_mixes = 32;
for &sender_data_msg_prob in sender_data_msg_probs {
for &mix_data_msg_prob in &mix_data_msg_probs(num_mixes) {
let paramset = ParamSet {
id,
num_mixes,
num_paths: 0, // since we're gonna build random topology
random_topology: true,
peering_degree: PeeringDegree::Random(vec![
(4, 0.87),
(12, 0.123),
(24, 0.007),
]),
min_queue_size: 10,
transmission_rate: 1,
num_senders: 1,
num_sender_msgs: match exp_id {
ExperimentId::Experiment6 => 10000,
_ => 1000000,
},
sender_data_msg_prob,
mix_data_msg_prob,
queue_type,
num_iterations: 10,
};
id += 1;
paramsets.push(paramset);
}
}
}
_ => {
panic!("Only Experiment5 and Experiment6 are supported for Session3");
}
}
paramsets
}
pub fn num_sender_or_receiver_conns(&self) -> usize {
if self.random_topology {
self.peering_degree as usize
match &self.peering_degree {
PeeringDegree::Fixed(c) => *c as usize,
PeeringDegree::Random(c_probs) => {
c_probs.iter().map(|(c, _)| *c).min().unwrap() as usize
}
}
} else {
self.num_paths as usize
}
@ -233,6 +321,8 @@ mod tests {
(ExperimentId::Experiment6, SessionId::Session1),
3 * 3 * 3 * 3,
),
((ExperimentId::Experiment5, SessionId::Session3), 6),
((ExperimentId::Experiment6, SessionId::Session3), 3 * 3),
];
for queue_type in QueueType::iter() {
@ -260,6 +350,18 @@ mod tests {
assert_eq!(paramset.id as usize, i + 1);
println!("{:?}", paramset);
}
// Check PeeringDegree
for paramset in paramsets.iter() {
match session_id {
SessionId::Session1 => {
assert!(matches!(paramset.peering_degree, PeeringDegree::Fixed(_)))
}
SessionId::Session3 => {
assert!(matches!(paramset.peering_degree, PeeringDegree::Random(_)))
}
}
}
}
}
}

View File

@ -8,7 +8,10 @@ use protocol::{
use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng};
use rustc_hash::FxHashMap;
use crate::{outputs::Outputs, paramset::ParamSet};
use crate::{
outputs::Outputs,
paramset::{ParamSet, PeeringDegree},
};
use ordering::message::SenderIdx;
pub const RECEIVER_NODE_ID: NodeId = NodeId::MAX;
@ -18,6 +21,13 @@ pub fn build_striped_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq
seed: u64,
) -> (Vec<Node<M>>, AllSenderPeers, ReceiverPeers) {
assert!(!paramset.random_topology);
let peering_degree = match paramset.peering_degree {
PeeringDegree::Fixed(c) => c,
PeeringDegree::Random(_) => {
panic!("PeeringDegree::Random not supported for striped network");
}
};
let mut next_node_id: NodeId = 0;
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
let mut mixnodes: Vec<Node<M>> =
@ -35,7 +45,7 @@ pub fn build_striped_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
peering_degree,
false, // disable cache
));
ids.push(id);
@ -80,6 +90,28 @@ pub fn build_random_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq +
outputs: &mut Outputs,
) -> (Vec<Node<M>>, AllSenderPeers, ReceiverPeers) {
assert!(paramset.random_topology);
let peering_degrees = match &paramset.peering_degree {
PeeringDegree::Fixed(c) => vec![*c; paramset.num_mixes as usize],
PeeringDegree::Random(c_probs) => {
// Sort c_probs by the probability in ascending order
let mut c_probs = c_probs.clone();
c_probs.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
let mut degrees = Vec::with_capacity(paramset.num_mixes as usize);
for (i, (c, prob)) in c_probs.iter().enumerate() {
let count = if i < c_probs.len() - 1 {
(prob * paramset.num_mixes as f32).ceil() as u32
} else {
let num_determined: u32 = degrees.len().try_into().unwrap();
paramset.num_mixes - num_determined
};
degrees.extend(std::iter::repeat(*c).take(count as usize));
}
degrees
}
};
// Init mix nodes
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
let mut mixnodes: Vec<Node<M>> = Vec::with_capacity(paramset.num_mixes as usize);
@ -91,7 +123,7 @@ pub fn build_random_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq +
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
peering_degrees[id as usize],
true, // enable cache
));
}
@ -99,14 +131,15 @@ pub fn build_random_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq +
// Choose sender's peers and receiver's peers randomly
let mut peers_rng = StdRng::seed_from_u64(seed);
let mut candidates: Vec<NodeId> = mixnodes.iter().map(|mixnode| mixnode.id).collect();
assert!(candidates.len() >= paramset.peering_degree as usize);
let num_sender_or_receiver_conns = paramset.num_sender_or_receiver_conns();
assert!(candidates.len() >= num_sender_or_receiver_conns);
let mut all_sender_peers = AllSenderPeers::new(paramset.num_senders);
for _ in 0..paramset.num_senders {
candidates.as_mut_slice().shuffle(&mut peers_rng);
let mut peers: Vec<NodeId> = candidates
.iter()
.cloned()
.take(paramset.peering_degree as usize)
.take(num_sender_or_receiver_conns)
.collect();
peers.sort();
all_sender_peers.add(peers);
@ -115,14 +148,17 @@ pub fn build_random_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq +
let mut receiver_peer_ids: Vec<NodeId> = candidates
.iter()
.cloned()
.take(paramset.peering_degree as usize)
.take(num_sender_or_receiver_conns)
.collect();
receiver_peer_ids.sort();
// Connect mix nodes
let topology = build_topology(
paramset.num_mixes,
&vec![paramset.peering_degree; paramset.num_mixes as usize],
mixnodes.len().try_into().unwrap(),
&mixnodes
.iter()
.map(|mixnode| mixnode.peering_degree)
.collect::<Vec<u32>>(),
seed,
);
for (node_id, peers) in topology.iter().enumerate() {

View File

@ -19,7 +19,7 @@ where
connected_peers: FxHashSet<NodeId>,
// A cache to avoid relaying the same message multiple times.
received_msgs: Option<FxHashMap<M, u32>>,
peering_degree: u32,
pub peering_degree: u32,
}
pub type MessagesToRelay<M> = Vec<(NodeId, Message<M>)>;

View File

@ -7,6 +7,9 @@ pub type Topology = Vec<Vec<NodeId>>;
pub fn build_topology(num_nodes: u32, peering_degrees: &[u32], seed: u64) -> Topology {
assert_eq!(num_nodes as usize, peering_degrees.len());
// Assert that peering degrees are sorted in descending order
assert!(peering_degrees.windows(2).all(|w| w[0] >= w[1]));
let mut rng = StdRng::seed_from_u64(seed);
loop {