diff --git a/clients/executor-http-client/Cargo.toml b/clients/executor-http-client/Cargo.toml index 9e0728e7..18152cdf 100644 --- a/clients/executor-http-client/Cargo.toml +++ b/clients/executor-http-client/Cargo.toml @@ -8,4 +8,4 @@ nomos-core = { path = "../../nomos-core/chain-defs" } nomos-executor = { path = "../../nodes/nomos-executor" } reqwest = "0.12" serde = "1.0" -thiserror = "1.0" \ No newline at end of file +thiserror = "1.0" diff --git a/clients/executor-http-client/src/lib.rs b/clients/executor-http-client/src/lib.rs index 52ea7525..04b04c07 100644 --- a/clients/executor-http-client/src/lib.rs +++ b/clients/executor-http-client/src/lib.rs @@ -1,9 +1,9 @@ // std // crates use reqwest::{Client, ClientBuilder, StatusCode, Url}; +use serde::Serialize; // internal use nomos_executor::api::{handlers::DispersalRequest, paths}; -use serde::Serialize; #[derive(thiserror::Error, Debug)] pub enum Error { diff --git a/nodes/nomos-node/src/api/handlers.rs b/nodes/nomos-node/src/api/handlers.rs index 8989fe6d..6689f781 100644 --- a/nodes/nomos-node/src/api/handlers.rs +++ b/nodes/nomos-node/src/api/handlers.rs @@ -236,8 +236,8 @@ where ::AppId: Serialize + DeserializeOwned, ::Index: Serialize + DeserializeOwned, { - app_id: ::AppId, - range: Range<::Index>, + pub app_id: ::AppId, + pub range: Range<::Index>, } #[utoipa::path( diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index 3f457675..bdd55d85 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -7,37 +7,14 @@ description = "Cli app to interact with Nomos nodes and perform various tasks" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -fraction = "0.13" -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -async-trait = "0.1" clap = { version = "4", features = ["derive"] } -serde_yaml = "0.9" -futures = "0.3" -tokio = { version = "1", features = ["sync"] } -tokio-stream = "0.1" -log = "0.4.19" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } -overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } -nomos-da-network-service = { path = "../nomos-services/data-availability/network" } -cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" } +executor-http-client = { path = "../clients/executor-http-client" } +hex = "0.4.3" kzgrs-backend = { path = "../nomos-da/kzgrs-backend" } -nomos-log = { path = "../nomos-services/log" } -nomos-libp2p = { path = "../nomos-libp2p" } -libp2p = { version = "0.53", features = ["macros", "serde"] } nomos-core = { path = "../nomos-core/chain-defs" } nomos-node = { path = "../nodes/nomos-node" } -nomos-da-network-core = { path = "../nomos-da/network/core" } -subnetworks-assignations = { path = "../nomos-da/network/subnetworks-assignations" } -full-replication = { path = "../nomos-da/full-replication" } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.12", features = ["json"] } serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -thiserror = "1.0" -hex = "0.4.3" -once_cell = "1" -crossterm = "0.27" -ratatui = "0.24" -tui-input = "0.8" -ansi-to-tui = "3" -rand = "0.8" +tokio = { version = "1", features = ["sync"] } +tracing = "0.1" +tracing-subscriber = "0.3" diff --git a/nomos-cli/config.yaml b/nomos-cli/config.yaml deleted file mode 100644 index b9f11225..00000000 --- a/nomos-cli/config.yaml +++ /dev/null @@ -1,9 +0,0 @@ -backend: - host: 0.0.0.0 - port: 3019 - log_level: "fatal" - # Node key needs to be unique for every client. - node_key: "0000000000000000000000000000000000000000000000000000000000001444" - discV5BootstrapNodes: [] - initial_peers: ["/dns/testnet.nomos.tech/tcp/3000"] - relayTopics: [] diff --git a/nomos-cli/src/api/consensus.rs b/nomos-cli/src/api/consensus.rs deleted file mode 100644 index 313ffedd..00000000 --- a/nomos-cli/src/api/consensus.rs +++ /dev/null @@ -1,31 +0,0 @@ -use super::CLIENT; -use cryptarchia_consensus::CryptarchiaInfo; -use nomos_core::header::HeaderId; -use reqwest::Url; - -pub async fn cryptarchia_info(node: &Url) -> Result { - const NODE_CRYPTARCHIA_INFO_PATH: &str = "cryptarchia/info"; - CLIENT - .get(node.join(NODE_CRYPTARCHIA_INFO_PATH).unwrap()) - .send() - .await? - .json::() - .await -} - -pub async fn get_headers_info( - node: &Url, - from: Option, - to: Option, -) -> Result, reqwest::Error> { - const NODE_CRYPTARCHIA_HEADERS_PATH: &str = "cryptarchia/headers"; - let mut req = CLIENT.get(node.join(NODE_CRYPTARCHIA_HEADERS_PATH).unwrap()); - if let Some(from) = from { - req = req.query(&[("from", from)]); - } - if let Some(to) = to { - req = req.query(&[("to", to)]); - } - - req.send().await?.json().await -} diff --git a/nomos-cli/src/api/da.rs b/nomos-cli/src/api/da.rs deleted file mode 100644 index 8b137891..00000000 --- a/nomos-cli/src/api/da.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/nomos-cli/src/api/mempool.rs b/nomos-cli/src/api/mempool.rs deleted file mode 100644 index ea62e683..00000000 --- a/nomos-cli/src/api/mempool.rs +++ /dev/null @@ -1,15 +0,0 @@ -use super::CLIENT; -use reqwest::{Error, Response, Url}; -use serde::Serialize; - -pub async fn send_blob_info(node: &Url, info: &I) -> Result -where - I: Serialize, -{ - const NODE_CERT_PATH: &str = "mempool/add/blobinfo"; - CLIENT - .post(node.join(NODE_CERT_PATH).unwrap()) - .json(info) - .send() - .await -} diff --git a/nomos-cli/src/api/mod.rs b/nomos-cli/src/api/mod.rs deleted file mode 100644 index 1fa5c4ec..00000000 --- a/nomos-cli/src/api/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -pub mod consensus; -pub mod da; -pub mod mempool; -pub mod storage; - -use once_cell::sync::Lazy; -use reqwest::Client; - -static CLIENT: Lazy = Lazy::new(Client::new); diff --git a/nomos-cli/src/api/storage.rs b/nomos-cli/src/api/storage.rs deleted file mode 100644 index 18f43120..00000000 --- a/nomos-cli/src/api/storage.rs +++ /dev/null @@ -1,20 +0,0 @@ -use super::CLIENT; -use full_replication::BlobInfo; -use nomos_core::block::Block; -use nomos_core::header::HeaderId; -use nomos_node::Tx; -use reqwest::Url; - -pub async fn get_block_contents( - node: &Url, - block: &HeaderId, -) -> Result>, reqwest::Error> { - const BLOCK_PATH: &str = "storage/block"; - CLIENT - .post(node.join(BLOCK_PATH).unwrap()) - .json(block) - .send() - .await? - .json() - .await -} diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs deleted file mode 100644 index 62d3c9df..00000000 --- a/nomos-cli/src/cmds/chat/mod.rs +++ /dev/null @@ -1,347 +0,0 @@ -/// The dumbest possible backend for a chat protocol. -/// Just because running Doom on Nomos was too much work for a demo. -/// -/// -mod ui; - -use crate::{ - api::consensus::get_headers_info, - //da::{ - // disseminate::{ - // DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, - // }, - // retrieve::get_block_blobs, - //}, -}; -use clap::Args; -use full_replication::{ - AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings, -}; -use futures::{stream, StreamExt}; -use nomos_core::{header::HeaderId, wire}; -use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter}; -use nomos_network::backends::libp2p::Libp2p as NetworkBackend; -use nomos_network::NetworkService; -use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; -use reqwest::Url; -use serde::{Deserialize, Serialize}; -use std::{ - io, - path::PathBuf, - sync::{ - self, - mpsc::{Receiver, Sender}, - Arc, - }, - time::{Duration, Instant}, -}; -use tokio::sync::{mpsc::UnboundedSender, Mutex}; - -use crossterm::{ - event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, - execute, - terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, -}; -use ratatui::{ - backend::{Backend, CrosstermBackend}, - Terminal, -}; -use tui_input::{backend::crossterm::EventHandler, Input}; - -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120); -// Limit the number of maximum in-flight requests -const MAX_BUFFERED_REQUESTS: usize = 20; - -#[derive(Clone, Debug, Args)] -/// The almost-instant messaging protocol. -pub struct NomosChat { - /// Path to the network config file - #[clap(short, long)] - pub network_config: PathBuf, - /// The data availability protocol to use. Defaults to full replication. - #[clap(flatten)] - pub da_protocol: DaProtocolChoice, - /// The node to connect to to fetch blocks and blobs - #[clap(long)] - pub node: Url, - /// Author for non interactive message formation - #[clap(long, requires("message"))] - pub author: Option, - /// Message for non interactive message formation - #[clap(long, requires("author"))] - pub message: Option, -} - -pub struct App { - input: Input, - username: Option, - messages: Vec, - message_status: Option, - message_in_flight: bool, - last_updated: Instant, - payload_sender: UnboundedSender>, - status_updates: Receiver, - node: Url, - logs: Arc>>, - scroll_logs: u16, -} - -impl NomosChat { - pub fn run(&self) -> Result<(), Box> { - let network = serde_yaml::from_reader::< - _, - as ServiceData>::Settings, - >(std::fs::File::open(&self.network_config)?)?; - let da_protocol = self.da_protocol.clone(); - - let node_addr = Some(self.node.clone()); - - let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel(); - let (status_sender, status_updates) = std::sync::mpsc::channel(); - - let shared_writer = Arc::new(sync::Mutex::new(Vec::new())); - let backend = SharedWriter::from_inner(shared_writer.clone()); - - std::thread::spawn(move || { - OverwatchRunner::::run( - DisseminateAppServiceSettings { - network, - send_blob: Settings { - payload: Arc::new(Mutex::new(payload_receiver)), - timeout: DEFAULT_TIMEOUT, - da_protocol, - status_updates: status_sender, - node_addr, - output: None, - }, - logger: LoggerSettings { - backend: LoggerBackend::Writer(backend), - level: tracing::Level::INFO, - ..Default::default() - }, - }, - None, - ) - .unwrap() - .wait_finished() - }); - - if let Some(author) = self.author.as_ref() { - let message = self - .message - .as_ref() - .expect("Should be available if author is set"); - return run_once(author, message, payload_sender); - } - - // setup terminal - enable_raw_mode()?; - let mut stdout = io::stdout(); - execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; - let backend = CrosstermBackend::new(stdout); - let mut terminal = Terminal::new(backend)?; - - let app = App { - input: Input::default(), - username: None, - messages: Vec::new(), - message_status: None, - message_in_flight: false, - last_updated: Instant::now(), - payload_sender, - status_updates, - node: self.node.clone(), - logs: shared_writer, - scroll_logs: 0, - }; - - run_app(&mut terminal, app); - - // restore terminal - disable_raw_mode()?; - execute!( - terminal.backend_mut(), - LeaveAlternateScreen, - DisableMouseCapture - )?; - terminal.show_cursor()?; - - Ok(()) - } -} - -fn run_once( - author: &str, - message: &str, - payload_sender: UnboundedSender>, -) -> Result<(), Box> { - payload_sender.send( - wire::serialize(&ChatMessage { - author: author.to_string(), - message: message.to_string(), - _nonce: rand::random(), - }) - .unwrap() - .into(), - )?; - - Ok(()) -} - -fn run_app(terminal: &mut Terminal, mut app: App) { - let (message_tx, message_rx) = std::sync::mpsc::channel(); - let node = app.node.clone(); - std::thread::spawn(move || check_for_messages(message_tx, node)); - loop { - terminal.draw(|f| ui::ui(f, &app)).unwrap(); - - if let Ok(update) = app.status_updates.try_recv() { - if let Status::Done | Status::Err(_) = update { - app.message_in_flight = false; - } - app.message_status = Some(update); - app.last_updated = Instant::now(); - } - - if let Ok(messages) = message_rx.try_recv() { - app.messages.extend(messages); - } - - // Do not block rendering if there's no user input available - if !event::poll(Duration::from_millis(100)).unwrap() { - continue; - } - - if let Event::Key(key) = event::read().unwrap() { - match key.code { - KeyCode::Enter => { - if app.username.is_none() { - app.username = Some(app.input.value().into()); - } else { - // Do not allow more than one message in flight at a time for simplicity - if !app.message_in_flight && !app.input.value().is_empty() { - app.message_in_flight = true; - app.payload_sender - .send( - wire::serialize(&ChatMessage { - author: app.username.clone().unwrap(), - message: app.input.value().into(), - _nonce: rand::random(), - }) - .unwrap() - .into(), - ) - .unwrap(); - } - } - app.input.reset(); - } - KeyCode::Esc => { - return; - } - KeyCode::Left => { - app.scroll_logs = app.scroll_logs.saturating_sub(1); - } - KeyCode::Right => { - app.scroll_logs = app.scroll_logs.saturating_add(1); - } - _ => { - if !app.message_in_flight { - app.input.handle_event(&Event::Key(key)); - } - } - } - } - } -} - -#[derive(Serialize, Deserialize, Debug)] -struct ChatMessage { - author: String, - message: String, - // Since DA will rightfully ignore duplicated messages, we need to add a nonce to make sure - // every message is unique. This is randomly generated for simplicity. - _nonce: u64, -} - -#[tokio::main] -async fn check_for_messages(sender: Sender>, node: Url) { - // Should ask for the genesis block to be more robust - let mut last_tip = [0; 32].into(); - - loop { - if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await { - sender.send(messages).expect("channel closed"); - last_tip = new_tip; - } - tokio::time::sleep(Duration::from_millis(100)).await; - } -} - -// Process a single block's blobs and return chat messages -async fn process_block_blobs( - node: Url, - block_id: &HeaderId, - da_settings: DaSettings, -) -> Result, Box> { - let blobs = get_block_blobs(&node, block_id).await?; - - // Note that number of attestations is ignored here since we only use the da protocol to - // decode the blob data, not to validate the certificate - let mut da_protocol = - > as DaProtocol>::new(da_settings); - let mut messages = Vec::new(); - - for blob in blobs { - da_protocol.recv_blob(blob); - let bytes = da_protocol.extract().unwrap(); - if let Ok(message) = wire::deserialize::(&bytes) { - messages.push(message); - } - } - - Ok(messages) -} - -// Fetch new messages since the last tip -async fn fetch_new_messages( - last_tip: &HeaderId, - node: &Url, -) -> Result<(HeaderId, Vec), Box> { - // By only specifying the 'to' parameter we get all the blocks since the last tip - let mut new_blocks = get_headers_info(node, None, Some(*last_tip)) - .await? - .into_iter() - .collect::>(); - - // The first block is the most recent one. - // Note that the 'to' is inclusive so the above request will always return at least one block - // as long as the block exists (which is the case since it was returned by a previous call) - let new_tip = new_blocks[0]; - // We already processed the last block so let's remove it - new_blocks.pop(); - - let da_settings = DaSettings { - num_attestations: 1, - voter: [0; 32], - }; - - let block_stream = stream::iter(new_blocks.iter().rev()); - let results: Vec<_> = block_stream - .map(|block| { - let node = node.clone(); - let da_settings = da_settings.clone(); - - process_block_blobs(node, block, da_settings) - }) - .buffered(MAX_BUFFERED_REQUESTS) - .collect::>() - .await; - - let mut new_messages = Vec::new(); - for result in results { - new_messages.extend(result?); - } - - Ok((new_tip, new_messages)) -} diff --git a/nomos-cli/src/cmds/chat/ui.rs b/nomos-cli/src/cmds/chat/ui.rs deleted file mode 100644 index 63b3e86a..00000000 --- a/nomos-cli/src/cmds/chat/ui.rs +++ /dev/null @@ -1,203 +0,0 @@ -use super::{App, ChatMessage}; -use ratatui::prelude::*; -use ratatui::widgets::{Block, Borders, List, ListItem, Paragraph, Wrap}; -use std::ops::Deref; - -pub fn ui(f: &mut Frame, app: &App) { - if app.username.is_none() { - select_username(f, app) - } else { - chat(f, app) - } -} - -fn select_username(f: &mut Frame, app: &App) { - assert!(app.username.is_none()); - let block = Block::default() - .title("NomosChat") - .borders(Borders::ALL) - .style(Style::new().white().on_black()); - let chunks = Layout::default() - .direction(Direction::Vertical) - .constraints( - [ - Constraint::Min(5), - Constraint::Min(1), - Constraint::Min(5), - Constraint::Min(0), - ] - .as_ref(), - ) - .split(block.inner(f.size())); - f.render_widget(block, f.size()); - - let text = vec![ - Line::from(vec![ - Span::raw("Welcome to "), - Span::styled("NomosChat", Style::new().green().italic()), - "!".into(), - ]), - Line::from("The first almost-instant messaging protocol".red()), - ]; - - let welcome = Paragraph::new(text) - .style(Style::new().white().on_black()) - .alignment(Alignment::Center) - .wrap(Wrap { trim: true }); - f.render_widget(welcome, chunks[0]); - - let help_text = Line::from("Press to select your username, or to quit."); - let help = Paragraph::new(help_text) - .alignment(Alignment::Center) - .style(Style::new().white().on_black()); - f.render_widget(help, chunks[1]); - - let input = Paragraph::new(app.input.value()) - .style(Style::new().white().on_black()) - .block( - Block::default() - .borders(Borders::ALL) - .title("Select username") - .border_style(Style::default().fg(Color::Yellow)), - ); - f.render_widget(input, centered_rect(chunks[2], 50, 50)); -} - -fn centered_rect(r: Rect, percent_x: u16, percent_y: u16) -> Rect { - let popup_layout = Layout::default() - .direction(Direction::Vertical) - .constraints( - [ - Constraint::Percentage((100 - percent_y) / 2), - Constraint::Percentage(percent_y), - Constraint::Percentage((100 - percent_y) / 2), - ] - .as_ref(), - ) - .split(r); - - Layout::default() - .direction(Direction::Horizontal) - .constraints( - [ - Constraint::Percentage((100 - percent_x) / 2), - Constraint::Percentage(percent_x), - Constraint::Percentage((100 - percent_x) / 2), - ] - .as_ref(), - ) - .split(popup_layout[1])[1] -} - -fn chat(f: &mut Frame, app: &App) { - let block = Block::default() - .title("NomosChat") - .borders(Borders::ALL) - .style(Style::new().white().on_black()); - let chunks = Layout::default() - .direction(Direction::Vertical) - .constraints( - [ - Constraint::Max(3), - Constraint::Percentage(80), - Constraint::Max(10), - ] - .as_ref(), - ) - .split(block.inner(f.size())); - f.render_widget(block, f.size()); - - let messages_rect = centered_rect(chunks[1], 90, 100); - render_messages(f, app, messages_rect); - render_logs(f, app, chunks[2]); - - let chunks = Layout::default() - .direction(Direction::Horizontal) - .constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref()) - .split(chunks[0]); - - render_input(f, app, chunks[0]); - render_status(f, app, chunks[1]); -} - -fn render_messages(f: &mut Frame, app: &App, rect: Rect) { - let messages: Vec = app - .messages - .iter() - .map( - |ChatMessage { - author, - message, - _nonce, - }| { - let content = if author == app.username.as_ref().unwrap() { - static MARGIN: usize = 2; - // pad to make it appear aligned on the right - let pad = " ".repeat( - (rect.width as usize) - .saturating_sub(message.len()) - .saturating_sub(MARGIN), - ); - Line::from(vec![Span::raw(pad), Span::raw(message)]) - } else { - Line::from(vec![ - Span::styled(format!("{author}: "), Style::new().fg(Color::Yellow).bold()), - Span::raw(message), - ]) - .alignment(Alignment::Left) - }; - ListItem::new(content) - }, - ) - .collect(); - let messages = - List::new(messages).block(Block::default().borders(Borders::ALL).title("Messages")); - f.render_widget(messages, rect); -} - -fn render_input(f: &mut Frame, app: &App, rect: Rect) { - let style = if !app.message_in_flight { - Style::default().fg(Color::Yellow) - } else { - Style::default().fg(Color::DarkGray) - }; - let input = Paragraph::new(app.input.value()) - .style(Style::new().white().on_black()) - .block( - Block::default() - .borders(Borders::ALL) - .title("Press to send message") - .border_style(style), - ); - f.render_widget(input, rect); -} -fn render_status(f: &mut Frame, app: &App, rect: Rect) { - let waiting_animation = std::iter::repeat(".") - .take(app.last_updated.elapsed().as_secs() as usize % 4) - .collect::>() - .join(""); - - let status = Paragraph::new( - app.message_status - .as_ref() - .map(|s| format!("{}{}", s, waiting_animation)) - .unwrap_or_default(), - ) - .block(Block::default().borders(Borders::ALL).title("Status:")); - f.render_widget(status, rect); -} - -fn render_logs(f: &mut Frame, app: &App, rect: Rect) { - let logs = String::from_utf8(app.logs.lock().unwrap().deref().clone()).unwrap(); - f.render_widget( - Paragraph::new(logs) - .wrap(Wrap { trim: true }) - .scroll((app.scroll_logs, 0)) - .block( - Block::default() - .borders(Borders::ALL) - .title("Logs: (use ←/→ to scroll up/down"), - ), - rect, - ); -} diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs deleted file mode 100644 index f2504c2a..00000000 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ /dev/null @@ -1,135 +0,0 @@ -// std -use std::{path::PathBuf, sync::Arc, time::Duration}; -// crates -use clap::Args; -use kzgrs_backend::dispersal::Metadata; -use nomos_da_network_service::NetworkService; -use nomos_log::{LoggerBackend, LoggerSettings}; -use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; -use reqwest::Url; -use tokio::sync::Mutex; -// internal -use crate::da::{ - disseminate::{DisseminateApp, DisseminateAppServiceSettings, KzgrsSettings, Settings, Status}, - NetworkBackend, -}; - -#[derive(Args, Debug, Default)] -pub struct Disseminate { - // TODO: accept bytes - #[clap(short, long, required_unless_present("file"))] - pub data: Option, - /// Path to the network config file - #[clap(short, long)] - pub network_config: PathBuf, - /// Timeout in seconds. Defaults to 120 seconds. - #[clap(short, long, default_value = "120")] - pub timeout: u64, - /// Address of the node to send the certificate to - /// for block inclusion, if present. - #[clap(long)] - pub node_addr: Option, - // Application ID for dispersed data. - #[clap(long)] - pub app_id: String, - // Index for the Blob associated with Application ID. - #[clap(long)] - pub index: u64, - // Use Kzg RS cache. - #[clap(long)] - pub with_cache: bool, - // Number of columns to use for encoding. - #[clap(long, default_value = "4096")] - pub columns: usize, - // Duration in seconds to wait before publishing blob info. - #[clap(short, long, default_value = "5")] - pub wait_until_disseminated: u64, - /// File to write the certificate to, if present. - #[clap(long)] - pub output: Option, - /// File to disseminate - #[clap(short, long)] - pub file: Option, - // Path to the KzgRs global parameters. - #[clap(long)] - pub global_params_path: String, -} - -impl Disseminate { - pub fn run(&self) -> Result<(), Box> { - tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::new()) - .expect("setting tracing default failed"); - let network = serde_yaml::from_reader::< - _, - as ServiceData>::Settings, - >(std::fs::File::open(&self.network_config)?)?; - let (status_updates, rx) = std::sync::mpsc::channel(); - - let mut bytes: Vec = if let Some(data) = &self.data { - data.clone().into_bytes() - } else { - let file_path = self.file.as_ref().unwrap(); - std::fs::read(file_path)? - }; - - let remainder = bytes.len() % 31; - if remainder != 0 { - bytes.resize(bytes.len() + (31 - remainder), 0); - } - - let app_id: [u8; 32] = hex::decode(&self.app_id)? - .try_into() - .map_err(|_| "Invalid app_id")?; - let metadata = Metadata::new(app_id, self.index.into()); - let timeout = Duration::from_secs(self.timeout); - let node_addr = self.node_addr.clone(); - let output = self.output.clone(); - let num_columns = self.columns; - let with_cache = self.with_cache; - let wait_until_disseminated = Duration::from_secs(self.wait_until_disseminated); - let global_params_path = self.global_params_path.clone(); - let (payload_sender, payload_rx) = tokio::sync::mpsc::unbounded_channel(); - payload_sender.send(bytes.into_boxed_slice()).unwrap(); - - std::thread::spawn(move || { - OverwatchRunner::::run( - DisseminateAppServiceSettings { - network, - send_blob: Settings { - payload: Arc::new(Mutex::new(payload_rx)), - timeout, - kzgrs_settings: KzgrsSettings { - num_columns, - with_cache, - }, - metadata, - status_updates, - node_addr, - wait_until_disseminated, - output, - global_params_path, - }, - logger: LoggerSettings { - backend: LoggerBackend::None, - ..Default::default() - }, - }, - None, - ) - .unwrap() - .wait_finished(); - }); - // drop to signal we're not going to send more blobs - drop(payload_sender); - tracing::info!("{}", rx.recv().unwrap()); - while let Ok(update) = rx.recv() { - if let Status::Err(e) = update { - tracing::error!("{e}"); - return Err(e); - } - tracing::info!("{}", update); - } - tracing::info!("done"); - Ok(()) - } -} diff --git a/nomos-cli/src/cmds/executor.rs b/nomos-cli/src/cmds/executor.rs new file mode 100644 index 00000000..5db40d70 --- /dev/null +++ b/nomos-cli/src/cmds/executor.rs @@ -0,0 +1,82 @@ +// std +use std::path::PathBuf; +// crates +use clap::Args; +use reqwest::Url; +// internal +use executor_http_client::ExecutorHttpClient; +use kzgrs_backend::{dispersal::Metadata, encoder::DaEncoderParams}; + +#[derive(Args, Debug)] +pub struct Disseminate { + /// Text to disseminate. + #[clap(short, long, required_unless_present("file"))] + pub data: Option, + /// File to disseminate. + #[clap(short, long)] + pub file: Option, + /// Application ID for dispersed data. + #[clap(long)] + pub app_id: String, + /// Index for the Blob associated with Application ID. + #[clap(long)] + pub index: u64, + /// Executor address which is responsible for dissemination. + #[clap(long)] + pub addr: Url, +} + +impl Disseminate { + pub fn run(self) -> Result<(), Box> { + tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::new()) + .expect("setting tracing default failed"); + + let client = ExecutorHttpClient::new(reqwest::Client::new(), self.addr.clone()); + + let mut bytes: Vec = if let Some(data) = &self.data { + data.clone().into_bytes() + } else { + let file_path = self.file.as_ref().unwrap(); + std::fs::read(file_path)? + }; + + let remainder = bytes.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE; + if remainder != 0 { + bytes.resize( + bytes.len() + (DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder), + 0, + ); + } + + let app_id: [u8; 32] = hex::decode(&self.app_id)? + .try_into() + .map_err(|_| "Invalid app_id")?; + let metadata = Metadata::new(app_id, self.index.into()); + + let (res_sender, res_receiver) = std::sync::mpsc::channel(); + tokio::spawn(async move { + let res = client + .publish_blob(bytes, metadata) + .await + .map_err(|err| format!("Failed to publish blob: {:?}", err)); + res_sender.send(res).unwrap(); + }); + + match res_receiver.recv() { + Ok(update) => match update { + Ok(_) => tracing::info!("Data successfully disseminated."), + Err(e) => { + tracing::error!("Error disseminating data: {e}"); + return Err(e.into()); + } + }, + Err(e) => { + tracing::error!("Failed to receive from client thread: {e}"); + return Err(e.into()); + } + } + + tracing::info!("Done"); + Ok(()) + } +} diff --git a/nomos-cli/src/cmds/mod.rs b/nomos-cli/src/cmds/mod.rs index fd92ac5c..c5ad809a 100644 --- a/nomos-cli/src/cmds/mod.rs +++ b/nomos-cli/src/cmds/mod.rs @@ -1,21 +1,23 @@ -use clap::Subcommand; +pub mod executor; +pub mod validator; -// pub mod chat; -pub mod disseminate; +// std +// crates +use clap::Subcommand; +// internal #[derive(Debug, Subcommand)] pub enum Command { - /// Send a blob to the network and collect attestations to create a DA proof - Disseminate(disseminate::Disseminate), - // /// (Almost) Instant messaging protocol on top of the Nomos network - // Chat(chat::NomosChat), + /// Send data to the executor for encoding and dispersal. + Disseminate(executor::Disseminate), + Retrieve(validator::Retrieve), } impl Command { - pub fn run(&self) -> Result<(), Box> { + pub fn run(self) -> Result<(), Box> { match self { Command::Disseminate(cmd) => cmd.run(), - // Command::Chat(cmd) => cmd.run(), + Command::Retrieve(cmd) => cmd.run(), }?; Ok(()) } diff --git a/nomos-cli/src/cmds/validator.rs b/nomos-cli/src/cmds/validator.rs new file mode 100644 index 00000000..b575618e --- /dev/null +++ b/nomos-cli/src/cmds/validator.rs @@ -0,0 +1,105 @@ +// std +use std::{error::Error, ops::Range}; +// crates +use clap::Args; +use reqwest::{Client, Url}; +use serde::{de::DeserializeOwned, Serialize}; +// internal +use kzgrs_backend::{common::blob::DaBlob, dispersal::Index}; +use nomos_core::da::blob::{self, metadata}; +use nomos_node::api::{handlers::GetRangeReq, paths}; + +#[derive(Args, Debug)] +pub struct Retrieve { + /// Application ID of data in Indexer. + #[clap(long)] + pub app_id: String, + /// Retrieve from this Index associated with Application ID. + #[clap(long)] + pub from: u64, + /// Retrieve to this Index associated with Application ID. + #[clap(long)] + pub to: u64, + /// Node address to retrieve appid blobs. + #[clap(long)] + pub addr: Url, +} + +impl Retrieve { + pub fn run(self) -> Result<(), Box> { + tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::new()) + .expect("setting tracing default failed"); + + let app_id: [u8; 32] = hex::decode(&self.app_id)? + .try_into() + .map_err(|_| "Invalid app_id")?; + let from: Index = self.from.into(); + let to: Index = self.to.into(); + + let (res_sender, res_receiver) = std::sync::mpsc::channel(); + tokio::spawn(async move { + let res = get_app_data_range_from_node::( + reqwest::Client::new(), + self.addr.clone(), + app_id, + from..to, + ) + .await + .map_err(|err| format!("Failed to retrieve data form appid {app_id:?}: {err:?}",)); + res_sender.send(res).unwrap(); + }); + + match res_receiver.recv() { + Ok(update) => match update { + Ok(app_blobs) => { + for (index, blobs) in app_blobs.iter() { + tracing::info!("Index {:?} has {:} blobs", (index), blobs.len()); + for blob in blobs.iter() { + tracing::info!("Index {:?}; Blob: {blob:?}", index.to_u64()); + } + } + } + Err(e) => { + tracing::error!("Error receiving data: {e}"); + return Err(e.into()); + } + }, + Err(e) => { + tracing::error!("Failed to receive from client thread: {e}"); + return Err(e.into()); + } + } + + tracing::info!("Done"); + Ok(()) + } +} + +pub async fn get_app_data_range_from_node( + client: Client, + url: Url, + app_id: Metadata::AppId, + range: Range, +) -> Result)>, Box> +where + Blob: blob::Blob + DeserializeOwned, + Metadata: metadata::Metadata + Serialize, + ::Index: Serialize + DeserializeOwned, + ::AppId: Serialize + DeserializeOwned, +{ + let url = url + .join(paths::DA_GET_RANGE) + .expect("Url should build properly"); + let req = &GetRangeReq:: { app_id, range }; + + Ok(client + .post(url) + .header("Content-Type", "application/json") + .json(&req) + .send() + .await + .unwrap() + .json::)>>() + .await + .unwrap()) +} diff --git a/nomos-cli/src/da/disseminate.rs b/nomos-cli/src/da/disseminate.rs deleted file mode 100644 index 7d2f8572..00000000 --- a/nomos-cli/src/da/disseminate.rs +++ /dev/null @@ -1,238 +0,0 @@ -// std -use std::{ - path::PathBuf, - sync::{mpsc::Sender, Arc}, - time::Duration, -}; -// crates -use clap::Args; -use kzgrs_backend::{ - common::build_blob_id, - dispersal::{BlobInfo, Metadata}, - encoder::EncodedData as KzgEncodedData, -}; -use nomos_core::{ - da::{DaDispersal, DaEncoder}, - wire, -}; -use nomos_da_network_service::NetworkService; -use nomos_log::Logger; -use overwatch_derive::*; -use overwatch_rs::{ - services::{ - handle::{ServiceHandle, ServiceStateHandle}, - relay::{NoMessage, OutboundRelay, Relay}, - state::*, - ServiceCore, ServiceData, ServiceId, - }, - DynError, -}; -use reqwest::Url; -use tokio::sync::{mpsc::UnboundedReceiver, Mutex}; -// internal -use super::{network::adapters::libp2p::Libp2pExecutorDispersalAdapter, NetworkBackend}; -use crate::api::mempool::send_blob_info; - -#[allow(clippy::too_many_arguments)] -pub async fn disseminate_and_wait( - encoder: &E, - disperal: &D, - data: Box<[u8]>, - metadata: Metadata, - status_updates: Sender, - node_addr: Option<&Url>, - wait_until_disseminated: Duration, - output: Option<&PathBuf>, -) -> Result<(), Box> -where - E: DaEncoder, - D: DaDispersal, - ::Error: std::error::Error + Send + Sync + 'static, - ::Error: std::error::Error + Send + Sync + 'static, -{ - // 1) Building blob - status_updates.send(Status::Encoding)?; - let encoded_data = encoder.encode(&data).map_err(Box::new)?; - let blob_hash = build_blob_id( - &encoded_data.aggregated_column_commitment, - &encoded_data.row_commitments, - ); - - // 2) Send blob to network - status_updates.send(Status::Disseminating)?; - disperal.disperse(encoded_data).await.map_err(Box::new)?; - - // 3) Build blob info. - let blob_info = BlobInfo::new(blob_hash, metadata); - - if let Some(output) = output { - status_updates.send(Status::SavingBlobInfo)?; - std::fs::write(output, wire::serialize(&blob_info)?)?; - } - - // Wait for blobs be replicated in da network. - tokio::time::sleep(wait_until_disseminated).await; - - // 4) Send blob info to the mempool. - if let Some(node) = node_addr { - status_updates.send(Status::SendingBlobInfo)?; - let res = send_blob_info(node, &blob_info).await?; - - if !res.status().is_success() { - tracing::error!("ERROR: {:?}", res); - return Err(format!("Failed to send blob info to node: {}", res.status()).into()); - } - } - - status_updates.send(Status::Done)?; - Ok(()) -} - -pub enum Status { - Encoding, - Disseminating, - WaitingAttestations, - CreatingCert, - SavingBlobInfo, - SendingBlobInfo, - Done, - Err(Box), -} - -impl std::fmt::Display for Status { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Encoding => write!(f, "Encoding message into blob(s)"), - Self::Disseminating => write!(f, "Sending blob(s) to the network"), - Self::WaitingAttestations => write!(f, "Waiting for attestations"), - Self::CreatingCert => write!(f, "Creating certificate"), - Self::SavingBlobInfo => write!(f, "Saving blob info to file"), - Self::SendingBlobInfo => write!(f, "Sending blob info to node"), - Self::Done => write!(f, ""), - Self::Err(e) => write!(f, "Error: {e}"), - } - } -} - -// To interact with the network service it's easier to just spawn -// an overwatch app -#[derive(Services)] -pub struct DisseminateApp { - network: ServiceHandle>, - send_blob: ServiceHandle, - logger: ServiceHandle, -} - -#[derive(Clone, Debug)] -pub struct Settings { - // This is wrapped in an Arc just to make the struct Clone - pub payload: Arc>>>, - pub timeout: Duration, - pub kzgrs_settings: KzgrsSettings, - pub metadata: Metadata, - pub status_updates: Sender, - pub node_addr: Option, - pub wait_until_disseminated: Duration, - pub output: Option, - pub global_params_path: String, -} - -pub struct DisseminateService { - service_state: ServiceStateHandle, -} - -impl ServiceData for DisseminateService { - const SERVICE_ID: ServiceId = "Disseminate"; - type Settings = Settings; - type State = NoState; - type StateOperator = NoOperator; - type Message = NoMessage; -} - -#[async_trait::async_trait] -impl ServiceCore for DisseminateService { - fn init(service_state: ServiceStateHandle) -> Result { - Ok(Self { service_state }) - } - - async fn run(self) -> Result<(), DynError> { - let Self { service_state } = self; - let Settings { - payload, - timeout, - kzgrs_settings, - metadata, - status_updates, - node_addr, - wait_until_disseminated, - output, - global_params_path, - } = service_state.settings_reader.get_updated_settings(); - - let network_relay: Relay> = - service_state.overwatch_handle.relay(); - let network_relay: OutboundRelay<_> = network_relay - .connect() - .await - .expect("Relay connection with NetworkService should succeed"); - - let global_params = kzgrs_backend::global::global_parameters_from_file(&global_params_path) - .expect("Global parameters should be loaded from file"); - - let params = kzgrs_backend::encoder::DaEncoderParams::new( - kzgrs_settings.num_columns, - kzgrs_settings.with_cache, - global_params, - ); - let da_encoder = kzgrs_backend::encoder::DaEncoder::new(params); - let da_dispersal = Libp2pExecutorDispersalAdapter::new(network_relay); - - while let Some(data) = payload.lock().await.recv().await { - match tokio::time::timeout( - timeout, - disseminate_and_wait( - &da_encoder, - &da_dispersal, - data, - metadata, - status_updates.clone(), - node_addr.as_ref(), - wait_until_disseminated, - output.as_ref(), - ), - ) - .await - { - Err(_) => { - tracing::error!("Timeout reached, check the logs for additional details"); - let _ = status_updates.send(Status::Err("Timeout reached".into())); - } - Ok(Err(e)) => { - tracing::error!( - "Could not disseminate blob, check logs for additional details" - ); - let _ = status_updates.send(Status::Err(e)); - } - _ => {} - } - } - - service_state.overwatch_handle.shutdown().await; - Ok(()) - } -} - -#[derive(Debug, Clone, Args)] -pub struct KzgrsSettings { - pub num_columns: usize, - pub with_cache: bool, -} - -impl Default for KzgrsSettings { - fn default() -> Self { - Self { - num_columns: 4096, - with_cache: true, - } - } -} diff --git a/nomos-cli/src/da/mod.rs b/nomos-cli/src/da/mod.rs deleted file mode 100644 index bb286487..00000000 --- a/nomos-cli/src/da/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub mod disseminate; -pub mod network; -// pub mod retrieve; - -use network::backend::ExecutorBackend; -use subnetworks_assignations::versions::v1::FillFromNodeList; - -pub type NetworkBackend = ExecutorBackend; diff --git a/nomos-cli/src/da/network/adapters/libp2p.rs b/nomos-cli/src/da/network/adapters/libp2p.rs deleted file mode 100644 index 778a1017..00000000 --- a/nomos-cli/src/da/network/adapters/libp2p.rs +++ /dev/null @@ -1,116 +0,0 @@ -// std -use std::collections::HashSet; -// crates -use futures::{future::join_all, StreamExt}; -use kzgrs_backend::{common::blob::DaBlob, encoder::EncodedData as KzgEncodedData}; -use nomos_core::da::DaDispersal; -use nomos_da_network_core::SubnetworkId; -use nomos_da_network_service::{DaNetworkMsg, NetworkService}; -use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; -use thiserror::Error; -use tokio::sync::oneshot; -// internal -use crate::da::{ - network::{backend::Command, swarm::DispersalEvent}, - NetworkBackend, -}; - -type Relay = OutboundRelay< as ServiceData>::Message>; - -#[derive(Debug, Error)] -#[error("{0}")] -pub struct DispersalError(String); - -impl From for DispersalError { - fn from(s: String) -> Self { - DispersalError(s) - } -} - -pub struct Libp2pExecutorDispersalAdapter { - network_relay: Relay, -} - -impl Libp2pExecutorDispersalAdapter { - pub fn new(network_relay: Relay) -> Self { - Self { network_relay } - } -} - -#[async_trait::async_trait] -impl DaDispersal for Libp2pExecutorDispersalAdapter { - type EncodedData = KzgEncodedData; - type Error = DispersalError; - - async fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error> { - let mut tasks = Vec::new(); - - let (sender, receiver) = oneshot::channel(); - self.network_relay - .send(DaNetworkMsg::Subscribe { kind: (), sender }) - .await - .map_err(|(e, _)| e.to_string())?; - let mut event_stream = receiver.await.map_err(|e| e.to_string())?; - let mut expected_acknowledgments = HashSet::new(); - - for (i, column) in encoded_data.extended_data.columns().enumerate() { - let blob = DaBlob { - column: column.clone(), - column_idx: i - .try_into() - .expect("Column index shouldn't overflow the target type"), - column_commitment: encoded_data.column_commitments[i], - aggregated_column_commitment: encoded_data.aggregated_column_commitment, - aggregated_column_proof: encoded_data.aggregated_column_proofs[i], - rows_commitments: encoded_data.row_commitments.clone(), - rows_proofs: encoded_data - .rows_proofs - .iter() - .map(|proofs| proofs.get(i).cloned().unwrap()) - .collect(), - }; - - expected_acknowledgments.insert((blob.id().clone(), i as SubnetworkId)); - - let relay = self.network_relay.clone(); - let command = DaNetworkMsg::Process(Command::Disperse { - blob, - subnetwork_id: i as u32, - }); - - let task = async move { relay.send(command).await.map_err(|(e, _)| e.to_string()) }; - - tasks.push(task); - } - - let results: Vec> = join_all(tasks).await; - - for result in results { - result?; - } - - while !expected_acknowledgments.is_empty() { - let event = event_stream.next().await; - match event { - Some(event) => match event { - DispersalEvent::DispersalSuccess { - blob_id, - subnetwork_id, - } => { - expected_acknowledgments.remove(&(blob_id.to_vec(), subnetwork_id)); - } - DispersalEvent::DispersalError { error } => { - return Err(DispersalError(format!("Received dispersal error: {error}"))); - } - }, - None => { - return Err(DispersalError( - "Event stream ended before receiving all acknowledgments".into(), - )); - } - } - } - - Ok(()) - } -} diff --git a/nomos-cli/src/da/network/adapters/mock.rs b/nomos-cli/src/da/network/adapters/mock.rs deleted file mode 100644 index b5fa131a..00000000 --- a/nomos-cli/src/da/network/adapters/mock.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::fmt; - -use kzgrs_backend::{common::blob::DaBlob, encoder::EncodedData as KzgEncodedData}; -use nomos_core::da::DaDispersal; -use nomos_da_network_service::{ - backends::mock::executor::{Command, MockExecutorBackend}, - DaNetworkMsg, NetworkService, -}; -use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; -type Relay = OutboundRelay< as ServiceData>::Message>; - -#[derive(Debug)] -pub struct DispersalError(String); - -impl fmt::Display for DispersalError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl std::error::Error for DispersalError {} - -impl From for DispersalError { - fn from(s: String) -> Self { - DispersalError(s) - } -} - -pub struct MockExecutorDispersalAdapter { - network_relay: Relay, -} - -impl MockExecutorDispersalAdapter { - pub fn new(network_relay: Relay) -> Self { - Self { network_relay } - } -} - -#[async_trait::async_trait] -impl DaDispersal for MockExecutorDispersalAdapter { - type EncodedData = KzgEncodedData; - type Error = DispersalError; - - async fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error> { - for (i, column) in encoded_data.extended_data.columns().enumerate() { - let blob = DaBlob { - column: column.clone(), - column_idx: i - .try_into() - .expect("Column index shouldn't overflow the target type"), - column_commitment: encoded_data.column_commitments[i], - aggregated_column_commitment: encoded_data.aggregated_column_commitment, - aggregated_column_proof: encoded_data.aggregated_column_proofs[i], - rows_commitments: encoded_data.row_commitments.clone(), - rows_proofs: encoded_data - .rows_proofs - .iter() - .map(|proofs| proofs.get(i).cloned().unwrap()) - .collect(), - }; - - self.network_relay - .send(DaNetworkMsg::Process(Command::Disperse { - blob, - subnetwork_id: i as u32, - })) - .await - .map_err(|(e, _)| e.to_string())? - } - Ok(()) - } -} diff --git a/nomos-cli/src/da/network/adapters/mod.rs b/nomos-cli/src/da/network/adapters/mod.rs deleted file mode 100644 index 1cc7172c..00000000 --- a/nomos-cli/src/da/network/adapters/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod libp2p; -pub mod mock; diff --git a/nomos-cli/src/da/network/backend.rs b/nomos-cli/src/da/network/backend.rs deleted file mode 100644 index 08466ad0..00000000 --- a/nomos-cli/src/da/network/backend.rs +++ /dev/null @@ -1,189 +0,0 @@ -// std -use std::collections::{HashMap, HashSet}; -use std::fmt::Debug; -use std::marker::PhantomData; -use std::pin::Pin; -// crates -use futures::{Stream, StreamExt}; -use kzgrs_backend::common::blob::DaBlob; -use libp2p::{Multiaddr, PeerId}; -use log::error; -use nomos_da_network_core::SubnetworkId; -use nomos_da_network_service::backends::NetworkBackend; -use nomos_libp2p::{ed25519, secret_key_serde}; -use overwatch_rs::overwatch::handle::OverwatchHandle; -use overwatch_rs::services::state::NoState; -use serde::{Deserialize, Serialize}; -use subnetworks_assignations::MembershipHandler; -use tokio::sync::broadcast; -use tokio::sync::mpsc::error::SendError; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; -use tokio::task::JoinHandle; -use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; -// internal -use super::swarm::{DispersalEvent, ExecutorSwarm}; - -const BROADCAST_CHANNEL_SIZE: usize = 128; - -/// Message that the backend replies to -#[derive(Debug)] -pub enum Command { - /// Disperse a blob to a subnetwork. - Disperse { - subnetwork_id: SubnetworkId, - blob: DaBlob, - }, -} - -/// DA network backend for nomos cli as an executor. -/// Internally uses a libp2p swarm composed of the [`ExecutorBehaviour`] -/// It forwards network messages to the corresponding subscription channels/streams -pub struct ExecutorBackend { - // TODO: this join handles should be cancelable tasks. We should add an stop method for - // the `NetworkBackend` trait so if the service is stopped the backend can gracefully handle open - // sub-tasks as well. - #[allow(dead_code)] - task: JoinHandle<()>, - #[allow(dead_code)] - replies_task: JoinHandle<()>, - dispersal_request_sender: UnboundedSender<(SubnetworkId, DaBlob)>, - dispersal_broadcast_receiver: broadcast::Receiver, - _membership: PhantomData, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ExecutorBackendSettings { - // Identification Secp256k1 private key in Hex format (`0x123...abc`). Default random. - #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] - pub node_key: ed25519::SecretKey, - /// Membership of DA network PoV set - pub membership: Membership, - pub node_addrs: HashMap, - pub num_subnets: u16, -} - -impl ExecutorBackend { - /// Send the dispersal request to the underlying dispersal behaviour - async fn handle_dispersal_request(&self, subnetwork_id: SubnetworkId, blob: DaBlob) { - if let Err(SendError((subnetwork_id, blob_id))) = - self.dispersal_request_sender.send((subnetwork_id, blob)) - { - error!( - "Error requesting sample for subnetwork id : {subnetwork_id}, blob_id: {blob_id:?}" - ); - } - } -} - -#[async_trait::async_trait] -impl NetworkBackend for ExecutorBackend -where - Membership: MembershipHandler - + Clone - + Debug - + Send - + Sync - + 'static, -{ - type Settings = ExecutorBackendSettings; - type State = NoState; - type Message = Command; - type EventKind = (); - type NetworkEvent = DispersalEvent; - - fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { - let (dispersal_events_sender, dispersal_events_receiver) = unbounded_channel(); - - let keypair = - libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone())); - let mut executor_swarm = - ExecutorSwarm::new(keypair, config.membership.clone(), dispersal_events_sender); - let dispersal_request_sender = executor_swarm.blobs_sender(); - - let mut connected_peers = HashSet::new(); - - let local_peer_id = *executor_swarm.local_peer_id(); - for subnetwork_id in 0..config.num_subnets { - // Connect to one peer in a subnet. - let mut members = config.membership.members_of(&(subnetwork_id as u32)); - members.remove(&local_peer_id); - let peer_id = *members - .iter() - .next() - .expect("Subnet should have at least one node which is not the nomos_cli"); - - let addr = config - .node_addrs - .get(&peer_id) - .expect("Peer address should be in the list"); - - executor_swarm - .dial(addr.clone()) - .expect("Should schedule the dials"); - - connected_peers.insert(peer_id); - } - - let executor_open_stream_sender = executor_swarm.open_stream_sender(); - - let task = overwatch_handle - .runtime() - .spawn(async move { executor_swarm.run().await }); - - for peer_id in connected_peers.iter() { - executor_open_stream_sender.send(*peer_id).unwrap(); - } - - let (dispersal_broadcast_sender, dispersal_broadcast_receiver) = - broadcast::channel(BROADCAST_CHANNEL_SIZE); - let dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver); - - let replies_task = overwatch_handle - .runtime() - .spawn(handle_dispersal_events_stream( - dispersal_events_receiver, - dispersal_broadcast_sender, - )); - - Self { - task, - replies_task, - dispersal_request_sender, - dispersal_broadcast_receiver, - _membership: Default::default(), - } - } - - async fn process(&self, msg: Self::Message) { - match msg { - Command::Disperse { - subnetwork_id, - blob, - } => { - self.handle_dispersal_request(subnetwork_id, blob).await; - } - } - } - - async fn subscribe( - &mut self, - _event: Self::EventKind, - ) -> Pin + Send>> { - Box::pin( - BroadcastStream::new(self.dispersal_broadcast_receiver.resubscribe()) - .filter_map(|event| async { event.ok() }), - ) - } -} - -/// Task that handles forwarding of events to the subscriptions channels/stream -async fn handle_dispersal_events_stream( - mut events_stream: UnboundedReceiverStream, - dispersal_broadcast_sender: broadcast::Sender, -) { - while let Some(dispersal_event) = events_stream.next().await { - if let Err(e) = dispersal_broadcast_sender.send(dispersal_event) { - error!("Error in internal broadcast of dispersal event: {e:?}"); - } - } -} diff --git a/nomos-cli/src/da/network/mod.rs b/nomos-cli/src/da/network/mod.rs deleted file mode 100644 index 93431ea8..00000000 --- a/nomos-cli/src/da/network/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod adapters; -pub mod backend; -mod swarm; diff --git a/nomos-cli/src/da/network/swarm.rs b/nomos-cli/src/da/network/swarm.rs deleted file mode 100644 index 2fcea087..00000000 --- a/nomos-cli/src/da/network/swarm.rs +++ /dev/null @@ -1,275 +0,0 @@ -// std -// crates -use kzgrs_backend::common::blob::DaBlob; -use libp2p::futures::StreamExt; -use libp2p::Multiaddr; -use libp2p::{identity::Keypair, swarm::SwarmEvent, PeerId, Swarm}; -use nomos_core::da::BlobId; -use nomos_da_network_core::protocols::dispersal::executor::behaviour::{ - DispersalError, DispersalExecutorEvent, -}; -use nomos_da_network_core::{ - protocols::dispersal::executor::behaviour::DispersalExecutorBehaviour, SubnetworkId, -}; -use nomos_libp2p::DialError; -use subnetworks_assignations::MembershipHandler; -use tokio::sync::mpsc::UnboundedSender; -use tracing::{debug, error}; -// internal - -#[derive(Debug, Clone)] -pub enum DispersalEvent { - /// A blob successfully arrived its destination - DispersalSuccess { - blob_id: BlobId, - subnetwork_id: SubnetworkId, - }, - /// Something went wrong delivering the blob - DispersalError { error: DispersalError }, -} - -pub struct ExecutorSwarm -where - Membership: MembershipHandler + 'static, -{ - swarm: Swarm>, - dispersal_broadcast_sender: UnboundedSender, -} - -impl ExecutorSwarm -where - Membership: MembershipHandler + Clone + Send, -{ - pub fn new( - key: Keypair, - membership: Membership, - dispersal_broadcast_sender: UnboundedSender, - ) -> Self { - let swarm = Self::build_swarm(key, membership); - Self { - swarm, - dispersal_broadcast_sender, - } - } - - pub fn blobs_sender(&self) -> UnboundedSender<(SubnetworkId, DaBlob)> { - self.swarm.behaviour().blobs_sender() - } - - pub fn open_stream_sender(&self) -> UnboundedSender { - self.swarm.behaviour().open_stream_sender() - } - - fn build_swarm( - _key: Keypair, - _membership: Membership, - ) -> Swarm> { - todo!("CLI will be removed"); - // libp2p::SwarmBuilder::with_existing_identity(key) - // .with_tokio() - // .with_quic() - // .with_behaviour(|_key| DispersalExecutorBehaviour::new(membership)) - // .expect("Validator behaviour should build") - // .with_swarm_config(|cfg| { - // cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) - // }) - // .build() - } - - pub fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> { - self.swarm.dial(addr)?; - Ok(()) - } - - pub fn local_peer_id(&self) -> &PeerId { - self.swarm.local_peer_id() - } - - pub async fn run(&mut self) { - loop { - tokio::select! { - Some(event) = self.swarm.next() => { - debug!("Executor received an event: {event:?}"); - match event { - SwarmEvent::Behaviour(behaviour_event) => { - self.handle_dispersal_event(behaviour_event).await; - }, - SwarmEvent::ConnectionEstablished{ .. } => {} - SwarmEvent::ConnectionClosed{ .. } => {} - SwarmEvent::IncomingConnection{ .. } => {} - SwarmEvent::IncomingConnectionError{ .. } => {} - SwarmEvent::OutgoingConnectionError{ .. } => {} - SwarmEvent::NewListenAddr{ .. } => {} - SwarmEvent::ExpiredListenAddr{ .. } => {} - SwarmEvent::ListenerClosed{ .. } => {} - SwarmEvent::ListenerError{ .. } => {} - SwarmEvent::Dialing{ .. } => {} - SwarmEvent::NewExternalAddrCandidate{ .. } => {} - SwarmEvent::ExternalAddrConfirmed{ .. } => {} - SwarmEvent::ExternalAddrExpired{ .. } => {} - SwarmEvent::NewExternalAddrOfPeer{ .. } => {} - event => { - debug!("Unsupported validator swarm event: {event:?}"); - } - } - } - } - } - } - - async fn handle_dispersal_event(&mut self, event: DispersalExecutorEvent) { - match event { - DispersalExecutorEvent::DispersalSuccess { - blob_id, - subnetwork_id, - } => { - if let Err(e) = - self.dispersal_broadcast_sender - .send(DispersalEvent::DispersalSuccess { - blob_id, - subnetwork_id, - }) - { - error!("Error in internal broadcast of dispersal success: {e:?}"); - } - } - DispersalExecutorEvent::DispersalError { error } => { - if let Err(e) = self - .dispersal_broadcast_sender - .send(DispersalEvent::DispersalError { error }) - { - error! {"Error in internal broadcast of dispersal error: {e:?}"}; - } - } - } - } -} - -#[cfg(test)] -pub mod test { - use crate::da::network::swarm::ExecutorSwarm; - use crate::test_utils::AllNeighbours; - use futures::StreamExt; - use kzgrs_backend::common::blob::DaBlob; - use kzgrs_backend::common::Column; - use libp2p::identity::Keypair; - use libp2p::PeerId; - use nomos_da_network_core::address_book::AddressBook; - use nomos_da_network_core::behaviour::validator::ValidatorBehaviourEvent; - use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalExecutorEvent; - use nomos_da_network_core::swarm::validator::ValidatorSwarm; - use nomos_libp2p::{Multiaddr, SwarmEvent}; - use std::time::Duration; - use tokio::sync::mpsc::unbounded_channel; - use tokio::time; - use tracing::{error, info}; - use tracing_subscriber::fmt::TestWriter; - use tracing_subscriber::EnvFilter; - - #[tokio::test] - #[ignore] - async fn test_dispersal_with_swarms() { - let _ = tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .compact() - .with_writer(TestWriter::default()) - .try_init(); - let k1 = Keypair::generate_ed25519(); - let k2 = Keypair::generate_ed25519(); - let executor_peer = PeerId::from_public_key(&k1.public()); - let validator_peer = PeerId::from_public_key(&k2.public()); - let neighbours = AllNeighbours { - neighbours: [ - PeerId::from_public_key(&k1.public()), - PeerId::from_public_key(&k2.public()), - ] - .into_iter() - .collect(), - }; - - let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap(); - let addr2 = addr.clone().with_p2p(validator_peer).unwrap(); - let addr2_book = AddressBook::from_iter(vec![(executor_peer, addr2.clone())]); - let (dispersal_events_sender, _) = unbounded_channel(); - - let mut executor = ExecutorSwarm::new(k1, neighbours.clone(), dispersal_events_sender); - let (mut validator, _) = ValidatorSwarm::new(k2, neighbours.clone(), addr2_book); - - let msg_count = 10usize; - let validator_task = async move { - let validator_swarm = validator.protocol_swarm_mut(); - validator_swarm.listen_on(addr).unwrap(); - - let mut res = vec![]; - loop { - match validator_swarm.select_next_some().await { - SwarmEvent::Behaviour(ValidatorBehaviourEvent::Dispersal(event)) => { - res.push(event); - } - event => { - info!("Validator event: {event:?}"); - } - } - if res.len() == msg_count { - tokio::time::sleep(Duration::from_secs(1)).await; - break; - } - } - res - }; - let join_validator = tokio::spawn(validator_task); - - executor.dial(addr2).unwrap(); - - let executor_open_stream_sender = executor.open_stream_sender(); - let executor_disperse_blob_sender = executor.blobs_sender(); - - let executor_poll = async move { - let mut res = vec![]; - loop { - tokio::select! { - Some(event) = executor.swarm.next() => { - info!("Executor event: {event:?}"); - if let SwarmEvent::Behaviour(DispersalExecutorEvent::DispersalSuccess{blob_id, ..}) = event { - res.push(blob_id); - } - } - - _ = time::sleep(Duration::from_secs(2)) => { - if res.len() < msg_count {error!("Executor timeout reached");} - break; - } - } - } - res - }; - - let executor_task = tokio::spawn(executor_poll); - - executor_open_stream_sender.send(validator_peer).unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - - for i in 0..msg_count { - info!("Sending blob {i}..."); - executor_disperse_blob_sender - .send(( - 0, - DaBlob { - column_idx: 0, - column: Column(vec![]), - column_commitment: Default::default(), - aggregated_column_commitment: Default::default(), - aggregated_column_proof: Default::default(), - rows_commitments: vec![], - rows_proofs: vec![], - }, - )) - .unwrap(); - } - - assert_eq!( - executor_task.await.unwrap().len(), - join_validator.await.unwrap().len() - ); - } -} diff --git a/nomos-cli/src/da/retrieve.rs b/nomos-cli/src/da/retrieve.rs deleted file mode 100644 index 6ef50210..00000000 --- a/nomos-cli/src/da/retrieve.rs +++ /dev/null @@ -1,34 +0,0 @@ -use full_replication::Blob; -use nomos_core::header::HeaderId; -use reqwest::Url; -use thiserror::Error; - -use crate::api::storage::get_block_contents; - -#[derive(Error, Debug)] -pub enum Error { - #[error(transparent)] - Reqwest(#[from] reqwest::Error), - #[error("Block not found")] - NotFound, -} - -/// Return the blobs whose certificate has been included in the provided block. -pub async fn get_block_blobs(node: &Url, block: &HeaderId) -> Result, Error> { - let block = get_block_contents(node, block) - .await? - .ok_or(Error::NotFound)?; - - let blobs = block.blobs().map(|cert| cert.blob()).collect::>(); - - if blobs.is_empty() { - return Ok(vec![]); - } - - let n_blobs = blobs.len(); - let resp = get_blobs(node, blobs).await?; - if resp.len() != n_blobs { - tracing::warn!("Only {}/{} blobs returned", resp.len(), n_blobs); - } - Ok(resp) -} diff --git a/nomos-cli/src/lib.rs b/nomos-cli/src/lib.rs index f2c30c50..2b3129b4 100644 --- a/nomos-cli/src/lib.rs +++ b/nomos-cli/src/lib.rs @@ -1,9 +1,4 @@ -pub mod api; pub mod cmds; -pub mod da; - -#[cfg(test)] -pub mod test_utils; use clap::Parser; use cmds::Command; diff --git a/nomos-cli/src/test_utils.rs b/nomos-cli/src/test_utils.rs deleted file mode 100644 index 122bb469..00000000 --- a/nomos-cli/src/test_utils.rs +++ /dev/null @@ -1,29 +0,0 @@ -use libp2p::PeerId; -use std::collections::HashSet; -use subnetworks_assignations::MembershipHandler; - -#[derive(Clone)] -pub struct AllNeighbours { - pub neighbours: HashSet, -} - -impl MembershipHandler for AllNeighbours { - type NetworkId = u32; - type Id = PeerId; - - fn membership(&self, _self_id: &Self::Id) -> HashSet { - [0].into_iter().collect() - } - - fn is_allowed(&self, _id: &Self::Id) -> bool { - true - } - - fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet { - self.neighbours.clone() - } - - fn members(&self) -> HashSet { - self.neighbours.clone() - } -} diff --git a/nomos-da/kzgrs-backend/src/dispersal.rs b/nomos-da/kzgrs-backend/src/dispersal.rs index c415df39..9761b951 100644 --- a/nomos-da/kzgrs-backend/src/dispersal.rs +++ b/nomos-da/kzgrs-backend/src/dispersal.rs @@ -50,6 +50,12 @@ impl blob::metadata::Metadata for BlobInfo { #[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)] pub struct Index([u8; 8]); +impl Index { + pub fn to_u64(self) -> u64 { + u64::from_be_bytes(self.0) + } +} + #[derive(Default, Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct Metadata { app_id: [u8; 32],