diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index f14f1c01..0aab37ed 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -26,7 +26,7 @@ nomos-libp2p = { path = "../nomos-libp2p"} nomos-core = { path = "../nomos-core" } nomos-node = { path = "../nodes/nomos-node" } full-replication = { path = "../nomos-da/full-replication" } -reqwest = "0.11" +reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" diff --git a/nomos-cli/src/api/consensus.rs b/nomos-cli/src/api/consensus.rs index 86d31592..92b7b278 100644 --- a/nomos-cli/src/api/consensus.rs +++ b/nomos-cli/src/api/consensus.rs @@ -1,6 +1,5 @@ use super::CLIENT; use carnot_consensus::CarnotInfo; -use carnot_engine::{Block, BlockId}; use reqwest::Url; pub async fn carnot_info(node: &Url) -> Result { @@ -12,20 +11,3 @@ pub async fn carnot_info(node: &Url) -> Result { .json::() .await } - -pub async fn get_blocks_info( - node: &Url, - from: Option, - to: Option, -) -> Result, reqwest::Error> { - const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks"; - let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap()); - if let Some(from) = from { - req = req.query(&[("from", from)]); - } - if let Some(to) = to { - req = req.query(&[("to", to)]); - } - - req.send().await?.json().await -} diff --git a/nomos-cli/src/api/da.rs b/nomos-cli/src/api/da.rs index 7a3d3c07..83f9564d 100644 --- a/nomos-cli/src/api/da.rs +++ b/nomos-cli/src/api/da.rs @@ -4,12 +4,12 @@ use nomos_core::da::blob; use reqwest::Url; pub async fn get_blobs( - node: &Url, + explorer: &Url, ids: Vec<::Hash>, ) -> Result, reqwest::Error> { const BLOBS_PATH: &str = "da/blobs"; CLIENT - .post(node.join(BLOBS_PATH).unwrap()) + .post(explorer.join(BLOBS_PATH).unwrap()) .json(&ids) .send() .await? diff --git a/nomos-cli/src/api/storage.rs b/nomos-cli/src/api/storage.rs index cc439600..c8c1d256 100644 --- a/nomos-cli/src/api/storage.rs +++ b/nomos-cli/src/api/storage.rs @@ -6,13 +6,13 @@ use nomos_node::Tx; use reqwest::Url; pub async fn get_block_contents( - node: &Url, + explorer: &Url, block: &BlockId, ) -> Result>, reqwest::Error> { - const BLOCK_PATH: &str = "storage/block"; + const BLOCK_PATH: &str = "explorer/blocks/depth"; CLIENT - .post(node.join(BLOCK_PATH).unwrap()) - .json(block) + .get(explorer.join(BLOCK_PATH).unwrap()) + .query(&[("from", block)]) .send() .await? .json() diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs index c89619e4..a69d88a6 100644 --- a/nomos-cli/src/cmds/chat/mod.rs +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -5,7 +5,7 @@ mod ui; use crate::{ - api::consensus::get_blocks_info, + api::{consensus::carnot_info, storage::get_block_contents}, da::{ disseminate::{ DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, @@ -18,7 +18,7 @@ use full_replication::{ AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings, }; use futures::{stream, StreamExt}; -use nomos_core::{block::BlockId, da::DaProtocol, wire}; +use nomos_core::{da::DaProtocol, wire}; use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter}; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; @@ -58,9 +58,12 @@ pub struct NomosChat { /// The data availability protocol to use. Defaults to full replication. #[clap(flatten)] pub da_protocol: DaProtocolChoice, - /// The node to connect to to fetch blocks and blobs + /// The node to connect to to fetch carnot info #[clap(long)] pub node: Url, + /// The explorer to connect to to fetch blocks and blobs + #[clap(long)] + pub explorer: Url, } pub struct App { @@ -73,6 +76,7 @@ pub struct App { payload_sender: UnboundedSender>, status_updates: Receiver, node: Url, + explorer: Url, logs: Arc>>, scroll_logs: u16, } @@ -133,6 +137,7 @@ impl NomosChat { payload_sender, status_updates, node: self.node.clone(), + explorer: self.explorer.clone(), logs: shared_writer, scroll_logs: 0, }; @@ -155,7 +160,8 @@ impl NomosChat { fn run_app(terminal: &mut Terminal, mut app: App) { let (message_tx, message_rx) = std::sync::mpsc::channel(); let node = app.node.clone(); - std::thread::spawn(move || check_for_messages(message_tx, node)); + let explorer = app.explorer.clone(); + std::thread::spawn(move || check_for_messages(message_tx, node, explorer)); loop { terminal.draw(|f| ui::ui(f, &app)).unwrap(); @@ -229,14 +235,10 @@ struct ChatMessage { } #[tokio::main] -async fn check_for_messages(sender: Sender>, node: Url) { - // Should ask for the genesis block to be more robust - let mut last_tip = BlockId::zeros(); - +async fn check_for_messages(sender: Sender>, node: Url, explorer: Url) { loop { - if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await { + if let Ok(messages) = fetch_new_messages(&explorer, &node).await { sender.send(messages).expect("channel closed"); - last_tip = new_tip; } tokio::time::sleep(Duration::from_millis(100)).await; } @@ -245,10 +247,10 @@ async fn check_for_messages(sender: Sender>, node: Url) { // Process a single block's blobs and return chat messages async fn process_block_blobs( node: Url, - block_id: &BlockId, + block: &nomos_core::block::Block, da_settings: DaSettings, ) -> Result, Box> { - let blobs = get_block_blobs(&node, block_id).await?; + let blobs = get_block_blobs(&node, block).await?; // Note that number of attestations is ignored here since we only use the da protocol to // decode the blob data, not to validate the certificate @@ -269,29 +271,25 @@ async fn process_block_blobs( // Fetch new messages since the last tip async fn fetch_new_messages( - last_tip: &BlockId, + explorer: &Url, node: &Url, -) -> Result<(BlockId, Vec), Box> { +) -> Result, Box> { + let info = carnot_info(node).await?; // By only specifying the 'to' parameter we get all the blocks since the last tip - let mut new_blocks = get_blocks_info(node, None, Some(*last_tip)) + let mut blocks = get_block_contents(explorer, &info.tip.id) .await? .into_iter() - .map(|block| block.id) .collect::>(); - // The first block is the most recent one. - // Note that the 'to' is inclusive so the above request will always return at least one block - // as long as the block exists (which is the case since it was returned by a previous call) - let new_tip = new_blocks[0]; // We already processed the last block so let's remove it - new_blocks.pop(); + blocks.pop(); let da_settings = DaSettings { num_attestations: 1, voter: [0; 32], }; - let block_stream = stream::iter(new_blocks.iter().rev()); + let block_stream = stream::iter(blocks.iter()); let results: Vec<_> = block_stream .map(|block| { let node = node.clone(); @@ -299,7 +297,7 @@ async fn fetch_new_messages( process_block_blobs(node, block, da_settings) }) - .buffer_unordered(new_blocks.len()) + .buffer_unordered(blocks.len()) .collect::>() .await; @@ -308,5 +306,5 @@ async fn fetch_new_messages( new_messages.extend(result?); } - Ok((new_tip, new_messages)) + Ok(new_messages) } diff --git a/nomos-cli/src/da/retrieve.rs b/nomos-cli/src/da/retrieve.rs index 6723d410..b534d765 100644 --- a/nomos-cli/src/da/retrieve.rs +++ b/nomos-cli/src/da/retrieve.rs @@ -1,10 +1,10 @@ -use carnot_engine::BlockId; use full_replication::Blob; -use nomos_core::da::certificate::Certificate; +use nomos_core::{block::Block, da::certificate::Certificate}; +use nomos_node::Tx; use reqwest::Url; use thiserror::Error; -use crate::api::{da::get_blobs, storage::get_block_contents}; +use crate::api::da::get_blobs; #[derive(Error, Debug)] pub enum Error { @@ -15,10 +15,9 @@ pub enum Error { } /// Return the blobs whose certificate has been included in the provided block. -pub async fn get_block_blobs(node: &Url, block: &BlockId) -> Result, Error> { - let block = get_block_contents(node, block) - .await? - .ok_or(Error::NotFound)?; - +pub async fn get_block_blobs( + node: &Url, + block: &Block, +) -> Result, Error> { Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?) }