diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 68dfaf85..760adc83 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -25,7 +25,7 @@ use nomos_core::fountain::FountainCode; use nomos_core::staking::Stake; use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService}; use nomos_network::NetworkService; -use overlay::{Member, Overlay}; +use overlay::Overlay; use overwatch_rs::services::relay::{OutboundRelay, Relay}; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -40,8 +40,6 @@ pub type NodeId = PublicKey; // Random seed for each round provided by the protocol pub type Seed = [u8; 32]; -const COMMITTEE_SIZE: usize = 1; - pub struct CarnotSettings { private_key: [u8; 32], fountain_settings: Fountain::Settings, @@ -66,12 +64,13 @@ impl CarnotSettings { } } -pub struct CarnotConsensus +pub struct CarnotConsensus where F: FountainCode, A: NetworkAdapter, M: MempoolAdapter, P: MemPool, + O: Overlay, P::Tx: Debug + 'static, P::Id: Debug + 'static, A::Backend: 'static, @@ -82,9 +81,10 @@ where network_relay: Relay>, mempool_relay: Relay>, _fountain: std::marker::PhantomData, + _overlay: std::marker::PhantomData, } -impl ServiceData for CarnotConsensus +impl ServiceData for CarnotConsensus where F: FountainCode, A: NetworkAdapter, @@ -92,6 +92,7 @@ where P::Tx: Debug, P::Id: Debug, M: MempoolAdapter, + O: Overlay, { const SERVICE_ID: ServiceId = "Carnot"; type Settings = CarnotSettings; @@ -101,7 +102,7 @@ where } #[async_trait::async_trait] -impl ServiceCore for CarnotConsensus +impl ServiceCore for CarnotConsensus where F: FountainCode + Send + Sync + 'static, A: NetworkAdapter + Send + Sync + 'static, @@ -110,6 +111,7 @@ where P::Tx: Debug + Send + Sync + 'static, P::Id: Debug + Send + Sync + 'static, M: MempoolAdapter + Send + Sync + 'static, + O: Overlay + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); @@ -118,6 +120,7 @@ where service_state, network_relay, _fountain: Default::default(), + _overlay: Default::default(), mempool_relay, }) } @@ -159,7 +162,7 @@ where // FIXME: this should probably have a timer to detect failed rounds let res = cur_view - .resolve::, _, _, _>( + .resolve::( private_key, &tip, &network_adapter, @@ -206,7 +209,7 @@ impl View { where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, - O: Overlay<'view, A, F>, + O: Overlay, { let res = if self.is_leader(node_id) { let block = self @@ -241,19 +244,19 @@ impl View { where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, - O: Overlay<'view, A, F>, + O: Overlay, { let overlay = O::new(self, node_id); // We need to build the QC for the block we are proposing - let qc = overlay.build_qc(adapter).await; + let qc = overlay.build_qc(self, adapter).await; let LeadershipResult::Leader { block, _view } = leadership .try_propose_block(self, tip, qc) .await else { panic!("we are leader")}; overlay - .broadcast_block(block.clone(), adapter, fountain) + .broadcast_block(self, block.clone(), adapter, fountain) .await; Ok(block) @@ -268,7 +271,7 @@ impl View { where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, - O: Overlay<'view, A, F>, + O: Overlay, { let overlay = O::new(self, node_id); // Consensus in Carnot is achieved in 2 steps from the point of view of a node: @@ -279,7 +282,7 @@ impl View { // 1) Collect and verify block proposal. let block = overlay - .reconstruct_proposal_block(adapter, fountain) + .reconstruct_proposal_block(self, adapter, fountain) .await .unwrap(); // FIXME: handle sad path @@ -291,7 +294,7 @@ impl View { // We only consider the happy path for now if self.pipelined_safe_block(&block) { overlay - .approve_and_forward(&block, adapter, &next_view) + .approve_and_forward(self, &block, adapter, &next_view) .await .unwrap(); // FIXME: handle sad path } diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs index 851855d2..09ba68a0 100644 --- a/nomos-services/consensus/src/overlay/committees.rs +++ b/nomos-services/consensus/src/overlay/committees.rs @@ -8,37 +8,38 @@ use crate::network::messages::ProposalChunkMsg; use crate::network::NetworkAdapter; /// View of the tree overlay centered around a specific member -pub struct Member<'view, const C: usize> { +pub struct Member { // id is not used now, but gonna probably used it for later checking later on #[allow(dead_code)] id: NodeId, committee: Committee, - committees: Committees<'view, C>, + committees: Committees, + view_n: u64, } /// #Just a newtype index to be able to implement parent/children methods #[derive(Copy, Clone)] pub struct Committee(usize); -pub struct Committees<'view, const C: usize> { - view: &'view View, +pub struct Committees { nodes: Box<[NodeId]>, } -impl<'view, const C: usize> Committees<'view, C> { - pub fn new(view: &'view View) -> Self { +impl Committees { + pub fn new(view: &View) -> Self { let mut nodes = view.staking_keys.keys().cloned().collect::>(); let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed); nodes.shuffle(&mut rng); - Self { nodes, view } + Self { nodes } } - pub fn into_member(self, id: NodeId) -> Option> { + pub fn into_member(self, id: NodeId, view: &View) -> Option> { let member_idx = self.nodes.iter().position(|m| m == &id)?; Some(Member { committee: Committee(member_idx / C), committees: self, id, + view_n: view.view_n, }) } @@ -83,7 +84,7 @@ impl Committee { } } -impl<'view, const C: usize> Member<'view, C> { +impl Member { /// Return other members of this committee pub fn peers(&self) -> &[NodeId] { self.committees @@ -108,28 +109,36 @@ impl<'view, const C: usize> Member<'view, C> { } #[async_trait::async_trait] -impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const C: usize> - Overlay<'view, Network, Fountain> for Member<'view, C> +impl + Overlay for Member { - fn new(view: &'view View, node: NodeId) -> Self { + // we still need view here to help us initialize + fn new(view: &View, node: NodeId) -> Self { let committees = Committees::new(view); - committees.into_member(node).unwrap() + committees.into_member(node, view).unwrap() } async fn reconstruct_proposal_block( &self, + view: &View, adapter: &Network, fountain: &Fountain, ) -> Result { + assert_eq!(view.view_n, self.view_n, "view_n mismatch"); let committee = self.committee; - let view = self.committees.view; let message_stream = adapter.proposal_chunks_stream(committee, view).await; fountain.decode(message_stream).await.map(Block::from_bytes) } - async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) { + async fn broadcast_block( + &self, + view: &View, + block: Block, + adapter: &Network, + fountain: &Fountain, + ) { + assert_eq!(view.view_n, self.view_n, "view_n mismatch"); let (left_child, right_child) = self.children_committes(); - let view = self.committees.view; let block_bytes = block.as_bytes(); let encoded_stream = fountain.encode(&block_bytes); encoded_stream @@ -152,10 +161,12 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const async fn approve_and_forward( &self, + view: &View, _block: &Block, _adapter: &Network, _next_view: &View, ) -> Result<(), Box> { + assert_eq!(view.view_n, self.view_n, "view_n mismatch"); // roughly, we want to do something like this: // 1. wait for left and right children committees to approve // 2. approve the block @@ -166,7 +177,8 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const todo!() } - async fn build_qc(&self, _adapter: &Network) -> Approval { + async fn build_qc(&self, view: &View, _adapter: &Network) -> Approval { + assert_eq!(view.view_n, self.view_n, "view_n mismatch"); // maybe the leader publishing the QC? todo!() } diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs index 15563a09..09cc1c17 100644 --- a/nomos-services/consensus/src/overlay/flat.rs +++ b/nomos-services/consensus/src/overlay/flat.rs @@ -29,19 +29,19 @@ impl Threshold { /// As far as the API is concerned, this should be equivalent to any other /// overlay and far simpler to implement. /// For this reason, this might act as a 'reference' overlay for testing. -pub struct Flat<'view> { - view: &'view View, +pub struct Flat { // TODO: this should be a const param, but we can't do that yet threshold: Threshold, node_id: NodeId, + view_n: u64, } -impl<'view> Flat<'view> { - pub fn new(view: &'view View, node_id: NodeId) -> Self { +impl Flat { + pub fn new(view_n: u64, node_id: NodeId) -> Self { Self { - view, threshold: DEFAULT_THRESHOLD, node_id, + view_n, } } @@ -52,32 +52,39 @@ impl<'view> Flat<'view> { } #[async_trait::async_trait] -impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> - Overlay<'view, Network, Fountain> for Flat<'view> +impl Overlay + for Flat { - fn new(view: &'view View, node: NodeId) -> Self { - Flat::new(view, node) + fn new(view: &View, node: NodeId) -> Self { + Flat::new(view.view_n, node) } async fn reconstruct_proposal_block( &self, + view: &View, adapter: &Network, fountain: &Fountain, ) -> Result { - let message_stream = adapter - .proposal_chunks_stream(FLAT_COMMITTEE, self.view) - .await; + assert_eq!(view.view_n, self.view_n, "view_n mismatch"); + let message_stream = adapter.proposal_chunks_stream(FLAT_COMMITTEE, view).await; fountain.decode(message_stream).await.map(Block::from_bytes) } - async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) { + async fn broadcast_block( + &self, + view: &View, + block: Block, + adapter: &Network, + fountain: &Fountain, + ) { + assert_eq!(view.view_n, self.view_n, "view_n mismatch"); let block_bytes = block.as_bytes(); let encoded_stream = fountain.encode(&block_bytes); encoded_stream .for_each_concurrent(None, |chunk| async move { let message = ProposalChunkMsg { chunk }; adapter - .broadcast_block_chunk(FLAT_COMMITTEE, self.view, message) + .broadcast_block_chunk(FLAT_COMMITTEE, view, message) .await; }) .await; @@ -85,16 +92,18 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> async fn approve_and_forward( &self, + view: &View, block: &Block, adapter: &Network, _next_view: &View, ) -> Result<(), Box> { + assert_eq!(view.view_n, self.view_n, "view_n mismatch"); // in the flat overlay, there's no need to wait for anyone before approving the block let approval = self.approve(block); adapter .forward_approval( FLAT_COMMITTEE, - self.view, + view, ApprovalMsg { approval, source: self.node_id, @@ -104,11 +113,13 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> Ok(()) } - async fn build_qc(&self, adapter: &Network) -> Approval { + async fn build_qc(&self, view: &View, adapter: &Network) -> Approval { + assert_eq!(view.view_n, self.view_n, "view_n mismatch"); + // for now, let's pretend that consensus is reached as soon as the // block is approved by a share of the nodes let mut approvals = HashSet::new(); - let mut stream = Box::into_pin(adapter.approvals_stream(FLAT_COMMITTEE, self.view).await); + let mut stream = Box::into_pin(adapter.approvals_stream(FLAT_COMMITTEE, view).await); // Shadow the original binding so that it can't be directly accessed // ever again. @@ -118,7 +129,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> approvals.insert(approval); // ceil(num/den * n) let threshold = - (self.threshold.num * self.view.staking_keys.len() as u64 + self.threshold.den - 1) + (self.threshold.num * view.staking_keys.len() as u64 + self.threshold.den - 1) / self.threshold.den; if approvals.len() as u64 >= threshold { // consensus reached diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs index 5ec2228d..5989ccd9 100644 --- a/nomos-services/consensus/src/overlay/mod.rs +++ b/nomos-services/consensus/src/overlay/mod.rs @@ -13,24 +13,32 @@ use nomos_core::fountain::{FountainCode, FountainError}; /// Dissemination overlay, tied to a specific view #[async_trait::async_trait] -pub trait Overlay<'view, Network: NetworkAdapter, Fountain: FountainCode> { - fn new(view: &'view View, node: NodeId) -> Self; +pub trait Overlay { + fn new(view: &View, node: NodeId) -> Self; async fn reconstruct_proposal_block( &self, + view: &View, adapter: &Network, fountain: &Fountain, ) -> Result; - async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain); + async fn broadcast_block( + &self, + view: &View, + block: Block, + adapter: &Network, + fountain: &Fountain, + ); /// Different overlays might have different needs or the same overlay might /// require different steps depending on the node role /// For now let's put this responsibility on the overlay async fn approve_and_forward( &self, + view: &View, block: &Block, adapter: &Network, next_view: &View, ) -> Result<(), Box>; /// Wait for consensus on a block - async fn build_qc(&self, adapter: &Network) -> Approval; + async fn build_qc(&self, view: &View, adapter: &Network) -> Approval; } diff --git a/nomos-services/consensus/src/test.rs b/nomos-services/consensus/src/test.rs index af59c714..9b8f6bfc 100644 --- a/nomos-services/consensus/src/test.rs +++ b/nomos-services/consensus/src/test.rs @@ -5,8 +5,8 @@ use crate::*; use async_trait::async_trait; use bytes::Bytes; use futures::Stream; -use nomos_core::block::*; -use nomos_core::fountain::{mock::MockFountain, FountainCode, FountainError}; +use nomos_core::fountain::FountainError; +use nomos_core::fountain::{mock::MockFountain, FountainCode}; use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::services::relay::*; @@ -17,26 +17,35 @@ struct DummyAdapter; struct DummyBackend; #[async_trait] -impl<'view, N: NetworkAdapter + Sync, F: FountainCode + Sync> Overlay<'view, N, F> - for DummyOverlay -{ +impl Overlay for DummyOverlay { fn new(_: &View, _: NodeId) -> Self { DummyOverlay } - async fn build_qc(&self, _: &N) -> Approval { - Approval - } - - async fn broadcast_block(&self, _: Block, _: &N, _: &F) {} - - async fn reconstruct_proposal_block(&self, _: &N, _: &F) -> Result { + async fn reconstruct_proposal_block( + &self, + _view: &View, + _adapter: &N, + _fountain: &F, + ) -> Result { Ok(Block) } - async fn approve_and_forward(&self, _: &Block, _: &N, _: &View) -> Result<(), Box> { + async fn broadcast_block(&self, _view: &View, _block: Block, _adapter: &N, _fountain: &F) {} + + async fn approve_and_forward( + &self, + _view: &View, + _block: &Block, + _adapter: &N, + _next_view: &View, + ) -> Result<(), Box> { Ok(()) } + + async fn build_qc(&self, _view: &View, _: &N) -> Approval { + Approval + } } #[async_trait]