1
0
mirror of synced 2025-01-11 00:05:48 +00:00

Random beacon (#167)

* move overlay to consensus engine

* Integrate random beacon in overlay

This commit integrates the random beacon as specified in the nomos
spec into the consensus engine library as part of the overlay.
In addition, it separates the overlay part responsible for leader
selection as an independent trait LeaderSelection, so as to share
the overall overlay structure among most constructions.

Furthermore, a leader proof has been added to a block to verify
that the proposer had valid rights to do so.

The current implementation hardcodes the leader selection update
in the consensus service, but we probably want to abstract it away
through something like the adapter pattern we use of other services
in the node.

* Move leader selection update to separate function

* Add generic support for leader selection in consensus service (#170)

* Add generic support for leader selection in consensus service

* fix

* use settings struct instead of tuple

* fix tests
This commit is contained in:
Giacomo Pasini 2023-06-12 15:14:49 +02:00 committed by GitHub
parent cfaa7cf772
commit 9c81b72711
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 457 additions and 180 deletions

View File

@ -7,6 +7,12 @@ edition = "2021"
[dependencies] [dependencies]
serde = { version = "1.0", features = ["derive"], optional = true } serde = { version = "1.0", features = ["derive"], optional = true }
bls-signatures = "0.14"
integer-encoding = "3"
sha2 = "0.10"
rand = "0.8"
rand_chacha = "0.3"
thiserror = "1"
[features] [features]
default = [] default = []

View File

@ -1,6 +1,8 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
pub mod overlay;
mod types; mod types;
pub use overlay::Overlay;
pub use types::*; pub use types::*;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -48,6 +50,16 @@ impl<O: Overlay> Carnot<O> {
if self.safe_blocks.contains_key(&block.id) { if self.safe_blocks.contains_key(&block.id) {
return Ok(self.clone()); return Ok(self.clone());
} }
match block.leader_proof {
LeaderProof::LeaderId { leader_id } => {
// This only accepts blocks from the leader of current_view + 1
if leader_id != self.overlay.next_leader() {
return Err(());
}
}
}
if self.blocks_in_view(block.view).contains(&block) if self.blocks_in_view(block.view).contains(&block)
|| block.view <= self.latest_committed_view() || block.view <= self.latest_committed_view()
{ {
@ -111,9 +123,7 @@ impl<O: Overlay> Carnot<O> {
new_state.highest_voted_view = block.view; new_state.highest_voted_view = block.view;
let to = if new_state.overlay.is_member_of_root_committee(new_state.id) { let to = if new_state.overlay.is_member_of_root_committee(new_state.id) {
[new_state.overlay.leader(block.view + 1)] [new_state.overlay.next_leader()].into_iter().collect()
.into_iter()
.collect()
} else { } else {
new_state.overlay.parent_committee(self.id) new_state.overlay.parent_committee(self.id)
}; };
@ -180,9 +190,7 @@ impl<O: Overlay> Carnot<O> {
new_state.highest_voted_view = new_view; new_state.highest_voted_view = new_view;
let to = if new_state.overlay.is_member_of_root_committee(new_state.id) { let to = if new_state.overlay.is_member_of_root_committee(new_state.id) {
[new_state.overlay.leader(new_view + 1)] [new_state.overlay.next_leader()].into_iter().collect()
.into_iter()
.collect()
} else { } else {
new_state.overlay.parent_committee(new_state.id) new_state.overlay.parent_committee(new_state.id)
}; };
@ -317,8 +325,8 @@ impl<O: Overlay> Carnot<O> {
self.local_high_qc.clone() self.local_high_qc.clone()
} }
pub fn is_leader_for_view(&self, view: View) -> bool { pub fn is_next_leader(&self) -> bool {
self.overlay.leader(view) == self.id self.overlay.next_leader() == self.id
} }
pub fn super_majority_threshold(&self) -> usize { pub fn super_majority_threshold(&self) -> usize {
@ -352,82 +360,42 @@ impl<O: Overlay> Carnot<O> {
pub fn is_member_of_root_committee(&self) -> bool { pub fn is_member_of_root_committee(&self) -> bool {
self.overlay.is_member_of_root_committee(self.id) self.overlay.is_member_of_root_committee(self.id)
} }
/// A way to allow for overlay extendability without compromising the engine
/// generality.
pub fn update_overlay<F, E>(&self, f: F) -> Result<Self, E>
where
F: FnOnce(O) -> Result<O, E>,
{
match f(self.overlay.clone()) {
Ok(overlay) => Ok(Self {
overlay,
..self.clone()
}),
Err(e) => Err(e),
}
}
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::overlay::{FlatOverlay, RoundRobin, Settings};
use super::*; use super::*;
#[derive(Clone, Debug, PartialEq)] fn init_from_genesis() -> Carnot<FlatOverlay<RoundRobin>> {
struct MockOverlay;
impl Overlay for MockOverlay {
fn new(_nodes: Vec<NodeId>) -> Self {
Self
}
fn root_committee(&self) -> Committee {
vec![[0; 32]].into_iter().collect()
}
fn rebuild(&mut self, _timeout_qc: TimeoutQc) {
todo!()
}
fn is_member_of_child_committee(&self, _parent: NodeId, _child: NodeId) -> bool {
false
}
fn is_member_of_root_committee(&self, _id: NodeId) -> bool {
true
}
fn is_member_of_leaf_committee(&self, _id: NodeId) -> bool {
true
}
fn is_child_of_root_committee(&self, _id: NodeId) -> bool {
false
}
fn node_committee(&self, _id: NodeId) -> Committee {
self.root_committee()
}
fn parent_committee(&self, _id: NodeId) -> Committee {
self.root_committee()
}
fn child_committees(&self, _id: NodeId) -> Vec<Committee> {
vec![]
}
fn leaf_committees(&self, _id: NodeId) -> Vec<Committee> {
vec![self.root_committee()]
}
fn leader(&self, _view: View) -> NodeId {
[0; 32]
}
fn super_majority_threshold(&self, _id: NodeId) -> usize {
todo!()
}
fn leader_super_majority_threshold(&self, _id: NodeId) -> usize {
self.root_committee().len() * 2 / 3 + 1
}
}
fn init_from_genesis() -> Carnot<MockOverlay> {
Carnot::from_genesis( Carnot::from_genesis(
[0; 32], [0; 32],
Block { Block {
view: 0, view: 0,
id: [0; 32], id: [0; 32],
parent_qc: Qc::Standard(StandardQc::genesis()), parent_qc: Qc::Standard(StandardQc::genesis()),
leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] },
}, },
MockOverlay, FlatOverlay::new(Settings {
nodes: vec![[0; 32]],
leader: RoundRobin::default(),
}),
) )
} }
@ -442,6 +410,7 @@ mod test {
view: block.view, view: block.view,
id: block.id, id: block.id,
}), }),
leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] },
}; };
} }
@ -499,6 +468,7 @@ mod test {
view: engine.current_view(), view: engine.current_view(),
id: parent_block_id, id: parent_block_id,
}), }),
leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] },
}; };
let _ = engine.receive_block(block.clone()); let _ = engine.receive_block(block.clone());

