add queues

This commit is contained in:
Youngjoon Lee 2024-08-17 03:41:46 +09:00
parent c7ac0340a3
commit cd35121d5e
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
5 changed files with 907 additions and 7 deletions

View File

@ -0,0 +1,119 @@
use std::{collections::HashMap, error::Error};
use rand::{rngs::StdRng, RngCore, SeedableRng};
use crate::{
node::{MessageId, Node, NodeId},
paramset::ParamSet,
queue::QueueConfig,
topology::build_topology,
};
/// Runs a single simulation iteration: creates `paramset.num_nodes` nodes,
/// wires them into a random topology, and floods messages from the sender
/// nodes until every sent message has reached all nodes.
///
/// One CSV row per fully-disseminated message is written to `out_csv_path`
/// (`dissemination_time`, `sent_time`, `all_received_time`, all in virtual
/// time), and the generated topology is saved to `topology_path`.
pub fn run_iteration(paramset: ParamSet, seed: u64, out_csv_path: &str, topology_path: &str) {
    // Initialize nodes. Each node's queue gets its own RNG seed derived from
    // `seed`, so a whole iteration is reproducible from a single seed.
    let mut nodes: HashMap<NodeId, Node> = HashMap::new();
    let mut queue_seed_rng = StdRng::seed_from_u64(seed);
    for i in 0..paramset.num_nodes {
        nodes.insert(
            i,
            Node::new(QueueConfig {
                queue_type: paramset.queue_type,
                seed: queue_seed_rng.next_u64(),
                min_queue_size: paramset.min_queue_size,
            }),
        );
    }

    // Connect nodes
    let topology = build_topology(paramset.num_nodes, paramset.peering_degree, seed);
    save_topology(&topology, topology_path).unwrap();
    for (node_id, peers) in topology.iter() {
        peers.iter().for_each(|peer_id| {
            nodes.get_mut(node_id).unwrap().connect(*peer_id);
        });
    }

    // The first `num_senders` node IDs act as message originators.
    let sender_ids: Vec<NodeId> = (0..paramset.num_senders).collect();

    // Virtual clock: one tick per transmission, i.e. 1/transmission_rate time units.
    let mut vtime: f32 = 0.0;
    let interval: f32 = 1.0 / paramset.transmission_rate as f32;
    let mut next_msg_id: MessageId = 0;
    // msg -> (send time, number of nodes currently holding the message).
    // The count starts at 1 below because the sender itself holds the message.
    let mut sent_msgs: HashMap<MessageId, (f32, u16)> = HashMap::new();
    let mut num_disseminated_msgs = 0;

    let mut writer = csv::Writer::from_path(out_csv_path).unwrap();
    writer
        .write_record(["dissemination_time", "sent_time", "all_received_time"])
        .unwrap();

    loop {
        // Send new messages: every sender emits exactly one message per tick
        // until each has sent `num_sent_msgs` messages.
        assert!(sent_msgs.len() % (paramset.num_senders as usize) == 0);
        if sent_msgs.len() / (paramset.num_senders as usize) < paramset.num_sent_msgs as usize {
            for sender_id in sender_ids.iter() {
                nodes.get_mut(sender_id).unwrap().send(next_msg_id);
                sent_msgs.insert(next_msg_id, (vtime, 1));
                next_msg_id += 1;
            }
        }

        // Collect messages to relay. Popping from all queues before delivering
        // ensures a message cannot hop across more than one link per tick.
        let mut all_msgs_to_relay = Vec::new();
        for (node_id, node) in nodes.iter_mut() {
            let msgs_to_relay = node.read_queues();
            msgs_to_relay.iter().for_each(|(receiver_id, msg)| {
                all_msgs_to_relay.push((*receiver_id, *msg, *node_id));
            });
        }

        // Relay the messages. `receive` returns true only on first receipt,
        // so each node is counted at most once per message.
        all_msgs_to_relay
            .into_iter()
            .for_each(|(receiver_id, msg, sender_id)| {
                if nodes.get_mut(&receiver_id).unwrap().receive(msg, sender_id) {
                    let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap();
                    *num_received_nodes += 1;
                    if *num_received_nodes == paramset.num_nodes {
                        let dissemination_time = vtime - *sent_time;
                        writer
                            .write_record(&[
                                dissemination_time.to_string(),
                                sent_time.to_string(),
                                vtime.to_string(),
                            ])
                            .unwrap();
                        num_disseminated_msgs += 1;
                    }
                }
            });

        // Check if all messages have been disseminated to all nodes.
        if num_disseminated_msgs == (paramset.num_senders * paramset.num_sent_msgs) as usize {
            break;
        }

        vtime += interval;
    }
}
/// Writes the topology to a CSV file with one row per node, sorted by node
/// ID: `node`, `num_peers`, and the peer list rendered as `"[a,b,c]"`.
fn save_topology(
    topology: &HashMap<u16, std::collections::HashSet<u16>>,
    topology_path: &str,
) -> Result<(), Box<dyn Error>> {
    let mut writer = csv::Writer::from_path(topology_path)?;
    writer.write_record(["node", "num_peers", "peers"])?;

    // Emit rows in ascending node-ID order for a stable, diffable file.
    let mut node_ids: Vec<&u16> = topology.keys().collect();
    node_ids.sort();

    for &node_id in &node_ids {
        let peers = &topology[node_id];
        let peer_list = peers
            .iter()
            .map(|peer_id| peer_id.to_string())
            .collect::<Vec<String>>()
            .join(",");
        writer.write_record(&[
            node_id.to_string(),
            peers.len().to_string(),
            format!("\"[{}]\"", peer_list),
        ])?;
    }
    writer.flush()?;
    Ok(())
}

