wip: working version

This commit is contained in:
Youngjoon Lee 2024-08-16 20:56:16 +09:00
parent dbf1b78134
commit bd00f9dd3a
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
5 changed files with 261 additions and 0 deletions

21
mixnet-rs/.gitignore vendored Normal file
View File

@ -0,0 +1,21 @@
# Generated by Cargo
# will have compiled files and executables
debug/
target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
# RustRover
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

5
mixnet-rs/Cargo.toml Normal file
View File

@ -0,0 +1,5 @@
[workspace]
members = [
"dissemination"
]
resolver = "2"

View File

@ -0,0 +1,9 @@
[package]
name = "dissemination"
version = "0.1.0"
edition = "2021"
[dependencies]
rand = "0.8.5"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"

View File

@ -0,0 +1,164 @@
use rand::seq::SliceRandom;
use rand::{rngs::StdRng, SeedableRng};
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::f32::consts::PI;
use tracing::info;
use node::{MessageId, Node, NodeId};
mod node;
fn main() {
tracing_subscriber::fmt::init();
let seed: u64 = 0;
let gtr: f32 = 10.0;
let num_nodes: u16 = 10;
let peering_degree: u16 = 2;
let num_senders: u16 = 1;
let num_sending_msgs: u16 = 3;
assert!(num_nodes >= 2);
assert!(num_senders <= num_nodes);
// Initialize nodes
let mut nodes: HashMap<NodeId, Node> = HashMap::new();
for i in 0..num_nodes {
nodes.insert(i, Node::new(i));
}
// Connect nodes
let topology = build_topology(num_nodes, peering_degree, seed);
for (node_id, peers) in topology.iter() {
peers.iter().for_each(|peer_id| {
nodes.get_mut(node_id).unwrap().connect(*peer_id);
});
}
let sender_ids: Vec<NodeId> = (0..num_senders).collect();
let mut vtime: f32 = 0.0;
let interval: f32 = 1.0 / gtr;
let mut next_msg_id: MessageId = 0;
let mut sent_msgs: HashMap<MessageId, (f32, u16)> = HashMap::new();
let mut disseminated_msgs: HashMap<MessageId, f32> = HashMap::new();
loop {
// Send new messages
assert!(sent_msgs.len() % (num_senders as usize) == 0);
if sent_msgs.len() / (num_senders as usize) < num_sending_msgs as usize {
for sender_id in sender_ids.iter() {
nodes.get_mut(sender_id).unwrap().send(next_msg_id);
sent_msgs.insert(next_msg_id, (vtime, 1));
next_msg_id += 1;
}
}
// Collect messages to relay
let mut all_msgs_to_relay = Vec::new();
for (node_id, node) in nodes.iter_mut() {
let msgs_to_relay = node.read_queues();
msgs_to_relay.iter().for_each(|(receiver_id, msg)| {
all_msgs_to_relay.push((*receiver_id, *msg, *node_id));
});
}
// Relay the messages
all_msgs_to_relay
.into_iter()
.for_each(|(receiver_id, msg, sender_id)| {
if nodes.get_mut(&receiver_id).unwrap().receive(msg, sender_id) {
let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap();
*num_received_nodes += 1;
if *num_received_nodes == num_nodes {
assert!(!disseminated_msgs.contains_key(&msg));
disseminated_msgs.insert(msg, vtime - *sent_time);
}
}
});
// Check if all messages have been disseminated to all nodes.
if disseminated_msgs.len() == (num_senders * num_sending_msgs) as usize {
info!(
"vtime:{vtime}: All {} messages have been disseminated to all nodes. Exiting...",
disseminated_msgs.len()
);
for (msg, latency) in disseminated_msgs.iter() {
info!(
"Message {} took {} time units to disseminate.",
msg, latency
);
}
break;
} else {
info!(
"vtime:{vtime}: {} messages have been disseminated to all nodes.",
disseminated_msgs.len()
);
}
vtime += interval;
}
}
fn build_topology(
num_nodes: u16,
peering_degree: u16,
seed: u64,
) -> HashMap<NodeId, HashSet<NodeId>> {
let mut rng = StdRng::seed_from_u64(seed);
loop {
let mut topology: HashMap<NodeId, HashSet<NodeId>> = HashMap::new();
for node in 0..num_nodes {
topology.insert(node, HashSet::new());
}
for node in 0..num_nodes {
let mut others: Vec<NodeId> = Vec::new();
for other in (0..node).chain(node + 1..num_nodes) {
// Check if the other node is not already connected to the current node
// and the other node has not reached the peering degree.
if !topology.get(&node).unwrap().contains(&other)
&& topology.get(&other).unwrap().len() < peering_degree as usize
{
others.push(other);
}
}
// How many more connections the current node needs
let num_needs = peering_degree as usize - topology.get(&node).unwrap().len();
// Smaple peers as many as possible and connect them to the current node
let k = min(num_needs, others.len());
others.as_mut_slice().shuffle(&mut rng);
others.into_iter().take(k).for_each(|peer| {
topology.get_mut(&node).unwrap().insert(peer);
topology.get_mut(&peer).unwrap().insert(node);
});
}
if are_all_nodes_connected(&topology) {
return topology;
}
}
}
fn are_all_nodes_connected(topology: &HashMap<NodeId, HashSet<NodeId>>) -> bool {
let start_node = topology.keys().next().unwrap();
let visited = dfs(topology, *start_node);
visited.len() == topology.len()
}
fn dfs(topology: &HashMap<NodeId, HashSet<NodeId>>, start_node: NodeId) -> HashSet<NodeId> {
let mut visited: HashSet<NodeId> = HashSet::new();
let mut stack: Vec<NodeId> = Vec::new();
stack.push(start_node);
while let Some(node) = stack.pop() {
visited.insert(node);
for peer in topology.get(&node).unwrap().iter() {
if !visited.contains(peer) {
stack.push(*peer);
}
}
}
visited
}

