Disseminate app file input (#565)

* Accept file option in dissemination app

* File dissemination tests
This commit is contained in:
gusto 2024-01-23 18:17:05 +02:00 committed by GitHub
parent aa06080baa
commit f741590315
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 77 additions and 41 deletions

View File

@ -9,11 +9,11 @@ use reqwest::Url;
use std::{path::PathBuf, sync::Arc, time::Duration}; use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::sync::Mutex; use tokio::sync::Mutex;
#[derive(Args, Debug)] #[derive(Args, Debug, Default)]
pub struct Disseminate { pub struct Disseminate {
// TODO: accept bytes // TODO: accept bytes
#[clap(short, long)] #[clap(short, long)]
pub data: String, pub data: Option<String>,
/// Path to the network config file /// Path to the network config file
#[clap(short, long)] #[clap(short, long)]
pub network_config: PathBuf, pub network_config: PathBuf,
@ -30,6 +30,9 @@ pub struct Disseminate {
/// File to write the certificate to, if present. /// File to write the certificate to, if present.
#[clap(long)] #[clap(long)]
pub output: Option<PathBuf>, pub output: Option<PathBuf>,
/// File to disseminate
#[clap(short, long)]
pub file: Option<PathBuf>,
} }
impl Disseminate { impl Disseminate {
@ -41,7 +44,15 @@ impl Disseminate {
<NetworkService<Libp2p> as ServiceData>::Settings, <NetworkService<Libp2p> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?; >(std::fs::File::open(&self.network_config)?)?;
let (status_updates, rx) = std::sync::mpsc::channel(); let (status_updates, rx) = std::sync::mpsc::channel();
let bytes: Box<[u8]> = self.data.clone().as_bytes().into(); let bytes: Box<[u8]> = match (&self.data, &self.file) {
(Some(data), None) => data.clone().as_bytes().into(),
(None, Some(file_path)) => {
let file_bytes = std::fs::read(file_path)?;
file_bytes.into_boxed_slice()
}
(Some(_), Some(_)) => return Err("Cannot specify both data and file".into()),
(None, None) => return Err("Either data or file must be specified".into()),
};
let timeout = Duration::from_secs(self.timeout); let timeout = Duration::from_secs(self.timeout);
let da_protocol = self.da_protocol.clone(); let da_protocol = self.da_protocol.clone();
let node_addr = self.node_addr.clone(); let node_addr = self.node_addr.clone();

View File

@ -205,7 +205,7 @@ impl ServiceCore for DisseminateService {
// protocols, but only the one chosen will be used. // protocols, but only the one chosen will be used.
// We can enforce only sensible combinations of protocol/settings // We can enforce only sensible combinations of protocol/settings
// are specified by using special clap directives // are specified by using special clap directives
#[derive(Clone, Debug, Args)] #[derive(Clone, Debug, Args, Default)]
pub struct DaProtocolChoice { pub struct DaProtocolChoice {
#[clap(long, default_value = "full-replication")] #[clap(long, default_value = "full-replication")]
pub da_protocol: Protocol, pub da_protocol: Protocol,
@ -227,14 +227,15 @@ impl TryFrom<DaProtocolChoice> for FullReplication<AbsoluteNumber<Attestation, C
} }
} }
#[derive(Clone, Debug, Args)] #[derive(Clone, Debug, Args, Default)]
pub struct ProtocolSettings { pub struct ProtocolSettings {
#[clap(flatten)] #[clap(flatten)]
pub full_replication: FullReplicationSettings, pub full_replication: FullReplicationSettings,
} }
#[derive(Clone, Debug, ValueEnum)] #[derive(Clone, Debug, ValueEnum, Default)]
pub enum Protocol { pub enum Protocol {
#[default]
FullReplication, FullReplication,
} }

View File

@ -1,14 +1,11 @@
use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication}; use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication};
use nomos_cli::{ use nomos_cli::{
api::da::get_blobs, api::da::get_blobs,
cmds::disseminate::{self, Disseminate}, cmds::disseminate::Disseminate,
da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings}, da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings},
}; };
use nomos_core::da::{blob::Blob as _, DaProtocol}; use nomos_core::da::{blob::Blob as _, DaProtocol};
use std::{ use std::{io::Write, time::Duration};
path::{self, PathBuf},
time::Duration,
};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use tests::{ use tests::{
adjust_timeout, get_available_port, nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig, adjust_timeout, get_available_port, nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig,
@ -21,17 +18,23 @@ use std::process::Command;
const TIMEOUT_SECS: u64 = 20; const TIMEOUT_SECS: u64 = 20;
fn run_disseminate(disseminate: &Disseminate) { fn run_disseminate(disseminate: &Disseminate) {
Command::new(CLI_BIN) let mut binding = Command::new(CLI_BIN);
let c = binding
.args(["disseminate", "--network-config"]) .args(["disseminate", "--network-config"])
.arg(disseminate.network_config.as_os_str()) .arg(disseminate.network_config.as_os_str())
.args(["--data", &disseminate.data])
.arg("--node-addr") .arg("--node-addr")
.arg(disseminate.node_addr.as_ref().unwrap().as_str()) .arg(disseminate.node_addr.as_ref().unwrap().as_str());
.status()
.expect("failed to execute nomos cli"); match (&disseminate.data, &disseminate.file) {
(Some(data), None) => c.args(["--data", &data]),
(None, Some(file)) => c.args(["--file", file.as_os_str().to_str().unwrap()]),
(_, _) => panic!("Either data or file needs to be provided, but not both"),
};
c.status().expect("failed to execute nomos cli");
} }
async fn disseminate(data: String) { async fn disseminate(config: &mut Disseminate) {
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await; let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await;
let mut nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await; let mut nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await;
@ -58,21 +61,18 @@ async fn disseminate(data: String) {
let da = let da =
<FullReplication<AbsoluteNumber<Attestation, Certificate>>>::try_from(da_protocol.clone()) <FullReplication<AbsoluteNumber<Attestation, Certificate>>>::try_from(da_protocol.clone())
.unwrap(); .unwrap();
let config = Disseminate {
data: data.clone(), config.timeout = 20;
timeout: 20, config.network_config = config_path;
network_config: config_path, config.da_protocol = da_protocol;
da_protocol, config.node_addr = Some(
node_addr: Some(
format!( format!(
"http://{}", "http://{}",
nodes[0].config().http.backend_settings.address.clone() nodes[0].config().http.backend_settings.address.clone()
) )
.parse() .parse()
.unwrap(), .unwrap(),
), );
output: None,
};
run_disseminate(&config); run_disseminate(&config);
// let thread = std::thread::spawn(move || cmd.run().unwrap()); // let thread = std::thread::spawn(move || cmd.run().unwrap());
@ -84,29 +84,53 @@ async fn disseminate(data: String) {
.await .await
.unwrap(); .unwrap();
let blob = da.encode(data.as_bytes().to_vec())[0].hash(); let (blob, bytes) = if let Some(data) = &config.data {
let bytes = data.as_bytes().to_vec();
(da.encode(bytes.clone())[0].hash(), bytes)
} else {
let bytes = std::fs::read(&config.file.as_ref().unwrap()).unwrap();
(da.encode(bytes.clone())[0].hash(), bytes)
};
assert_eq!( assert_eq!(
get_blobs(&nodes[0].url(), vec![blob]).await.unwrap()[0].as_bytes(), get_blobs(&nodes[0].url(), vec![blob]).await.unwrap()[0].as_bytes(),
data.as_bytes() bytes.clone()
); );
} }
#[tokio::test] #[tokio::test]
async fn disseminate_blob() { async fn disseminate_blob() {
disseminate("hello world".to_string()).await; let mut config = Disseminate {
data: Some("hello world".to_string()),
..Default::default()
};
disseminate(&mut config).await;
} }
#[tokio::test] #[tokio::test]
async fn disseminate_big_blob() { async fn disseminate_big_blob() {
const MSG_SIZE: usize = 1024; const MSG_SIZE: usize = 1024;
disseminate( let mut config = Disseminate {
std::iter::repeat(String::from("X")) data: std::iter::repeat(String::from("X"))
.take(MSG_SIZE) .take(MSG_SIZE)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(""), .join("")
) .into(),
.await; ..Default::default()
};
disseminate(&mut config).await;
}
#[tokio::test]
async fn disseminate_blob_from_file() {
let mut file = NamedTempFile::new().unwrap();
file.write_all("hello world".as_bytes()).unwrap();
let mut config = Disseminate {
file: Some(file.path().to_path_buf()),
..Default::default()
};
disseminate(&mut config).await;
} }
async fn wait_for_cert_in_mempool(node: &NomosNode) { async fn wait_for_cert_in_mempool(node: &NomosNode) {