View File

@ -1,23 +1,29 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashMap, HashSet};
use crate::queue::{new_queue, Queue, QueueConfig};
pub type NodeId = u16;
pub type MessageId = u32;
pub struct Node {
queues: HashMap<NodeId, VecDeque<MessageId>>,
queue_config: QueueConfig,
queues: HashMap<NodeId, Box<dyn Queue<MessageId>>>,
received_msgs: HashSet<MessageId>,
}
impl Node {
pub fn new() -> Self {
pub fn new(queue_config: QueueConfig) -> Self {
Node {
queue_config,
queues: HashMap::new(),
received_msgs: HashSet::new(),
}
}
pub fn connect(&mut self, peer_id: NodeId) {
self.queues.entry(peer_id).or_default();
self.queues
.entry(peer_id)
.or_insert(new_queue(&self.queue_config));
}
pub fn num_queues(&self) -> usize {
@ -27,7 +33,7 @@ impl Node {
pub fn send(&mut self, msg: MessageId) {
assert!(self.received_msgs.insert(msg));
for (_, queue) in self.queues.iter_mut() {
queue.push_back(msg);
queue.push(msg);
}
}
@ -36,7 +42,7 @@ impl Node {
if first_received {
for (node_id, queue) in self.queues.iter_mut() {
if *node_id != from {
queue.push_back(msg);
queue.push(msg);
}
}
}
@ -46,7 +52,7 @@ impl Node {
pub fn read_queues(&mut self) -> Vec<(NodeId, MessageId)> {
let mut msgs_to_relay: Vec<(NodeId, MessageId)> = Vec::new();
for (node_id, queue) in self.queues.iter_mut() {
if let Some(msg) = queue.pop_front() {
if let Some(msg) = queue.pop() {
msgs_to_relay.push((*node_id, msg));
}
}

View File

@ -0,0 +1,349 @@
use crate::queue::QueueType;
/// Identifier of an experiment configuration (1 through 4).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum ExperimentId {
    Experiment1 = 1,
    Experiment2 = 2,
    Experiment3 = 3,
    Experiment4 = 4,
}

impl std::str::FromStr for ExperimentId {
    type Err = String;

    /// Accepts either the bare number (e.g. "1") or the full variant name
    /// (e.g. "Experiment1").
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let key = s.strip_prefix("Experiment").unwrap_or(s);
        match key {
            "1" => Ok(ExperimentId::Experiment1),
            "2" => Ok(ExperimentId::Experiment2),
            "3" => Ok(ExperimentId::Experiment3),
            "4" => Ok(ExperimentId::Experiment4),
            _ => Err(format!("Invalid experiment ID: {}", s)),
        }
    }
}
/// Identifier of a measurement session. Session 2.1 is encoded with
/// discriminant 21 since enum discriminants must be integers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum SessionId {
    Session1 = 1,
    Session2 = 2,
    Session2_1 = 21,
}

impl std::str::FromStr for SessionId {
    type Err = String;

    /// Accepts the short form ("1", "2", "2.1") or the long form
    /// ("Session1", "Session2", "Session21").
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "1" || s == "Session1" {
            Ok(SessionId::Session1)
        } else if s == "2" || s == "Session2" {
            Ok(SessionId::Session2)
        } else if s == "2.1" || s == "Session21" {
            Ok(SessionId::Session2_1)
        } else {
            Err(format!("Invalid session ID: {}", s))
        }
    }
}
/// CSV header row for parameter-set files; the order must match the values
/// produced by `ParamSet::as_csv_record`.
pub const PARAMSET_CSV_COLUMNS: &[&str] = &[
    "paramset",
    "num_nodes",
    "peering_degree",
    "min_queue_size",
    "transmission_rate",
    "num_sent_msgs",
    "num_senders",
    "queue_type",
    "num_iterations",
];
/// One concrete combination of simulation parameters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ParamSet {
    // 1-based sequential ID; stable across queue types (see `new_paramsets`).
    pub id: u16,
    // Total number of nodes in the network.
    pub num_nodes: u16,
    // Maximum number of peers each node connects to.
    pub peering_degree: u16,
    // Minimum queue (pool) size; only meaningful for queue types where
    // `is_min_queue_size_applicable` returns true.
    pub min_queue_size: u16,
    // Transmissions per unit of virtual time (tick interval = 1/rate).
    pub transmission_rate: u16,
    // Number of messages each sender emits.
    pub num_sent_msgs: u16,
    // Number of sender nodes.
    pub num_senders: u16,
    pub queue_type: QueueType,
    // Number of iterations to run for this paramset — presumably consumed by
    // the runner outside this file; not used by `run_iteration` itself.
    pub num_iterations: u16,
}
impl ParamSet {
/// Builds every parameter set for the given experiment/session pair,
/// dispatching to the session-specific grid builder.
pub fn new_all_paramsets(
    exp_id: ExperimentId,
    session_id: SessionId,
    queue_type: QueueType,
) -> Vec<Self> {
    match session_id {
        SessionId::Session1 => Self::new_session1_paramsets(exp_id, queue_type),
        SessionId::Session2 => Self::new_session2_paramsets(exp_id, queue_type),
        SessionId::Session2_1 => Self::new_session2_1_paramsets(exp_id, queue_type),
    }
}
/// Session 1: small networks (20/40/80 nodes) with parameter grids that
/// scale relative to the network size. Iteration count is `num_nodes / 2`.
fn new_session1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
    let mut start_id: u16 = 1;
    let mut paramsets: Vec<ParamSet> = Vec::new();
    for &num_nodes in &[20, 40, 80] {
        let (mut new_paramsets, next_start_id) = Self::new_paramsets(
            start_id,
            num_nodes,
            &[num_nodes / 5, num_nodes / 4, num_nodes / 2],
            &[num_nodes / 2, num_nodes, num_nodes * 2],
            &[num_nodes / 2, num_nodes, num_nodes * 2],
            // Experiments 2/4 send several messages per sender; 1/3 send one.
            |_| match exp_id {
                ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
                ExperimentId::Experiment2 | ExperimentId::Experiment4 => vec![8, 16, 32],
            },
            // Experiments 3/4 vary the number of senders; 1/2 use a single sender.
            match exp_id {
                ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1],
                ExperimentId::Experiment3 | ExperimentId::Experiment4 => {
                    vec![num_nodes / 10, num_nodes / 5, num_nodes / 2]
                }
            }
            .as_slice(),
            queue_type,
            num_nodes / 2,
        );
        paramsets.append(&mut new_paramsets);
        // IDs keep counting up across the `num_nodes` sub-grids.
        start_id = next_start_id;
    }
    paramsets
}
/// Session 2: larger networks (100/1k/10k nodes) with fixed absolute grids
/// for degree/queue-size/rate, and 20 iterations per paramset.
fn new_session2_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
    let mut start_id: u16 = 1;
    let mut paramsets: Vec<ParamSet> = Vec::new();
    for &num_nodes in &[100, 1000, 10000] {
        let (mut new_paramsets, next_start_id) = Self::new_paramsets(
            start_id,
            num_nodes,
            &[4, 8, 16],
            &[10, 50, 100],
            &[1, 10, 100],
            // Experiments 2/4 scale the message count with the queue size.
            |min_queue_size| match exp_id {
                ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
                ExperimentId::Experiment2 | ExperimentId::Experiment4 => {
                    vec![min_queue_size / 2, min_queue_size, min_queue_size * 2]
                }
            },
            // Experiments 3/4 vary the number of senders; 1/2 use a single sender.
            match exp_id {
                ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1],
                ExperimentId::Experiment3 | ExperimentId::Experiment4 => {
                    vec![num_nodes / 10, num_nodes / 5, num_nodes / 2]
                }
            }
            .as_slice(),
            queue_type,
            20,
        );
        paramsets.append(&mut new_paramsets);
        // IDs keep counting up across the `num_nodes` sub-grids.
        start_id = next_start_id;
    }
    paramsets
}
/// Session 2.1: mid-sized networks (20/200/2000 nodes), smaller peering
/// degrees, a single transmission rate, and a large fixed message count
/// (1000) for experiments 2/4. 20 iterations per paramset.
fn new_session2_1_paramsets(exp_id: ExperimentId, queue_type: QueueType) -> Vec<ParamSet> {
    let mut start_id: u16 = 1;
    let mut paramsets: Vec<ParamSet> = Vec::new();
    for &num_nodes in &[20, 200, 2000] {
        let (mut new_paramsets, next_start_id) = Self::new_paramsets(
            start_id,
            num_nodes,
            &[4, 6, 8],
            &[10, 50, 100],
            &[1],
            |_| match exp_id {
                ExperimentId::Experiment1 | ExperimentId::Experiment3 => vec![1],
                ExperimentId::Experiment2 | ExperimentId::Experiment4 => vec![1000],
            },
            // Experiments 3/4 vary the number of senders; 1/2 use a single sender.
            match exp_id {
                ExperimentId::Experiment1 | ExperimentId::Experiment2 => vec![1],
                ExperimentId::Experiment3 | ExperimentId::Experiment4 => {
                    vec![num_nodes / 10, num_nodes / 5, num_nodes / 2]
                }
            }
            .as_slice(),
            queue_type,
            20,
        );
        paramsets.append(&mut new_paramsets);
        // IDs keep counting up across the `num_nodes` sub-grids.
        start_id = next_start_id;
    }
    paramsets
}
/// Builds the cartesian product of all parameter axes, assigning sequential
/// IDs starting at `start_id`. Returns the paramsets plus the next unused ID.
///
/// `num_sent_msgs_list` is a function of `min_queue_size` because some
/// sessions scale the message count with the queue size.
#[allow(clippy::too_many_arguments)]
fn new_paramsets(
    start_id: u16,
    num_nodes: u16,
    peering_degree_list: &[u16],
    min_queue_size_list: &[u16],
    transmission_rate_list: &[u16],
    num_sent_msgs_list: impl Fn(u16) -> Vec<u16>,
    num_senders_list: &[u16],
    queue_type: QueueType,
    num_iterations: u16,
) -> (Vec<ParamSet>, u16) {
    let mut id = start_id;
    let mut paramsets: Vec<ParamSet> = Vec::new();
    for &peering_degree in peering_degree_list {
        for &min_queue_size in min_queue_size_list {
            for &transmission_rate in transmission_rate_list {
                for &num_sent_msgs in num_sent_msgs_list(min_queue_size).iter() {
                    for &num_senders in num_senders_list {
                        // For queue types that ignore `min_queue_size`, emit each
                        // combination only once (for the first list entry) but
                        // still consume the ID, so a given ID maps to the same
                        // parameter combination for every queue type (verified
                        // by `test_id_consistency`).
                        if !Self::is_min_queue_size_applicable(&queue_type)
                            && min_queue_size != min_queue_size_list[0]
                        {
                            id += 1;
                            continue;
                        }
                        paramsets.push(ParamSet {
                            id,
                            num_nodes,
                            peering_degree,
                            min_queue_size,
                            transmission_rate,
                            num_sent_msgs,
                            num_senders,
                            queue_type,
                            num_iterations,
                        });
                        id += 1;
                    }
                }
            }
        }
    }
    (paramsets, id)
}
/// Returns true for queue types that maintain a minimum pool of slots and
/// therefore actually consume the `min_queue_size` parameter.
pub fn is_min_queue_size_applicable(queue_type: &QueueType) -> bool {
    [
        QueueType::PureCoinFlipping,
        QueueType::PureRandomSampling,
        QueueType::PermutedCoinFlipping,
    ]
    .contains(queue_type)
}
/// Serializes this paramset as one CSV row; the field order must match
/// `PARAMSET_CSV_COLUMNS`.
pub fn as_csv_record(&self) -> Vec<String> {
    let mut record: Vec<String> = Vec::with_capacity(9);
    record.push(self.id.to_string());
    record.push(self.num_nodes.to_string());
    record.push(self.peering_degree.to_string());
    record.push(self.min_queue_size.to_string());
    record.push(self.transmission_rate.to_string());
    record.push(self.num_sent_msgs.to_string());
    record.push(self.num_senders.to_string());
    // The queue type column holds the Debug name of the variant.
    record.push(format!("{:?}", self.queue_type));
    record.push(self.num_iterations.to_string());
    record
}
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use strum::IntoEnumIterator;
    use crate::paramset::ParamSet;
    use super::*;

    // Checks paramset counts (3^k for k three-valued axes), uniqueness, and
    // sequential IDs for every experiment/session/queue-type combination.
    #[test]
    fn test_new_all_paramsets() {
        // (experiment, session) -> expected paramset count when min_queue_size
        // is applicable; divided by 3 below when it is not.
        let cases = vec![
            (
                (ExperimentId::Experiment1, SessionId::Session1),
                3u32.pow(4),
            ),
            (
                (ExperimentId::Experiment2, SessionId::Session1),
                3u32.pow(5),
            ),
            (
                (ExperimentId::Experiment3, SessionId::Session1),
                3u32.pow(5),
            ),
            (
                (ExperimentId::Experiment4, SessionId::Session1),
                3u32.pow(6),
            ),
            (
                (ExperimentId::Experiment1, SessionId::Session2),
                3u32.pow(4),
            ),
            (
                (ExperimentId::Experiment4, SessionId::Session2),
                3u32.pow(6),
            ),
            (
                (ExperimentId::Experiment1, SessionId::Session2_1),
                3u32.pow(3),
            ),
            (
                (ExperimentId::Experiment4, SessionId::Session2_1),
                3u32.pow(4),
            ),
        ];
        for queue_type in QueueType::iter() {
            for ((exp_id, session_id), mut expected_cnt) in cases.clone().into_iter() {
                let paramsets = ParamSet::new_all_paramsets(exp_id, session_id, queue_type);

                // Check if the number of parameter sets is correct. Queue types
                // without a min queue size collapse that 3-valued axis to 1.
                if !ParamSet::is_min_queue_size_applicable(&queue_type) {
                    expected_cnt /= 3;
                }
                assert_eq!(paramsets.len(), expected_cnt as usize);

                // Check if all parameter sets are unique
                let unique_paramsets: HashSet<ParamSet> = paramsets.clone().into_iter().collect();
                assert_eq!(unique_paramsets.len(), paramsets.len());

                // Check if paramset IDs are correct: contiguous 1-based IDs only
                // hold when no combinations were skipped.
                if ParamSet::is_min_queue_size_applicable(&queue_type) {
                    for (i, paramset) in paramsets.iter().enumerate() {
                        assert_eq!(paramset.id as usize, i + 1);
                    }
                }
            }
        }
    }

    // Verifies that a given paramset ID refers to the same parameter
    // combination regardless of queue type (IDs are consumed even for
    // skipped combinations — see `new_paramsets`).
    #[test]
    fn test_id_consistency() {
        let cases = vec![
            (ExperimentId::Experiment1, SessionId::Session1),
            (ExperimentId::Experiment2, SessionId::Session1),
            (ExperimentId::Experiment3, SessionId::Session1),
            (ExperimentId::Experiment4, SessionId::Session1),
            (ExperimentId::Experiment1, SessionId::Session2),
            (ExperimentId::Experiment4, SessionId::Session2),
            (ExperimentId::Experiment1, SessionId::Session2_1),
            (ExperimentId::Experiment4, SessionId::Session2_1),
        ];
        for (exp_id, session_id) in cases.into_iter() {
            let paramsets_with_min_queue_size =
                ParamSet::new_all_paramsets(exp_id, session_id, QueueType::PureCoinFlipping);
            let paramsets_without_min_queue_size =
                ParamSet::new_all_paramsets(exp_id, session_id, QueueType::NonMix);

            for (i, paramset) in paramsets_with_min_queue_size.iter().enumerate() {
                assert_eq!(paramset.id as usize, i + 1);
            }

            for mut paramset in paramsets_without_min_queue_size.into_iter() {
                // To compare ParameterSet instances, use the same queue type.
                paramset.queue_type = QueueType::PureCoinFlipping;
                assert_eq!(
                    paramset,
                    paramsets_with_min_queue_size[paramset.id as usize - 1]
                );
            }
        }
    }
}

