diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index a183d8ad..cb1e9fbd 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -21,6 +21,7 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] } nomos-consensus = { path = "../nomos-services/consensus" } +nomos-log = { path = "../nomos-services/log" } nomos-libp2p = { path = "../nomos-libp2p"} nomos-core = { path = "../nomos-core" } nomos-node = { path = "../nodes/nomos-node" } @@ -30,4 +31,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" hex = "0.4.3" -once_cell = "1" \ No newline at end of file +once_cell = "1" +crossterm = "0.27" +ratatui = "0.24" +tui-input = "0.8" +ansi-to-tui = "3" \ No newline at end of file diff --git a/nomos-cli/src/api/consensus.rs b/nomos-cli/src/api/consensus.rs index d370be74..80bd6a16 100644 --- a/nomos-cli/src/api/consensus.rs +++ b/nomos-cli/src/api/consensus.rs @@ -1,4 +1,5 @@ use super::CLIENT; +use consensus_engine::{Block, BlockId}; use nomos_consensus::CarnotInfo; use reqwest::Url; @@ -11,3 +12,20 @@ pub async fn carnot_info(node: &Url) -> Result { .json::() .await } + +pub async fn get_blocks_info( + node: &Url, + from: Option, + to: Option, +) -> Result, reqwest::Error> { + const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks"; + let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap()); + if let Some(from) = from { + req = req.query(&[("from", from)]); + } + if let Some(to) = to { + req = req.query(&[("to", to)]); + } + + req.send().await?.json().await +} diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs new file mode 100644 index 00000000..b0463b43 --- /dev/null +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -0,0 +1,283 @@ +/// The dumbest possible backend for a chat protocol. +/// Just because running Doom on Nomos was too much work for a demo. +/// +/// +mod ui; + +use crate::{ + api::consensus::get_blocks_info, + da::{ + disseminate::{ + DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, + }, + retrieve::get_block_blobs, + }, +}; +use clap::Args; +use full_replication::{ + AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings, +}; +use nomos_core::{block::BlockId, da::DaProtocol, wire}; +use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter}; +use nomos_network::{backends::libp2p::Libp2p, NetworkService}; +use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use std::{ + io, + path::PathBuf, + sync::{ + self, + mpsc::{Receiver, Sender}, + Arc, + }, + time::{Duration, Instant}, +}; +use tokio::sync::{mpsc::UnboundedSender, Mutex}; + +use crossterm::{ + event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, + execute, + terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, +}; +use ratatui::{ + backend::{Backend, CrosstermBackend}, + Terminal, +}; +use tui_input::{backend::crossterm::EventHandler, Input}; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120); + +#[derive(Clone, Debug, Args)] +/// The almost-instant messaging protocol. +pub struct NomosChat { + /// Path to the network config file + #[clap(short, long)] + pub network_config: PathBuf, + /// The data availability protocol to use. Defaults to full replication. + #[clap(flatten)] + pub da_protocol: DaProtocolChoice, + /// The node to connect to to fetch blocks and blobs + #[clap(long)] + pub node: Url, +} + +pub struct App { + input: Input, + username: Option, + messages: Vec, + message_status: Option, + message_in_flight: bool, + last_updated: Instant, + payload_sender: UnboundedSender>, + status_updates: Receiver, + node: Url, + logs: Arc>>, + scroll_logs: u16, +} + +impl NomosChat { + pub fn run(&self) -> Result<(), Box> { + let network = serde_yaml::from_reader::< + _, + as ServiceData>::Settings, + >(std::fs::File::open(&self.network_config)?)?; + let da_protocol = self.da_protocol.clone(); + // setup terminal + enable_raw_mode()?; + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + + let node_addr = Some(self.node.clone()); + + let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel(); + let (status_sender, status_updates) = std::sync::mpsc::channel(); + + let shared_writer = Arc::new(sync::Mutex::new(Vec::new())); + let backend = SharedWriter::from_inner(shared_writer.clone()); + + std::thread::spawn(move || { + OverwatchRunner::::run( + DisseminateAppServiceSettings { + network, + send_blob: Settings { + payload: Arc::new(Mutex::new(payload_receiver)), + timeout: DEFAULT_TIMEOUT, + da_protocol, + status_updates: status_sender, + node_addr, + output: None, + }, + logger: LoggerSettings { + backend: LoggerBackend::Writer(backend), + level: tracing::Level::INFO, + ..Default::default() + }, + }, + None, + ) + .unwrap() + .wait_finished() + }); + + let app = App { + input: Input::default(), + username: None, + messages: Vec::new(), + message_status: None, + message_in_flight: false, + last_updated: Instant::now(), + payload_sender, + status_updates, + node: self.node.clone(), + logs: shared_writer, + scroll_logs: 0, + }; + + run_app(&mut terminal, app); + + // restore terminal + disable_raw_mode()?; + execute!( + terminal.backend_mut(), + LeaveAlternateScreen, + DisableMouseCapture + )?; + terminal.show_cursor()?; + + Ok(()) + } +} + +fn run_app(terminal: &mut Terminal, mut app: App) { + let (message_tx, message_rx) = std::sync::mpsc::channel(); + let node = app.node.clone(); + std::thread::spawn(move || check_for_messages(message_tx, node)); + loop { + terminal.draw(|f| ui::ui(f, &app)).unwrap(); + + if let Ok(update) = app.status_updates.try_recv() { + if let Status::Done = update { + app.message_in_flight = false; + app.message_status = None; + } + app.message_status = Some(update); + app.last_updated = Instant::now(); + } + + if let Ok(messages) = message_rx.try_recv() { + app.messages.extend(messages); + } + + // Do not block rendering if there's no user input available + if !event::poll(Duration::from_millis(100)).unwrap() { + continue; + } + + if let Event::Key(key) = event::read().unwrap() { + match key.code { + KeyCode::Enter => { + if app.username.is_none() { + app.username = Some(app.input.value().into()); + } else { + // Do not allow more than one message in flight at a time for simplicity + if !app.message_in_flight && !app.input.value().is_empty() { + app.message_in_flight = true; + app.payload_sender + .send( + wire::serialize(&ChatMessage { + author: app.username.clone().unwrap(), + message: app.input.value().into(), + }) + .unwrap() + .into(), + ) + .unwrap(); + } + } + app.input.reset(); + } + KeyCode::Esc => { + return; + } + KeyCode::Left => { + app.scroll_logs = app.scroll_logs.saturating_sub(1); + } + KeyCode::Right => { + app.scroll_logs = app.scroll_logs.saturating_add(1); + } + _ => { + if !app.message_in_flight { + app.input.handle_event(&Event::Key(key)); + } + } + } + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct ChatMessage { + author: String, + message: String, +} + +#[tokio::main] +async fn check_for_messages(sender: Sender>, node: Url) { + // Should ask for the genesis block to be more robust + let mut last_tip = BlockId::zeros(); + + loop { + if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await { + sender.send(messages).expect("channel closed"); + last_tip = new_tip; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + +async fn fetch_new_messages( + last_tip: &BlockId, + node: &Url, +) -> Result<(BlockId, Vec), Box> { + let mut new_messages = Vec::new(); + // By only specifying the 'to' parameter we get all the blocks since the last tip + let mut new_blocks = get_blocks_info(node, None, Some(*last_tip)) + .await? + .into_iter() + .map(|block| block.id) + .collect::>(); + + // The first block is the most recent one. + // Note that the 'to' is inclusive so the above request will always return at least one block + // as long as the block exists (which is the case since it was returned by a previous call) + let new_tip = new_blocks[0]; + // We already processed the last block so let's remove it + new_blocks.pop(); + + // Note that number of attestations is ignored here since we only use the da protocol to + // decode the blob data, not to validate the certificate + let mut da_protocol = + > as DaProtocol>::new( + DaSettings { + num_attestations: 1, + voter: [0; 32], // voter is ignored as well + }, + ); + + for block in new_blocks.iter().rev() { + let blobs = get_block_blobs(node, block).await?; + for blob in blobs { + da_protocol.recv_blob(blob); + // Full replication only needs one blob to decode the data, so the unwrap is safe + let bytes = da_protocol.extract().unwrap(); + if let Ok(message) = wire::deserialize::(&bytes) { + new_messages.push(message); + } + } + } + + Ok((new_tip, new_messages)) +} diff --git a/nomos-cli/src/cmds/chat/ui.rs b/nomos-cli/src/cmds/chat/ui.rs new file mode 100644 index 00000000..fd8ee53e --- /dev/null +++ b/nomos-cli/src/cmds/chat/ui.rs @@ -0,0 +1,197 @@ +use super::{App, ChatMessage}; +use ratatui::prelude::*; +use ratatui::widgets::{Block, Borders, List, ListItem, Paragraph, Wrap}; +use std::ops::Deref; + +pub fn ui(f: &mut Frame, app: &App) { + if app.username.is_none() { + select_username(f, app) + } else { + chat(f, app) + } +} + +fn select_username(f: &mut Frame, app: &App) { + assert!(app.username.is_none()); + let block = Block::default() + .title("NomosChat") + .borders(Borders::ALL) + .style(Style::new().white().on_black()); + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints( + [ + Constraint::Min(5), + Constraint::Min(1), + Constraint::Min(5), + Constraint::Min(0), + ] + .as_ref(), + ) + .split(block.inner(f.size())); + f.render_widget(block, f.size()); + + let text = vec![ + Line::from(vec![ + Span::raw("Welcome to "), + Span::styled("NomosChat", Style::new().green().italic()), + "!".into(), + ]), + Line::from("The first almost-instant messaging protocol".red()), + ]; + + let welcome = Paragraph::new(text) + .style(Style::new().white().on_black()) + .alignment(Alignment::Center) + .wrap(Wrap { trim: true }); + f.render_widget(welcome, chunks[0]); + + let help_text = Line::from("Press to select your username, or to quit."); + let help = Paragraph::new(help_text) + .alignment(Alignment::Center) + .style(Style::new().white().on_black()); + f.render_widget(help, chunks[1]); + + let input = Paragraph::new(app.input.value()) + .style(Style::new().white().on_black()) + .block( + Block::default() + .borders(Borders::ALL) + .title("Select username") + .border_style(Style::default().fg(Color::Yellow)), + ); + f.render_widget(input, centered_rect(chunks[2], 50, 50)); +} + +fn centered_rect(r: Rect, percent_x: u16, percent_y: u16) -> Rect { + let popup_layout = Layout::default() + .direction(Direction::Vertical) + .constraints( + [ + Constraint::Percentage((100 - percent_y) / 2), + Constraint::Percentage(percent_y), + Constraint::Percentage((100 - percent_y) / 2), + ] + .as_ref(), + ) + .split(r); + + Layout::default() + .direction(Direction::Horizontal) + .constraints( + [ + Constraint::Percentage((100 - percent_x) / 2), + Constraint::Percentage(percent_x), + Constraint::Percentage((100 - percent_x) / 2), + ] + .as_ref(), + ) + .split(popup_layout[1])[1] +} + +fn chat(f: &mut Frame, app: &App) { + let block = Block::default() + .title("NomosChat") + .borders(Borders::ALL) + .style(Style::new().white().on_black()); + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints( + [ + Constraint::Max(3), + Constraint::Percentage(80), + Constraint::Max(10), + ] + .as_ref(), + ) + .split(block.inner(f.size())); + f.render_widget(block, f.size()); + + let messages_rect = centered_rect(chunks[1], 90, 100); + render_messages(f, app, messages_rect); + render_logs(f, app, chunks[2]); + + let chunks = Layout::default() + .direction(Direction::Horizontal) + .constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref()) + .split(chunks[0]); + + render_input(f, app, chunks[0]); + render_status(f, app, chunks[1]); +} + +fn render_messages(f: &mut Frame, app: &App, rect: Rect) { + let messages: Vec = app + .messages + .iter() + .map(|ChatMessage { author, message }| { + let content = if author == app.username.as_ref().unwrap() { + static MARGIN: usize = 2; + // pad to make it appear aligned on the right + let pad = " ".repeat( + (rect.width as usize) + .saturating_sub(message.len()) + .saturating_sub(MARGIN), + ); + Line::from(vec![Span::raw(pad), Span::raw(message)]) + } else { + Line::from(vec![ + Span::styled(format!("{author}: "), Style::new().fg(Color::Yellow).bold()), + Span::raw(message), + ]) + .alignment(Alignment::Left) + }; + ListItem::new(content) + }) + .collect(); + let messages = + List::new(messages).block(Block::default().borders(Borders::ALL).title("Messages")); + f.render_widget(messages, rect); +} + +fn render_input(f: &mut Frame, app: &App, rect: Rect) { + let style = if !app.message_in_flight { + Style::default().fg(Color::Yellow) + } else { + Style::default().fg(Color::DarkGray) + }; + let input = Paragraph::new(app.input.value()) + .style(Style::new().white().on_black()) + .block( + Block::default() + .borders(Borders::ALL) + .title("Press to send message") + .border_style(style), + ); + f.render_widget(input, rect); +} +fn render_status(f: &mut Frame, app: &App, rect: Rect) { + let waiting_animation = std::iter::repeat(".") + .take(app.last_updated.elapsed().as_secs() as usize % 4) + .collect::>() + .join(""); + + let status = Paragraph::new( + app.message_status + .as_ref() + .map(|s| format!("{}{}", s, waiting_animation)) + .unwrap_or_default(), + ) + .block(Block::default().borders(Borders::ALL).title("Status:")); + f.render_widget(status, rect); +} + +fn render_logs(f: &mut Frame, app: &App, rect: Rect) { + let logs = String::from_utf8(app.logs.lock().unwrap().deref().clone()).unwrap(); + f.render_widget( + Paragraph::new(logs) + .wrap(Wrap { trim: true }) + .scroll((app.scroll_logs, 0)) + .block( + Block::default() + .borders(Borders::ALL) + .title("Logs: (use ←/→ to scroll up/down"), + ), + rect, + ); +} diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs index e96ffd6a..27f511fe 100644 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -2,6 +2,7 @@ use crate::da::disseminate::{ DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, }; use clap::Args; +use nomos_log::{LoggerBackend, LoggerSettings}; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; use reqwest::Url; @@ -59,6 +60,10 @@ impl Disseminate { node_addr, output, }, + logger: LoggerSettings { + backend: LoggerBackend::None, + ..Default::default() + }, }, None, ) @@ -67,13 +72,13 @@ impl Disseminate { }); // drop to signal we're not going to send more blobs drop(payload_sender); - tracing::info!("{}", rx.recv().unwrap().display()); + tracing::info!("{}", rx.recv().unwrap()); while let Ok(update) = rx.recv() { if let Status::Err(e) = update { tracing::error!("{e}"); return Err(e); } - tracing::info!("{}", update.display()); + tracing::info!("{}", update); } tracing::info!("done"); Ok(()) diff --git a/nomos-cli/src/cmds/mod.rs b/nomos-cli/src/cmds/mod.rs index 47c08a08..bf2648ac 100644 --- a/nomos-cli/src/cmds/mod.rs +++ b/nomos-cli/src/cmds/mod.rs @@ -1,17 +1,21 @@ use clap::Subcommand; +pub mod chat; pub mod disseminate; #[derive(Debug, Subcommand)] pub enum Command { /// Send a blob to the network and collect attestations to create a DA proof Disseminate(disseminate::Disseminate), + /// (Almost) Instant messaging protocol on top of the Nomos network + Chat(chat::NomosChat), } impl Command { pub fn run(&self) -> Result<(), Box> { match self { Command::Disseminate(cmd) => cmd.run(), + Command::Chat(cmd) => cmd.run(), } } } diff --git a/nomos-cli/src/da/disseminate.rs b/nomos-cli/src/da/disseminate.rs index de8d1fc2..5386496f 100644 --- a/nomos-cli/src/da/disseminate.rs +++ b/nomos-cli/src/da/disseminate.rs @@ -5,6 +5,7 @@ use futures::StreamExt; use hex::FromHex; use nomos_core::{da::DaProtocol, wire}; use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter}; +use nomos_log::Logger; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_derive::*; use overwatch_rs::{ @@ -91,17 +92,17 @@ pub enum Status { Err(Box), } -impl Status { - pub fn display(&self) -> &str { +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Encoding => "Encoding message into blob(s)", - Self::Disseminating => "Sending blob(s) to the network", - Self::WaitingAttestations => "Waiting for attestations", - Self::CreatingCert => "Creating certificate", - Self::SavingCert => "Saving certificate to file", - Self::SendingCert => "Sending certificate to node", - Self::Done => "", - Self::Err(_) => "Error", + Self::Encoding => write!(f, "Encoding message into blob(s)"), + Self::Disseminating => write!(f, "Sending blob(s) to the network"), + Self::WaitingAttestations => write!(f, "Waiting for attestations"), + Self::CreatingCert => write!(f, "Creating certificate"), + Self::SavingCert => write!(f, "Saving certificate to file"), + Self::SendingCert => write!(f, "Sending certificate to node"), + Self::Done => write!(f, ""), + Self::Err(e) => write!(f, "Error: {e}"), } } } @@ -112,6 +113,7 @@ impl Status { pub struct DisseminateApp { network: ServiceHandle>, send_blob: ServiceHandle, + logger: ServiceHandle, } #[derive(Clone, Debug)] diff --git a/nomos-services/log/src/lib.rs b/nomos-services/log/src/lib.rs index 4d01c819..afa726ce 100644 --- a/nomos-services/log/src/lib.rs +++ b/nomos-services/log/src/lib.rs @@ -28,7 +28,7 @@ pub struct Logger { /// required by contract by Overwatch for a configuration struct #[derive(Clone)] pub struct SharedWriter { - inner: Arc>>, + inner: Arc>, } impl Write for SharedWriter { @@ -44,9 +44,17 @@ impl Write for SharedWriter { impl SharedWriter { pub fn new(writer: W) -> Self { Self { - inner: Arc::new(Mutex::new(Box::new(writer))), + inner: Arc::new(Mutex::new(writer)), } } + + pub fn into_inner(&self) -> Arc> { + self.inner.clone() + } + + pub fn from_inner(inner: Arc>) -> Self { + Self { inner } + } } impl Debug for SharedWriter {