Add DA utilities to nomos-cli (#493)

* Move da disseminate command into separate module

* Add utilities to retrieve block contents

* fix tests

* fix typo
This commit is contained in:
Giacomo Pasini 2023-10-30 16:19:25 +01:00 committed by GitHub
parent eeb88b9430
commit a487d8c7a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 318 additions and 253 deletions

View File

@ -12,6 +12,7 @@ tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
async-trait = "0.1" async-trait = "0.1"
clap = {version = "4", features = ["derive"] } clap = {version = "4", features = ["derive"] }
consensus-engine = { path = "../consensus-engine" }
serde_yaml = "0.9" serde_yaml = "0.9"
futures = "0.3" futures = "0.3"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
@ -21,6 +22,9 @@ nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] } nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] }
nomos-libp2p = { path = "../nomos-libp2p"} nomos-libp2p = { path = "../nomos-libp2p"}
nomos-core = { path = "../nomos-core" } nomos-core = { path = "../nomos-core" }
nomos-node = { path = "../nodes/nomos-node" }
full-replication = { path = "../nomos-da/full-replication" } full-replication = { path = "../nomos-da/full-replication" }
reqwest = "0.11" reqwest = "0.11"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"

View File

@ -1,4 +1,6 @@
use crate::da::{DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings}; use crate::da::disseminate::{
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings,
};
use clap::Args; use clap::Args;
use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};

View File

