From a487d8c7a04c0cb058c8f276f5ab0b9b35d0c508 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini <21265557+zeegomo@users.noreply.github.com> Date: Mon, 30 Oct 2023 16:19:25 +0100 Subject: [PATCH] Add DA utilities to nomos-cli (#493) * Move da disseminate command into separate module * Add utilities to retrieve block contents * fix tests * fix typo --- nomos-cli/Cargo.toml | 6 +- nomos-cli/src/cmds/disseminate/mod.rs | 4 +- nomos-cli/src/da/disseminate.rs | 251 +++++++++++++++++++++++++ nomos-cli/src/da/mod.rs | 252 +------------------------- nomos-cli/src/da/retrieve.rs | 56 ++++++ tests/src/tests/cli.rs | 2 +- 6 files changed, 318 insertions(+), 253 deletions(-) create mode 100644 nomos-cli/src/da/disseminate.rs create mode 100644 nomos-cli/src/da/retrieve.rs diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index 2fb2b4da..8e9bef1e 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -12,6 +12,7 @@ tracing = "0.1" tracing-subscriber = "0.3" async-trait = "0.1" clap = {version = "4", features = ["derive"] } +consensus-engine = { path = "../consensus-engine" } serde_yaml = "0.9" futures = "0.3" tokio = { version = "1", features = ["sync"] } @@ -21,6 +22,9 @@ nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] } nomos-libp2p = { path = "../nomos-libp2p"} nomos-core = { path = "../nomos-core" } +nomos-node = { path = "../nodes/nomos-node" } full-replication = { path = "../nomos-da/full-replication" } reqwest = "0.11" -serde = { version = "1.0", features = ["derive"] } \ No newline at end of file +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "1.0" \ No newline at end of file diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs index f4f06c28..adee157c 100644 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -1,4 +1,6 @@ -use crate::da::{DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings}; +use crate::da::disseminate::{ + DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, +}; use clap::Args; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; diff --git a/nomos-cli/src/da/disseminate.rs b/nomos-cli/src/da/disseminate.rs new file mode 100644 index 00000000..d9231d69 --- /dev/null +++ b/nomos-cli/src/da/disseminate.rs @@ -0,0 +1,251 @@ +use clap::{Args, ValueEnum}; +use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication}; +use futures::StreamExt; +use nomos_core::{da::DaProtocol, wire}; +use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter}; +use nomos_network::{backends::libp2p::Libp2p, NetworkService}; +use overwatch_derive::*; +use overwatch_rs::{ + services::{ + handle::{ServiceHandle, ServiceStateHandle}, + relay::NoMessage, + state::*, + ServiceCore, ServiceData, ServiceId, + }, + DynError, +}; +use reqwest::{Client, Url}; +use serde::Serialize; +use std::{ + path::PathBuf, + sync::{mpsc::Sender, Arc}, + time::Duration, +}; +use tokio::sync::{mpsc::UnboundedReceiver, Mutex}; + +const NODE_CERT_PATH: &str = "mempool-da/add"; + +pub async fn disseminate_and_wait( + mut da: D, + data: Box<[u8]>, + adapter: N, + status_updates: Sender, + node_addr: Option<&Url>, + output: Option<&PathBuf>, +) -> Result<(), Box> +where + D: DaProtocol, + N: NetworkAdapter + Send + Sync, + C: Serialize, +{ + // 1) Building blob + status_updates.send(Status::Encoding)?; + let blobs = da.encode(data); + + // 2) Send blob to network + status_updates.send(Status::Disseminating)?; + futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob))) + .await + .map_err(|e| e as Box)?; + + // 3) Collect attestations and create proof + status_updates.send(Status::WaitingAttestations)?; + let mut attestations = adapter.attestation_stream().await; + let cert: C = loop { + da.recv_attestation(attestations.next().await.unwrap()); + + if let Some(certificate) = da.certify_dispersal() { + status_updates.send(Status::CreatingCert)?; + break certificate; + } + }; + + if let Some(output) = output { + status_updates.send(Status::SavingCert)?; + std::fs::write(output, wire::serialize(&cert)?)?; + } + + if let Some(node) = node_addr { + status_updates.send(Status::SendingCert)?; + let client = Client::new(); + let res = client + .post(node.join(NODE_CERT_PATH).unwrap()) + .body(wire::serialize(&cert).unwrap()) + .send() + .await?; + + tracing::info!("Response: {:?}", res); + if !res.status().is_success() { + tracing::error!("ERROR: {:?}", res); + return Err(format!("Failed to send certificate to node: {}", res.status()).into()); + } + } + + status_updates.send(Status::Done)?; + Ok(()) +} + +pub enum Status { + Encoding, + Disseminating, + WaitingAttestations, + CreatingCert, + SavingCert, + SendingCert, + Done, +} + +impl Status { + pub fn display(&self) -> &str { + match self { + Self::Encoding => "Encoding message into blob(s)", + Self::Disseminating => "Sending blob(s) to the network", + Self::WaitingAttestations => "Waiting for attestations", + Self::CreatingCert => "Creating certificate", + Self::SavingCert => "Saving certificate to file", + Self::SendingCert => "Sending certificate to node", + Self::Done => "", + } + } +} + +// To interact with the network service it's easier to just spawn +// an overwatch app +#[derive(Services)] +pub struct DisseminateApp { + network: ServiceHandle>, + send_blob: ServiceHandle, +} + +#[derive(Clone, Debug)] +pub struct Settings { + // This is wrapped in an Arc just to make the struct Clone + pub payload: Arc>>>, + pub timeout: Duration, + pub da_protocol: DaProtocolChoice, + pub status_updates: Sender, + pub node_addr: Option, + pub output: Option, +} + +pub struct DisseminateService { + service_state: ServiceStateHandle, +} + +impl ServiceData for DisseminateService { + const SERVICE_ID: ServiceId = "Disseminate"; + type Settings = Settings; + type State = NoState; + type StateOperator = NoOperator; + type Message = NoMessage; +} + +#[async_trait::async_trait] +impl ServiceCore for DisseminateService { + fn init(service_state: ServiceStateHandle) -> Result { + Ok(Self { service_state }) + } + + async fn run(self) -> Result<(), DynError> { + let Self { service_state } = self; + let Settings { + payload, + timeout, + da_protocol, + status_updates, + node_addr, + output, + } = service_state.settings_reader.get_updated_settings(); + + let da_protocol: FullReplication<_> = da_protocol.try_into()?; + + let network_relay = service_state + .overwatch_handle + .relay::>() + .connect() + .await + .expect("Relay connection with NetworkService should succeed"); + + while let Some(data) = payload.lock().await.recv().await { + match tokio::time::timeout( + timeout, + disseminate_and_wait( + da_protocol.clone(), + data, + Libp2pAdapter::new(network_relay.clone()).await, + status_updates.clone(), + node_addr.as_ref(), + output.as_ref(), + ), + ) + .await + { + Err(_) => { + tracing::error!("Timeout reached, check the logs for additional details"); + std::process::exit(1); + } + Ok(Err(_)) => { + tracing::error!( + "Could not disseminate blob, check logs for additional details" + ); + std::process::exit(1); + } + _ => {} + } + } + + service_state.overwatch_handle.shutdown().await; + Ok(()) + } +} + +// This format is for clap args convenience, I could not +// find a way to use enums directly without having to implement +// parsing by hand. +// The `settings` field will hold the settings for all possible +// protocols, but only the one chosen will be used. +// We can enforce only sensible combinations of protocol/settings +// are specified by using special clap directives +#[derive(Clone, Debug, Args)] +pub struct DaProtocolChoice { + #[clap(long, default_value = "full-replication")] + pub da_protocol: Protocol, + #[clap(flatten)] + pub settings: ProtocolSettings, +} + +impl TryFrom for FullReplication> { + type Error = &'static str; + fn try_from(value: DaProtocolChoice) -> Result { + match (value.da_protocol, value.settings) { + (Protocol::FullReplication, ProtocolSettings { full_replication }) => Ok( + FullReplication::new(AbsoluteNumber::new(full_replication.num_attestations)), + ), + } + } +} + +#[derive(Clone, Debug, Args)] +pub struct ProtocolSettings { + #[clap(flatten)] + pub full_replication: FullReplicationSettings, +} + +#[derive(Clone, Debug, ValueEnum)] +pub enum Protocol { + FullReplication, +} + +impl Default for FullReplicationSettings { + fn default() -> Self { + Self { + num_attestations: 1, + } + } +} + +#[derive(Debug, Clone, Args)] +pub struct FullReplicationSettings { + #[clap(long, default_value = "1")] + pub num_attestations: usize, +} diff --git a/nomos-cli/src/da/mod.rs b/nomos-cli/src/da/mod.rs index 063f8d9c..c6d80d66 100644 --- a/nomos-cli/src/da/mod.rs +++ b/nomos-cli/src/da/mod.rs @@ -1,250 +1,2 @@ -use clap::{Args, ValueEnum}; -use full_replication::{AbsoluteNumber, FullReplication}; -use futures::StreamExt; -use nomos_core::{da::DaProtocol, wire}; -use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter}; -use nomos_network::{backends::libp2p::Libp2p, NetworkService}; -use overwatch_derive::*; -use overwatch_rs::{ - services::{ - handle::{ServiceHandle, ServiceStateHandle}, - relay::NoMessage, - state::*, - ServiceCore, ServiceData, ServiceId, - }, - DynError, -}; -use reqwest::{Client, Url}; -use serde::Serialize; -use std::{ - path::PathBuf, - sync::{mpsc::Sender, Arc}, - time::Duration, -}; -use tokio::sync::{mpsc::UnboundedReceiver, Mutex}; - -const NODE_CERT_PATH: &str = "mempool-da/add"; - -pub async fn disseminate_and_wait( - mut da: D, - data: Box<[u8]>, - adapter: N, - status_updates: Sender, - node_addr: Option<&Url>, - output: Option<&PathBuf>, -) -> Result<(), Box> -where - D: DaProtocol, - N: NetworkAdapter + Send + Sync, - C: Serialize, -{ - // 1) Building blob - status_updates.send(Status::Encoding)?; - let blobs = da.encode(data); - - // 2) Send blob to network - status_updates.send(Status::Disseminating)?; - futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob))) - .await - .map_err(|e| e as Box)?; - - // 3) Collect attestations and create proof - status_updates.send(Status::WaitingAttestations)?; - let mut attestations = adapter.attestation_stream().await; - let cert: C = loop { - da.recv_attestation(attestations.next().await.unwrap()); - - if let Some(certificate) = da.certify_dispersal() { - status_updates.send(Status::CreatingCert)?; - break certificate; - } - }; - - if let Some(output) = output { - status_updates.send(Status::SavingCert)?; - std::fs::write(output, wire::serialize(&cert)?)?; - } - - if let Some(node) = node_addr { - status_updates.send(Status::SendingCert)?; - let client = Client::new(); - let res = client - .post(node.join(NODE_CERT_PATH).unwrap()) - .body(wire::serialize(&cert).unwrap()) - .send() - .await?; - - tracing::info!("Response: {:?}", res); - if !res.status().is_success() { - tracing::error!("ERROR: {:?}", res); - return Err(format!("Failed to send certificate to node: {}", res.status()).into()); - } - } - - status_updates.send(Status::Done)?; - Ok(()) -} - -pub enum Status { - Encoding, - Disseminating, - WaitingAttestations, - CreatingCert, - SavingCert, - SendingCert, - Done, -} - -impl Status { - pub fn display(&self) -> &str { - match self { - Self::Encoding => "Encoding message into blob(s)", - Self::Disseminating => "Sending blob(s) to the network", - Self::WaitingAttestations => "Waiting for attestations", - Self::CreatingCert => "Creating certificate", - Self::SavingCert => "Saving certificate to file", - Self::SendingCert => "Sending certificate to node", - Self::Done => "", - } - } -} - -// To interact with the network service it's easier to just spawn -// an overwatch app -#[derive(Services)] -pub struct DisseminateApp { - network: ServiceHandle>, - send_blob: ServiceHandle, -} - -#[derive(Clone, Debug)] -pub struct Settings { - // This is wrapped in an Arc just to make the struct Clone - pub payload: Arc>>>, - pub timeout: Duration, - pub da_protocol: DaProtocolChoice, - pub status_updates: Sender, - pub node_addr: Option, - pub output: Option, -} - -pub struct DisseminateService { - service_state: ServiceStateHandle, -} - -impl ServiceData for DisseminateService { - const SERVICE_ID: ServiceId = "Disseminate"; - type Settings = Settings; - type State = NoState; - type StateOperator = NoOperator; - type Message = NoMessage; -} - -#[async_trait::async_trait] -impl ServiceCore for DisseminateService { - fn init(service_state: ServiceStateHandle) -> Result { - Ok(Self { service_state }) - } - - async fn run(self) -> Result<(), DynError> { - let Self { service_state } = self; - let Settings { - payload, - timeout, - da_protocol, - status_updates, - node_addr, - output, - } = service_state.settings_reader.get_updated_settings(); - - match da_protocol { - DaProtocolChoice { - da_protocol: Protocol::FullReplication, - settings: - ProtocolSettings { - full_replication: da_settings, - }, - } => { - let network_relay = service_state - .overwatch_handle - .relay::>() - .connect() - .await - .expect("Relay connection with NetworkService should succeed"); - - while let Some(data) = payload.lock().await.recv().await { - match tokio::time::timeout( - timeout, - disseminate_and_wait( - FullReplication::new(AbsoluteNumber::new(da_settings.num_attestations)), - data, - Libp2pAdapter::new(network_relay.clone()).await, - status_updates.clone(), - node_addr.as_ref(), - output.as_ref(), - ), - ) - .await - { - Err(_) => { - tracing::error!( - "Timeout reached, check the logs for additional details" - ); - std::process::exit(1); - } - Ok(Err(_)) => { - tracing::error!( - "Could not disseminate blob, check logs for additional details" - ); - std::process::exit(1); - } - _ => {} - } - } - } - } - - service_state.overwatch_handle.shutdown().await; - Ok(()) - } -} - -// This format is for clap args convenience, I could not -// find a way to use enums directly without having to implement -// parsing by hand. -// The `settings` field will hold the settings for all possible -// protocols, but only the one chosen will be used. -// We can enforce only sensible combinations of protocol/settings -// are specified by using special clap directives -#[derive(Clone, Debug, Args)] -pub struct DaProtocolChoice { - #[clap(long, default_value = "full-replication")] - pub da_protocol: Protocol, - #[clap(flatten)] - pub settings: ProtocolSettings, -} - -#[derive(Clone, Debug, Args)] -pub struct ProtocolSettings { - #[clap(flatten)] - pub full_replication: FullReplicationSettings, -} - -#[derive(Clone, Debug, ValueEnum)] -pub enum Protocol { - FullReplication, -} - -impl Default for FullReplicationSettings { - fn default() -> Self { - Self { - num_attestations: 1, - } - } -} - -#[derive(Debug, Clone, Args)] -pub struct FullReplicationSettings { - #[clap(long, default_value = "1")] - pub num_attestations: usize, -} +pub mod disseminate; +pub mod retrieve; diff --git a/nomos-cli/src/da/retrieve.rs b/nomos-cli/src/da/retrieve.rs new file mode 100644 index 00000000..89649f07 --- /dev/null +++ b/nomos-cli/src/da/retrieve.rs @@ -0,0 +1,56 @@ +use consensus_engine::BlockId; +use full_replication::{Blob, Certificate}; +use nomos_core::{ + block::Block, + da::{blob, certificate::Certificate as _}, +}; +use nomos_node::Tx; +use reqwest::Url; +use thiserror::Error; + +const BLOCK_PATH: &str = "storage/block"; +const BLOBS_PATH: &str = "da/blobs"; + +#[derive(Error, Debug)] +pub enum Error { + #[error(transparent)] + Reqwest(#[from] reqwest::Error), + #[error("Block not found")] + NotFound, +} + +/// Return the blobs whose certificate has been included in the provided block. +pub async fn get_block_blobs(node: Url, block: BlockId) -> Result, Error> { + let block = get_block_contents(&node, block) + .await? + .ok_or(Error::NotFound)?; + + Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?) +} + +pub async fn get_block_contents( + node: &Url, + block: BlockId, +) -> Result>, reqwest::Error> { + let block = reqwest::Client::new() + .post(node.join(BLOCK_PATH).unwrap()) + .body(serde_json::to_string(&block).unwrap()) + .send() + .await? + .json() + .await?; + Ok(block) +} + +pub async fn get_blobs( + node: Url, + ids: Vec<::Hash>, +) -> Result, reqwest::Error> { + reqwest::Client::new() + .post(node.join(BLOBS_PATH).unwrap()) + .body(serde_json::to_string(&ids).unwrap()) + .send() + .await? + .json() + .await +} diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs index 35b4fa3a..7e5828be 100644 --- a/tests/src/tests/cli.rs +++ b/tests/src/tests/cli.rs @@ -1,7 +1,7 @@ use fraction::{Fraction, One}; use nomos_cli::{ cmds::{disseminate::Disseminate, Command}, - da::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings}, + da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings}, }; use std::time::Duration; use tempfile::NamedTempFile;