View File

@ -0,0 +1,358 @@
use std::collections::VecDeque;
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
use strum_macros::EnumIter;
/// The queueing/mixing strategies under study. `EnumIter` lets tests iterate
/// over every variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)]
pub enum QueueType {
    // Plain FIFO queue without any mixing.
    NonMix,
    // Min-size pool; flips a fair coin per slot until one is released.
    PureCoinFlipping,
    // Min-size pool; releases one slot chosen uniformly at random.
    PureRandomSampling,
    // Min-size pool; shuffles the pool, then coin-flips per slot.
    PermutedCoinFlipping,
    // No minimum size; a single coin flip decides message vs. noise.
    NoisyCoinFlipping,
}
impl std::str::FromStr for QueueType {
    type Err = String;

    /// Parses a queue type from its exact variant name (case-sensitive).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let known = [
            ("NonMix", QueueType::NonMix),
            ("PureCoinFlipping", QueueType::PureCoinFlipping),
            ("PureRandomSampling", QueueType::PureRandomSampling),
            ("PermutedCoinFlipping", QueueType::PermutedCoinFlipping),
            ("NoisyCoinFlipping", QueueType::NoisyCoinFlipping),
        ];
        known
            .iter()
            .find(|(name, _)| *name == s)
            .map(|(_, queue_type)| *queue_type)
            .ok_or_else(|| format!("Unknown queue type: {}", s))
    }
}
/// Common interface for all message queues; `Node` stores them as
/// `Box<dyn Queue<MessageId>>`, one per peer.
pub trait Queue<T: Copy> {
    /// Enqueues a message.
    fn push(&mut self, msg: T);
    /// Dequeues one entry. `None` can mean either "queue empty" or, for mix
    /// queues, that a noise slot was drawn instead of a real message.
    fn pop(&mut self) -> Option<T>;
    /// Number of entries currently held (for mix queues this counts noise
    /// slots as well as real messages).
    fn len(&self) -> usize;
    /// True when the queue holds no entries. Provided as a default so
    /// implementors get it for free (and to satisfy clippy's
    /// `len_without_is_empty`).
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Parameters for constructing a queue via `new_queue`.
pub struct QueueConfig {
    // Which queueing/mixing strategy to instantiate.
    pub queue_type: QueueType,
    // Seed for the queue's private RNG (ignored by `NonMix`, which uses no RNG).
    pub seed: u64,
    // Minimum pool size; only used by the min-size mix queue types
    // (PureCoinFlipping, PureRandomSampling, PermutedCoinFlipping).
    pub min_queue_size: u16,
}
/// Constructs a boxed queue of the configured type, seeding each mix queue's
/// own RNG from `cfg.seed` so queues are independently reproducible.
pub fn new_queue<T: 'static + Copy>(cfg: &QueueConfig) -> Box<dyn Queue<T>> {
    match cfg.queue_type {
        QueueType::NonMix => Box::new(NonMixQueue::new()),
        QueueType::PureCoinFlipping => Box::new(PureCoinFlippingQueue::new(
            cfg.min_queue_size,
            StdRng::seed_from_u64(cfg.seed),
        )),
        QueueType::PureRandomSampling => Box::new(PureRandomSamplingQueue::new(
            cfg.min_queue_size,
            StdRng::seed_from_u64(cfg.seed),
        )),
        QueueType::PermutedCoinFlipping => Box::new(PermutedCoinFlippingQueue::new(
            cfg.min_queue_size,
            StdRng::seed_from_u64(cfg.seed),
        )),
        // NoisyCoinFlipping takes no minimum size.
        QueueType::NoisyCoinFlipping => {
            Box::new(NoisyCoinFlippingQueue::new(StdRng::seed_from_u64(cfg.seed)))
        }
    }
}
/// Plain FIFO queue with no mixing: messages leave in insertion order and
/// `pop` returns `None` only when the queue is empty.
struct NonMixQueue<T: Copy> {
    inner: VecDeque<T>,
}