@ -0,0 +1,251 @@
use clap::{Args, ValueEnum};
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication};
use futures::StreamExt;
use nomos_core::{da::DaProtocol, wire};
use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter};
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_derive::*;
use overwatch_rs::{
services::{
handle::{ServiceHandle, ServiceStateHandle},
relay::NoMessage,
state::*,
ServiceCore, ServiceData, ServiceId,
},
DynError,
};
use reqwest::{Client, Url};
use serde::Serialize;
use std::{
path::PathBuf,
sync::{mpsc::Sender, Arc},
time::Duration,
};
use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
const NODE_CERT_PATH: &str = "mempool-da/add";
pub async fn disseminate_and_wait<D, B, N, A, C>(
mut da: D,
data: Box<[u8]>,
adapter: N,
status_updates: Sender<Status>,
node_addr: Option<&Url>,
output: Option<&PathBuf>,
) -> Result<(), Box<dyn std::error::Error>>
where
D: DaProtocol<Blob = B, Attestation = A, Certificate = C>,
N: NetworkAdapter<Blob = B, Attestation = A> + Send + Sync,
C: Serialize,
{
// 1) Building blob
status_updates.send(Status::Encoding)?;
let blobs = da.encode(data);
// 2) Send blob to network
status_updates.send(Status::Disseminating)?;
futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob)))
.await
.map_err(|e| e as Box<dyn std::error::Error>)?;
// 3) Collect attestations and create proof
status_updates.send(Status::WaitingAttestations)?;
let mut attestations = adapter.attestation_stream().await;
let cert: C = loop {
da.recv_attestation(attestations.next().await.unwrap());
if let Some(certificate) = da.certify_dispersal() {
status_updates.send(Status::CreatingCert)?;
break certificate;
}
};
if let Some(output) = output {
status_updates.send(Status::SavingCert)?;
std::fs::write(output, wire::serialize(&cert)?)?;
}
if let Some(node) = node_addr {
status_updates.send(Status::SendingCert)?;
let client = Client::new();
let res = client
.post(node.join(NODE_CERT_PATH).unwrap())
.body(wire::serialize(&cert).unwrap())
.send()
.await?;
tracing::info!("Response: {:?}", res);
if !res.status().is_success() {
tracing::error!("ERROR: {:?}", res);
return Err(format!("Failed to send certificate to node: {}", res.status()).into());
}
}
status_updates.send(Status::Done)?;
Ok(())
}
pub enum Status {
Encoding,
Disseminating,
WaitingAttestations,
CreatingCert,
SavingCert,
SendingCert,
Done,
}
impl Status {
pub fn display(&self) -> &str {
match self {
Self::Encoding => "Encoding message into blob(s)",
Self::Disseminating => "Sending blob(s) to the network",
Self::WaitingAttestations => "Waiting for attestations",
Self::CreatingCert => "Creating certificate",
Self::SavingCert => "Saving certificate to file",
Self::SendingCert => "Sending certificate to node",
Self::Done => "",
}
}
}
// To interact with the network service it's easier to just spawn
// an overwatch app
#[derive(Services)]
pub struct DisseminateApp {
network: ServiceHandle<NetworkService<Libp2p>>,
send_blob: ServiceHandle<DisseminateService>,
}
#[derive(Clone, Debug)]
pub struct Settings {
// This is wrapped in an Arc just to make the struct Clone
pub payload: Arc<Mutex<UnboundedReceiver<Box<[u8]>>>>,
pub timeout: Duration,
pub da_protocol: DaProtocolChoice,
pub status_updates: Sender<Status>,
pub node_addr: Option<Url>,
pub output: Option<std::path::PathBuf>,
}
pub struct DisseminateService {
service_state: ServiceStateHandle<Self>,
}
impl ServiceData for DisseminateService {
const SERVICE_ID: ServiceId = "Disseminate";
type Settings = Settings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl ServiceCore for DisseminateService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
Ok(Self { service_state })
}
async fn run(self) -> Result<(), DynError> {
let Self { service_state } = self;
let Settings {
payload,
timeout,
da_protocol,
status_updates,
node_addr,
output,
} = service_state.settings_reader.get_updated_settings();
let da_protocol: FullReplication<_> = da_protocol.try_into()?;
let network_relay = service_state
.overwatch_handle
.relay::<NetworkService<Libp2p>>()
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
while let Some(data) = payload.lock().await.recv().await {
match tokio::time::timeout(
timeout,
disseminate_and_wait(
da_protocol.clone(),
data,
Libp2pAdapter::new(network_relay.clone()).await,
status_updates.clone(),
node_addr.as_ref(),
output.as_ref(),
),
)
.await
{
Err(_) => {
tracing::error!("Timeout reached, check the logs for additional details");
std::process::exit(1);
}
Ok(Err(_)) => {
tracing::error!(
"Could not disseminate blob, check logs for additional details"
);
std::process::exit(1);
}
_ => {}
}
}
service_state.overwatch_handle.shutdown().await;
Ok(())
}
}
// This format is for clap args convenience, I could not
// find a way to use enums directly without having to implement
// parsing by hand.
// The `settings` field will hold the settings for all possible
// protocols, but only the one chosen will be used.
// We can enforce only sensible combinations of protocol/settings
// are specified by using special clap directives
#[derive(Clone, Debug, Args)]
pub struct DaProtocolChoice {
#[clap(long, default_value = "full-replication")]
pub da_protocol: Protocol,
#[clap(flatten)]
pub settings: ProtocolSettings,
}
impl TryFrom<DaProtocolChoice> for FullReplication<AbsoluteNumber<Attestation, Certificate>> {
type Error = &'static str;
fn try_from(value: DaProtocolChoice) -> Result<Self, Self::Error> {
match (value.da_protocol, value.settings) {
(Protocol::FullReplication, ProtocolSettings { full_replication }) => Ok(
FullReplication::new(AbsoluteNumber::new(full_replication.num_attestations)),
),
}
}
}
#[derive(Clone, Debug, Args)]
pub struct ProtocolSettings {
#[clap(flatten)]
pub full_replication: FullReplicationSettings,
}
#[derive(Clone, Debug, ValueEnum)]
pub enum Protocol {
FullReplication,
}
impl Default for FullReplicationSettings {
fn default() -> Self {
Self {
num_attestations: 1,
}
}
}
#[derive(Debug, Clone, Args)]
pub struct FullReplicationSettings {
#[clap(long, default_value = "1")]
pub num_attestations: usize,
}

View File

