Add chat demo for testnet (#495)

* Add chat demo for testnet

This commit adds a simple demo to showcase the capabilities of the
Nomos architecture. In particular, we want to leverage the DA
features and explore participants roles.
At the same time, we're not ready to commit to any speficic format
or decision regarding common ground yet.
For this reason, we chose to implement the demo at the Execution
Zone (EZ) level.
In contrast to the coordination layer, each execution
zone can decide on its own format, which allows us to experiment
without having to set a standard.

The application of choice for the demo is an (almost) instant
messaging app where the messages are broadcast to the public by
leveraging the full replication data availability protocol.
In this context, the cli app acts as a small EZ disseminating
blobs, promoting blob inclusion and updating its state (i.e. list
of exchanged messages) upon blob inclusion in the chain.



---------

Co-authored-by: danielsanchezq <sanchez.quiros.daniel@gmail.com>
This commit is contained in:
Giacomo Pasini 2024-01-03 15:47:21 +01:00 committed by GitHub
parent 09cd539bf2
commit 6e718e7bba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 537 additions and 15 deletions

View File

@ -21,6 +21,7 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] }
nomos-consensus = { path = "../nomos-services/consensus" }
nomos-log = { path = "../nomos-services/log" }
nomos-libp2p = { path = "../nomos-libp2p"}
nomos-core = { path = "../nomos-core" }
nomos-node = { path = "../nodes/nomos-node" }
@ -30,4 +31,8 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
hex = "0.4.3"
once_cell = "1"
once_cell = "1"
crossterm = "0.27"
ratatui = "0.24"
tui-input = "0.8"
ansi-to-tui = "3"

View File

@ -1,4 +1,5 @@
use super::CLIENT;
use consensus_engine::{Block, BlockId};
use nomos_consensus::CarnotInfo;
use reqwest::Url;
@ -11,3 +12,20 @@ pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> {
.json::<CarnotInfo>()
.await
}
pub async fn get_blocks_info(
node: &Url,
from: Option<BlockId>,
to: Option<BlockId>,
) -> Result<Vec<Block>, reqwest::Error> {
const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks";
let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap());
if let Some(from) = from {
req = req.query(&[("from", from)]);
}
if let Some(to) = to {
req = req.query(&[("to", to)]);
}
req.send().await?.json().await
}

View File

