Add test for big blob dissemination (#563)
* Add tests for dissemination of big blob * Warn if a requested blob was not returned
This commit is contained in:
parent
aeaf13fc88
commit
16c97c07ec
@ -20,5 +20,16 @@ pub async fn get_block_blobs(node: &Url, block: &BlockId) -> Result<Vec<Blob>, E
|
|||||||
.await?
|
.await?
|
||||||
.ok_or(Error::NotFound)?;
|
.ok_or(Error::NotFound)?;
|
||||||
|
|
||||||
Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?)
|
let blobs = block.blobs().map(|cert| cert.blob()).collect::<Vec<_>>();
|
||||||
|
|
||||||
|
if blobs.is_empty() {
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
|
||||||
|
let n_blobs = blobs.len();
|
||||||
|
let resp = get_blobs(node, blobs).await?;
|
||||||
|
if resp.len() != n_blobs {
|
||||||
|
tracing::warn!("Only {}/{} blobs returned", resp.len(), n_blobs);
|
||||||
|
}
|
||||||
|
Ok(resp)
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,7 @@ use nomos_node::{api::AxumBackendSettings, Config, Tx};
|
|||||||
use fraction::Fraction;
|
use fraction::Fraction;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
use reqwest::Client;
|
use reqwest::{Client, Url};
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
|
|
||||||
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
|
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
|
||||||
@ -115,6 +115,10 @@ impl NomosNode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn url(&self) -> Url {
|
||||||
|
format!("http://{}", self.addr).parse().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn get_block(&self, id: BlockId) -> Option<Block<Tx, Certificate>> {
|
pub async fn get_block(&self, id: BlockId) -> Option<Block<Tx, Certificate>> {
|
||||||
CLIENT
|
CLIENT
|
||||||
.post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
|
.post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
|
||||||
|
@ -1,17 +1,37 @@
|
|||||||
|
use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication};
|
||||||
use nomos_cli::{
|
use nomos_cli::{
|
||||||
cmds::{disseminate::Disseminate, Command},
|
api::da::get_blobs,
|
||||||
|
cmds::disseminate::{self, Disseminate},
|
||||||
da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings},
|
da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings},
|
||||||
};
|
};
|
||||||
use std::time::Duration;
|
use nomos_core::da::{blob::Blob as _, DaProtocol};
|
||||||
|
use std::{
|
||||||
|
path::{self, PathBuf},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
use tests::{
|
use tests::{
|
||||||
adjust_timeout, get_available_port, nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig,
|
adjust_timeout, get_available_port, nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const CLI_BIN: &str = "../target/debug/nomos-cli";
|
||||||
|
|
||||||
|
use std::process::Command;
|
||||||
|
|
||||||
const TIMEOUT_SECS: u64 = 20;
|
const TIMEOUT_SECS: u64 = 20;
|
||||||
|
|
||||||
#[tokio::test]
|
fn run_disseminate(disseminate: &Disseminate) {
|
||||||
async fn disseminate_blob() {
|
Command::new(CLI_BIN)
|
||||||
|
.args(["disseminate", "--network-config"])
|
||||||
|
.arg(disseminate.network_config.as_os_str())
|
||||||
|
.args(["--data", &disseminate.data])
|
||||||
|
.arg("--node-addr")
|
||||||
|
.arg(disseminate.node_addr.as_ref().unwrap().as_str())
|
||||||
|
.status()
|
||||||
|
.expect("failed to execute nomos cli");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn disseminate(data: String) {
|
||||||
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await;
|
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await;
|
||||||
let mut nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await;
|
let mut nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await;
|
||||||
|
|
||||||
@ -25,19 +45,24 @@ async fn disseminate_blob() {
|
|||||||
let mut file = NamedTempFile::new().unwrap();
|
let mut file = NamedTempFile::new().unwrap();
|
||||||
let config_path = file.path().to_owned();
|
let config_path = file.path().to_owned();
|
||||||
serde_yaml::to_writer(&mut file, &network_config).unwrap();
|
serde_yaml::to_writer(&mut file, &network_config).unwrap();
|
||||||
let cmd = Command::Disseminate(Disseminate {
|
let da_protocol = DaProtocolChoice {
|
||||||
data: "Hello World".into(),
|
da_protocol: Protocol::FullReplication,
|
||||||
timeout: 20,
|
settings: ProtocolSettings {
|
||||||
network_config: config_path,
|
full_replication: FullReplicationSettings {
|
||||||
da_protocol: DaProtocolChoice {
|
voter: [0; 32],
|
||||||
da_protocol: Protocol::FullReplication,
|
num_attestations: 1,
|
||||||
settings: ProtocolSettings {
|
|
||||||
full_replication: FullReplicationSettings {
|
|
||||||
voter: [0; 32],
|
|
||||||
num_attestations: 1,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
let da =
|
||||||
|
<FullReplication<AbsoluteNumber<Attestation, Certificate>>>::try_from(da_protocol.clone())
|
||||||
|
.unwrap();
|
||||||
|
let config = Disseminate {
|
||||||
|
data: data.clone(),
|
||||||
|
timeout: 20,
|
||||||
|
network_config: config_path,
|
||||||
|
da_protocol,
|
||||||
node_addr: Some(
|
node_addr: Some(
|
||||||
format!(
|
format!(
|
||||||
"http://{}",
|
"http://{}",
|
||||||
@ -47,9 +72,10 @@ async fn disseminate_blob() {
|
|||||||
.unwrap(),
|
.unwrap(),
|
||||||
),
|
),
|
||||||
output: None,
|
output: None,
|
||||||
});
|
};
|
||||||
|
|
||||||
let thread = std::thread::spawn(move || cmd.run().unwrap());
|
run_disseminate(&config);
|
||||||
|
// let thread = std::thread::spawn(move || cmd.run().unwrap());
|
||||||
|
|
||||||
tokio::time::timeout(
|
tokio::time::timeout(
|
||||||
adjust_timeout(Duration::from_secs(TIMEOUT_SECS)),
|
adjust_timeout(Duration::from_secs(TIMEOUT_SECS)),
|
||||||
@ -58,7 +84,29 @@ async fn disseminate_blob() {
|
|||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
thread.join().unwrap();
|
let blob = da.encode(data.as_bytes().to_vec())[0].hash();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
get_blobs(&nodes[0].url(), vec![blob]).await.unwrap()[0].as_bytes(),
|
||||||
|
data.as_bytes()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn disseminate_blob() {
|
||||||
|
disseminate("hello world".to_string()).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn disseminate_big_blob() {
|
||||||
|
const MSG_SIZE: usize = 1024;
|
||||||
|
disseminate(
|
||||||
|
std::iter::repeat(String::from("X"))
|
||||||
|
.take(MSG_SIZE)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(""),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_for_cert_in_mempool(node: &NomosNode) {
|
async fn wait_for_cert_in_mempool(node: &NomosNode) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user