Add node step time to report (#271)

* Skip step if previous node exeeded step time

* Fix clippy

* Passively keep track of step execution time in node
This commit is contained in:
gusto 2023-07-26 10:21:05 +03:00 committed by GitHub
parent ef0b0701f3
commit 976b1f9577
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 151 additions and 137 deletions

View File

@ -8,6 +8,7 @@ mod timeout;
// std // std
use std::hash::Hash; use std::hash::Hash;
use std::time::Instant;
use std::{collections::HashMap, time::Duration}; use std::{collections::HashMap, time::Duration};
// crates // crates
use bls_signatures::PrivateKey; use bls_signatures::PrivateKey;
@ -41,6 +42,7 @@ const ROOT_COMMITTEE: &str = "root_committee";
const PARENT_COMMITTEE: &str = "parent_committee"; const PARENT_COMMITTEE: &str = "parent_committee";
const CHILD_COMMITTEES: &str = "child_committees"; const CHILD_COMMITTEES: &str = "child_committees";
const COMMITTED_BLOCKS: &str = "committed_blocks"; const COMMITTED_BLOCKS: &str = "committed_blocks";
const STEP_DURATION: &str = "step_duration";
pub const CARNOT_RECORD_KEYS: &[&str] = &[ pub const CARNOT_RECORD_KEYS: &[&str] = &[
NODE_ID, NODE_ID,
@ -55,6 +57,7 @@ pub const CARNOT_RECORD_KEYS: &[&str] = &[
PARENT_COMMITTEE, PARENT_COMMITTEE,
CHILD_COMMITTEES, CHILD_COMMITTEES,
COMMITTED_BLOCKS, COMMITTED_BLOCKS,
STEP_DURATION,
]; ];
static RECORD_SETTINGS: std::sync::OnceLock<HashMap<String, bool>> = std::sync::OnceLock::new(); static RECORD_SETTINGS: std::sync::OnceLock<HashMap<String, bool>> = std::sync::OnceLock::new();
@ -73,6 +76,7 @@ pub struct CarnotState {
parent_committe: Committee, parent_committe: Committee,
child_committees: Vec<Committee>, child_committees: Vec<Committee>,
committed_blocks: Vec<BlockId>, committed_blocks: Vec<BlockId>,
step_duration: Duration,
} }
impl serde::Serialize for CarnotState { impl serde::Serialize for CarnotState {
@ -135,6 +139,7 @@ impl serde::Serialize for CarnotState {
COMMITTED_BLOCKS => { COMMITTED_BLOCKS => {
ser.serialize_field(COMMITTED_BLOCKS, &self.committed_blocks)? ser.serialize_field(COMMITTED_BLOCKS, &self.committed_blocks)?
} }
STEP_DURATION => ser.serialize_field(STEP_DURATION, &self.step_duration)?,
_ => {} _ => {}
} }
} }
@ -186,6 +191,7 @@ impl<O: Overlay> From<&Carnot<O>> for CarnotState {
last_view_timeout_qc: value.last_view_timeout_qc(), last_view_timeout_qc: value.last_view_timeout_qc(),
committed_blocks: value.committed_blocks(), committed_blocks: value.committed_blocks(),
highest_voted_view: Default::default(), highest_voted_view: Default::default(),
step_duration: Default::default(),
} }
} }
} }
@ -215,9 +221,10 @@ pub struct CarnotNode<O: Overlay> {
event_builder: event_builder::EventBuilder, event_builder: event_builder::EventBuilder,
engine: Carnot<O>, engine: Carnot<O>,
random_beacon_pk: PrivateKey, random_beacon_pk: PrivateKey,
step_duration: Duration,
} }
impl<O: Overlay> CarnotNode<O> { impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> CarnotNode<O> {
pub fn new<R: Rng>( pub fn new<R: Rng>(
id: consensus_engine::NodeId, id: consensus_engine::NodeId,
settings: CarnotSettings, settings: CarnotSettings,
@ -244,6 +251,7 @@ impl<O: Overlay> CarnotNode<O> {
event_builder: event_builder::EventBuilder::new(id, timeout), event_builder: event_builder::EventBuilder::new(id, timeout),
engine, engine,
random_beacon_pk, random_beacon_pk,
step_duration: Duration::ZERO,
}; };
this.state = CarnotState::from(&this.engine); this.state = CarnotState::from(&this.engine);
this this
@ -314,44 +322,8 @@ impl<O: Overlay> CarnotNode<O> {
} }
} }
} }
}
impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for CarnotNode<O> { fn process_event(&mut self, event: Event<[u8; 32]>) {
type Settings = CarnotSettings;
type State = CarnotState;
fn id(&self) -> NodeId {
self.id
}
fn current_view(&self) -> View {
self.engine.current_view()
}
fn state(&self) -> &CarnotState {
&self.state
}
fn step(&mut self, elapsed: Duration) {
// split messages per view, we just one to process the current engine processing view or proposals or timeoutqcs
let (mut current_view_messages, other_view_messages): (Vec<_>, Vec<_>) = self
.network_interface
.receive_messages()
.into_iter()
.map(NetworkMessage::get_payload)
.partition(|m| {
m.view() == self.engine.current_view()
|| matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_))
});
self.message_cache.prune(self.engine.current_view().prev());
self.message_cache.update(other_view_messages);
current_view_messages.append(&mut self.message_cache.retrieve(self.engine.current_view()));
let events = self
.event_builder
.step(current_view_messages, &self.engine, elapsed);
for event in events {
let mut output = None; let mut output = None;
match event { match event {
Event::Proposal { block } => { Event::Proposal { block } => {
@ -379,7 +351,12 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
} }
} }
Err(_) => { Err(_) => {
tracing::error!(node = %self.id, current_view = %self.engine.current_view(), block_view = %block.header().view, block = %block.header().id, "receive block proposal, but is invalid"); tracing::error!(
node = %self.id,
current_view = %self.engine.current_view(),
block_view = %block.header().view, block = %block.header().id,
"receive block proposal, but is invalid",
);
} }
} }
@ -416,10 +393,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
qc.clone(), qc.clone(),
[].into_iter(), [].into_iter(),
self.id, self.id,
RandomBeaconState::generate_happy( RandomBeaconState::generate_happy(qc.view().next(), &self.random_beacon_pk),
qc.view().next(),
&self.random_beacon_pk,
),
), ),
}); });
} }
@ -435,7 +409,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
timeout_view = %timeout_qc.view(), timeout_view = %timeout_qc.view(),
"receive new view message" "receive new view message"
); );
let (new, out) = self.engine.approve_new_view(timeout_qc.clone(), new_views); let (new, out) = self.engine.approve_new_view(timeout_qc, new_views);
output = Some(Output::Send(out)); output = Some(Output::Send(out));
self.engine = new; self.engine = new;
} }
@ -446,7 +420,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
timeout_view = %timeout_qc.view(), timeout_view = %timeout_qc.view(),
"receive timeout qc message" "receive timeout qc message"
); );
self.engine = self.engine.receive_timeout_qc(timeout_qc.clone()); self.engine = self.engine.receive_timeout_qc(timeout_qc);
} }
Event::RootTimeout { timeouts } => { Event::RootTimeout { timeouts } => {
tracing::debug!("root timeout {:?}", timeouts); tracing::debug!("root timeout {:?}", timeouts);
@ -461,11 +435,8 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
.max_by_key(|qc| qc.view) .max_by_key(|qc| qc.view)
.expect("empty root committee") .expect("empty root committee")
.clone(); .clone();
let timeout_qc = TimeoutQc::new( let timeout_qc =
timeouts.iter().next().unwrap().view, TimeoutQc::new(timeouts.iter().next().unwrap().view, high_qc, self.id);
high_qc,
self.id(),
);
output = Some(Output::BroadcastTimeoutQc { timeout_qc }); output = Some(Output::BroadcastTimeoutQc { timeout_qc });
} }
} }
@ -488,9 +459,52 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
self.handle_output(event); self.handle_output(event);
} }
} }
}
impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for CarnotNode<O> {
type Settings = CarnotSettings;
type State = CarnotState;
fn id(&self) -> NodeId {
self.id
}
fn current_view(&self) -> View {
self.engine.current_view()
}
fn state(&self) -> &CarnotState {
&self.state
}
fn step(&mut self, elapsed: Duration) {
let step_duration = Instant::now();
// split messages per view, we just want to process the current engine processing view or proposals or timeoutqcs
let (mut current_view_messages, other_view_messages): (Vec<_>, Vec<_>) = self
.network_interface
.receive_messages()
.into_iter()
.map(NetworkMessage::get_payload)
.partition(|m| {
m.view() == self.engine.current_view()
|| matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_))
});
self.message_cache.prune(self.engine.current_view().prev());
self.message_cache.update(other_view_messages);
current_view_messages.append(&mut self.message_cache.retrieve(self.engine.current_view()));
let events = self
.event_builder
.step(current_view_messages, &self.engine, elapsed);
for event in events {
self.process_event(event);
}
// update state // update state
self.state = CarnotState::from(&self.engine); self.state = CarnotState::from(&self.engine);
self.state.step_duration = step_duration.elapsed();
} }
} }