@ -0,0 +1,283 @@
/// The dumbest possible backend for a chat protocol.
/// Just because running Doom on Nomos was too much work for a demo.
///
///
mod ui;
use crate::{
api::consensus::get_blocks_info,
da::{
disseminate::{
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
},
retrieve::get_block_blobs,
},
};
use clap::Args;
use full_replication::{
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
};
use nomos_core::{block::BlockId, da::DaProtocol, wire};
use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter};
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::{
io,
path::PathBuf,
sync::{
self,
mpsc::{Receiver, Sender},
Arc,
},
time::{Duration, Instant},
};
use tokio::sync::{mpsc::UnboundedSender, Mutex};
use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{
backend::{Backend, CrosstermBackend},
Terminal,
};
use tui_input::{backend::crossterm::EventHandler, Input};
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);
#[derive(Clone, Debug, Args)]
/// The almost-instant messaging protocol.
pub struct NomosChat {
/// Path to the network config file
#[clap(short, long)]
pub network_config: PathBuf,
/// The data availability protocol to use. Defaults to full replication.
#[clap(flatten)]
pub da_protocol: DaProtocolChoice,
/// The node to connect to to fetch blocks and blobs
#[clap(long)]
pub node: Url,
}
pub struct App {
input: Input,
username: Option<String>,
messages: Vec<ChatMessage>,
message_status: Option<Status>,
message_in_flight: bool,
last_updated: Instant,
payload_sender: UnboundedSender<Box<[u8]>>,
status_updates: Receiver<Status>,
node: Url,
logs: Arc<sync::Mutex<Vec<u8>>>,
scroll_logs: u16,
}
impl NomosChat {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
let network = serde_yaml::from_reader::<
_,
<NetworkService<Libp2p> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
let da_protocol = self.da_protocol.clone();
// setup terminal
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
let node_addr = Some(self.node.clone());
let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel();
let (status_sender, status_updates) = std::sync::mpsc::channel();
let shared_writer = Arc::new(sync::Mutex::new(Vec::new()));
let backend = SharedWriter::from_inner(shared_writer.clone());
std::thread::spawn(move || {
OverwatchRunner::<DisseminateApp>::run(
DisseminateAppServiceSettings {
network,
send_blob: Settings {
payload: Arc::new(Mutex::new(payload_receiver)),
timeout: DEFAULT_TIMEOUT,
da_protocol,
status_updates: status_sender,
node_addr,
output: None,
},
logger: LoggerSettings {
backend: LoggerBackend::Writer(backend),
level: tracing::Level::INFO,
..Default::default()
},
},
None,
)
.unwrap()
.wait_finished()
});
let app = App {
input: Input::default(),
username: None,
messages: Vec::new(),
message_status: None,
message_in_flight: false,
last_updated: Instant::now(),
payload_sender,
status_updates,
node: self.node.clone(),
logs: shared_writer,
scroll_logs: 0,
};
run_app(&mut terminal, app);
// restore terminal
disable_raw_mode()?;
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)?;
terminal.show_cursor()?;
Ok(())
}
}
fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) {
let (message_tx, message_rx) = std::sync::mpsc::channel();
let node = app.node.clone();
std::thread::spawn(move || check_for_messages(message_tx, node));
loop {
terminal.draw(|f| ui::ui(f, &app)).unwrap();
if let Ok(update) = app.status_updates.try_recv() {
if let Status::Done = update {
app.message_in_flight = false;
app.message_status = None;
}
app.message_status = Some(update);
app.last_updated = Instant::now();
}
if let Ok(messages) = message_rx.try_recv() {
app.messages.extend(messages);
}
// Do not block rendering if there's no user input available
if !event::poll(Duration::from_millis(100)).unwrap() {
continue;
}
if let Event::Key(key) = event::read().unwrap() {
match key.code {
KeyCode::Enter => {
if app.username.is_none() {
app.username = Some(app.input.value().into());
} else {
// Do not allow more than one message in flight at a time for simplicity
if !app.message_in_flight && !app.input.value().is_empty() {
app.message_in_flight = true;
app.payload_sender
.send(
wire::serialize(&ChatMessage {
author: app.username.clone().unwrap(),
message: app.input.value().into(),
})
.unwrap()
.into(),
)
.unwrap();
}
}
app.input.reset();
}
KeyCode::Esc => {
return;
}
KeyCode::Left => {
app.scroll_logs = app.scroll_logs.saturating_sub(1);
}
KeyCode::Right => {
app.scroll_logs = app.scroll_logs.saturating_add(1);
}
_ => {
if !app.message_in_flight {
app.input.handle_event(&Event::Key(key));
}
}
}
}
}
}
#[derive(Serialize, Deserialize, Debug)]
struct ChatMessage {
author: String,
message: String,
}
#[tokio::main]
async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
// Should ask for the genesis block to be more robust
let mut last_tip = BlockId::zeros();
loop {
if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await {
sender.send(messages).expect("channel closed");
last_tip = new_tip;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
async fn fetch_new_messages(
last_tip: &BlockId,
node: &Url,
) -> Result<(BlockId, Vec<ChatMessage>), Box<dyn std::error::Error>> {
let mut new_messages = Vec::new();
// By only specifying the 'to' parameter we get all the blocks since the last tip
let mut new_blocks = get_blocks_info(node, None, Some(*last_tip))
.await?
.into_iter()
.map(|block| block.id)
.collect::<Vec<_>>();
// The first block is the most recent one.
// Note that the 'to' is inclusive so the above request will always return at least one block
// as long as the block exists (which is the case since it was returned by a previous call)
let new_tip = new_blocks[0];
// We already processed the last block so let's remove it
new_blocks.pop();
// Note that number of attestations is ignored here since we only use the da protocol to
// decode the blob data, not to validate the certificate
let mut da_protocol =
<FullReplication<AbsoluteNumber<Attestation, Certificate>> as DaProtocol>::new(
DaSettings {
num_attestations: 1,
voter: [0; 32], // voter is ignored as well
},
);
for block in new_blocks.iter().rev() {
let blobs = get_block_blobs(node, block).await?;
for blob in blobs {
da_protocol.recv_blob(blob);
// Full replication only needs one blob to decode the data, so the unwrap is safe
let bytes = da_protocol.extract().unwrap();
if let Ok(message) = wire::deserialize::<ChatMessage>(&bytes) {
new_messages.push(message);
}
}
}
Ok((new_tip, new_messages))
}

View File

@ -0,0 +1,197 @@
use super::{App, ChatMessage};
use ratatui::prelude::*;
use ratatui::widgets::{Block, Borders, List, ListItem, Paragraph, Wrap};
use std::ops::Deref;
pub fn ui(f: &mut Frame, app: &App) {
if app.username.is_none() {
select_username(f, app)
} else {
chat(f, app)
}
}
fn select_username(f: &mut Frame, app: &App) {
assert!(app.username.is_none());
let block = Block::default()
.title("NomosChat")
.borders(Borders::ALL)
.style(Style::new().white().on_black());
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Min(5),
Constraint::Min(1),
Constraint::Min(5),
Constraint::Min(0),
]
.as_ref(),
)
.split(block.inner(f.size()));
f.render_widget(block, f.size());
let text = vec![
Line::from(vec![
Span::raw("Welcome to "),
Span::styled("NomosChat", Style::new().green().italic()),
"!".into(),
]),
Line::from("The first almost-instant messaging protocol".red()),
];
let welcome = Paragraph::new(text)
.style(Style::new().white().on_black())
.alignment(Alignment::Center)
.wrap(Wrap { trim: true });
f.render_widget(welcome, chunks[0]);
let help_text = Line::from("Press <Enter> to select your username, or <Esc> to quit.");
let help = Paragraph::new(help_text)
.alignment(Alignment::Center)
.style(Style::new().white().on_black());
f.render_widget(help, chunks[1]);
let input = Paragraph::new(app.input.value())
.style(Style::new().white().on_black())
.block(
Block::default()
.borders(Borders::ALL)
.title("Select username")
.border_style(Style::default().fg(Color::Yellow)),
);
f.render_widget(input, centered_rect(chunks[2], 50, 50));
}
fn centered_rect(r: Rect, percent_x: u16, percent_y: u16) -> Rect {
let popup_layout = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Percentage((100 - percent_y) / 2),
Constraint::Percentage(percent_y),
Constraint::Percentage((100 - percent_y) / 2),
]
.as_ref(),
)
.split(r);
Layout::default()
.direction(Direction::Horizontal)
.constraints(
[
Constraint::Percentage((100 - percent_x) / 2),
Constraint::Percentage(percent_x),
Constraint::Percentage((100 - percent_x) / 2),
]
.as_ref(),
)
.split(popup_layout[1])[1]
}
fn chat(f: &mut Frame, app: &App) {
let block = Block::default()
.title("NomosChat")
.borders(Borders::ALL)
.style(Style::new().white().on_black());
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Max(3),
Constraint::Percentage(80),
Constraint::Max(10),
]
.as_ref(),
)
.split(block.inner(f.size()));
f.render_widget(block, f.size());
let messages_rect = centered_rect(chunks[1], 90, 100);
render_messages(f, app, messages_rect);
render_logs(f, app, chunks[2]);
let chunks = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref())
.split(chunks[0]);
render_input(f, app, chunks[0]);
render_status(f, app, chunks[1]);
}
fn render_messages(f: &mut Frame, app: &App, rect: Rect) {
let messages: Vec<ListItem> = app
.messages
.iter()
.map(|ChatMessage { author, message }| {
let content = if author == app.username.as_ref().unwrap() {
static MARGIN: usize = 2;
// pad to make it appear aligned on the right
let pad = " ".repeat(
(rect.width as usize)
.saturating_sub(message.len())
.saturating_sub(MARGIN),
);
Line::from(vec![Span::raw(pad), Span::raw(message)])
} else {
Line::from(vec![
Span::styled(format!("{author}: "), Style::new().fg(Color::Yellow).bold()),
Span::raw(message),
])
.alignment(Alignment::Left)
};
ListItem::new(content)
})
.collect();
let messages =
List::new(messages).block(Block::default().borders(Borders::ALL).title("Messages"));
f.render_widget(messages, rect);
}
fn render_input(f: &mut Frame, app: &App, rect: Rect) {
let style = if !app.message_in_flight {
Style::default().fg(Color::Yellow)
} else {
Style::default().fg(Color::DarkGray)
};
let input = Paragraph::new(app.input.value())
.style(Style::new().white().on_black())
.block(
Block::default()
.borders(Borders::ALL)
.title("Press <Enter> to send message")
.border_style(style),
);
f.render_widget(input, rect);
}
fn render_status(f: &mut Frame, app: &App, rect: Rect) {
let waiting_animation = std::iter::repeat(".")
.take(app.last_updated.elapsed().as_secs() as usize % 4)
.collect::<Vec<_>>()
.join("");
let status = Paragraph::new(
app.message_status
.as_ref()
.map(|s| format!("{}{}", s, waiting_animation))
.unwrap_or_default(),
)
.block(Block::default().borders(Borders::ALL).title("Status:"));
f.render_widget(status, rect);
}
fn render_logs(f: &mut Frame, app: &App, rect: Rect) {
let logs = String::from_utf8(app.logs.lock().unwrap().deref().clone()).unwrap();
f.render_widget(
Paragraph::new(logs)
.wrap(Wrap { trim: true })
.scroll((app.scroll_logs, 0))
.block(
Block::default()
.borders(Borders::ALL)
.title("Logs: (use ←/→ to scroll up/down"),
),
rect,
);
}

