Send certificate to node after dissemination + tests (#450)

* Send DA certificate to node after dissemination

* rename mempool endpoints

* Check certificate inclusion in tests

* rename endpoint

* Rename addcert and addtx to add

* tweak test condition

* add option to save certificate to file

* move thread join

* remove fancy prints
This commit is contained in:
Giacomo Pasini 2023-10-04 13:37:13 +02:00 committed by GitHub
parent 29b963fa29
commit 58686b2a04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 107 additions and 41 deletions

View File

@ -166,7 +166,7 @@ where
MempoolService<A, MockPool<Tx, <Tx as Transaction>::Hash>, TxDiscriminant>, MempoolService<A, MockPool<Tx, <Tx as Transaction>::Hash>, TxDiscriminant>,
AxumBackend, AxumBackend,
_, _,
>(handle.clone(), HttpMethod::POST, "addtx") >(handle.clone(), HttpMethod::POST, "add")
.await .await
.unwrap(); .unwrap();
@ -206,9 +206,7 @@ where
AxumBackend, AxumBackend,
_, _,
>( >(
handle.clone(), handle.clone(), HttpMethod::POST, "add"
HttpMethod::POST,
"addcert",
) )
.await .await
.unwrap(); .unwrap();

View File

@ -12,7 +12,6 @@ tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
async-trait = "0.1" async-trait = "0.1"
clap = {version = "4", features = ["derive"] } clap = {version = "4", features = ["derive"] }
crossterm = "0.27"
serde_yaml = "0.9" serde_yaml = "0.9"
futures = "0.3" futures = "0.3"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
@ -22,4 +21,5 @@ nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] } nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] }
nomos-libp2p = { path = "../nomos-libp2p"} nomos-libp2p = { path = "../nomos-libp2p"}
nomos-core = { path = "../nomos-core" } nomos-core = { path = "../nomos-core" }
full-replication = { path = "../nomos-da/full-replication" } full-replication = { path = "../nomos-da/full-replication" }
reqwest = "0.11"

View File

