DA: Nomos Cli dispersal tests (#711)
* Dissemination app in tests module * Expose remaining executor settings * Configure peer ids and addresses for cli tests * Add BlobInfo to mempool via api * Expose columns number args for nomos cli * Spawn all nodes in nodes config
This commit is contained in:
parent
0cb039d806
commit
4ad9ebec46
|
@ -1,7 +1,8 @@
|
|||
// std
|
||||
use std::error::Error;
|
||||
use std::ops::Range;
|
||||
use std::{fmt::Debug, hash::Hash};
|
||||
|
||||
// crates
|
||||
use axum::{
|
||||
extract::{Query, State},
|
||||
http::HeaderValue,
|
||||
|
@ -35,6 +36,7 @@ use tower_http::{
|
|||
};
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
// internal
|
||||
|
||||
/// Configuration for the Http Server
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
|
@ -178,6 +180,7 @@ where
|
|||
.route("/network/info", routing::get(libp2p_info))
|
||||
.route("/storage/block", routing::post(block::<S, T>))
|
||||
.route("/mempool/add/tx", routing::post(add_tx::<T>))
|
||||
.route("/mempool/add/blobinfo", routing::post(add_blob_info::<V>))
|
||||
.route("/metrics", routing::get(get_metrics))
|
||||
.with_state(handle);
|
||||
|
||||
|
@ -450,6 +453,38 @@ where
|
|||
>(&handle, tx, Transaction::hash))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/mempool/add/blobinfo",
|
||||
responses(
|
||||
(status = 200, description = "Add blob info to the mempool"),
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn add_blob_info<B>(
|
||||
State(handle): State<OverwatchHandle>,
|
||||
Json(blob_info): Json<B>,
|
||||
) -> Response
|
||||
where
|
||||
B: DispersedBlobInfo
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<B as DispersedBlobInfo>::BlobId: std::cmp::Ord + Clone + Debug + Hash + Send + Sync + 'static,
|
||||
{
|
||||
make_request_and_return_response!(mempool::add_blob_info::<
|
||||
NetworkBackend,
|
||||
MempoolNetworkAdapter<B, <B as DispersedBlobInfo>::BlobId>,
|
||||
B,
|
||||
<B as DispersedBlobInfo>::BlobId,
|
||||
>(&handle, blob_info, DispersedBlobInfo::blob_id))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/metrics",
|
||||
|
|
|
@ -6,7 +6,7 @@ pub async fn send_blob_info<I>(node: &Url, info: &I) -> Result<Response, Error>
|
|||
where
|
||||
I: Serialize,
|
||||
{
|
||||
const NODE_CERT_PATH: &str = "mempool/add/cert";
|
||||
const NODE_CERT_PATH: &str = "mempool/add/blobinfo";
|
||||
CLIENT
|
||||
.post(node.join(NODE_CERT_PATH).unwrap())
|
||||
.json(info)
|
||||
|
|
|
@ -29,10 +29,18 @@ pub struct Disseminate {
|
|||
/// for block inclusion, if present.
|
||||
#[clap(long)]
|
||||
pub node_addr: Option<Url>,
|
||||
// Application ID for dispersed data.
|
||||
#[clap(long)]
|
||||
pub app_id: String,
|
||||
// Index for the Blob associated with Application ID.
|
||||
#[clap(long)]
|
||||
pub index: u64,
|
||||
// Use Kzg RS cache.
|
||||
#[clap(long)]
|
||||
pub with_cache: bool,
|
||||
// Number of columns to use for encoding.
|
||||
#[clap(long, default_value = "4096")]
|
||||
pub columns: usize,
|
||||
/// File to write the certificate to, if present.
|
||||
#[clap(long)]
|
||||
pub output: Option<PathBuf>,
|
||||
|
@ -51,14 +59,18 @@ impl Disseminate {
|
|||
>(std::fs::File::open(&self.network_config)?)?;
|
||||
let (status_updates, rx) = std::sync::mpsc::channel();
|
||||
|
||||
let bytes: Box<[u8]> = if let Some(data) = &self.data {
|
||||
data.clone().as_bytes().into()
|
||||
let mut bytes: Vec<u8> = if let Some(data) = &self.data {
|
||||
data.clone().into_bytes()
|
||||
} else {
|
||||
let file_path = self.file.as_ref().unwrap();
|
||||
let file_bytes = std::fs::read(file_path)?;
|
||||
file_bytes.into_boxed_slice()
|
||||
std::fs::read(file_path)?
|
||||
};
|
||||
|
||||
let remainder = bytes.len() % 31;
|
||||
if remainder != 0 {
|
||||
bytes.resize(bytes.len() + (31 - remainder), 0);
|
||||
}
|
||||
|
||||
let app_id: [u8; 32] = hex::decode(&self.app_id)?
|
||||
.try_into()
|
||||
.map_err(|_| "Invalid app_id")?;
|
||||
|
@ -66,8 +78,10 @@ impl Disseminate {
|
|||
let timeout = Duration::from_secs(self.timeout);
|
||||
let node_addr = self.node_addr.clone();
|
||||
let output = self.output.clone();
|
||||
let num_columns = self.columns;
|
||||
let with_cache = self.with_cache;
|
||||
let (payload_sender, payload_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
payload_sender.send(bytes).unwrap();
|
||||
payload_sender.send(bytes.into_boxed_slice()).unwrap();
|
||||
std::thread::spawn(move || {
|
||||
OverwatchRunner::<DisseminateApp>::run(
|
||||
DisseminateAppServiceSettings {
|
||||
|
@ -75,7 +89,10 @@ impl Disseminate {
|
|||
send_blob: Settings {
|
||||
payload: Arc::new(Mutex::new(payload_rx)),
|
||||
timeout,
|
||||
kzgrs_settings: KzgrsSettings::default(),
|
||||
kzgrs_settings: KzgrsSettings {
|
||||
num_columns,
|
||||
with_cache,
|
||||
},
|
||||
metadata,
|
||||
status_updates,
|
||||
node_addr,
|
||||
|
|
|
@ -75,7 +75,7 @@ where
|
|||
|
||||
if !res.status().is_success() {
|
||||
tracing::error!("ERROR: {:?}", res);
|
||||
return Err(format!("Failed to send certificate to node: {}", res.status()).into());
|
||||
return Err(format!("Failed to send blob info to node: {}", res.status()).into());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -210,8 +210,8 @@ impl ServiceCore for DisseminateService {
|
|||
|
||||
#[derive(Debug, Clone, Args)]
|
||||
pub struct KzgrsSettings {
|
||||
num_columns: usize,
|
||||
with_cache: bool,
|
||||
pub num_columns: usize,
|
||||
pub with_cache: bool,
|
||||
}
|
||||
|
||||
impl Default for KzgrsSettings {
|
||||
|
|
|
@ -57,8 +57,8 @@ pub struct ExecutorBackendSettings<Membership> {
|
|||
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
|
||||
pub node_key: ed25519::SecretKey,
|
||||
/// Membership of DA network PoV set
|
||||
membership: Membership,
|
||||
node_addrs: HashMap<PeerId, Multiaddr>,
|
||||
pub membership: Membership,
|
||||
pub node_addrs: HashMap<PeerId, Multiaddr>,
|
||||
}
|
||||
|
||||
impl<Membership> ExecutorBackend<Membership> {
|
||||
|
|
|
@ -41,7 +41,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn add_cert<N, A, V, Item, Key>(
|
||||
pub async fn add_blob_info<N, A, Item, Key>(
|
||||
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
item: A::Payload,
|
||||
converter: impl Fn(&A::Payload) -> Key,
|
||||
|
|
|
@ -42,6 +42,10 @@ time = "0.3"
|
|||
name = "test_cryptarchia_happy_path"
|
||||
path = "src/tests/cryptarchia/happy.rs"
|
||||
|
||||
[[test]]
|
||||
name = "test_cli"
|
||||
path = "src/tests/cli.rs"
|
||||
|
||||
[features]
|
||||
mixnet = ["nomos-network/mixnet"]
|
||||
metrics = ["nomos-node/metrics"]
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
pub mod nodes;
|
||||
// pub use nodes::NomosNode;
|
||||
pub use nodes::NomosNode;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use std::env;
|
||||
|
|
|
@ -0,0 +1,125 @@
|
|||
use nomos_cli::cmds::disseminate::Disseminate;
|
||||
use nomos_cli::da::network::backend::ExecutorBackend;
|
||||
use nomos_cli::da::network::backend::ExecutorBackendSettings;
|
||||
use nomos_da_network_service::NetworkConfig;
|
||||
use nomos_libp2p::ed25519;
|
||||
use nomos_libp2p::libp2p;
|
||||
use nomos_libp2p::libp2p::multiaddr::multiaddr;
|
||||
use nomos_libp2p::Multiaddr;
|
||||
use nomos_libp2p::PeerId;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use subnetworks_assignations::versions::v1::FillFromNodeList;
|
||||
use tempfile::NamedTempFile;
|
||||
use tests::nodes::NomosNode;
|
||||
use tests::Node;
|
||||
use tests::SpawnConfig;
|
||||
|
||||
const CLI_BIN: &str = "../target/debug/nomos-cli";
|
||||
|
||||
use std::process::Command;
|
||||
|
||||
fn run_disseminate(disseminate: &Disseminate) {
|
||||
let mut binding = Command::new(CLI_BIN);
|
||||
let c = binding
|
||||
.args(["disseminate", "--network-config"])
|
||||
.arg(disseminate.network_config.as_os_str())
|
||||
.arg("--app-id")
|
||||
.arg(&disseminate.app_id)
|
||||
.arg("--index")
|
||||
.arg(disseminate.index.to_string())
|
||||
.arg("--columns")
|
||||
.arg(disseminate.columns.to_string())
|
||||
.arg("--node-addr")
|
||||
.arg(disseminate.node_addr.as_ref().unwrap().as_str());
|
||||
|
||||
match (&disseminate.data, &disseminate.file) {
|
||||
(Some(data), None) => c.args(["--data", &data]),
|
||||
(None, Some(file)) => c.args(["--file", file.as_os_str().to_str().unwrap()]),
|
||||
(_, _) => panic!("Either data or file needs to be provided, but not both"),
|
||||
};
|
||||
|
||||
c.status().expect("failed to execute nomos cli");
|
||||
}
|
||||
|
||||
async fn disseminate(config: &mut Disseminate) {
|
||||
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2)).await;
|
||||
|
||||
let node_addrs: HashMap<PeerId, Multiaddr> = nodes
|
||||
.iter()
|
||||
.map(|n| {
|
||||
let libp2p_config = &n.config().network.backend.inner;
|
||||
let keypair = libp2p::identity::Keypair::from(ed25519::Keypair::from(
|
||||
libp2p_config.node_key.clone(),
|
||||
));
|
||||
let peer_id = PeerId::from(keypair.public());
|
||||
let address = multiaddr!(Ip4(libp2p_config.host), Udp(libp2p_config.port), QuicV1);
|
||||
(peer_id, address)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let peer_ids: Vec<nomos_libp2p::PeerId> = node_addrs.keys().cloned().collect();
|
||||
|
||||
let da_network_config: NetworkConfig<ExecutorBackend<FillFromNodeList>> = NetworkConfig {
|
||||
backend: ExecutorBackendSettings {
|
||||
node_key: ed25519::SecretKey::generate(),
|
||||
membership: FillFromNodeList::new(&peer_ids, 2, 1),
|
||||
node_addrs,
|
||||
},
|
||||
};
|
||||
|
||||
let mut file = NamedTempFile::new().unwrap();
|
||||
let config_path = file.path().to_owned();
|
||||
serde_yaml::to_writer(&mut file, &da_network_config).unwrap();
|
||||
|
||||
config.timeout = 20;
|
||||
config.network_config = config_path;
|
||||
config.node_addr = Some(
|
||||
format!(
|
||||
"http://{}",
|
||||
nodes[0].config().http.backend_settings.address.clone()
|
||||
)
|
||||
.parse()
|
||||
.unwrap(),
|
||||
);
|
||||
config.app_id = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715".to_string();
|
||||
config.index = 0;
|
||||
config.columns = 32;
|
||||
|
||||
run_disseminate(&config);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn disseminate_blob() {
|
||||
let mut config = Disseminate {
|
||||
data: Some("hello world".to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
disseminate(&mut config).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn disseminate_big_blob() {
|
||||
const MSG_SIZE: usize = 1024;
|
||||
let mut config = Disseminate {
|
||||
data: std::iter::repeat(String::from("X"))
|
||||
.take(MSG_SIZE)
|
||||
.collect::<Vec<_>>()
|
||||
.join("")
|
||||
.into(),
|
||||
..Default::default()
|
||||
};
|
||||
disseminate(&mut config).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn disseminate_blob_from_file() {
|
||||
let mut file = NamedTempFile::new().unwrap();
|
||||
file.write_all("hello world".as_bytes()).unwrap();
|
||||
|
||||
let mut config = Disseminate {
|
||||
file: Some(file.path().to_path_buf()),
|
||||
..Default::default()
|
||||
};
|
||||
disseminate(&mut config).await;
|
||||
}
|
Loading…
Reference in New Issue