Unhappy path fixes (#245)

* Add unhappy path handlers for first view

Nodes resolving the first view were not instructed to handle
possible failures. In particular, no timeouts were configured
and nodes were not listening for timeouts from other nodes.
This commit fixes this by making them use the
same path as every other view.

* Endless timeouts

Keep signaling timeout until consensus is achieved.
This could be helpful in network partitioning where messages could
be lost.

* Ensure consistent committee hash

Sort committee members before hashing to ensure all nodes in the
network obtain a consistent hash

* Fix timeout_qc filtering condition

'>' was used instead of '=='

* Fix new view topic name

'votes' was used instead of 'new-view'

* Fix filtering condition for unhappy path tally

Rogue '!' at the beginning

* Add timeout tally

Filter timeouts to allow only timeouts for the current view
coming from root committee members.
We might want to try to unigy this with happy and unhappy tally.

* Add debug logs

* clippy happy
This commit is contained in:
Giacomo Pasini 2023-07-05 15:30:57 +02:00 committed by GitHub
parent d0c6df23fc
commit 3607ce7627
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 106 additions and 25 deletions

View File

@ -21,7 +21,9 @@ use tracing::instrument;
// internal
use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg};
use crate::network::NetworkAdapter;
use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings};
use crate::tally::{
happy::CarnotTally, timeout::TimeoutTally, unhappy::NewViewTally, CarnotTallySettings,
};
use consensus_engine::{
overlay::RandomBeaconState, AggregateQc, BlockId, Carnot, Committee, LeaderProof, NewView,
Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc, View, Vote,
@ -191,11 +193,14 @@ where
let mut task_manager = TaskManager::new();
let genesis_block = carnot.genesis_block();
task_manager.push(
genesis_block.view + 1,
Self::gather_block(adapter.clone(), genesis_block.view + 1),
);
Self::process_view_change(
carnot.clone(),
genesis_block.view - 1,
&mut task_manager,
adapter.clone(),
)
.await;
// we already have the genesis block, no need to wait for it
task_manager.push(
genesis_block.view,
Self::gather_votes(
@ -303,11 +308,16 @@ where
carnot = new_carnot;
output = Some(Output::Send::<P::Tx>(out));
}
Event::LocalTimeout => {
Event::LocalTimeout { view } => {
tracing::debug!("local timeout");
let (new_carnot, out) = carnot.local_timeout();
carnot = new_carnot;
output = out.map(Output::Send);
// keep timeout until the situation is resolved
task_manager.push(view, async move {
tokio::time::sleep(TIMEOUT).await;
Event::LocalTimeout { view }
});
}
Event::NewView {
timeout_qc,
@ -543,9 +553,9 @@ where
task_manager.cancel(prev_view);
tracing::debug!("Advanced view from {prev_view} to {current_view}");
// View change!
task_manager.push(current_view, async {
task_manager.push(current_view, async move {
tokio::time::sleep(TIMEOUT).await;
Event::LocalTimeout
Event::LocalTimeout { view: current_view }
});
task_manager.push(
current_view + 1,
@ -556,10 +566,17 @@ where
Self::gather_timeout_qc(adapter.clone(), current_view),
);
if carnot.is_member_of_root_committee() {
let threshold = carnot.leader_super_majority_threshold();
task_manager.push(
current_view,
Self::gather_timeout(adapter, carnot.self_committee(), current_view, threshold),
Self::gather_timeout(
adapter,
carnot.self_committee(),
current_view,
CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(),
participating_nodes: carnot.root_committee(),
},
),
);
}
}
@ -578,6 +595,7 @@ where
}
}
#[instrument(level = "debug", skip(adapter, tally))]
async fn gather_votes(
adapter: A,
committee: Committee,
@ -594,6 +612,7 @@ where
}
}
#[instrument(level = "debug", skip(adapter, tally))]
async fn gather_new_views(
adapter: A,
committee: Committee,
@ -615,22 +634,24 @@ where
}
}
#[instrument(level = "debug", skip(adapter, tally))]
async fn gather_timeout(
adapter: A,
committee: Committee,
view: consensus_engine::View,
threshold: usize,
tally: CarnotTallySettings,
) -> Event<P::Tx> {
let timeouts = adapter
.timeout_stream(&committee, view)
.await
.take(threshold)
.map(|msg| msg.vote)
.collect()
.await;
Event::RootTimeout { timeouts }
let tally = TimeoutTally::new(tally);
let stream = adapter.timeout_stream(&committee, view).await;
match tally.tally(view, stream).await {
Ok((_, timeouts)) => Event::RootTimeout { timeouts },
Err(_e) => {
todo!("Handle tally error {_e}");
}
}
}
#[instrument(level = "debug", skip(adapter))]
async fn gather_block(adapter: A, view: consensus_engine::View) -> Event<P::Tx> {
let stream = adapter
.proposal_chunks_stream(view)
@ -754,7 +775,9 @@ enum Event<Tx: Clone + Hash + Eq> {
block: consensus_engine::Block,
votes: HashSet<Vote>,
},
LocalTimeout,
LocalTimeout {
view: View,
},
NewView {
timeout_qc: TimeoutQc,
new_views: HashSet<NewView>,

View File

@ -1,6 +1,7 @@
// std
use std::borrow::Cow;
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeSet;
use std::hash::{Hash, Hasher};
// crates
use futures::{Stream, StreamExt};
@ -247,7 +248,7 @@ impl NetworkAdapter for WakuAdapter {
let payload = message.payload();
let qc = TimeoutQcMsg::from_bytes(payload);
async move {
if qc.qc.view() > view {
if qc.qc.view() == view {
Some(qc)
} else {
None
@ -286,7 +287,7 @@ impl NetworkAdapter for WakuAdapter {
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = NewViewMsg> + Send + Unpin> {
let content_topic = create_topic("votes", committee, view);
let content_topic = create_topic("new-view", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
@ -338,6 +339,8 @@ const TIMEOUT_QC_CONTENT_TOPIC: WakuContentTopic =
// TODO: Maybe use a secure hasher instead
fn hash_set(c: &Committee) -> u64 {
let mut s = DefaultHasher::new();
// ensure consistent iteration across nodes
let c = c.iter().collect::<BTreeSet<_>>();
for e in c.iter() {
e.hash(&mut s);
}

View File

@ -1,4 +1,5 @@
pub mod happy;
pub mod timeout;
pub mod unhappy;
// std

View File

@ -0,0 +1,55 @@
// std
use std::{collections::HashSet, convert::Infallible};
// crates
use futures::{Stream, StreamExt};
// internal
use super::CarnotTallySettings;
use crate::network::messages::TimeoutMsg;
use consensus_engine::{Timeout, View};
use nomos_core::vote::Tally;
#[derive(Clone, Debug)]
pub struct TimeoutTally {
settings: CarnotTallySettings,
}
#[async_trait::async_trait]
impl Tally for TimeoutTally {
type Vote = TimeoutMsg;
type Qc = ();
type Subject = View;
type Outcome = HashSet<Timeout>;
type TallyError = Infallible;
type Settings = CarnotTallySettings;
fn new(settings: Self::Settings) -> Self {
Self { settings }
}
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: View,
mut vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
let mut seen = HashSet::new();
let mut outcome = HashSet::new();
while let Some(vote) = vote_stream.next().await {
// check timeout view is valid
if vote.vote.view != view {
continue;
}
// check for individual nodes votes
if !self.settings.participating_nodes.contains(&vote.voter) {
continue;
}
seen.insert(vote.voter);
outcome.insert(vote.vote.clone());
if seen.len() >= self.settings.threshold {
return Ok(((), outcome));
}
}
unreachable!()
}
}

View File

@ -40,7 +40,6 @@ impl Tally for NewViewTally {
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
let mut seen = HashSet::new();
let mut outcome = HashSet::new();
// return early for leaf nodes
if self.settings.threshold == 0 {
return Ok(((), outcome));
@ -48,7 +47,7 @@ impl Tally for NewViewTally {
while let Some(vote) = vote_stream.next().await {
// check vote view is valid
if !vote.vote.view != timeout_qc.view() {
if vote.vote.view != timeout_qc.view() + 1 {
continue;
}