diff --git a/consensus-engine/src/lib.rs b/consensus-engine/src/lib.rs index 37b8a5bc..8bb750f0 100644 --- a/consensus-engine/src/lib.rs +++ b/consensus-engine/src/lib.rs @@ -37,6 +37,10 @@ impl Carnot { self.highest_voted_view } + pub fn safe_blocks(&self) -> &HashMap { + &self.safe_blocks + } + /// Upon reception of a block /// /// Preconditions: diff --git a/nodes/mockpool-node/src/bridges.rs b/nodes/mockpool-node/src/bridges.rs index cfe5ec5b..dbe60e74 100644 --- a/nodes/mockpool-node/src/bridges.rs +++ b/nodes/mockpool-node/src/bridges.rs @@ -1,7 +1,9 @@ // std // crates +use crate::Carnot; use bytes::Bytes; use http::StatusCode; +use nomos_consensus::{CarnotInfo, ConsensusMsg}; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; use tracing::error; @@ -23,6 +25,25 @@ use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use waku_bindings::WakuMessage; +pub fn carnot_info_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + let (carnot_channel, mut http_request_channel) = + build_http_bridge::(handle, HttpMethod::GET, "info") + .await + .unwrap(); + + while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { + if let Err(e) = handle_carnot_info_req(&carnot_channel, &res_tx).await { + error!(e); + } + } + + Ok(()) + })) +} + pub fn mempool_metrics_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, ) -> HttpBridgeRunner { @@ -37,7 +58,7 @@ pub fn mempool_metrics_bridge( .unwrap(); while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { - if let Err(e) = handle_metrics_req(&mempool_channel, res_tx).await { + if let Err(e) = handle_mempool_metrics_req(&mempool_channel, res_tx).await { error!(e); } } @@ -62,7 +83,9 @@ pub fn mempool_add_tx_bridge( res_tx, payload, .. }) = http_request_channel.recv().await { - if let Err(e) = handle_add_tx_req(&handle, &mempool_channel, res_tx, payload).await { + if let Err(e) = + handle_mempool_add_tx_req(&handle, &mempool_channel, res_tx, payload).await + { error!(e); } } @@ -84,7 +107,7 @@ pub fn waku_info_bridge( .unwrap(); while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { - if let Err(e) = handle_info_req(&waku_channel, &res_tx).await { + if let Err(e) = handle_waku_info_req(&waku_channel, &res_tx).await { error!(e); } } @@ -116,7 +139,24 @@ pub fn waku_add_conn_bridge( })) } -async fn handle_metrics_req( +async fn handle_carnot_info_req( + carnot_channel: &OutboundRelay, + res_tx: &Sender, +) -> Result<(), overwatch_rs::DynError> { + let (sender, receiver) = oneshot::channel(); + carnot_channel + .send(ConsensusMsg::Info { tx: sender }) + .await + .map_err(|(e, _)| e)?; + let carnot_info: CarnotInfo = receiver.await.unwrap(); + res_tx + .send(Ok(serde_json::to_vec(&carnot_info)?.into())) + .await?; + + Ok(()) +} + +async fn handle_mempool_metrics_req( mempool_channel: &OutboundRelay>, res_tx: Sender, ) -> Result<(), overwatch_rs::DynError> { @@ -141,7 +181,7 @@ async fn handle_metrics_req( Ok(()) } -async fn handle_add_tx_req( +async fn handle_mempool_add_tx_req( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, mempool_channel: &OutboundRelay>, res_tx: Sender, @@ -186,7 +226,7 @@ async fn handle_add_tx_req( } } -async fn handle_info_req( +async fn handle_waku_info_req( waku_channel: &OutboundRelay>, res_tx: &Sender, ) -> Result<(), overwatch_rs::DynError> { diff --git a/nodes/mockpool-node/src/main.rs b/nodes/mockpool-node/src/main.rs index f3754d27..bb9f960a 100644 --- a/nodes/mockpool-node/src/main.rs +++ b/nodes/mockpool-node/src/main.rs @@ -85,6 +85,7 @@ fn main() -> Result<()> { let Args { config } = Args::parse(); let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?; let bridges: Vec = vec![ + Arc::new(Box::new(bridges::carnot_info_bridge)), Arc::new(Box::new(bridges::mempool_add_tx_bridge)), Arc::new(Box::new(bridges::mempool_metrics_bridge)), Arc::new(Box::new(bridges::waku_add_conn_bridge)), diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index 1af3ec8d..24fe06e4 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -26,8 +26,12 @@ tokio-util = "0.7" tracing = "0.1" waku-bindings = { version = "0.1.0-rc.2", optional = true} bls-signatures = "0.14" +serde_with = "3.0.0" [features] default = [] waku = ["nomos-network/waku", "waku-bindings"] mock = ["nomos-network/mock"] + +[dev-dependencies] +serde_json = "1.0.96" diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 240c7dfd..666e7d6d 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -4,7 +4,7 @@ mod tally; mod task_manager; // std -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::hash::Hash; use std::pin::Pin; @@ -15,14 +15,16 @@ use futures::{Stream, StreamExt}; use leader_selection::UpdateableLeaderSelection; use serde::Deserialize; use serde::{de::DeserializeOwned, Serialize}; +use serde_with::serde_as; +use tokio::sync::oneshot::Sender; use tracing::instrument; // internal use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg}; use crate::network::NetworkAdapter; use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings}; use consensus_engine::{ - overlay::RandomBeaconState, AggregateQc, Carnot, Committee, LeaderProof, NewView, Overlay, - Payload, Qc, StandardQc, Timeout, TimeoutQc, View, Vote, + overlay::RandomBeaconState, AggregateQc, BlockId, Carnot, Committee, LeaderProof, NewView, + Overlay, Payload, Qc, StandardQc, Timeout, TimeoutQc, View, Vote, }; use task_manager::TaskManager; @@ -35,10 +37,9 @@ use nomos_mempool::{ backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolMsg, MempoolService, }; use nomos_network::NetworkService; -use overwatch_rs::services::relay::{OutboundRelay, Relay}; +use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage}; use overwatch_rs::services::{ handle::ServiceStateHandle, - relay::NoMessage, state::{NoOperator, NoState}, ServiceCore, ServiceData, ServiceId, }; @@ -117,7 +118,7 @@ where type Settings = CarnotSettings; type State = NoState; type StateOperator = NoOperator; - type Message = NoMessage; + type Message = ConsensusMsg; } #[async_trait::async_trait] @@ -219,82 +220,25 @@ where }); } - while let Some(event) = task_manager.next().await { - let mut output = None; - let prev_view = carnot.current_view(); - match event { - Event::Proposal { block, stream } => { - (carnot, output) = Self::process_block( - carnot, - block, - stream, - &mut task_manager, - adapter.clone(), - ) - .await; - } - Event::Approve { block, .. } => { - tracing::debug!("approving proposal {:?}", block); - let (new_carnot, out) = carnot.approve_block(block.clone()); - carnot = new_carnot; - output = Some(Output::Send::(out)); - } - Event::LocalTimeout => { - tracing::debug!("local timeout"); - let (new_carnot, out) = carnot.local_timeout(); - carnot = new_carnot; - output = out.map(Output::Send); - } - Event::NewView { - timeout_qc, - new_views, - } => { - (carnot, output) = Self::approve_new_view( - carnot, - timeout_qc, - new_views, - &mut task_manager, - adapter.clone(), - ) - .await; - } - Event::TimeoutQc { timeout_qc } => { - (carnot, output) = Self::receive_timeout_qc( - carnot, - timeout_qc, - &mut task_manager, - adapter.clone(), - ) - .await; - } - Event::RootTimeout { timeouts } => { - (carnot, output) = Self::process_root_timeout(carnot, timeouts).await; - } - Event::ProposeBlock { qc } => { - output = - Self::propose_block(carnot.id(), private_key, qc, mempool_relay.clone()) - .await; - } - _ => {} - } - - let current_view = carnot.current_view(); - if current_view != prev_view { - Self::process_view_change( - carnot.clone(), - prev_view, - &mut task_manager, - adapter.clone(), - ) - .await; - } - - if let Some(output) = output { - handle_output(&adapter, &fountain, carnot.id(), output).await; + loop { + tokio::select! { + Some(event) = task_manager.next() => { + carnot = Self::process_carnot_event( + carnot, + event, + &mut task_manager, + adapter.clone(), + private_key, + mempool_relay.clone(), + &fountain, + ) + .await + } + Some(msg) = self.service_state.inbound_relay.next() => { + Self::process_message(&carnot, msg); + } } } - - unreachable!("carnot exited"); } } @@ -318,6 +262,93 @@ where O: Overlay + Debug + Send + Sync + 'static, O::LeaderSelection: UpdateableLeaderSelection, { + fn process_message(carnot: &Carnot, msg: ConsensusMsg) { + match msg { + ConsensusMsg::Info { tx } => { + let info = CarnotInfo { + id: carnot.id(), + current_view: carnot.current_view(), + highest_voted_view: carnot.highest_voted_view(), + local_high_qc: carnot.high_qc(), + safe_blocks: carnot.safe_blocks().clone(), + last_view_timeout_qc: carnot.last_view_timeout_qc(), + committed_blocks: carnot.committed_blocks(), + }; + tx.send(info).unwrap_or_else(|e| { + tracing::error!("Could not send consensus info through channel: {:?}", e) + }); + } + } + } + + async fn process_carnot_event( + mut carnot: Carnot, + event: Event, + task_manager: &mut TaskManager>, + adapter: A, + private_key: PrivateKey, + mempool_relay: OutboundRelay>, + fountain: &F, + ) -> Carnot { + let mut output = None; + let prev_view = carnot.current_view(); + match event { + Event::Proposal { block, stream } => { + (carnot, output) = + Self::process_block(carnot, block, stream, task_manager, adapter.clone()).await; + } + Event::Approve { block, .. } => { + tracing::debug!("approving proposal {:?}", block); + let (new_carnot, out) = carnot.approve_block(block); + carnot = new_carnot; + output = Some(Output::Send::(out)); + } + Event::LocalTimeout => { + tracing::debug!("local timeout"); + let (new_carnot, out) = carnot.local_timeout(); + carnot = new_carnot; + output = out.map(Output::Send); + } + Event::NewView { + timeout_qc, + new_views, + } => { + (carnot, output) = Self::approve_new_view( + carnot, + timeout_qc, + new_views, + task_manager, + adapter.clone(), + ) + .await; + } + Event::TimeoutQc { timeout_qc } => { + (carnot, output) = + Self::receive_timeout_qc(carnot, timeout_qc, task_manager, adapter.clone()) + .await; + } + Event::RootTimeout { timeouts } => { + (carnot, output) = Self::process_root_timeout(carnot, timeouts).await; + } + Event::ProposeBlock { qc } => { + output = Self::propose_block(carnot.id(), private_key, qc, mempool_relay).await; + } + _ => {} + } + + let current_view = carnot.current_view(); + if current_view != prev_view { + Self::process_view_change(carnot.clone(), prev_view, task_manager, adapter.clone()) + .await; + } + + if let Some(output) = output { + handle_output(&adapter, fountain, carnot.id(), output).await; + } + + carnot + } + #[instrument(level = "debug", skip(adapter, task_manager, stream))] async fn process_block( mut carnot: Carnot, @@ -743,3 +774,66 @@ enum Event { }, None, } + +#[derive(Debug)] +pub enum ConsensusMsg { + Info { tx: Sender }, +} + +impl RelayMessage for ConsensusMsg {} + +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct CarnotInfo { + id: NodeId, + current_view: View, + highest_voted_view: View, + local_high_qc: StandardQc, + #[serde_as(as = "Vec<(_, _)>")] + safe_blocks: HashMap, + last_view_timeout_qc: Option, + committed_blocks: Vec, +} + +#[cfg(test)] +mod tests { + use consensus_engine::Block; + + use super::*; + + #[test] + fn serde_carnot_info() { + let info = CarnotInfo { + id: [0; 32], + current_view: 1, + highest_voted_view: -1, + local_high_qc: StandardQc { + view: 0, + id: [0; 32], + }, + safe_blocks: HashMap::from([( + [0; 32], + Block { + id: [0; 32], + view: 0, + parent_qc: Qc::Standard(StandardQc { + view: 0, + id: [0; 32], + }), + leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] }, + }, + )]), + last_view_timeout_qc: None, + committed_blocks: vec![[0; 32]], + }; + + let serialized = serde_json::to_string(&info).unwrap(); + assert_eq!( + serialized, + r#"{"id":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"current_view":1,"highest_voted_view":-1,"local_high_qc":{"view":0,"id":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"safe_blocks":[[[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],{"view":0,"parent_qc":{"Standard":{"view":0,"id":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"leader_proof":{"LeaderId":{"leader_id":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}}}]],"last_view_timeout_qc":null,"committed_blocks":[[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]]}"# + ); + + let deserialized: CarnotInfo = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, info); + } +}