@ -1,250 +1,2 @@
use clap::{Args, ValueEnum}; pub mod disseminate;
use full_replication::{AbsoluteNumber, FullReplication}; pub mod retrieve;
use futures::StreamExt;
use nomos_core::{da::DaProtocol, wire};
use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter};
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_derive::*;
use overwatch_rs::{
services::{
handle::{ServiceHandle, ServiceStateHandle},
relay::NoMessage,
state::*,
ServiceCore, ServiceData, ServiceId,
},
DynError,
};
use reqwest::{Client, Url};
use serde::Serialize;
use std::{
path::PathBuf,
sync::{mpsc::Sender, Arc},
time::Duration,
};
use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
const NODE_CERT_PATH: &str = "mempool-da/add";
pub async fn disseminate_and_wait<D, B, N, A, C>(
mut da: D,
data: Box<[u8]>,
adapter: N,
status_updates: Sender<Status>,
node_addr: Option<&Url>,
output: Option<&PathBuf>,
) -> Result<(), Box<dyn std::error::Error>>
where
D: DaProtocol<Blob = B, Attestation = A, Certificate = C>,
N: NetworkAdapter<Blob = B, Attestation = A> + Send + Sync,
C: Serialize,
{
// 1) Building blob
status_updates.send(Status::Encoding)?;
let blobs = da.encode(data);
// 2) Send blob to network
status_updates.send(Status::Disseminating)?;
futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob)))
.await
.map_err(|e| e as Box<dyn std::error::Error>)?;
// 3) Collect attestations and create proof
status_updates.send(Status::WaitingAttestations)?;
let mut attestations = adapter.attestation_stream().await;
let cert: C = loop {
da.recv_attestation(attestations.next().await.unwrap());
if let Some(certificate) = da.certify_dispersal() {
status_updates.send(Status::CreatingCert)?;
break certificate;
}
};
if let Some(output) = output {
status_updates.send(Status::SavingCert)?;
std::fs::write(output, wire::serialize(&cert)?)?;
}
if let Some(node) = node_addr {
status_updates.send(Status::SendingCert)?;
let client = Client::new();
let res = client
.post(node.join(NODE_CERT_PATH).unwrap())
.body(wire::serialize(&cert).unwrap())
.send()
.await?;
tracing::info!("Response: {:?}", res);
if !res.status().is_success() {
tracing::error!("ERROR: {:?}", res);
return Err(format!("Failed to send certificate to node: {}", res.status()).into());
}
}
status_updates.send(Status::Done)?;
Ok(())
}
pub enum Status {
Encoding,
Disseminating,
WaitingAttestations,
CreatingCert,
SavingCert,
SendingCert,
Done,
}
impl Status {
pub fn display(&self) -> &str {
match self {
Self::Encoding => "Encoding message into blob(s)",
Self::Disseminating => "Sending blob(s) to the network",
Self::WaitingAttestations => "Waiting for attestations",
Self::CreatingCert => "Creating certificate",
Self::SavingCert => "Saving certificate to file",
Self::SendingCert => "Sending certificate to node",
Self::Done => "",
}
}
}
// To interact with the network service it's easier to just spawn
// an overwatch app
#[derive(Services)]
pub struct DisseminateApp {
network: ServiceHandle<NetworkService<Libp2p>>,
send_blob: ServiceHandle<DisseminateService>,
}
#[derive(Clone, Debug)]
pub struct Settings {
// This is wrapped in an Arc just to make the struct Clone
pub payload: Arc<Mutex<UnboundedReceiver<Box<[u8]>>>>,
pub timeout: Duration,
pub da_protocol: DaProtocolChoice,
pub status_updates: Sender<Status>,
pub node_addr: Option<Url>,
pub output: Option<std::path::PathBuf>,
}
pub struct DisseminateService {
service_state: ServiceStateHandle<Self>,
}
impl ServiceData for DisseminateService {
const SERVICE_ID: ServiceId = "Disseminate";
type Settings = Settings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl ServiceCore for DisseminateService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
Ok(Self { service_state })
}
async fn run(self) -> Result<(), DynError> {
let Self { service_state } = self;
let Settings {
payload,
timeout,
da_protocol,
status_updates,
node_addr,
output,
} = service_state.settings_reader.get_updated_settings();
match da_protocol {
DaProtocolChoice {
da_protocol: Protocol::FullReplication,
settings:
ProtocolSettings {
full_replication: da_settings,
},
} => {
let network_relay = service_state
.overwatch_handle
.relay::<NetworkService<Libp2p>>()
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
while let Some(data) = payload.lock().await.recv().await {
match tokio::time::timeout(
timeout,
disseminate_and_wait(
FullReplication::new(AbsoluteNumber::new(da_settings.num_attestations)),
data,
Libp2pAdapter::new(network_relay.clone()).await,
status_updates.clone(),
node_addr.as_ref(),
output.as_ref(),
),
)
.await
{
Err(_) => {
tracing::error!(
"Timeout reached, check the logs for additional details"
);
std::process::exit(1);
}
Ok(Err(_)) => {
tracing::error!(
"Could not disseminate blob, check logs for additional details"
);
std::process::exit(1);
}
_ => {}
}
}
}
}
service_state.overwatch_handle.shutdown().await;
Ok(())
}
}
// This format is for clap args convenience, I could not
// find a way to use enums directly without having to implement
// parsing by hand.
// The `settings` field will hold the settings for all possible
// protocols, but only the one chosen will be used.
// We can enforce only sensible combinations of protocol/settings
// are specified by using special clap directives
#[derive(Clone, Debug, Args)]
pub struct DaProtocolChoice {
#[clap(long, default_value = "full-replication")]
pub da_protocol: Protocol,
#[clap(flatten)]
pub settings: ProtocolSettings,
}
#[derive(Clone, Debug, Args)]
pub struct ProtocolSettings {
#[clap(flatten)]
pub full_replication: FullReplicationSettings,
}
#[derive(Clone, Debug, ValueEnum)]
pub enum Protocol {
FullReplication,
}
impl Default for FullReplicationSettings {
fn default() -> Self {
Self {
num_attestations: 1,
}
}
}
#[derive(Debug, Clone, Args)]
pub struct FullReplicationSettings {
#[clap(long, default_value = "1")]
pub num_attestations: usize,
}

