working node

This commit is contained in:
Giacomo Pasini 2023-05-08 15:36:56 +02:00
parent 430dbf510e
commit 3c569e1278
No known key found for this signature in database
GPG Key ID: FC08489D2D895D4B
22 changed files with 437 additions and 340 deletions

View File

@ -23,7 +23,7 @@ impl<O: Overlay> Carnot<O> {
highest_voted_view: -1,
last_view_timeout_qc: None,
overlay,
safe_blocks: [(id, genesis_block)].into(),
safe_blocks: [(genesis_block.id, genesis_block)].into(),
}
}
@ -44,12 +44,10 @@ impl<O: Overlay> Carnot<O> {
self.safe_blocks.contains_key(&block.parent()),
"out of order view not supported, missing parent block for {block:?}",
);
// if the block has already been processed, return early
if self.safe_blocks.contains_key(&block.id) {
return Ok(self.clone());
}
if self.blocks_in_view(block.view).contains(&block)
|| block.view <= self.latest_committed_view()
{
@ -60,7 +58,6 @@ impl<O: Overlay> Carnot<O> {
// By rejecting any other blocks except the first one received for a view this code does NOT do that.
return Err(());
}
let mut new_state = self.clone();
if new_state.block_is_safe(block.clone()) {
@ -70,7 +67,6 @@ impl<O: Overlay> Carnot<O> {
// Non safe block, not necessarily an error
return Err(());
}
Ok(new_state)
}
@ -99,7 +95,12 @@ impl<O: Overlay> Carnot<O> {
/// * `receive_block(b)` must have been called successfully before trying to approve a block b.
/// * A node should not attempt to vote for a block in a view earlier than the latest one it actively participated in.
pub fn approve_block(&self, block: Block) -> (Self, Send) {
assert!(self.safe_blocks.contains_key(&block.id));
assert!(
self.safe_blocks.contains_key(&block.id),
"{:?} not in {:?}",
block,
self.safe_blocks
);
assert!(
self.highest_voted_view < block.view,
"can't vote for a block in the past"
@ -321,8 +322,8 @@ impl<O: Overlay> Carnot<O> {
self.local_high_qc.clone()
}
pub fn is_leader_for_current_view(&self) -> bool {
self.overlay.leader(self.current_view) == self.id
pub fn is_leader_for_view(&self, view: View) -> bool {
self.overlay.leader(view) == self.id
}
pub fn super_majority_threshold(&self) -> usize {
@ -330,12 +331,28 @@ impl<O: Overlay> Carnot<O> {
}
pub fn leader_super_majority_threshold(&self) -> usize {
self.overlay.super_majority_threshold(self.id)
self.overlay.leader_super_majority_threshold(self.id)
}
pub fn id(&self) -> NodeId {
self.id
}
pub fn child_committee(&self) -> Committee {
self.overlay.child_committee(self.id)
}
pub fn parent_committee(&self) -> Committee {
self.overlay.parent_committee(self.id)
}
pub fn root_committee(&self) -> Committee {
self.overlay.root_committee()
}
pub fn is_member_of_root_committee(&self) -> bool {
self.overlay.is_member_of_root_committee(self.id)
}
}
#[cfg(test)]
@ -378,6 +395,10 @@ mod test {
todo!()
}
fn child_committee(&self, _id: NodeId) -> Committee {
todo!()
}
fn leaf_committees(&self, _id: NodeId) -> Vec<Committee> {
todo!()
}

View File

@ -157,6 +157,7 @@ pub trait Overlay: Clone {
fn is_member_of_leaf_committee(&self, id: NodeId) -> bool;
fn is_child_of_root_committee(&self, id: NodeId) -> bool;
fn parent_committee(&self, id: NodeId) -> Committee;
fn child_committee(&self, id: NodeId) -> Committee;
fn leaf_committees(&self, id: NodeId) -> Vec<Committee>;
fn leader(&self, view: View) -> NodeId;
fn super_majority_threshold(&self, id: NodeId) -> usize;

View File

@ -22,6 +22,7 @@ nomos-network = { path = "../../nomos-services/network", features = ["waku"] }
nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["waku", "mock"] }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
nomos-consensus = { path = "../../nomos-services/consensus", features = ["waku"] }
tracing-subscriber = "0.3"
tokio = {version = "1.24", features = ["sync"] }
serde_json = "1.0"

View File

@ -3,13 +3,19 @@ mod tx;
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_consensus::{
network::adapters::waku::WakuAdapter as ConsensusWakuAdapter, overlay::FlatRoundRobin,
CarnotConsensus,
};
use nomos_core::fountain::mock::MockFountain;
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{HttpBridge, HttpBridgeService, HttpBridgeSettings};
use nomos_http::http::HttpService;
use nomos_log::Logger;
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::waku::WakuAdapter;
use nomos_mempool::MempoolService;
use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::waku::WakuAdapter as MempoolWakuAdapter,
MempoolService,
};
use nomos_network::{backends::waku::Waku, NetworkService};
use overwatch_derive::*;
use overwatch_rs::{
@ -28,18 +34,28 @@ struct Args {
config: std::path::PathBuf,
}
type Carnot = CarnotConsensus<
ConsensusWakuAdapter,
MockPool<Tx>,
MempoolWakuAdapter<Tx>,
MockFountain,
FlatRoundRobin,
>;
#[derive(Deserialize)]
struct Config {
log: <Logger as ServiceData>::Settings,
network: <NetworkService<Waku> as ServiceData>::Settings,
http: <HttpService<AxumBackend> as ServiceData>::Settings,
consensus: <Carnot as ServiceData>::Settings,
}
#[derive(Services)]
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Waku>>,
mockpool: ServiceHandle<MempoolService<WakuAdapter<Tx>, MockPool<Tx>>>,
mockpool: ServiceHandle<MempoolService<MempoolWakuAdapter<Tx>, MockPool<Tx>>>,
consensus: ServiceHandle<Carnot>,
http: ServiceHandle<HttpService<AxumBackend>>,
bridges: ServiceHandle<HttpBridgeService>,
}
@ -80,6 +96,7 @@ fn main() -> Result<()> {
logging: config.log,
http: config.http,
mockpool: (),
consensus: config.consensus,
bridges: HttpBridgeSettings { bridges },
},
None,

View File

@ -3,7 +3,7 @@ use nomos_core::tx::{Transaction, TransactionHasher};
use serde::{Deserialize, Serialize};
use std::hash::Hash;
#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct Tx(pub String);
fn hash_tx(tx: &Tx) -> String {

View File

@ -1,4 +1,4 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct AccountId;

View File

@ -29,6 +29,7 @@ impl<TxId: Clone + Eq + Hash + Serialize + DeserializeOwned> Block<TxId> {
view,
parent_qc,
};
let mut s = Self {
header,
transactions,

View File

@ -8,7 +8,7 @@ use crate::fountain::{FountainCode, FountainError};
/// Fountain code that does no protocol at all.
/// Just bypasses the raw bytes into a single chunk and reconstruct from it.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MockFountain;
#[async_trait]

View File

@ -8,7 +8,7 @@ use crate::tx::{Transaction, TransactionHasher};
mod transaction;
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Tx {
Transfer(TransferTransaction),
}

View File

@ -6,7 +6,7 @@ use crate::crypto::Signature;
/// Can only be constructed if the signature is valid,
/// but does not imply that it can be successfully applied
/// to the ledger.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TransferTransaction {
pub from: AccountId,
pub to: AccountId,

View File

@ -1,6 +1,6 @@
// std
// crates
use consensus_engine::View;
use consensus_engine::{Block, View};
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
// internal
@ -49,6 +49,7 @@ impl Tally for MockTally {
type Vote = MockVote;
type Qc = MockQc;
type Outcome = ();
type Subject = Block;
type TallyError = Error;
type Settings = MockTallySettings;
@ -59,12 +60,12 @@ impl Tally for MockTally {
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: View,
block: Block,
mut vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
let mut count_votes = 0;
while let Some(vote) = vote_stream.next().await {
if vote.view() != view {
if vote.view() != block.view {
return Err(Error("Invalid vote".into()));
}
count_votes += 1;

View File

@ -1,6 +1,4 @@
pub mod mock;
use consensus_engine::View;
use futures::Stream;
#[async_trait::async_trait]
@ -8,12 +6,13 @@ pub trait Tally {
type Vote;
type Qc;
type Outcome;
type Subject;
type TallyError;
type Settings: Clone;
fn new(settings: Self::Settings) -> Self;
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: View,
subject: Self::Subject,
vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError>;
}

View File

@ -7,21 +7,21 @@
pub mod network;
pub mod overlay;
mod tally;
mod tip;
// std
use std::collections::HashSet;
use std::fmt::Debug;
use std::hash::Hash;
use std::pin::Pin;
use std::time::Duration;
// crates
use futures::{future::FusedFuture, stream::FuturesUnordered, FutureExt, Stream, StreamExt};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use serde::Deserialize;
use serde::{de::DeserializeOwned, Serialize};
// internal
use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg};
use crate::network::NetworkAdapter;
use crate::tally::happy::CarnotTally;
use crate::tally::unhappy::NewViewTally;
use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings};
use consensus_engine::{
AggregateQc, Carnot, Committee, NewView, Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc,
Vote,
@ -42,52 +42,42 @@ use overwatch_rs::services::{
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
use tip::Tip;
// TODO: tale this from config
const TIMEOUT: Duration = Duration::from_secs(60);
// Raw bytes for now, could be a ed25519 public key
pub type NodeId = PublicKey;
// Random seed for each round provided by the protocol
pub type Seed = [u8; 32];
#[derive(Debug)]
pub struct CarnotSettings<Fountain: FountainCode, VoteTally: Tally, NewViewTally: Tally> {
#[derive(Debug, Deserialize, Serialize)]
pub struct CarnotSettings<Fountain: FountainCode> {
private_key: [u8; 32],
fountain_settings: Fountain::Settings,
tally_settings: VoteTally::Settings,
new_view_tally_settings: NewViewTally::Settings,
nodes: Vec<NodeId>,
}
impl<Fountain: FountainCode, VoteTally: Tally, NewViewTally: Tally> Clone
for CarnotSettings<Fountain, VoteTally, NewViewTally>
{
impl<Fountain: FountainCode> Clone for CarnotSettings<Fountain> {
fn clone(&self) -> Self {
Self {
private_key: self.private_key,
fountain_settings: self.fountain_settings.clone(),
tally_settings: self.tally_settings.clone(),
new_view_tally_settings: self.new_view_tally_settings.clone(),
nodes: self.nodes.clone(),
}
}
}
impl<Fountain: FountainCode, VoteTally: Tally, NewViewTally: Tally>
CarnotSettings<Fountain, VoteTally, NewViewTally>
{
impl<Fountain: FountainCode> CarnotSettings<Fountain> {
#[inline]
pub const fn new(
private_key: [u8; 32],
fountain_settings: Fountain::Settings,
tally_settings: VoteTally::Settings,
new_view_tally_settings: NewViewTally::Settings,
nodes: Vec<NodeId>,
) -> Self {
Self {
private_key,
fountain_settings,
tally_settings,
new_view_tally_settings,
nodes,
}
}
@ -99,7 +89,7 @@ where
A: NetworkAdapter,
M: MempoolAdapter<Tx = P::Tx>,
P: MemPool,
O: Overlay,
O: Overlay + Debug,
P::Tx: Transaction + Debug + 'static,
<P::Tx as Transaction>::Hash: Debug,
A::Backend: 'static,
@ -121,10 +111,10 @@ where
P::Tx: Transaction + Debug,
<P::Tx as Transaction>::Hash: Debug,
M: MempoolAdapter<Tx = P::Tx>,
O: Overlay,
O: Overlay + Debug,
{
const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<F, CarnotTally, NewViewTally>;
type Settings = CarnotSettings<F>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
@ -141,7 +131,7 @@ where
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
@ -171,44 +161,198 @@ where
let CarnotSettings {
private_key,
fountain_settings,
tally_settings,
nodes,
new_view_tally_settings,
} = self.service_state.settings_reader.get_updated_settings();
let network_adapter = A::new(network_relay).await;
let tip = Tip;
let fountain = F::new(fountain_settings);
let tally = CarnotTally::new(tally_settings);
let new_view_tally = NewViewTally::new(new_view_tally_settings);
// let leadership = Leadership::<P::Tx>::new(private_key, mempool_relay.clone());
let overlay = O::new(nodes);
let genesis = consensus_engine::Block {
id: [0; 32],
view: 0,
parent_qc: Qc::Standard(StandardQc::genesis()),
};
let mut carnot = Carnot::from_genesis(private_key, genesis, overlay);
let network_adapter = A::new(network_relay).await;
let adapter = &network_adapter;
let _child_committee = carnot.child_committee();
let child_committee = &_child_committee;
let _leader_committee = [carnot.id()].into_iter().collect();
let leader_committee = &_leader_committee;
let fountain = F::new(fountain_settings);
let _tally_settings = CarnotTallySettings {
threshold: carnot.super_majority_threshold(),
participating_nodes: carnot.child_committee(),
};
let tally_settings = &_tally_settings;
let _leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(),
// TODO: add children of root committee
participating_nodes: carnot.root_committee(),
};
let leader_tally_settings = &_leader_tally_settings;
loop {
// we iterate view per view because it's easier to keep an ordering for events
carnot = View {
timeout: Duration::from_secs(2),
committee: Committee::new(),
adapter: network_adapter.clone(),
tally: tally.clone(),
new_view_tally: new_view_tally.clone(),
carnot,
fountain: fountain.clone(),
tip: tip.clone(),
mempool: mempool_relay.clone(),
let events: FuturesUnordered<Pin<Box<dyn Future<Output = Event<P::Tx>> + Send>>> =
FuturesUnordered::new();
let genesis_block = carnot.genesis_block();
events.push(Box::pin(Self::gather_block(
adapter,
genesis_block.view + 1,
)));
events.push(Box::pin(Self::gather_votes(
adapter,
child_committee,
genesis_block,
tally_settings.clone(),
)));
tokio::pin!(events);
while let Some(event) = events.next().await {
let mut output = None;
let prev_view = carnot.current_view();
match event {
Event::Proposal { block, mut stream } => {
tracing::debug!("received proposal {:?}", block);
let block = block.header().clone();
match carnot.receive_block(block.clone()) {
Ok(new_state) => {
let new_view = new_state.current_view();
if new_view != carnot.current_view() {
events.push(Box::pin(Self::gather_votes(
adapter,
child_committee,
block,
tally_settings.clone(),
)));
} else {
events.push(Box::pin(async move {
if let Some(block) = stream.next().await {
Event::Proposal { block, stream }
} else {
Event::None
}
}));
}
carnot = new_state;
}
Err(_) => tracing::debug!("invalid block {:?}", block),
}
}
Event::Approve { block, .. } => {
tracing::debug!("approving proposal {:?}", block);
let (new_carnot, out) = carnot.approve_block(block.clone());
carnot = new_carnot;
output = Some(Output::Send::<P::Tx>(out));
if carnot.is_leader_for_view(block.view + 1) {
events.push(Box::pin(async move {
let Event::Approve { qc, .. } = Self::gather_votes(
adapter,
leader_committee,
block,
leader_tally_settings.clone(),
)
.await else { unreachable!() };
Event::ProposeBlock { qc }
}));
}
}
Event::LocalTimeout => {
tracing::debug!("local timeout");
let (new_carnot, out) = carnot.local_timeout();
carnot = new_carnot;
output = out.map(Output::Send);
}
Event::NewView {
timeout_qc,
new_views,
} => {
tracing::debug!("approving new view {:?}", timeout_qc);
let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views);
carnot = new_carnot;
output = Some(Output::Send(out));
let next_view = timeout_qc.view + 2;
if carnot.is_leader_for_view(next_view) {
let high_qc = carnot.high_qc();
events.push(Box::pin(async move {
let _votes = Self::gather_new_views(
adapter,
leader_committee,
timeout_qc,
leader_tally_settings.clone(),
)
.await;
Event::ProposeBlock {
qc: Qc::Aggregated(AggregateQc {
high_qc,
view: next_view,
}),
}
}));
}
}
Event::TimeoutQc { timeout_qc } => {
tracing::debug!("timeout received {:?}", timeout_qc);
carnot = carnot.receive_timeout_qc(timeout_qc.clone());
events.push(Box::pin(Self::gather_new_views(
adapter,
child_committee,
timeout_qc,
tally_settings.clone(),
)));
}
Event::RootTimeout { timeouts } => {
tracing::debug!("root timeout {:?}", timeouts);
// timeout detected
}
Event::ProposeBlock { qc } => {
tracing::debug!("proposing block");
let (reply_channel, rx) = tokio::sync::oneshot::channel();
mempool_relay
.send(MempoolMsg::View {
ancestor_hint: [0; 32],
reply_channel,
})
.await
.unwrap_or_else(|(e, _)| {
eprintln!("Could not get transactions from mempool {e}")
});
match rx.await {
Ok(txs) => {
let proposal = Block::new(qc.view() + 1, qc, txs);
output = Some(Output::BroadcastProposal { proposal });
}
Err(e) => tracing::error!("Could not fetch txs {e}"),
}
}
Event::None => {}
}
let current_view = carnot.current_view();
if current_view != prev_view {
tracing::debug!("Advanced view from {prev_view} to {current_view}");
// View change!
events.push(Box::pin(async {
tokio::time::sleep(TIMEOUT).await;
Event::LocalTimeout
}));
events.push(Box::pin(Self::gather_block(adapter, current_view + 1)));
events.push(Box::pin(Self::gather_timeout_qc(adapter, current_view)));
if carnot.is_member_of_root_committee() {
let threshold = carnot.leader_super_majority_threshold();
events.push(Box::pin(Self::gather_timeout(
adapter,
child_committee,
current_view,
threshold,
)))
}
}
if let Some(output) = output {
handle_output(adapter, &fountain, carnot.id(), output).await;
}
.run()
.await;
}
unreachable!("carnot exited");
}
}
@ -219,32 +363,89 @@ enum Output<Tx: Clone + Eq + Hash> {
BroadcastProposal { proposal: Block<Tx> },
}
pub struct View<A, F, O: Overlay, Tx: Transaction> {
timeout: Duration,
committee: Committee,
adapter: A,
fountain: F,
tally: CarnotTally,
new_view_tally: NewViewTally,
carnot: Carnot<O>,
tip: Tip,
mempool: OutboundRelay<MempoolMsg<Tx>>,
}
impl<A, O, F, Tx> View<A, F, O, Tx>
impl<A, P, M, F, O> CarnotConsensus<A, P, M, F, O>
where
A: NetworkAdapter,
F: FountainCode,
O: Overlay + Clone,
Tx: Transaction + Clone + Eq + Hash + Serialize + DeserializeOwned + Debug + Send + Sync,
F: FountainCode + Clone + Send + Sync + 'static,
A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Send + Sync + 'static,
P::Tx:
Debug + Clone + Eq + Hash + Serialize + serde::de::DeserializeOwned + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay + Debug + Send + Sync + 'static,
{
fn view(&self) -> consensus_engine::View {
self.carnot.current_view()
async fn gather_timeout_qc(adapter: &A, view: consensus_engine::View) -> Event<P::Tx> {
if let Some(timeout_qc) = adapter
.timeout_qc_stream(view)
.await
.map(|msg| msg.qc)
.next()
.await
{
Event::TimeoutQc { timeout_qc }
} else {
Event::None
}
}
pub async fn proposal_stream(&self) -> impl Stream<Item = Block<Tx>> {
self.adapter
.proposal_chunks_stream(self.view())
async fn gather_votes(
adapter: &A,
committee: &Committee,
block: consensus_engine::Block,
tally: CarnotTallySettings,
) -> Event<P::Tx> {
let tally = CarnotTally::new(tally);
let votes_stream = adapter.votes_stream(committee, block.view, block.id).await;
match tally.tally(block.clone(), votes_stream).await {
Ok((qc, votes)) => Event::Approve { qc, votes, block },
Err(_e) => {
todo!("Handle tally error {_e}");
}
}
}
async fn gather_new_views(
adapter: &A,
committee: &Committee,
timeout_qc: TimeoutQc,
tally: CarnotTallySettings,
) -> Event<P::Tx> {
let tally = NewViewTally::new(tally);
let stream = adapter
.new_view_stream(committee, timeout_qc.view + 1)
.await;
match tally.tally(timeout_qc.clone(), stream).await {
Ok((_qc, new_views)) => Event::NewView {
timeout_qc,
new_views,
},
Err(_e) => {
todo!("Handle tally error {_e}");
}
}
}
async fn gather_timeout(
adapter: &A,
committee: &Committee,
view: consensus_engine::View,
threshold: usize,
) -> Event<P::Tx> {
futures::pending!();
let timeouts = adapter
.timeout_stream(committee, view)
.await
.take(threshold)
.map(|msg| msg.vote)
.collect()
.await;
Event::RootTimeout { timeouts }
}
async fn gather_block(adapter: &A, view: consensus_engine::View) -> Event<P::Tx> {
let stream = adapter
.proposal_chunks_stream(view)
.await
.filter_map(move |msg| {
async move {
@ -256,198 +457,21 @@ where
None
}
}
})
}
async fn timeout_qc_stream(&self) -> impl Stream<Item = TimeoutQc> {
self.adapter
.timeout_qc_stream(self.view())
.await
.map(|msg| msg.qc)
}
async fn gather_votes(
&self,
committee: &Committee,
block: consensus_engine::Block,
tally: &CarnotTally,
) -> (Qc, HashSet<Vote>, consensus_engine::Block) {
let votes_stream = self
.adapter
.votes_stream(committee, block.view, block.id)
.await;
match tally.tally(block.view, votes_stream).await {
Ok((qc, outcome)) => (qc, outcome, block),
Err(_e) => {
todo!("Handle tally error");
}
});
let mut stream = Box::pin(stream);
if let Some(block) = stream.next().await {
Event::Proposal { block, stream }
} else {
Event::None
}
}
async fn gather_new_views(
&self,
committee: &Committee,
view: consensus_engine::View,
) -> (HashSet<NewView>, TimeoutQc) {
match self.carnot.last_view_timeout_qc() {
Some(timeout_qc) if timeout_qc.view == view - 1 => {
let votes_stream = self.adapter.new_view_stream(committee, view).await;
match self.new_view_tally.tally(self.view(), votes_stream).await {
Ok((_, outcome)) => (outcome, timeout_qc),
Err(_e) => {
todo!("Handle tally error");
}
}
}
_ => {
futures::pending!();
unreachable!()
}
}
}
async fn gather_timeout(&self) -> HashSet<Timeout> {
self.adapter
.timeout_stream(&self.committee, self.view())
.await
.take(self.carnot.leader_super_majority_threshold())
.map(|msg| msg.vote)
.collect()
.await
}
async fn get_previous_block_qc(&self) -> Qc {
// if we're not the leader we don't have to do anything
if !self.carnot.is_leader_for_current_view() {
futures::pending!();
}
let previous_view = self.carnot.current_view() - 1;
let leader_committee = [self.carnot.id()].into_iter().collect();
let mut happy_path = self
.carnot
.blocks_in_view(previous_view)
.into_iter()
.map(|block| async {
// TODO: this should use overlay.leader_super_majority_threshold
self.gather_votes(&leader_committee, block, &self.tally)
.await
})
.collect::<FuturesUnordered<_>>();
tokio::select! {
Some((qc, _, _)) = happy_path.next() => {
qc
}
_votes = self.gather_new_views(&leader_committee, previous_view) => {
Qc::Aggregated(AggregateQc {
high_qc: self.carnot.high_qc(),
view: self.carnot.current_view(),
})
}
}
}
pub async fn run(self) -> Carnot<O> {
let mut carnot = self.carnot.clone();
// Some tasks are only activated after another event has been triggered,
// We thus push them to this stream and wait for them on demand
let mut gather_block_votes = FuturesUnordered::new();
// we create futures here and just poll them through a mut reference in the loop
// to avoid creating a new future for each iteration
let proposal_stream = self.proposal_stream().await;
let mut timeout_qc_stream = self.timeout_qc_stream().await;
let local_timeout = tokio::time::sleep(self.timeout);
let root_timeout = self.gather_timeout().fuse();
let get_previous_block_qc = self.get_previous_block_qc().fuse();
let gather_new_view_votes = self.gather_new_views(&self.committee, self.view()).fuse();
tokio::pin!(proposal_stream);
tokio::pin!(local_timeout);
tokio::pin!(root_timeout);
tokio::pin!(get_previous_block_qc);
tokio::pin!(gather_new_view_votes);
loop {
let mut output = None;
tokio::select! {
Some(block) = proposal_stream.next() => {
if let Ok(new_state) = carnot.receive_block(block.header().clone()) {
carnot = new_state;
gather_block_votes.push(self.gather_votes(&self.committee, block.header().clone(), &self.tally));
}
}
Some((_qc, _votes, block)) = gather_block_votes.next() => {
let (new_carnot, out) = carnot.approve_block(block);
carnot = new_carnot;
output = Some(Output::Send::<Tx>(out));
}
Some(timeout_qc) = timeout_qc_stream.next() => {
carnot = carnot.receive_timeout_qc(timeout_qc.clone());
}
(new_views, timeout_qc) = &mut gather_new_view_votes, if !gather_new_view_votes.is_terminated() => {
let (new_carnot, out) =
carnot.approve_new_view(timeout_qc, new_views);
carnot = new_carnot;
output = Some(Output::Send(out));
}
_ = &mut local_timeout => {
let (new_carnot, out) =
carnot.local_timeout();
carnot = new_carnot;
output = out.map(Output::Send);
}
_ = &mut root_timeout, if !root_timeout.is_terminated() => {
// timeout detected
}
qc = &mut get_previous_block_qc, if !get_previous_block_qc.is_terminated() => {
// propose block
let (reply_channel, rx) = tokio::sync::oneshot::channel();
self.mempool.send(MempoolMsg::View { ancestor_hint: self.tip.id(), reply_channel })
.await
.unwrap_or_else(|(e, _)| eprintln!("Could not get transactions from mempool {e}"));
if let Ok(txs) = rx.await {
let proposal = Block::new(self.view(), qc, txs);
output = Some(Output::BroadcastProposal { proposal });
}
}
}
if let Some(output) = output {
handle_output(
&self.adapter,
&self.fountain,
carnot.current_view(),
carnot.id(),
output,
)
.await;
}
// Break after we've
if carnot.current_view() != self.view() {
break;
}
}
carnot
}
}
async fn handle_output<A, F, Tx>(
adapter: &A,
fountain: &F,
view: i64,
node_id: NodeId,
output: Output<Tx>,
) where
async fn handle_output<A, F, Tx>(adapter: &A, fountain: &F, node_id: NodeId, output: Output<Tx>)
where
A: NetworkAdapter,
F: FountainCode,
Tx: Hash + Eq + Clone + Serialize + DeserializeOwned,
Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug,
{
match output {
Output::Send(consensus_engine::Send { to, payload }) => match payload {
@ -455,7 +479,7 @@ async fn handle_output<A, F, Tx>(
adapter
.send(
&to,
view,
vote.view,
VoteMsg {
voter: node_id,
vote,
@ -470,7 +494,7 @@ async fn handle_output<A, F, Tx>(
adapter
.send(
&to,
view,
timeout.view,
TimeoutMsg {
voter: node_id,
vote: timeout,
@ -484,7 +508,7 @@ async fn handle_output<A, F, Tx>(
adapter
.send(
&to,
view,
new_view.view,
NewViewMsg {
voter: node_id,
vote: new_view,
@ -502,7 +526,7 @@ async fn handle_output<A, F, Tx>(
adapter.broadcast_block_chunk(ProposalChunkMsg {
proposal: proposal.header().id,
chunk: chunk.to_vec().into_boxed_slice(),
view,
view: proposal.header().view,
})
})
.await;
@ -517,3 +541,31 @@ async fn handle_output<A, F, Tx>(
}
}
}
enum Event<Tx: Clone + Hash + Eq> {
Proposal {
block: Block<Tx>,
stream: Pin<Box<dyn Stream<Item = Block<Tx>> + Send>>,
},
#[allow(dead_code)]
Approve {
qc: Qc,
block: consensus_engine::Block,
votes: HashSet<Vote>,
},
LocalTimeout,
NewView {
timeout_qc: TimeoutQc,
new_views: HashSet<NewView>,
},
TimeoutQc {
timeout_qc: TimeoutQc,
},
RootTimeout {
timeouts: HashSet<Timeout>,
},
ProposeBlock {
qc: Qc,
},
None,
}

View File

@ -20,6 +20,7 @@ const MOCK_BLOCK_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("MockSi
const MOCK_APPROVAL_CONTENT_TOPIC: MockContentTopic =
MockContentTopic::new("MockSim", 1, "MockApproval");
#[derive(Clone, Debug)]
pub struct MockAdapter {
network_relay: OutboundRelay<<NetworkService<Mock> as ServiceData>::Message>,
}

View File

@ -27,6 +27,7 @@ pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
const APPLICATION_NAME: &str = "CarnotSim";
const VERSION: usize = 1;
#[derive(Clone)]
pub struct WakuAdapter {
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
}

View File

@ -6,7 +6,7 @@ use crate::NodeId;
use consensus_engine::{BlockId, NewView, Qc, Timeout, TimeoutQc, View, Vote};
use nomos_core::wire;
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ProposalChunkMsg {
pub chunk: Box<[u8]>,
pub proposal: BlockId,
@ -23,7 +23,7 @@ impl ProposalChunkMsg {
}
}
#[derive(Eq, PartialEq, Hash, Serialize, Deserialize)]
#[derive(Eq, PartialEq, Hash, Serialize, Deserialize, Clone)]
pub struct VoteMsg {
pub voter: NodeId,
pub vote: Vote,

View File

@ -1,6 +1,6 @@
use consensus_engine::{NodeId, Overlay, View};
use consensus_engine::{Committee, NodeId, Overlay, View};
#[derive(Clone)]
#[derive(Clone, Debug)]
/// Flat overlay with a single committee and round robin leader selection.
pub struct FlatRoundRobin {
nodes: Vec<NodeId>,
@ -36,7 +36,11 @@ impl Overlay for FlatRoundRobin {
}
fn parent_committee(&self, _id: NodeId) -> consensus_engine::Committee {
panic!("root committee does not have a parent committee")
Committee::new()
}
fn child_committee(&self, _id: NodeId) -> consensus_engine::Committee {
Committee::new()
}
fn leaf_committees(&self, _id: NodeId) -> Vec<consensus_engine::Committee> {
@ -48,10 +52,10 @@ impl Overlay for FlatRoundRobin {
}
fn super_majority_threshold(&self, _id: NodeId) -> usize {
self.nodes.len() * 3 / 2 + 1
0
}
fn leader_super_majority_threshold(&self, id: NodeId) -> usize {
self.super_majority_threshold(id)
fn leader_super_majority_threshold(&self, _id: NodeId) -> usize {
self.nodes.len() * 2 / 3 + 1
}
}

View File

@ -1,13 +1,14 @@
#![allow(dead_code)]
// TODO: Well, remove this when we actually use the fields from the specification
// std
use std::collections::HashSet;
// crates
use futures::{Stream, StreamExt};
// internal
use super::CarnotTallySettings;
use crate::network::messages::VoteMsg;
use consensus_engine::{Qc, StandardQc, View, Vote};
use consensus_engine::{Block, Qc, StandardQc, Vote};
use nomos_core::crypto::PublicKey;
use nomos_core::vote::Tally;
@ -21,14 +22,7 @@ pub enum CarnotTallyError {
InsufficientVotes,
}
#[derive(Clone)]
pub struct CarnotTallySettings {
pub threshold: usize,
// TODO: this probably should be dynamic and should change with the view (?)
pub participating_nodes: HashSet<NodeId>,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct CarnotTally {
settings: CarnotTallySettings,
}
@ -37,6 +31,7 @@ pub struct CarnotTally {
impl Tally for CarnotTally {
type Vote = VoteMsg;
type Qc = Qc;
type Subject = Block;
type Outcome = HashSet<Vote>;
type TallyError = CarnotTallyError;
type Settings = CarnotTallySettings;
@ -47,14 +42,24 @@ impl Tally for CarnotTally {
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: View,
block: Block,
mut vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
let mut seen = HashSet::new();
let mut outcome = HashSet::new();
// return early for leaf nodes
if self.settings.threshold == 0 {
return Ok((
Qc::Standard(StandardQc {
view: block.view,
id: block.id,
}),
outcome,
));
}
while let Some(vote) = vote_stream.next().await {
// check vote view is valid
if !vote.vote.view != view {
if !vote.vote.view != block.view || vote.vote.block != block.id {
continue;
}
@ -75,6 +80,6 @@ impl Tally for CarnotTally {
));
}
}
Err(CarnotTallyError::InsufficientVotes)
unreachable!()
}
}

View File

@ -1,2 +1,18 @@
pub mod happy;
pub mod unhappy;
// std
use std::collections::HashSet;
// crates
use serde::{Deserialize, Serialize};
// internal
use consensus_engine::NodeId;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CarnotTallySettings {
pub threshold: usize,
// TODO: this probably should be dynamic and should change with the view (?)
pub participating_nodes: HashSet<NodeId>,
}

View File

@ -1,45 +1,33 @@
#![allow(dead_code)]
// TODO: Well, remove this when we actually use the fields from the specification
// std
use std::collections::HashSet;
// crates
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
// internal
use super::CarnotTallySettings;
use crate::network::messages::NewViewMsg;
use consensus_engine::{NewView, View};
use nomos_core::crypto::PublicKey;
use consensus_engine::{NewView, TimeoutQc};
use nomos_core::vote::Tally;
pub type NodeId = PublicKey;
#[derive(thiserror::Error, Debug)]
pub enum NewViewTallyError {
#[error("Received invalid vote: {0}")]
InvalidVote(String),
#[error("Did not receive enough votes")]
InsufficientVotes,
}
#[derive(Clone)]
pub struct NewViewTallySettings {
threshold: usize,
// TODO: this probably should be dynamic and should change with the view (?)
participating_nodes: HashSet<NodeId>,
}
#[derive(Clone)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NewViewTally {
settings: NewViewTallySettings,
settings: CarnotTallySettings,
}
#[async_trait::async_trait]
impl Tally for NewViewTally {
type Vote = NewViewMsg;
type Qc = ();
type Subject = TimeoutQc;
type Outcome = HashSet<NewView>;
type TallyError = NewViewTallyError;
type Settings = NewViewTallySettings;
type Settings = CarnotTallySettings;
fn new(settings: Self::Settings) -> Self {
Self { settings }
@ -47,14 +35,14 @@ impl Tally for NewViewTally {
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: View,
timeout_qc: TimeoutQc,
mut vote_stream: S,
) -> Result<(Self::Qc, Self::Outcome), Self::TallyError> {
let mut seen = HashSet::new();
let mut outcome = HashSet::new();
while let Some(vote) = vote_stream.next().await {
// check vote view is valid
if !vote.vote.view != view {
if !vote.vote.view != timeout_qc.view {
continue;
}

View File

@ -1,11 +0,0 @@
use nomos_core::block::BlockId;
/// Assuming determining which tip to consider is integral part of consensus
#[derive(Clone)]
pub struct Tip;
impl Tip {
pub fn id(&self) -> BlockId {
unimplemented!()
}
}

View File

@ -181,7 +181,7 @@ impl NetworkBackend for Waku {
let tx = message_event.clone();
waku_set_event_callback(move |sig| match sig.event() {
Event::WakuMessage(ref msg_event) => {
debug!("received message event");
debug!("received message event {:?}", msg_event.waku_message());
if tx
.send(NetworkEvent::RawMessage(msg_event.waku_message().clone()))
.is_err()