implement logic

This commit is contained in:
Youngjoon Lee 2024-08-20 16:19:01 +02:00
parent 518b26e90c
commit 58fccb085e
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
4 changed files with 401 additions and 4 deletions

View File

@ -32,6 +32,7 @@ impl std::str::FromStr for QueueType {
/// Common interface for all queue flavors used by the mix simulation.
pub trait Queue<T: Copy> {
    /// Enqueue a data message.
    fn push(&mut self, msg: T);
    /// Dequeue the next item; `None` may mean a noise was released,
    /// depending on the implementation.
    fn pop(&mut self) -> Option<T>;
    /// Number of data messages currently held (noise excluded where tracked).
    fn message_count(&self) -> usize;
}
pub struct QueueConfig {
@ -79,10 +80,15 @@ impl<T: Copy> Queue<T> for NonMixQueue<T> {
// Plain FIFO dequeue; this queue holds only real messages (no noise slots).
fn pop(&mut self) -> Option<T> {
    self.queue.pop_front()
}
// Every entry is a data message, so the count is simply the queue length.
fn message_count(&self) -> usize {
    self.queue.len()
}
}
/// A queue whose slots are either real messages (`Some`) or noise (`None`),
/// with a separate counter tracking only the real messages.
struct MixQueue<T: Copy> {
    queue: Vec<Option<T>>, // None element means noise
    // Number of `Some` (data) entries currently in `queue`.
    message_count: usize,
    // Deterministic RNG, seeded in `new`.
    rng: StdRng,
}
@ -90,12 +96,14 @@ impl<T: Copy> MixQueue<T> {
/// Create a queue pre-filled with `num_initial_noises` noise slots and an
/// RNG seeded with `seed` for reproducible runs.
fn new(num_initial_noises: usize, seed: u64) -> Self {
    Self {
        queue: vec![None; num_initial_noises],
        message_count: 0,
        rng: StdRng::seed_from_u64(seed),
    }
}
/// Append a data message and account for it in the message counter.
///
/// Fix: the diff-rendered source carried both the old line
/// (`self.queue.push(Some(data))` without a semicolon) and the new one,
/// which would push the message twice and not compile; only the new
/// statement is kept.
fn push(&mut self, data: T) {
    self.queue.push(Some(data));
    self.message_count += 1;
}
fn fill_noises(&mut self, k: usize) {
@ -104,12 +112,22 @@ impl<T: Copy> MixQueue<T> {
/// Remove and return the element at `idx`.
///
/// Returns `None` both when `idx` is out of range and when the removed slot
/// was noise; the message counter is decremented only when a real message
/// is removed.
///
/// Fix: the diff-rendered source left the stale pre-change line
/// (`self.queue.remove(idx)`) above the new `match`, which would not
/// compile; the `Some`/`None` match is also collapsed into `Option::map`.
fn pop(&mut self, idx: usize) -> Option<T> {
    if idx < self.queue.len() {
        self.queue.remove(idx).map(|msg| {
            self.message_count -= 1;
            msg
        })
    } else {
        None
    }
}
// Number of real (non-noise) messages currently in the queue.
fn message_count(&self) -> usize {
    self.message_count
}
// Total slot count, noise included (unlike `message_count`).
fn len(&self) -> usize {
    self.queue.len()
}
@ -148,6 +166,10 @@ impl<T: Copy> MinSizeMixQueue<T> {
self.queue.pop(idx)
}
// Delegate to the wrapped mix queue's data-message counter.
fn message_count(&self) -> usize {
    self.queue.message_count()
}
fn ensure_min_size(&mut self) {
if self.queue.len() < self.min_pool_size as usize {
self.queue
@ -200,6 +222,10 @@ impl<T: Copy> Queue<T> for PureCoinFlippingQueue<T> {
}
}
}
// Delegate to the inner queue's data-message counter.
fn message_count(&self) -> usize {
    self.queue.message_count()
}
}
struct PureRandomSamplingQueue<T: Copy> {
@ -225,6 +251,10 @@ impl<T: Copy> Queue<T> for PureRandomSamplingQueue<T> {
let i = self.queue.sample_index();
self.queue.pop(i)
}
// Delegate to the inner queue's data-message counter.
fn message_count(&self) -> usize {
    self.queue.message_count()
}
}
struct PermutedCoinFlippingQueue<T: Copy> {
@ -257,6 +287,10 @@ impl<T: Copy> Queue<T> for PermutedCoinFlippingQueue<T> {
}
}
}
// Delegate to the inner queue's data-message counter.
fn message_count(&self) -> usize {
    self.queue.message_count()
}
}
struct NoisyCoinFlippingQueue<T: Copy> {
@ -291,6 +325,10 @@ impl<T: Copy> Queue<T> for NoisyCoinFlippingQueue<T> {
}
}
}
// Delegate to the inner queue's data-message counter.
fn message_count(&self) -> usize {
    self.queue.message_count()
}
}
struct NoisyCoinFlippingRandomReleaseQueue<T: Copy> {
@ -322,6 +360,10 @@ impl<T: Copy> Queue<T> for NoisyCoinFlippingRandomReleaseQueue<T> {
None
}
}
// Delegate to the inner queue's data-message counter.
fn message_count(&self) -> usize {
    self.queue.message_count()
}
}
#[cfg(test)]