impl<T: Copy> NonMixQueue<T> {
    fn new() -> Self {
        Self {
            inner: VecDeque::new(),
        }
    }
}

impl<T: Copy> Queue<T> for NonMixQueue<T> {
    fn push(&mut self, msg: T) {
        self.inner.push_back(msg);
    }

    fn pop(&mut self) -> Option<T> {
        self.inner.pop_front()
    }

    fn len(&self) -> usize {
        self.inner.len()
    }
}
/// Base building block for the mix queues: a pool of slots where `Some(_)`
/// is a real message and `None` is a noise slot.
///
/// The `queue` and `rng` fields are accessed directly by the wrapper queue
/// types below, so their names are part of this struct's internal contract.
struct MixQueue<T: Copy> {
    queue: Vec<Option<T>>,
    rng: StdRng,
}

impl<T: Copy> MixQueue<T> {
    fn new(rng: StdRng) -> Self {
        Self {
            queue: Vec::new(),
            rng,
        }
    }

    /// Appends a real message to the end of the pool.
    fn push(&mut self, data: T) {
        self.queue.push(Some(data));
    }

    /// Removes the slot at `idx` and returns its content (`None` = noise).
    /// Out-of-range indices yield `None` without removing anything.
    fn pop(&mut self, idx: usize) -> Option<T> {
        match self.queue.get(idx) {
            Some(_) => self.queue.remove(idx),
            None => None,
        }
    }

