Mixnet: Dissemination and Ordering Experiments (#13)

Youngjoon Lee 2024-09-16 18:50:20 +09:00 committed by GitHub
parent db4348f3ff
commit 714d50f493
35 changed files with 4284 additions and 12 deletions


@ -7,6 +7,9 @@ on:
   push:
     branches: [master]
+env:
+  CARGO_TERM_COLOR: always
 jobs:
   mixnet:
     runs-on: ubuntu-latest
@ -14,17 +17,9 @@ jobs:
       - uses: actions/checkout@v4
         with:
           submodules: true
-      - name: Set up Python 3.x
-        uses: actions/setup-python@v5
-        with:
-          python-version: "3.x"
-      - name: Install dependencies for mixnet
+      - name: Build
         working-directory: mixnet
-        run: pip install -r requirements.txt
-      - name: Run unit tests
+        run: cargo build -v
+      - name: Unit tests
         working-directory: mixnet
-        run: python -m unittest -v
-      - name: Run a short mixnet simulation
-        working-directory: mixnet
-        run: python -m cmd.main --config config.ci.yaml
+        run: cargo test -v

mixnet/.gitignore

@ -0,0 +1,21 @@
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
# RustRover
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

mixnet/Cargo.toml

@ -0,0 +1,3 @@
[workspace]
members = ["dissemination", "protocol", "ordering"]
resolver = "2"

mixnet/README.md

@ -0,0 +1,8 @@
# Mixnet Simulations
## Structure
- [`protocol/`](./protocol): The implementation of the mixnet protocol
- [`dissemination/`](./dissemination): [Queuing Mechanism: Message Dissemination Experiments](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#5120661bdf5343319e66c9372dd623b7)
- [`ordering/`](./ordering): [Queuing Mechanism: Ordering Experiments](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4d38e8790ecd492a812c733bf140b864)
- [`analysis/`](./analysis): The analysis tools for the simulation results

mixnet/analysis/.gitignore

@ -0,0 +1,2 @@
.venv/
*.png

mixnet/analysis/README.md

@ -0,0 +1,45 @@
# Ordering Experiments: Analysis Tools
This Python project contains scripts to draw plots and tables from the results of the [ordering experiments](../ordering).
## Prerequisites
- Python 3
- Install dependencies:
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
## Usage
### Analyzing the latency results
The following command draws plots and tables from the latency results of the ordering experiments.
```bash
python latency.py <aggregated_csv_path> <output_dir>
```
- `aggregated_csv_path`
  - A path to a CSV file containing the statistical results of all experiments to be shown in the plots and tables.
  - The script expects the CSV file to have at least the following columns:
    - `paramset` (int): Parameter Set ID
    - `queue_type` (str): Queue Type (NonMix, PureCoinFlipping, ...)
    - `latency_min`, `latency_mean` and `latency_max` (float)
- `output_dir`: A directory path where all PNG files of plots and tables are written
### Analyzing the ordering coefficient results
The following command draws plots and tables from the ordering coefficient results of the ordering experiments.
```bash
python coeff.py <aggregated_csv_path> <output_dir>
```
- `aggregated_csv_path`
  - A path to a CSV file containing the statistical results of all experiments to be shown in the plots and tables.
  - The script expects the CSV file to have at least the following columns:
    - `paramset` (int): Parameter Set ID
    - `queue_type` (str): Queue Type (NonMix, PureCoinFlipping, ...)
    - `strong_coeff_min`, `strong_coeff_mean` and `strong_coeff_max` (float)
    - `casual_coeff_min`, `casual_coeff_mean` and `casual_coeff_max` (float)
    - `weak_coeff_min`, `weak_coeff_mean` and `weak_coeff_max` (float)
- `output_dir`: A directory path where all PNG files of plots and tables are written
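
Both scripts also group results by the x-axis fields defined in [`common.py`](./common.py) (`num_mixes`, `peering_degree`, `sender_data_msg_prob` and `mix_data_msg_prob`), so those columns must be present as well. As a minimal, hypothetical example, an input row for `latency.py` could look like this (latency values borrowed from the `aggregated.csv` sample in [`ordering/`](../ordering)):
```csv
paramset,queue_type,num_mixes,peering_degree,sender_data_msg_prob,mix_data_msg_prob,latency_min,latency_mean,latency_max
1,NoisyCoinFlipping,8,2,0.01,0.0625,0,123.07003891050584,527
```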

mixnet/analysis/coeff.py

@ -0,0 +1,103 @@
import argparse
import matplotlib.pyplot as plt
import pandas as pd
from common import COLORS, MARKERS, X_FIELDS
def analyze(path: str, outdir: str):
data = pd.read_csv(path)
for x_field in X_FIELDS:
analyze_versus(data, x_field, outdir)
def analyze_versus(data: pd.DataFrame, x_field: str, outdir: str):
# Group by both x_field and queue_type, then select the row with the largest paramset for each group
max_paramset_data = data.loc[
data.groupby([x_field, "queue_type"])["paramset"].idxmax()
]
all_fields = [
["strong_coeff_min", "casual_coeff_min", "weak_coeff_min"],
["strong_coeff_mean", "casual_coeff_mean", "weak_coeff_mean"],
["strong_coeff_max", "casual_coeff_max", "weak_coeff_max"],
]
# Display the plots
fig, ax = plt.subplots(3, 3, figsize=(20, 10))
for ax_col, fields in enumerate(all_fields):
max_y = 0
for field in fields:
max_y = max(max_y, max_paramset_data[field].max())
for ax_row, field in enumerate(fields):
for queue_type in max_paramset_data["queue_type"].unique():
queue_data = max_paramset_data[
max_paramset_data["queue_type"] == queue_type
]
x_values = queue_data[x_field]
y_values = queue_data[field]
ax[ax_row][ax_col].plot(
x_values,
y_values,
label=queue_type,
marker=MARKERS[queue_type],
color=COLORS[queue_type],
)
ax[ax_row][ax_col].set_title(f"{field} vs {x_field}", fontsize=10)
ax[ax_row][ax_col].set_xlabel(x_field)
ax[ax_row][ax_col].set_ylabel(field)
if ax_row == 0 and ax_col == len(all_fields) - 1:
ax[ax_row][ax_col].legend(bbox_to_anchor=(1, 1), loc="upper left")
ax[ax_row][ax_col].grid(True)
if max_y < 1e6:
ax[ax_row][ax_col].set_ylim(-10, max_y * 1.05)
else:
ax[ax_row][ax_col].set_ylim(bottom=-10)
plt.tight_layout()
fig.savefig(f"{outdir}/coeff_vs_{x_field}.png")
# Display the table of values
# Create a table with x_field, queue_type, and coefficients
flatten_fields = [
field for zipped_fields in zip(*all_fields) for field in zipped_fields
]
columns = [x_field, "queue_type"] + flatten_fields
table_data = max_paramset_data[columns].sort_values(by=[x_field, "queue_type"])
# Format all coefficient columns with 2 decimal places (scientific notation for large values)
table_data[flatten_fields] = table_data[flatten_fields].map(
lambda x: f"{x:.2e}" if abs(x) >= 1e6 else f"{x:.2f}"
)
# Display the table as a separate subplot
fig_table, ax_table = plt.subplots(
figsize=(len(columns) * 1.8, len(table_data) * 0.3)
)
ax_table.axis("off") # Turn off the axis
table = ax_table.table(
cellText=table_data.values,
colLabels=table_data.columns,
cellLoc="center",
loc="center",
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 1.5)
for i in range(len(table_data.columns)):
table.auto_set_column_width(i)
fig_table.savefig(f"{outdir}/coeff_vs_{x_field}_table.png")
plt.draw()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Aggregate the results of all paramsets of an experiment"
)
parser.add_argument("path", type=str, help="dir path")
parser.add_argument("outdir", type=str, help="output dir path")
args = parser.parse_args()
analyze(args.path, args.outdir)

mixnet/analysis/common.py

@ -0,0 +1,24 @@
MARKERS = {
"NonMix": "o",
"PureCoinFlipping": "s",
"PureRandomSampling": "^",
"PermutedCoinFlipping": "v",
"NoisyCoinFlipping": "*",
"NoisyCoinFlippingRandomRelease": "d",
}
COLORS = {
"NonMix": "#35A02D",
"PureCoinFlipping": "#9467BD",
"PureRandomSampling": "#8C564B",
"PermutedCoinFlipping": "#D62728",
"NoisyCoinFlipping": "#3977B4",
"NoisyCoinFlippingRandomRelease": "#F07F12",
}
X_FIELDS = [
"num_mixes",
"peering_degree",
"sender_data_msg_prob",
"mix_data_msg_prob",
]

mixnet/analysis/latency.py

@ -0,0 +1,88 @@
import argparse
import matplotlib.pyplot as plt
import pandas as pd
from common import COLORS, MARKERS, X_FIELDS
def analyze(path: str, outdir: str):
data = pd.read_csv(path)
for x_field in X_FIELDS:
analyze_versus(data, x_field, outdir)
def analyze_versus(data: pd.DataFrame, x_field: str, outdir: str):
# Group by both x_field and queue_type, then select the row with the largest paramset for each group
max_paramset_data = data.loc[
data.groupby([x_field, "queue_type"])["paramset"].idxmax()
]
fields = ["latency_min", "latency_mean", "latency_max"]
# Display the plots
fig, ax = plt.subplots(1, 3, figsize=(20, 4))
for ax_col, field in enumerate(fields):
for queue_type in max_paramset_data["queue_type"].unique():
queue_data = max_paramset_data[
max_paramset_data["queue_type"] == queue_type
]
x_values = queue_data[x_field]
y_values = queue_data[field]
ax[ax_col].plot(
x_values,
y_values,
label=queue_type,
marker=MARKERS[queue_type],
color=COLORS[queue_type],
)
ax[ax_col].set_title(f"{field} vs {x_field}", fontsize=10)
ax[ax_col].set_xlabel(x_field)
ax[ax_col].set_ylabel(field)
if ax_col == len(fields) - 1:
ax[ax_col].legend(bbox_to_anchor=(1, 1), loc="upper left")
ax[ax_col].grid(True)
ax[ax_col].set_ylim(bottom=0)
plt.tight_layout()
fig.savefig(f"{outdir}/latency_vs_{x_field}.png")
# Display the table of values
# Create a table with x_field, queue_type, and latency stats
columns = [x_field, "queue_type"] + fields
table_data = max_paramset_data[columns].sort_values(by=[x_field, "queue_type"])
# Prepare to display values with only 2 decimal places
table_data[fields] = table_data[fields].map(
lambda x: f"{x:.2e}" if abs(x) >= 1e6 else f"{x:.2f}"
)
# Display the table as a separate subplot
fig_table, ax_table = plt.subplots(
figsize=(len(columns) * 1.8, len(table_data) * 0.3)
)
ax_table.axis("off") # Turn off the axis
table = ax_table.table(
cellText=table_data.values,
colLabels=table_data.columns,
cellLoc="center",
loc="center",
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 1.5)
for i in range(len(table_data.columns)):
table.auto_set_column_width(i)
fig_table.savefig(f"{outdir}/latency_vs_{x_field}_table.png")
plt.draw()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Aggregate the results of all paramsets of an experiment"
)
parser.add_argument("path", type=str, help="dir path")
parser.add_argument("outdir", type=str, help="output dir path")
args = parser.parse_args()
analyze(args.path, args.outdir)

mixnet/analysis/requirements.txt

@ -0,0 +1,2 @@
matplotlib==3.9.2
pandas==2.2.2

mixnet/dissemination/Cargo.toml

@ -0,0 +1,20 @@
[package]
name = "dissemination"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = "0.4.38"
clap = { version = "4.5.16", features = ["derive"] }
csv = "1.3.0"
protocol = { version = "0.1.0", path = "../protocol" }
rand = "0.8.5"
rayon = "1.10.0"
rustc-hash = "2.0.0"
strum = "0.26.3"
strum_macros = "0.26.4"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
[profile.release]
opt-level = 3 # max optimization

mixnet/dissemination/README.md

@ -0,0 +1,37 @@
# Queuing Mechanism: Message Dissemination Experiments
This directory contains the code for the [Message Dissemination Experiments](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#5120661bdf5343319e66c9372dd623b7), which is part of the Queuing Mechanism Experiments.
## Usage
```bash
cargo install --path dissemination
dissemination --exp-id 1 --session-id 1 --queue-type PureCoinFlipping --outdir $PWD/out --num-threads 4
```
- `exp-id`: [Experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4543053fbb8c4a2f8d49b0dffdb4a928) ID (starting from 1)
- `session-id`: [Session](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ced2155214f442ed95b442c18f5832f6) ID (starting from 1)
- `queue-type`: NonMix, PureCoinFlipping, PureRandomSampling, PermutedCoinFlipping, NoisyCoinFlipping, or NoisyCoinFlippingRandomRelease
- `outdir`: Output directory
- `num-threads`: The number of threads used to run the iterations of the parameter sets of each [experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4543053fbb8c4a2f8d49b0dffdb4a928) in parallel
## Outputs
```
<outdir>/
dissemination_e1s1_PureCoinFlipping_2024-09-16T09:09:08.793730+00:00_0d0h12m30s/
paramset_[1..P]/
paramset.csv
iteration_[0..I].csv
topology_[0..I].csv
```
- `paramset_[1..P]/`: Each [experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4543053fbb8c4a2f8d49b0dffdb4a928) consists of multiple parameter sets. The result of each parameter set is stored in a separate directory.
- `paramset.csv`: The detailed parameters of the parameter set.
- `iteration_[0..I].csv`: The result of each iteration.
- Columns
- `dissemination_time`: The time taken to disseminate the message to the entire network
- `sent_time`: The time when the message was sent by a sender
- `all_received_time`: The time when the message was received by all nodes
- `dissemination_time = all_received_time - sent_time`
- All times in the simulations are expressed in virtual discrete time units.
- `topology_[0..I].csv`: The randomly generated network topology of each iteration
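
The simulation does not tick wall-clock time; instead it jumps a virtual clock between the two kinds of scheduled events. Below is a condensed, hypothetical sketch of that loop (the real logic lives in `run_iteration` in `src/iteration.rs`; the callback signatures are illustrative only):
```rust
/// Condensed sketch of the virtual-clock scheduling in `run_iteration`:
/// `vtime` always jumps to whichever of the two next events comes first.
fn virtual_time_loop(
    msg_interval: f32,          // interval between newly scheduled messages
    transmission_interval: f32, // 1.0 / transmission_rate
    total_msgs: u32,
    mut send: impl FnMut(f32),          // send (schedule) a new message at vtime
    mut relay: impl FnMut(f32) -> bool, // release/relay; true once fully disseminated
) {
    let mut next_messaging_vtime: f32 = 0.0;
    let mut next_transmission_vtime: f32 = 0.0;
    let mut num_sent: u32 = 0;
    loop {
        if num_sent < total_msgs && next_messaging_vtime <= next_transmission_vtime {
            send(next_messaging_vtime);
            num_sent += 1;
            next_messaging_vtime += msg_interval;
        } else {
            let done = relay(next_transmission_vtime);
            next_transmission_vtime += transmission_interval;
            if done {
                break;
            }
        }
    }
}
```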

mixnet/dissemination/src/iteration.rs

@ -0,0 +1,207 @@
use protocol::{
node::{Node, NodeId},
queue::{Message, QueueConfig},
topology::{build_topology, save_topology},
};
use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng};
use rustc_hash::FxHashMap;
use crate::paramset::ParamSet;
type MessageId = u32;
// The interval at which sender nodes send (schedule) new messages
const MSG_INTERVAL: f32 = 1.0;
pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) {
// Initialize nodes (not connected with each other yet)
let mut nodes: Vec<Node<MessageId>> = Vec::new();
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
let peering_degrees = paramset.gen_peering_degrees(seed);
tracing::debug!("PeeringDegrees initialized.");
for node_id in 0..paramset.num_nodes {
nodes.push(Node::new(
node_id,
QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
peering_degrees[node_id as usize],
true,
));
}
tracing::debug!("Nodes initialized.");
// Build a random topology, and connect nodes with each other
let topology = build_topology(paramset.num_nodes, &peering_degrees, seed);
tracing::debug!("Topology built.");
save_topology(&topology, topology_path).unwrap();
tracing::debug!("Topology saved.");
for (node_id, peers) in topology.iter().enumerate() {
peers.iter().for_each(|peer_id| {
nodes[node_id].connect(*peer_id);
});
}
tracing::debug!("Nodes connected");
let mut sender_selector = SenderSelector::new(
(0..paramset.num_nodes).collect(),
paramset.num_senders,
paramset.random_senders_every_time,
seed,
);
// To generate unique message IDs
let mut next_msg_id: MessageId = 0;
let total_num_msgs = paramset.total_num_messages();
// msg_id -> (sent_vtime, num_received_nodes)
let mut message_tracker: FxHashMap<MessageId, (f32, u16)> = FxHashMap::default();
// To keep track of how many messages have been disseminated to all nodes
let mut num_disseminated_msgs = 0;
let mut writer = csv::Writer::from_path(out_csv_path).unwrap();
writer
.write_record(["dissemination_time", "sent_time", "all_received_time"])
.unwrap();
writer.flush().unwrap();
// Virtual discrete time
let mut vtime: f32;
// The interval at which each queue must release one message
let transmission_interval = 1.0 / paramset.transmission_rate as f32;
// Jump `vtime` to one of the following two vtimes.
// 1. The next time to send (schedule) a message. Increased by `MSG_INTERVAL`.
let mut next_messaging_vtime: f32 = 0.0;
// 2. The next time to release a message from each queue and relay them. Increased by `transmission_interval`.
let mut next_transmission_vtime: f32 = 0.0;
loop {
// If there are still messages to be sent (scheduled),
// and if the next time to send a message is earlier than the next time to relay messages.
if next_msg_id < total_num_msgs && next_messaging_vtime <= next_transmission_vtime {
// Send new messages
vtime = next_messaging_vtime;
next_messaging_vtime += MSG_INTERVAL;
send_messages(
vtime,
sender_selector.next(),
&mut nodes,
&mut next_msg_id,
&mut message_tracker,
);
} else {
// Release a message from each queue and relay all of them
vtime = next_transmission_vtime;
next_transmission_vtime += transmission_interval;
relay_messages(
vtime,
&mut nodes,
&mut message_tracker,
&mut num_disseminated_msgs,
&mut writer,
);
// Check if all messages have been disseminated to all nodes.
if num_disseminated_msgs == total_num_msgs as usize {
break;
}
}
}
}
fn send_messages(
vtime: f32,
sender_ids: &[NodeId],
nodes: &mut [Node<MessageId>],
next_msg_id: &mut MessageId,
message_tracker: &mut FxHashMap<MessageId, (f32, u16)>,
) {
for &sender_id in sender_ids.iter() {
nodes[sender_id as usize].send(*next_msg_id);
message_tracker.insert(*next_msg_id, (vtime, 1));
*next_msg_id += 1;
}
}
fn relay_messages(
vtime: f32,
nodes: &mut [Node<MessageId>],
message_tracker: &mut FxHashMap<MessageId, (f32, u16)>,
num_disseminated_msgs: &mut usize,
writer: &mut csv::Writer<std::fs::File>,
) {
// Collect messages to relay
let mut all_msgs_to_relay: Vec<Vec<(NodeId, Message<MessageId>)>> = Vec::new();
for node in nodes.iter_mut() {
all_msgs_to_relay.push(node.read_queues());
}
// Relay the messages
all_msgs_to_relay
.into_iter()
.enumerate()
.for_each(|(sender_id, msgs_to_relay)| {
let sender_id: NodeId = sender_id.try_into().unwrap();
msgs_to_relay.into_iter().for_each(|(receiver_id, msg)| {
if let Message::Data(msg) = msg {
if nodes[receiver_id as usize].receive(msg, Some(sender_id)) {
let (sent_time, num_received_nodes) =
message_tracker.get_mut(&msg).unwrap();
*num_received_nodes += 1;
if *num_received_nodes as usize == nodes.len() {
let dissemination_time = vtime - *sent_time;
writer
.write_record(&[
dissemination_time.to_string(),
sent_time.to_string(),
vtime.to_string(),
])
.unwrap();
writer.flush().unwrap();
*num_disseminated_msgs += 1;
message_tracker.remove(&msg);
}
}
}
})
});
}
struct SenderSelector {
candidates: Vec<NodeId>,
num_senders: NodeId,
random_senders_every_time: bool,
rng: StdRng,
}
impl SenderSelector {
fn new(
candidates: Vec<NodeId>,
num_senders: u32,
random_senders_every_time: bool,
seed: u64,
) -> Self {
assert!(candidates.len() >= num_senders as usize);
Self {
candidates,
num_senders,
random_senders_every_time,
rng: StdRng::seed_from_u64(seed),
}
}
fn next(&mut self) -> &[NodeId] {
if !self.random_senders_every_time {
// It's okay to pick the first `num_senders` nodes
// because the topology is randomly generated.
&self.candidates[..self.num_senders as usize]
} else {
self.candidates.shuffle(&mut self.rng);
&self.candidates[..self.num_senders as usize]
}
}
}

mixnet/dissemination/src/main.rs

@ -0,0 +1,145 @@
use chrono::Utc;
use clap::Parser;
use protocol::queue::QueueType;
use rayon::prelude::*;
use std::{
error::Error,
path::Path,
time::{Duration, SystemTime},
};
use iteration::run_iteration;
use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
mod iteration;
mod paramset;
#[derive(Debug, Parser)]
#[command(name = "Message Dessemination Time Measurement")]
struct Args {
#[arg(short, long)]
exp_id: ExperimentId,
#[arg(short, long)]
session_id: SessionId,
#[arg(short, long)]
queue_type: QueueType,
#[arg(short, long)]
outdir: String,
#[arg(short, long)]
num_threads: usize,
#[arg(short, long)]
from_paramset: Option<u16>,
}
fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
tracing::info!("Arguments: {:?}", args);
let Args {
exp_id,
session_id,
queue_type,
outdir,
num_threads,
from_paramset,
} = args;
// Create a directory and initialize a CSV file only with a header
assert!(
Path::new(&outdir).is_dir(),
"Output directory does not exist: {outdir}"
);
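// Directory names start with "__WIP__" and end with "__DUR__"; once the session
// finishes, "__WIP__" is stripped and "__DUR__" is replaced with the elapsed time
// (see the renames below), so interrupted runs remain clearly marked on disk.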
let subdir = format!(
"__WIP__dissemination_e{}s{}_{:?}_{}___DUR__",
exp_id as u8,
session_id as u8,
queue_type,
Utc::now().to_rfc3339()
);
std::fs::create_dir_all(&format!("{outdir}/{subdir}")).unwrap();
let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type);
let session_start_time = SystemTime::now();
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.unwrap();
pool.install(|| {
paramsets.par_iter().for_each(|paramset| {
if paramset.id < from_paramset.unwrap_or(0) {
tracing::info!("ParamSet:{} skipped", paramset.id);
return;
}
let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id);
std::fs::create_dir_all(paramset_dir.as_str()).unwrap();
save_paramset_info(paramset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap();
for i in 0..paramset.num_iterations {
let out_csv_path = format!("{paramset_dir}/__WIP__iteration_{i}.csv");
let topology_path = format!("{paramset_dir}/topology_{i}.csv");
run_iteration(paramset.clone(), i as u64, &out_csv_path, &topology_path);
let new_out_csv_path = out_csv_path.replace("__WIP__iteration_", "iteration_");
std::fs::rename(&out_csv_path, &new_out_csv_path)
.expect("Failed to rename: {out_csv_path} -> {new_out_csv_path}");
tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i);
}
let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_");
std::fs::rename(&paramset_dir, &new_paramset_dir)
.expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}");
tracing::info!("ParamSet:{} completed", paramset.id);
});
});
let session_duration = SystemTime::now()
.duration_since(session_start_time)
.unwrap();
// Replace "__WIP__" and "__DUR__" in the subdir string
let new_subdir = subdir
.replace("__WIP__", "")
.replace("__DUR__", &format_duration(session_duration));
let old_path = format!("{}/{}", outdir, subdir);
let new_path = format!("{}/{}", outdir, new_subdir);
assert!(
!Path::new(&new_path).exists(),
"The new directory already exists: {new_path}"
);
std::fs::rename(&old_path, &new_path)
.expect("Failed to rename the directory: {old_path} -> {new_path}");
tracing::info!("Session completed.");
}
fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box<dyn Error>> {
// Assert that the file does not already exist
assert!(
!Path::new(path).exists(),
"File already exists at path: {path}",
);
let mut wtr = csv::Writer::from_path(path)?;
wtr.write_record(PARAMSET_CSV_COLUMNS)?;
wtr.write_record(paramset.as_csv_record())?;
wtr.flush()?;
Ok(())
}
fn format_duration(duration: Duration) -> String {
let total_seconds = duration.as_secs();
let days = total_seconds / 86_400;
let hours = (total_seconds % 86_400) / 3_600;
let minutes = (total_seconds % 3_600) / 60;
let seconds = total_seconds % 60;
format!("{}d{}h{}m{}s", days, hours, minutes, seconds)
}

mixnet/dissemination/src/paramset.rs

@ -0,0 +1,520 @@
use protocol::queue::QueueType;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ExperimentId {
Experiment1 = 1,
Experiment2 = 2,
Experiment3 = 3,
Experiment4 = 4,
Experiment5 = 5,
}
impl std::str::FromStr for ExperimentId {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1" | "Experiment1" => Ok(ExperimentId::Experiment1),
"2" | "Experiment2" => Ok(ExperimentId::Experiment2),
"3" | "Experiment3" => Ok(ExperimentId::Experiment3),
"4" | "Experiment4" => Ok(ExperimentId::Experiment4),
"5" | "Experiment5" => Ok(ExperimentId::Experiment5),
_ => Err(format!("Invalid experiment ID: {}", s)),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum SessionId {
Session1 = 1,
Session2 = 2,
Session2_1 = 21,
Session3 = 3,
}
impl std::str::FromStr for SessionId {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1" | "Session1" => Ok(SessionId::Session1),
"2" | "Session2" => Ok(SessionId::Session2),
"2.1" | "Session21" => Ok(SessionId::Session2_1),
"3" | "Session3" => Ok(SessionId::Session3),
_ => Err(format!("Invalid session ID: {}", s)),
}
}
}
pub const PARAMSET_CSV_COLUMNS: &[&str] = &[
"paramset",
"num_nodes",
"peering_degree",
"min_queue_size",
"transmission_rate",
"num_sent_msgs",
"num_senders",
"random_senders_every_time",
"queue_type",
"num_iterations",
];
#[derive(Debug, Clone, PartialEq)]
pub struct ParamSet {
pub id: u16,
pub num_nodes: u32,
pub peering_degree_rates: PeeringDegreeRates,
pub min_queue_size: u16,
pub transmission_rate: u16,
pub num_sent_msgs: u32,
pub num_senders: u32,
pub random_senders_every_time: bool,
pub queue_type: QueueType,
pub num_iterations: usize,
}
// peering_degree -> rate
// Use Vec instead of HashMap to avoid unexpected nondeterministic behavior
type PeeringDegreeRates = Vec<(u32, f32)>;
impl ParamSet {
pub fn new_all_paramsets(
exp_id: ExperimentId,
session_id: SessionId,
queue_type: QueueType,
) -> Vec<Self> {
match session_id {
SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type),
SessionId::Session2 => Self::new_session2_paramsets(exp_id, queue_type),
SessionId::Session2_1 => Self::new_session2_1_paramsets(exp_id, queue_type),
SessionId::Session3 => Self::new_session3_paramsets(exp_id, queue_type),
}
}
fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let mut start_id: u16 = 1;
let mut paramsets: Vec<ParamSet> = Vec::new();
for &num_nodes in &[20u32, 40u32, 80u32] {
let peering_degrees_list = &[
vec![(num_nodes.checked_div(5).unwrap(), 1.0)],
vec![(num_nodes.checked_div(4).unwrap(), 1.0)],
vec![(num_nodes.checked_div(2).unwrap(), 1.0)],
];
let min_queue_size_list = &[
num_nodes.checked_div(2).unwrap().try_into().unwrap(),
num_nodes.try_into().unwrap(),
num_nodes.checked_mul(2).unwrap().try_into().unwrap(),
];
let transmission_rate_list = &[
num_nodes.checked_div(2).unwrap().try_into().unwrap(),
num_nodes.try_into().unwrap(),
num_nodes.checked_mul(2).unwrap().try_into().unwrap(),
];
let num_sent_msgs_list = |_| match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
ExperimentId::Experiment2
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => vec![8, 16, 32],
};
let num_senders_list = match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1],
ExperimentId::Experiment3
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => {
vec![
num_nodes.checked_div(10).unwrap(),
num_nodes.checked_div(5).unwrap(),
num_nodes.checked_div(2).unwrap(),
]
}
};
let random_senders_every_time = exp_id == ExperimentId::Experiment5;
let num_iterations = num_nodes.checked_div(2).unwrap().try_into().unwrap();
let (mut new_paramsets, next_start_id) = Self::new_paramsets(
start_id,
num_nodes,
peering_degrees_list,
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
num_senders_list.as_slice(),
random_senders_every_time,
queue_type,
num_iterations,
);
paramsets.append(&mut new_paramsets);
start_id = next_start_id;
}
paramsets
}
fn new_session2_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let mut start_id: u16 = 1;
let mut paramsets: Vec<ParamSet> = Vec::new();
for &num_nodes in &[100u32, 1000u32, 10000u32] {
let peering_degrees_list = &[vec![(4, 1.0)], vec![(8, 1.0)], vec![(16, 1.0)]];
let min_queue_size_list = &[10, 50, 100];
let transmission_rate_list = &[1, 10, 100];
let num_sent_msgs_list = |min_queue_size: u16| match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
ExperimentId::Experiment2
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => {
vec![
min_queue_size.checked_div(2).unwrap().into(),
min_queue_size.into(),
min_queue_size.checked_mul(2).unwrap().into(),
]
}
};
let num_senders_list = match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1],
ExperimentId::Experiment3
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => {
vec![
num_nodes.checked_div(10).unwrap(),
num_nodes.checked_div(5).unwrap(),
num_nodes.checked_div(2).unwrap(),
]
}
};
let random_senders_every_time = exp_id == ExperimentId::Experiment5;
let num_iterations = 20;
let (mut new_paramsets, next_start_id) = Self::new_paramsets(
start_id,
num_nodes,
peering_degrees_list,
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
num_senders_list.as_slice(),
random_senders_every_time,
queue_type,
num_iterations,
);
paramsets.append(&mut new_paramsets);
start_id = next_start_id;
}
paramsets
}
fn new_session2_1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let mut start_id: u16 = 1;
let mut paramsets: Vec<ParamSet> = Vec::new();
for &num_nodes in &[20u32, 200u32, 2000u32] {
let peering_degrees_list = &[vec![(4, 1.0)], vec![(6, 1.0)], vec![(8, 1.0)]];
let min_queue_size_list = &[10, 50, 100];
let transmission_rate_list = &[1];
let num_sent_msgs_list = |_| match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
ExperimentId::Experiment2
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => vec![1000],
};
let num_senders_list = match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1],
ExperimentId::Experiment3
| ExperimentId::Experiment4
| ExperimentId::Experiment5 => {
vec![
num_nodes.checked_div(10).unwrap(),
num_nodes.checked_div(5).unwrap(),
num_nodes.checked_div(2).unwrap(),
]
}
};
let random_senders_every_time = exp_id == ExperimentId::Experiment5;
let num_iterations = 20;
let (mut new_paramsets, next_start_id) = Self::new_paramsets(
start_id,
num_nodes,
peering_degrees_list,
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
num_senders_list.as_slice(),
random_senders_every_time,
queue_type,
num_iterations,
);
paramsets.append(&mut new_paramsets);
start_id = next_start_id;
}
paramsets
}
fn new_session3_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let start_id: u16 = 1;
let num_nodes: u32 = 100000;
let peering_degrees = vec![(4, 0.87), (129, 0.123), (500, 0.07)];
let min_queue_size_list = &[10, 50, 100];
let transmission_rate_list = &[1];
let num_sent_msgs_list = |min_queue_size: u16| match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
ExperimentId::Experiment2 | ExperimentId::Experiment4 | ExperimentId::Experiment5 => {
vec![
min_queue_size.checked_div(2).unwrap().into(),
min_queue_size.into(),
min_queue_size.checked_mul(2).unwrap().into(),
]
}
};
let num_senders_list = match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1],
ExperimentId::Experiment3 | ExperimentId::Experiment4 | ExperimentId::Experiment5 => {
vec![
num_nodes.checked_div(10).unwrap(),
num_nodes.checked_div(5).unwrap(),
num_nodes.checked_div(2).unwrap(),
]
}
};
let random_senders_every_time = exp_id == ExperimentId::Experiment5;
let num_iterations = 100;
let (paramsets, _) = Self::new_paramsets(
start_id,
num_nodes,
&[peering_degrees],
min_queue_size_list,
transmission_rate_list,
num_sent_msgs_list,
num_senders_list.as_slice(),
random_senders_every_time,
queue_type,
num_iterations,
);
paramsets
}
#[allow(clippy::too_many_arguments)]
fn new_paramsets(
start_id: u16,
num_nodes: u32,
peering_degrees_list: &[PeeringDegreeRates],
min_queue_size_list: &[u16],
transmission_rate_list: &[u16],
num_sent_msgs_list: impl Fn(u16) -> Vec<u32>,
num_senders_list: &[u32],
random_senders_every_time: bool,
queue_type: QueueType,
num_iterations: usize,
) -> (Vec<ParamSet>, u16) {
let mut id = start_id;
let mut paramsets: Vec<ParamSet> = Vec::new();
for peering_degrees in peering_degrees_list {
for &min_queue_size in min_queue_size_list {
for &transmission_rate in transmission_rate_list {
for &num_sent_msgs in num_sent_msgs_list(min_queue_size).iter() {
for &num_senders in num_senders_list {
if !Self::is_min_queue_size_applicable(&queue_type)
&& min_queue_size != min_queue_size_list[0]
{
id += 1;
continue;
}
paramsets.push(ParamSet {
id,
num_nodes,
peering_degree_rates: peering_degrees.clone(),
min_queue_size,
transmission_rate,
num_sent_msgs,
num_senders,
random_senders_every_time,
queue_type,
num_iterations,
});
id += 1;
}
}
}
}
}
(paramsets, id)
}
pub fn is_min_queue_size_applicable(queue_type: &QueueType) -> bool {
matches!(
queue_type,
QueueType::PureCoinFlipping
| QueueType::PureRandomSampling
| QueueType::PermutedCoinFlipping
)
}
pub fn total_num_messages(&self) -> u32 {
self.num_sent_msgs.checked_mul(self.num_senders).unwrap()
}
pub fn as_csv_record(&self) -> Vec<String> {
let peering_degrees = self
.peering_degree_rates
.iter()
.map(|(degree, rate)| format!("({degree}:{rate})"))
.collect::<Vec<String>>()
.join(",");
vec![
self.id.to_string(),
self.num_nodes.to_string(),
format!("[{peering_degrees}]"),
self.min_queue_size.to_string(),
self.transmission_rate.to_string(),
self.num_sent_msgs.to_string(),
self.num_senders.to_string(),
self.random_senders_every_time.to_string(),
format!("{:?}", self.queue_type),
self.num_iterations.to_string(),
]
}
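/// Expand the `(degree, rate)` pairs into one degree per node, then shuffle
/// deterministically. For example, with `num_nodes = 100000` and rates
/// `[(4, 0.87), (129, 0.123), (500, 0.07)]` (Session 3), 87000 nodes get
/// degree 4 and 12300 get degree 129; the `min` below caps the last group at
/// the remaining 700 nodes, since the rates may sum to more than 1.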
pub fn gen_peering_degrees(&self, seed: u64) -> Vec<u32> {
let mut vec = Vec::with_capacity(self.num_nodes as usize);
self.peering_degree_rates.iter().for_each(|(degree, rate)| {
let num_nodes = std::cmp::min(
(self.num_nodes as f32 * rate).round() as u32,
self.num_nodes - vec.len() as u32,
);
vec.extend(std::iter::repeat(*degree).take(num_nodes as usize));
});
assert_eq!(vec.len(), self.num_nodes as usize);
vec.shuffle(&mut StdRng::seed_from_u64(seed));
vec
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use strum::IntoEnumIterator;
use crate::paramset::ParamSet;
use super::*;
#[test]
fn test_new_all_paramsets() {
let cases = vec![
(
(ExperimentId::Experiment1, SessionId::Session1),
3u32.pow(4),
),
(
(ExperimentId::Experiment2, SessionId::Session1),
3u32.pow(5),
),
(
(ExperimentId::Experiment3, SessionId::Session1),
3u32.pow(5),
),
(
(ExperimentId::Experiment4, SessionId::Session1),
3u32.pow(6),
),
(
(ExperimentId::Experiment5, SessionId::Session1),
3u32.pow(6),
),
(
(ExperimentId::Experiment1, SessionId::Session2),
3u32.pow(4),
),
(
(ExperimentId::Experiment4, SessionId::Session2),
3u32.pow(6),
),
(
(ExperimentId::Experiment5, SessionId::Session2),
3u32.pow(6),
),
(
(ExperimentId::Experiment1, SessionId::Session2_1),
3u32.pow(3),
),
(
(ExperimentId::Experiment4, SessionId::Session2_1),
3u32.pow(4),
),
(
(ExperimentId::Experiment5, SessionId::Session2_1),
3u32.pow(4),
),
(
(ExperimentId::Experiment5, SessionId::Session3),
3u32.pow(3),
),
];
for queue_type in QueueType::iter() {
for ((exp_id, session_id), mut expected_cnt) in cases.clone().into_iter() {
let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type);
// Check if the number of parameter sets is correct
if !ParamSet::is_min_queue_size_applicable(&queue_type) {
expected_cnt /= 3;
}
assert_eq!(paramsets.len(), expected_cnt as usize);
// Check if all parameter sets are unique
let unique_paramsets: HashSet<Vec<String>> = paramsets
.iter()
.map(|paramset| paramset.as_csv_record())
.collect();
assert_eq!(unique_paramsets.len(), paramsets.len());
// Check if paramset IDs are correct.
if ParamSet::is_min_queue_size_applicable(&queue_type) {
for (i, paramset) in paramsets.iter().enumerate() {
assert_eq!(paramset.id as usize, i + 1);
}
}
}
}
}
#[test]
fn test_id_consistency() {
let cases = vec![
(ExperimentId::Experiment1, SessionId::Session1),
(ExperimentId::Experiment2, SessionId::Session1),
(ExperimentId::Experiment3, SessionId::Session1),
(ExperimentId::Experiment4, SessionId::Session1),
(ExperimentId::Experiment5, SessionId::Session1),
(ExperimentId::Experiment1, SessionId::Session2),
(ExperimentId::Experiment4, SessionId::Session2),
(ExperimentId::Experiment5, SessionId::Session2),
(ExperimentId::Experiment1, SessionId::Session2_1),
(ExperimentId::Experiment4, SessionId::Session2_1),
(ExperimentId::Experiment5, SessionId::Session2_1),
(ExperimentId::Experiment5, SessionId::Session3),
];
for (exp_id, session_id) in cases.into_iter() {
let paramsets_with_min_queue_size =
ParamSet::new_all_paramsets(exp_id, session_id, QueueType::PureCoinFlipping);
let paramsets_without_min_queue_size =
ParamSet::new_all_paramsets(exp_id, session_id, QueueType::NonMix);
for (i, paramset) in paramsets_with_min_queue_size.iter().enumerate() {
assert_eq!(paramset.id as usize, i + 1);
}
for mut paramset in paramsets_without_min_queue_size.into_iter() {
// To compare ParameterSet instances, use the same queue type.
paramset.queue_type = QueueType::PureCoinFlipping;
assert_eq!(
paramset,
paramsets_with_min_queue_size[paramset.id as usize - 1]
);
}
}
}
}

mixnet/ordering/Cargo.toml

@ -0,0 +1,29 @@
[package]
name = "ordering"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = "0.4.38"
clap = { version = "4.5.16", features = ["derive"] }
crossbeam = "0.8.4"
csv = "1.3.0"
protocol = { version = "0.1.0", path = "../protocol" }
rand = "0.8.5"
rustc-hash = "2.0.0"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
walkdir = "2.3"
glob = "0.3"
polars = { version = "0.42.0", features = [
"csv",
"diagonal_concat",
"polars-io",
"zip_with",
] }
[profile.release]
opt-level = 3 # max optimization
[dev-dependencies]
strum = "0.26.3"

mixnet/ordering/README.md

@ -0,0 +1,91 @@
# Queuing Mechanism: Ordering Experiments
This directory contains the code for the [Ordering Experiments](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4d38e8790ecd492a812c733bf140b864), which is part of the Queuing Mechanism Experiments.
## Usage
```bash
cargo install --path ordering
ordering --exp-id 1 --session-id 1 --queue-type PureCoinFlipping --outdir $PWD/out --num-threads 4
```
- `exp-id`: [Experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ffbcb5071dbb482bad035ef01cf8d49d) ID (starting from 1)
- `session-id`: [Session](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#df7de2a64b1e4e778c5d793bf03be25e) ID (starting from 1)
- `queue-type`: NonMix, PureCoinFlipping, PureRandomSampling, PermutedCoinFlipping, NoisyCoinFlipping, or NoisyCoinFlippingRandomRelease
- `outdir`: Output directory
- `num-threads`: The number of threads used to run the iterations of the parameter sets of each [experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ffbcb5071dbb482bad035ef01cf8d49d) in parallel
## Outputs
```
<outdir>/
ordering_e1s1_PureCoinFlipping_2024-09-16T09:18:59.482141+00:00_0d0h0m10s/
paramset_[1..P]/
paramset.csv
iteration_[0..I]_0d0h0m0s/
topology.csv
latency.csv
data_msg_counts.csv
sent_seq_[0..S].csv
recv_seq_[0..R].csv
```
- `paramset_[1..P]/`: Each [experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ffbcb5071dbb482bad035ef01cf8d49d) consists of multiple parameter sets. The result of each parameter set is stored in a separate directory.
- `paramset.csv`: The detailed parameters of the parameter set.
- `iteration_[0..I]_0d0h0m0s`: The result of each iteration (with the elapsed time)
- `topology.csv`: The randomly generated topology of the network
- `latency.csv`: The latency for each message to be delivered from the sender to the receiver
- `data_msg_counts.csv`: The number of data messages staying in each queue in each time window
- `sent_seq_[0..S].csv`: The sequence of messages sent by each sender
- `recv_seq_[0..R].csv`: The sequence of messages received by each receiver
## Aggregation Tools
Since the ordering experiments are heavy, the aggregation tools are provided separately, to aggregate the results of all experiments and parameter sets after the experiments are done.
### Latency Aggregation
This tool reads all `**/iteration_*/latency.csv` files and aggregates all latencies into a single CSV file: `**/paramset_[1..P]/latency_stats.csv`, as below.
```csv
min,median,mean,std,max
0,93.0,123.07003891050584,109.38605760356785,527
```
### Data Message Counts Aggregation
This tool reads all `**/iteration_*/data_msg_counts.csv` files and aggregates all counts into a single CSV file: `**/paramset_[1..P]/data_msg_counts_stats.csv`, as below.
```csv
min,median,mean,std,max
0,1.0,9.231619223429988,31.290104671648148,289
```
### Ordering Coefficient Calculation
This tool is not an aggregation tool, but it calculates the [strong/casual/weak ordering coefficients](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ee984d48bd6b4fe3b2acc1000e4ae77b)
from the `**/iteration_*/sent_seq_*.csv` and `**/iteration_*/recv_seq_*.csv` files.
The result is stored in CSV files: `**/iteration_*/coeffs_[sender_id]_[receiver_id].csv`, as below.
```csv
sender,receiver,strong,casual,weak
0,1,0,1,4
```
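As a toy illustration (derived from the implementation and unit tests later in this commit): if a sent sequence is `0:1, -1, 0:2, 0:3` (one entry per CSV row; a negative value encodes that many consecutive noise messages) and the received sequence is `0:1, 0:2, 0:3`, then the strong coefficient is 1 (only the uninterrupted `0:2, 0:3` pair counts), the casual coefficient is 1 (the noise after `0:1` has no matching noise run on the receiving side, so again only `0:2, 0:3` counts), and the weak coefficient is 2 (noise is skipped, so both `0:1, 0:2` and `0:2, 0:3` count).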
### Ordering Coefficient Aggregation
This tool reads all `**/iteration_*/coeffs_*_*.csv` files (calculated [above](#ordering-coefficient-calculation)) and aggregates all coefficients into three CSV files: `**/paramset_[1..P]/[strong|casual|weak]_coeff_stats.csv`, as below.
```csv
min,median,mean,std,max
0,0.0,0.25,0.4442616583193193,1
```
### Final Aggregation Across All Experiments
This tool reads all of the following files:
- `**/latency_stats.csv`
- `**/data_msg_counts_stats.csv`
- `**/[strong|casual|weak]_coeff_stats.csv`
and aggregates them into a single CSV file: `aggregated.csv`, as below.
```csv
paramset,num_mixes,num_paths,random_topology,peering_degree,min_queue_size,transmission_rate,num_senders,num_sender_msgs,sender_data_msg_prob,mix_data_msg_prob,queue_type,num_iterations,data_msg_count_min,data_msg_count_median,data_msg_count_mean,data_msg_count_std,data_msg_count_max,latency_min,latency_median,latency_mean,latency_std,latency_max,strong_coeff_min,strong_coeff_median,strong_coeff_mean,strong_coeff_std,strong_coeff_max,casual_coeff_min,casual_coeff_median,casual_coeff_mean,casual_coeff_std,casual_coeff_max,weak_coeff_min,weak_coeff_median,weak_coeff_mean,weak_coeff_std,weak_coeff_max
1,8,0,true,2,10,1,1,10000,0.01,0.0625,NoisyCoinFlipping,10,0,1.0,9.231619223429988,31.290104671648148,289,0,93.0,123.07003891050584,109.38605760356785,527,0,0.0,0.25,0.4442616583193193,1,0,0.0,0.45,0.6863327411532597,2,0,2.0,1.85,1.5312533566021211,5
...
```
This `aggregated.csv` file can be analyzed by the [analysis tools](../analysis).


@ -0,0 +1,127 @@
use std::{
env,
fs::File,
path::{Path, PathBuf},
};
use polars::prelude::*;
use walkdir::WalkDir;
fn aggregate(path: &str) {
let mut schema = Schema::new();
schema.with_column("paramset".into(), DataType::Int64);
schema.with_column("num_mixes".into(), DataType::Int64);
schema.with_column("num_paths".into(), DataType::Int64);
schema.with_column("random_topology".into(), DataType::Boolean);
schema.with_column("peering_degree".into(), DataType::String);
schema.with_column("min_queue_size".into(), DataType::Int64);
schema.with_column("transmission_rate".into(), DataType::Int64);
schema.with_column("num_senders".into(), DataType::Int64);
schema.with_column("num_sender_msgs".into(), DataType::Int64);
schema.with_column("sender_data_msg_prob".into(), DataType::Float32);
schema.with_column("mix_data_msg_prob".into(), DataType::Float32);
schema.with_column("queue_type".into(), DataType::String);
schema.with_column("num_iterations".into(), DataType::Int64);
let mut dataframes: Vec<DataFrame> = Vec::new();
for entry in WalkDir::new(path)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_dir())
{
let dir_name = entry.path().file_name().unwrap().to_string_lossy();
if dir_name.starts_with("paramset_") {
let mut df = CsvReadOptions::default()
.with_has_header(true)
.with_schema(Some(SchemaRef::new(schema.clone())))
.try_into_reader_with_file_path(Some(entry.path().join("paramset.csv")))
.unwrap()
.finish()
.unwrap();
add_stats_columns(
&mut df,
entry.path().join("data_msg_counts_stats.csv"),
"data_msg_count_",
);
add_stats_columns(&mut df, entry.path().join("latency_stats.csv"), "latency_");
for prefix in ["strong", "casual", "weak"] {
let coeff_stats_path = entry.path().join(format!("{}_coeff_stats.csv", prefix));
if coeff_stats_path.exists() {
add_stats_columns(&mut df, coeff_stats_path, &format!("{}_coeff_", prefix));
}
}
dataframes.push(df);
}
}
if !dataframes.is_empty() {
let df = polars::functions::concat_df_diagonal(dataframes.as_slice()).unwrap();
let mut df = df
.sort(["paramset", "queue_type"], SortMultipleOptions::default())
.unwrap();
let outpath = Path::new(path).join("aggregated.csv");
let mut file = File::create(&outpath).unwrap();
CsvWriter::new(&mut file).finish(&mut df).unwrap();
println!("Saved {}", outpath.display());
}
}
fn add_stats_columns(df: &mut DataFrame, path: PathBuf, col_prefix: &str) {
let mut schema = Schema::new();
schema.with_column("min".into(), DataType::UInt64);
schema.with_column("median".into(), DataType::Float64);
schema.with_column("mean".into(), DataType::Float64);
schema.with_column("std".into(), DataType::Float64);
schema.with_column("max".into(), DataType::UInt64);
let stats_df = CsvReadOptions::default()
.with_has_header(true)
.with_schema(Some(SchemaRef::new(schema)))
.try_into_reader_with_file_path(Some(path))
.unwrap()
.finish()
.unwrap();
for col in ["min", "median", "mean", "std", "max"] {
df.with_column(
stats_df[col]
.head(Some(1))
.with_name(format!("{col_prefix}{col}").as_str()),
)
.unwrap();
}
}
fn main() {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: {} <path>", args[0]);
std::process::exit(1);
}
let path = &args[1];
aggregate(path);
}


@ -0,0 +1,522 @@
use std::{fs::File, path::PathBuf};
use clap::Parser;
use glob::glob;
use polars::prelude::*;
use walkdir::WalkDir;
use ordering::message::{DataMessage, SenderIdx};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Entry {
Data(DataMessage),
Noise(u32), // the number of consecutive noises
}
fn load_sequence(path: &str) -> Vec<Entry> {
let mut entries: Vec<Entry> = Vec::new();
let mut reader = csv::ReaderBuilder::new()
.has_headers(false)
.from_path(path)
.unwrap();
for result in reader.records() {
let record = result.unwrap();
let value = &record[0];
if let Ok(num) = value.parse::<i32>() {
assert!(num < 0);
entries.push(Entry::Noise(num.unsigned_abs()));
} else {
entries.push(Entry::Data(parse_data_msg(value)));
}
}
entries
}
fn parse_data_msg(value: &str) -> DataMessage {
let parts: Vec<&str> = value.split(':').collect();
assert_eq!(parts.len(), 2);
DataMessage {
sender: parts[0].parse::<SenderIdx>().unwrap(),
msg_id: parts[1].parse::<u32>().unwrap(),
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OrderingType {
Strong,
Casual,
Weak,
}
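// For each data entry in `seq1`, find its first occurrence in `seq2` and count
// how many subsequent entries continue the match, depending on `ordering_type`:
// - Strong: only uninterrupted data matches count, and a run of n pairs scores n^n.
// - Casual: noise entries are compared too, and their run lengths must match exactly.
// - Weak: noise entries are skipped on both sides before comparing.
// The per-entry scores are summed (saturating) into the final coefficient.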
fn coeff(seq1: &[Entry], seq2: &[Entry], ordering_type: OrderingType) -> u64 {
let mut coeff: u64 = 0;
let mut i = 0;
while i < seq1.len() {
if let Entry::Data(_) = &seq1[i] {
let (c, next_i) = coeff_from(seq1, i, seq2, ordering_type);
coeff = coeff.saturating_add(c);
if coeff == u64::MAX {
// no need to continue
break;
}
if next_i != i {
i = next_i;
} else {
i += 1;
}
} else {
i += 1;
}
}
coeff
}
fn coeff_from(
seq1: &[Entry],
start_idx: usize,
seq2: &[Entry],
ordering_type: OrderingType,
) -> (u64, usize) {
let msg1 = match seq1[start_idx] {
Entry::Data(msg) => msg,
_ => panic!("Entry at index {start_idx} must be Message"),
};
for (j, entry) in seq2.iter().enumerate() {
if let Entry::Data(msg2) = entry {
if msg1 == *msg2 {
// Found the 1st matching msg. Start finding the next adjacent matching msg.
match ordering_type {
OrderingType::Strong => {
return strong_coeff_from(seq1, start_idx, seq2, j);
}
OrderingType::Casual => {
return casual_coeff_from(seq1, start_idx, seq2, j);
}
OrderingType::Weak => {
return weak_coeff_from(seq1, start_idx, seq2, j);
}
}
}
}
}
// Couldn't find any matching msg in seq2. Return a zero coefficient and the same start index.
(0, start_idx)
}
fn strong_coeff_from(
seq1: &[Entry],
start_idx: usize,
seq2: &[Entry],
seq2_start_idx: usize,
) -> (u64, usize) {
// Find the number of consecutive exactly-matching pairs that contain no noise.
let mut num_consecutive_pairs: u64 = 0;
let mut i = start_idx + 1;
let mut j = seq2_start_idx + 1;
while i < seq1.len() && j < seq2.len() {
match (&seq1[i], &seq2[j]) {
(Entry::Data(msg1), Entry::Data(msg2)) => {
if msg1 == msg2 {
num_consecutive_pairs += 1;
i += 1;
j += 1;
} else {
break;
}
}
_ => break,
}
}
let coeff = if num_consecutive_pairs == 0 {
0
} else {
num_consecutive_pairs.saturating_pow(num_consecutive_pairs.try_into().unwrap())
};
(coeff, i)
}
fn casual_coeff_from(
seq1: &[Entry],
start_idx: usize,
seq2: &[Entry],
seq2_start_idx: usize,
) -> (u64, usize) {
// Find the number of consecutive matching pairs, requiring noise runs to match exactly.
let mut coeff = 0;
let mut i = start_idx + 1;
let mut j = seq2_start_idx + 1;
while i < seq1.len() && j < seq2.len() {
match (&seq1[i], &seq2[j]) {
(Entry::Noise(cnt1), Entry::Noise(cnt2)) => {
if cnt1 == cnt2 {
i += 1;
j += 1;
} else {
break;
}
}
(Entry::Data(msg1), Entry::Data(msg2)) => {
if msg1 == msg2 {
coeff += 1;
i += 1;
j += 1;
} else {
break;
}
}
_ => break,
}
}
(coeff, i)
}
fn weak_coeff_from(
seq1: &[Entry],
start_idx: usize,
seq2: &[Entry],
seq2_start_idx: usize,
) -> (u64, usize) {
// Find the number of consecutive matching pairs, ignoring noise.
let mut coeff = 0;
let mut i = start_idx + 1;
let mut j = seq2_start_idx + 1;
while i < seq1.len() && j < seq2.len() {
i = skip_noise(seq1, i);
j = skip_noise(seq2, j);
if i < seq1.len() && j < seq2.len() && seq1[i] == seq2[j] {
coeff += 1;
i += 1;
j += 1;
} else {
break;
}
}
(coeff, i)
}
fn skip_noise(seq: &[Entry], mut index: usize) -> usize {
while index < seq.len() {
if let Entry::Data(_) = seq[index] {
break;
}
index += 1;
}
index
}
#[derive(Debug, Parser)]
#[command(name = "Calculating ordering coefficients")]
struct Args {
#[arg(short, long)]
path: String,
#[arg(short, long)]
num_threads: usize,
#[arg(short, long)]
sent_seq_limit: Option<usize>,
}
fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
calculate_coeffs(&args);
}
fn calculate_coeffs(args: &Args) {
let mut tasks: Vec<Task> = Vec::new();
for entry in WalkDir::new(args.path.as_str())
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_dir())
{
let dir_name = entry.path().file_name().unwrap().to_string_lossy();
if dir_name.starts_with("iteration_") {
for sent_seq_file in glob(&format!("{}/sent_seq_*.csv", entry.path().display()))
.unwrap()
.filter_map(Result::ok)
{
let sender =
extract_id(&sent_seq_file.file_name().unwrap().to_string_lossy()).unwrap();
for recv_seq_file in glob(&format!("{}/recv_seq_*.csv", entry.path().display()))
.unwrap()
.filter_map(Result::ok)
{
let receiver =
extract_id(&recv_seq_file.file_name().unwrap().to_string_lossy()).unwrap();
let task = Task {
sent_seq_file: sent_seq_file.clone(),
recv_seq_file: recv_seq_file.clone(),
sender,
receiver,
outpath: entry
.path()
.join(format!("coeffs_{}_{}.csv", sender, receiver)),
sent_seq_limit: args.sent_seq_limit,
};
tasks.push(task);
}
}
}
}
let (task_tx, task_rx) = crossbeam::channel::unbounded::<Task>();
let mut threads = Vec::with_capacity(args.num_threads);
for _ in 0..args.num_threads {
let task_rx = task_rx.clone();
let thread = std::thread::spawn(move || {
while let Ok(task) = task_rx.recv() {
task.run();
}
});
threads.push(thread);
}
for task in tasks {
task_tx.send(task).unwrap();
}
// Close the task sender channel so that the threads know no tasks remain.
drop(task_tx);
for thread in threads {
thread.join().unwrap();
}
}
fn extract_id(filename: &str) -> Option<u8> {
if let Some(stripped) = filename.strip_suffix(".csv") {
if let Some(stripped) = stripped.strip_prefix("sent_seq_") {
return stripped.parse::<u8>().ok();
} else if let Some(stripped) = stripped.strip_prefix("recv_seq_") {
return stripped.parse::<u8>().ok();
}
}
None
}
struct Task {
sent_seq_file: PathBuf,
recv_seq_file: PathBuf,
sender: u8,
receiver: u8,
outpath: PathBuf,
sent_seq_limit: Option<usize>,
}
impl Task {
fn run(&self) {
tracing::info!(
"Processing:\n {}\n {}",
self.sent_seq_file.display(),
self.recv_seq_file.display()
);
let mut sent_seq = load_sequence(self.sent_seq_file.to_str().unwrap());
let recv_seq = load_sequence(self.recv_seq_file.to_str().unwrap());
if let Some(limit) = self.sent_seq_limit {
if sent_seq.len() > limit {
sent_seq.truncate(limit);
}
}
let strong = coeff(&sent_seq, &recv_seq, OrderingType::Strong);
let casual = coeff(&sent_seq, &recv_seq, OrderingType::Casual);
let weak = coeff(&sent_seq, &recv_seq, OrderingType::Weak);
let mut df = DataFrame::new(vec![
Series::new("sender", &[self.sender as u64]),
Series::new("receiver", &[self.receiver as u64]),
Series::new("strong", &[strong]),
Series::new("casual", &[casual]),
Series::new("weak", &[weak]),
])
.unwrap()
.sort(["sender", "receiver"], SortMultipleOptions::default())
.unwrap();
let mut file = File::create(&self.outpath).unwrap();
CsvWriter::new(&mut file).finish(&mut df).unwrap();
tracing::info!("Saved {}", self.outpath.display());
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_strong_coeff() {
// Empty sequences
assert_eq!(coeff(&[], &[], OrderingType::Strong), 0);
// One matching pair without noise
let seq = vec![data(1), data(2)];
assert_eq!(coeff(&seq, &seq, OrderingType::Strong), 1);
// No matching pair due to noise
let seq = vec![data(1), noise(10), data(2)];
assert_eq!(coeff(&seq, &seq, OrderingType::Strong), 0);
// One matching pair without noise from different sequences
let seq1 = vec![data(1), data(2), data(3)];
let seq2 = vec![data(1), data(2), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 1);
let seq1 = vec![data(4), data(2), data(3)];
let seq2 = vec![data(1), data(2), data(3)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 1);
// One pair, not two because of noise
let seq1 = vec![data(1), noise(10), data(2), data(3)];
let seq2 = vec![data(1), noise(10), data(2), data(3), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 1);
// No match
let seq1 = vec![data(1), data(2)];
let seq2 = vec![data(2), data(3)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 0);
let seq1 = vec![data(1), data(2)];
let seq2 = vec![data(3), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 0);
// Matching pairs in different indexes
let seq1 = vec![data(1), data(2), data(3), data(4)];
let seq2 = vec![data(2), data(3), data(4), data(1)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 4);
let seq1 = vec![data(1), data(2), data(3), data(4)];
let seq2 = vec![data(1), data(2), data(5), data(3), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 2);
}
#[test]
fn test_casual_coeff() {
// Empty sequences
assert_eq!(coeff(&[], &[], OrderingType::Casual), 0);
// One matching pair without noise
let seq = vec![data(1), data(2)];
assert_eq!(coeff(&seq, &seq, OrderingType::Casual), 1);
// One matching pair with noise
let seq = vec![data(1), noise(10), data(2)];
assert_eq!(coeff(&seq, &seq, OrderingType::Casual), 1);
// One matching pair without noise from different sequences
let seq1 = vec![data(1), data(2), data(3)];
let seq2 = vec![data(1), data(2), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 1);
let seq1 = vec![data(4), data(2), data(3)];
let seq2 = vec![data(1), data(2), data(3)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 1);
// One matching pair with noise from different sequences
let seq1 = vec![data(1), noise(10), data(2), data(3)];
let seq2 = vec![data(1), noise(10), data(2), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 1);
let seq1 = vec![data(4), data(2), noise(10), data(3)];
let seq2 = vec![data(1), data(2), noise(10), data(3)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 1);
// Two pairs with noise
let seq1 = vec![data(1), noise(10), data(2), data(3)];
let seq2 = vec![data(1), noise(10), data(2), data(3), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 2);
// No match
let seq1 = vec![data(1), data(2)];
let seq2 = vec![data(2), data(3)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 0);
let seq1 = vec![data(1), data(2)];
let seq2 = vec![data(3), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 0);
// No match because of noise
let seq1 = vec![data(1), noise(10), data(2)];
let seq2 = vec![data(1), data(2)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 0);
let seq1 = vec![data(1), noise(10), data(2)];
let seq2 = vec![data(1), noise(5), data(2)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 0);
// Matching pairs in different indexes
let seq1 = vec![data(1), data(2), data(3), data(4)];
let seq2 = vec![data(2), data(3), data(4), data(1)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 2);
let seq1 = vec![data(1), data(2), data(3), data(4)];
let seq2 = vec![data(1), data(2), data(5), data(3), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 2);
}
#[test]
fn test_weak_coeff() {
// Empty sequences
assert_eq!(coeff(&[], &[], OrderingType::Weak), 0);
// One matching pair without noise
let seq = vec![data(1), data(2)];
assert_eq!(coeff(&seq, &seq, OrderingType::Weak), 1);
// One matching pair with noise
let seq = vec![data(1), noise(10), data(2)];
assert_eq!(coeff(&seq, &seq, OrderingType::Weak), 1);
// One matching pair without noise from different sequences
let seq1 = vec![data(1), data(2), data(3)];
let seq2 = vec![data(1), data(2), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1);
let seq1 = vec![data(4), data(2), data(3)];
let seq2 = vec![data(1), data(2), data(3)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1);
// One matching pair with noise from different sequences
let seq1 = vec![data(1), noise(10), data(2), data(3)];
let seq2 = vec![data(1), noise(5), data(2), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1);
let seq1 = vec![data(4), data(2), noise(10), data(3)];
let seq2 = vec![data(1), data(2), noise(5), data(3)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1);
let seq1 = vec![data(4), data(2), noise(10), data(3)];
let seq2 = vec![data(1), data(2), data(3)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1);
// Two pairs with noise
let seq1 = vec![data(1), noise(10), data(2), data(3)];
let seq2 = vec![data(1), noise(5), data(2), data(3), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 2);
// No match
let seq1 = vec![data(1), data(2)];
let seq2 = vec![data(2), data(3)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 0);
let seq1 = vec![data(1), data(2)];
let seq2 = vec![data(3), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 0);
// Matching pairs in different indexes
let seq1 = vec![data(1), data(2), data(3), data(4)];
let seq2 = vec![data(2), data(3), data(4), data(1)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 2);
let seq1 = vec![data(1), data(2), data(3), data(4)];
let seq2 = vec![data(1), data(2), data(5), data(3), data(4)];
assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 2);
}
fn data(msg_id: u32) -> Entry {
Entry::Data(DataMessage { sender: 0, msg_id })
}
fn noise(count: u32) -> Entry {
Entry::Noise(count)
}
}

View File

@ -0,0 +1,95 @@
use glob::glob;
use polars::prelude::*;
use std::env;
use std::fs::File;
use walkdir::WalkDir;
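// For each paramset_* directory under `path`, merge every coeffs_*.csv found beneath it
// and write min/median/mean/std/max stats per ordering type (strong/casual/weak).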
fn aggregate(path: &str) {
let mut input_schema = Schema::new();
input_schema.with_column("sender".into(), DataType::UInt64);
input_schema.with_column("receiver".into(), DataType::UInt64);
input_schema.with_column("strong".into(), DataType::UInt64);
input_schema.with_column("casual".into(), DataType::UInt64);
input_schema.with_column("weak".into(), DataType::UInt64);
for entry in WalkDir::new(path)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_dir())
{
let dir_name = entry.path().file_name().unwrap().to_string_lossy();
if dir_name.starts_with("paramset_") {
let mut strongs = Series::new_empty("", &DataType::UInt64);
let mut casuals = Series::new_empty("", &DataType::UInt64);
let mut weaks = Series::new_empty("", &DataType::UInt64);
let pattern = format!("{}/**/coeffs_*.csv", entry.path().display());
for file in glob(&pattern).unwrap().filter_map(Result::ok) {
let df = CsvReadOptions::default()
.with_has_header(true)
.with_schema(Some(SchemaRef::new(input_schema.clone())))
.try_into_reader_with_file_path(Some(file.clone()))
.unwrap()
.finish()
.unwrap();
extend_series(&mut strongs, &df, "strong");
extend_series(&mut casuals, &df, "casual");
extend_series(&mut weaks, &df, "weak");
println!("Processed {}", file.display());
}
save_stats(
&strongs,
&format!("{}/strong_coeff_stats.csv", entry.path().display()),
);
save_stats(
&casuals,
&format!("{}/casual_coeff_stats.csv", entry.path().display()),
);
save_stats(
&weaks,
&format!("{}/weak_coeff_stats.csv", entry.path().display()),
);
}
}
}
fn extend_series(series: &mut Series, df: &DataFrame, column: &str) {
series
.extend(
&df.column(column)
.unwrap()
.u64()
.unwrap()
.clone()
.into_series(),
)
.unwrap();
}
fn save_stats(aggregated: &Series, outpath: &str) {
let mut df = DataFrame::new(vec![
Series::new("min", &[aggregated.min::<u64>().unwrap()]),
Series::new("median", &[aggregated.median().unwrap()]),
Series::new("mean", &[aggregated.mean().unwrap()]),
Series::new("std", &[aggregated.std(1).unwrap()]),
Series::new("max", &[aggregated.max::<u64>().unwrap()]),
])
.unwrap();
let mut file = File::create(outpath).unwrap();
CsvWriter::new(&mut file).finish(&mut df).unwrap();
println!("Saved {}", outpath);
}
fn main() {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: {} <path>", args[0]);
std::process::exit(1);
}
let path = &args[1];
aggregate(path);
}

View File

@ -0,0 +1,72 @@
use glob::glob;
use polars::prelude::*;
use std::env;
use std::fs::File;
use walkdir::WalkDir;
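// Same aggregation pattern as for the ordering coefficients, but over queue sizes:
// all value columns (except `vtime`) of every data_msg_counts.csv are merged into a single series.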
fn aggregate(path: &str) {
for entry in WalkDir::new(path)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_dir())
{
let dir_name = entry.path().file_name().unwrap().to_string_lossy();
if dir_name.starts_with("paramset_") {
let mut aggregated_series = Series::new_empty("", &DataType::Int64);
let pattern = format!("{}/**/data_msg_counts.csv", entry.path().display());
for file in glob(&pattern).unwrap().filter_map(Result::ok) {
let df = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some(file.clone()))
.unwrap()
.finish()
.unwrap();
// Drop the 'vtime' column and collect all remaining value columns
let df_without_vtime = df.drop("vtime").unwrap_or_else(|_| df.clone());
for col in df_without_vtime.get_columns() {
aggregated_series
.extend(&col.i64().unwrap().clone().into_series())
.unwrap();
}
println!("Processed {}", file.display());
}
let output_file = format!("{}/data_msg_counts_stats.csv", entry.path().display());
save_stats(&aggregated_series, &output_file);
}
}
}
fn save_stats(aggregated: &Series, outpath: &str) {
let min = aggregated.min::<i64>().unwrap();
let max = aggregated.max::<i64>().unwrap();
let mean = aggregated.mean().unwrap();
let median = aggregated.median().unwrap();
let std = aggregated.std(1).unwrap();
let mut df = DataFrame::new(vec![
Series::new("min", &[min]),
Series::new("median", &[median]),
Series::new("mean", &[mean]),
Series::new("std", &[std]),
Series::new("max", &[max]),
])
.unwrap();
let mut file = File::create(outpath).unwrap();
CsvWriter::new(&mut file).finish(&mut df).unwrap();
println!("Saved {}", outpath);
}
fn main() {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: {} <path>", args[0]);
std::process::exit(1);
}
let path = &args[1];
aggregate(path);
}

View File

@ -0,0 +1,75 @@
use glob::glob;
use polars::prelude::*;
use std::env;
use std::fs::File;
use walkdir::WalkDir;
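// Merges the `latency` column of every latency.csv under each paramset_* directory
// and writes the summary stats to a latency_stats.csv in that directory.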
fn aggregate(path: &str) {
for entry in WalkDir::new(path)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_dir())
{
let dir_name = entry.path().file_name().unwrap().to_string_lossy();
if dir_name.starts_with("paramset_") {
let mut aggregated_series = Series::new_empty("", &DataType::Int64);
let pattern = format!("{}/**/latency.csv", entry.path().display());
for file in glob(&pattern).unwrap().filter_map(Result::ok) {
let df = CsvReadOptions::default()
.with_has_header(true)
.try_into_reader_with_file_path(Some(file.clone()))
.unwrap()
.finish()
.unwrap();
aggregated_series
.extend(
&df.column("latency")
.unwrap()
.i64()
.unwrap()
.clone()
.into_series(),
)
.unwrap();
println!("Processed {}", file.display());
}
let output_file = format!("{}/latency_stats.csv", entry.path().display());
save_stats(&aggregated_series, &output_file);
}
}
}
fn save_stats(aggregated: &Series, outpath: &str) {
let min = aggregated.min::<i64>().unwrap();
let max = aggregated.max::<i64>().unwrap();
let mean = aggregated.mean().unwrap();
let median = aggregated.median().unwrap();
let std = aggregated.std(1).unwrap();
let mut df = DataFrame::new(vec![
Series::new("min", &[min]),
Series::new("median", &[median]),
Series::new("mean", &[mean]),
Series::new("std", &[std]),
Series::new("max", &[max]),
])
.unwrap();
let mut file = File::create(outpath).unwrap();
CsvWriter::new(&mut file).finish(&mut df).unwrap();
println!("Saved {}", outpath);
}
fn main() {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
eprintln!("Usage: {} <path>", args[0]);
std::process::exit(1);
}
let path = &args[1];
aggregate(path);
}

View File

@ -0,0 +1,226 @@
use std::{collections::hash_map::Entry, time::SystemTime};
use protocol::{
node::{MessagesToRelay, Node, NodeId},
queue::Message,
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use rustc_hash::FxHashMap;
use crate::{
format_duration,
outputs::Outputs,
paramset::ParamSet,
topology::{build_random_network, build_striped_network, RECEIVER_NODE_ID},
};
use ordering::message::{DataMessage, DataMessageGenerator};
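// Take a snapshot of the per-queue data message counts every 100 units of virtual time.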
const QUEUE_DATA_MSG_COUNT_MEASUREMENT_INTERVAL: f32 = 100.0;
pub struct Iteration {
pub paramset: ParamSet,
pub iteration_idx: usize,
pub paramset_dir: String,
}
impl Iteration {
pub fn start(&mut self) {
let dir = format!(
"{}/iteration_{}__WIP_DUR__",
self.paramset_dir, self.iteration_idx
);
std::fs::create_dir_all(dir.as_str()).unwrap();
let mut outputs = Outputs::new(
format!("{dir}/latency__WIP__.csv"),
(0..self.paramset.num_senders)
.map(|sender_idx| format!("{dir}/sent_seq_{sender_idx}__WIP__.csv"))
.collect(),
(0..self.paramset.num_sender_or_receiver_conns())
.map(|conn_idx| format!("{dir}/recv_seq_{conn_idx}__WIP__.csv"))
.collect(),
format!("{dir}/data_msg_counts__WIP__.csv"),
format!("{dir}/topology.csv"),
);
let start_time = SystemTime::now();
let vtime = self.run(self.iteration_idx as u64, &mut outputs);
outputs.close();
outputs.rename_paths("__WIP__.csv", ".csv");
let duration = format_duration(SystemTime::now().duration_since(start_time).unwrap());
let new_dir = dir.replace("__WIP_DUR__", &format!("_{duration}"));
std::fs::rename(dir, new_dir).unwrap();
tracing::info!(
"ParamSet:{}, Iteration:{} completed. Duration:{}, vtime:{}",
self.paramset.id,
self.iteration_idx,
duration,
vtime
);
}
fn run(&mut self, seed: u64, outputs: &mut Outputs) -> f32 {
let paramset = &self.paramset;
let (mut mixnodes, all_sender_peers, receiver_peers) = if paramset.random_topology {
build_random_network(paramset, seed, outputs)
} else {
build_striped_network(paramset, seed)
};
// Check node ID consistency
for (i, node) in mixnodes.iter().enumerate() {
assert_eq!(node.id as usize, i);
}
// One message ID space per sender, plus one extra ID shared by all mix nodes
// for the data messages they inject themselves
let mut data_msg_gen = DataMessageGenerator::new(paramset.num_senders + 1);
let mix_msg_sender_id = paramset.num_senders;
// Virtual discrete time
let mut vtime: f32 = 0.0;
let mut recent_vtime_queue_data_msg_count_measured: f32 = 0.0;
// The interval at which each queue must release a message (data or noise)
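// e.g., transmission_rate = 1 gives an interval of 1.0: every queue is popped once per unit of virtual time.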
let transmission_interval = 1.0 / paramset.transmission_rate as f32;
// Results
let mut all_sent_count = 0; // all data + noise sent by all senders
let all_sent_count_target = (paramset.num_sender_msgs as usize)
.checked_mul(paramset.num_senders as usize)
.unwrap();
let mut sent_data_msgs: FxHashMap<DataMessage, f32> = FxHashMap::default();
let mut recv_data_msgs: FxHashMap<DataMessage, f32> = FxHashMap::default();
outputs.write_header_queue_data_msg_counts(&mixnodes);
let mut data_msg_rng = StdRng::seed_from_u64(seed);
loop {
tracing::trace!(
"VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}",
vtime,
all_sent_count,
sent_data_msgs.len(),
recv_data_msgs.len(),
);
// All senders emit a message (data or noise) to all of their own adjacent peers.
if all_sent_count < all_sent_count_target {
// For each sender
for (sender_idx, sender_peers) in all_sender_peers.iter() {
if Self::try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) {
let msg = data_msg_gen.next(sender_idx);
sender_peers.iter().for_each(|peer_id| {
mixnodes
.get_mut(*peer_id as usize)
.unwrap()
.receive(msg, None);
});
sent_data_msgs.insert(msg, vtime);
outputs.add_sent_msg(&msg)
} else {
// Generate noise and add it to the sequence so that ordering coefficients can be calculated later.
// There is no need to actually send it to the mix nodes: they would drop the noise anyway,
// and we don't record what the mix nodes receive.
outputs.add_sent_noise(sender_idx);
}
all_sent_count += 1;
}
}
// Each mix node adds a new data message to its queue with a certain probability
if paramset.mix_data_msg_prob > 0.0 {
for node in mixnodes.iter_mut() {
if Self::try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) {
node.send(data_msg_gen.next(mix_msg_sender_id));
// We don't put the msg into the sent_sequence
// because sent_sequence only records messages sent by the senders, not by the mix nodes.
}
}
}
// Each mix node relays a message (data or noise) to the next mix node or the receiver.
// As the receiver, record the time and order of the received messages.
AllMessagesToRelay::new(&mut mixnodes).into_iter().for_each(
|(relayer_id, msgs_to_relay)| {
msgs_to_relay.into_iter().for_each(|(peer_id, msg)| {
if peer_id == RECEIVER_NODE_ID {
match msg {
Message::Data(msg) => {
// If msg was sent by the sender (not by any mix)
if let Some(&sent_time) = sent_data_msgs.get(&msg) {
// If this is the first time to see the msg,
// update stats that must ignore duplicate messages.
if let Entry::Vacant(e) = recv_data_msgs.entry(msg) {
e.insert(vtime);
outputs.add_latency(&msg, sent_time, vtime);
}
}
// Record msg to the sequence
let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap();
outputs.add_recv_msg(&msg, conn_idx);
}
Message::Noise => {
// Record noise to the sequence
let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap();
outputs.add_recv_noise(conn_idx);
}
}
} else if let Message::Data(msg) = msg {
let peer = mixnodes.get_mut(peer_id as usize).unwrap();
assert_eq!(peer.id, peer_id);
peer.receive(msg, Some(relayer_id));
}
});
},
);
// Record the number of data messages in each mix node's queues
if vtime == 0.0
|| vtime - recent_vtime_queue_data_msg_count_measured
>= QUEUE_DATA_MSG_COUNT_MEASUREMENT_INTERVAL
{
outputs.add_queue_data_msg_counts(vtime, &mixnodes);
recent_vtime_queue_data_msg_count_measured = vtime;
}
// Stop the iteration once all senders have emitted all of their data+noise messages
// and the receiver has received every data message.
if all_sent_count == all_sent_count_target
&& sent_data_msgs.len() == recv_data_msgs.len()
{
break;
}
vtime += transmission_interval;
}
vtime
}
fn try_probability(rng: &mut StdRng, prob: f32) -> bool {
assert!(
(0.0..=1.0).contains(&prob),
"Probability must be in [0, 1]."
);
rng.gen::<f32>() < prob
}
}
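// A snapshot of the messages popped from all mix node queues in one tick.
// All queues are read before any message is delivered, so a message relayed
// in this tick cannot be relayed again within the same tick.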
struct AllMessagesToRelay(Vec<(NodeId, MessagesToRelay<DataMessage>)>);
impl AllMessagesToRelay {
fn new(mixnodes: &mut [Node<DataMessage>]) -> Self {
let mut all_msgs_to_relay = Vec::with_capacity(mixnodes.len());
for node in mixnodes.iter_mut() {
all_msgs_to_relay.push((node.id, node.read_queues()));
}
Self(all_msgs_to_relay)
}
fn into_iter(self) -> impl Iterator<Item = (NodeId, Vec<(NodeId, Message<DataMessage>)>)> {
self.0.into_iter()
}
}

View File

@ -0,0 +1 @@
pub mod message;

227
mixnet/ordering/src/main.rs Normal file
View File

@ -0,0 +1,227 @@
mod iteration;
mod outputs;
mod paramset;
mod sequence;
mod topology;
use std::{
collections::{hash_map::Entry, HashMap},
error::Error,
path::Path,
time::{Duration, SystemTime},
};
use chrono::Utc;
use clap::Parser;
use iteration::Iteration;
use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
use protocol::queue::QueueType;
#[derive(Debug, Parser)]
#[command(name = "Ordering Measurement")]
struct Args {
#[arg(short, long)]
exp_id: ExperimentId,
#[arg(short, long)]
session_id: SessionId,
#[arg(short, long)]
queue_type: QueueType,
#[arg(short, long)]
outdir: String,
#[arg(short, long)]
num_threads: usize,
#[arg(short, long, default_value_t = false)]
reverse_order: bool,
#[arg(short, long)]
from_paramset: Option<u16>,
#[arg(short, long)]
to_paramset: Option<u16>,
}
fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
tracing::info!("Arguments: {:?}", args);
let Args {
exp_id,
session_id,
queue_type,
outdir,
num_threads,
reverse_order,
from_paramset,
to_paramset,
} = args;
// Ensure the output directory exists, then create a working subdirectory for this session
assert!(
Path::new(&outdir).is_dir(),
"Output directory does not exist: {outdir}"
);
let subdir = format!(
"__WIP__ordering_e{}s{}_{:?}_{}___DUR__",
exp_id as u8,
session_id as u8,
queue_type,
Utc::now().to_rfc3339()
);
let rootdir = format!("{outdir}/{subdir}");
std::fs::create_dir_all(&rootdir).unwrap();
let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type);
let session_start_time = SystemTime::now();
let iterations = prepare_all_iterations(
&paramsets,
from_paramset,
to_paramset,
reverse_order,
&rootdir,
);
run_all_iterations(iterations, num_threads, paramsets.len());
let session_duration = SystemTime::now()
.duration_since(session_start_time)
.unwrap();
// Replace "__WIP__" and "__DUR__" in the subdir string
let new_subdir = subdir
.replace("__WIP__", "")
.replace("__DUR__", &format_duration(session_duration));
let old_path = format!("{}/{}", outdir, subdir);
let new_path = format!("{}/{}", outdir, new_subdir);
assert!(
!Path::new(&new_path).exists(),
"The new directory already exists: {new_path}"
);
std::fs::rename(&old_path, &new_path)
.expect("Failed to rename the directory: {old_path} -> {new_path}");
tracing::info!("Session completed.");
}
fn prepare_all_iterations(
paramsets: &[ParamSet],
from_paramset: Option<u16>,
to_paramset: Option<u16>,
reverse_order: bool,
rootdir: &str,
) -> Vec<Iteration> {
let mut iterations: Vec<Iteration> = Vec::new();
for paramset in paramsets.iter() {
if paramset.id < from_paramset.unwrap_or(0) {
tracing::info!("ParamSet:{} skipped", paramset.id);
continue;
} else if paramset.id > to_paramset.unwrap_or(u16::MAX) {
tracing::info!("ParamSets:{}~ skipped", paramset.id);
break;
}
let paramset_dir = format!("{rootdir}/__WIP__paramset_{}", paramset.id);
std::fs::create_dir_all(paramset_dir.as_str()).unwrap();
save_paramset_info(paramset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap();
for i in 0..paramset.num_iterations {
iterations.push(Iteration {
paramset: paramset.clone(),
iteration_idx: i,
paramset_dir: paramset_dir.clone(),
});
}
}
if reverse_order {
iterations.reverse();
}
iterations
}
fn run_all_iterations(iterations: Vec<Iteration>, num_threads: usize, num_paramsets: usize) {
let (task_tx, task_rx) = crossbeam::channel::unbounded::<Iteration>();
let (noti_tx, noti_rx) = crossbeam::channel::unbounded::<Iteration>();
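// `task_tx`/`task_rx` feed iterations to the worker threads;
// `noti_tx`/`noti_rx` report finished iterations back for progress tracking.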
let mut threads = Vec::with_capacity(num_threads);
for _ in 0..num_threads {
let task_rx = task_rx.clone();
let noti_tx = noti_tx.clone();
let thread = std::thread::spawn(move || {
while let Ok(mut iteration) = task_rx.recv() {
iteration.start();
noti_tx.send(iteration).unwrap();
}
});
threads.push(thread);
}
let num_all_iterations = iterations.len();
for iteration in iterations {
task_tx.send(iteration).unwrap();
}
// Close the task sender channel so that the worker threads know no tasks remain.
drop(task_tx);
let mut paramset_progresses: HashMap<u16, usize> = HashMap::new();
let mut num_done_paramsets = 0;
for _ in 0..num_all_iterations {
let iteration = noti_rx.recv().unwrap();
match paramset_progresses.entry(iteration.paramset.id) {
Entry::Occupied(mut e) => {
*e.get_mut() += 1;
}
Entry::Vacant(e) => {
e.insert(1);
}
}
if *paramset_progresses.get(&iteration.paramset.id).unwrap()
== iteration.paramset.num_iterations
{
num_done_paramsets += 1;
let new_paramset_dir = iteration
.paramset_dir
.replace("__WIP__paramset", "paramset");
std::fs::rename(iteration.paramset_dir, new_paramset_dir).unwrap();
tracing::info!(
"ParamSet:{} is done ({} iterations). {}/{} ParamSets done.",
iteration.paramset.id,
iteration.paramset.num_iterations,
num_done_paramsets,
num_paramsets,
);
}
}
for thread in threads {
thread.join().unwrap();
}
}
fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box<dyn Error>> {
// Assert that the file does not already exist
assert!(
!Path::new(path).exists(),
"File already exists at path: {path}",
);
let mut wtr = csv::Writer::from_path(path)?;
wtr.write_record(PARAMSET_CSV_COLUMNS)?;
wtr.write_record(paramset.as_csv_record())?;
wtr.flush()?;
Ok(())
}
fn format_duration(duration: Duration) -> String {
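// e.g., 93784 seconds (86400 + 7200 + 180 + 4) is formatted as "1d2h3m4s".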
let total_seconds = duration.as_secs();
let days = total_seconds / 86_400;
let hours = (total_seconds % 86_400) / 3_600;
let minutes = (total_seconds % 3_600) / 60;
let seconds = total_seconds % 60;
format!("{}d{}h{}m{}s", days, hours, minutes, seconds)
}

View File

@ -0,0 +1,33 @@
use std::fmt::Display;
pub type SenderIdx = u8;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DataMessage {
pub sender: SenderIdx,
pub msg_id: u32,
}
impl Display for DataMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("{}:{}", self.sender, self.msg_id))
}
}
pub struct DataMessageGenerator {
next_msg_ids: Vec<u32>,
}
impl DataMessageGenerator {
pub fn new(num_senders: u8) -> Self {
Self {
next_msg_ids: vec![0; num_senders as usize],
}
}
pub fn next(&mut self, sender: SenderIdx) -> DataMessage {
let msg_id = self.next_msg_ids[sender as usize];
self.next_msg_ids[sender as usize] += 1;
DataMessage { sender, msg_id }
}
}

View File

@ -0,0 +1,231 @@
use std::{fs::File, path::Path};
use protocol::{
node::{Node, NodeId},
topology::Topology,
};
use crate::{sequence::SequenceWriter, topology::AllSenderPeers};
use ordering::message::{DataMessage, SenderIdx};
pub struct Outputs {
closed: bool,
// gradual writing
latency_path: String,
latency_writer: csv::Writer<File>,
sent_sequence_paths: Vec<String>,
sent_sequence_writers: Vec<SequenceWriter>,
recv_sequence_paths: Vec<String>,
recv_sequence_writers: Vec<SequenceWriter>,
queue_data_msg_counts_path: String,
queue_data_msg_counts_writer: csv::Writer<File>,
// bulk writing
pub topology_path: String,
}
impl Outputs {
pub fn new(
latency_path: String,
sent_sequence_paths: Vec<String>,
recv_sequence_paths: Vec<String>,
queue_data_msg_counts_path: String,
topology_path: String,
) -> Self {
// Ensure that none of the output files already exist
for path in [
latency_path.clone(),
queue_data_msg_counts_path.clone(),
topology_path.clone(),
]
.iter()
.chain(sent_sequence_paths.iter())
.chain(recv_sequence_paths.iter())
{
assert!(!Path::new(path).exists(), "File already exists: {path}");
}
// Prepare writers and headers
let mut latency_writer = csv::Writer::from_path(&latency_path).unwrap();
latency_writer
.write_record(["msg", "latency", "sent_time", "recv_time"])
.unwrap();
latency_writer.flush().unwrap();
let sent_sequence_writers = sent_sequence_paths
.iter()
.map(|path| SequenceWriter::new(path))
.collect::<Vec<_>>();
let recv_sequence_writers = recv_sequence_paths
.iter()
.map(|path| SequenceWriter::new(path))
.collect::<Vec<_>>();
let queue_data_msg_counts_writer =
csv::Writer::from_path(&queue_data_msg_counts_path).unwrap();
Self {
closed: false,
latency_path,
latency_writer,
sent_sequence_paths,
sent_sequence_writers,
recv_sequence_paths,
recv_sequence_writers,
queue_data_msg_counts_path,
queue_data_msg_counts_writer,
topology_path,
}
}
pub fn close(&mut self) {
self.latency_writer.flush().unwrap();
for seq in &mut self.sent_sequence_writers {
seq.flush();
}
for seq in &mut self.recv_sequence_writers {
seq.flush();
}
self.queue_data_msg_counts_writer.flush().unwrap();
self.closed = true;
}
pub fn add_latency(&mut self, msg: &DataMessage, sent_time: f32, recv_time: f32) {
self.latency_writer
.write_record(&[
msg.to_string(),
(recv_time - sent_time).to_string(),
sent_time.to_string(),
recv_time.to_string(),
])
.unwrap();
}
pub fn add_sent_msg(&mut self, msg: &DataMessage) {
let writer = &mut self.sent_sequence_writers[msg.sender as usize];
writer.add_message(msg);
}
pub fn add_sent_noise(&mut self, sender_idx: SenderIdx) {
let writer = &mut self.sent_sequence_writers[sender_idx as usize];
writer.add_noise();
}
pub fn add_recv_msg(&mut self, msg: &DataMessage, conn_idx: usize) {
let writer = &mut self.recv_sequence_writers[conn_idx];
writer.add_message(msg);
}
pub fn add_recv_noise(&mut self, conn_idx: usize) {
let writer = &mut self.recv_sequence_writers[conn_idx];
writer.add_noise();
}
pub fn write_header_queue_data_msg_counts(&mut self, mixnodes: &[Node<DataMessage>]) {
let writer = &mut self.queue_data_msg_counts_writer;
let mut header = vec!["vtime".to_string()];
mixnodes
.iter()
.map(|node| (node.id, node.queue_data_msg_counts()))
.for_each(|(node_id, counts)| {
let num_queues = counts.len();
(0..num_queues).for_each(|q_idx| {
header.push(format!("node{}_q{}", node_id, q_idx));
});
});
writer.write_record(header).unwrap();
writer.flush().unwrap();
}
pub fn add_queue_data_msg_counts(&mut self, vtime: f32, mixnodes: &[Node<DataMessage>]) {
let writer = &mut self.queue_data_msg_counts_writer;
let mut record = vec![vtime.to_string()];
mixnodes
.iter()
.map(|node| node.queue_data_msg_counts())
.for_each(|counts| {
counts.iter().for_each(|count| {
record.push(count.to_string());
});
});
writer.write_record(record).unwrap();
}
pub fn write_topology(
&self,
topology: &Topology,
all_sender_peers: &AllSenderPeers,
receiver_peers: &[NodeId],
) {
let mut writer = csv::Writer::from_path(&self.topology_path).unwrap();
writer.write_record(["node", "num_peers", "peers"]).unwrap();
// Write peers of mix nodes
for (node_id, peers) in topology.iter().enumerate() {
writer
.write_record(&[
node_id.to_string(),
peers.len().to_string(),
format!(
"[{}]",
peers
.iter()
.map(|peer_id| peer_id.to_string())
.collect::<Vec<_>>()
.join(",")
),
])
.unwrap();
}
// Write peers of senders
for (sender_idx, peers) in all_sender_peers.iter() {
writer
.write_record(&[
format!("sender-{}", sender_idx),
peers.len().to_string(),
format!(
"[{}]",
peers
.iter()
.map(|peer_id| peer_id.to_string())
.collect::<Vec<_>>()
.join(",")
),
])
.unwrap();
}
// Write peers of the receiver
writer
.write_record(&[
"receiver".to_string(),
receiver_peers.len().to_string(),
format!(
"[{}]",
receiver_peers
.iter()
.map(|peer_id| peer_id.to_string())
.collect::<Vec<_>>()
.join(",")
),
])
.unwrap();
writer.flush().unwrap();
}
pub fn rename_paths(&self, from: &str, to: &str) {
assert!(self.closed);
for path in [
&self.latency_path.clone(),
&self.queue_data_msg_counts_path.clone(),
]
.into_iter()
.chain(self.sent_sequence_paths.iter())
.chain(self.recv_sequence_paths.iter())
{
let new_path = path.replace(from, to);
std::fs::rename(path, new_path).unwrap();
}
}
}

View File

@ -0,0 +1,368 @@
use std::fmt::Display;
use protocol::queue::QueueType;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ExperimentId {
Experiment1 = 1,
Experiment2 = 2,
Experiment3 = 3,
Experiment4 = 4,
Experiment5 = 5,
Experiment6 = 6,
}
impl std::str::FromStr for ExperimentId {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1" => Ok(ExperimentId::Experiment1),
"2" => Ok(ExperimentId::Experiment2),
"3" => Ok(ExperimentId::Experiment3),
"4" => Ok(ExperimentId::Experiment4),
"5" => Ok(ExperimentId::Experiment5),
"6" => Ok(ExperimentId::Experiment6),
_ => Err(format!("Invalid experiment ID: {}", s)),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum SessionId {
Session1 = 1,
Session3 = 3,
}
impl std::str::FromStr for SessionId {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"1" => Ok(SessionId::Session1),
"3" => Ok(SessionId::Session3),
_ => Err(format!("Invalid session ID: {}", s)),
}
}
}
pub const PARAMSET_CSV_COLUMNS: &[&str] = &[
"paramset",
"num_mixes",
"num_paths",
"random_topology",
"peering_degree",
"min_queue_size",
"transmission_rate",
"num_senders",
"num_sender_msgs",
"sender_data_msg_prob",
"mix_data_msg_prob",
"queue_type",
"num_iterations",
];
#[derive(Debug, Clone, PartialEq)]
pub struct ParamSet {
pub id: u16,
pub num_mixes: u32,
pub num_paths: u16,
pub random_topology: bool,
pub peering_degree: PeeringDegree,
pub min_queue_size: u16,
pub transmission_rate: u16,
pub num_senders: u8,
pub num_sender_msgs: u32,
pub sender_data_msg_prob: f32,
pub mix_data_msg_prob: f32,
pub queue_type: QueueType,
pub num_iterations: usize,
}
#[derive(Debug, Clone, PartialEq)]
pub enum PeeringDegree {
Fixed(u32),
Random(Vec<(u32, f32)>),
}
impl Display for PeeringDegree {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PeeringDegree::Fixed(c) => write!(f, "{c}"),
PeeringDegree::Random(c_probs) => write!(f, "{c_probs:?}"),
}
}
}
impl ParamSet {
pub fn new_all_paramsets(
exp_id: ExperimentId,
session_id: SessionId,
queue_type: QueueType,
) -> Vec<Self> {
match session_id {
SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type),
SessionId::Session3 => Self::new_session3_paramsets(exp_id, queue_type),
}
}
fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let transmission_rate: u16 = 1;
let min_queue_size: u16 = 10;
let num_senders: u8 = match exp_id {
ExperimentId::Experiment3 | ExperimentId::Experiment4 => 2,
_ => 1,
};
let num_sender_msgs: u32 = match exp_id {
ExperimentId::Experiment6 => 10000,
_ => 1000000,
};
let sender_data_msg_probs: &[f32] = match exp_id {
ExperimentId::Experiment6 => &[0.01, 0.1, 0.5],
_ => &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0],
};
let mix_data_msg_probs = |num_mixes: u32| match exp_id {
ExperimentId::Experiment1 | ExperimentId::Experiment3 | ExperimentId::Experiment5 => {
vec![0.0]
}
ExperimentId::Experiment2 | ExperimentId::Experiment4 => vec![0.001, 0.01, 0.1],
ExperimentId::Experiment6 => {
let g: f32 = num_mixes as f32;
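// Scale the per-mix injection probability inversely with the number of mixes,
// so the expected number of messages injected per tick across all mixes stays at 0.5, 1, or 2.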
vec![1.0 / (2.0 * g), 1.0 / g, 2.0 / g]
}
};
let mut id: u16 = 1;
let mut paramsets: Vec<ParamSet> = Vec::new();
match exp_id {
ExperimentId::Experiment1
| ExperimentId::Experiment2
| ExperimentId::Experiment3
| ExperimentId::Experiment4 => {
for &num_paths in &[1, 2, 3, 4] {
for &num_mixes in &[1, 2, 3, 4] {
for &sender_data_msg_prob in sender_data_msg_probs {
for &mix_data_msg_prob in &mix_data_msg_probs(num_mixes) {
let paramset = ParamSet {
id,
num_mixes,
num_paths,
random_topology: false,
peering_degree: PeeringDegree::Fixed(1),
min_queue_size,
transmission_rate,
num_senders,
num_sender_msgs,
sender_data_msg_prob,
mix_data_msg_prob,
queue_type,
num_iterations: 1,
};
id += 1;
paramsets.push(paramset);
}
}
}
}
}
ExperimentId::Experiment5 | ExperimentId::Experiment6 => {
for &num_mixes in &[8, 16, 32] {
for &peering_degree in &[2, 3, 4] {
for &sender_data_msg_prob in sender_data_msg_probs {
for &mix_data_msg_prob in &mix_data_msg_probs(num_mixes) {
let paramset = ParamSet {
id,
num_mixes,
num_paths: 0, // unused when building a random topology
random_topology: true,
peering_degree: PeeringDegree::Fixed(peering_degree),
min_queue_size,
transmission_rate,
num_senders,
num_sender_msgs,
sender_data_msg_prob,
mix_data_msg_prob,
queue_type,
num_iterations: 10,
};
id += 1;
paramsets.push(paramset);
}
}
}
}
}
}
paramsets
}
fn new_session3_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
let sender_data_msg_probs: &[f32] = match exp_id {
ExperimentId::Experiment5 => &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0],
ExperimentId::Experiment6 => &[0.01, 0.1, 0.5],
_ => {
panic!("Only Experiment5 and Experiment6 are supported for Session3");
}
};
let mix_data_msg_probs = |num_mixes: u32| match exp_id {
ExperimentId::Experiment5 => {
vec![0.0]
}
ExperimentId::Experiment6 => {
let g: f32 = num_mixes as f32;
vec![1.0 / (2.0 * g), 1.0 / g, 2.0 / g]
}
_ => {
panic!("Only Experiment5 and Experiment6 are supported for Session3");
}
};
let mut id: u16 = 1;
let mut paramsets: Vec<ParamSet> = Vec::new();
match exp_id {
ExperimentId::Experiment5 | ExperimentId::Experiment6 => {
let num_mixes = 32;
for &sender_data_msg_prob in sender_data_msg_probs {
for &mix_data_msg_prob in &mix_data_msg_probs(num_mixes) {
let paramset = ParamSet {
id,
num_mixes,
num_paths: 0, // unused when building a random topology
random_topology: true,
peering_degree: PeeringDegree::Random(vec![
(2, 0.87),
(12, 0.123),
(24, 0.007),
]),
min_queue_size: 10,
transmission_rate: 1,
num_senders: 1,
num_sender_msgs: match exp_id {
ExperimentId::Experiment6 => 10000,
_ => 1000000,
},
sender_data_msg_prob,
mix_data_msg_prob,
queue_type,
num_iterations: 10,
};
id += 1;
paramsets.push(paramset);
}
}
}
_ => {
panic!("Only Experiment5 and Experiment6 are supported for Session3");
}
}
paramsets
}
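// The number of connections the sender (and the receiver) maintains.
// With a random degree distribution, the smallest configured degree is used.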
pub fn num_sender_or_receiver_conns(&self) -> usize {
if self.random_topology {
match &self.peering_degree {
PeeringDegree::Fixed(c) => *c as usize,
PeeringDegree::Random(c_probs) => {
c_probs.iter().map(|(c, _)| *c).min().unwrap() as usize
}
}
} else {
self.num_paths as usize
}
}
pub fn as_csv_record(&self) -> Vec<String> {
vec![
self.id.to_string(),
self.num_mixes.to_string(),
self.num_paths.to_string(),
self.random_topology.to_string(),
self.peering_degree.to_string(),
self.min_queue_size.to_string(),
self.transmission_rate.to_string(),
self.num_senders.to_string(),
self.num_sender_msgs.to_string(),
self.sender_data_msg_prob.to_string(),
self.mix_data_msg_prob.to_string(),
format!("{:?}", self.queue_type),
self.num_iterations.to_string(),
]
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use strum::IntoEnumIterator;
use crate::paramset::ParamSet;
use super::*;
#[test]
fn test_new_all_paramsets() {
let cases = vec![
((ExperimentId::Experiment1, SessionId::Session1), 4 * 4 * 6),
(
(ExperimentId::Experiment2, SessionId::Session1),
4 * 4 * 6 * 3,
),
((ExperimentId::Experiment3, SessionId::Session1), 4 * 4 * 6),
(
(ExperimentId::Experiment4, SessionId::Session1),
4 * 4 * 6 * 3,
),
((ExperimentId::Experiment5, SessionId::Session1), 3 * 3 * 6),
(
(ExperimentId::Experiment6, SessionId::Session1),
3 * 3 * 3 * 3,
),
((ExperimentId::Experiment5, SessionId::Session3), 6),
((ExperimentId::Experiment6, SessionId::Session3), 3 * 3),
];
for queue_type in QueueType::iter() {
for ((exp_id, session_id), expected_cnt) in cases.clone().into_iter() {
let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type);
assert_eq!(
paramsets.len(),
expected_cnt as usize,
"queue_type:{:?}, exp:{:?}, session:{:?}",
queue_type,
exp_id,
session_id,
);
// Check if all parameter sets are unique
let unique_paramsets: HashSet<Vec<String>> = paramsets
.iter()
.map(|paramset| paramset.as_csv_record())
.collect();
assert_eq!(unique_paramsets.len(), paramsets.len());
// Check if paramset IDs are correct.
for (i, paramset) in paramsets.iter().enumerate() {
assert_eq!(paramset.id as usize, i + 1);
println!("{:?}", paramset);
}
// Check PeeringDegree
for paramset in paramsets.iter() {
match session_id {
SessionId::Session1 => {
assert!(matches!(paramset.peering_degree, PeeringDegree::Fixed(_)))
}
SessionId::Session3 => {
assert!(matches!(paramset.peering_degree, PeeringDegree::Random(_)))
}
}
}
}
}
}
}

View File

@ -0,0 +1,41 @@
use std::fs::File;
use ordering::message::DataMessage;
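// Writes a sent/received sequence as CSV, run-length-encoding consecutive noise
// messages into a single negative count (e.g., three noises in a row become "-3").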
#[derive(Debug)]
pub struct SequenceWriter {
noise_buf: u32,
writer: csv::Writer<File>,
}
impl SequenceWriter {
pub fn new(path: &str) -> Self {
Self {
noise_buf: 0,
writer: csv::Writer::from_path(path).unwrap(),
}
}
pub fn flush(&mut self) {
self.clear_buf();
self.writer.flush().unwrap();
}
fn clear_buf(&mut self) {
if self.noise_buf > 0 {
self.writer
.write_record(&[format!("-{}", self.noise_buf)])
.unwrap();
self.noise_buf = 0;
}
}
pub fn add_message(&mut self, msg: &DataMessage) {
self.clear_buf();
self.writer.write_record(&[msg.to_string()]).unwrap();
}
pub fn add_noise(&mut self) {
self.noise_buf += 1;
}
}

View File

@ -0,0 +1,224 @@
use std::{fmt::Debug, hash::Hash};
use protocol::{
node::{Node, NodeId},
queue::QueueConfig,
topology::build_topology,
};
use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng};
use rustc_hash::FxHashMap;
use crate::{
outputs::Outputs,
paramset::{ParamSet, PeeringDegree},
};
use ordering::message::SenderIdx;
pub const RECEIVER_NODE_ID: NodeId = NodeId::MAX;
pub fn build_striped_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash>(
paramset: &ParamSet,
seed: u64,
) -> (Vec<Node<M>>, AllSenderPeers, ReceiverPeers) {
assert!(!paramset.random_topology);
let peering_degree = match paramset.peering_degree {
PeeringDegree::Fixed(c) => c,
PeeringDegree::Random(_) => {
panic!("PeeringDegree::Random not supported for striped network");
}
};
let mut next_node_id: NodeId = 0;
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
let mut mixnodes: Vec<Node<M>> =
Vec::with_capacity(paramset.num_paths as usize * paramset.num_mixes as usize);
let mut paths: Vec<Vec<NodeId>> = Vec::with_capacity(paramset.num_paths as usize);
for _ in 0..paramset.num_paths {
let mut ids = Vec::with_capacity(paramset.num_mixes as usize);
for _ in 0..paramset.num_mixes {
let id = next_node_id;
next_node_id += 1;
mixnodes.push(Node::new(
id,
QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
peering_degree,
false, // disable cache
));
ids.push(id);
}
paths.push(ids);
}
// Connect mix nodes
let mut receiver_peers = ReceiverPeers::new();
for path in paths.iter() {
for (i, id) in path.iter().enumerate() {
if i != path.len() - 1 {
let peer_id = path[i + 1];
let mixnode = mixnodes.get_mut(*id as usize).unwrap();
assert_eq!(mixnode.id, *id);
mixnode.connect(peer_id);
} else {
let mixnode = mixnodes.get_mut(*id as usize).unwrap();
assert_eq!(mixnode.id, *id);
mixnode.connect(RECEIVER_NODE_ID);
receiver_peers.add(*id, receiver_peers.len());
}
}
}
let mut all_sender_peers = AllSenderPeers::new(paramset.num_senders);
let sender_peers = paths
.iter()
.map(|path| *path.first().unwrap())
.collect::<Vec<_>>();
(0..paramset.num_senders).for_each(|_| {
all_sender_peers.add(sender_peers.clone());
});
(mixnodes, all_sender_peers, receiver_peers)
}
pub fn build_random_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash>(
paramset: &ParamSet,
seed: u64,
outputs: &mut Outputs,
) -> (Vec<Node<M>>, AllSenderPeers, ReceiverPeers) {
assert!(paramset.random_topology);
let peering_degrees = match &paramset.peering_degree {
PeeringDegree::Fixed(c) => vec![*c; paramset.num_mixes as usize],
PeeringDegree::Random(c_probs) => {
// Sort c_probs by the probability in ascending order
let mut c_probs = c_probs.clone();
c_probs.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
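// e.g., with 32 mixes and [(2, 0.87), (12, 0.123), (24, 0.007)], sorted by probability:
// ceil(0.007 * 32) = 1 node gets degree 24, ceil(0.123 * 32) = 4 nodes get degree 12,
// and the remaining 27 nodes get degree 2.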
let mut degrees = Vec::with_capacity(paramset.num_mixes as usize);
for (i, (c, prob)) in c_probs.iter().enumerate() {
let count = if i < c_probs.len() - 1 {
(prob * paramset.num_mixes as f32).ceil() as u32
} else {
let num_determined: u32 = degrees.len().try_into().unwrap();
paramset.num_mixes - num_determined
};
degrees.extend(std::iter::repeat(*c).take(count as usize));
}
degrees
}
};
// Init mix nodes
let mut queue_seed_rng = StdRng::seed_from_u64(seed);
let mut mixnodes: Vec<Node<M>> = Vec::with_capacity(paramset.num_mixes as usize);
for id in 0..paramset.num_mixes {
mixnodes.push(Node::new(
id,
QueueConfig {
queue_type: paramset.queue_type,
seed: queue_seed_rng.next_u64(),
min_queue_size: paramset.min_queue_size,
},
peering_degrees[id as usize],
true, // enable cache
));
}
// Choose sender's peers and receiver's peers randomly
let mut peers_rng = StdRng::seed_from_u64(seed);
let mut candidates: Vec<NodeId> = mixnodes.iter().map(|mixnode| mixnode.id).collect();
let num_sender_or_receiver_conns = paramset.num_sender_or_receiver_conns();
assert!(candidates.len() >= num_sender_or_receiver_conns);
let mut all_sender_peers = AllSenderPeers::new(paramset.num_senders);
for _ in 0..paramset.num_senders {
candidates.as_mut_slice().shuffle(&mut peers_rng);
let mut peers: Vec<NodeId> = candidates
.iter()
.cloned()
.take(num_sender_or_receiver_conns)
.collect();
peers.sort();
all_sender_peers.add(peers);
}
candidates.as_mut_slice().shuffle(&mut peers_rng);
let mut receiver_peer_ids: Vec<NodeId> = candidates
.iter()
.cloned()
.take(num_sender_or_receiver_conns)
.collect();
receiver_peer_ids.sort();
// Connect mix nodes
let topology = build_topology(
mixnodes.len().try_into().unwrap(),
&mixnodes
.iter()
.map(|mixnode| mixnode.peering_degree)
.collect::<Vec<u32>>(),
seed,
);
for (node_id, peers) in topology.iter().enumerate() {
peers.iter().for_each(|peer_id| {
let mixnode = mixnodes.get_mut(node_id).unwrap();
assert_eq!(mixnode.id as usize, node_id);
mixnode.connect(*peer_id);
});
}
// Connect the selected mix nodes with the receiver
let mut receiver_peers = ReceiverPeers::new();
for (conn_idx, mixnode_id) in receiver_peer_ids.iter().enumerate() {
let mixnode = mixnodes.get_mut(*mixnode_id as usize).unwrap();
assert_eq!(mixnode.id, *mixnode_id);
mixnode.connect(RECEIVER_NODE_ID);
receiver_peers.add(*mixnode_id, conn_idx);
}
outputs.write_topology(&topology, &all_sender_peers, &receiver_peer_ids);
(mixnodes, all_sender_peers, receiver_peers)
}
pub struct AllSenderPeers(Vec<Vec<NodeId>>);
impl AllSenderPeers {
fn new(num_senders: u8) -> Self {
Self(Vec::with_capacity(num_senders as usize))
}
fn add(&mut self, peers: Vec<NodeId>) {
self.0.push(peers)
}
pub fn iter(&self) -> impl Iterator<Item = (SenderIdx, &Vec<NodeId>)> {
self.0
.iter()
.enumerate()
.map(|(idx, v)| (idx.try_into().unwrap(), v))
}
}
pub struct ReceiverPeers(FxHashMap<NodeId, usize>);
impl ReceiverPeers {
fn new() -> Self {
ReceiverPeers(FxHashMap::default())
}
fn add(&mut self, peer_id: NodeId, conn_idx: usize) {
self.0.insert(peer_id, conn_idx);
}
pub fn conn_idx(&self, node_id: &NodeId) -> Option<usize> {
self.0.get(node_id).cloned()
}
fn len(&self) -> usize {
self.0.len()
}
}

View File

@ -0,0 +1,12 @@
[package]
name = "protocol"
version = "0.1.0"
edition = "2021"
[dependencies]
csv = "1.3.0"
rand = "0.8.5"
rustc-hash = "2.0.0"
strum = "0.26.3"
strum_macros = "0.26.4"
tracing = "0.1.40"

View File

@ -0,0 +1,3 @@
pub mod node;
pub mod queue;
pub mod topology;

131
mixnet/protocol/src/node.rs Normal file
View File

@ -0,0 +1,131 @@
use std::{fmt::Debug, hash::Hash};
use rustc_hash::{FxHashMap, FxHashSet};
use crate::queue::{new_queue, Message, Queue, QueueConfig};
pub type NodeId = u32;
pub struct Node<M>
where
M: Debug + Copy + Clone + PartialEq + Eq + Hash,
{
pub id: NodeId,
queue_config: QueueConfig,
// To get deterministic results, we use a sorted Vec instead of FxHashMap.
// Building `queues` this way is inefficient, but that's fine because it happens only once at startup.
// `connected_peers` is kept alongside so that duplicate connections can be checked cheaply.
queues: Vec<(NodeId, Box<dyn Queue<M>>)>,
connected_peers: FxHashSet<NodeId>,
// A cache to avoid relaying the same message multiple times.
received_msgs: Option<FxHashMap<M, u32>>,
pub peering_degree: u32,
}
pub type MessagesToRelay<M> = Vec<(NodeId, Message<M>)>;
impl<M> Node<M>
where
M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash,
{
pub fn new(
id: NodeId,
queue_config: QueueConfig,
peering_degree: u32,
enable_cache: bool,
) -> Self {
Node::<M> {
id,
queue_config,
queues: Vec::new(),
connected_peers: FxHashSet::default(),
received_msgs: if enable_cache {
Some(FxHashMap::default())
} else {
None
},
peering_degree,
}
}
pub fn connect(&mut self, peer_id: NodeId) {
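// Keep `queues` sorted by peer ID so that iteration order, and thus the simulation, is deterministic.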
if self.connected_peers.insert(peer_id) {
let pos = self
.queues
.binary_search_by(|probe| probe.0.cmp(&peer_id))
.unwrap_or_else(|pos| pos);
self.queues
.insert(pos, (peer_id, new_queue::<M>(&self.queue_config)));
}
}
pub fn send(&mut self, msg: M) {
assert!(self.check_and_update_cache(msg, true));
for (_, queue) in self.queues.iter_mut() {
queue.push(msg);
}
}
pub fn receive(&mut self, msg: M, from: Option<NodeId>) -> bool {
// If `from` is None, the message originates from outside the gossip network (e.g., injected by a sender).
// In that case the received count in the cache must start from 0, so that the entry is removed
// only after the message has been received from C gossip peers (not C-1).
// To get that behavior, we set `sending` to true, as if this node itself were sending the message.
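// e.g., with peering degree C: a message this node sends (or receives with `from == None`)
// starts counting at 0 and is evicted after C receipts, while a message first received
// from a peer starts at 1 and is evicted after C-1 further receipts.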
let sending = from.is_none();
let first_received = self.check_and_update_cache(msg, sending);
if first_received {
for (node_id, queue) in self.queues.iter_mut() {
match from {
Some(sender) => {
if *node_id != sender {
queue.push(msg);
}
}
None => queue.push(msg),
}
}
}
first_received
}
pub fn read_queues(&mut self) -> MessagesToRelay<M> {
let mut msgs_to_relay: MessagesToRelay<M> = Vec::with_capacity(self.queues.len());
self.queues.iter_mut().for_each(|(node_id, queue)| {
msgs_to_relay.push((*node_id, queue.pop()));
});
msgs_to_relay
}
pub fn queue_data_msg_counts(&self) -> Vec<usize> {
self.queues
.iter()
.map(|(_, queue)| queue.data_count())
.collect()
}
fn check_and_update_cache(&mut self, msg: M, sending: bool) -> bool {
if let Some(received_msgs) = &mut self.received_msgs {
let first_received = if let Some(count) = received_msgs.get_mut(&msg) {
*count += 1;
false
} else {
received_msgs.insert(msg, if sending { 0 } else { 1 });
true
};
// If the message has been received from all connected peers, remove it from the cache
// because there is no possibility that the message will be received again.
if received_msgs.get(&msg).unwrap() == &self.peering_degree {
tracing::debug!(
"Remove message from cache: {:?} because it has been received {} times, assuming that each inbound peer sent the message once.",
msg,
self.peering_degree
);
received_msgs.remove(&msg);
}
first_received
} else {
true
}
}
}

View File

@ -0,0 +1,453 @@
use std::collections::VecDeque;
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
use strum_macros::EnumIter;
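// Release policies implemented below, in brief:
// - NonMix: plain FIFO; yields noise only when empty.
// - PureCoinFlipping: flips a coin over the queue slots until one is released.
// - PureRandomSampling: releases a uniformly sampled slot.
// - PermutedCoinFlipping: shuffles the queue first, then coin-flips like PureCoinFlipping.
// - NoisyCoinFlipping: coin-flips with a persistent cursor; a tails at the queue head yields noise.
// - NoisyCoinFlippingRandomRelease: a single coin flip decides between noise and a random slot.
// The Pure*/Permuted variants top the queue up with noise to `min_queue_size` before each release.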
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)]
pub enum QueueType {
NonMix,
PureCoinFlipping,
PureRandomSampling,
PermutedCoinFlipping,
NoisyCoinFlipping,
NoisyCoinFlippingRandomRelease,
}
impl std::str::FromStr for QueueType {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"NonMix" => Ok(QueueType::NonMix),
"PureCoinFlipping" => Ok(QueueType::PureCoinFlipping),
"PureRandomSampling" => Ok(QueueType::PureRandomSampling),
"PermutedCoinFlipping" => Ok(QueueType::PermutedCoinFlipping),
"NoisyCoinFlipping" => Ok(QueueType::NoisyCoinFlipping),
"NoisyCoinFlippingRandomRelease" => Ok(QueueType::NoisyCoinFlippingRandomRelease),
_ => Err(format!("Unknown queue type: {}", s)),
}
}
}
pub trait Queue<T: Copy> {
fn push(&mut self, data: T);
fn pop(&mut self) -> Message<T>;
fn data_count(&self) -> usize;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Message<T: Copy> {
Data(T),
Noise,
}
pub struct QueueConfig {
pub queue_type: QueueType,
pub seed: u64,
pub min_queue_size: u16,
}
pub fn new_queue<T: 'static + Copy>(cfg: &QueueConfig) -> Box<dyn Queue<T>> {
match cfg.queue_type {
QueueType::NonMix => Box::new(NonMixQueue::new()),
QueueType::PureCoinFlipping => {
Box::new(PureCoinFlippingQueue::new(cfg.min_queue_size, cfg.seed))
}
QueueType::PureRandomSampling => {
Box::new(PureRandomSamplingQueue::new(cfg.min_queue_size, cfg.seed))
}
QueueType::PermutedCoinFlipping => {
Box::new(PermutedCoinFlippingQueue::new(cfg.min_queue_size, cfg.seed))
}
QueueType::NoisyCoinFlipping => Box::new(NoisyCoinFlippingQueue::new(cfg.seed)),
QueueType::NoisyCoinFlippingRandomRelease => {
Box::new(NoisyCoinFlippingRandomReleaseQueue::new(cfg.seed))
}
}
}
struct NonMixQueue<T: Copy> {
queue: VecDeque<T>, // no need to store noise; an empty queue yields Noise on pop
}
impl<T: Copy> NonMixQueue<T> {
fn new() -> Self {
Self {
queue: VecDeque::new(),
}
}
}
impl<T: Copy> Queue<T> for NonMixQueue<T> {
fn push(&mut self, data: T) {
self.queue.push_back(data)
}
fn pop(&mut self) -> Message<T> {
match self.queue.pop_front() {
Some(data) => Message::Data(data),
None => Message::Noise,
}
}
fn data_count(&self) -> usize {
self.queue.len()
}
}
struct MixQueue<T: Copy> {
queue: Vec<Message<T>>,
data_count: usize,
rng: StdRng,
}
impl<T: Copy> MixQueue<T> {
fn new(num_initial_noises: usize, seed: u64) -> Self {
Self {
queue: vec![Message::Noise; num_initial_noises],
data_count: 0,
rng: StdRng::seed_from_u64(seed),
}
}
fn push(&mut self, data: T) {
self.queue.push(Message::Data(data));
self.data_count += 1;
}
fn fill_noises(&mut self, k: usize) {
self.queue.extend(std::iter::repeat(Message::Noise).take(k))
}
fn pop(&mut self, idx: usize) -> Option<Message<T>> {
if idx < self.queue.len() {
let msg = self.queue.remove(idx);
if let Message::Data(_) = msg {
self.data_count -= 1;
}
Some(msg)
} else {
None
}
}
fn data_count(&self) -> usize {
self.data_count
}
fn len(&self) -> usize {
self.queue.len()
}
fn flip_coin(&mut self) -> bool {
self.rng.gen_bool(0.5)
}
fn sample_index(&mut self) -> usize {
self.rng.gen_range(0..self.queue.len())
}
fn shuffle(&mut self) {
self.queue.as_mut_slice().shuffle(&mut self.rng);
}
}
struct MinSizeMixQueue<T: Copy> {
queue: MixQueue<T>,
min_pool_size: u16,
}
impl<T: Copy> MinSizeMixQueue<T> {
fn new(min_pool_size: u16, seed: u64) -> Self {
Self {
queue: MixQueue::new(min_pool_size as usize, seed),
min_pool_size,
}
}
fn push(&mut self, msg: T) {
self.queue.push(msg)
}
fn pop(&mut self, idx: usize) -> Option<Message<T>> {
self.queue.pop(idx)
}
fn data_count(&self) -> usize {
self.queue.data_count()
}
fn ensure_min_size(&mut self) {
if self.queue.len() < self.min_pool_size as usize {
self.queue
.fill_noises(self.min_pool_size as usize - self.queue.len());
}
}
fn len(&self) -> usize {
self.queue.len()
}
fn flip_coin(&mut self) -> bool {
self.queue.flip_coin()
}
fn sample_index(&mut self) -> usize {
self.queue.sample_index()
}
fn shuffle(&mut self) {
self.queue.shuffle()
}
}
struct PureCoinFlippingQueue<T: Copy> {
queue: MinSizeMixQueue<T>,
}
impl<T: Copy> PureCoinFlippingQueue<T> {
fn new(min_pool_size: u16, seed: u64) -> Self {
Self {
queue: MinSizeMixQueue::new(min_pool_size, seed),
}
}
}
impl<T: Copy> Queue<T> for PureCoinFlippingQueue<T> {
fn push(&mut self, msg: T) {
self.queue.push(msg)
}
fn pop(&mut self) -> Message<T> {
self.queue.ensure_min_size();
loop {
for i in 0..self.queue.len() {
if self.queue.flip_coin() {
return self.queue.pop(i).unwrap();
}
}
}
}
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
struct PureRandomSamplingQueue<T: Copy> {
queue: MinSizeMixQueue<T>,
}
impl<T: Copy> PureRandomSamplingQueue<T> {
fn new(min_pool_size: u16, seed: u64) -> Self {
Self {
queue: MinSizeMixQueue::new(min_pool_size, seed),
}
}
}
impl<T: Copy> Queue<T> for PureRandomSamplingQueue<T> {
fn push(&mut self, msg: T) {
self.queue.push(msg)
}
fn pop(&mut self) -> Message<T> {
self.queue.ensure_min_size();
let i = self.queue.sample_index();
self.queue.pop(i).unwrap()
}
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
struct PermutedCoinFlippingQueue<T: Copy> {
queue: MinSizeMixQueue<T>,
}
impl<T: Copy> PermutedCoinFlippingQueue<T> {
fn new(min_pool_size: u16, seed: u64) -> Self {
Self {
queue: MinSizeMixQueue::new(min_pool_size, seed),
}
}
}
impl<T: Copy> Queue<T> for PermutedCoinFlippingQueue<T> {
fn push(&mut self, msg: T) {
self.queue.push(msg)
}
fn pop(&mut self) -> Message<T> {
self.queue.ensure_min_size();
self.queue.shuffle();
loop {
for i in 0..self.queue.len() {
if self.queue.flip_coin() {
return self.queue.pop(i).unwrap();
}
}
}
}
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
struct NoisyCoinFlippingQueue<T: Copy> {
queue: MixQueue<T>,
idx: usize,
}
impl<T: Copy> NoisyCoinFlippingQueue<T> {
pub fn new(seed: u64) -> Self {
Self {
queue: MixQueue::new(0, seed),
idx: 0,
}
}
}
impl<T: Copy> Queue<T> for NoisyCoinFlippingQueue<T> {
fn push(&mut self, msg: T) {
self.queue.push(msg)
}
fn pop(&mut self) -> Message<T> {
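// Scan with a persistent cursor: heads releases the message at the cursor,
// tails at the head of the queue yields noise, and tails elsewhere advances
// the cursor (wrapping around to the front).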
if self.queue.len() == 0 {
return Message::Noise;
}
loop {
if self.idx >= self.queue.len() {
self.idx = 0;
}
if self.queue.flip_coin() {
return self.queue.pop(self.idx).unwrap();
} else if self.idx == 0 {
return Message::Noise;
} else {
self.idx += 1;
}
}
}
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
struct NoisyCoinFlippingRandomReleaseQueue<T: Copy> {
queue: MixQueue<T>,
}
impl<T: Copy> NoisyCoinFlippingRandomReleaseQueue<T> {
pub fn new(seed: u64) -> Self {
Self {
queue: MixQueue::new(0, seed),
}
}
}
impl<T: Copy> Queue<T> for NoisyCoinFlippingRandomReleaseQueue<T> {
fn push(&mut self, msg: T) {
self.queue.push(msg)
}
fn pop(&mut self) -> Message<T> {
if self.queue.len() == 0 {
return Message::Noise;
}
if self.queue.flip_coin() {
let i = self.queue.sample_index();
self.queue.pop(i).unwrap()
} else {
Message::Noise
}
}
fn data_count(&self) -> usize {
self.queue.data_count()
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
#[test]
fn test_non_mix_queue() {
let mut queue = new_queue(&QueueConfig {
queue_type: QueueType::NonMix,
seed: 0,
min_queue_size: 0,
});
// Check if noise is returned when queue is empty
assert_eq!(queue.pop(), Message::Noise);
// Check if queue is FIFO
queue.push(0);
queue.push(1);
assert_eq!(queue.pop(), Message::Data(0));
assert_eq!(queue.pop(), Message::Data(1));
// Check if noise is returned when queue is empty
assert_eq!(queue.pop(), Message::Noise);
// Check if queue is FIFO again
queue.push(2);
queue.push(3);
assert_eq!(queue.pop(), Message::Data(2));
assert_eq!(queue.pop(), Message::Data(3));
}
#[test]
fn test_mix_queues() {
for queue_type in [
QueueType::PureCoinFlipping,
QueueType::PureRandomSampling,
QueueType::PermutedCoinFlipping,
QueueType::NoisyCoinFlipping,
QueueType::NoisyCoinFlippingRandomRelease,
] {
test_mix_queue(queue_type);
}
}
fn test_mix_queue(queue_type: QueueType) {
let mut queue = new_queue(&QueueConfig {
queue_type,
seed: 0,
min_queue_size: 4,
});
// Check if noise is returned when queue is empty
assert_eq!(queue.pop(), Message::Noise);
// Put only 2 messages even though the min queue size is 4
queue.push(0);
queue.push(1);
// Wait until 2 messages are returned from the queue
let mut set: HashSet<_> = vec![0, 1].into_iter().collect();
while !set.is_empty() {
if let Message::Data(msg) = queue.pop() {
assert!(set.remove(&msg));
}
}
// Check if noise is returned when there is no real message remains
assert_eq!(queue.pop(), Message::Noise);
}
}

View File

@ -0,0 +1,91 @@
use std::{collections::HashSet, error::Error};
use crate::node::NodeId;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
pub type Topology = Vec<Vec<NodeId>>;
pub fn build_topology(num_nodes: u32, peering_degrees: &[u32], seed: u64) -> Topology {
assert_eq!(num_nodes as usize, peering_degrees.len());
// Assert that peering degrees are sorted in descending order
assert!(peering_degrees.windows(2).all(|w| w[0] >= w[1]));
let mut rng = StdRng::seed_from_u64(seed);
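// Keep sampling topologies with fresh randomness until one is fully connected.
// Note that this may loop indefinitely if the degree configuration cannot yield a connected graph.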
loop {
let mut topology: Vec<HashSet<NodeId>> = Vec::new();
for _ in 0..num_nodes {
topology.push(HashSet::new());
}
for node in 0..num_nodes {
let mut others: Vec<NodeId> = Vec::new();
for other in (0..node).chain(node + 1..num_nodes) {
// Consider `other` as a candidate peer only if it is not already connected
// to the current node and has not yet reached its own peering degree.
if !topology[node as usize].contains(&other)
&& topology[other as usize].len() < peering_degrees[other as usize] as usize
{
others.push(other);
}
}
// How many more connections the current node needs
let num_needs = peering_degrees[node as usize] as usize - topology[node as usize].len();
// Sample as many peers as possible and connect them to the current node
let k = std::cmp::min(num_needs, others.len());
others.as_mut_slice().shuffle(&mut rng);
others.into_iter().take(k).for_each(|peer| {
topology[node as usize].insert(peer);
topology[peer as usize].insert(node);
});
}
if are_all_nodes_connected(&topology) {
let mut sorted_topology: Vec<Vec<NodeId>> = Vec::new();
for peers in topology.iter() {
let mut sorted_peers: Vec<NodeId> = peers.iter().copied().collect();
sorted_peers.sort();
sorted_topology.push(sorted_peers);
}
return sorted_topology;
}
}
}
fn are_all_nodes_connected(topology: &[HashSet<NodeId>]) -> bool {
let visited = dfs(topology, 0);
visited.len() == topology.len()
}
fn dfs(topology: &[HashSet<NodeId>], start_node: NodeId) -> HashSet<NodeId> {
let mut visited: HashSet<NodeId> = HashSet::new();
let mut stack: Vec<NodeId> = Vec::new();
stack.push(start_node);
while let Some(node) = stack.pop() {
visited.insert(node);
for peer in topology[node as usize].iter() {
if !visited.contains(peer) {
stack.push(*peer);
}
}
}
visited
}
pub fn save_topology(topology: &Topology, path: &str) -> Result<(), Box<dyn Error>> {
let mut wtr = csv::Writer::from_path(path)?;
wtr.write_record(["node", "num_peers", "peers"])?;
for (node, peers) in topology.iter().enumerate() {
let node: NodeId = node.try_into().unwrap();
let peers_str: Vec<String> = peers.iter().map(|peer_id| peer_id.to_string()).collect();
wtr.write_record(&[
node.to_string(),
peers.len().to_string(),
format!("[{}]", peers_str.join(",")),
])?;
}
wtr.flush()?;
Ok(())
}