diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index b53b0eff..5492ea81 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -1,7 +1,8 @@ +// std use std::error::Error; use std::ops::Range; use std::{fmt::Debug, hash::Hash}; - +// crates use axum::{ extract::{Query, State}, http::HeaderValue, @@ -35,6 +36,7 @@ use tower_http::{ }; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; +// internal /// Configuration for the Http Server #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] @@ -178,6 +180,7 @@ where .route("/network/info", routing::get(libp2p_info)) .route("/storage/block", routing::post(block::)) .route("/mempool/add/tx", routing::post(add_tx::)) + .route("/mempool/add/blobinfo", routing::post(add_blob_info::)) .route("/metrics", routing::get(get_metrics)) .with_state(handle); @@ -450,6 +453,38 @@ where >(&handle, tx, Transaction::hash)) } +#[utoipa::path( + post, + path = "/mempool/add/blobinfo", + responses( + (status = 200, description = "Add blob info to the mempool"), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn add_blob_info( + State(handle): State, + Json(blob_info): Json, +) -> Response +where + B: DispersedBlobInfo + + Clone + + Debug + + Hash + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ::BlobId: std::cmp::Ord + Clone + Debug + Hash + Send + Sync + 'static, +{ + make_request_and_return_response!(mempool::add_blob_info::< + NetworkBackend, + MempoolNetworkAdapter::BlobId>, + B, + ::BlobId, + >(&handle, blob_info, DispersedBlobInfo::blob_id)) +} + #[utoipa::path( get, path = "/metrics", diff --git a/nomos-cli/src/api/mempool.rs b/nomos-cli/src/api/mempool.rs index c6df251e..ea62e683 100644 --- a/nomos-cli/src/api/mempool.rs +++ b/nomos-cli/src/api/mempool.rs @@ -6,7 +6,7 @@ pub async fn send_blob_info(node: &Url, info: &I) -> Result where I: Serialize, { - const NODE_CERT_PATH: &str = "mempool/add/cert"; + const NODE_CERT_PATH: &str = "mempool/add/blobinfo"; CLIENT .post(node.join(NODE_CERT_PATH).unwrap()) .json(info) diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs index 7c09e3d6..1b56ab49 100644 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -29,10 +29,18 @@ pub struct Disseminate { /// for block inclusion, if present. #[clap(long)] pub node_addr: Option, + // Application ID for dispersed data. #[clap(long)] pub app_id: String, + // Index for the Blob associated with Application ID. #[clap(long)] pub index: u64, + // Use Kzg RS cache. + #[clap(long)] + pub with_cache: bool, + // Number of columns to use for encoding. + #[clap(long, default_value = "4096")] + pub columns: usize, /// File to write the certificate to, if present. #[clap(long)] pub output: Option, @@ -51,14 +59,18 @@ impl Disseminate { >(std::fs::File::open(&self.network_config)?)?; let (status_updates, rx) = std::sync::mpsc::channel(); - let bytes: Box<[u8]> = if let Some(data) = &self.data { - data.clone().as_bytes().into() + let mut bytes: Vec = if let Some(data) = &self.data { + data.clone().into_bytes() } else { let file_path = self.file.as_ref().unwrap(); - let file_bytes = std::fs::read(file_path)?; - file_bytes.into_boxed_slice() + std::fs::read(file_path)? }; + let remainder = bytes.len() % 31; + if remainder != 0 { + bytes.resize(bytes.len() + (31 - remainder), 0); + } + let app_id: [u8; 32] = hex::decode(&self.app_id)? .try_into() .map_err(|_| "Invalid app_id")?; @@ -66,8 +78,10 @@ impl Disseminate { let timeout = Duration::from_secs(self.timeout); let node_addr = self.node_addr.clone(); let output = self.output.clone(); + let num_columns = self.columns; + let with_cache = self.with_cache; let (payload_sender, payload_rx) = tokio::sync::mpsc::unbounded_channel(); - payload_sender.send(bytes).unwrap(); + payload_sender.send(bytes.into_boxed_slice()).unwrap(); std::thread::spawn(move || { OverwatchRunner::::run( DisseminateAppServiceSettings { @@ -75,7 +89,10 @@ impl Disseminate { send_blob: Settings { payload: Arc::new(Mutex::new(payload_rx)), timeout, - kzgrs_settings: KzgrsSettings::default(), + kzgrs_settings: KzgrsSettings { + num_columns, + with_cache, + }, metadata, status_updates, node_addr, diff --git a/nomos-cli/src/da/disseminate.rs b/nomos-cli/src/da/disseminate.rs index 9bcc6345..248ca4e4 100644 --- a/nomos-cli/src/da/disseminate.rs +++ b/nomos-cli/src/da/disseminate.rs @@ -75,7 +75,7 @@ where if !res.status().is_success() { tracing::error!("ERROR: {:?}", res); - return Err(format!("Failed to send certificate to node: {}", res.status()).into()); + return Err(format!("Failed to send blob info to node: {}", res.status()).into()); } } @@ -210,8 +210,8 @@ impl ServiceCore for DisseminateService { #[derive(Debug, Clone, Args)] pub struct KzgrsSettings { - num_columns: usize, - with_cache: bool, + pub num_columns: usize, + pub with_cache: bool, } impl Default for KzgrsSettings { diff --git a/nomos-cli/src/da/network/backend.rs b/nomos-cli/src/da/network/backend.rs index ff237036..5e9fc520 100644 --- a/nomos-cli/src/da/network/backend.rs +++ b/nomos-cli/src/da/network/backend.rs @@ -57,8 +57,8 @@ pub struct ExecutorBackendSettings { #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] pub node_key: ed25519::SecretKey, /// Membership of DA network PoV set - membership: Membership, - node_addrs: HashMap, + pub membership: Membership, + pub node_addrs: HashMap, } impl ExecutorBackend { diff --git a/nomos-services/api/src/http/mempool.rs b/nomos-services/api/src/http/mempool.rs index 99a0e334..2ea2856d 100644 --- a/nomos-services/api/src/http/mempool.rs +++ b/nomos-services/api/src/http/mempool.rs @@ -41,7 +41,7 @@ where } } -pub async fn add_cert( +pub async fn add_blob_info( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, item: A::Payload, converter: impl Fn(&A::Payload) -> Key, diff --git a/tests/Cargo.toml b/tests/Cargo.toml index d91a0324..236446c2 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -42,6 +42,10 @@ time = "0.3" name = "test_cryptarchia_happy_path" path = "src/tests/cryptarchia/happy.rs" +[[test]] +name = "test_cli" +path = "src/tests/cli.rs" + [features] mixnet = ["nomos-network/mixnet"] metrics = ["nomos-node/metrics"] diff --git a/tests/src/lib.rs b/tests/src/lib.rs index dffd18c5..708da679 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -1,5 +1,5 @@ pub mod nodes; -// pub use nodes::NomosNode; +pub use nodes::NomosNode; use once_cell::sync::Lazy; use std::env; diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs new file mode 100644 index 00000000..127206d4 --- /dev/null +++ b/tests/src/tests/cli.rs @@ -0,0 +1,125 @@ +use nomos_cli::cmds::disseminate::Disseminate; +use nomos_cli::da::network::backend::ExecutorBackend; +use nomos_cli::da::network::backend::ExecutorBackendSettings; +use nomos_da_network_service::NetworkConfig; +use nomos_libp2p::ed25519; +use nomos_libp2p::libp2p; +use nomos_libp2p::libp2p::multiaddr::multiaddr; +use nomos_libp2p::Multiaddr; +use nomos_libp2p::PeerId; +use std::collections::HashMap; +use std::io::Write; +use subnetworks_assignations::versions::v1::FillFromNodeList; +use tempfile::NamedTempFile; +use tests::nodes::NomosNode; +use tests::Node; +use tests::SpawnConfig; + +const CLI_BIN: &str = "../target/debug/nomos-cli"; + +use std::process::Command; + +fn run_disseminate(disseminate: &Disseminate) { + let mut binding = Command::new(CLI_BIN); + let c = binding + .args(["disseminate", "--network-config"]) + .arg(disseminate.network_config.as_os_str()) + .arg("--app-id") + .arg(&disseminate.app_id) + .arg("--index") + .arg(disseminate.index.to_string()) + .arg("--columns") + .arg(disseminate.columns.to_string()) + .arg("--node-addr") + .arg(disseminate.node_addr.as_ref().unwrap().as_str()); + + match (&disseminate.data, &disseminate.file) { + (Some(data), None) => c.args(["--data", &data]), + (None, Some(file)) => c.args(["--file", file.as_os_str().to_str().unwrap()]), + (_, _) => panic!("Either data or file needs to be provided, but not both"), + }; + + c.status().expect("failed to execute nomos cli"); +} + +async fn disseminate(config: &mut Disseminate) { + let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2)).await; + + let node_addrs: HashMap = nodes + .iter() + .map(|n| { + let libp2p_config = &n.config().network.backend.inner; + let keypair = libp2p::identity::Keypair::from(ed25519::Keypair::from( + libp2p_config.node_key.clone(), + )); + let peer_id = PeerId::from(keypair.public()); + let address = multiaddr!(Ip4(libp2p_config.host), Udp(libp2p_config.port), QuicV1); + (peer_id, address) + }) + .collect(); + + let peer_ids: Vec = node_addrs.keys().cloned().collect(); + + let da_network_config: NetworkConfig> = NetworkConfig { + backend: ExecutorBackendSettings { + node_key: ed25519::SecretKey::generate(), + membership: FillFromNodeList::new(&peer_ids, 2, 1), + node_addrs, + }, + }; + + let mut file = NamedTempFile::new().unwrap(); + let config_path = file.path().to_owned(); + serde_yaml::to_writer(&mut file, &da_network_config).unwrap(); + + config.timeout = 20; + config.network_config = config_path; + config.node_addr = Some( + format!( + "http://{}", + nodes[0].config().http.backend_settings.address.clone() + ) + .parse() + .unwrap(), + ); + config.app_id = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715".to_string(); + config.index = 0; + config.columns = 32; + + run_disseminate(&config); +} + +#[tokio::test] +async fn disseminate_blob() { + let mut config = Disseminate { + data: Some("hello world".to_string()), + ..Default::default() + }; + disseminate(&mut config).await; +} + +#[tokio::test] +async fn disseminate_big_blob() { + const MSG_SIZE: usize = 1024; + let mut config = Disseminate { + data: std::iter::repeat(String::from("X")) + .take(MSG_SIZE) + .collect::>() + .join("") + .into(), + ..Default::default() + }; + disseminate(&mut config).await; +} + +#[tokio::test] +async fn disseminate_blob_from_file() { + let mut file = NamedTempFile::new().unwrap(); + file.write_all("hello world".as_bytes()).unwrap(); + + let mut config = Disseminate { + file: Some(file.path().to_path_buf()), + ..Default::default() + }; + disseminate(&mut config).await; +}