diff --git a/simlib/blendnet-sims/config/blendnet.json b/simlib/blendnet-sims/config/blendnet.json index 38aa373..21a4835 100644 --- a/simlib/blendnet-sims/config/blendnet.json +++ b/simlib/blendnet-sims/config/blendnet.json @@ -85,20 +85,20 @@ "stream_settings": { "path": "test.json" }, - "node_count": 100, + "node_count": 10, "seed": 0, "record_settings": {}, "wards": [ { - "sum": 100 + "sum": 10 } ], "connected_peers_count": 4, "data_message_lottery_interval": "20s", "stake_proportion": 1.0, - "epoch_duration": "200s", + "epoch_duration": "20s", "slot_duration": "1s", - "slots_per_epoch": 200, + "slots_per_epoch": 20, "number_of_hops": 2, "persistent_transmission": { "max_emission_frequency": 1.0, diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs index e2e026a..5f39fb9 100644 --- a/simlib/blendnet-sims/src/node/blend/mod.rs +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -46,14 +46,14 @@ use stream_wrapper::CrossbeamReceiverStream; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BlendMessage { message: Vec, - tracks: Vec, + history: Vec, } impl BlendMessage { pub fn new(message: Vec) -> Self { Self { message, - tracks: Vec::new(), + history: Vec::new(), } } } @@ -65,7 +65,7 @@ impl PayloadSize for BlendMessage { } #[derive(Debug, Clone, Serialize, Deserialize)] -enum MessageTrack { +enum MessageHistoryEvent { PersistentTransmissionScheduled { #[serde(with = "node_id_serde")] node_id: NodeId, @@ -103,9 +103,9 @@ enum MessageTrack { }, } -struct BlendOutgoingMessageWithRoute { +struct BlendOutgoingMessageWithHistory { outgoing_message: BlendOutgoingMessage, - tracks: Vec, + history: Vec, } #[derive(Deserialize)] @@ -149,10 +149,10 @@ pub struct BlendNode { num_persistent_transmission_scheduled: usize, crypto_processor: CryptographicProcessor, - temporal_sender: channel::Sender, + temporal_sender: channel::Sender, temporal_update_time_sender: channel::Sender, temporal_processor_messages: - TemporalStream, TemporalScheduler>, + TemporalStream, TemporalScheduler>, num_temporal_processor_scheduled: usize, epoch_update_sender: channel::Sender, @@ -323,8 +323,8 @@ impl BlendNode { &Self::parse_payload(&message.message), ); message - .tracks - .push(MessageTrack::PersistentTransmissionScheduled { + .history + .push(MessageHistoryEvent::PersistentTransmissionScheduled { node_id: self.id, step_id: self.state.step_id, index: self.num_persistent_transmission_scheduled, @@ -344,9 +344,9 @@ impl BlendNode { BlendOutgoingMessage::Outbound(unwrapped_message) }; - self.schedule_temporal_processor(BlendOutgoingMessageWithRoute { + self.schedule_temporal_processor(BlendOutgoingMessageWithHistory { outgoing_message: temporal_message, - tracks: message.tracks, + history: message.history, }); } Err(e) => { @@ -355,15 +355,15 @@ impl BlendNode { } } - fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithRoute) { + fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) { let payload = match &message.outgoing_message { BlendOutgoingMessage::FullyUnwrapped(payload) => Payload::load(payload.clone()), BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(msg), }; self.log_message("TemporalProcessorScheduled", &payload); message - .tracks - .push(MessageTrack::TemporalProcessorScheduled { + .history + .push(MessageHistoryEvent::TemporalProcessorScheduled { node_id: self.id, step_id: self.state.step_id, index: self.num_temporal_processor_scheduled, @@ -394,16 +394,16 @@ impl BlendNode { self.log_message(format!("MessageReleasedFrom{}", from).as_str(), payload); } - fn log_message_fully_unwrapped(&self, payload: &Payload, tracks: Vec) { + fn log_message_fully_unwrapped(&self, payload: &Payload, history: Vec) { log!( "MessageFullyUnwrapped", - MessageWithTracks { + MessageWithHistory { message: MessageLog { payload_id: payload.id(), step_id: self.state.step_id, node_id: self.id.index(), }, - tracks, + history, } ); } @@ -472,8 +472,8 @@ impl Node for BlendNode { for mut network_message in self.receive() { network_message .payload - .tracks - .push(MessageTrack::NetworkReceived { + .history + .push(MessageHistoryEvent::NetworkReceived { from_node_id: network_message.from, to_node_id: self.id, delay: self @@ -495,18 +495,18 @@ impl Node for BlendNode { pin!(&mut self.temporal_processor_messages).poll_next(&mut cx) { // Add a TemporalProcessorReleased track - match outgoing_msg_with_route.tracks.last().unwrap() { - MessageTrack::TemporalProcessorScheduled { + match outgoing_msg_with_route.history.last().unwrap() { + MessageHistoryEvent::TemporalProcessorScheduled { node_id, step_id, .. } => { assert_eq!(*node_id, self.id); - outgoing_msg_with_route - .tracks - .push(MessageTrack::TemporalProcessorReleased { + outgoing_msg_with_route.history.push( + MessageHistoryEvent::TemporalProcessorReleased { node_id: self.id, step_id: self.state.step_id, duration: self.duration_between(*step_id, self.state.step_id), - }); + }, + ); self.num_temporal_processor_scheduled -= 1; } track => panic!("Unexpected message track: {:?}", track), @@ -521,13 +521,13 @@ impl Node for BlendNode { ); self.schedule_persistent_transmission(BlendMessage { message, - tracks: outgoing_msg_with_route.tracks, + history: outgoing_msg_with_route.history, }); } BlendOutgoingMessage::FullyUnwrapped(payload) => { let payload = Payload::load(payload); self.log_message_released_from("TemporalProcessor", &payload); - self.log_message_fully_unwrapped(&payload, outgoing_msg_with_route.tracks); + self.log_message_fully_unwrapped(&payload, outgoing_msg_with_route.history); self.state.num_messages_fully_unwrapped += 1; //TODO: create a tracing event } @@ -551,13 +551,13 @@ impl Node for BlendNode { { let mut msg: BlendMessage = bincode::deserialize(&msg).unwrap(); // Add a PersistentTransmissionReleased track - match msg.tracks.last().unwrap() { - MessageTrack::PersistentTransmissionScheduled { + match msg.history.last().unwrap() { + MessageHistoryEvent::PersistentTransmissionScheduled { node_id, step_id, .. } => { assert_eq!(*node_id, self.id); - msg.tracks - .push(MessageTrack::PersistentTransmissionReleased { + msg.history + .push(MessageHistoryEvent::PersistentTransmissionReleased { node_id: self.id, step_id: self.state.step_id, duration: self.duration_between(*step_id, self.state.step_id), @@ -595,9 +595,9 @@ struct MessageLog { } #[derive(Debug, Serialize, Deserialize)] -struct MessageWithTracks { +struct MessageWithHistory { message: MessageLog, - tracks: Vec, + history: Vec, } #[derive(Debug, Serialize, Deserialize)]