View File

@ -0,0 +1,56 @@
use consensus_engine::BlockId;
use full_replication::{Blob, Certificate};
use nomos_core::{
block::Block,
da::{blob, certificate::Certificate as _},
};
use nomos_node::Tx;
use reqwest::Url;
use thiserror::Error;
const BLOCK_PATH: &str = "storage/block";
const BLOBS_PATH: &str = "da/blobs";
#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
#[error("Block not found")]
NotFound,
}
/// Return the blobs whose certificate has been included in the provided block.
pub async fn get_block_blobs(node: Url, block: BlockId) -> Result<Vec<Blob>, Error> {
let block = get_block_contents(&node, block)
.await?
.ok_or(Error::NotFound)?;
Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?)
}
pub async fn get_block_contents(
node: &Url,
block: BlockId,
) -> Result<Option<Block<Tx, Certificate>>, reqwest::Error> {
let block = reqwest::Client::new()
.post(node.join(BLOCK_PATH).unwrap())
.body(serde_json::to_string(&block).unwrap())
.send()
.await?
.json()
.await?;
Ok(block)
}
pub async fn get_blobs(
node: Url,
ids: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Blob>, reqwest::Error> {
reqwest::Client::new()
.post(node.join(BLOBS_PATH).unwrap())
.body(serde_json::to_string(&ids).unwrap())
.send()
.await?
.json()
.await
}

View File

@ -1,7 +1,7 @@
use fraction::{Fraction, One}; use fraction::{Fraction, One};
use nomos_cli::{ use nomos_cli::{
cmds::{disseminate::Disseminate, Command}, cmds::{disseminate::Disseminate, Command},
da::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings}, da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings},
}; };
use std::time::Duration; use std::time::Duration;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;