1
0
mirror of synced 2025-01-24 06:29:21 +00:00

Finish View wrapper (#254)

* finish View wrapper
This commit is contained in:
Al Liu 2023-07-12 21:30:22 +08:00 committed by GitHub
parent 4745b99996
commit 9467351c10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 219 additions and 127 deletions

View File

@ -8,6 +8,7 @@ serde = { version = "1.0", features = ["derive"], optional = true }
blake2 = "0.10"
bls-signatures = "0.14"
digest = "0.10"
derive_more = "0.99"
integer-encoding = "3"
sha2 = "0.10"
rand = "0.8"

View File

@ -19,10 +19,10 @@ pub struct Carnot<O: Overlay> {
impl<O: Overlay> Carnot<O> {
pub fn from_genesis(id: NodeId, genesis_block: Block, overlay: O) -> Self {
Self {
current_view: 0,
current_view: View(0),
local_high_qc: StandardQc::genesis(),
id,
highest_voted_view: -1,
highest_voted_view: View(-1),
last_view_timeout_qc: None,
overlay,
safe_blocks: [(genesis_block.id, genesis_block)].into(),
@ -63,7 +63,8 @@ impl<O: Overlay> Carnot<O> {
match block.leader_proof {
LeaderProof::LeaderId { leader_id } => {
// This only accepts blocks from the leader of current_view + 1
if leader_id != self.overlay.next_leader() || block.view != self.current_view() + 1
if leader_id != self.overlay.next_leader()
|| block.view != self.current_view().next()
{
return Err(());
}
@ -103,7 +104,7 @@ impl<O: Overlay> Carnot<O> {
new_state.update_high_qc(Qc::Standard(timeout_qc.high_qc().clone()));
new_state.update_timeout_qc(timeout_qc.clone());
new_state.current_view = timeout_qc.view() + 1;
new_state.current_view = timeout_qc.view().next();
new_state.overlay.rebuild(timeout_qc);
new_state
@ -160,14 +161,14 @@ impl<O: Overlay> Carnot<O> {
timeout_qc: TimeoutQc,
new_views: HashSet<NewView>,
) -> (Self, Send) {
let new_view = timeout_qc.view() + 1;
let new_view = timeout_qc.view().next();
assert!(
new_view
> self
.last_view_timeout_qc
.as_ref()
.map(|qc| qc.view())
.unwrap_or(0),
.unwrap_or(View(0)),
"can't vote for a new view not bigger than the last timeout_qc"
);
assert_eq!(
@ -244,7 +245,7 @@ impl<O: Overlay> Carnot<O> {
}
fn block_is_safe(&self, block: Block) -> bool {
block.view >= self.current_view && block.view == block.parent_qc.view() + 1
block.view >= self.current_view && block.view == block.parent_qc.view().next()
}
fn update_high_qc(&mut self, qc: Qc) {
@ -259,7 +260,7 @@ impl<O: Overlay> Carnot<O> {
_ => {}
}
if qc_view == self.current_view {
self.current_view += 1;
self.current_view += View(1);
}
}
@ -284,7 +285,7 @@ impl<O: Overlay> Carnot<O> {
}
pub fn genesis_block(&self) -> Block {
self.blocks_in_view(0)[0].clone()
self.blocks_in_view(View(0))[0].clone()
}
// Returns the id of the grandparent block if it can be committed or None otherwise
@ -292,7 +293,7 @@ impl<O: Overlay> Carnot<O> {
let parent = self.safe_blocks.get(&block.parent())?;
let grandparent = self.safe_blocks.get(&parent.parent())?;
if parent.view == grandparent.view + 1
if parent.view == grandparent.view.next()
&& matches!(parent.parent_qc, Qc::Standard { .. })
&& matches!(grandparent.parent_qc, Qc::Standard { .. })
{
@ -302,8 +303,8 @@ impl<O: Overlay> Carnot<O> {
}
pub fn latest_committed_block(&self) -> Block {
for view in (0..=self.current_view).rev() {
for block in self.blocks_in_view(view) {
for view in (0..=self.current_view.0).rev() {
for block in self.blocks_in_view(View(view)) {
if let Some(block) = self.can_commit_grandparent(block) {
return block;
}
@ -405,7 +406,7 @@ mod test {
Carnot::from_genesis(
*nodes.first().unwrap(),
Block {
view: 0,
view: View(0),
id: BlockId::zeros(),
parent_qc: Qc::Standard(StandardQc::genesis()),
leader_proof: LeaderProof::LeaderId {
@ -425,7 +426,7 @@ mod test {
next_id.0[0] += 1;
Block {
view: block.view + 1,
view: block.view.next(),
id: next_id,
parent_qc: Qc::Standard(StandardQc {
view: block.view,
@ -455,12 +456,12 @@ mod test {
// Ensure that all states are initialized correctly with the genesis block.
fn from_genesis() {
let engine = init(vec![NodeId::new([0; 32])]);
assert_eq!(engine.current_view(), 0);
assert_eq!(engine.highest_voted_view, -1);
assert_eq!(engine.current_view(), View(0));
assert_eq!(engine.highest_voted_view, View(-1));
let genesis = engine.genesis_block();
assert_eq!(engine.high_qc(), genesis.parent_qc.high_qc());
assert_eq!(engine.blocks_in_view(0), vec![genesis.clone()]);
assert_eq!(engine.blocks_in_view(View(0)), vec![genesis.clone()]);
assert_eq!(engine.last_view_timeout_qc(), None);
assert_eq!(engine.committed_blocks(), vec![genesis.id]);
}
@ -489,7 +490,7 @@ mod test {
let mut block2 = next_block(&engine, &block1);
block2.id = block1.id;
engine = engine.receive_block(block2).unwrap();
assert_eq!(engine.blocks_in_view(1), vec![block1]);
assert_eq!(engine.blocks_in_view(View(1)), vec![block1]);
}
#[test]
@ -500,7 +501,7 @@ mod test {
let mut parent_block_id = engine.genesis_block().id;
parent_block_id.0[0] += 1; // generate an unknown parent block ID
let block = Block {
view: engine.current_view() + 1,
view: engine.current_view().next(),
id: BlockId::new([1; 32]),
parent_qc: Qc::Standard(StandardQc {
view: engine.current_view(),
@ -528,11 +529,11 @@ mod test {
engine = update_leader_selection(&engine);
let mut unsafe_block = next_block(&engine, &block);
unsafe_block.view = engine.current_view() - 1; // UNSAFE: view < engine.current_view
unsafe_block.view = engine.current_view().prev(); // UNSAFE: view < engine.current_view
assert!(engine.receive_block(unsafe_block).is_err());
let mut unsafe_block = next_block(&engine, &block);
unsafe_block.view = unsafe_block.parent_qc.view() + 2; // UNSAFE: view != parent_qc.view + 1
unsafe_block.view = unsafe_block.parent_qc.view() + View(2); // UNSAFE: view != parent_qc.view + 1
assert!(engine.receive_block(unsafe_block).is_err());
}
@ -596,15 +597,15 @@ mod test {
let block1 = next_block(&engine, &engine.genesis_block());
engine = engine.receive_block(block1.clone()).unwrap();
assert_eq!(engine.current_view(), 1);
assert_eq!(engine.current_view(), View(1));
engine = update_leader_selection(&engine);
// a future block should be rejected
let future_block = Block {
id: BlockId::new([10; 32]),
view: 11, // a future view
view: View(11), // a future view
parent_qc: Qc::Aggregated(AggregateQc {
view: 10,
view: View(10),
high_qc: StandardQc {
// a known parent block
id: block1.id,
@ -686,16 +687,16 @@ mod test {
engine = update_leader_selection(&engine);
let (engine, send) = engine.local_timeout();
assert_eq!(engine.highest_voted_view, 1); // updated from 0 (genesis) to 1 (current_view)
assert_eq!(engine.highest_voted_view, View(1)); // updated from 0 (genesis) to 1 (current_view)
assert_eq!(
send,
Some(Send {
to: engine.overlay().root_committee(),
payload: Payload::Timeout(Timeout {
view: 1,
view: View(1),
sender: NodeId::new([0; 32]),
high_qc: StandardQc {
view: 0, // genesis
view: View(0), // genesis
id: BlockId::zeros(),
},
timeout_qc: None
@ -714,11 +715,11 @@ mod test {
let (mut engine, _) = engine.local_timeout();
assert_eq!(engine.current_view(), 1);
assert_eq!(engine.current_view(), View(1));
let timeout_qc = TimeoutQc::new(
1,
View(1),
StandardQc {
view: 0, // genesis
view: View::new(0), // genesis
id: BlockId::zeros(),
},
NodeId::new([0; 32]),
@ -726,7 +727,7 @@ mod test {
engine = engine.receive_timeout_qc(timeout_qc.clone());
assert_eq!(&engine.local_high_qc, timeout_qc.high_qc());
assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc));
assert_eq!(engine.current_view(), 2);
assert_eq!(engine.current_view(), View(2));
}
#[test]
@ -739,11 +740,11 @@ mod test {
// before local_timeout occurs
assert_eq!(engine.current_view(), 1);
assert_eq!(engine.current_view(), View(1));
let timeout_qc = TimeoutQc::new(
1,
View(1),
StandardQc {
view: 0, // genesis
view: View(0), // genesis
id: BlockId::zeros(),
},
NodeId::new([0; 32]),
@ -751,7 +752,7 @@ mod test {
engine = engine.receive_timeout_qc(timeout_qc.clone());
assert_eq!(&engine.local_high_qc, timeout_qc.high_qc());
assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc));
assert_eq!(engine.current_view(), 2);
assert_eq!(engine.current_view(), View(2));
}
#[test]
@ -766,11 +767,11 @@ mod test {
engine = engine.receive_block(block).unwrap(); // received but not approved yet
engine = update_leader_selection(&engine);
assert_eq!(engine.current_view(), 1); // still waiting for a QC(view=1)
assert_eq!(engine.current_view(), View(1)); // still waiting for a QC(view=1)
let timeout_qc = TimeoutQc::new(
1,
View(1),
StandardQc {
view: 0, // genesis
view: View(0), // genesis
id: BlockId::zeros(),
},
NodeId::new([0; 32]),
@ -778,20 +779,20 @@ mod test {
engine = engine.receive_timeout_qc(timeout_qc.clone());
assert_eq!(&engine.local_high_qc, timeout_qc.high_qc());
assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc.clone()));
assert_eq!(engine.current_view(), 2);
assert_eq!(engine.highest_voted_view, -1); // didn't vote on anything yet
assert_eq!(engine.current_view(), View(2));
assert_eq!(engine.highest_voted_view, View(-1)); // didn't vote on anything yet
engine = update_leader_selection(&engine);
let (engine, send) = engine.approve_new_view(timeout_qc.clone(), HashSet::new());
assert_eq!(&engine.high_qc(), timeout_qc.high_qc());
assert_eq!(engine.current_view(), 2); // not changed
assert_eq!(engine.highest_voted_view, 2);
assert_eq!(engine.current_view(), View(2)); // not changed
assert_eq!(engine.highest_voted_view, View(2));
assert_eq!(
send,
Send {
to: vec![engine.overlay().next_leader()].into_iter().collect(),
payload: Payload::NewView(NewView {
view: 2,
view: View(2),
sender: NodeId::new([0; 32]),
timeout_qc: timeout_qc.clone(),
high_qc: timeout_qc.high_qc().clone(),
@ -808,11 +809,11 @@ mod test {
engine = engine.receive_block(block).unwrap(); // received but not approved yet
engine = update_leader_selection(&engine);
assert_eq!(engine.current_view(), 1);
assert_eq!(engine.current_view(), View(1));
let timeout_qc1 = TimeoutQc::new(
1,
View(1),
StandardQc {
view: 0, // genesis
view: View(0), // genesis
id: BlockId::zeros(),
},
NodeId::new([0; 32]),
@ -823,9 +824,9 @@ mod test {
// receiving a timeout_qc2 before approving new_view(timeout_qc1)
let timeout_qc2 = TimeoutQc::new(
2,
View(2),
StandardQc {
view: 0, // genesis
view: View(0), // genesis
id: BlockId::zeros(),
},
NodeId::new([0; 32]),

View File

@ -1,6 +1,5 @@
use crate::types::*;
use bls_signatures::{PrivateKey, PublicKey, Serialize, Signature};
use integer_encoding::VarInt;
use rand::{seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize as SerdeSerialize};
use sha2::{Digest, Sha256};
@ -80,7 +79,7 @@ impl RandomBeaconState {
}
fn view_to_bytes(view: View) -> Box<[u8]> {
View::encode_var_vec(view).into_boxed_slice()
view.encode_var_vec().into_boxed_slice()
}
// FIXME: the spec should be clearer on what is the expected behavior,

View File

@ -10,8 +10,8 @@ mod node_id;
pub use node_id::NodeId;
mod block_id;
pub use block_id::BlockId;
pub type View = i64;
mod view;
pub use view::View;
/// The way the consensus engine communicates with the rest of the system is by returning
/// actions to be performed.
@ -117,8 +117,8 @@ impl Block {
pub fn genesis() -> Self {
Self {
view: View(0),
id: BlockId::zeros(),
view: 0,
parent_qc: Qc::Standard(StandardQc::genesis()),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
@ -144,7 +144,7 @@ pub struct StandardQc {
impl StandardQc {
pub fn genesis() -> Self {
Self {
view: -1,
view: View(-1),
id: BlockId::zeros(),
}
}
@ -197,11 +197,11 @@ mod test {
#[test]
fn standard_qc() {
let standard_qc = StandardQc {
view: 10,
view: View(10),
id: BlockId::zeros(),
};
let qc = Qc::Standard(standard_qc.clone());
assert_eq!(qc.view(), 10);
assert_eq!(qc.view(), View(10));
assert_eq!(qc.block(), BlockId::new([0; 32]));
assert_eq!(qc.high_qc(), standard_qc);
}
@ -209,14 +209,14 @@ mod test {
#[test]
fn aggregated_qc() {
let aggregated_qc = AggregateQc {
view: 20,
view: View(20),
high_qc: StandardQc {
view: 10,
view: View(10),
id: BlockId::zeros(),
},
};
let qc = Qc::Aggregated(aggregated_qc.clone());
assert_eq!(qc.view(), 20);
assert_eq!(qc.view(), View(20));
assert_eq!(qc.block(), BlockId::new([0; 32]));
assert_eq!(qc.high_qc(), aggregated_qc.high_qc);
}
@ -224,28 +224,28 @@ mod test {
#[test]
fn new_timeout_qc() {
let timeout_qc = TimeoutQc::new(
2,
View(2),
StandardQc {
view: 1,
view: View(1),
id: BlockId::zeros(),
},
NodeId::new([0; 32]),
);
assert_eq!(timeout_qc.view(), 2);
assert_eq!(timeout_qc.high_qc().view, 1);
assert_eq!(timeout_qc.view(), View(2));
assert_eq!(timeout_qc.high_qc().view, View(1));
assert_eq!(timeout_qc.high_qc().id, BlockId::new([0; 32]));
assert_eq!(timeout_qc.sender(), NodeId::new([0; 32]));
let timeout_qc = TimeoutQc::new(
2,
View(2),
StandardQc {
view: 2,
view: View(2),
id: BlockId::zeros(),
},
NodeId::new([0; 32]),
);
assert_eq!(timeout_qc.view(), 2);
assert_eq!(timeout_qc.high_qc().view, 2);
assert_eq!(timeout_qc.view(), View(2));
assert_eq!(timeout_qc.high_qc().view, View(2));
assert_eq!(timeout_qc.high_qc().id, BlockId::new([0; 32]));
assert_eq!(timeout_qc.sender(), NodeId::new([0; 32]));
}
@ -256,9 +256,9 @@ mod test {
)]
fn new_timeout_qc_panic() {
let _ = TimeoutQc::new(
1,
View(1),
StandardQc {
view: 2,
view: View(2),
id: BlockId::zeros(),
},
NodeId::new([0; 32]),

View File

@ -0,0 +1,82 @@
use derive_more::{Add, AddAssign, Sub, SubAssign};
#[derive(
Default,
Debug,
Copy,
Clone,
Eq,
PartialEq,
Hash,
Ord,
PartialOrd,
Add,
AddAssign,
Sub,
SubAssign,
)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(transparent))]
pub struct View(pub(crate) i64);
impl View {
pub const ZERO: Self = Self(0);
pub const fn new(val: i64) -> Self {
Self(val)
}
pub const fn zero() -> Self {
Self(0)
}
pub fn encode_var_vec(&self) -> Vec<u8> {
use integer_encoding::VarInt;
self.0.encode_var_vec()
}
pub const fn next(&self) -> Self {
Self(self.0 + 1)
}
pub const fn prev(&self) -> Self {
Self(self.0 - 1)
}
}
impl From<i64> for View {
fn from(id: i64) -> Self {
Self(id)
}
}
impl From<View> for i64 {
fn from(id: View) -> Self {
id.0
}
}
impl core::fmt::Display for View {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{}", self.0)
}
}
// TODO: uncomment this when #[feature(step_trait)] is stabilized
// impl std::iter::Step for View {
// fn steps_between(start: &Self, end: &Self) -> Option<usize> {
// if start > end {
// None
// } else {
// Some((end.0 - start.0) as usize)
// }
// }
// fn forward_checked(start: Self, count: usize) -> Option<Self> {
// start.0.checked_add(count as i64).map(View)
// }
// fn backward_checked(start: Self, count: usize) -> Option<Self> {
// start.0.checked_sub(count as i64).map(View)
// }
// }

View File

@ -30,7 +30,7 @@ pub struct ViewEntry {
const LEADER_PROOF: LeaderProof = LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
};
const INITIAL_HIGHEST_VOTED_VIEW: View = -1;
const INITIAL_HIGHEST_VOTED_VIEW: View = View::new(-1);
const SENDER: NodeId = NodeId::new([0; 32]);
impl ReferenceStateMachine for RefState {
@ -41,7 +41,7 @@ impl ReferenceStateMachine for RefState {
// Initialize the reference state machine
fn init_state() -> BoxedStrategy<Self::State> {
let genesis_block = Block {
view: 0,
view: View::new(0),
id: BlockId::zeros(),
parent_qc: Qc::Standard(StandardQc::genesis()),
leader_proof: LEADER_PROOF.clone(),
@ -191,7 +191,7 @@ impl RefState {
fn transition_receive_unsafe_block(&self) -> BoxedStrategy<Transition> {
let old_parents = self
.chain
.range(..self.current_view() - 1)
.range(..self.current_view().prev())
.flat_map(|(_view, entry)| entry.blocks.iter().cloned())
.collect::<Vec<Block>>();
@ -211,7 +211,7 @@ impl RefState {
fn transition_approve_block(&self) -> BoxedStrategy<Transition> {
let blocks_not_voted = self
.chain
.range(self.highest_voted_view + 1..)
.range(self.highest_voted_view.next()..)
.flat_map(|(_view, entry)| entry.blocks.iter().cloned())
.collect::<Vec<Block>>();
@ -250,13 +250,13 @@ impl RefState {
// Generate a Transition::ReceiveTimeoutQcForRecentView
fn transition_receive_timeout_qc_for_recent_view(&self) -> BoxedStrategy<Transition> {
let current_view = self.current_view();
let current_view: i64 = self.current_view().into();
let local_high_qc = self.high_qc();
let delta = 3;
let blocks_around_local_high_qc = self
.chain
.range(local_high_qc.view - delta..=local_high_qc.view + delta) // including past/future QCs
.range(local_high_qc.view - View::new(delta)..=local_high_qc.view + View::new(delta)) // including past/future QCs
.flat_map(|(_, entry)| entry.blocks.iter().cloned())
.collect::<Vec<Block>>();
@ -268,7 +268,7 @@ impl RefState {
(current_view..=current_view + delta) // including future views
.prop_map(move |view| {
Transition::ReceiveTimeoutQcForRecentView(TimeoutQc::new(
view,
View::new(view),
StandardQc {
view: block.view,
id: block.id,
@ -332,8 +332,8 @@ impl RefState {
let current_view = self.current_view();
Just(Transition::ReceiveSafeBlock(Block {
view: current_view.next(),
id: BlockId::random(&mut rand::thread_rng()),
view: current_view + 1,
parent_qc: Qc::Aggregated(AggregateQc {
high_qc: self.high_qc(),
view: current_view,
@ -354,7 +354,7 @@ impl RefState {
}
pub fn new_view_from(timeout_qc: &TimeoutQc) -> View {
timeout_qc.view() + 1
timeout_qc.view().next()
}
pub fn high_qc(&self) -> StandardQc {
@ -395,8 +395,8 @@ impl RefState {
fn consecutive_block(parent: &Block) -> Block {
Block {
// use rand because we don't want this to be shrinked by proptest
view: parent.view.next(),
id: BlockId::random(&mut rand::thread_rng()),
view: parent.view + 1,
parent_qc: Qc::Standard(StandardQc {
view: parent.view,
id: parent.id,

View File

@ -21,7 +21,7 @@ impl ConsensusEngineTest {
let engine = Carnot::from_genesis(
NodeId::new([0; 32]),
Block {
view: 0,
view: View::new(0),
id: BlockId::zeros(),
parent_qc: Qc::Standard(StandardQc::genesis()),
leader_proof: LeaderProof::LeaderId {

View File

@ -178,7 +178,7 @@ where
let overlay = O::new(overlay_settings);
let genesis = consensus_engine::Block {
id: BlockId::zeros(),
view: 0,
view: View::new(0),
parent_qc: Qc::Standard(StandardQc::genesis()),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
@ -204,7 +204,7 @@ where
let genesis_block = carnot.genesis_block();
Self::process_view_change(
carnot.clone(),
genesis_block.view - 1,
genesis_block.view.prev(),
&mut task_manager,
adapter.clone(),
timeout,
@ -223,7 +223,7 @@ where
if carnot.is_next_leader() {
let network_adapter = adapter.clone();
task_manager.push(genesis_block.view + 1, async move {
task_manager.push(genesis_block.view.next(), async move {
let Event::Approve { qc, .. } = Self::gather_votes(
network_adapter,
leader_committee.clone(),
@ -469,7 +469,7 @@ where
participating_nodes: carnot.root_committee(),
};
let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views);
let new_view = timeout_qc.view() + 1;
let new_view = timeout_qc.view().next();
if carnot.is_next_leader() {
let high_qc = carnot.high_qc();
task_manager.push(new_view, async move {
@ -506,7 +506,7 @@ where
participating_nodes: carnot.child_committees().into_iter().flatten().collect(),
};
task_manager.push(
timeout_qc.view() + 1,
timeout_qc.view().next(),
Self::gather_new_views(adapter, self_committee, timeout_qc.clone(), tally_settings),
);
if carnot.current_view() != new_state.current_view() {
@ -524,7 +524,13 @@ where
) -> (Carnot<O>, Option<Output<P::Tx>>) {
// we might have received a timeout_qc sent by some other node and advanced the view
// already, in which case we should ignore the timeout
if carnot.current_view() != timeouts.iter().map(|t| t.view).max().unwrap_or(0) {
if carnot.current_view()
!= timeouts
.iter()
.map(|t| t.view)
.max()
.unwrap_or(View::new(0))
{
return (carnot, None);
}
@ -569,7 +575,7 @@ where
match rx.await {
Ok(txs) => {
let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key);
let proposal = Block::new(qc.view() + 1, qc, txs, id, beacon);
let proposal = Block::new(qc.view().next(), qc, txs, id, beacon);
output = Some(Output::BroadcastProposal { proposal });
}
Err(e) => tracing::error!("Could not fetch txs {e}"),
@ -594,8 +600,8 @@ where
Event::LocalTimeout { view: current_view }
});
task_manager.push(
current_view + 1,
Self::gather_block(adapter.clone(), current_view + 1),
current_view.next(),
Self::gather_block(adapter.clone(), current_view.next()),
);
task_manager.push(
current_view,
@ -657,7 +663,7 @@ where
) -> Event<P::Tx> {
let tally = NewViewTally::new(tally);
let stream = adapter
.new_view_stream(&committee, timeout_qc.view() + 1)
.new_view_stream(&committee, timeout_qc.view().next())
.await;
match tally.tally(timeout_qc.clone(), stream).await {
Ok((_qc, new_views)) => Event::NewView {
@ -860,19 +866,19 @@ mod tests {
fn serde_carnot_info() {
let info = CarnotInfo {
id: NodeId::new([0; 32]),
current_view: 1,
highest_voted_view: -1,
current_view: View::new(1),
highest_voted_view: View::new(-1),
local_high_qc: StandardQc {
view: 0,
view: View::new(0),
id: BlockId::zeros(),
},
safe_blocks: HashMap::from([(
BlockId::zeros(),
Block {
id: BlockId::zeros(),
view: 0,
view: View::new(0),
parent_qc: Qc::Standard(StandardQc {
view: 0,
view: View::new(0),
id: BlockId::zeros(),
}),
leader_proof: LeaderProof::LeaderId {

View File

@ -47,7 +47,7 @@ impl Tally for NewViewTally {
while let Some(vote) = vote_stream.next().await {
// check vote view is valid
if vote.vote.view != timeout_qc.view() + 1 {
if vote.vote.view != timeout_qc.view().next() {
continue;
}

View File

@ -8,7 +8,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
// crates
use clap::Parser;
use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin};
use consensus_engine::Block;
use consensus_engine::{Block, View};
use crossbeam::channel;
use rand::rngs::SmallRng;
use rand::seq::SliceRandom;
@ -93,7 +93,7 @@ impl SimulationApp {
};
// FIXME: Actually use a proposer and a key to generate random beacon state
let genesis = nomos_core::block::Block::new(
0,
View::new(0),
Block::genesis().parent_qc,
[].into_iter(),
leader,

View File

@ -65,7 +65,7 @@ impl EventBuilder {
}
// only run when the engine is in the genesis view
if engine.highest_voted_view() == -1
if engine.highest_voted_view() == View::new(-1)
&& engine.overlay().is_member_of_leaf_committee(self.id)
{
tracing::info!(node = %self.id, "voting genesis",);
@ -86,8 +86,8 @@ impl EventBuilder {
let block = Block::from_bytes(&msg.chunk);
tracing::info!(
node=%self.id,
current_view = engine.current_view(),
block_view=block.header().view,
current_view = %engine.current_view(),
block_view=%block.header().view,
block=?block.header().id,
parent_block=?block.header().parent(),
"receive proposal message",
@ -119,7 +119,7 @@ impl EventBuilder {
};
let Some(qc) = msg.qc.clone() else {
tracing::warn!(node=%self.id, current_view = engine.current_view(), "received vote without QC");
tracing::warn!(node=%self.id, current_view = %engine.current_view(), "received vote without QC");
continue;
};
@ -140,9 +140,9 @@ impl EventBuilder {
tracing::info!(
node=%self.id,
votes=votes.len(),
current_view = engine.current_view(),
block_view=block.view,
block=?block.id,
current_view = %engine.current_view(),
block_view=%block.view,
block=%block.id,
"approve block",
);
@ -198,7 +198,7 @@ impl EventBuilder {
events.push(Event::ProposeBlock {
qc: Qc::Aggregated(AggregateQc {
high_qc,
view: msg_view + 1,
view: msg_view.next(),
}),
});
} else {

View File

@ -320,7 +320,8 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
}
fn current_view(&self) -> usize {
self.event_builder.current_view as usize
let view: i64 = self.event_builder.current_view.into();
view as usize
}
fn state(&self) -> &CarnotState {
@ -338,7 +339,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
m.view() == self.engine.current_view()
|| matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_))
});
self.message_cache.prune(self.engine.current_view() - 1);
self.message_cache.prune(self.engine.current_view().prev());
self.message_cache.update(other_view_messages);
current_view_messages.append(&mut self.message_cache.retrieve(self.engine.current_view()));
@ -353,11 +354,11 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
let current_view = self.engine.current_view();
tracing::info!(
node=%self.id,
last_committed_view=self.engine.latest_committed_view(),
current_view = current_view,
block_view = block.header().view,
block = ?block.header().id,
parent_block=?block.header().parent(),
last_committed_view=%self.engine.latest_committed_view(),
current_view = %current_view,
block_view = %block.header().view,
block = %block.header().id,
parent_block=%block.header().parent(),
"receive block proposal",
);
match self.engine.receive_block(block.header().clone()) {
@ -374,7 +375,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
}
}
Err(_) => {
tracing::error!(node = %self.id, current_view = self.engine.current_view(), block_view = block.header().view, block = ?block.header().id, "receive block proposal, but is invalid");
tracing::error!(node = %self.id, current_view = %self.engine.current_view(), block_view = %block.header().view, block = %block.header().id, "receive block proposal, but is invalid");
}
}
@ -393,10 +394,10 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
Event::Approve { block, .. } => {
tracing::info!(
node = %self.id,
current_view = self.engine.current_view(),
block_view = block.view,
block = ?block.id,
parent_block=?block.parent(),
current_view = %self.engine.current_view(),
block_view = %block.view,
block = %block.id,
parent_block=%block.parent(),
"receive approve message"
);
let (new, out) = self.engine.approve_block(block);
@ -407,12 +408,12 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
Event::ProposeBlock { qc } => {
output = vec![Output::BroadcastProposal {
proposal: nomos_core::block::Block::new(
qc.view() + 1,
qc.view().next(),
qc.clone(),
[].into_iter(),
self.id,
RandomBeaconState::generate_happy(
qc.view() + 1,
qc.view().next(),
&self.random_beacon_pk,
),
),
@ -426,8 +427,8 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
} => {
tracing::info!(
node = %self.id,
current_view = self.engine.current_view(),
timeout_view = timeout_qc.view(),
current_view = %self.engine.current_view(),
timeout_view = %timeout_qc.view(),
"receive new view message"
);
let (new, out) = self.engine.approve_new_view(timeout_qc.clone(), new_views);
@ -437,8 +438,8 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
Event::TimeoutQc { timeout_qc } => {
tracing::info!(
node = %self.id,
current_view = self.engine.current_view(),
timeout_view = timeout_qc.view(),
current_view = %self.engine.current_view(),
timeout_view = %timeout_qc.view(),
"receive timeout qc message"
);
self.engine = self.engine.receive_timeout_qc(timeout_qc.clone());
@ -467,7 +468,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
Event::LocalTimeout => {
tracing::info!(
node = %self.id,
current_view = self.engine.current_view(),
current_view = %self.engine.current_view(),
"receive local timeout message"
);
let (new, out) = self.engine.local_timeout();

View File

@ -1,10 +1,11 @@
use consensus_engine::View;
use fraction::{Fraction, One};
use futures::stream::{self, StreamExt};
use std::collections::HashSet;
use std::time::Duration;
use tests::{Node, NomosNode, SpawnConfig};
const TARGET_VIEW: i64 = 20;
const TARGET_VIEW: View = View::new(20);
async fn happy_test(nodes: Vec<NomosNode>) {
let timeout = std::time::Duration::from_secs(20);

View File

@ -1,9 +1,10 @@
use consensus_engine::View;
use fraction::Fraction;
use futures::stream::{self, StreamExt};
use std::collections::HashSet;
use tests::{Node, NomosNode, SpawnConfig};
const TARGET_VIEW: i64 = 20;
const TARGET_VIEW: View = View::new(20);
#[tokio::test]
async fn ten_nodes_one_down() {