Simulation unhappy path (#193)

* Use elapsed time

* Added timeout

* Extract tally

* Missing elapsed time

* Fix new view leader behaviour

* Fix tests

* Fix timeout double check

* Fix logs

* TimeoutHandler nitpicks

* Clippy happy

* Fix timeout sub

* Modify discard messages comment
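
Taken together, these changes thread the elapsed Duration through Node::step so the event builder can count down a per-view timeout and, once it fires, emit Event::LocalTimeout and keep only proposals and timeout QCs from the incoming batch. The following is a self-contained sketch of that discard rule, not crate code: the Msg enum and the durations are made-up stand-ins for illustration.

use std::time::Duration;

// Stand-ins for the CarnotMessage variants; illustration only.
#[derive(Debug)]
enum Msg {
    Proposal,
    Vote,
    Timeout,
    TimeoutQc,
}

fn main() {
    let budget = Duration::from_millis(200);  // assumed per-view timeout
    let elapsed = Duration::from_millis(250); // this tick overshoots the budget
    let timed_out = budget.saturating_sub(elapsed).is_zero();

    let mut inbox = vec![Msg::Proposal, Msg::Vote, Msg::Timeout, Msg::TimeoutQc];
    if timed_out {
        // mirrors the `messages.retain(...)` call in EventBuilder::step below
        inbox.retain(|m| matches!(m, Msg::Proposal | Msg::TimeoutQc));
    }
    println!("{inbox:?}"); // prints: [Proposal, TimeoutQc]
}
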
Daniel Sanchez 2023-06-19 15:27:14 +02:00 committed by GitHub
parent faacd10172
commit bed0b9448d
12 changed files with 249 additions and 106 deletions

View File

@ -1,11 +1,14 @@
use crate::node::carnot::messages::CarnotMessage;
use crate::node::carnot::{messages::CarnotMessage, tally::Tally, timeout::TimeoutHandler};
use crate::util::parse_idx;
use consensus_engine::{Carnot, NewView, Overlay, Qc, StandardQc, Timeout, TimeoutQc, View, Vote};
use consensus_engine::{
AggregateQc, Carnot, NewView, Overlay, Qc, StandardQc, Timeout, TimeoutQc, View, Vote,
};
use nomos_consensus::network::messages::{NewViewMsg, TimeoutMsg, VoteMsg};
use nomos_consensus::NodeId;
use nomos_core::block::Block;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::hash::Hash;
use std::time::Duration;
pub type CarnotTx = [u8; 32];
@ -14,28 +17,54 @@ pub(crate) struct EventBuilder {
leader_vote_message: Tally<VoteMsg>,
vote_message: Tally<VoteMsg>,
timeout_message: Tally<TimeoutMsg>,
leader_new_view_message: Tally<NewViewMsg>,
new_view_message: Tally<NewViewMsg>,
timeout_handler: TimeoutHandler,
pub(crate) current_view: View,
}
impl EventBuilder {
pub fn new(id: NodeId) -> Self {
pub fn new(id: NodeId, timeout: Duration) -> Self {
Self {
vote_message: Default::default(),
leader_vote_message: Default::default(),
timeout_message: Default::default(),
leader_new_view_message: Default::default(),
new_view_message: Default::default(),
current_view: View::default(),
id,
timeout_handler: TimeoutHandler::new(timeout),
}
}
fn local_timeout(&mut self, view: View, elapsed: Duration) -> bool {
if self.timeout_handler.step(view, elapsed) {
self.timeout_handler.prune_by_view(view);
true
} else {
false
}
}
pub fn step<O: Overlay>(
&mut self,
messages: Vec<CarnotMessage>,
mut messages: Vec<CarnotMessage>,
engine: &Carnot<O>,
elapsed: Duration,
) -> Vec<Event<CarnotTx>> {
let mut events = Vec::new();
// check for a local timeout before processing messages
if self.local_timeout(engine.current_view(), elapsed) {
events.push(Event::LocalTimeout);
// if we time out, discard incoming current-view messages
messages.retain(|msg| {
matches!(
msg,
CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_)
)
});
}
// only run when the engine is in the genesis view
if engine.highest_voted_view() == -1
&& engine.overlay().is_member_of_leaf_committee(self.id)
@ -67,7 +96,14 @@ impl EventBuilder {
events.push(Event::Proposal { block })
}
CarnotMessage::TimeoutQc(msg) => {
let timeout_qc = msg.qc.clone();
events.push(Event::TimeoutQc { timeout_qc: msg.qc });
if engine.overlay().is_member_of_leaf_committee(self.id) {
events.push(Event::NewView {
timeout_qc,
new_views: Default::default(),
});
}
}
CarnotMessage::Vote(msg) => {
let msg_view = msg.vote.view;
@ -138,66 +174,48 @@ impl EventBuilder {
}
CarnotMessage::NewView(msg) => {
let msg_view = msg.vote.view;
let voter = msg.voter;
let timeout_qc = msg.vote.timeout_qc.clone();
self.current_view = core::cmp::max(self.current_view, msg_view);
// if we are the leader, then use the leader threshold, otherwise use the leaf threshold
let threshold = if engine.is_next_leader() {
let is_next_view_leader = engine.is_next_leader();
let is_message_from_root_committee =
engine.overlay().is_member_of_root_committee(voter);
let tally = if is_message_from_root_committee {
&mut self.leader_new_view_message
} else {
&mut self.new_view_message
};
// if the message comes from the root committee, then use the leader threshold, otherwise use the leaf threshold
let threshold = if is_message_from_root_committee {
engine.leader_super_majority_threshold()
} else {
engine.super_majority_threshold()
};
if let Some(new_views) =
self.new_view_message.tally_by(msg_view, msg, threshold)
{
events.push(Event::NewView {
new_views: new_views.into_iter().map(|v| v.vote).collect(),
timeout_qc,
})
if let Some(votes) = tally.tally_by(msg_view, msg, threshold) {
if is_next_view_leader && is_message_from_root_committee {
let high_qc = engine.high_qc();
events.push(Event::ProposeBlock {
qc: Qc::Aggregated(AggregateQc {
high_qc,
view: msg_view + 1,
}),
});
} else {
events.push(Event::NewView {
timeout_qc,
new_views: votes.into_iter().map(|v| v.vote).collect(),
});
}
}
}
}
}
events
}
}
struct Tally<T: core::hash::Hash + Eq> {
cache: HashMap<View, HashSet<T>>,
threshold: usize,
}
impl<T: core::hash::Hash + Eq> Default for Tally<T> {
fn default() -> Self {
Self::new(0)
}
}
impl<T: core::hash::Hash + Eq> Tally<T> {
fn new(threshold: usize) -> Self {
Self {
cache: Default::default(),
threshold,
}
}
fn tally(&mut self, view: View, message: T) -> Option<HashSet<T>> {
self.tally_by(view, message, self.threshold)
}
fn tally_by(&mut self, view: View, message: T, threshold: usize) -> Option<HashSet<T>> {
let entries = self.cache.entry(view).or_default();
entries.insert(message);
let entries = entries.len();
if entries == threshold {
Some(self.cache.remove(&view).unwrap())
} else {
None
}
}
}
pub enum Event<Tx: Clone + Hash + Eq> {
Proposal {
block: Block<Tx>,

View File

@ -3,6 +3,8 @@
mod event_builder;
mod message_cache;
mod messages;
mod tally;
mod timeout;
// std
use std::hash::Hash;
@ -22,7 +24,7 @@ use consensus_engine::overlay::RandomBeaconState;
use consensus_engine::{
Block, BlockId, Carnot, Committee, Overlay, Payload, Qc, StandardQc, TimeoutQc, View, Vote,
};
use nomos_consensus::network::messages::ProposalChunkMsg;
use nomos_consensus::network::messages::{ProposalChunkMsg, TimeoutQcMsg};
use nomos_consensus::{
leader_selection::UpdateableLeaderSelection,
network::messages::{NewViewMsg, TimeoutMsg, VoteMsg},
@ -117,6 +119,7 @@ impl<O: Overlay> CarnotNode<O> {
let overlay = O::new(overlay_settings);
let engine = Carnot::from_genesis(id, genesis.header().clone(), overlay);
let state = CarnotState::from(&engine);
let timeout = settings.timeout;
// pk is generated in an insecure way, but for simulation purposes using an RNG like SmallRng is more useful
let mut pk_buff = [0; 32];
rng.fill_bytes(&mut pk_buff);
@ -127,7 +130,7 @@ impl<O: Overlay> CarnotNode<O> {
settings,
network_interface,
message_cache: MessageCache::new(),
event_builder: event_builder::EventBuilder::new(id),
event_builder: event_builder::EventBuilder::new(id, timeout),
engine,
random_beacon_pk,
}
@ -186,8 +189,14 @@ impl<O: Overlay> CarnotNode<O> {
);
}
}
Output::BroadcastTimeoutQc { .. } => {
unimplemented!()
Output::BroadcastTimeoutQc { timeout_qc } => {
self.network_interface.send_message(
self.id,
CarnotMessage::TimeoutQc(TimeoutQcMsg {
source: self.id,
qc: timeout_qc,
}),
);
}
Output::BroadcastProposal { proposal } => {
for node in &self.settings.nodes {
@ -221,7 +230,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
&self.state
}
fn step(&mut self) {
fn step(&mut self, elapsed: Duration) {
// split messages per view; we only want to process messages for the engine's current view, plus proposals and timeout QCs
let (mut current_view_messages, other_view_messages): (Vec<_>, Vec<_>) = self
.network_interface
@ -236,7 +245,9 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
self.message_cache.update(other_view_messages);
current_view_messages.append(&mut self.message_cache.retrieve(self.engine.current_view()));
let events = self.event_builder.step(current_view_messages, &self.engine);
let events = self
.event_builder
.step(current_view_messages, &self.engine, elapsed);
for event in events {
let mut output: Vec<Output<CarnotTx>> = vec![];
@ -313,28 +324,60 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
// This branch means we have already collected enough new-view messages for this QC,
// so we can just call approve_new_view
Event::NewView {
timeout_qc: _,
new_views: _,
timeout_qc,
new_views,
} => {
// let (new, out) = self.engine.approve_new_view(timeout_qc, new_views);
// output = Some(out);
// self.engine = new;
// let next_view = timeout_qc.view + 2;
// if self.engine.is_leader_for_view(next_view) {
// self.gather_new_views(&[self.id].into_iter().collect(), timeout_qc);
// }
tracing::error!("unimplemented new view branch");
unimplemented!()
let (new, out) = self.engine.approve_new_view(timeout_qc.clone(), new_views);
output.push(Output::Send(out));
self.engine = new;
tracing::info!(
node = parse_idx(&self.id),
current_view = self.engine.current_view(),
timeout_view = timeout_qc.view,
"receive new view message"
);
}
Event::TimeoutQc { timeout_qc } => {
self.engine = self.engine.receive_timeout_qc(timeout_qc);
tracing::info!(
node = parse_idx(&self.id),
current_view = self.engine.current_view(),
timeout_view = timeout_qc.view,
"receive timeout qc message"
);
self.engine = self.engine.receive_timeout_qc(timeout_qc.clone());
}
Event::RootTimeout { timeouts } => {
println!("root timeouts: {timeouts:?}");
tracing::debug!("root timeout {:?}", timeouts);
if self.engine.is_member_of_root_committee() {
assert!(timeouts
.iter()
.all(|t| t.view == self.engine.current_view()));
let high_qc = timeouts
.iter()
.map(|t| &t.high_qc)
.chain(std::iter::once(&self.engine.high_qc()))
.max_by_key(|qc| qc.view)
.expect("empty root committee")
.clone();
let timeout_qc = TimeoutQc {
view: timeouts.iter().next().unwrap().view,
high_qc,
sender: self.id(),
};
output.push(Output::BroadcastTimeoutQc { timeout_qc });
}
}
Event::LocalTimeout => {
tracing::error!("unimplemented local timeout branch");
unreachable!("local timeout will never be constructed")
tracing::info!(
node = parse_idx(&self.id),
current_view = self.engine.current_view(),
"receive local timeout message"
);
let (new, out) = self.engine.local_timeout();
self.engine = new;
if let Some(out) = out {
output.push(Output::Send(out));
}
}
Event::None => {
tracing::error!("unimplemented none branch");

View File

@ -0,0 +1,37 @@
use consensus_engine::View;
use std::collections::{HashMap, HashSet};
pub(crate) struct Tally<T: core::hash::Hash + Eq> {
cache: HashMap<View, HashSet<T>>,
threshold: usize,
}
impl<T: core::hash::Hash + Eq> Default for Tally<T> {
fn default() -> Self {
Self::new(0)
}
}
impl<T: core::hash::Hash + Eq> Tally<T> {
pub fn new(threshold: usize) -> Self {
Self {
cache: Default::default(),
threshold,
}
}
pub fn tally(&mut self, view: View, message: T) -> Option<HashSet<T>> {
self.tally_by(view, message, self.threshold)
}
pub fn tally_by(&mut self, view: View, message: T, threshold: usize) -> Option<HashSet<T>> {
let entries = self.cache.entry(view).or_default();
entries.insert(message);
let entries = entries.len();
if entries >= threshold {
Some(self.cache.remove(&view).unwrap())
} else {
None
}
}
}
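
For orientation, a hypothetical usage of the extracted Tally: tally_by (and tally, which uses the stored threshold) accumulates distinct messages per view and hands back the whole set once the count reaches the threshold, draining that view's cache. A minimal sketch, assuming it sits alongside the module above; the threshold and view values are made up.

// Sketch only: Tally is crate-internal, so this test module would live inside tally.rs.
#[cfg(test)]
mod tally_usage_sketch {
    use super::Tally;
    use consensus_engine::View;

    #[test]
    fn drains_once_threshold_is_reached() {
        let mut tally: Tally<&str> = Tally::new(2); // hypothetical threshold
        let view: View = 1;                         // hypothetical view
        assert!(tally.tally(view, "vote-from-a").is_none()); // 1 < threshold: keep caching
        let full = tally.tally(view, "vote-from-b");         // 2 >= threshold: drained
        assert_eq!(full.map(|set| set.len()), Some(2));
    }
}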

View File

@ -0,0 +1,34 @@
use consensus_engine::View;
use polars::export::ahash::HashMap;
use std::time::Duration;
pub(crate) struct TimeoutHandler {
pub timeout: Duration,
pub per_view: HashMap<View, Duration>,
}
impl TimeoutHandler {
pub fn new(timeout: Duration) -> Self {
Self {
timeout,
per_view: Default::default(),
}
}
pub fn step(&mut self, view: View, elapsed: Duration) -> bool {
let timeout = self.per_view.entry(view).or_insert(self.timeout);
*timeout = timeout.saturating_sub(elapsed);
*timeout == Duration::ZERO
}
pub fn is_timeout(&self, view: View) -> bool {
self.per_view
.get(&view)
.map(|t| t.is_zero())
.unwrap_or(false)
}
pub fn prune_by_view(&mut self, view: View) {
self.per_view.retain(|entry, _| entry > &view);
}
}
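
Similarly, a hypothetical check of the TimeoutHandler countdown (durations are made up): step subtracts the elapsed time from the view's remaining budget and returns true once it hits zero, while prune_by_view forgets that view and every older one.

// Sketch only: TimeoutHandler is crate-internal, so this test module would live inside timeout.rs.
#[cfg(test)]
mod timeout_handler_sketch {
    use super::TimeoutHandler;
    use std::time::Duration;

    #[test]
    fn fires_once_budget_is_spent() {
        let mut handler = TimeoutHandler::new(Duration::from_millis(200)); // assumed timeout
        assert!(!handler.step(1, Duration::from_millis(100))); // 100 ms left for view 1
        assert!(handler.step(1, Duration::from_millis(100)));  // budget exhausted: fires
        assert!(handler.is_timeout(1));
        handler.prune_by_view(1);        // drop bookkeeping for view 1 and older
        assert!(!handler.is_timeout(1)); // nothing left for the pruned view
    }
}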

View File

@ -1,5 +1,6 @@
// std
use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;
// crates
use serde::{Deserialize, Serialize};
// internal
@ -362,7 +363,7 @@ impl Node for DummyNode {
&self.state
}
fn step(&mut self) {
fn step(&mut self, _: Duration) {
let incoming_messages = self.network_interface.receive_messages();
self.state.message_count += incoming_messages.len();
@ -565,19 +566,19 @@ mod tests {
for (_, node) in nodes.iter() {
assert_eq!(node.current_view(), 0);
}
let elapsed = Duration::from_millis(100);
// 1. Leaders receive vote and broadcast new Proposal(Block) to all nodes.
network.dispatch_after(Duration::from_millis(100));
network.dispatch_after(elapsed);
nodes.iter_mut().for_each(|(_, node)| {
node.step();
node.step(elapsed);
});
network.collect_messages();
// 2. a) All nodes received proposal block.
// b) Leaf nodes send vote to internal nodes.
network.dispatch_after(Duration::from_millis(100));
network.dispatch_after(elapsed);
nodes.iter_mut().for_each(|(_, node)| {
node.step();
node.step(elapsed);
});
network.collect_messages();
@ -598,9 +599,9 @@ mod tests {
assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf
// 3. Internal nodes send vote to root node.
network.dispatch_after(Duration::from_millis(100));
network.dispatch_after(elapsed);
nodes.iter_mut().for_each(|(_, node)| {
node.step();
node.step(elapsed);
});
network.collect_messages();
@ -616,9 +617,9 @@ mod tests {
assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf
// 4. Root node send vote to next view leader nodes.
network.dispatch_after(Duration::from_millis(100));
network.dispatch_after(elapsed);
nodes.iter_mut().for_each(|(_, node)| {
node.step();
node.step(elapsed);
});
network.collect_messages();
@ -632,9 +633,9 @@ mod tests {
assert!(nodes[&node_id(6)].state().view_state[&1].vote_sent); // Leaf
// 5. Leaders receive vote and broadcast new Proposal(Block) to all nodes.
network.dispatch_after(Duration::from_millis(100));
network.dispatch_after(elapsed);
nodes.iter_mut().for_each(|(_, node)| {
node.step();
node.step(elapsed);
});
network.collect_messages();
@ -645,9 +646,9 @@ mod tests {
// 6. a) All nodes received proposal block.
// b) Leaf nodes send vote to internal nodes.
network.dispatch_after(Duration::from_millis(100));
network.dispatch_after(elapsed);
nodes.iter_mut().for_each(|(_, node)| {
node.step();
node.step(elapsed);
});
network.collect_messages();
@ -705,11 +706,11 @@ mod tests {
for (_, node) in nodes.iter() {
assert_eq!(node.current_view(), 0);
}
let elapsed = Duration::from_millis(100);
for _ in 0..7 {
network.dispatch_after(Duration::from_millis(100));
network.dispatch_after(elapsed);
nodes.iter_mut().for_each(|(_, node)| {
node.step();
node.step(elapsed);
});
network.collect_messages();
}
@ -755,11 +756,11 @@ mod tests {
for (_, node) in nodes.iter() {
assert_eq!(node.current_view(), 0);
}
let elapsed = Duration::from_millis(100);
for _ in 0..7 {
network.dispatch_after(Duration::from_millis(100));
network.dispatch_after(elapsed);
nodes.iter_mut().for_each(|(_, node)| {
node.step();
node.step(elapsed);
});
network.collect_messages();
}
@ -803,10 +804,11 @@ mod tests {
network.collect_messages();
let nodes = Arc::new(RwLock::new(nodes));
let elapsed = Duration::from_millis(100);
for _ in 0..9 {
network.dispatch_after(Duration::from_millis(100));
network.dispatch_after(elapsed);
nodes.write().par_iter_mut().for_each(|(_, node)| {
node.step();
node.step(elapsed);
});
network.collect_messages();
}

View File

@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;
use super::{Node, NodeId};
@ -42,7 +43,7 @@ impl<S> Node for DummyStreamingNode<S> {
&self.state
}
fn step(&mut self) {
fn step(&mut self, _: Duration) {
self.state.current_view += 1;
}
}

View File

@ -154,7 +154,7 @@ pub trait Node {
// TODO: this should return View once we integrate the consensus engine
fn current_view(&self) -> usize;
fn state(&self) -> &Self::State;
fn step(&mut self);
fn step(&mut self, elapsed: Duration);
}
#[cfg(test)]
@ -174,7 +174,7 @@ impl Node for usize {
self
}
fn step(&mut self) {
fn step(&mut self, _: Duration) {
use std::ops::AddAssign;
self.add_assign(1);
}

View File

@ -9,6 +9,7 @@ use rayon::prelude::*;
use serde::Serialize;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use super::SimulationRunnerHandle;
@ -39,6 +40,7 @@ where
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let elapsed = Duration::from_millis(100);
let handle = std::thread::spawn(move || {
loop {
select! {
@ -53,7 +55,7 @@ where
.write()
.par_iter_mut()
.filter(|n| ids.contains(&n.id()))
.for_each(N::step);
.for_each(|node|node.step(elapsed));
p.send(R::try_from(
&simulation_state,

View File

@ -9,6 +9,7 @@ use rand::prelude::IteratorRandom;
use serde::Serialize;
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
use super::SimulationRunnerHandle;
@ -42,6 +43,7 @@ where
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let elapsed = Duration::from_millis(100);
let handle = std::thread::spawn(move || {
'main: for chunk in iterations.chunks(update_rate) {
select! {
@ -61,7 +63,7 @@ where
let node: &mut N = shared_nodes
.get_mut(parse_idx(&node_id))
.expect("Node should be present");
node.step();
node.step(elapsed);
}
// check if any condition makes the simulation stop

View File

@ -33,6 +33,7 @@ use crossbeam::select;
use std::collections::BTreeSet;
use std::ops::Not;
use std::sync::Arc;
use std::time::Duration;
// crates
use fixed_slice_deque::FixedSliceDeque;
use rand::prelude::{IteratorRandom, SliceRandom};
@ -80,6 +81,7 @@ where
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let elapsed = Duration::from_millis(100);
let handle = std::thread::spawn(move || {
loop {
select! {
@ -99,7 +101,7 @@ where
.get_mut(parse_idx(&node_id))
.expect("Node should be present");
let prev_view = node.current_view();
node.step();
node.step(elapsed);
let after_view = node.current_view();
if after_view > prev_view {
// pass node to next step group

View File

@ -80,15 +80,15 @@ where
.any(|x| x)
}
fn step<N>(&mut self, nodes: &mut [N])
fn step<N>(&mut self, nodes: &mut [N], elapsed: Duration)
where
N: Node + Send + Sync,
N::Settings: Clone + Send,
N::State: Serialize,
{
self.network.dispatch_after(Duration::from_millis(100));
self.network.dispatch_after(elapsed);
nodes.par_iter_mut().for_each(|node| {
node.step();
node.step(elapsed);
});
self.network.collect_messages();
}

View File

@ -5,6 +5,7 @@ use crate::warding::SimulationState;
use crate::{node::Node, output_processors::Record};
use crossbeam::channel::{bounded, select};
use std::sync::Arc;
use std::time::Duration;
/// Simulate with sending the network state to any subscriber
pub fn simulate<M, N: Node, R>(
@ -31,6 +32,7 @@ where
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let elapsed = Duration::from_millis(100);
let handle = std::thread::spawn(move || {
p.send(R::try_from(&state)?)?;
loop {
@ -44,7 +46,7 @@ where
// then dead lock will occur
{
let mut nodes = nodes.write();
inner_runner.step(&mut nodes);
inner_runner.step(&mut nodes, elapsed);
}
p.send(R::try_from(&state)?)?;
@ -147,7 +149,7 @@ mod tests {
let mut runner: SimulationRunner<DummyMessage, DummyNode, OutData> =
SimulationRunner::new(network, nodes, producer, settings).unwrap();
let mut nodes = runner.nodes.write();
runner.inner.step(&mut nodes);
runner.inner.step(&mut nodes, Duration::from_millis(100));
drop(nodes);
let nodes = runner.nodes.read();
@ -194,7 +196,7 @@ mod tests {
SimulationRunner::new(network, nodes, Default::default(), settings).unwrap();
let mut nodes = runner.nodes.write();
runner.inner.step(&mut nodes);
runner.inner.step(&mut nodes, Duration::from_millis(100));
drop(nodes);
let nodes = runner.nodes.read();