From f8617d73315b1a49cde1a99569724e6cfba02610 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Thu, 27 Apr 2023 19:18:24 +0200 Subject: [PATCH] Consensus engine rework (#127) --------- Co-authored-by: Giacomo Pasini --------- Co-authored-by: Al Liu Co-authored-by: Daniel Sanchez --- Cargo.toml | 3 +- consensus-engine/Cargo.toml | 8 + consensus-engine/src/lib.rs | 395 ++++++++++++++++++++++++++++++++++ consensus-engine/src/types.rs | 144 +++++++++++++ 4 files changed, 549 insertions(+), 1 deletion(-) create mode 100644 consensus-engine/Cargo.toml create mode 100644 consensus-engine/src/lib.rs create mode 100644 consensus-engine/src/types.rs diff --git a/Cargo.toml b/Cargo.toml index 96b13d0b..c95c1911 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,5 +10,6 @@ members = [ "nomos-services/http", "nodes/nomos-node", "nodes/mockpool-node", - "simulations" + "simulations", + "consensus-engine" ] diff --git a/consensus-engine/Cargo.toml b/consensus-engine/Cargo.toml new file mode 100644 index 00000000..c44f2075 --- /dev/null +++ b/consensus-engine/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "consensus-engine" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/consensus-engine/src/lib.rs b/consensus-engine/src/lib.rs new file mode 100644 index 00000000..3067a225 --- /dev/null +++ b/consensus-engine/src/lib.rs @@ -0,0 +1,395 @@ +use std::collections::{HashMap, HashSet}; + +mod types; +use types::*; + +#[derive(Clone, Debug)] +pub struct Carnot { + id: NodeId, + current_view: View, + highest_voted_view: View, + local_high_qc: StandardQc, + safe_blocks: HashMap, + last_view_timeout_qc: Option, + overlay: O, +} + +impl Carnot { + pub fn from_genesis(id: NodeId, genesis_block: Block, overlay: O) -> Self { + Self { + current_view: 0, + local_high_qc: StandardQc::genesis(), + id, + highest_voted_view: -1, + last_view_timeout_qc: None, + overlay, + safe_blocks: 
[(id, genesis_block)].into(), + } + } + /// Upon reception of a block + /// + /// Preconditions: + /// * The parent-children relation between blocks must be preserved when calling + /// this function. In other words, you must call `receive_block(b.parent())` with + /// success before `receive_block(b)`. + /// * Overlay changes for views < block.view should be made available before trying to process + /// a block by calling `receive_timeout_qc`. + #[allow(clippy::result_unit_err)] + pub fn receive_block(&self, block: Block) -> Result { + assert!( + self.safe_blocks.contains_key(&block.parent()), + "out of order view not supported, missing parent block for {block:?}", + ); + + // if the block has already been processed, return early + if self.safe_blocks.contains_key(&block.id) { + return Ok(self.clone()); + } + + if self.blocks_in_view(block.view).contains(&block) + || block.view <= self.latest_committed_view() + { + // TODO: Report malicious leader + // TODO: it could be possible that a malicious leader sends a block to a node and another one to + // the rest of the network. The node should be able to catch up with the rest of the network after having + // validated that the history of the block is correct and diverged from its fork. + // By rejecting any other blocks except the first one received for a view this code does NOT do that. 
+ return Err(()); + } + + let mut new_state = self.clone(); + + if new_state.block_is_safe(block.clone()) { + new_state.safe_blocks.insert(block.id, block.clone()); + new_state.update_high_qc(block.parent_qc); + } else { + // Non safe block, not necessarily an error + return Err(()); + } + + Ok(new_state) + } + + /// Upon reception of a global timeout event + /// + /// Preconditions: + pub fn receive_timeout_qc(&self, timeout_qc: TimeoutQc) -> Self { + let mut new_state = self.clone(); + + if timeout_qc.view < new_state.current_view { + return new_state; + } + new_state.update_high_qc(timeout_qc.high_qc.clone()); + new_state.update_timeout_qc(timeout_qc.clone()); + + new_state.current_view = timeout_qc.view + 1; + new_state.overlay.rebuild(timeout_qc); + + new_state + } + + /// Upon reception of a supermajority of votes for a safe block from children + /// of the current node. It signals approval of the block to the network. + /// + /// Preconditions: + /// * `receive_block(b)` must have been called successfully before trying to approve a block b. + /// * A node should not attempt to vote for a block in a view earlier than the latest one it actively participated in. + pub fn approve_block(&self, block: Block) -> (Self, Output) { + assert!(self.safe_blocks.contains_key(&block.id)); + assert!( + self.highest_voted_view < block.view, + "can't vote for a block in the past" + ); + + let mut new_state = self.clone(); + + new_state.highest_voted_view = block.view; + + let to = if new_state.overlay.is_member_of_root_committee(new_state.id) { + [new_state.overlay.leader(block.view + 1)] + .into_iter() + .collect() + } else { + new_state.overlay.parent_committee(self.id) + }; + ( + new_state, + Output::Send { + to, + payload: Payload::Vote(Vote { block: block.id }), + }, + ) + } + + /// Upon reception of a supermajority of votes for a new view from children of the current node. + /// It signals approval of the new view to the network. 
+ /// + /// Preconditions: + /// * `receive_timeout_qc(timeout_qc)` must have been called successfully before trying to approve a new view with that + /// timeout qc. + /// * A node should not attempt to approve a view earlier than the latest one it actively participated in. + pub fn approve_new_view( + &self, + timeout_qc: TimeoutQc, + new_views: HashSet, + ) -> (Self, Output) { + let new_view = timeout_qc.view + 1; + assert!( + new_view + >= self + .last_view_timeout_qc + .as_ref() + .map(|qc| qc.view) + .unwrap_or(0) + ); + assert_eq!( + new_views.len(), + self.overlay.super_majority_threshold(self.id) + ); + assert!(new_views.iter().all(|nv| self + .overlay + .is_member_of_child_committee(self.id, nv.sender))); + assert!(self.highest_voted_view < new_view); + assert!(new_views.iter().all(|nv| nv.view == new_view)); + assert!(new_views.iter().all(|nv| nv.timeout_qc == timeout_qc)); + + let mut new_state = self.clone(); + + let high_qc = new_views + .iter() + .map(|nv| &nv.high_qc) + .chain(std::iter::once(&timeout_qc.high_qc)) + .max_by_key(|qc| qc.view()) + .unwrap(); + new_state.update_high_qc(high_qc.clone()); + + let new_view_msg = NewView { + view: new_view, + high_qc: high_qc.clone(), + sender: new_state.id, + timeout_qc, + }; + + new_state.highest_voted_view = new_view; + let to = if new_state.overlay.is_member_of_root_committee(new_state.id) { + [new_state.overlay.leader(new_view + 1)] + .into_iter() + .collect() + } else { + new_state.overlay.parent_committee(new_state.id) + }; + ( + new_state, + Output::Send { + to, + payload: Payload::NewView(new_view_msg), + }, + ) + } + + /// After a configurable amount of time has elapsed since the last view change + /// + /// Preconditions: none! + /// Note that the timer is only reset after a view change, i.e. 
a node can't timeout + /// more than once for the same view + pub fn local_timeout(&self) -> (Self, Option) { + let mut new_state = self.clone(); + + new_state.highest_voted_view = new_state.current_view; + if new_state.overlay.is_member_of_root_committee(new_state.id) + || new_state.overlay.is_child_of_root_committee(new_state.id) + { + let timeout_msg = Timeout { + view: new_state.current_view, + high_qc: Qc::Standard(new_state.local_high_qc.clone()), + sender: new_state.id, + timeout_qc: new_state.last_view_timeout_qc.clone(), + }; + let to = new_state.overlay.root_committee(); + return ( + new_state, + Some(Output::Send { + to, + payload: Payload::Timeout(timeout_msg), + }), + ); + } + (new_state, None) + } + + fn block_is_safe(&self, block: Block) -> bool { + block.view >= self.current_view && block.view == block.parent_qc.view() + 1 + } + + fn update_high_qc(&mut self, qc: Qc) { + let qc_view = qc.view(); + match qc { + Qc::Standard(new_qc) if new_qc.view > self.local_high_qc.view => { + self.local_high_qc = new_qc; + } + Qc::Aggregated(new_qc) if new_qc.high_qc.view != self.local_high_qc.view => { + self.local_high_qc = new_qc.high_qc; + } + _ => {} + } + if qc_view == self.current_view { + self.current_view += 1; + } + } + + fn update_timeout_qc(&mut self, timeout_qc: TimeoutQc) { + match (&self.last_view_timeout_qc, timeout_qc) { + (None, timeout_qc) => { + self.last_view_timeout_qc = Some(timeout_qc); + } + (Some(current_qc), timeout_qc) if timeout_qc.view > current_qc.view => { + self.last_view_timeout_qc = Some(timeout_qc); + } + _ => {} + } + } + + pub fn blocks_in_view(&self, view: View) -> Vec { + self.safe_blocks + .iter() + .filter(|(_, b)| b.view == view) + .map(|(_, block)| block.clone()) + .collect() + } + + pub fn genesis_block(&self) -> Block { + self.blocks_in_view(0)[0].clone() + } + + // Returns the id of the grandparent block if it can be committed or None otherwise + fn can_commit_grandparent(&self, block: Block) -> Option { + let parent 
= self.safe_blocks.get(&block.parent())?; + let grandparent = self.safe_blocks.get(&parent.parent())?; + + if parent.view == grandparent.view + 1 + && matches!(parent.parent_qc, Qc::Standard { .. }) + && matches!(grandparent.parent_qc, Qc::Standard { .. }) + { + return Some(grandparent.clone()); + } + None + } + + pub fn latest_committed_block(&self) -> Block { + for view in (0..self.current_view).rev() { + for block in self.blocks_in_view(view) { + if let Some(block) = self.can_commit_grandparent(block) { + return block; + } + } + } + self.genesis_block() + } + + pub fn latest_committed_view(&self) -> View { + self.latest_committed_block().view + } + + pub fn committed_blocks(&self) -> Vec { + let mut res = vec![]; + let mut current = self.latest_committed_block(); + while current != self.genesis_block() { + res.push(current.id); + current = self.safe_blocks.get(&current.parent()).unwrap().clone(); + } + // If the length is 1, it means that the genesis block is the only committed block + // and was added to the list already at the beginning of the function. + // Otherwise, we need to add the genesis block to the list. 
+ if res.len() > 1 { + res.push(self.genesis_block().id); + } + res + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[derive(Clone)] + struct NoOverlay; + + impl Overlay for NoOverlay { + fn root_committee(&self) -> Committee { + todo!() + } + + fn rebuild(&mut self, _timeout_qc: TimeoutQc) { + todo!() + } + + fn is_member_of_child_committee(&self, _parent: NodeId, _child: NodeId) -> bool { + todo!() + } + + fn is_member_of_root_committee(&self, _id: NodeId) -> bool { + todo!() + } + + fn is_member_of_leaf_committee(&self, _id: NodeId) -> bool { + todo!() + } + + fn is_child_of_root_committee(&self, _id: NodeId) -> bool { + todo!() + } + + fn parent_committee(&self, _id: NodeId) -> Committee { + todo!() + } + + fn leaf_committees(&self, _id: NodeId) -> HashSet { + todo!() + } + + fn leader(&self, _view: View) -> NodeId { + todo!() + } + + fn super_majority_threshold(&self, _id: NodeId) -> usize { + todo!() + } + + fn leader_super_majority_threshold(&self, _view: View) -> usize { + todo!() + } + } + + #[test] + fn block_is_committed() { + let genesis = Block { + view: 0, + id: [0; 32], + parent_qc: Qc::Standard(StandardQc { + view: 0, + id: [0; 32], + }), + }; + let mut engine = Carnot::from_genesis([0; 32], genesis.clone(), NoOverlay); + let p1 = Block { + view: 1, + id: [1; 32], + parent_qc: Qc::Standard(StandardQc { + view: 0, + id: [0; 32], + }), + }; + let p2 = Block { + view: 2, + id: [2; 32], + parent_qc: Qc::Standard(StandardQc { + view: 1, + id: [1; 32], + }), + }; + assert_eq!(engine.latest_committed_block(), genesis); + engine = engine.receive_block(p1).unwrap(); + engine = engine.receive_block(p2).unwrap(); + assert_eq!(engine.latest_committed_block(), genesis); + } +} diff --git a/consensus-engine/src/types.rs b/consensus-engine/src/types.rs new file mode 100644 index 00000000..efe87970 --- /dev/null +++ b/consensus-engine/src/types.rs @@ -0,0 +1,144 @@ +use std::collections::HashSet; +use std::hash::Hash; + +pub type View = i64; +pub type NodeId 
= [u8; 32]; +pub type BlockId = [u8; 32]; +pub type Committee = HashSet; + +/// The way the consensus engine communicates with the rest of the system is by returning +/// actions to be performed. +/// Often, the actions are to send a message to a set of nodes. +/// This enum represents the different types of messages that can be sent from the perspective of consensus and +/// can't be directly used in the network as they lack things like cryptographic signatures. +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum Payload { + /// Vote for a block in a view + Vote(Vote), + /// Signal that a local timeout has occurred + Timeout(Timeout), + /// Vote for moving to a new view + NewView(NewView), +} + +/// Returned +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Vote { + pub block: BlockId, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Timeout { + pub view: View, + pub sender: NodeId, + pub high_qc: Qc, + pub timeout_qc: Option, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct NewView { + pub view: View, + pub sender: NodeId, + pub timeout_qc: TimeoutQc, + pub high_qc: Qc, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct TimeoutQc { + pub view: View, + pub high_qc: Qc, + pub sender: NodeId, +} + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct Block { + pub id: BlockId, + pub view: View, + pub parent_qc: Qc, +} + +impl Block { + pub fn parent(&self) -> BlockId { + self.parent_qc.block() + } +} + +/// Possible output events. 
+#[derive(Debug, Clone, Eq, PartialEq)] +pub enum Output { + Send { + to: HashSet, + payload: Payload, + }, + Broadcast { + payload: Payload, + }, +} + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct StandardQc { + pub view: View, + pub id: BlockId, +} + +impl StandardQc { + pub(crate) fn genesis() -> Self { + Self { + view: -1, + id: [0; 32], + } + } +} + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct AggregateQc { + pub high_qc: StandardQc, + pub view: View, +} + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub enum Qc { + Standard(StandardQc), + Aggregated(AggregateQc), +} + +impl Qc { + /// The view in which this Qc was built. + pub fn view(&self) -> View { + match self { + Qc::Standard(StandardQc { view, .. }) => *view, + Qc::Aggregated(AggregateQc { view, .. }) => *view, + } + } + + /// The view of the block this qc is for. + pub fn parent_view(&self) -> View { + match self { + Qc::Standard(StandardQc { view, .. }) => *view, + Qc::Aggregated(AggregateQc { view, .. }) => *view, + } + } + + /// The id of the block this qc is for. + /// This will be the parent of the block which will include this qc + pub fn block(&self) -> BlockId { + match self { + Qc::Standard(StandardQc { id, .. }) => *id, + Qc::Aggregated(AggregateQc { high_qc, .. }) => high_qc.id, + } + } +} + +pub trait Overlay: Clone { + fn root_committee(&self) -> Committee; + fn rebuild(&mut self, timeout_qc: TimeoutQc); + fn is_member_of_child_committee(&self, parent: NodeId, child: NodeId) -> bool; + fn is_member_of_root_committee(&self, id: NodeId) -> bool; + fn is_member_of_leaf_committee(&self, id: NodeId) -> bool; + fn is_child_of_root_committee(&self, id: NodeId) -> bool; + fn parent_committee(&self, id: NodeId) -> Committee; + fn leaf_committees(&self, id: NodeId) -> HashSet; + fn leader(&self, view: View) -> NodeId; + fn super_majority_threshold(&self, id: NodeId) -> usize; + fn leader_super_majority_threshold(&self, view: View) -> usize; +}