    fn len(&self) -> usize {
        self.queue.len()
    }
}
/// A mix queue that is always topped up to at least `min_pool_size` slots
/// with noise (`None`) entries, hiding how many real messages it holds.
struct MinSizeMixQueue<T: Copy> {
    queue: MixQueue<T>,
    min_pool_size: u16,
}

impl<T: Copy> MinSizeMixQueue<T> {
    fn new(min_pool_size: u16, rng: StdRng) -> Self {
        let mut inner = MixQueue::new(rng);
        // Start with a pool made entirely of noise slots.
        inner.queue = vec![None; min_pool_size as usize];
        Self {
            queue: inner,
            min_pool_size,
        }
    }

    fn push(&mut self, msg: T) {
        self.queue.push(msg)
    }

    fn pop(&mut self, idx: usize) -> Option<T> {
        self.queue.pop(idx)
    }

    /// Re-pads the pool with noise slots whenever it has shrunk below the
    /// minimum size (slots are consumed by `pop`).
    fn ensure_min_size(&mut self) {
        let min = self.min_pool_size as usize;
        if self.queue.len() < min {
            // Only ever grows: guarded by the length check above.
            self.queue.queue.resize(min, None);
        }
    }

    fn len(&self) -> usize {
        self.queue.len()
    }
}
/// Mix queue that walks the pool flipping a fair coin per slot, releasing
/// the first slot that wins the flip (which may be a noise slot -> `None`).
struct PureCoinFlippingQueue<T: Copy> {
    pool: MinSizeMixQueue<T>,
}

