From 3607ce76271b80002ab4a88951dc812b13c8a258 Mon Sep 17 00:00:00 2001
From: Giacomo Pasini
Date: Wed, 5 Jul 2023 15:30:57 +0200
Subject: [PATCH] Unhappy path fixes (#245)

* Add unhappy path handlers for first view

Nodes resolving the first view were not instructed to handle possible
failures. In particular, no timeouts were configured and nodes were not
listening for timeouts from other nodes. This commit fixes this by making
them use the same path as every other view.

* Endless timeouts

Keep signaling timeout until consensus is achieved. This could be helpful
in network partitioning where messages could be lost.

* Ensure consistent committee hash

Sort committee members before hashing to ensure all nodes in the network
obtain a consistent hash

* Fix timeout_qc filtering condition

'>' was used instead of '=='

* Fix new view topic name

'votes' was used instead of 'new-view'

* Fix filtering condition for unhappy path tally

Rogue '!' at the beginning

* Add timeout tally

Filter timeouts to allow only timeouts for the current view coming from
root committee members.
We might want to try to unify this with happy and unhappy tally.

* Add debug logs

* clippy happy
---
 nomos-services/consensus/src/lib.rs           | 65 +++++++++++++------
 .../consensus/src/network/adapters/waku.rs    |  7 +-
 nomos-services/consensus/src/tally/mod.rs     |  1 +
 nomos-services/consensus/src/tally/timeout.rs | 55 ++++++++++++++++
 nomos-services/consensus/src/tally/unhappy.rs |  3 +-
 5 files changed, 106 insertions(+), 25 deletions(-)
 create mode 100644 nomos-services/consensus/src/tally/timeout.rs

diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs
index 16a9a38a..450a4862 100644
--- a/nomos-services/consensus/src/lib.rs
+++ b/nomos-services/consensus/src/lib.rs
@@ -21,7 +21,9 @@ use tracing::instrument;
 // internal
 use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg};
 use crate::network::NetworkAdapter;
-use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings};
+use crate::tally::{
+    happy::CarnotTally, timeout::TimeoutTally, unhappy::NewViewTally, CarnotTallySettings,
+};
 use consensus_engine::{
     overlay::RandomBeaconState, AggregateQc, BlockId, Carnot, Committee, LeaderProof, NewView,
     Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc, View, Vote,
@@ -191,11 +193,14 @@ where
         let mut task_manager = TaskManager::new();
 
         let genesis_block = carnot.genesis_block();
-        task_manager.push(
-            genesis_block.view + 1,
-            Self::gather_block(adapter.clone(), genesis_block.view + 1),
-        );
-
+        Self::process_view_change(
+            carnot.clone(),
+            genesis_block.view - 1,
+            &mut task_manager,
+            adapter.clone(),
+        )
+        .await;
+        // we already have the genesis block, no need to wait for it
         task_manager.push(
             genesis_block.view,
             Self::gather_votes(
@@ -303,11 +308,16 @@ where
                     carnot = new_carnot;
                     output = Some(Output::Send::(out));
                 }
-                Event::LocalTimeout => {
+                Event::LocalTimeout { view } => {
                     tracing::debug!("local timeout");
                     let (new_carnot, out) = carnot.local_timeout();
                     carnot = new_carnot;
                     output = out.map(Output::Send);
+                    // keep timeout until the situation is resolved
+                    task_manager.push(view, async move {
+                        tokio::time::sleep(TIMEOUT).await;
+                        Event::LocalTimeout { view }
+                    });
                 }
                 Event::NewView {
                     timeout_qc,
@@ -543,9 +553,9 @@ where
             task_manager.cancel(prev_view);
             tracing::debug!("Advanced view from {prev_view} to {current_view}");
             // View change!
-            task_manager.push(current_view, async {
+            task_manager.push(current_view, async move {
                 tokio::time::sleep(TIMEOUT).await;
-                Event::LocalTimeout
+                Event::LocalTimeout { view: current_view }
             });
             task_manager.push(
                 current_view + 1,
                 Self::gather_timeout_qc(adapter.clone(), current_view),
             );
             if carnot.is_member_of_root_committee() {
-                let threshold = carnot.leader_super_majority_threshold();
                 task_manager.push(
                     current_view,
-                    Self::gather_timeout(adapter, carnot.self_committee(), current_view, threshold),
+                    Self::gather_timeout(
+                        adapter,
+                        carnot.self_committee(),
+                        current_view,
+                        CarnotTallySettings {
+                            threshold: carnot.leader_super_majority_threshold(),
+                            participating_nodes: carnot.root_committee(),
+                        },
+                    ),
                 );
             }
         }
@@ -578,6 +595,7 @@ where
         }
     }
 
+    #[instrument(level = "debug", skip(adapter, tally))]
     async fn gather_votes(
         adapter: A,
         committee: Committee,
@@ -594,6 +612,7 @@ where
        }
     }
 
+    #[instrument(level = "debug", skip(adapter, tally))]
     async fn gather_new_views(
         adapter: A,
         committee: Committee,
@@ -615,22 +634,24 @@ where
         }
     }
 
+    #[instrument(level = "debug", skip(adapter, tally))]
     async fn gather_timeout(
         adapter: A,
         committee: Committee,
         view: consensus_engine::View,
-        threshold: usize,
+        tally: CarnotTallySettings,
     ) -> Event {
-        let timeouts = adapter
-            .timeout_stream(&committee, view)
-            .await
-            .take(threshold)
-            .map(|msg| msg.vote)
-            .collect()
-            .await;
-        Event::RootTimeout { timeouts }
+        let tally = TimeoutTally::new(tally);
+        let stream = adapter.timeout_stream(&committee, view).await;
+        match tally.tally(view, stream).await {
+            Ok((_, timeouts)) => Event::RootTimeout { timeouts },
+            Err(_e) => {
+                todo!("Handle tally error {_e}");
+            }
+        }
     }
 
+    #[instrument(level = "debug", skip(adapter))]
     async fn gather_block(adapter: A, view: consensus_engine::View) -> Event {
         let stream = adapter
             .proposal_chunks_stream(view)
@@ -754,7 +775,9 @@ enum Event {
         block: consensus_engine::Block,
         votes: HashSet,
     },
-    LocalTimeout,
+    LocalTimeout {
+        view: View,
+    },
     NewView {
         timeout_qc: TimeoutQc,
         new_views: HashSet,
diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs
index 6c6493b4..acc44729 100644
--- a/nomos-services/consensus/src/network/adapters/waku.rs
+++ b/nomos-services/consensus/src/network/adapters/waku.rs
@@ -1,6 +1,7 @@
 // std
 use std::borrow::Cow;
 use std::collections::hash_map::DefaultHasher;
+use std::collections::BTreeSet;
 use std::hash::{Hash, Hasher};
 // crates
 use futures::{Stream, StreamExt};
@@ -247,7 +248,7 @@ impl NetworkAdapter for WakuAdapter {
                     let payload = message.payload();
                     let qc = TimeoutQcMsg::from_bytes(payload);
                     async move {
-                        if qc.qc.view() > view {
+                        if qc.qc.view() == view {
                             Some(qc)
                         } else {
                             None
@@ -286,7 +287,7 @@ impl NetworkAdapter for WakuAdapter {
         committee: &Committee,
         view: View,
     ) -> Box + Send + Unpin> {
-        let content_topic = create_topic("votes", committee, view);
+        let content_topic = create_topic("new-view", committee, view);
         Box::new(Box::pin(
             self.cached_stream_with_content_topic(content_topic)
                 .await
@@ -338,6 +339,8 @@ const TIMEOUT_QC_CONTENT_TOPIC: WakuContentTopic =
 // TODO: Maybe use a secure hasher instead
 fn hash_set(c: &Committee) -> u64 {
     let mut s = DefaultHasher::new();
+    // ensure consistent iteration across nodes
+    let c = c.iter().collect::<BTreeSet<_>>();
     for e in c.iter() {
         e.hash(&mut s);
     }
diff --git a/nomos-services/consensus/src/tally/mod.rs b/nomos-services/consensus/src/tally/mod.rs
index bcf73ef6..b69bedd9 100644
--- a/nomos-services/consensus/src/tally/mod.rs
+++ b/nomos-services/consensus/src/tally/mod.rs
@@ -1,4 +1,5 @@
 pub mod happy;
+pub mod timeout;
 pub mod unhappy;
 
 // std
diff --git a/nomos-services/consensus/src/tally/timeout.rs b/nomos-services/consensus/src/tally/timeout.rs
new file mode 100644
index 00000000..471c123e
--- /dev/null
+++ b/nomos-services/consensus/src/tally/timeout.rs
@@ -0,0 +1,55 @@
+// std
+use std::{collections::HashSet, convert::Infallible};
+// crates
+use futures::{Stream, StreamExt};
+// internal
+use super::CarnotTallySettings;
+use crate::network::messages::TimeoutMsg;
+use consensus_engine::{Timeout, View};
+use nomos_core::vote::Tally;
+
+#[derive(Clone, Debug)]
+pub struct TimeoutTally {
+    settings: CarnotTallySettings,
+}
+
+#[async_trait::async_trait]
+impl Tally for TimeoutTally {
+    type Vote = TimeoutMsg;
+    type Qc = ();
+    type Subject = View;
+    type Outcome = HashSet<Timeout>;
+    type TallyError = Infallible;
+    type Settings = CarnotTallySettings;
+
+    fn new(settings: Self::Settings) -> Self {
+        Self { settings }
+    }
+
+    async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
+        &self,
+        view: View,
+        mut vote_stream: S,
+    ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
+        let mut seen = HashSet::new();
+        let mut outcome = HashSet::new();
+        while let Some(vote) = vote_stream.next().await {
+            // check timeout view is valid
+            if vote.vote.view != view {
+                continue;
+            }
+
+            // check for individual nodes votes
+            if !self.settings.participating_nodes.contains(&vote.voter) {
+                continue;
+            }
+
+            seen.insert(vote.voter);
+            outcome.insert(vote.vote.clone());
+            if seen.len() >= self.settings.threshold {
+                return Ok(((), outcome));
+            }
+        }
+        unreachable!()
+    }
+}
diff --git a/nomos-services/consensus/src/tally/unhappy.rs b/nomos-services/consensus/src/tally/unhappy.rs
index bdba87d1..df4ec985 100644
--- a/nomos-services/consensus/src/tally/unhappy.rs
+++ b/nomos-services/consensus/src/tally/unhappy.rs
@@ -40,7 +40,6 @@ impl Tally for NewViewTally {
     ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
         let mut seen = HashSet::new();
         let mut outcome = HashSet::new();
-
         // return early for leaf nodes
         if self.settings.threshold == 0 {
             return Ok(((), outcome));
@@ -48,7 +47,7 @@ impl Tally for NewViewTally {
 
         while let Some(vote) = vote_stream.next().await {
             // check vote view is valid
-            if !vote.vote.view != timeout_qc.view() {
+            if vote.vote.view != timeout_qc.view() + 1 {
                 continue;
             }
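
Note on the committee-hash change above: the standalone sketch below is illustrative only, not code from this repository. NodeId, the function names, and the main harness are assumptions made up for the example; the point it demonstrates is that a digest computed over HashSet iteration order can differ between processes, while collecting the members into a BTreeSet first makes the digest depend only on the committee's contents.

use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeSet, HashSet};
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for the committee member identifier.
type NodeId = [u8; 32];

// Hashes members in HashSet iteration order. Each process seeds its hash
// map randomly, so two nodes can visit the same members in different orders
// and derive different digests for the same committee.
fn order_dependent_hash(committee: &HashSet<NodeId>) -> u64 {
    let mut s = DefaultHasher::new();
    for member in committee {
        member.hash(&mut s);
    }
    s.finish()
}

// Sorts the members first (the approach the patch takes in hash_set), so the
// digest depends only on the committee's contents, not on insertion order.
fn order_independent_hash(committee: &HashSet<NodeId>) -> u64 {
    let mut s = DefaultHasher::new();
    let sorted: BTreeSet<&NodeId> = committee.iter().collect();
    for member in sorted {
        member.hash(&mut s);
    }
    s.finish()
}

fn main() {
    let committee: HashSet<NodeId> = (0u8..5).map(|i| [i; 32]).collect();
    println!("order-independent: {}", order_independent_hash(&committee));
    println!("order-dependent:   {}", order_dependent_hash(&committee));
}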