fix: handle multiple senders for exp3~4

Youngjoon Lee 2024-08-22 21:15:49 +02:00
parent 1857f9f05d
commit d0fe4abba9
2 changed files with 65 additions and 41 deletions


@@ -34,7 +34,7 @@ pub fn run_iteration(
         assert!(!Path::new(path).exists(), "File already exists: {path}");
     }
-    let (mut mixnodes, sender_peers) = if paramset.random_topology {
+    let (mut mixnodes, sender_peers_list) = if paramset.random_topology {
         build_random_network(&paramset, seed, out_topology_path)
     } else {
         build_striped_network(&paramset, seed)
@@ -47,7 +47,10 @@ pub fn run_iteration(
     // Transmission interval that each queue must release a message
     let transmission_interval = 1.0 / paramset.transmission_rate as f32;
     // Results
-    let mut all_sent_count = 0; // all data + noise sent by the sender
+    let mut all_sent_count = 0; // all data + noise sent by all senders
+    let target_all_sent_count = (paramset.num_sender_msgs as usize)
+        .checked_mul(paramset.num_senders as usize)
+        .unwrap();
     let mut sent_times: FxHashMap<MessageId, f32> = FxHashMap::default();
     let mut latencies: FxHashMap<MessageId, f32> = FxHashMap::default();
     let mut sent_sequence = Sequence::new();
@@ -65,33 +68,35 @@ pub fn run_iteration(
             latencies.len()
         );
-        // The sender emits a message (data or noise) to all adjacent peers.
-        if all_sent_count < paramset.num_sender_msgs as usize {
-            if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) {
-                let msg = next_msg_id;
-                next_msg_id += 1;
-                sender_peers.iter().for_each(|peer_id| {
-                    mixnodes.get_mut(peer_id).unwrap().receive(msg, None);
-                });
-                sent_times.insert(msg, vtime);
-                sent_sequence.add_message(msg);
-            } else {
-                // Generate noise and add it to the sequence to calculate ordering coefficients later,
-                // but don't need to send it to the mix nodes
-                // because the mix nodes will anyway drop the noise,
-                // and we don't need to record what the mix nodes receive.
-                sent_sequence.add_noise();
+        // All senders emit a message (data or noise) to all of their own adjacent peers.
+        if all_sent_count < target_all_sent_count {
+            for sender_peers in sender_peers_list.iter() {
+                if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) {
+                    let msg = next_msg_id;
+                    next_msg_id += 1;
+                    sender_peers.iter().for_each(|peer_id| {
+                        mixnodes.get_mut(peer_id).unwrap().receive(msg, None);
+                    });
+                    sent_times.insert(msg, vtime);
+                    sent_sequence.add_message(msg);
+                } else {
+                    // Generate noise and add it to the sequence to calculate ordering coefficients later,
+                    // but don't need to send it to the mix nodes
+                    // because the mix nodes will anyway drop the noise,
+                    // and we don't need to record what the mix nodes receive.
+                    sent_sequence.add_noise();
+                }
+                all_sent_count += 1;
             }
-            all_sent_count += 1;
         }
         // Each mix node add a new data message to its queue with a certain probability
-        if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) {
-            for (_, node) in mixnodes.iter_mut() {
+        for (_, node) in mixnodes.iter_mut() {
+            if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) {
                 node.send(next_msg_id);
                 next_msg_id += 1;
                 // Don't put the msg into the sent_sequence
-                // because sent_sequence is only for recording messages sent by the sender, not the mixnode.
+                // because sent_sequence is only for recording messages sent by the senders, not the mixnode.
             }
         }
@@ -105,7 +110,7 @@ pub fn run_iteration(
         }
         all_msgs_to_relay
             .into_iter()
-            .for_each(|(sender_id, msgs_to_relay)| {
+            .for_each(|(mix_id, msgs_to_relay)| {
                 msgs_to_relay.into_iter().for_each(|(peer_id, msg)| {
                     if peer_id == RECEIVER_ID {
                         match msg {
@@ -118,14 +123,14 @@ pub fn run_iteration(
                                         unified_received_sequence.add_message(msg);
                                     }
                                     received_sequences
-                                        .entry(sender_id)
+                                        .entry(mix_id)
                                         .or_insert(Sequence::new())
                                         .add_message(msg);
                                 }
                             }
                             Message::Noise => {
                                 received_sequences
-                                    .entry(sender_id)
+                                    .entry(mix_id)
                                     .or_insert(Sequence::new())
                                     .add_noise();
                             }
@@ -134,7 +139,7 @@ pub fn run_iteration(
                         mixnodes
                             .get_mut(&peer_id)
                             .unwrap()
-                            .receive(msg, Some(sender_id));
+                            .receive(msg, Some(mix_id));
                     }
                 });
             });
@@ -146,11 +151,9 @@ pub fn run_iteration(
         });
         queue_data_msg_counts.push(counts);
-        // If all data amessages (that the sender has to send) have been received by the receiver,
+        // If all data messages (that have been sent by the senders) have been received by the receiver,
         // stop the iteration.
-        if all_sent_count == paramset.num_sender_msgs as usize
-            && sent_times.len() == latencies.len()
-        {
+        if all_sent_count == target_all_sent_count && sent_times.len() == latencies.len() {
             break;
         }
@@ -188,7 +191,10 @@ pub fn run_iteration(
         }
     }

-fn build_striped_network(paramset: &ParamSet, seed: u64) -> (FxHashMap<NodeId, Node>, Vec<NodeId>) {
+fn build_striped_network(
+    paramset: &ParamSet,
+    seed: u64,
+) -> (FxHashMap<NodeId, Node>, Vec<Vec<NodeId>>) {
     assert!(!paramset.random_topology);
     let mut next_node_id: NodeId = 0;
     let mut queue_seed_rng = StdRng::seed_from_u64(seed);
@@ -227,15 +233,19 @@ fn build_striped_network(paramset: &ParamSet, seed: u64) -> (FxHashMap<NodeId, N
             }
         }
     }
-    let sender_peers: Vec<NodeId> = paths.iter().map(|path| *path.first().unwrap()).collect();
-    (mixnodes, sender_peers)
+    let sender_peers_list: Vec<Vec<NodeId>> =
+        vec![
+            paths.iter().map(|path| *path.first().unwrap()).collect();
+            paramset.num_senders as usize
+        ];
+    (mixnodes, sender_peers_list)
 }

 fn build_random_network(
     paramset: &ParamSet,
     seed: u64,
     out_topology_path: &str,
-) -> (FxHashMap<NodeId, Node>, Vec<NodeId>) {
+) -> (FxHashMap<NodeId, Node>, Vec<Vec<NodeId>>) {
     assert!(paramset.random_topology);
     // Init mix nodes
     let mut queue_seed_rng = StdRng::seed_from_u64(seed);
@@ -259,12 +269,17 @@ fn build_random_network(
     let mut peers_rng = StdRng::seed_from_u64(seed);
     let mut candidates: Vec<NodeId> = (0..paramset.num_mixes).collect();
     assert!(candidates.len() >= paramset.peering_degree as usize);
-    candidates.as_mut_slice().shuffle(&mut peers_rng);
-    let sender_peers: Vec<NodeId> = candidates
-        .iter()
-        .cloned()
-        .take(paramset.peering_degree as usize)
-        .collect();
+    let mut sender_peers_list: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_senders as usize);
+    for _ in 0..paramset.num_senders {
+        candidates.as_mut_slice().shuffle(&mut peers_rng);
+        sender_peers_list.push(
+            candidates
+                .iter()
+                .cloned()
+                .take(paramset.peering_degree as usize)
+                .collect(),
+        );
+    }
     candidates.as_mut_slice().shuffle(&mut peers_rng);
     let receiver_peers: Vec<NodeId> = candidates
         .iter()
@@ -293,7 +308,7 @@ fn build_random_network(
         mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID);
     }

-    (mixnodes, sender_peers)
+    (mixnodes, sender_peers_list)
 }

 fn try_probability(rng: &mut StdRng, prob: f32) -> bool {


@@ -52,6 +52,7 @@ pub const PARAMSET_CSV_COLUMNS: &[&str] = &[
     "peering_degree",
     "min_queue_size",
     "transmission_rate",
+    "num_senders",
     "num_sender_msgs",
     "sender_data_msg_prob",
     "mix_data_msg_prob",
@@ -68,6 +69,7 @@ pub struct ParamSet {
     pub peering_degree: u32,
     pub min_queue_size: u16,
     pub transmission_rate: u16,
+    pub num_senders: u32,
     pub num_sender_msgs: u32,
     pub sender_data_msg_prob: f32,
     pub mix_data_msg_prob: f32,
@@ -89,6 +91,10 @@ impl ParamSet {
     fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
         let transmission_rate: u16 = 1;
         let min_queue_size: u16 = 10;
+        let num_senders: u32 = match exp_id {
+            ExperimentId::Experiment3 | ExperimentId::Experiment4 => 2,
+            _ => 1,
+        };
         let num_sender_msgs: u32 = 1000000;
         let sender_data_msg_probs: &[f32] = &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0];
         let mix_data_msg_probs: &[f32] = match exp_id {
@@ -119,6 +125,7 @@ impl ParamSet {
                     peering_degree: 1,
                     min_queue_size,
                     transmission_rate,
+                    num_senders,
                     num_sender_msgs,
                     sender_data_msg_prob,
                     mix_data_msg_prob,
@@ -145,6 +152,7 @@ impl ParamSet {
                     peering_degree,
                     min_queue_size,
                     transmission_rate,
+                    num_senders,
                     num_sender_msgs,
                     sender_data_msg_prob,
                     mix_data_msg_prob,
@@ -172,6 +180,7 @@ impl ParamSet {
             self.peering_degree.to_string(),
             self.min_queue_size.to_string(),
             self.transmission_rate.to_string(),
+            self.num_senders.to_string(),
             self.num_sender_msgs.to_string(),
             self.sender_data_msg_prob.to_string(),
             self.mix_data_msg_prob.to_string(),
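
For readers skimming the diff, the core of the change is the per-sender emission loop and the num_sender_msgs * num_senders budget. The following is a minimal, self-contained sketch of that pattern only; NodeId, MessageId, emit_round, and the send closure are simplified stand-ins introduced here for illustration (the data/noise probability branch and the mix-node queues are omitted), not the simulator's actual API:

// Sketch: one round of multi-sender emission (assumed simplified types).
type NodeId = u32;
type MessageId = u32;

fn emit_round(
    sender_peers_list: &[Vec<NodeId>],        // one adjacency list per sender
    next_msg_id: &mut MessageId,
    all_sent_count: &mut usize,
    target_all_sent_count: usize,
    send: &mut impl FnMut(NodeId, MessageId), // stand-in for mixnodes[peer].receive(..)
) {
    if *all_sent_count < target_all_sent_count {
        // Every sender emits one message per round to all of its own peers.
        for sender_peers in sender_peers_list {
            let msg = *next_msg_id;
            *next_msg_id += 1;
            for peer_id in sender_peers {
                send(*peer_id, msg);
            }
            *all_sent_count += 1;
        }
    }
}

fn main() {
    let sender_peers_list = vec![vec![0, 1], vec![2, 3]]; // two senders, as in exp3/4
    let num_sender_msgs: usize = 3;
    // Overall budget mirrors target_all_sent_count: num_sender_msgs * num_senders.
    let target = num_sender_msgs
        .checked_mul(sender_peers_list.len())
        .unwrap();
    let (mut next_id, mut sent) = (0, 0);
    let mut send = |peer: NodeId, msg: MessageId| println!("peer {peer} <- msg {msg}");
    while sent < target {
        emit_round(&sender_peers_list, &mut next_id, &mut sent, target, &mut send);
    }
    assert_eq!(sent, target); // 3 msgs per sender * 2 senders = 6
}

Because every sender increments all_sent_count each round, the iteration budget scales with the number of senders, which is why the stop condition in the diff compares against target_all_sent_count rather than num_sender_msgs alone.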