View File

@ -0,0 +1,62 @@
use std::collections::{HashMap, HashSet, VecDeque};
pub type NodeId = u16;
pub type MessageId = u32;
pub struct Node {
id: NodeId,
queues: HashMap<NodeId, VecDeque<MessageId>>,
received_msgs: HashSet<MessageId>,
}
impl Node {
pub fn new(id: NodeId) -> Self {
Node {
id,
queues: HashMap::new(),
received_msgs: HashSet::new(),
}
}
pub fn connect(&mut self, peer_id: NodeId) {
self.queues.entry(peer_id).or_default();
tracing::info!("Node {} connected to {}", self.id, peer_id);
}
pub fn num_queues(&self) -> usize {
self.queues.len()
}
pub fn send(&mut self, msg: MessageId) {
assert!(self.received_msgs.insert(msg));
for (peer_id, queue) in self.queues.iter_mut() {
queue.push_back(msg);
tracing::info!("Node {} sent message {} to peer {}", self.id, msg, peer_id);
}
}
pub fn receive(&mut self, msg: MessageId, from: NodeId) -> bool {
let first_received = self.received_msgs.insert(msg);
if first_received {
tracing::info!("Node {} received message {} from {}", self.id, msg, from);
for (node_id, queue) in self.queues.iter_mut() {
if *node_id != from {
queue.push_back(msg);
}
}
} else {
tracing::info!("Node {} ignored message {} from {}", self.id, msg, from);
}
first_received
}
pub fn read_queues(&mut self) -> Vec<(NodeId, MessageId)> {
let mut msgs_to_relay: Vec<(NodeId, MessageId)> = Vec::new();
for (node_id, queue) in self.queues.iter_mut() {
if let Some(msg) = queue.pop_front() {
msgs_to_relay.push((*node_id, msg));
}
}
msgs_to_relay
}
}