View File

@ -0,0 +1,119 @@
use super::LeaderSelection;
use crate::{Committee, NodeId, Overlay};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug)]
/// Flat overlay with a single committee and round robin leader selection.
pub struct FlatOverlay<L: LeaderSelection> {
nodes: Vec<NodeId>,
leader: L,
}
impl<L> Overlay for FlatOverlay<L>
where
L: LeaderSelection + Send + Sync + 'static,
{
type Settings = Settings<L>;
type LeaderSelection = L;
fn new(Settings { leader, nodes }: Self::Settings) -> Self {
Self { nodes, leader }
}
fn root_committee(&self) -> crate::Committee {
self.nodes.clone().into_iter().collect()
}
fn rebuild(&mut self, _timeout_qc: crate::TimeoutQc) {
todo!()
}
fn is_member_of_child_committee(&self, _parent: NodeId, _child: NodeId) -> bool {
false
}
fn is_member_of_root_committee(&self, _id: NodeId) -> bool {
true
}
fn is_member_of_leaf_committee(&self, _id: NodeId) -> bool {
true
}
fn is_child_of_root_committee(&self, _id: NodeId) -> bool {
false
}
fn parent_committee(&self, _id: NodeId) -> crate::Committee {
Committee::new()
}
fn node_committee(&self, _id: NodeId) -> crate::Committee {
self.nodes.clone().into_iter().collect()
}
fn child_committees(&self, _id: NodeId) -> Vec<crate::Committee> {
vec![]
}
fn leaf_committees(&self, _id: NodeId) -> Vec<crate::Committee> {
vec![self.root_committee()]
}
fn next_leader(&self) -> NodeId {
self.leader.next_leader(&self.nodes)
}
fn super_majority_threshold(&self, _id: NodeId) -> usize {
0
}
fn leader_super_majority_threshold(&self, _id: NodeId) -> usize {
self.nodes.len() * 2 / 3 + 1
}
fn update_leader_selection<F, E>(&self, f: F) -> Result<Self, E>
where
F: FnOnce(Self::LeaderSelection) -> Result<Self::LeaderSelection, E>,
{
match f(self.leader.clone()) {
Ok(leader_selection) => Ok(Self {
leader: leader_selection,
..self.clone()
}),
Err(e) => Err(e),
}
}
}
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RoundRobin {
cur: usize,
}
impl RoundRobin {
pub fn new() -> Self {
Self { cur: 0 }
}
pub fn advance(&self) -> Self {
Self {
cur: (self.cur + 1),
}
}
}
impl LeaderSelection for RoundRobin {
fn next_leader(&self, nodes: &[NodeId]) -> NodeId {
nodes[self.cur % nodes.len()]
}
}
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Settings<L> {
pub nodes: Vec<NodeId>,
pub leader: L,
}