View File

@ -0,0 +1,209 @@
use std::{fmt::Display, path::Path};
use queue::QueueConfig;
use rand::{rngs::StdRng, Rng, SeedableRng};
use rustc_hash::FxHashMap;
use crate::{
node::{MessageId, Node},
paramset::ParamSet,
};
/// Run one simulation iteration: a sender streams data/noise messages through
/// a single mix node to a receiver, recording latencies, sent/received
/// orderings, and per-tick queue occupancy into the four given CSV paths.
///
/// Time is discrete: each loop pass is one transmission interval. The
/// iteration stops once all `num_sender_data_msgs` sender messages have been
/// received. `seed` drives both the sender RNG and the mix node RNG, so a
/// run is reproducible.
///
/// # Panics
/// Panics if any output file already exists.
pub fn run_iteration(
    paramset: ParamSet,
    seed: u64,
    out_latency_path: &str,
    out_sent_sequence_path: &str,
    out_received_sequence_path: &str,
    out_data_msg_counts_path: &str,
) {
    // Ensure that all output files do not exist
    for path in &[
        out_latency_path,
        out_sent_sequence_path,
        out_received_sequence_path,
        out_data_msg_counts_path,
    ] {
        assert!(!Path::new(path).exists(), "File already exists: {path}");
    }

    // Initialize a mix node
    let mut mixnode = Node::new(&QueueConfig {
        queue_type: paramset.queue_type,
        seed,
        min_queue_size: paramset.min_queue_size,
    });

    // Monotonically increasing id, shared by sender- and mixnode-originated messages.
    let mut next_msg_id: MessageId = 0;
    // Virtual discrete time
    let mut vtime: f32 = 0.0;
    // Transmission interval that each queue must release a message
    let transmission_interval = 1.0 / paramset.transmission_rate as f32;
    // Results
    let mut sent_times: FxHashMap<MessageId, f32> = FxHashMap::default();
    let mut latencies: FxHashMap<MessageId, f32> = FxHashMap::default();
    let mut sent_sequence = MessageSequence::new();
    let mut received_sequence = MessageSequence::new();
    let mut data_msg_counts_in_queue: Vec<u32> = Vec::new();

    // Sender-side RNG; deterministic for the given seed.
    let mut rng = StdRng::seed_from_u64(seed);
    loop {
        // The sender emits a message (data or noise) to the mix node.
        if sent_times.len() < paramset.num_sender_data_msgs as usize
            && try_probability(&mut rng, paramset.sender_data_msg_prob)
        {
            let msg = next_msg_id;
            next_msg_id += 1;
            mixnode.receive(msg);
            sent_times.insert(msg, vtime);
            sent_sequence.add_message(msg);
        } else {
            // Generate noise and add it to the sequence to calculate ordering coefficients later,
            // but don't need to send it to the mix node
            // because the mix node will anyway drop the noise,
            // and we don't need to record what the mix node receives.
            sent_sequence.add_noise();
        }

        // The mix node add a new data message to its queue with a certain probability
        if try_probability(&mut rng, paramset.mix_data_msg_prob) {
            mixnode.send(next_msg_id);
            next_msg_id += 1;
            // Don't put the msg into the sent_sequence
            // because sent_sequence is only for recording messages sent by the sender, not the mixnode.
        }

        // The mix node emits a message (data or noise) to the receiver.
        // As the receiver, record the time and order of the received messages.
        match mixnode.read_queue() {
            Some(msg) => {
                // NOTE(review): messages enqueued by the mixnode itself (via
                // `mixnode.send` above) are never inserted into `sent_times`, so
                // this `unwrap` panics if one of them is popped here — confirm
                // whether `mix_data_msg_prob` can be non-zero in practice.
                latencies.insert(msg, vtime - sent_times.get(&msg).unwrap());
                received_sequence.add_message(msg);
            }
            None => {
                received_sequence.add_noise();
            }
        }

        // Record the number of data messages in the mix node's queue
        data_msg_counts_in_queue.push(mixnode.message_count_in_queue());

        // If all messages have been received by the receiver, stop the iteration.
        assert!(latencies.len() <= paramset.num_sender_data_msgs as usize);
        if latencies.len() == paramset.num_sender_data_msgs as usize {
            break;
        }

        vtime += transmission_interval;
    }

    // Save results to CSV files
    save_latencies(&latencies, &sent_times, out_latency_path);
    save_sequence(&sent_sequence, out_sent_sequence_path);
    save_sequence(&received_sequence, out_received_sequence_path);
    save_data_msg_counts(
        &data_msg_counts_in_queue,
        transmission_interval,
        out_data_msg_counts_path,
    );
}
/// Bernoulli trial: returns `true` with probability `prob`.
///
/// # Panics
/// Panics when `prob` lies outside `[0, 1]`.
fn try_probability(rng: &mut StdRng, prob: f32) -> bool {
    assert!((0.0..=1.0).contains(&prob), "Probability must be in [0, 1].");
    let sample: f32 = rng.gen();
    sample < prob
}
/// Write one CSV row per received message: latency, send time, and the
/// derived receive time (`sent_time + latency`).
///
/// # Panics
/// Panics on any CSV/file error or if a message is missing from `sent_times`.
fn save_latencies(
    latencies: &FxHashMap<MessageId, f32>,
    sent_times: &FxHashMap<MessageId, f32>,
    path: &str,
) {
    let mut writer = csv::Writer::from_path(path).unwrap();
    writer
        .write_record(["latency", "sent_time", "received_time"])
        .unwrap();
    for (msg, latency) in latencies {
        let sent_time = *sent_times.get(msg).unwrap();
        let received_time = sent_time + latency;
        let record = [
            latency.to_string(),
            sent_time.to_string(),
            received_time.to_string(),
        ];
        writer.write_record(&record).unwrap();
    }
    writer.flush().unwrap();
}
/// Dump the message/noise sequence as a single-column CSV (no header),
/// one `Display`-formatted entry per row.
fn save_sequence(sequence: &MessageSequence, path: &str) {
    let mut writer = csv::Writer::from_path(path).unwrap();
    for entry in &sequence.messages {
        let cell = entry.to_string();
        writer.write_record([cell]).unwrap();
    }
    writer.flush().unwrap();
}
/// Write the per-tick queue occupancy as CSV rows of (virtual time, count).
/// Virtual time is reconstructed from the tick index and the interval.
fn save_data_msg_counts(
    data_msg_counts_in_queue: &[u32],
    interval: f32,
    out_data_msg_counts_path: &str,
) {
    let mut writer = csv::Writer::from_path(out_data_msg_counts_path).unwrap();
    writer
        .write_record(["vtime", "data_msg_count_in_queue"])
        .unwrap();
    for (i, count) in data_msg_counts_in_queue.iter().enumerate() {
        let vtime = i as f64 * interval as f64;
        writer
            .write_record([vtime.to_string(), count.to_string()])
            .unwrap();
    }
    writer.flush().unwrap();
}
/// Ordered record of messages and noise, with consecutive noises
/// run-length-encoded into a single `Noise(count)` entry.
struct MessageSequence {
    messages: Vec<SequenceEntry>,
}
impl MessageSequence {
    /// Create an empty sequence.
    fn new() -> Self {
        Self {
            messages: Vec::new(),
        }
    }

