diff --git a/consensus-engine/Cargo.toml b/consensus-engine/Cargo.toml index 02623a4e..9c3938eb 100644 --- a/consensus-engine/Cargo.toml +++ b/consensus-engine/Cargo.toml @@ -8,6 +8,7 @@ serde = { version = "1.0", features = ["derive"], optional = true } blake2 = "0.10" bls-signatures = "0.14" digest = "0.10" +derive_more = "0.99" integer-encoding = "3" sha2 = "0.10" rand = "0.8" diff --git a/consensus-engine/src/lib.rs b/consensus-engine/src/lib.rs index 26113b7f..4ef9624a 100644 --- a/consensus-engine/src/lib.rs +++ b/consensus-engine/src/lib.rs @@ -19,10 +19,10 @@ pub struct Carnot { impl Carnot { pub fn from_genesis(id: NodeId, genesis_block: Block, overlay: O) -> Self { Self { - current_view: 0, + current_view: View(0), local_high_qc: StandardQc::genesis(), id, - highest_voted_view: -1, + highest_voted_view: View(-1), last_view_timeout_qc: None, overlay, safe_blocks: [(genesis_block.id, genesis_block)].into(), @@ -63,7 +63,8 @@ impl Carnot { match block.leader_proof { LeaderProof::LeaderId { leader_id } => { // This only accepts blocks from the leader of current_view + 1 - if leader_id != self.overlay.next_leader() || block.view != self.current_view() + 1 + if leader_id != self.overlay.next_leader() + || block.view != self.current_view().next() { return Err(()); } @@ -103,7 +104,7 @@ impl Carnot { new_state.update_high_qc(Qc::Standard(timeout_qc.high_qc().clone())); new_state.update_timeout_qc(timeout_qc.clone()); - new_state.current_view = timeout_qc.view() + 1; + new_state.current_view = timeout_qc.view().next(); new_state.overlay.rebuild(timeout_qc); new_state @@ -160,14 +161,14 @@ impl Carnot { timeout_qc: TimeoutQc, new_views: HashSet, ) -> (Self, Send) { - let new_view = timeout_qc.view() + 1; + let new_view = timeout_qc.view().next(); assert!( new_view > self .last_view_timeout_qc .as_ref() .map(|qc| qc.view()) - .unwrap_or(0), + .unwrap_or(View(0)), "can't vote for a new view not bigger than the last timeout_qc" ); assert_eq!( @@ -244,7 +245,7 @@ impl Carnot { } fn block_is_safe(&self, block: Block) -> bool { - block.view >= self.current_view && block.view == block.parent_qc.view() + 1 + block.view >= self.current_view && block.view == block.parent_qc.view().next() } fn update_high_qc(&mut self, qc: Qc) { @@ -259,7 +260,7 @@ impl Carnot { _ => {} } if qc_view == self.current_view { - self.current_view += 1; + self.current_view += View(1); } } @@ -284,7 +285,7 @@ impl Carnot { } pub fn genesis_block(&self) -> Block { - self.blocks_in_view(0)[0].clone() + self.blocks_in_view(View(0))[0].clone() } // Returns the id of the grandparent block if it can be committed or None otherwise @@ -292,7 +293,7 @@ impl Carnot { let parent = self.safe_blocks.get(&block.parent())?; let grandparent = self.safe_blocks.get(&parent.parent())?; - if parent.view == grandparent.view + 1 + if parent.view == grandparent.view.next() && matches!(parent.parent_qc, Qc::Standard { .. }) && matches!(grandparent.parent_qc, Qc::Standard { .. }) { @@ -302,8 +303,8 @@ impl Carnot { } pub fn latest_committed_block(&self) -> Block { - for view in (0..=self.current_view).rev() { - for block in self.blocks_in_view(view) { + for view in (0..=self.current_view.0).rev() { + for block in self.blocks_in_view(View(view)) { if let Some(block) = self.can_commit_grandparent(block) { return block; } @@ -405,7 +406,7 @@ mod test { Carnot::from_genesis( *nodes.first().unwrap(), Block { - view: 0, + view: View(0), id: BlockId::zeros(), parent_qc: Qc::Standard(StandardQc::genesis()), leader_proof: LeaderProof::LeaderId { @@ -425,7 +426,7 @@ mod test { next_id.0[0] += 1; Block { - view: block.view + 1, + view: block.view.next(), id: next_id, parent_qc: Qc::Standard(StandardQc { view: block.view, @@ -455,12 +456,12 @@ mod test { // Ensure that all states are initialized correctly with the genesis block. fn from_genesis() { let engine = init(vec![NodeId::new([0; 32])]); - assert_eq!(engine.current_view(), 0); - assert_eq!(engine.highest_voted_view, -1); + assert_eq!(engine.current_view(), View(0)); + assert_eq!(engine.highest_voted_view, View(-1)); let genesis = engine.genesis_block(); assert_eq!(engine.high_qc(), genesis.parent_qc.high_qc()); - assert_eq!(engine.blocks_in_view(0), vec![genesis.clone()]); + assert_eq!(engine.blocks_in_view(View(0)), vec![genesis.clone()]); assert_eq!(engine.last_view_timeout_qc(), None); assert_eq!(engine.committed_blocks(), vec![genesis.id]); } @@ -489,7 +490,7 @@ mod test { let mut block2 = next_block(&engine, &block1); block2.id = block1.id; engine = engine.receive_block(block2).unwrap(); - assert_eq!(engine.blocks_in_view(1), vec![block1]); + assert_eq!(engine.blocks_in_view(View(1)), vec![block1]); } #[test] @@ -500,7 +501,7 @@ mod test { let mut parent_block_id = engine.genesis_block().id; parent_block_id.0[0] += 1; // generate an unknown parent block ID let block = Block { - view: engine.current_view() + 1, + view: engine.current_view().next(), id: BlockId::new([1; 32]), parent_qc: Qc::Standard(StandardQc { view: engine.current_view(), @@ -528,11 +529,11 @@ mod test { engine = update_leader_selection(&engine); let mut unsafe_block = next_block(&engine, &block); - unsafe_block.view = engine.current_view() - 1; // UNSAFE: view < engine.current_view + unsafe_block.view = engine.current_view().prev(); // UNSAFE: view < engine.current_view assert!(engine.receive_block(unsafe_block).is_err()); let mut unsafe_block = next_block(&engine, &block); - unsafe_block.view = unsafe_block.parent_qc.view() + 2; // UNSAFE: view != parent_qc.view + 1 + unsafe_block.view = unsafe_block.parent_qc.view() + View(2); // UNSAFE: view != parent_qc.view + 1 assert!(engine.receive_block(unsafe_block).is_err()); } @@ -596,15 +597,15 @@ mod test { let block1 = next_block(&engine, &engine.genesis_block()); engine = engine.receive_block(block1.clone()).unwrap(); - assert_eq!(engine.current_view(), 1); + assert_eq!(engine.current_view(), View(1)); engine = update_leader_selection(&engine); // a future block should be rejected let future_block = Block { id: BlockId::new([10; 32]), - view: 11, // a future view + view: View(11), // a future view parent_qc: Qc::Aggregated(AggregateQc { - view: 10, + view: View(10), high_qc: StandardQc { // a known parent block id: block1.id, @@ -686,16 +687,16 @@ mod test { engine = update_leader_selection(&engine); let (engine, send) = engine.local_timeout(); - assert_eq!(engine.highest_voted_view, 1); // updated from 0 (genesis) to 1 (current_view) + assert_eq!(engine.highest_voted_view, View(1)); // updated from 0 (genesis) to 1 (current_view) assert_eq!( send, Some(Send { to: engine.overlay().root_committee(), payload: Payload::Timeout(Timeout { - view: 1, + view: View(1), sender: NodeId::new([0; 32]), high_qc: StandardQc { - view: 0, // genesis + view: View(0), // genesis id: BlockId::zeros(), }, timeout_qc: None @@ -714,11 +715,11 @@ mod test { let (mut engine, _) = engine.local_timeout(); - assert_eq!(engine.current_view(), 1); + assert_eq!(engine.current_view(), View(1)); let timeout_qc = TimeoutQc::new( - 1, + View(1), StandardQc { - view: 0, // genesis + view: View::new(0), // genesis id: BlockId::zeros(), }, NodeId::new([0; 32]), @@ -726,7 +727,7 @@ mod test { engine = engine.receive_timeout_qc(timeout_qc.clone()); assert_eq!(&engine.local_high_qc, timeout_qc.high_qc()); assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc)); - assert_eq!(engine.current_view(), 2); + assert_eq!(engine.current_view(), View(2)); } #[test] @@ -739,11 +740,11 @@ mod test { // before local_timeout occurs - assert_eq!(engine.current_view(), 1); + assert_eq!(engine.current_view(), View(1)); let timeout_qc = TimeoutQc::new( - 1, + View(1), StandardQc { - view: 0, // genesis + view: View(0), // genesis id: BlockId::zeros(), }, NodeId::new([0; 32]), @@ -751,7 +752,7 @@ mod test { engine = engine.receive_timeout_qc(timeout_qc.clone()); assert_eq!(&engine.local_high_qc, timeout_qc.high_qc()); assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc)); - assert_eq!(engine.current_view(), 2); + assert_eq!(engine.current_view(), View(2)); } #[test] @@ -766,11 +767,11 @@ mod test { engine = engine.receive_block(block).unwrap(); // received but not approved yet engine = update_leader_selection(&engine); - assert_eq!(engine.current_view(), 1); // still waiting for a QC(view=1) + assert_eq!(engine.current_view(), View(1)); // still waiting for a QC(view=1) let timeout_qc = TimeoutQc::new( - 1, + View(1), StandardQc { - view: 0, // genesis + view: View(0), // genesis id: BlockId::zeros(), }, NodeId::new([0; 32]), @@ -778,20 +779,20 @@ mod test { engine = engine.receive_timeout_qc(timeout_qc.clone()); assert_eq!(&engine.local_high_qc, timeout_qc.high_qc()); assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc.clone())); - assert_eq!(engine.current_view(), 2); - assert_eq!(engine.highest_voted_view, -1); // didn't vote on anything yet + assert_eq!(engine.current_view(), View(2)); + assert_eq!(engine.highest_voted_view, View(-1)); // didn't vote on anything yet engine = update_leader_selection(&engine); let (engine, send) = engine.approve_new_view(timeout_qc.clone(), HashSet::new()); assert_eq!(&engine.high_qc(), timeout_qc.high_qc()); - assert_eq!(engine.current_view(), 2); // not changed - assert_eq!(engine.highest_voted_view, 2); + assert_eq!(engine.current_view(), View(2)); // not changed + assert_eq!(engine.highest_voted_view, View(2)); assert_eq!( send, Send { to: vec![engine.overlay().next_leader()].into_iter().collect(), payload: Payload::NewView(NewView { - view: 2, + view: View(2), sender: NodeId::new([0; 32]), timeout_qc: timeout_qc.clone(), high_qc: timeout_qc.high_qc().clone(), @@ -808,11 +809,11 @@ mod test { engine = engine.receive_block(block).unwrap(); // received but not approved yet engine = update_leader_selection(&engine); - assert_eq!(engine.current_view(), 1); + assert_eq!(engine.current_view(), View(1)); let timeout_qc1 = TimeoutQc::new( - 1, + View(1), StandardQc { - view: 0, // genesis + view: View(0), // genesis id: BlockId::zeros(), }, NodeId::new([0; 32]), @@ -823,9 +824,9 @@ mod test { // receiving a timeout_qc2 before approving new_view(timeout_qc1) let timeout_qc2 = TimeoutQc::new( - 2, + View(2), StandardQc { - view: 0, // genesis + view: View(0), // genesis id: BlockId::zeros(), }, NodeId::new([0; 32]), diff --git a/consensus-engine/src/overlay/random_beacon.rs b/consensus-engine/src/overlay/random_beacon.rs index e1648d46..f59b680e 100644 --- a/consensus-engine/src/overlay/random_beacon.rs +++ b/consensus-engine/src/overlay/random_beacon.rs @@ -1,6 +1,5 @@ use crate::types::*; use bls_signatures::{PrivateKey, PublicKey, Serialize, Signature}; -use integer_encoding::VarInt; use rand::{seq::SliceRandom, SeedableRng}; use serde::{Deserialize, Serialize as SerdeSerialize}; use sha2::{Digest, Sha256}; @@ -80,7 +79,7 @@ impl RandomBeaconState { } fn view_to_bytes(view: View) -> Box<[u8]> { - View::encode_var_vec(view).into_boxed_slice() + view.encode_var_vec().into_boxed_slice() } // FIXME: the spec should be clearer on what is the expected behavior, diff --git a/consensus-engine/src/types.rs b/consensus-engine/src/types.rs index 1158523e..ab970115 100644 --- a/consensus-engine/src/types.rs +++ b/consensus-engine/src/types.rs @@ -10,8 +10,8 @@ mod node_id; pub use node_id::NodeId; mod block_id; pub use block_id::BlockId; - -pub type View = i64; +mod view; +pub use view::View; /// The way the consensus engine communicates with the rest of the system is by returning /// actions to be performed. @@ -117,8 +117,8 @@ impl Block { pub fn genesis() -> Self { Self { + view: View(0), id: BlockId::zeros(), - view: 0, parent_qc: Qc::Standard(StandardQc::genesis()), leader_proof: LeaderProof::LeaderId { leader_id: NodeId::new([0; 32]), @@ -144,7 +144,7 @@ pub struct StandardQc { impl StandardQc { pub fn genesis() -> Self { Self { - view: -1, + view: View(-1), id: BlockId::zeros(), } } @@ -197,11 +197,11 @@ mod test { #[test] fn standard_qc() { let standard_qc = StandardQc { - view: 10, + view: View(10), id: BlockId::zeros(), }; let qc = Qc::Standard(standard_qc.clone()); - assert_eq!(qc.view(), 10); + assert_eq!(qc.view(), View(10)); assert_eq!(qc.block(), BlockId::new([0; 32])); assert_eq!(qc.high_qc(), standard_qc); } @@ -209,14 +209,14 @@ mod test { #[test] fn aggregated_qc() { let aggregated_qc = AggregateQc { - view: 20, + view: View(20), high_qc: StandardQc { - view: 10, + view: View(10), id: BlockId::zeros(), }, }; let qc = Qc::Aggregated(aggregated_qc.clone()); - assert_eq!(qc.view(), 20); + assert_eq!(qc.view(), View(20)); assert_eq!(qc.block(), BlockId::new([0; 32])); assert_eq!(qc.high_qc(), aggregated_qc.high_qc); } @@ -224,28 +224,28 @@ mod test { #[test] fn new_timeout_qc() { let timeout_qc = TimeoutQc::new( - 2, + View(2), StandardQc { - view: 1, + view: View(1), id: BlockId::zeros(), }, NodeId::new([0; 32]), ); - assert_eq!(timeout_qc.view(), 2); - assert_eq!(timeout_qc.high_qc().view, 1); + assert_eq!(timeout_qc.view(), View(2)); + assert_eq!(timeout_qc.high_qc().view, View(1)); assert_eq!(timeout_qc.high_qc().id, BlockId::new([0; 32])); assert_eq!(timeout_qc.sender(), NodeId::new([0; 32])); let timeout_qc = TimeoutQc::new( - 2, + View(2), StandardQc { - view: 2, + view: View(2), id: BlockId::zeros(), }, NodeId::new([0; 32]), ); - assert_eq!(timeout_qc.view(), 2); - assert_eq!(timeout_qc.high_qc().view, 2); + assert_eq!(timeout_qc.view(), View(2)); + assert_eq!(timeout_qc.high_qc().view, View(2)); assert_eq!(timeout_qc.high_qc().id, BlockId::new([0; 32])); assert_eq!(timeout_qc.sender(), NodeId::new([0; 32])); } @@ -256,9 +256,9 @@ mod test { )] fn new_timeout_qc_panic() { let _ = TimeoutQc::new( - 1, + View(1), StandardQc { - view: 2, + view: View(2), id: BlockId::zeros(), }, NodeId::new([0; 32]), diff --git a/consensus-engine/src/types/view.rs b/consensus-engine/src/types/view.rs new file mode 100644 index 00000000..497f5668 --- /dev/null +++ b/consensus-engine/src/types/view.rs @@ -0,0 +1,82 @@ +use derive_more::{Add, AddAssign, Sub, SubAssign}; + +#[derive( + Default, + Debug, + Copy, + Clone, + Eq, + PartialEq, + Hash, + Ord, + PartialOrd, + Add, + AddAssign, + Sub, + SubAssign, +)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "serde", serde(transparent))] +pub struct View(pub(crate) i64); + +impl View { + pub const ZERO: Self = Self(0); + + pub const fn new(val: i64) -> Self { + Self(val) + } + + pub const fn zero() -> Self { + Self(0) + } + + pub fn encode_var_vec(&self) -> Vec { + use integer_encoding::VarInt; + self.0.encode_var_vec() + } + + pub const fn next(&self) -> Self { + Self(self.0 + 1) + } + + pub const fn prev(&self) -> Self { + Self(self.0 - 1) + } +} + +impl From for View { + fn from(id: i64) -> Self { + Self(id) + } +} + +impl From for i64 { + fn from(id: View) -> Self { + id.0 + } +} + +impl core::fmt::Display for View { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "{}", self.0) + } +} + +// TODO: uncomment this when #[feature(step_trait)] is stabilized +// impl std::iter::Step for View { +// fn steps_between(start: &Self, end: &Self) -> Option { +// if start > end { +// None +// } else { +// Some((end.0 - start.0) as usize) +// } +// } + +// fn forward_checked(start: Self, count: usize) -> Option { +// start.0.checked_add(count as i64).map(View) +// } + +// fn backward_checked(start: Self, count: usize) -> Option { +// start.0.checked_sub(count as i64).map(View) +// } +// } diff --git a/consensus-engine/tests/fuzz/ref_state.rs b/consensus-engine/tests/fuzz/ref_state.rs index 6aeef37d..11d5fb1b 100644 --- a/consensus-engine/tests/fuzz/ref_state.rs +++ b/consensus-engine/tests/fuzz/ref_state.rs @@ -30,7 +30,7 @@ pub struct ViewEntry { const LEADER_PROOF: LeaderProof = LeaderProof::LeaderId { leader_id: NodeId::new([0; 32]), }; -const INITIAL_HIGHEST_VOTED_VIEW: View = -1; +const INITIAL_HIGHEST_VOTED_VIEW: View = View::new(-1); const SENDER: NodeId = NodeId::new([0; 32]); impl ReferenceStateMachine for RefState { @@ -41,7 +41,7 @@ impl ReferenceStateMachine for RefState { // Initialize the reference state machine fn init_state() -> BoxedStrategy { let genesis_block = Block { - view: 0, + view: View::new(0), id: BlockId::zeros(), parent_qc: Qc::Standard(StandardQc::genesis()), leader_proof: LEADER_PROOF.clone(), @@ -191,7 +191,7 @@ impl RefState { fn transition_receive_unsafe_block(&self) -> BoxedStrategy { let old_parents = self .chain - .range(..self.current_view() - 1) + .range(..self.current_view().prev()) .flat_map(|(_view, entry)| entry.blocks.iter().cloned()) .collect::>(); @@ -211,7 +211,7 @@ impl RefState { fn transition_approve_block(&self) -> BoxedStrategy { let blocks_not_voted = self .chain - .range(self.highest_voted_view + 1..) + .range(self.highest_voted_view.next()..) .flat_map(|(_view, entry)| entry.blocks.iter().cloned()) .collect::>(); @@ -250,13 +250,13 @@ impl RefState { // Generate a Transition::ReceiveTimeoutQcForRecentView fn transition_receive_timeout_qc_for_recent_view(&self) -> BoxedStrategy { - let current_view = self.current_view(); + let current_view: i64 = self.current_view().into(); let local_high_qc = self.high_qc(); let delta = 3; let blocks_around_local_high_qc = self .chain - .range(local_high_qc.view - delta..=local_high_qc.view + delta) // including past/future QCs + .range(local_high_qc.view - View::new(delta)..=local_high_qc.view + View::new(delta)) // including past/future QCs .flat_map(|(_, entry)| entry.blocks.iter().cloned()) .collect::>(); @@ -268,7 +268,7 @@ impl RefState { (current_view..=current_view + delta) // including future views .prop_map(move |view| { Transition::ReceiveTimeoutQcForRecentView(TimeoutQc::new( - view, + View::new(view), StandardQc { view: block.view, id: block.id, @@ -332,8 +332,8 @@ impl RefState { let current_view = self.current_view(); Just(Transition::ReceiveSafeBlock(Block { + view: current_view.next(), id: BlockId::random(&mut rand::thread_rng()), - view: current_view + 1, parent_qc: Qc::Aggregated(AggregateQc { high_qc: self.high_qc(), view: current_view, @@ -354,7 +354,7 @@ impl RefState { } pub fn new_view_from(timeout_qc: &TimeoutQc) -> View { - timeout_qc.view() + 1 + timeout_qc.view().next() } pub fn high_qc(&self) -> StandardQc { @@ -395,8 +395,8 @@ impl RefState { fn consecutive_block(parent: &Block) -> Block { Block { // use rand because we don't want this to be shrinked by proptest + view: parent.view.next(), id: BlockId::random(&mut rand::thread_rng()), - view: parent.view + 1, parent_qc: Qc::Standard(StandardQc { view: parent.view, id: parent.id, diff --git a/consensus-engine/tests/fuzz/sut.rs b/consensus-engine/tests/fuzz/sut.rs index a3d132be..d914d12e 100644 --- a/consensus-engine/tests/fuzz/sut.rs +++ b/consensus-engine/tests/fuzz/sut.rs @@ -21,7 +21,7 @@ impl ConsensusEngineTest { let engine = Carnot::from_genesis( NodeId::new([0; 32]), Block { - view: 0, + view: View::new(0), id: BlockId::zeros(), parent_qc: Qc::Standard(StandardQc::genesis()), leader_proof: LeaderProof::LeaderId { diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 92c52dae..765ed9e1 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -178,7 +178,7 @@ where let overlay = O::new(overlay_settings); let genesis = consensus_engine::Block { id: BlockId::zeros(), - view: 0, + view: View::new(0), parent_qc: Qc::Standard(StandardQc::genesis()), leader_proof: LeaderProof::LeaderId { leader_id: NodeId::new([0; 32]), @@ -204,7 +204,7 @@ where let genesis_block = carnot.genesis_block(); Self::process_view_change( carnot.clone(), - genesis_block.view - 1, + genesis_block.view.prev(), &mut task_manager, adapter.clone(), timeout, @@ -223,7 +223,7 @@ where if carnot.is_next_leader() { let network_adapter = adapter.clone(); - task_manager.push(genesis_block.view + 1, async move { + task_manager.push(genesis_block.view.next(), async move { let Event::Approve { qc, .. } = Self::gather_votes( network_adapter, leader_committee.clone(), @@ -469,7 +469,7 @@ where participating_nodes: carnot.root_committee(), }; let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views); - let new_view = timeout_qc.view() + 1; + let new_view = timeout_qc.view().next(); if carnot.is_next_leader() { let high_qc = carnot.high_qc(); task_manager.push(new_view, async move { @@ -506,7 +506,7 @@ where participating_nodes: carnot.child_committees().into_iter().flatten().collect(), }; task_manager.push( - timeout_qc.view() + 1, + timeout_qc.view().next(), Self::gather_new_views(adapter, self_committee, timeout_qc.clone(), tally_settings), ); if carnot.current_view() != new_state.current_view() { @@ -524,7 +524,13 @@ where ) -> (Carnot, Option>) { // we might have received a timeout_qc sent by some other node and advanced the view // already, in which case we should ignore the timeout - if carnot.current_view() != timeouts.iter().map(|t| t.view).max().unwrap_or(0) { + if carnot.current_view() + != timeouts + .iter() + .map(|t| t.view) + .max() + .unwrap_or(View::new(0)) + { return (carnot, None); } @@ -569,7 +575,7 @@ where match rx.await { Ok(txs) => { let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key); - let proposal = Block::new(qc.view() + 1, qc, txs, id, beacon); + let proposal = Block::new(qc.view().next(), qc, txs, id, beacon); output = Some(Output::BroadcastProposal { proposal }); } Err(e) => tracing::error!("Could not fetch txs {e}"), @@ -594,8 +600,8 @@ where Event::LocalTimeout { view: current_view } }); task_manager.push( - current_view + 1, - Self::gather_block(adapter.clone(), current_view + 1), + current_view.next(), + Self::gather_block(adapter.clone(), current_view.next()), ); task_manager.push( current_view, @@ -657,7 +663,7 @@ where ) -> Event { let tally = NewViewTally::new(tally); let stream = adapter - .new_view_stream(&committee, timeout_qc.view() + 1) + .new_view_stream(&committee, timeout_qc.view().next()) .await; match tally.tally(timeout_qc.clone(), stream).await { Ok((_qc, new_views)) => Event::NewView { @@ -860,19 +866,19 @@ mod tests { fn serde_carnot_info() { let info = CarnotInfo { id: NodeId::new([0; 32]), - current_view: 1, - highest_voted_view: -1, + current_view: View::new(1), + highest_voted_view: View::new(-1), local_high_qc: StandardQc { - view: 0, + view: View::new(0), id: BlockId::zeros(), }, safe_blocks: HashMap::from([( BlockId::zeros(), Block { id: BlockId::zeros(), - view: 0, + view: View::new(0), parent_qc: Qc::Standard(StandardQc { - view: 0, + view: View::new(0), id: BlockId::zeros(), }), leader_proof: LeaderProof::LeaderId { diff --git a/nomos-services/consensus/src/tally/unhappy.rs b/nomos-services/consensus/src/tally/unhappy.rs index df4ec985..0360ae20 100644 --- a/nomos-services/consensus/src/tally/unhappy.rs +++ b/nomos-services/consensus/src/tally/unhappy.rs @@ -47,7 +47,7 @@ impl Tally for NewViewTally { while let Some(vote) = vote_stream.next().await { // check vote view is valid - if vote.vote.view != timeout_qc.view() + 1 { + if vote.vote.view != timeout_qc.view().next() { continue; } diff --git a/simulations/src/bin/app/main.rs b/simulations/src/bin/app/main.rs index 1b3ec2b5..86da1635 100644 --- a/simulations/src/bin/app/main.rs +++ b/simulations/src/bin/app/main.rs @@ -8,7 +8,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; // crates use clap::Parser; use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin}; -use consensus_engine::Block; +use consensus_engine::{Block, View}; use crossbeam::channel; use rand::rngs::SmallRng; use rand::seq::SliceRandom; @@ -93,7 +93,7 @@ impl SimulationApp { }; // FIXME: Actually use a proposer and a key to generate random beacon state let genesis = nomos_core::block::Block::new( - 0, + View::new(0), Block::genesis().parent_qc, [].into_iter(), leader, diff --git a/simulations/src/node/carnot/event_builder.rs b/simulations/src/node/carnot/event_builder.rs index 84779894..207bf754 100644 --- a/simulations/src/node/carnot/event_builder.rs +++ b/simulations/src/node/carnot/event_builder.rs @@ -65,7 +65,7 @@ impl EventBuilder { } // only run when the engine is in the genesis view - if engine.highest_voted_view() == -1 + if engine.highest_voted_view() == View::new(-1) && engine.overlay().is_member_of_leaf_committee(self.id) { tracing::info!(node = %self.id, "voting genesis",); @@ -86,8 +86,8 @@ impl EventBuilder { let block = Block::from_bytes(&msg.chunk); tracing::info!( node=%self.id, - current_view = engine.current_view(), - block_view=block.header().view, + current_view = %engine.current_view(), + block_view=%block.header().view, block=?block.header().id, parent_block=?block.header().parent(), "receive proposal message", @@ -119,7 +119,7 @@ impl EventBuilder { }; let Some(qc) = msg.qc.clone() else { - tracing::warn!(node=%self.id, current_view = engine.current_view(), "received vote without QC"); + tracing::warn!(node=%self.id, current_view = %engine.current_view(), "received vote without QC"); continue; }; @@ -140,9 +140,9 @@ impl EventBuilder { tracing::info!( node=%self.id, votes=votes.len(), - current_view = engine.current_view(), - block_view=block.view, - block=?block.id, + current_view = %engine.current_view(), + block_view=%block.view, + block=%block.id, "approve block", ); @@ -198,7 +198,7 @@ impl EventBuilder { events.push(Event::ProposeBlock { qc: Qc::Aggregated(AggregateQc { high_qc, - view: msg_view + 1, + view: msg_view.next(), }), }); } else { diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index b8d8715a..baae6055 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -320,7 +320,8 @@ impl> Node for Car } fn current_view(&self) -> usize { - self.event_builder.current_view as usize + let view: i64 = self.event_builder.current_view.into(); + view as usize } fn state(&self) -> &CarnotState { @@ -338,7 +339,7 @@ impl> Node for Car m.view() == self.engine.current_view() || matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_)) }); - self.message_cache.prune(self.engine.current_view() - 1); + self.message_cache.prune(self.engine.current_view().prev()); self.message_cache.update(other_view_messages); current_view_messages.append(&mut self.message_cache.retrieve(self.engine.current_view())); @@ -353,11 +354,11 @@ impl> Node for Car let current_view = self.engine.current_view(); tracing::info!( node=%self.id, - last_committed_view=self.engine.latest_committed_view(), - current_view = current_view, - block_view = block.header().view, - block = ?block.header().id, - parent_block=?block.header().parent(), + last_committed_view=%self.engine.latest_committed_view(), + current_view = %current_view, + block_view = %block.header().view, + block = %block.header().id, + parent_block=%block.header().parent(), "receive block proposal", ); match self.engine.receive_block(block.header().clone()) { @@ -374,7 +375,7 @@ impl> Node for Car } } Err(_) => { - tracing::error!(node = %self.id, current_view = self.engine.current_view(), block_view = block.header().view, block = ?block.header().id, "receive block proposal, but is invalid"); + tracing::error!(node = %self.id, current_view = %self.engine.current_view(), block_view = %block.header().view, block = %block.header().id, "receive block proposal, but is invalid"); } } @@ -393,10 +394,10 @@ impl> Node for Car Event::Approve { block, .. } => { tracing::info!( node = %self.id, - current_view = self.engine.current_view(), - block_view = block.view, - block = ?block.id, - parent_block=?block.parent(), + current_view = %self.engine.current_view(), + block_view = %block.view, + block = %block.id, + parent_block=%block.parent(), "receive approve message" ); let (new, out) = self.engine.approve_block(block); @@ -407,12 +408,12 @@ impl> Node for Car Event::ProposeBlock { qc } => { output = vec![Output::BroadcastProposal { proposal: nomos_core::block::Block::new( - qc.view() + 1, + qc.view().next(), qc.clone(), [].into_iter(), self.id, RandomBeaconState::generate_happy( - qc.view() + 1, + qc.view().next(), &self.random_beacon_pk, ), ), @@ -426,8 +427,8 @@ impl> Node for Car } => { tracing::info!( node = %self.id, - current_view = self.engine.current_view(), - timeout_view = timeout_qc.view(), + current_view = %self.engine.current_view(), + timeout_view = %timeout_qc.view(), "receive new view message" ); let (new, out) = self.engine.approve_new_view(timeout_qc.clone(), new_views); @@ -437,8 +438,8 @@ impl> Node for Car Event::TimeoutQc { timeout_qc } => { tracing::info!( node = %self.id, - current_view = self.engine.current_view(), - timeout_view = timeout_qc.view(), + current_view = %self.engine.current_view(), + timeout_view = %timeout_qc.view(), "receive timeout qc message" ); self.engine = self.engine.receive_timeout_qc(timeout_qc.clone()); @@ -467,7 +468,7 @@ impl> Node for Car Event::LocalTimeout => { tracing::info!( node = %self.id, - current_view = self.engine.current_view(), + current_view = %self.engine.current_view(), "receive local timeout message" ); let (new, out) = self.engine.local_timeout(); diff --git a/tests/src/tests/happy.rs b/tests/src/tests/happy.rs index 0fef64d3..72f23fb3 100644 --- a/tests/src/tests/happy.rs +++ b/tests/src/tests/happy.rs @@ -1,10 +1,11 @@ +use consensus_engine::View; use fraction::{Fraction, One}; use futures::stream::{self, StreamExt}; use std::collections::HashSet; use std::time::Duration; use tests::{Node, NomosNode, SpawnConfig}; -const TARGET_VIEW: i64 = 20; +const TARGET_VIEW: View = View::new(20); async fn happy_test(nodes: Vec) { let timeout = std::time::Duration::from_secs(20); diff --git a/tests/src/tests/unhappy.rs b/tests/src/tests/unhappy.rs index 147b70d0..59bb78ff 100644 --- a/tests/src/tests/unhappy.rs +++ b/tests/src/tests/unhappy.rs @@ -1,9 +1,10 @@ +use consensus_engine::View; use fraction::Fraction; use futures::stream::{self, StreamExt}; use std::collections::HashSet; use tests::{Node, NomosNode, SpawnConfig}; -const TARGET_VIEW: i64 = 20; +const TARGET_VIEW: View = View::new(20); #[tokio::test] async fn ten_nodes_one_down() {