From 976b1f9577124f381e2b68d30e97ba783ffb3cc6 Mon Sep 17 00:00:00 2001 From: gusto Date: Wed, 26 Jul 2023 10:21:05 +0300 Subject: [PATCH] Add node step time to report (#271) * Skip step if previous node exeeded step time * Fix clippy * Passively keep track of step execution time in node --- simulations/src/node/carnot/mod.rs | 288 +++++++++++++++-------------- 1 file changed, 151 insertions(+), 137 deletions(-) diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index d6f64808..0cb1bc81 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -8,6 +8,7 @@ mod timeout; // std use std::hash::Hash; +use std::time::Instant; use std::{collections::HashMap, time::Duration}; // crates use bls_signatures::PrivateKey; @@ -41,6 +42,7 @@ const ROOT_COMMITTEE: &str = "root_committee"; const PARENT_COMMITTEE: &str = "parent_committee"; const CHILD_COMMITTEES: &str = "child_committees"; const COMMITTED_BLOCKS: &str = "committed_blocks"; +const STEP_DURATION: &str = "step_duration"; pub const CARNOT_RECORD_KEYS: &[&str] = &[ NODE_ID, @@ -55,6 +57,7 @@ pub const CARNOT_RECORD_KEYS: &[&str] = &[ PARENT_COMMITTEE, CHILD_COMMITTEES, COMMITTED_BLOCKS, + STEP_DURATION, ]; static RECORD_SETTINGS: std::sync::OnceLock> = std::sync::OnceLock::new(); @@ -73,6 +76,7 @@ pub struct CarnotState { parent_committe: Committee, child_committees: Vec, committed_blocks: Vec, + step_duration: Duration, } impl serde::Serialize for CarnotState { @@ -135,6 +139,7 @@ impl serde::Serialize for CarnotState { COMMITTED_BLOCKS => { ser.serialize_field(COMMITTED_BLOCKS, &self.committed_blocks)? } + STEP_DURATION => ser.serialize_field(STEP_DURATION, &self.step_duration)?, _ => {} } } @@ -186,6 +191,7 @@ impl From<&Carnot> for CarnotState { last_view_timeout_qc: value.last_view_timeout_qc(), committed_blocks: value.committed_blocks(), highest_voted_view: Default::default(), + step_duration: Default::default(), } } } @@ -215,9 +221,10 @@ pub struct CarnotNode { event_builder: event_builder::EventBuilder, engine: Carnot, random_beacon_pk: PrivateKey, + step_duration: Duration, } -impl CarnotNode { +impl> CarnotNode { pub fn new( id: consensus_engine::NodeId, settings: CarnotSettings, @@ -244,6 +251,7 @@ impl CarnotNode { event_builder: event_builder::EventBuilder::new(id, timeout), engine, random_beacon_pk, + step_duration: Duration::ZERO, }; this.state = CarnotState::from(&this.engine); this @@ -314,6 +322,143 @@ impl CarnotNode { } } } + + fn process_event(&mut self, event: Event<[u8; 32]>) { + let mut output = None; + match event { + Event::Proposal { block } => { + let current_view = self.engine.current_view(); + tracing::info!( + node=%self.id, + last_committed_view=%self.engine.latest_committed_view(), + current_view = %current_view, + block_view = %block.header().view, + block = %block.header().id, + parent_block=%block.header().parent(), + "receive block proposal", + ); + match self.engine.receive_block(block.header().clone()) { + Ok(mut new) => { + if self.engine.current_view() != new.current_view() { + new = new + .update_overlay(|overlay| { + overlay.update_leader_selection(|leader_selection| { + leader_selection.on_new_block_received(block.clone()) + }) + }) + .unwrap_or(new); + self.engine = new; + } + } + Err(_) => { + tracing::error!( + node = %self.id, + current_view = %self.engine.current_view(), + block_view = %block.header().view, block = %block.header().id, + "receive block proposal, but is invalid", + ); + } + } + + if self.engine.overlay().is_member_of_leaf_committee(self.id) { + output = Some(Output::Send(consensus_engine::Send { + to: self.engine.parent_committee(), + payload: Payload::Vote(Vote { + view: self.engine.current_view(), + block: block.header().id, + }), + })) + } + } + // This branch means we already get enough votes for this block + // So we can just call approve_block + Event::Approve { block, .. } => { + tracing::info!( + node = %self.id, + current_view = %self.engine.current_view(), + block_view = %block.view, + block = %block.id, + parent_block=%block.parent(), + "receive approve message" + ); + let (new, out) = self.engine.approve_block(block); + tracing::info!(vote=?out, node=%self.id); + output = Some(Output::Send(out)); + self.engine = new; + } + Event::ProposeBlock { qc } => { + output = Some(Output::BroadcastProposal { + proposal: nomos_core::block::Block::new( + qc.view().next(), + qc.clone(), + [].into_iter(), + self.id, + RandomBeaconState::generate_happy(qc.view().next(), &self.random_beacon_pk), + ), + }); + } + // This branch means we already get enough new view msgs for this qc + // So we can just call approve_new_view + Event::NewView { + timeout_qc, + new_views, + } => { + tracing::info!( + node = %self.id, + current_view = %self.engine.current_view(), + timeout_view = %timeout_qc.view(), + "receive new view message" + ); + let (new, out) = self.engine.approve_new_view(timeout_qc, new_views); + output = Some(Output::Send(out)); + self.engine = new; + } + Event::TimeoutQc { timeout_qc } => { + tracing::info!( + node = %self.id, + current_view = %self.engine.current_view(), + timeout_view = %timeout_qc.view(), + "receive timeout qc message" + ); + self.engine = self.engine.receive_timeout_qc(timeout_qc); + } + Event::RootTimeout { timeouts } => { + tracing::debug!("root timeout {:?}", timeouts); + if self.engine.is_member_of_root_committee() { + assert!(timeouts + .iter() + .all(|t| t.view == self.engine.current_view())); + let high_qc = timeouts + .iter() + .map(|t| &t.high_qc) + .chain(std::iter::once(&self.engine.high_qc())) + .max_by_key(|qc| qc.view) + .expect("empty root committee") + .clone(); + let timeout_qc = + TimeoutQc::new(timeouts.iter().next().unwrap().view, high_qc, self.id); + output = Some(Output::BroadcastTimeoutQc { timeout_qc }); + } + } + Event::LocalTimeout => { + tracing::info!( + node = %self.id, + current_view = %self.engine.current_view(), + "receive local timeout message" + ); + let (new, out) = self.engine.local_timeout(); + self.engine = new; + output = out.map(Output::Send); + } + Event::None => { + tracing::error!("unimplemented none branch"); + unreachable!("none event will never be constructed") + } + } + if let Some(event) = output { + self.handle_output(event); + } + } } impl> Node for CarnotNode { @@ -333,7 +478,9 @@ impl> Node for Car } fn step(&mut self, elapsed: Duration) { - // split messages per view, we just one to process the current engine processing view or proposals or timeoutqcs + let step_duration = Instant::now(); + + // split messages per view, we just want to process the current engine processing view or proposals or timeoutqcs let (mut current_view_messages, other_view_messages): (Vec<_>, Vec<_>) = self .network_interface .receive_messages() @@ -352,145 +499,12 @@ impl> Node for Car .step(current_view_messages, &self.engine, elapsed); for event in events { - let mut output = None; - match event { - Event::Proposal { block } => { - let current_view = self.engine.current_view(); - tracing::info!( - node=%self.id, - last_committed_view=%self.engine.latest_committed_view(), - current_view = %current_view, - block_view = %block.header().view, - block = %block.header().id, - parent_block=%block.header().parent(), - "receive block proposal", - ); - match self.engine.receive_block(block.header().clone()) { - Ok(mut new) => { - if self.engine.current_view() != new.current_view() { - new = new - .update_overlay(|overlay| { - overlay.update_leader_selection(|leader_selection| { - leader_selection.on_new_block_received(block.clone()) - }) - }) - .unwrap_or(new); - self.engine = new; - } - } - Err(_) => { - tracing::error!(node = %self.id, current_view = %self.engine.current_view(), block_view = %block.header().view, block = %block.header().id, "receive block proposal, but is invalid"); - } - } - - if self.engine.overlay().is_member_of_leaf_committee(self.id) { - output = Some(Output::Send(consensus_engine::Send { - to: self.engine.parent_committee(), - payload: Payload::Vote(Vote { - view: self.engine.current_view(), - block: block.header().id, - }), - })) - } - } - // This branch means we already get enough votes for this block - // So we can just call approve_block - Event::Approve { block, .. } => { - tracing::info!( - node = %self.id, - current_view = %self.engine.current_view(), - block_view = %block.view, - block = %block.id, - parent_block=%block.parent(), - "receive approve message" - ); - let (new, out) = self.engine.approve_block(block); - tracing::info!(vote=?out, node=%self.id); - output = Some(Output::Send(out)); - self.engine = new; - } - Event::ProposeBlock { qc } => { - output = Some(Output::BroadcastProposal { - proposal: nomos_core::block::Block::new( - qc.view().next(), - qc.clone(), - [].into_iter(), - self.id, - RandomBeaconState::generate_happy( - qc.view().next(), - &self.random_beacon_pk, - ), - ), - }); - } - // This branch means we already get enough new view msgs for this qc - // So we can just call approve_new_view - Event::NewView { - timeout_qc, - new_views, - } => { - tracing::info!( - node = %self.id, - current_view = %self.engine.current_view(), - timeout_view = %timeout_qc.view(), - "receive new view message" - ); - let (new, out) = self.engine.approve_new_view(timeout_qc.clone(), new_views); - output = Some(Output::Send(out)); - self.engine = new; - } - Event::TimeoutQc { timeout_qc } => { - tracing::info!( - node = %self.id, - current_view = %self.engine.current_view(), - timeout_view = %timeout_qc.view(), - "receive timeout qc message" - ); - self.engine = self.engine.receive_timeout_qc(timeout_qc.clone()); - } - Event::RootTimeout { timeouts } => { - tracing::debug!("root timeout {:?}", timeouts); - if self.engine.is_member_of_root_committee() { - assert!(timeouts - .iter() - .all(|t| t.view == self.engine.current_view())); - let high_qc = timeouts - .iter() - .map(|t| &t.high_qc) - .chain(std::iter::once(&self.engine.high_qc())) - .max_by_key(|qc| qc.view) - .expect("empty root committee") - .clone(); - let timeout_qc = TimeoutQc::new( - timeouts.iter().next().unwrap().view, - high_qc, - self.id(), - ); - output = Some(Output::BroadcastTimeoutQc { timeout_qc }); - } - } - Event::LocalTimeout => { - tracing::info!( - node = %self.id, - current_view = %self.engine.current_view(), - "receive local timeout message" - ); - let (new, out) = self.engine.local_timeout(); - self.engine = new; - output = out.map(Output::Send); - } - Event::None => { - tracing::error!("unimplemented none branch"); - unreachable!("none event will never be constructed") - } - } - if let Some(event) = output { - self.handle_output(event); - } + self.process_event(event); } // update state self.state = CarnotState::from(&self.engine); + self.state.step_duration = step_duration.elapsed(); } }