[nomos-cli] Add command to disseminate data (#400)
* Add nomos-cli command to disseminate data across the network Add nomos-cli to hold various utilities for the node. To start, this commit adds a command to disseminate some data through the network and build a certificate of correct dispersal for inclusion in a block * fmt * reorder loop instructions * Send blobs concurrently
This commit is contained in:
parent
2429893ac2
commit
d672be3bb0
|
@ -13,6 +13,7 @@ members = [
|
|||
"nomos-da/reed-solomon",
|
||||
"nomos-da/kzg",
|
||||
"nomos-da/full-replication",
|
||||
"nomos-cli",
|
||||
"nodes/nomos-node",
|
||||
"nodes/mixnode",
|
||||
"simulations",
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
[package]
|
||||
name = "nomos-cli"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
description = "Cli app to interact with Nomos nodes and perform various tasks"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
fraction = "0.13"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
async-trait = "0.1"
|
||||
clap = {version = "4", features = ["derive"] }
|
||||
crossterm = "0.27"
|
||||
serde_yaml = "0.9"
|
||||
futures = "0.3"
|
||||
tokio = { version = "1", features = ["sync"] }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
|
||||
nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] }
|
||||
nomos-libp2p = { path = "../nomos-libp2p"}
|
||||
nomos-core = { path = "../nomos-core" }
|
||||
full-replication = { path = "../nomos-da/full-replication" }
|
|
@ -0,0 +1,3 @@
|
|||
backend:
|
||||
host: "127.0.0.1"
|
||||
port: 8000
|
|
@ -0,0 +1,218 @@
|
|||
use clap::{Args, ValueEnum};
|
||||
use crossterm::execute;
|
||||
use full_replication::{AbsoluteNumber, FullReplication};
|
||||
use futures::StreamExt;
|
||||
use nomos_core::da::DaProtocol;
|
||||
use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter};
|
||||
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
|
||||
use overwatch_derive::*;
|
||||
use overwatch_rs::{
|
||||
overwatch::OverwatchRunner,
|
||||
services::{
|
||||
handle::{ServiceHandle, ServiceStateHandle},
|
||||
relay::NoMessage,
|
||||
state::*,
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
},
|
||||
DynError,
|
||||
};
|
||||
use std::{path::PathBuf, time::Duration};
|
||||
|
||||
#[derive(Args, Debug)]
|
||||
pub struct Disseminate {
|
||||
// TODO: accept bytes
|
||||
#[clap(short, long)]
|
||||
data: String,
|
||||
/// Path to the network config file
|
||||
#[clap(short, long)]
|
||||
network_config: PathBuf,
|
||||
/// The data availability protocol to use. Defaults to full replication.
|
||||
#[clap(flatten)]
|
||||
da_protocol: DaProtocolChoice,
|
||||
/// Timeout in seconds. Defaults to 120 seconds.
|
||||
#[clap(short, long, default_value = "120")]
|
||||
timeout: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Args)]
|
||||
pub struct FullReplicationSettings {
|
||||
#[clap(long, default_value = "1")]
|
||||
num_attestations: usize,
|
||||
}
|
||||
|
||||
impl Disseminate {
|
||||
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::new())
|
||||
.expect("setting tracing default failed");
|
||||
let network = serde_yaml::from_reader::<
|
||||
_,
|
||||
<NetworkService<Libp2p> as ServiceData>::Settings,
|
||||
>(std::fs::File::open(&self.network_config)?)?;
|
||||
OverwatchRunner::<DisseminateApp>::run(
|
||||
DisseminateAppServiceSettings {
|
||||
network,
|
||||
send_blob: Settings {
|
||||
bytes: self.data.clone().as_bytes().into(),
|
||||
timeout: Duration::from_secs(self.timeout),
|
||||
da_protocol: self.da_protocol.clone(),
|
||||
},
|
||||
},
|
||||
None,
|
||||
)
|
||||
.unwrap()
|
||||
.wait_finished();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Write '✓' at the end of the previous line in terminal
|
||||
fn terminal_cmd_done() {
|
||||
let go_to_previous_line = crossterm::cursor::MoveToPreviousLine(1);
|
||||
let go_to_end_of_line =
|
||||
crossterm::cursor::MoveToColumn(crossterm::terminal::size().unwrap().0 - 1);
|
||||
let write_done = crossterm::style::Print("✓");
|
||||
execute!(
|
||||
std::io::stdout(),
|
||||
go_to_previous_line,
|
||||
go_to_end_of_line,
|
||||
write_done
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn disseminate<D, B, N, A, C>(
|
||||
mut da: D,
|
||||
data: Box<[u8]>,
|
||||
adapter: N,
|
||||
) -> Result<C, Box<dyn std::error::Error>>
|
||||
where
|
||||
D: DaProtocol<Blob = B, Attestation = A, Certificate = C>,
|
||||
N: NetworkAdapter<Blob = B, Attestation = A> + Send + Sync,
|
||||
{
|
||||
// 1) Building blob
|
||||
tracing::info!("Building blobs...");
|
||||
let blobs = da.encode(data);
|
||||
terminal_cmd_done();
|
||||
|
||||
// 2) Send blob to network
|
||||
tracing::info!("Sending blobs to network...");
|
||||
futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob)))
|
||||
.await
|
||||
.map_err(|e| e as Box<dyn std::error::Error>)?;
|
||||
terminal_cmd_done();
|
||||
|
||||
// 3) Collect attestations and create proof
|
||||
tracing::info!("Collecting attestations to create proof...");
|
||||
let mut attestations = adapter.attestation_stream().await;
|
||||
loop {
|
||||
da.recv_attestation(attestations.next().await.unwrap());
|
||||
|
||||
if let Some(cert) = da.certify_dispersal() {
|
||||
terminal_cmd_done();
|
||||
return Ok(cert);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// To interact with the network service it's easier to just spawn
|
||||
// an overwatch app
|
||||
#[derive(Services)]
|
||||
struct DisseminateApp {
|
||||
network: ServiceHandle<NetworkService<Libp2p>>,
|
||||
send_blob: ServiceHandle<DisseminateService>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Settings {
|
||||
bytes: Box<[u8]>,
|
||||
timeout: Duration,
|
||||
da_protocol: DaProtocolChoice,
|
||||
}
|
||||
|
||||
pub struct DisseminateService {
|
||||
service_state: ServiceStateHandle<Self>,
|
||||
}
|
||||
|
||||
impl ServiceData for DisseminateService {
|
||||
const SERVICE_ID: ServiceId = "Disseminate";
|
||||
type Settings = Settings;
|
||||
type State = NoState<Self::Settings>;
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
type Message = NoMessage;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ServiceCore for DisseminateService {
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
|
||||
Ok(Self { service_state })
|
||||
}
|
||||
|
||||
async fn run(self) -> Result<(), DynError> {
|
||||
let Self { service_state } = self;
|
||||
let settings = service_state.settings_reader.get_updated_settings();
|
||||
|
||||
match settings.da_protocol {
|
||||
DaProtocolChoice {
|
||||
da_protocol: Protocol::FullReplication,
|
||||
settings:
|
||||
ProtocolSettings {
|
||||
full_replication: da_settings,
|
||||
},
|
||||
} => {
|
||||
let network_relay = service_state
|
||||
.overwatch_handle
|
||||
.relay::<NetworkService<Libp2p>>()
|
||||
.connect()
|
||||
.await
|
||||
.expect("Relay connection with NetworkService should succeed");
|
||||
|
||||
let adapter = Libp2pAdapter::new(network_relay).await;
|
||||
let da = FullReplication::new(AbsoluteNumber::new(da_settings.num_attestations));
|
||||
|
||||
if tokio::time::timeout(settings.timeout, disseminate(da, settings.bytes, adapter))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
tracing::error!("Timeout reached, check the logs for additional details");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
service_state.overwatch_handle.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// This format is for clap args convenience, I could not
|
||||
// find a way to use enums directly without having to implement
|
||||
// parsing by hand.
|
||||
// The `settings` field will hold the settings for all possible
|
||||
// protocols, but only the one chosen will be used.
|
||||
// We can enforce only sensible combinations of protocol/settings
|
||||
// are specified by using special clap directives
|
||||
#[derive(Clone, Debug, Args)]
|
||||
pub(super) struct DaProtocolChoice {
|
||||
#[clap(long, default_value = "full-replication")]
|
||||
da_protocol: Protocol,
|
||||
#[clap(flatten)]
|
||||
settings: ProtocolSettings,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Args)]
|
||||
struct ProtocolSettings {
|
||||
#[clap(flatten)]
|
||||
full_replication: FullReplicationSettings,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, ValueEnum)]
|
||||
pub(super) enum Protocol {
|
||||
FullReplication,
|
||||
}
|
||||
|
||||
impl Default for FullReplicationSettings {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
num_attestations: 1,
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
use clap::Subcommand;
|
||||
|
||||
mod disseminate;
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
pub enum Command {
|
||||
/// Send a blob to the network and collect attestations to create a DA proof
|
||||
Disseminate(disseminate::Disseminate),
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
match self {
|
||||
Command::Disseminate(cmd) => cmd.run(),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
mod cmds;
|
||||
|
||||
use clap::Parser;
|
||||
use cmds::Command;
|
||||
#[derive(Parser)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
struct Cli {
|
||||
#[command(subcommand)]
|
||||
command: Command,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let cli = Cli::parse();
|
||||
cli.command.run().unwrap();
|
||||
}
|
Loading…
Reference in New Issue