Add chat demo for testnet

This commit adds a simple demo to showcase the capabilities of the
Nomos architecture. In particular, we want to leverage the DA
features and explore participants roles.
At the same time, we're not ready to commit to any speficic format
or decision regarding common ground yet.
For this reason, we chose to implement the demo at the Execution
Zone (EZ) level.
In contrast to the coordination layer, each execution
zone can decide on its own format, which allows us to experiment
without having to set a standard.

The application of choice for the demo is an (almost) instant
messaging app where the messages are broadcast to the public by
leveraging the full replication data availability protocol.
In this context, the cli app acts as a small EZ disseminating
blobs, promoting blob inclusion and updating its state (i.e. list
of exchanged messages) upon blob inclusion in the chain.
This commit is contained in:
Giacomo Pasini 2023-10-30 13:03:46 +01:00
parent eb69cd34ec
commit 61f06436c8
No known key found for this signature in database
GPG Key ID: FC08489D2D895D4B
4 changed files with 428 additions and 1 deletions

View File

@ -24,10 +24,14 @@ nomos-consensus = { path = "../nomos-services/consensus" }
nomos-libp2p = { path = "../nomos-libp2p"}
nomos-core = { path = "../nomos-core" }
nomos-node = { path = "../nodes/nomos-node" }
nomos-consensus = { path = "../nomos-services/consensus" }
full-replication = { path = "../nomos-da/full-replication" }
reqwest = "0.11"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
hex = "0.4.3"
once_cell = "1"
once_cell = "1"
crossterm = "0.27"
ratatui = "0.24"
tui-input = "0.8"

View File

