Refactor block (#609)

* Refactor Block/Header definition

Refactor block/header definition so that it's now responsibility
of the nomos-core crate. This removes definitions in ledger/consensus
crates since there's no need at that level to have an understanding
of the block format.

The new header format supports both carnot and cryptarchia.
This commit is contained in:
Giacomo Pasini 2024-03-13 18:46:10 +01:00 committed by GitHub
parent e7d591b7bc
commit 50cff241fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
54 changed files with 1046 additions and 730 deletions

View File

@ -1,4 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
hash::Hash,
};
pub mod overlay;
mod types;
@ -12,23 +15,27 @@ pub mod openapi {
}
#[derive(Clone, Debug, PartialEq)]
pub struct Carnot<O: Overlay> {
pub struct Carnot<O: Overlay, Id: Eq + Hash> {
id: NodeId,
current_view: View,
highest_voted_view: View,
local_high_qc: StandardQc,
safe_blocks: HashMap<BlockId, Block>,
tip: BlockId,
last_view_timeout_qc: Option<TimeoutQc>,
latest_committed_block: Option<BlockId>,
local_high_qc: StandardQc<Id>,
safe_blocks: HashMap<Id, Block<Id>>,
tip: Id,
last_view_timeout_qc: Option<TimeoutQc<Id>>,
latest_committed_block: Option<Id>,
overlay: O,
}
impl<O: Overlay> Carnot<O> {
pub fn from_genesis(id: NodeId, genesis_block: Block, overlay: O) -> Self {
impl<O, Id> Carnot<O, Id>
where
O: Overlay,
Id: Copy + Eq + Hash + core::fmt::Debug,
{
pub fn from_genesis(id: NodeId, genesis_block: Block<Id>, overlay: O) -> Self {
Self {
current_view: View(0),
local_high_qc: StandardQc::genesis(),
local_high_qc: StandardQc::genesis(genesis_block.id),
id,
highest_voted_view: View(-1),
last_view_timeout_qc: None,
@ -47,12 +54,12 @@ impl<O: Overlay> Carnot<O> {
self.highest_voted_view
}
pub fn safe_blocks(&self) -> &HashMap<BlockId, Block> {
pub fn safe_blocks(&self) -> &HashMap<Id, Block<Id>> {
&self.safe_blocks
}
/// Return the most recent safe block
pub fn tip(&self) -> Block {
pub fn tip(&self) -> Block<Id> {
self.safe_blocks[&self.tip].clone()
}
@ -65,7 +72,7 @@ impl<O: Overlay> Carnot<O> {
/// * Overlay changes for views < block.view should be made available before trying to process
/// a block by calling `receive_timeout_qc`.
#[allow(clippy::result_unit_err)]
pub fn receive_block(&self, block: Block) -> Result<Self, ()> {
pub fn receive_block(&self, block: Block<Id>) -> Result<Self, ()> {
assert!(
self.safe_blocks.contains_key(&block.parent()),
"out of order view not supported, missing parent block for {block:?}",
@ -114,7 +121,7 @@ impl<O: Overlay> Carnot<O> {
/// Upon reception of a global timeout event
///
/// Preconditions:
pub fn receive_timeout_qc(&self, timeout_qc: TimeoutQc) -> Self {
pub fn receive_timeout_qc(&self, timeout_qc: TimeoutQc<Id>) -> Self {
let mut new_state = self.clone();
if timeout_qc.view() < new_state.current_view {
@ -134,7 +141,7 @@ impl<O: Overlay> Carnot<O> {
/// Preconditions:
/// * `receive_block(b)` must have been called successfully before trying to approve a block b.
/// * A node should not attempt to vote for a block in a view earlier than the latest one it actively participated in.
pub fn approve_block(&self, block: Block) -> (Self, Send) {
pub fn approve_block(&self, block: Block<Id>) -> (Self, Send<Id>) {
assert!(
self.safe_blocks.contains_key(&block.id),
"{:?} not in {:?}",
@ -179,9 +186,9 @@ impl<O: Overlay> Carnot<O> {
/// * A node should not attempt to approve a view earlier than the latest one it actively participated in.
pub fn approve_new_view(
&self,
timeout_qc: TimeoutQc,
new_views: HashSet<NewView>,
) -> (Self, Send) {
timeout_qc: TimeoutQc<Id>,
new_views: HashSet<NewView<Id>>,
) -> (Self, Send<Id>) {
let new_view = timeout_qc.view().next();
assert!(
new_view
@ -243,7 +250,7 @@ impl<O: Overlay> Carnot<O> {
/// Preconditions: none!
/// Just notice that the timer only reset after a view change, i.e. a node can't timeout
/// more than once for the same view
pub fn local_timeout(&self) -> (Self, Option<Send>) {
pub fn local_timeout(&self) -> (Self, Option<Send<Id>>) {
let mut new_state = self.clone();
new_state.highest_voted_view = new_state.current_view;
@ -268,11 +275,11 @@ impl<O: Overlay> Carnot<O> {
(new_state, None)
}
fn block_is_safe(&self, block: Block) -> bool {
fn block_is_safe(&self, block: Block<Id>) -> bool {
block.view >= self.current_view && block.view == block.parent_qc.view().next()
}
fn update_high_qc(&mut self, qc: Qc) {
fn update_high_qc(&mut self, qc: Qc<Id>) {
let qc_view = qc.view();
match qc {
Qc::Standard(new_qc) if new_qc.view > self.local_high_qc.view => {
@ -288,7 +295,7 @@ impl<O: Overlay> Carnot<O> {
}
}
fn update_timeout_qc(&mut self, timeout_qc: TimeoutQc) {
fn update_timeout_qc(&mut self, timeout_qc: TimeoutQc<Id>) {
match (&self.last_view_timeout_qc, timeout_qc) {
(None, timeout_qc) => {
self.last_view_timeout_qc = Some(timeout_qc);
@ -300,13 +307,13 @@ impl<O: Overlay> Carnot<O> {
}
}
fn update_latest_committed_block(&mut self, block: &Block) {
fn update_latest_committed_block(&mut self, block: &Block<Id>) {
if let Some(block) = self.can_commit_grandparent(block) {
self.latest_committed_block = Some(block.id);
}
}
pub fn blocks_in_view(&self, view: View) -> Vec<Block> {
pub fn blocks_in_view(&self, view: View) -> Vec<Block<Id>> {
self.safe_blocks
.iter()
.filter(|(_, b)| b.view == view)
@ -314,12 +321,12 @@ impl<O: Overlay> Carnot<O> {
.collect()
}
pub fn genesis_block(&self) -> Block {
pub fn genesis_block(&self) -> Block<Id> {
self.blocks_in_view(View(0))[0].clone()
}
// Returns the id of the grandparent block if it can be committed or None otherwise
fn can_commit_grandparent(&self, block: &Block) -> Option<Block> {
fn can_commit_grandparent(&self, block: &Block<Id>) -> Option<Block<Id>> {
let parent = self.safe_blocks.get(&block.parent())?;
let grandparent = self.safe_blocks.get(&parent.parent())?;
@ -332,7 +339,7 @@ impl<O: Overlay> Carnot<O> {
None
}
pub fn latest_committed_block(&self) -> Block {
pub fn latest_committed_block(&self) -> Block<Id> {
self.latest_committed_block
.and_then(|id| self.safe_blocks.get(&id).cloned())
.unwrap_or_else(|| self.genesis_block())
@ -342,7 +349,7 @@ impl<O: Overlay> Carnot<O> {
self.latest_committed_block().view
}
pub fn latest_committed_blocks(&self, limit: Option<usize>) -> Vec<BlockId> {
pub fn latest_committed_blocks(&self, limit: Option<usize>) -> Vec<Id> {
let limit = limit.unwrap_or(self.safe_blocks.len());
let mut res = vec![];
let mut current = self.latest_committed_block();
@ -363,11 +370,11 @@ impl<O: Overlay> Carnot<O> {
res
}
pub fn last_view_timeout_qc(&self) -> Option<TimeoutQc> {
pub fn last_view_timeout_qc(&self) -> Option<TimeoutQc<Id>> {
self.last_view_timeout_qc.clone()
}
pub fn high_qc(&self) -> StandardQc {
pub fn high_qc(&self) -> StandardQc<Id> {
self.local_high_qc.clone()
}
@ -444,15 +451,15 @@ mod test {
use super::*;
fn init(nodes: Vec<NodeId>) -> Carnot<FlatOverlay<RoundRobin, FreezeMembership>> {
fn init(nodes: Vec<NodeId>) -> Carnot<FlatOverlay<RoundRobin, FreezeMembership>, usize> {
assert!(!nodes.is_empty());
Carnot::from_genesis(
*nodes.first().unwrap(),
Block {
view: View(0),
id: BlockId::zeros(),
parent_qc: Qc::Standard(StandardQc::genesis()),
id: 0,
parent_qc: Qc::Standard(StandardQc::genesis(0)),
leader_proof: LeaderProof::LeaderId {
leader_id: *nodes.first().unwrap(),
},
@ -466,11 +473,10 @@ mod test {
}
fn next_block(
engine: &Carnot<FlatOverlay<RoundRobin, FreezeMembership>>,
block: &Block,
) -> Block {
let mut next_id = block.id;
next_id.0[0] += 1;
engine: &Carnot<FlatOverlay<RoundRobin, FreezeMembership>, usize>,
block: &Block<usize>,
) -> Block<usize> {
let next_id = block.id + 1;
Block {
view: block.view.next(),
@ -486,8 +492,8 @@ mod test {
}
fn update_leader_selection(
engine: &Carnot<FlatOverlay<RoundRobin, FreezeMembership>>,
) -> Carnot<FlatOverlay<RoundRobin, FreezeMembership>> {
engine: &Carnot<FlatOverlay<RoundRobin, FreezeMembership>, usize>,
) -> Carnot<FlatOverlay<RoundRobin, FreezeMembership>, usize> {
engine
.update_overlay(|overlay| {
overlay.update_leader_selection(
@ -545,11 +551,10 @@ mod test {
// Ensure that receive_block() fails if the parent block has never been received.
fn receive_block_with_unknown_parent() {
let engine = init(vec![NodeId::new([0; 32])]);
let mut parent_block_id = engine.genesis_block().id;
parent_block_id.0[0] += 1; // generate an unknown parent block ID
let parent_block_id = 42;
let block = Block {
view: engine.current_view().next(),
id: BlockId::new([1; 32]),
id: 1,
parent_qc: Qc::Standard(StandardQc {
view: engine.current_view(),
id: parent_block_id,
@ -649,7 +654,7 @@ mod test {
// a future block should be rejected
let future_block = Block {
id: BlockId::new([10; 32]),
id: 10,
view: View(11), // a future view
parent_qc: Qc::Aggregated(AggregateQc {
view: View(10),
@ -667,7 +672,7 @@ mod test {
// a past block should be also rejected
let mut past_block = block1; // with the same view as block1
past_block.id = BlockId::new([10; 32]);
past_block.id = 10;
assert!(engine.receive_block(past_block).is_err());
}
@ -744,7 +749,7 @@ mod test {
sender: NodeId::new([0; 32]),
high_qc: StandardQc {
view: View(0), // genesis
id: BlockId::zeros(),
id: 0,
},
timeout_qc: None
}),
@ -767,7 +772,7 @@ mod test {
View(1),
StandardQc {
view: View::new(0), // genesis
id: BlockId::zeros(),
id: 0,
},
NodeId::new([0; 32]),
);
@ -792,7 +797,7 @@ mod test {
View(1),
StandardQc {
view: View(0), // genesis
id: BlockId::zeros(),
id: 0,
},
NodeId::new([0; 32]),
);
@ -819,7 +824,7 @@ mod test {
View(1),
StandardQc {
view: View(0), // genesis
id: BlockId::zeros(),
id: 0,
},
NodeId::new([0; 32]),
);
@ -861,7 +866,7 @@ mod test {
View(1),
StandardQc {
view: View(0), // genesis
id: BlockId::zeros(),
id: 0,
},
NodeId::new([0; 32]),
);
@ -874,7 +879,7 @@ mod test {
View(2),
StandardQc {
view: View(0), // genesis
id: BlockId::zeros(),
id: 0,
},
NodeId::new([0; 32]),
);

View File

@ -8,8 +8,6 @@ mod committee;
pub use committee::{Committee, CommitteeId};
mod node_id;
pub use node_id::NodeId;
mod block_id;
pub use block_id::BlockId;
mod view;
pub use view::View;
@ -21,32 +19,32 @@ pub use view::View;
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum Payload {
pub enum Payload<Id> {
/// Vote for a block in a view
Vote(Vote),
Vote(Vote<Id>),
/// Signal that a local timeout has occurred
Timeout(Timeout),
Timeout(Timeout<Id>),
/// Vote for moving to a new view
NewView(NewView),
NewView(NewView<Id>),
}
/// Returned
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Vote {
pub struct Vote<Id> {
pub view: View,
pub block: BlockId,
pub block: Id,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Timeout {
pub struct Timeout<Id> {
pub view: View,
pub sender: NodeId,
pub high_qc: StandardQc,
pub timeout_qc: Option<TimeoutQc>,
pub high_qc: StandardQc<Id>,
pub timeout_qc: Option<TimeoutQc<Id>>,
}
// TODO: We are making "mandatory" to have received the timeout_qc before the new_view votes.
@ -54,24 +52,24 @@ pub struct Timeout {
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct NewView {
pub struct NewView<Id> {
pub view: View,
pub sender: NodeId,
pub timeout_qc: TimeoutQc,
pub high_qc: StandardQc,
pub timeout_qc: TimeoutQc<Id>,
pub high_qc: StandardQc<Id>,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct TimeoutQc {
pub struct TimeoutQc<Id> {
view: View,
high_qc: StandardQc,
high_qc: StandardQc<Id>,
sender: NodeId,
}
impl TimeoutQc {
pub fn new(view: View, high_qc: StandardQc, sender: NodeId) -> Self {
impl<Id> TimeoutQc<Id> {
pub fn new(view: View, high_qc: StandardQc<Id>, sender: NodeId) -> Self {
assert!(
view >= high_qc.view,
"timeout_qc.view:{} shouldn't be lower than timeout_qc.high_qc.view:{}",
@ -90,7 +88,7 @@ impl TimeoutQc {
self.view
}
pub fn high_qc(&self) -> &StandardQc {
pub fn high_qc(&self) -> &StandardQc<Id> {
&self.high_qc
}
@ -102,10 +100,10 @@ impl TimeoutQc {
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Block {
pub id: BlockId,
pub struct Block<Id> {
pub id: Id,
pub view: View,
pub parent_qc: Qc,
pub parent_qc: Qc<Id>,
pub leader_proof: LeaderProof,
}
@ -116,16 +114,16 @@ pub enum LeaderProof {
LeaderId { leader_id: NodeId },
}
impl Block {
pub fn parent(&self) -> BlockId {
impl<Id: Copy> Block<Id> {
pub fn parent(&self) -> Id {
self.parent_qc.block()
}
pub fn genesis() -> Self {
pub fn genesis(id: Id) -> Self {
Self {
view: View(0),
id: BlockId::zeros(),
parent_qc: Qc::Standard(StandardQc::genesis()),
id,
parent_qc: Qc::Standard(StandardQc::genesis(id)),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
},
@ -135,45 +133,42 @@ impl Block {
/// Possible output events.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Send {
pub struct Send<Id> {
pub to: Committee,
pub payload: Payload,
pub payload: Payload<Id>,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct StandardQc {
pub struct StandardQc<Id> {
pub view: View,
pub id: BlockId,
pub id: Id,
}
impl StandardQc {
pub fn genesis() -> Self {
Self {
view: View(-1),
id: BlockId::zeros(),
}
impl<Id> StandardQc<Id> {
pub fn genesis(id: Id) -> Self {
Self { view: View(-1), id }
}
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct AggregateQc {
pub high_qc: StandardQc,
pub struct AggregateQc<Id> {
pub high_qc: StandardQc<Id>,
pub view: View,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum Qc {
Standard(StandardQc),
Aggregated(AggregateQc),
pub enum Qc<Id> {
Standard(StandardQc<Id>),
Aggregated(AggregateQc<Id>),
}
impl Qc {
impl<Id: Copy> Qc<Id> {
/// The view in which this Qc was built.
pub fn view(&self) -> View {
match self {
@ -184,14 +179,14 @@ impl Qc {
/// The id of the block this qc is for.
/// This will be the parent of the block which will include this qc
pub fn block(&self) -> BlockId {
pub fn block(&self) -> Id {
match self {
Qc::Standard(StandardQc { id, .. }) => *id,
Qc::Aggregated(AggregateQc { high_qc, .. }) => high_qc.id,
}
}
pub fn high_qc(&self) -> StandardQc {
pub fn high_qc(&self) -> StandardQc<Id> {
match self {
Qc::Standard(qc) => qc.clone(),
Qc::Aggregated(AggregateQc { high_qc, .. }) => high_qc.clone(),
@ -207,11 +202,11 @@ mod test {
fn standard_qc() {
let standard_qc = StandardQc {
view: View(10),
id: BlockId::zeros(),
id: 0,
};
let qc = Qc::Standard(standard_qc.clone());
assert_eq!(qc.view(), View(10));
assert_eq!(qc.block(), BlockId::new([0; 32]));
assert_eq!(qc.block(), 0);
assert_eq!(qc.high_qc(), standard_qc);
}
@ -221,12 +216,12 @@ mod test {
view: View(20),
high_qc: StandardQc {
view: View(10),
id: BlockId::zeros(),
id: 0,
},
};
let qc = Qc::Aggregated(aggregated_qc.clone());
assert_eq!(qc.view(), View(20));
assert_eq!(qc.block(), BlockId::new([0; 32]));
assert_eq!(qc.block(), 0);
assert_eq!(qc.high_qc(), aggregated_qc.high_qc);
}
@ -236,26 +231,26 @@ mod test {
View(2),
StandardQc {
view: View(1),
id: BlockId::zeros(),
id: 0,
},
NodeId::new([0; 32]),
);
assert_eq!(timeout_qc.view(), View(2));
assert_eq!(timeout_qc.high_qc().view, View(1));
assert_eq!(timeout_qc.high_qc().id, BlockId::new([0; 32]));
assert_eq!(timeout_qc.high_qc().id, 0);
assert_eq!(timeout_qc.sender(), NodeId::new([0; 32]));
let timeout_qc = TimeoutQc::new(
View(2),
StandardQc {
view: View(2),
id: BlockId::zeros(),
id: 0,
},
NodeId::new([0; 32]),
);
assert_eq!(timeout_qc.view(), View(2));
assert_eq!(timeout_qc.high_qc().view, View(2));
assert_eq!(timeout_qc.high_qc().id, BlockId::new([0; 32]));
assert_eq!(timeout_qc.high_qc().id, 0);
assert_eq!(timeout_qc.sender(), NodeId::new([0; 32]));
}
@ -268,7 +263,7 @@ mod test {
View(1),
StandardQc {
view: View(2),
id: BlockId::zeros(),
id: 0,
},
NodeId::new([0; 32]),
);

View File

@ -3,23 +3,6 @@
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct BlockId(pub(crate) [u8; 32]);
#[cfg(feature = "serde")]
impl serde::Serialize for BlockId {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
nomos_utils::serde::serialize_bytes_array(self.0, serializer)
}
}
#[cfg(feature = "serde")]
impl<'de> serde::de::Deserialize<'de> for BlockId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
nomos_utils::serde::deserialize_bytes_array(deserializer).map(Self)
}
}
impl BlockId {
pub const fn new(val: [u8; 32]) -> Self {
Self(val)

View File

@ -1,3 +1,10 @@
mod ref_state;
pub mod sut;
mod transition;
type Block = carnot_engine::Block<[u8; 32]>;
type AggregateQc = carnot_engine::AggregateQc<[u8; 32]>;
type Qc = carnot_engine::Qc<[u8; 32]>;
type StandardQc = carnot_engine::StandardQc<[u8; 32]>;
type TimeoutQc = carnot_engine::TimeoutQc<[u8; 32]>;
type NewView = carnot_engine::NewView<[u8; 32]>;

View File

@ -1,13 +1,12 @@
use std::collections::{BTreeMap, HashSet};
use carnot_engine::{
AggregateQc, Block, BlockId, LeaderProof, NodeId, Qc, StandardQc, TimeoutQc, View,
};
use carnot_engine::{LeaderProof, NodeId, View};
use proptest::prelude::*;
use proptest::strategy::BoxedStrategy;
use proptest_state_machine::ReferenceStateMachine;
use crate::fuzz::transition::Transition;
use crate::fuzz::{AggregateQc, Block, Qc, StandardQc, TimeoutQc};
// A reference state machine (RefState) is used to generated state transitions.
// To generate some kinds of transition, we may need to keep historical blocks in RefState.
@ -42,8 +41,8 @@ impl ReferenceStateMachine for RefState {
fn init_state() -> BoxedStrategy<Self::State> {
let genesis_block = Block {
view: View::new(0),
id: BlockId::zeros(),
parent_qc: Qc::Standard(StandardQc::genesis()),
id: [0; 32],
parent_qc: Qc::Standard(StandardQc::genesis([0; 32])),
leader_proof: LEADER_PROOF.clone(),
};
@ -330,10 +329,11 @@ impl RefState {
fn transition_receive_safe_block_with_aggregated_qc(&self) -> BoxedStrategy<Transition> {
//TODO: more randomness
let current_view = self.current_view();
let mut id = [0; 32];
rand::thread_rng().fill_bytes(&mut id);
Just(Transition::ReceiveSafeBlock(Block {
view: current_view.next(),
id: BlockId::random(&mut rand::thread_rng()),
id,
parent_qc: Qc::Aggregated(AggregateQc {
high_qc: self.high_qc(),
view: current_view,
@ -360,9 +360,13 @@ impl RefState {
pub fn high_qc(&self) -> StandardQc {
self.chain
.values()
.map(|entry| entry.high_qc().unwrap_or_else(StandardQc::genesis))
.map(|entry| {
entry
.high_qc()
.unwrap_or_else(|| StandardQc::genesis([0; 32]))
})
.max_by_key(|qc| qc.view)
.unwrap_or_else(StandardQc::genesis)
.unwrap_or_else(|| StandardQc::genesis([0; 32]))
}
pub fn latest_timeout_qcs(&self) -> Vec<TimeoutQc> {
@ -386,17 +390,19 @@ impl RefState {
self.contains_block(block.parent_qc.block())
}
fn contains_block(&self, block_id: BlockId) -> bool {
fn contains_block(&self, block_id: [u8; 32]) -> bool {
self.chain
.iter()
.any(|(_, entry)| entry.blocks.iter().any(|block| block.id == block_id))
}
fn consecutive_block(parent: &Block) -> Block {
let mut id = [0; 32];
rand::thread_rng().fill_bytes(&mut id);
Block {
// use rand because we don't want this to be shrinked by proptest
view: parent.view.next(),
id: BlockId::random(&mut rand::thread_rng()),
id,
parent_qc: Qc::Standard(StandardQc {
view: parent.view,
id: parent.id,

View File

@ -8,13 +8,13 @@ use carnot_engine::{
use proptest_state_machine::{ReferenceStateMachine, StateMachineTest};
use crate::fuzz::ref_state::RefState;
use crate::fuzz::transition::Transition;
use crate::fuzz::{transition::Transition, Block};
// ConsensusEngineTest defines a state that we want to test.
// This is called as SUT (System Under Test).
#[derive(Clone, Debug)]
pub struct ConsensusEngineTest {
pub engine: Carnot<FlatOverlay<RoundRobin, FreezeMembership>>,
pub engine: Carnot<FlatOverlay<RoundRobin, FreezeMembership>, [u8; 32]>,
}
impl ConsensusEngineTest {
@ -23,8 +23,8 @@ impl ConsensusEngineTest {
NodeId::new([0; 32]),
Block {
view: View::new(0),
id: BlockId::zeros(),
parent_qc: Qc::Standard(StandardQc::genesis()),
id: [0; 32],
parent_qc: Qc::Standard(StandardQc::genesis([0; 32])),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
},

View File

@ -1,6 +1,6 @@
use std::collections::HashSet;
use carnot_engine::{Block, NewView, TimeoutQc};
use crate::fuzz::{Block, NewView, TimeoutQc};
// State transtitions that will be picked randomly
#[derive(Clone, Debug)]

View File

@ -7,3 +7,9 @@ edition = "2021"
[dependencies]
thiserror = "1"
serde = { version = "1.0", features = ["derive"], optional = true }
nomos-utils = { path = "../../nomos-utils", optional = true }
[features]
default = []
serde = ["dep:serde", "nomos-utils/serde"]

View File

@ -1,5 +1,6 @@
use std::ops::Add;
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)]
pub struct Slot(u64);

View File

@ -9,5 +9,10 @@ edition = "2021"
blake2 = "0.10"
rpds = "1"
thiserror = "1"
serde = { version = "1.0", features = ["derive"], optional = true }
# TODO: we only need types definition from this crate
cryptarchia-engine = { path = "../../consensus/cryptarchia-engine" }
nomos-utils = { path = "../../nomos-utils", optional = true }
[features]
serde = ["dep:serde", "nomos-utils/serde"]

View File

@ -1,5 +1,6 @@
use cryptarchia_engine::Slot;
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)]
pub struct LeaderProof {
commitment: Commitment,
@ -91,3 +92,9 @@ impl AsRef<[u8]> for Commitment {
&self.0
}
}
// ----------- serialization
use crate::utils::serialize_bytes_newtype;
serialize_bytes_newtype!(Commitment);
serialize_bytes_newtype!(Nullifier);

View File

@ -1,21 +1,22 @@
mod block;
mod config;
mod crypto;
mod leader_proof;
mod nonce;
mod utils;
use crate::{crypto::Blake2b, Commitment, LeaderProof, Nullifier};
use blake2::Digest;
use cryptarchia_engine::{Epoch, Slot};
use crypto::Blake2b;
use rpds::HashTrieSet;
use std::collections::HashMap;
use std::{collections::HashMap, hash::Hash};
use thiserror::Error;
pub use block::*;
pub use config::Config;
pub use leader_proof::*;
pub use nonce::*;
#[derive(Clone, Debug, Error)]
pub enum LedgerError {
pub enum LedgerError<Id> {
#[error("Commitment not found in the ledger state")]
CommitmentNotFound,
#[error("Nullifier already exists in the ledger state")]
@ -25,9 +26,9 @@ pub enum LedgerError {
#[error("Invalid block slot {block:?} for parent slot {parent:?}")]
InvalidSlot { parent: Slot, block: Slot },
#[error("Parent block not found: {0:?}")]
ParentNotFound(HeaderId),
ParentNotFound(Id),
#[error("Orphan block missing: {0:?}. Importing leader proofs requires the block to be validated first")]
OrphanMissing(HeaderId),
OrphanMissing(Id),
}
#[derive(Clone, Debug, Eq, PartialEq)]
@ -69,13 +70,16 @@ impl EpochState {
}
#[derive(Clone, Debug, PartialEq)]
pub struct Ledger {
states: HashMap<HeaderId, LedgerState>,
pub struct Ledger<Id: Eq + Hash> {
states: HashMap<Id, LedgerState>,
config: Config,
}
impl Ledger {
pub fn from_genesis(id: HeaderId, state: LedgerState, config: Config) -> Self {
impl<Id> Ledger<Id>
where
Id: Eq + Hash + Copy,
{
pub fn from_genesis(id: Id, state: LedgerState, config: Config) -> Self {
Self {
states: [(id, state)].into_iter().collect(),
config,
@ -83,8 +87,15 @@ impl Ledger {
}
#[must_use = "this returns the result of the operation, without modifying the original"]
pub fn try_apply_header(&self, header: &Header) -> Result<Self, LedgerError> {
let parent_id = header.parent();
pub fn try_update(
&self,
id: Id,
parent_id: Id,
slot: Slot,
proof: &LeaderProof,
// (update corresponding to the leader proof, leader proof)
orphan_proofs: impl IntoIterator<Item = (Id, LeaderProof)>,
) -> Result<Self, LedgerError<Id>> {
let parent_state = self
.states
.get(&parent_id)
@ -96,25 +107,27 @@ impl Ledger {
// * not in conflict with the current ledger state
// This first condition is checked here, the second one is checked in the state update
// (in particular, we do not check the imported leader proof is for an earlier slot)
for orphan in header.orphaned_proofs() {
if !self.states.contains_key(&orphan.id()) {
return Err(LedgerError::OrphanMissing(orphan.id()));
let (orphan_ids, orphan_proofs): (Vec<_>, Vec<_>) = orphan_proofs.into_iter().unzip();
for orphan_id in orphan_ids {
if !self.states.contains_key(&orphan_id) {
return Err(LedgerError::OrphanMissing(orphan_id));
}
}
let new_state = parent_state
let new_state =
parent_state
.clone()
.try_apply_header(header, &self.config)?;
.try_update(slot, proof, &orphan_proofs, &self.config)?;
let mut states = self.states.clone();
states.insert(header.id(), new_state);
states.insert(id, new_state);
Ok(Self { states, config })
}
pub fn state(&self, header_id: &HeaderId) -> Option<&LedgerState> {
self.states.get(header_id)
pub fn state(&self, id: &Id) -> Option<&LedgerState> {
self.states.get(id)
}
}
@ -134,13 +147,19 @@ pub struct LedgerState {
}
impl LedgerState {
fn try_apply_header(self, header: &Header, config: &Config) -> Result<Self, LedgerError> {
fn try_update<Id>(
self,
slot: Slot,
proof: &LeaderProof,
orphan_proofs: &[LeaderProof],
config: &Config,
) -> Result<Self, LedgerError<Id>> {
// TODO: import leader proofs
self.update_epoch_state(header.slot(), config)?
.try_apply_leadership(header, config)
self.update_epoch_state(slot, config)?
.try_apply_leadership(proof, orphan_proofs, config)
}
fn update_epoch_state(self, slot: Slot, config: &Config) -> Result<Self, LedgerError> {
fn update_epoch_state<Id>(self, slot: Slot, config: &Config) -> Result<Self, LedgerError<Id>> {
if slot <= self.slot {
return Err(LedgerError::InvalidSlot {
parent: self.slot,
@ -204,7 +223,11 @@ impl LedgerState {
}
}
fn try_apply_proof(self, proof: &LeaderProof, config: &Config) -> Result<Self, LedgerError> {
fn try_apply_proof<Id>(
self,
proof: &LeaderProof,
config: &Config,
) -> Result<Self, LedgerError<Id>> {
assert_eq!(config.epoch(proof.slot()), self.epoch_state.epoch);
// The leadership coin either has to be in the state snapshot or be derived from
// a coin that is in the state snapshot (i.e. be in the lead coins commitments)
@ -234,18 +257,17 @@ impl LedgerState {
})
}
fn try_apply_leadership(
fn try_apply_leadership<Id>(
mut self,
header: &Header,
proof: &LeaderProof,
orphan_proofs: &[LeaderProof],
config: &Config,
) -> Result<Self, LedgerError> {
for proof in header.orphaned_proofs() {
self = self.try_apply_proof(proof.leader_proof(), config)?;
) -> Result<Self, LedgerError<Id>> {
for proof in orphan_proofs {
self = self.try_apply_proof(proof, config)?;
}
self = self
.try_apply_proof(header.leader_proof(), config)?
.update_nonce(header.leader_proof());
self = self.try_apply_proof(proof, config)?.update_nonce(proof);
Ok(self)
}
@ -280,6 +302,27 @@ impl LedgerState {
..self
}
}
pub fn from_commitments(commitments: impl IntoIterator<Item = Commitment>) -> Self {
let commitments = commitments.into_iter().collect::<HashTrieSet<_>>();
Self {
lead_commitments: commitments.clone(),
spend_commitments: commitments,
nullifiers: Default::default(),
nonce: [0; 32].into(),
slot: 0.into(),
next_epoch_state: EpochState {
epoch: 1.into(),
nonce: [0; 32].into(),
commitments: Default::default(),
},
epoch_state: EpochState {
epoch: 0.into(),
nonce: [0; 32].into(),
commitments: Default::default(),
},
}
}
}
impl core::fmt::Debug for LedgerState {
@ -303,35 +346,51 @@ impl core::fmt::Debug for LedgerState {
#[cfg(test)]
pub mod tests {
use super::{EpochState, Ledger, LedgerState};
use crate::{
crypto::Blake2b, Commitment, Config, Header, HeaderId, LeaderProof, LedgerError, Nullifier,
};
use crate::{crypto::Blake2b, Commitment, Config, LeaderProof, LedgerError, Nullifier};
use blake2::Digest;
use cryptarchia_engine::Slot;
use std::hash::{DefaultHasher, Hash, Hasher};
pub fn header(slot: impl Into<Slot>, parent: HeaderId, coin: Coin) -> Header {
let slot = slot.into();
Header::new(parent, 0, [0; 32].into(), slot, coin.to_proof(slot))
}
type HeaderId = [u8; 32];
pub fn header_with_orphans(
slot: impl Into<Slot>,
fn update_ledger(
ledger: &mut Ledger<HeaderId>,
parent: HeaderId,
slot: impl Into<Slot>,
coin: Coin,
orphans: Vec<Header>,
) -> Header {
header(slot, parent, coin).with_orphaned_proofs(orphans)
) -> Result<HeaderId, LedgerError<HeaderId>> {
update_orphans(ledger, parent, slot, coin, vec![])
}
pub fn genesis_header() -> Header {
Header::new(
[0; 32].into(),
0,
[0; 32].into(),
0.into(),
LeaderProof::dummy(0.into()),
)
fn make_id(parent: HeaderId, slot: impl Into<Slot>, coin: Coin) -> HeaderId {
Blake2b::new()
.chain_update(parent)
.chain_update(slot.into().to_be_bytes())
.chain_update(coin.sk.to_be_bytes())
.chain_update(coin.nonce.to_be_bytes())
.finalize()
.into()
}
fn update_orphans(
ledger: &mut Ledger<HeaderId>,
parent: HeaderId,
slot: impl Into<Slot>,
coin: Coin,
orphans: Vec<(HeaderId, (u64, Coin))>,
) -> Result<HeaderId, LedgerError<HeaderId>> {
let slot = slot.into();
let id = make_id(parent, slot, coin);
*ledger = ledger.try_update(
id,
parent,
slot,
&coin.to_proof(slot),
orphans
.into_iter()
.map(|(id, (slot, coin))| (id, coin.to_proof(slot.into()))),
)?;
Ok(id)
}
pub fn config() -> Config {
@ -414,36 +473,41 @@ pub mod tests {
}
}
fn ledger(commitments: &[Commitment]) -> (Ledger, Header) {
fn ledger(commitments: &[Commitment]) -> (Ledger<HeaderId>, HeaderId) {
let genesis_state = genesis_state(commitments);
let genesis_header = genesis_header();
(
Ledger::from_genesis(genesis_header.id(), genesis_state, config()),
genesis_header,
Ledger::from_genesis([0; 32], genesis_state, config()),
[0; 32],
)
}
fn apply_and_add_coin(mut ledger: Ledger, header: Header, coin: Coin) -> Ledger {
let header_id = header.id();
ledger = ledger.try_apply_header(&header).unwrap();
fn apply_and_add_coin(
ledger: &mut Ledger<HeaderId>,
parent: HeaderId,
slot: impl Into<Slot>,
coin_proof: Coin,
coin_add: Coin,
) -> HeaderId {
let id = update_ledger(ledger, parent, slot, coin_proof).unwrap();
// we still don't have transactions, so the only way to add a commitment to spendable commitments and
// test epoch snapshotting is by doing this manually
let mut block_state = ledger.states[&header_id].clone();
block_state.spend_commitments = block_state.spend_commitments.insert(coin.commitment());
ledger.states.insert(header_id, block_state);
ledger
let mut block_state = ledger.states[&id].clone();
block_state.spend_commitments = block_state.spend_commitments.insert(coin_add.commitment());
ledger.states.insert(id, block_state);
id
}
#[test]
fn test_ledger_state_prevents_coin_reuse() {
let coin = Coin::new(0);
let (mut ledger, genesis) = ledger(&[coin.commitment()]);
let h = header(1, genesis.id(), coin);
ledger = ledger.try_apply_header(&h).unwrap();
let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap();
// reusing the same coin should be prevented
assert!(matches!(
ledger.try_apply_header(&header(2, h.id(), coin)),
update_ledger(&mut ledger, h, 2, coin),
Err(LedgerError::NullifierExists),
));
}
@ -451,10 +515,9 @@ pub mod tests {
#[test]
fn test_ledger_state_uncommited_coin() {
let coin = Coin::new(0);
let (ledger, genesis) = ledger(&[]);
let h = header(1, genesis.id(), coin);
let (mut ledger, genesis) = ledger(&[]);
assert!(matches!(
ledger.try_apply_header(&h),
update_ledger(&mut ledger, genesis, 1, coin),
Err(LedgerError::CommitmentNotFound),
));
}
@ -472,17 +535,14 @@ pub mod tests {
]);
// coin_1 & coin_2 both concurrently win slot 0
let h_1 = header(1, genesis.id(), coin_1);
let h_2 = header(1, genesis.id(), coin_2);
ledger = ledger.try_apply_header(&h_1).unwrap();
ledger = ledger.try_apply_header(&h_2).unwrap();
update_ledger(&mut ledger, genesis, 1, coin_1).unwrap();
let h = update_ledger(&mut ledger, genesis, 1, coin_2).unwrap();
// then coin_3 wins slot 1 and chooses to extend from block_2
let h_3 = header(2, h_2.id(), coin_3);
ledger = ledger.try_apply_header(&h_3).unwrap();
let h_3 = update_ledger(&mut ledger, h, 2, coin_3).unwrap();
// coin 1 is not spent in the chain that ends with block_3
assert!(!ledger.states[&h_3.id()].is_nullified(&coin_1.nullifier()));
assert!(!ledger.states[&h_3].is_nullified(&coin_1.nullifier()));
}
#[test]
@ -496,45 +556,39 @@ pub mod tests {
// An epoch will be 10 slots long, with stake distribution snapshot taken at the start of the epoch
// and nonce snapshot before slot 7
let h_1 = header(1, genesis.id(), coins[0]);
ledger = ledger.try_apply_header(&h_1).unwrap();
assert_eq!(ledger.states[&h_1.id()].epoch_state.epoch, 0.into());
let h_1 = update_ledger(&mut ledger, genesis, 1, coins[0]).unwrap();
assert_eq!(ledger.states[&h_1].epoch_state.epoch, 0.into());
let h_2 = header(6, h_1.id(), coins[1]);
ledger = ledger.try_apply_header(&h_2).unwrap();
let h_2 = update_ledger(&mut ledger, h_1, 6, coins[1]).unwrap();
let h_3 = header(9, h_2.id(), coins[2]);
ledger = apply_and_add_coin(ledger, h_3.clone(), coin_4);
let h_3 = apply_and_add_coin(&mut ledger, h_2, 9, coins[2], coin_4);
// test epoch jump
let h_4 = header(20, h_3.id(), coins[3]);
ledger = ledger.try_apply_header(&h_4).unwrap();
let h_4 = update_ledger(&mut ledger, h_3, 20, coins[3]).unwrap();
// nonce for epoch 2 should be taken at the end of slot 16, but in our case the last block is at slot 9
assert_eq!(
ledger.states[&h_4.id()].epoch_state.nonce,
ledger.states[&h_3.id()].nonce,
ledger.states[&h_4].epoch_state.nonce,
ledger.states[&h_3].nonce,
);
// stake distribution snapshot should be taken at the end of slot 9
assert_eq!(
ledger.states[&h_4.id()].epoch_state.commitments,
ledger.states[&h_3.id()].spend_commitments,
ledger.states[&h_4].epoch_state.commitments,
ledger.states[&h_3].spend_commitments,
);
// nonce for epoch 1 should be taken at the end of slot 6
let h_5 = header(10, h_3.id(), coins[3]);
ledger = apply_and_add_coin(ledger, h_5.clone(), coin_5);
let h_5 = apply_and_add_coin(&mut ledger, h_3, 10, coins[3], coin_5);
assert_eq!(
ledger.states[&h_5.id()].epoch_state.nonce,
ledger.states[&h_2.id()].nonce,
ledger.states[&h_5].epoch_state.nonce,
ledger.states[&h_2].nonce,
);
let h_6 = header(20, h_5.id(), coins[3].evolve());
ledger = ledger.try_apply_header(&h_6).unwrap();
let h_6 = update_ledger(&mut ledger, h_5, 20, coins[3].evolve()).unwrap();
// stake distribution snapshot should be taken at the end of slot 9, check that changes in slot 10
// are ignored
assert_eq!(
ledger.states[&h_6.id()].epoch_state.commitments,
ledger.states[&h_3.id()].spend_commitments,
ledger.states[&h_6].epoch_state.commitments,
ledger.states[&h_3].spend_commitments,
);
}
@ -542,25 +596,23 @@ pub mod tests {
fn test_evolved_coin_is_eligible_for_leadership() {
let coin = Coin::new(0);
let (mut ledger, genesis) = ledger(&[coin.commitment()]);
let h = header(1, genesis.id(), coin);
ledger = ledger.try_apply_header(&h).unwrap();
let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap();
// reusing the same coin should be prevented
assert!(matches!(
ledger.try_apply_header(&header(2, h.id(), coin)),
update_ledger(&mut ledger, h, 2, coin),
Err(LedgerError::NullifierExists),
));
// the evolved coin is not elibile before block 2 as it has not appeared on the ledger yet
assert!(matches!(
ledger.try_apply_header(&header(2, genesis.id(), coin.evolve())),
update_ledger(&mut ledger, genesis, 2, coin.evolve()),
Err(LedgerError::CommitmentNotFound),
));
// the evolved coin is eligible after coin 1 is spent
assert!(ledger
.try_apply_header(&header(2, h.id(), coin.evolve()))
.is_ok());
assert!(update_ledger(&mut ledger, h, 2, coin.evolve()).is_ok());
}
#[test]
@ -570,39 +622,34 @@ pub mod tests {
let (mut ledger, genesis) = ledger(&[coin.commitment()]);
// EPOCH 0
let h_0_1 = header(1, genesis.id(), coin);
// mint a new coin to be used for leader elections in upcoming epochs
ledger = apply_and_add_coin(ledger, h_0_1.clone(), coin_1);
let h_0_1 = apply_and_add_coin(&mut ledger, genesis, 1, coin, coin_1);
let h_0_2 = header(2, h_0_1.id(), coin_1);
// the new coin is not yet eligible for leader elections
assert!(matches!(
ledger.try_apply_header(&h_0_2),
update_ledger(&mut ledger, h_0_1, 2, coin_1),
Err(LedgerError::CommitmentNotFound),
));
// but the evolved coin can
let h_0_2 = header(2, h_0_1.id(), coin.evolve());
ledger = ledger.try_apply_header(&h_0_2).unwrap();
// // but the evolved coin can
let h_0_2 = update_ledger(&mut ledger, h_0_1, 2, coin.evolve()).unwrap();
// EPOCH 1
for i in 10..20 {
// the newly minted coin is still not eligible in the following epoch since the
// stake distribution snapshot is taken at the beginning of the previous epoch
assert!(matches!(
ledger.try_apply_header(&header(i, h_0_2.id(), coin_1)),
update_ledger(&mut ledger, h_0_2, i, coin_1),
Err(LedgerError::CommitmentNotFound),
));
}
// EPOCH 2
// the coin is finally eligible 2 epochs after it was first minted
let h_2_0 = header(20, h_0_2.id(), coin_1);
ledger = ledger.try_apply_header(&h_2_0).unwrap();
let h_2_0 = update_ledger(&mut ledger, h_0_2, 20, coin_1).unwrap();
// and now the minted coin can freely use the evolved coin for subsequent blocks
let h_2_1 = header(21, h_2_0.id(), coin_1.evolve());
ledger.try_apply_header(&h_2_1).unwrap();
update_ledger(&mut ledger, h_2_0, 21, coin_1.evolve()).unwrap();
}
#[test]
@ -614,83 +661,83 @@ pub mod tests {
let coin_new_new = coin_new.evolve();
// produce a fork where the coin has been spent twice
let fork_1 = header(1, genesis.id(), coin);
let fork_2 = header(2, fork_1.id(), coin_new);
let fork_1 = make_id(genesis, 1, coin);
let fork_2 = make_id(fork_1, 2, coin_new);
// neither of the evolved coins should be usable right away in another branch
assert!(matches!(
ledger.try_apply_header(&header(1, genesis.id(), coin_new)),
update_ledger(&mut ledger, genesis, 1, coin_new),
Err(LedgerError::CommitmentNotFound)
));
assert!(matches!(
ledger.try_apply_header(&header(1, genesis.id(), coin_new_new)),
update_ledger(&mut ledger, genesis, 1, coin_new_new),
Err(LedgerError::CommitmentNotFound)
));
// they also should not be accepted if the fork from where they have been imported has not been seen already
assert!(matches!(
ledger.try_apply_header(&header_with_orphans(
1,
genesis.id(),
coin_new,
vec![fork_1.clone()]
)),
update_orphans(&mut ledger, genesis, 1, coin_new, vec![(fork_1, (1, coin))]),
Err(LedgerError::OrphanMissing(_))
));
// now the first block of the fork is seen (and accepted)
ledger = ledger.try_apply_header(&fork_1).unwrap();
let h_1 = update_ledger(&mut ledger, genesis, 1, coin).unwrap();
assert_eq!(h_1, fork_1);
// and it can now be imported in another branch (note this does not validate it's for an earlier slot)
ledger
.try_apply_header(&header_with_orphans(
update_orphans(
&mut ledger.clone(),
genesis,
1,
genesis.id(),
coin_new,
vec![fork_1.clone()],
))
vec![(fork_1, (1, coin))],
)
.unwrap();
// but the next coin is still not accepted since the second block using the evolved coin has not been seen yet
assert!(matches!(
ledger.try_apply_header(&header_with_orphans(
update_orphans(
&mut ledger.clone(),
genesis,
1,
genesis.id(),
coin_new_new,
vec![fork_1.clone(), fork_2.clone()]
)),
vec![(fork_1, (1, coin)), (fork_2, (2, coin_new))],
),
Err(LedgerError::OrphanMissing(_))
));
// now the second block of the fork is seen as well and the coin evolved twice can be used in another branch
ledger = ledger.try_apply_header(&fork_2).unwrap();
ledger
.try_apply_header(&header_with_orphans(
let h_2 = update_ledger(&mut ledger, h_1, 2, coin_new).unwrap();
assert_eq!(h_2, fork_2);
update_orphans(
&mut ledger.clone(),
genesis,
1,
genesis.id(),
coin_new_new,
vec![fork_1.clone(), fork_2.clone()],
))
vec![(fork_1, (1, coin)), (fork_2, (2, coin_new))],
)
.unwrap();
// but we can't import just the second proof because it's using an evolved coin that has not been seen yet
assert!(matches!(
ledger.try_apply_header(&header_with_orphans(
update_orphans(
&mut ledger.clone(),
genesis,
1,
genesis.id(),
coin_new_new,
vec![fork_2.clone()]
)),
vec![(fork_2, (2, coin_new))],
),
Err(LedgerError::CommitmentNotFound)
));
// an imported proof that uses a coin that was already used in the base branch should not be allowed
let header_1 = header(1, genesis.id(), coin);
ledger = ledger.try_apply_header(&header_1).unwrap();
let header_1 = update_ledger(&mut ledger, genesis, 1, coin).unwrap();
assert!(matches!(
ledger.try_apply_header(&header_with_orphans(
update_orphans(
&mut ledger,
header_1,
2,
header_1.id(),
coin_new_new,
vec![fork_1.clone(), fork_2.clone()]
)),
vec![(fork_1, (1, coin)), (fork_2, (2, coin_new))],
),
Err(LedgerError::NullifierExists)
));
}

View File

@ -0,0 +1,17 @@
use crate::utils::serialize_bytes_newtype;
#[derive(Clone, Debug, Eq, PartialEq, Copy)]
pub struct Nonce([u8; 32]);
impl From<[u8; 32]> for Nonce {
fn from(nonce: [u8; 32]) -> Self {
Self(nonce)
}
}
impl From<Nonce> for [u8; 32] {
fn from(nonce: Nonce) -> [u8; 32] {
nonce.0
}
}
serialize_bytes_newtype!(Nonce);

View File

@ -0,0 +1,22 @@
macro_rules! serialize_bytes_newtype {
($newtype:ty) => {
#[cfg(feature = "serde")]
impl serde::Serialize for $newtype {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
nomos_utils::serde::serialize_bytes_array(self.0, serializer)
}
}
#[cfg(feature = "serde")]
impl<'de> serde::de::Deserialize<'de> for $newtype {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
nomos_utils::serde::deserialize_bytes_array(deserializer).map(Self)
}
}
};
}
pub(crate) use serialize_bytes_newtype;

View File

@ -19,9 +19,8 @@ use tower_http::{
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use carnot_engine::BlockId;
use full_replication::{Blob, Certificate};
use nomos_core::{da::blob, tx::Transaction};
use nomos_core::{da::blob, header::HeaderId, tx::Transaction};
use nomos_mempool::{network::adapters::libp2p::Libp2pAdapter, openapi::Status, MempoolMetrics};
use nomos_network::backends::libp2p::Libp2p;
use nomos_storage::backends::StorageSerde;
@ -53,7 +52,7 @@ pub struct AxumBackend<T, S, const SIZE: usize> {
da_status,
),
components(
schemas(Status<BlockId>, MempoolMetrics)
schemas(Status<HeaderId>, MempoolMetrics)
),
tags(
(name = "da", description = "data availibility related APIs")
@ -255,8 +254,8 @@ where
#[derive(Deserialize)]
struct QueryParams {
from: Option<BlockId>,
to: Option<BlockId>,
from: Option<HeaderId>,
to: Option<HeaderId>,
}
#[utoipa::path(
@ -300,7 +299,7 @@ async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn block<S, Tx>(State(handle): State<OverwatchHandle>, Json(id): Json<BlockId>) -> Response
async fn block<S, Tx>(State(handle): State<OverwatchHandle>, Json(id): Json<HeaderId>) -> Response
where
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,
S: StorageSerde + Send + Sync + 'static,

View File

@ -15,8 +15,8 @@ use bytes::Bytes;
use carnot_consensus::CarnotConsensus;
use nomos_api::ApiService;
use nomos_core::{
block::BlockId,
da::{blob, certificate},
header::HeaderId,
tx::Transaction,
wire,
};
@ -59,10 +59,10 @@ const MB16: usize = 1024 * 1024 * 16;
pub type Carnot = CarnotConsensus<
ConsensusLibp2pAdapter,
MockPool<BlockId, Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<
BlockId,
HeaderId,
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
@ -82,7 +82,7 @@ pub type DataAvailability = DataAvailabilityService<
DaLibp2pAdapter<Blob, Attestation>,
>;
type Mempool<K, V, D> = MempoolService<MempoolLibp2pAdapter<K, V>, MockPool<BlockId, K, V>, D>;
type Mempool<K, V, D> = MempoolService<MempoolLibp2pAdapter<K, V>, MockPool<HeaderId, K, V>, D>;
#[derive(Services)]
pub struct Nomos {

View File

@ -1,6 +1,7 @@
use super::CLIENT;
use carnot_consensus::CarnotInfo;
use carnot_engine::{Block, BlockId};
use carnot_engine::Block;
use nomos_core::header::HeaderId;
use reqwest::Url;
pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> {
@ -15,9 +16,9 @@ pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> {
pub async fn get_blocks_info(
node: &Url,
from: Option<BlockId>,
to: Option<BlockId>,
) -> Result<Vec<Block>, reqwest::Error> {
from: Option<HeaderId>,
to: Option<HeaderId>,
) -> Result<Vec<Block<HeaderId>>, reqwest::Error> {
const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks";
let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap());
if let Some(from) = from {

View File

@ -1,13 +1,13 @@
use super::CLIENT;
use carnot_engine::BlockId;
use full_replication::Certificate;
use nomos_core::block::Block;
use nomos_core::header::HeaderId;
use nomos_node::Tx;
use reqwest::Url;
pub async fn get_block_contents(
node: &Url,
block: &BlockId,
block: &HeaderId,
) -> Result<Option<Block<Tx, Certificate>>, reqwest::Error> {
const BLOCK_PATH: &str = "storage/block";
CLIENT

View File

@ -18,7 +18,7 @@ use full_replication::{
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
};
use futures::{stream, StreamExt};
use nomos_core::{block::BlockId, da::DaProtocol, wire};
use nomos_core::{da::DaProtocol, header::HeaderId, wire};
use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter};
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
@ -266,7 +266,7 @@ struct ChatMessage {
#[tokio::main]
async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
// Should ask for the genesis block to be more robust
let mut last_tip = BlockId::zeros();
let mut last_tip = [0; 32].into();
loop {
if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await {
@ -280,7 +280,7 @@ async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
// Process a single block's blobs and return chat messages
async fn process_block_blobs(
node: Url,
block_id: &BlockId,
block_id: &HeaderId,
da_settings: DaSettings,
) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> {
let blobs = get_block_blobs(&node, block_id).await?;
@ -304,9 +304,9 @@ async fn process_block_blobs(
// Fetch new messages since the last tip
async fn fetch_new_messages(
last_tip: &BlockId,
last_tip: &HeaderId,
node: &Url,
) -> Result<(BlockId, Vec<ChatMessage>), Box<dyn std::error::Error>> {
) -> Result<(HeaderId, Vec<ChatMessage>), Box<dyn std::error::Error>> {
// By only specifying the 'to' parameter we get all the blocks since the last tip
let mut new_blocks = get_blocks_info(node, None, Some(*last_tip))
.await?

View File

@ -1,6 +1,6 @@
use carnot_engine::BlockId;
use full_replication::Blob;
use nomos_core::da::certificate::Certificate;
use nomos_core::header::HeaderId;
use reqwest::Url;
use thiserror::Error;
@ -15,7 +15,7 @@ pub enum Error {
}
/// Return the blobs whose certificate has been included in the provided block.
pub async fn get_block_blobs(node: &Url, block: &BlockId) -> Result<Vec<Blob>, Error> {
pub async fn get_block_blobs(node: &Url, block: &HeaderId) -> Result<Vec<Blob>, Error> {
let block = get_block_contents(node, block)
.await?
.ok_or(Error::NotFound)?;

View File

@ -13,6 +13,8 @@ async-trait = { version = "0.1" }
blake2 = { version = "0.10" }
bytes = "1.3"
carnot-engine = { path = "../consensus/carnot-engine", features = ["serde"]}
cryptarchia-engine = { path = "../consensus/cryptarchia-engine", features = ["serde"]}
cryptarchia-ledger = { path = "../ledger/cryptarchia-ledger", features = ["serde"]}
futures = "0.3"
raptorq = { version = "1.7", optional = true }
serde = { version = "1.0", features = ["derive"] }
@ -20,6 +22,7 @@ thiserror = "1.0"
bincode = "1.3"
once_cell = "1.0"
indexmap = { version = "1.9", features = ["serde"] }
const-hex = "1"
[dev-dependencies]
rand = "0.8"

View File

@ -1,16 +1,22 @@
// std
use indexmap::IndexSet;
use std::hash::Hash;
// crates
use serde::de::DeserializeOwned;
use serde::Serialize;
// internal
use crate::block::Block;
use crate::crypto::Blake2b;
use crate::da::certificate::BlobCertificateSelect;
use crate::da::certificate::Certificate;
use crate::header::{
carnot::Builder as CarnotBuilder, cryptarchia::Builder as CryptarchiaBuilder, Header, HeaderId,
};
use crate::tx::{Transaction, TxSelect};
use crate::wire;
use blake2::digest::Digest;
use carnot_engine::overlay::RandomBeaconState;
use carnot_engine::{NodeId, Qc, View};
use carnot_engine::{LeaderProof, Qc, View};
/// Wrapper over a block building `new` method than holds intermediary state and can be
/// passed around. It also compounds the transaction selection and blob selection heuristics to be
/// used for transaction and blob selection.
@ -20,10 +26,6 @@ use carnot_engine::{NodeId, Qc, View};
/// use nomos_core::block::builder::BlockBuilder;
/// let builder: BlockBuilder<(), (), FirstTx, FirstBlob> = {
/// BlockBuilder::new( FirstTx::default(), FirstBlob::default())
/// .with_view(View::from(0))
/// .with_parent_qc(qc)
/// .with_proposer(proposer)
/// .with_beacon_state(beacon)
/// .with_transactions([tx1].into_iter())
/// .with_blobs([blob1].into_iter())
/// };
@ -32,14 +34,33 @@ use carnot_engine::{NodeId, Qc, View};
pub struct BlockBuilder<Tx, Blob, TxSelector, BlobSelector> {
tx_selector: TxSelector,
blob_selector: BlobSelector,
view: Option<View>,
parent_qc: Option<Qc>,
proposer: Option<NodeId>,
beacon: Option<RandomBeaconState>,
carnot_header_builder: Option<CarnotBuilder>,
cryptarchia_header_builder: Option<CryptarchiaBuilder>,
txs: Option<Box<dyn Iterator<Item = Tx>>>,
blobs: Option<Box<dyn Iterator<Item = Blob>>>,
}
impl<Tx, C, TxSelector, BlobSelector> BlockBuilder<Tx, C, TxSelector, BlobSelector>
where
Tx: Clone + Eq + Hash,
C: Clone + Eq + Hash,
{
pub fn empty_carnot(
beacon: RandomBeaconState,
view: View,
parent_qc: Qc<HeaderId>,
leader_proof: LeaderProof,
) -> Block<Tx, C> {
Block {
header: Header::Carnot(
CarnotBuilder::new(beacon, view, parent_qc, leader_proof).build([0; 32].into(), 0),
),
cl_transactions: IndexSet::new(),
bl_blobs: IndexSet::new(),
}
}
}
impl<Tx, C, TxSelector, BlobSelector> BlockBuilder<Tx, C, TxSelector, BlobSelector>
where
Tx: Transaction + Clone + Eq + Hash + Serialize + DeserializeOwned,
@ -51,36 +72,25 @@ where
Self {
tx_selector,
blob_selector,
view: None,
parent_qc: None,
proposer: None,
beacon: None,
carnot_header_builder: None,
cryptarchia_header_builder: None,
txs: None,
blobs: None,
}
}
#[must_use]
pub fn with_view(mut self, view: View) -> Self {
self.view = Some(view);
pub fn with_carnot_builder(mut self, carnot_header_builder: CarnotBuilder) -> Self {
self.carnot_header_builder = Some(carnot_header_builder);
self
}
#[must_use]
pub fn with_parent_qc(mut self, qc: Qc) -> Self {
self.parent_qc = Some(qc);
self
}
#[must_use]
pub fn with_proposer(mut self, proposer: NodeId) -> Self {
self.proposer = Some(proposer);
self
}
#[must_use]
pub fn with_beacon_state(mut self, beacon: RandomBeaconState) -> Self {
self.beacon = Some(beacon);
pub fn with_cryptarchia_builder(
mut self,
cryptarchia_header_builder: CryptarchiaBuilder,
) -> Self {
self.cryptarchia_header_builder = Some(cryptarchia_header_builder);
self
}
@ -100,28 +110,48 @@ where
}
#[allow(clippy::result_large_err)]
pub fn build(self) -> Result<Block<Tx, C>, Self> {
pub fn build(self) -> Result<Block<Tx, C>, String> {
if let Self {
tx_selector,
blob_selector,
view: Some(view),
parent_qc: Some(parent_qc),
proposer: Some(proposer),
beacon: Some(beacon),
carnot_header_builder: carnot_builder,
cryptarchia_header_builder: cryptarchia_builder,
txs: Some(txs),
blobs: Some(blobs),
} = self
{
Ok(Block::new(
view,
parent_qc,
tx_selector.select_tx_from(txs),
blob_selector.select_blob_from(blobs),
proposer,
beacon,
))
let txs = tx_selector.select_tx_from(txs).collect::<IndexSet<_>>();
let blobs = blob_selector
.select_blob_from(blobs)
.collect::<IndexSet<_>>();
let serialized_content = wire::serialize(&(&txs, &blobs)).unwrap();
let content_size = u32::try_from(serialized_content.len()).map_err(|_| {
format!(
"Content is too big: {} out of {} max",
serialized_content.len(),
u32::MAX
)
})?;
let content_id = <[u8; 32]>::from(Blake2b::digest(&serialized_content)).into();
let header = match (carnot_builder, cryptarchia_builder) {
(Some(carnot_builder), None) => {
Header::Carnot(carnot_builder.build(content_id, content_size))
}
(None, Some(cryptarchia_builder)) => {
Header::Cryptarchia(cryptarchia_builder.build(content_id, content_size))
}
_ => return Err("Exactly one header builder should be set".to_string()),
};
Ok(Block {
header,
cl_transactions: txs,
bl_blobs: blobs,
})
} else {
Err(self)
Err("incomplete block".to_string())
}
}
}

View File

@ -1,68 +1,27 @@
pub mod builder;
use carnot_engine::overlay::RandomBeaconState;
use indexmap::IndexSet;
// std
use core::hash::Hash;
// crates
use crate::header::Header;
use crate::wire;
use ::serde::{
de::{DeserializeOwned, Deserializer},
Deserialize, Serialize, Serializer,
};
use ::serde::{de::DeserializeOwned, Deserialize, Serialize};
use bytes::Bytes;
pub use carnot_engine::BlockId;
use carnot_engine::{LeaderProof, NodeId, Qc, View};
// internal
pub type TxHash = [u8; 32];
/// A block
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Block<Tx: Clone + Eq + Hash, BlobCertificate: Clone + Eq + Hash> {
header: carnot_engine::Block,
beacon: RandomBeaconState,
header: Header,
cl_transactions: IndexSet<Tx>,
bl_blobs: IndexSet<BlobCertificate>,
}
impl<
Tx: Clone + Eq + Hash + Serialize + DeserializeOwned,
BlobCertificate: Clone + Eq + Hash + Serialize + DeserializeOwned,
> Block<Tx, BlobCertificate>
{
pub fn new(
view: View,
parent_qc: Qc,
txs: impl Iterator<Item = Tx>,
blobs: impl Iterator<Item = BlobCertificate>,
proposer: NodeId,
beacon: RandomBeaconState,
) -> Self {
let transactions = txs.collect();
let blobs = blobs.collect();
let header = carnot_engine::Block {
id: BlockId::zeros(),
view,
parent_qc,
leader_proof: LeaderProof::LeaderId {
leader_id: proposer,
},
};
let mut s = Self {
header,
beacon,
cl_transactions: transactions,
bl_blobs: blobs,
};
let id = block_id_from_wire_content(&s);
s.header.id = id;
s
}
}
impl<Tx: Clone + Eq + Hash, BlobCertificate: Clone + Eq + Hash> Block<Tx, BlobCertificate> {
pub fn header(&self) -> &carnot_engine::Block {
pub fn header(&self) -> &Header {
&self.header
}
@ -73,24 +32,6 @@ impl<Tx: Clone + Eq + Hash, BlobCertificate: Clone + Eq + Hash> Block<Tx, BlobCe
pub fn blobs(&self) -> impl Iterator<Item = &BlobCertificate> + '_ {
self.bl_blobs.iter()
}
pub fn beacon(&self) -> &RandomBeaconState {
&self.beacon
}
}
pub fn block_id_from_wire_content<
Tx: Clone + Eq + Hash + Serialize + DeserializeOwned,
BlobCertificate: Clone + Eq + Hash + Serialize + DeserializeOwned,
>(
block: &Block<Tx, BlobCertificate>,
) -> carnot_engine::BlockId {
use blake2::digest::{consts::U32, Digest};
use blake2::Blake2b;
let bytes = block.as_bytes();
let mut hasher = Blake2b::<U32>::new();
hasher.update(bytes);
BlockId::new(hasher.finalize().into())
}
impl<
@ -104,87 +45,6 @@ impl<
}
pub fn from_bytes(bytes: &[u8]) -> Self {
let mut result: Self = wire::deserialize(bytes).unwrap();
result.header.id = block_id_from_wire_content(&result);
result
}
}
mod serde {
use super::*;
// use ::serde::{de::Deserializer, Deserialize, Serialize};
/// consensus_engine::Block but without the id field, which will be computed
/// from the rest of the block.
#[derive(Serialize, Deserialize)]
struct StrippedHeader {
pub view: View,
pub parent_qc: Qc,
pub leader_proof: LeaderProof,
}
#[derive(Serialize, Deserialize)]
struct StrippedBlock<Tx: Clone + Eq + Hash, BlobCertificate: Clone + Eq + Hash> {
header: StrippedHeader,
beacon: RandomBeaconState,
cl_transactions: IndexSet<Tx>,
bl_blobs: IndexSet<BlobCertificate>,
}
impl<
'de,
Tx: Clone + Eq + Hash + Serialize + DeserializeOwned,
BlobCertificate: Clone + Eq + Hash + Serialize + DeserializeOwned,
> Deserialize<'de> for Block<Tx, BlobCertificate>
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let StrippedBlock {
header,
beacon,
cl_transactions,
bl_blobs,
} = StrippedBlock::deserialize(deserializer)?;
let header = carnot_engine::Block {
id: BlockId::zeros(),
view: header.view,
parent_qc: header.parent_qc,
leader_proof: header.leader_proof,
};
let mut block = Block {
beacon,
cl_transactions,
bl_blobs,
header,
};
block.header.id = block_id_from_wire_content(&block);
Ok(block)
}
}
impl<
Tx: Clone + Eq + Hash + Serialize + DeserializeOwned,
BlobCertificate: Clone + Eq + Hash + Serialize + DeserializeOwned,
> Serialize for Block<Tx, BlobCertificate>
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// TODO: zero copy serialization
let block = StrippedBlock {
header: StrippedHeader {
view: self.header.view,
parent_qc: self.header.parent_qc.clone(),
leader_proof: self.header.leader_proof.clone(),
},
beacon: self.beacon.clone(),
cl_transactions: self.cl_transactions.clone(),
bl_blobs: self.bl_blobs.clone(),
};
block.serialize(serializer)
}
wire::deserialize(bytes).unwrap()
}
}

View File

@ -1,3 +1,7 @@
use blake2::digest::typenum::U32;
pub type PublicKey = [u8; 32];
pub type PrivateKey = [u8; 32];
pub type Signature = [u8; 32];
pub(crate) type Blake2b = blake2::Blake2b<U32>;

View File

@ -0,0 +1,116 @@
use super::{ContentId, HeaderId};
use crate::crypto::Blake2b;
use crate::wire;
use blake2::Digest;
use serde::{Deserialize, Serialize};
use carnot_engine::overlay::RandomBeaconState;
use carnot_engine::{LeaderProof, Qc, View};
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Header {
beacon: RandomBeaconState,
view: View,
parent_qc: Qc<HeaderId>,
leader_proof: LeaderProof,
content_id: ContentId,
content_size: u32,
}
impl Header {
pub fn new(
beacon: RandomBeaconState,
view: View,
parent_qc: Qc<HeaderId>,
leader_proof: LeaderProof,
content_id: ContentId,
content_size: u32,
) -> Self {
Self {
beacon,
view,
parent_qc,
leader_proof,
content_id,
content_size,
}
}
pub fn beacon(&self) -> &RandomBeaconState {
&self.beacon
}
pub fn id(&self) -> HeaderId {
let mut h = Blake2b::new();
let bytes = wire::serialize(&self).unwrap();
h.update(&bytes);
HeaderId(h.finalize().into())
}
pub fn parent_qc(&self) -> &Qc<HeaderId> {
&self.parent_qc
}
pub fn leader_proof(&self) -> &LeaderProof {
&self.leader_proof
}
pub fn content_id(&self) -> ContentId {
self.content_id
}
pub fn content_size(&self) -> u32 {
self.content_size
}
pub fn view(&self) -> View {
self.view
}
pub fn parent(&self) -> HeaderId {
self.parent_qc.block()
}
pub fn to_carnot_block(&self) -> carnot_engine::Block<HeaderId> {
carnot_engine::Block {
id: self.id(),
parent_qc: self.parent_qc.clone(),
view: self.view(),
leader_proof: self.leader_proof().clone(),
}
}
}
pub struct Builder {
beacon: RandomBeaconState,
view: View,
parent_qc: Qc<HeaderId>,
leader_proof: LeaderProof,
}
impl Builder {
pub fn new(
beacon: RandomBeaconState,
view: View,
parent_qc: Qc<HeaderId>,
leader_proof: LeaderProof,
) -> Self {
Self {
beacon,
view,
parent_qc,
leader_proof,
}
}
pub fn build(self, content_id: ContentId, content_size: u32) -> Header {
Header::new(
self.beacon,
self.view,
self.parent_qc,
self.leader_proof,
content_id,
content_size,
)
}
}

View File

@ -1,24 +1,22 @@
use crate::{crypto::Blake2b, leader_proof::LeaderProof};
use super::{ContentId, HeaderId};
use crate::crypto::Blake2b;
use blake2::Digest;
use cryptarchia_engine::Slot;
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)]
pub struct HeaderId([u8; 32]);
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)]
pub struct ContentId([u8; 32]);
use cryptarchia_ledger::LeaderProof;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Eq, PartialEq, Copy)]
pub struct Nonce([u8; 32]);
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Header {
parent: HeaderId,
slot: Slot,
// TODO: move this to common header fields
// length of block contents in bytes
content_size: u32,
// id of block contents
content_id: ContentId,
slot: Slot,
leader_proof: LeaderProof,
orphaned_leader_proofs: Vec<Header>,
}
@ -85,40 +83,36 @@ impl Header {
}
}
// ----------- conversions
pub struct Builder {
parent: HeaderId,
slot: Slot,
leader_proof: LeaderProof,
orphaned_leader_proofs: Vec<Header>,
}
impl From<[u8; 32]> for Nonce {
fn from(nonce: [u8; 32]) -> Self {
Self(nonce)
impl Builder {
pub fn new(parent: HeaderId, slot: Slot, leader_proof: LeaderProof) -> Self {
Self {
parent,
slot,
leader_proof,
orphaned_leader_proofs: vec![],
}
}
impl From<Nonce> for [u8; 32] {
fn from(nonce: Nonce) -> [u8; 32] {
nonce.0
}
pub fn with_orphaned_proofs(mut self, orphaned_leader_proofs: Vec<Header>) -> Self {
self.orphaned_leader_proofs = orphaned_leader_proofs;
self
}
impl From<[u8; 32]> for HeaderId {
fn from(id: [u8; 32]) -> Self {
Self(id)
pub fn build(self, content_id: ContentId, content_size: u32) -> Header {
Header {
parent: self.parent,
slot: self.slot,
content_size,
content_id,
leader_proof: self.leader_proof,
orphaned_leader_proofs: self.orphaned_leader_proofs,
}
}
impl From<HeaderId> for [u8; 32] {
fn from(id: HeaderId) -> Self {
id.0
}
}
impl From<[u8; 32]> for ContentId {
fn from(id: [u8; 32]) -> Self {
Self(id)
}
}
impl From<ContentId> for [u8; 32] {
fn from(id: ContentId) -> Self {
id.0
}
}

View File

@ -0,0 +1,89 @@
use serde::{Deserialize, Serialize};
use crate::utils::{display_hex_bytes_newtype, serde_bytes_newtype};
pub mod carnot;
pub mod cryptarchia;
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)]
pub struct HeaderId([u8; 32]);
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)]
pub struct ContentId([u8; 32]);
// This lint is a false positive?
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Header {
Cryptarchia(cryptarchia::Header),
Carnot(carnot::Header),
}
impl Header {
pub fn cryptarchia(&self) -> &cryptarchia::Header {
match self {
Self::Cryptarchia(header) => header,
Self::Carnot(_) => panic!("Header is not a Cryptarchia header"),
}
}
pub fn carnot(&self) -> &carnot::Header {
match self {
Self::Carnot(header) => header,
Self::Cryptarchia(_) => panic!("Header is not a Carnot header"),
}
}
pub fn id(&self) -> HeaderId {
match self {
Self::Cryptarchia(header) => header.id(),
Self::Carnot(header) => header.id(),
}
}
pub fn parent(&self) -> HeaderId {
match self {
Self::Cryptarchia(header) => header.parent(),
Self::Carnot(header) => header.parent(),
}
}
}
impl From<[u8; 32]> for HeaderId {
fn from(id: [u8; 32]) -> Self {
Self(id)
}
}
impl From<HeaderId> for [u8; 32] {
fn from(id: HeaderId) -> Self {
id.0
}
}
impl From<[u8; 32]> for ContentId {
fn from(id: [u8; 32]) -> Self {
Self(id)
}
}
impl From<ContentId> for [u8; 32] {
fn from(id: ContentId) -> Self {
id.0
}
}
display_hex_bytes_newtype!(HeaderId);
display_hex_bytes_newtype!(ContentId);
serde_bytes_newtype!(HeaderId, 32);
serde_bytes_newtype!(ContentId, 32);
#[test]
fn test_serde() {
assert_eq!(
crate::wire::deserialize::<HeaderId>(&crate::wire::serialize(&HeaderId([0; 32])).unwrap())
.unwrap(),
HeaderId([0; 32])
);
}

View File

@ -2,6 +2,7 @@ pub mod account;
pub mod block;
pub mod crypto;
pub mod da;
pub mod header;
pub mod staking;
pub mod tx;
pub mod utils;

View File

@ -1 +1,53 @@
pub mod select;
macro_rules! display_hex_bytes_newtype {
($newtype:ty) => {
impl core::fmt::Display for $newtype {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(f, "0x")?;
for v in self.0 {
write!(f, "{:02x}", v)?;
}
Ok(())
}
}
};
}
macro_rules! serde_bytes_newtype {
($newtype:ty, $len:expr) => {
impl serde::Serialize for $newtype {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
const_hex::const_encode::<$len, false>(&self.0)
.as_str()
.serialize(serializer)
} else {
self.0.serialize(serializer)
}
}
}
impl<'de> serde::Deserialize<'de> for $newtype {
fn deserialize<D>(deserializer: D) -> Result<$newtype, D::Error>
where
D: serde::Deserializer<'de>,
{
if deserializer.is_human_readable() {
let s = <&str>::deserialize(deserializer)?;
const_hex::decode_to_array(s)
.map(Self)
.map_err(serde::de::Error::custom)
} else {
<[u8; $len]>::deserialize(deserializer).map(Self)
}
}
}
};
}
pub(crate) use display_hex_bytes_newtype;
pub(crate) use serde_bytes_newtype;

View File

@ -1,5 +1,6 @@
// std
// crates
use crate::header::HeaderId;
use carnot_engine::{Block, View};
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
@ -49,7 +50,7 @@ impl Tally for MockTally {
type Vote = MockVote;
type Qc = MockQc;
type Outcome = ();
type Subject = Block;
type Subject = Block<HeaderId>;
type TallyError = Error;
type Settings = MockTallySettings;
@ -60,7 +61,7 @@ impl Tally for MockTally {
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
block: Block,
block: Block<HeaderId>,
mut vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
let mut count_votes = 0;

View File

@ -1,6 +1,6 @@
use core::{fmt::Debug, hash::Hash};
use nomos_core::block::BlockId;
use nomos_core::header::HeaderId;
use nomos_core::tx::Transaction;
use nomos_mempool::{
backend::mockpool::MockPool,
@ -13,7 +13,7 @@ use tokio::sync::oneshot;
type ClMempoolService<T> = MempoolService<
Libp2pAdapter<T, <T as Transaction>::Hash>,
MockPool<BlockId, T, <T as Transaction>::Hash>,
MockPool<HeaderId, T, <T as Transaction>::Hash>,
TxDiscriminant,
>;
@ -47,7 +47,7 @@ where
pub async fn cl_mempool_status<T>(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
items: Vec<<T as Transaction>::Hash>,
) -> Result<Vec<Status<BlockId>>, super::DynError>
) -> Result<Vec<Status<HeaderId>>, super::DynError>
where
T: Transaction
+ Clone

View File

@ -10,7 +10,7 @@ use carnot_consensus::{
};
use carnot_engine::{
overlay::{RandomBeaconState, RoundRobin, TreeOverlay},
Block, BlockId,
Block,
};
use full_replication::Certificate;
use nomos_core::{
@ -18,6 +18,7 @@ use nomos_core::{
blob,
certificate::{self, select::FillSize as FillSizeWithBlobsCertificate},
},
header::HeaderId,
tx::{select::FillSize as FillSizeWithTx, Transaction},
};
use nomos_mempool::{
@ -27,10 +28,10 @@ use nomos_storage::backends::{sled::SledBackend, StorageSerde};
pub type Carnot<Tx, SS, const SIZE: usize> = CarnotConsensus<
ConsensusLibp2pAdapter,
MockPool<BlockId, Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<
BlockId,
HeaderId,
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
@ -64,9 +65,9 @@ where
pub async fn carnot_blocks<Tx, SS, const SIZE: usize>(
handle: &OverwatchHandle,
from: Option<BlockId>,
to: Option<BlockId>,
) -> Result<Vec<Block>, super::DynError>
from: Option<HeaderId>,
to: Option<HeaderId>,
) -> Result<Vec<Block<HeaderId>>, super::DynError>
where
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,

View File

@ -1,6 +1,6 @@
use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication};
use nomos_core::block::BlockId;
use nomos_core::da::blob;
use nomos_core::header::HeaderId;
use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter,
DaMsg, DataAvailabilityService,
@ -15,7 +15,7 @@ use tokio::sync::oneshot;
pub type DaMempoolService = MempoolService<
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
MockPool<BlockId, Certificate, <Blob as blob::Blob>::Hash>,
MockPool<HeaderId, Certificate, <Blob as blob::Blob>::Hash>,
CertDiscriminant,
>;
@ -43,7 +43,7 @@ pub async fn da_mempool_metrics(
pub async fn da_mempool_status(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
items: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Status<BlockId>>, super::DynError> {
) -> Result<Vec<Status<HeaderId>>, super::DynError> {
let relay = handle.relay::<DaMempoolService>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay

View File

@ -1,5 +1,5 @@
use core::{fmt::Debug, hash::Hash};
use nomos_core::block::BlockId;
use nomos_core::header::HeaderId;
use nomos_mempool::{
backend::mockpool::MockPool, network::NetworkAdapter, Discriminant, MempoolMsg, MempoolService,
};
@ -20,7 +20,7 @@ where
Key: Clone + Debug + Ord + Hash + 'static,
{
let relay = handle
.relay::<MempoolService<A, MockPool<BlockId, Item, Key>, D>>()
.relay::<MempoolService<A, MockPool<HeaderId, Item, Key>, D>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();

View File

@ -1,5 +1,5 @@
use carnot_engine::BlockId;
use nomos_core::block::Block;
use nomos_core::header::HeaderId;
use nomos_storage::{
backends::{sled::SledBackend, StorageSerde},
StorageMsg, StorageService,
@ -7,7 +7,7 @@ use nomos_storage::{
pub async fn block_req<S, Tx>(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
id: BlockId,
id: HeaderId,
) -> Result<Option<Block<Tx, full_replication::Certificate>>, super::DynError>
where
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,

View File

@ -6,10 +6,10 @@ use std::hash::Hash;
// crates
// internal
use crate::TimeoutQc;
use carnot_engine::overlay::{
CommitteeMembership, Error as RandomBeaconError, FreezeMembership, RandomBeaconState,
};
use carnot_engine::TimeoutQc;
use nomos_core::block::Block;
pub trait UpdateableCommitteeMembership: CommitteeMembership {
@ -44,7 +44,10 @@ impl UpdateableCommitteeMembership for RandomBeaconState {
&self,
block: &Block<Tx, Blob>,
) -> Result<Self, Self::Error> {
self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view())
self.check_advance_happy(
block.header().carnot().beacon().clone(),
block.header().carnot().parent_qc().view(),
)
}
fn on_timeout_qc_received(&self, qc: &TimeoutQc) -> Result<Self, Self::Error> {

View File

@ -1,8 +1,6 @@
use crate::TimeoutQc;
use carnot_engine::overlay::RoundRobin;
use carnot_engine::{
overlay::{Error as RandomBeaconError, LeaderSelection, RandomBeaconState},
TimeoutQc,
};
use carnot_engine::overlay::{Error as RandomBeaconError, LeaderSelection, RandomBeaconState};
use nomos_core::block::Block;
use std::{convert::Infallible, error::Error, hash::Hash};
@ -38,7 +36,10 @@ impl UpdateableLeaderSelection for RandomBeaconState {
&self,
block: &Block<Tx, Blob>,
) -> Result<Self, Self::Error> {
self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view())
self.check_advance_happy(
block.header().carnot().beacon().clone(),
block.header().carnot().parent_qc().view(),
)
// TODO: check random beacon public keys is leader id
}

View File

@ -29,8 +29,7 @@ use crate::tally::{
happy::CarnotTally, timeout::TimeoutTally, unhappy::NewViewTally, CarnotTallySettings,
};
use carnot_engine::{
overlay::RandomBeaconState, AggregateQc, BlockId, Carnot, Committee, LeaderProof, NewView,
Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc, View, Vote,
overlay::RandomBeaconState, Carnot, Committee, LeaderProof, Overlay, Payload, View,
};
use task_manager::TaskManager;
@ -38,6 +37,7 @@ use crate::committee_membership::UpdateableCommitteeMembership;
use nomos_core::block::builder::BlockBuilder;
use nomos_core::block::Block;
use nomos_core::da::certificate::{BlobCertificateSelect, Certificate};
use nomos_core::header::{carnot::Builder, HeaderId};
use nomos_core::tx::{Transaction, TxSelect};
use nomos_core::vote::Tally;
use nomos_mempool::{
@ -65,6 +65,13 @@ fn default_timeout() -> Duration {
// Random seed for each round provided by the protocol
pub type Seed = [u8; 32];
type TimeoutQc = carnot_engine::TimeoutQc<HeaderId>;
type NewView = carnot_engine::NewView<HeaderId>;
type AggregateQc = carnot_engine::AggregateQc<HeaderId>;
type Qc = carnot_engine::Qc<HeaderId>;
type StandardQc = carnot_engine::StandardQc<HeaderId>;
type Vote = carnot_engine::Vote<HeaderId>;
type Timeout = carnot_engine::Timeout<HeaderId>;
#[derive(Debug, Deserialize, Serialize)]
pub struct CarnotSettings<O: Overlay, Ts, Bs> {
@ -113,8 +120,8 @@ pub struct CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, T
where
A: NetworkAdapter,
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
ClPool: MemPool<BlockId = BlockId>,
DaPool: MemPool<BlockId = BlockId>,
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
O: Overlay + Debug,
ClPool::Item: Debug + 'static,
@ -140,10 +147,10 @@ impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage> Servi
for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where
A: NetworkAdapter,
ClPool: MemPool<BlockId = BlockId>,
ClPool: MemPool<BlockId = HeaderId>,
ClPool::Item: Debug,
ClPool::Key: Debug,
DaPool: MemPool<BlockId = BlockId>,
DaPool: MemPool<BlockId = HeaderId>,
DaPool::Item: Debug,
DaPool::Key: Debug,
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
@ -165,9 +172,9 @@ impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage> Servi
for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
ClPool: MemPool<BlockId = BlockId> + Send + Sync + 'static,
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool<BlockId = BlockId> + Send + Sync + 'static,
DaPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
@ -252,9 +259,9 @@ where
let overlay = O::new(overlay_settings);
let genesis = carnot_engine::Block {
id: BlockId::zeros(),
id: [0; 32].into(),
view: View::new(0),
parent_qc: Qc::Standard(StandardQc::genesis()),
parent_qc: Qc::Standard(StandardQc::genesis([0; 32].into())),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
},
@ -299,6 +306,7 @@ where
);
if carnot.is_next_leader() {
tracing::info!("is next leader, gathering vores");
let network_adapter = adapter.clone();
task_manager.push(genesis_block.view.next(), async move {
let Event::Approve { qc, .. } = Self::gather_votes(
@ -312,6 +320,7 @@ where
tracing::debug!("Failed to gather initial votes");
return Event::None;
};
tracing::info!("got enough votes");
Event::ProposeBlock { qc }
});
}
@ -351,7 +360,7 @@ where
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum Output<Tx: Clone + Eq + Hash, BlobCertificate: Clone + Eq + Hash> {
Send(carnot_engine::Send),
Send(carnot_engine::Send<HeaderId>),
BroadcastTimeoutQc {
timeout_qc: TimeoutQc,
},
@ -364,9 +373,9 @@ impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
ClPool: MemPool<BlockId = BlockId> + Send + Sync + 'static,
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool<BlockId = BlockId> + Send + Sync + 'static,
DaPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
@ -414,7 +423,7 @@ where
}
}
fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) {
fn process_message(carnot: &Carnot<O, HeaderId>, msg: ConsensusMsg) {
match msg {
ConsensusMsg::Info { tx } => {
let info = CarnotInfo {
@ -457,18 +466,18 @@ where
#[allow(clippy::too_many_arguments)]
async fn process_carnot_event(
mut carnot: Carnot<O>,
mut carnot: Carnot<O, HeaderId>,
event: Event<ClPool::Item, DaPool::Item>,
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A,
private_key: PrivateKey,
cl_mempool_relay: OutboundRelay<MempoolMsg<BlockId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<BlockId, DaPool::Item, DaPool::Key>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, DaPool::Item, DaPool::Key>>,
storage_relay: OutboundRelay<StorageMsg<Storage>>,
tx_selector: TxS,
blobl_selector: BS,
timeout: Duration,
) -> Carnot<O> {
) -> Carnot<O, HeaderId> {
let mut output = None;
let prev_view = carnot.current_view();
match event {
@ -571,24 +580,26 @@ where
)
)]
async fn process_block(
mut carnot: Carnot<O>,
mut carnot: Carnot<O, HeaderId>,
block: Block<ClPool::Item, DaPool::Item>,
mut stream: Pin<Box<dyn Stream<Item = Block<ClPool::Item, DaPool::Item>> + Send>>,
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A,
storage_relay: OutboundRelay<StorageMsg<Storage>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<BlockId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<BlockId, DaPool::Item, DaPool::Key>>,
) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
cl_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, DaPool::Item, DaPool::Key>>,
) -> (
Carnot<O, HeaderId>,
Option<Output<ClPool::Item, DaPool::Item>>,
) {
tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view {
tracing::debug!("already voted for view {}", block.header().view);
let original_block = block;
let block = original_block.header().carnot().clone();
if carnot.highest_voted_view() >= block.view() {
tracing::debug!("already voted for view {}", block.view());
return (carnot, None);
}
let original_block = block;
let block = original_block.header().clone();
let self_committee = carnot.self_committee();
let leader_committee = [carnot.id()].into_iter().collect();
@ -602,10 +613,10 @@ where
participating_nodes: carnot.root_committee(),
};
match carnot.receive_block(block.clone()) {
match carnot.receive_block(block.to_carnot_block()) {
Ok(mut new_state) => {
let new_view = new_state.current_view();
let msg = <StorageMsg<_>>::new_store_message(block.id, original_block.clone());
let msg = <StorageMsg<_>>::new_store_message(block.id(), original_block.clone());
if let Err((e, _msg)) = storage_relay.send(msg).await {
tracing::error!("Could not send block to storage: {e}");
}
@ -614,24 +625,24 @@ where
mark_in_block(
cl_mempool_relay,
original_block.transactions().map(Transaction::hash),
block.id,
block.id(),
)
.await;
mark_in_block(
da_mempool_relay,
original_block.blobs().map(Certificate::hash),
block.id,
block.id(),
)
.await;
if new_view != carnot.current_view() {
task_manager.push(
block.view,
block.view(),
Self::gather_votes(
adapter.clone(),
self_committee,
block.clone(),
block.to_carnot_block(),
tally_settings,
),
);
@ -643,7 +654,7 @@ where
},
);
} else {
task_manager.push(block.view, async move {
task_manager.push(block.view(), async move {
if let Some(block) = stream.next().await {
Event::Proposal { block, stream }
} else {
@ -657,9 +668,13 @@ where
}
if carnot.is_next_leader() {
task_manager.push(block.view, async move {
let Event::Approve { qc, .. } =
Self::gather_votes(adapter, leader_committee, block, leader_tally_settings)
task_manager.push(block.view(), async move {
let Event::Approve { qc, .. } = Self::gather_votes(
adapter,
leader_committee,
block.to_carnot_block(),
leader_tally_settings,
)
.await
else {
unreachable!()
@ -674,12 +689,15 @@ where
#[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip(task_manager, adapter))]
async fn approve_new_view(
carnot: Carnot<O>,
carnot: Carnot<O, HeaderId>,
timeout_qc: TimeoutQc,
new_views: HashSet<NewView>,
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A,
) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
) -> (
Carnot<O, HeaderId>,
Option<Output<ClPool::Item, DaPool::Item>>,
) {
let leader_committee = [carnot.id()].into_iter().collect();
let leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(),
@ -713,11 +731,14 @@ where
#[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip(task_manager, adapter))]
async fn receive_timeout_qc(
carnot: Carnot<O>,
carnot: Carnot<O, HeaderId>,
timeout_qc: TimeoutQc,
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A,
) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
) -> (
Carnot<O, HeaderId>,
Option<Output<ClPool::Item, DaPool::Item>>,
) {
let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone());
let self_committee = carnot.self_committee();
let tally_settings = CarnotTallySettings {
@ -741,9 +762,12 @@ where
#[allow(clippy::type_complexity)]
#[instrument(level = "debug")]
async fn process_root_timeout(
carnot: Carnot<O>,
carnot: Carnot<O, HeaderId>,
timeouts: HashSet<Timeout>,
) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
) -> (
Carnot<O, HeaderId>,
Option<Output<ClPool::Item, DaPool::Item>>,
) {
// we might have received a timeout_qc sent by some other node and advanced the view
// already, in which case we should ignore the timeout
if carnot.current_view()
@ -793,8 +817,8 @@ where
qc: Qc,
tx_selector: TxS,
blob_selector: BS,
cl_mempool_relay: OutboundRelay<MempoolMsg<BlockId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<BlockId, DaPool::Item, DaPool::Key>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, DaPool::Item, DaPool::Key>>,
) -> Option<Output<ClPool::Item, DaPool::Item>> {
let mut output = None;
let cl_txs = get_mempool_contents(cl_mempool_relay);
@ -804,10 +828,12 @@ where
(Ok(cl_txs), Ok(da_certs)) => {
let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key);
let Ok(proposal) = BlockBuilder::new(tx_selector, blob_selector)
.with_view(qc.view().next())
.with_parent_qc(qc)
.with_proposer(id)
.with_beacon_state(beacon)
.with_carnot_builder(Builder::new(
beacon,
qc.view().next(),
qc,
LeaderProof::LeaderId { leader_id: id },
))
.with_transactions(cl_txs)
.with_blobs_certificates(da_certs)
.build()
@ -823,7 +849,7 @@ where
}
async fn process_view_change(
carnot: Carnot<O>,
carnot: Carnot<O, HeaderId>,
prev_view: View,
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A,
@ -883,7 +909,7 @@ where
async fn gather_votes(
adapter: A,
committee: Committee,
block: carnot_engine::Block,
block: carnot_engine::Block<HeaderId>,
tally: CarnotTallySettings,
) -> Event<ClPool::Item, DaPool::Item> {
let tally = CarnotTally::new(tally);
@ -947,7 +973,7 @@ where
.filter_map(move |msg| {
async move {
let proposal = Block::from_bytes(&msg.data);
if proposal.header().id == msg.proposal {
if proposal.header().id() == msg.proposal {
// TODO: Leader is faulty? what should we do?
Some(proposal)
} else {
@ -967,9 +993,9 @@ where
E: std::error::Error,
Fl: FnOnce(O::LeaderSelection) -> Result<O::LeaderSelection, E>,
>(
carnot: Carnot<O>,
carnot: Carnot<O, HeaderId>,
leader_selection_f: Fl,
) -> Carnot<O> {
) -> Carnot<O, HeaderId> {
carnot
.update_overlay(|overlay| overlay.update_leader_selection(leader_selection_f))
.unwrap()
@ -979,9 +1005,9 @@ where
E: std::error::Error,
Fm: FnOnce(O::CommitteeMembership) -> Result<O::CommitteeMembership, E>,
>(
carnot: Carnot<O>,
carnot: Carnot<O, HeaderId>,
committee_membership_f: Fm,
) -> Carnot<O> {
) -> Carnot<O, HeaderId> {
carnot
.update_overlay(|overlay| overlay.update_committees(committee_membership_f))
.unwrap()
@ -993,10 +1019,10 @@ where
Fl: FnOnce(O::LeaderSelection) -> Result<O::LeaderSelection, El>,
Fm: FnOnce(O::CommitteeMembership) -> Result<O::CommitteeMembership, Em>,
>(
carnot: Carnot<O>,
carnot: Carnot<O, HeaderId>,
leader_selection_f: Fl,
committee_membership_f: Fm,
) -> Carnot<O> {
) -> Carnot<O, HeaderId> {
let carnot = Self::update_leader_selection(carnot, leader_selection_f);
Self::update_committee_membership(carnot, committee_membership_f)
}
@ -1048,9 +1074,9 @@ where
Output::BroadcastProposal { proposal } => {
adapter
.broadcast(NetworkMessage::Proposal(ProposalMsg {
proposal: proposal.header().id,
proposal: proposal.header().id(),
data: proposal.as_bytes().to_vec().into_boxed_slice(),
view: proposal.header().view,
view: proposal.header().carnot().view(),
}))
.await;
}
@ -1074,7 +1100,7 @@ enum Event<Tx: Clone + Hash + Eq, BlobCertificate: Clone + Eq + Hash> {
#[allow(dead_code)]
Approve {
qc: Qc,
block: carnot_engine::Block,
block: carnot_engine::Block<HeaderId>,
votes: HashSet<Vote>,
},
LocalTimeout {
@ -1105,9 +1131,9 @@ pub enum ConsensusMsg {
/// 'to' (the oldest block). If 'from' is None, the tip of the chain is used as a starting
/// point. If 'to' is None or not known to the node, the genesis block is used as an end point.
GetBlocks {
from: Option<BlockId>,
to: Option<BlockId>,
tx: Sender<Vec<carnot_engine::Block>>,
from: Option<HeaderId>,
to: Option<HeaderId>,
tx: Sender<Vec<carnot_engine::Block<HeaderId>>>,
},
}
@ -1121,19 +1147,19 @@ pub struct CarnotInfo {
pub current_view: View,
pub highest_voted_view: View,
pub local_high_qc: StandardQc,
pub tip: carnot_engine::Block,
pub tip: carnot_engine::Block<HeaderId>,
pub last_view_timeout_qc: Option<TimeoutQc>,
pub last_committed_block: carnot_engine::Block,
pub last_committed_block: carnot_engine::Block<HeaderId>,
}
async fn get_mempool_contents<Item, Key>(
mempool: OutboundRelay<MempoolMsg<BlockId, Item, Key>>,
mempool: OutboundRelay<MempoolMsg<HeaderId, Item, Key>>,
) -> Result<Box<dyn Iterator<Item = Item> + Send>, tokio::sync::oneshot::error::RecvError> {
let (reply_channel, rx) = tokio::sync::oneshot::channel();
mempool
.send(MempoolMsg::View {
ancestor_hint: BlockId::zeros(),
ancestor_hint: [0; 32].into(),
reply_channel,
})
.await
@ -1143,9 +1169,9 @@ async fn get_mempool_contents<Item, Key>(
}
async fn mark_in_block<Item, Key>(
mempool: OutboundRelay<MempoolMsg<BlockId, Item, Key>>,
mempool: OutboundRelay<MempoolMsg<HeaderId, Item, Key>>,
ids: impl Iterator<Item = Key>,
block: BlockId,
block: HeaderId,
) {
mempool
.send(MempoolMsg::MarkInBlock {
@ -1170,14 +1196,14 @@ mod tests {
highest_voted_view: View::new(-1),
local_high_qc: StandardQc {
view: View::new(0),
id: BlockId::zeros(),
id: [0; 32].into(),
},
tip: Block {
id: BlockId::zeros(),
id: [0; 32].into(),
view: View::new(0),
parent_qc: Qc::Standard(StandardQc {
view: View::new(0),
id: BlockId::zeros(),
id: [0; 32].into(),
}),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
@ -1185,11 +1211,11 @@ mod tests {
},
last_view_timeout_qc: None,
last_committed_block: Block {
id: BlockId::zeros(),
id: [0; 32].into(),
view: View::new(0),
parent_qc: Qc::Standard(StandardQc {
view: View::new(0),
id: BlockId::zeros(),
id: [0; 32].into(),
}),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),

View File

@ -13,8 +13,8 @@ use crate::network::{
messages::{NetworkMessage, ProposalMsg, VoteMsg},
BoxedStream, NetworkAdapter,
};
use carnot_engine::{BlockId, Committee, CommitteeId, View};
use nomos_core::wire;
use carnot_engine::{Committee, CommitteeId, View};
use nomos_core::{header::HeaderId, wire};
use nomos_network::{
backends::libp2p::{Command, Event, EventKind, Libp2p},
NetworkMsg, NetworkService,
@ -94,7 +94,7 @@ impl<T> Spsc<T> {
#[derive(Default)]
struct Messages {
proposal_chunks: Spsc<ProposalMsg>,
votes: HashMap<CommitteeId, HashMap<BlockId, Spsc<VoteMsg>>>,
votes: HashMap<CommitteeId, HashMap<HeaderId, Spsc<VoteMsg>>>,
new_views: HashMap<CommitteeId, Spsc<NewViewMsg>>,
timeouts: HashMap<CommitteeId, Spsc<TimeoutMsg>>,
timeout_qcs: Spsc<TimeoutQcMsg>,
@ -153,7 +153,7 @@ impl MessageCache {
&self,
view: View,
committee_id: CommitteeId,
proposal_id: BlockId,
proposal_id: HeaderId,
) -> Option<Receiver<VoteMsg>> {
self.cache.lock().unwrap().get_mut(&view).map(|m| {
m.votes
@ -264,7 +264,7 @@ impl NetworkAdapter for Libp2pAdapter {
}
}
NetworkMessage::Vote(msg) => {
tracing::debug!("received vote");
tracing::debug!("received vote {:?}", msg);
let mut cache = cache.cache.lock().unwrap();
let view = msg.vote.view;
if let Some(messages) = cache.get_mut(&view) {
@ -356,7 +356,7 @@ impl NetworkAdapter for Libp2pAdapter {
&self,
committee: &Committee,
view: View,
proposal_id: BlockId,
proposal_id: HeaderId,
) -> BoxedStream<VoteMsg> {
self.message_cache
.get_votes(view, committee.id::<blake2::Blake2s256>(), proposal_id)

View File

@ -3,13 +3,15 @@
use serde::{Deserialize, Serialize};
// internal
use crate::NodeId;
use carnot_engine::{BlockId, NewView, Qc, Timeout, TimeoutQc, View, Vote};
use crate::{NewView, Qc, Timeout, TimeoutQc, Vote};
use carnot_engine::View;
use nomos_core::header::HeaderId;
use nomos_core::wire;
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
pub struct ProposalMsg {
pub data: Box<[u8]>,
pub proposal: BlockId,
pub proposal: HeaderId,
pub view: View,
}
@ -84,7 +86,7 @@ impl TimeoutQcMsg {
}
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub enum NetworkMessage {
Timeout(TimeoutMsg),
TimeoutQc(TimeoutQcMsg),

View File

@ -4,11 +4,12 @@ pub mod messages;
// std
// crates
use futures::Stream;
use nomos_core::header::HeaderId;
// internal
use crate::network::messages::{
NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
};
use carnot_engine::{BlockId, Committee, View};
use carnot_engine::{Committee, View};
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
@ -33,7 +34,7 @@ pub trait NetworkAdapter {
&self,
committee: &Committee,
view: View,
proposal_id: BlockId,
proposal_id: HeaderId,
) -> BoxedStream<VoteMsg>;
async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream<NewViewMsg>;
async fn send(&self, message: NetworkMessage, committee: &Committee);

View File

@ -4,15 +4,17 @@
use std::collections::HashSet;
// crates
use futures::{Stream, StreamExt};
use nomos_core::header::HeaderId;
// internal
use super::CarnotTallySettings;
use crate::network::messages::VoteMsg;
use carnot_engine::{Block, Qc, StandardQc, Vote};
use crate::{Qc, StandardQc, Vote};
use nomos_core::crypto::PublicKey;
use nomos_core::vote::Tally;
pub type NodeId = PublicKey;
type Block = carnot_engine::Block<HeaderId>;
#[derive(thiserror::Error, Debug)]
pub enum CarnotTallyError {
@ -82,7 +84,6 @@ impl Tally for CarnotTally {
));
}
}
Err(CarnotTallyError::StreamEnded)
}
}

View File

@ -5,7 +5,8 @@ use futures::{Stream, StreamExt};
// internal
use super::CarnotTallySettings;
use crate::network::messages::TimeoutMsg;
use carnot_engine::{Timeout, View};
use crate::Timeout;
use carnot_engine::View;
use nomos_core::vote::Tally;
#[derive(Clone, Debug)]

View File

@ -6,9 +6,10 @@ use serde::{Deserialize, Serialize};
// internal
use super::CarnotTallySettings;
use crate::network::messages::NewViewMsg;
use carnot_engine::{NewView, TimeoutQc};
use nomos_core::vote::Tally;
use crate::{NewView, TimeoutQc};
#[derive(thiserror::Error, Debug)]
pub enum NewViewTallyError {
#[error("Did not receive enough votes")]

View File

@ -1,5 +1,5 @@
use nomos_core::{
block::BlockId,
header::HeaderId,
tx::mock::{MockTransaction, MockTxId},
};
use nomos_log::{Logger, LoggerSettings};
@ -23,7 +23,7 @@ struct MockPoolNode {
mockpool: ServiceHandle<
MempoolService<
MockAdapter,
MockPool<BlockId, MockTransaction<MockMessage>, MockTxId>,
MockPool<HeaderId, MockTransaction<MockMessage>, MockTxId>,
Transaction,
>,
>,
@ -80,7 +80,7 @@ fn test_mockmempool() {
let network = app.handle().relay::<NetworkService<Mock>>();
let mempool = app.handle().relay::<MempoolService<
MockAdapter,
MockPool<BlockId, MockTransaction<MockMessage>, MockTxId>,
MockPool<HeaderId, MockTransaction<MockMessage>, MockTxId>,
Transaction,
>>();
@ -102,7 +102,7 @@ fn test_mockmempool() {
let (mtx, mrx) = tokio::sync::oneshot::channel();
mempool_outbound
.send(MempoolMsg::View {
ancestor_hint: BlockId::default(),
ancestor_hint: [0; 32].into(),
reply_channel: mtx,
})
.await

View File

@ -6,9 +6,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates
use anyhow::Ok;
use carnot_engine::overlay::RandomBeaconState;
use carnot_engine::{Block, View};
use carnot_engine::{Block, LeaderProof, View};
use clap::Parser;
use crossbeam::channel;
use nomos_core::block::builder::BlockBuilder;
use parking_lot::Mutex;
use rand::rngs::SmallRng;
use rand::seq::SliceRandom;
@ -129,15 +130,13 @@ impl SimulationApp {
let leader = nodes.first().copied().unwrap();
// FIXME: Actually use a proposer and a key to generate random beacon state
let genesis = nomos_core::block::Block::new(
View::new(0),
Block::genesis().parent_qc,
[].into_iter(),
[].into_iter(),
leader,
let genesis = <BlockBuilder<_, _, (), ()>>::empty_carnot(
RandomBeaconState::Sad {
entropy: Box::new([0; 32]),
},
View::new(0),
Block::genesis([0; 32].into()).parent_qc,
LeaderProof::LeaderId { leader_id: leader },
);
let mut rng = SmallRng::seed_from_u64(seed);
overlay_node::to_overlay_node(

View File

@ -1,10 +1,10 @@
use crate::node::carnot::{messages::CarnotMessage, tally::Tally, timeout::TimeoutHandler};
use crate::node::carnot::{AggregateQc, Carnot, NewView, Qc, StandardQc, Timeout, TimeoutQc, Vote};
use carnot_consensus::network::messages::{NewViewMsg, TimeoutMsg, VoteMsg};
use carnot_consensus::NodeId;
use carnot_engine::{
AggregateQc, Carnot, NewView, Overlay, Qc, StandardQc, Timeout, TimeoutQc, View, Vote,
};
use carnot_engine::{Overlay, View};
use nomos_core::block::Block;
use nomos_core::header::HeaderId;
use std::collections::HashSet;
use std::hash::Hash;
use std::time::Duration;
@ -97,8 +97,8 @@ impl EventBuilder {
tracing::info!(
node=%self.id,
current_view = %engine.current_view(),
block_view=%block.header().view,
block=?block.header().id,
block_view=%block.header().carnot().view(),
block=?block.header().id(),
parent_block=?block.header().parent(),
"receive proposal message",
);
@ -236,7 +236,7 @@ pub enum Event<Tx: Clone + Hash + Eq> {
#[allow(dead_code)]
Approve {
qc: Qc,
block: carnot_engine::Block,
block: carnot_engine::Block<HeaderId>,
votes: HashSet<Vote>,
},
ProposeBlock {

View File

@ -4,6 +4,8 @@ mod event_builder;
mod message_cache;
pub mod messages;
mod state;
use nomos_core::block::builder::BlockBuilder;
use nomos_core::header::HeaderId;
pub use state::*;
mod serde_util;
mod tally;
@ -36,9 +38,18 @@ use carnot_consensus::{
network::messages::{NewViewMsg, TimeoutMsg, VoteMsg},
};
use carnot_engine::overlay::RandomBeaconState;
use carnot_engine::{
Block, BlockId, Carnot, Committee, Overlay, Payload, Qc, StandardQc, TimeoutQc, View, Vote,
};
use carnot_engine::{Committee, LeaderProof, Overlay, View};
type Block = carnot_engine::Block<HeaderId>;
type AggregateQc = carnot_engine::AggregateQc<HeaderId>;
type Carnot<O> = carnot_engine::Carnot<O, HeaderId>;
type Payload = carnot_engine::Payload<HeaderId>;
type TimeoutQc = carnot_engine::TimeoutQc<HeaderId>;
type Vote = carnot_engine::Vote<HeaderId>;
type Qc = carnot_engine::Qc<HeaderId>;
type StandardQc = carnot_engine::StandardQc<HeaderId>;
type NewView = carnot_engine::NewView<HeaderId>;
type Timeout = carnot_engine::Timeout<HeaderId>;
static RECORD_SETTINGS: std::sync::OnceLock<BTreeMap<String, bool>> = std::sync::OnceLock::new();
@ -95,7 +106,7 @@ impl<
rng: &mut R,
) -> Self {
let overlay = O::new(overlay_settings);
let engine = Carnot::from_genesis(id, genesis.header().clone(), overlay);
let engine = Carnot::from_genesis(id, genesis.header().carnot().to_carnot_block(), overlay);
let state = CarnotState::from(&engine);
let timeout = settings.timeout;
RECORD_SETTINGS.get_or_init(|| settings.record_settings.clone());
@ -179,8 +190,8 @@ impl<
self.network_interface
.broadcast(CarnotMessage::Proposal(ProposalMsg {
data: proposal.as_bytes().to_vec().into(),
proposal: proposal.header().id,
view: proposal.header().view,
proposal: proposal.header().id(),
view: proposal.header().carnot().view(),
}))
}
}
@ -195,12 +206,15 @@ impl<
node=%self.id,
last_committed_view=%self.engine.latest_committed_view(),
current_view = %current_view,
block_view = %block.header().view,
block = %block.header().id,
block_view = %block.header().carnot().view(),
block = %block.header().id(),
parent_block=%block.header().parent(),
"receive block proposal",
);
match self.engine.receive_block(block.header().clone()) {
match self
.engine
.receive_block(block.header().carnot().to_carnot_block())
{
Ok(mut new) => {
if self.engine.current_view() != new.current_view() {
new = Self::update_overlay_with_block(new, &block);
@ -211,7 +225,7 @@ impl<
tracing::error!(
node = %self.id,
current_view = %self.engine.current_view(),
block_view = %block.header().view, block = %block.header().id,
block_view = %block.header().carnot().view(), block = %block.header().id(),
"receive block proposal, but is invalid",
);
}
@ -230,7 +244,7 @@ impl<
to,
payload: Payload::Vote(Vote {
view: self.engine.current_view(),
block: block.header().id,
block: block.header().id(),
}),
}))
}
@ -265,13 +279,13 @@ impl<
}
Event::ProposeBlock { qc } => {
output = Some(Output::BroadcastProposal {
proposal: nomos_core::block::Block::new(
qc.view().next(),
qc.clone(),
[].into_iter(),
[].into_iter(),
self.id,
proposal: <BlockBuilder<_, _, (), ()>>::empty_carnot(
RandomBeaconState::generate_happy(qc.view().next(), &self.random_beacon_pk),
qc.view().next(),
qc,
LeaderProof::LeaderId {
leader_id: [0; 32].into(),
},
),
});
}
@ -440,7 +454,7 @@ impl<
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum Output<Tx: Clone + Eq + Hash, Blob: Clone + Eq + Hash> {
Send(carnot_engine::Send),
Send(carnot_engine::Send<HeaderId>),
BroadcastTimeoutQc {
timeout_qc: TimeoutQc,
},

View File

@ -10,7 +10,8 @@ use self::{
standard_qc::StandardQcHelper,
timeout_qc::TimeoutQcHelper,
};
use carnot_engine::{AggregateQc, Block, BlockId, Committee, Qc, StandardQc, TimeoutQc, View};
use crate::node::carnot::{AggregateQc, Block, Committee, Qc, StandardQc, TimeoutQc};
use carnot_engine::View;
const NODE_ID: &str = "node_id";
const CURRENT_VIEW: &str = "current_view";
@ -238,16 +239,24 @@ pub(crate) mod timeout_qc {
}
pub(crate) mod serde_id {
use carnot_engine::{BlockId, NodeId};
use carnot_engine::NodeId;
use nomos_core::header::HeaderId;
use super::*;
#[derive(Serialize)]
pub(crate) struct BlockIdHelper<'a>(#[serde(with = "serde_array32")] &'a [u8; 32]);
pub(crate) struct BlockIdHelper<'a> {
#[serde(with = "serde_array32")]
header: [u8; 32],
_marker: std::marker::PhantomData<&'a HeaderId>,
}
impl<'a> From<&'a BlockId> for BlockIdHelper<'a> {
fn from(val: &'a BlockId) -> Self {
Self(val.into())
impl<'a> From<&'a HeaderId> for BlockIdHelper<'a> {
fn from(val: &'a HeaderId) -> Self {
Self {
header: (*val).into(),
_marker: std::marker::PhantomData,
}
}
}

View File

@ -1,4 +1,5 @@
use super::*;
use nomos_core::header::HeaderId;
use serde_block::BlockHelper;
serializer!(CarnotStateCsvSerializer);
@ -76,10 +77,10 @@ impl<'a> From<&'a StandardQc> for LocalHighQcHelper<'a> {
}
}
struct SafeBlocksHelper<'a>(&'a HashMap<BlockId, Block>);
struct SafeBlocksHelper<'a>(&'a HashMap<HeaderId, Block>);
impl<'a> From<&'a HashMap<BlockId, Block>> for SafeBlocksHelper<'a> {
fn from(val: &'a HashMap<BlockId, Block>) -> Self {
impl<'a> From<&'a HashMap<HeaderId, Block>> for SafeBlocksHelper<'a> {
fn from(val: &'a HashMap<HeaderId, Block>) -> Self {
Self(val)
}
}
@ -142,10 +143,10 @@ impl<'a> Serialize for CommitteesHelper<'a> {
}
}
struct CommittedBlockHelper<'a>(&'a [BlockId]);
struct CommittedBlockHelper<'a>(&'a [HeaderId]);
impl<'a> From<&'a [BlockId]> for CommittedBlockHelper<'a> {
fn from(val: &'a [BlockId]) -> Self {
impl<'a> From<&'a [HeaderId]> for CommittedBlockHelper<'a> {
fn from(val: &'a [HeaderId]) -> Self {
Self(val)
}
}

View File

@ -1,4 +1,5 @@
use super::*;
use nomos_core::header::HeaderId;
use serde_block::BlockHelper;
serializer!(CarnotStateJsonSerializer);
@ -50,10 +51,10 @@ pub(crate) mod serde_block {
}
}
struct SafeBlocksHelper<'a>(&'a HashMap<BlockId, Block>);
struct SafeBlocksHelper<'a>(&'a HashMap<HeaderId, Block>);
impl<'a> From<&'a HashMap<BlockId, Block>> for SafeBlocksHelper<'a> {
fn from(val: &'a HashMap<BlockId, Block>) -> Self {
impl<'a> From<&'a HashMap<HeaderId, Block>> for SafeBlocksHelper<'a> {
fn from(val: &'a HashMap<HeaderId, Block>) -> Self {
Self(val)
}
}
@ -115,10 +116,10 @@ impl<'a> Serialize for CommitteesHelper<'a> {
}
}
struct CommittedBlockHelper<'a>(&'a [BlockId]);
struct CommittedBlockHelper<'a>(&'a [HeaderId]);
impl<'a> From<&'a [BlockId]> for CommittedBlockHelper<'a> {
fn from(val: &'a [BlockId]) -> Self {
impl<'a> From<&'a [HeaderId]> for CommittedBlockHelper<'a> {
fn from(val: &'a [HeaderId]) -> Self {
Self(val)
}
}

View File

@ -8,14 +8,14 @@ pub struct CarnotState {
pub(crate) current_view: View,
pub(crate) highest_voted_view: View,
pub(crate) local_high_qc: StandardQc,
pub(crate) safe_blocks: HashMap<BlockId, Block>,
pub(crate) safe_blocks: HashMap<HeaderId, Block>,
pub(crate) last_view_timeout_qc: Option<TimeoutQc>,
pub(crate) latest_committed_block: Block,
pub(crate) latest_committed_view: View,
pub(crate) root_committee: Committee,
pub(crate) parent_committee: Option<Committee>,
pub(crate) child_committees: Vec<Committee>,
pub(crate) committed_blocks: Vec<BlockId>,
pub(crate) committed_blocks: Vec<HeaderId>,
pub(super) step_duration: Duration,
/// Step id for this state

View File

@ -7,9 +7,9 @@ use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, Node, SpawnConfig};
use carnot_consensus::{CarnotInfo, CarnotSettings};
use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings};
use carnot_engine::{BlockId, NodeId, Overlay};
use carnot_engine::{NodeId, Overlay};
use full_replication::Certificate;
use nomos_core::block::Block;
use nomos_core::{block::Block, header::HeaderId};
use nomos_libp2p::{Multiaddr, Swarm};
use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_mempool::MempoolMetrics;
@ -112,7 +112,7 @@ impl NomosNode {
format!("http://{}", self.addr).parse().unwrap()
}
pub async fn get_block(&self, id: BlockId) -> Option<Block<Tx, Certificate>> {
pub async fn get_block(&self, id: HeaderId) -> Option<Block<Tx, Certificate>> {
CLIENT
.post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
.header("Content-Type", "application/json")
@ -146,9 +146,9 @@ impl NomosNode {
pub async fn get_blocks_info(
&self,
from: Option<BlockId>,
to: Option<BlockId>,
) -> Vec<carnot_engine::Block> {
from: Option<HeaderId>,
to: Option<HeaderId>,
) -> Vec<carnot_engine::Block<HeaderId>> {
let mut req = CLIENT.get(format!("http://{}/{}", self.addr, GET_BLOCKS_INFO));
if let Some(from) = from {
@ -162,7 +162,7 @@ impl NomosNode {
req.send()
.await
.unwrap()
.json::<Vec<carnot_engine::Block>>()
.json::<Vec<carnot_engine::Block<_>>>()
.await
.unwrap()
}

View File

@ -1,13 +1,17 @@
use carnot_consensus::CarnotInfo;
use carnot_engine::{Block, NodeId, TimeoutQc, View};
use carnot_engine::{NodeId, View};
use fraction::Fraction;
use futures::stream::{self, StreamExt};
use nomos_core::header::HeaderId;
use std::{collections::HashSet, time::Duration};
use tests::{adjust_timeout, ConsensusConfig, Node, NomosNode, SpawnConfig};
const TARGET_VIEW: View = View::new(20);
const DUMMY_NODE_ID: NodeId = NodeId::new([0u8; 32]);
type Block = carnot_engine::Block<HeaderId>;
type TimeoutQc = carnot_engine::TimeoutQc<HeaderId>;
#[tokio::test]
async fn ten_nodes_one_down() {
let mut nodes = NomosNode::spawn_nodes(SpawnConfig::Chain {