diff --git a/consensus/carnot-engine/src/lib.rs b/consensus/carnot-engine/src/lib.rs index 328ce8c4..63f8cf0f 100644 --- a/consensus/carnot-engine/src/lib.rs +++ b/consensus/carnot-engine/src/lib.rs @@ -1,4 +1,7 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + hash::Hash, +}; pub mod overlay; mod types; @@ -12,23 +15,27 @@ pub mod openapi { } #[derive(Clone, Debug, PartialEq)] -pub struct Carnot { +pub struct Carnot { id: NodeId, current_view: View, highest_voted_view: View, - local_high_qc: StandardQc, - safe_blocks: HashMap, - tip: BlockId, - last_view_timeout_qc: Option, - latest_committed_block: Option, + local_high_qc: StandardQc, + safe_blocks: HashMap>, + tip: Id, + last_view_timeout_qc: Option>, + latest_committed_block: Option, overlay: O, } -impl Carnot { - pub fn from_genesis(id: NodeId, genesis_block: Block, overlay: O) -> Self { +impl Carnot +where + O: Overlay, + Id: Copy + Eq + Hash + core::fmt::Debug, +{ + pub fn from_genesis(id: NodeId, genesis_block: Block, overlay: O) -> Self { Self { current_view: View(0), - local_high_qc: StandardQc::genesis(), + local_high_qc: StandardQc::genesis(genesis_block.id), id, highest_voted_view: View(-1), last_view_timeout_qc: None, @@ -47,12 +54,12 @@ impl Carnot { self.highest_voted_view } - pub fn safe_blocks(&self) -> &HashMap { + pub fn safe_blocks(&self) -> &HashMap> { &self.safe_blocks } /// Return the most recent safe block - pub fn tip(&self) -> Block { + pub fn tip(&self) -> Block { self.safe_blocks[&self.tip].clone() } @@ -65,7 +72,7 @@ impl Carnot { /// * Overlay changes for views < block.view should be made available before trying to process /// a block by calling `receive_timeout_qc`. #[allow(clippy::result_unit_err)] - pub fn receive_block(&self, block: Block) -> Result { + pub fn receive_block(&self, block: Block) -> Result { assert!( self.safe_blocks.contains_key(&block.parent()), "out of order view not supported, missing parent block for {block:?}", @@ -114,7 +121,7 @@ impl Carnot { /// Upon reception of a global timeout event /// /// Preconditions: - pub fn receive_timeout_qc(&self, timeout_qc: TimeoutQc) -> Self { + pub fn receive_timeout_qc(&self, timeout_qc: TimeoutQc) -> Self { let mut new_state = self.clone(); if timeout_qc.view() < new_state.current_view { @@ -134,7 +141,7 @@ impl Carnot { /// Preconditions: /// * `receive_block(b)` must have been called successfully before trying to approve a block b. /// * A node should not attempt to vote for a block in a view earlier than the latest one it actively participated in. - pub fn approve_block(&self, block: Block) -> (Self, Send) { + pub fn approve_block(&self, block: Block) -> (Self, Send) { assert!( self.safe_blocks.contains_key(&block.id), "{:?} not in {:?}", @@ -179,9 +186,9 @@ impl Carnot { /// * A node should not attempt to approve a view earlier than the latest one it actively participated in. pub fn approve_new_view( &self, - timeout_qc: TimeoutQc, - new_views: HashSet, - ) -> (Self, Send) { + timeout_qc: TimeoutQc, + new_views: HashSet>, + ) -> (Self, Send) { let new_view = timeout_qc.view().next(); assert!( new_view @@ -243,7 +250,7 @@ impl Carnot { /// Preconditions: none! /// Just notice that the timer only reset after a view change, i.e. 
a node can't timeout /// more than once for the same view - pub fn local_timeout(&self) -> (Self, Option) { + pub fn local_timeout(&self) -> (Self, Option>) { let mut new_state = self.clone(); new_state.highest_voted_view = new_state.current_view; @@ -268,11 +275,11 @@ impl Carnot { (new_state, None) } - fn block_is_safe(&self, block: Block) -> bool { + fn block_is_safe(&self, block: Block) -> bool { block.view >= self.current_view && block.view == block.parent_qc.view().next() } - fn update_high_qc(&mut self, qc: Qc) { + fn update_high_qc(&mut self, qc: Qc) { let qc_view = qc.view(); match qc { Qc::Standard(new_qc) if new_qc.view > self.local_high_qc.view => { @@ -288,7 +295,7 @@ impl Carnot { } } - fn update_timeout_qc(&mut self, timeout_qc: TimeoutQc) { + fn update_timeout_qc(&mut self, timeout_qc: TimeoutQc) { match (&self.last_view_timeout_qc, timeout_qc) { (None, timeout_qc) => { self.last_view_timeout_qc = Some(timeout_qc); @@ -300,13 +307,13 @@ impl Carnot { } } - fn update_latest_committed_block(&mut self, block: &Block) { + fn update_latest_committed_block(&mut self, block: &Block) { if let Some(block) = self.can_commit_grandparent(block) { self.latest_committed_block = Some(block.id); } } - pub fn blocks_in_view(&self, view: View) -> Vec { + pub fn blocks_in_view(&self, view: View) -> Vec> { self.safe_blocks .iter() .filter(|(_, b)| b.view == view) @@ -314,12 +321,12 @@ impl Carnot { .collect() } - pub fn genesis_block(&self) -> Block { + pub fn genesis_block(&self) -> Block { self.blocks_in_view(View(0))[0].clone() } // Returns the id of the grandparent block if it can be committed or None otherwise - fn can_commit_grandparent(&self, block: &Block) -> Option { + fn can_commit_grandparent(&self, block: &Block) -> Option> { let parent = self.safe_blocks.get(&block.parent())?; let grandparent = self.safe_blocks.get(&parent.parent())?; @@ -332,7 +339,7 @@ impl Carnot { None } - pub fn latest_committed_block(&self) -> Block { + pub fn latest_committed_block(&self) -> Block { self.latest_committed_block .and_then(|id| self.safe_blocks.get(&id).cloned()) .unwrap_or_else(|| self.genesis_block()) @@ -342,7 +349,7 @@ impl Carnot { self.latest_committed_block().view } - pub fn latest_committed_blocks(&self, limit: Option) -> Vec { + pub fn latest_committed_blocks(&self, limit: Option) -> Vec { let limit = limit.unwrap_or(self.safe_blocks.len()); let mut res = vec![]; let mut current = self.latest_committed_block(); @@ -363,11 +370,11 @@ impl Carnot { res } - pub fn last_view_timeout_qc(&self) -> Option { + pub fn last_view_timeout_qc(&self) -> Option> { self.last_view_timeout_qc.clone() } - pub fn high_qc(&self) -> StandardQc { + pub fn high_qc(&self) -> StandardQc { self.local_high_qc.clone() } @@ -444,15 +451,15 @@ mod test { use super::*; - fn init(nodes: Vec) -> Carnot> { + fn init(nodes: Vec) -> Carnot, usize> { assert!(!nodes.is_empty()); Carnot::from_genesis( *nodes.first().unwrap(), Block { view: View(0), - id: BlockId::zeros(), - parent_qc: Qc::Standard(StandardQc::genesis()), + id: 0, + parent_qc: Qc::Standard(StandardQc::genesis(0)), leader_proof: LeaderProof::LeaderId { leader_id: *nodes.first().unwrap(), }, @@ -466,11 +473,10 @@ mod test { } fn next_block( - engine: &Carnot>, - block: &Block, - ) -> Block { - let mut next_id = block.id; - next_id.0[0] += 1; + engine: &Carnot, usize>, + block: &Block, + ) -> Block { + let next_id = block.id + 1; Block { view: block.view.next(), @@ -486,8 +492,8 @@ mod test { } fn update_leader_selection( - engine: &Carnot>, - ) -> Carnot> { + 
engine: &Carnot, usize>, + ) -> Carnot, usize> { engine .update_overlay(|overlay| { overlay.update_leader_selection( @@ -545,11 +551,10 @@ mod test { // Ensure that receive_block() fails if the parent block has never been received. fn receive_block_with_unknown_parent() { let engine = init(vec![NodeId::new([0; 32])]); - let mut parent_block_id = engine.genesis_block().id; - parent_block_id.0[0] += 1; // generate an unknown parent block ID + let parent_block_id = 42; let block = Block { view: engine.current_view().next(), - id: BlockId::new([1; 32]), + id: 1, parent_qc: Qc::Standard(StandardQc { view: engine.current_view(), id: parent_block_id, @@ -649,7 +654,7 @@ mod test { // a future block should be rejected let future_block = Block { - id: BlockId::new([10; 32]), + id: 10, view: View(11), // a future view parent_qc: Qc::Aggregated(AggregateQc { view: View(10), @@ -667,7 +672,7 @@ mod test { // a past block should be also rejected let mut past_block = block1; // with the same view as block1 - past_block.id = BlockId::new([10; 32]); + past_block.id = 10; assert!(engine.receive_block(past_block).is_err()); } @@ -744,7 +749,7 @@ mod test { sender: NodeId::new([0; 32]), high_qc: StandardQc { view: View(0), // genesis - id: BlockId::zeros(), + id: 0, }, timeout_qc: None }), @@ -767,7 +772,7 @@ mod test { View(1), StandardQc { view: View::new(0), // genesis - id: BlockId::zeros(), + id: 0, }, NodeId::new([0; 32]), ); @@ -792,7 +797,7 @@ mod test { View(1), StandardQc { view: View(0), // genesis - id: BlockId::zeros(), + id: 0, }, NodeId::new([0; 32]), ); @@ -819,7 +824,7 @@ mod test { View(1), StandardQc { view: View(0), // genesis - id: BlockId::zeros(), + id: 0, }, NodeId::new([0; 32]), ); @@ -861,7 +866,7 @@ mod test { View(1), StandardQc { view: View(0), // genesis - id: BlockId::zeros(), + id: 0, }, NodeId::new([0; 32]), ); @@ -874,7 +879,7 @@ mod test { View(2), StandardQc { view: View(0), // genesis - id: BlockId::zeros(), + id: 0, }, NodeId::new([0; 32]), ); diff --git a/consensus/carnot-engine/src/types.rs b/consensus/carnot-engine/src/types.rs index 698c2c95..5bd93fcc 100644 --- a/consensus/carnot-engine/src/types.rs +++ b/consensus/carnot-engine/src/types.rs @@ -8,8 +8,6 @@ mod committee; pub use committee::{Committee, CommitteeId}; mod node_id; pub use node_id::NodeId; -mod block_id; -pub use block_id::BlockId; mod view; pub use view::View; @@ -21,32 +19,32 @@ pub use view::View; #[derive(Debug, Clone, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub enum Payload { +pub enum Payload { /// Vote for a block in a view - Vote(Vote), + Vote(Vote), /// Signal that a local timeout has occurred - Timeout(Timeout), + Timeout(Timeout), /// Vote for moving to a new view - NewView(NewView), + NewView(NewView), } /// Returned #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub struct Vote { +pub struct Vote { pub view: View, - pub block: BlockId, + pub block: Id, } #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub struct Timeout { +pub struct Timeout { pub view: View, pub sender: NodeId, - pub high_qc: StandardQc, - pub timeout_qc: Option, + pub high_qc: StandardQc, + pub timeout_qc: Option>, } // TODO: We are making "mandatory" to have received the timeout_qc 
before the new_view votes. @@ -54,24 +52,24 @@ pub struct Timeout { #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub struct NewView { +pub struct NewView { pub view: View, pub sender: NodeId, - pub timeout_qc: TimeoutQc, - pub high_qc: StandardQc, + pub timeout_qc: TimeoutQc, + pub high_qc: StandardQc, } #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub struct TimeoutQc { +pub struct TimeoutQc { view: View, - high_qc: StandardQc, + high_qc: StandardQc, sender: NodeId, } -impl TimeoutQc { - pub fn new(view: View, high_qc: StandardQc, sender: NodeId) -> Self { +impl TimeoutQc { + pub fn new(view: View, high_qc: StandardQc, sender: NodeId) -> Self { assert!( view >= high_qc.view, "timeout_qc.view:{} shouldn't be lower than timeout_qc.high_qc.view:{}", @@ -90,7 +88,7 @@ impl TimeoutQc { self.view } - pub fn high_qc(&self) -> &StandardQc { + pub fn high_qc(&self) -> &StandardQc { &self.high_qc } @@ -102,10 +100,10 @@ impl TimeoutQc { #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub struct Block { - pub id: BlockId, +pub struct Block { + pub id: Id, pub view: View, - pub parent_qc: Qc, + pub parent_qc: Qc, pub leader_proof: LeaderProof, } @@ -116,16 +114,16 @@ pub enum LeaderProof { LeaderId { leader_id: NodeId }, } -impl Block { - pub fn parent(&self) -> BlockId { +impl Block { + pub fn parent(&self) -> Id { self.parent_qc.block() } - pub fn genesis() -> Self { + pub fn genesis(id: Id) -> Self { Self { view: View(0), - id: BlockId::zeros(), - parent_qc: Qc::Standard(StandardQc::genesis()), + id, + parent_qc: Qc::Standard(StandardQc::genesis(id)), leader_proof: LeaderProof::LeaderId { leader_id: NodeId::new([0; 32]), }, @@ -135,45 +133,42 @@ impl Block { /// Possible output events. #[derive(Debug, Clone, Eq, PartialEq)] -pub struct Send { +pub struct Send { pub to: Committee, - pub payload: Payload, + pub payload: Payload, } #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub struct StandardQc { +pub struct StandardQc { pub view: View, - pub id: BlockId, + pub id: Id, } -impl StandardQc { - pub fn genesis() -> Self { - Self { - view: View(-1), - id: BlockId::zeros(), - } +impl StandardQc { + pub fn genesis(id: Id) -> Self { + Self { view: View(-1), id } } } #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub struct AggregateQc { - pub high_qc: StandardQc, +pub struct AggregateQc { + pub high_qc: StandardQc, pub view: View, } #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] -pub enum Qc { - Standard(StandardQc), - Aggregated(AggregateQc), +pub enum Qc { + Standard(StandardQc), + Aggregated(AggregateQc), } -impl Qc { +impl Qc { /// The view in which this Qc was built. pub fn view(&self) -> View { match self { @@ -184,14 +179,14 @@ impl Qc { /// The id of the block this qc is for. 
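The engine and QC types above are now generic over the block id. A minimal sketch (not part of the diff) of what this looks like for a caller, using a plain integer id (the unit tests in this change use `usize`, the fuzz tests `[u8; 32]`):

    use carnot_engine::{Block, NodeId, StandardQc, TimeoutQc, View};

    fn id_generic_sketch() {
        // Block::genesis(id) / StandardQc::genesis(id) now take the genesis id
        // explicitly instead of assuming a zeroed 32-byte BlockId.
        let genesis: Block<u64> = Block::genesis(0);
        assert_eq!(genesis.parent(), 0);

        // The QC types carry the same id parameter.
        let timeout_qc = TimeoutQc::new(
            View::new(1),
            StandardQc { view: View::new(0), id: 0u64 },
            NodeId::new([0; 32]),
        );
        assert_eq!(timeout_qc.high_qc().id, 0);
    }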
/// This will be the parent of the block which will include this qc - pub fn block(&self) -> BlockId { + pub fn block(&self) -> Id { match self { Qc::Standard(StandardQc { id, .. }) => *id, Qc::Aggregated(AggregateQc { high_qc, .. }) => high_qc.id, } } - pub fn high_qc(&self) -> StandardQc { + pub fn high_qc(&self) -> StandardQc { match self { Qc::Standard(qc) => qc.clone(), Qc::Aggregated(AggregateQc { high_qc, .. }) => high_qc.clone(), @@ -207,11 +202,11 @@ mod test { fn standard_qc() { let standard_qc = StandardQc { view: View(10), - id: BlockId::zeros(), + id: 0, }; let qc = Qc::Standard(standard_qc.clone()); assert_eq!(qc.view(), View(10)); - assert_eq!(qc.block(), BlockId::new([0; 32])); + assert_eq!(qc.block(), 0); assert_eq!(qc.high_qc(), standard_qc); } @@ -221,12 +216,12 @@ mod test { view: View(20), high_qc: StandardQc { view: View(10), - id: BlockId::zeros(), + id: 0, }, }; let qc = Qc::Aggregated(aggregated_qc.clone()); assert_eq!(qc.view(), View(20)); - assert_eq!(qc.block(), BlockId::new([0; 32])); + assert_eq!(qc.block(), 0); assert_eq!(qc.high_qc(), aggregated_qc.high_qc); } @@ -236,26 +231,26 @@ mod test { View(2), StandardQc { view: View(1), - id: BlockId::zeros(), + id: 0, }, NodeId::new([0; 32]), ); assert_eq!(timeout_qc.view(), View(2)); assert_eq!(timeout_qc.high_qc().view, View(1)); - assert_eq!(timeout_qc.high_qc().id, BlockId::new([0; 32])); + assert_eq!(timeout_qc.high_qc().id, 0); assert_eq!(timeout_qc.sender(), NodeId::new([0; 32])); let timeout_qc = TimeoutQc::new( View(2), StandardQc { view: View(2), - id: BlockId::zeros(), + id: 0, }, NodeId::new([0; 32]), ); assert_eq!(timeout_qc.view(), View(2)); assert_eq!(timeout_qc.high_qc().view, View(2)); - assert_eq!(timeout_qc.high_qc().id, BlockId::new([0; 32])); + assert_eq!(timeout_qc.high_qc().id, 0); assert_eq!(timeout_qc.sender(), NodeId::new([0; 32])); } @@ -268,7 +263,7 @@ mod test { View(1), StandardQc { view: View(2), - id: BlockId::zeros(), + id: 0, }, NodeId::new([0; 32]), ); diff --git a/consensus/carnot-engine/src/types/block_id.rs b/consensus/carnot-engine/src/types/block_id.rs index e1de990c..37d0cc42 100644 --- a/consensus/carnot-engine/src/types/block_id.rs +++ b/consensus/carnot-engine/src/types/block_id.rs @@ -3,23 +3,6 @@ #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct BlockId(pub(crate) [u8; 32]); -#[cfg(feature = "serde")] -impl serde::Serialize for BlockId { - fn serialize(&self, serializer: S) -> Result { - nomos_utils::serde::serialize_bytes_array(self.0, serializer) - } -} - -#[cfg(feature = "serde")] -impl<'de> serde::de::Deserialize<'de> for BlockId { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - nomos_utils::serde::deserialize_bytes_array(deserializer).map(Self) - } -} - impl BlockId { pub const fn new(val: [u8; 32]) -> Self { Self(val) diff --git a/consensus/carnot-engine/tests/fuzz/mod.rs b/consensus/carnot-engine/tests/fuzz/mod.rs index e8d18602..6a464dae 100644 --- a/consensus/carnot-engine/tests/fuzz/mod.rs +++ b/consensus/carnot-engine/tests/fuzz/mod.rs @@ -1,3 +1,10 @@ mod ref_state; pub mod sut; mod transition; + +type Block = carnot_engine::Block<[u8; 32]>; +type AggregateQc = carnot_engine::AggregateQc<[u8; 32]>; +type Qc = carnot_engine::Qc<[u8; 32]>; +type StandardQc = carnot_engine::StandardQc<[u8; 32]>; +type TimeoutQc = carnot_engine::TimeoutQc<[u8; 32]>; +type NewView = carnot_engine::NewView<[u8; 32]>; diff --git a/consensus/carnot-engine/tests/fuzz/ref_state.rs 
b/consensus/carnot-engine/tests/fuzz/ref_state.rs index 71b148f7..e5df7ea0 100644 --- a/consensus/carnot-engine/tests/fuzz/ref_state.rs +++ b/consensus/carnot-engine/tests/fuzz/ref_state.rs @@ -1,13 +1,12 @@ use std::collections::{BTreeMap, HashSet}; -use carnot_engine::{ - AggregateQc, Block, BlockId, LeaderProof, NodeId, Qc, StandardQc, TimeoutQc, View, -}; +use carnot_engine::{LeaderProof, NodeId, View}; use proptest::prelude::*; use proptest::strategy::BoxedStrategy; use proptest_state_machine::ReferenceStateMachine; use crate::fuzz::transition::Transition; +use crate::fuzz::{AggregateQc, Block, Qc, StandardQc, TimeoutQc}; // A reference state machine (RefState) is used to generated state transitions. // To generate some kinds of transition, we may need to keep historical blocks in RefState. @@ -42,8 +41,8 @@ impl ReferenceStateMachine for RefState { fn init_state() -> BoxedStrategy { let genesis_block = Block { view: View::new(0), - id: BlockId::zeros(), - parent_qc: Qc::Standard(StandardQc::genesis()), + id: [0; 32], + parent_qc: Qc::Standard(StandardQc::genesis([0; 32])), leader_proof: LEADER_PROOF.clone(), }; @@ -330,10 +329,11 @@ impl RefState { fn transition_receive_safe_block_with_aggregated_qc(&self) -> BoxedStrategy { //TODO: more randomness let current_view = self.current_view(); - + let mut id = [0; 32]; + rand::thread_rng().fill_bytes(&mut id); Just(Transition::ReceiveSafeBlock(Block { view: current_view.next(), - id: BlockId::random(&mut rand::thread_rng()), + id, parent_qc: Qc::Aggregated(AggregateQc { high_qc: self.high_qc(), view: current_view, @@ -360,9 +360,13 @@ impl RefState { pub fn high_qc(&self) -> StandardQc { self.chain .values() - .map(|entry| entry.high_qc().unwrap_or_else(StandardQc::genesis)) + .map(|entry| { + entry + .high_qc() + .unwrap_or_else(|| StandardQc::genesis([0; 32])) + }) .max_by_key(|qc| qc.view) - .unwrap_or_else(StandardQc::genesis) + .unwrap_or_else(|| StandardQc::genesis([0; 32])) } pub fn latest_timeout_qcs(&self) -> Vec { @@ -386,17 +390,19 @@ impl RefState { self.contains_block(block.parent_qc.block()) } - fn contains_block(&self, block_id: BlockId) -> bool { + fn contains_block(&self, block_id: [u8; 32]) -> bool { self.chain .iter() .any(|(_, entry)| entry.blocks.iter().any(|block| block.id == block_id)) } fn consecutive_block(parent: &Block) -> Block { + let mut id = [0; 32]; + rand::thread_rng().fill_bytes(&mut id); Block { // use rand because we don't want this to be shrinked by proptest view: parent.view.next(), - id: BlockId::random(&mut rand::thread_rng()), + id, parent_qc: Qc::Standard(StandardQc { view: parent.view, id: parent.id, diff --git a/consensus/carnot-engine/tests/fuzz/sut.rs b/consensus/carnot-engine/tests/fuzz/sut.rs index 0cb2a435..1778167e 100644 --- a/consensus/carnot-engine/tests/fuzz/sut.rs +++ b/consensus/carnot-engine/tests/fuzz/sut.rs @@ -8,13 +8,13 @@ use carnot_engine::{ use proptest_state_machine::{ReferenceStateMachine, StateMachineTest}; use crate::fuzz::ref_state::RefState; -use crate::fuzz::transition::Transition; +use crate::fuzz::{transition::Transition, Block}; // ConsensusEngineTest defines a state that we want to test. // This is called as SUT (System Under Test). 
#[derive(Clone, Debug)] pub struct ConsensusEngineTest { - pub engine: Carnot>, + pub engine: Carnot, [u8; 32]>, } impl ConsensusEngineTest { @@ -23,8 +23,8 @@ impl ConsensusEngineTest { NodeId::new([0; 32]), Block { view: View::new(0), - id: BlockId::zeros(), - parent_qc: Qc::Standard(StandardQc::genesis()), + id: [0; 32], + parent_qc: Qc::Standard(StandardQc::genesis([0; 32])), leader_proof: LeaderProof::LeaderId { leader_id: NodeId::new([0; 32]), }, diff --git a/consensus/carnot-engine/tests/fuzz/transition.rs b/consensus/carnot-engine/tests/fuzz/transition.rs index 4ea721dd..6b0386f7 100644 --- a/consensus/carnot-engine/tests/fuzz/transition.rs +++ b/consensus/carnot-engine/tests/fuzz/transition.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use carnot_engine::{Block, NewView, TimeoutQc}; +use crate::fuzz::{Block, NewView, TimeoutQc}; // State transtitions that will be picked randomly #[derive(Clone, Debug)] diff --git a/consensus/cryptarchia-engine/Cargo.toml b/consensus/cryptarchia-engine/Cargo.toml index 03169a4c..66a8e59d 100644 --- a/consensus/cryptarchia-engine/Cargo.toml +++ b/consensus/cryptarchia-engine/Cargo.toml @@ -7,3 +7,9 @@ edition = "2021" [dependencies] thiserror = "1" +serde = { version = "1.0", features = ["derive"], optional = true } +nomos-utils = { path = "../../nomos-utils", optional = true } + +[features] +default = [] +serde = ["dep:serde", "nomos-utils/serde"] \ No newline at end of file diff --git a/consensus/cryptarchia-engine/src/time.rs b/consensus/cryptarchia-engine/src/time.rs index 35bd1c5c..96d8bb36 100644 --- a/consensus/cryptarchia-engine/src/time.rs +++ b/consensus/cryptarchia-engine/src/time.rs @@ -1,5 +1,6 @@ use std::ops::Add; +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)] pub struct Slot(u64); diff --git a/ledger/cryptarchia-ledger/Cargo.toml b/ledger/cryptarchia-ledger/Cargo.toml index eb0fb1a5..73441093 100644 --- a/ledger/cryptarchia-ledger/Cargo.toml +++ b/ledger/cryptarchia-ledger/Cargo.toml @@ -9,5 +9,10 @@ edition = "2021" blake2 = "0.10" rpds = "1" thiserror = "1" +serde = { version = "1.0", features = ["derive"], optional = true } # TODO: we only need types definition from this crate -cryptarchia-engine = { path = "../../consensus/cryptarchia-engine" } \ No newline at end of file +cryptarchia-engine = { path = "../../consensus/cryptarchia-engine" } +nomos-utils = { path = "../../nomos-utils", optional = true } + +[features] +serde = ["dep:serde", "nomos-utils/serde"] \ No newline at end of file diff --git a/ledger/cryptarchia-ledger/src/leader_proof.rs b/ledger/cryptarchia-ledger/src/leader_proof.rs index 0ea7636e..c699929f 100644 --- a/ledger/cryptarchia-ledger/src/leader_proof.rs +++ b/ledger/cryptarchia-ledger/src/leader_proof.rs @@ -1,5 +1,6 @@ use cryptarchia_engine::Slot; +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)] pub struct LeaderProof { commitment: Commitment, @@ -91,3 +92,9 @@ impl AsRef<[u8]> for Commitment { &self.0 } } + +// ----------- serialization +use crate::utils::serialize_bytes_newtype; + +serialize_bytes_newtype!(Commitment); +serialize_bytes_newtype!(Nullifier); diff --git a/ledger/cryptarchia-ledger/src/lib.rs b/ledger/cryptarchia-ledger/src/lib.rs index cc44501f..971a27c1 100644 --- a/ledger/cryptarchia-ledger/src/lib.rs +++ b/ledger/cryptarchia-ledger/src/lib.rs @@ -1,21 +1,22 @@ -mod block; mod config; mod crypto; mod 
leader_proof; +mod nonce; +mod utils; -use crate::{crypto::Blake2b, Commitment, LeaderProof, Nullifier}; use blake2::Digest; use cryptarchia_engine::{Epoch, Slot}; +use crypto::Blake2b; use rpds::HashTrieSet; -use std::collections::HashMap; +use std::{collections::HashMap, hash::Hash}; use thiserror::Error; -pub use block::*; pub use config::Config; pub use leader_proof::*; +pub use nonce::*; #[derive(Clone, Debug, Error)] -pub enum LedgerError { +pub enum LedgerError { #[error("Commitment not found in the ledger state")] CommitmentNotFound, #[error("Nullifier already exists in the ledger state")] @@ -25,9 +26,9 @@ pub enum LedgerError { #[error("Invalid block slot {block:?} for parent slot {parent:?}")] InvalidSlot { parent: Slot, block: Slot }, #[error("Parent block not found: {0:?}")] - ParentNotFound(HeaderId), + ParentNotFound(Id), #[error("Orphan block missing: {0:?}. Importing leader proofs requires the block to be validated first")] - OrphanMissing(HeaderId), + OrphanMissing(Id), } #[derive(Clone, Debug, Eq, PartialEq)] @@ -69,13 +70,16 @@ impl EpochState { } #[derive(Clone, Debug, PartialEq)] -pub struct Ledger { - states: HashMap, +pub struct Ledger { + states: HashMap, config: Config, } -impl Ledger { - pub fn from_genesis(id: HeaderId, state: LedgerState, config: Config) -> Self { +impl Ledger +where + Id: Eq + Hash + Copy, +{ + pub fn from_genesis(id: Id, state: LedgerState, config: Config) -> Self { Self { states: [(id, state)].into_iter().collect(), config, @@ -83,8 +87,15 @@ impl Ledger { } #[must_use = "this returns the result of the operation, without modifying the original"] - pub fn try_apply_header(&self, header: &Header) -> Result { - let parent_id = header.parent(); + pub fn try_update( + &self, + id: Id, + parent_id: Id, + slot: Slot, + proof: &LeaderProof, + // (update corresponding to the leader proof, leader proof) + orphan_proofs: impl IntoIterator, + ) -> Result> { let parent_state = self .states .get(&parent_id) @@ -96,25 +107,27 @@ impl Ledger { // * not in conflict with the current ledger state // This first condition is checked here, the second one is checked in the state update // (in particular, we do not check the imported leader proof is for an earlier slot) - for orphan in header.orphaned_proofs() { - if !self.states.contains_key(&orphan.id()) { - return Err(LedgerError::OrphanMissing(orphan.id())); + let (orphan_ids, orphan_proofs): (Vec<_>, Vec<_>) = orphan_proofs.into_iter().unzip(); + for orphan_id in orphan_ids { + if !self.states.contains_key(&orphan_id) { + return Err(LedgerError::OrphanMissing(orphan_id)); } } - let new_state = parent_state - .clone() - .try_apply_header(header, &self.config)?; + let new_state = + parent_state + .clone() + .try_update(slot, proof, &orphan_proofs, &self.config)?; let mut states = self.states.clone(); - states.insert(header.id(), new_state); + states.insert(id, new_state); Ok(Self { states, config }) } - pub fn state(&self, header_id: &HeaderId) -> Option<&LedgerState> { - self.states.get(header_id) + pub fn state(&self, id: &Id) -> Option<&LedgerState> { + self.states.get(id) } } @@ -134,13 +147,19 @@ pub struct LedgerState { } impl LedgerState { - fn try_apply_header(self, header: &Header, config: &Config) -> Result { + fn try_update( + self, + slot: Slot, + proof: &LeaderProof, + orphan_proofs: &[LeaderProof], + config: &Config, + ) -> Result> { // TODO: import leader proofs - self.update_epoch_state(header.slot(), config)? - .try_apply_leadership(header, config) + self.update_epoch_state(slot, config)? 
+ .try_apply_leadership(proof, orphan_proofs, config) } - fn update_epoch_state(self, slot: Slot, config: &Config) -> Result { + fn update_epoch_state(self, slot: Slot, config: &Config) -> Result> { if slot <= self.slot { return Err(LedgerError::InvalidSlot { parent: self.slot, @@ -204,7 +223,11 @@ impl LedgerState { } } - fn try_apply_proof(self, proof: &LeaderProof, config: &Config) -> Result { + fn try_apply_proof( + self, + proof: &LeaderProof, + config: &Config, + ) -> Result> { assert_eq!(config.epoch(proof.slot()), self.epoch_state.epoch); // The leadership coin either has to be in the state snapshot or be derived from // a coin that is in the state snapshot (i.e. be in the lead coins commitments) @@ -234,18 +257,17 @@ impl LedgerState { }) } - fn try_apply_leadership( + fn try_apply_leadership( mut self, - header: &Header, + proof: &LeaderProof, + orphan_proofs: &[LeaderProof], config: &Config, - ) -> Result { - for proof in header.orphaned_proofs() { - self = self.try_apply_proof(proof.leader_proof(), config)?; + ) -> Result> { + for proof in orphan_proofs { + self = self.try_apply_proof(proof, config)?; } - self = self - .try_apply_proof(header.leader_proof(), config)? - .update_nonce(header.leader_proof()); + self = self.try_apply_proof(proof, config)?.update_nonce(proof); Ok(self) } @@ -280,6 +302,27 @@ impl LedgerState { ..self } } + + pub fn from_commitments(commitments: impl IntoIterator) -> Self { + let commitments = commitments.into_iter().collect::>(); + Self { + lead_commitments: commitments.clone(), + spend_commitments: commitments, + nullifiers: Default::default(), + nonce: [0; 32].into(), + slot: 0.into(), + next_epoch_state: EpochState { + epoch: 1.into(), + nonce: [0; 32].into(), + commitments: Default::default(), + }, + epoch_state: EpochState { + epoch: 0.into(), + nonce: [0; 32].into(), + commitments: Default::default(), + }, + } + } } impl core::fmt::Debug for LedgerState { @@ -303,35 +346,51 @@ impl core::fmt::Debug for LedgerState { #[cfg(test)] pub mod tests { use super::{EpochState, Ledger, LedgerState}; - use crate::{ - crypto::Blake2b, Commitment, Config, Header, HeaderId, LeaderProof, LedgerError, Nullifier, - }; + use crate::{crypto::Blake2b, Commitment, Config, LeaderProof, LedgerError, Nullifier}; use blake2::Digest; use cryptarchia_engine::Slot; use std::hash::{DefaultHasher, Hash, Hasher}; - pub fn header(slot: impl Into, parent: HeaderId, coin: Coin) -> Header { - let slot = slot.into(); - Header::new(parent, 0, [0; 32].into(), slot, coin.to_proof(slot)) - } + type HeaderId = [u8; 32]; - pub fn header_with_orphans( - slot: impl Into, + fn update_ledger( + ledger: &mut Ledger, parent: HeaderId, + slot: impl Into, coin: Coin, - orphans: Vec
, - ) -> Header { - header(slot, parent, coin).with_orphaned_proofs(orphans) + ) -> Result> { + update_orphans(ledger, parent, slot, coin, vec![]) } - pub fn genesis_header() -> Header { - Header::new( - [0; 32].into(), - 0, - [0; 32].into(), - 0.into(), - LeaderProof::dummy(0.into()), - ) + fn make_id(parent: HeaderId, slot: impl Into, coin: Coin) -> HeaderId { + Blake2b::new() + .chain_update(parent) + .chain_update(slot.into().to_be_bytes()) + .chain_update(coin.sk.to_be_bytes()) + .chain_update(coin.nonce.to_be_bytes()) + .finalize() + .into() + } + + fn update_orphans( + ledger: &mut Ledger, + parent: HeaderId, + slot: impl Into, + coin: Coin, + orphans: Vec<(HeaderId, (u64, Coin))>, + ) -> Result> { + let slot = slot.into(); + let id = make_id(parent, slot, coin); + *ledger = ledger.try_update( + id, + parent, + slot, + &coin.to_proof(slot), + orphans + .into_iter() + .map(|(id, (slot, coin))| (id, coin.to_proof(slot.into()))), + )?; + Ok(id) } pub fn config() -> Config { @@ -414,36 +473,41 @@ pub mod tests { } } - fn ledger(commitments: &[Commitment]) -> (Ledger, Header) { + fn ledger(commitments: &[Commitment]) -> (Ledger, HeaderId) { let genesis_state = genesis_state(commitments); - let genesis_header = genesis_header(); + ( - Ledger::from_genesis(genesis_header.id(), genesis_state, config()), - genesis_header, + Ledger::from_genesis([0; 32], genesis_state, config()), + [0; 32], ) } - fn apply_and_add_coin(mut ledger: Ledger, header: Header, coin: Coin) -> Ledger { - let header_id = header.id(); - ledger = ledger.try_apply_header(&header).unwrap(); + fn apply_and_add_coin( + ledger: &mut Ledger, + parent: HeaderId, + slot: impl Into, + coin_proof: Coin, + coin_add: Coin, + ) -> HeaderId { + let id = update_ledger(ledger, parent, slot, coin_proof).unwrap(); // we still don't have transactions, so the only way to add a commitment to spendable commitments and // test epoch snapshotting is by doing this manually - let mut block_state = ledger.states[&header_id].clone(); - block_state.spend_commitments = block_state.spend_commitments.insert(coin.commitment()); - ledger.states.insert(header_id, block_state); - ledger + let mut block_state = ledger.states[&id].clone(); + block_state.spend_commitments = block_state.spend_commitments.insert(coin_add.commitment()); + ledger.states.insert(id, block_state); + id } #[test] fn test_ledger_state_prevents_coin_reuse() { let coin = Coin::new(0); let (mut ledger, genesis) = ledger(&[coin.commitment()]); - let h = header(1, genesis.id(), coin); - ledger = ledger.try_apply_header(&h).unwrap(); + + let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap(); // reusing the same coin should be prevented assert!(matches!( - ledger.try_apply_header(&header(2, h.id(), coin)), + update_ledger(&mut ledger, h, 2, coin), Err(LedgerError::NullifierExists), )); } @@ -451,10 +515,9 @@ pub mod tests { #[test] fn test_ledger_state_uncommited_coin() { let coin = Coin::new(0); - let (ledger, genesis) = ledger(&[]); - let h = header(1, genesis.id(), coin); + let (mut ledger, genesis) = ledger(&[]); assert!(matches!( - ledger.try_apply_header(&h), + update_ledger(&mut ledger, genesis, 1, coin), Err(LedgerError::CommitmentNotFound), )); } @@ -472,17 +535,14 @@ pub mod tests { ]); // coin_1 & coin_2 both concurrently win slot 0 - let h_1 = header(1, genesis.id(), coin_1); - let h_2 = header(1, genesis.id(), coin_2); - ledger = ledger.try_apply_header(&h_1).unwrap(); - ledger = ledger.try_apply_header(&h_2).unwrap(); + update_ledger(&mut ledger, genesis, 1, 
coin_1).unwrap(); + let h = update_ledger(&mut ledger, genesis, 1, coin_2).unwrap(); // then coin_3 wins slot 1 and chooses to extend from block_2 - let h_3 = header(2, h_2.id(), coin_3); - ledger = ledger.try_apply_header(&h_3).unwrap(); + let h_3 = update_ledger(&mut ledger, h, 2, coin_3).unwrap(); // coin 1 is not spent in the chain that ends with block_3 - assert!(!ledger.states[&h_3.id()].is_nullified(&coin_1.nullifier())); + assert!(!ledger.states[&h_3].is_nullified(&coin_1.nullifier())); } #[test] @@ -496,45 +556,39 @@ pub mod tests { // An epoch will be 10 slots long, with stake distribution snapshot taken at the start of the epoch // and nonce snapshot before slot 7 - let h_1 = header(1, genesis.id(), coins[0]); - ledger = ledger.try_apply_header(&h_1).unwrap(); - assert_eq!(ledger.states[&h_1.id()].epoch_state.epoch, 0.into()); + let h_1 = update_ledger(&mut ledger, genesis, 1, coins[0]).unwrap(); + assert_eq!(ledger.states[&h_1].epoch_state.epoch, 0.into()); - let h_2 = header(6, h_1.id(), coins[1]); - ledger = ledger.try_apply_header(&h_2).unwrap(); + let h_2 = update_ledger(&mut ledger, h_1, 6, coins[1]).unwrap(); - let h_3 = header(9, h_2.id(), coins[2]); - ledger = apply_and_add_coin(ledger, h_3.clone(), coin_4); + let h_3 = apply_and_add_coin(&mut ledger, h_2, 9, coins[2], coin_4); // test epoch jump - let h_4 = header(20, h_3.id(), coins[3]); - ledger = ledger.try_apply_header(&h_4).unwrap(); + let h_4 = update_ledger(&mut ledger, h_3, 20, coins[3]).unwrap(); // nonce for epoch 2 should be taken at the end of slot 16, but in our case the last block is at slot 9 assert_eq!( - ledger.states[&h_4.id()].epoch_state.nonce, - ledger.states[&h_3.id()].nonce, + ledger.states[&h_4].epoch_state.nonce, + ledger.states[&h_3].nonce, ); // stake distribution snapshot should be taken at the end of slot 9 assert_eq!( - ledger.states[&h_4.id()].epoch_state.commitments, - ledger.states[&h_3.id()].spend_commitments, + ledger.states[&h_4].epoch_state.commitments, + ledger.states[&h_3].spend_commitments, ); // nonce for epoch 1 should be taken at the end of slot 6 - let h_5 = header(10, h_3.id(), coins[3]); - ledger = apply_and_add_coin(ledger, h_5.clone(), coin_5); + let h_5 = apply_and_add_coin(&mut ledger, h_3, 10, coins[3], coin_5); assert_eq!( - ledger.states[&h_5.id()].epoch_state.nonce, - ledger.states[&h_2.id()].nonce, + ledger.states[&h_5].epoch_state.nonce, + ledger.states[&h_2].nonce, ); - let h_6 = header(20, h_5.id(), coins[3].evolve()); - ledger = ledger.try_apply_header(&h_6).unwrap(); + let h_6 = update_ledger(&mut ledger, h_5, 20, coins[3].evolve()).unwrap(); // stake distribution snapshot should be taken at the end of slot 9, check that changes in slot 10 // are ignored assert_eq!( - ledger.states[&h_6.id()].epoch_state.commitments, - ledger.states[&h_3.id()].spend_commitments, + ledger.states[&h_6].epoch_state.commitments, + ledger.states[&h_3].spend_commitments, ); } @@ -542,25 +596,23 @@ pub mod tests { fn test_evolved_coin_is_eligible_for_leadership() { let coin = Coin::new(0); let (mut ledger, genesis) = ledger(&[coin.commitment()]); - let h = header(1, genesis.id(), coin); - ledger = ledger.try_apply_header(&h).unwrap(); + + let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap(); // reusing the same coin should be prevented assert!(matches!( - ledger.try_apply_header(&header(2, h.id(), coin)), + update_ledger(&mut ledger, h, 2, coin), Err(LedgerError::NullifierExists), )); // the evolved coin is not elibile before block 2 as it has not appeared on the ledger yet 
assert!(matches!( - ledger.try_apply_header(&header(2, genesis.id(), coin.evolve())), + update_ledger(&mut ledger, genesis, 2, coin.evolve()), Err(LedgerError::CommitmentNotFound), )); // the evolved coin is eligible after coin 1 is spent - assert!(ledger - .try_apply_header(&header(2, h.id(), coin.evolve())) - .is_ok()); + assert!(update_ledger(&mut ledger, h, 2, coin.evolve()).is_ok()); } #[test] @@ -570,39 +622,34 @@ pub mod tests { let (mut ledger, genesis) = ledger(&[coin.commitment()]); // EPOCH 0 - let h_0_1 = header(1, genesis.id(), coin); // mint a new coin to be used for leader elections in upcoming epochs - ledger = apply_and_add_coin(ledger, h_0_1.clone(), coin_1); + let h_0_1 = apply_and_add_coin(&mut ledger, genesis, 1, coin, coin_1); - let h_0_2 = header(2, h_0_1.id(), coin_1); // the new coin is not yet eligible for leader elections assert!(matches!( - ledger.try_apply_header(&h_0_2), + update_ledger(&mut ledger, h_0_1, 2, coin_1), Err(LedgerError::CommitmentNotFound), )); - // but the evolved coin can - let h_0_2 = header(2, h_0_1.id(), coin.evolve()); - ledger = ledger.try_apply_header(&h_0_2).unwrap(); + // // but the evolved coin can + let h_0_2 = update_ledger(&mut ledger, h_0_1, 2, coin.evolve()).unwrap(); // EPOCH 1 for i in 10..20 { // the newly minted coin is still not eligible in the following epoch since the // stake distribution snapshot is taken at the beginning of the previous epoch assert!(matches!( - ledger.try_apply_header(&header(i, h_0_2.id(), coin_1)), + update_ledger(&mut ledger, h_0_2, i, coin_1), Err(LedgerError::CommitmentNotFound), )); } // EPOCH 2 // the coin is finally eligible 2 epochs after it was first minted - let h_2_0 = header(20, h_0_2.id(), coin_1); - ledger = ledger.try_apply_header(&h_2_0).unwrap(); + let h_2_0 = update_ledger(&mut ledger, h_0_2, 20, coin_1).unwrap(); // and now the minted coin can freely use the evolved coin for subsequent blocks - let h_2_1 = header(21, h_2_0.id(), coin_1.evolve()); - ledger.try_apply_header(&h_2_1).unwrap(); + update_ledger(&mut ledger, h_2_0, 21, coin_1.evolve()).unwrap(); } #[test] @@ -614,83 +661,83 @@ pub mod tests { let coin_new_new = coin_new.evolve(); // produce a fork where the coin has been spent twice - let fork_1 = header(1, genesis.id(), coin); - let fork_2 = header(2, fork_1.id(), coin_new); + let fork_1 = make_id(genesis, 1, coin); + let fork_2 = make_id(fork_1, 2, coin_new); // neither of the evolved coins should be usable right away in another branch assert!(matches!( - ledger.try_apply_header(&header(1, genesis.id(), coin_new)), + update_ledger(&mut ledger, genesis, 1, coin_new), Err(LedgerError::CommitmentNotFound) )); assert!(matches!( - ledger.try_apply_header(&header(1, genesis.id(), coin_new_new)), + update_ledger(&mut ledger, genesis, 1, coin_new_new), Err(LedgerError::CommitmentNotFound) )); // they also should not be accepted if the fork from where they have been imported has not been seen already assert!(matches!( - ledger.try_apply_header(&header_with_orphans( - 1, - genesis.id(), - coin_new, - vec![fork_1.clone()] - )), + update_orphans(&mut ledger, genesis, 1, coin_new, vec![(fork_1, (1, coin))]), Err(LedgerError::OrphanMissing(_)) )); // now the first block of the fork is seen (and accepted) - ledger = ledger.try_apply_header(&fork_1).unwrap(); + let h_1 = update_ledger(&mut ledger, genesis, 1, coin).unwrap(); + assert_eq!(h_1, fork_1); + // and it can now be imported in another branch (note this does not validate it's for an earlier slot) - ledger - 
.try_apply_header(&header_with_orphans( - 1, - genesis.id(), - coin_new, - vec![fork_1.clone()], - )) - .unwrap(); + update_orphans( + &mut ledger.clone(), + genesis, + 1, + coin_new, + vec![(fork_1, (1, coin))], + ) + .unwrap(); // but the next coin is still not accepted since the second block using the evolved coin has not been seen yet assert!(matches!( - ledger.try_apply_header(&header_with_orphans( + update_orphans( + &mut ledger.clone(), + genesis, 1, - genesis.id(), coin_new_new, - vec![fork_1.clone(), fork_2.clone()] - )), + vec![(fork_1, (1, coin)), (fork_2, (2, coin_new))], + ), Err(LedgerError::OrphanMissing(_)) )); // now the second block of the fork is seen as well and the coin evolved twice can be used in another branch - ledger = ledger.try_apply_header(&fork_2).unwrap(); - ledger - .try_apply_header(&header_with_orphans( - 1, - genesis.id(), - coin_new_new, - vec![fork_1.clone(), fork_2.clone()], - )) - .unwrap(); + let h_2 = update_ledger(&mut ledger, h_1, 2, coin_new).unwrap(); + assert_eq!(h_2, fork_2); + update_orphans( + &mut ledger.clone(), + genesis, + 1, + coin_new_new, + vec![(fork_1, (1, coin)), (fork_2, (2, coin_new))], + ) + .unwrap(); // but we can't import just the second proof because it's using an evolved coin that has not been seen yet assert!(matches!( - ledger.try_apply_header(&header_with_orphans( + update_orphans( + &mut ledger.clone(), + genesis, 1, - genesis.id(), coin_new_new, - vec![fork_2.clone()] - )), + vec![(fork_2, (2, coin_new))], + ), Err(LedgerError::CommitmentNotFound) )); // an imported proof that uses a coin that was already used in the base branch should not be allowed - let header_1 = header(1, genesis.id(), coin); - ledger = ledger.try_apply_header(&header_1).unwrap(); + let header_1 = update_ledger(&mut ledger, genesis, 1, coin).unwrap(); assert!(matches!( - ledger.try_apply_header(&header_with_orphans( + update_orphans( + &mut ledger, + header_1, 2, - header_1.id(), coin_new_new, - vec![fork_1.clone(), fork_2.clone()] - )), + vec![(fork_1, (1, coin)), (fork_2, (2, coin_new))], + ), Err(LedgerError::NullifierExists) )); } diff --git a/ledger/cryptarchia-ledger/src/nonce.rs b/ledger/cryptarchia-ledger/src/nonce.rs new file mode 100644 index 00000000..6dd32e21 --- /dev/null +++ b/ledger/cryptarchia-ledger/src/nonce.rs @@ -0,0 +1,17 @@ +use crate::utils::serialize_bytes_newtype; + +#[derive(Clone, Debug, Eq, PartialEq, Copy)] +pub struct Nonce([u8; 32]); +impl From<[u8; 32]> for Nonce { + fn from(nonce: [u8; 32]) -> Self { + Self(nonce) + } +} + +impl From for [u8; 32] { + fn from(nonce: Nonce) -> [u8; 32] { + nonce.0 + } +} + +serialize_bytes_newtype!(Nonce); diff --git a/ledger/cryptarchia-ledger/src/utils.rs b/ledger/cryptarchia-ledger/src/utils.rs new file mode 100644 index 00000000..5a98d86f --- /dev/null +++ b/ledger/cryptarchia-ledger/src/utils.rs @@ -0,0 +1,22 @@ +macro_rules! 
serialize_bytes_newtype { + ($newtype:ty) => { + #[cfg(feature = "serde")] + impl serde::Serialize for $newtype { + fn serialize(&self, serializer: S) -> Result { + nomos_utils::serde::serialize_bytes_array(self.0, serializer) + } + } + + #[cfg(feature = "serde")] + impl<'de> serde::de::Deserialize<'de> for $newtype { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + nomos_utils::serde::deserialize_bytes_array(deserializer).map(Self) + } + } + }; +} + +pub(crate) use serialize_bytes_newtype; diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index f30770c8..eaf01602 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -19,9 +19,8 @@ use tower_http::{ use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; -use carnot_engine::BlockId; use full_replication::{Blob, Certificate}; -use nomos_core::{da::blob, tx::Transaction}; +use nomos_core::{da::blob, header::HeaderId, tx::Transaction}; use nomos_mempool::{network::adapters::libp2p::Libp2pAdapter, openapi::Status, MempoolMetrics}; use nomos_network::backends::libp2p::Libp2p; use nomos_storage::backends::StorageSerde; @@ -53,7 +52,7 @@ pub struct AxumBackend { da_status, ), components( - schemas(Status, MempoolMetrics) + schemas(Status, MempoolMetrics) ), tags( (name = "da", description = "data availibility related APIs") @@ -255,8 +254,8 @@ where #[derive(Deserialize)] struct QueryParams { - from: Option, - to: Option, + from: Option, + to: Option, } #[utoipa::path( @@ -300,7 +299,7 @@ async fn libp2p_info(State(handle): State) -> Response { (status = 500, description = "Internal server error", body = String), ) )] -async fn block(State(handle): State, Json(id): Json) -> Response +async fn block(State(handle): State, Json(id): Json) -> Response where Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash, S: StorageSerde + Send + Sync + 'static, diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index c8869744..af5ce14d 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -15,8 +15,8 @@ use bytes::Bytes; use carnot_consensus::CarnotConsensus; use nomos_api::ApiService; use nomos_core::{ - block::BlockId, da::{blob, certificate}, + header::HeaderId, tx::Transaction, wire, }; @@ -59,10 +59,10 @@ const MB16: usize = 1024 * 1024 * 16; pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, - MockPool::Hash>, + MockPool::Hash>, MempoolLibp2pAdapter::Hash>, MockPool< - BlockId, + HeaderId, Certificate, <::Blob as blob::Blob>::Hash, >, @@ -82,7 +82,7 @@ pub type DataAvailability = DataAvailabilityService< DaLibp2pAdapter, >; -type Mempool = MempoolService, MockPool, D>; +type Mempool = MempoolService, MockPool, D>; #[derive(Services)] pub struct Nomos { diff --git a/nomos-cli/src/api/consensus.rs b/nomos-cli/src/api/consensus.rs index 86d31592..283f93ab 100644 --- a/nomos-cli/src/api/consensus.rs +++ b/nomos-cli/src/api/consensus.rs @@ -1,6 +1,7 @@ use super::CLIENT; use carnot_consensus::CarnotInfo; -use carnot_engine::{Block, BlockId}; +use carnot_engine::Block; +use nomos_core::header::HeaderId; use reqwest::Url; pub async fn carnot_info(node: &Url) -> Result { @@ -15,9 +16,9 @@ pub async fn carnot_info(node: &Url) -> Result { pub async fn get_blocks_info( node: &Url, - from: Option, - to: Option, -) -> Result, reqwest::Error> { + from: Option, + to: Option, +) -> Result>, reqwest::Error> { const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks"; let mut req = 
CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap()); if let Some(from) = from { diff --git a/nomos-cli/src/api/storage.rs b/nomos-cli/src/api/storage.rs index cc439600..09345c87 100644 --- a/nomos-cli/src/api/storage.rs +++ b/nomos-cli/src/api/storage.rs @@ -1,13 +1,13 @@ use super::CLIENT; -use carnot_engine::BlockId; use full_replication::Certificate; use nomos_core::block::Block; +use nomos_core::header::HeaderId; use nomos_node::Tx; use reqwest::Url; pub async fn get_block_contents( node: &Url, - block: &BlockId, + block: &HeaderId, ) -> Result>, reqwest::Error> { const BLOCK_PATH: &str = "storage/block"; CLIENT diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs index 5242c191..47de8832 100644 --- a/nomos-cli/src/cmds/chat/mod.rs +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -18,7 +18,7 @@ use full_replication::{ AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings, }; use futures::{stream, StreamExt}; -use nomos_core::{block::BlockId, da::DaProtocol, wire}; +use nomos_core::{da::DaProtocol, header::HeaderId, wire}; use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter}; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; @@ -266,7 +266,7 @@ struct ChatMessage { #[tokio::main] async fn check_for_messages(sender: Sender>, node: Url) { // Should ask for the genesis block to be more robust - let mut last_tip = BlockId::zeros(); + let mut last_tip = [0; 32].into(); loop { if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await { @@ -280,7 +280,7 @@ async fn check_for_messages(sender: Sender>, node: Url) { // Process a single block's blobs and return chat messages async fn process_block_blobs( node: Url, - block_id: &BlockId, + block_id: &HeaderId, da_settings: DaSettings, ) -> Result, Box> { let blobs = get_block_blobs(&node, block_id).await?; @@ -304,9 +304,9 @@ async fn process_block_blobs( // Fetch new messages since the last tip async fn fetch_new_messages( - last_tip: &BlockId, + last_tip: &HeaderId, node: &Url, -) -> Result<(BlockId, Vec), Box> { +) -> Result<(HeaderId, Vec), Box> { // By only specifying the 'to' parameter we get all the blocks since the last tip let mut new_blocks = get_blocks_info(node, None, Some(*last_tip)) .await? diff --git a/nomos-cli/src/da/retrieve.rs b/nomos-cli/src/da/retrieve.rs index af42fcec..48163717 100644 --- a/nomos-cli/src/da/retrieve.rs +++ b/nomos-cli/src/da/retrieve.rs @@ -1,6 +1,6 @@ -use carnot_engine::BlockId; use full_replication::Blob; use nomos_core::da::certificate::Certificate; +use nomos_core::header::HeaderId; use reqwest::Url; use thiserror::Error; @@ -15,7 +15,7 @@ pub enum Error { } /// Return the blobs whose certificate has been included in the provided block. -pub async fn get_block_blobs(node: &Url, block: &BlockId) -> Result, Error> { +pub async fn get_block_blobs(node: &Url, block: &HeaderId) -> Result, Error> { let block = get_block_contents(node, block) .await? 
.ok_or(Error::NotFound)?; diff --git a/nomos-core/Cargo.toml b/nomos-core/Cargo.toml index 0e1a9f0f..b65c6804 100644 --- a/nomos-core/Cargo.toml +++ b/nomos-core/Cargo.toml @@ -13,6 +13,8 @@ async-trait = { version = "0.1" } blake2 = { version = "0.10" } bytes = "1.3" carnot-engine = { path = "../consensus/carnot-engine", features = ["serde"]} +cryptarchia-engine = { path = "../consensus/cryptarchia-engine", features = ["serde"]} +cryptarchia-ledger = { path = "../ledger/cryptarchia-ledger", features = ["serde"]} futures = "0.3" raptorq = { version = "1.7", optional = true } serde = { version = "1.0", features = ["derive"] } @@ -20,6 +22,7 @@ thiserror = "1.0" bincode = "1.3" once_cell = "1.0" indexmap = { version = "1.9", features = ["serde"] } +const-hex = "1" [dev-dependencies] rand = "0.8" diff --git a/nomos-core/src/block/builder.rs b/nomos-core/src/block/builder.rs index 508bc8ef..f3c8c85f 100644 --- a/nomos-core/src/block/builder.rs +++ b/nomos-core/src/block/builder.rs @@ -1,16 +1,22 @@ // std +use indexmap::IndexSet; use std::hash::Hash; // crates use serde::de::DeserializeOwned; use serde::Serialize; // internal use crate::block::Block; +use crate::crypto::Blake2b; use crate::da::certificate::BlobCertificateSelect; use crate::da::certificate::Certificate; +use crate::header::{ + carnot::Builder as CarnotBuilder, cryptarchia::Builder as CryptarchiaBuilder, Header, HeaderId, +}; use crate::tx::{Transaction, TxSelect}; +use crate::wire; +use blake2::digest::Digest; use carnot_engine::overlay::RandomBeaconState; -use carnot_engine::{NodeId, Qc, View}; - +use carnot_engine::{LeaderProof, Qc, View}; /// Wrapper over a block building `new` method than holds intermediary state and can be /// passed around. It also compounds the transaction selection and blob selection heuristics to be /// used for transaction and blob selection. 
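With BlockId replaced by nomos_core::header::HeaderId in the CLI helpers above, blocks are now addressed by header id. A minimal usage sketch, assuming a node `Url` plus `get_blocks_info` and `get_block_contents` are in scope (the zeroed id is a placeholder):

    use nomos_core::header::HeaderId;
    use reqwest::Url;

    async fn fetch_example(node: &Url) -> Result<(), reqwest::Error> {
        // HeaderId can be built from a raw 32-byte array, as the chat client above does.
        let tip: HeaderId = [0; 32].into();
        // All blocks up to `tip`, then the contents of `tip` itself.
        let _blocks = get_blocks_info(node, None, Some(tip)).await?;
        let _contents = get_block_contents(node, &tip).await?;
        Ok(())
    }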
@@ -20,10 +26,6 @@ use carnot_engine::{NodeId, Qc, View}; /// use nomos_core::block::builder::BlockBuilder; /// let builder: BlockBuilder<(), (), FirstTx, FirstBlob> = { /// BlockBuilder::new( FirstTx::default(), FirstBlob::default()) -/// .with_view(View::from(0)) -/// .with_parent_qc(qc) -/// .with_proposer(proposer) -/// .with_beacon_state(beacon) /// .with_transactions([tx1].into_iter()) /// .with_blobs([blob1].into_iter()) /// }; @@ -32,14 +34,33 @@ use carnot_engine::{NodeId, Qc, View}; pub struct BlockBuilder { tx_selector: TxSelector, blob_selector: BlobSelector, - view: Option, - parent_qc: Option, - proposer: Option, - beacon: Option, + carnot_header_builder: Option, + cryptarchia_header_builder: Option, txs: Option>>, blobs: Option>>, } +impl BlockBuilder +where + Tx: Clone + Eq + Hash, + C: Clone + Eq + Hash, +{ + pub fn empty_carnot( + beacon: RandomBeaconState, + view: View, + parent_qc: Qc, + leader_proof: LeaderProof, + ) -> Block { + Block { + header: Header::Carnot( + CarnotBuilder::new(beacon, view, parent_qc, leader_proof).build([0; 32].into(), 0), + ), + cl_transactions: IndexSet::new(), + bl_blobs: IndexSet::new(), + } + } +} + impl BlockBuilder where Tx: Transaction + Clone + Eq + Hash + Serialize + DeserializeOwned, @@ -51,36 +72,25 @@ where Self { tx_selector, blob_selector, - view: None, - parent_qc: None, - proposer: None, - beacon: None, + carnot_header_builder: None, + cryptarchia_header_builder: None, txs: None, blobs: None, } } #[must_use] - pub fn with_view(mut self, view: View) -> Self { - self.view = Some(view); + pub fn with_carnot_builder(mut self, carnot_header_builder: CarnotBuilder) -> Self { + self.carnot_header_builder = Some(carnot_header_builder); self } #[must_use] - pub fn with_parent_qc(mut self, qc: Qc) -> Self { - self.parent_qc = Some(qc); - self - } - - #[must_use] - pub fn with_proposer(mut self, proposer: NodeId) -> Self { - self.proposer = Some(proposer); - self - } - - #[must_use] - pub fn with_beacon_state(mut self, beacon: RandomBeaconState) -> Self { - self.beacon = Some(beacon); + pub fn with_cryptarchia_builder( + mut self, + cryptarchia_header_builder: CryptarchiaBuilder, + ) -> Self { + self.cryptarchia_header_builder = Some(cryptarchia_header_builder); self } @@ -100,28 +110,48 @@ where } #[allow(clippy::result_large_err)] - pub fn build(self) -> Result, Self> { + pub fn build(self) -> Result, String> { if let Self { tx_selector, blob_selector, - view: Some(view), - parent_qc: Some(parent_qc), - proposer: Some(proposer), - beacon: Some(beacon), + carnot_header_builder: carnot_builder, + cryptarchia_header_builder: cryptarchia_builder, txs: Some(txs), blobs: Some(blobs), } = self { - Ok(Block::new( - view, - parent_qc, - tx_selector.select_tx_from(txs), - blob_selector.select_blob_from(blobs), - proposer, - beacon, - )) + let txs = tx_selector.select_tx_from(txs).collect::>(); + let blobs = blob_selector + .select_blob_from(blobs) + .collect::>(); + + let serialized_content = wire::serialize(&(&txs, &blobs)).unwrap(); + let content_size = u32::try_from(serialized_content.len()).map_err(|_| { + format!( + "Content is too big: {} out of {} max", + serialized_content.len(), + u32::MAX + ) + })?; + let content_id = <[u8; 32]>::from(Blake2b::digest(&serialized_content)).into(); + + let header = match (carnot_builder, cryptarchia_builder) { + (Some(carnot_builder), None) => { + Header::Carnot(carnot_builder.build(content_id, content_size)) + } + (None, Some(cryptarchia_builder)) => { + 
Header::Cryptarchia(cryptarchia_builder.build(content_id, content_size)) + } + _ => return Err("Exactly one header builder should be set".to_string()), + }; + + Ok(Block { + header, + cl_transactions: txs, + bl_blobs: blobs, + }) } else { - Err(self) + Err("incomplete block".to_string()) } } } diff --git a/nomos-core/src/block/mod.rs b/nomos-core/src/block/mod.rs index e4973c1f..14bea461 100644 --- a/nomos-core/src/block/mod.rs +++ b/nomos-core/src/block/mod.rs @@ -1,68 +1,27 @@ pub mod builder; -use carnot_engine::overlay::RandomBeaconState; use indexmap::IndexSet; // std use core::hash::Hash; // crates +use crate::header::Header; use crate::wire; -use ::serde::{ - de::{DeserializeOwned, Deserializer}, - Deserialize, Serialize, Serializer, -}; +use ::serde::{de::DeserializeOwned, Deserialize, Serialize}; use bytes::Bytes; -pub use carnot_engine::BlockId; -use carnot_engine::{LeaderProof, NodeId, Qc, View}; // internal pub type TxHash = [u8; 32]; /// A block -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Block { - header: carnot_engine::Block, - beacon: RandomBeaconState, + header: Header, cl_transactions: IndexSet, bl_blobs: IndexSet, } -impl< - Tx: Clone + Eq + Hash + Serialize + DeserializeOwned, - BlobCertificate: Clone + Eq + Hash + Serialize + DeserializeOwned, - > Block -{ - pub fn new( - view: View, - parent_qc: Qc, - txs: impl Iterator, - blobs: impl Iterator, - proposer: NodeId, - beacon: RandomBeaconState, - ) -> Self { - let transactions = txs.collect(); - let blobs = blobs.collect(); - let header = carnot_engine::Block { - id: BlockId::zeros(), - view, - parent_qc, - leader_proof: LeaderProof::LeaderId { - leader_id: proposer, - }, - }; - let mut s = Self { - header, - beacon, - cl_transactions: transactions, - bl_blobs: blobs, - }; - let id = block_id_from_wire_content(&s); - s.header.id = id; - s - } -} - impl Block { - pub fn header(&self) -> &carnot_engine::Block { + pub fn header(&self) -> &Header { &self.header } @@ -73,24 +32,6 @@ impl Block impl Iterator + '_ { self.bl_blobs.iter() } - - pub fn beacon(&self) -> &RandomBeaconState { - &self.beacon - } -} - -pub fn block_id_from_wire_content< - Tx: Clone + Eq + Hash + Serialize + DeserializeOwned, - BlobCertificate: Clone + Eq + Hash + Serialize + DeserializeOwned, ->( - block: &Block, -) -> carnot_engine::BlockId { - use blake2::digest::{consts::U32, Digest}; - use blake2::Blake2b; - let bytes = block.as_bytes(); - let mut hasher = Blake2b::::new(); - hasher.update(bytes); - BlockId::new(hasher.finalize().into()) } impl< @@ -104,87 +45,6 @@ impl< } pub fn from_bytes(bytes: &[u8]) -> Self { - let mut result: Self = wire::deserialize(bytes).unwrap(); - result.header.id = block_id_from_wire_content(&result); - result - } -} - -mod serde { - use super::*; - // use ::serde::{de::Deserializer, Deserialize, Serialize}; - - /// consensus_engine::Block but without the id field, which will be computed - /// from the rest of the block. 
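A sketch of how a caller drives the reworked `BlockBuilder::build` above; `tx_selector`, `blob_selector`, `beacon`, `view`, `parent_qc`, `leader_proof`, `txs` and `blobs` are placeholders assumed to be in scope, not values defined by this change:

    // Exactly one header builder must be set, otherwise build() returns an Err.
    let block = BlockBuilder::new(tx_selector, blob_selector)
        .with_carnot_builder(CarnotBuilder::new(beacon, view, parent_qc, leader_proof))
        .with_transactions(txs.into_iter())
        .with_blobs(blobs.into_iter())
        .build()
        .expect("single header builder set and txs/blobs provided");
    // The content id is the Blake2b hash of the wire-serialized (txs, blobs)
    // pair, computed inside build().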
- #[derive(Serialize, Deserialize)] - struct StrippedHeader { - pub view: View, - pub parent_qc: Qc, - pub leader_proof: LeaderProof, - } - - #[derive(Serialize, Deserialize)] - struct StrippedBlock { - header: StrippedHeader, - beacon: RandomBeaconState, - cl_transactions: IndexSet, - bl_blobs: IndexSet, - } - - impl< - 'de, - Tx: Clone + Eq + Hash + Serialize + DeserializeOwned, - BlobCertificate: Clone + Eq + Hash + Serialize + DeserializeOwned, - > Deserialize<'de> for Block - { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let StrippedBlock { - header, - beacon, - cl_transactions, - bl_blobs, - } = StrippedBlock::deserialize(deserializer)?; - let header = carnot_engine::Block { - id: BlockId::zeros(), - view: header.view, - parent_qc: header.parent_qc, - leader_proof: header.leader_proof, - }; - let mut block = Block { - beacon, - cl_transactions, - bl_blobs, - header, - }; - block.header.id = block_id_from_wire_content(&block); - Ok(block) - } - } - - impl< - Tx: Clone + Eq + Hash + Serialize + DeserializeOwned, - BlobCertificate: Clone + Eq + Hash + Serialize + DeserializeOwned, - > Serialize for Block - { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - // TODO: zero copy serialization - let block = StrippedBlock { - header: StrippedHeader { - view: self.header.view, - parent_qc: self.header.parent_qc.clone(), - leader_proof: self.header.leader_proof.clone(), - }, - beacon: self.beacon.clone(), - cl_transactions: self.cl_transactions.clone(), - bl_blobs: self.bl_blobs.clone(), - }; - block.serialize(serializer) - } + wire::deserialize(bytes).unwrap() } } diff --git a/nomos-core/src/crypto.rs b/nomos-core/src/crypto.rs index 297984da..4c38c383 100644 --- a/nomos-core/src/crypto.rs +++ b/nomos-core/src/crypto.rs @@ -1,3 +1,7 @@ +use blake2::digest::typenum::U32; + pub type PublicKey = [u8; 32]; pub type PrivateKey = [u8; 32]; pub type Signature = [u8; 32]; + +pub(crate) type Blake2b = blake2::Blake2b; diff --git a/nomos-core/src/header/carnot.rs b/nomos-core/src/header/carnot.rs new file mode 100644 index 00000000..5618d844 --- /dev/null +++ b/nomos-core/src/header/carnot.rs @@ -0,0 +1,116 @@ +use super::{ContentId, HeaderId}; +use crate::crypto::Blake2b; +use crate::wire; +use blake2::Digest; +use serde::{Deserialize, Serialize}; + +use carnot_engine::overlay::RandomBeaconState; +use carnot_engine::{LeaderProof, Qc, View}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Header { + beacon: RandomBeaconState, + view: View, + parent_qc: Qc, + leader_proof: LeaderProof, + content_id: ContentId, + content_size: u32, +} + +impl Header { + pub fn new( + beacon: RandomBeaconState, + view: View, + parent_qc: Qc, + leader_proof: LeaderProof, + content_id: ContentId, + content_size: u32, + ) -> Self { + Self { + beacon, + view, + parent_qc, + leader_proof, + content_id, + content_size, + } + } + + pub fn beacon(&self) -> &RandomBeaconState { + &self.beacon + } + + pub fn id(&self) -> HeaderId { + let mut h = Blake2b::new(); + let bytes = wire::serialize(&self).unwrap(); + h.update(&bytes); + HeaderId(h.finalize().into()) + } + + pub fn parent_qc(&self) -> &Qc { + &self.parent_qc + } + + pub fn leader_proof(&self) -> &LeaderProof { + &self.leader_proof + } + + pub fn content_id(&self) -> ContentId { + self.content_id + } + + pub fn content_size(&self) -> u32 { + self.content_size + } + + pub fn view(&self) -> View { + self.view + } + + pub fn parent(&self) -> HeaderId { + self.parent_qc.block() + } + + 
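    // [Editor's note — not part of this diff.] `id()` above is simply the 32-byte
    // Blake2b digest of the wire-serialized header, so a header's identity is fully
    // determined by its fields (beacon, view, parent QC, leader proof, content id and
    // size). A hypothetical sanity check using the `Builder` defined at the end of
    // this file, assuming two builders constructed from identical arguments:
    //
    //     let a = Builder::new(beacon.clone(), view, parent_qc.clone(), proof.clone())
    //         .build(content_id, content_size);
    //     let b = Builder::new(beacon, view, parent_qc, proof)
    //         .build(content_id, content_size);
    //     assert_eq!(a.id(), b.id());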
pub fn to_carnot_block(&self) -> carnot_engine::Block { + carnot_engine::Block { + id: self.id(), + parent_qc: self.parent_qc.clone(), + view: self.view(), + leader_proof: self.leader_proof().clone(), + } + } +} + +pub struct Builder { + beacon: RandomBeaconState, + view: View, + parent_qc: Qc, + leader_proof: LeaderProof, +} + +impl Builder { + pub fn new( + beacon: RandomBeaconState, + view: View, + parent_qc: Qc, + leader_proof: LeaderProof, + ) -> Self { + Self { + beacon, + view, + parent_qc, + leader_proof, + } + } + + pub fn build(self, content_id: ContentId, content_size: u32) -> Header { + Header::new( + self.beacon, + self.view, + self.parent_qc, + self.leader_proof, + content_id, + content_size, + ) + } +} diff --git a/ledger/cryptarchia-ledger/src/block.rs b/nomos-core/src/header/cryptarchia.rs similarity index 63% rename from ledger/cryptarchia-ledger/src/block.rs rename to nomos-core/src/header/cryptarchia.rs index 19ff3a6d..2dc3a042 100644 --- a/ledger/cryptarchia-ledger/src/block.rs +++ b/nomos-core/src/header/cryptarchia.rs @@ -1,24 +1,22 @@ -use crate::{crypto::Blake2b, leader_proof::LeaderProof}; +use super::{ContentId, HeaderId}; +use crate::crypto::Blake2b; use blake2::Digest; use cryptarchia_engine::Slot; - -#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)] -pub struct HeaderId([u8; 32]); - -#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)] -pub struct ContentId([u8; 32]); +use cryptarchia_ledger::LeaderProof; +use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Eq, PartialEq, Copy)] pub struct Nonce([u8; 32]); -#[derive(Clone, Debug, Eq, PartialEq, Hash)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Header { parent: HeaderId, + slot: Slot, + // TODO: move this to common header fields // length of block contents in bytes content_size: u32, // id of block contents content_id: ContentId, - slot: Slot, leader_proof: LeaderProof, orphaned_leader_proofs: Vec
<Header>,
 }
@@ -85,40 +83,36 @@ impl Header {
     }
 }
 
-// ----------- conversions
-
-impl From<[u8; 32]> for Nonce {
-    fn from(nonce: [u8; 32]) -> Self {
-        Self(nonce)
-    }
+pub struct Builder {
+    parent: HeaderId,
+    slot: Slot,
+    leader_proof: LeaderProof,
+    orphaned_leader_proofs: Vec<Header>,
 }
 
-impl From<Nonce> for [u8; 32] {
-    fn from(nonce: Nonce) -> [u8; 32] {
-        nonce.0
+impl Builder {
+    pub fn new(parent: HeaderId, slot: Slot, leader_proof: LeaderProof) -> Self {
+        Self {
+            parent,
+            slot,
+            leader_proof,
+            orphaned_leader_proofs: vec![],
+        }
     }
-}
 
-impl From<[u8; 32]> for HeaderId {
-    fn from(id: [u8; 32]) -> Self {
-        Self(id)
+    pub fn with_orphaned_proofs(mut self, orphaned_leader_proofs: Vec<Header>
) -> Self { + self.orphaned_leader_proofs = orphaned_leader_proofs; + self } -} -impl From for [u8; 32] { - fn from(id: HeaderId) -> Self { - id.0 - } -} - -impl From<[u8; 32]> for ContentId { - fn from(id: [u8; 32]) -> Self { - Self(id) - } -} - -impl From for [u8; 32] { - fn from(id: ContentId) -> Self { - id.0 + pub fn build(self, content_id: ContentId, content_size: u32) -> Header { + Header { + parent: self.parent, + slot: self.slot, + content_size, + content_id, + leader_proof: self.leader_proof, + orphaned_leader_proofs: self.orphaned_leader_proofs, + } } } diff --git a/nomos-core/src/header/mod.rs b/nomos-core/src/header/mod.rs new file mode 100644 index 00000000..5c489fe9 --- /dev/null +++ b/nomos-core/src/header/mod.rs @@ -0,0 +1,89 @@ +use serde::{Deserialize, Serialize}; + +use crate::utils::{display_hex_bytes_newtype, serde_bytes_newtype}; + +pub mod carnot; +pub mod cryptarchia; + +#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)] +pub struct HeaderId([u8; 32]); + +#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)] +pub struct ContentId([u8; 32]); + +// This lint is a false positive? +#[allow(clippy::large_enum_variant)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum Header { + Cryptarchia(cryptarchia::Header), + Carnot(carnot::Header), +} + +impl Header { + pub fn cryptarchia(&self) -> &cryptarchia::Header { + match self { + Self::Cryptarchia(header) => header, + Self::Carnot(_) => panic!("Header is not a Cryptarchia header"), + } + } + + pub fn carnot(&self) -> &carnot::Header { + match self { + Self::Carnot(header) => header, + Self::Cryptarchia(_) => panic!("Header is not a Carnot header"), + } + } + + pub fn id(&self) -> HeaderId { + match self { + Self::Cryptarchia(header) => header.id(), + Self::Carnot(header) => header.id(), + } + } + + pub fn parent(&self) -> HeaderId { + match self { + Self::Cryptarchia(header) => header.parent(), + Self::Carnot(header) => header.parent(), + } + } +} + +impl From<[u8; 32]> for HeaderId { + fn from(id: [u8; 32]) -> Self { + Self(id) + } +} + +impl From for [u8; 32] { + fn from(id: HeaderId) -> Self { + id.0 + } +} + +impl From<[u8; 32]> for ContentId { + fn from(id: [u8; 32]) -> Self { + Self(id) + } +} + +impl From for [u8; 32] { + fn from(id: ContentId) -> Self { + id.0 + } +} + +display_hex_bytes_newtype!(HeaderId); +display_hex_bytes_newtype!(ContentId); + +serde_bytes_newtype!(HeaderId, 32); +serde_bytes_newtype!(ContentId, 32); + +#[test] +fn test_serde() { + assert_eq!( + crate::wire::deserialize::(&crate::wire::serialize(&HeaderId([0; 32])).unwrap()) + .unwrap(), + HeaderId([0; 32]) + ); +} diff --git a/nomos-core/src/lib.rs b/nomos-core/src/lib.rs index ed4174ef..72efd45c 100644 --- a/nomos-core/src/lib.rs +++ b/nomos-core/src/lib.rs @@ -2,6 +2,7 @@ pub mod account; pub mod block; pub mod crypto; pub mod da; +pub mod header; pub mod staking; pub mod tx; pub mod utils; diff --git a/nomos-core/src/utils/mod.rs b/nomos-core/src/utils/mod.rs index 0c305a7e..b8459a9e 100644 --- a/nomos-core/src/utils/mod.rs +++ b/nomos-core/src/utils/mod.rs @@ -1 +1,53 @@ pub mod select; + +macro_rules! display_hex_bytes_newtype { + ($newtype:ty) => { + impl core::fmt::Display for $newtype { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(f, "0x")?; + for v in self.0 { + write!(f, "{:02x}", v)?; + } + Ok(()) + } + } + }; +} + +macro_rules! 
serde_bytes_newtype { + ($newtype:ty, $len:expr) => { + impl serde::Serialize for $newtype { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + if serializer.is_human_readable() { + const_hex::const_encode::<$len, false>(&self.0) + .as_str() + .serialize(serializer) + } else { + self.0.serialize(serializer) + } + } + } + + impl<'de> serde::Deserialize<'de> for $newtype { + fn deserialize(deserializer: D) -> Result<$newtype, D::Error> + where + D: serde::Deserializer<'de>, + { + if deserializer.is_human_readable() { + let s = <&str>::deserialize(deserializer)?; + const_hex::decode_to_array(s) + .map(Self) + .map_err(serde::de::Error::custom) + } else { + <[u8; $len]>::deserialize(deserializer).map(Self) + } + } + } + }; +} + +pub(crate) use display_hex_bytes_newtype; +pub(crate) use serde_bytes_newtype; diff --git a/nomos-core/src/vote/mock.rs b/nomos-core/src/vote/mock.rs index c262ee06..53b5d9f9 100644 --- a/nomos-core/src/vote/mock.rs +++ b/nomos-core/src/vote/mock.rs @@ -1,5 +1,6 @@ // std // crates +use crate::header::HeaderId; use carnot_engine::{Block, View}; use futures::{Stream, StreamExt}; use serde::{Deserialize, Serialize}; @@ -49,7 +50,7 @@ impl Tally for MockTally { type Vote = MockVote; type Qc = MockQc; type Outcome = (); - type Subject = Block; + type Subject = Block; type TallyError = Error; type Settings = MockTallySettings; @@ -60,7 +61,7 @@ impl Tally for MockTally { async fn tally + Unpin + Send>( &self, - block: Block, + block: Block, mut vote_stream: S, ) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> { let mut count_votes = 0; diff --git a/nomos-services/api/src/http/cl.rs b/nomos-services/api/src/http/cl.rs index d0bc8437..8af70ff8 100644 --- a/nomos-services/api/src/http/cl.rs +++ b/nomos-services/api/src/http/cl.rs @@ -1,6 +1,6 @@ use core::{fmt::Debug, hash::Hash}; -use nomos_core::block::BlockId; +use nomos_core::header::HeaderId; use nomos_core::tx::Transaction; use nomos_mempool::{ backend::mockpool::MockPool, @@ -13,7 +13,7 @@ use tokio::sync::oneshot; type ClMempoolService = MempoolService< Libp2pAdapter::Hash>, - MockPool::Hash>, + MockPool::Hash>, TxDiscriminant, >; @@ -47,7 +47,7 @@ where pub async fn cl_mempool_status( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, items: Vec<::Hash>, -) -> Result>, super::DynError> +) -> Result>, super::DynError> where T: Transaction + Clone diff --git a/nomos-services/api/src/http/consensus.rs b/nomos-services/api/src/http/consensus.rs index 1655ae89..de9914a3 100644 --- a/nomos-services/api/src/http/consensus.rs +++ b/nomos-services/api/src/http/consensus.rs @@ -10,7 +10,7 @@ use carnot_consensus::{ }; use carnot_engine::{ overlay::{RandomBeaconState, RoundRobin, TreeOverlay}, - Block, BlockId, + Block, }; use full_replication::Certificate; use nomos_core::{ @@ -18,6 +18,7 @@ use nomos_core::{ blob, certificate::{self, select::FillSize as FillSizeWithBlobsCertificate}, }, + header::HeaderId, tx::{select::FillSize as FillSizeWithTx, Transaction}, }; use nomos_mempool::{ @@ -27,10 +28,10 @@ use nomos_storage::backends::{sled::SledBackend, StorageSerde}; pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, - MockPool::Hash>, + MockPool::Hash>, MempoolLibp2pAdapter::Hash>, MockPool< - BlockId, + HeaderId, Certificate, <::Blob as blob::Blob>::Hash, >, @@ -64,9 +65,9 @@ where pub async fn carnot_blocks( handle: &OverwatchHandle, - from: Option, - to: Option, -) -> Result, super::DynError> + from: Option, + to: Option, +) -> Result>, super::DynError> where Tx: 
Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, diff --git a/nomos-services/api/src/http/da.rs b/nomos-services/api/src/http/da.rs index b8bc12a7..5763d3c0 100644 --- a/nomos-services/api/src/http/da.rs +++ b/nomos-services/api/src/http/da.rs @@ -1,6 +1,6 @@ use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication}; -use nomos_core::block::BlockId; use nomos_core::da::blob; +use nomos_core::header::HeaderId; use nomos_da::{ backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, DaMsg, DataAvailabilityService, @@ -15,7 +15,7 @@ use tokio::sync::oneshot; pub type DaMempoolService = MempoolService< Libp2pAdapter::Hash>, - MockPool::Hash>, + MockPool::Hash>, CertDiscriminant, >; @@ -43,7 +43,7 @@ pub async fn da_mempool_metrics( pub async fn da_mempool_status( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, items: Vec<::Hash>, -) -> Result>, super::DynError> { +) -> Result>, super::DynError> { let relay = handle.relay::().connect().await?; let (sender, receiver) = oneshot::channel(); relay diff --git a/nomos-services/api/src/http/mempool.rs b/nomos-services/api/src/http/mempool.rs index 9d9c3c3d..f3e62575 100644 --- a/nomos-services/api/src/http/mempool.rs +++ b/nomos-services/api/src/http/mempool.rs @@ -1,5 +1,5 @@ use core::{fmt::Debug, hash::Hash}; -use nomos_core::block::BlockId; +use nomos_core::header::HeaderId; use nomos_mempool::{ backend::mockpool::MockPool, network::NetworkAdapter, Discriminant, MempoolMsg, MempoolService, }; @@ -20,7 +20,7 @@ where Key: Clone + Debug + Ord + Hash + 'static, { let relay = handle - .relay::, D>>() + .relay::, D>>() .connect() .await?; let (sender, receiver) = oneshot::channel(); diff --git a/nomos-services/api/src/http/storage.rs b/nomos-services/api/src/http/storage.rs index 090c09cd..8a408fe3 100644 --- a/nomos-services/api/src/http/storage.rs +++ b/nomos-services/api/src/http/storage.rs @@ -1,5 +1,5 @@ -use carnot_engine::BlockId; use nomos_core::block::Block; +use nomos_core::header::HeaderId; use nomos_storage::{ backends::{sled::SledBackend, StorageSerde}, StorageMsg, StorageService, @@ -7,7 +7,7 @@ use nomos_storage::{ pub async fn block_req( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, - id: BlockId, + id: HeaderId, ) -> Result>, super::DynError> where Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash, diff --git a/nomos-services/carnot-consensus/src/committee_membership/mod.rs b/nomos-services/carnot-consensus/src/committee_membership/mod.rs index f6c15a4b..5c757bd0 100644 --- a/nomos-services/carnot-consensus/src/committee_membership/mod.rs +++ b/nomos-services/carnot-consensus/src/committee_membership/mod.rs @@ -6,10 +6,10 @@ use std::hash::Hash; // crates // internal +use crate::TimeoutQc; use carnot_engine::overlay::{ CommitteeMembership, Error as RandomBeaconError, FreezeMembership, RandomBeaconState, }; -use carnot_engine::TimeoutQc; use nomos_core::block::Block; pub trait UpdateableCommitteeMembership: CommitteeMembership { @@ -44,7 +44,10 @@ impl UpdateableCommitteeMembership for RandomBeaconState { &self, block: &Block, ) -> Result { - self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view()) + self.check_advance_happy( + block.header().carnot().beacon().clone(), + block.header().carnot().parent_qc().view(), + ) } fn on_timeout_qc_received(&self, qc: &TimeoutQc) -> Result { diff --git 
a/nomos-services/carnot-consensus/src/leader_selection/mod.rs b/nomos-services/carnot-consensus/src/leader_selection/mod.rs index 759b37e2..eb681e19 100644 --- a/nomos-services/carnot-consensus/src/leader_selection/mod.rs +++ b/nomos-services/carnot-consensus/src/leader_selection/mod.rs @@ -1,8 +1,6 @@ +use crate::TimeoutQc; use carnot_engine::overlay::RoundRobin; -use carnot_engine::{ - overlay::{Error as RandomBeaconError, LeaderSelection, RandomBeaconState}, - TimeoutQc, -}; +use carnot_engine::overlay::{Error as RandomBeaconError, LeaderSelection, RandomBeaconState}; use nomos_core::block::Block; use std::{convert::Infallible, error::Error, hash::Hash}; @@ -38,7 +36,10 @@ impl UpdateableLeaderSelection for RandomBeaconState { &self, block: &Block, ) -> Result { - self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view()) + self.check_advance_happy( + block.header().carnot().beacon().clone(), + block.header().carnot().parent_qc().view(), + ) // TODO: check random beacon public keys is leader id } diff --git a/nomos-services/carnot-consensus/src/lib.rs b/nomos-services/carnot-consensus/src/lib.rs index 8e2cf48f..9ffe63d3 100644 --- a/nomos-services/carnot-consensus/src/lib.rs +++ b/nomos-services/carnot-consensus/src/lib.rs @@ -29,8 +29,7 @@ use crate::tally::{ happy::CarnotTally, timeout::TimeoutTally, unhappy::NewViewTally, CarnotTallySettings, }; use carnot_engine::{ - overlay::RandomBeaconState, AggregateQc, BlockId, Carnot, Committee, LeaderProof, NewView, - Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc, View, Vote, + overlay::RandomBeaconState, Carnot, Committee, LeaderProof, Overlay, Payload, View, }; use task_manager::TaskManager; @@ -38,6 +37,7 @@ use crate::committee_membership::UpdateableCommitteeMembership; use nomos_core::block::builder::BlockBuilder; use nomos_core::block::Block; use nomos_core::da::certificate::{BlobCertificateSelect, Certificate}; +use nomos_core::header::{carnot::Builder, HeaderId}; use nomos_core::tx::{Transaction, TxSelect}; use nomos_core::vote::Tally; use nomos_mempool::{ @@ -65,6 +65,13 @@ fn default_timeout() -> Duration { // Random seed for each round provided by the protocol pub type Seed = [u8; 32]; +type TimeoutQc = carnot_engine::TimeoutQc; +type NewView = carnot_engine::NewView; +type AggregateQc = carnot_engine::AggregateQc; +type Qc = carnot_engine::Qc; +type StandardQc = carnot_engine::StandardQc; +type Vote = carnot_engine::Vote; +type Timeout = carnot_engine::Timeout; #[derive(Debug, Deserialize, Serialize)] pub struct CarnotSettings { @@ -113,8 +120,8 @@ pub struct CarnotConsensus, - ClPool: MemPool, - DaPool: MemPool, + ClPool: MemPool, + DaPool: MemPool, DaPoolAdapter: MempoolAdapter, O: Overlay + Debug, ClPool::Item: Debug + 'static, @@ -140,10 +147,10 @@ impl Servi for CarnotConsensus where A: NetworkAdapter, - ClPool: MemPool, + ClPool: MemPool, ClPool::Item: Debug, ClPool::Key: Debug, - DaPool: MemPool, + DaPool: MemPool, DaPool::Item: Debug, DaPool::Key: Debug, ClPoolAdapter: MempoolAdapter, @@ -165,9 +172,9 @@ impl Servi for CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, - ClPool: MemPool + Send + Sync + 'static, + ClPool: MemPool + Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static, - DaPool: MemPool + Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, DaPool::Settings: Send + Sync + 'static, ClPool::Item: Transaction + Debug @@ -252,9 +259,9 @@ where let overlay = O::new(overlay_settings); let genesis = carnot_engine::Block { - id: 
BlockId::zeros(), + id: [0; 32].into(), view: View::new(0), - parent_qc: Qc::Standard(StandardQc::genesis()), + parent_qc: Qc::Standard(StandardQc::genesis([0; 32].into())), leader_proof: LeaderProof::LeaderId { leader_id: NodeId::new([0; 32]), }, @@ -299,6 +306,7 @@ where ); if carnot.is_next_leader() { + tracing::info!("is next leader, gathering vores"); let network_adapter = adapter.clone(); task_manager.push(genesis_block.view.next(), async move { let Event::Approve { qc, .. } = Self::gather_votes( @@ -312,6 +320,7 @@ where tracing::debug!("Failed to gather initial votes"); return Event::None; }; + tracing::info!("got enough votes"); Event::ProposeBlock { qc } }); } @@ -351,7 +360,7 @@ where #[derive(Debug)] #[allow(clippy::large_enum_variant)] enum Output { - Send(carnot_engine::Send), + Send(carnot_engine::Send), BroadcastTimeoutQc { timeout_qc: TimeoutQc, }, @@ -364,9 +373,9 @@ impl CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, - ClPool: MemPool + Send + Sync + 'static, + ClPool: MemPool + Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static, - DaPool: MemPool + Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, DaPool::Settings: Send + Sync + 'static, ClPool::Item: Transaction + Debug @@ -414,7 +423,7 @@ where } } - fn process_message(carnot: &Carnot, msg: ConsensusMsg) { + fn process_message(carnot: &Carnot, msg: ConsensusMsg) { match msg { ConsensusMsg::Info { tx } => { let info = CarnotInfo { @@ -457,18 +466,18 @@ where #[allow(clippy::too_many_arguments)] async fn process_carnot_event( - mut carnot: Carnot, + mut carnot: Carnot, event: Event, task_manager: &mut TaskManager>, adapter: A, private_key: PrivateKey, - cl_mempool_relay: OutboundRelay>, - da_mempool_relay: OutboundRelay>, + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, storage_relay: OutboundRelay>, tx_selector: TxS, blobl_selector: BS, timeout: Duration, - ) -> Carnot { + ) -> Carnot { let mut output = None; let prev_view = carnot.current_view(); match event { @@ -571,24 +580,26 @@ where ) )] async fn process_block( - mut carnot: Carnot, + mut carnot: Carnot, block: Block, mut stream: Pin> + Send>>, task_manager: &mut TaskManager>, adapter: A, storage_relay: OutboundRelay>, - cl_mempool_relay: OutboundRelay>, - da_mempool_relay: OutboundRelay>, - ) -> (Carnot, Option>) { + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, + ) -> ( + Carnot, + Option>, + ) { tracing::debug!("received proposal {:?}", block); - if carnot.highest_voted_view() >= block.header().view { - tracing::debug!("already voted for view {}", block.header().view); + let original_block = block; + let block = original_block.header().carnot().clone(); + if carnot.highest_voted_view() >= block.view() { + tracing::debug!("already voted for view {}", block.view()); return (carnot, None); } - let original_block = block; - let block = original_block.header().clone(); - let self_committee = carnot.self_committee(); let leader_committee = [carnot.id()].into_iter().collect(); @@ -602,10 +613,10 @@ where participating_nodes: carnot.root_committee(), }; - match carnot.receive_block(block.clone()) { + match carnot.receive_block(block.to_carnot_block()) { Ok(mut new_state) => { let new_view = new_state.current_view(); - let msg = >::new_store_message(block.id, original_block.clone()); + let msg = >::new_store_message(block.id(), original_block.clone()); if let Err((e, _msg)) = storage_relay.send(msg).await { tracing::error!("Could not send block to storage: {e}"); } @@ 
-614,24 +625,24 @@ where mark_in_block( cl_mempool_relay, original_block.transactions().map(Transaction::hash), - block.id, + block.id(), ) .await; mark_in_block( da_mempool_relay, original_block.blobs().map(Certificate::hash), - block.id, + block.id(), ) .await; if new_view != carnot.current_view() { task_manager.push( - block.view, + block.view(), Self::gather_votes( adapter.clone(), self_committee, - block.clone(), + block.to_carnot_block(), tally_settings, ), ); @@ -643,7 +654,7 @@ where }, ); } else { - task_manager.push(block.view, async move { + task_manager.push(block.view(), async move { if let Some(block) = stream.next().await { Event::Proposal { block, stream } } else { @@ -657,10 +668,14 @@ where } if carnot.is_next_leader() { - task_manager.push(block.view, async move { - let Event::Approve { qc, .. } = - Self::gather_votes(adapter, leader_committee, block, leader_tally_settings) - .await + task_manager.push(block.view(), async move { + let Event::Approve { qc, .. } = Self::gather_votes( + adapter, + leader_committee, + block.to_carnot_block(), + leader_tally_settings, + ) + .await else { unreachable!() }; @@ -674,12 +689,15 @@ where #[allow(clippy::type_complexity)] #[instrument(level = "debug", skip(task_manager, adapter))] async fn approve_new_view( - carnot: Carnot, + carnot: Carnot, timeout_qc: TimeoutQc, new_views: HashSet, task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> ( + Carnot, + Option>, + ) { let leader_committee = [carnot.id()].into_iter().collect(); let leader_tally_settings = CarnotTallySettings { threshold: carnot.leader_super_majority_threshold(), @@ -713,11 +731,14 @@ where #[allow(clippy::type_complexity)] #[instrument(level = "debug", skip(task_manager, adapter))] async fn receive_timeout_qc( - carnot: Carnot, + carnot: Carnot, timeout_qc: TimeoutQc, task_manager: &mut TaskManager>, adapter: A, - ) -> (Carnot, Option>) { + ) -> ( + Carnot, + Option>, + ) { let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone()); let self_committee = carnot.self_committee(); let tally_settings = CarnotTallySettings { @@ -741,9 +762,12 @@ where #[allow(clippy::type_complexity)] #[instrument(level = "debug")] async fn process_root_timeout( - carnot: Carnot, + carnot: Carnot, timeouts: HashSet, - ) -> (Carnot, Option>) { + ) -> ( + Carnot, + Option>, + ) { // we might have received a timeout_qc sent by some other node and advanced the view // already, in which case we should ignore the timeout if carnot.current_view() @@ -793,8 +817,8 @@ where qc: Qc, tx_selector: TxS, blob_selector: BS, - cl_mempool_relay: OutboundRelay>, - da_mempool_relay: OutboundRelay>, + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, ) -> Option> { let mut output = None; let cl_txs = get_mempool_contents(cl_mempool_relay); @@ -804,10 +828,12 @@ where (Ok(cl_txs), Ok(da_certs)) => { let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key); let Ok(proposal) = BlockBuilder::new(tx_selector, blob_selector) - .with_view(qc.view().next()) - .with_parent_qc(qc) - .with_proposer(id) - .with_beacon_state(beacon) + .with_carnot_builder(Builder::new( + beacon, + qc.view().next(), + qc, + LeaderProof::LeaderId { leader_id: id }, + )) .with_transactions(cl_txs) .with_blobs_certificates(da_certs) .build() @@ -823,7 +849,7 @@ where } async fn process_view_change( - carnot: Carnot, + carnot: Carnot, prev_view: View, task_manager: &mut TaskManager>, adapter: A, @@ -883,7 +909,7 @@ where async fn gather_votes( adapter: A, committee: Committee, 
- block: carnot_engine::Block, + block: carnot_engine::Block, tally: CarnotTallySettings, ) -> Event { let tally = CarnotTally::new(tally); @@ -947,7 +973,7 @@ where .filter_map(move |msg| { async move { let proposal = Block::from_bytes(&msg.data); - if proposal.header().id == msg.proposal { + if proposal.header().id() == msg.proposal { // TODO: Leader is faulty? what should we do? Some(proposal) } else { @@ -967,9 +993,9 @@ where E: std::error::Error, Fl: FnOnce(O::LeaderSelection) -> Result, >( - carnot: Carnot, + carnot: Carnot, leader_selection_f: Fl, - ) -> Carnot { + ) -> Carnot { carnot .update_overlay(|overlay| overlay.update_leader_selection(leader_selection_f)) .unwrap() @@ -979,9 +1005,9 @@ where E: std::error::Error, Fm: FnOnce(O::CommitteeMembership) -> Result, >( - carnot: Carnot, + carnot: Carnot, committee_membership_f: Fm, - ) -> Carnot { + ) -> Carnot { carnot .update_overlay(|overlay| overlay.update_committees(committee_membership_f)) .unwrap() @@ -993,10 +1019,10 @@ where Fl: FnOnce(O::LeaderSelection) -> Result, Fm: FnOnce(O::CommitteeMembership) -> Result, >( - carnot: Carnot, + carnot: Carnot, leader_selection_f: Fl, committee_membership_f: Fm, - ) -> Carnot { + ) -> Carnot { let carnot = Self::update_leader_selection(carnot, leader_selection_f); Self::update_committee_membership(carnot, committee_membership_f) } @@ -1048,9 +1074,9 @@ where Output::BroadcastProposal { proposal } => { adapter .broadcast(NetworkMessage::Proposal(ProposalMsg { - proposal: proposal.header().id, + proposal: proposal.header().id(), data: proposal.as_bytes().to_vec().into_boxed_slice(), - view: proposal.header().view, + view: proposal.header().carnot().view(), })) .await; } @@ -1074,7 +1100,7 @@ enum Event { #[allow(dead_code)] Approve { qc: Qc, - block: carnot_engine::Block, + block: carnot_engine::Block, votes: HashSet, }, LocalTimeout { @@ -1105,9 +1131,9 @@ pub enum ConsensusMsg { /// 'to' (the oldest block). If 'from' is None, the tip of the chain is used as a starting /// point. If 'to' is None or not known to the node, the genesis block is used as an end point. 
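    // [Editor's sketch — not part of this diff.] Caller-side view of this query,
    // assuming `relay` is an OutboundRelay to the consensus service and that `tx` is a
    // tokio oneshot sender carrying `Vec<Block<Tx, BlobCertificate>>` (both are
    // assumptions; they mirror how the HTTP layer issues the request):
    //
    //     let (tx, rx) = tokio::sync::oneshot::channel();
    //     relay
    //         .send(ConsensusMsg::GetBlocks { from: None, to: None, tx })
    //         .await
    //         .map_err(|(e, _)| e)?;
    //     // Ordered from the tip (or `from`) back towards genesis (or `to`):
    //     let blocks = rx.await?;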
GetBlocks { - from: Option, - to: Option, - tx: Sender>, + from: Option, + to: Option, + tx: Sender>>, }, } @@ -1121,19 +1147,19 @@ pub struct CarnotInfo { pub current_view: View, pub highest_voted_view: View, pub local_high_qc: StandardQc, - pub tip: carnot_engine::Block, + pub tip: carnot_engine::Block, pub last_view_timeout_qc: Option, - pub last_committed_block: carnot_engine::Block, + pub last_committed_block: carnot_engine::Block, } async fn get_mempool_contents( - mempool: OutboundRelay>, + mempool: OutboundRelay>, ) -> Result + Send>, tokio::sync::oneshot::error::RecvError> { let (reply_channel, rx) = tokio::sync::oneshot::channel(); mempool .send(MempoolMsg::View { - ancestor_hint: BlockId::zeros(), + ancestor_hint: [0; 32].into(), reply_channel, }) .await @@ -1143,9 +1169,9 @@ async fn get_mempool_contents( } async fn mark_in_block( - mempool: OutboundRelay>, + mempool: OutboundRelay>, ids: impl Iterator, - block: BlockId, + block: HeaderId, ) { mempool .send(MempoolMsg::MarkInBlock { @@ -1170,14 +1196,14 @@ mod tests { highest_voted_view: View::new(-1), local_high_qc: StandardQc { view: View::new(0), - id: BlockId::zeros(), + id: [0; 32].into(), }, tip: Block { - id: BlockId::zeros(), + id: [0; 32].into(), view: View::new(0), parent_qc: Qc::Standard(StandardQc { view: View::new(0), - id: BlockId::zeros(), + id: [0; 32].into(), }), leader_proof: LeaderProof::LeaderId { leader_id: NodeId::new([0; 32]), @@ -1185,11 +1211,11 @@ mod tests { }, last_view_timeout_qc: None, last_committed_block: Block { - id: BlockId::zeros(), + id: [0; 32].into(), view: View::new(0), parent_qc: Qc::Standard(StandardQc { view: View::new(0), - id: BlockId::zeros(), + id: [0; 32].into(), }), leader_proof: LeaderProof::LeaderId { leader_id: NodeId::new([0; 32]), diff --git a/nomos-services/carnot-consensus/src/network/adapters/libp2p.rs b/nomos-services/carnot-consensus/src/network/adapters/libp2p.rs index e9dc5a78..46717bcc 100644 --- a/nomos-services/carnot-consensus/src/network/adapters/libp2p.rs +++ b/nomos-services/carnot-consensus/src/network/adapters/libp2p.rs @@ -13,8 +13,8 @@ use crate::network::{ messages::{NetworkMessage, ProposalMsg, VoteMsg}, BoxedStream, NetworkAdapter, }; -use carnot_engine::{BlockId, Committee, CommitteeId, View}; -use nomos_core::wire; +use carnot_engine::{Committee, CommitteeId, View}; +use nomos_core::{header::HeaderId, wire}; use nomos_network::{ backends::libp2p::{Command, Event, EventKind, Libp2p}, NetworkMsg, NetworkService, @@ -94,7 +94,7 @@ impl Spsc { #[derive(Default)] struct Messages { proposal_chunks: Spsc, - votes: HashMap>>, + votes: HashMap>>, new_views: HashMap>, timeouts: HashMap>, timeout_qcs: Spsc, @@ -153,7 +153,7 @@ impl MessageCache { &self, view: View, committee_id: CommitteeId, - proposal_id: BlockId, + proposal_id: HeaderId, ) -> Option> { self.cache.lock().unwrap().get_mut(&view).map(|m| { m.votes @@ -264,7 +264,7 @@ impl NetworkAdapter for Libp2pAdapter { } } NetworkMessage::Vote(msg) => { - tracing::debug!("received vote"); + tracing::debug!("received vote {:?}", msg); let mut cache = cache.cache.lock().unwrap(); let view = msg.vote.view; if let Some(messages) = cache.get_mut(&view) { @@ -356,7 +356,7 @@ impl NetworkAdapter for Libp2pAdapter { &self, committee: &Committee, view: View, - proposal_id: BlockId, + proposal_id: HeaderId, ) -> BoxedStream { self.message_cache .get_votes(view, committee.id::(), proposal_id) diff --git a/nomos-services/carnot-consensus/src/network/messages.rs b/nomos-services/carnot-consensus/src/network/messages.rs 
index c464e410..35dade01 100644 --- a/nomos-services/carnot-consensus/src/network/messages.rs +++ b/nomos-services/carnot-consensus/src/network/messages.rs @@ -3,13 +3,15 @@ use serde::{Deserialize, Serialize}; // internal use crate::NodeId; -use carnot_engine::{BlockId, NewView, Qc, Timeout, TimeoutQc, View, Vote}; +use crate::{NewView, Qc, Timeout, TimeoutQc, Vote}; +use carnot_engine::View; +use nomos_core::header::HeaderId; use nomos_core::wire; #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] pub struct ProposalMsg { pub data: Box<[u8]>, - pub proposal: BlockId, + pub proposal: HeaderId, pub view: View, } @@ -84,7 +86,7 @@ impl TimeoutQcMsg { } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub enum NetworkMessage { Timeout(TimeoutMsg), TimeoutQc(TimeoutQcMsg), diff --git a/nomos-services/carnot-consensus/src/network/mod.rs b/nomos-services/carnot-consensus/src/network/mod.rs index db48c1ea..c2396cf8 100644 --- a/nomos-services/carnot-consensus/src/network/mod.rs +++ b/nomos-services/carnot-consensus/src/network/mod.rs @@ -4,11 +4,12 @@ pub mod messages; // std // crates use futures::Stream; +use nomos_core::header::HeaderId; // internal use crate::network::messages::{ NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, }; -use carnot_engine::{BlockId, Committee, View}; +use carnot_engine::{Committee, View}; use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::services::relay::OutboundRelay; @@ -33,7 +34,7 @@ pub trait NetworkAdapter { &self, committee: &Committee, view: View, - proposal_id: BlockId, + proposal_id: HeaderId, ) -> BoxedStream; async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream; async fn send(&self, message: NetworkMessage, committee: &Committee); diff --git a/nomos-services/carnot-consensus/src/tally/happy.rs b/nomos-services/carnot-consensus/src/tally/happy.rs index f1322d4b..5a3f03e5 100644 --- a/nomos-services/carnot-consensus/src/tally/happy.rs +++ b/nomos-services/carnot-consensus/src/tally/happy.rs @@ -4,15 +4,17 @@ use std::collections::HashSet; // crates use futures::{Stream, StreamExt}; +use nomos_core::header::HeaderId; // internal use super::CarnotTallySettings; use crate::network::messages::VoteMsg; -use carnot_engine::{Block, Qc, StandardQc, Vote}; +use crate::{Qc, StandardQc, Vote}; use nomos_core::crypto::PublicKey; use nomos_core::vote::Tally; pub type NodeId = PublicKey; +type Block = carnot_engine::Block; #[derive(thiserror::Error, Debug)] pub enum CarnotTallyError { @@ -82,7 +84,6 @@ impl Tally for CarnotTally { )); } } - Err(CarnotTallyError::StreamEnded) } } diff --git a/nomos-services/carnot-consensus/src/tally/timeout.rs b/nomos-services/carnot-consensus/src/tally/timeout.rs index b0ba4096..17f64bbc 100644 --- a/nomos-services/carnot-consensus/src/tally/timeout.rs +++ b/nomos-services/carnot-consensus/src/tally/timeout.rs @@ -5,7 +5,8 @@ use futures::{Stream, StreamExt}; // internal use super::CarnotTallySettings; use crate::network::messages::TimeoutMsg; -use carnot_engine::{Timeout, View}; +use crate::Timeout; +use carnot_engine::View; use nomos_core::vote::Tally; #[derive(Clone, Debug)] diff --git a/nomos-services/carnot-consensus/src/tally/unhappy.rs b/nomos-services/carnot-consensus/src/tally/unhappy.rs index 08540a6c..cf25a083 100644 --- a/nomos-services/carnot-consensus/src/tally/unhappy.rs +++ b/nomos-services/carnot-consensus/src/tally/unhappy.rs @@ -6,9 +6,10 @@ use serde::{Deserialize, 
Serialize}; // internal use super::CarnotTallySettings; use crate::network::messages::NewViewMsg; -use carnot_engine::{NewView, TimeoutQc}; use nomos_core::vote::Tally; +use crate::{NewView, TimeoutQc}; + #[derive(thiserror::Error, Debug)] pub enum NewViewTallyError { #[error("Did not receive enough votes")] diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index 5024a640..1a4685f2 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -1,5 +1,5 @@ use nomos_core::{ - block::BlockId, + header::HeaderId, tx::mock::{MockTransaction, MockTxId}, }; use nomos_log::{Logger, LoggerSettings}; @@ -23,7 +23,7 @@ struct MockPoolNode { mockpool: ServiceHandle< MempoolService< MockAdapter, - MockPool, MockTxId>, + MockPool, MockTxId>, Transaction, >, >, @@ -80,7 +80,7 @@ fn test_mockmempool() { let network = app.handle().relay::>(); let mempool = app.handle().relay::, MockTxId>, + MockPool, MockTxId>, Transaction, >>(); @@ -102,7 +102,7 @@ fn test_mockmempool() { let (mtx, mrx) = tokio::sync::oneshot::channel(); mempool_outbound .send(MempoolMsg::View { - ancestor_hint: BlockId::default(), + ancestor_hint: [0; 32].into(), reply_channel: mtx, }) .await diff --git a/simulations/src/bin/app/main.rs b/simulations/src/bin/app/main.rs index df0d4a8d..0084d667 100644 --- a/simulations/src/bin/app/main.rs +++ b/simulations/src/bin/app/main.rs @@ -6,9 +6,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; // crates use anyhow::Ok; use carnot_engine::overlay::RandomBeaconState; -use carnot_engine::{Block, View}; +use carnot_engine::{Block, LeaderProof, View}; use clap::Parser; use crossbeam::channel; +use nomos_core::block::builder::BlockBuilder; use parking_lot::Mutex; use rand::rngs::SmallRng; use rand::seq::SliceRandom; @@ -129,15 +130,13 @@ impl SimulationApp { let leader = nodes.first().copied().unwrap(); // FIXME: Actually use a proposer and a key to generate random beacon state - let genesis = nomos_core::block::Block::new( - View::new(0), - Block::genesis().parent_qc, - [].into_iter(), - [].into_iter(), - leader, + let genesis = >::empty_carnot( RandomBeaconState::Sad { entropy: Box::new([0; 32]), }, + View::new(0), + Block::genesis([0; 32].into()).parent_qc, + LeaderProof::LeaderId { leader_id: leader }, ); let mut rng = SmallRng::seed_from_u64(seed); overlay_node::to_overlay_node( diff --git a/simulations/src/node/carnot/event_builder.rs b/simulations/src/node/carnot/event_builder.rs index 6e8bdb4f..a700bc83 100644 --- a/simulations/src/node/carnot/event_builder.rs +++ b/simulations/src/node/carnot/event_builder.rs @@ -1,10 +1,10 @@ use crate::node::carnot::{messages::CarnotMessage, tally::Tally, timeout::TimeoutHandler}; +use crate::node::carnot::{AggregateQc, Carnot, NewView, Qc, StandardQc, Timeout, TimeoutQc, Vote}; use carnot_consensus::network::messages::{NewViewMsg, TimeoutMsg, VoteMsg}; use carnot_consensus::NodeId; -use carnot_engine::{ - AggregateQc, Carnot, NewView, Overlay, Qc, StandardQc, Timeout, TimeoutQc, View, Vote, -}; +use carnot_engine::{Overlay, View}; use nomos_core::block::Block; +use nomos_core::header::HeaderId; use std::collections::HashSet; use std::hash::Hash; use std::time::Duration; @@ -97,8 +97,8 @@ impl EventBuilder { tracing::info!( node=%self.id, current_view = %engine.current_view(), - block_view=%block.header().view, - block=?block.header().id, + block_view=%block.header().carnot().view(), + block=?block.header().id(), parent_block=?block.header().parent(), "receive proposal 
message", ); @@ -236,7 +236,7 @@ pub enum Event { #[allow(dead_code)] Approve { qc: Qc, - block: carnot_engine::Block, + block: carnot_engine::Block, votes: HashSet, }, ProposeBlock { diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index 37edd74c..91b539ce 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -4,6 +4,8 @@ mod event_builder; mod message_cache; pub mod messages; mod state; +use nomos_core::block::builder::BlockBuilder; +use nomos_core::header::HeaderId; pub use state::*; mod serde_util; mod tally; @@ -36,9 +38,18 @@ use carnot_consensus::{ network::messages::{NewViewMsg, TimeoutMsg, VoteMsg}, }; use carnot_engine::overlay::RandomBeaconState; -use carnot_engine::{ - Block, BlockId, Carnot, Committee, Overlay, Payload, Qc, StandardQc, TimeoutQc, View, Vote, -}; +use carnot_engine::{Committee, LeaderProof, Overlay, View}; + +type Block = carnot_engine::Block; +type AggregateQc = carnot_engine::AggregateQc; +type Carnot = carnot_engine::Carnot; +type Payload = carnot_engine::Payload; +type TimeoutQc = carnot_engine::TimeoutQc; +type Vote = carnot_engine::Vote; +type Qc = carnot_engine::Qc; +type StandardQc = carnot_engine::StandardQc; +type NewView = carnot_engine::NewView; +type Timeout = carnot_engine::Timeout; static RECORD_SETTINGS: std::sync::OnceLock> = std::sync::OnceLock::new(); @@ -95,7 +106,7 @@ impl< rng: &mut R, ) -> Self { let overlay = O::new(overlay_settings); - let engine = Carnot::from_genesis(id, genesis.header().clone(), overlay); + let engine = Carnot::from_genesis(id, genesis.header().carnot().to_carnot_block(), overlay); let state = CarnotState::from(&engine); let timeout = settings.timeout; RECORD_SETTINGS.get_or_init(|| settings.record_settings.clone()); @@ -179,8 +190,8 @@ impl< self.network_interface .broadcast(CarnotMessage::Proposal(ProposalMsg { data: proposal.as_bytes().to_vec().into(), - proposal: proposal.header().id, - view: proposal.header().view, + proposal: proposal.header().id(), + view: proposal.header().carnot().view(), })) } } @@ -195,12 +206,15 @@ impl< node=%self.id, last_committed_view=%self.engine.latest_committed_view(), current_view = %current_view, - block_view = %block.header().view, - block = %block.header().id, + block_view = %block.header().carnot().view(), + block = %block.header().id(), parent_block=%block.header().parent(), "receive block proposal", ); - match self.engine.receive_block(block.header().clone()) { + match self + .engine + .receive_block(block.header().carnot().to_carnot_block()) + { Ok(mut new) => { if self.engine.current_view() != new.current_view() { new = Self::update_overlay_with_block(new, &block); @@ -211,7 +225,7 @@ impl< tracing::error!( node = %self.id, current_view = %self.engine.current_view(), - block_view = %block.header().view, block = %block.header().id, + block_view = %block.header().carnot().view(), block = %block.header().id(), "receive block proposal, but is invalid", ); } @@ -230,7 +244,7 @@ impl< to, payload: Payload::Vote(Vote { view: self.engine.current_view(), - block: block.header().id, + block: block.header().id(), }), })) } @@ -265,13 +279,13 @@ impl< } Event::ProposeBlock { qc } => { output = Some(Output::BroadcastProposal { - proposal: nomos_core::block::Block::new( - qc.view().next(), - qc.clone(), - [].into_iter(), - [].into_iter(), - self.id, + proposal: >::empty_carnot( RandomBeaconState::generate_happy(qc.view().next(), &self.random_beacon_pk), + qc.view().next(), + qc, + LeaderProof::LeaderId { + 
leader_id: [0; 32].into(), + }, ), }); } @@ -440,7 +454,7 @@ impl< #[derive(Debug)] #[allow(clippy::large_enum_variant)] enum Output { - Send(carnot_engine::Send), + Send(carnot_engine::Send), BroadcastTimeoutQc { timeout_qc: TimeoutQc, }, diff --git a/simulations/src/node/carnot/serde_util.rs b/simulations/src/node/carnot/serde_util.rs index 49d79e70..4523707b 100644 --- a/simulations/src/node/carnot/serde_util.rs +++ b/simulations/src/node/carnot/serde_util.rs @@ -10,7 +10,8 @@ use self::{ standard_qc::StandardQcHelper, timeout_qc::TimeoutQcHelper, }; -use carnot_engine::{AggregateQc, Block, BlockId, Committee, Qc, StandardQc, TimeoutQc, View}; +use crate::node::carnot::{AggregateQc, Block, Committee, Qc, StandardQc, TimeoutQc}; +use carnot_engine::View; const NODE_ID: &str = "node_id"; const CURRENT_VIEW: &str = "current_view"; @@ -238,16 +239,24 @@ pub(crate) mod timeout_qc { } pub(crate) mod serde_id { - use carnot_engine::{BlockId, NodeId}; + use carnot_engine::NodeId; + use nomos_core::header::HeaderId; use super::*; #[derive(Serialize)] - pub(crate) struct BlockIdHelper<'a>(#[serde(with = "serde_array32")] &'a [u8; 32]); + pub(crate) struct BlockIdHelper<'a> { + #[serde(with = "serde_array32")] + header: [u8; 32], + _marker: std::marker::PhantomData<&'a HeaderId>, + } - impl<'a> From<&'a BlockId> for BlockIdHelper<'a> { - fn from(val: &'a BlockId) -> Self { - Self(val.into()) + impl<'a> From<&'a HeaderId> for BlockIdHelper<'a> { + fn from(val: &'a HeaderId) -> Self { + Self { + header: (*val).into(), + _marker: std::marker::PhantomData, + } } } diff --git a/simulations/src/node/carnot/serde_util/csv.rs b/simulations/src/node/carnot/serde_util/csv.rs index 4088e2f2..437dc746 100644 --- a/simulations/src/node/carnot/serde_util/csv.rs +++ b/simulations/src/node/carnot/serde_util/csv.rs @@ -1,4 +1,5 @@ use super::*; +use nomos_core::header::HeaderId; use serde_block::BlockHelper; serializer!(CarnotStateCsvSerializer); @@ -76,10 +77,10 @@ impl<'a> From<&'a StandardQc> for LocalHighQcHelper<'a> { } } -struct SafeBlocksHelper<'a>(&'a HashMap); +struct SafeBlocksHelper<'a>(&'a HashMap); -impl<'a> From<&'a HashMap> for SafeBlocksHelper<'a> { - fn from(val: &'a HashMap) -> Self { +impl<'a> From<&'a HashMap> for SafeBlocksHelper<'a> { + fn from(val: &'a HashMap) -> Self { Self(val) } } @@ -142,10 +143,10 @@ impl<'a> Serialize for CommitteesHelper<'a> { } } -struct CommittedBlockHelper<'a>(&'a [BlockId]); +struct CommittedBlockHelper<'a>(&'a [HeaderId]); -impl<'a> From<&'a [BlockId]> for CommittedBlockHelper<'a> { - fn from(val: &'a [BlockId]) -> Self { +impl<'a> From<&'a [HeaderId]> for CommittedBlockHelper<'a> { + fn from(val: &'a [HeaderId]) -> Self { Self(val) } } diff --git a/simulations/src/node/carnot/serde_util/json.rs b/simulations/src/node/carnot/serde_util/json.rs index 0a3bd8df..3241cfbc 100644 --- a/simulations/src/node/carnot/serde_util/json.rs +++ b/simulations/src/node/carnot/serde_util/json.rs @@ -1,4 +1,5 @@ use super::*; +use nomos_core::header::HeaderId; use serde_block::BlockHelper; serializer!(CarnotStateJsonSerializer); @@ -50,10 +51,10 @@ pub(crate) mod serde_block { } } -struct SafeBlocksHelper<'a>(&'a HashMap); +struct SafeBlocksHelper<'a>(&'a HashMap); -impl<'a> From<&'a HashMap> for SafeBlocksHelper<'a> { - fn from(val: &'a HashMap) -> Self { +impl<'a> From<&'a HashMap> for SafeBlocksHelper<'a> { + fn from(val: &'a HashMap) -> Self { Self(val) } } @@ -115,10 +116,10 @@ impl<'a> Serialize for CommitteesHelper<'a> { } } -struct CommittedBlockHelper<'a>(&'a [BlockId]); 
+struct CommittedBlockHelper<'a>(&'a [HeaderId]); -impl<'a> From<&'a [BlockId]> for CommittedBlockHelper<'a> { - fn from(val: &'a [BlockId]) -> Self { +impl<'a> From<&'a [HeaderId]> for CommittedBlockHelper<'a> { + fn from(val: &'a [HeaderId]) -> Self { Self(val) } } diff --git a/simulations/src/node/carnot/state.rs b/simulations/src/node/carnot/state.rs index 9095af86..712ae94e 100644 --- a/simulations/src/node/carnot/state.rs +++ b/simulations/src/node/carnot/state.rs @@ -8,14 +8,14 @@ pub struct CarnotState { pub(crate) current_view: View, pub(crate) highest_voted_view: View, pub(crate) local_high_qc: StandardQc, - pub(crate) safe_blocks: HashMap, + pub(crate) safe_blocks: HashMap, pub(crate) last_view_timeout_qc: Option, pub(crate) latest_committed_block: Block, pub(crate) latest_committed_view: View, pub(crate) root_committee: Committee, pub(crate) parent_committee: Option, pub(crate) child_committees: Vec, - pub(crate) committed_blocks: Vec, + pub(crate) committed_blocks: Vec, pub(super) step_duration: Duration, /// Step id for this state diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 5803dc9f..2bd3ccbe 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -7,9 +7,9 @@ use super::{create_tempdir, persist_tempdir, LOGS_PREFIX}; use crate::{adjust_timeout, get_available_port, ConsensusConfig, Node, SpawnConfig}; use carnot_consensus::{CarnotInfo, CarnotSettings}; use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings}; -use carnot_engine::{BlockId, NodeId, Overlay}; +use carnot_engine::{NodeId, Overlay}; use full_replication::Certificate; -use nomos_core::block::Block; +use nomos_core::{block::Block, header::HeaderId}; use nomos_libp2p::{Multiaddr, Swarm}; use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_mempool::MempoolMetrics; @@ -112,7 +112,7 @@ impl NomosNode { format!("http://{}", self.addr).parse().unwrap() } - pub async fn get_block(&self, id: BlockId) -> Option> { + pub async fn get_block(&self, id: HeaderId) -> Option> { CLIENT .post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API)) .header("Content-Type", "application/json") @@ -146,9 +146,9 @@ impl NomosNode { pub async fn get_blocks_info( &self, - from: Option, - to: Option, - ) -> Vec { + from: Option, + to: Option, + ) -> Vec> { let mut req = CLIENT.get(format!("http://{}/{}", self.addr, GET_BLOCKS_INFO)); if let Some(from) = from { @@ -162,7 +162,7 @@ impl NomosNode { req.send() .await .unwrap() - .json::>() + .json::>>() .await .unwrap() } diff --git a/tests/src/tests/unhappy.rs b/tests/src/tests/unhappy.rs index fe35a47a..b2c32d9f 100644 --- a/tests/src/tests/unhappy.rs +++ b/tests/src/tests/unhappy.rs @@ -1,13 +1,17 @@ use carnot_consensus::CarnotInfo; -use carnot_engine::{Block, NodeId, TimeoutQc, View}; +use carnot_engine::{NodeId, View}; use fraction::Fraction; use futures::stream::{self, StreamExt}; +use nomos_core::header::HeaderId; use std::{collections::HashSet, time::Duration}; use tests::{adjust_timeout, ConsensusConfig, Node, NomosNode, SpawnConfig}; const TARGET_VIEW: View = View::new(20); const DUMMY_NODE_ID: NodeId = NodeId::new([0u8; 32]); +type Block = carnot_engine::Block; +type TimeoutQc = carnot_engine::TimeoutQc; + #[tokio::test] async fn ten_nodes_one_down() { let mut nodes = NomosNode::spawn_nodes(SpawnConfig::Chain {