impl exp1~4: multi path/mix

This commit is contained in:
Youngjoon Lee 2024-08-22 19:01:51 +02:00
parent e9012eae83
commit 27a8313450
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
3 changed files with 228 additions and 111 deletions

View File

@ -1,7 +1,7 @@
use std::path::Path;
use protocol::{
node::{MessageId, Node},
node::{MessageId, Node, NodeId},
queue::{Message, QueueConfig, QueueType},
};
use rand::{rngs::StdRng, Rng, SeedableRng};
@ -9,6 +9,8 @@ use rustc_hash::FxHashMap;
use crate::{ordercoeff::Sequence, paramset::ParamSet};
const RECEIVER_ID: NodeId = NodeId::MAX;
pub fn run_iteration(
paramset: ParamSet,
seed: u64,
@ -18,27 +20,88 @@ pub fn run_iteration(
out_data_msg_counts_path: &str,
out_ordering_coeff_path: &str,
) {
if paramset.random_topology {
run_iteration_with_random_topology(
paramset,
seed,
out_latency_path,
out_sent_sequence_path,
out_received_sequence_path,
out_data_msg_counts_path,
out_ordering_coeff_path,
)
} else {
run_iteration_without_random_topology(
paramset,
seed,
out_latency_path,
out_sent_sequence_path,
out_received_sequence_path,
out_data_msg_counts_path,
out_ordering_coeff_path,
)
}
}
fn run_iteration_without_random_topology(
paramset: ParamSet,
seed: u64,
out_latency_path: &str,
out_sent_sequence_path: &str,
out_received_sequence_path_prefix: &str,
out_queue_data_msg_counts_path: &str,
out_ordering_coeff_path: &str,
) {
assert!(!paramset.random_topology);
// Ensure that all output files do not exist
for path in &[
out_latency_path,
out_sent_sequence_path,
out_received_sequence_path,
out_data_msg_counts_path,
out_received_sequence_path_prefix,
out_queue_data_msg_counts_path,
] {
assert!(!Path::new(path).exists(), "File already exists: {path}");
}
// Initialize a mix node
let mut mixnode = Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed,
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
false,
);
mixnode.connect(u32::MAX); // connect to the virtual receiver node
// Initialize mix nodes
let mut next_node_id: NodeId = 0;
let mut mixnodes: FxHashMap<NodeId, Node> = FxHashMap::default();
let mut paths: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_paths as usize);
for _ in 0..paramset.num_paths {
let mut ids = Vec::with_capacity(paramset.num_mixes as usize);
for _ in 0..paramset.num_mixes {
let id = next_node_id;
next_node_id += 1;
mixnodes.insert(
id,
Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed,
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
paramset.random_topology, // disable cache
),
);
ids.push(id);
}
paths.push(ids);
}
// Connect mix nodes
for path in paths.iter() {
for (i, id) in path.iter().enumerate() {
if i != path.len() - 1 {
let peer_id = path[i + 1];
mixnodes.get_mut(id).unwrap().connect(peer_id);
} else {
mixnodes.get_mut(id).unwrap().connect(RECEIVER_ID);
}
}
}
let sender_peers: Vec<NodeId> = paths.iter().map(|path| path[0]).collect();
let mut next_msg_id: MessageId = 0;
@ -51,10 +114,10 @@ pub fn run_iteration(
let mut sent_times: FxHashMap<MessageId, f32> = FxHashMap::default();
let mut latencies: FxHashMap<MessageId, f32> = FxHashMap::default();
let mut sent_sequence = Sequence::new();
let mut received_sequence = Sequence::new();
let mut data_msg_counts_in_queue: Vec<usize> = Vec::new();
let mut received_sequences: FxHashMap<NodeId, Sequence> = FxHashMap::default();
let mut queue_data_msg_counts: Vec<FxHashMap<NodeId, Vec<usize>>> = Vec::new();
let mut rng = StdRng::seed_from_u64(seed);
let mut data_msg_rng = StdRng::seed_from_u64(seed);
loop {
tracing::trace!(
"VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}",
@ -64,48 +127,81 @@ pub fn run_iteration(
latencies.len()
);
// The sender emits a message (data or noise) to the mix node.
// The sender emits a message (data or noise) to all adjacent peers.
if all_sent_count < paramset.num_sender_msgs as usize {
if try_probability(&mut rng, paramset.sender_data_msg_prob) {
if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) {
let msg = next_msg_id;
next_msg_id += 1;
mixnode.receive(msg, None);
sender_peers.iter().for_each(|peer_id| {
mixnodes.get_mut(peer_id).unwrap().receive(msg, None);
});
sent_times.insert(msg, vtime);
sent_sequence.add_message(msg);
} else {
// Generate noise and add it to the sequence to calculate ordering coefficients later,
// but don't need to send it to the mix node
// because the mix node will anyway drop the noise,
// and we don't need to record what the mix node receives.
// but don't need to send it to the mix nodes
// because the mix nodes will anyway drop the noise,
// and we don't need to record what the mix nodes receive.
sent_sequence.add_noise();
}
all_sent_count += 1;
}
// The mix node add a new data message to its queue with a certain probability
if try_probability(&mut rng, paramset.mix_data_msg_prob) {
mixnode.send(next_msg_id);
next_msg_id += 1;
// Don't put the msg into the sent_sequence
// because sent_sequence is only for recording messages sent by the sender, not the mixnode.
// Each mix node add a new data message to its queue with a certain probability
if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) {
for (_, node) in mixnodes.iter_mut() {
node.send(next_msg_id);
next_msg_id += 1;
// Don't put the msg into the sent_sequence
// because sent_sequence is only for recording messages sent by the sender, not the mixnode.
}
}
// The mix node emits a message (data or noise) to the receiver.
// Each mix node relays a message (data or noise) to the next mix node or the receiver.
// As the receiver, record the time and order of the received messages.
// TODO: handle all queues
match mixnode.read_queues().first().unwrap().1 {
Message::Data(msg) => {
latencies.insert(msg, vtime - sent_times.get(&msg).unwrap());
received_sequence.add_message(msg);
}
Message::Noise => {
received_sequence.add_noise();
}
//
// source -> (destination, msg)
let mut all_msgs_to_relay: Vec<(NodeId, Vec<(NodeId, Message<MessageId>)>)> = Vec::new();
for (node_id, node) in mixnodes.iter_mut() {
all_msgs_to_relay.push((*node_id, node.read_queues()));
}
all_msgs_to_relay
.into_iter()
.for_each(|(sender_id, msgs_to_relay)| {
msgs_to_relay.into_iter().for_each(|(peer_id, msg)| {
if peer_id == RECEIVER_ID {
match msg {
Message::Data(msg) => {
latencies
.entry(msg)
.or_insert(vtime - sent_times.get(&msg).unwrap());
received_sequences
.entry(sender_id)
.or_insert(Sequence::new())
.add_message(msg);
}
Message::Noise => {
received_sequences
.entry(sender_id)
.or_insert(Sequence::new())
.add_noise();
}
}
} else if let Message::Data(msg) = msg {
mixnodes
.get_mut(&peer_id)
.unwrap()
.receive(msg, Some(sender_id));
}
});
});
// Record the number of data messages in the mix node's queue
// TODO: handle all queues
data_msg_counts_in_queue.push(*mixnode.data_count_in_queue().first().unwrap());
// Record the number of data messages in each mix node's queues
let mut counts: FxHashMap<NodeId, Vec<usize>> = FxHashMap::default();
mixnodes.iter().for_each(|(id, node)| {
counts.insert(*id, node.queue_data_msg_counts());
});
queue_data_msg_counts.push(counts);
// If all data amessages (that the sender has to send) have been received by the receiver,
// stop the iteration.
@ -121,29 +217,37 @@ pub fn run_iteration(
// Save results to CSV files
save_latencies(&latencies, &sent_times, out_latency_path);
save_sequence(&sent_sequence, out_sent_sequence_path);
save_sequence(&received_sequence, out_received_sequence_path);
save_data_msg_counts(
&data_msg_counts_in_queue,
save_sequences(&received_sequences, out_received_sequence_path_prefix);
save_queue_data_msg_counts(
&queue_data_msg_counts,
transmission_interval,
out_data_msg_counts_path,
out_queue_data_msg_counts_path,
);
// Calculate ordering coefficients and save them to a CSV file.
if paramset.queue_type != QueueType::NonMix {
let strong_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, true);
let weak_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, false);
tracing::info!(
"STRONG_COEFF:{}, WEAK_COEFF:{}",
strong_ordering_coeff,
weak_ordering_coeff
);
save_ordering_coefficients(
strong_ordering_coeff,
weak_ordering_coeff,
out_ordering_coeff_path,
);
let mut coeffs: Vec<[u64; 2]> = Vec::new();
for (_, recv_seq) in received_sequences.iter() {
let casual = sent_sequence.ordering_coefficient(recv_seq, true);
let weak = sent_sequence.ordering_coefficient(recv_seq, false);
coeffs.push([casual, weak]);
}
save_ordering_coefficients(&coeffs, out_ordering_coeff_path);
}
}
fn run_iteration_with_random_topology(
paramset: ParamSet,
seed: u64,
out_latency_path: &str,
out_sent_sequence_path: &str,
out_received_sequence_path: &str,
out_data_msg_counts_path: &str,
out_ordering_coeff_path: &str,
) {
assert!(paramset.random_topology);
todo!()
}
fn try_probability(rng: &mut StdRng, prob: f32) -> bool {
assert!(
(0.0..=1.0).contains(&prob),
@ -174,42 +278,54 @@ fn save_latencies(
writer.flush().unwrap();
}
fn save_sequence(sequence: &Sequence, path: &str) {
fn save_sequence(seq: &Sequence, path: &str) {
let mut writer = csv::Writer::from_path(path).unwrap();
sequence.iter().for_each(|entry| {
seq.iter().for_each(|entry| {
writer.write_record([entry.to_string()]).unwrap();
});
writer.flush().unwrap();
}
fn save_data_msg_counts(
data_msg_counts_in_queue: &[usize],
interval: f32,
out_data_msg_counts_path: &str,
) {
let mut writer = csv::Writer::from_path(out_data_msg_counts_path).unwrap();
writer
.write_record(["vtime", "data_msg_count_in_queue"])
.unwrap();
data_msg_counts_in_queue
.iter()
.enumerate()
.for_each(|(i, count)| {
writer
.write_record([(i as f64 * interval as f64).to_string(), count.to_string()])
.unwrap();
fn save_sequences(sequences: &FxHashMap<NodeId, Sequence>, path_prefix: &str) {
sequences.iter().enumerate().for_each(|(i, (_, seq))| {
save_sequence(seq, &format!("{path_prefix}_{i}.csv"));
});
}
fn save_queue_data_msg_counts(data: &[FxHashMap<NodeId, Vec<usize>>], interval: f32, path: &str) {
let mut writer = csv::Writer::from_path(path).unwrap();
let mut header = vec!["vtime".to_string()];
data[0].iter().for_each(|(node_id, counts)| {
let num_queues = counts.len();
(0..num_queues).for_each(|q_idx| {
header.push(format!("node{node_id}_q{q_idx}"));
});
});
writer.write_record(header).unwrap();
data.iter().enumerate().for_each(|(i, counts_per_node)| {
let mut row = vec![(i as f64 * interval as f64).to_string()];
counts_per_node.iter().for_each(|(_, counts)| {
row.extend(
counts
.iter()
.map(|count| count.to_string())
.collect::<Vec<String>>(),
);
});
writer.write_record(row).unwrap();
});
writer.flush().unwrap();
}
fn save_ordering_coefficients(strong_ordering_coeff: u64, weak_ordering_coeff: u64, path: &str) {
fn save_ordering_coefficients(data: &[[u64; 2]], path: &str) {
let mut writer = csv::Writer::from_path(path).unwrap();
writer.write_record(["strong", "weak"]).unwrap();
writer
.write_record([
strong_ordering_coeff.to_string(),
weak_ordering_coeff.to_string(),
])
.unwrap();
writer.write_record(["path", "casual", "weak"]).unwrap();
for (path_idx, [casual, weak]) in data.iter().enumerate() {
writer
.write_record([path_idx.to_string(), casual.to_string(), weak.to_string()])
.unwrap();
}
writer.flush().unwrap();
}

View File

@ -2,9 +2,10 @@ use std::fmt::Display;
use protocol::node::MessageId;
#[derive(Debug, Clone)]
pub struct Sequence(Vec<Entry>);
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Entry {
Data(MessageId),
Noise(u32), // the number of consecutive noises
@ -52,13 +53,13 @@ impl Sequence {
}
impl Sequence {
pub fn ordering_coefficient(&self, other: &Sequence, strong: bool) -> u64 {
pub fn ordering_coefficient(&self, other: &Sequence, casual: bool) -> u64 {
let mut coeff = 0;
let mut i = 0;
while i < self.0.len() {
if let Entry::Data(_) = &self.0[i] {
let (c, next_i) = self.ordering_coefficient_from(i, other, strong);
let (c, next_i) = self.ordering_coefficient_from(i, other, casual);
coeff += c;
if next_i != i {
@ -78,7 +79,7 @@ impl Sequence {
&self,
start_idx: usize,
other: &Sequence,
strong: bool,
casual: bool,
) -> (u64, usize) {
let msg1 = match self.0[start_idx] {
Entry::Data(msg) => msg,
@ -89,8 +90,8 @@ impl Sequence {
if let Entry::Data(msg2) = entry {
if msg1 == *msg2 {
// Found the 1st matching msg. Start finding the next adjacent matching msg.
if strong {
return self.strong_ordering_coefficient_from(start_idx, other, j);
if casual {
return self.casual_ordering_coefficient_from(start_idx, other, j);
} else {
return self.weak_ordering_coefficient_from(start_idx, other, j);
}
@ -100,7 +101,7 @@ impl Sequence {
(0, start_idx)
}
fn strong_ordering_coefficient_from(
fn casual_ordering_coefficient_from(
&self,
start_idx: usize,
other: &Sequence,
@ -172,24 +173,24 @@ impl Sequence {
mod tests {
use super::*;
fn test_ordering_coefficient_common(strong: bool) {
fn test_ordering_coefficient_common(casual: bool) {
// Case 0: Empty sequences
let seq = Sequence(vec![]);
assert_eq!(seq.ordering_coefficient(&seq, strong), 0);
assert_eq!(seq.ordering_coefficient(&seq, casual), 0);
// Case 1: Exact one matched pair with no noise
let seq = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
assert_eq!(seq.ordering_coefficient(&seq, strong), 1);
assert_eq!(seq.ordering_coefficient(&seq, casual), 1);
// Case 2: Exact one matched pair with noise
let seq = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
assert_eq!(seq.ordering_coefficient(&seq, strong), 1);
assert_eq!(seq.ordering_coefficient(&seq, casual), 1);
// Case 3: One matched pair with no noise
let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(3)]);
let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(4)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 1);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 1);
assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1);
assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1);
// Case 4: One matched pair with noise
let seq1 = Sequence(vec![
@ -199,8 +200,8 @@ mod tests {
Entry::Data(3),
]);
let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 1);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 1);
assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1);
assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1);
// Case 5: Two matched pairs with noise
let seq1 = Sequence(vec![
@ -216,26 +217,26 @@ mod tests {
Entry::Data(3),
Entry::Data(4),
]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 2);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 2);
assert_eq!(seq1.ordering_coefficient(&seq2, casual), 2);
assert_eq!(seq2.ordering_coefficient(&seq1, casual), 2);
// Case 6: Only partial match with no noise
let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
let seq2 = Sequence(vec![Entry::Data(2), Entry::Data(3)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0);
assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0);
// Case 7: Only partial match with noise
let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]);
let seq2 = Sequence(vec![Entry::Data(2), Entry::Noise(10), Entry::Data(3)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0);
assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0);
// Case 8: No match at all
let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]);
let seq2 = Sequence(vec![Entry::Data(3), Entry::Noise(10), Entry::Data(4)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0);
assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0);
// Case 9: Matches with noise but mixed orders
let seq1 = Sequence(vec![
@ -256,12 +257,12 @@ mod tests {
Entry::Data(3),
Entry::Data(6),
]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 3);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 3);
assert_eq!(seq1.ordering_coefficient(&seq2, casual), 3);
assert_eq!(seq2.ordering_coefficient(&seq1, casual), 3);
}
#[test]
fn test_strong_ordering_coefficient() {
fn test_casual_ordering_coefficient() {
test_ordering_coefficient_common(true);
// Case 0: No match because of noise

View File

@ -75,7 +75,7 @@ impl Node {
msgs_to_relay
}
pub fn data_count_in_queue(&self) -> Vec<usize> {
pub fn queue_data_msg_counts(&self) -> Vec<usize> {
self.queues
.iter()
.map(|(_, queue)| queue.data_count())