From d672be3bb0be884585b2b4296010d48e7a44d59b Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Mon, 18 Sep 2023 15:35:20 +0200 Subject: [PATCH] [nomos-cli] Add command to disseminate data (#400) * Add nomos-cli command to disseminate data across the network Add nomos-cli to hold various utilities for the node. To start, this commit adds a command to disseminate some data through the network and build a certificate of correct dispersal for inclusion in a block * fmt * reorder loop instructions * Send blobs concurrently --- Cargo.toml | 1 + nomos-cli/Cargo.toml | 25 +++ nomos-cli/config.yaml | 3 + nomos-cli/src/cmds/disseminate/mod.rs | 218 ++++++++++++++++++++++++++ nomos-cli/src/cmds/mod.rs | 17 ++ nomos-cli/src/main.rs | 15 ++ 6 files changed, 279 insertions(+) create mode 100644 nomos-cli/Cargo.toml create mode 100644 nomos-cli/config.yaml create mode 100644 nomos-cli/src/cmds/disseminate/mod.rs create mode 100644 nomos-cli/src/cmds/mod.rs create mode 100644 nomos-cli/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 611ad5ec..3a6c5779 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "nomos-da/reed-solomon", "nomos-da/kzg", "nomos-da/full-replication", + "nomos-cli", "nodes/nomos-node", "nodes/mixnode", "simulations", diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml new file mode 100644 index 00000000..402a83c8 --- /dev/null +++ b/nomos-cli/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "nomos-cli" +version = "0.1.0" +edition = "2021" +description = "Cli app to interact with Nomos nodes and perform various tasks" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fraction = "0.13" +tracing = "0.1" +tracing-subscriber = "0.3" +async-trait = "0.1" +clap = {version = "4", features = ["derive"] } +crossterm = "0.27" +serde_yaml = "0.9" +futures = "0.3" +tokio = { version = "1", features = ["sync"] } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } +nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] } +nomos-libp2p = { path = "../nomos-libp2p"} +nomos-core = { path = "../nomos-core" } +full-replication = { path = "../nomos-da/full-replication" } \ No newline at end of file diff --git a/nomos-cli/config.yaml b/nomos-cli/config.yaml new file mode 100644 index 00000000..98f59795 --- /dev/null +++ b/nomos-cli/config.yaml @@ -0,0 +1,3 @@ +backend: + host: "127.0.0.1" + port: 8000 \ No newline at end of file diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs new file mode 100644 index 00000000..d4e8ddbf --- /dev/null +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -0,0 +1,218 @@ +use clap::{Args, ValueEnum}; +use crossterm::execute; +use full_replication::{AbsoluteNumber, FullReplication}; +use futures::StreamExt; +use nomos_core::da::DaProtocol; +use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter}; +use nomos_network::{backends::libp2p::Libp2p, NetworkService}; +use overwatch_derive::*; +use overwatch_rs::{ + overwatch::OverwatchRunner, + services::{ + handle::{ServiceHandle, ServiceStateHandle}, + relay::NoMessage, + state::*, + ServiceCore, ServiceData, ServiceId, + }, + DynError, +}; +use std::{path::PathBuf, time::Duration}; + +#[derive(Args, Debug)] +pub struct Disseminate { + // TODO: accept bytes + #[clap(short, long)] + data: String, + /// Path to the network config file + #[clap(short, long)] + network_config: PathBuf, + /// The data availability protocol to use. Defaults to full replication. + #[clap(flatten)] + da_protocol: DaProtocolChoice, + /// Timeout in seconds. Defaults to 120 seconds. + #[clap(short, long, default_value = "120")] + timeout: u64, +} + +#[derive(Debug, Clone, Args)] +pub struct FullReplicationSettings { + #[clap(long, default_value = "1")] + num_attestations: usize, +} + +impl Disseminate { + pub fn run(&self) -> Result<(), Box> { + tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::new()) + .expect("setting tracing default failed"); + let network = serde_yaml::from_reader::< + _, + as ServiceData>::Settings, + >(std::fs::File::open(&self.network_config)?)?; + OverwatchRunner::::run( + DisseminateAppServiceSettings { + network, + send_blob: Settings { + bytes: self.data.clone().as_bytes().into(), + timeout: Duration::from_secs(self.timeout), + da_protocol: self.da_protocol.clone(), + }, + }, + None, + ) + .unwrap() + .wait_finished(); + Ok(()) + } +} + +// Write '✓' at the end of the previous line in terminal +fn terminal_cmd_done() { + let go_to_previous_line = crossterm::cursor::MoveToPreviousLine(1); + let go_to_end_of_line = + crossterm::cursor::MoveToColumn(crossterm::terminal::size().unwrap().0 - 1); + let write_done = crossterm::style::Print("✓"); + execute!( + std::io::stdout(), + go_to_previous_line, + go_to_end_of_line, + write_done + ) + .unwrap() +} + +async fn disseminate( + mut da: D, + data: Box<[u8]>, + adapter: N, +) -> Result> +where + D: DaProtocol, + N: NetworkAdapter + Send + Sync, +{ + // 1) Building blob + tracing::info!("Building blobs..."); + let blobs = da.encode(data); + terminal_cmd_done(); + + // 2) Send blob to network + tracing::info!("Sending blobs to network..."); + futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob))) + .await + .map_err(|e| e as Box)?; + terminal_cmd_done(); + + // 3) Collect attestations and create proof + tracing::info!("Collecting attestations to create proof..."); + let mut attestations = adapter.attestation_stream().await; + loop { + da.recv_attestation(attestations.next().await.unwrap()); + + if let Some(cert) = da.certify_dispersal() { + terminal_cmd_done(); + return Ok(cert); + } + } +} + +// To interact with the network service it's easier to just spawn +// an overwatch app +#[derive(Services)] +struct DisseminateApp { + network: ServiceHandle>, + send_blob: ServiceHandle, +} + +#[derive(Clone, Debug)] +pub struct Settings { + bytes: Box<[u8]>, + timeout: Duration, + da_protocol: DaProtocolChoice, +} + +pub struct DisseminateService { + service_state: ServiceStateHandle, +} + +impl ServiceData for DisseminateService { + const SERVICE_ID: ServiceId = "Disseminate"; + type Settings = Settings; + type State = NoState; + type StateOperator = NoOperator; + type Message = NoMessage; +} + +#[async_trait::async_trait] +impl ServiceCore for DisseminateService { + fn init(service_state: ServiceStateHandle) -> Result { + Ok(Self { service_state }) + } + + async fn run(self) -> Result<(), DynError> { + let Self { service_state } = self; + let settings = service_state.settings_reader.get_updated_settings(); + + match settings.da_protocol { + DaProtocolChoice { + da_protocol: Protocol::FullReplication, + settings: + ProtocolSettings { + full_replication: da_settings, + }, + } => { + let network_relay = service_state + .overwatch_handle + .relay::>() + .connect() + .await + .expect("Relay connection with NetworkService should succeed"); + + let adapter = Libp2pAdapter::new(network_relay).await; + let da = FullReplication::new(AbsoluteNumber::new(da_settings.num_attestations)); + + if tokio::time::timeout(settings.timeout, disseminate(da, settings.bytes, adapter)) + .await + .is_err() + { + tracing::error!("Timeout reached, check the logs for additional details"); + } + } + } + + service_state.overwatch_handle.shutdown().await; + Ok(()) + } +} + +// This format is for clap args convenience, I could not +// find a way to use enums directly without having to implement +// parsing by hand. +// The `settings` field will hold the settings for all possible +// protocols, but only the one chosen will be used. +// We can enforce only sensible combinations of protocol/settings +// are specified by using special clap directives +#[derive(Clone, Debug, Args)] +pub(super) struct DaProtocolChoice { + #[clap(long, default_value = "full-replication")] + da_protocol: Protocol, + #[clap(flatten)] + settings: ProtocolSettings, +} + +#[derive(Clone, Debug, Args)] +struct ProtocolSettings { + #[clap(flatten)] + full_replication: FullReplicationSettings, +} + +#[derive(Clone, Debug, ValueEnum)] +pub(super) enum Protocol { + FullReplication, +} + +impl Default for FullReplicationSettings { + fn default() -> Self { + Self { + num_attestations: 1, + } + } +} diff --git a/nomos-cli/src/cmds/mod.rs b/nomos-cli/src/cmds/mod.rs new file mode 100644 index 00000000..8c6e8b05 --- /dev/null +++ b/nomos-cli/src/cmds/mod.rs @@ -0,0 +1,17 @@ +use clap::Subcommand; + +mod disseminate; + +#[derive(Debug, Subcommand)] +pub enum Command { + /// Send a blob to the network and collect attestations to create a DA proof + Disseminate(disseminate::Disseminate), +} + +impl Command { + pub fn run(&self) -> Result<(), Box> { + match self { + Command::Disseminate(cmd) => cmd.run(), + } + } +} diff --git a/nomos-cli/src/main.rs b/nomos-cli/src/main.rs new file mode 100644 index 00000000..dad1ce85 --- /dev/null +++ b/nomos-cli/src/main.rs @@ -0,0 +1,15 @@ +mod cmds; + +use clap::Parser; +use cmds::Command; +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + #[command(subcommand)] + command: Command, +} + +fn main() { + let cli = Cli::parse(); + cli.command.run().unwrap(); +}