diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index f7a8b210..e9eebc80 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -1,13 +1,7 @@ -//! In this module, and children ones, the 'view lifetime is tied to a logical consensus view, -//! represented by the `View` struct. -//! This is done to ensure that all the different data structs used to represent various actors -//! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views). -//! It's obviously extremely important that the information contained in `View` is synchronized across different -//! nodes, but that has to be achieved through different means. mod leader_selection; pub mod network; mod tally; -mod view_cancel; +mod task_manager; // std use std::collections::HashSet; @@ -17,19 +11,20 @@ use std::pin::Pin; use std::time::Duration; // crates use bls_signatures::PrivateKey; -use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; +use futures::{Stream, StreamExt}; use leader_selection::UpdateableLeaderSelection; use serde::Deserialize; use serde::{de::DeserializeOwned, Serialize}; +use tracing::instrument; // internal use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg}; use crate::network::NetworkAdapter; use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings}; -use crate::view_cancel::ViewCancelCache; use consensus_engine::{ overlay::RandomBeaconState, AggregateQc, Carnot, Committee, LeaderProof, NewView, Overlay, - Payload, Qc, StandardQc, Timeout, TimeoutQc, Vote, + Payload, Qc, StandardQc, Timeout, TimeoutQc, View, Vote, }; +use task_manager::TaskManager; use nomos_core::block::Block; use nomos_core::crypto::PublicKey; @@ -178,117 +173,65 @@ where leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] }, }; let mut carnot = Carnot::from_genesis(private_key, genesis, overlay); - let 
network_adapter = A::new(network_relay).await; - let adapter = &network_adapter; - let _self_committee = carnot.self_committee(); - let self_committee = &_self_committee; - let _leader_committee = [carnot.id()].into_iter().collect(); - let leader_committee = &_leader_committee; + let adapter = A::new(network_relay).await; let fountain = F::new(fountain_settings); - let _tally_settings = CarnotTallySettings { + let private_key = PrivateKey::new(private_key); + let self_committee = carnot.self_committee(); + let leader_committee = [carnot.id()].into_iter().collect::>(); + let tally_settings = CarnotTallySettings { threshold: carnot.super_majority_threshold(), participating_nodes: carnot.child_committees().into_iter().flatten().collect(), }; - let tally_settings = &_tally_settings; - let _leader_tally_settings = CarnotTallySettings { + let leader_tally_settings = CarnotTallySettings { threshold: carnot.leader_super_majority_threshold(), - // TODO: add children of root committee participating_nodes: carnot.root_committee(), }; - let leader_tally_settings = &_leader_tally_settings; - let mut view_cancel_cache = ViewCancelCache::new(); + let mut task_manager = TaskManager::new(); - let events: FuturesUnordered> + Send>>> = - FuturesUnordered::new(); let genesis_block = carnot.genesis_block(); - events.push(Box::pin(view_cancel_cache.cancelable_event_future( + task_manager.push( genesis_block.view + 1, - Self::gather_block(adapter, genesis_block.view + 1), - ))); - events.push(Box::pin(view_cancel_cache.cancelable_event_future( + Self::gather_block(adapter.clone(), genesis_block.view + 1), + ); + + task_manager.push( genesis_block.view, Self::gather_votes( - adapter, - self_committee, + adapter.clone(), + self_committee.clone(), genesis_block.clone(), tally_settings.clone(), ), - ))); + ); + if carnot.is_next_leader() { - events.push(Box::pin(view_cancel_cache.cancelable_event_future( - genesis_block.view + 1, - async move { - let Event::Approve { qc, .. 
} = Self::gather_votes( - adapter, - leader_committee, - genesis_block, - leader_tally_settings.clone(), - ) - .await else { unreachable!() }; - Event::ProposeBlock { qc } - }, - ))); + let network_adapter = adapter.clone(); + task_manager.push(genesis_block.view + 1, async move { + let Event::Approve { qc, .. } = Self::gather_votes( + network_adapter, + leader_committee.clone(), + genesis_block, + leader_tally_settings.clone(), + ) + .await else { unreachable!() }; + Event::ProposeBlock { qc } + }); } - tokio::pin!(events); - - while let Some(event) = events.next().await { + while let Some(event) = task_manager.next().await { let mut output = None; let prev_view = carnot.current_view(); match event { - Event::Proposal { block, mut stream } => { - tracing::debug!("received proposal {:?}", block); - let original_block = block; - let block = original_block.header().clone(); - match carnot.receive_block(block.clone()) { - Ok(mut new_state) => { - let new_view = new_state.current_view(); - if new_view != carnot.current_view() { - events.push(Box::pin(view_cancel_cache.cancelable_event_future( - block.view, - Self::gather_votes( - adapter, - self_committee, - block.clone(), - tally_settings.clone(), - ), - ))); - new_state = - Self::update_leader_selection(new_state, |leader_selection| { - leader_selection.on_new_block_received(original_block) - }); - } else { - events.push(Box::pin(view_cancel_cache.cancelable_event_future( - block.view, - async move { - if let Some(block) = stream.next().await { - Event::Proposal { block, stream } - } else { - Event::None - } - }, - ))); - } - carnot = new_state; - } - Err(_) => tracing::debug!("invalid block {:?}", block), - } - if carnot.is_next_leader() { - events.push(Box::pin(view_cancel_cache.cancelable_event_future( - block.view, - async move { - let Event::Approve { qc, .. 
} = Self::gather_votes( - adapter, - leader_committee, - block, - leader_tally_settings.clone(), - ) - .await else { unreachable!() }; - Event::ProposeBlock { qc } - }, - ))); - } + Event::Proposal { block, stream } => { + (carnot, output) = Self::process_block( + carnot, + block, + stream, + &mut task_manager, + adapter.clone(), + ) + .await; } Event::Approve { block, .. } => { tracing::debug!("approving proposal {:?}", block); @@ -306,131 +249,48 @@ where timeout_qc, new_views, } => { - tracing::debug!("approving new view {:?}", timeout_qc); - let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views); - carnot = new_carnot; - output = Some(Output::Send(out)); - let new_view = timeout_qc.view + 1; - if carnot.is_next_leader() { - let high_qc = carnot.high_qc(); - events.push(Box::pin(view_cancel_cache.cancelable_event_future( - new_view, - async move { - let _votes = Self::gather_new_views( - adapter, - leader_committee, - timeout_qc.clone(), - leader_tally_settings.clone(), - ) - .await; - Event::ProposeBlock { - qc: Qc::Aggregated(AggregateQc { - high_qc, - view: new_view, - }), - } - }, - ))); - } + (carnot, output) = Self::approve_new_view( + carnot, + timeout_qc, + new_views, + &mut task_manager, + adapter.clone(), + ) + .await; } Event::TimeoutQc { timeout_qc } => { - tracing::debug!("timeout received {:?}", timeout_qc); - let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); - events.push(Box::pin(view_cancel_cache.cancelable_event_future( - timeout_qc.view + 1, - Self::gather_new_views( - adapter, - self_committee, - timeout_qc.clone(), - tally_settings.clone(), - ), - ))); - if carnot.current_view() != new_state.current_view() { - new_state = Self::update_leader_selection(new_state, |leader_selection| { - leader_selection.on_timeout_qc_received(timeout_qc) - }); - } - carnot = new_state; + (carnot, output) = Self::receive_timeout_qc( + carnot, + timeout_qc, + &mut task_manager, + adapter.clone(), + ) + .await; } 
Event::RootTimeout { timeouts } => { - tracing::debug!("root timeout {:?}", timeouts); - // TODO: filter timeouts upon reception - assert!(timeouts.iter().all(|t| t.view == carnot.current_view())); - let high_qc = timeouts - .iter() - .map(|t| &t.high_qc) - .chain(std::iter::once(&carnot.high_qc())) - .max_by_key(|qc| qc.view) - .expect("empty root committee") - .clone(); - if carnot.is_member_of_root_committee() { - let timeout_qc = TimeoutQc { - view: carnot.current_view(), - high_qc, - sender: carnot.id(), - }; - output = Some(Output::BroadcastTimeoutQc { timeout_qc }); - } + (carnot, output) = Self::process_root_timeout(carnot, timeouts).await; } Event::ProposeBlock { qc } => { - tracing::debug!("proposing block"); - let (reply_channel, rx) = tokio::sync::oneshot::channel(); - mempool_relay - .send(MempoolMsg::View { - ancestor_hint: [0; 32], - reply_channel, - }) - .await - .unwrap_or_else(|(e, _)| { - eprintln!("Could not get transactions from mempool {e}") - }); - match rx.await { - Ok(txs) => { - let beacon = RandomBeaconState::generate_happy( - qc.view(), - &PrivateKey::new(private_key), - ); - let proposal = Block::new(qc.view() + 1, qc, txs, carnot.id(), beacon); - output = Some(Output::BroadcastProposal { proposal }); - } - Err(e) => tracing::error!("Could not fetch txs {e}"), - } + output = + Self::propose_block(carnot.id(), private_key, qc, mempool_relay.clone()) + .await; } - Event::None => {} + _ => {} } let current_view = carnot.current_view(); if current_view != prev_view { - // First we cancel previous processing view tasks - view_cancel_cache.cancel(prev_view); - tracing::debug!("Advanced view from {prev_view} to {current_view}"); - // View change! 
- events.push(Box::pin(view_cancel_cache.cancelable_event_future( - current_view, - async { - tokio::time::sleep(TIMEOUT).await; - Event::LocalTimeout - }, - ))); - events.push(Box::pin(view_cancel_cache.cancelable_event_future( - current_view + 1, - Self::gather_block(adapter, current_view + 1), - ))); - events.push(Box::pin(view_cancel_cache.cancelable_event_future( - current_view, - Self::gather_timeout_qc(adapter, current_view), - ))); - if carnot.is_member_of_root_committee() { - let threshold = carnot.leader_super_majority_threshold(); - events.push(Box::pin(view_cancel_cache.cancelable_event_future( - current_view, - Self::gather_timeout(adapter, self_committee, current_view, threshold), - ))); - } + Self::process_view_change( + carnot.clone(), + prev_view, + &mut task_manager, + adapter.clone(), + ) + .await; } if let Some(output) = output { - handle_output(adapter, &fountain, carnot.id(), output).await; + handle_output(&adapter, &fountain, carnot.id(), output).await; } } @@ -458,7 +318,226 @@ where O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, { - async fn gather_timeout_qc(adapter: &A, view: consensus_engine::View) -> Event { + #[instrument(level = "debug", skip(adapter, task_manager, stream))] + async fn process_block( + mut carnot: Carnot, + block: Block, + mut stream: Pin> + Send>>, + task_manager: &mut TaskManager>, + adapter: A, + ) -> (Carnot, Option>) { + tracing::debug!("received proposal {:?}", block); + let original_block = block; + let block = original_block.header().clone(); + let self_committee = carnot.self_committee(); + let leader_committee = [carnot.id()].into_iter().collect(); + + let tally_settings = CarnotTallySettings { + threshold: carnot.super_majority_threshold(), + participating_nodes: carnot.child_committees().into_iter().flatten().collect(), + }; + let leader_tally_settings = CarnotTallySettings { + threshold: carnot.leader_super_majority_threshold(), + // TODO: add children of root 
committee + participating_nodes: carnot.root_committee(), + }; + + match carnot.receive_block(block.clone()) { + Ok(mut new_state) => { + let new_view = new_state.current_view(); + if new_view != carnot.current_view() { + task_manager.push( + block.view, + Self::gather_votes( + adapter.clone(), + self_committee, + block.clone(), + tally_settings, + ), + ); + new_state = Self::update_leader_selection(new_state, |leader_selection| { + leader_selection.on_new_block_received(original_block) + }); + } else { + task_manager.push(block.view, async move { + if let Some(block) = stream.next().await { + Event::Proposal { block, stream } + } else { + Event::None + } + }); + } + carnot = new_state; + } + Err(_) => tracing::debug!("invalid block {:?}", block), + } + + if carnot.is_next_leader() { + task_manager.push(block.view, async move { + let Event::Approve { qc, .. } = Self::gather_votes( + adapter, + leader_committee, + block, + leader_tally_settings, + ) + .await else { unreachable!() }; + Event::ProposeBlock { qc } + }); + } + + (carnot, None) + } + + #[instrument(level = "debug", skip(task_manager, adapter))] + async fn approve_new_view( + carnot: Carnot, + timeout_qc: TimeoutQc, + new_views: HashSet, + task_manager: &mut TaskManager>, + adapter: A, + ) -> (Carnot, Option>) { + let leader_committee = [carnot.id()].into_iter().collect(); + let leader_tally_settings = CarnotTallySettings { + threshold: carnot.leader_super_majority_threshold(), + // TODO: add children of root committee + participating_nodes: carnot.root_committee(), + }; + let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views); + let next_view = timeout_qc.view + 2; + if carnot.is_next_leader() { + let high_qc = carnot.high_qc(); + task_manager.push(timeout_qc.view + 1, async move { + let _votes = Self::gather_new_views( + adapter, + leader_committee, + timeout_qc, + leader_tally_settings.clone(), + ) + .await; + Event::ProposeBlock { + qc: Qc::Aggregated(AggregateQc { + high_qc, + 
view: next_view, + }), + } + }); + } + + (new_carnot, Some(Output::Send(out))) + } + + #[instrument(level = "debug", skip(task_manager, adapter))] + async fn receive_timeout_qc( + carnot: Carnot, + timeout_qc: TimeoutQc, + task_manager: &mut TaskManager>, + adapter: A, + ) -> (Carnot, Option>) { + let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); + let self_committee = carnot.self_committee(); + let tally_settings = CarnotTallySettings { + threshold: carnot.super_majority_threshold(), + participating_nodes: carnot.child_committees().into_iter().flatten().collect(), + }; + task_manager.push( + timeout_qc.view + 1, + Self::gather_new_views(adapter, self_committee, timeout_qc.clone(), tally_settings), + ); + if carnot.current_view() != new_state.current_view() { + new_state = Self::update_leader_selection(new_state, |leader_selection| { + leader_selection.on_timeout_qc_received(timeout_qc) + }); + } + (new_state, None) + } + + #[instrument(level = "debug")] + async fn process_root_timeout( + carnot: Carnot, + timeouts: HashSet, + ) -> (Carnot, Option>) { + // TODO: filter timeouts upon reception + assert!(timeouts.iter().all(|t| t.view == carnot.current_view())); + let high_qc = timeouts + .iter() + .map(|t| &t.high_qc) + .chain(std::iter::once(&carnot.high_qc())) + .max_by_key(|qc| qc.view) + .expect("empty root committee") + .clone(); + let mut output = None; + if carnot.is_member_of_root_committee() { + let timeout_qc = TimeoutQc { + view: carnot.current_view(), + high_qc, + sender: carnot.id(), + }; + output = Some(Output::BroadcastTimeoutQc { timeout_qc }); + } + (carnot, output) + } + + #[instrument(level = "debug", skip(mempool_relay, private_key))] + async fn propose_block( + id: NodeId, + private_key: PrivateKey, + qc: Qc, + mempool_relay: OutboundRelay>, + ) -> Option> { + let (reply_channel, rx) = tokio::sync::oneshot::channel(); + let mut output = None; + mempool_relay + .send(MempoolMsg::View { + ancestor_hint: [0; 32], + reply_channel, + 
}) + .await + .unwrap_or_else(|(e, _)| eprintln!("Could not get transactions from mempool {e}")); + + match rx.await { + Ok(txs) => { + let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key); + let proposal = Block::new(qc.view() + 1, qc, txs, id, beacon); + output = Some(Output::BroadcastProposal { proposal }); + } + Err(e) => tracing::error!("Could not fetch txs {e}"), + } + output + } + + async fn process_view_change( + carnot: Carnot, + prev_view: View, + task_manager: &mut TaskManager>, + adapter: A, + ) { + let current_view = carnot.current_view(); + // First we cancel previous processing view tasks + task_manager.cancel(prev_view); + tracing::debug!("Advanced view from {prev_view} to {current_view}"); + // View change! + task_manager.push(current_view, async { + tokio::time::sleep(TIMEOUT).await; + Event::LocalTimeout + }); + task_manager.push( + current_view + 1, + Self::gather_block(adapter.clone(), current_view + 1), + ); + task_manager.push( + current_view, + Self::gather_timeout_qc(adapter.clone(), current_view), + ); + if carnot.is_member_of_root_committee() { + let threshold = carnot.leader_super_majority_threshold(); + task_manager.push( + current_view, + Self::gather_timeout(adapter, carnot.self_committee(), current_view, threshold), + ); + } + } + + async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event { if let Some(timeout_qc) = adapter .timeout_qc_stream(view) .await @@ -473,13 +552,13 @@ where } async fn gather_votes( - adapter: &A, - committee: &Committee, + adapter: A, + committee: Committee, block: consensus_engine::Block, tally: CarnotTallySettings, ) -> Event { let tally = CarnotTally::new(tally); - let votes_stream = adapter.votes_stream(committee, block.view, block.id).await; + let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await; match tally.tally(block.clone(), votes_stream).await { Ok((qc, votes)) => Event::Approve { qc, votes, block }, Err(_e) => { @@ -489,14 +568,14 
@@ where } async fn gather_new_views( - adapter: &A, - committee: &Committee, + adapter: A, + committee: Committee, timeout_qc: TimeoutQc, tally: CarnotTallySettings, ) -> Event { let tally = NewViewTally::new(tally); let stream = adapter - .new_view_stream(committee, timeout_qc.view + 1) + .new_view_stream(&committee, timeout_qc.view + 1) .await; match tally.tally(timeout_qc.clone(), stream).await { Ok((_qc, new_views)) => Event::NewView { @@ -510,13 +589,13 @@ where } async fn gather_timeout( - adapter: &A, - committee: &Committee, + adapter: A, + committee: Committee, view: consensus_engine::View, threshold: usize, ) -> Event { let timeouts = adapter - .timeout_stream(committee, view) + .timeout_stream(&committee, view) .await .take(threshold) .map(|msg| msg.vote) @@ -525,7 +604,7 @@ where Event::RootTimeout { timeouts } } - async fn gather_block(adapter: &A, view: consensus_engine::View) -> Event { + async fn gather_block(adapter: A, view: consensus_engine::View) -> Event { let stream = adapter .proposal_chunks_stream(view) .await diff --git a/nomos-services/consensus/src/task_manager.rs b/nomos-services/consensus/src/task_manager.rs new file mode 100644 index 00000000..7f973f9a --- /dev/null +++ b/nomos-services/consensus/src/task_manager.rs @@ -0,0 +1,121 @@ +use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; +use std::collections::HashMap; +use std::hash::Hash; +use std::pin::Pin; +use tokio::select; +use tokio_util::sync::CancellationToken; + +pub struct TaskManager { + tasks: FuturesUnordered> + Send>>>, + cancel_cache: CancelCache, +} + +impl TaskManager +where + Group: Eq + PartialEq + Hash + 'static, + Out: 'static, +{ + pub fn new() -> Self { + Self { + tasks: FuturesUnordered::new(), + cancel_cache: CancelCache::new(), + } + } + + pub fn push(&mut self, group: Group, task: impl Future + Send + 'static) { + self.tasks.push(Box::pin( + self.cancel_cache.cancelable_event_future(group, task), + )); + } + + pub fn cancel(&mut self, group: 
Group) { + self.cancel_cache.cancel(group); + } +} + +impl Stream for TaskManager { + type Item = Out; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + use std::task::Poll; + + let tasks = &mut self.get_mut().tasks; + + loop { + match tasks.poll_next_unpin(cx) { + // we need to remove the outer Option that was inserted by the cancelable future + Poll::Ready(Some(Some(event))) => return Poll::Ready(Some(event)), + // an empty output means the task was cancelled, ignore it + Poll::Ready(Some(None)) => {} + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } +} + +pub struct GroupCancel(CancellationToken); + +impl GroupCancel { + pub fn new() -> Self { + Self(CancellationToken::new()) + } + + pub fn cancel(&self) { + self.0.cancel(); + } + + pub fn cancel_token(&self) -> CancellationToken { + self.0.clone() + } +} + +impl Drop for GroupCancel { + fn drop(&mut self) { + if !self.0.is_cancelled() { + self.cancel(); + } + } +} + +pub struct CancelCache { + cancels: HashMap, +} + +impl CancelCache { + pub fn new() -> Self { + Self { + cancels: HashMap::new(), + } + } + + pub fn cancel(&mut self, group: Group) { + if let Some(cancel) = self.cancels.remove(&group) { + cancel.cancel(); + } + } + + pub fn cancel_token(&mut self, group: Group) -> CancellationToken { + self.cancels + .entry(group) + .or_insert_with(GroupCancel::new) + .cancel_token() + } + + pub(crate) fn cancelable_event_future>( + &mut self, + group: Group, + f: F, + ) -> impl Future> { + let token = self.cancel_token(group); + async move { + select!
{ + event = f => Some(event), + _ = token.cancelled() => None, + } + } + } +} diff --git a/nomos-services/consensus/src/view_cancel.rs b/nomos-services/consensus/src/view_cancel.rs index a94524ba..e69de29b 100644 --- a/nomos-services/consensus/src/view_cancel.rs +++ b/nomos-services/consensus/src/view_cancel.rs @@ -1,70 +0,0 @@ -use crate::Event; -use consensus_engine::View; -use std::collections::HashMap; -use std::future::Future; -use std::hash::Hash; -use tokio::select; -use tokio_util::sync::CancellationToken; - -pub struct ViewCancel(CancellationToken); - -impl ViewCancel { - pub fn new() -> Self { - ViewCancel(CancellationToken::new()) - } - - pub fn cancel(&self) { - self.0.cancel(); - } - - pub fn cancel_token(&self) -> CancellationToken { - self.0.clone() - } -} - -impl Drop for ViewCancel { - fn drop(&mut self) { - if !self.0.is_cancelled() { - self.cancel(); - } - } -} - -pub struct ViewCancelCache { - cancels: HashMap, -} - -impl ViewCancelCache { - pub fn new() -> Self { - ViewCancelCache { - cancels: HashMap::new(), - } - } - - pub fn cancel(&mut self, view: View) { - if let Some(cancel) = self.cancels.remove(&view) { - cancel.cancel(); - } - } - - pub fn cancel_token(&mut self, view: View) -> CancellationToken { - self.cancels - .entry(view) - .or_insert_with(ViewCancel::new) - .cancel_token() - } - - pub(crate) fn cancelable_event_future>>( - &mut self, - view: View, - f: F, - ) -> impl Future> { - let token = self.cancel_token(view); - async move { - select! { - event = f => event, - _ = token.cancelled() => Event::None, - } - } - } -}