From 631c1a691620b91b2533841f977c4c5cfe15e798 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Fri, 8 Nov 2024 12:25:13 +0700 Subject: [PATCH] Store `PayloadId`s in the state to measure latency (#34) * Store `MessageId`s in the state to measure latency * Use String for PayloadId --- simlib/mixnet-sims/src/node/mix/message.rs | 39 ++++++++++++++++++++ simlib/mixnet-sims/src/node/mix/mod.rs | 42 +++++++++++----------- simlib/mixnet-sims/src/node/mix/state.rs | 7 ++-- 3 files changed, 66 insertions(+), 22 deletions(-) create mode 100644 simlib/mixnet-sims/src/node/mix/message.rs diff --git a/simlib/mixnet-sims/src/node/mix/message.rs b/simlib/mixnet-sims/src/node/mix/message.rs new file mode 100644 index 0000000..4a30a7d --- /dev/null +++ b/simlib/mixnet-sims/src/node/mix/message.rs @@ -0,0 +1,39 @@ +use uuid::Uuid; + +pub type PayloadId = String; + +pub struct Payload(Uuid); + +impl Payload { + pub fn new() -> Self { + Self(Uuid::new_v4()) + } + + pub fn id(&self) -> PayloadId { + self.0.to_string() + } + + pub fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } + + pub fn load(data: Vec) -> Self { + assert_eq!(data.len(), 16); + Self(data.try_into().unwrap()) + } +} + +#[cfg(test)] +mod tests { + use super::Payload; + + #[test] + fn payload() { + let payload = Payload::new(); + println!("{}", payload.id()); + let bytes = payload.as_bytes(); + assert_eq!(bytes.len(), 16); + let loaded_payload = Payload::load(bytes.to_vec()); + assert_eq!(bytes, loaded_payload.as_bytes()); + } +} diff --git a/simlib/mixnet-sims/src/node/mix/mod.rs b/simlib/mixnet-sims/src/node/mix/mod.rs index 3608253..b891c1d 100644 --- a/simlib/mixnet-sims/src/node/mix/mod.rs +++ b/simlib/mixnet-sims/src/node/mix/mod.rs @@ -1,5 +1,6 @@ pub mod consensus_streams; pub mod lottery; +mod message; pub mod scheduler; pub mod state; pub mod stream_wrapper; @@ -8,6 +9,7 @@ use crate::node::mix::consensus_streams::{Epoch, Slot}; use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; +use message::Payload; use multiaddr::Multiaddr; use netrunner::network::NetworkMessage; use netrunner::node::{Node, NodeId}; @@ -33,15 +35,9 @@ use scheduler::{Interval, TemporalRelease}; use serde::Deserialize; use sha2::{Digest, Sha256}; use state::MixnodeState; -use std::collections::HashSet; -use std::pin::pin; -use std::{ - pin::{self}, - task::Poll, - time::Duration, -}; +use std::collections::{HashMap, HashSet}; +use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; -use uuid::Uuid; #[derive(Debug, Clone)] pub struct MixMessage(Vec); @@ -194,7 +190,8 @@ impl MixNode { state: MixnodeState { node_id: id, step_id: 0, - num_messages_broadcasted: 0, + data_messages_generated: HashMap::new(), + data_messages_fully_unwrapped: HashMap::new(), }, data_msg_lottery_update_time_sender, data_msg_lottery_interval, @@ -251,10 +248,6 @@ impl MixNode { self.epoch_update_sender.send(elapsed).unwrap(); self.slot_update_sender.send(elapsed).unwrap(); } - - fn build_message_payload() -> [u8; 16] { - Uuid::new_v4().into_bytes() - } } impl Node for MixNode { @@ -277,8 +270,14 @@ impl Node for MixNode { if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) { if self.data_msg_lottery.run() { - let payload = Self::build_message_payload(); - let message = self.crypto_processor.wrap_message(&payload).unwrap(); + let payload = Payload::new(); + self.state + .data_messages_generated + .insert(payload.id(), self.state.step_id); + let message = self + .crypto_processor + .wrap_message(payload.as_bytes()) + .unwrap(); self.persistent_sender.send(message).unwrap(); } } @@ -300,14 +299,16 @@ impl Node for MixNode { MixOutgoingMessage::Outbound(msg) => { self.persistent_sender.send(msg).unwrap(); } - MixOutgoingMessage::FullyUnwrapped(_) => { - tracing::info!("fully unwrapped message: Node:{}", self.id); - self.state.num_messages_broadcasted += 1; + MixOutgoingMessage::FullyUnwrapped(payload) => { + let payload = Payload::load(payload); + self.state + .data_messages_fully_unwrapped + .insert(payload.id(), self.state.step_id); //TODO: create a tracing event } } } - if let Poll::Ready(Some(msg)) = pin::pin!(&mut self.cover_traffic).poll_next(&mut cx) { + if let Poll::Ready(Some(msg)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) { let message = self.crypto_processor.wrap_message(&msg).unwrap(); self.persistent_sender.send(message).unwrap(); } @@ -326,7 +327,8 @@ impl Node for MixNode { match ward { WardCondition::Max(_) => false, WardCondition::Sum(condition) => { - *condition.step_result.borrow_mut() += self.state.num_messages_broadcasted; + *condition.step_result.borrow_mut() += + self.state.data_messages_fully_unwrapped.len(); false } } diff --git a/simlib/mixnet-sims/src/node/mix/state.rs b/simlib/mixnet-sims/src/node/mix/state.rs index e075344..4f450b6 100644 --- a/simlib/mixnet-sims/src/node/mix/state.rs +++ b/simlib/mixnet-sims/src/node/mix/state.rs @@ -1,4 +1,4 @@ -use std::any::Any; +use std::{any::Any, collections::HashMap}; use serde::Serialize; @@ -9,12 +9,15 @@ use netrunner::{ warding::SimulationState, }; +use super::message::PayloadId; + #[derive(Debug, Clone, Serialize)] pub struct MixnodeState { #[serde(serialize_with = "serialize_node_id_as_index")] pub node_id: NodeId, pub step_id: usize, - pub num_messages_broadcasted: usize, + pub data_messages_generated: HashMap, + pub data_messages_fully_unwrapped: HashMap, } #[derive(Serialize)]