diff --git a/nodes/nomos-node/src/bridges/mod.rs b/nodes/nomos-node/src/bridges/mod.rs index c881972d..a570fad1 100644 --- a/nodes/nomos-node/src/bridges/mod.rs +++ b/nodes/nomos-node/src/bridges/mod.rs @@ -166,7 +166,7 @@ where MempoolService::Hash>, TxDiscriminant>, AxumBackend, _, - >(handle.clone(), HttpMethod::POST, "addtx") + >(handle.clone(), HttpMethod::POST, "add") .await .unwrap(); @@ -206,9 +206,7 @@ where AxumBackend, _, >( - handle.clone(), - HttpMethod::POST, - "addcert", + handle.clone(), HttpMethod::POST, "add" ) .await .unwrap(); diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index 402a83c8..93f8dee0 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -12,7 +12,6 @@ tracing = "0.1" tracing-subscriber = "0.3" async-trait = "0.1" clap = {version = "4", features = ["derive"] } -crossterm = "0.27" serde_yaml = "0.9" futures = "0.3" tokio = { version = "1", features = ["sync"] } @@ -22,4 +21,5 @@ nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] } nomos-libp2p = { path = "../nomos-libp2p"} nomos-core = { path = "../nomos-core" } -full-replication = { path = "../nomos-da/full-replication" } \ No newline at end of file +full-replication = { path = "../nomos-da/full-replication" } +reqwest = "0.11" \ No newline at end of file diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs index bdd47148..2bd99f63 100644 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -1,8 +1,7 @@ use clap::{Args, ValueEnum}; -use crossterm::execute; use full_replication::{AbsoluteNumber, FullReplication}; use futures::StreamExt; -use nomos_core::da::DaProtocol; +use nomos_core::{da::DaProtocol, wire}; use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter}; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_derive::*; @@ -16,8 +15,11 @@ use overwatch_rs::{ }, DynError, }; +use reqwest::{Client, Url}; use std::{path::PathBuf, time::Duration}; +const NODE_CERT_PATH: &str = "mempool-da/add"; + #[derive(Args, Debug)] pub struct Disseminate { // TODO: accept bytes @@ -32,6 +34,11 @@ pub struct Disseminate { /// Timeout in seconds. Defaults to 120 seconds. #[clap(short, long, default_value = "120")] pub timeout: u64, + /// Address of the node to send the certificate to + /// for block inclusion, if present. + pub node_addr: Option, + /// File to write the certificate to, if present. + pub output: Option, } #[derive(Debug, Clone, Args)] @@ -55,6 +62,8 @@ impl Disseminate { bytes: self.data.clone().as_bytes().into(), timeout: Duration::from_secs(self.timeout), da_protocol: self.da_protocol.clone(), + node_addr: self.node_addr.clone(), + output: self.output.clone(), }, }, None, @@ -65,26 +74,11 @@ impl Disseminate { } } -// Write '✓' at the end of the previous line in terminal -fn terminal_cmd_done() { - let go_to_previous_line = crossterm::cursor::MoveToPreviousLine(1); - let go_to_end_of_line = - crossterm::cursor::MoveToColumn(crossterm::terminal::size().unwrap().0 - 1); - let write_done = crossterm::style::Print("✓"); - execute!( - std::io::stdout(), - go_to_previous_line, - go_to_end_of_line, - write_done - ) - .unwrap() -} - async fn disseminate( mut da: D, data: Box<[u8]>, adapter: N, -) -> Result> +) -> Result> where D: DaProtocol, N: NetworkAdapter + Send + Sync, @@ -92,14 +86,10 @@ where // 1) Building blob tracing::info!("Building blobs..."); let blobs = da.encode(data); - terminal_cmd_done(); // 2) Send blob to network tracing::info!("Sending blobs to network..."); - futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob))) - .await - .map_err(|e| e as Box)?; - terminal_cmd_done(); + futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob))).await?; // 3) Collect attestations and create proof tracing::info!("Collecting attestations to create proof..."); @@ -108,7 +98,6 @@ where da.recv_attestation(attestations.next().await.unwrap()); if let Some(cert) = da.certify_dispersal() { - terminal_cmd_done(); return Ok(cert); } } @@ -127,6 +116,8 @@ pub struct Settings { bytes: Box<[u8]>, timeout: Duration, da_protocol: DaProtocolChoice, + node_addr: Option, + output: Option, } pub struct DisseminateService { @@ -150,6 +141,7 @@ impl ServiceCore for DisseminateService { async fn run(self) -> Result<(), DynError> { let Self { service_state } = self; let settings = service_state.settings_reader.get_updated_settings(); + let node_addr = settings.node_addr.clone(); match settings.da_protocol { DaProtocolChoice { @@ -185,7 +177,29 @@ impl ServiceCore for DisseminateService { ); std::process::exit(1); } - _ => {} + Ok(Ok(cert)) => { + if let Some(output) = settings.output { + tracing::info!("Writing certificate to file..."); + std::fs::write(output, wire::serialize(&cert).unwrap())?; + } + + if let Some(node_addr) = node_addr { + let client = Client::new(); + tracing::info!("Sending certificate to node..."); + let res = client + .post(node_addr.join(NODE_CERT_PATH).unwrap()) + .body(wire::serialize(&cert).unwrap()) + .send() + .await?; + if !res.status().is_success() { + tracing::error!( + "Failed to send certificate to node: {}", + res.status() + ); + std::process::exit(1); + } + } + } } } } diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 83116f21..37c3ffa2 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -115,11 +115,11 @@ pub trait Discriminant { } impl Discriminant for Transaction { - const ID: &'static str = "mempool-txs"; + const ID: &'static str = "mempool-cl"; } impl Discriminant for Certificate { - const ID: &'static str = "mempool-certs"; + const ID: &'static str = "mempool-da"; } impl ServiceData for MempoolService diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 4ae60fb7..31b0332c 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -28,6 +28,7 @@ reqwest = { version = "0.11", features = ["json"] } nomos-libp2p = { path = "../nomos-libp2p" } tempfile = "3.6" serde_yaml = "0.9" +serde_json = "1.0" tokio = "1" futures = "0.3" async-trait = "0.1" @@ -51,8 +52,6 @@ path = "src/tests/mixnet.rs" [[test]] name = "test_cli" path = "src/tests/cli.rs" -# disseminate is only implemented for libp2p -required-features = ["libp2p"] [[bench]] name = "mixnet" diff --git a/tests/src/lib.rs b/tests/src/lib.rs index f9c015f8..030b9ab9 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -1,4 +1,4 @@ -mod nodes; +pub mod nodes; use mixnet_node::MixnetNodeConfig; use mixnet_topology::MixnetTopology; pub use nodes::MixNode; diff --git a/tests/src/nodes/mod.rs b/tests/src/nodes/mod.rs index 37f1eea7..05c24ae1 100644 --- a/tests/src/nodes/mod.rs +++ b/tests/src/nodes/mod.rs @@ -1,5 +1,5 @@ mod mixnode; -mod nomos; +pub mod nomos; pub use self::mixnode::MixNode; pub use nomos::NomosNode; diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 379dae0d..ca11bb04 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -13,6 +13,7 @@ use nomos_consensus::{CarnotInfo, CarnotSettings}; use nomos_http::backends::axum::AxumBackendSettings; use nomos_libp2p::Multiaddr; use nomos_log::{LoggerBackend, LoggerFormat}; +use nomos_mempool::MempoolMetrics; use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo}; use nomos_network::NetworkConfig; use nomos_node::Config; @@ -27,6 +28,7 @@ static CLIENT: Lazy = Lazy::new(Client::new); const NOMOS_BIN: &str = "../target/debug/nomos-node"; const CARNOT_INFO_API: &str = "carnot/info"; const NETWORK_INFO_API: &str = "network/info"; +const MEMPOOL_API: &str = "mempool-"; const LOGS_PREFIX: &str = "__logs"; pub struct NomosNode { @@ -110,6 +112,25 @@ impl NomosNode { .swap_remove(0) } + pub async fn get_mempoool_metrics(&self, pool: Pool) -> MempoolMetrics { + let discr = match pool { + Pool::Cl => "cl", + Pool::Da => "da", + }; + let addr = format!("{}{}/metrics", MEMPOOL_API, discr); + let res = self + .get(&addr) + .await + .unwrap() + .json::() + .await + .unwrap(); + MempoolMetrics { + pending_items: res["pending_items"].as_u64().unwrap() as usize, + last_item_timestamp: res["last_item"].as_u64().unwrap(), + } + } + // not async so that we can use this in `Drop` pub fn get_logs_from_file(&self) -> String { println!( @@ -264,3 +285,8 @@ fn create_node_config( config } + +pub enum Pool { + Da, + Cl, +} diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs index b9ba71c3..92084ab1 100644 --- a/tests/src/tests/cli.rs +++ b/tests/src/tests/cli.rs @@ -7,7 +7,9 @@ use nomos_cli::cmds::{ }; use std::time::Duration; use tempfile::NamedTempFile; -use tests::{MixNode, Node, NomosNode, SpawnConfig}; +use tests::{nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig}; + +const TIMEOUT_SECS: u64 = 20; #[tokio::test] async fn disseminate_blob() { @@ -40,9 +42,36 @@ async fn disseminate_blob() { }, }, }, + node_addr: Some( + format!("http://{}", nodes[0].config().http.backend.address.clone()) + .parse() + .unwrap(), + ), + output: None, }); - std::thread::spawn(move || cmd.run().unwrap()) - .join() - .unwrap(); + let thread = std::thread::spawn(move || cmd.run().unwrap()); + + tokio::time::timeout( + Duration::from_secs(TIMEOUT_SECS), + wait_for_cert_in_mempool(&nodes[0]), + ) + .await + .unwrap(); + + thread.join().unwrap(); +} + +async fn wait_for_cert_in_mempool(node: &NomosNode) { + loop { + if node + .get_mempoool_metrics(Pool::Da) + .await + .last_item_timestamp + != 0 + { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } }