Extract nomos-cli da stuff into shared module (#461)

This will be useful for other commands that might want to interact
with the network to perform tasks related to DA. One example is
the upcoming testnet demo.
This commit is contained in:
Giacomo Pasini 2023-10-23 11:34:20 +02:00 committed by GitHub
parent aa69eeca00
commit d528ebd2ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 295 additions and 214 deletions

View File

@ -22,4 +22,5 @@ nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"]
nomos-libp2p = { path = "../nomos-libp2p"}
nomos-core = { path = "../nomos-core" }
full-replication = { path = "../nomos-da/full-replication" }
reqwest = "0.11"
reqwest = "0.11"
serde = { version = "1.0", features = ["derive"] }

View File

@ -1,24 +1,10 @@
use clap::{Args, ValueEnum};
use full_replication::{AbsoluteNumber, FullReplication};
use futures::StreamExt;
use nomos_core::{da::DaProtocol, wire};
use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter};
use crate::da::{DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings};
use clap::Args;
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_derive::*;
use overwatch_rs::{
overwatch::OverwatchRunner,
services::{
handle::{ServiceHandle, ServiceStateHandle},
relay::NoMessage,
state::*,
ServiceCore, ServiceData, ServiceId,
},
DynError,
};
use reqwest::{Client, Url};
use std::{path::PathBuf, time::Duration};
const NODE_CERT_PATH: &str = "mempool-da/add";
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
use reqwest::Url;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::sync::Mutex;
#[derive(Args, Debug)]
pub struct Disseminate {
@ -36,17 +22,13 @@ pub struct Disseminate {
pub timeout: u64,
/// Address of the node to send the certificate to
/// for block inclusion, if present.
#[clap(long)]
pub node_addr: Option<Url>,
/// File to write the certificate to, if present.
#[clap(long)]
pub output: Option<PathBuf>,
}
#[derive(Debug, Clone, Args)]
pub struct FullReplicationSettings {
#[clap(long, default_value = "1")]
pub num_attestations: usize,
}
impl Disseminate {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::new())
@ -55,190 +37,39 @@ impl Disseminate {
_,
<NetworkService<Libp2p> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
OverwatchRunner::<DisseminateApp>::run(
DisseminateAppServiceSettings {
network,
send_blob: Settings {
bytes: self.data.clone().as_bytes().into(),
timeout: Duration::from_secs(self.timeout),
da_protocol: self.da_protocol.clone(),
node_addr: self.node_addr.clone(),
output: self.output.clone(),
},
},
None,
)
.unwrap()
.wait_finished();
Ok(())
}
}
async fn disseminate<D, B, N, A, C>(
mut da: D,
data: Box<[u8]>,
adapter: N,
) -> Result<C, Box<dyn std::error::Error + Send + Sync>>
where
D: DaProtocol<Blob = B, Attestation = A, Certificate = C>,
N: NetworkAdapter<Blob = B, Attestation = A> + Send + Sync,
{
// 1) Building blob
tracing::info!("Building blobs...");
let blobs = da.encode(data);
// 2) Send blob to network
tracing::info!("Sending blobs to network...");
futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob))).await?;
// 3) Collect attestations and create proof
tracing::info!("Collecting attestations to create proof...");
let mut attestations = adapter.attestation_stream().await;
loop {
da.recv_attestation(attestations.next().await.unwrap());
if let Some(cert) = da.certify_dispersal() {
return Ok(cert);
}
}
}
// To interact with the network service it's easier to just spawn
// an overwatch app
#[derive(Services)]
struct DisseminateApp {
network: ServiceHandle<NetworkService<Libp2p>>,
send_blob: ServiceHandle<DisseminateService>,
}
#[derive(Clone, Debug)]
pub struct Settings {
bytes: Box<[u8]>,
timeout: Duration,
da_protocol: DaProtocolChoice,
node_addr: Option<Url>,
output: Option<PathBuf>,
}
pub struct DisseminateService {
service_state: ServiceStateHandle<Self>,
}
impl ServiceData for DisseminateService {
const SERVICE_ID: ServiceId = "Disseminate";
type Settings = Settings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl ServiceCore for DisseminateService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
Ok(Self { service_state })
}
async fn run(self) -> Result<(), DynError> {
let Self { service_state } = self;
let settings = service_state.settings_reader.get_updated_settings();
let node_addr = settings.node_addr.clone();
match settings.da_protocol {
DaProtocolChoice {
da_protocol: Protocol::FullReplication,
settings:
ProtocolSettings {
full_replication: da_settings,
let (status_updates, rx) = std::sync::mpsc::channel();
let bytes: Box<[u8]> = self.data.clone().as_bytes().into();
let timeout = Duration::from_secs(self.timeout);
let da_protocol = self.da_protocol.clone();
let node_addr = self.node_addr.clone();
let output = self.output.clone();
let (payload_sender, payload_rx) = tokio::sync::mpsc::unbounded_channel();
payload_sender.send(bytes).unwrap();
std::thread::spawn(move || {
OverwatchRunner::<DisseminateApp>::run(
DisseminateAppServiceSettings {
network,
send_blob: Settings {
payload: Arc::new(Mutex::new(payload_rx)),
timeout,
da_protocol,
status_updates,
node_addr,
output,
},
} => {
let network_relay = service_state
.overwatch_handle
.relay::<NetworkService<Libp2p>>()
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
let adapter = Libp2pAdapter::new(network_relay).await;
let da = FullReplication::new(AbsoluteNumber::new(da_settings.num_attestations));
match tokio::time::timeout(
settings.timeout,
disseminate(da, settings.bytes, adapter),
)
.await
{
Err(_) => {
tracing::error!("Timeout reached, check the logs for additional details");
std::process::exit(1);
}
Ok(Err(_)) => {
tracing::error!(
"Could not disseminate blob, check logs for additional details"
);
std::process::exit(1);
}
Ok(Ok(cert)) => {
if let Some(output) = settings.output {
tracing::info!("Writing certificate to file...");
std::fs::write(output, wire::serialize(&cert).unwrap())?;
}
if let Some(node_addr) = node_addr {
let client = Client::new();
tracing::info!("Sending certificate to node...");
let res = client
.post(node_addr.join(NODE_CERT_PATH).unwrap())
.body(wire::serialize(&cert).unwrap())
.send()
.await?;
if !res.status().is_success() {
tracing::error!(
"Failed to send certificate to node: {}",
res.status()
);
std::process::exit(1);
}
}
}
}
}
},
None,
)
.unwrap()
.wait_finished();
});
// drop to signal we're not going to send more blobs
drop(payload_sender);
tracing::info!("{}", rx.recv().unwrap().display());
while let Ok(update) = rx.recv() {
tracing::info!("{}", update.display());
}
service_state.overwatch_handle.shutdown().await;
tracing::info!("done");
Ok(())
}
}
// This format is for clap args convenience, I could not
// find a way to use enums directly without having to implement
// parsing by hand.
// The `settings` field will hold the settings for all possible
// protocols, but only the one chosen will be used.
// We can enforce only sensible combinations of protocol/settings
// are specified by using special clap directives
#[derive(Clone, Debug, Args)]
pub struct DaProtocolChoice {
#[clap(long, default_value = "full-replication")]
pub da_protocol: Protocol,
#[clap(flatten)]
pub settings: ProtocolSettings,
}
#[derive(Clone, Debug, Args)]
pub struct ProtocolSettings {
#[clap(flatten)]
pub full_replication: FullReplicationSettings,
}
#[derive(Clone, Debug, ValueEnum)]
pub enum Protocol {
FullReplication,
}
impl Default for FullReplicationSettings {
fn default() -> Self {
Self {
num_attestations: 1,
}
}
}

250
nomos-cli/src/da/mod.rs Normal file
View File

@ -0,0 +1,250 @@
use clap::{Args, ValueEnum};
use full_replication::{AbsoluteNumber, FullReplication};
use futures::StreamExt;
use nomos_core::{da::DaProtocol, wire};
use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter};
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_derive::*;
use overwatch_rs::{
services::{
handle::{ServiceHandle, ServiceStateHandle},
relay::NoMessage,
state::*,
ServiceCore, ServiceData, ServiceId,
},
DynError,
};
use reqwest::{Client, Url};
use serde::Serialize;
use std::{
path::PathBuf,
sync::{mpsc::Sender, Arc},
time::Duration,
};
use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
const NODE_CERT_PATH: &str = "mempool-da/add";
pub async fn disseminate_and_wait<D, B, N, A, C>(
mut da: D,
data: Box<[u8]>,
adapter: N,
status_updates: Sender<Status>,
node_addr: Option<&Url>,
output: Option<&PathBuf>,
) -> Result<(), Box<dyn std::error::Error>>
where
D: DaProtocol<Blob = B, Attestation = A, Certificate = C>,
N: NetworkAdapter<Blob = B, Attestation = A> + Send + Sync,
C: Serialize,
{
// 1) Building blob
status_updates.send(Status::Encoding)?;
let blobs = da.encode(data);
// 2) Send blob to network
status_updates.send(Status::Disseminating)?;
futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob)))
.await
.map_err(|e| e as Box<dyn std::error::Error>)?;
// 3) Collect attestations and create proof
status_updates.send(Status::WaitingAttestations)?;
let mut attestations = adapter.attestation_stream().await;
let cert: C = loop {
da.recv_attestation(attestations.next().await.unwrap());
if let Some(certificate) = da.certify_dispersal() {
status_updates.send(Status::CreatingCert)?;
break certificate;
}
};
if let Some(output) = output {
status_updates.send(Status::SavingCert)?;
std::fs::write(output, wire::serialize(&cert)?)?;
}
if let Some(node) = node_addr {
status_updates.send(Status::SendingCert)?;
let client = Client::new();
let res = client
.post(node.join(NODE_CERT_PATH).unwrap())
.body(wire::serialize(&cert).unwrap())
.send()
.await?;
tracing::info!("Response: {:?}", res);
if !res.status().is_success() {
tracing::error!("ERROR: {:?}", res);
return Err(format!("Failed to send certificate to node: {}", res.status()).into());
}
}
status_updates.send(Status::Done)?;
Ok(())
}
pub enum Status {
Encoding,
Disseminating,
WaitingAttestations,
CreatingCert,
SavingCert,
SendingCert,
Done,
}
impl Status {
pub fn display(&self) -> &str {
match self {
Self::Encoding => "Encoding message into blob(s)",
Self::Disseminating => "Sending blob(s) to the network",
Self::WaitingAttestations => "Waiting for attestations",
Self::CreatingCert => "Creating certificate",
Self::SavingCert => "Saving certificate to file",
Self::SendingCert => "Sending certificate to node",
Self::Done => "",
}
}
}
// To interact with the network service it's easier to just spawn
// an overwatch app
#[derive(Services)]
pub struct DisseminateApp {
network: ServiceHandle<NetworkService<Libp2p>>,
send_blob: ServiceHandle<DisseminateService>,
}
#[derive(Clone, Debug)]
pub struct Settings {
// This is wrapped in an Arc just to make the struct Clone
pub payload: Arc<Mutex<UnboundedReceiver<Box<[u8]>>>>,
pub timeout: Duration,
pub da_protocol: DaProtocolChoice,
pub status_updates: Sender<Status>,
pub node_addr: Option<Url>,
pub output: Option<std::path::PathBuf>,
}
pub struct DisseminateService {
service_state: ServiceStateHandle<Self>,
}
impl ServiceData for DisseminateService {
const SERVICE_ID: ServiceId = "Disseminate";
type Settings = Settings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl ServiceCore for DisseminateService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
Ok(Self { service_state })
}
async fn run(self) -> Result<(), DynError> {
let Self { service_state } = self;
let Settings {
payload,
timeout,
da_protocol,
status_updates,
node_addr,
output,
} = service_state.settings_reader.get_updated_settings();
match da_protocol {
DaProtocolChoice {
da_protocol: Protocol::FullReplication,
settings:
ProtocolSettings {
full_replication: da_settings,
},
} => {
let network_relay = service_state
.overwatch_handle
.relay::<NetworkService<Libp2p>>()
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
while let Some(data) = payload.lock().await.recv().await {
match tokio::time::timeout(
timeout,
disseminate_and_wait(
FullReplication::new(AbsoluteNumber::new(da_settings.num_attestations)),
data,
Libp2pAdapter::new(network_relay.clone()).await,
status_updates.clone(),
node_addr.as_ref(),
output.as_ref(),
),
)
.await
{
Err(_) => {
tracing::error!(
"Timeout reached, check the logs for additional details"
);
std::process::exit(1);
}
Ok(Err(_)) => {
tracing::error!(
"Could not disseminate blob, check logs for additional details"
);
std::process::exit(1);
}
_ => {}
}
}
}
}
service_state.overwatch_handle.shutdown().await;
Ok(())
}
}
// This format is for clap args convenience, I could not
// find a way to use enums directly without having to implement
// parsing by hand.
// The `settings` field will hold the settings for all possible
// protocols, but only the one chosen will be used.
// We can enforce only sensible combinations of protocol/settings
// are specified by using special clap directives
#[derive(Clone, Debug, Args)]
pub struct DaProtocolChoice {
#[clap(long, default_value = "full-replication")]
pub da_protocol: Protocol,
#[clap(flatten)]
pub settings: ProtocolSettings,
}
#[derive(Clone, Debug, Args)]
pub struct ProtocolSettings {
#[clap(flatten)]
pub full_replication: FullReplicationSettings,
}
#[derive(Clone, Debug, ValueEnum)]
pub enum Protocol {
FullReplication,
}
impl Default for FullReplicationSettings {
fn default() -> Self {
Self {
num_attestations: 1,
}
}
}
#[derive(Debug, Clone, Args)]
pub struct FullReplicationSettings {
#[clap(long, default_value = "1")]
pub num_attestations: usize,
}

View File

@ -1,4 +1,5 @@
pub mod cmds;
pub mod da;
use clap::Parser;
use cmds::Command;

View File

@ -1,9 +1,7 @@
use fraction::{Fraction, One};
use nomos_cli::cmds::{
disseminate::{
DaProtocolChoice, Disseminate, FullReplicationSettings, Protocol, ProtocolSettings,
},
Command,
use nomos_cli::{
cmds::{disseminate::Disseminate, Command},
da::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings},
};
use std::time::Duration;
use tempfile::NamedTempFile;