impl<T: Copy> PureCoinFlippingQueue<T> {
    fn new(min_pool_size: u16, rng: StdRng) -> Self {
        Self {
            pool: MinSizeMixQueue::new(min_pool_size, rng),
        }
    }
}

impl<T: Copy> Queue<T> for PureCoinFlippingQueue<T> {
    fn push(&mut self, msg: T) {
        self.pool.push(msg)
    }

    fn pop(&mut self) -> Option<T> {
        self.pool.ensure_min_size();
        // Keep sweeping the pool until some slot wins a coin flip.
        loop {
            let pool_size = self.pool.len();
            if let Some(idx) = (0..pool_size).find(|_| self.pool.queue.rng.gen_bool(0.5)) {
                return self.pool.pop(idx);
            }
        }
    }

    fn len(&self) -> usize {
        self.pool.len()
    }
}
/// Mix queue that releases one slot chosen uniformly at random from the
/// pool (which may be a noise slot -> `None`).
struct PureRandomSamplingQueue<T: Copy> {
    pool: MinSizeMixQueue<T>,
}

impl<T: Copy> PureRandomSamplingQueue<T> {
    fn new(min_pool_size: u16, rng: StdRng) -> Self {
        Self {
            pool: MinSizeMixQueue::new(min_pool_size, rng),
        }
    }
}