View File

@ -0,0 +1,35 @@
use super::types::*;
mod flat_overlay;
mod random_beacon;
pub use flat_overlay::*;
pub use random_beacon::*;
use std::marker::Send;
pub trait Overlay: Clone {
type Settings: Clone + Send + Sync + 'static;
type LeaderSelection: LeaderSelection + Clone + Send + Sync + 'static;
fn new(settings: Self::Settings) -> Self;
fn root_committee(&self) -> Committee;
fn rebuild(&mut self, timeout_qc: TimeoutQc);
fn is_member_of_child_committee(&self, parent: NodeId, child: NodeId) -> bool;
fn is_member_of_root_committee(&self, id: NodeId) -> bool;
fn is_member_of_leaf_committee(&self, id: NodeId) -> bool;
fn is_child_of_root_committee(&self, id: NodeId) -> bool;
fn parent_committee(&self, id: NodeId) -> Committee;
fn child_committees(&self, id: NodeId) -> Vec<Committee>;
fn leaf_committees(&self, id: NodeId) -> Vec<Committee>;
fn node_committee(&self, id: NodeId) -> Committee;
fn next_leader(&self) -> NodeId;
fn super_majority_threshold(&self, id: NodeId) -> usize;
fn leader_super_majority_threshold(&self, id: NodeId) -> usize;
fn update_leader_selection<F, E>(&self, f: F) -> Result<Self, E>
where
F: FnOnce(Self::LeaderSelection) -> Result<Self::LeaderSelection, E>;
}
pub trait LeaderSelection: Clone {
fn next_leader(&self, nodes: &[NodeId]) -> NodeId;
}

View File

