diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index c8309006..0c82201e 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -24,7 +24,6 @@ nomos-consensus = { path = "../nomos-services/consensus" } nomos-libp2p = { path = "../nomos-libp2p"} nomos-core = { path = "../nomos-core" } nomos-node = { path = "../nodes/nomos-node" } -nomos-consensus = { path = "../nomos-services/consensus" } full-replication = { path = "../nomos-da/full-replication" } reqwest = "0.11" serde = { version = "1.0", features = ["derive"] } diff --git a/nomos-cli/src/api/consensus.rs b/nomos-cli/src/api/consensus.rs index d370be74..80bd6a16 100644 --- a/nomos-cli/src/api/consensus.rs +++ b/nomos-cli/src/api/consensus.rs @@ -1,4 +1,5 @@ use super::CLIENT; +use consensus_engine::{Block, BlockId}; use nomos_consensus::CarnotInfo; use reqwest::Url; @@ -11,3 +12,20 @@ pub async fn carnot_info(node: &Url) -> Result { .json::() .await } + +pub async fn get_blocks_info( + node: &Url, + from: Option, + to: Option, +) -> Result, reqwest::Error> { + const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks"; + let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap()); + if let Some(from) = from { + req = req.query(&[("from", from)]); + } + if let Some(to) = to { + req = req.query(&[("to", to)]); + } + + req.send().await?.json().await +} diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs index 78ddf5ed..7c093f87 100644 --- a/nomos-cli/src/cmds/chat/mod.rs +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -4,24 +4,25 @@ /// mod ui; -use crate::da::{ - disseminate::{ - DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, +use crate::{ + api::consensus::get_blocks_info, + da::{ + disseminate::{ + DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, + }, + retrieve::get_block_blobs, }, - retrieve::get_block_blobs, }; use clap::Args; use full_replication::{ AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings, }; -use nomos_consensus::CarnotInfo; use nomos_core::{block::BlockId, da::DaProtocol, wire}; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; use reqwest::Url; use serde::{Deserialize, Serialize}; use std::{ - collections::HashSet, io, path::PathBuf, sync::{ @@ -205,56 +206,58 @@ struct ChatMessage { #[tokio::main] async fn check_for_messages(sender: Sender>, node: Url) { - let mut old_blocks = HashSet::new(); + // Should ask for the genesis block to be more robust + let mut last_tip = BlockId::zeros(); loop { - if let Ok(messages) = fetch_new_messages(&mut old_blocks, node.clone()).await { + if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await { sender.send(messages).expect("channel closed"); + last_tip = new_tip; } tokio::time::sleep(Duration::from_millis(100)).await; } } async fn fetch_new_messages( - old_blocks: &mut HashSet, - node: Url, -) -> Result, Box> { - const NODE_CARNOT_INFO_PATH: &str = "carnot/info"; - + last_tip: &BlockId, + node: &Url, +) -> Result<(BlockId, Vec), Box> { let mut new_messages = Vec::new(); - - let info = reqwest::get(node.join(NODE_CARNOT_INFO_PATH).unwrap()) + // By only specifying the 'to' parameter we get all the blocks since the last tip + let mut new_blocks = get_blocks_info(node, None, Some(*last_tip)) .await? - .json::() - .await?; - - let new_blocks = info - .committed_blocks .into_iter() - .filter(|id| !old_blocks.contains(id) && id != &BlockId::zeros()) + .map(|block| block.id) .collect::>(); - // note that number of attestations is ignored here since we only use the da protocol to + // The first block is the most recent one. + // Note that the 'to' is inclusive so the above request will always return at least one block + // as long as the block exists (which is the case since it was returned by a previous call) + let new_tip = new_blocks[0]; + // We already processed the last block so let's remove it + new_blocks.pop(); + + // Note that number of attestations is ignored here since we only use the da protocol to // decode the blob data, not to validate the certificate let mut da_protocol = > as DaProtocol>::new( DaSettings { num_attestations: 1, + voter: [0; 32], // voter is ignored as well }, ); for block in new_blocks { - let blobs = get_block_blobs(node.clone(), block).await?; + let blobs = get_block_blobs(node, &block).await?; for blob in blobs { da_protocol.recv_blob(blob); - // full replication only needs one blob to decode the data, so the unwrap is safe + // Full replication only needs one blob to decode the data, so the unwrap is safe let bytes = da_protocol.extract().unwrap(); if let Ok(message) = wire::deserialize::(&bytes) { new_messages.push(message); } } - old_blocks.insert(block); } - Ok(new_messages) + Ok((new_tip, new_messages)) }