@ -1,8 +1,7 @@
use clap::{Args, ValueEnum}; use clap::{Args, ValueEnum};
use crossterm::execute;
use full_replication::{AbsoluteNumber, FullReplication}; use full_replication::{AbsoluteNumber, FullReplication};
use futures::StreamExt; use futures::StreamExt;
use nomos_core::da::DaProtocol; use nomos_core::{da::DaProtocol, wire};
use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter}; use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter};
use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_derive::*; use overwatch_derive::*;
@ -16,8 +15,11 @@ use overwatch_rs::{
}, },
DynError, DynError,
}; };
use reqwest::{Client, Url};
use std::{path::PathBuf, time::Duration}; use std::{path::PathBuf, time::Duration};
const NODE_CERT_PATH: &str = "mempool-da/add";
#[derive(Args, Debug)] #[derive(Args, Debug)]
pub struct Disseminate { pub struct Disseminate {
// TODO: accept bytes // TODO: accept bytes
@ -32,6 +34,11 @@ pub struct Disseminate {
/// Timeout in seconds. Defaults to 120 seconds. /// Timeout in seconds. Defaults to 120 seconds.
#[clap(short, long, default_value = "120")] #[clap(short, long, default_value = "120")]
pub timeout: u64, pub timeout: u64,
/// Address of the node to send the certificate to
/// for block inclusion, if present.
pub node_addr: Option<Url>,
/// File to write the certificate to, if present.
pub output: Option<PathBuf>,
} }
#[derive(Debug, Clone, Args)] #[derive(Debug, Clone, Args)]
@ -55,6 +62,8 @@ impl Disseminate {
bytes: self.data.clone().as_bytes().into(), bytes: self.data.clone().as_bytes().into(),
timeout: Duration::from_secs(self.timeout), timeout: Duration::from_secs(self.timeout),
da_protocol: self.da_protocol.clone(), da_protocol: self.da_protocol.clone(),
node_addr: self.node_addr.clone(),
output: self.output.clone(),
}, },
}, },
None, None,
@ -65,26 +74,11 @@ impl Disseminate {
} }
} }
// Write '✓' at the end of the previous line in terminal
fn terminal_cmd_done() {
let go_to_previous_line = crossterm::cursor::MoveToPreviousLine(1);
let go_to_end_of_line =
crossterm::cursor::MoveToColumn(crossterm::terminal::size().unwrap().0 - 1);
let write_done = crossterm::style::Print("");
execute!(
std::io::stdout(),
go_to_previous_line,
go_to_end_of_line,
write_done
)
.unwrap()
}
async fn disseminate<D, B, N, A, C>( async fn disseminate<D, B, N, A, C>(
mut da: D, mut da: D,
data: Box<[u8]>, data: Box<[u8]>,
adapter: N, adapter: N,
) -> Result<C, Box<dyn std::error::Error>> ) -> Result<C, Box<dyn std::error::Error + Send + Sync>>
where where
D: DaProtocol<Blob = B, Attestation = A, Certificate = C>, D: DaProtocol<Blob = B, Attestation = A, Certificate = C>,
N: NetworkAdapter<Blob = B, Attestation = A> + Send + Sync, N: NetworkAdapter<Blob = B, Attestation = A> + Send + Sync,
@ -92,14 +86,10 @@ where
// 1) Building blob // 1) Building blob
tracing::info!("Building blobs..."); tracing::info!("Building blobs...");
let blobs = da.encode(data); let blobs = da.encode(data);
terminal_cmd_done();
// 2) Send blob to network // 2) Send blob to network
tracing::info!("Sending blobs to network..."); tracing::info!("Sending blobs to network...");
futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob))) futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob))).await?;
.await
.map_err(|e| e as Box<dyn std::error::Error>)?;
terminal_cmd_done();
// 3) Collect attestations and create proof // 3) Collect attestations and create proof
tracing::info!("Collecting attestations to create proof..."); tracing::info!("Collecting attestations to create proof...");
@ -108,7 +98,6 @@ where
da.recv_attestation(attestations.next().await.unwrap()); da.recv_attestation(attestations.next().await.unwrap());
if let Some(cert) = da.certify_dispersal() { if let Some(cert) = da.certify_dispersal() {
terminal_cmd_done();
return Ok(cert); return Ok(cert);
} }
} }
@ -127,6 +116,8 @@ pub struct Settings {
bytes: Box<[u8]>, bytes: Box<[u8]>,
timeout: Duration, timeout: Duration,
da_protocol: DaProtocolChoice, da_protocol: DaProtocolChoice,
node_addr: Option<Url>,
output: Option<PathBuf>,
} }
pub struct DisseminateService { pub struct DisseminateService {
@ -150,6 +141,7 @@ impl ServiceCore for DisseminateService {
async fn run(self) -> Result<(), DynError> { async fn run(self) -> Result<(), DynError> {
let Self { service_state } = self; let Self { service_state } = self;
let settings = service_state.settings_reader.get_updated_settings(); let settings = service_state.settings_reader.get_updated_settings();
let node_addr = settings.node_addr.clone();
match settings.da_protocol { match settings.da_protocol {
DaProtocolChoice { DaProtocolChoice {
@ -185,7 +177,29 @@ impl ServiceCore for DisseminateService {
); );
std::process::exit(1); std::process::exit(1);
} }
_ => {} Ok(Ok(cert)) => {
if let Some(output) = settings.output {
tracing::info!("Writing certificate to file...");
std::fs::write(output, wire::serialize(&cert).unwrap())?;
}
if let Some(node_addr) = node_addr {
let client = Client::new();
tracing::info!("Sending certificate to node...");
let res = client
.post(node_addr.join(NODE_CERT_PATH).unwrap())
.body(wire::serialize(&cert).unwrap())
.send()
.await?;
if !res.status().is_success() {
tracing::error!(
"Failed to send certificate to node: {}",
res.status()
);
std::process::exit(1);
}
}
}
} }
} }
} }

View File

@ -115,11 +115,11 @@ pub trait Discriminant {
} }
impl Discriminant for Transaction { impl Discriminant for Transaction {
const ID: &'static str = "mempool-txs"; const ID: &'static str = "mempool-cl";
} }
impl Discriminant for Certificate { impl Discriminant for Certificate {
const ID: &'static str = "mempool-certs"; const ID: &'static str = "mempool-da";
} }
impl<N, P, D> ServiceData for MempoolService<N, P, D> impl<N, P, D> ServiceData for MempoolService<N, P, D>

View File

