diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c2628e9..55b921f 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -7,6 +7,9 @@ on: push: branches: [master] +env: + CARGO_TERM_COLOR: always + jobs: mixnet: runs-on: ubuntu-latest @@ -14,17 +17,9 @@ jobs: - uses: actions/checkout@v4 with: submodules: true - - name: Set up Python 3.x - uses: actions/setup-python@v5 - with: - python-version: "3.x" - - name: Install dependencies for mixnet + - name: Build working-directory: mixnet - run: pip install -r requirements.txt - - name: Run unit tests + run: cargo build -v + - name: Unit tests working-directory: mixnet - run: python -m unittest -v - - name: Run a short mixnet simulation - working-directory: mixnet - run: python -m cmd.main --config config.ci.yaml - + run: cargo test -v diff --git a/mixnet/.gitignore b/mixnet/.gitignore new file mode 100644 index 0000000..d01bd1a --- /dev/null +++ b/mixnet/.gitignore @@ -0,0 +1,21 @@ +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# RustRover +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ \ No newline at end of file diff --git a/mixnet/Cargo.toml b/mixnet/Cargo.toml new file mode 100644 index 0000000..829907e --- /dev/null +++ b/mixnet/Cargo.toml @@ -0,0 +1,3 @@ +[workspace] +members = ["dissemination", "protocol", "ordering"] +resolver = "2" diff --git a/mixnet/README.md b/mixnet/README.md new file mode 100644 index 0000000..68c74fb --- /dev/null +++ b/mixnet/README.md @@ -0,0 +1,8 @@ +# Mixnet Simulations + +## Structure + +- [`protocol/`](./protocol): The implementation of the mixnet protocol +- [`dissemination/`](./dissemination): [Queuing Mechanism: Message Dissemination Experiments](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#5120661bdf5343319e66c9372dd623b7) +- [`ordering/`](./ordering): [Queuing Mechanism: Ordering Experiments](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4d38e8790ecd492a812c733bf140b864) +- [`analysis/`](./analysis): The analysis tools for the simulation results diff --git a/mixnet/analysis/.gitignore b/mixnet/analysis/.gitignore new file mode 100644 index 0000000..2d74227 --- /dev/null +++ b/mixnet/analysis/.gitignore @@ -0,0 +1,2 @@ +.venv/ +*.png diff --git a/mixnet/analysis/README.md b/mixnet/analysis/README.md new file mode 100644 index 0000000..a475132 --- /dev/null +++ b/mixnet/analysis/README.md @@ -0,0 +1,45 @@ +# Ordering Experiments: Analysis Tools + +This Python project contains scripts to draw plots and tables from the results of the [ordering experiments](../ordering). 
+
+## Prerequisites
+- Python 3
+- Installing dependencies
+```bash
+python3 -m venv .venv
+source .venv/bin/activate
+pip install -r requirements.txt
+```
+
+## Usages
+
+### Analyzing the latency results
+
+The following command draws plots and tables from the latency results of the ordering experiments.
+```bash
+python latency.py <aggregated_csv_path> <output_dir>
+```
+- `aggregated_csv_path`
+  - A path to a CSV file that contains all statistical results of all experiments that must be shown in the plot and table.
+  - This script expects that the CSV file has at least the following columns.
+    - `paramset` (int): Parameter Set ID
+    - `queue_type` (str): Queue Type (NonMix, PureCoinFlipping, ...)
+    - `latency_min`, `latency_mean` and `latency_max` (float)
+- `output_dir`: A directory path where all PNG files of plots and tables will be stored
+
+
+### Analyzing the ordering coefficient results
+
+The following command draws plots and tables from the ordering coefficient results of the ordering experiments.
+```bash
+python coeff.py <aggregated_csv_path> <output_dir>
+```
+- `aggregated_csv_path`
+  - A path to a CSV file that contains all statistical results of all experiments that must be shown in the plot and table.
+  - This script expects that the CSV file has at least the following columns.
+    - `paramset` (int): Parameter Set ID
+    - `queue_type` (str): Queue Type (NonMix, PureCoinFlipping, ...)
+    - `strong_coeff_min`, `strong_coeff_mean` and `strong_coeff_max` (float)
+    - `casual_coeff_min`, `casual_coeff_mean` and `casual_coeff_max` (float)
+    - `weak_coeff_min`, `weak_coeff_mean` and `weak_coeff_max` (float)
+- `output_dir`: A directory path where all PNG files of plots and tables will be stored
diff --git a/mixnet/analysis/coeff.py b/mixnet/analysis/coeff.py
new file mode 100644
index 0000000..ec2c9dc
--- /dev/null
+++ b/mixnet/analysis/coeff.py
@@ -0,0 +1,103 @@
+import argparse
+
+import matplotlib.pyplot as plt
+import pandas as pd
+
+from common import COLORS, MARKERS, X_FIELDS
+
+
+def analyze(path: str, outdir: str):
+    data = pd.read_csv(path)
+    for x_field in X_FIELDS:
+        analyze_versus(data, x_field, outdir)
+
+
+def analyze_versus(data: pd.DataFrame, x_field: str, outdir: str):
+    # Group by both x_field and queue_type, then select the row with the largest paramset for each group
+    max_paramset_data = data.loc[
+        data.groupby([x_field, "queue_type"])["paramset"].idxmax()
+    ]
+
+    all_fields = [
+        ["strong_coeff_min", "casual_coeff_min", "weak_coeff_min"],
+        ["strong_coeff_mean", "casual_coeff_mean", "weak_coeff_mean"],
+        ["strong_coeff_max", "casual_coeff_max", "weak_coeff_max"],
+    ]
+
+    # Display the plots
+    fig, ax = plt.subplots(3, 3, figsize=(20, 10))
+    for ax_col, fields in enumerate(all_fields):
+        max_y = 0
+        for field in fields:
+            max_y = max(max_y, max_paramset_data[field].max())
+
+        for ax_row, field in enumerate(fields):
+            for queue_type in max_paramset_data["queue_type"].unique():
+                queue_data = max_paramset_data[
+                    max_paramset_data["queue_type"] == queue_type
+                ]
+                x_values = queue_data[x_field]
+                y_values = queue_data[field]
+                ax[ax_row][ax_col].plot(
+                    x_values,
+                    y_values,
+                    label=queue_type,
+                    marker=MARKERS[queue_type],
+                    color=COLORS[queue_type],
+                )
+
+            ax[ax_row][ax_col].set_title(f"{field} vs {x_field}", fontsize=10)
+            ax[ax_row][ax_col].set_xlabel(x_field)
+            ax[ax_row][ax_col].set_ylabel(field)
+            if ax_row == 0 and ax_col == len(all_fields) - 1:
+                ax[ax_row][ax_col].legend(bbox_to_anchor=(1, 1), loc="upper left")
+            ax[ax_row][ax_col].grid(True)
+            if max_y < 1e6:
+                ax[ax_row][ax_col].set_ylim(-10, max_y * 1.05)
+            else:
+
ax[ax_row][ax_col].set_ylim(bottom=-10) + + plt.tight_layout() + fig.savefig(f"{outdir}/coeff_vs_{x_field}.png") + + # Display the table of values + # Create a table with x_field, queue_type, and coefficients + flatten_fields = [ + field for zipped_fields in zip(*all_fields) for field in zipped_fields + ] + columns = [x_field, "queue_type"] + flatten_fields + table_data = max_paramset_data[columns].sort_values(by=[x_field, "queue_type"]) + # Prepare to display values with only 2 decimal places + table_data[fields] = table_data[fields].map( + lambda x: f"{x:.2e}" if abs(x) >= 1e6 else f"{x:.2f}" + ) + # Display the table as a separate subplot + fig_table, ax_table = plt.subplots( + figsize=(len(columns) * 1.8, len(table_data) * 0.3) + ) + ax_table.axis("off") # Turn off the axis + table = ax_table.table( + cellText=table_data.values, + colLabels=table_data.columns, + cellLoc="center", + loc="center", + ) + table.auto_set_font_size(False) + table.set_fontsize(10) + table.scale(1, 1.5) + for i in range(len(table_data.columns)): + table.auto_set_column_width(i) + + fig_table.savefig(f"{outdir}/coeff_vs_{x_field}_table.png") + + plt.draw() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Aggregate the results of all paramsets of an experiment" + ) + parser.add_argument("path", type=str, help="dir path") + parser.add_argument("outdir", type=str, help="output dir path") + args = parser.parse_args() + analyze(args.path, args.outdir) diff --git a/mixnet/analysis/common.py b/mixnet/analysis/common.py new file mode 100644 index 0000000..f4d50f1 --- /dev/null +++ b/mixnet/analysis/common.py @@ -0,0 +1,24 @@ +MARKERS = { + "NonMix": "o", + "PureCoinFlipping": "s", + "PureRandomSampling": "^", + "PermutedCoinFlipping": "v", + "NoisyCoinFlipping": "*", + "NoisyCoinFlippingRandomRelease": "d", +} + +COLORS = { + "NonMix": "#35A02D", + "PureCoinFlipping": "#9467BD", + "PureRandomSampling": "#8C564B", + "PermutedCoinFlipping": "#D62728", + "NoisyCoinFlipping": "#3977B4", + "NoisyCoinFlippingRandomRelease": "#F07F12", +} + +X_FIELDS = [ + "num_mixes", + "peering_degree", + "sender_data_msg_prob", + "mix_data_msg_prob", +] diff --git a/mixnet/analysis/latency.py b/mixnet/analysis/latency.py new file mode 100644 index 0000000..bc17c6c --- /dev/null +++ b/mixnet/analysis/latency.py @@ -0,0 +1,88 @@ +import argparse + +import matplotlib.pyplot as plt +import pandas as pd + +from common import COLORS, MARKERS, X_FIELDS + + +def analyze(path: str, outdir: str): + data = pd.read_csv(path) + for x_field in X_FIELDS: + analyze_versus(data, x_field, outdir) + + +def analyze_versus(data: pd.DataFrame, x_field: str, outdir: str): + # Group by both x_field and queue_type, then select the row with the largest paramset for each group + max_paramset_data = data.loc[ + data.groupby([x_field, "queue_type"])["paramset"].idxmax() + ] + + fields = ["latency_min", "latency_mean", "latency_max"] + + # Display the plots + fig, ax = plt.subplots(1, 3, figsize=(20, 4)) + for ax_col, field in enumerate(fields): + for queue_type in max_paramset_data["queue_type"].unique(): + queue_data = max_paramset_data[ + max_paramset_data["queue_type"] == queue_type + ] + x_values = queue_data[x_field] + y_values = queue_data[field] + ax[ax_col].plot( + x_values, + y_values, + label=queue_type, + marker=MARKERS[queue_type], + color=COLORS[queue_type], + ) + + ax[ax_col].set_title(f"{field} vs {x_field}", fontsize=10) + ax[ax_col].set_xlabel(x_field) + ax[ax_col].set_ylabel(field) + if ax_col == len(fields) - 1: + 
ax[ax_col].legend(bbox_to_anchor=(1, 1), loc="upper left") + ax[ax_col].grid(True) + ax[ax_col].set_ylim(bottom=0) + + plt.tight_layout() + fig.savefig(f"{outdir}/latency_vs_{x_field}.png") + + # Display the table of values + # Create a table with x_field, queue_type, and coefficients + columns = [x_field, "queue_type"] + fields + table_data = max_paramset_data[columns].sort_values(by=[x_field, "queue_type"]) + # Prepare to display values with only 2 decimal places + table_data[fields] = table_data[fields].map( + lambda x: f"{x:.2e}" if abs(x) >= 1e6 else f"{x:.2f}" + ) + # Display the table as a separate subplot + fig_table, ax_table = plt.subplots( + figsize=(len(columns) * 1.8, len(table_data) * 0.3) + ) + ax_table.axis("off") # Turn off the axis + table = ax_table.table( + cellText=table_data.values, + colLabels=table_data.columns, + cellLoc="center", + loc="center", + ) + table.auto_set_font_size(False) + table.set_fontsize(10) + table.scale(1, 1.5) + for i in range(len(table_data.columns)): + table.auto_set_column_width(i) + + fig_table.savefig(f"{outdir}/latency_vs_{x_field}_table.png") + + plt.draw() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Aggregate the results of all paramsets of an experiment" + ) + parser.add_argument("path", type=str, help="dir path") + parser.add_argument("outdir", type=str, help="output dir path") + args = parser.parse_args() + analyze(args.path, args.outdir) diff --git a/mixnet/analysis/requirements.txt b/mixnet/analysis/requirements.txt new file mode 100644 index 0000000..94f4c83 --- /dev/null +++ b/mixnet/analysis/requirements.txt @@ -0,0 +1,2 @@ +matplotlib==3.9.2 +pandas==2.2.2 diff --git a/mixnet/dissemination/Cargo.toml b/mixnet/dissemination/Cargo.toml new file mode 100644 index 0000000..b98e3e3 --- /dev/null +++ b/mixnet/dissemination/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "dissemination" +version = "0.1.0" +edition = "2021" + +[dependencies] +chrono = "0.4.38" +clap = { version = "4.5.16", features = ["derive"] } +csv = "1.3.0" +protocol = { version = "0.1.0", path = "../protocol" } +rand = "0.8.5" +rayon = "1.10.0" +rustc-hash = "2.0.0" +strum = "0.26.3" +strum_macros = "0.26.4" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" + +[profile.release] +opt-level = 3 # max optimization diff --git a/mixnet/dissemination/README.md b/mixnet/dissemination/README.md new file mode 100644 index 0000000..b1ca2f3 --- /dev/null +++ b/mixnet/dissemination/README.md @@ -0,0 +1,37 @@ +# Queuing Mechanism: Message Dissemination Experiments + +This directory contains the code for the [Message Dissemination Experiments](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#5120661bdf5343319e66c9372dd623b7), which is the part of the Queuing Mechanism Experiments. 
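+
+If you prefer not to install the binary, the same run can be launched directly from the workspace root with `cargo run` (a sketch; the flags are the ones documented under Usages below, and a release build is recommended since the crate sets `opt-level = 3` for release):
+```bash
+# The output directory must already exist; the binary asserts this at startup.
+mkdir -p $PWD/out
+
+cargo run --release -p dissemination -- \
+  --exp-id 1 --session-id 1 --queue-type PureCoinFlipping \
+  --outdir $PWD/out --num-threads 4
+```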
+ +## Usages + +```bash +cargo install --path dissemination + +dissemination --exp-id 1 --session-id 1 --queue-type PureCoinFlipping --outdir $PWD/out --num-threads 4 +``` +- `exp-id`: [Experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4543053fbb8c4a2f8d49b0dffdb4a928) ID (starting from 1) +- `session-id`: [Session](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ced2155214f442ed95b442c18f5832f6) ID (starting from 1) +- `queue-type`: NonMix, PureCoinFlipping, PureRandomSampling, PermutedCoinFlipping, NoisyCoinFlipping, or NoisyCoinFlippingRandomRelease +- `outdir`: Output directory +- `num-threads`: The number of threads to run each iteration of the parameter sets of each [experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4543053fbb8c4a2f8d49b0dffdb4a928) + +## Outputs + +``` +/ + dissemination_e1s1_PureCoinFlipping_2024-09-16T09:09:08.793730+00:00_0d0h12m30s/ + paramset_[1..P]/ + paramset.csv + iteration_[0..I].csv + topology_[0..I].csv +``` +- `paramset_[1..P]/`: Each [experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4543053fbb8c4a2f8d49b0dffdb4a928) consists of multiple parameter sets. The result of each parameter set is stored in a separate directory. +- `paramset.csv`: The detailed parameters of the parameter set. +- `iteration_[0..I].csv`: The result of each iteration. + - Columns + - `dissemination_time`: The time taken to disseminate the message to the entire network + - `sent_time`: The time when the message was sent by a sender + - `all_received_time`: The time when the message was received by all nodes + - `dissemination_time = all_received_time - sent_time` + - All times used in the simulations are virtual discrete time units. 
+- `topology_[0..I].csv`: The randomly generated network topology of each iteration diff --git a/mixnet/dissemination/src/iteration.rs b/mixnet/dissemination/src/iteration.rs new file mode 100644 index 0000000..3b0649e --- /dev/null +++ b/mixnet/dissemination/src/iteration.rs @@ -0,0 +1,207 @@ +use protocol::{ + node::{Node, NodeId}, + queue::{Message, QueueConfig}, + topology::{build_topology, save_topology}, +}; +use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng}; +use rustc_hash::FxHashMap; + +use crate::paramset::ParamSet; + +type MessageId = u32; + +// An interval that the sender nodes send (schedule) new messages +const MSG_INTERVAL: f32 = 1.0; + +pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) { + // Initialize nodes (not connected with each other yet) + let mut nodes: Vec> = Vec::new(); + let mut queue_seed_rng = StdRng::seed_from_u64(seed); + let peering_degrees = paramset.gen_peering_degrees(seed); + tracing::debug!("PeeringDegrees initialized."); + + for node_id in 0..paramset.num_nodes { + nodes.push(Node::new( + node_id, + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + peering_degrees[node_id as usize], + true, + )); + } + tracing::debug!("Nodes initialized."); + + // Build a random topology, and connect nodes with each other + let topology = build_topology(paramset.num_nodes, &peering_degrees, seed); + tracing::debug!("Topology built."); + save_topology(&topology, topology_path).unwrap(); + tracing::debug!("Topology saved."); + for (node_id, peers) in topology.iter().enumerate() { + peers.iter().for_each(|peer_id| { + nodes[node_id].connect(*peer_id); + }); + } + tracing::debug!("Nodes connected"); + + let mut sender_selector = SenderSelector::new( + (0..paramset.num_nodes).collect(), + paramset.num_senders, + paramset.random_senders_every_time, + seed, + ); + + // To generate unique message IDs + let mut next_msg_id: MessageId = 0; + let total_num_msgs = paramset.total_num_messages(); + // msg_id -> (sent_vtime, num_received_nodes) + let mut message_tracker: FxHashMap = FxHashMap::default(); + // To keep track of how many messages have been disseminated to all nodes + let mut num_disseminated_msgs = 0; + + let mut writer = csv::Writer::from_path(out_csv_path).unwrap(); + writer + .write_record(["dissemination_time", "sent_time", "all_received_time"]) + .unwrap(); + writer.flush().unwrap(); + + // Virtual discrete time + let mut vtime: f32; + // Transmission interval that each queue must release a message + let transmission_interval = 1.0 / paramset.transmission_rate as f32; + // Jump `vtime` to one of the following two vtimes. + // 1. The next time to send (schedule) a message. Increased by `MSG_INTERVAL`. + let mut next_messaging_vtime: f32 = 0.0; + // 2. The next time to release a message from each queue and relay them. Increased by `transmission_interval`. + let mut next_transmission_vtime: f32 = 0.0; + loop { + // If there are still messages to be sent (scheduled), + // and if the next time to send a message is earlier than the next time to relay messages. 
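+ // Note: ties (equal vtimes) favor sending, so a message scheduled at the same vtime is sent before that tick's queue release and relay.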
+ if next_msg_id < total_num_msgs && next_messaging_vtime <= next_transmission_vtime { + // Send new messages + vtime = next_messaging_vtime; + next_messaging_vtime += MSG_INTERVAL; + + send_messages( + vtime, + sender_selector.next(), + &mut nodes, + &mut next_msg_id, + &mut message_tracker, + ); + } else { + // Release a message from each queue and relay all of them + vtime = next_transmission_vtime; + next_transmission_vtime += transmission_interval; + + relay_messages( + vtime, + &mut nodes, + &mut message_tracker, + &mut num_disseminated_msgs, + &mut writer, + ); + + // Check if all messages have been disseminated to all nodes. + if num_disseminated_msgs == total_num_msgs as usize { + break; + } + } + } +} + +fn send_messages( + vtime: f32, + sender_ids: &[NodeId], + nodes: &mut [Node], + next_msg_id: &mut MessageId, + message_tracker: &mut FxHashMap, +) { + for &sender_id in sender_ids.iter() { + nodes[sender_id as usize].send(*next_msg_id); + message_tracker.insert(*next_msg_id, (vtime, 1)); + *next_msg_id += 1; + } +} + +fn relay_messages( + vtime: f32, + nodes: &mut [Node], + message_tracker: &mut FxHashMap, + num_disseminated_msgs: &mut usize, + writer: &mut csv::Writer, +) { + // Collect messages to relay + let mut all_msgs_to_relay: Vec)>> = Vec::new(); + for node in nodes.iter_mut() { + all_msgs_to_relay.push(node.read_queues()); + } + + // Relay the messages + all_msgs_to_relay + .into_iter() + .enumerate() + .for_each(|(sender_id, msgs_to_relay)| { + let sender_id: NodeId = sender_id.try_into().unwrap(); + msgs_to_relay.into_iter().for_each(|(receiver_id, msg)| { + if let Message::Data(msg) = msg { + if nodes[receiver_id as usize].receive(msg, Some(sender_id)) { + let (sent_time, num_received_nodes) = + message_tracker.get_mut(&msg).unwrap(); + *num_received_nodes += 1; + if *num_received_nodes as usize == nodes.len() { + let dissemination_time = vtime - *sent_time; + writer + .write_record(&[ + dissemination_time.to_string(), + sent_time.to_string(), + vtime.to_string(), + ]) + .unwrap(); + writer.flush().unwrap(); + *num_disseminated_msgs += 1; + + message_tracker.remove(&msg); + } + } + } + }) + }); +} + +struct SenderSelector { + candidates: Vec, + num_senders: NodeId, + random_senders_every_time: bool, + rng: StdRng, +} + +impl SenderSelector { + fn new( + candidates: Vec, + num_senders: u32, + random_senders_every_time: bool, + seed: u64, + ) -> Self { + assert!(candidates.len() >= num_senders as usize); + Self { + candidates, + num_senders, + random_senders_every_time, + rng: StdRng::seed_from_u64(seed), + } + } + + fn next(&mut self) -> &[NodeId] { + if !self.random_senders_every_time { + // It's okay to pick the first `num_senders` nodes + // because the topology is randomly generated. 
+ &self.candidates[..self.num_senders as usize] + } else { + self.candidates.shuffle(&mut self.rng); + &self.candidates[..self.num_senders as usize] + } + } +} diff --git a/mixnet/dissemination/src/main.rs b/mixnet/dissemination/src/main.rs new file mode 100644 index 0000000..a341e05 --- /dev/null +++ b/mixnet/dissemination/src/main.rs @@ -0,0 +1,145 @@ +use chrono::Utc; +use clap::Parser; +use protocol::queue::QueueType; +use rayon::prelude::*; +use std::{ + error::Error, + path::Path, + time::{Duration, SystemTime}, +}; + +use iteration::run_iteration; +use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; + +mod iteration; +mod paramset; + +#[derive(Debug, Parser)] +#[command(name = "Message Dessemination Time Measurement")] +struct Args { + #[arg(short, long)] + exp_id: ExperimentId, + #[arg(short, long)] + session_id: SessionId, + #[arg(short, long)] + queue_type: QueueType, + #[arg(short, long)] + outdir: String, + #[arg(short, long)] + num_threads: usize, + #[arg(short, long)] + from_paramset: Option, +} + +fn main() { + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + tracing::info!("Arguments: {:?}", args); + let Args { + exp_id, + session_id, + queue_type, + outdir, + num_threads, + from_paramset, + } = args; + + // Create a directory and initialize a CSV file only with a header + assert!( + Path::new(&outdir).is_dir(), + "Output directory does not exist: {outdir}" + ); + let subdir = format!( + "__WIP__dissemination_e{}s{}_{:?}_{}___DUR__", + exp_id as u8, + session_id as u8, + queue_type, + Utc::now().to_rfc3339() + ); + std::fs::create_dir_all(&format!("{outdir}/{subdir}")).unwrap(); + + let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type); + + let session_start_time = SystemTime::now(); + + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + .unwrap(); + + pool.install(|| { + paramsets.par_iter().for_each(|paramset| { + if paramset.id < from_paramset.unwrap_or(0) { + tracing::info!("ParamSet:{} skipped", paramset.id); + return; + } + + let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id); + std::fs::create_dir_all(paramset_dir.as_str()).unwrap(); + save_paramset_info(paramset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap(); + + for i in 0..paramset.num_iterations { + let out_csv_path = format!("{paramset_dir}/__WIP__iteration_{i}.csv"); + let topology_path = format!("{paramset_dir}/topology_{i}.csv"); + + run_iteration(paramset.clone(), i as u64, &out_csv_path, &topology_path); + + let new_out_csv_path = out_csv_path.replace("__WIP__iteration_", "iteration_"); + std::fs::rename(&out_csv_path, &new_out_csv_path) + .expect("Failed to rename: {out_csv_path} -> {new_out_csv_path}"); + tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i); + } + let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_"); + std::fs::rename(¶mset_dir, &new_paramset_dir) + .expect("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}"); + + tracing::info!("ParamSet:{} completed", paramset.id); + }); + }); + + let session_duration = SystemTime::now() + .duration_since(session_start_time) + .unwrap(); + + // Replace "__WIP__" and "__DUR__" in the subdir string + let new_subdir = subdir + .replace("__WIP__", "") + .replace("__DUR__", &format_duration(session_duration)); + let old_path = format!("{}/{}", outdir, subdir); + let new_path = format!("{}/{}", outdir, new_subdir); + assert!( + !Path::new(&new_path).exists(), + "The new 
directory already exists: {new_path}" + ); + std::fs::rename(&old_path, &new_path) + .expect("Failed to rename the directory: {old_path} -> {new_path}"); + + tracing::info!("Session completed."); +} + +fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box> { + // Assert that the file does not already exist + assert!( + !Path::new(path).exists(), + "File already exists at path: {path}", + ); + + let mut wtr = csv::Writer::from_path(path)?; + wtr.write_record(PARAMSET_CSV_COLUMNS)?; + wtr.write_record(paramset.as_csv_record())?; + wtr.flush()?; + + Ok(()) +} + +fn format_duration(duration: Duration) -> String { + let total_seconds = duration.as_secs(); + + let days = total_seconds / 86_400; + let hours = (total_seconds % 86_400) / 3_600; + let minutes = (total_seconds % 3_600) / 60; + let seconds = total_seconds % 60; + + format!("{}d{}h{}m{}s", days, hours, minutes, seconds) +} diff --git a/mixnet/dissemination/src/paramset.rs b/mixnet/dissemination/src/paramset.rs new file mode 100644 index 0000000..6aaae32 --- /dev/null +++ b/mixnet/dissemination/src/paramset.rs @@ -0,0 +1,520 @@ +use protocol::queue::QueueType; +use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum ExperimentId { + Experiment1 = 1, + Experiment2 = 2, + Experiment3 = 3, + Experiment4 = 4, + Experiment5 = 5, +} + +impl std::str::FromStr for ExperimentId { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "1" | "Experiment1" => Ok(ExperimentId::Experiment1), + "2" | "Experiment2" => Ok(ExperimentId::Experiment2), + "3" | "Experiment3" => Ok(ExperimentId::Experiment3), + "4" | "Experiment4" => Ok(ExperimentId::Experiment4), + "5" | "Experiment5" => Ok(ExperimentId::Experiment5), + _ => Err(format!("Invalid experiment ID: {}", s)), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum SessionId { + Session1 = 1, + Session2 = 2, + Session2_1 = 21, + Session3 = 3, +} + +impl std::str::FromStr for SessionId { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "1" | "Session1" => Ok(SessionId::Session1), + "2" | "Session2" => Ok(SessionId::Session2), + "2.1" | "Session21" => Ok(SessionId::Session2_1), + "3" | "Session3" => Ok(SessionId::Session3), + _ => Err(format!("Invalid session ID: {}", s)), + } + } +} + +pub const PARAMSET_CSV_COLUMNS: &[&str] = &[ + "paramset", + "num_nodes", + "peering_degree", + "min_queue_size", + "transmission_rate", + "num_sent_msgs", + "num_senders", + "random_senders_every_time", + "queue_type", + "num_iterations", +]; + +#[derive(Debug, Clone, PartialEq)] +pub struct ParamSet { + pub id: u16, + pub num_nodes: u32, + pub peering_degree_rates: PeeringDegreeRates, + pub min_queue_size: u16, + pub transmission_rate: u16, + pub num_sent_msgs: u32, + pub num_senders: u32, + pub random_senders_every_time: bool, + pub queue_type: QueueType, + pub num_iterations: usize, +} + +// peering_degree -> rate +// Use Vec instead of HashMap to avoid unexpected undeterministic behavior +type PeeringDegreeRates = Vec<(u32, f32)>; + +impl ParamSet { + pub fn new_all_paramsets( + exp_id: ExperimentId, + session_id: SessionId, + queue_type: QueueType, + ) -> Vec { + match session_id { + SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type), + SessionId::Session2 => Self::new_session2_paramsets(exp_id, queue_type), + SessionId::Session2_1 => Self::new_session2_1_paramsets(exp_id, queue_type), + SessionId::Session3 => 
Self::new_session3_paramsets(exp_id, queue_type), + } + } + + fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let mut start_id: u16 = 1; + let mut paramsets: Vec = Vec::new(); + for &num_nodes in &[20u32, 40u32, 80u32] { + let peering_degrees_list = &[ + vec![(num_nodes.checked_div(5).unwrap(), 1.0)], + vec![(num_nodes.checked_div(4).unwrap(), 1.0)], + vec![(num_nodes.checked_div(2).unwrap(), 1.0)], + ]; + let min_queue_size_list = &[ + num_nodes.checked_div(2).unwrap().try_into().unwrap(), + num_nodes.try_into().unwrap(), + num_nodes.checked_mul(2).unwrap().try_into().unwrap(), + ]; + let transmission_rate_list = &[ + num_nodes.checked_div(2).unwrap().try_into().unwrap(), + num_nodes.try_into().unwrap(), + num_nodes.checked_mul(2).unwrap().try_into().unwrap(), + ]; + let num_sent_msgs_list = |_| match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1], + ExperimentId::Experiment2 + | ExperimentId::Experiment4 + | ExperimentId::Experiment5 => vec![8, 16, 32], + }; + let num_senders_list = match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1], + ExperimentId::Experiment3 + | ExperimentId::Experiment4 + | ExperimentId::Experiment5 => { + vec![ + num_nodes.checked_div(10).unwrap(), + num_nodes.checked_div(5).unwrap(), + num_nodes.checked_div(2).unwrap(), + ] + } + }; + let random_senders_every_time = exp_id == ExperimentId::Experiment5; + let num_iterations = num_nodes.checked_div(2).unwrap().try_into().unwrap(); + + let (mut new_paramsets, next_start_id) = Self::new_paramsets( + start_id, + num_nodes, + peering_degrees_list, + min_queue_size_list, + transmission_rate_list, + num_sent_msgs_list, + num_senders_list.as_slice(), + random_senders_every_time, + queue_type, + num_iterations, + ); + paramsets.append(&mut new_paramsets); + start_id = next_start_id; + } + paramsets + } + + fn new_session2_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let mut start_id: u16 = 1; + let mut paramsets: Vec = Vec::new(); + for &num_nodes in &[100u32, 1000u32, 10000u32] { + let peering_degrees_list = &[vec![(4, 1.0)], vec![(8, 1.0)], vec![(16, 1.0)]]; + let min_queue_size_list = &[10, 50, 100]; + let transmission_rate_list = &[1, 10, 100]; + let num_sent_msgs_list = |min_queue_size: u16| match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1], + ExperimentId::Experiment2 + | ExperimentId::Experiment4 + | ExperimentId::Experiment5 => { + vec![ + min_queue_size.checked_div(2).unwrap().into(), + min_queue_size.into(), + min_queue_size.checked_mul(2).unwrap().into(), + ] + } + }; + let num_senders_list = match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1], + ExperimentId::Experiment3 + | ExperimentId::Experiment4 + | ExperimentId::Experiment5 => { + vec![ + num_nodes.checked_div(10).unwrap(), + num_nodes.checked_div(5).unwrap(), + num_nodes.checked_div(2).unwrap(), + ] + } + }; + let random_senders_every_time = exp_id == ExperimentId::Experiment5; + let num_iterations = 20; + + let (mut new_paramsets, next_start_id) = Self::new_paramsets( + start_id, + num_nodes, + peering_degrees_list, + min_queue_size_list, + transmission_rate_list, + num_sent_msgs_list, + num_senders_list.as_slice(), + random_senders_every_time, + queue_type, + num_iterations, + ); + paramsets.append(&mut new_paramsets); + start_id = next_start_id; + } + paramsets + } + + fn new_session2_1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let mut start_id: u16 = 1; + 
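+ // Session 2.1 mirrors Session 2 but with smaller networks (20/200/2000 nodes), lower peering degrees (4/6/8), a fixed transmission rate (1), and 1000 sent messages for Experiments 2/4/5.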
let mut paramsets: Vec = Vec::new(); + for &num_nodes in &[20u32, 200u32, 2000u32] { + let peering_degrees_list = &[vec![(4, 1.0)], vec![(6, 1.0)], vec![(8, 1.0)]]; + let min_queue_size_list = &[10, 50, 100]; + let transmission_rate_list = &[1]; + let num_sent_msgs_list = |_| match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1], + ExperimentId::Experiment2 + | ExperimentId::Experiment4 + | ExperimentId::Experiment5 => vec![1000], + }; + let num_senders_list = match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1], + ExperimentId::Experiment3 + | ExperimentId::Experiment4 + | ExperimentId::Experiment5 => { + vec![ + num_nodes.checked_div(10).unwrap(), + num_nodes.checked_div(5).unwrap(), + num_nodes.checked_div(2).unwrap(), + ] + } + }; + let random_senders_every_time = exp_id == ExperimentId::Experiment5; + let num_iterations = 20; + + let (mut new_paramsets, next_start_id) = Self::new_paramsets( + start_id, + num_nodes, + peering_degrees_list, + min_queue_size_list, + transmission_rate_list, + num_sent_msgs_list, + num_senders_list.as_slice(), + random_senders_every_time, + queue_type, + num_iterations, + ); + paramsets.append(&mut new_paramsets); + start_id = next_start_id; + } + paramsets + } + + fn new_session3_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let start_id: u16 = 1; + + let num_nodes: u32 = 100000; + let peering_degrees = vec![(4, 0.87), (129, 0.123), (500, 0.07)]; + let min_queue_size_list = &[10, 50, 100]; + let transmission_rate_list = &[1]; + let num_sent_msgs_list = |min_queue_size: u16| match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1], + ExperimentId::Experiment2 | ExperimentId::Experiment4 | ExperimentId::Experiment5 => { + vec![ + min_queue_size.checked_div(2).unwrap().into(), + min_queue_size.into(), + min_queue_size.checked_mul(2).unwrap().into(), + ] + } + }; + let num_senders_list = match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1], + ExperimentId::Experiment3 | ExperimentId::Experiment4 | ExperimentId::Experiment5 => { + vec![ + num_nodes.checked_div(10).unwrap(), + num_nodes.checked_div(5).unwrap(), + num_nodes.checked_div(2).unwrap(), + ] + } + }; + let random_senders_every_time = exp_id == ExperimentId::Experiment5; + let num_iterations = 100; + + let (paramsets, _) = Self::new_paramsets( + start_id, + num_nodes, + &[peering_degrees], + min_queue_size_list, + transmission_rate_list, + num_sent_msgs_list, + num_senders_list.as_slice(), + random_senders_every_time, + queue_type, + num_iterations, + ); + paramsets + } + + #[allow(clippy::too_many_arguments)] + fn new_paramsets( + start_id: u16, + num_nodes: u32, + peering_degrees_list: &[PeeringDegreeRates], + min_queue_size_list: &[u16], + transmission_rate_list: &[u16], + num_sent_msgs_list: impl Fn(u16) -> Vec, + num_senders_list: &[u32], + random_senders_every_time: bool, + queue_type: QueueType, + num_iterations: usize, + ) -> (Vec, u16) { + let mut id = start_id; + let mut paramsets: Vec = Vec::new(); + for peering_degrees in peering_degrees_list { + for &min_queue_size in min_queue_size_list { + for &transmission_rate in transmission_rate_list { + for &num_sent_msgs in num_sent_msgs_list(min_queue_size).iter() { + for &num_senders in num_senders_list { + if !Self::is_min_queue_size_applicable(&queue_type) + && min_queue_size != min_queue_size_list[0] + { + id += 1; + continue; + } + paramsets.push(ParamSet { + id, + num_nodes, + peering_degree_rates: 
peering_degrees.clone(), + min_queue_size, + transmission_rate, + num_sent_msgs, + num_senders, + random_senders_every_time, + queue_type, + num_iterations, + }); + id += 1; + } + } + } + } + } + (paramsets, id) + } + + pub fn is_min_queue_size_applicable(queue_type: &QueueType) -> bool { + matches!( + queue_type, + QueueType::PureCoinFlipping + | QueueType::PureRandomSampling + | QueueType::PermutedCoinFlipping + ) + } + + pub fn total_num_messages(&self) -> u32 { + self.num_sent_msgs.checked_mul(self.num_senders).unwrap() + } + + pub fn as_csv_record(&self) -> Vec { + let peering_degrees = self + .peering_degree_rates + .iter() + .map(|(degree, rate)| format!("({degree}:{rate})")) + .collect::>() + .join(","); + vec![ + self.id.to_string(), + self.num_nodes.to_string(), + format!("[{peering_degrees}]"), + self.min_queue_size.to_string(), + self.transmission_rate.to_string(), + self.num_sent_msgs.to_string(), + self.num_senders.to_string(), + self.random_senders_every_time.to_string(), + format!("{:?}", self.queue_type), + self.num_iterations.to_string(), + ] + } + + pub fn gen_peering_degrees(&self, seed: u64) -> Vec { + let mut vec = Vec::with_capacity(self.num_nodes as usize); + self.peering_degree_rates.iter().for_each(|(degree, rate)| { + let num_nodes = std::cmp::min( + (self.num_nodes as f32 * rate).round() as u32, + self.num_nodes - vec.len() as u32, + ); + vec.extend(std::iter::repeat(*degree).take(num_nodes as usize)); + }); + assert_eq!(vec.len(), self.num_nodes as usize); + vec.shuffle(&mut StdRng::seed_from_u64(seed)); + vec + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use strum::IntoEnumIterator; + + use crate::paramset::ParamSet; + + use super::*; + + #[test] + fn test_new_all_paramsets() { + let cases = vec![ + ( + (ExperimentId::Experiment1, SessionId::Session1), + 3u32.pow(4), + ), + ( + (ExperimentId::Experiment2, SessionId::Session1), + 3u32.pow(5), + ), + ( + (ExperimentId::Experiment3, SessionId::Session1), + 3u32.pow(5), + ), + ( + (ExperimentId::Experiment4, SessionId::Session1), + 3u32.pow(6), + ), + ( + (ExperimentId::Experiment5, SessionId::Session1), + 3u32.pow(6), + ), + ( + (ExperimentId::Experiment1, SessionId::Session2), + 3u32.pow(4), + ), + ( + (ExperimentId::Experiment4, SessionId::Session2), + 3u32.pow(6), + ), + ( + (ExperimentId::Experiment5, SessionId::Session2), + 3u32.pow(6), + ), + ( + (ExperimentId::Experiment1, SessionId::Session2_1), + 3u32.pow(3), + ), + ( + (ExperimentId::Experiment4, SessionId::Session2_1), + 3u32.pow(4), + ), + ( + (ExperimentId::Experiment5, SessionId::Session2_1), + 3u32.pow(4), + ), + ( + (ExperimentId::Experiment5, SessionId::Session3), + 3u32.pow(3), + ), + ]; + + for queue_type in QueueType::iter() { + for ((exp_id, session_id), mut expected_cnt) in cases.clone().into_iter() { + let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type); + + // Check if the number of parameter sets is correct + if !ParamSet::is_min_queue_size_applicable(&queue_type) { + expected_cnt /= 3; + } + assert_eq!(paramsets.len(), expected_cnt as usize); + + // Check if all parameter sets are unique + let unique_paramsets: HashSet> = paramsets + .iter() + .map(|paramset| paramset.as_csv_record()) + .collect(); + assert_eq!(unique_paramsets.len(), paramsets.len()); + + // Check if paramset IDs are correct. 
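+ // (For queue types where min_queue_size is not applicable, skipped parameter combinations leave gaps in the ID sequence, so ID contiguity is only checked when it applies.)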
+ if ParamSet::is_min_queue_size_applicable(&queue_type) { + for (i, paramset) in paramsets.iter().enumerate() { + assert_eq!(paramset.id as usize, i + 1); + } + } + } + } + } + + #[test] + fn test_id_consistency() { + let cases = vec![ + (ExperimentId::Experiment1, SessionId::Session1), + (ExperimentId::Experiment2, SessionId::Session1), + (ExperimentId::Experiment3, SessionId::Session1), + (ExperimentId::Experiment4, SessionId::Session1), + (ExperimentId::Experiment5, SessionId::Session1), + (ExperimentId::Experiment1, SessionId::Session2), + (ExperimentId::Experiment4, SessionId::Session2), + (ExperimentId::Experiment5, SessionId::Session2), + (ExperimentId::Experiment1, SessionId::Session2_1), + (ExperimentId::Experiment4, SessionId::Session2_1), + (ExperimentId::Experiment5, SessionId::Session2_1), + (ExperimentId::Experiment5, SessionId::Session3), + ]; + + for (exp_id, session_id) in cases.into_iter() { + let paramsets_with_min_queue_size = + ParamSet::new_all_paramsets(exp_id, session_id, QueueType::PureCoinFlipping); + let paramsets_without_min_queue_size = + ParamSet::new_all_paramsets(exp_id, session_id, QueueType::NonMix); + + for (i, paramset) in paramsets_with_min_queue_size.iter().enumerate() { + assert_eq!(paramset.id as usize, i + 1); + } + + for mut paramset in paramsets_without_min_queue_size.into_iter() { + // To compare ParameterSet instances, use the same queue type. + paramset.queue_type = QueueType::PureCoinFlipping; + assert_eq!( + paramset, + paramsets_with_min_queue_size[paramset.id as usize - 1] + ); + } + } + } +} diff --git a/mixnet/ordering/Cargo.toml b/mixnet/ordering/Cargo.toml new file mode 100644 index 0000000..317d30f --- /dev/null +++ b/mixnet/ordering/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "ordering" +version = "0.1.0" +edition = "2021" + +[dependencies] +chrono = "0.4.38" +clap = { version = "4.5.16", features = ["derive"] } +crossbeam = "0.8.4" +csv = "1.3.0" +protocol = { version = "0.1.0", path = "../protocol" } +rand = "0.8.5" +rustc-hash = "2.0.0" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" +walkdir = "2.3" +glob = "0.3" +polars = { version = "0.42.0", features = [ + "csv", + "diagonal_concat", + "polars-io", + "zip_with", +] } + +[profile.release] +opt-level = 3 # max optimization + +[dev-dependencies] +strum = "0.26.3" diff --git a/mixnet/ordering/README.md b/mixnet/ordering/README.md new file mode 100644 index 0000000..656268d --- /dev/null +++ b/mixnet/ordering/README.md @@ -0,0 +1,91 @@ +# Queuing Mechanism: Ordering Experiments + +This directory contains the code for the [Ordering Experiments](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#4d38e8790ecd492a812c733bf140b864), which is the part of the Queuing Mechanism Experiments. 
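+
+Besides the main `ordering` binary, this crate ships some of the aggregation tools described below as separate bin targets under `src/bin/` (e.g. `coeff` and `aggregate`). A sketch of invoking them from the workspace root, assuming the bin targets keep their file names:
+```bash
+# Calculate the strong/casual/weak ordering coefficients for every iteration directory.
+cargo run --release -p ordering --bin coeff -- --path $PWD/out --num-threads 4
+
+# Final aggregation into a single aggregated.csv
+# (expects the per-paramset *_stats.csv files to already exist).
+cargo run --release -p ordering --bin aggregate -- $PWD/out
+```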
+
+## Usages
+
+```bash
+cargo install --path ordering
+
+ordering --exp-id 1 --session-id 1 --queue-type PureCoinFlipping --outdir $PWD/out --num-threads 4
+```
+- `exp-id`: [Experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ffbcb5071dbb482bad035ef01cf8d49d) ID (starting from 1)
+- `session-id`: [Session](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#df7de2a64b1e4e778c5d793bf03be25e) ID (starting from 1)
+- `queue-type`: NonMix, PureCoinFlipping, PureRandomSampling, PermutedCoinFlipping, NoisyCoinFlipping, or NoisyCoinFlippingRandomRelease
+- `outdir`: Output directory
+- `num-threads`: The number of threads to run each iteration of the parameter sets of each [experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ffbcb5071dbb482bad035ef01cf8d49d)
+
+## Outputs
+
+```
+<outdir>/
+  ordering_e1s1_PureCoinFlipping_2024-09-16T09:18:59.482141+00:00_0d0h0m10s/
+    paramset_[1..P]/
+      paramset.csv
+      iteration_[0..I]_0d0h0m0s/
+        topology.csv
+        latency.csv
+        data_msg_counts.csv
+        sent_seq_[0..S].csv
+        recv_seq_[0..R].csv
+```
+- `paramset_[1..P]/`: Each [experiment](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ffbcb5071dbb482bad035ef01cf8d49d) consists of multiple parameter sets. The result of each parameter set is stored in a separate directory.
+- `paramset.csv`: The detailed parameters of the parameter set.
+- `iteration_[0..I]_0d0h0m0s`: The result of each iteration (with the elapsed time)
+  - `topology.csv`: The randomly generated topology of the network
+  - `latency.csv`: The latency for each message to be delivered from the sender to the receiver
+  - `data_msg_counts.csv`: The number of data messages staying in each queue in each time window
+  - `sent_seq_[0..S].csv`: The sequence of messages sent by each sender
+  - `recv_seq_[0..R].csv`: The sequence of messages received by each receiver
+
+## Aggregation Tools
+
+Since the ordering experiments are heavy, the aggregation tools are provided separately to aggregate the results of all experiments and parameter sets after the experiments are done.
+
+### Latency Aggregation
+
+This tool reads all `**/iteration_*/latency.csv` files and aggregates all latencies into a single CSV file: `**/paramset_[1..P]/latency_stats.csv`, as below.
+```csv
+min,median,mean,std,max
+0,93.0,123.07003891050584,109.38605760356785,527
+```
+
+### Data Message Counts Aggregation
+
+This tool reads all `**/iteration_*/data_msg_counts.csv` files and aggregates all counts into a single CSV file: `**/paramset_[1..P]/data_msg_counts_stats.csv`, as below.
+```csv
+min,median,mean,std,max
+0,1.0,9.231619223429988,31.290104671648148,289
+```
+
+### Ordering Coefficient Calculation
+
+This tool is not an aggregation tool, but it calculates the [strong/casual/weak ordering coefficients](https://www.notion.so/Nomos-Mix-Queueing-Mechanism-Experimentation-Methodology-d629af5a2d43473c9ec9ba191f6d904d?pvs=4#ee984d48bd6b4fe3b2acc1000e4ae77b)
+from the `**/iteration_*/sent_seq_*.csv` and `**/iteration_*/recv_seq_*.csv` files.
+The result is stored in CSV files: `**/iteration_*/coeffs_[sender_id]_[receiver_id].csv`, as below.
+```csv +sender,receiver,strong,casual,weak +0,1,0,1,4 +``` + +### Ordering Coefficient Aggregation + +This tool reads all `**/iteration_*/coeffs_*_*.csv` files (calculated [above](#ordering-coefficient-calculation)) and aggregates all coefficients into three CSV file: `**/paramset_[1..P]/[strong|casual|weak]_coeff_stats.csv`, as below. +```csv +min,median,mean,std,max +0,0.0,0.25,0.4442616583193193,1 +``` + +### Final Aggregation Across All Experiments + +This tool reads all of the following files: +- `**/latency_stats.csv` +- `**/data_msg_counts_stats.csv` +- `**/[strong|casual|weak]_coeff_stats.csv` +and aggregates them into a single CSV file: `aggregated.csv`, as below. +```csv +paramset,num_mixes,num_paths,random_topology,peering_degree,min_queue_size,transmission_rate,num_senders,num_sender_msgs,sender_data_msg_prob,mix_data_msg_prob,queue_type,num_iterations,data_msg_count_min,data_msg_count_median,data_msg_count_mean,data_msg_count_std,data_msg_count_max,latency_min,latency_median,latency_mean,latency_std,latency_max,strong_coeff_min,strong_coeff_median,strong_coeff_mean,strong_coeff_std,strong_coeff_max,casual_coeff_min,casual_coeff_median,casual_coeff_mean,casual_coeff_std,casual_coeff_max,weak_coeff_min,weak_coeff_median,weak_coeff_mean,weak_coeff_std,weak_coeff_max +1,8,0,true,2,10,1,1,10000,0.01,0.0625,NoisyCoinFlipping,10,0,1.0,9.231619223429988,31.290104671648148,289,0,93.0,123.07003891050584,109.38605760356785,527,0,0.0,0.25,0.4442616583193193,1,0,0.0,0.45,0.6863327411532597,2,0,2.0,1.85,1.5312533566021211,5 +... +``` +This `aggregated.csv` file can be analyzed by the [analysis tools](../analysis). diff --git a/mixnet/ordering/src/bin/aggregate.rs b/mixnet/ordering/src/bin/aggregate.rs new file mode 100644 index 0000000..6cd5cc9 --- /dev/null +++ b/mixnet/ordering/src/bin/aggregate.rs @@ -0,0 +1,127 @@ +use std::{ + env, + fs::File, + path::{Path, PathBuf}, +}; + +use polars::prelude::*; +use walkdir::WalkDir; + +fn aggregate(path: &str) { + let mut schema = Schema::new(); + schema.with_column("paramset".into(), DataType::Int64); + schema.with_column("num_mixes".into(), DataType::Int64); + schema.with_column("num_paths".into(), DataType::Int64); + schema.with_column("random_topology".into(), DataType::Boolean); + schema.with_column("peering_degree".into(), DataType::String); + schema.with_column("min_queue_size".into(), DataType::Int64); + schema.with_column("transmission_rate".into(), DataType::Int64); + schema.with_column("num_senders".into(), DataType::Int64); + schema.with_column("num_sender_msgs".into(), DataType::Int64); + schema.with_column("sender_data_msg_prob".into(), DataType::Float32); + schema.with_column("mix_data_msg_prob".into(), DataType::Float32); + schema.with_column("queue_type".into(), DataType::String); + schema.with_column("num_iterations".into(), DataType::Int64); + + let mut dataframes: Vec = Vec::new(); + for entry in WalkDir::new(path) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_dir()) + { + let dir_name = entry.path().file_name().unwrap().to_string_lossy(); + if dir_name.starts_with("paramset_") { + let mut df = CsvReadOptions::default() + .with_has_header(true) + .with_schema(Some(SchemaRef::new(schema.clone()))) + .try_into_reader_with_file_path(Some(entry.path().join("paramset.csv"))) + .unwrap() + .finish() + .unwrap(); + + add_stats_columns( + &mut df, + entry.path().join("data_msg_counts_stats.csv"), + "data_msg_count_", + ); + add_stats_columns(&mut df, entry.path().join("latency_stats.csv"), "latency_"); + + 
for prefix in ["strong", "casual", "weak"] { + let coeff_stats_path = entry.path().join(format!("{}_coeff_stats.csv", prefix)); + if coeff_stats_path.exists() { + add_stats_columns(&mut df, coeff_stats_path, &format!("{}_coeff_", prefix)); + } + } + + dataframes.push(df); + } + } + + if !dataframes.is_empty() { + let df = polars::functions::concat_df_diagonal(dataframes.as_slice()).unwrap(); + let mut df = df + .sort(["paramset", "queue_type"], SortMultipleOptions::default()) + .unwrap(); + let outpath = Path::new(path).join("aggregated.csv"); + let mut file = File::create(&outpath).unwrap(); + CsvWriter::new(&mut file).finish(&mut df).unwrap(); + println!("Saved {}", outpath.display()); + } +} + +fn add_stats_columns(df: &mut DataFrame, path: PathBuf, col_prefix: &str) { + let mut schema = Schema::new(); + schema.with_column("min".into(), DataType::UInt64); + schema.with_column("median".into(), DataType::Float64); + schema.with_column("mean".into(), DataType::Float64); + schema.with_column("std".into(), DataType::Float64); + schema.with_column("max".into(), DataType::UInt64); + + let stats_df = CsvReadOptions::default() + .with_has_header(true) + .with_schema(Some(SchemaRef::new(schema))) + .try_into_reader_with_file_path(Some(path)) + .unwrap() + .finish() + .unwrap(); + df.with_column( + stats_df["min"] + .head(Some(1)) + .with_name(format!("{col_prefix}min").as_str()), + ) + .unwrap(); + df.with_column( + stats_df["median"] + .head(Some(1)) + .with_name(format!("{col_prefix}median").as_str()), + ) + .unwrap(); + df.with_column( + stats_df["mean"] + .head(Some(1)) + .with_name(format!("{col_prefix}mean").as_str()), + ) + .unwrap(); + df.with_column( + stats_df["std"] + .head(Some(1)) + .with_name(format!("{col_prefix}std").as_str()), + ) + .unwrap(); + df.with_column( + stats_df["max"] + .head(Some(1)) + .with_name(format!("{col_prefix}max").as_str()), + ) + .unwrap(); +} + +fn main() { + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("Usage: {} ", args[0]); + std::process::exit(1); + } + let path = &args[1]; + aggregate(path); +} diff --git a/mixnet/ordering/src/bin/coeff.rs b/mixnet/ordering/src/bin/coeff.rs new file mode 100644 index 0000000..9422570 --- /dev/null +++ b/mixnet/ordering/src/bin/coeff.rs @@ -0,0 +1,522 @@ +use std::{fs::File, path::PathBuf}; + +use clap::Parser; +use glob::glob; +use polars::prelude::*; +use walkdir::WalkDir; + +use ordering::message::{DataMessage, SenderIdx}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Entry { + Data(DataMessage), + Noise(u32), // the number of consecutive noises +} + +fn load_sequence(path: &str) -> Vec { + let mut entries: Vec = Vec::new(); + + let mut reader = csv::ReaderBuilder::new() + .has_headers(false) + .from_path(path) + .unwrap(); + + for result in reader.records() { + let record = result.unwrap(); + let value = &record[0]; + + if let Ok(num) = value.parse::() { + assert!(num < 0); + entries.push(Entry::Noise(num.unsigned_abs())); + } else { + entries.push(Entry::Data(parse_data_msg(value))); + } + } + + entries +} + +fn parse_data_msg(value: &str) -> DataMessage { + let parts: Vec<&str> = value.split(':').collect(); + assert_eq!(parts.len(), 2); + DataMessage { + sender: parts[0].parse::().unwrap(), + msg_id: parts[1].parse::().unwrap(), + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum OrderingType { + Strong, + Casual, + Weak, +} + +fn coeff(seq1: &[Entry], seq2: &[Entry], ordering_type: OrderingType) -> u64 { + let mut coeff: u64 = 0; + let mut i = 0; + + while i < seq1.len() 
{ + if let Entry::Data(_) = &seq1[i] { + let (c, next_i) = coeff_from(seq1, i, seq2, ordering_type); + coeff = coeff.saturating_add(c); + if coeff == u64::MAX { + // no need to continue + break; + } + + if next_i != i { + i = next_i; + } else { + i += 1; + } + } else { + i += 1; + } + } + + coeff +} + +fn coeff_from( + seq1: &[Entry], + start_idx: usize, + seq2: &[Entry], + ordering_type: OrderingType, +) -> (u64, usize) { + let msg1 = match seq1[start_idx] { + Entry::Data(msg) => msg, + _ => panic!("Entry at index {start_idx} must be Message"), + }; + + for (j, entry) in seq2.iter().enumerate() { + if let Entry::Data(msg2) = entry { + if msg1 == *msg2 { + // Found the 1st matching msg. Start finding the next adjacent matching msg. + match ordering_type { + OrderingType::Strong => { + return strong_coeff_from(seq1, start_idx, seq2, j); + } + OrderingType::Casual => { + return casual_coeff_from(seq1, start_idx, seq2, j); + } + OrderingType::Weak => { + return weak_coeff_from(seq1, start_idx, seq2, j); + } + } + } + } + } + + // Couldn't find any matching msg in seq2. Returning the zero coefficients and the same start index. + (0, start_idx) +} + +fn strong_coeff_from( + seq1: &[Entry], + start_idx: usize, + seq2: &[Entry], + seq2_start_idx: usize, +) -> (u64, usize) { + // Find the number of consecutive matching exactly pairs that don't contain noises. + let mut num_consecutive_pairs: u64 = 0; + let mut i = start_idx + 1; + let mut j = seq2_start_idx + 1; + while i < seq1.len() && j < seq2.len() { + match (&seq1[i], &seq2[j]) { + (Entry::Data(msg1), Entry::Data(msg2)) => { + if msg1 == msg2 { + num_consecutive_pairs += 1; + i += 1; + j += 1; + } else { + break; + } + } + _ => break, + } + } + + let coeff = if num_consecutive_pairs == 0 { + 0 + } else { + num_consecutive_pairs.saturating_pow(num_consecutive_pairs.try_into().unwrap()) + }; + (coeff, i) +} + +fn casual_coeff_from( + seq1: &[Entry], + start_idx: usize, + seq2: &[Entry], + seq2_start_idx: usize, +) -> (u64, usize) { + // Find the number of consecutive matching pairs while accounting for noises. + let mut coeff = 0; + let mut i = start_idx + 1; + let mut j = seq2_start_idx + 1; + while i < seq1.len() && j < seq2.len() { + match (&seq1[i], &seq2[j]) { + (Entry::Noise(cnt1), Entry::Noise(cnt2)) => { + if cnt1 == cnt2 { + i += 1; + j += 1; + } else { + break; + } + } + (Entry::Data(msg1), Entry::Data(msg2)) => { + if msg1 == msg2 { + coeff += 1; + i += 1; + j += 1; + } else { + break; + } + } + _ => break, + } + } + (coeff, i) +} + +fn weak_coeff_from( + seq1: &[Entry], + start_idx: usize, + seq2: &[Entry], + seq2_start_idx: usize, +) -> (u64, usize) { + // Find the number of consecutive matching pairs with ignoring noises. 
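+ // e.g. [msg1, noise, msg2] vs [msg1, msg2] still counts the (msg1, msg2) pair, which the strong and casual variants would not.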
+ let mut coeff = 0; + let mut i = start_idx + 1; + let mut j = seq2_start_idx + 1; + while i < seq1.len() && j < seq2.len() { + i = skip_noise(seq1, i); + j = skip_noise(seq2, j); + if i < seq1.len() && j < seq2.len() && seq1[i] == seq2[j] { + coeff += 1; + i += 1; + j += 1; + } else { + break; + } + } + (coeff, i) +} + +fn skip_noise(seq: &[Entry], mut index: usize) -> usize { + while index < seq.len() { + if let Entry::Data(_) = seq[index] { + break; + } + index += 1; + } + index +} + +#[derive(Debug, Parser)] +#[command(name = "Calculating ordering coefficients")] +struct Args { + #[arg(short, long)] + path: String, + #[arg(short, long)] + num_threads: usize, + #[arg(short, long)] + sent_seq_limit: Option, +} + +fn main() { + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + calculate_coeffs(&args); +} + +fn calculate_coeffs(args: &Args) { + let mut tasks: Vec = Vec::new(); + for entry in WalkDir::new(args.path.as_str()) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_dir()) + { + let dir_name = entry.path().file_name().unwrap().to_string_lossy(); + if dir_name.starts_with("iteration_") { + for sent_seq_file in glob(&format!("{}/sent_seq_*.csv", entry.path().display())) + .unwrap() + .filter_map(Result::ok) + { + let sender = + extract_id(&sent_seq_file.file_name().unwrap().to_string_lossy()).unwrap(); + + for recv_seq_file in glob(&format!("{}/recv_seq_*.csv", entry.path().display())) + .unwrap() + .filter_map(Result::ok) + { + let receiver = + extract_id(&recv_seq_file.file_name().unwrap().to_string_lossy()).unwrap(); + + let task = Task { + sent_seq_file: sent_seq_file.clone(), + recv_seq_file: recv_seq_file.clone(), + sender, + receiver, + outpath: entry + .path() + .join(format!("coeffs_{}_{}.csv", sender, receiver)), + sent_seq_limit: args.sent_seq_limit, + }; + tasks.push(task); + } + } + } + } + + let (task_tx, task_rx) = crossbeam::channel::unbounded::(); + let mut threads = Vec::with_capacity(args.num_threads); + for _ in 0..args.num_threads { + let task_rx = task_rx.clone(); + + let thread = std::thread::spawn(move || { + while let Ok(task) = task_rx.recv() { + task.run(); + } + }); + threads.push(thread); + } + + for task in tasks { + task_tx.send(task).unwrap(); + } + // Close the task sender channel, so that the threads can know that there's no task remains. 
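+ // (With the only Sender dropped, `recv()` returns Err once the channel drains, ending each worker loop.)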
+ drop(task_tx); + + for thread in threads { + thread.join().unwrap(); + } +} + +fn extract_id(filename: &str) -> Option { + if let Some(stripped) = filename.strip_suffix(".csv") { + if let Some(stripped) = stripped.strip_prefix("sent_seq_") { + return stripped.parse::().ok(); + } else if let Some(stripped) = stripped.strip_prefix("recv_seq_") { + return stripped.parse::().ok(); + } + } + None +} + +struct Task { + sent_seq_file: PathBuf, + recv_seq_file: PathBuf, + sender: u8, + receiver: u8, + outpath: PathBuf, + sent_seq_limit: Option, +} + +impl Task { + fn run(&self) { + tracing::info!( + "Processing:\n {}\n {}", + self.sent_seq_file.display(), + self.recv_seq_file.display() + ); + + let mut sent_seq = load_sequence(self.sent_seq_file.to_str().unwrap()); + let recv_seq = load_sequence(self.recv_seq_file.to_str().unwrap()); + + if let Some(limit) = self.sent_seq_limit { + if sent_seq.len() > limit { + sent_seq.truncate(limit); + } + } + + let strong = coeff(&sent_seq, &recv_seq, OrderingType::Strong); + let casual = coeff(&sent_seq, &recv_seq, OrderingType::Casual); + let weak = coeff(&sent_seq, &recv_seq, OrderingType::Weak); + + let mut df = DataFrame::new(vec![ + Series::new("sender", &[self.sender as u64]), + Series::new("receiver", &[self.receiver as u64]), + Series::new("strong", &[strong]), + Series::new("casual", &[casual]), + Series::new("weak", &[weak]), + ]) + .unwrap() + .sort(["sender", "receiver"], SortMultipleOptions::default()) + .unwrap(); + let mut file = File::create(&self.outpath).unwrap(); + CsvWriter::new(&mut file).finish(&mut df).unwrap(); + tracing::info!("Saved {}", self.outpath.display()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_strong_coeff() { + // Empty sequences + assert_eq!(coeff(&[], &[], OrderingType::Strong), 0); + + // One matching pair without noise + let seq = vec![data(1), data(2)]; + assert_eq!(coeff(&seq, &seq, OrderingType::Strong), 1); + + // No matching pair due to noise + let seq = vec![data(1), noise(10), data(2)]; + assert_eq!(coeff(&seq, &seq, OrderingType::Strong), 0); + + // One matching pair without noise from different sequences + let seq1 = vec![data(1), data(2), data(3)]; + let seq2 = vec![data(1), data(2), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 1); + let seq1 = vec![data(4), data(2), data(3)]; + let seq2 = vec![data(1), data(2), data(3)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 1); + + // One pair, not two because of noise + let seq1 = vec![data(1), noise(10), data(2), data(3)]; + let seq2 = vec![data(1), noise(10), data(2), data(3), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 1); + + // No match + let seq1 = vec![data(1), data(2)]; + let seq2 = vec![data(2), data(3)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 0); + let seq1 = vec![data(1), data(2)]; + let seq2 = vec![data(3), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 0); + + // Matching pairs in different indexes + let seq1 = vec![data(1), data(2), data(3), data(4)]; + let seq2 = vec![data(2), data(3), data(4), data(1)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 4); + let seq1 = vec![data(1), data(2), data(3), data(4)]; + let seq2 = vec![data(1), data(2), data(5), data(3), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Strong), 2); + } + + #[test] + fn test_casual_coeff() { + // Empty sequences + assert_eq!(coeff(&[], &[], OrderingType::Casual), 0); + + // One matching pair without noise + let seq = vec![data(1), data(2)]; 
+ assert_eq!(coeff(&seq, &seq, OrderingType::Casual), 1); + + // One matching pair with noise + let seq = vec![data(1), noise(10), data(2)]; + assert_eq!(coeff(&seq, &seq, OrderingType::Casual), 1); + + // One matching pair without noise from different sequences + let seq1 = vec![data(1), data(2), data(3)]; + let seq2 = vec![data(1), data(2), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 1); + let seq1 = vec![data(4), data(2), data(3)]; + let seq2 = vec![data(1), data(2), data(3)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 1); + + // One matching pair with noise from different sequences + let seq1 = vec![data(1), noise(10), data(2), data(3)]; + let seq2 = vec![data(1), noise(10), data(2), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 1); + let seq1 = vec![data(4), data(2), noise(10), data(3)]; + let seq2 = vec![data(1), data(2), noise(10), data(3)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 1); + + // Two pairs with noise + let seq1 = vec![data(1), noise(10), data(2), data(3)]; + let seq2 = vec![data(1), noise(10), data(2), data(3), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 2); + + // No match + let seq1 = vec![data(1), data(2)]; + let seq2 = vec![data(2), data(3)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 0); + let seq1 = vec![data(1), data(2)]; + let seq2 = vec![data(3), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 0); + + // No match because of noise + let seq1 = vec![data(1), noise(10), data(2)]; + let seq2 = vec![data(1), data(2)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 0); + let seq1 = vec![data(1), noise(10), data(2)]; + let seq2 = vec![data(1), noise(5), data(2)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 0); + + // Matching pairs in different indexes + let seq1 = vec![data(1), data(2), data(3), data(4)]; + let seq2 = vec![data(2), data(3), data(4), data(1)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 2); + let seq1 = vec![data(1), data(2), data(3), data(4)]; + let seq2 = vec![data(1), data(2), data(5), data(3), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Casual), 2); + } + + #[test] + fn test_weak_coeff() { + // Empty sequences + assert_eq!(coeff(&[], &[], OrderingType::Weak), 0); + + // One matching pair without noise + let seq = vec![data(1), data(2)]; + assert_eq!(coeff(&seq, &seq, OrderingType::Weak), 1); + + // One matching pair with noise + let seq = vec![data(1), noise(10), data(2)]; + assert_eq!(coeff(&seq, &seq, OrderingType::Weak), 1); + + // One matching pair without noise from different sequences + let seq1 = vec![data(1), data(2), data(3)]; + let seq2 = vec![data(1), data(2), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1); + let seq1 = vec![data(4), data(2), data(3)]; + let seq2 = vec![data(1), data(2), data(3)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1); + + // One matching pair with noise from different sequences + let seq1 = vec![data(1), noise(10), data(2), data(3)]; + let seq2 = vec![data(1), noise(5), data(2), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1); + let seq1 = vec![data(4), data(2), noise(10), data(3)]; + let seq2 = vec![data(1), data(2), noise(5), data(3)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1); + let seq1 = vec![data(4), data(2), noise(10), data(3)]; + let seq2 = vec![data(1), data(2), data(3)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 1); + + // Two pairs with noise + let seq1 = 
vec![data(1), noise(10), data(2), data(3)]; + let seq2 = vec![data(1), noise(5), data(2), data(3), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 2); + + // No match + let seq1 = vec![data(1), data(2)]; + let seq2 = vec![data(2), data(3)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 0); + let seq1 = vec![data(1), data(2)]; + let seq2 = vec![data(3), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 0); + + // Matching pairs in different indexes + let seq1 = vec![data(1), data(2), data(3), data(4)]; + let seq2 = vec![data(2), data(3), data(4), data(1)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 2); + let seq1 = vec![data(1), data(2), data(3), data(4)]; + let seq2 = vec![data(1), data(2), data(5), data(3), data(4)]; + assert_eq!(coeff(&seq1, &seq2, OrderingType::Weak), 2); + } + + fn data(msg_id: u32) -> Entry { + Entry::Data(DataMessage { sender: 0, msg_id }) + } + + fn noise(count: u32) -> Entry { + Entry::Noise(count) + } +} diff --git a/mixnet/ordering/src/bin/coeff_aggr.rs b/mixnet/ordering/src/bin/coeff_aggr.rs new file mode 100644 index 0000000..8eee577 --- /dev/null +++ b/mixnet/ordering/src/bin/coeff_aggr.rs @@ -0,0 +1,95 @@ +use glob::glob; +use polars::prelude::*; +use std::env; +use std::fs::File; +use walkdir::WalkDir; + +fn aggregate(path: &str) { + let mut input_schema = Schema::new(); + input_schema.with_column("sender".into(), DataType::UInt64); + input_schema.with_column("receiver".into(), DataType::UInt64); + input_schema.with_column("strong".into(), DataType::UInt64); + input_schema.with_column("casual".into(), DataType::UInt64); + input_schema.with_column("weak".into(), DataType::UInt64); + + for entry in WalkDir::new(path) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_dir()) + { + let dir_name = entry.path().file_name().unwrap().to_string_lossy(); + if dir_name.starts_with("paramset_") { + let mut strongs = Series::new_empty("", &DataType::UInt64); + let mut casuals = Series::new_empty("", &DataType::UInt64); + let mut weaks = Series::new_empty("", &DataType::UInt64); + + let pattern = format!("{}/**/coeffs_*.csv", entry.path().display()); + for file in glob(&pattern).unwrap().filter_map(Result::ok) { + let df = CsvReadOptions::default() + .with_has_header(true) + .with_schema(Some(SchemaRef::new(input_schema.clone()))) + .try_into_reader_with_file_path(Some(file.clone())) + .unwrap() + .finish() + .unwrap(); + + extend_series(&mut strongs, &df, "strong"); + extend_series(&mut casuals, &df, "casual"); + extend_series(&mut weaks, &df, "weak"); + + println!("Processed {}", file.display()); + } + + save_stats( + &strongs, + &format!("{}/strong_coeff_stats.csv", entry.path().display()), + ); + save_stats( + &casuals, + &format!("{}/casual_coeff_stats.csv", entry.path().display()), + ); + save_stats( + &weaks, + &format!("{}/weak_coeff_stats.csv", entry.path().display()), + ); + } + } +} + +fn extend_series(series: &mut Series, df: &DataFrame, column: &str) { + series + .extend( + &df.column(column) + .unwrap() + .u64() + .unwrap() + .clone() + .into_series(), + ) + .unwrap(); +} + +fn save_stats(aggregated: &Series, outpath: &str) { + let mut df = DataFrame::new(vec![ + Series::new("min", &[aggregated.min::().unwrap()]), + Series::new("median", &[aggregated.median().unwrap()]), + Series::new("mean", &[aggregated.mean().unwrap()]), + Series::new("std", &[aggregated.std(1).unwrap()]), + Series::new("max", &[aggregated.max::().unwrap()]), + ]) + .unwrap(); + + let mut file = 
File::create(outpath).unwrap(); + CsvWriter::new(&mut file).finish(&mut df).unwrap(); + println!("Saved {}", outpath); +} + +fn main() { + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("Usage: {} ", args[0]); + std::process::exit(1); + } + let path = &args[1]; + aggregate(path); +} diff --git a/mixnet/ordering/src/bin/datamsgcount.rs b/mixnet/ordering/src/bin/datamsgcount.rs new file mode 100644 index 0000000..6441ece --- /dev/null +++ b/mixnet/ordering/src/bin/datamsgcount.rs @@ -0,0 +1,72 @@ +use glob::glob; +use polars::prelude::*; +use std::env; +use std::fs::File; +use walkdir::WalkDir; + +fn aggregate(path: &str) { + for entry in WalkDir::new(path) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_dir()) + { + let dir_name = entry.path().file_name().unwrap().to_string_lossy(); + if dir_name.starts_with("paramset_") { + let mut aggregated_series = Series::new_empty("", &DataType::Int64); + let pattern = format!("{}/**/data_msg_counts.csv", entry.path().display()); + + for file in glob(&pattern).unwrap().filter_map(Result::ok) { + let df = CsvReadOptions::default() + .with_has_header(true) + .try_into_reader_with_file_path(Some(file.clone())) + .unwrap() + .finish() + .unwrap(); + + // Drop the 'vtime' column and collect all remaining value columns + let df_without_vtime = df.drop("vtime").unwrap_or_else(|_| df.clone()); + for col in df_without_vtime.get_columns() { + aggregated_series + .extend(&col.i64().unwrap().clone().into_series()) + .unwrap(); + } + + println!("Processed {}", file.display()); + } + + let output_file = format!("{}/data_msg_counts_stats.csv", entry.path().display()); + save_stats(&aggregated_series, &output_file); + } + } +} + +fn save_stats(aggregated: &Series, outpath: &str) { + let min = aggregated.min::().unwrap(); + let max = aggregated.max::().unwrap(); + let mean = aggregated.mean().unwrap(); + let median = aggregated.median().unwrap(); + let std = aggregated.std(1).unwrap(); + + let mut df = DataFrame::new(vec![ + Series::new("min", &[min]), + Series::new("median", &[median]), + Series::new("mean", &[mean]), + Series::new("std", &[std]), + Series::new("max", &[max]), + ]) + .unwrap(); + + let mut file = File::create(outpath).unwrap(); + CsvWriter::new(&mut file).finish(&mut df).unwrap(); + println!("Saved {}", outpath); +} + +fn main() { + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("Usage: {} ", args[0]); + std::process::exit(1); + } + let path = &args[1]; + aggregate(path); +} diff --git a/mixnet/ordering/src/bin/latency.rs b/mixnet/ordering/src/bin/latency.rs new file mode 100644 index 0000000..de51b4f --- /dev/null +++ b/mixnet/ordering/src/bin/latency.rs @@ -0,0 +1,75 @@ +use glob::glob; +use polars::prelude::*; +use std::env; +use std::fs::File; +use walkdir::WalkDir; + +fn aggregate(path: &str) { + for entry in WalkDir::new(path) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_dir()) + { + let dir_name = entry.path().file_name().unwrap().to_string_lossy(); + if dir_name.starts_with("paramset_") { + let mut aggregated_series = Series::new_empty("", &DataType::Int64); + let pattern = format!("{}/**/latency.csv", entry.path().display()); + + for file in glob(&pattern).unwrap().filter_map(Result::ok) { + let df = CsvReadOptions::default() + .with_has_header(true) + .try_into_reader_with_file_path(Some(file.clone())) + .unwrap() + .finish() + .unwrap(); + + aggregated_series + .extend( + &df.column("latency") + .unwrap() + .i64() + .unwrap() + .clone() + 
.into_series(), + ) + .unwrap(); + + println!("Processed {}", file.display()); + } + + let output_file = format!("{}/latency_stats.csv", entry.path().display()); + save_stats(&aggregated_series, &output_file); + } + } +} + +fn save_stats(aggregated: &Series, outpath: &str) { + let min = aggregated.min::().unwrap(); + let max = aggregated.max::().unwrap(); + let mean = aggregated.mean().unwrap(); + let median = aggregated.median().unwrap(); + let std = aggregated.std(1).unwrap(); + + let mut df = DataFrame::new(vec![ + Series::new("min", &[min]), + Series::new("median", &[median]), + Series::new("mean", &[mean]), + Series::new("std", &[std]), + Series::new("max", &[max]), + ]) + .unwrap(); + + let mut file = File::create(outpath).unwrap(); + CsvWriter::new(&mut file).finish(&mut df).unwrap(); + println!("Saved {}", outpath); +} + +fn main() { + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("Usage: {} ", args[0]); + std::process::exit(1); + } + let path = &args[1]; + aggregate(path); +} diff --git a/mixnet/ordering/src/iteration.rs b/mixnet/ordering/src/iteration.rs new file mode 100644 index 0000000..22a6886 --- /dev/null +++ b/mixnet/ordering/src/iteration.rs @@ -0,0 +1,226 @@ +use std::{collections::hash_map::Entry, time::SystemTime}; + +use protocol::{ + node::{MessagesToRelay, Node, NodeId}, + queue::Message, +}; +use rand::{rngs::StdRng, Rng, SeedableRng}; +use rustc_hash::FxHashMap; + +use crate::{ + format_duration, + outputs::Outputs, + paramset::ParamSet, + topology::{build_random_network, build_striped_network, RECEIVER_NODE_ID}, +}; +use ordering::message::{DataMessage, DataMessageGenerator}; + +const QUEUE_DATA_MSG_COUNT_MEASUREMENT_INTERVAL: f32 = 100.0; + +pub struct Iteration { + pub paramset: ParamSet, + pub iteration_idx: usize, + pub paramset_dir: String, +} + +impl Iteration { + pub fn start(&mut self) { + let dir = format!( + "{}/iteration_{}__WIP_DUR__", + self.paramset_dir, self.iteration_idx + ); + std::fs::create_dir_all(dir.as_str()).unwrap(); + + let mut outputs = Outputs::new( + format!("{dir}/latency__WIP__.csv"), + (0..self.paramset.num_senders) + .map(|sender_idx| format!("{dir}/sent_seq_{sender_idx}__WIP__.csv")) + .collect(), + (0..self.paramset.num_sender_or_receiver_conns()) + .map(|conn_idx| format!("{dir}/recv_seq_{conn_idx}__WIP__.csv")) + .collect(), + format!("{dir}/data_msg_counts__WIP__.csv"), + format!("{dir}/topology.csv"), + ); + + let start_time = SystemTime::now(); + + let vtime = self.run(self.iteration_idx as u64, &mut outputs); + outputs.close(); + outputs.rename_paths("__WIP__.csv", ".csv"); + + let duration = format_duration(SystemTime::now().duration_since(start_time).unwrap()); + let new_dir = dir.replace("__WIP_DUR__", &format!("_{duration}")); + std::fs::rename(dir, new_dir).unwrap(); + + tracing::info!( + "ParamSet:{}, Iteration:{} completed. 
Duration:{}, vtime:{}", + self.paramset.id, + self.iteration_idx, + duration, + vtime + ); + } + + fn run(&mut self, seed: u64, outputs: &mut Outputs) -> f32 { + let paramset = &self.paramset; + + let (mut mixnodes, all_sender_peers, receiver_peers) = if paramset.random_topology { + build_random_network(paramset, seed, outputs) + } else { + build_striped_network(paramset, seed) + }; + // Check node ID consistency + for (i, node) in mixnodes.iter().enumerate() { + assert_eq!(node.id as usize, i); + } + + // For N senders + 1 mix (all mixnodes will share the same sender ID) + let mut data_msg_gen = DataMessageGenerator::new(paramset.num_senders + 1); + let mix_msg_sender_id = paramset.num_senders; + + // Virtual discrete time + let mut vtime: f32 = 0.0; + let mut recent_vtime_queue_data_msg_count_measured: f32 = 0.0; + // Transmission interval that each queue must release a message + let transmission_interval = 1.0 / paramset.transmission_rate as f32; + // Results + let mut all_sent_count = 0; // all data + noise sent by all senders + let all_sent_count_target = (paramset.num_sender_msgs as usize) + .checked_mul(paramset.num_senders as usize) + .unwrap(); + let mut sent_data_msgs: FxHashMap = FxHashMap::default(); + let mut recv_data_msgs: FxHashMap = FxHashMap::default(); + + outputs.write_header_queue_data_msg_counts(&mixnodes); + + let mut data_msg_rng = StdRng::seed_from_u64(seed); + loop { + tracing::trace!( + "VTIME:{}, ALL_SENT:{}, DATA_SENT:{}, DATA_RECEIVED:{}", + vtime, + all_sent_count, + sent_data_msgs.len(), + recv_data_msgs.len(), + ); + + // All senders emit a message (data or noise) to all of their own adjacent peers. + if all_sent_count < all_sent_count_target { + // For each sender + for (sender_idx, sender_peers) in all_sender_peers.iter() { + if Self::try_probability(&mut data_msg_rng, paramset.sender_data_msg_prob) { + let msg = data_msg_gen.next(sender_idx); + sender_peers.iter().for_each(|peer_id| { + mixnodes + .get_mut(*peer_id as usize) + .unwrap() + .receive(msg, None); + }); + sent_data_msgs.insert(msg, vtime); + outputs.add_sent_msg(&msg) + } else { + // Generate noise and add it to the sequence to calculate ordering coefficients later, + // but don't need to send it to the mix nodes + // because the mix nodes will anyway drop the noise, + // and we don't need to record what the mix nodes receive. + outputs.add_sent_noise(sender_idx); + } + all_sent_count += 1; + } + } + + // Each mix node add a new data message to its queue with a certain probability + if paramset.mix_data_msg_prob > 0.0 { + for node in mixnodes.iter_mut() { + if Self::try_probability(&mut data_msg_rng, paramset.mix_data_msg_prob) { + node.send(data_msg_gen.next(mix_msg_sender_id)); + // We don't put the msg into the sent_sequence + // because sent_sequence is only for recording messages sent by the senders, not the mixnode. + } + } + } + + // Each mix node relays a message (data or noise) to the next mix node or the receiver. + // As the receiver, record the time and order of the received messages. + AllMessagesToRelay::new(&mut mixnodes).into_iter().for_each( + |(relayer_id, msgs_to_relay)| { + msgs_to_relay.into_iter().for_each(|(peer_id, msg)| { + if peer_id == RECEIVER_NODE_ID { + match msg { + Message::Data(msg) => { + // If msg was sent by the sender (not by any mix) + if let Some(&sent_time) = sent_data_msgs.get(&msg) { + // If this is the first time to see the msg, + // update stats that must ignore duplicate messages. 
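+ // (the receiver can get the same data message through more than one of its connections, hence the duplicate check)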
+ if let Entry::Vacant(e) = recv_data_msgs.entry(msg) { + e.insert(vtime); + outputs.add_latency(&msg, sent_time, vtime); + } + } + // Record msg to the sequence + let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap(); + outputs.add_recv_msg(&msg, conn_idx); + } + Message::Noise => { + // Record noise to the sequence + let conn_idx = receiver_peers.conn_idx(&relayer_id).unwrap(); + outputs.add_recv_noise(conn_idx); + } + } + } else if let Message::Data(msg) = msg { + let peer = mixnodes.get_mut(peer_id as usize).unwrap(); + assert_eq!(peer.id, peer_id); + peer.receive(msg, Some(relayer_id)); + } + }); + }, + ); + + // Record the number of data messages in each mix node's queues + if vtime == 0.0 + || vtime - recent_vtime_queue_data_msg_count_measured + >= QUEUE_DATA_MSG_COUNT_MEASUREMENT_INTERVAL + { + outputs.add_queue_data_msg_counts(vtime, &mixnodes); + recent_vtime_queue_data_msg_count_measured = vtime; + } + + // If all senders finally emitted all data+noise messages, + // and If all data messages have been received by the receiver, + // stop the iteration. + if all_sent_count == all_sent_count_target + && sent_data_msgs.len() == recv_data_msgs.len() + { + break; + } + + vtime += transmission_interval; + } + + vtime + } + + fn try_probability(rng: &mut StdRng, prob: f32) -> bool { + assert!( + (0.0..=1.0).contains(&prob), + "Probability must be in [0, 1]." + ); + rng.gen::() < prob + } +} + +struct AllMessagesToRelay(Vec<(NodeId, MessagesToRelay)>); + +impl AllMessagesToRelay { + fn new(mixnodes: &mut [Node]) -> Self { + let mut all_msgs_to_relay = Vec::with_capacity(mixnodes.len()); + for node in mixnodes.iter_mut() { + all_msgs_to_relay.push((node.id, node.read_queues())); + } + Self(all_msgs_to_relay) + } + + fn into_iter(self) -> impl Iterator)>)> { + self.0.into_iter() + } +} diff --git a/mixnet/ordering/src/lib.rs b/mixnet/ordering/src/lib.rs new file mode 100644 index 0000000..e216a50 --- /dev/null +++ b/mixnet/ordering/src/lib.rs @@ -0,0 +1 @@ +pub mod message; diff --git a/mixnet/ordering/src/main.rs b/mixnet/ordering/src/main.rs new file mode 100644 index 0000000..5d237c4 --- /dev/null +++ b/mixnet/ordering/src/main.rs @@ -0,0 +1,227 @@ +mod iteration; +mod outputs; +mod paramset; +mod sequence; +mod topology; + +use std::{ + collections::{hash_map::Entry, HashMap}, + error::Error, + path::Path, + time::{Duration, SystemTime}, +}; + +use chrono::Utc; +use clap::Parser; +use iteration::Iteration; +use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS}; +use protocol::queue::QueueType; + +#[derive(Debug, Parser)] +#[command(name = "Ordering Measurement")] +struct Args { + #[arg(short, long)] + exp_id: ExperimentId, + #[arg(short, long)] + session_id: SessionId, + #[arg(short, long)] + queue_type: QueueType, + #[arg(short, long)] + outdir: String, + #[arg(short, long)] + num_threads: usize, + #[arg(short, long, default_value_t = false)] + reverse_order: bool, + #[arg(short, long)] + from_paramset: Option, + #[arg(short, long)] + to_paramset: Option, +} + +fn main() { + tracing_subscriber::fmt::init(); + + let args = Args::parse(); + tracing::info!("Arguments: {:?}", args); + let Args { + exp_id, + session_id, + queue_type, + outdir, + num_threads, + reverse_order, + from_paramset, + to_paramset, + } = args; + + // Create a directory and initialize a CSV file only with a header + assert!( + Path::new(&outdir).is_dir(), + "Output directory does not exist: {outdir}" + ); + let subdir = format!( + "__WIP__ordering_e{}s{}_{:?}_{}___DUR__", + exp_id as u8, + 
session_id as u8, + queue_type, + Utc::now().to_rfc3339() + ); + let rootdir = format!("{outdir}/{subdir}"); + std::fs::create_dir_all(&rootdir).unwrap(); + + let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type); + + let session_start_time = SystemTime::now(); + + let iterations = prepare_all_iterations( + ¶msets, + from_paramset, + to_paramset, + reverse_order, + &rootdir, + ); + run_all_iterations(iterations, num_threads, paramsets.len()); + + let session_duration = SystemTime::now() + .duration_since(session_start_time) + .unwrap(); + + // Replace "__WIP__" and "__DUR__" in the subdir string + let new_subdir = subdir + .replace("__WIP__", "") + .replace("__DUR__", &format_duration(session_duration)); + let old_path = format!("{}/{}", outdir, subdir); + let new_path = format!("{}/{}", outdir, new_subdir); + assert!( + !Path::new(&new_path).exists(), + "The new directory already exists: {new_path}" + ); + std::fs::rename(&old_path, &new_path) + .expect("Failed to rename the directory: {old_path} -> {new_path}"); + + tracing::info!("Session completed."); +} + +fn prepare_all_iterations( + paramsets: &[ParamSet], + from_paramset: Option, + to_paramset: Option, + reverse_order: bool, + rootdir: &str, +) -> Vec { + let mut iterations: Vec = Vec::new(); + for paramset in paramsets.iter() { + if paramset.id < from_paramset.unwrap_or(0) { + tracing::info!("ParamSet:{} skipped", paramset.id); + continue; + } else if paramset.id > to_paramset.unwrap_or(u16::MAX) { + tracing::info!("ParamSets:{}~ skipped", paramset.id); + break; + } + + let paramset_dir = format!("{rootdir}/__WIP__paramset_{}", paramset.id); + std::fs::create_dir_all(paramset_dir.as_str()).unwrap(); + save_paramset_info(paramset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap(); + + for i in 0..paramset.num_iterations { + iterations.push(Iteration { + paramset: paramset.clone(), + iteration_idx: i, + paramset_dir: paramset_dir.clone(), + }); + } + } + + if reverse_order { + iterations.reverse(); + } + iterations +} + +fn run_all_iterations(iterations: Vec, num_threads: usize, num_paramsets: usize) { + let (task_tx, task_rx) = crossbeam::channel::unbounded::(); + let (noti_tx, noti_rx) = crossbeam::channel::unbounded::(); + + let mut threads = Vec::with_capacity(num_threads); + for _ in 0..num_threads { + let task_rx = task_rx.clone(); + let noti_tx = noti_tx.clone(); + + let thread = std::thread::spawn(move || { + while let Ok(mut iteration) = task_rx.recv() { + iteration.start(); + noti_tx.send(iteration).unwrap(); + } + }); + threads.push(thread); + } + + let num_all_iterations = iterations.len(); + for iteration in iterations { + task_tx.send(iteration).unwrap(); + } + // Close the task sender channel, so that the threads can know that there's no task remains. + drop(task_tx); + + let mut paramset_progresses: HashMap = HashMap::new(); + let mut num_done_paramsets = 0; + for _ in 0..num_all_iterations { + let iteration = noti_rx.recv().unwrap(); + + match paramset_progresses.entry(iteration.paramset.id) { + Entry::Occupied(mut e) => { + *e.get_mut() += 1; + } + Entry::Vacant(e) => { + e.insert(1); + } + } + + if *paramset_progresses.get(&iteration.paramset.id).unwrap() + == iteration.paramset.num_iterations + { + num_done_paramsets += 1; + let new_paramset_dir = iteration + .paramset_dir + .replace("__WIP__paramset", "paramset"); + std::fs::rename(iteration.paramset_dir, new_paramset_dir).unwrap(); + tracing::info!( + "ParamSet:{} is done ({} iterations). 
{}/{} ParamSets done.", + iteration.paramset.id, + iteration.paramset.num_iterations, + num_done_paramsets, + num_paramsets, + ); + } + } + + for thread in threads { + thread.join().unwrap(); + } +} + +fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box> { + // Assert that the file does not already exist + assert!( + !Path::new(path).exists(), + "File already exists at path: {path}", + ); + + let mut wtr = csv::Writer::from_path(path)?; + wtr.write_record(PARAMSET_CSV_COLUMNS)?; + wtr.write_record(paramset.as_csv_record())?; + wtr.flush()?; + + Ok(()) +} + +fn format_duration(duration: Duration) -> String { + let total_seconds = duration.as_secs(); + + let days = total_seconds / 86_400; + let hours = (total_seconds % 86_400) / 3_600; + let minutes = (total_seconds % 3_600) / 60; + let seconds = total_seconds % 60; + + format!("{}d{}h{}m{}s", days, hours, minutes, seconds) +} diff --git a/mixnet/ordering/src/message.rs b/mixnet/ordering/src/message.rs new file mode 100644 index 0000000..f62f8a4 --- /dev/null +++ b/mixnet/ordering/src/message.rs @@ -0,0 +1,33 @@ +use std::fmt::Display; + +pub type SenderIdx = u8; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct DataMessage { + pub sender: SenderIdx, + pub msg_id: u32, +} + +impl Display for DataMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{}:{}", self.sender, self.msg_id)) + } +} + +pub struct DataMessageGenerator { + next_msg_ids: Vec, +} + +impl DataMessageGenerator { + pub fn new(num_senders: u8) -> Self { + Self { + next_msg_ids: vec![0; num_senders as usize], + } + } + + pub fn next(&mut self, sender: SenderIdx) -> DataMessage { + let msg_id = self.next_msg_ids[sender as usize]; + self.next_msg_ids[sender as usize] += 1; + DataMessage { sender, msg_id } + } +} diff --git a/mixnet/ordering/src/outputs.rs b/mixnet/ordering/src/outputs.rs new file mode 100644 index 0000000..61922ed --- /dev/null +++ b/mixnet/ordering/src/outputs.rs @@ -0,0 +1,231 @@ +use std::{fs::File, path::Path}; + +use protocol::{ + node::{Node, NodeId}, + topology::Topology, +}; + +use crate::{sequence::SequenceWriter, topology::AllSenderPeers}; +use ordering::message::{DataMessage, SenderIdx}; + +pub struct Outputs { + closed: bool, + // gradual writing + latency_path: String, + latency_writer: csv::Writer, + sent_sequence_paths: Vec, + sent_sequence_writers: Vec, + recv_sequence_paths: Vec, + recv_sequence_writers: Vec, + queue_data_msg_counts_path: String, + queue_data_msg_counts_writer: csv::Writer, + // bulk writing + pub topology_path: String, +} + +impl Outputs { + pub fn new( + latency_path: String, + sent_sequence_paths: Vec, + recv_sequence_paths: Vec, + queue_data_msg_counts_path: String, + topology_path: String, + ) -> Self { + // Ensure that all output files do not exist + for path in [ + latency_path.clone(), + queue_data_msg_counts_path.clone(), + topology_path.clone(), + ] + .iter() + .chain(sent_sequence_paths.iter()) + .chain(recv_sequence_paths.iter()) + { + assert!(!Path::new(path).exists(), "File already exists: {path}"); + } + + // Prepare writers and headers + let mut latency_writer = csv::Writer::from_path(&latency_path).unwrap(); + latency_writer + .write_record(["msg", "latency", "sent_time", "recv_time"]) + .unwrap(); + latency_writer.flush().unwrap(); + let sent_sequence_writers = sent_sequence_paths + .iter() + .map(|path| SequenceWriter::new(path)) + .collect::>(); + let recv_sequence_writers = recv_sequence_paths + .iter() + .map(|path| 
SequenceWriter::new(path)) + .collect::>(); + let queue_data_msg_counts_writer = + csv::Writer::from_path(&queue_data_msg_counts_path).unwrap(); + + Self { + closed: false, + latency_path, + latency_writer, + sent_sequence_paths, + sent_sequence_writers, + recv_sequence_paths, + recv_sequence_writers, + queue_data_msg_counts_path, + queue_data_msg_counts_writer, + topology_path, + } + } + + pub fn close(&mut self) { + self.latency_writer.flush().unwrap(); + for seq in &mut self.sent_sequence_writers { + seq.flush(); + } + for seq in &mut self.recv_sequence_writers { + seq.flush(); + } + self.queue_data_msg_counts_writer.flush().unwrap(); + + self.closed = true; + } + + pub fn add_latency(&mut self, msg: &DataMessage, sent_time: f32, recv_time: f32) { + self.latency_writer + .write_record(&[ + msg.to_string(), + (recv_time - sent_time).to_string(), + sent_time.to_string(), + recv_time.to_string(), + ]) + .unwrap(); + } + + pub fn add_sent_msg(&mut self, msg: &DataMessage) { + let writer = &mut self.sent_sequence_writers[msg.sender as usize]; + writer.add_message(msg); + } + + pub fn add_sent_noise(&mut self, sender_idx: SenderIdx) { + let writer = &mut self.sent_sequence_writers[sender_idx as usize]; + writer.add_noise(); + } + + pub fn add_recv_msg(&mut self, msg: &DataMessage, conn_idx: usize) { + let writer = &mut self.recv_sequence_writers[conn_idx]; + writer.add_message(msg); + } + + pub fn add_recv_noise(&mut self, conn_idx: usize) { + let writer = &mut self.recv_sequence_writers[conn_idx]; + writer.add_noise(); + } + + pub fn write_header_queue_data_msg_counts(&mut self, mixnodes: &[Node]) { + let writer = &mut self.queue_data_msg_counts_writer; + let mut header = vec!["vtime".to_string()]; + mixnodes + .iter() + .map(|node| (node.id, node.queue_data_msg_counts())) + .for_each(|(node_id, counts)| { + let num_queues = counts.len(); + (0..num_queues).for_each(|q_idx| { + header.push(format!("node{}_q{}", node_id, q_idx)); + }); + }); + writer.write_record(header).unwrap(); + writer.flush().unwrap(); + } + + pub fn add_queue_data_msg_counts(&mut self, vtime: f32, mixnodes: &[Node]) { + let writer = &mut self.queue_data_msg_counts_writer; + let mut record = vec![vtime.to_string()]; + mixnodes + .iter() + .map(|node| node.queue_data_msg_counts()) + .for_each(|counts| { + counts.iter().for_each(|count| { + record.push(count.to_string()); + }); + }); + writer.write_record(record).unwrap(); + } + + pub fn write_topology( + &self, + topology: &Topology, + all_sender_peers: &AllSenderPeers, + receiver_peers: &[NodeId], + ) { + let mut writer = csv::Writer::from_path(&self.topology_path).unwrap(); + writer.write_record(["node", "num_peers", "peers"]).unwrap(); + + // Write peers of mix nodes + for (node_id, peers) in topology.iter().enumerate() { + writer + .write_record(&[ + node_id.to_string(), + peers.len().to_string(), + format!( + "[{}]", + peers + .iter() + .map(|peer_id| peer_id.to_string()) + .collect::>() + .join(",") + ), + ]) + .unwrap(); + } + + // Write peers of senders + for (sender_idx, peers) in all_sender_peers.iter() { + writer + .write_record(&[ + format!("sender-{}", sender_idx), + peers.len().to_string(), + format!( + "[{}]", + peers + .iter() + .map(|peer_id| peer_id.to_string()) + .collect::>() + .join(",") + ), + ]) + .unwrap(); + } + + // Write peers of the receiver + writer + .write_record(&[ + "receiver".to_string(), + receiver_peers.len().to_string(), + format!( + "[{}]", + receiver_peers + .iter() + .map(|peer_id| peer_id.to_string()) + .collect::>() + .join(",") + 
), + ]) + .unwrap(); + + writer.flush().unwrap(); + } + + pub fn rename_paths(&self, from: &str, to: &str) { + assert!(self.closed); + + for path in [ + &self.latency_path.clone(), + &self.queue_data_msg_counts_path.clone(), + ] + .into_iter() + .chain(self.sent_sequence_paths.iter()) + .chain(self.recv_sequence_paths.iter()) + { + let new_path = path.replace(from, to); + std::fs::rename(path, new_path).unwrap(); + } + } +} diff --git a/mixnet/ordering/src/paramset.rs b/mixnet/ordering/src/paramset.rs new file mode 100644 index 0000000..6b34ef4 --- /dev/null +++ b/mixnet/ordering/src/paramset.rs @@ -0,0 +1,368 @@ +use std::fmt::Display; + +use protocol::queue::QueueType; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum ExperimentId { + Experiment1 = 1, + Experiment2 = 2, + Experiment3 = 3, + Experiment4 = 4, + Experiment5 = 5, + Experiment6 = 6, +} + +impl std::str::FromStr for ExperimentId { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "1" => Ok(ExperimentId::Experiment1), + "2" => Ok(ExperimentId::Experiment2), + "3" => Ok(ExperimentId::Experiment3), + "4" => Ok(ExperimentId::Experiment4), + "5" => Ok(ExperimentId::Experiment5), + "6" => Ok(ExperimentId::Experiment6), + _ => Err(format!("Invalid experiment ID: {}", s)), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[repr(u8)] +pub enum SessionId { + Session1 = 1, + Session3 = 3, +} + +impl std::str::FromStr for SessionId { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "1" => Ok(SessionId::Session1), + "3" => Ok(SessionId::Session3), + _ => Err(format!("Invalid session ID: {}", s)), + } + } +} + +pub const PARAMSET_CSV_COLUMNS: &[&str] = &[ + "paramset", + "num_mixes", + "num_paths", + "random_topology", + "peering_degree", + "min_queue_size", + "transmission_rate", + "num_senders", + "num_sender_msgs", + "sender_data_msg_prob", + "mix_data_msg_prob", + "queue_type", + "num_iterations", +]; + +#[derive(Debug, Clone, PartialEq)] +pub struct ParamSet { + pub id: u16, + pub num_mixes: u32, + pub num_paths: u16, + pub random_topology: bool, + pub peering_degree: PeeringDegree, + pub min_queue_size: u16, + pub transmission_rate: u16, + pub num_senders: u8, + pub num_sender_msgs: u32, + pub sender_data_msg_prob: f32, + pub mix_data_msg_prob: f32, + pub queue_type: QueueType, + pub num_iterations: usize, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum PeeringDegree { + Fixed(u32), + Random(Vec<(u32, f32)>), +} + +impl Display for PeeringDegree { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PeeringDegree::Fixed(c) => write!(f, "{c}"), + PeeringDegree::Random(c_probs) => write!(f, "{c_probs:?}"), + } + } +} + +impl ParamSet { + pub fn new_all_paramsets( + exp_id: ExperimentId, + session_id: SessionId, + queue_type: QueueType, + ) -> Vec { + match session_id { + SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type), + SessionId::Session3 => Self::new_session3_paramsets(exp_id, queue_type), + } + } + + fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let transmission_rate: u16 = 1; + let min_queue_size: u16 = 10; + let num_senders: u8 = match exp_id { + ExperimentId::Experiment3 | ExperimentId::Experiment4 => 2, + _ => 1, + }; + let num_sender_msgs: u32 = match exp_id { + ExperimentId::Experiment6 => 10000, + _ => 1000000, + }; + let sender_data_msg_probs: &[f32] = match exp_id { + ExperimentId::Experiment6 => &[0.01, 0.1, 0.5], + _ => &[0.01, 0.1, 0.5, 
0.9, 0.99, 1.0], + }; + let mix_data_msg_probs = |num_mixes: u32| match exp_id { + ExperimentId::Experiment1 | ExperimentId::Experiment3 | ExperimentId::Experiment5 => { + vec![0.0] + } + ExperimentId::Experiment2 | ExperimentId::Experiment4 => vec![0.001, 0.01, 0.1], + ExperimentId::Experiment6 => { + let g: f32 = num_mixes as f32; + vec![1.0 / (2.0 * g), 1.0 / g, 2.0 / g] + } + }; + + let mut id: u16 = 1; + let mut paramsets: Vec = Vec::new(); + match exp_id { + ExperimentId::Experiment1 + | ExperimentId::Experiment2 + | ExperimentId::Experiment3 + | ExperimentId::Experiment4 => { + for &num_paths in &[1, 2, 3, 4] { + for &num_mixes in &[1, 2, 3, 4] { + for &sender_data_msg_prob in sender_data_msg_probs { + for &mix_data_msg_prob in &mix_data_msg_probs(num_mixes) { + let paramset = ParamSet { + id, + num_mixes, + num_paths, + random_topology: false, + peering_degree: PeeringDegree::Fixed(1), + min_queue_size, + transmission_rate, + num_senders, + num_sender_msgs, + sender_data_msg_prob, + mix_data_msg_prob, + queue_type, + num_iterations: 1, + }; + id += 1; + paramsets.push(paramset); + } + } + } + } + } + ExperimentId::Experiment5 | ExperimentId::Experiment6 => { + for &num_mixes in &[8, 16, 32] { + for &peering_degree in &[2, 3, 4] { + for &sender_data_msg_prob in sender_data_msg_probs { + for &mix_data_msg_prob in &mix_data_msg_probs(num_mixes) { + let paramset = ParamSet { + id, + num_mixes, + num_paths: 0, // since we're gonna build random topology + random_topology: true, + peering_degree: PeeringDegree::Fixed(peering_degree), + min_queue_size, + transmission_rate, + num_senders, + num_sender_msgs, + sender_data_msg_prob, + mix_data_msg_prob, + queue_type, + num_iterations: 10, + }; + id += 1; + paramsets.push(paramset); + } + } + } + } + } + } + + paramsets + } + + fn new_session3_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec { + let sender_data_msg_probs: &[f32] = match exp_id { + ExperimentId::Experiment5 => &[0.01, 0.1, 0.5, 0.9, 0.99, 1.0], + ExperimentId::Experiment6 => &[0.01, 0.1, 0.5], + _ => { + panic!("Only Experiment5 and Experiment6 are supported for Session3"); + } + }; + let mix_data_msg_probs = |num_mixes: u32| match exp_id { + ExperimentId::Experiment5 => { + vec![0.0] + } + ExperimentId::Experiment6 => { + let g: f32 = num_mixes as f32; + vec![1.0 / (2.0 * g), 1.0 / g, 2.0 / g] + } + _ => { + panic!("Only Experiment5 and Experiment6 are supported for Session3"); + } + }; + + let mut id: u16 = 1; + let mut paramsets: Vec = Vec::new(); + match exp_id { + ExperimentId::Experiment5 | ExperimentId::Experiment6 => { + let num_mixes = 32; + for &sender_data_msg_prob in sender_data_msg_probs { + for &mix_data_msg_prob in &mix_data_msg_probs(num_mixes) { + let paramset = ParamSet { + id, + num_mixes, + num_paths: 0, // since we're gonna build random topology + random_topology: true, + peering_degree: PeeringDegree::Random(vec![ + (2, 0.87), + (12, 0.123), + (24, 0.007), + ]), + min_queue_size: 10, + transmission_rate: 1, + num_senders: 1, + num_sender_msgs: match exp_id { + ExperimentId::Experiment6 => 10000, + _ => 1000000, + }, + sender_data_msg_prob, + mix_data_msg_prob, + queue_type, + num_iterations: 10, + }; + id += 1; + paramsets.push(paramset); + } + } + } + _ => { + panic!("Only Experiment5 and Experiment6 are supported for Session3"); + } + } + + paramsets + } + + pub fn num_sender_or_receiver_conns(&self) -> usize { + if self.random_topology { + match &self.peering_degree { + PeeringDegree::Fixed(c) => *c as usize, + 
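+ // For a random peering degree, the sender and receiver use the smallest candidate degree as their connection count.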
PeeringDegree::Random(c_probs) => { + c_probs.iter().map(|(c, _)| *c).min().unwrap() as usize + } + } + } else { + self.num_paths as usize + } + } + + pub fn as_csv_record(&self) -> Vec { + vec![ + self.id.to_string(), + self.num_mixes.to_string(), + self.num_paths.to_string(), + self.random_topology.to_string(), + self.peering_degree.to_string(), + self.min_queue_size.to_string(), + self.transmission_rate.to_string(), + self.num_senders.to_string(), + self.num_sender_msgs.to_string(), + self.sender_data_msg_prob.to_string(), + self.mix_data_msg_prob.to_string(), + format!("{:?}", self.queue_type), + self.num_iterations.to_string(), + ] + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use strum::IntoEnumIterator; + + use crate::paramset::ParamSet; + + use super::*; + + #[test] + fn test_new_all_paramsets() { + let cases = vec![ + ((ExperimentId::Experiment1, SessionId::Session1), 4 * 4 * 6), + ( + (ExperimentId::Experiment2, SessionId::Session1), + 4 * 4 * 6 * 3, + ), + ((ExperimentId::Experiment3, SessionId::Session1), 4 * 4 * 6), + ( + (ExperimentId::Experiment4, SessionId::Session1), + 4 * 4 * 6 * 3, + ), + ((ExperimentId::Experiment5, SessionId::Session1), 3 * 3 * 6), + ( + (ExperimentId::Experiment6, SessionId::Session1), + 3 * 3 * 3 * 3, + ), + ((ExperimentId::Experiment5, SessionId::Session3), 6), + ((ExperimentId::Experiment6, SessionId::Session3), 3 * 3), + ]; + + for queue_type in QueueType::iter() { + for ((exp_id, session_id), expected_cnt) in cases.clone().into_iter() { + let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type); + + assert_eq!( + paramsets.len(), + expected_cnt as usize, + "queue_type:{:?}, exp:{:?}, session:{:?}", + queue_type, + exp_id, + session_id, + ); + + // Check if all parameter sets are unique + let unique_paramsets: HashSet> = paramsets + .iter() + .map(|paramset| paramset.as_csv_record()) + .collect(); + assert_eq!(unique_paramsets.len(), paramsets.len()); + + // Check if paramset IDs are correct. 
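+ // Paramset IDs are expected to be 1-based and assigned sequentially in generation order.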
+ for (i, paramset) in paramsets.iter().enumerate() { + assert_eq!(paramset.id as usize, i + 1); + println!("{:?}", paramset); + } + + // Check PeeringDegree + for paramset in paramsets.iter() { + match session_id { + SessionId::Session1 => { + assert!(matches!(paramset.peering_degree, PeeringDegree::Fixed(_))) + } + SessionId::Session3 => { + assert!(matches!(paramset.peering_degree, PeeringDegree::Random(_))) + } + } + } + } + } + } +} diff --git a/mixnet/ordering/src/sequence.rs b/mixnet/ordering/src/sequence.rs new file mode 100644 index 0000000..8b26cfb --- /dev/null +++ b/mixnet/ordering/src/sequence.rs @@ -0,0 +1,41 @@ +use std::fs::File; + +use ordering::message::DataMessage; + +#[derive(Debug)] +pub struct SequenceWriter { + noise_buf: u32, + writer: csv::Writer, +} + +impl SequenceWriter { + pub fn new(path: &str) -> Self { + Self { + noise_buf: 0, + writer: csv::Writer::from_path(path).unwrap(), + } + } + + pub fn flush(&mut self) { + self.clear_buf(); + self.writer.flush().unwrap(); + } + + fn clear_buf(&mut self) { + if self.noise_buf > 0 { + self.writer + .write_record(&[format!("-{}", self.noise_buf)]) + .unwrap(); + self.noise_buf = 0; + } + } + + pub fn add_message(&mut self, msg: &DataMessage) { + self.clear_buf(); + self.writer.write_record(&[msg.to_string()]).unwrap(); + } + + pub fn add_noise(&mut self) { + self.noise_buf += 1; + } +} diff --git a/mixnet/ordering/src/topology.rs b/mixnet/ordering/src/topology.rs new file mode 100644 index 0000000..2384b20 --- /dev/null +++ b/mixnet/ordering/src/topology.rs @@ -0,0 +1,224 @@ +use std::{fmt::Debug, hash::Hash}; + +use protocol::{ + node::{Node, NodeId}, + queue::QueueConfig, + topology::build_topology, +}; +use rand::{rngs::StdRng, seq::SliceRandom, RngCore, SeedableRng}; +use rustc_hash::FxHashMap; + +use crate::{ + outputs::Outputs, + paramset::{ParamSet, PeeringDegree}, +}; +use ordering::message::SenderIdx; + +pub const RECEIVER_NODE_ID: NodeId = NodeId::MAX; + +pub fn build_striped_network( + paramset: &ParamSet, + seed: u64, +) -> (Vec>, AllSenderPeers, ReceiverPeers) { + assert!(!paramset.random_topology); + let peering_degree = match paramset.peering_degree { + PeeringDegree::Fixed(c) => c, + PeeringDegree::Random(_) => { + panic!("PeeringDegree::Random not supported for striped network"); + } + }; + + let mut next_node_id: NodeId = 0; + let mut queue_seed_rng = StdRng::seed_from_u64(seed); + let mut mixnodes: Vec> = + Vec::with_capacity(paramset.num_paths as usize * paramset.num_mixes as usize); + let mut paths: Vec> = Vec::with_capacity(paramset.num_paths as usize); + for _ in 0..paramset.num_paths { + let mut ids = Vec::with_capacity(paramset.num_mixes as usize); + for _ in 0..paramset.num_mixes { + let id = next_node_id; + next_node_id += 1; + mixnodes.push(Node::new( + id, + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + peering_degree, + false, // disable cache + )); + ids.push(id); + } + paths.push(ids); + } + + // Connect mix nodes + let mut receiver_peers = ReceiverPeers::new(); + for path in paths.iter() { + for (i, id) in path.iter().enumerate() { + if i != path.len() - 1 { + let peer_id = path[i + 1]; + let mixnode = mixnodes.get_mut(*id as usize).unwrap(); + assert_eq!(mixnode.id, *id); + mixnode.connect(peer_id); + } else { + let mixnode = mixnodes.get_mut(*id as usize).unwrap(); + assert_eq!(mixnode.id, *id); + mixnode.connect(RECEIVER_NODE_ID); + + receiver_peers.add(*id, receiver_peers.len()); + } + } + } + + 
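+ // Connect every sender to the first mix node of each path.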
let mut all_sender_peers = AllSenderPeers::new(paramset.num_senders); + let sender_peers = paths + .iter() + .map(|path| *path.first().unwrap()) + .collect::<Vec<_>>(); + (0..paramset.num_senders).for_each(|_| { + all_sender_peers.add(sender_peers.clone()); + }); + + (mixnodes, all_sender_peers, receiver_peers) +} + +pub fn build_random_network<M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash>( + paramset: &ParamSet, + seed: u64, + outputs: &mut Outputs, +) -> (Vec<Node<M>>, AllSenderPeers, ReceiverPeers) { + assert!(paramset.random_topology); + + let peering_degrees = match &paramset.peering_degree { + PeeringDegree::Fixed(c) => vec![*c; paramset.num_mixes as usize], + PeeringDegree::Random(c_probs) => { + // Sort c_probs by the probability in ascending order + let mut c_probs = c_probs.clone(); + c_probs.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap()); + + let mut degrees = Vec::with_capacity(paramset.num_mixes as usize); + for (i, (c, prob)) in c_probs.iter().enumerate() { + let count = if i < c_probs.len() - 1 { + (prob * paramset.num_mixes as f32).ceil() as u32 + } else { + let num_determined: u32 = degrees.len().try_into().unwrap(); + paramset.num_mixes - num_determined + }; + degrees.extend(std::iter::repeat(*c).take(count as usize)); + } + degrees + } + }; + + // Init mix nodes + let mut queue_seed_rng = StdRng::seed_from_u64(seed); + let mut mixnodes: Vec<Node<M>> = Vec::with_capacity(paramset.num_mixes as usize); + for id in 0..paramset.num_mixes { + mixnodes.push(Node::new( + id, + QueueConfig { + queue_type: paramset.queue_type, + seed: queue_seed_rng.next_u64(), + min_queue_size: paramset.min_queue_size, + }, + peering_degrees[id as usize], + true, // enable cache + )); + } + + // Choose sender's peers and receiver's peers randomly + let mut peers_rng = StdRng::seed_from_u64(seed); + let mut candidates: Vec<NodeId> = mixnodes.iter().map(|mixnode| mixnode.id).collect(); + let num_sender_or_receiver_conns = paramset.num_sender_or_receiver_conns(); + assert!(candidates.len() >= num_sender_or_receiver_conns); + let mut all_sender_peers = AllSenderPeers::new(paramset.num_senders); + for _ in 0..paramset.num_senders { + candidates.as_mut_slice().shuffle(&mut peers_rng); + let mut peers: Vec<NodeId> = candidates + .iter() + .cloned() + .take(num_sender_or_receiver_conns) + .collect(); + peers.sort(); + all_sender_peers.add(peers); + } + candidates.as_mut_slice().shuffle(&mut peers_rng); + let mut receiver_peer_ids: Vec<NodeId> = candidates + .iter() + .cloned() + .take(num_sender_or_receiver_conns) + .collect(); + receiver_peer_ids.sort(); + + // Connect mix nodes + let topology = build_topology( + mixnodes.len().try_into().unwrap(), + &mixnodes + .iter() + .map(|mixnode| mixnode.peering_degree) + .collect::<Vec<_>>(), + seed, + ); + for (node_id, peers) in topology.iter().enumerate() { + peers.iter().for_each(|peer_id| { + let mixnode = mixnodes.get_mut(node_id).unwrap(); + assert_eq!(mixnode.id as usize, node_id); + mixnode.connect(*peer_id); + }); + } + + // Connect the selected mix nodes with the receiver + let mut receiver_peers = ReceiverPeers::new(); + for (conn_idx, mixnode_id) in receiver_peer_ids.iter().enumerate() { + let mixnode = mixnodes.get_mut(*mixnode_id as usize).unwrap(); + assert_eq!(mixnode.id, *mixnode_id); + mixnode.connect(RECEIVER_NODE_ID); + + receiver_peers.add(*mixnode_id, conn_idx); + } + + outputs.write_topology(&topology, &all_sender_peers, &receiver_peer_ids); + + (mixnodes, all_sender_peers, receiver_peers) +} + +pub struct AllSenderPeers(Vec<Vec<NodeId>>); + +impl AllSenderPeers { + fn new(num_senders: u8) -> Self { + Self(Vec::with_capacity(num_senders as usize)) + }
+ + fn add(&mut self, peers: Vec<NodeId>) { + self.0.push(peers) + } + + pub fn iter(&self) -> impl Iterator<Item = (SenderIdx, &Vec<NodeId>)> { + self.0 + .iter() + .enumerate() + .map(|(idx, v)| (idx.try_into().unwrap(), v)) + } +} + +pub struct ReceiverPeers(FxHashMap<NodeId, usize>); + +impl ReceiverPeers { + fn new() -> Self { + ReceiverPeers(FxHashMap::default()) + } + + fn add(&mut self, peer_id: NodeId, conn_idx: usize) { + self.0.insert(peer_id, conn_idx); + } + + pub fn conn_idx(&self, node_id: &NodeId) -> Option<usize> { + self.0.get(node_id).cloned() + } + + fn len(&self) -> usize { + self.0.len() + } +} diff --git a/mixnet/protocol/Cargo.toml b/mixnet/protocol/Cargo.toml new file mode 100644 index 0000000..cd05301 --- /dev/null +++ b/mixnet/protocol/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "protocol" +version = "0.1.0" +edition = "2021" + +[dependencies] +csv = "1.3.0" +rand = "0.8.5" +rustc-hash = "2.0.0" +strum = "0.26.3" +strum_macros = "0.26.4" +tracing = "0.1.40" diff --git a/mixnet/protocol/src/lib.rs b/mixnet/protocol/src/lib.rs new file mode 100644 index 0000000..5a2da42 --- /dev/null +++ b/mixnet/protocol/src/lib.rs @@ -0,0 +1,3 @@ +pub mod node; +pub mod queue; +pub mod topology; diff --git a/mixnet/protocol/src/node.rs b/mixnet/protocol/src/node.rs new file mode 100644 index 0000000..59488d8 --- /dev/null +++ b/mixnet/protocol/src/node.rs @@ -0,0 +1,131 @@ +use std::{fmt::Debug, hash::Hash}; + +use rustc_hash::{FxHashMap, FxHashSet}; + +use crate::queue::{new_queue, Message, Queue, QueueConfig}; + +pub type NodeId = u32; + +pub struct Node<M> +where + M: Debug + Copy + Clone + PartialEq + Eq + Hash, +{ + pub id: NodeId, + queue_config: QueueConfig, + // To keep results deterministic, we use a Vec instead of an FxHashMap. + // Building `queues` is inefficient, but it's not a problem because it's done only once at the beginning. + // Instead, use `connected_peers` to build `queues` efficiently. + queues: Vec<(NodeId, Box<dyn Queue<M>>)>, + connected_peers: FxHashSet<NodeId>, + // A cache to avoid relaying the same message multiple times. + received_msgs: Option<FxHashMap<M, u32>>, + pub peering_degree: u32, +} + +pub type MessagesToRelay<M> = Vec<(NodeId, Message<M>)>; + +impl<M> Node<M> +where + M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash, +{ + pub fn new( + id: NodeId, + queue_config: QueueConfig, + peering_degree: u32, + enable_cache: bool, + ) -> Self { + Node::<M> { + id, + queue_config, + queues: Vec::new(), + connected_peers: FxHashSet::default(), + received_msgs: if enable_cache { + Some(FxHashMap::default()) + } else { + None + }, + peering_degree, + } + } + + pub fn connect(&mut self, peer_id: NodeId) { + if self.connected_peers.insert(peer_id) { + let pos = self + .queues + .binary_search_by(|probe| probe.0.cmp(&peer_id)) + .unwrap_or_else(|pos| pos); + self.queues + .insert(pos, (peer_id, new_queue::<M>(&self.queue_config))); + } + } + + pub fn send(&mut self, msg: M) { + assert!(self.check_and_update_cache(msg, true)); + for (_, queue) in self.queues.iter_mut() { + queue.push(msg); + } + } + + pub fn receive(&mut self, msg: M, from: Option<NodeId>) -> bool { + // If `from` is None, it means that the message was sent by a sender outside of the gossip network. + // In this case, the received count in the cache must start from 0, so that the message is removed from the cache only after it has been received from C gossip peers (not C-1). + // For that, we set `sending` to true, as if this node is sending the message.
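+ // With `sending == true` the cache count starts at 0, so the message is evicted only after it has been received from `peering_degree` peers.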
+ let sending = from.is_none(); + let first_received = self.check_and_update_cache(msg, sending); + if first_received { + for (node_id, queue) in self.queues.iter_mut() { + match from { + Some(sender) => { + if *node_id != sender { + queue.push(msg); + } + } + None => queue.push(msg), + } + } + } + first_received + } + + pub fn read_queues(&mut self) -> MessagesToRelay { + let mut msgs_to_relay: MessagesToRelay = Vec::with_capacity(self.queues.len()); + self.queues.iter_mut().for_each(|(node_id, queue)| { + msgs_to_relay.push((*node_id, queue.pop())); + }); + msgs_to_relay + } + + pub fn queue_data_msg_counts(&self) -> Vec { + self.queues + .iter() + .map(|(_, queue)| queue.data_count()) + .collect() + } + + fn check_and_update_cache(&mut self, msg: M, sending: bool) -> bool { + if let Some(received_msgs) = &mut self.received_msgs { + let first_received = if let Some(count) = received_msgs.get_mut(&msg) { + *count += 1; + false + } else { + received_msgs.insert(msg, if sending { 0 } else { 1 }); + true + }; + + // If the message have been received from all connected peers, remove it from the cache + // because there is no possibility that the message will be received again. + if received_msgs.get(&msg).unwrap() == &self.peering_degree { + tracing::debug!( + "Remove message from cache: {:?} because it has been received {} times, assuming that each inbound peer sent the message once.", + msg, + self.peering_degree + ); + received_msgs.remove(&msg); + } + + first_received + } else { + true + } + } +} diff --git a/mixnet/protocol/src/queue.rs b/mixnet/protocol/src/queue.rs new file mode 100644 index 0000000..c288d01 --- /dev/null +++ b/mixnet/protocol/src/queue.rs @@ -0,0 +1,453 @@ +use std::collections::VecDeque; + +use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng}; +use strum_macros::EnumIter; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)] +pub enum QueueType { + NonMix, + PureCoinFlipping, + PureRandomSampling, + PermutedCoinFlipping, + NoisyCoinFlipping, + NoisyCoinFlippingRandomRelease, +} + +impl std::str::FromStr for QueueType { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "NonMix" => Ok(QueueType::NonMix), + "PureCoinFlipping" => Ok(QueueType::PureCoinFlipping), + "PureRandomSampling" => Ok(QueueType::PureRandomSampling), + "PermutedCoinFlipping" => Ok(QueueType::PermutedCoinFlipping), + "NoisyCoinFlipping" => Ok(QueueType::NoisyCoinFlipping), + "NoisyCoinFlippingRandomRelease" => Ok(QueueType::NoisyCoinFlippingRandomRelease), + _ => Err(format!("Unknown queue type: {}", s)), + } + } +} + +pub trait Queue { + fn push(&mut self, data: T); + fn pop(&mut self) -> Message; + fn data_count(&self) -> usize; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Message { + Data(T), + Noise, +} + +pub struct QueueConfig { + pub queue_type: QueueType, + pub seed: u64, + pub min_queue_size: u16, +} + +pub fn new_queue(cfg: &QueueConfig) -> Box> { + match cfg.queue_type { + QueueType::NonMix => Box::new(NonMixQueue::new()), + QueueType::PureCoinFlipping => { + Box::new(PureCoinFlippingQueue::new(cfg.min_queue_size, cfg.seed)) + } + QueueType::PureRandomSampling => { + Box::new(PureRandomSamplingQueue::new(cfg.min_queue_size, cfg.seed)) + } + QueueType::PermutedCoinFlipping => { + Box::new(PermutedCoinFlippingQueue::new(cfg.min_queue_size, cfg.seed)) + } + QueueType::NoisyCoinFlipping => Box::new(NoisyCoinFlippingQueue::new(cfg.seed)), + QueueType::NoisyCoinFlippingRandomRelease => { + 
+            Box::new(NoisyCoinFlippingRandomReleaseQueue::new(cfg.seed))
+        }
+    }
+}
+
+struct NonMixQueue<T> {
+    queue: VecDeque<T>, // doesn't need to contain Noise
+}
+
+impl<T> NonMixQueue<T> {
+    fn new() -> Self {
+        Self {
+            queue: VecDeque::new(),
+        }
+    }
+}
+
+impl<T> Queue<T> for NonMixQueue<T> {
+    fn push(&mut self, data: T) {
+        self.queue.push_back(data)
+    }
+
+    fn pop(&mut self) -> Message<T> {
+        match self.queue.pop_front() {
+            Some(data) => Message::Data(data),
+            None => Message::Noise,
+        }
+    }
+
+    fn data_count(&self) -> usize {
+        self.queue.len()
+    }
+}
+
+struct MixQueue<T> {
+    queue: Vec<Message<T>>,
+    data_count: usize,
+    rng: StdRng,
+}
+
+impl<T: Copy> MixQueue<T> {
+    fn new(num_initial_noises: usize, seed: u64) -> Self {
+        Self {
+            queue: vec![Message::Noise; num_initial_noises],
+            data_count: 0,
+            rng: StdRng::seed_from_u64(seed),
+        }
+    }
+
+    fn push(&mut self, data: T) {
+        self.queue.push(Message::Data(data));
+        self.data_count += 1;
+    }
+
+    fn fill_noises(&mut self, k: usize) {
+        self.queue.extend(std::iter::repeat(Message::Noise).take(k))
+    }
+
+    fn pop(&mut self, idx: usize) -> Option<Message<T>> {
+        if idx < self.queue.len() {
+            let msg = self.queue.remove(idx);
+            if let Message::Data(_) = msg {
+                self.data_count -= 1;
+            }
+            Some(msg)
+        } else {
+            None
+        }
+    }
+
+    fn data_count(&self) -> usize {
+        self.data_count
+    }
+
+    fn len(&self) -> usize {
+        self.queue.len()
+    }
+
+    fn flip_coin(&mut self) -> bool {
+        self.rng.gen_bool(0.5)
+    }
+
+    fn sample_index(&mut self) -> usize {
+        self.rng.gen_range(0..self.queue.len())
+    }
+
+    fn shuffle(&mut self) {
+        self.queue.as_mut_slice().shuffle(&mut self.rng);
+    }
+}
+
+struct MinSizeMixQueue<T> {
+    queue: MixQueue<T>,
+    min_pool_size: u16,
+}
+
+impl<T: Copy> MinSizeMixQueue<T> {
+    fn new(min_pool_size: u16, seed: u64) -> Self {
+        Self {
+            queue: MixQueue::new(min_pool_size as usize, seed),
+            min_pool_size,
+        }
+    }
+
+    fn push(&mut self, msg: T) {
+        self.queue.push(msg)
+    }
+
+    fn pop(&mut self, idx: usize) -> Option<Message<T>> {
+        self.queue.pop(idx)
+    }
+
+    fn data_count(&self) -> usize {
+        self.queue.data_count()
+    }
+
+    fn ensure_min_size(&mut self) {
+        if self.queue.len() < self.min_pool_size as usize {
+            self.queue
+                .fill_noises(self.min_pool_size as usize - self.queue.len());
+        }
+    }
+
+    fn len(&self) -> usize {
+        self.queue.len()
+    }
+
+    fn flip_coin(&mut self) -> bool {
+        self.queue.flip_coin()
+    }
+
+    fn sample_index(&mut self) -> usize {
+        self.queue.sample_index()
+    }
+
+    fn shuffle(&mut self) {
+        self.queue.shuffle()
+    }
+}
+
+struct PureCoinFlippingQueue<T> {
+    queue: MinSizeMixQueue<T>,
+}
+
+impl<T: Copy> PureCoinFlippingQueue<T> {
+    fn new(min_pool_size: u16, seed: u64) -> Self {
+        Self {
+            queue: MinSizeMixQueue::new(min_pool_size, seed),
+        }
+    }
+}
+
+impl<T: Copy> Queue<T> for PureCoinFlippingQueue<T> {
+    fn push(&mut self, msg: T) {
+        self.queue.push(msg)
+    }
+
+    fn pop(&mut self) -> Message<T> {
+        self.queue.ensure_min_size();
+
+        loop {
+            for i in 0..self.queue.len() {
+                if self.queue.flip_coin() {
+                    return self.queue.pop(i).unwrap();
+                }
+            }
+        }
+    }
+
+    fn data_count(&self) -> usize {
+        self.queue.data_count()
+    }
+}
+
+struct PureRandomSamplingQueue<T> {
+    queue: MinSizeMixQueue<T>,
+}
+
+impl<T: Copy> PureRandomSamplingQueue<T> {
+    fn new(min_pool_size: u16, seed: u64) -> Self {
+        Self {
+            queue: MinSizeMixQueue::new(min_pool_size, seed),
+        }
+    }
+}
+
+impl<T: Copy> Queue<T> for PureRandomSamplingQueue<T> {
+    fn push(&mut self, msg: T) {
+        self.queue.push(msg)
+    }
+
+    fn pop(&mut self) -> Message<T> {
+        self.queue.ensure_min_size();
+
+        let i = self.queue.sample_index();
+        self.queue.pop(i).unwrap()
+    }
+
+    fn data_count(&self) -> usize {
+        self.queue.data_count()
+    }
+}
+
+struct PermutedCoinFlippingQueue<T> {
+    queue: MinSizeMixQueue<T>,
+}
+
+impl<T: Copy> PermutedCoinFlippingQueue<T> {
+    fn new(min_pool_size: u16, seed: u64) -> Self {
+        Self {
+            queue: MinSizeMixQueue::new(min_pool_size, seed),
+        }
+    }
+}
+
+impl<T: Copy> Queue<T> for PermutedCoinFlippingQueue<T> {
+    fn push(&mut self, msg: T) {
+        self.queue.push(msg)
+    }
+
+    fn pop(&mut self) -> Message<T> {
+        self.queue.ensure_min_size();
+
+        self.queue.shuffle();
+
+        loop {
+            for i in 0..self.queue.len() {
+                if self.queue.flip_coin() {
+                    return self.queue.pop(i).unwrap();
+                }
+            }
+        }
+    }
+
+    fn data_count(&self) -> usize {
+        self.queue.data_count()
+    }
+}
+
+struct NoisyCoinFlippingQueue<T> {
+    queue: MixQueue<T>,
+    idx: usize,
+}
+
+impl<T: Copy> NoisyCoinFlippingQueue<T> {
+    pub fn new(seed: u64) -> Self {
+        Self {
+            queue: MixQueue::new(0, seed),
+            idx: 0,
+        }
+    }
+}
+
+impl<T: Copy> Queue<T> for NoisyCoinFlippingQueue<T> {
+    fn push(&mut self, msg: T) {
+        self.queue.push(msg)
+    }
+
+    fn pop(&mut self) -> Message<T> {
+        if self.queue.len() == 0 {
+            return Message::Noise;
+        }
+
+        loop {
+            if self.idx >= self.queue.len() {
+                self.idx = 0;
+            }
+
+            if self.queue.flip_coin() {
+                return self.queue.pop(self.idx).unwrap();
+            } else if self.idx == 0 {
+                return Message::Noise;
+            } else {
+                self.idx += 1;
+            }
+        }
+    }
+
+    fn data_count(&self) -> usize {
+        self.queue.data_count()
+    }
+}
+
+struct NoisyCoinFlippingRandomReleaseQueue<T> {
+    queue: MixQueue<T>,
+}
+
+impl<T: Copy> NoisyCoinFlippingRandomReleaseQueue<T> {
+    pub fn new(seed: u64) -> Self {
+        Self {
+            queue: MixQueue::new(0, seed),
+        }
+    }
+}
+
+impl<T: Copy> Queue<T> for NoisyCoinFlippingRandomReleaseQueue<T> {
+    fn push(&mut self, msg: T) {
+        self.queue.push(msg)
+    }
+
+    fn pop(&mut self) -> Message<T> {
+        if self.queue.len() == 0 {
+            return Message::Noise;
+        }
+
+        if self.queue.flip_coin() {
+            let i = self.queue.sample_index();
+            self.queue.pop(i).unwrap()
+        } else {
+            Message::Noise
+        }
+    }
+
+    fn data_count(&self) -> usize {
+        self.queue.data_count()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashSet;
+
+    use super::*;
+
+    #[test]
+    fn test_non_mix_queue() {
+        let mut queue = new_queue(&QueueConfig {
+            queue_type: QueueType::NonMix,
+            seed: 0,
+            min_queue_size: 0,
+        });
+
+        // Check if noise is returned when the queue is empty
+        assert_eq!(queue.pop(), Message::Noise);
+
+        // Check if the queue is FIFO
+        queue.push(0);
+        queue.push(1);
+        assert_eq!(queue.pop(), Message::Data(0));
+        assert_eq!(queue.pop(), Message::Data(1));
+
+        // Check if noise is returned when the queue is empty
+        assert_eq!(queue.pop(), Message::Noise);
+
+        // Check if the queue is FIFO again
+        queue.push(2);
+        queue.push(3);
+        assert_eq!(queue.pop(), Message::Data(2));
+        assert_eq!(queue.pop(), Message::Data(3));
+    }
+
+    #[test]
+    fn test_mix_queues() {
+        for queue_type in [
+            QueueType::PureCoinFlipping,
+            QueueType::PureRandomSampling,
+            QueueType::PermutedCoinFlipping,
+            QueueType::NoisyCoinFlipping,
+            QueueType::NoisyCoinFlippingRandomRelease,
+        ] {
+            test_mix_queue(queue_type);
+        }
+    }
+
+    fn test_mix_queue(queue_type: QueueType) {
+        let mut queue = new_queue(&QueueConfig {
+            queue_type,
+            seed: 0,
+            min_queue_size: 4,
+        });
+
+        // Check if noise is returned when the queue is empty
+        assert_eq!(queue.pop(), Message::Noise);
+
+        // Put only 2 messages even though the min queue size is 4
+        queue.push(0);
+        queue.push(1);
+
+        // Wait until 2 messages are returned from the queue
+        let mut set: HashSet<_> = vec![0, 1].into_iter().collect();
+        while !set.is_empty() {
+            if let Message::Data(msg) = queue.pop() {
+                assert!(set.remove(&msg));
+            }
+        }
+
+        // Check if noise is returned when no real messages remain
+        assert_eq!(queue.pop(), Message::Noise);
+    }
+}
diff --git a/mixnet/protocol/src/topology.rs b/mixnet/protocol/src/topology.rs
new file mode 100644
index 0000000..b1bceae
--- /dev/null
+++ b/mixnet/protocol/src/topology.rs
@@ -0,0 +1,91 @@
+use std::{collections::HashSet, error::Error};
+
+use crate::node::NodeId;
+use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
+
+pub type Topology = Vec<Vec<NodeId>>;
+
+pub fn build_topology(num_nodes: u32, peering_degrees: &[u32], seed: u64) -> Topology {
+    assert_eq!(num_nodes as usize, peering_degrees.len());
+    // Assert that peering degrees are sorted in descending order
+    assert!(peering_degrees.windows(2).all(|w| w[0] >= w[1]));
+
+    let mut rng = StdRng::seed_from_u64(seed);
+
+    loop {
+        let mut topology: Vec<HashSet<NodeId>> = Vec::new();
+        for _ in 0..num_nodes {
+            topology.push(HashSet::new());
+        }
+
+        for node in 0..num_nodes {
+            let mut others: Vec<NodeId> = Vec::new();
+            for other in (0..node).chain(node + 1..num_nodes) {
+                // Check that the other node is not already connected to the current node
+                // and that the other node has not yet reached its peering degree.
+                if !topology[node as usize].contains(&other)
+                    && topology[other as usize].len() < peering_degrees[other as usize] as usize
+                {
+                    others.push(other);
+                }
+            }
+
+            // How many more connections the current node needs
+            let num_needs = peering_degrees[node as usize] as usize - topology[node as usize].len();
+            // Sample as many peers as possible and connect them to the current node
+            let k = std::cmp::min(num_needs, others.len());
+            others.as_mut_slice().shuffle(&mut rng);
+            others.into_iter().take(k).for_each(|peer| {
+                topology[node as usize].insert(peer);
+                topology[peer as usize].insert(node);
+            });
+        }
+
+        if are_all_nodes_connected(&topology) {
+            let mut sorted_topology: Vec<Vec<NodeId>> = Vec::new();
+            for peers in topology.iter() {
+                let mut sorted_peers: Vec<NodeId> = peers.iter().copied().collect();
+                sorted_peers.sort();
+                sorted_topology.push(sorted_peers);
+            }
+            return sorted_topology;
+        }
+    }
+}
+
+fn are_all_nodes_connected(topology: &[HashSet<NodeId>]) -> bool {
+    let visited = dfs(topology, 0);
+    visited.len() == topology.len()
+}
+
+fn dfs(topology: &[HashSet<NodeId>], start_node: NodeId) -> HashSet<NodeId> {
+    let mut visited: HashSet<NodeId> = HashSet::new();
+    let mut stack: Vec<NodeId> = Vec::new();
+    stack.push(start_node);
+    while let Some(node) = stack.pop() {
+        visited.insert(node);
+        for peer in topology[node as usize].iter() {
+            if !visited.contains(peer) {
+                stack.push(*peer);
+            }
+        }
+    }
+    visited
+}
+
+pub fn save_topology(topology: &Topology, path: &str) -> Result<(), Box<dyn Error>> {
+    let mut wtr = csv::Writer::from_path(path)?;
+    wtr.write_record(["node", "num_peers", "peers"])?;
+
+    for (node, peers) in topology.iter().enumerate() {
+        let node: NodeId = node.try_into().unwrap();
+        let peers_str: Vec<String> = peers.iter().map(|peer_id| peer_id.to_string()).collect();
+        wtr.write_record(&[
+            node.to_string(),
+            peers.len().to_string(),
+            format!("[{}]", peers_str.join(",")),
+        ])?;
+    }
+    wtr.flush()?;
+    Ok(())
+}
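For reference, below is a minimal sketch (not part of the patch) of how the `protocol` crate added above could be driven end to end: build a topology, wire up `Node`s, inject one message, and step a relay loop. The driver itself, its parameter choices (4 nodes, peering degree 2, `PureCoinFlipping` queues, 10 ticks), and the `main` layout are illustrative assumptions; the real experiment drivers live in the `dissemination/` and `ordering/` workspace members, which are not shown here.

```rust
use protocol::node::{Node, NodeId};
use protocol::queue::{Message, QueueConfig, QueueType};
use protocol::topology::build_topology;

fn main() {
    // Hypothetical parameters, chosen only for illustration.
    let num_nodes: u32 = 4;
    let peering_degrees = vec![2u32; num_nodes as usize];
    let topology = build_topology(num_nodes, &peering_degrees, 0);

    // One node per topology entry; messages are plain u32s here.
    let queue_config = |seed| QueueConfig {
        queue_type: QueueType::PureCoinFlipping,
        seed,
        min_queue_size: 4,
    };
    let mut nodes: Vec<Node<u32>> = (0..num_nodes)
        .map(|id| Node::new(id, queue_config(id as u64), peering_degrees[id as usize], true))
        .collect();
    for (id, peers) in topology.iter().enumerate() {
        for peer in peers {
            nodes[id].connect(*peer);
        }
    }

    // Node 0 emits one data message. Each tick, every node pops one message per peer
    // queue and the popped data messages are delivered to the corresponding peers.
    nodes[0].send(42);
    for _tick in 0..10 {
        let mut to_deliver: Vec<(NodeId, NodeId, u32)> = Vec::new();
        for node in nodes.iter_mut() {
            let sender = node.id;
            for (receiver, msg) in node.read_queues() {
                if let Message::Data(data) = msg {
                    to_deliver.push((sender, receiver, data));
                }
            }
        }
        for (sender, receiver, data) in to_deliver {
            nodes[receiver as usize].receive(data, Some(sender));
        }
    }
}
```

The relay step mirrors the gossip rule encoded in `Node::receive`: a data message popped from the queue for peer `p` is delivered to `p`, which re-queues it for all of its peers except the sender, while the per-message cache prevents the same message from being relayed more than once.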