    /// Append a data-message entry.
    fn add_message(&mut self, msg: MessageId) {
        self.messages.push(SequenceEntry::Message(msg));
    }

    /// Record one noise: extend the trailing noise run if there is one,
    /// otherwise start a new `Noise(1)` entry.
    ///
    /// Fix: the previous version only acted inside
    /// `if let Some(last) = self.messages.last_mut()`, so noises arriving
    /// while the sequence was still empty were silently dropped.
    fn add_noise(&mut self) {
        match self.messages.last_mut() {
            Some(SequenceEntry::Noise(cnt)) => *cnt += 1,
            _ => self.messages.push(SequenceEntry::Noise(1)),
        }
    }
}
/// One slot in a [`MessageSequence`]: either a data message id or a
/// run-length-encoded burst of noise.
enum SequenceEntry {
    Message(MessageId),
    Noise(u32), // the number of consecutive noises
}
impl Display for SequenceEntry {
    /// Messages render as their id; a run of `cnt` noises renders as `-cnt`,
    /// so the sign distinguishes the two in the CSV output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SequenceEntry::Message(msg) => write!(f, "{msg}"),
            SequenceEntry::Noise(cnt) => write!(f, "-{cnt}"),
        }
    }
}

View File

@ -1,7 +1,17 @@
mod iteration;
mod node;
mod paramset;
use std::{
error::Error,
path::Path,
time::{Duration, SystemTime},
};
use chrono::Utc;
use clap::Parser;
use paramset::{ExperimentId, SessionId};
use iteration::run_iteration;
use paramset::{ExperimentId, ParamSet, SessionId, PARAMSET_CSV_COLUMNS};
use queue::QueueType;
#[derive(Debug, Parser)]
@ -20,5 +30,106 @@ struct Args {
}
/// Entry point: runs every parameter set of the selected experiment/session,
/// writing per-iteration CSV results under a timestamped output directory.
/// Directories carry `__WIP__`/`__DUR__` markers that are renamed away once
/// the work (and its measured duration) is complete, so interrupted runs are
/// easy to spot.
///
/// Fixes: removed the stale pre-change `println!("Hello, world!")` left by
/// the diff rendering, and replaced `expect` calls whose messages contained
/// `{...}` placeholders (which `expect` never interpolates) with
/// `unwrap_or_else` + `panic!` so the paths and error actually appear.
fn main() {
    tracing_subscriber::fmt::init();

    let args = Args::parse();
    tracing::info!("Arguments: {:?}", args);
    let Args {
        exp_id,
        session_id,
        queue_type,
        outdir,
        from_paramset,
    } = args;

    // Create a directory and initialize a CSV file only with a header
    assert!(
        Path::new(&outdir).is_dir(),
        "Output directory does not exist: {outdir}"
    );
    let subdir = format!(
        "__WIP__dissemination_e{}s{}_{:?}_{}___DUR__",
        exp_id as u8,
        session_id as u8,
        queue_type,
        Utc::now().to_rfc3339()
    );
    std::fs::create_dir_all(&format!("{outdir}/{subdir}")).unwrap();

    let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type);
    let session_start_time = SystemTime::now();

    for paramset in paramsets {
        // Allow resuming a session by skipping paramsets below the cutoff.
        if paramset.id < from_paramset.unwrap_or(0) {
            tracing::info!("ParamSet:{} skipped", paramset.id);
            continue;
        }

        let paramset_dir = format!("{outdir}/{subdir}/__WIP__paramset_{}", paramset.id);
        std::fs::create_dir_all(paramset_dir.as_str()).unwrap();
        save_paramset_info(&paramset, format!("{paramset_dir}/paramset.csv").as_str()).unwrap();

        for i in 0..paramset.num_iterations {
            run_iteration(
                paramset.clone(),
                i as u64, // the iteration index doubles as the RNG seed
                &format!("{paramset_dir}/iteration_{i}_latency.csv"),
                &format!("{paramset_dir}/iteration_{i}_sent_seq.csv"),
                &format!("{paramset_dir}/iteration_{i}_recv_seq.csv"),
                &format!("{paramset_dir}/iteration_{i}_data_msg_counts.csv"),
            );
            tracing::info!("ParamSet:{}, Iteration:{} completed.", paramset.id, i);
        }

        // Drop the WIP marker now that this paramset is fully done.
        let new_paramset_dir = paramset_dir.replace("__WIP__paramset_", "paramset_");
        std::fs::rename(&paramset_dir, &new_paramset_dir).unwrap_or_else(|e| {
            panic!("Failed to rename: {paramset_dir} -> {new_paramset_dir}: {e}")
        });
        tracing::info!("ParamSet:{} completed", paramset.id);
    }

    let session_duration = SystemTime::now()
        .duration_since(session_start_time)
        .unwrap();

    // Replace "__WIP__" and "__DUR__" in the subdir string
    let new_subdir = subdir
        .replace("__WIP__", "")
        .replace("__DUR__", &format_duration(session_duration));
    let old_path = format!("{}/{}", outdir, subdir);
    let new_path = format!("{}/{}", outdir, new_subdir);
    assert!(
        !Path::new(&new_path).exists(),
        "The new directory already exists: {new_path}"
    );
    std::fs::rename(&old_path, &new_path).unwrap_or_else(|e| {
        panic!("Failed to rename the directory: {old_path} -> {new_path}: {e}")
    });
    tracing::info!("Session completed.");
}
/// Write the paramset as a two-row CSV: the shared column header followed by
/// this paramset's values.
///
/// # Errors
/// Returns any CSV/file error encountered while writing.
///
/// # Panics
/// Panics if a file already exists at `path`.
fn save_paramset_info(paramset: &ParamSet, path: &str) -> Result<(), Box<dyn Error>> {
    // Refuse to overwrite results from a previous run.
    assert!(
        !Path::new(path).exists(),
        "File already exists at path: {path}",
    );

    let mut writer = csv::Writer::from_path(path)?;
    writer.write_record(PARAMSET_CSV_COLUMNS)?;
    writer.write_record(paramset.as_csv_record())?;
    writer.flush()?;
    Ok(())
}
/// Render a duration as a compact `XdYhZmWs` string (whole seconds only;
/// sub-second precision is discarded).
fn format_duration(duration: Duration) -> String {
    let mut remaining = duration.as_secs();
    let seconds = remaining % 60;
    remaining /= 60;
    let minutes = remaining % 60;
    remaining /= 60;
    let hours = remaining % 24;
    remaining /= 24;
    format!("{remaining}d{hours}h{minutes}m{seconds}s")
}

