From 6e718e7bba764ce0c1c4945b327df19c10a01f2d Mon Sep 17 00:00:00 2001 From: Giacomo Pasini <21265557+zeegomo@users.noreply.github.com> Date: Wed, 3 Jan 2024 15:47:21 +0100 Subject: [PATCH] Add chat demo for testnet (#495) * Add chat demo for testnet This commit adds a simple demo to showcase the capabilities of the Nomos architecture. In particular, we want to leverage the DA features and explore participants roles. At the same time, we're not ready to commit to any speficic format or decision regarding common ground yet. For this reason, we chose to implement the demo at the Execution Zone (EZ) level. In contrast to the coordination layer, each execution zone can decide on its own format, which allows us to experiment without having to set a standard. The application of choice for the demo is an (almost) instant messaging app where the messages are broadcast to the public by leveraging the full replication data availability protocol. In this context, the cli app acts as a small EZ disseminating blobs, promoting blob inclusion and updating its state (i.e. list of exchanged messages) upon blob inclusion in the chain. --------- Co-authored-by: danielsanchezq --- nomos-cli/Cargo.toml | 7 +- nomos-cli/src/api/consensus.rs | 18 ++ nomos-cli/src/cmds/chat/mod.rs | 283 ++++++++++++++++++++++++++ nomos-cli/src/cmds/chat/ui.rs | 197 ++++++++++++++++++ nomos-cli/src/cmds/disseminate/mod.rs | 9 +- nomos-cli/src/cmds/mod.rs | 4 + nomos-cli/src/da/disseminate.rs | 22 +- nomos-services/log/src/lib.rs | 12 +- 8 files changed, 537 insertions(+), 15 deletions(-) create mode 100644 nomos-cli/src/cmds/chat/mod.rs create mode 100644 nomos-cli/src/cmds/chat/ui.rs diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index a183d8ad..cb1e9fbd 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -21,6 +21,7 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] } nomos-consensus = { path = "../nomos-services/consensus" } +nomos-log = { path = "../nomos-services/log" } nomos-libp2p = { path = "../nomos-libp2p"} nomos-core = { path = "../nomos-core" } nomos-node = { path = "../nodes/nomos-node" } @@ -30,4 +31,8 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" hex = "0.4.3" -once_cell = "1" \ No newline at end of file +once_cell = "1" +crossterm = "0.27" +ratatui = "0.24" +tui-input = "0.8" +ansi-to-tui = "3" \ No newline at end of file diff --git a/nomos-cli/src/api/consensus.rs b/nomos-cli/src/api/consensus.rs index d370be74..80bd6a16 100644 --- a/nomos-cli/src/api/consensus.rs +++ b/nomos-cli/src/api/consensus.rs @@ -1,4 +1,5 @@ use super::CLIENT; +use consensus_engine::{Block, BlockId}; use nomos_consensus::CarnotInfo; use reqwest::Url; @@ -11,3 +12,20 @@ pub async fn carnot_info(node: &Url) -> Result { .json::() .await } + +pub async fn get_blocks_info( + node: &Url, + from: Option, + to: Option, +) -> Result, reqwest::Error> { + const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks"; + let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap()); + if let Some(from) = from { + req = req.query(&[("from", from)]); + } + if let Some(to) = to { + req = req.query(&[("to", to)]); + } + + req.send().await?.json().await +} diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs new file mode 100644 index 00000000..b0463b43 --- /dev/null +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -0,0 +1,283 @@ +/// The dumbest possible backend for a chat protocol. +/// Just because running Doom on Nomos was too much work for a demo. +/// +/// +mod ui; + +use crate::{ + api::consensus::get_blocks_info, + da::{ + disseminate::{ + DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, + }, + retrieve::get_block_blobs, + }, +}; +use clap::Args; +use full_replication::{ + AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings, +}; +use nomos_core::{block::BlockId, da::DaProtocol, wire}; +use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter}; +use nomos_network::{backends::libp2p::Libp2p, NetworkService}; +use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use std::{ + io, + path::PathBuf, + sync::{ + self, + mpsc::{Receiver, Sender}, + Arc, + }, + time::{Duration, Instant}, +}; +use tokio::sync::{mpsc::UnboundedSender, Mutex}; + +use crossterm::{ + event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, + execute, + terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, +}; +use ratatui::{ + backend::{Backend, CrosstermBackend}, + Terminal, +}; +use tui_input::{backend::crossterm::EventHandler, Input}; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120); + +#[derive(Clone, Debug, Args)] +/// The almost-instant messaging protocol. +pub struct NomosChat { + /// Path to the network config file + #[clap(short, long)] + pub network_config: PathBuf, + /// The data availability protocol to use. Defaults to full replication. + #[clap(flatten)] + pub da_protocol: DaProtocolChoice, + /// The node to connect to to fetch blocks and blobs + #[clap(long)] + pub node: Url, +} + +pub struct App { + input: Input, + username: Option, + messages: Vec, + message_status: Option, + message_in_flight: bool, + last_updated: Instant, + payload_sender: UnboundedSender>, + status_updates: Receiver, + node: Url, + logs: Arc>>, + scroll_logs: u16, +} + +impl NomosChat { + pub fn run(&self) -> Result<(), Box> { + let network = serde_yaml::from_reader::< + _, + as ServiceData>::Settings, + >(std::fs::File::open(&self.network_config)?)?; + let da_protocol = self.da_protocol.clone(); + // setup terminal + enable_raw_mode()?; + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + + let node_addr = Some(self.node.clone()); + + let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel(); + let (status_sender, status_updates) = std::sync::mpsc::channel(); + + let shared_writer = Arc::new(sync::Mutex::new(Vec::new())); + let backend = SharedWriter::from_inner(shared_writer.clone()); + + std::thread::spawn(move || { + OverwatchRunner::::run( + DisseminateAppServiceSettings { + network, + send_blob: Settings { + payload: Arc::new(Mutex::new(payload_receiver)), + timeout: DEFAULT_TIMEOUT, + da_protocol, + status_updates: status_sender, + node_addr, + output: None, + }, + logger: LoggerSettings { + backend: LoggerBackend::Writer(backend), + level: tracing::Level::INFO, + ..Default::default() + }, + }, + None, + ) + .unwrap() + .wait_finished() + }); + + let app = App { + input: Input::default(), + username: None, + messages: Vec::new(), + message_status: None, + message_in_flight: false, + last_updated: Instant::now(), + payload_sender, + status_updates, + node: self.node.clone(), + logs: shared_writer, + scroll_logs: 0, + }; + + run_app(&mut terminal, app); + + // restore terminal + disable_raw_mode()?; + execute!( + terminal.backend_mut(), + LeaveAlternateScreen, + DisableMouseCapture + )?; + terminal.show_cursor()?; + + Ok(()) + } +} + +fn run_app(terminal: &mut Terminal, mut app: App) { + let (message_tx, message_rx) = std::sync::mpsc::channel(); + let node = app.node.clone(); + std::thread::spawn(move || check_for_messages(message_tx, node)); + loop { + terminal.draw(|f| ui::ui(f, &app)).unwrap(); + + if let Ok(update) = app.status_updates.try_recv() { + if let Status::Done = update { + app.message_in_flight = false; + app.message_status = None; + } + app.message_status = Some(update); + app.last_updated = Instant::now(); + } + + if let Ok(messages) = message_rx.try_recv() { + app.messages.extend(messages); + } + + // Do not block rendering if there's no user input available + if !event::poll(Duration::from_millis(100)).unwrap() { + continue; + } + + if let Event::Key(key) = event::read().unwrap() { + match key.code { + KeyCode::Enter => { + if app.username.is_none() { + app.username = Some(app.input.value().into()); + } else { + // Do not allow more than one message in flight at a time for simplicity + if !app.message_in_flight && !app.input.value().is_empty() { + app.message_in_flight = true; + app.payload_sender + .send( + wire::serialize(&ChatMessage { + author: app.username.clone().unwrap(), + message: app.input.value().into(), + }) + .unwrap() + .into(), + ) + .unwrap(); + } + } + app.input.reset(); + } + KeyCode::Esc => { + return; + } + KeyCode::Left => { + app.scroll_logs = app.scroll_logs.saturating_sub(1); + } + KeyCode::Right => { + app.scroll_logs = app.scroll_logs.saturating_add(1); + } + _ => { + if !app.message_in_flight { + app.input.handle_event(&Event::Key(key)); + } + } + } + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct ChatMessage { + author: String, + message: String, +} + +#[tokio::main] +async fn check_for_messages(sender: Sender>, node: Url) { + // Should ask for the genesis block to be more robust + let mut last_tip = BlockId::zeros(); + + loop { + if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await { + sender.send(messages).expect("channel closed"); + last_tip = new_tip; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + +async fn fetch_new_messages( + last_tip: &BlockId, + node: &Url, +) -> Result<(BlockId, Vec), Box> { + let mut new_messages = Vec::new(); + // By only specifying the 'to' parameter we get all the blocks since the last tip + let mut new_blocks = get_blocks_info(node, None, Some(*last_tip)) + .await? + .into_iter() + .map(|block| block.id) + .collect::>(); + + // The first block is the most recent one. + // Note that the 'to' is inclusive so the above request will always return at least one block + // as long as the block exists (which is the case since it was returned by a previous call) + let new_tip = new_blocks[0]; + // We already processed the last block so let's remove it + new_blocks.pop(); + + // Note that number of attestations is ignored here since we only use the da protocol to + // decode the blob data, not to validate the certificate + let mut da_protocol = + > as DaProtocol>::new( + DaSettings { + num_attestations: 1, + voter: [0; 32], // voter is ignored as well + }, + ); + + for block in new_blocks.iter().rev() { + let blobs = get_block_blobs(node, block).await?; + for blob in blobs { + da_protocol.recv_blob(blob); + // Full replication only needs one blob to decode the data, so the unwrap is safe + let bytes = da_protocol.extract().unwrap(); + if let Ok(message) = wire::deserialize::(&bytes) { + new_messages.push(message); + } + } + } + + Ok((new_tip, new_messages)) +} diff --git a/nomos-cli/src/cmds/chat/ui.rs b/nomos-cli/src/cmds/chat/ui.rs new file mode 100644 index 00000000..fd8ee53e --- /dev/null +++ b/nomos-cli/src/cmds/chat/ui.rs @@ -0,0 +1,197 @@ +use super::{App, ChatMessage}; +use ratatui::prelude::*; +use ratatui::widgets::{Block, Borders, List, ListItem, Paragraph, Wrap}; +use std::ops::Deref; + +pub fn ui(f: &mut Frame, app: &App) { + if app.username.is_none() { + select_username(f, app) + } else { + chat(f, app) + } +} + +fn select_username(f: &mut Frame, app: &App) { + assert!(app.username.is_none()); + let block = Block::default() + .title("NomosChat") + .borders(Borders::ALL) + .style(Style::new().white().on_black()); + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints( + [ + Constraint::Min(5), + Constraint::Min(1), + Constraint::Min(5), + Constraint::Min(0), + ] + .as_ref(), + ) + .split(block.inner(f.size())); + f.render_widget(block, f.size()); + + let text = vec![ + Line::from(vec![ + Span::raw("Welcome to "), + Span::styled("NomosChat", Style::new().green().italic()), + "!".into(), + ]), + Line::from("The first almost-instant messaging protocol".red()), + ]; + + let welcome = Paragraph::new(text) + .style(Style::new().white().on_black()) + .alignment(Alignment::Center) + .wrap(Wrap { trim: true }); + f.render_widget(welcome, chunks[0]); + + let help_text = Line::from("Press to select your username, or to quit."); + let help = Paragraph::new(help_text) + .alignment(Alignment::Center) + .style(Style::new().white().on_black()); + f.render_widget(help, chunks[1]); + + let input = Paragraph::new(app.input.value()) + .style(Style::new().white().on_black()) + .block( + Block::default() + .borders(Borders::ALL) + .title("Select username") + .border_style(Style::default().fg(Color::Yellow)), + ); + f.render_widget(input, centered_rect(chunks[2], 50, 50)); +} + +fn centered_rect(r: Rect, percent_x: u16, percent_y: u16) -> Rect { + let popup_layout = Layout::default() + .direction(Direction::Vertical) + .constraints( + [ + Constraint::Percentage((100 - percent_y) / 2), + Constraint::Percentage(percent_y), + Constraint::Percentage((100 - percent_y) / 2), + ] + .as_ref(), + ) + .split(r); + + Layout::default() + .direction(Direction::Horizontal) + .constraints( + [ + Constraint::Percentage((100 - percent_x) / 2), + Constraint::Percentage(percent_x), + Constraint::Percentage((100 - percent_x) / 2), + ] + .as_ref(), + ) + .split(popup_layout[1])[1] +} + +fn chat(f: &mut Frame, app: &App) { + let block = Block::default() + .title("NomosChat") + .borders(Borders::ALL) + .style(Style::new().white().on_black()); + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints( + [ + Constraint::Max(3), + Constraint::Percentage(80), + Constraint::Max(10), + ] + .as_ref(), + ) + .split(block.inner(f.size())); + f.render_widget(block, f.size()); + + let messages_rect = centered_rect(chunks[1], 90, 100); + render_messages(f, app, messages_rect); + render_logs(f, app, chunks[2]); + + let chunks = Layout::default() + .direction(Direction::Horizontal) + .constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref()) + .split(chunks[0]); + + render_input(f, app, chunks[0]); + render_status(f, app, chunks[1]); +} + +fn render_messages(f: &mut Frame, app: &App, rect: Rect) { + let messages: Vec = app + .messages + .iter() + .map(|ChatMessage { author, message }| { + let content = if author == app.username.as_ref().unwrap() { + static MARGIN: usize = 2; + // pad to make it appear aligned on the right + let pad = " ".repeat( + (rect.width as usize) + .saturating_sub(message.len()) + .saturating_sub(MARGIN), + ); + Line::from(vec![Span::raw(pad), Span::raw(message)]) + } else { + Line::from(vec![ + Span::styled(format!("{author}: "), Style::new().fg(Color::Yellow).bold()), + Span::raw(message), + ]) + .alignment(Alignment::Left) + }; + ListItem::new(content) + }) + .collect(); + let messages = + List::new(messages).block(Block::default().borders(Borders::ALL).title("Messages")); + f.render_widget(messages, rect); +} + +fn render_input(f: &mut Frame, app: &App, rect: Rect) { + let style = if !app.message_in_flight { + Style::default().fg(Color::Yellow) + } else { + Style::default().fg(Color::DarkGray) + }; + let input = Paragraph::new(app.input.value()) + .style(Style::new().white().on_black()) + .block( + Block::default() + .borders(Borders::ALL) + .title("Press to send message") + .border_style(style), + ); + f.render_widget(input, rect); +} +fn render_status(f: &mut Frame, app: &App, rect: Rect) { + let waiting_animation = std::iter::repeat(".") + .take(app.last_updated.elapsed().as_secs() as usize % 4) + .collect::>() + .join(""); + + let status = Paragraph::new( + app.message_status + .as_ref() + .map(|s| format!("{}{}", s, waiting_animation)) + .unwrap_or_default(), + ) + .block(Block::default().borders(Borders::ALL).title("Status:")); + f.render_widget(status, rect); +} + +fn render_logs(f: &mut Frame, app: &App, rect: Rect) { + let logs = String::from_utf8(app.logs.lock().unwrap().deref().clone()).unwrap(); + f.render_widget( + Paragraph::new(logs) + .wrap(Wrap { trim: true }) + .scroll((app.scroll_logs, 0)) + .block( + Block::default() + .borders(Borders::ALL) + .title("Logs: (use ←/→ to scroll up/down"), + ), + rect, + ); +} diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs index e96ffd6a..27f511fe 100644 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -2,6 +2,7 @@ use crate::da::disseminate::{ DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, }; use clap::Args; +use nomos_log::{LoggerBackend, LoggerSettings}; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; use reqwest::Url; @@ -59,6 +60,10 @@ impl Disseminate { node_addr, output, }, + logger: LoggerSettings { + backend: LoggerBackend::None, + ..Default::default() + }, }, None, ) @@ -67,13 +72,13 @@ impl Disseminate { }); // drop to signal we're not going to send more blobs drop(payload_sender); - tracing::info!("{}", rx.recv().unwrap().display()); + tracing::info!("{}", rx.recv().unwrap()); while let Ok(update) = rx.recv() { if let Status::Err(e) = update { tracing::error!("{e}"); return Err(e); } - tracing::info!("{}", update.display()); + tracing::info!("{}", update); } tracing::info!("done"); Ok(()) diff --git a/nomos-cli/src/cmds/mod.rs b/nomos-cli/src/cmds/mod.rs index 47c08a08..bf2648ac 100644 --- a/nomos-cli/src/cmds/mod.rs +++ b/nomos-cli/src/cmds/mod.rs @@ -1,17 +1,21 @@ use clap::Subcommand; +pub mod chat; pub mod disseminate; #[derive(Debug, Subcommand)] pub enum Command { /// Send a blob to the network and collect attestations to create a DA proof Disseminate(disseminate::Disseminate), + /// (Almost) Instant messaging protocol on top of the Nomos network + Chat(chat::NomosChat), } impl Command { pub fn run(&self) -> Result<(), Box> { match self { Command::Disseminate(cmd) => cmd.run(), + Command::Chat(cmd) => cmd.run(), } } } diff --git a/nomos-cli/src/da/disseminate.rs b/nomos-cli/src/da/disseminate.rs index de8d1fc2..5386496f 100644 --- a/nomos-cli/src/da/disseminate.rs +++ b/nomos-cli/src/da/disseminate.rs @@ -5,6 +5,7 @@ use futures::StreamExt; use hex::FromHex; use nomos_core::{da::DaProtocol, wire}; use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter}; +use nomos_log::Logger; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_derive::*; use overwatch_rs::{ @@ -91,17 +92,17 @@ pub enum Status { Err(Box), } -impl Status { - pub fn display(&self) -> &str { +impl std::fmt::Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Encoding => "Encoding message into blob(s)", - Self::Disseminating => "Sending blob(s) to the network", - Self::WaitingAttestations => "Waiting for attestations", - Self::CreatingCert => "Creating certificate", - Self::SavingCert => "Saving certificate to file", - Self::SendingCert => "Sending certificate to node", - Self::Done => "", - Self::Err(_) => "Error", + Self::Encoding => write!(f, "Encoding message into blob(s)"), + Self::Disseminating => write!(f, "Sending blob(s) to the network"), + Self::WaitingAttestations => write!(f, "Waiting for attestations"), + Self::CreatingCert => write!(f, "Creating certificate"), + Self::SavingCert => write!(f, "Saving certificate to file"), + Self::SendingCert => write!(f, "Sending certificate to node"), + Self::Done => write!(f, ""), + Self::Err(e) => write!(f, "Error: {e}"), } } } @@ -112,6 +113,7 @@ impl Status { pub struct DisseminateApp { network: ServiceHandle>, send_blob: ServiceHandle, + logger: ServiceHandle, } #[derive(Clone, Debug)] diff --git a/nomos-services/log/src/lib.rs b/nomos-services/log/src/lib.rs index 4d01c819..afa726ce 100644 --- a/nomos-services/log/src/lib.rs +++ b/nomos-services/log/src/lib.rs @@ -28,7 +28,7 @@ pub struct Logger { /// required by contract by Overwatch for a configuration struct #[derive(Clone)] pub struct SharedWriter { - inner: Arc>>, + inner: Arc>, } impl Write for SharedWriter { @@ -44,9 +44,17 @@ impl Write for SharedWriter { impl SharedWriter { pub fn new(writer: W) -> Self { Self { - inner: Arc::new(Mutex::new(Box::new(writer))), + inner: Arc::new(Mutex::new(writer)), } } + + pub fn into_inner(&self) -> Arc> { + self.inner.clone() + } + + pub fn from_inner(inner: Arc>) -> Self { + Self { inner } + } } impl Debug for SharedWriter {