Refactor all crates and prepare revised parameter sets; the multiple-mix implementation is not included yet

This commit is contained in:
Youngjoon Lee 2024-08-22 11:42:04 +02:00
parent a7f827efd0
commit e9012eae83
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
18 changed files with 584 additions and 547 deletions

View File

@ -1,3 +1,3 @@
[workspace]
members = ["dissemination", "queue", "single-path"]
members = ["dissemination", "protocol", "ordering"]
resolver = "2"

View File

@ -7,7 +7,7 @@ edition = "2021"
chrono = "0.4.38"
clap = { version = "4.5.16", features = ["derive"] }
csv = "1.3.0"
queue = { version = "0.1.0", path = "../queue" }
protocol = { version = "0.1.0", path = "../protocol" }
rand = "0.8.5"
rayon = "1.10.0"
rustc-hash = "2.0.0"

View File

@ -1,11 +1,13 @@
use std::error::Error;
use queue::QueueConfig;
use protocol::{
node::{MessageId, Node, NodeId},
queue::{Message, QueueConfig},
};
use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng};
use rustc_hash::FxHashMap;
use crate::{
node::{MessageId, Node, NodeId},
paramset::ParamSet,
topology::{build_topology, Topology},
};
@ -28,6 +30,7 @@ pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology
min_queue_size: paramset.min_queue_size,
},
peering_degrees[node_id as usize],
true,
));
}
tracing::debug!("Nodes initialized.");
@ -132,7 +135,7 @@ fn relay_messages(
writer: &mut csv::Writer<std::fs::File>,
) {
// Collect messages to relay
let mut all_msgs_to_relay: Vec<Vec<(NodeId, MessageId)>> = Vec::new();
let mut all_msgs_to_relay: Vec<Vec<(NodeId, Message<MessageId>)>> = Vec::new();
for node in nodes.iter_mut() {
all_msgs_to_relay.push(node.read_queues());
}
@ -144,22 +147,25 @@ fn relay_messages(
.for_each(|(sender_id, msgs_to_relay)| {
let sender_id: NodeId = sender_id.try_into().unwrap();
msgs_to_relay.into_iter().for_each(|(receiver_id, msg)| {
if nodes[receiver_id as usize].receive(msg, sender_id) {
let (sent_time, num_received_nodes) = message_tracker.get_mut(&msg).unwrap();
*num_received_nodes += 1;
if *num_received_nodes as usize == nodes.len() {
let dissemination_time = vtime - *sent_time;
writer
.write_record(&[
dissemination_time.to_string(),
sent_time.to_string(),
vtime.to_string(),
])
.unwrap();
writer.flush().unwrap();
*num_disseminated_msgs += 1;
if let Message::Data(msg) = msg {
if nodes[receiver_id as usize].receive(msg, Some(sender_id)) {
let (sent_time, num_received_nodes) =
message_tracker.get_mut(&msg).unwrap();
*num_received_nodes += 1;
if *num_received_nodes as usize == nodes.len() {
let dissemination_time = vtime - *sent_time;
writer
.write_record(&[
dissemination_time.to_string(),
sent_time.to_string(),
vtime.to_string(),
])
.unwrap();
writer.flush().unwrap();
*num_disseminated_msgs += 1;
message_tracker.remove(&msg);
message_tracker.remove(&msg);
}
}
}
})

View File

@ -1,6 +1,6 @@
use chrono::Utc;
use clap::Parser;
use queue::QueueType;
use protocol::queue::QueueType;
use rayon::prelude::*;
use std::{
error::Error,
@ -12,7 +12,6 @@ use iteration::run_iteration;
use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
mod iteration;
mod node;
mod paramset;
mod topology;

View File

@ -1,88 +0,0 @@
use queue::{new_queue, Queue, QueueConfig};
use rustc_hash::{FxHashMap, FxHashSet};
/// Identifier of a node in the simulated topology.
pub type NodeId = u32;
/// Identifier of a data message originated by a sender.
pub type MessageId = u32;
/// A node in the dissemination simulation.
///
/// Each node keeps one outbound queue per connected peer, plus a cache of
/// message IDs it has already seen so the same message is not relayed twice.
pub struct Node {
queue_config: QueueConfig,
// To have the deterministic result, we use Vec instead of FxHashMap.
// Building `queues` is inefficient, but it's not a problem because it's done only once at the beginning.
// Instead, use `connected_peers` to build `queues` efficiently.
queues: Vec<(NodeId, Box<dyn Queue<MessageId>>)>,
connected_peers: FxHashSet<NodeId>,
// A cache to avoid relaying the same message multiple times.
// Maps a message ID to how many times it has been seen so far.
received_msgs: FxHashMap<MessageId, u32>,
peering_degree: u32,
}
impl Node {
/// Creates a node with no peers; per-peer queues are added via `connect`.
pub fn new(queue_config: QueueConfig, peering_degree: u32) -> Self {
Node {
queue_config,
queues: Vec::new(),
connected_peers: FxHashSet::default(),
received_msgs: FxHashMap::default(),
peering_degree,
}
}
/// Registers `peer_id` and creates a dedicated queue for it.
/// Queues are kept sorted by peer ID so that iteration order is deterministic.
pub fn connect(&mut self, peer_id: NodeId) {
if self.connected_peers.insert(peer_id) {
let pos = self
.queues
.binary_search_by(|probe| probe.0.cmp(&peer_id))
.unwrap_or_else(|pos| pos);
self.queues
.insert(pos, (peer_id, new_queue(&self.queue_config)));
}
}
/// Originates a new message: records it in the cache and enqueues it to every peer.
/// Panics if `msg` was already seen, since a sender must not reuse message IDs.
pub fn send(&mut self, msg: MessageId) {
assert!(self.check_and_update_cache(msg, true));
for (_, queue) in self.queues.iter_mut() {
queue.push(msg);
}
}
/// Handles a message arriving from peer `from`.
/// On first receipt, forwards it to every peer except the sender and returns true;
/// on a duplicate, only updates the cache counter and returns false.
pub fn receive(&mut self, msg: MessageId, from: NodeId) -> bool {
let first_received = self.check_and_update_cache(msg, false);
if first_received {
for (node_id, queue) in self.queues.iter_mut() {
if *node_id != from {
queue.push(msg);
}
}
}
first_received
}
/// Pops at most one message from each peer queue, returning (peer, message) pairs.
/// Queues that yield nothing this round are simply skipped.
pub fn read_queues(&mut self) -> Vec<(NodeId, MessageId)> {
let mut msgs_to_relay: Vec<(NodeId, MessageId)> = Vec::new();
for (node_id, queue) in self.queues.iter_mut() {
if let Some(msg) = queue.pop() {
msgs_to_relay.push((*node_id, msg));
}
}
msgs_to_relay
}
/// Bumps the seen-count for `msg` and returns whether this is its first appearance.
/// `sending` marks a locally originated message, which starts at count 0
/// because it has not yet been received from any peer.
fn check_and_update_cache(&mut self, msg: MessageId, sending: bool) -> bool {
let first_received = if let Some(count) = self.received_msgs.get_mut(&msg) {
*count += 1;
false
} else {
self.received_msgs.insert(msg, if sending { 0 } else { 1 });
true
};
// If the message has been received from all connected peers, remove it from the cache
// because there is no possibility that the message will be received again.
if self.received_msgs.get(&msg).unwrap() == &self.peering_degree {
tracing::debug!("Remove message from cache: {}", msg);
self.received_msgs.remove(&msg);
}
first_received
}
}

View File

@ -1,4 +1,4 @@
use queue::QueueType;
use protocol::queue::QueueType;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]

View File

@ -1,9 +1,8 @@
use std::collections::HashSet;
use protocol::node::NodeId;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use crate::node::NodeId;
pub type Topology = Vec<Vec<NodeId>>;
pub fn build_topology(num_nodes: u32, peering_degrees: &[u32], seed: u64) -> Topology {

View File

@ -1,5 +1,5 @@
[package]
name = "single-path"
name = "ordering"
version = "0.1.0"
edition = "2021"
@ -7,7 +7,7 @@ edition = "2021"
chrono = "0.4.38"
clap = "4.5.16"
csv = "1.3.0"
queue = { version = "0.1.0", path = "../queue" }
protocol = { version = "0.1.0", path = "../protocol" }
rand = "0.8.5"
rustc-hash = "2.0.0"
tracing = "0.1.40"

View File

@ -1,14 +1,13 @@
use std::path::Path;
use queue::QueueConfig;
use protocol::{
node::{MessageId, Node},
queue::{Message, QueueConfig, QueueType},
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use rustc_hash::FxHashMap;
use crate::{
node::{MessageId, Node},
ordercoeff::Sequence,
paramset::ParamSet,
};
use crate::{ordercoeff::Sequence, paramset::ParamSet};
pub fn run_iteration(
paramset: ParamSet,
@ -30,11 +29,16 @@ pub fn run_iteration(
}
// Initialize a mix node
let mut mixnode = Node::new(&QueueConfig {
queue_type: paramset.queue_type,
seed,
min_queue_size: paramset.min_queue_size,
});
let mut mixnode = Node::new(
QueueConfig {
queue_type: paramset.queue_type,
seed,
min_queue_size: paramset.min_queue_size,
},
paramset.peering_degree,
false,
);
mixnode.connect(u32::MAX); // connect to the virtual receiver node
let mut next_msg_id: MessageId = 0;
@ -43,29 +47,39 @@ pub fn run_iteration(
// Transmission interval that each queue must release a message
let transmission_interval = 1.0 / paramset.transmission_rate as f32;
// Results
let mut all_sent_count = 0; // all data + noise sent by the sender
let mut sent_times: FxHashMap<MessageId, f32> = FxHashMap::default();
let mut latencies: FxHashMap<MessageId, f32> = FxHashMap::default();
let mut sent_sequence = Sequence::new();
let mut received_sequence = Sequence::new();
let mut data_msg_counts_in_queue: Vec<u32> = Vec::new();
let mut data_msg_counts_in_queue: Vec<usize> = Vec::new();
let mut rng = StdRng::seed_from_u64(seed);
loop {
tracing::trace!(
"VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}",
vtime,
all_sent_count,
sent_times.len(),
latencies.len()
);
// The sender emits a message (data or noise) to the mix node.
if sent_times.len() < paramset.num_sender_data_msgs as usize
&& try_probability(&mut rng, paramset.sender_data_msg_prob)
{
let msg = next_msg_id;
next_msg_id += 1;
mixnode.receive(msg);
sent_times.insert(msg, vtime);
sent_sequence.add_message(msg);
} else {
// Generate noise and add it to the sequence to calculate ordering coefficients later,
// but don't need to send it to the mix node
// because the mix node will anyway drop the noise,
// and we don't need to record what the mix node receives.
sent_sequence.add_noise();
if all_sent_count < paramset.num_sender_msgs as usize {
if try_probability(&mut rng, paramset.sender_data_msg_prob) {
let msg = next_msg_id;
next_msg_id += 1;
mixnode.receive(msg, None);
sent_times.insert(msg, vtime);
sent_sequence.add_message(msg);
} else {
// Generate noise and add it to the sequence to calculate ordering coefficients later,
// but don't need to send it to the mix node
// because the mix node will anyway drop the noise,
// and we don't need to record what the mix node receives.
sent_sequence.add_noise();
}
all_sent_count += 1;
}
// The mix node add a new data message to its queue with a certain probability
@ -78,22 +92,26 @@ pub fn run_iteration(
// The mix node emits a message (data or noise) to the receiver.
// As the receiver, record the time and order of the received messages.
match mixnode.read_queue() {
Some(msg) => {
// TODO: handle all queues
match mixnode.read_queues().first().unwrap().1 {
Message::Data(msg) => {
latencies.insert(msg, vtime - sent_times.get(&msg).unwrap());
received_sequence.add_message(msg);
}
None => {
Message::Noise => {
received_sequence.add_noise();
}
}
// Record the number of data messages in the mix node's queue
data_msg_counts_in_queue.push(mixnode.message_count_in_queue());
// TODO: handle all queues
data_msg_counts_in_queue.push(*mixnode.data_count_in_queue().first().unwrap());
// If all messages have been received by the receiver, stop the iteration.
assert!(latencies.len() <= paramset.num_sender_data_msgs as usize);
if latencies.len() == paramset.num_sender_data_msgs as usize {
// If all data messages (that the sender has to send) have been received by the receiver,
// stop the iteration.
if all_sent_count == paramset.num_sender_msgs as usize
&& sent_times.len() == latencies.len()
{
break;
}
@ -110,18 +128,20 @@ pub fn run_iteration(
out_data_msg_counts_path,
);
// Calculate ordering coefficients and save them to a CSV file.
let strong_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, true);
let weak_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, false);
tracing::info!(
"STRONG_COEFF:{}, WEAK_COEFF:{}",
strong_ordering_coeff,
weak_ordering_coeff
);
save_ordering_coefficients(
strong_ordering_coeff,
weak_ordering_coeff,
out_ordering_coeff_path,
);
if paramset.queue_type != QueueType::NonMix {
let strong_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, true);
let weak_ordering_coeff = sent_sequence.ordering_coefficient(&received_sequence, false);
tracing::info!(
"STRONG_COEFF:{}, WEAK_COEFF:{}",
strong_ordering_coeff,
weak_ordering_coeff
);
save_ordering_coefficients(
strong_ordering_coeff,
weak_ordering_coeff,
out_ordering_coeff_path,
);
}
}
fn try_probability(rng: &mut StdRng, prob: f32) -> bool {
@ -163,7 +183,7 @@ fn save_sequence(sequence: &Sequence, path: &str) {
}
fn save_data_msg_counts(
data_msg_counts_in_queue: &[u32],
data_msg_counts_in_queue: &[usize],
interval: f32,
out_data_msg_counts_path: &str,
) {

View File

@ -1,5 +1,4 @@
mod iteration;
mod node;
mod ordercoeff;
mod paramset;
@ -13,7 +12,7 @@ use chrono::Utc;
use clap::Parser;
use iteration::run_iteration;
use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
use queue::QueueType;
use protocol::queue::QueueType;
#[derive(Debug, Parser)]
#[command(name = "Single Sender Single Mix Measurement")]
@ -49,7 +48,7 @@ fn main() {
"Output directory does not exist: {outdir}"
);
let subdir = format!(
"__WIP__dissemination_e{}s{}_{:?}_{}___DUR__",
"__WIP__ordering_e{}s{}_{:?}_{}___DUR__",
exp_id as u8,
session_id as u8,
queue_type,

View File

@ -1,19 +1,19 @@
use std::fmt::Display;
use crate::node::MessageId;
use protocol::node::MessageId;
pub struct Sequence(Vec<Entry>);
#[derive(Debug, PartialEq, Eq)]
pub enum Entry {
Message(MessageId),
Data(MessageId),
Noise(u32), // the number of consecutive noises
}
impl Display for Entry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Entry::Message(msg) => msg.to_string(),
Entry::Data(msg) => msg.to_string(),
Entry::Noise(cnt) => format!("-{cnt}"),
};
f.write_str(s.as_str())
@ -26,22 +26,29 @@ impl Sequence {
}
pub fn add_message(&mut self, msg: MessageId) {
self.0.push(Entry::Message(msg));
self.0.push(Entry::Data(msg));
}
pub fn add_noise(&mut self) {
if let Some(last) = self.0.last_mut() {
if let Entry::Noise(cnt) = last {
*cnt += 1;
} else {
self.0.push(Entry::Noise(1))
match last {
Entry::Noise(cnt) => {
*cnt += 1;
}
_ => self.0.push(Entry::Noise(1)),
}
} else {
self.0.push(Entry::Noise(1))
}
}
pub fn iter(&self) -> impl Iterator<Item = &Entry> {
self.0.iter()
}
pub fn len(&self) -> usize {
self.0.len()
}
}
impl Sequence {
@ -50,7 +57,7 @@ impl Sequence {
let mut i = 0;
while i < self.0.len() {
if let Entry::Message(_) = &self.0[i] {
if let Entry::Data(_) = &self.0[i] {
let (c, next_i) = self.ordering_coefficient_from(i, other, strong);
coeff += c;
@ -74,12 +81,12 @@ impl Sequence {
strong: bool,
) -> (u64, usize) {
let msg1 = match self.0[start_idx] {
Entry::Message(msg) => msg,
Entry::Data(msg) => msg,
_ => panic!("Entry at {start_idx} must be Message"),
};
for (j, entry) in other.iter().enumerate() {
if let Entry::Message(msg2) = entry {
if let Entry::Data(msg2) = entry {
if msg1 == *msg2 {
// Found the 1st matching msg. Start finding the next adjacent matching msg.
if strong {
@ -112,7 +119,7 @@ impl Sequence {
break;
}
}
(Entry::Message(msg1), Entry::Message(msg2)) => {
(Entry::Data(msg1), Entry::Data(msg2)) => {
if msg1 == msg2 {
coeff += 1;
i += 1;
@ -152,7 +159,7 @@ impl Sequence {
fn skip_noise(&self, mut index: usize) -> usize {
while index < self.0.len() {
if let Entry::Message(_) = self.0[index] {
if let Entry::Data(_) = self.0[index] {
break;
}
index += 1;
@ -171,91 +178,83 @@ mod tests {
assert_eq!(seq.ordering_coefficient(&seq, strong), 0);
// Case 1: Exact one matched pair with no noise
let seq = Sequence(vec![Entry::Message(1), Entry::Message(2)]);
let seq = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
assert_eq!(seq.ordering_coefficient(&seq, strong), 1);
// Case 2: Exact one matched pair with noise
let seq = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]);
let seq = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
assert_eq!(seq.ordering_coefficient(&seq, strong), 1);
// Case 3: One matched pair with no noise
let seq1 = Sequence(vec![
Entry::Message(1),
Entry::Message(2),
Entry::Message(3),
]);
let seq2 = Sequence(vec![
Entry::Message(1),
Entry::Message(2),
Entry::Message(4),
]);
let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(3)]);
let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Data(4)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 1);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 1);
// Case 4: One matched pair with noise
let seq1 = Sequence(vec![
Entry::Message(1),
Entry::Data(1),
Entry::Noise(10),
Entry::Message(2),
Entry::Message(3),
Entry::Data(2),
Entry::Data(3),
]);
let seq2 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]);
let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 1);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 1);
// Case 5: Two matched pairs with noise
let seq1 = Sequence(vec![
Entry::Message(1),
Entry::Data(1),
Entry::Noise(10),
Entry::Message(2),
Entry::Message(3),
Entry::Data(2),
Entry::Data(3),
]);
let seq2 = Sequence(vec![
Entry::Message(1),
Entry::Data(1),
Entry::Noise(10),
Entry::Message(2),
Entry::Message(3),
Entry::Message(4),
Entry::Data(2),
Entry::Data(3),
Entry::Data(4),
]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 2);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 2);
// Case 6: Only partial match with no noise
let seq1 = Sequence(vec![Entry::Message(1), Entry::Message(2)]);
let seq2 = Sequence(vec![Entry::Message(2), Entry::Message(3)]);
let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
let seq2 = Sequence(vec![Entry::Data(2), Entry::Data(3)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0);
// Case 7: Only partial match with noise
let seq1 = Sequence(vec![Entry::Message(1), Entry::Message(2), Entry::Noise(10)]);
let seq2 = Sequence(vec![Entry::Message(2), Entry::Noise(10), Entry::Message(3)]);
let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]);
let seq2 = Sequence(vec![Entry::Data(2), Entry::Noise(10), Entry::Data(3)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0);
// Case 8: No match at all
let seq1 = Sequence(vec![Entry::Message(1), Entry::Message(2), Entry::Noise(10)]);
let seq2 = Sequence(vec![Entry::Message(3), Entry::Noise(10), Entry::Message(4)]);
let seq1 = Sequence(vec![Entry::Data(1), Entry::Data(2), Entry::Noise(10)]);
let seq2 = Sequence(vec![Entry::Data(3), Entry::Noise(10), Entry::Data(4)]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 0);
// Case 9: Matches with noise but mixed orders
let seq1 = Sequence(vec![
Entry::Message(1),
Entry::Message(2),
Entry::Data(1),
Entry::Data(2),
Entry::Noise(10),
Entry::Message(3),
Entry::Message(4),
Entry::Message(5),
Entry::Message(6),
Entry::Data(3),
Entry::Data(4),
Entry::Data(5),
Entry::Data(6),
]);
let seq2 = Sequence(vec![
Entry::Message(4),
Entry::Message(5),
Entry::Message(1),
Entry::Message(2),
Entry::Data(4),
Entry::Data(5),
Entry::Data(1),
Entry::Data(2),
Entry::Noise(10),
Entry::Message(3),
Entry::Message(6),
Entry::Data(3),
Entry::Data(6),
]);
assert_eq!(seq1.ordering_coefficient(&seq2, strong), 3);
assert_eq!(seq2.ordering_coefficient(&seq1, strong), 3);
@ -266,14 +265,14 @@ mod tests {
test_ordering_coefficient_common(true);
// Case 0: No match because of noise
let seq1 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]);
let seq2 = Sequence(vec![Entry::Message(1), Entry::Message(2)]);
let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
assert_eq!(seq1.ordering_coefficient(&seq2, true), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, true), 0);
// Case 1: No match because of different count of noises
let seq1 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]);
let seq2 = Sequence(vec![Entry::Message(1), Entry::Noise(5), Entry::Message(2)]);
let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]);
assert_eq!(seq1.ordering_coefficient(&seq2, true), 0);
assert_eq!(seq2.ordering_coefficient(&seq1, true), 0);
}
@ -283,14 +282,14 @@ mod tests {
test_ordering_coefficient_common(false);
// Case 0: Match ignoring noises
let seq1 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]);
let seq2 = Sequence(vec![Entry::Message(1), Entry::Message(2)]);
let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
let seq2 = Sequence(vec![Entry::Data(1), Entry::Data(2)]);
assert_eq!(seq1.ordering_coefficient(&seq2, false), 1);
assert_eq!(seq2.ordering_coefficient(&seq1, false), 1);
// Case 1: Match ignoring noise count
let seq1 = Sequence(vec![Entry::Message(1), Entry::Noise(10), Entry::Message(2)]);
let seq2 = Sequence(vec![Entry::Message(1), Entry::Noise(5), Entry::Message(2)]);
let seq1 = Sequence(vec![Entry::Data(1), Entry::Noise(10), Entry::Data(2)]);
let seq2 = Sequence(vec![Entry::Data(1), Entry::Noise(5), Entry::Data(2)]);
assert_eq!(seq1.ordering_coefficient(&seq2, false), 1);
assert_eq!(seq2.ordering_coefficient(&seq1, false), 1);
}

View File

@ -0,0 +1,240 @@
use protocol::queue::QueueType;
/// Identifier of the experiment to run.
/// Discriminants match the numeric form accepted by `FromStr` ("1".."6").
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ExperimentId {
Experiment1 = 1,
Experiment2 = 2,
Experiment3 = 3,
Experiment4 = 4,
Experiment5 = 5,
Experiment6 = 6,
}
impl std::str::FromStr for ExperimentId {
    type Err = String;

    /// Accepts either the bare number ("1") or the full variant name ("Experiment1").
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let id = match s {
            "1" | "Experiment1" => ExperimentId::Experiment1,
            "2" | "Experiment2" => ExperimentId::Experiment2,
            "3" | "Experiment3" => ExperimentId::Experiment3,
            "4" | "Experiment4" => ExperimentId::Experiment4,
            "5" | "Experiment5" => ExperimentId::Experiment5,
            "6" | "Experiment6" => ExperimentId::Experiment6,
            _ => return Err(format!("Invalid experiment ID: {}", s)),
        };
        Ok(id)
    }
}
/// Identifier of the measurement session; currently only one session is defined.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum SessionId {
Session1 = 1,
}
impl std::str::FromStr for SessionId {
    type Err = String;

    /// Accepts either the bare number ("1") or the full variant name ("Session1").
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if matches!(s, "1" | "Session1") {
            Ok(SessionId::Session1)
        } else {
            Err(format!("Invalid session ID: {}", s))
        }
    }
}
/// Column headers for the paramset CSV, in the same order as
/// `ParamSet::as_csv_record` emits its fields.
pub const PARAMSET_CSV_COLUMNS: &[&str] = &[
"paramset",
"num_mixes",
"num_paths",
"random_topology",
"peering_degree",
"min_queue_size",
"transmission_rate",
"num_sender_msgs",
"sender_data_msg_prob",
"mix_data_msg_prob",
"queue_type",
"num_iterations",
];
/// One combination of simulation parameters (one row of the paramset CSV).
#[derive(Debug, Clone, PartialEq)]
pub struct ParamSet {
// Sequential 1-based identifier, assigned in generation order.
pub id: u16,
pub num_mixes: u32,
// Number of cascade paths; set to 0 when `random_topology` is true.
pub num_paths: u16,
pub random_topology: bool,
pub peering_degree: u32,
pub min_queue_size: u16,
pub transmission_rate: u16,
pub num_sender_msgs: u32,
pub sender_data_msg_prob: f32,
pub mix_data_msg_prob: f32,
pub queue_type: QueueType,
pub num_iterations: usize,
}
impl ParamSet {
    /// Builds every parameter set belonging to the given experiment/session pair.
    pub fn new_all_paramsets(
        exp_id: ExperimentId,
        session_id: SessionId,
        queue_type: QueueType,
    ) -> Vec<Self> {
        match session_id {
            SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type),
        }
    }

    /// Enumerates the Session1 parameter grid for `exp_id`.
    ///
    /// IDs are assigned sequentially starting from 1, in the exact order the
    /// nested loops below produce the combinations.
    fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
        let transmission_rate: u16 = 1;
        let min_queue_size: u16 = 10;
        let num_sender_msgs: u32 = 1000000;
        let sender_data_msg_probs: &[f32] = &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0];
        // Odd-numbered experiments run without mix-generated data traffic.
        let mix_data_msg_probs: &[f32] = match exp_id {
            ExperimentId::Experiment1 | ExperimentId::Experiment3 | ExperimentId::Experiment5 => {
                &[0.0]
            }
            ExperimentId::Experiment2 | ExperimentId::Experiment4 | ExperimentId::Experiment6 => {
                &[0.001, 0.01, 0.1]
            }
        };

        let mut paramsets: Vec<ParamSet> = Vec::new();
        let mut next_id: u16 = 1;
        match exp_id {
            ExperimentId::Experiment1
            | ExperimentId::Experiment2
            | ExperimentId::Experiment3
            | ExperimentId::Experiment4 => {
                // Fixed cascade topologies: sweep paths x mixes x probabilities.
                for &num_paths in &[1, 2, 3, 4] {
                    for &num_mixes in &[1, 2, 3, 4] {
                        for &sender_data_msg_prob in sender_data_msg_probs {
                            for &mix_data_msg_prob in mix_data_msg_probs {
                                paramsets.push(ParamSet {
                                    id: next_id,
                                    num_mixes,
                                    num_paths,
                                    random_topology: false,
                                    peering_degree: 1,
                                    min_queue_size,
                                    transmission_rate,
                                    num_sender_msgs,
                                    sender_data_msg_prob,
                                    mix_data_msg_prob,
                                    queue_type,
                                    num_iterations: 1,
                                });
                                next_id += 1;
                            }
                        }
                    }
                }
            }
            ExperimentId::Experiment5 | ExperimentId::Experiment6 => {
                // Random topologies: sweep network size x peering degree instead of paths.
                for &num_mixes in &[8, 16, 32] {
                    for &peering_degree in &[2, 3, 4] {
                        for &sender_data_msg_prob in sender_data_msg_probs {
                            for &mix_data_msg_prob in mix_data_msg_probs {
                                paramsets.push(ParamSet {
                                    id: next_id,
                                    num_mixes,
                                    num_paths: 0, // unused: the topology is built randomly
                                    random_topology: true,
                                    peering_degree,
                                    min_queue_size,
                                    transmission_rate,
                                    num_sender_msgs,
                                    sender_data_msg_prob,
                                    mix_data_msg_prob,
                                    queue_type,
                                    num_iterations: 10,
                                });
                                next_id += 1;
                            }
                        }
                    }
                }
            }
        }
        paramsets
    }

    /// Serializes the fields in the order of `PARAMSET_CSV_COLUMNS`.
    pub fn as_csv_record(&self) -> Vec<String> {
        vec![
            self.id.to_string(),
            self.num_mixes.to_string(),
            self.num_paths.to_string(),
            self.random_topology.to_string(),
            self.peering_degree.to_string(),
            self.min_queue_size.to_string(),
            self.transmission_rate.to_string(),
            self.num_sender_msgs.to_string(),
            self.sender_data_msg_prob.to_string(),
            self.mix_data_msg_prob.to_string(),
            format!("{:?}", self.queue_type),
            self.num_iterations.to_string(),
        ]
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use strum::IntoEnumIterator;
use crate::paramset::ParamSet;
use super::*;
// Verifies, for every queue type and experiment/session pair:
// the generated paramset count matches the expected grid size,
// all paramsets are unique, and IDs are sequential starting from 1.
#[test]
fn test_new_all_paramsets() {
// Expected counts are the products of the loop dimensions in
// `new_session1_paramsets` (paths/mixes/probability sweeps).
let cases = vec![
((ExperimentId::Experiment1, SessionId::Session1), 4 * 4 * 6),
(
(ExperimentId::Experiment2, SessionId::Session1),
4 * 4 * 6 * 3,
),
((ExperimentId::Experiment3, SessionId::Session1), 4 * 4 * 6),
(
(ExperimentId::Experiment4, SessionId::Session1),
4 * 4 * 6 * 3,
),
((ExperimentId::Experiment5, SessionId::Session1), 3 * 3 * 6),
(
(ExperimentId::Experiment6, SessionId::Session1),
3 * 3 * 6 * 3,
),
];
for queue_type in QueueType::iter() {
for ((exp_id, session_id), expected_cnt) in cases.clone().into_iter() {
let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type);
assert_eq!(
paramsets.len(),
expected_cnt as usize,
"queue_type:{:?}, exp:{:?}, session:{:?}",
queue_type,
exp_id,
session_id,
);
// Check if all parameter sets are unique
let unique_paramsets: HashSet<Vec<String>> = paramsets
.iter()
.map(|paramset| paramset.as_csv_record())
.collect();
assert_eq!(unique_paramsets.len(), paramsets.len());
// Check if paramset IDs are correct.
for (i, paramset) in paramsets.iter().enumerate() {
assert_eq!(paramset.id as usize, i + 1);
}
}
}
}
}

View File

@ -1,9 +1,11 @@
[package]
name = "queue"
name = "protocol"
version = "0.1.0"
edition = "2021"
[dependencies]
rand = "0.8.5"
rustc-hash = "2.0.0"
strum = "0.26.3"
strum_macros = "0.26.4"
tracing = "0.1.40"

View File

@ -0,0 +1,2 @@
/// Node abstraction used by the simulation binaries.
pub mod node;
/// Queue (mix) implementations and their configuration.
pub mod queue;

View File

@ -0,0 +1,107 @@
use rustc_hash::{FxHashMap, FxHashSet};
use crate::queue::{new_queue, Message, Queue, QueueConfig};
/// Identifier of a node in the simulated topology.
pub type NodeId = u32;
/// Identifier of a data message originated by a sender.
pub type MessageId = u32;
/// A node holding one outbound queue per connected peer.
///
/// The duplicate-suppression cache is optional: when disabled, every received
/// message is treated as if seen for the first time.
pub struct Node {
queue_config: QueueConfig,
// To have the deterministic result, we use Vec instead of FxHashMap.
// Building `queues` is inefficient, but it's not a problem because it's done only once at the beginning.
// Instead, use `connected_peers` to build `queues` efficiently.
queues: Vec<(NodeId, Box<dyn Queue<MessageId>>)>,
connected_peers: FxHashSet<NodeId>,
// A cache to avoid relaying the same message multiple times.
// `None` means the cache is disabled (see `Node::new`).
received_msgs: Option<FxHashMap<MessageId, u32>>,
peering_degree: u32,
}
impl Node {
/// Creates a node with no peers; per-peer queues are added via `connect`.
/// `enable_cache` turns duplicate suppression on or off (the dissemination
/// simulation passes true, the ordering simulation passes false).
pub fn new(queue_config: QueueConfig, peering_degree: u32, enable_cache: bool) -> Self {
Node {
queue_config,
queues: Vec::new(),
connected_peers: FxHashSet::default(),
received_msgs: if enable_cache {
Some(FxHashMap::default())
} else {
None
},
peering_degree,
}
}
/// Registers `peer_id` and creates a dedicated queue for it.
/// Queues are kept sorted by peer ID so that iteration order is deterministic.
pub fn connect(&mut self, peer_id: NodeId) {
if self.connected_peers.insert(peer_id) {
let pos = self
.queues
.binary_search_by(|probe| probe.0.cmp(&peer_id))
.unwrap_or_else(|pos| pos);
self.queues
.insert(pos, (peer_id, new_queue(&self.queue_config)));
}
}
/// Originates a new message: records it in the cache (if enabled) and
/// enqueues it to every peer. Panics if `msg` was already seen.
pub fn send(&mut self, msg: MessageId) {
assert!(self.check_and_update_cache(msg, true));
for (_, queue) in self.queues.iter_mut() {
queue.push(msg);
}
}
/// Handles an incoming message. `from` is `None` when the message is injected
/// locally (e.g. by a sender feeding a single mix), in which case it is pushed
/// to all queues. Returns true on first receipt (always true when the cache
/// is disabled), false for a duplicate.
pub fn receive(&mut self, msg: MessageId, from: Option<NodeId>) -> bool {
let first_received = self.check_and_update_cache(msg, false);
if first_received {
for (node_id, queue) in self.queues.iter_mut() {
match from {
Some(sender) => {
if *node_id != sender {
queue.push(msg);
}
}
None => queue.push(msg),
}
}
}
first_received
}
/// Pops exactly one `Message` (data or noise) from each peer queue,
/// returning (peer, message) pairs in peer-ID order.
pub fn read_queues(&mut self) -> Vec<(NodeId, Message<MessageId>)> {
let mut msgs_to_relay: Vec<(NodeId, Message<MessageId>)> = Vec::new();
self.queues.iter_mut().for_each(|(node_id, queue)| {
msgs_to_relay.push((*node_id, queue.pop()));
});
msgs_to_relay
}
/// Returns the number of data (non-noise) messages in each queue, in peer-ID order.
pub fn data_count_in_queue(&self) -> Vec<usize> {
self.queues
.iter()
.map(|(_, queue)| queue.data_count())
.collect()
}
/// Bumps the seen-count for `msg` and returns whether this is its first appearance.
/// `sending` marks a locally originated message, which starts at count 0 because
/// it has not yet been received from any peer. With the cache disabled this is a
/// no-op that always reports "first".
fn check_and_update_cache(&mut self, msg: MessageId, sending: bool) -> bool {
if let Some(received_msgs) = &mut self.received_msgs {
let first_received = if let Some(count) = received_msgs.get_mut(&msg) {
*count += 1;
false
} else {
received_msgs.insert(msg, if sending { 0 } else { 1 });
true
};
// If the message has been received from all connected peers, remove it from the cache
// because there is no possibility that the message will be received again.
if received_msgs.get(&msg).unwrap() == &self.peering_degree {
tracing::debug!("Remove message from cache: {}", msg);
received_msgs.remove(&msg);
}
first_received
} else {
true
}
}
}

View File

@ -30,9 +30,15 @@ impl std::str::FromStr for QueueType {
}
pub trait Queue<T: Copy> {
fn push(&mut self, msg: T);
fn pop(&mut self) -> Option<T>;
fn message_count(&self) -> usize;
fn push(&mut self, data: T);
fn pop(&mut self) -> Message<T>;
fn data_count(&self) -> usize;
}
/// A unit emitted by a queue: either a real data message or noise (cover traffic).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Message<T: Copy> {
Data(T),
Noise,
}
pub struct QueueConfig {
@ -61,7 +67,7 @@ pub fn new_queue<T: 'static + Copy>(cfg: &QueueConfig) -> Box<dyn Queue<T>> {
}
struct NonMixQueue<T: Copy> {
queue: VecDeque<T>,
queue: VecDeque<T>, // don't need to contain Noise
}
impl<T: Copy> NonMixQueue<T> {
@ -73,59 +79,60 @@ impl<T: Copy> NonMixQueue<T> {
}
impl<T: Copy> Queue<T> for NonMixQueue<T> {
fn push(&mut self, msg: T) {
self.queue.push_back(msg)
fn push(&mut self, data: T) {
self.queue.push_back(data)
}
fn pop(&mut self) -> Option<T> {
self.queue.pop_front()
fn pop(&mut self) -> Message<T> {
match self.queue.pop_front() {
Some(data) => Message::Data(data),
None => Message::Noise,
}
}
fn message_count(&self) -> usize {
fn data_count(&self) -> usize {
self.queue.len()
}
}
struct MixQueue<T: Copy> {
queue: Vec<Option<T>>, // None element means noise
message_count: usize,
queue: Vec<Message<T>>,
data_count: usize,
rng: StdRng,
}
impl<T: Copy> MixQueue<T> {
fn new(num_initial_noises: usize, seed: u64) -> Self {
Self {
queue: vec![None; num_initial_noises],
message_count: 0,
queue: vec![Message::Noise; num_initial_noises],
data_count: 0,
rng: StdRng::seed_from_u64(seed),
}
}
fn push(&mut self, data: T) {
self.queue.push(Some(data));
self.message_count += 1;
self.queue.push(Message::Data(data));
self.data_count += 1;
}
fn fill_noises(&mut self, k: usize) {
self.queue.extend(std::iter::repeat(None).take(k))
self.queue.extend(std::iter::repeat(Message::Noise).take(k))
}
fn pop(&mut self, idx: usize) -> Option<T> {
fn pop(&mut self, idx: usize) -> Option<Message<T>> {
if idx < self.queue.len() {
match self.queue.remove(idx) {
Some(msg) => {
self.message_count -= 1;
Some(msg)
}
None => None,
let msg = self.queue.remove(idx);
if let Message::Data(_) = msg {
self.data_count -= 1;
}
Some(msg)
} else {
None
}
}
fn message_count(&self) -> usize {
self.message_count
fn data_count(&self) -> usize {
self.data_count
}
fn len(&self) -> usize {
@ -162,12 +169,12 @@ impl<T: Copy> MinSizeMixQueue<T> {
self.queue.push(msg)
}
fn pop(&mut self, idx: usize) -> Option<T> {
fn pop(&mut self, idx: usize) -> Option<Message<T>> {
self.queue.pop(idx)
}
fn message_count(&self) -> usize {
self.queue.message_count()
fn data_count(&self) -> usize {
self.queue.data_count()
}
fn ensure_min_size(&mut self) {
@ -211,20 +218,20 @@ impl<T: Copy> Queue<T> for PureCoinFlippingQueue<T> {
self.queue.push(msg)
}
fn pop(&mut self) -> Option<T> {
fn pop(&mut self) -> Message<T> {
self.queue.ensure_min_size();
loop {
for i in 0..self.queue.len() {
if self.queue.flip_coin() {
return self.queue.pop(i);
return self.queue.pop(i).unwrap();
}
}
}
}
fn message_count(&self) -> usize {
self.queue.message_count()
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
@ -245,15 +252,15 @@ impl<T: Copy> Queue<T> for PureRandomSamplingQueue<T> {
self.queue.push(msg)
}
fn pop(&mut self) -> Option<T> {
fn pop(&mut self) -> Message<T> {
self.queue.ensure_min_size();
let i = self.queue.sample_index();
self.queue.pop(i)
self.queue.pop(i).unwrap()
}
fn message_count(&self) -> usize {
self.queue.message_count()
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
@ -274,7 +281,7 @@ impl<T: Copy> Queue<T> for PermutedCoinFlippingQueue<T> {
self.queue.push(msg)
}
fn pop(&mut self) -> Option<T> {
fn pop(&mut self) -> Message<T> {
self.queue.ensure_min_size();
self.queue.shuffle();
@ -282,14 +289,14 @@ impl<T: Copy> Queue<T> for PermutedCoinFlippingQueue<T> {
loop {
for i in 0..self.queue.len() {
if self.queue.flip_coin() {
return self.queue.pop(i);
return self.queue.pop(i).unwrap();
}
}
}
}
fn message_count(&self) -> usize {
self.queue.message_count()
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
@ -310,24 +317,24 @@ impl<T: Copy> Queue<T> for NoisyCoinFlippingQueue<T> {
self.queue.push(msg)
}
fn pop(&mut self) -> Option<T> {
fn pop(&mut self) -> Message<T> {
if self.queue.len() == 0 {
return None;
return Message::Noise;
}
loop {
for i in 0..self.queue.len() {
if self.queue.flip_coin() {
return self.queue.pop(i);
return self.queue.pop(i).unwrap();
} else if i == 0 {
return None;
return Message::Noise;
}
}
}
}
fn message_count(&self) -> usize {
self.queue.message_count()
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
@ -348,21 +355,21 @@ impl<T: Copy> Queue<T> for NoisyCoinFlippingRandomReleaseQueue<T> {
self.queue.push(msg)
}
fn pop(&mut self) -> Option<T> {
fn pop(&mut self) -> Message<T> {
if self.queue.len() == 0 {
return None;
return Message::Noise;
}
if self.queue.flip_coin() {
let i = self.queue.sample_index();
self.queue.pop(i)
self.queue.pop(i).unwrap()
} else {
None
Message::Noise
}
}
fn message_count(&self) -> usize {
self.queue.message_count()
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
@ -380,23 +387,23 @@ mod tests {
min_queue_size: 0,
});
// Check if None (noise) is returned when queue is empty
assert_eq!(queue.pop(), None);
// Check if noise is returned when queue is empty
assert_eq!(queue.pop(), Message::Noise);
// Check if queue is FIFO
queue.push(0);
queue.push(1);
assert_eq!(queue.pop(), Some(0));
assert_eq!(queue.pop(), Some(1));
assert_eq!(queue.pop(), Message::Data(0));
assert_eq!(queue.pop(), Message::Data(1));
// Check if None (noise) is returned when queue is empty
assert_eq!(queue.pop(), None);
// Check if noise is returned when queue is empty
assert_eq!(queue.pop(), Message::Noise);
// Check if queue is FIFO again
queue.push(2);
queue.push(3);
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), Some(3));
assert_eq!(queue.pop(), Message::Data(2));
assert_eq!(queue.pop(), Message::Data(3));
}
#[test]
@ -419,8 +426,8 @@ mod tests {
min_queue_size: 4,
});
// Check if None (noise) is returned when queue is empty
assert_eq!(queue.pop(), None);
// Check if noise is returned when queue is empty
assert_eq!(queue.pop(), Message::Noise);
// Put only 2 messages even though the min queue size is 4
queue.push(0);
@ -429,12 +436,12 @@ mod tests {
// Wait until 2 messages are returned from the queue
let mut set: HashSet<_> = vec![0, 1].into_iter().collect();
while !set.is_empty() {
if let Some(msg) = queue.pop() {
if let Message::Data(msg) = queue.pop() {
assert!(set.remove(&msg));
}
}
// Check if None (noise) is returned when there is no real message remains
assert_eq!(queue.pop(), None);
// Check if noise is returned when there is no real message remains
assert_eq!(queue.pop(), Message::Noise);
}
}

View File

@ -1,35 +0,0 @@
use queue::{new_queue, Queue, QueueConfig};
/// Identifier of a data message; a plain `u32` alias.
pub type MessageId = u32;

/// A network node owning a single message queue whose mixing strategy is
/// selected at construction time via `QueueConfig`.
pub struct Node {
    // Trait object so any `Queue` implementation from the `queue` crate can
    // be plugged in without changing `Node`.
    queue: Box<dyn Queue<MessageId>>,
}
impl Node {
    /// Builds a node whose queue behavior is determined by `queue_config`.
    pub fn new(queue_config: &QueueConfig) -> Self {
        Self {
            queue: new_queue(queue_config),
        }
    }

    /// Schedules a new data message originating at this node for sending.
    pub fn send(&mut self, msg: MessageId) {
        self.queue.push(msg);
    }

    /// Enqueues a message received from a peer so it can be relayed onward.
    /// Noise never needs to pass through here, since it is simply dropped.
    pub fn receive(&mut self, msg: MessageId) {
        self.queue.push(msg);
    }

    /// Pops one message from the queue; `None` means a noise was read.
    pub fn read_queue(&mut self) -> Option<MessageId> {
        self.queue.pop()
    }

    /// Number of real (non-noise) messages currently buffered in the queue.
    pub fn message_count_in_queue(&self) -> u32 {
        u32::try_from(self.queue.message_count()).unwrap()
    }
}

View File

@ -1,220 +0,0 @@
use queue::QueueType;
/// Identifies which experiment a simulation run belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ExperimentId {
    Experiment1 = 1,
    Experiment2 = 2,
}

impl std::str::FromStr for ExperimentId {
    type Err = String;

    /// Accepts either the numeric form ("1", "2") or the full variant name.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "1" | "Experiment1" => Ok(Self::Experiment1),
            "2" | "Experiment2" => Ok(Self::Experiment2),
            other => Err(format!("Invalid experiment ID: {}", other)),
        }
    }
}
/// Identifies the parameter-set session (only one session exists so far).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum SessionId {
    Session1 = 1,
}

impl std::str::FromStr for SessionId {
    type Err = String;

    /// Accepts either the numeric form ("1") or the full variant name.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "1" | "Session1" => Ok(Self::Session1),
            other => Err(format!("Invalid session ID: {}", other)),
        }
    }
}
/// Header row for the paramset CSV output.
/// The column order must stay in sync with the value order produced by
/// `ParamSet::as_csv_record`.
pub const PARAMSET_CSV_COLUMNS: &[&str] = &[
    "paramset",
    "num_nodes",
    "peering_degree",
    "min_queue_size",
    "transmission_rate",
    "num_sender_data_msgs",
    "sender_data_msg_prob",
    "mix_data_msg_prob",
    "queue_type",
    "num_iterations",
];
/// One concrete combination of simulation parameters.
///
/// Instances are generated grid-style by `ParamSet::new_all_paramsets` and
/// serialized by `as_csv_record` in `PARAMSET_CSV_COLUMNS` order.
#[derive(Debug, Clone, PartialEq)]
pub struct ParamSet {
    // 1-based position in the parameter grid; IDs stay stable across queue
    // types because skipped combinations still consume an ID.
    pub id: u16,
    pub num_nodes: u32,
    pub peering_degree: u32,
    // Only meaningful for queue types where `is_min_queue_size_applicable`
    // returns true.
    pub min_queue_size: u16,
    pub transmission_rate: u16,
    // Total data messages the sender emits (transmission_rate * 1000 in Session1).
    pub num_sender_data_msgs: u32,
    // Probability that the sender emits a data message — presumably per
    // transmission slot; confirm against the simulation loop.
    pub sender_data_msg_prob: f32,
    // Probability that a mix node emits its own data message; fixed at 0.0
    // for Experiment1.
    pub mix_data_msg_prob: f32,
    pub queue_type: QueueType,
    pub num_iterations: usize,
}
impl ParamSet {
    /// Enumerates every parameter set for the given experiment/session pair.
    pub fn new_all_paramsets(
        exp_id: ExperimentId,
        session_id: SessionId,
        queue_type: QueueType,
    ) -> Vec<Self> {
        match session_id {
            SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type),
        }
    }

    /// Builds the Session1 grid:
    /// `min_queue_size x sender_data_msg_prob x mix_data_msg_prob`.
    ///
    /// An ID is consumed for every grid position, even for combinations that
    /// are skipped, so a given combination always maps to the same ID
    /// regardless of the queue type.
    fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
        let num_nodes: u32 = 3;
        let peering_degree: u32 = 1;
        let transmission_rate: u16 = 1;
        let min_queue_sizes: &[u16] = &[
            transmission_rate.checked_div(2).unwrap(),
            transmission_rate,
            transmission_rate.checked_mul(2).unwrap(),
        ];
        let num_sender_data_msgs: u32 = (transmission_rate as u32).checked_mul(1000).unwrap();
        let sender_data_msg_probs: &[f32] = &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0];
        let mix_data_msg_probs: &[f32] = match exp_id {
            ExperimentId::Experiment1 => &[0.0],
            ExperimentId::Experiment2 => &[0.00001, 0.0001, 0.001, 0.01, 0.1],
        };
        let num_iterations: usize = 100;

        let min_queue_size_matters = Self::is_min_queue_size_applicable(&queue_type);
        let mut paramsets: Vec<ParamSet> = Vec::new();
        let mut next_id: u16 = 1;
        for &min_queue_size in min_queue_sizes {
            for &sender_data_msg_prob in sender_data_msg_probs {
                for &mix_data_msg_prob in mix_data_msg_probs {
                    // Reserve the ID slot first so skipped combinations keep
                    // the numbering stable across queue types.
                    let id = next_id;
                    next_id += 1;
                    if !min_queue_size_matters && min_queue_size != min_queue_sizes[0] {
                        continue;
                    }
                    paramsets.push(ParamSet {
                        id,
                        num_nodes,
                        peering_degree,
                        min_queue_size,
                        transmission_rate,
                        num_sender_data_msgs,
                        sender_data_msg_prob,
                        mix_data_msg_prob,
                        queue_type,
                        num_iterations,
                    });
                }
            }
        }
        paramsets
    }

    /// Whether `min_queue_size` actually changes behavior for this queue type.
    pub fn is_min_queue_size_applicable(queue_type: &QueueType) -> bool {
        matches!(
            queue_type,
            QueueType::PureCoinFlipping
                | QueueType::PureRandomSampling
                | QueueType::PermutedCoinFlipping
        )
    }

    /// Serializes the fields in `PARAMSET_CSV_COLUMNS` order.
    pub fn as_csv_record(&self) -> Vec<String> {
        vec![
            self.id.to_string(),
            self.num_nodes.to_string(),
            self.peering_degree.to_string(),
            self.min_queue_size.to_string(),
            self.transmission_rate.to_string(),
            self.num_sender_data_msgs.to_string(),
            self.sender_data_msg_prob.to_string(),
            self.mix_data_msg_prob.to_string(),
            format!("{:?}", self.queue_type),
            self.num_iterations.to_string(),
        ]
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use strum::IntoEnumIterator;

    use crate::paramset::ParamSet;

    use super::*;

    #[test]
    fn test_new_all_paramsets() {
        // (experiment, session) -> full grid size when min_queue_size applies.
        let cases = vec![
            ((ExperimentId::Experiment1, SessionId::Session1), 3 * 6),
            ((ExperimentId::Experiment2, SessionId::Session1), 3 * 6 * 5),
        ];
        for queue_type in QueueType::iter() {
            for ((exp_id, session_id), mut expected_cnt) in cases.clone().into_iter() {
                let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type);

                // Queue types that ignore min_queue_size collapse that axis
                // from 3 values down to 1.
                if !ParamSet::is_min_queue_size_applicable(&queue_type) {
                    expected_cnt /= 3;
                }
                assert_eq!(paramsets.len(), expected_cnt as usize);

                // No two paramsets may serialize to the same CSV record.
                let distinct: HashSet<Vec<String>> =
                    paramsets.iter().map(ParamSet::as_csv_record).collect();
                assert_eq!(distinct.len(), paramsets.len());

                // With the full grid present, IDs must be 1..=len in order.
                if ParamSet::is_min_queue_size_applicable(&queue_type) {
                    for (i, paramset) in paramsets.iter().enumerate() {
                        assert_eq!(paramset.id as usize, i + 1);
                    }
                }
            }
        }
    }

    #[test]
    fn test_id_consistency() {
        let cases = vec![
            (ExperimentId::Experiment1, SessionId::Session1),
            (ExperimentId::Experiment2, SessionId::Session1),
        ];
        for (exp_id, session_id) in cases.into_iter() {
            let with_min_queue_size =
                ParamSet::new_all_paramsets(exp_id, session_id, QueueType::PureCoinFlipping);
            let without_min_queue_size =
                ParamSet::new_all_paramsets(exp_id, session_id, QueueType::NonMix);

            // The full grid assigns IDs sequentially starting from 1.
            for (i, paramset) in with_min_queue_size.iter().enumerate() {
                assert_eq!(paramset.id as usize, i + 1);
            }

            // Each reduced-grid paramset must coincide with the full-grid
            // paramset carrying the same ID (modulo queue type).
            for mut paramset in without_min_queue_size.into_iter() {
                paramset.queue_type = QueueType::PureCoinFlipping;
                assert_eq!(
                    paramset,
                    with_min_queue_size[paramset.id as usize - 1]
                );
            }
        }
    }
}