@ -28,6 +28,7 @@ reqwest = { version = "0.11", features = ["json"] }
nomos-libp2p = { path = "../nomos-libp2p" } nomos-libp2p = { path = "../nomos-libp2p" }
tempfile = "3.6" tempfile = "3.6"
serde_yaml = "0.9" serde_yaml = "0.9"
serde_json = "1.0"
tokio = "1" tokio = "1"
futures = "0.3" futures = "0.3"
async-trait = "0.1" async-trait = "0.1"
@ -51,8 +52,6 @@ path = "src/tests/mixnet.rs"
[[test]] [[test]]
name = "test_cli" name = "test_cli"
path = "src/tests/cli.rs" path = "src/tests/cli.rs"
# disseminate is only implemented for libp2p
required-features = ["libp2p"]
[[bench]] [[bench]]
name = "mixnet" name = "mixnet"

View File

@ -1,4 +1,4 @@
mod nodes; pub mod nodes;
use mixnet_node::MixnetNodeConfig; use mixnet_node::MixnetNodeConfig;
use mixnet_topology::MixnetTopology; use mixnet_topology::MixnetTopology;
pub use nodes::MixNode; pub use nodes::MixNode;

View File

@ -1,5 +1,5 @@
mod mixnode; mod mixnode;
mod nomos; pub mod nomos;
pub use self::mixnode::MixNode; pub use self::mixnode::MixNode;
pub use nomos::NomosNode; pub use nomos::NomosNode;

View File

@ -13,6 +13,7 @@ use nomos_consensus::{CarnotInfo, CarnotSettings};
use nomos_http::backends::axum::AxumBackendSettings; use nomos_http::backends::axum::AxumBackendSettings;
use nomos_libp2p::Multiaddr; use nomos_libp2p::Multiaddr;
use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_mempool::MempoolMetrics;
use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo}; use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo};
use nomos_network::NetworkConfig; use nomos_network::NetworkConfig;
use nomos_node::Config; use nomos_node::Config;
@ -27,6 +28,7 @@ static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const NOMOS_BIN: &str = "../target/debug/nomos-node"; const NOMOS_BIN: &str = "../target/debug/nomos-node";
const CARNOT_INFO_API: &str = "carnot/info"; const CARNOT_INFO_API: &str = "carnot/info";
const NETWORK_INFO_API: &str = "network/info"; const NETWORK_INFO_API: &str = "network/info";
const MEMPOOL_API: &str = "mempool-";
const LOGS_PREFIX: &str = "__logs"; const LOGS_PREFIX: &str = "__logs";
pub struct NomosNode { pub struct NomosNode {
@ -110,6 +112,25 @@ impl NomosNode {
.swap_remove(0) .swap_remove(0)
} }
pub async fn get_mempoool_metrics(&self, pool: Pool) -> MempoolMetrics {
let discr = match pool {
Pool::Cl => "cl",
Pool::Da => "da",
};
let addr = format!("{}{}/metrics", MEMPOOL_API, discr);
let res = self
.get(&addr)
.await
.unwrap()
.json::<serde_json::Value>()
.await
.unwrap();
MempoolMetrics {
pending_items: res["pending_items"].as_u64().unwrap() as usize,
last_item_timestamp: res["last_item"].as_u64().unwrap(),
}
}
// not async so that we can use this in `Drop` // not async so that we can use this in `Drop`
pub fn get_logs_from_file(&self) -> String { pub fn get_logs_from_file(&self) -> String {
println!( println!(
@ -264,3 +285,8 @@ fn create_node_config(
config config
} }
pub enum Pool {
Da,
Cl,
}

View File

@ -7,7 +7,9 @@ use nomos_cli::cmds::{
}; };
use std::time::Duration; use std::time::Duration;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use tests::{MixNode, Node, NomosNode, SpawnConfig}; use tests::{nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig};
const TIMEOUT_SECS: u64 = 20;
#[tokio::test] #[tokio::test]
async fn disseminate_blob() { async fn disseminate_blob() {
@ -40,9 +42,36 @@ async fn disseminate_blob() {
}, },
}, },
}, },
node_addr: Some(
format!("http://{}", nodes[0].config().http.backend.address.clone())
.parse()
.unwrap(),
),
output: None,
}); });
std::thread::spawn(move || cmd.run().unwrap()) let thread = std::thread::spawn(move || cmd.run().unwrap());
.join()
.unwrap(); tokio::time::timeout(
Duration::from_secs(TIMEOUT_SECS),
wait_for_cert_in_mempool(&nodes[0]),
)
.await
.unwrap();
thread.join().unwrap();
}
async fn wait_for_cert_in_mempool(node: &NomosNode) {
loop {
if node
.get_mempoool_metrics(Pool::Da)
.await
.last_item_timestamp
!= 0
{
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
} }