From bd00f9dd3a141f450701b07100ec969231cf8977 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Fri, 16 Aug 2024 20:56:16 +0900 Subject: [PATCH] wip: working version --- mixnet-rs/.gitignore | 21 ++++ mixnet-rs/Cargo.toml | 5 + mixnet-rs/dissemination/Cargo.toml | 9 ++ mixnet-rs/dissemination/src/main.rs | 164 ++++++++++++++++++++++++++++ mixnet-rs/dissemination/src/node.rs | 62 +++++++++++ 5 files changed, 261 insertions(+) create mode 100644 mixnet-rs/.gitignore create mode 100644 mixnet-rs/Cargo.toml create mode 100644 mixnet-rs/dissemination/Cargo.toml create mode 100644 mixnet-rs/dissemination/src/main.rs create mode 100644 mixnet-rs/dissemination/src/node.rs diff --git a/mixnet-rs/.gitignore b/mixnet-rs/.gitignore new file mode 100644 index 0000000..d01bd1a --- /dev/null +++ b/mixnet-rs/.gitignore @@ -0,0 +1,21 @@ +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# RustRover +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ \ No newline at end of file diff --git a/mixnet-rs/Cargo.toml b/mixnet-rs/Cargo.toml new file mode 100644 index 0000000..0457842 --- /dev/null +++ b/mixnet-rs/Cargo.toml @@ -0,0 +1,5 @@ +[workspace] +members = [ + "dissemination" +] +resolver = "2" diff --git a/mixnet-rs/dissemination/Cargo.toml b/mixnet-rs/dissemination/Cargo.toml new file mode 100644 index 0000000..44657c0 --- /dev/null +++ b/mixnet-rs/dissemination/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "dissemination" +version = "0.1.0" +edition = "2021" + +[dependencies] +rand = "0.8.5" +tracing = "0.1.40" +tracing-subscriber = "0.3.18" diff --git a/mixnet-rs/dissemination/src/main.rs b/mixnet-rs/dissemination/src/main.rs new file mode 100644 index 0000000..cb78806 --- /dev/null +++ b/mixnet-rs/dissemination/src/main.rs @@ -0,0 +1,164 @@ +use rand::seq::SliceRandom; +use rand::{rngs::StdRng, SeedableRng}; +use std::cmp::min; +use std::collections::{HashMap, HashSet}; +use std::f32::consts::PI; +use tracing::info; + +use node::{MessageId, Node, NodeId}; + +mod node; + +fn main() { + tracing_subscriber::fmt::init(); + + let seed: u64 = 0; + let gtr: f32 = 10.0; + let num_nodes: u16 = 10; + let peering_degree: u16 = 2; + let num_senders: u16 = 1; + let num_sending_msgs: u16 = 3; + assert!(num_nodes >= 2); + assert!(num_senders <= num_nodes); + + // Initialize nodes + let mut nodes: HashMap = HashMap::new(); + for i in 0..num_nodes { + nodes.insert(i, Node::new(i)); + } + + // Connect nodes + let topology = build_topology(num_nodes, peering_degree, seed); + for (node_id, peers) in topology.iter() { + peers.iter().for_each(|peer_id| { + nodes.get_mut(node_id).unwrap().connect(*peer_id); + }); + } + + let sender_ids: Vec = (0..num_senders).collect(); + + let mut vtime: f32 = 0.0; + let interval: f32 = 1.0 / gtr; + let mut next_msg_id: MessageId = 0; + let mut sent_msgs: HashMap = HashMap::new(); + let mut disseminated_msgs: HashMap = HashMap::new(); + loop { + // Send new messages + assert!(sent_msgs.len() % (num_senders as usize) == 0); + if sent_msgs.len() / (num_senders as usize) < num_sending_msgs as usize { + for sender_id in sender_ids.iter() { + nodes.get_mut(sender_id).unwrap().send(next_msg_id); + sent_msgs.insert(next_msg_id, (vtime, 1)); + next_msg_id += 1; + } + } + + // Collect messages to relay + let mut all_msgs_to_relay = Vec::new(); + for (node_id, node) in nodes.iter_mut() { + let msgs_to_relay = node.read_queues(); + msgs_to_relay.iter().for_each(|(receiver_id, msg)| { + all_msgs_to_relay.push((*receiver_id, *msg, *node_id)); + }); + } + + // Relay the messages + all_msgs_to_relay + .into_iter() + .for_each(|(receiver_id, msg, sender_id)| { + if nodes.get_mut(&receiver_id).unwrap().receive(msg, sender_id) { + let (sent_time, num_received_nodes) = sent_msgs.get_mut(&msg).unwrap(); + *num_received_nodes += 1; + if *num_received_nodes == num_nodes { + assert!(!disseminated_msgs.contains_key(&msg)); + disseminated_msgs.insert(msg, vtime - *sent_time); + } + } + }); + + // Check if all messages have been disseminated to all nodes. + if disseminated_msgs.len() == (num_senders * num_sending_msgs) as usize { + info!( + "vtime:{vtime}: All {} messages have been disseminated to all nodes. Exiting...", + disseminated_msgs.len() + ); + for (msg, latency) in disseminated_msgs.iter() { + info!( + "Message {} took {} time units to disseminate.", + msg, latency + ); + } + break; + } else { + info!( + "vtime:{vtime}: {} messages have been disseminated to all nodes.", + disseminated_msgs.len() + ); + } + + vtime += interval; + } +} + +fn build_topology( + num_nodes: u16, + peering_degree: u16, + seed: u64, +) -> HashMap> { + let mut rng = StdRng::seed_from_u64(seed); + + loop { + let mut topology: HashMap> = HashMap::new(); + for node in 0..num_nodes { + topology.insert(node, HashSet::new()); + } + + for node in 0..num_nodes { + let mut others: Vec = Vec::new(); + for other in (0..node).chain(node + 1..num_nodes) { + // Check if the other node is not already connected to the current node + // and the other node has not reached the peering degree. + if !topology.get(&node).unwrap().contains(&other) + && topology.get(&other).unwrap().len() < peering_degree as usize + { + others.push(other); + } + } + + // How many more connections the current node needs + let num_needs = peering_degree as usize - topology.get(&node).unwrap().len(); + // Smaple peers as many as possible and connect them to the current node + let k = min(num_needs, others.len()); + others.as_mut_slice().shuffle(&mut rng); + others.into_iter().take(k).for_each(|peer| { + topology.get_mut(&node).unwrap().insert(peer); + topology.get_mut(&peer).unwrap().insert(node); + }); + } + + if are_all_nodes_connected(&topology) { + return topology; + } + } +} + +fn are_all_nodes_connected(topology: &HashMap>) -> bool { + let start_node = topology.keys().next().unwrap(); + let visited = dfs(topology, *start_node); + visited.len() == topology.len() +} + +fn dfs(topology: &HashMap>, start_node: NodeId) -> HashSet { + let mut visited: HashSet = HashSet::new(); + let mut stack: Vec = Vec::new(); + stack.push(start_node); + while let Some(node) = stack.pop() { + visited.insert(node); + for peer in topology.get(&node).unwrap().iter() { + if !visited.contains(peer) { + stack.push(*peer); + } + } + } + visited +} diff --git a/mixnet-rs/dissemination/src/node.rs b/mixnet-rs/dissemination/src/node.rs new file mode 100644 index 0000000..1c2d46a --- /dev/null +++ b/mixnet-rs/dissemination/src/node.rs @@ -0,0 +1,62 @@ +use std::collections::{HashMap, HashSet, VecDeque}; + +pub type NodeId = u16; +pub type MessageId = u32; + +pub struct Node { + id: NodeId, + queues: HashMap>, + received_msgs: HashSet, +} + +impl Node { + pub fn new(id: NodeId) -> Self { + Node { + id, + queues: HashMap::new(), + received_msgs: HashSet::new(), + } + } + + pub fn connect(&mut self, peer_id: NodeId) { + self.queues.entry(peer_id).or_default(); + tracing::info!("Node {} connected to {}", self.id, peer_id); + } + + pub fn num_queues(&self) -> usize { + self.queues.len() + } + + pub fn send(&mut self, msg: MessageId) { + assert!(self.received_msgs.insert(msg)); + for (peer_id, queue) in self.queues.iter_mut() { + queue.push_back(msg); + tracing::info!("Node {} sent message {} to peer {}", self.id, msg, peer_id); + } + } + + pub fn receive(&mut self, msg: MessageId, from: NodeId) -> bool { + let first_received = self.received_msgs.insert(msg); + if first_received { + tracing::info!("Node {} received message {} from {}", self.id, msg, from); + for (node_id, queue) in self.queues.iter_mut() { + if *node_id != from { + queue.push_back(msg); + } + } + } else { + tracing::info!("Node {} ignored message {} from {}", self.id, msg, from); + } + first_received + } + + pub fn read_queues(&mut self) -> Vec<(NodeId, MessageId)> { + let mut msgs_to_relay: Vec<(NodeId, MessageId)> = Vec::new(); + for (node_id, queue) in self.queues.iter_mut() { + if let Some(msg) = queue.pop_front() { + msgs_to_relay.push((*node_id, msg)); + } + } + msgs_to_relay + } +}