View File

@ -2,6 +2,7 @@ use crate::da::disseminate::{
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
};
use clap::Args;
use nomos_log::{LoggerBackend, LoggerSettings};
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
use reqwest::Url;
@ -59,6 +60,10 @@ impl Disseminate {
node_addr,
output,
},
logger: LoggerSettings {
backend: LoggerBackend::None,
..Default::default()
},
},
None,
)
@ -67,13 +72,13 @@ impl Disseminate {
});
// drop to signal we're not going to send more blobs
drop(payload_sender);
tracing::info!("{}", rx.recv().unwrap().display());
tracing::info!("{}", rx.recv().unwrap());
while let Ok(update) = rx.recv() {
if let Status::Err(e) = update {
tracing::error!("{e}");
return Err(e);
}
tracing::info!("{}", update.display());
tracing::info!("{}", update);
}
tracing::info!("done");
Ok(())

View File

@ -1,17 +1,21 @@
use clap::Subcommand;
pub mod chat;
pub mod disseminate;
#[derive(Debug, Subcommand)]
pub enum Command {
/// Send a blob to the network and collect attestations to create a DA proof
Disseminate(disseminate::Disseminate),
/// (Almost) Instant messaging protocol on top of the Nomos network
Chat(chat::NomosChat),
}
impl Command {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
match self {
Command::Disseminate(cmd) => cmd.run(),
Command::Chat(cmd) => cmd.run(),
}
}
}