View File

@ -0,0 +1,35 @@
use queue::{new_queue, Queue, QueueConfig};
/// Identifier assigned to each message flowing through the simulation.
pub type MessageId = u32;
/// A single mix node wrapping whichever queue implementation the config selects.
pub struct Node {
    // Trait object so the queue flavor is chosen at runtime via `new_queue`.
    queue: Box<dyn Queue<MessageId>>,
}
impl Node {
    /// Build a node with the queue implementation described by `queue_config`.
    pub fn new(queue_config: &QueueConfig) -> Self {
        Node {
            queue: new_queue(queue_config),
        }
    }

    /// Enqueue a message that this node itself originates.
    pub fn send(&mut self, msg: MessageId) {
        // Schedule sending a new data message to the peer
        self.queue.push(msg);
    }

    /// Enqueue a message arriving from a peer for relaying.
    pub fn receive(&mut self, msg: MessageId) {
        // Relay the message to another peer.
        // Don't need to accept noise in this function because it anyway has to be dropped.
        self.queue.push(msg);
    }

    /// Pop the next item released by the queue.
    pub fn read_queue(&mut self) -> Option<MessageId> {
        // Returns `None` if a noise was read from the queue
        self.queue.pop()
    }

    /// Number of data messages currently queued.
    ///
    /// # Panics
    /// Panics if the count does not fit in `u32`.
    pub fn message_count_in_queue(&self) -> u32 {
        self.queue.message_count().try_into().unwrap()
    }
}