From 2aa1fecab233e339ac81346037b5ad7ffc78247f Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 6 Feb 2025 13:39:38 +0900 Subject: [PATCH] refactor --- .../blendnet-sims/src/node/blend/message.rs | 78 +++++++ simlib/blendnet-sims/src/node/blend/mod.rs | 205 +++++------------- 2 files changed, 135 insertions(+), 148 deletions(-) diff --git a/simlib/blendnet-sims/src/node/blend/message.rs b/simlib/blendnet-sims/src/node/blend/message.rs index 4a30a7d..e314181 100644 --- a/simlib/blendnet-sims/src/node/blend/message.rs +++ b/simlib/blendnet-sims/src/node/blend/message.rs @@ -1,3 +1,8 @@ +use std::{ops::Mul, time::Duration}; + +use netrunner::node::serialize_node_id_as_index; +use netrunner::node::NodeId; +use serde::Serialize; use uuid::Uuid; pub type PayloadId = String; @@ -23,6 +28,79 @@ impl Payload { } } +#[derive(Debug, Clone, Serialize)] +pub struct MessageHistory(Vec); + +impl MessageHistory { + pub fn new() -> Self { + Self(Vec::new()) + } + + pub fn add( + &mut self, + node_id: NodeId, + step_id: usize, + step_time: Duration, + event_type: MessageEventType, + ) { + let duration_from_prev = self.0.last().map_or(Duration::ZERO, |prev_event| { + step_time.mul((step_id - prev_event.step_id).try_into().unwrap()) + }); + self.0.push(MessageEvent { + node_id, + step_id, + duration_from_prev, + event_type, + }); + } + + pub fn last_event_type(&self) -> Option<&MessageEventType> { + self.0.last().map(|event| &event.event_type) + } + + pub fn total_duration(&self) -> Duration { + self.0.iter().map(|event| event.duration_from_prev).sum() + } +} + +#[derive(Debug, Clone, Serialize)] +struct MessageEvent { + #[serde(serialize_with = "serialize_node_id_as_index")] + node_id: NodeId, + step_id: usize, + #[serde(serialize_with = "duration_as_millis")] + duration_from_prev: Duration, + event_type: MessageEventType, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub enum MessageEventType { + Created, + PersistentTransmissionScheduled { + index: usize, + }, + PersistentTransmissionReleased, + TemporalProcessorScheduled { + index: usize, + }, + TemporalProcessorReleased, + NetworkSent { + #[serde(serialize_with = "serialize_node_id_as_index")] + to: NodeId, + }, + NetworkReceived { + #[serde(serialize_with = "serialize_node_id_as_index")] + from: NodeId, + }, +} + +pub fn duration_as_millis(duration: &Duration, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_u64(duration.as_millis().try_into().unwrap()) +} + #[cfg(test)] mod tests { use super::Payload; diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index 65e881e..28c3a34 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -13,9 +13,9 @@ use cached::{Cached, TimedCache}; use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; -use message::{Payload, PayloadId}; +use message::{duration_as_millis, MessageEventType, MessageHistory, Payload, PayloadId}; use netrunner::network::NetworkMessage; -use netrunner::node::{serialize_node_id_as_index, Node, NodeId, NodeIdExt}; +use netrunner::node::{Node, NodeId, NodeIdExt}; use netrunner::{ network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, warding::WardCondition, @@ -37,28 +37,26 @@ use scheduler::{Interval, TemporalScheduler}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use state::BlendnodeState; -use std::ops::Mul; use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; #[derive(Debug, Clone, Serialize)] pub struct BlendMessage { message: Vec, - history: Vec, + history: MessageHistory, } impl BlendMessage { - fn new(message: Vec, node_id: NodeId, step_id: usize) -> Self { - Self { - message, - history: vec![MessageEvent::Created { node_id, step_id }], - } + fn new(message: Vec, node_id: NodeId, step_id: usize, step_time: Duration) -> Self { + let mut history = MessageHistory::new(); + history.add(node_id, step_id, step_time, MessageEventType::Created); + Self { message, history } } fn new_drop() -> Self { Self { message: Vec::new(), - history: vec![], + history: MessageHistory::new(), } } @@ -73,60 +71,9 @@ impl PayloadSize for BlendMessage { } } -#[derive(Debug, Clone, Serialize)] -enum MessageEvent { - Created { - #[serde(serialize_with = "serialize_node_id_as_index")] - node_id: NodeId, - step_id: usize, - }, - PersistentTransmissionScheduled { - #[serde(serialize_with = "serialize_node_id_as_index")] - node_id: NodeId, - step_id: usize, - index: usize, - }, - PersistentTransmissionReleased { - #[serde(serialize_with = "serialize_node_id_as_index")] - node_id: NodeId, - step_id: usize, - #[serde(serialize_with = "serialize_duration_as_millis")] - duration: Duration, - }, - TemporalProcessorScheduled { - #[serde(serialize_with = "serialize_node_id_as_index")] - node_id: NodeId, - step_id: usize, - index: usize, - }, - TemporalProcessorReleased { - #[serde(serialize_with = "serialize_node_id_as_index")] - node_id: NodeId, - step_id: usize, - #[serde(serialize_with = "serialize_duration_as_millis")] - duration: Duration, - }, - NetworkSent { - #[serde(serialize_with = "serialize_node_id_as_index")] - from_node_id: NodeId, - #[serde(serialize_with = "serialize_node_id_as_index")] - to_node_id: NodeId, - step_id: usize, - }, - NetworkReceived { - #[serde(serialize_with = "serialize_node_id_as_index")] - from_node_id: NodeId, - #[serde(serialize_with = "serialize_node_id_as_index")] - to_node_id: NodeId, - step_id: usize, - #[serde(serialize_with = "serialize_duration_as_millis")] - latency: Duration, - }, -} - struct BlendOutgoingMessageWithHistory { outgoing_message: BlendOutgoingMessage, - history: Vec, + history: MessageHistory, } #[derive(Deserialize)] @@ -356,7 +303,8 @@ impl BlendNode { self.slot_update_sender.send(elapsed).unwrap(); } - fn log_message_fully_unwrapped(&self, payload: &Payload, history: Vec) { + fn log_message_fully_unwrapped(&self, payload: &Payload, history: MessageHistory) { + let total_duration = history.total_duration(); log!( "MessageFullyUnwrapped", MessageWithHistoryLog { @@ -366,89 +314,63 @@ impl BlendNode { node_id: self.id.index(), }, history, + total_duration, } ); } - fn record_network_sent_event(&self, history: &mut Vec, to: NodeId) { - history.push(MessageEvent::NetworkSent { - from_node_id: self.id, - to_node_id: to, - step_id: self.state.step_id, - }) + fn new_blend_message(&self, message: Vec) -> BlendMessage { + BlendMessage::new(message, self.id, self.state.step_id, self.step_time) } - fn record_network_received_event(&self, history: &mut Vec, from: NodeId) { - match history.last().unwrap() { - MessageEvent::NetworkSent { - from_node_id, - to_node_id, - step_id, - } => { - assert_eq!(*from_node_id, from); - assert_eq!(*to_node_id, self.id); - history.push(MessageEvent::NetworkReceived { - from_node_id: from, - to_node_id: self.id, - step_id: self.state.step_id, - latency: self.duration_between(*step_id, self.state.step_id), - }); - } - event => panic!("Unexpected message event: {:?}", event), - } + fn record_network_sent_event(&self, history: &mut MessageHistory, to: NodeId) { + self.record_message_event(history, MessageEventType::NetworkSent { to }); } - fn record_persistent_scheduled_event(&mut self, history: &mut Vec) { - history.push(MessageEvent::PersistentTransmissionScheduled { - node_id: self.id, - step_id: self.state.step_id, - index: self.state.cur_num_persistent_scheduled, - }) + fn record_network_received_event(&self, history: &mut MessageHistory, from: NodeId) { + assert_eq!( + history.last_event_type(), + Some(&MessageEventType::NetworkSent { to: self.id }) + ); + self.record_message_event(history, MessageEventType::NetworkReceived { from }); } - fn record_persistent_released_event(&mut self, history: &mut Vec) { - match history.last().unwrap() { - MessageEvent::PersistentTransmissionScheduled { - node_id, step_id, .. - } => { - assert_eq!(*node_id, self.id); - history.push(MessageEvent::PersistentTransmissionReleased { - node_id: self.id, - step_id: self.state.step_id, - duration: self.duration_between(*step_id, self.state.step_id), - }); - } - event => panic!("Unexpected message event: {:?}", event), - } + fn record_persistent_scheduled_event(&self, history: &mut MessageHistory) { + self.record_message_event( + history, + MessageEventType::PersistentTransmissionScheduled { + index: self.state.cur_num_persistent_scheduled, + }, + ); } - fn record_temporal_scheduled_event(&mut self, history: &mut Vec) { - history.push(MessageEvent::TemporalProcessorScheduled { - node_id: self.id, - step_id: self.state.step_id, - index: self.state.cur_num_temporal_scheduled, - }) + fn record_persistent_released_event(&self, history: &mut MessageHistory) { + assert!(matches!( + history.last_event_type(), + Some(MessageEventType::PersistentTransmissionScheduled { .. }) + )); + self.record_message_event(history, MessageEventType::PersistentTransmissionReleased); } - fn record_temporal_released_event(&mut self, history: &mut Vec) { - match history.last().unwrap() { - MessageEvent::TemporalProcessorScheduled { - node_id, step_id, .. - } => { - assert_eq!(*node_id, self.id); - history.push(MessageEvent::TemporalProcessorReleased { - node_id: self.id, - step_id: self.state.step_id, - duration: self.duration_between(*step_id, self.state.step_id), - }); - } - event => panic!("Unexpected message event: {:?}", event), - } + fn record_temporal_scheduled_event(&self, history: &mut MessageHistory) { + self.record_message_event( + history, + MessageEventType::TemporalProcessorScheduled { + index: self.state.cur_num_temporal_scheduled, + }, + ); } - fn duration_between(&self, from_step: usize, to_step: usize) -> Duration { - self.step_time - .mul((to_step - from_step).try_into().unwrap()) + fn record_temporal_released_event(&self, history: &mut MessageHistory) { + assert!(matches!( + history.last_event_type(), + Some(MessageEventType::TemporalProcessorScheduled { .. }) + )); + self.record_message_event(history, MessageEventType::TemporalProcessorReleased); + } + + fn record_message_event(&self, history: &mut MessageHistory, event_type: MessageEventType) { + history.add(self.id, self.state.step_id, self.step_time, event_type); } } @@ -478,11 +400,7 @@ impl Node for BlendNode { .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.schedule_persistent_transmission(BlendMessage::new( - message, - self.id, - self.state.step_id, - )); + self.schedule_persistent_transmission(self.new_blend_message(message)); } } @@ -534,11 +452,7 @@ impl Node for BlendNode { .crypto_processor .wrap_message(payload.as_bytes()) .unwrap(); - self.schedule_persistent_transmission(BlendMessage::new( - message, - self.id, - self.state.step_id, - )); + self.schedule_persistent_transmission(self.new_blend_message(message)); } // Proceed persistent transmission @@ -574,12 +488,7 @@ struct MessageLog { #[derive(Debug, Serialize)] struct MessageWithHistoryLog { message: MessageLog, - history: Vec, -} - -pub fn serialize_duration_as_millis(duration: &Duration, s: S) -> Result -where - S: serde::Serializer, -{ - s.serialize_u64(duration.as_millis().try_into().unwrap()) + history: MessageHistory, + #[serde(serialize_with = "duration_as_millis")] + total_duration: Duration, }