@ -0,0 +1,122 @@
use crate::types::*;
use bls_signatures::{PrivateKey, PublicKey, Serialize, Signature};
use integer_encoding::VarInt;
use rand::{seq::SliceRandom, SeedableRng};
use serde::{Deserialize, Serialize as SerdeSerialize};
use sha2::{Digest, Sha256};
use std::ops::Deref;
use thiserror::Error;
use super::LeaderSelection;
pub type Entropy = [u8];
pub type Context = [u8];
#[cfg_attr(feature = "serde", derive(SerdeSerialize, Deserialize))]
#[derive(Debug, Clone, PartialEq)]
pub enum RandomBeaconState {
Happy {
// a byte string so that we can access the entropy without
// copying memory (can't directly go from the signature to its compressed form)
// We still assume the conversion does not fail, so the format has to be checked
// during deserialization
sig: Box<[u8]>,
#[serde(with = "serialize_bls")]
public_key: PublicKey,
},
Sad {
entropy: Box<Entropy>,
},
}
#[derive(Debug, Error)]
pub enum Error {
#[error("Invalid random beacon transition")]
InvalidRandomBeacon,
}
impl RandomBeaconState {
pub fn entropy(&self) -> &Entropy {
match self {
Self::Happy { sig, .. } => sig,
Self::Sad { entropy } => entropy,
}
}
pub fn generate_happy(view: View, sk: &PrivateKey) -> Self {
let sig = sk.sign(view_to_bytes(view));
Self::Happy {
sig: sig.as_bytes().into(),
public_key: sk.public_key(),
}
}
pub fn generate_sad(view: View, prev: &Self) -> Self {
let context = view_to_bytes(view);
let mut hasher = Sha256::new();
hasher.update(prev.entropy());
hasher.update(context);
let entropy = hasher.finalize().to_vec().into();
Self::Sad { entropy }
}
pub fn check_advance_happy(&self, rb: RandomBeaconState, view: View) -> Result<Self, Error> {
let context = view_to_bytes(view);
match rb {
Self::Happy {
ref sig,
public_key,
} => {
let sig = Signature::from_bytes(sig).unwrap();
if !public_key.verify(sig, context) {
return Err(Error::InvalidRandomBeacon);
}
}
Self::Sad { .. } => return Err(Error::InvalidRandomBeacon),
}
Ok(rb)
}
}
fn view_to_bytes(view: View) -> Box<[u8]> {
View::encode_var_vec(view).into_boxed_slice()
}
// FIXME: the spec should be clearer on what is the expected behavior,
// for now, just use something that works
fn choice(state: &RandomBeaconState, nodes: &[NodeId]) -> NodeId {
let mut seed = [0; 32];
seed.copy_from_slice(&state.entropy().deref()[..32]);
let mut rng = rand_chacha::ChaChaRng::from_seed(seed);
*nodes.choose(&mut rng).unwrap()
}
impl LeaderSelection for RandomBeaconState {
fn next_leader(&self, nodes: &[NodeId]) -> NodeId {
choice(self, nodes)
}
}
mod serialize_bls {
use super::*;
use serde::{Deserializer, Serializer};
pub fn deserialize<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
D: Deserializer<'de>,
T: bls_signatures::Serialize,
{
let bytes = Vec::<u8>::deserialize(deserializer)?;
T::from_bytes(&bytes).map_err(serde::de::Error::custom)
}
pub fn serialize<S, T>(sig: &T, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
T: bls_signatures::Serialize,
{
let bytes = sig.as_bytes();
bytes.serialize(serializer)
}
}

View File

@ -69,6 +69,13 @@ pub struct Block {
pub id: BlockId, pub id: BlockId,
pub view: View, pub view: View,
pub parent_qc: Qc, pub parent_qc: Qc,
pub leader_proof: LeaderProof,
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum LeaderProof {
LeaderId { leader_id: NodeId },
} }
impl Block { impl Block {
@ -147,20 +154,3 @@ impl Qc {
} }
} }
} }
pub trait Overlay: Clone {
fn new(nodes: Vec<NodeId>) -> Self;
fn root_committee(&self) -> Committee;
fn rebuild(&mut self, timeout_qc: TimeoutQc);
fn is_member_of_child_committee(&self, parent: NodeId, child: NodeId) -> bool;
fn is_member_of_root_committee(&self, id: NodeId) -> bool;
fn is_member_of_leaf_committee(&self, id: NodeId) -> bool;
fn is_child_of_root_committee(&self, id: NodeId) -> bool;
fn parent_committee(&self, id: NodeId) -> Committee;
fn child_committees(&self, id: NodeId) -> Vec<Committee>;
fn leaf_committees(&self, id: NodeId) -> Vec<Committee>;
fn node_committee(&self, id: NodeId) -> Committee;
fn leader(&self, view: View) -> NodeId;
fn super_majority_threshold(&self, id: NodeId) -> usize;
fn leader_super_majority_threshold(&self, id: NodeId) -> usize;
}

View File

