diff --git a/consensus-engine/Cargo.toml b/consensus-engine/Cargo.toml index 5710bb3e..dd9c51ab 100644 --- a/consensus-engine/Cargo.toml +++ b/consensus-engine/Cargo.toml @@ -7,6 +7,12 @@ edition = "2021" [dependencies] serde = { version = "1.0", features = ["derive"], optional = true } +bls-signatures = "0.14" +integer-encoding = "3" +sha2 = "0.10" +rand = "0.8" +rand_chacha = "0.3" +thiserror = "1" [features] default = [] diff --git a/consensus-engine/src/lib.rs b/consensus-engine/src/lib.rs index 06f738b5..001a7739 100644 --- a/consensus-engine/src/lib.rs +++ b/consensus-engine/src/lib.rs @@ -1,6 +1,8 @@ use std::collections::{HashMap, HashSet}; +pub mod overlay; mod types; +pub use overlay::Overlay; pub use types::*; #[derive(Clone, Debug)] @@ -48,6 +50,16 @@ impl Carnot { if self.safe_blocks.contains_key(&block.id) { return Ok(self.clone()); } + + match block.leader_proof { + LeaderProof::LeaderId { leader_id } => { + // This only accepts blocks from the leader of current_view + 1 + if leader_id != self.overlay.next_leader() { + return Err(()); + } + } + } + if self.blocks_in_view(block.view).contains(&block) || block.view <= self.latest_committed_view() { @@ -111,9 +123,7 @@ impl Carnot { new_state.highest_voted_view = block.view; let to = if new_state.overlay.is_member_of_root_committee(new_state.id) { - [new_state.overlay.leader(block.view + 1)] - .into_iter() - .collect() + [new_state.overlay.next_leader()].into_iter().collect() } else { new_state.overlay.parent_committee(self.id) }; @@ -180,9 +190,7 @@ impl Carnot { new_state.highest_voted_view = new_view; let to = if new_state.overlay.is_member_of_root_committee(new_state.id) { - [new_state.overlay.leader(new_view + 1)] - .into_iter() - .collect() + [new_state.overlay.next_leader()].into_iter().collect() } else { new_state.overlay.parent_committee(new_state.id) }; @@ -317,8 +325,8 @@ impl Carnot { self.local_high_qc.clone() } - pub fn is_leader_for_view(&self, view: View) -> bool { - self.overlay.leader(view) == self.id + pub fn is_next_leader(&self) -> bool { + self.overlay.next_leader() == self.id } pub fn super_majority_threshold(&self) -> usize { @@ -352,82 +360,42 @@ impl Carnot { pub fn is_member_of_root_committee(&self) -> bool { self.overlay.is_member_of_root_committee(self.id) } + + /// A way to allow for overlay extendability without compromising the engine + /// generality. + pub fn update_overlay(&self, f: F) -> Result + where + F: FnOnce(O) -> Result, + { + match f(self.overlay.clone()) { + Ok(overlay) => Ok(Self { + overlay, + ..self.clone() + }), + Err(e) => Err(e), + } + } } #[cfg(test)] mod test { + use crate::overlay::{FlatOverlay, RoundRobin, Settings}; + use super::*; - #[derive(Clone, Debug, PartialEq)] - struct MockOverlay; - - impl Overlay for MockOverlay { - fn new(_nodes: Vec) -> Self { - Self - } - - fn root_committee(&self) -> Committee { - vec![[0; 32]].into_iter().collect() - } - - fn rebuild(&mut self, _timeout_qc: TimeoutQc) { - todo!() - } - - fn is_member_of_child_committee(&self, _parent: NodeId, _child: NodeId) -> bool { - false - } - - fn is_member_of_root_committee(&self, _id: NodeId) -> bool { - true - } - - fn is_member_of_leaf_committee(&self, _id: NodeId) -> bool { - true - } - - fn is_child_of_root_committee(&self, _id: NodeId) -> bool { - false - } - - fn node_committee(&self, _id: NodeId) -> Committee { - self.root_committee() - } - - fn parent_committee(&self, _id: NodeId) -> Committee { - self.root_committee() - } - - fn child_committees(&self, _id: NodeId) -> Vec { - vec![] - } - - fn leaf_committees(&self, _id: NodeId) -> Vec { - vec![self.root_committee()] - } - - fn leader(&self, _view: View) -> NodeId { - [0; 32] - } - - fn super_majority_threshold(&self, _id: NodeId) -> usize { - todo!() - } - - fn leader_super_majority_threshold(&self, _id: NodeId) -> usize { - self.root_committee().len() * 2 / 3 + 1 - } - } - - fn init_from_genesis() -> Carnot { + fn init_from_genesis() -> Carnot> { Carnot::from_genesis( [0; 32], Block { view: 0, id: [0; 32], parent_qc: Qc::Standard(StandardQc::genesis()), + leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] }, }, - MockOverlay, + FlatOverlay::new(Settings { + nodes: vec![[0; 32]], + leader: RoundRobin::default(), + }), ) } @@ -442,6 +410,7 @@ mod test { view: block.view, id: block.id, }), + leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] }, }; } @@ -499,6 +468,7 @@ mod test { view: engine.current_view(), id: parent_block_id, }), + leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] }, }; let _ = engine.receive_block(block.clone()); diff --git a/consensus-engine/src/overlay/flat_overlay.rs b/consensus-engine/src/overlay/flat_overlay.rs new file mode 100644 index 00000000..e687d4d4 --- /dev/null +++ b/consensus-engine/src/overlay/flat_overlay.rs @@ -0,0 +1,119 @@ +use super::LeaderSelection; +use crate::{Committee, NodeId, Overlay}; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug)] +/// Flat overlay with a single committee and round robin leader selection. +pub struct FlatOverlay { + nodes: Vec, + + leader: L, +} + +impl Overlay for FlatOverlay +where + L: LeaderSelection + Send + Sync + 'static, +{ + type Settings = Settings; + type LeaderSelection = L; + + fn new(Settings { leader, nodes }: Self::Settings) -> Self { + Self { nodes, leader } + } + + fn root_committee(&self) -> crate::Committee { + self.nodes.clone().into_iter().collect() + } + + fn rebuild(&mut self, _timeout_qc: crate::TimeoutQc) { + todo!() + } + + fn is_member_of_child_committee(&self, _parent: NodeId, _child: NodeId) -> bool { + false + } + + fn is_member_of_root_committee(&self, _id: NodeId) -> bool { + true + } + + fn is_member_of_leaf_committee(&self, _id: NodeId) -> bool { + true + } + + fn is_child_of_root_committee(&self, _id: NodeId) -> bool { + false + } + + fn parent_committee(&self, _id: NodeId) -> crate::Committee { + Committee::new() + } + + fn node_committee(&self, _id: NodeId) -> crate::Committee { + self.nodes.clone().into_iter().collect() + } + + fn child_committees(&self, _id: NodeId) -> Vec { + vec![] + } + + fn leaf_committees(&self, _id: NodeId) -> Vec { + vec![self.root_committee()] + } + + fn next_leader(&self) -> NodeId { + self.leader.next_leader(&self.nodes) + } + + fn super_majority_threshold(&self, _id: NodeId) -> usize { + 0 + } + + fn leader_super_majority_threshold(&self, _id: NodeId) -> usize { + self.nodes.len() * 2 / 3 + 1 + } + + fn update_leader_selection(&self, f: F) -> Result + where + F: FnOnce(Self::LeaderSelection) -> Result, + { + match f(self.leader.clone()) { + Ok(leader_selection) => Ok(Self { + leader: leader_selection, + ..self.clone() + }), + Err(e) => Err(e), + } + } +} + +#[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct RoundRobin { + cur: usize, +} + +impl RoundRobin { + pub fn new() -> Self { + Self { cur: 0 } + } + + pub fn advance(&self) -> Self { + Self { + cur: (self.cur + 1), + } + } +} + +impl LeaderSelection for RoundRobin { + fn next_leader(&self, nodes: &[NodeId]) -> NodeId { + nodes[self.cur % nodes.len()] + } +} + +#[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct Settings { + pub nodes: Vec, + pub leader: L, +} diff --git a/consensus-engine/src/overlay/mod.rs b/consensus-engine/src/overlay/mod.rs new file mode 100644 index 00000000..2023e2ca --- /dev/null +++ b/consensus-engine/src/overlay/mod.rs @@ -0,0 +1,35 @@ +use super::types::*; + +mod flat_overlay; +mod random_beacon; +pub use flat_overlay::*; +pub use random_beacon::*; + +use std::marker::Send; + +pub trait Overlay: Clone { + type Settings: Clone + Send + Sync + 'static; + type LeaderSelection: LeaderSelection + Clone + Send + Sync + 'static; + + fn new(settings: Self::Settings) -> Self; + fn root_committee(&self) -> Committee; + fn rebuild(&mut self, timeout_qc: TimeoutQc); + fn is_member_of_child_committee(&self, parent: NodeId, child: NodeId) -> bool; + fn is_member_of_root_committee(&self, id: NodeId) -> bool; + fn is_member_of_leaf_committee(&self, id: NodeId) -> bool; + fn is_child_of_root_committee(&self, id: NodeId) -> bool; + fn parent_committee(&self, id: NodeId) -> Committee; + fn child_committees(&self, id: NodeId) -> Vec; + fn leaf_committees(&self, id: NodeId) -> Vec; + fn node_committee(&self, id: NodeId) -> Committee; + fn next_leader(&self) -> NodeId; + fn super_majority_threshold(&self, id: NodeId) -> usize; + fn leader_super_majority_threshold(&self, id: NodeId) -> usize; + fn update_leader_selection(&self, f: F) -> Result + where + F: FnOnce(Self::LeaderSelection) -> Result; +} + +pub trait LeaderSelection: Clone { + fn next_leader(&self, nodes: &[NodeId]) -> NodeId; +} diff --git a/consensus-engine/src/overlay/random_beacon.rs b/consensus-engine/src/overlay/random_beacon.rs new file mode 100644 index 00000000..e1648d46 --- /dev/null +++ b/consensus-engine/src/overlay/random_beacon.rs @@ -0,0 +1,122 @@ +use crate::types::*; +use bls_signatures::{PrivateKey, PublicKey, Serialize, Signature}; +use integer_encoding::VarInt; +use rand::{seq::SliceRandom, SeedableRng}; +use serde::{Deserialize, Serialize as SerdeSerialize}; +use sha2::{Digest, Sha256}; +use std::ops::Deref; +use thiserror::Error; + +use super::LeaderSelection; + +pub type Entropy = [u8]; +pub type Context = [u8]; + +#[cfg_attr(feature = "serde", derive(SerdeSerialize, Deserialize))] +#[derive(Debug, Clone, PartialEq)] +pub enum RandomBeaconState { + Happy { + // a byte string so that we can access the entropy without + // copying memory (can't directly go from the signature to its compressed form) + // We still assume the conversion does not fail, so the format has to be checked + // during deserialization + sig: Box<[u8]>, + #[serde(with = "serialize_bls")] + public_key: PublicKey, + }, + Sad { + entropy: Box, + }, +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("Invalid random beacon transition")] + InvalidRandomBeacon, +} + +impl RandomBeaconState { + pub fn entropy(&self) -> &Entropy { + match self { + Self::Happy { sig, .. } => sig, + Self::Sad { entropy } => entropy, + } + } + + pub fn generate_happy(view: View, sk: &PrivateKey) -> Self { + let sig = sk.sign(view_to_bytes(view)); + Self::Happy { + sig: sig.as_bytes().into(), + public_key: sk.public_key(), + } + } + + pub fn generate_sad(view: View, prev: &Self) -> Self { + let context = view_to_bytes(view); + let mut hasher = Sha256::new(); + hasher.update(prev.entropy()); + hasher.update(context); + + let entropy = hasher.finalize().to_vec().into(); + Self::Sad { entropy } + } + + pub fn check_advance_happy(&self, rb: RandomBeaconState, view: View) -> Result { + let context = view_to_bytes(view); + match rb { + Self::Happy { + ref sig, + public_key, + } => { + let sig = Signature::from_bytes(sig).unwrap(); + if !public_key.verify(sig, context) { + return Err(Error::InvalidRandomBeacon); + } + } + Self::Sad { .. } => return Err(Error::InvalidRandomBeacon), + } + Ok(rb) + } +} + +fn view_to_bytes(view: View) -> Box<[u8]> { + View::encode_var_vec(view).into_boxed_slice() +} + +// FIXME: the spec should be clearer on what is the expected behavior, +// for now, just use something that works +fn choice(state: &RandomBeaconState, nodes: &[NodeId]) -> NodeId { + let mut seed = [0; 32]; + seed.copy_from_slice(&state.entropy().deref()[..32]); + let mut rng = rand_chacha::ChaChaRng::from_seed(seed); + *nodes.choose(&mut rng).unwrap() +} + +impl LeaderSelection for RandomBeaconState { + fn next_leader(&self, nodes: &[NodeId]) -> NodeId { + choice(self, nodes) + } +} + +mod serialize_bls { + use super::*; + use serde::{Deserializer, Serializer}; + + pub fn deserialize<'de, D, T>(deserializer: D) -> Result + where + D: Deserializer<'de>, + T: bls_signatures::Serialize, + { + let bytes = Vec::::deserialize(deserializer)?; + T::from_bytes(&bytes).map_err(serde::de::Error::custom) + } + + pub fn serialize(sig: &T, serializer: S) -> Result + where + S: Serializer, + T: bls_signatures::Serialize, + { + let bytes = sig.as_bytes(); + bytes.serialize(serializer) + } +} diff --git a/consensus-engine/src/types.rs b/consensus-engine/src/types.rs index 297bb922..26e6c1bc 100644 --- a/consensus-engine/src/types.rs +++ b/consensus-engine/src/types.rs @@ -69,6 +69,13 @@ pub struct Block { pub id: BlockId, pub view: View, pub parent_qc: Qc, + pub leader_proof: LeaderProof, +} + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum LeaderProof { + LeaderId { leader_id: NodeId }, } impl Block { @@ -147,20 +154,3 @@ impl Qc { } } } - -pub trait Overlay: Clone { - fn new(nodes: Vec) -> Self; - fn root_committee(&self) -> Committee; - fn rebuild(&mut self, timeout_qc: TimeoutQc); - fn is_member_of_child_committee(&self, parent: NodeId, child: NodeId) -> bool; - fn is_member_of_root_committee(&self, id: NodeId) -> bool; - fn is_member_of_leaf_committee(&self, id: NodeId) -> bool; - fn is_child_of_root_committee(&self, id: NodeId) -> bool; - fn parent_committee(&self, id: NodeId) -> Committee; - fn child_committees(&self, id: NodeId) -> Vec; - fn leaf_committees(&self, id: NodeId) -> Vec; - fn node_committee(&self, id: NodeId) -> Committee; - fn leader(&self, view: View) -> NodeId; - fn super_majority_threshold(&self, id: NodeId) -> usize; - fn leader_super_majority_threshold(&self, id: NodeId) -> usize; -} diff --git a/nodes/mockpool-node/Cargo.toml b/nodes/mockpool-node/Cargo.toml index 9f128e55..f787fe62 100644 --- a/nodes/mockpool-node/Cargo.toml +++ b/nodes/mockpool-node/Cargo.toml @@ -24,6 +24,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = ["waku", "mo nomos-http = { path = "../../nomos-services/http", features = ["http"] } nomos-consensus = { path = "../../nomos-services/consensus", features = ["waku"] } tracing-subscriber = "0.3" +consensus-engine = { path = "../../consensus-engine" } tokio = {version = "1.24", features = ["sync"] } serde_json = "1.0" serde_yaml = "0.9" diff --git a/nodes/mockpool-node/src/main.rs b/nodes/mockpool-node/src/main.rs index 5de599c4..f3754d27 100644 --- a/nodes/mockpool-node/src/main.rs +++ b/nodes/mockpool-node/src/main.rs @@ -3,9 +3,9 @@ mod tx; use clap::Parser; use color_eyre::eyre::{eyre, Result}; +use consensus_engine::overlay::{FlatOverlay, RoundRobin}; use nomos_consensus::{ - network::adapters::waku::WakuAdapter as ConsensusWakuAdapter, overlay::FlatRoundRobin, - CarnotConsensus, + network::adapters::waku::WakuAdapter as ConsensusWakuAdapter, CarnotConsensus, }; use nomos_core::fountain::mock::MockFountain; use nomos_http::backends::axum::AxumBackend; @@ -39,7 +39,7 @@ type Carnot = CarnotConsensus< MockPool, MempoolWakuAdapter, MockFountain, - FlatRoundRobin, + FlatOverlay, >; #[derive(Deserialize)] diff --git a/nomos-core/src/block.rs b/nomos-core/src/block.rs index 19a8bdc9..43423021 100644 --- a/nomos-core/src/block.rs +++ b/nomos-core/src/block.rs @@ -1,10 +1,11 @@ +use consensus_engine::overlay::RandomBeaconState; use indexmap::IndexSet; // std use core::hash::Hash; // crates use crate::wire; use bytes::Bytes; -use consensus_engine::{Qc, View}; +use consensus_engine::{LeaderProof, NodeId, Qc, View}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; // internal @@ -16,29 +17,42 @@ pub type TxHash = [u8; 32]; pub struct Block { header: consensus_engine::Block, transactions: IndexSet, + beacon: RandomBeaconState, } /// Identifier of a block pub type BlockId = [u8; 32]; impl Block { - pub fn new(view: View, parent_qc: Qc, txs: impl Iterator) -> Self { + pub fn new( + view: View, + parent_qc: Qc, + txs: impl Iterator, + proposer: NodeId, + beacon: RandomBeaconState, + ) -> Self { let transactions = txs.collect(); let header = consensus_engine::Block { id: [view as u8; 32], view, parent_qc, + leader_proof: LeaderProof::LeaderId { + leader_id: proposer, + }, }; let mut s = Self { header, transactions, + beacon, }; let id = id_from_wire_content(&s.as_bytes()); s.header.id = id; s } +} +impl Block { pub fn header(&self) -> &consensus_engine::Block { &self.header } @@ -46,6 +60,10 @@ impl Block { pub fn transactions(&self) -> impl Iterator + '_ { self.transactions.iter() } + + pub fn beacon(&self) -> &RandomBeaconState { + &self.beacon + } } fn id_from_wire_content(bytes: &[u8]) -> consensus_engine::BlockId { diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index 47492b5b..1af3ec8d 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -25,6 +25,7 @@ tokio-stream = "0.1" tokio-util = "0.7" tracing = "0.1" waku-bindings = { version = "0.1.0-rc.2", optional = true} +bls-signatures = "0.14" [features] default = [] diff --git a/nomos-services/consensus/src/leader_selection/mod.rs b/nomos-services/consensus/src/leader_selection/mod.rs new file mode 100644 index 00000000..0cbb4d03 --- /dev/null +++ b/nomos-services/consensus/src/leader_selection/mod.rs @@ -0,0 +1,47 @@ +use consensus_engine::{ + overlay::{Error as RandomBeaconError, LeaderSelection, RandomBeaconState, RoundRobin}, + TimeoutQc, +}; +use nomos_core::block::Block; +use std::{convert::Infallible, error::Error, hash::Hash}; + +pub trait UpdateableLeaderSelection: LeaderSelection { + type Error: Error; + + fn on_new_block_received( + &self, + block: Block, + ) -> Result; + fn on_timeout_qc_received(&self, qc: TimeoutQc) -> Result; +} + +impl UpdateableLeaderSelection for RoundRobin { + type Error = Infallible; + + fn on_new_block_received( + &self, + _block: Block, + ) -> Result { + Ok(self.advance()) + } + + fn on_timeout_qc_received(&self, _qc: TimeoutQc) -> Result { + Ok(self.advance()) + } +} + +impl UpdateableLeaderSelection for RandomBeaconState { + type Error = RandomBeaconError; + + fn on_new_block_received( + &self, + block: Block, + ) -> Result { + self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view()) + // TODO: check random beacon public keys is leader id + } + + fn on_timeout_qc_received(&self, qc: TimeoutQc) -> Result { + Ok(Self::generate_sad(qc.view, self)) + } +} diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 00e19002..f7a8b210 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -4,8 +4,8 @@ //! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views). //! It's obviously extremely important that the information contained in `View` is synchronized across different //! nodes, but that has to be achieved through different means. +mod leader_selection; pub mod network; -pub mod overlay; mod tally; mod view_cancel; @@ -16,7 +16,9 @@ use std::hash::Hash; use std::pin::Pin; use std::time::Duration; // crates +use bls_signatures::PrivateKey; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; +use leader_selection::UpdateableLeaderSelection; use serde::Deserialize; use serde::{de::DeserializeOwned, Serialize}; // internal @@ -25,9 +27,10 @@ use crate::network::NetworkAdapter; use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings}; use crate::view_cancel::ViewCancelCache; use consensus_engine::{ - AggregateQc, Carnot, Committee, NewView, Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc, - Vote, + overlay::RandomBeaconState, AggregateQc, Carnot, Committee, LeaderProof, NewView, Overlay, + Payload, Qc, StandardQc, Timeout, TimeoutQc, Vote, }; + use nomos_core::block::Block; use nomos_core::crypto::PublicKey; use nomos_core::fountain::FountainCode; @@ -54,33 +57,33 @@ pub type NodeId = PublicKey; pub type Seed = [u8; 32]; #[derive(Debug, Deserialize, Serialize)] -pub struct CarnotSettings { +pub struct CarnotSettings { private_key: [u8; 32], fountain_settings: Fountain::Settings, - nodes: Vec, + overlay_settings: O::Settings, } -impl Clone for CarnotSettings { +impl Clone for CarnotSettings { fn clone(&self) -> Self { Self { private_key: self.private_key, fountain_settings: self.fountain_settings.clone(), - nodes: self.nodes.clone(), + overlay_settings: self.overlay_settings.clone(), } } } -impl CarnotSettings { +impl CarnotSettings { #[inline] pub const fn new( private_key: [u8; 32], fountain_settings: Fountain::Settings, - nodes: Vec, + overlay_settings: O::Settings, ) -> Self { Self { private_key, fountain_settings, - nodes, + overlay_settings, } } } @@ -116,7 +119,7 @@ where O: Overlay + Debug, { const SERVICE_ID: ServiceId = "Carnot"; - type Settings = CarnotSettings; + type Settings = CarnotSettings; type State = NoState; type StateOperator = NoOperator; type Message = NoMessage; @@ -134,6 +137,7 @@ where ::Hash: Debug + Send + Sync, M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, + O::LeaderSelection: UpdateableLeaderSelection, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); @@ -163,14 +167,15 @@ where let CarnotSettings { private_key, fountain_settings, - nodes, + overlay_settings, } = self.service_state.settings_reader.get_updated_settings(); - let overlay = O::new(nodes); + let overlay = O::new(overlay_settings); let genesis = consensus_engine::Block { id: [0; 32], view: 0, parent_qc: Qc::Standard(StandardQc::genesis()), + leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] }, }; let mut carnot = Carnot::from_genesis(private_key, genesis, overlay); let network_adapter = A::new(network_relay).await; @@ -210,7 +215,7 @@ where tally_settings.clone(), ), ))); - if carnot.is_leader_for_view(genesis_block.view + 1) { + if carnot.is_next_leader() { events.push(Box::pin(view_cancel_cache.cancelable_event_future( genesis_block.view + 1, async move { @@ -234,9 +239,10 @@ where match event { Event::Proposal { block, mut stream } => { tracing::debug!("received proposal {:?}", block); - let block = block.header().clone(); + let original_block = block; + let block = original_block.header().clone(); match carnot.receive_block(block.clone()) { - Ok(new_state) => { + Ok(mut new_state) => { let new_view = new_state.current_view(); if new_view != carnot.current_view() { events.push(Box::pin(view_cancel_cache.cancelable_event_future( @@ -248,6 +254,10 @@ where tally_settings.clone(), ), ))); + new_state = + Self::update_leader_selection(new_state, |leader_selection| { + leader_selection.on_new_block_received(original_block) + }); } else { events.push(Box::pin(view_cancel_cache.cancelable_event_future( block.view, @@ -264,7 +274,7 @@ where } Err(_) => tracing::debug!("invalid block {:?}", block), } - if carnot.is_leader_for_view(block.view + 1) { + if carnot.is_next_leader() { events.push(Box::pin(view_cancel_cache.cancelable_event_future( block.view, async move { @@ -301,8 +311,7 @@ where carnot = new_carnot; output = Some(Output::Send(out)); let new_view = timeout_qc.view + 1; - let next_view = new_view + 1; - if carnot.is_leader_for_view(next_view) { + if carnot.is_next_leader() { let high_qc = carnot.high_qc(); events.push(Box::pin(view_cancel_cache.cancelable_event_future( new_view, @@ -326,16 +335,22 @@ where } Event::TimeoutQc { timeout_qc } => { tracing::debug!("timeout received {:?}", timeout_qc); - carnot = carnot.receive_timeout_qc(timeout_qc.clone()); + let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); events.push(Box::pin(view_cancel_cache.cancelable_event_future( timeout_qc.view + 1, Self::gather_new_views( adapter, self_committee, - timeout_qc, + timeout_qc.clone(), tally_settings.clone(), ), ))); + if carnot.current_view() != new_state.current_view() { + new_state = Self::update_leader_selection(new_state, |leader_selection| { + leader_selection.on_timeout_qc_received(timeout_qc) + }); + } + carnot = new_state; } Event::RootTimeout { timeouts } => { tracing::debug!("root timeout {:?}", timeouts); @@ -371,7 +386,11 @@ where }); match rx.await { Ok(txs) => { - let proposal = Block::new(qc.view() + 1, qc, txs); + let beacon = RandomBeaconState::generate_happy( + qc.view(), + &PrivateKey::new(private_key), + ); + let proposal = Block::new(qc.view() + 1, qc, txs, carnot.id(), beacon); output = Some(Output::BroadcastProposal { proposal }); } Err(e) => tracing::error!("Could not fetch txs {e}"), @@ -437,6 +456,7 @@ where ::Hash: Debug + Send + Sync, M: MempoolAdapter + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static, + O::LeaderSelection: UpdateableLeaderSelection, { async fn gather_timeout_qc(adapter: &A, view: consensus_engine::View) -> Event { if let Some(timeout_qc) = adapter @@ -527,6 +547,19 @@ where Event::None } } + + fn update_leader_selection< + E: std::error::Error, + U: FnOnce(O::LeaderSelection) -> Result, + >( + carnot: Carnot, + f: U, + ) -> Carnot { + carnot + .update_overlay(|overlay| overlay.update_leader_selection(f)) + // TODO: remove unwrap + .unwrap() + } } async fn handle_output(adapter: &A, fountain: &F, node_id: NodeId, output: Output) diff --git a/nomos-services/consensus/src/overlay.rs b/nomos-services/consensus/src/overlay.rs deleted file mode 100644 index 18b112ff..00000000 --- a/nomos-services/consensus/src/overlay.rs +++ /dev/null @@ -1,65 +0,0 @@ -use consensus_engine::{Committee, NodeId, Overlay, View}; - -#[derive(Clone, Debug)] -/// Flat overlay with a single committee and round robin leader selection. -pub struct FlatRoundRobin { - nodes: Vec, -} - -impl Overlay for FlatRoundRobin { - fn new(nodes: Vec) -> Self { - Self { nodes } - } - - fn root_committee(&self) -> consensus_engine::Committee { - self.nodes.clone().into_iter().collect() - } - - fn rebuild(&mut self, _timeout_qc: consensus_engine::TimeoutQc) { - todo!() - } - - fn is_member_of_child_committee(&self, _parent: NodeId, _child: NodeId) -> bool { - false - } - - fn is_member_of_root_committee(&self, _id: NodeId) -> bool { - true - } - - fn is_member_of_leaf_committee(&self, _id: NodeId) -> bool { - true - } - - fn is_child_of_root_committee(&self, _id: NodeId) -> bool { - false - } - - fn parent_committee(&self, _id: NodeId) -> consensus_engine::Committee { - Committee::new() - } - - fn node_committee(&self, _id: NodeId) -> consensus_engine::Committee { - self.nodes.clone().into_iter().collect() - } - - fn child_committees(&self, _id: NodeId) -> Vec { - vec![] - } - - fn leaf_committees(&self, _id: NodeId) -> Vec { - vec![self.root_committee()] - } - - fn leader(&self, view: View) -> NodeId { - self.nodes[view as usize % self.nodes.len()] - } - - fn super_majority_threshold(&self, _id: NodeId) -> usize { - 0 - } - - fn leader_super_majority_threshold(&self, _id: NodeId) -> usize { - self.nodes.len() * 2 / 3 + 1 - } -}