View File

@ -5,6 +5,7 @@ use futures::StreamExt;
use hex::FromHex;
use nomos_core::{da::DaProtocol, wire};
use nomos_da::network::{adapters::libp2p::Libp2pAdapter, NetworkAdapter};
use nomos_log::Logger;
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_derive::*;
use overwatch_rs::{
@ -91,17 +92,17 @@ pub enum Status {
Err(Box<dyn std::error::Error + Send + Sync>),
}
impl Status {
pub fn display(&self) -> &str {
impl std::fmt::Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Encoding => "Encoding message into blob(s)",
Self::Disseminating => "Sending blob(s) to the network",
Self::WaitingAttestations => "Waiting for attestations",
Self::CreatingCert => "Creating certificate",
Self::SavingCert => "Saving certificate to file",
Self::SendingCert => "Sending certificate to node",
Self::Done => "",
Self::Err(_) => "Error",
Self::Encoding => write!(f, "Encoding message into blob(s)"),
Self::Disseminating => write!(f, "Sending blob(s) to the network"),
Self::WaitingAttestations => write!(f, "Waiting for attestations"),
Self::CreatingCert => write!(f, "Creating certificate"),
Self::SavingCert => write!(f, "Saving certificate to file"),
Self::SendingCert => write!(f, "Sending certificate to node"),
Self::Done => write!(f, ""),
Self::Err(e) => write!(f, "Error: {e}"),
}
}
}
@ -112,6 +113,7 @@ impl Status {
pub struct DisseminateApp {
network: ServiceHandle<NetworkService<Libp2p>>,
send_blob: ServiceHandle<DisseminateService>,
logger: ServiceHandle<Logger>,
}
#[derive(Clone, Debug)]

View File

@ -28,7 +28,7 @@ pub struct Logger {
/// required by contract by Overwatch for a configuration struct
#[derive(Clone)]
pub struct SharedWriter {
inner: Arc<Mutex<Box<dyn Write + Send + Sync>>>,
inner: Arc<Mutex<dyn Write + Send + Sync>>,
}
impl Write for SharedWriter {
@ -44,9 +44,17 @@ impl Write for SharedWriter {
impl SharedWriter {
pub fn new<W: Write + Send + Sync + 'static>(writer: W) -> Self {
Self {
inner: Arc::new(Mutex::new(Box::new(writer))),
inner: Arc::new(Mutex::new(writer)),
}
}
pub fn into_inner(&self) -> Arc<Mutex<dyn Write + Send + Sync>> {
self.inner.clone()
}
pub fn from_inner(inner: Arc<Mutex<dyn Write + Send + Sync>>) -> Self {
Self { inner }
}
}
impl Debug for SharedWriter {