impl<T: Copy> Queue<T> for PureRandomSamplingQueue<T> {
    fn push(&mut self, msg: T) {
        self.pool.push(msg)
    }

    fn pop(&mut self) -> Option<T> {
        self.pool.ensure_min_size();
        // NOTE(review): gen_range panics on an empty range, so this assumes
        // min_pool_size >= 1 for this queue type — confirm config guarantees it.
        let pool_size = self.pool.len();
        let idx = self.pool.queue.rng.gen_range(0..pool_size);
        self.pool.pop(idx)
    }

    fn len(&self) -> usize {
        self.pool.len()
    }
}
/// Mix queue that first shuffles the whole pool, then coin-flips per slot
/// until one is released (which may be a noise slot -> `None`).
struct PermutedCoinFlippingQueue<T: Copy> {
    pool: MinSizeMixQueue<T>,
}

impl<T: Copy> PermutedCoinFlippingQueue<T> {
    fn new(min_pool_size: u16, rng: StdRng) -> Self {
        Self {
            pool: MinSizeMixQueue::new(min_pool_size, rng),
        }
    }
}

impl<T: Copy> Queue<T> for PermutedCoinFlippingQueue<T> {
    fn push(&mut self, msg: T) {
        self.pool.push(msg)
    }

    fn pop(&mut self) -> Option<T> {
        self.pool.ensure_min_size();

        // Shuffle the underlying slot vector in place before flipping coins.
        let mix = &mut self.pool.queue;
        mix.queue.as_mut_slice().shuffle(&mut mix.rng);

        // Keep sweeping the (now permuted) pool until a slot wins a flip.
        loop {
            let pool_size = self.pool.len();
            if let Some(idx) = (0..pool_size).find(|_| self.pool.queue.rng.gen_bool(0.5)) {
                return self.pool.pop(idx);
            }
        }
    }

    fn len(&self) -> usize {
        self.pool.len()
    }
}
/// Mix queue without a minimum pool size: an empty queue yields `None`
/// (noise) immediately, otherwise a coin flip decides between releasing a
/// slot and emitting noise.
struct NoisyCoinFlippingQueue<T: Copy> {
    queue: MixQueue<T>,
}

impl<T: Copy> NoisyCoinFlippingQueue<T> {
    pub fn new(rng: StdRng) -> Self {
        Self {
            queue: MixQueue::new(rng),
        }
    }
}

impl<T: Copy> Queue<T> for NoisyCoinFlippingQueue<T> {
    fn push(&mut self, msg: T) {
        self.queue.push(msg)
    }

    fn pop(&mut self) -> Option<T> {
        if self.queue.len() == 0 {
            return None;
        }
        // NOTE(review): as written, every call returns at i == 0 — heads pops
        // the front slot, tails returns None — so the iterations for i > 0 and
        // the outer `loop` appear unreachable. Confirm whether the intended
        // semantics were to continue flipping past the first slot.
        loop {
            for i in 0..self.queue.len() {
                if self.queue.rng.gen_bool(0.5) {
                    return self.queue.pop(i);
                } else if i == 0 {
                    return None;
                }
            }
        }
    }