@ -0,0 +1,260 @@
/// The dumbest possible backend for a chat protocol.
/// Just because running Doom on Nomos was too much work for a demo.
///
///
mod ui;
use crate::da::{
disseminate::{
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
},
retrieve::get_block_blobs,
};
use clap::Args;
use full_replication::{
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
};
use nomos_consensus::CarnotInfo;
use nomos_core::{block::BlockId, da::DaProtocol, wire};
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::{
collections::HashSet,
io,
path::PathBuf,
sync::{
mpsc::{Receiver, Sender},
Arc,
},
time::{Duration, Instant},
};
use tokio::sync::{mpsc::UnboundedSender, Mutex};
use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{
backend::{Backend, CrosstermBackend},
Terminal,
};
use tui_input::{backend::crossterm::EventHandler, Input};
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);
#[derive(Clone, Debug, Args)]
/// The almost-instant messaging protocol.
pub struct NomosChat {
/// Path to the network config file
#[clap(short, long)]
pub network_config: PathBuf,
/// The data availability protocol to use. Defaults to full replication.
#[clap(flatten)]
pub da_protocol: DaProtocolChoice,
/// The node to connect to to fetch blocks and blobs
#[clap(long)]
pub node: Url,
}
pub struct App {
input: Input,
username: Option<String>,
messages: Vec<ChatMessage>,
message_status: Option<Status>,
message_in_flight: bool,
last_updated: Instant,
payload_sender: UnboundedSender<Box<[u8]>>,
status_updates: Receiver<Status>,
node: Url,
}
impl NomosChat {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
let network = serde_yaml::from_reader::<
_,
<NetworkService<Libp2p> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
let da_protocol = self.da_protocol.clone();
// setup terminal
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
let node_addr = Some(self.node.clone());
let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel();
let (status_sender, status_updates) = std::sync::mpsc::channel();
std::thread::spawn(move || {
OverwatchRunner::<DisseminateApp>::run(
DisseminateAppServiceSettings {
network,
send_blob: Settings {
payload: Arc::new(Mutex::new(payload_receiver)),
timeout: DEFAULT_TIMEOUT,
da_protocol,
status_updates: status_sender,
node_addr,
output: None,
},
},
None,
)
.unwrap()
.wait_finished()
});
let app = App {
input: Input::default(),
username: None,
messages: Vec::new(),
message_status: None,
message_in_flight: false,
last_updated: Instant::now(),
payload_sender,
status_updates,
node: self.node.clone(),
};
run_app(&mut terminal, app);
// restore terminal
disable_raw_mode()?;
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)?;
terminal.show_cursor()?;
Ok(())
}
}
fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) {
let (message_tx, message_rx) = std::sync::mpsc::channel();
let node = app.node.clone();
std::thread::spawn(move || check_for_messages(message_tx, node));
loop {
terminal.draw(|f| ui::ui(f, &app)).unwrap();
if let Ok(update) = app.status_updates.try_recv() {
if let Status::Done = update {
app.message_in_flight = false;
app.message_status = None;
}
app.message_status = Some(update);
app.last_updated = Instant::now();
}
if let Ok(messages) = message_rx.try_recv() {
app.messages.extend(messages);
}
// Do not block rendering if there's no user input available
if !event::poll(Duration::from_millis(100)).unwrap() {
continue;
}
if let Event::Key(key) = event::read().unwrap() {
match key.code {
KeyCode::Enter => {
if app.username.is_none() {
app.username = Some(app.input.value().into());
} else {
// Do not allow more than one message in flight at a time for simplicity
if !app.message_in_flight && !app.input.value().is_empty() {
app.message_in_flight = true;
app.payload_sender
.send(
wire::serialize(&ChatMessage {
author: app.username.clone().unwrap(),
message: app.input.value().into(),
})
.unwrap()
.into(),
)
.unwrap();
}
}
app.input.reset();
}
KeyCode::Esc => {
return;
}
_ => {
if !app.message_in_flight {
app.input.handle_event(&Event::Key(key));
}
}
}
}
}
}
#[derive(Serialize, Deserialize, Debug)]
struct ChatMessage {
author: String,
message: String,
}
#[tokio::main]
async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
let mut old_blocks = HashSet::new();
loop {
if let Ok(messages) = fetch_new_messages(&mut old_blocks, node.clone()).await {
sender.send(messages).expect("channel closed");
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
async fn fetch_new_messages(
old_blocks: &mut HashSet<BlockId>,
node: Url,
) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> {
const NODE_CARNOT_INFO_PATH: &str = "carnot/info";
let mut new_messages = Vec::new();
let info = reqwest::get(node.join(NODE_CARNOT_INFO_PATH).unwrap())
.await?
.json::<CarnotInfo>()
.await?;
let new_blocks = info
.committed_blocks
.into_iter()
.filter(|id| !old_blocks.contains(id) && id != &BlockId::zeros())
.collect::<Vec<_>>();
// note that number of attestations is ignored here since we only use the da protocol to
// decode the blob data, not to validate the certificate
let mut da_protocol =
<FullReplication<AbsoluteNumber<Attestation, Certificate>> as DaProtocol>::new(
DaSettings {
num_attestations: 1,
},
);
for block in new_blocks {
let blobs = get_block_blobs(node.clone(), block).await?;
for blob in blobs {
da_protocol.recv_blob(blob);
// full replication only needs one blob to decode the data, so the unwrap is safe
let bytes = da_protocol.extract().unwrap();
if let Ok(message) = wire::deserialize::<ChatMessage>(&bytes) {
new_messages.push(message);
}
}
old_blocks.insert(block);
}
Ok(new_messages)
}

View File

@ -0,0 +1,159 @@
use super::{App, ChatMessage};
use ratatui::prelude::*;
use ratatui::widgets::{Block, Borders, List, ListItem, Paragraph, Wrap};
pub fn ui(f: &mut Frame, app: &App) {
if app.username.is_none() {
select_username(f, app)
} else {
chat(f, app)
}
}
fn select_username(f: &mut Frame, app: &App) {
assert!(app.username.is_none());
let block = Block::default()
.title("NomosChat")
.borders(Borders::ALL)
.style(Style::new().white().on_black());
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Min(5),
Constraint::Min(1),
Constraint::Min(5),
Constraint::Min(0),
]
.as_ref(),
)
.split(block.inner(f.size()));
f.render_widget(block, f.size());
let text = vec![
Line::from(vec![
Span::raw("Welcome to "),
Span::styled("NomosChat", Style::new().green().italic()),
"!".into(),
]),
Line::from("The first almost-instant messaging protocol".red()),
];
let welcome = Paragraph::new(text)
.style(Style::new().white().on_black())
.alignment(Alignment::Center)
.wrap(Wrap { trim: true });
f.render_widget(welcome, chunks[0]);
let help_text = Line::from("Press <Enter> to select your username, or <Esc> to quit.");
let help = Paragraph::new(help_text)
.alignment(Alignment::Center)
.style(Style::new().white().on_black());
f.render_widget(help, chunks[1]);
let input = Paragraph::new(app.input.value())
.style(Style::new().white().on_black())
.block(
Block::default()
.borders(Borders::ALL)
.title("Select username")
.border_style(Style::default().fg(Color::Yellow)),
);
f.render_widget(input, centered_rect(chunks[2], 50, 50));
}
fn centered_rect(r: Rect, percent_x: u16, percent_y: u16) -> Rect {
let popup_layout = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Percentage((100 - percent_y) / 2),
Constraint::Percentage(percent_y),
Constraint::Percentage((100 - percent_y) / 2),
]
.as_ref(),
)
.split(r);
Layout::default()
.direction(Direction::Horizontal)
.constraints(
[
Constraint::Percentage((100 - percent_x) / 2),
Constraint::Percentage(percent_x),
Constraint::Percentage((100 - percent_x) / 2),
]
.as_ref(),
)
.split(popup_layout[1])[1]
}
fn chat(f: &mut Frame, app: &App) {
let block = Block::default()
.title("NomosChat")
.borders(Borders::ALL)
.style(Style::new().white().on_black());
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints([Constraint::Min(5), Constraint::Min(1)].as_ref())
.split(block.inner(f.size()));
f.render_widget(block, f.size());
let messages_rect = centered_rect(chunks[1], 90, 100);
let messages: Vec<ListItem> = app
.messages
.iter()
.map(|ChatMessage { author, message }| {
let content = if author == app.username.as_ref().unwrap() {
// pad to make it appear aligned on the right
let pad = " ".repeat((messages_rect.width as usize).saturating_sub(message.len()));
Line::from(vec![Span::raw(pad), Span::raw(message)])
} else {
Line::from(vec![
Span::styled(format!("{author}: "), Style::new().fg(Color::Yellow).bold()),
Span::raw(message),
])
.alignment(Alignment::Left)
};
ListItem::new(content)
})
.collect();
let messages =
List::new(messages).block(Block::default().borders(Borders::ALL).title("Messages"));
f.render_widget(messages, messages_rect);
let style = if !app.message_in_flight {
Style::default().fg(Color::Yellow)
} else {
Style::default().fg(Color::DarkGray)
};
let input = Paragraph::new(app.input.value())
.style(Style::new().white().on_black())
.block(
Block::default()
.borders(Borders::ALL)
.title("Press <Enter> to send message")
.border_style(style),
);
let chunks = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref())
.split(chunks[0]);
let waiting_animation = std::iter::repeat(".")
.take(app.last_updated.elapsed().as_secs() as usize % 4)
.collect::<Vec<_>>()
.join("");
let status = Paragraph::new(
app.message_status
.as_ref()
.map(|s| format!("{}{}", s.display(), waiting_animation))
.unwrap_or_default(),
)
.block(Block::default().borders(Borders::ALL).title("Status:"));
f.render_widget(input, centered_rect(chunks[0], 80, 50));
f.render_widget(status, centered_rect(chunks[1], 80, 50));
}

View File

@ -1,17 +1,21 @@
use clap::Subcommand;
pub mod chat;
pub mod disseminate;
#[derive(Debug, Subcommand)]
pub enum Command {
/// Send a blob to the network and collect attestations to create a DA proof
Disseminate(disseminate::Disseminate),
/// (Almost) Instant messaging protocol on top of the Nomos network
Chat(chat::NomosChat),
}
impl Command {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
match self {
Command::Disseminate(cmd) => cmd.run(),
Command::Chat(cmd) => cmd.run(),
}
}
}