Mirror of https://github.com/logos-blockchain/logos-blockchain-simulations.git (synced 2026-02-18 12:13:09 +00:00)

Commit 60722361d1 (parent: ff92070498)

multiple sent_seqs, sender_idx in msg, all gradual writings, new topology writing, no ordercoeff calculation
@@ -1,5 +1,5 @@
 use protocol::{
-    node::{MessageId, Node, NodeId},
+    node::{Node, NodeId},
     queue::{Message, QueueConfig},
     topology::{build_topology, save_topology},
 };
@@ -8,18 +8,21 @@ use rustc_hash::FxHashMap;
 
 use crate::paramset::ParamSet;
 
+type MessageId = u32;
+
 // An interval that the sender nodes send (schedule) new messages
 const MSG_INTERVAL: f32 = 1.0;
 
 pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) {
     // Initialize nodes (not connected with each other yet)
-    let mut nodes: Vec<Node> = Vec::new();
+    let mut nodes: Vec<Node<MessageId>> = Vec::new();
     let mut queue_seed_rng = StdRng::seed_from_u64(seed);
     let peering_degrees = paramset.gen_peering_degrees(seed);
     tracing::debug!("PeeringDegrees initialized.");
 
     for node_id in 0..paramset.num_nodes {
         nodes.push(Node::new(
+            node_id,
             QueueConfig {
                 queue_type: paramset.queue_type,
                 seed: queue_seed_rng.next_u64(),
@@ -112,7 +115,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
 fn send_messages(
     vtime: f32,
     sender_ids: &[NodeId],
-    nodes: &mut [Node],
+    nodes: &mut [Node<MessageId>],
     next_msg_id: &mut MessageId,
     message_tracker: &mut FxHashMap<MessageId, (f32, u16)>,
 ) {
@@ -125,7 +128,7 @@ fn send_messages(
 
 fn relay_messages(
     vtime: f32,
-    nodes: &mut [Node],
+    nodes: &mut [Node<MessageId>],
     message_tracker: &mut FxHashMap<MessageId, (f32, u16)>,
     num_disseminated_msgs: &mut usize,
     writer: &mut csv::Writer<std::fs::File>,
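Context for the hunks above: `Node` is now generic over its message type, so this binary pins the parameter with a local `type MessageId = u32;` alias. A minimal sketch of the new instantiation pattern, assuming the `Node::new(id, QueueConfig, peering_degree, enable_cache)` signature and the `QueueConfig` fields shown elsewhere in this diff (the concrete values are illustrative):

    use protocol::{
        node::Node,
        queue::{QueueConfig, QueueType},
    };

    type MessageId = u32; // this binary keeps using plain u32 message IDs

    fn make_node(id: u32, seed: u64) -> Node<MessageId> {
        Node::new(
            id,
            QueueConfig {
                queue_type: QueueType::NonMix, // illustrative choice
                seed,
                min_queue_size: 10, // illustrative
            },
            4,     // peering degree (illustrative)
            false, // cache disabled, as in this binary
        )
    }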
@@ -1,49 +1,30 @@
-use std::{collections::hash_map::Entry, fs::File, path::Path};
+use std::collections::hash_map::Entry;
 
-use csv::Writer;
-use protocol::{
-    node::{MessageId, Node, NodeId},
-    queue::{Message, QueueConfig, QueueType},
-    topology::{build_topology, save_topology},
-};
-use rand::{rngs::StdRng, seq::SliceRandom, Rng, RngCore, SeedableRng};
+use protocol::{node::NodeId, queue::Message};
+use rand::{rngs::StdRng, Rng, SeedableRng};
 use rustc_hash::FxHashMap;
 
-use crate::{ordercoeff::Sequence, paramset::ParamSet};
+use crate::{
+    message::{DataMessage, DataMessageGenerator},
+    outputs::Outputs,
+    paramset::ParamSet,
+    topology::{build_random_network, build_striped_network, RECEIVER_NODE_ID},
+};
 
-const RECEIVER_ID: NodeId = NodeId::MAX;
-
-pub fn run_iteration(
-    paramset: ParamSet,
-    seed: u64,
-    out_latency_path: &str,
-    out_sent_sequence_path: &str,
-    out_received_sequence_path_prefix: &str,
-    out_queue_data_msg_counts_path: &str,
-    out_ordering_coeff_path: Option<String>,
-    out_topology_path: &str,
-) -> f32 {
-    // Ensure that all output files do not exist
-    for path in &[
-        out_latency_path,
-        out_sent_sequence_path,
-        out_received_sequence_path_prefix,
-        out_queue_data_msg_counts_path,
-        out_topology_path,
-    ] {
-        assert!(!Path::new(path).exists(), "File already exists: {path}");
-    }
-    if let Some(path) = &out_ordering_coeff_path {
-        assert!(!Path::new(path).exists(), "File already exists: {path}");
-    }
-
-    let (mut mixnodes, sender_peers_list) = if paramset.random_topology {
-        build_random_network(&paramset, seed, out_topology_path)
+pub fn run_iteration(paramset: ParamSet, seed: u64, outputs: &mut Outputs) -> f32 {
+    let (mut mixnodes, sender_peers_list, receiver_peer_conn_idx) = if paramset.random_topology {
+        build_random_network(&paramset, seed, outputs)
     } else {
         build_striped_network(&paramset, seed)
     };
+    // Check node ID consistency
+    for (i, node) in mixnodes.iter().enumerate() {
+        assert_eq!(node.id as usize, i);
+    }
 
-    let mut next_msg_id: MessageId = 0;
+    // For N senders + 1 mix (all mixnodes will share the same sender ID)
+    let mut data_msg_gen = DataMessageGenerator::new(paramset.num_senders + 1);
+    let mix_msg_sender_id = paramset.num_senders;
 
     // Virtual discrete time
     let mut vtime: f32 = 0.0;
@@ -51,21 +32,13 @@ pub fn run_iteration(
     let transmission_interval = 1.0 / paramset.transmission_rate as f32;
     // Results
     let mut all_sent_count = 0; // all data + noise sent by all senders
-    let target_all_sent_count = (paramset.num_sender_msgs as usize)
+    let all_sent_count_target = (paramset.num_sender_msgs as usize)
         .checked_mul(paramset.num_senders as usize)
         .unwrap();
-    let mut sent_times: FxHashMap<MessageId, f32> = FxHashMap::default();
-    let mut recv_times: FxHashMap<MessageId, f32> = FxHashMap::default();
-    let mut latencies: Vec<(MessageId, f32)> = Vec::new();
-    let mut sent_sequence = Sequence::new();
-    let mut received_sequences: FxHashMap<NodeId, Sequence> = FxHashMap::default();
-    let mut unified_received_sequence = if paramset.random_topology {
-        Some(Sequence::new())
-    } else {
-        None
-    };
-    let mut queue_data_msg_counts_writer =
-        new_queue_data_msg_counts_writer(out_queue_data_msg_counts_path, &mixnodes);
+    let mut sent_data_msgs: FxHashMap<DataMessage, f32> = FxHashMap::default();
+    let mut recv_data_msgs: FxHashMap<DataMessage, f32> = FxHashMap::default();
+
+    outputs.write_header_queue_data_msg_counts(&mixnodes);
 
     let mut data_msg_rng = StdRng::seed_from_u64(seed);
     loop {
@@ -73,254 +46,104 @@ pub fn run_iteration(
             "VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}",
             vtime,
             all_sent_count,
-            sent_times.len(),
-            latencies.len()
+            sent_data_msgs.len(),
+            recv_data_msgs.len(),
         );
 
         // All senders emit a message (data or noise) to all of their own adjacent peers.
-        if all_sent_count < target_all_sent_count {
-            for sender_peers in sender_peers_list.iter() {
+        if all_sent_count < all_sent_count_target {
+            // For each sender
+            for (sender_idx, sender_peers) in sender_peers_list.iter().enumerate() {
                 if try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) {
-                    let msg = next_msg_id;
-                    next_msg_id += 1;
+                    let msg = data_msg_gen.next(sender_idx.try_into().unwrap());
                     sender_peers.iter().for_each(|peer_id| {
                         mixnodes
                             .get_mut(*peer_id as usize)
                            .unwrap()
                            .receive(msg, None);
                     });
-                    sent_times.insert(msg, vtime);
-                    sent_sequence.add_message(msg);
+                    sent_data_msgs.insert(msg, vtime);
+                    outputs.add_sent_msg(&msg)
                 } else {
                     // Generate noise and add it to the sequence to calculate ordering coefficients later,
                     // but don't need to send it to the mix nodes
                     // because the mix nodes will anyway drop the noise,
                     // and we don't need to record what the mix nodes receive.
-                    sent_sequence.add_noise();
+                    outputs.add_sent_noise(sender_idx);
                 }
                 all_sent_count += 1;
            }
        }
 
         // Each mix node add a new data message to its queue with a certain probability
-        for node in mixnodes.iter_mut() {
-            if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) {
-                node.send(next_msg_id);
-                next_msg_id += 1;
-                // Don't put the msg into the sent_sequence
-                // because sent_sequence is only for recording messages sent by the senders, not the mixnode.
+        if paramset.mix_data_msg_prob > 0.0 {
+            for node in mixnodes.iter_mut() {
+                if try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) {
+                    node.send(data_msg_gen.next(mix_msg_sender_id));
+                    // We don't put the msg into the sent_sequence
+                    // because sent_sequence is only for recording messages sent by the senders, not the mixnode.
+                }
            }
        }
 
         // Each mix node relays a message (data or noise) to the next mix node or the receiver.
         // As the receiver, record the time and order of the received messages.
         //
-        // source -> (destination, msg)
-        let mut all_msgs_to_relay: Vec<(NodeId, Vec<(NodeId, Message<MessageId>)>)> = Vec::new();
-        for (node_id, node) in mixnodes.iter_mut().enumerate() {
-            all_msgs_to_relay.push((node_id.try_into().unwrap(), node.read_queues()));
+        // relayer_id -> (peer_id, msg)
+        let mut all_msgs_to_relay: Vec<(NodeId, Vec<(NodeId, Message<DataMessage>)>)> = Vec::new();
+        for node in mixnodes.iter_mut() {
+            all_msgs_to_relay.push((node.id, node.read_queues()));
         }
         all_msgs_to_relay
             .into_iter()
-            .for_each(|(mix_id, msgs_to_relay)| {
+            .for_each(|(relayer_id, msgs_to_relay)| {
                 msgs_to_relay.into_iter().for_each(|(peer_id, msg)| {
-                    if peer_id == RECEIVER_ID {
+                    if peer_id == RECEIVER_NODE_ID {
                         match msg {
                             Message::Data(msg) => {
                                 // If msg was sent by the sender (not by any mix)
-                                if let Some(&sent_time) = sent_times.get(&msg) {
-                                    // If this is the first time to see the msg
-                                    if let Entry::Vacant(e) = recv_times.entry(msg) {
+                                if let Some(&sent_time) = sent_data_msgs.get(&msg) {
+                                    // If this is the first time to see the msg,
+                                    // update stats that must ignore duplicate messages.
+                                    if let Entry::Vacant(e) = recv_data_msgs.entry(msg) {
                                         e.insert(vtime);
-                                        latencies.push((msg, vtime - sent_time));
-                                        if let Some(unified_recv_seq) =
-                                            &mut unified_received_sequence
-                                        {
-                                            unified_recv_seq.add_message(msg);
-                                        }
+                                        outputs.add_latency(&msg, sent_time, vtime);
                                     }
-                                    received_sequences
-                                        .entry(mix_id)
-                                        .or_insert(Sequence::new())
-                                        .add_message(msg);
                                 }
+                                // Record msg to the sequence
+                                let conn_idx = receiver_peer_conn_idx.get(&relayer_id).unwrap();
+                                outputs.add_recv_msg(&msg, *conn_idx as usize);
                             }
                             Message::Noise => {
-                                received_sequences
-                                    .entry(mix_id)
-                                    .or_insert(Sequence::new())
-                                    .add_noise();
+                                // Record noise to the sequence
+                                let conn_idx = receiver_peer_conn_idx.get(&relayer_id).unwrap();
+                                outputs.add_recv_noise(*conn_idx as usize);
                             }
                         }
                     } else if let Message::Data(msg) = msg {
-                        mixnodes
-                            .get_mut(peer_id as usize)
-                            .unwrap()
-                            .receive(msg, Some(mix_id));
+                        let peer = mixnodes.get_mut(peer_id as usize).unwrap();
+                        assert_eq!(peer.id, peer_id);
+                        peer.receive(msg, Some(relayer_id));
                     }
                 });
             });
 
         // Record the number of data messages in each mix node's queues
-        append_queue_data_msg_counts(&mixnodes, vtime, &mut queue_data_msg_counts_writer);
+        outputs.add_queue_data_msg_counts(vtime, &mixnodes);
 
-        // If all data messages (that have been sent by the senders) have been received by the receiver,
+        // If all senders finally emitted all data+noise messages,
+        // and If all data messages have been received by the receiver,
         // stop the iteration.
-        if all_sent_count == target_all_sent_count && sent_times.len() == latencies.len() {
+        if all_sent_count == all_sent_count_target && sent_data_msgs.len() == recv_data_msgs.len() {
            break;
        }
 
         vtime += transmission_interval;
     }
 
-    // Save results to CSV files
-    save_latencies(&latencies, &sent_times, &recv_times, out_latency_path);
-    save_sequence(&sent_sequence, out_sent_sequence_path);
-    // Sort received_sequences
-    let mut node_ids: Vec<NodeId> = received_sequences.keys().cloned().collect();
-    node_ids.sort();
-    let received_sequences: Vec<Sequence> = node_ids
-        .iter()
-        .map(|node_id| received_sequences.remove(node_id).unwrap())
-        .collect();
-    save_sequences(&received_sequences, out_received_sequence_path_prefix);
-    if let Some(unified_recv_seq) = &unified_received_sequence {
-        save_sequence(
-            unified_recv_seq,
-            format!("{out_received_sequence_path_prefix}_unified.csv").as_str(),
-        );
-    }
-    // Calculate ordering coefficients and save them to a CSV file (if enabled)
-    if let Some(out_ordering_coeff_path) = &out_ordering_coeff_path {
-        if paramset.queue_type != QueueType::NonMix {
-            if let Some(unified_recv_seq) = &unified_received_sequence {
-                let casual = sent_sequence.ordering_coefficient(unified_recv_seq, true);
-                let weak = sent_sequence.ordering_coefficient(unified_recv_seq, false);
-                save_ordering_coefficients(&[[casual, weak]], out_ordering_coeff_path);
-            } else {
-                let mut coeffs: Vec<[u64; 2]> = Vec::new();
-                for recv_seq in received_sequences.iter() {
-                    let casual = sent_sequence.ordering_coefficient(recv_seq, true);
-                    let weak = sent_sequence.ordering_coefficient(recv_seq, false);
-                    coeffs.push([casual, weak]);
-                }
-                save_ordering_coefficients(&coeffs, out_ordering_coeff_path);
-            }
-        }
-    }
-
     vtime
 }
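A distilled view of the sender step above, for readers skimming the diff: each tick, every sender draws a Bernoulli trial; a success emits the next DataMessage stamped with that sender's index, a failure is recorded as noise in that sender's own sent sequence. A self-contained sketch of the same logic (stand-in types; the probability, seed, and counts are illustrative):

    use rand::{rngs::StdRng, Rng, SeedableRng};

    // Minimal stand-in for the DataMessage type introduced in this commit.
    #[derive(Debug, Clone, Copy)]
    struct DataMessage {
        sender: u8,
        msg_id: u32,
    }

    fn main() {
        let mut rng = StdRng::seed_from_u64(0);
        let sender_data_msg_prob = 0.5_f32;
        let mut next_msg_ids = vec![0u32; 2]; // one message counter per sender
        for tick in 0..4 {
            for sender_idx in 0..2u8 {
                if rng.gen::<f32>() < sender_data_msg_prob {
                    let msg_id = next_msg_ids[sender_idx as usize];
                    next_msg_ids[sender_idx as usize] += 1;
                    let msg = DataMessage { sender: sender_idx, msg_id };
                    println!("tick {tick}: sender {sender_idx} emits {msg:?}");
                } else {
                    println!("tick {tick}: sender {sender_idx} emits noise");
                }
            }
        }
    }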
 
-fn build_striped_network(paramset: &ParamSet, seed: u64) -> (Vec<Node>, Vec<Vec<NodeId>>) {
-    assert!(!paramset.random_topology);
-    let mut next_node_id: NodeId = 0;
-    let mut queue_seed_rng = StdRng::seed_from_u64(seed);
-    let mut mixnodes: Vec<Node> =
-        Vec::with_capacity(paramset.num_paths as usize * paramset.num_mixes as usize);
-    let mut paths: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_paths as usize);
-    for _ in 0..paramset.num_paths {
-        let mut ids = Vec::with_capacity(paramset.num_mixes as usize);
-        for _ in 0..paramset.num_mixes {
-            let id = next_node_id;
-            next_node_id += 1;
-            mixnodes.push(Node::new(
-                QueueConfig {
-                    queue_type: paramset.queue_type,
-                    seed: queue_seed_rng.next_u64(),
-                    min_queue_size: paramset.min_queue_size,
-                },
-                paramset.peering_degree,
-                false, // disable cache
-            ));
-            ids.push(id);
-        }
-        paths.push(ids);
-    }
-
-    // Connect mix nodes
-    for path in paths.iter() {
-        for (i, id) in path.iter().enumerate() {
-            if i != path.len() - 1 {
-                let peer_id = path[i + 1];
-                mixnodes.get_mut(*id as usize).unwrap().connect(peer_id);
-            } else {
-                mixnodes.get_mut(*id as usize).unwrap().connect(RECEIVER_ID);
-            }
-        }
-    }
-    let sender_peers_list: Vec<Vec<NodeId>> =
-        vec![
-            paths.iter().map(|path| *path.first().unwrap()).collect();
-            paramset.num_senders as usize
-        ];
-    (mixnodes, sender_peers_list)
-}
-
-fn build_random_network(
-    paramset: &ParamSet,
-    seed: u64,
-    out_topology_path: &str,
-) -> (Vec<Node>, Vec<Vec<NodeId>>) {
-    assert!(paramset.random_topology);
-    // Init mix nodes
-    let mut queue_seed_rng = StdRng::seed_from_u64(seed);
-    let mut mixnodes: Vec<Node> = Vec::with_capacity(paramset.num_mixes as usize);
-    for _ in 0..paramset.num_mixes {
-        mixnodes.push(Node::new(
-            QueueConfig {
-                queue_type: paramset.queue_type,
-                seed: queue_seed_rng.next_u64(),
-                min_queue_size: paramset.min_queue_size,
-            },
-            paramset.peering_degree,
-            true, // enable cache
-        ));
-    }
-
-    // Choose sender's peers and receiver's peers randomly
-    let mut peers_rng = StdRng::seed_from_u64(seed);
-    let mut candidates: Vec<NodeId> = (0..paramset.num_mixes).collect();
-    assert!(candidates.len() >= paramset.peering_degree as usize);
-    let mut sender_peers_list: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_senders as usize);
-    for _ in 0..paramset.num_senders {
-        candidates.as_mut_slice().shuffle(&mut peers_rng);
-        sender_peers_list.push(
-            candidates
-                .iter()
-                .cloned()
-                .take(paramset.peering_degree as usize)
-                .collect(),
-        );
-    }
-    candidates.as_mut_slice().shuffle(&mut peers_rng);
-    let receiver_peers: Vec<NodeId> = candidates
-        .iter()
-        .cloned()
-        .take(paramset.peering_degree as usize)
-        .collect();
-
-    // Connect mix nodes
-    let topology = build_topology(
-        paramset.num_mixes,
-        &vec![paramset.peering_degree; paramset.num_mixes as usize],
-        seed,
-    );
-    save_topology(&topology, out_topology_path).unwrap();
-    for (node_id, peers) in topology.iter().enumerate() {
-        peers.iter().for_each(|peer_id| {
-            mixnodes.get_mut(node_id).unwrap().connect(*peer_id);
-        });
-    }
-
-    // Connect the selected mix nodes with the receiver
-    for id in receiver_peers.iter() {
-        mixnodes.get_mut(*id as usize).unwrap().connect(RECEIVER_ID);
-    }
-
-    (mixnodes, sender_peers_list)
-}
 
 fn try_probability(rng: &mut StdRng, prob: f32) -> bool {
     assert!(
         (0.0..=1.0).contains(&prob),
@@ -328,87 +151,3 @@ fn try_probability(rng: &mut StdRng, prob: f32) -> bool {
     );
     rng.gen::<f32>() < prob
 }
-
-fn save_latencies(
-    latencies: &[(MessageId, f32)],
-    sent_times: &FxHashMap<MessageId, f32>,
-    recv_times: &FxHashMap<MessageId, f32>,
-    path: &str,
-) {
-    let mut writer = csv::Writer::from_path(path).unwrap();
-    writer
-        .write_record(["msg_id", "latency", "sent_time", "received_time"])
-        .unwrap();
-    for (msg, latency) in latencies.iter() {
-        let sent_time = sent_times.get(msg).unwrap();
-        let recv_time = recv_times.get(msg).unwrap();
-        writer
-            .write_record(&[
-                msg.to_string(),
-                latency.to_string(),
-                sent_time.to_string(),
-                recv_time.to_string(),
-            ])
-            .unwrap();
-    }
-    writer.flush().unwrap();
-}
-
-fn save_sequence(seq: &Sequence, path: &str) {
-    let mut writer = csv::Writer::from_path(path).unwrap();
-    seq.iter().for_each(|entry| {
-        writer.write_record([entry.to_string()]).unwrap();
-    });
-    writer.flush().unwrap();
-}
-
-fn save_sequences(sequences: &[Sequence], path_prefix: &str) {
-    sequences.iter().enumerate().for_each(|(i, seq)| {
-        save_sequence(seq, &format!("{path_prefix}_{i}.csv"));
-    });
-}
-
-fn new_queue_data_msg_counts_writer(path: &str, mixnodes: &[Node]) -> Writer<File> {
-    let mut writer = csv::Writer::from_path(path).unwrap();
-    let mut header = vec!["vtime".to_string()];
-    mixnodes
-        .iter()
-        .map(|node| node.queue_data_msg_counts())
-        .enumerate()
-        .for_each(|(node_id, counts)| {
-            let num_queues = counts.len();
-            (0..num_queues).for_each(|q_idx| {
-                header.push(format!("node{node_id}_q{q_idx}"));
-            });
-        });
-    writer.write_record(header).unwrap();
-    writer.flush().unwrap();
-    writer
-}
-
-fn append_queue_data_msg_counts(mixnodes: &[Node], vtime: f32, writer: &mut Writer<File>) {
-    let mut row = vec![vtime.to_string()];
-    mixnodes
-        .iter()
-        .map(|node| node.queue_data_msg_counts())
-        .for_each(|counts| {
-            row.extend(
-                counts
-                    .iter()
-                    .map(|count| count.to_string())
-                    .collect::<Vec<_>>(),
-            );
-        });
-    writer.write_record(row).unwrap();
-}
-
-fn save_ordering_coefficients(data: &[[u64; 2]], path: &str) {
-    let mut writer = csv::Writer::from_path(path).unwrap();
-    writer.write_record(["path", "casual", "weak"]).unwrap();
-    for (path_idx, [casual, weak]) in data.iter().enumerate() {
-        writer
-            .write_record([path_idx.to_string(), casual.to_string(), weak.to_string()])
-            .unwrap();
-    }
-    writer.flush().unwrap();
-}
@@ -1,6 +1,9 @@
 mod iteration;
+mod message;
 mod ordercoeff;
+mod outputs;
 mod paramset;
+mod topology;
 
 use std::{
     error::Error,
@@ -11,11 +14,12 @@ use std::{
 use chrono::Utc;
 use clap::Parser;
 use iteration::run_iteration;
+use outputs::Outputs;
 use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
 use protocol::queue::QueueType;
 
 #[derive(Debug, Parser)]
-#[command(name = "Single Sender Single Mix Measurement")]
+#[command(name = "Ordering Measurement")]
 struct Args {
     #[arg(short, long)]
     exp_id: ExperimentId,
@@ -25,8 +29,6 @@ struct Args {
     queue_type: QueueType,
     #[arg(short, long)]
     outdir: String,
-    #[arg(short, long, default_value_t = false)]
-    skip_coeff_calc: bool,
     #[arg(short, long)]
     from_paramset: Option<u16>,
     #[arg(short, long)]
@@ -43,7 +45,6 @@ fn main() {
         session_id,
         queue_type,
         outdir,
-        skip_coeff_calc,
         from_paramset,
         to_paramset,
     } = args;
@@ -87,24 +88,28 @@ fn main() {
     dur_writer.flush().unwrap();
 
     for i in 0..paramset.num_iterations {
-        let wip_queue_data_msgs_counts_path =
-            format!("{paramset_dir}/__WIP__iteration_{i}_data_msg_counts.csv");
+        let mut outputs = Outputs::new(
+            format!("{paramset_dir}/__WIP__iteration_{i}_latency.csv"),
+            (0..paramset.num_senders)
+                .map(|sender_idx| {
+                    format!("{paramset_dir}/__WIP__iteration_{i}_sent_seq_{sender_idx}.csv")
+                })
+                .collect(),
+            (0..paramset.peering_degree)
+                .map(|conn_idx| {
+                    format!("{paramset_dir}/__WIP__iteration_{i}_recv_seq_{conn_idx}.csv")
+                })
+                .collect(),
+            format!("{paramset_dir}/__WIP__iteration_{i}_data_msg_counts.csv"),
+            format!("{paramset_dir}/iteration_{i}_topology.csv"),
+        );
 
         let start_time = SystemTime::now();
-        let vtime = run_iteration(
-            paramset.clone(),
-            i as u64,
-            &format!("{paramset_dir}/iteration_{i}_latency.csv"),
-            &format!("{paramset_dir}/iteration_{i}_sent_seq.csv"),
-            &format!("{paramset_dir}/iteration_{i}_recv_seq"),
-            &wip_queue_data_msgs_counts_path,
-            if !skip_coeff_calc {
-                Some(format!("{paramset_dir}/iteration_{i}_ordering_coeff.csv"))
-            } else {
-                None
-            },
-            &format!("{paramset_dir}/iteration_{i}_topology.csv"),
-        );
+        let vtime = run_iteration(paramset.clone(), i as u64, &mut outputs);
+        outputs.close();
+        outputs.rename_paths("__WIP__iteration", "iteration");
 
         let duration = SystemTime::now().duration_since(start_time).unwrap();
         let duration_human = format_duration(duration);
         dur_writer
@@ -117,10 +122,6 @@ fn main() {
             .unwrap();
         dur_writer.flush().unwrap();
 
-        let new_queue_data_msgs_counts_path =
-            wip_queue_data_msgs_counts_path.replace("__WIP__iteration_", "iteration_");
-        std::fs::rename(&wip_queue_data_msgs_counts_path, &new_queue_data_msgs_counts_path).expect("Failed to rename {wip_queue_data_msgs_counts_path} -> {new_queue_data_msgs_counts_path}: {e}");
-
         tracing::info!(
             "ParamSet:{}, Iteration:{} completed. Duration:{}, vtime:{}",
             paramset.id,
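The hunks above also extend the __WIP__ naming pattern to every gradual output: a file carries the marker while it is being appended to, and is renamed only after outputs.close() succeeds, so a crashed iteration never leaves a complete-looking CSV behind. The pattern in isolation (the path is illustrative):

    use std::fs;

    // Promote a finished work-in-progress file to its final name.
    // The "__WIP__iteration" marker matches the rename_paths() call above.
    fn promote(wip_path: &str) -> std::io::Result<()> {
        let final_path = wip_path.replace("__WIP__iteration", "iteration");
        fs::rename(wip_path, final_path)
    }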
mixnet-rs/ordering/src/message.rs (new file, 31 lines)
@@ -0,0 +1,31 @@
+use std::fmt::Display;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct DataMessage {
+    pub sender: u8,
+    pub msg_id: u32,
+}
+
+impl Display for DataMessage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("{}:{}", self.sender, self.msg_id))
+    }
+}
+
+pub struct DataMessageGenerator {
+    next_msg_ids: Vec<u32>,
+}
+
+impl DataMessageGenerator {
+    pub fn new(num_senders: u8) -> Self {
+        Self {
+            next_msg_ids: vec![0; num_senders as usize],
+        }
+    }
+
+    pub fn next(&mut self, sender: u8) -> DataMessage {
+        let msg_id = self.next_msg_ids[sender as usize];
+        self.next_msg_ids[sender as usize] += 1;
+        DataMessage { sender, msg_id }
+    }
+}
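With one counter per sender, a message is identified by the (sender, msg_id) pair, and its Display form "sender:msg_id" is what the sequence CSVs store. Usage, following the code above:

    let mut gen = DataMessageGenerator::new(2);
    assert_eq!(gen.next(0).to_string(), "0:0");
    assert_eq!(gen.next(1).to_string(), "1:0"); // sender 1 has its own counter
    assert_eq!(gen.next(0).to_string(), "0:1");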
@@ -1,297 +1,285 @@
-use std::fmt::Display;
+use std::fs::File;
 
-use protocol::node::MessageId;
+use crate::message::DataMessage;
 
-#[derive(Debug, Clone)]
-pub struct Sequence(Vec<Entry>);
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum Entry {
-    Data(MessageId),
-    Noise(u32), // the number of consecutive noises
+#[derive(Debug)]
+pub struct SequenceWriter {
+    noise_buf: u32,
+    writer: csv::Writer<File>,
 }
 
-impl Display for Entry {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let s = match self {
-            Entry::Data(msg) => msg.to_string(),
-            Entry::Noise(cnt) => format!("-{cnt}"),
-        };
-        f.write_str(s.as_str())
-    }
-}
-
-impl Sequence {
-    pub fn new() -> Self {
-        Self(Vec::new())
+impl SequenceWriter {
+    pub fn new(path: &str) -> Self {
+        Self {
+            noise_buf: 0,
+            writer: csv::Writer::from_path(path).unwrap(),
+        }
     }
 
-    pub fn add_message(&mut self, msg: MessageId) {
-        self.0.push(Entry::Data(msg));
+    pub fn flush(&mut self) {
+        self.clear_buf();
+        self.writer.flush().unwrap();
+    }
+
+    fn clear_buf(&mut self) {
+        if self.noise_buf > 0 {
+            self.writer
+                .write_record(&[format!("-{}", self.noise_buf)])
+                .unwrap();
+            self.noise_buf = 0;
+        }
+    }
+
+    pub fn add_message(&mut self, msg: &DataMessage) {
+        self.clear_buf();
+        self.writer.write_record(&[msg.to_string()]).unwrap();
     }
 
     pub fn add_noise(&mut self) {
-        if let Some(last) = self.0.last_mut() {
-            match last {
-                Entry::Noise(cnt) => {
-                    *cnt += 1;
-                }
-                _ => self.0.push(Entry::Noise(1)),
-            }
-        } else {
-            self.0.push(Entry::Noise(1))
-        }
-    }
-
-    pub fn iter(&self) -> impl Iterator<Item = &Entry> {
-        self.0.iter()
-    }
-
-    pub fn len(&self) -> usize {
-        self.0.len()
+        self.noise_buf += 1;
     }
 }
 
-impl Sequence {
-    pub fn ordering_coefficient(&self, other: &Sequence, casual: bool) -> u64 {
-        let mut coeff = 0;
-        let mut i = 0;
+// impl Sequence {
+//     pub fn ordering_coefficient(&self, other: &Sequence, casual: bool) -> u64 {
+//         let mut coeff = 0;
+//         let mut i = 0;
 
-        while i < self.0.len() {
-            if let Entry::Data(_) = &self.0[i] {
-                let (c, next_i) = self.ordering_coefficient_from(i, other, casual);
-                coeff += c;
+//         while i < self.0.len() {
+//             if let Entry::Data(_) = &self.0[i] {
+//                 let (c, next_i) = self.ordering_coefficient_from(i, other, casual);
+//                 coeff += c;
 
-                if next_i != i {
-                    i = next_i;
-                } else {
-                    i += 1;
-                }
-            } else {
-                i += 1;
-            }
-        }
+//                 if next_i != i {
+//                     i = next_i;
+//                 } else {
+//                     i += 1;
+//                 }
+//             } else {
+//                 i += 1;
+//             }
+//         }
 
-        coeff
-    }
+//         coeff
+//     }
 
-    fn ordering_coefficient_from(
-        &self,
-        start_idx: usize,
-        other: &Sequence,
-        casual: bool,
-    ) -> (u64, usize) {
-        let msg1 = match self.0[start_idx] {
-            Entry::Data(msg) => msg,
-            _ => panic!("Entry at {start_idx} must be Message"),
-        };
+//     fn ordering_coefficient_from(
+//         &self,
+//         start_idx: usize,
+//         other: &Sequence,
+//         casual: bool,
+//     ) -> (u64, usize) {
+//         let msg1 = match self.0[start_idx] {
+//             Entry::Data(msg) => msg,
+//             _ => panic!("Entry at {start_idx} must be Message"),
+//         };
 
-        for (j, entry) in other.iter().enumerate() {
-            if let Entry::Data(msg2) = entry {
-                if msg1 == *msg2 {
-                    // Found the 1st matching msg. Start finding the next adjacent matching msg.
-                    if casual {
-                        return self.casual_ordering_coefficient_from(start_idx, other, j);
-                    } else {
-                        return self.weak_ordering_coefficient_from(start_idx, other, j);
-                    }
-                }
-            }
-        }
-        (0, start_idx)
-    }
+//         for (j, entry) in other.iter().enumerate() {
+//             if let Entry::Data(msg2) = entry {
+//                 if msg1 == *msg2 {
+//                     // Found the 1st matching msg. Start finding the next adjacent matching msg.
+//                     if casual {
+//                         return self.casual_ordering_coefficient_from(start_idx, other, j);
+//                     } else {
+//                         return self.weak_ordering_coefficient_from(start_idx, other, j);
+//                     }
+//                 }
+//             }
+//         }
+//         (0, start_idx)
+//     }
 
-    fn casual_ordering_coefficient_from(
-        &self,
-        start_idx: usize,
-        other: &Sequence,
-        other_start_idx: usize,
-    ) -> (u64, usize) {
-        let mut coeff = 0;
-        let mut i = start_idx + 1;
-        let mut j = other_start_idx + 1;
-        while i < self.0.len() && j < other.0.len() {
-            match (&self.0[i], &other.0[j]) {
-                (Entry::Noise(cnt1), Entry::Noise(cnt2)) => {
-                    if cnt1 == cnt2 {
-                        i += 1;
-                        j += 1;
-                    } else {
-                        break;
-                    }
-                }
-                (Entry::Data(msg1), Entry::Data(msg2)) => {
-                    if msg1 == msg2 {
-                        coeff += 1;
-                        i += 1;
-                        j += 1;
-                    } else {
-                        break;
-                    }
-                }
-                _ => break,
-            }
-        }
-        (coeff, i)
-    }
+//     fn casual_ordering_coefficient_from(
+//         &self,
+//         start_idx: usize,
+//         other: &Sequence,
+//         other_start_idx: usize,
+//     ) -> (u64, usize) {
+//         let mut coeff = 0;
+//         let mut i = start_idx + 1;
+//         let mut j = other_start_idx + 1;
+//         while i < self.0.len() && j < other.0.len() {
+//             match (&self.0[i], &other.0[j]) {
+//                 (Entry::Noise(cnt1), Entry::Noise(cnt2)) => {
+//                     if cnt1 == cnt2 {
+//                         i += 1;
+//                         j += 1;
+//                     } else {
+//                         break;
+//                     }
+//                 }
+//                 (Entry::Data(msg1), Entry::Data(msg2)) => {
+//                     if msg1 == msg2 {
+//                         coeff += 1;
+//                         i += 1;
+//                         j += 1;
+//                     } else {
+//                         break;
+//                     }
+//                 }
+//                 _ => break,
+//             }
+//         }
+//         (coeff, i)
+//     }
 
-    fn weak_ordering_coefficient_from(
-        &self,
-        start_idx: usize,
-        other: &Sequence,
-        other_start_idx: usize,
-    ) -> (u64, usize) {
-        let mut coeff = 0;
-        let mut i = start_idx + 1;
-        let mut j = other_start_idx + 1;
-        while i < self.0.len() && j < other.0.len() {
-            i = self.skip_noise(i);
-            j = other.skip_noise(j);
-            if i < self.0.len() && j < other.0.len() && self.0[i] == other.0[j] {
-                coeff += 1;
-                i += 1;
-                j += 1;
-            } else {
-                break;
-            }
-        }
-        (coeff, i)
-    }
+//     fn weak_ordering_coefficient_from(
+//         &self,
+//         start_idx: usize,
+//         other: &Sequence,
+//         other_start_idx: usize,
+//     ) -> (u64, usize) {
+//         let mut coeff = 0;
+//         let mut i = start_idx + 1;
+//         let mut j = other_start_idx + 1;
+//         while i < self.0.len() && j < other.0.len() {
+//             i = self.skip_noise(i);
+//             j = other.skip_noise(j);
+//             if i < self.0.len() && j < other.0.len() && self.0[i] == other.0[j] {
+//                 coeff += 1;
+//                 i += 1;
+//                 j += 1;
+//             } else {
+//                 break;
+//             }
+//         }
+//         (coeff, i)
+//     }
 
-    fn skip_noise(&self, mut index: usize) -> usize {
-        while index < self.0.len() {
-            if let Entry::Data(_) = self.0[index] {
-                break;
-            }
-            index += 1;
-        }
-        index
-    }
-}
+//     fn skip_noise(&self, mut index: usize) -> usize {
+//         while index < self.0.len() {
+//             if let Entry::Data(_) = self.0[index] {
+//                 break;
+//             }
+//             index += 1;
+//         }
+//         index
+//     }
+// }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
+// #[cfg(test)]
+// mod tests {
+//     use super::*;
 
-    fn test_ordering_coefficient_common(casual: bool) {
-        // Case 0: Empty sequences
-        let seq = Sequence(vec![]);
-        assert_eq!(seq.ordering_coefficient(&seq, casual), 0);
+//     fn test_ordering_coefficient_common(casual: bool) {
+//         // Case 0: Empty sequences
+//         let seq = Sequence(vec![]);
+//         assert_eq!(seq.ordering_coefficient(&seq, casual), 0);
 
-        // Case 1: Exact one matched pair with no noise
-        let seq = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
-        assert_eq!(seq.ordering_coefficient(&seq, casual), 1);
+//         // Case 1: Exact one matched pair with no noise
+//         let seq = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
+//         assert_eq!(seq.ordering_coefficient(&seq, casual), 1);
 
-        // Case 2: Exact one matched pair with noise
-        let seq = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
-        assert_eq!(seq.ordering_coefficient(&seq, casual), 1);
+//         // Case 2: Exact one matched pair with noise
+//         let seq = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
+//         assert_eq!(seq.ordering_coefficient(&seq, casual), 1);
 
-        // Case 3: One matched pair with no noise
-        let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(3)]);
-        let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(4)]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1);
-        assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1);
+//         // Case 3: One matched pair with no noise
+//         let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(3)]);
+//         let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(4)]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1);
 
-        // Case 4: One matched pair with noise
-        let seq1 = Sequence(vec![
-            Entry::Data(1),
-            Entry::Noise(10),
-            Entry::Data(2),
-            Entry::Data(3),
-        ]);
-        let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1);
-        assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1);
+//         // Case 4: One matched pair with noise
+//         let seq1 = Sequence(vec![
+//             Entry::Data(1),
+//             Entry::Noise(10),
+//             Entry::Data(2),
+//             Entry::Data(3),
+//         ]);
+//         let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, casual), 1);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, casual), 1);
 
-        // Case 5: Two matched pairs with noise
-        let seq1 = Sequence(vec![
-            Entry::Data(1),
-            Entry::Noise(10),
-            Entry::Data(2),
-            Entry::Data(3),
-        ]);
-        let seq2 = Sequence(vec![
-            Entry::Data(1),
-            Entry::Noise(10),
-            Entry::Data(2),
-            Entry::Data(3),
-            Entry::Data(4),
-        ]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, casual), 2);
-        assert_eq!(seq2.ordering_coefficient(&seq1, casual), 2);
+//         // Case 5: Two matched pairs with noise
+//         let seq1 = Sequence(vec![
+//             Entry::Data(1),
+//             Entry::Noise(10),
+//             Entry::Data(2),
+//             Entry::Data(3),
+//         ]);
+//         let seq2 = Sequence(vec![
+//             Entry::Data(1),
+//             Entry::Noise(10),
+//             Entry::Data(2),
+//             Entry::Data(3),
+//             Entry::Data(4),
+//         ]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, casual), 2);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, casual), 2);
 
-        // Case 6: Only partial match with no noise
-        let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
-        let seq2 = Sequence(vec![Entry::Data(2), Entry::Data(3)]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0);
-        assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0);
+//         // Case 6: Only partial match with no noise
+//         let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
+//         let seq2 = Sequence(vec![Entry::Data(2), Entry::Data(3)]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0);
 
-        // Case 7: Only partial match with noise
-        let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]);
-        let seq2 = Sequence(vec![Entry::Data(2), Entry::Noise(10), Entry::Data(3)]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0);
-        assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0);
+//         // Case 7: Only partial match with noise
+//         let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]);
+//         let seq2 = Sequence(vec![Entry::Data(2), Entry::Noise(10), Entry::Data(3)]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0);
 
-        // Case 8: No match at all
-        let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]);
-        let seq2 = Sequence(vec![Entry::Data(3), Entry::Noise(10), Entry::Data(4)]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0);
-        assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0);
+//         // Case 8: No match at all
+//         let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]);
+//         let seq2 = Sequence(vec![Entry::Data(3), Entry::Noise(10), Entry::Data(4)]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, casual), 0);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, casual), 0);
 
-        // Case 9: Matches with noise but mixed orders
-        let seq1 = Sequence(vec![
-            Entry::Data(1),
-            Entry::Data(2),
-            Entry::Noise(10),
-            Entry::Data(3),
-            Entry::Data(4),
-            Entry::Data(5),
-            Entry::Data(6),
-        ]);
-        let seq2 = Sequence(vec![
-            Entry::Data(4),
-            Entry::Data(5),
-            Entry::Data(1),
-            Entry::Data(2),
-            Entry::Noise(10),
-            Entry::Data(3),
-            Entry::Data(6),
-        ]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, casual), 3);
-        assert_eq!(seq2.ordering_coefficient(&seq1, casual), 3);
-    }
+//         // Case 9: Matches with noise but mixed orders
+//         let seq1 = Sequence(vec![
+//             Entry::Data(1),
+//             Entry::Data(2),
+//             Entry::Noise(10),
+//             Entry::Data(3),
+//             Entry::Data(4),
+//             Entry::Data(5),
+//             Entry::Data(6),
+//         ]);
+//         let seq2 = Sequence(vec![
+//             Entry::Data(4),
+//             Entry::Data(5),
+//             Entry::Data(1),
+//             Entry::Data(2),
+//             Entry::Noise(10),
+//             Entry::Data(3),
+//             Entry::Data(6),
+//         ]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, casual), 3);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, casual), 3);
+//     }
 
-    #[test]
-    fn test_casual_ordering_coefficient() {
-        test_ordering_coefficient_common(true);
+//     #[test]
+//     fn test_casual_ordering_coefficient() {
+//         test_ordering_coefficient_common(true);
 
-        // Case 0: No match because of noise
-        let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
-        let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, true), 0);
-        assert_eq!(seq2.ordering_coefficient(&seq1, true), 0);
+//         // Case 0: No match because of noise
+//         let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
+//         let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, true), 0);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, true), 0);
 
-        // Case 1: No match because of different count of noises
-        let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
-        let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, true), 0);
-        assert_eq!(seq2.ordering_coefficient(&seq1, true), 0);
-    }
+//         // Case 1: No match because of different count of noises
+//         let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
+//         let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, true), 0);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, true), 0);
+//     }
 
-    #[test]
-    fn test_weak_ordering_coefficient() {
-        test_ordering_coefficient_common(false);
+//     #[test]
+//     fn test_weak_ordering_coefficient() {
+//         test_ordering_coefficient_common(false);
 
-        // Case 0: Match ignoring noises
-        let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
-        let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, false), 1);
-        assert_eq!(seq2.ordering_coefficient(&seq1, false), 1);
+//         // Case 0: Match ignoring noises
+//         let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
+//         let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, false), 1);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, false), 1);
 
-        // Case 1: Match ignoring noise count
-        let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
-        let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]);
-        assert_eq!(seq1.ordering_coefficient(&seq2, false), 1);
-        assert_eq!(seq2.ordering_coefficient(&seq1, false), 1);
-    }
-}
+//         // Case 1: Match ignoring noise count
+//         let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
+//         let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]);
+//         assert_eq!(seq1.ordering_coefficient(&seq2, false), 1);
+//         assert_eq!(seq2.ordering_coefficient(&seq1, false), 1);
+//     }
+// }
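The key behavioral detail of SequenceWriter above: consecutive noises are not written eagerly but buffered and emitted as a single negative run-length row once a data message (or a flush) arrives, preserving the Entry::Noise(count) encoding of the old in-memory Sequence. For example (the path is illustrative):

    let mut writer = SequenceWriter::new("/tmp/example_seq.csv");
    writer.add_noise();
    writer.add_noise();
    writer.add_noise();
    writer.add_message(&DataMessage { sender: 0, msg_id: 7 });
    writer.flush();
    // The CSV now holds two rows: "-3" (a run of three noises), then "0:7".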
mixnet-rs/ordering/src/outputs.rs (new file, 230 lines)
@@ -0,0 +1,230 @@
+use std::{fs::File, path::Path};
+
+use protocol::{
+    node::{Node, NodeId},
+    topology::Topology,
+};
+
+use crate::{message::DataMessage, ordercoeff::SequenceWriter};
+
+pub struct Outputs {
+    closed: bool,
+    // gradual writing
+    latency_path: String,
+    latency_writer: csv::Writer<File>,
+    sent_sequence_paths: Vec<String>,
+    sent_sequence_writers: Vec<SequenceWriter>,
+    recv_sequence_paths: Vec<String>,
+    recv_sequence_writers: Vec<SequenceWriter>,
+    queue_data_msg_counts_path: String,
+    queue_data_msg_counts_writer: csv::Writer<File>,
+    // bulk writing
+    pub topology_path: String,
+}
+
+impl Outputs {
+    pub fn new(
+        latency_path: String,
+        sent_sequence_paths: Vec<String>,
+        recv_sequence_paths: Vec<String>,
+        queue_data_msg_counts_path: String,
+        topology_path: String,
+    ) -> Self {
+        // Ensure that all output files do not exist
+        for path in [
+            latency_path.clone(),
+            queue_data_msg_counts_path.clone(),
+            topology_path.clone(),
+        ]
+        .iter()
+        .chain(sent_sequence_paths.iter())
+        .chain(recv_sequence_paths.iter())
+        {
+            assert!(!Path::new(path).exists(), "File already exists: {path}");
+        }
+
+        // Prepare writers and headers
+        let mut latency_writer = csv::Writer::from_path(&latency_path).unwrap();
+        latency_writer
+            .write_record(["msg", "latency", "sent_time", "recv_time"])
+            .unwrap();
+        latency_writer.flush().unwrap();
+        let sent_sequence_writers = sent_sequence_paths
+            .iter()
+            .map(|path| SequenceWriter::new(path))
+            .collect::<Vec<_>>();
+        let recv_sequence_writers = recv_sequence_paths
+            .iter()
+            .map(|path| SequenceWriter::new(path))
+            .collect::<Vec<_>>();
+        let queue_data_msg_counts_writer =
+            csv::Writer::from_path(&queue_data_msg_counts_path).unwrap();
+
+        Self {
+            closed: false,
+            latency_path,
+            latency_writer,
+            sent_sequence_paths,
+            sent_sequence_writers,
+            recv_sequence_paths,
+            recv_sequence_writers,
+            queue_data_msg_counts_path,
+            queue_data_msg_counts_writer,
+            topology_path,
+        }
+    }
+
+    pub fn close(&mut self) {
+        self.latency_writer.flush().unwrap();
+        for seq in &mut self.sent_sequence_writers {
+            seq.flush();
+        }
+        for seq in &mut self.recv_sequence_writers {
+            seq.flush();
+        }
+        self.queue_data_msg_counts_writer.flush().unwrap();
+
+        self.closed = true;
+    }
+
+    pub fn add_latency(&mut self, msg: &DataMessage, sent_time: f32, recv_time: f32) {
+        self.latency_writer
+            .write_record(&[
+                msg.to_string(),
+                (recv_time - sent_time).to_string(),
+                sent_time.to_string(),
+                recv_time.to_string(),
+            ])
+            .unwrap();
+    }
+
+    pub fn add_sent_msg(&mut self, msg: &DataMessage) {
+        let writer = &mut self.sent_sequence_writers[msg.sender as usize];
+        writer.add_message(msg);
+    }
+
+    pub fn add_sent_noise(&mut self, sender_idx: usize) {
+        let writer = &mut self.sent_sequence_writers[sender_idx];
+        writer.add_noise();
+    }
+
+    pub fn add_recv_msg(&mut self, msg: &DataMessage, conn_idx: usize) {
+        let writer = &mut self.recv_sequence_writers[conn_idx];
+        writer.add_message(msg);
+    }
+
+    pub fn add_recv_noise(&mut self, conn_idx: usize) {
+        let writer = &mut self.recv_sequence_writers[conn_idx];
+        writer.add_noise();
+    }
+
+    pub fn write_header_queue_data_msg_counts(&mut self, mixnodes: &[Node<DataMessage>]) {
+        let writer = &mut self.queue_data_msg_counts_writer;
+        let mut header = vec!["vtime".to_string()];
+        mixnodes
+            .iter()
+            .map(|node| (node.id, node.queue_data_msg_counts()))
+            .for_each(|(node_id, counts)| {
+                let num_queues = counts.len();
+                (0..num_queues).for_each(|q_idx| {
+                    header.push(format!("node{}_q{}", node_id, q_idx));
+                });
+            });
+        writer.write_record(header).unwrap();
+        writer.flush().unwrap();
+    }
+
+    pub fn add_queue_data_msg_counts(&mut self, vtime: f32, mixnodes: &[Node<DataMessage>]) {
+        let writer = &mut self.queue_data_msg_counts_writer;
+        let mut record = vec![vtime.to_string()];
+        mixnodes
+            .iter()
+            .map(|node| node.queue_data_msg_counts())
+            .for_each(|counts| {
+                counts.iter().for_each(|count| {
+                    record.push(count.to_string());
+                });
+            });
+        writer.write_record(record).unwrap();
+    }
+
+    pub fn write_topology(
+        &self,
+        topology: &Topology,
+        sender_peers_list: &[Vec<NodeId>],
+        receiver_peers: &[NodeId],
+    ) {
+        let mut writer = csv::Writer::from_path(&self.topology_path).unwrap();
+        writer.write_record(["node", "num_peers", "peers"]).unwrap();
+
+        // Write peers of mix nodes
+        for (node_id, peers) in topology.iter().enumerate() {
+            writer
+                .write_record(&[
+                    node_id.to_string(),
+                    peers.len().to_string(),
+                    format!(
+                        "[{}]",
+                        peers
+                            .iter()
+                            .map(|peer_id| peer_id.to_string())
+                            .collect::<Vec<_>>()
+                            .join(",")
+                    ),
+                ])
+                .unwrap();
+        }
+
+        // Write peers of senders
+        for (sender_idx, peers) in sender_peers_list.iter().enumerate() {
+            writer
+                .write_record(&[
+                    format!("sender-{}", sender_idx),
+                    peers.len().to_string(),
+                    format!(
+                        "[{}]",
+                        peers
+                            .iter()
+                            .map(|peer_id| peer_id.to_string())
+                            .collect::<Vec<_>>()
+                            .join(",")
+                    ),
+                ])
+                .unwrap();
+        }
+
+        // Write peers of the receiver
+        writer
+            .write_record(&[
+                "receiver".to_string(),
+                receiver_peers.len().to_string(),
+                format!(
+                    "[{}]",
+                    receiver_peers
+                        .iter()
+                        .map(|peer_id| peer_id.to_string())
+                        .collect::<Vec<_>>()
+                        .join(",")
+                ),
+            ])
+            .unwrap();
+
+        writer.flush().unwrap();
+    }
+
+    pub fn rename_paths(&self, from: &str, to: &str) {
+        assert!(self.closed);
+
+        for path in [
+            &self.latency_path.clone(),
+            &self.queue_data_msg_counts_path.clone(),
+        ]
+        .into_iter()
+        .chain(self.sent_sequence_paths.iter())
+        .chain(self.recv_sequence_paths.iter())
+        {
+            let new_path = path.replace(from, to);
+            std::fs::rename(path, new_path).unwrap();
+        }
+    }
+}
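For reference, write_topology produces one row per node with a bracketed peer list; since that list contains commas, the csv crate quotes the field. Illustrative output only (the node IDs are invented):

    node,num_peers,peers
    0,2,"[1,3]"
    1,2,"[0,2]"
    sender-0,2,"[0,1]"
    receiver,2,"[2,3]"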
@@ -69,7 +69,7 @@ pub struct ParamSet {
     pub peering_degree: u32,
     pub min_queue_size: u16,
     pub transmission_rate: u16,
-    pub num_senders: u32,
+    pub num_senders: u8,
     pub num_sender_msgs: u32,
     pub sender_data_msg_prob: f32,
     pub mix_data_msg_prob: f32,
@@ -91,7 +91,7 @@ impl ParamSet {
     fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
         let transmission_rate: u16 = 1;
         let min_queue_size: u16 = 10;
-        let num_senders: u32 = match exp_id {
+        let num_senders: u8 = match exp_id {
             ExperimentId::Experiment3 | ExperimentId::Experiment4 => 2,
             _ => 1,
         };
mixnet-rs/ordering/src/topology.rs (new file, 146 lines)
@@ -0,0 +1,146 @@
+use std::{fmt::Debug, hash::Hash};
+
+use protocol::{
+    node::{Node, NodeId},
+    queue::QueueConfig,
+    topology::build_topology,
+};
+use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng};
+use rustc_hash::FxHashMap;
+
+use crate::{outputs::Outputs, paramset::ParamSet};
+
+pub const RECEIVER_NODE_ID: NodeId = NodeId::MAX;
+
+pub fn build_striped_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash>(
+    paramset: &ParamSet,
+    seed: u64,
+) -> (Vec<Node<M>>, Vec<Vec<NodeId>>, FxHashMap<NodeId, u16>) {
+    assert!(!paramset.random_topology);
+    let mut next_node_id: NodeId = 0;
+    let mut queue_seed_rng = StdRng::seed_from_u64(seed);
+    let mut mixnodes: Vec<Node<M>> =
+        Vec::with_capacity(paramset.num_paths as usize * paramset.num_mixes as usize);
+    let mut paths: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_paths as usize);
+    for _ in 0..paramset.num_paths {
+        let mut ids = Vec::with_capacity(paramset.num_mixes as usize);
+        for _ in 0..paramset.num_mixes {
+            let id = next_node_id;
+            next_node_id += 1;
+            mixnodes.push(Node::new(
+                id,
+                QueueConfig {
+                    queue_type: paramset.queue_type,
+                    seed: queue_seed_rng.next_u64(),
+                    min_queue_size: paramset.min_queue_size,
+                },
+                paramset.peering_degree,
+                false, // disable cache
+            ));
+            ids.push(id);
+        }
+        paths.push(ids);
+    }
+
+    // Connect mix nodes
+    let mut receiver_peer_conn_idx: FxHashMap<NodeId, u16> = FxHashMap::default();
+    for path in paths.iter() {
+        for (i, id) in path.iter().enumerate() {
+            if i != path.len() - 1 {
+                let peer_id = path[i + 1];
+                let mixnode = mixnodes.get_mut(*id as usize).unwrap();
+                assert_eq!(mixnode.id, *id);
+                mixnode.connect(peer_id);
+            } else {
+                let mixnode = mixnodes.get_mut(*id as usize).unwrap();
+                assert_eq!(mixnode.id, *id);
+                mixnode.connect(RECEIVER_NODE_ID);
+
+                receiver_peer_conn_idx
+                    .insert(*id, receiver_peer_conn_idx.len().try_into().unwrap());
+            }
+        }
+    }
+    let sender_peers_list: Vec<Vec<NodeId>> =
+        vec![
+            paths.iter().map(|path| *path.first().unwrap()).collect();
+            paramset.num_senders as usize
+        ];
+    (mixnodes, sender_peers_list, receiver_peer_conn_idx)
+}
+
+pub fn build_random_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash>(
+    paramset: &ParamSet,
+    seed: u64,
+    outputs: &mut Outputs,
+) -> (Vec<Node<M>>, Vec<Vec<NodeId>>, FxHashMap<NodeId, u16>) {
+    assert!(paramset.random_topology);
+    // Init mix nodes
+    let mut queue_seed_rng = StdRng::seed_from_u64(seed);
+    let mut mixnodes: Vec<Node<M>> = Vec::with_capacity(paramset.num_mixes as usize);
+    for id in 0..paramset.num_mixes {
+        mixnodes.push(Node::new(
+            id,
+            QueueConfig {
+                queue_type: paramset.queue_type,
+                seed: queue_seed_rng.next_u64(),
+                min_queue_size: paramset.min_queue_size,
+            },
+            paramset.peering_degree,
+            true, // enable cache
+        ));
+    }
+
+    // Choose sender's peers and receiver's peers randomly
+    let mut peers_rng = StdRng::seed_from_u64(seed);
+    let mut candidates: Vec<NodeId> = mixnodes.iter().map(|mixnode| mixnode.id).collect();
+    assert!(candidates.len() >= paramset.peering_degree as usize);
+    let mut sender_peers_list: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_senders as usize);
+    for _ in 0..paramset.num_senders {
+        candidates.as_mut_slice().shuffle(&mut peers_rng);
+        let mut peers: Vec<NodeId> = candidates
+            .iter()
+            .cloned()
+            .take(paramset.peering_degree as usize)
+            .collect();
+        peers.sort();
+        sender_peers_list.push(peers);
+    }
+    candidates.as_mut_slice().shuffle(&mut peers_rng);
+    let mut receiver_peers: Vec<NodeId> = candidates
+        .iter()
+        .cloned()
+        .take(paramset.peering_degree as usize)
+        .collect();
+    receiver_peers.sort();
+
+    // Connect mix nodes
+    let topology = build_topology(
+        paramset.num_mixes,
+        &vec![paramset.peering_degree; paramset.num_mixes as usize],
+        seed,
+    );
+    for (node_id, peers) in topology.iter().enumerate() {
+        peers.iter().for_each(|peer_id| {
+            let mixnode = mixnodes.get_mut(node_id).unwrap();
+            assert_eq!(mixnode.id as usize, node_id);
+            mixnode.connect(*peer_id);
+        });
+    }
+
+    // Connect the selected mix nodes with the receiver
+    //
+    // peer_id -> conn_idx
+    let mut receiver_peer_conn_idx: FxHashMap<NodeId, u16> = FxHashMap::default();
+    for (conn_idx, mixnode_id) in receiver_peers.iter().enumerate() {
+        let mixnode = mixnodes.get_mut(*mixnode_id as usize).unwrap();
+        assert_eq!(mixnode.id, *mixnode_id);
+        mixnode.connect(RECEIVER_NODE_ID);
+
+        receiver_peer_conn_idx.insert(*mixnode_id, conn_idx.try_into().unwrap());
+    }
+
+    outputs.write_topology(&topology, &sender_peers_list, &receiver_peers);
+
+    (mixnodes, sender_peers_list, receiver_peer_conn_idx)
+}
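Both builders return receiver_peer_conn_idx so the receiver side can attribute every delivery to a stable connection index, which in turn selects the recv_seq_{conn_idx}.csv writer. The invariant, sketched with illustrative IDs:

    use rustc_hash::FxHashMap;

    // Receiver peers, in sorted order, get consecutive connection indices.
    let receiver_peers: Vec<u32> = vec![2, 5, 9]; // illustrative node IDs
    let mut receiver_peer_conn_idx: FxHashMap<u32, u16> = FxHashMap::default();
    for (conn_idx, node_id) in receiver_peers.iter().enumerate() {
        receiver_peer_conn_idx.insert(*node_id, conn_idx as u16);
    }
    assert_eq!(receiver_peer_conn_idx[&5], 1); // node 5 feeds recv_seq_1.csv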
@@ -1,25 +1,39 @@
+use std::{fmt::Debug, hash::Hash};
+
 use rustc_hash::{FxHashMap, FxHashSet};
 
 use crate::queue::{new_queue, Message, Queue, QueueConfig};
 
 pub type NodeId = u32;
-pub type MessageId = u32;
 
-pub struct Node {
+pub struct Node<M>
+where
+    M: Debug + Copy + Clone + PartialEq + Eq + Hash,
+{
+    pub id: NodeId,
     queue_config: QueueConfig,
     // To have the deterministic result, we use Vec instead of FxHashMap.
     // Building `queues` is inefficient, but it's not a problem because it's done only once at the beginning.
     // Instead, use `connected_peers` to build `queues` efficiently.
-    queues: Vec<(NodeId, Box<dyn Queue<MessageId>>)>,
+    queues: Vec<(NodeId, Box<dyn Queue<M>>)>,
     connected_peers: FxHashSet<NodeId>,
     // A cache to avoid relaying the same message multiple times.
-    received_msgs: Option<FxHashMap<MessageId, u32>>,
+    received_msgs: Option<FxHashMap<M, u32>>,
     peering_degree: u32,
 }
 
-impl Node {
-    pub fn new(queue_config: QueueConfig, peering_degree: u32, enable_cache: bool) -> Self {
-        Node {
+impl<M> Node<M>
+where
+    M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash,
+{
+    pub fn new(
+        id: NodeId,
+        queue_config: QueueConfig,
+        peering_degree: u32,
+        enable_cache: bool,
+    ) -> Self {
+        Node::<M> {
+            id,
             queue_config,
             queues: Vec::new(),
             connected_peers: FxHashSet::default(),
@@ -39,18 +53,18 @@ impl Node {
             .binary_search_by(|probe| probe.0.cmp(&peer_id))
             .unwrap_or_else(|pos| pos);
         self.queues
-            .insert(pos, (peer_id, new_queue(&self.queue_config)));
+            .insert(pos, (peer_id, new_queue::<M>(&self.queue_config)));
        }
    }
 
-    pub fn send(&mut self, msg: MessageId) {
+    pub fn send(&mut self, msg: M) {
         assert!(self.check_and_update_cache(msg, true));
         for (_, queue) in self.queues.iter_mut() {
             queue.push(msg);
         }
     }
 
-    pub fn receive(&mut self, msg: MessageId, from: Option<NodeId>) -> bool {
+    pub fn receive(&mut self, msg: M, from: Option<NodeId>) -> bool {
         let first_received = self.check_and_update_cache(msg, false);
         if first_received {
             for (node_id, queue) in self.queues.iter_mut() {
@@ -67,8 +81,8 @@ impl Node {
         first_received
     }
 
-    pub fn read_queues(&mut self) -> Vec<(NodeId, Message<MessageId>)> {
-        let mut msgs_to_relay: Vec<(NodeId, Message<MessageId>)> = Vec::new();
+    pub fn read_queues(&mut self) -> Vec<(NodeId, Message<M>)> {
+        let mut msgs_to_relay: Vec<(NodeId, Message<M>)> = Vec::new();
         self.queues.iter_mut().for_each(|(node_id, queue)| {
             msgs_to_relay.push((*node_id, queue.pop()));
         });
@@ -82,7 +96,7 @@ impl Node {
             .collect()
     }
 
-    fn check_and_update_cache(&mut self, msg: MessageId, sending: bool) -> bool {
+    fn check_and_update_cache(&mut self, msg: M, sending: bool) -> bool {
         if let Some(received_msgs) = &mut self.received_msgs {
             let first_received = if let Some(count) = received_msgs.get_mut(&msg) {
                 *count += 1;
@@ -95,7 +109,7 @@ impl Node {
             // If the message have been received from all connected peers, remove it from the cache
             // because there is no possibility that the message will be received again.
             if received_msgs.get(&msg).unwrap() == &self.peering_degree {
-                tracing::debug!("Remove message from cache: {}", msg);
+                tracing::debug!("Remove message from cache: {:?}", msg);
                 received_msgs.remove(&msg);
             }
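A note on the cache logic that the generic rewrite leaves intact: each message's receipt count is tracked, and its entry is evicted as soon as every connected peer has delivered it, since no further duplicate can arrive. The rule in isolation (a sketch with u32 message IDs, not the crate's API):

    use rustc_hash::FxHashMap;

    // Returns true the first time `msg` is seen; evicts the entry once
    // all `peering_degree` peers have delivered it.
    fn note_receipt(cache: &mut FxHashMap<u32, u32>, msg: u32, peering_degree: u32) -> bool {
        let count = {
            let c = cache.entry(msg).or_insert(0);
            *c += 1;
            *c
        };
        // Once all peers have delivered the message it can never arrive again,
        // so drop the entry to keep the cache from growing without bound.
        if count == peering_degree {
            cache.remove(&msg);
        }
        count == 1
    }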