@ -24,6 +24,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = ["waku", "mo
nomos-http = { path = "../../nomos-services/http", features = ["http"] } nomos-http = { path = "../../nomos-services/http", features = ["http"] }
nomos-consensus = { path = "../../nomos-services/consensus", features = ["waku"] } nomos-consensus = { path = "../../nomos-services/consensus", features = ["waku"] }
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
consensus-engine = { path = "../../consensus-engine" }
tokio = {version = "1.24", features = ["sync"] } tokio = {version = "1.24", features = ["sync"] }
serde_json = "1.0" serde_json = "1.0"
serde_yaml = "0.9" serde_yaml = "0.9"

View File

@ -3,9 +3,9 @@ mod tx;
use clap::Parser; use clap::Parser;
use color_eyre::eyre::{eyre, Result}; use color_eyre::eyre::{eyre, Result};
use consensus_engine::overlay::{FlatOverlay, RoundRobin};
use nomos_consensus::{ use nomos_consensus::{
network::adapters::waku::WakuAdapter as ConsensusWakuAdapter, overlay::FlatRoundRobin, network::adapters::waku::WakuAdapter as ConsensusWakuAdapter, CarnotConsensus,
CarnotConsensus,
}; };
use nomos_core::fountain::mock::MockFountain; use nomos_core::fountain::mock::MockFountain;
use nomos_http::backends::axum::AxumBackend; use nomos_http::backends::axum::AxumBackend;
@ -39,7 +39,7 @@ type Carnot = CarnotConsensus<
MockPool<Tx>, MockPool<Tx>,
MempoolWakuAdapter<Tx>, MempoolWakuAdapter<Tx>,
MockFountain, MockFountain,
FlatRoundRobin, FlatOverlay<RoundRobin>,
>; >;
#[derive(Deserialize)] #[derive(Deserialize)]

View File

@ -1,10 +1,11 @@
use consensus_engine::overlay::RandomBeaconState;
use indexmap::IndexSet; use indexmap::IndexSet;
// std // std
use core::hash::Hash; use core::hash::Hash;
// crates // crates
use crate::wire; use crate::wire;
use bytes::Bytes; use bytes::Bytes;
use consensus_engine::{Qc, View}; use consensus_engine::{LeaderProof, NodeId, Qc, View};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
// internal // internal
@ -16,29 +17,42 @@ pub type TxHash = [u8; 32];
pub struct Block<TxId: Clone + Eq + Hash> { pub struct Block<TxId: Clone + Eq + Hash> {
header: consensus_engine::Block, header: consensus_engine::Block,
transactions: IndexSet<TxId>, transactions: IndexSet<TxId>,
beacon: RandomBeaconState,
} }
/// Identifier of a block /// Identifier of a block
pub type BlockId = [u8; 32]; pub type BlockId = [u8; 32];
impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> { impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
pub fn new(view: View, parent_qc: Qc, txs: impl Iterator<Item = TxId>) -> Self { pub fn new(
view: View,
parent_qc: Qc,
txs: impl Iterator<Item = TxId>,
proposer: NodeId,
beacon: RandomBeaconState,
) -> Self {
let transactions = txs.collect(); let transactions = txs.collect();
let header = consensus_engine::Block { let header = consensus_engine::Block {
id: [view as u8; 32], id: [view as u8; 32],
view, view,
parent_qc, parent_qc,
leader_proof: LeaderProof::LeaderId {
leader_id: proposer,
},
}; };
let mut s = Self { let mut s = Self {
header, header,
transactions, transactions,
beacon,
}; };
let id = id_from_wire_content(&s.as_bytes()); let id = id_from_wire_content(&s.as_bytes());
s.header.id = id; s.header.id = id;
s s
} }
}
impl<TxId: Clone + Eq + Hash> Block<TxId> {
pub fn header(&self) -> &consensus_engine::Block { pub fn header(&self) -> &consensus_engine::Block {
&self.header &self.header
} }
@ -46,6 +60,10 @@ impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
pub fn transactions(&self) -> impl Iterator<Item = &TxId> + '_ { pub fn transactions(&self) -> impl Iterator<Item = &TxId> + '_ {
self.transactions.iter() self.transactions.iter()
} }
pub fn beacon(&self) -> &RandomBeaconState {
&self.beacon
}
} }
fn id_from_wire_content(bytes: &[u8]) -> consensus_engine::BlockId { fn id_from_wire_content(bytes: &[u8]) -> consensus_engine::BlockId {

View File

@ -25,6 +25,7 @@ tokio-stream = "0.1"
tokio-util = "0.7" tokio-util = "0.7"
tracing = "0.1" tracing = "0.1"
waku-bindings = { version = "0.1.0-rc.2", optional = true} waku-bindings = { version = "0.1.0-rc.2", optional = true}
bls-signatures = "0.14"
[features] [features]
default = [] default = []

View File

@ -0,0 +1,47 @@
use consensus_engine::{
overlay::{Error as RandomBeaconError, LeaderSelection, RandomBeaconState, RoundRobin},
TimeoutQc,
};
use nomos_core::block::Block;
use std::{convert::Infallible, error::Error, hash::Hash};
pub trait UpdateableLeaderSelection: LeaderSelection {
type Error: Error;
fn on_new_block_received<Tx: Hash + Clone + Eq>(
&self,
block: Block<Tx>,
) -> Result<Self, Self::Error>;
fn on_timeout_qc_received(&self, qc: TimeoutQc) -> Result<Self, Self::Error>;
}
impl UpdateableLeaderSelection for RoundRobin {
type Error = Infallible;
fn on_new_block_received<Tx: Hash + Clone + Eq>(
&self,
_block: Block<Tx>,
) -> Result<Self, Self::Error> {
Ok(self.advance())
}
fn on_timeout_qc_received(&self, _qc: TimeoutQc) -> Result<Self, Self::Error> {
Ok(self.advance())
}
}
impl UpdateableLeaderSelection for RandomBeaconState {
type Error = RandomBeaconError;
fn on_new_block_received<Tx: Hash + Clone + Eq>(
&self,
block: Block<Tx>,
) -> Result<Self, Self::Error> {
self.check_advance_happy(block.beacon().clone(), block.header().parent_qc.view())
// TODO: check random beacon public keys is leader id
}
fn on_timeout_qc_received(&self, qc: TimeoutQc) -> Result<Self, Self::Error> {
Ok(Self::generate_sad(qc.view, self))
}
}

View File

@ -4,8 +4,8 @@
//! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views). //! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views).
//! It's obviously extremely important that the information contained in `View` is synchronized across different //! It's obviously extremely important that the information contained in `View` is synchronized across different
//! nodes, but that has to be achieved through different means. //! nodes, but that has to be achieved through different means.
mod leader_selection;
pub mod network; pub mod network;
pub mod overlay;
mod tally; mod tally;
mod view_cancel; mod view_cancel;
@ -16,7 +16,9 @@ use std::hash::Hash;
use std::pin::Pin; use std::pin::Pin;
use std::time::Duration; use std::time::Duration;
// crates // crates
use bls_signatures::PrivateKey;
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use leader_selection::UpdateableLeaderSelection;
use serde::Deserialize; use serde::Deserialize;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
// internal // internal
@ -25,9 +27,10 @@ use crate::network::NetworkAdapter;
use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings}; use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings};
use crate::view_cancel::ViewCancelCache; use crate::view_cancel::ViewCancelCache;
use consensus_engine::{ use consensus_engine::{
AggregateQc, Carnot, Committee, NewView, Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc, overlay::RandomBeaconState, AggregateQc, Carnot, Committee, LeaderProof, NewView, Overlay,
Vote, Payload, Qc, StandardQc, Timeout, TimeoutQc, Vote,
}; };
use nomos_core::block::Block; use nomos_core::block::Block;
use nomos_core::crypto::PublicKey; use nomos_core::crypto::PublicKey;
use nomos_core::fountain::FountainCode; use nomos_core::fountain::FountainCode;
@ -54,33 +57,33 @@ pub type NodeId = PublicKey;
pub type Seed = [u8; 32]; pub type Seed = [u8; 32];
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct CarnotSettings<Fountain: FountainCode> { pub struct CarnotSettings<Fountain: FountainCode, O: Overlay> {
private_key: [u8; 32], private_key: [u8; 32],
fountain_settings: Fountain::Settings, fountain_settings: Fountain::Settings,
nodes: Vec<NodeId>, overlay_settings: O::Settings,
} }
impl<Fountain: FountainCode> Clone for CarnotSettings<Fountain> { impl<Fountain: FountainCode, O: Overlay> Clone for CarnotSettings<Fountain, O> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
private_key: self.private_key, private_key: self.private_key,
fountain_settings: self.fountain_settings.clone(), fountain_settings: self.fountain_settings.clone(),
nodes: self.nodes.clone(), overlay_settings: self.overlay_settings.clone(),
} }
} }
} }
impl<Fountain: FountainCode> CarnotSettings<Fountain> { impl<Fountain: FountainCode, O: Overlay> CarnotSettings<Fountain, O> {
#[inline] #[inline]
pub const fn new( pub const fn new(
private_key: [u8; 32], private_key: [u8; 32],
fountain_settings: Fountain::Settings, fountain_settings: Fountain::Settings,
nodes: Vec<NodeId>, overlay_settings: O::Settings,
) -> Self { ) -> Self {
Self { Self {
private_key, private_key,
fountain_settings, fountain_settings,
nodes, overlay_settings,
} }
} }
} }
@ -116,7 +119,7 @@ where
O: Overlay + Debug, O: Overlay + Debug,
{ {
const SERVICE_ID: ServiceId = "Carnot"; const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<F>; type Settings = CarnotSettings<F, O>;
type State = NoState<Self::Settings>; type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>; type StateOperator = NoOperator<Self::State>;
type Message = NoMessage; type Message = NoMessage;
@ -134,6 +137,7 @@ where
<P::Tx as Transaction>::Hash: Debug + Send + Sync, <P::Tx as Transaction>::Hash: Debug + Send + Sync,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static, M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
{ {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> { fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay(); let network_relay = service_state.overwatch_handle.relay();
@ -163,14 +167,15 @@ where
let CarnotSettings { let CarnotSettings {
private_key, private_key,
fountain_settings, fountain_settings,
nodes, overlay_settings,
} = self.service_state.settings_reader.get_updated_settings(); } = self.service_state.settings_reader.get_updated_settings();
let overlay = O::new(nodes); let overlay = O::new(overlay_settings);
let genesis = consensus_engine::Block { let genesis = consensus_engine::Block {
id: [0; 32], id: [0; 32],
view: 0, view: 0,
parent_qc: Qc::Standard(StandardQc::genesis()), parent_qc: Qc::Standard(StandardQc::genesis()),
leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] },
}; };
let mut carnot = Carnot::from_genesis(private_key, genesis, overlay); let mut carnot = Carnot::from_genesis(private_key, genesis, overlay);
let network_adapter = A::new(network_relay).await; let network_adapter = A::new(network_relay).await;
@ -210,7 +215,7 @@ where
tally_settings.clone(), tally_settings.clone(),
), ),
))); )));
if carnot.is_leader_for_view(genesis_block.view + 1) { if carnot.is_next_leader() {
events.push(Box::pin(view_cancel_cache.cancelable_event_future( events.push(Box::pin(view_cancel_cache.cancelable_event_future(
genesis_block.view + 1, genesis_block.view + 1,
async move { async move {
@ -234,9 +239,10 @@ where
match event { match event {
Event::Proposal { block, mut stream } => { Event::Proposal { block, mut stream } => {
tracing::debug!("received proposal {:?}", block); tracing::debug!("received proposal {:?}", block);
let block = block.header().clone(); let original_block = block;
let block = original_block.header().clone();
match carnot.receive_block(block.clone()) { match carnot.receive_block(block.clone()) {
Ok(new_state) => { Ok(mut new_state) => {
let new_view = new_state.current_view(); let new_view = new_state.current_view();
if new_view != carnot.current_view() { if new_view != carnot.current_view() {
events.push(Box::pin(view_cancel_cache.cancelable_event_future( events.push(Box::pin(view_cancel_cache.cancelable_event_future(
@ -248,6 +254,10 @@ where
tally_settings.clone(), tally_settings.clone(),
), ),
))); )));
new_state =
Self::update_leader_selection(new_state, |leader_selection| {
leader_selection.on_new_block_received(original_block)
});
} else { } else {
events.push(Box::pin(view_cancel_cache.cancelable_event_future( events.push(Box::pin(view_cancel_cache.cancelable_event_future(
block.view, block.view,
@ -264,7 +274,7 @@ where
} }
Err(_) => tracing::debug!("invalid block {:?}", block), Err(_) => tracing::debug!("invalid block {:?}", block),
} }
if carnot.is_leader_for_view(block.view + 1) { if carnot.is_next_leader() {
events.push(Box::pin(view_cancel_cache.cancelable_event_future( events.push(Box::pin(view_cancel_cache.cancelable_event_future(
block.view, block.view,
async move { async move {
@ -301,8 +311,7 @@ where
carnot = new_carnot; carnot = new_carnot;
output = Some(Output::Send(out)); output = Some(Output::Send(out));
let new_view = timeout_qc.view + 1; let new_view = timeout_qc.view + 1;
let next_view = new_view + 1; if carnot.is_next_leader() {
if carnot.is_leader_for_view(next_view) {
let high_qc = carnot.high_qc(); let high_qc = carnot.high_qc();
events.push(Box::pin(view_cancel_cache.cancelable_event_future( events.push(Box::pin(view_cancel_cache.cancelable_event_future(
new_view, new_view,
@ -326,16 +335,22 @@ where
} }
Event::TimeoutQc { timeout_qc } => { Event::TimeoutQc { timeout_qc } => {
tracing::debug!("timeout received {:?}", timeout_qc); tracing::debug!("timeout received {:?}", timeout_qc);
carnot = carnot.receive_timeout_qc(timeout_qc.clone()); let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone());
events.push(Box::pin(view_cancel_cache.cancelable_event_future( events.push(Box::pin(view_cancel_cache.cancelable_event_future(
timeout_qc.view + 1, timeout_qc.view + 1,
Self::gather_new_views( Self::gather_new_views(
adapter, adapter,
self_committee, self_committee,
timeout_qc, timeout_qc.clone(),
tally_settings.clone(), tally_settings.clone(),
), ),
))); )));
if carnot.current_view() != new_state.current_view() {
new_state = Self::update_leader_selection(new_state, |leader_selection| {
leader_selection.on_timeout_qc_received(timeout_qc)
});
}
carnot = new_state;
} }
Event::RootTimeout { timeouts } => { Event::RootTimeout { timeouts } => {
tracing::debug!("root timeout {:?}", timeouts); tracing::debug!("root timeout {:?}", timeouts);
@ -371,7 +386,11 @@ where
}); });
match rx.await { match rx.await {
Ok(txs) => { Ok(txs) => {
let proposal = Block::new(qc.view() + 1, qc, txs); let beacon = RandomBeaconState::generate_happy(
qc.view(),
&PrivateKey::new(private_key),
);
let proposal = Block::new(qc.view() + 1, qc, txs, carnot.id(), beacon);
output = Some(Output::BroadcastProposal { proposal }); output = Some(Output::BroadcastProposal { proposal });
} }
Err(e) => tracing::error!("Could not fetch txs {e}"), Err(e) => tracing::error!("Could not fetch txs {e}"),
@ -437,6 +456,7 @@ where
<P::Tx as Transaction>::Hash: Debug + Send + Sync, <P::Tx as Transaction>::Hash: Debug + Send + Sync,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static, M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static, O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
{ {
async fn gather_timeout_qc(adapter: &A, view: consensus_engine::View) -> Event<P::Tx> { async fn gather_timeout_qc(adapter: &A, view: consensus_engine::View) -> Event<P::Tx> {
if let Some(timeout_qc) = adapter if let Some(timeout_qc) = adapter
@ -527,6 +547,19 @@ where
Event::None Event::None
} }
} }
fn update_leader_selection<
E: std::error::Error,
U: FnOnce(O::LeaderSelection) -> Result<O::LeaderSelection, E>,
>(
carnot: Carnot<O>,
f: U,
) -> Carnot<O> {
carnot
.update_overlay(|overlay| overlay.update_leader_selection(f))
// TODO: remove unwrap
.unwrap()
}
} }
async fn handle_output<A, F, Tx>(adapter: &A, fountain: &F, node_id: NodeId, output: Output<Tx>) async fn handle_output<A, F, Tx>(adapter: &A, fountain: &F, node_id: NodeId, output: Output<Tx>)

View File

@ -1,65 +0,0 @@
use consensus_engine::{Committee, NodeId, Overlay, View};
#[derive(Clone, Debug)]
/// Flat overlay with a single committee and round robin leader selection.
pub struct FlatRoundRobin {
nodes: Vec<NodeId>,
}
impl Overlay for FlatRoundRobin {
fn new(nodes: Vec<NodeId>) -> Self {
Self { nodes }
}
fn root_committee(&self) -> consensus_engine::Committee {
self.nodes.clone().into_iter().collect()
}
fn rebuild(&mut self, _timeout_qc: consensus_engine::TimeoutQc) {
todo!()
}
fn is_member_of_child_committee(&self, _parent: NodeId, _child: NodeId) -> bool {
false
}
fn is_member_of_root_committee(&self, _id: NodeId) -> bool {
true
}
fn is_member_of_leaf_committee(&self, _id: NodeId) -> bool {
true
}
fn is_child_of_root_committee(&self, _id: NodeId) -> bool {
false
}
fn parent_committee(&self, _id: NodeId) -> consensus_engine::Committee {
Committee::new()
}
fn node_committee(&self, _id: NodeId) -> consensus_engine::Committee {
self.nodes.clone().into_iter().collect()
}
fn child_committees(&self, _id: NodeId) -> Vec<consensus_engine::Committee> {
vec![]
}
fn leaf_committees(&self, _id: NodeId) -> Vec<consensus_engine::Committee> {
vec![self.root_committee()]
}
fn leader(&self, view: View) -> NodeId {
self.nodes[view as usize % self.nodes.len()]
}
fn super_majority_threshold(&self, _id: NodeId) -> usize {
0
}
fn leader_super_majority_threshold(&self, _id: NodeId) -> usize {
self.nodes.len() * 2 / 3 + 1
}
}