    fn len(&self) -> usize {
        self.queue.len()
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use super::*;

    /// Builds a queue through the public constructor. `new_queue` takes a
    /// `&QueueConfig` (not loose arguments), so wrap the raw parameters here;
    /// the previous direct calls did not match the function's signature.
    fn make_queue(queue_type: QueueType, seed: u64, min_queue_size: u16) -> Box<dyn Queue<i32>> {
        new_queue(&QueueConfig {
            queue_type,
            seed,
            min_queue_size,
        })
    }

    #[test]
    fn test_non_mix_queue() {
        let mut queue = make_queue(QueueType::NonMix, 0, 0);

        // Check if None (noise) is returned when queue is empty
        assert_eq!(queue.pop(), None);

        // Check if queue is FIFO
        queue.push(0);
        queue.push(1);
        assert_eq!(queue.pop(), Some(0));
        assert_eq!(queue.pop(), Some(1));

        // Check if None (noise) is returned when queue is empty
        assert_eq!(queue.pop(), None);

        // Check if queue is FIFO again
        queue.push(2);
        queue.push(3);
        assert_eq!(queue.pop(), Some(2));
        assert_eq!(queue.pop(), Some(3));
    }

    #[test]
    fn test_mix_queues() {
        for queue_type in [
            QueueType::PureCoinFlipping,
            QueueType::PureRandomSampling,
            QueueType::PermutedCoinFlipping,
            QueueType::NoisyCoinFlipping,
        ] {
            test_mix_queue(queue_type);
        }
    }

    fn test_mix_queue(queue_type: QueueType) {
        let mut queue = make_queue(queue_type, 0, 4);

        // Check if None (noise) is returned when queue is empty
        assert_eq!(queue.pop(), None);

        // Put only 2 messages even though the min queue size is 4
        queue.push(0);
        queue.push(1);

        // Wait until 2 messages are returned from the queue
        let mut set: HashSet<_> = vec![0, 1].into_iter().collect();
        while !set.is_empty() {
            if let Some(msg) = queue.pop() {
                assert!(set.remove(&msg));
            }
        }

        // Check if None (noise) is returned when there is no real message remains
        assert_eq!(queue.pop(), None);
    }
}

View File

@ -0,0 +1,68 @@
use std::collections::{HashMap, HashSet};
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use crate::node::NodeId;
/// Builds a random undirected topology in which every node has at most
/// `peering_degree` peers, retrying from scratch until the resulting graph
/// is fully connected. Deterministic for a given `seed`.
///
/// NOTE(review): if `peering_degree` can never yield a connected graph
/// (e.g. 0 with more than one node), this loops forever — confirm callers
/// always pass a feasible degree.
pub fn build_topology(
    num_nodes: u16,
    peering_degree: u16,
    seed: u64,
) -> HashMap<NodeId, HashSet<NodeId>> {
    let mut rng = StdRng::seed_from_u64(seed);
    loop {
        // Start from an edgeless graph with every node present.
        let mut topology: HashMap<NodeId, HashSet<NodeId>> = HashMap::new();
        for node in 0..num_nodes {
            topology.insert(node, HashSet::new());
        }

        for node in 0..num_nodes {
            // Candidate peers for the current node.
            let mut others: Vec<NodeId> = Vec::new();
            for other in (0..node).chain(node + 1..num_nodes) {
                // Check if the other node is not already connected to the current node
                // and the other node has not reached the peering degree.
                if !topology.get(&node).unwrap().contains(&other)
                    && topology.get(&other).unwrap().len() < peering_degree as usize
                {
                    others.push(other);
                }
            }

            // How many more connections the current node needs
            let num_needs = peering_degree as usize - topology.get(&node).unwrap().len();
            // Sample as many peers as possible and connect them to the current node
            let k = std::cmp::min(num_needs, others.len());
            others.as_mut_slice().shuffle(&mut rng);
            others.into_iter().take(k).for_each(|peer| {
                // Edges are undirected: record the connection on both sides.
                topology.get_mut(&node).unwrap().insert(peer);
                topology.get_mut(&peer).unwrap().insert(node);
            });
        }

        if are_all_nodes_connected(&topology) {
            return topology;
        }
    }
}
/// A topology is connected iff a DFS from any single node visits every node.
/// Panics if the topology is empty (at least one node is expected).
fn are_all_nodes_connected(topology: &HashMap<NodeId, HashSet<NodeId>>) -> bool {
    let &start_node = topology.keys().next().unwrap();
    dfs(topology, start_node).len() == topology.len()
}
/// Iterative depth-first traversal; returns every node reachable from
/// `start_node`, including `start_node` itself.
fn dfs(topology: &HashMap<NodeId, HashSet<NodeId>>, start_node: NodeId) -> HashSet<NodeId> {
    let mut visited: HashSet<NodeId> = HashSet::new();
    let mut stack = vec![start_node];
    while let Some(node) = stack.pop() {
        visited.insert(node);
        // Defer the not-yet-visited peers; duplicates on the stack are
        // harmless since `insert` is idempotent.
        stack.extend(
            topology
                .get(&node)
                .unwrap()
                .iter()
                .filter(|peer| !visited.contains(*peer))
                .copied(),
        );
    }
    visited
}