Cli: Dissemination and retrieval (#851)

* Remove chat and dissemination from nomos-cli

* Dissemination using executor http client in nomos-cli

* Blob retrieval from indexer using executor http client

* Range reqeust type from nomos-node

* Split executor and validator commands
This commit is contained in:
gusto 2024-10-24 16:21:27 +03:00 committed by GitHub
parent ce24a03a23
commit 193ff82980
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 214 additions and 1783 deletions

View File

@ -8,4 +8,4 @@ nomos-core = { path = "../../nomos-core/chain-defs" }
nomos-executor = { path = "../../nodes/nomos-executor" }
reqwest = "0.12"
serde = "1.0"
thiserror = "1.0"
thiserror = "1.0"

View File

@ -1,9 +1,9 @@
// std
// crates
use reqwest::{Client, ClientBuilder, StatusCode, Url};
use serde::Serialize;
// internal
use nomos_executor::api::{handlers::DispersalRequest, paths};
use serde::Serialize;
#[derive(thiserror::Error, Debug)]
pub enum Error {

View File

@ -236,8 +236,8 @@ where
<V as Metadata>::AppId: Serialize + DeserializeOwned,
<V as Metadata>::Index: Serialize + DeserializeOwned,
{
app_id: <V as Metadata>::AppId,
range: Range<<V as Metadata>::Index>,
pub app_id: <V as Metadata>::AppId,
pub range: Range<<V as Metadata>::Index>,
}
#[utoipa::path(

View File

@ -7,37 +7,14 @@ description = "Cli app to interact with Nomos nodes and perform various tasks"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
fraction = "0.13"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
async-trait = "0.1"
clap = { version = "4", features = ["derive"] }
serde_yaml = "0.9"
futures = "0.3"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
log = "0.4.19"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
nomos-da-network-service = { path = "../nomos-services/data-availability/network" }
cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" }
executor-http-client = { path = "../clients/executor-http-client" }
hex = "0.4.3"
kzgrs-backend = { path = "../nomos-da/kzgrs-backend" }
nomos-log = { path = "../nomos-services/log" }
nomos-libp2p = { path = "../nomos-libp2p" }
libp2p = { version = "0.53", features = ["macros", "serde"] }
nomos-core = { path = "../nomos-core/chain-defs" }
nomos-node = { path = "../nodes/nomos-node" }
nomos-da-network-core = { path = "../nomos-da/network/core" }
subnetworks-assignations = { path = "../nomos-da/network/subnetworks-assignations" }
full-replication = { path = "../nomos-da/full-replication" }
reqwest = { version = "0.11", features = ["json"] }
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
hex = "0.4.3"
once_cell = "1"
crossterm = "0.27"
ratatui = "0.24"
tui-input = "0.8"
ansi-to-tui = "3"
rand = "0.8"
tokio = { version = "1", features = ["sync"] }
tracing = "0.1"
tracing-subscriber = "0.3"

View File

@ -1,9 +0,0 @@
backend:
host: 0.0.0.0
port: 3019
log_level: "fatal"
# Node key needs to be unique for every client.
node_key: "0000000000000000000000000000000000000000000000000000000000001444"
discV5BootstrapNodes: []
initial_peers: ["/dns/testnet.nomos.tech/tcp/3000"]
relayTopics: []

View File

@ -1,31 +0,0 @@
use super::CLIENT;
use cryptarchia_consensus::CryptarchiaInfo;
use nomos_core::header::HeaderId;
use reqwest::Url;
pub async fn cryptarchia_info(node: &Url) -> Result<CryptarchiaInfo, reqwest::Error> {
const NODE_CRYPTARCHIA_INFO_PATH: &str = "cryptarchia/info";
CLIENT
.get(node.join(NODE_CRYPTARCHIA_INFO_PATH).unwrap())
.send()
.await?
.json::<CryptarchiaInfo>()
.await
}
pub async fn get_headers_info(
node: &Url,
from: Option<HeaderId>,
to: Option<HeaderId>,
) -> Result<Vec<HeaderId>, reqwest::Error> {
const NODE_CRYPTARCHIA_HEADERS_PATH: &str = "cryptarchia/headers";
let mut req = CLIENT.get(node.join(NODE_CRYPTARCHIA_HEADERS_PATH).unwrap());
if let Some(from) = from {
req = req.query(&[("from", from)]);
}
if let Some(to) = to {
req = req.query(&[("to", to)]);
}
req.send().await?.json().await
}

View File

@ -1 +0,0 @@

View File

@ -1,15 +0,0 @@
use super::CLIENT;
use reqwest::{Error, Response, Url};
use serde::Serialize;
pub async fn send_blob_info<I>(node: &Url, info: &I) -> Result<Response, Error>
where
I: Serialize,
{
const NODE_CERT_PATH: &str = "mempool/add/blobinfo";
CLIENT
.post(node.join(NODE_CERT_PATH).unwrap())
.json(info)
.send()
.await
}

View File

@ -1,9 +0,0 @@
pub mod consensus;
pub mod da;
pub mod mempool;
pub mod storage;
use once_cell::sync::Lazy;
use reqwest::Client;
static CLIENT: Lazy<Client> = Lazy::new(Client::new);

View File

@ -1,20 +0,0 @@
use super::CLIENT;
use full_replication::BlobInfo;
use nomos_core::block::Block;
use nomos_core::header::HeaderId;
use nomos_node::Tx;
use reqwest::Url;
pub async fn get_block_contents(
node: &Url,
block: &HeaderId,
) -> Result<Option<Block<Tx, BlobInfo>>, reqwest::Error> {
const BLOCK_PATH: &str = "storage/block";
CLIENT
.post(node.join(BLOCK_PATH).unwrap())
.json(block)
.send()
.await?
.json()
.await
}

View File

@ -1,347 +0,0 @@
/// The dumbest possible backend for a chat protocol.
/// Just because running Doom on Nomos was too much work for a demo.
///
///
mod ui;
use crate::{
api::consensus::get_headers_info,
//da::{
// disseminate::{
// DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
// },
// retrieve::get_block_blobs,
//},
};
use clap::Args;
use full_replication::{
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
};
use futures::{stream, StreamExt};
use nomos_core::{header::HeaderId, wire};
use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter};
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::{
io,
path::PathBuf,
sync::{
self,
mpsc::{Receiver, Sender},
Arc,
},
time::{Duration, Instant},
};
use tokio::sync::{mpsc::UnboundedSender, Mutex};
use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use ratatui::{
backend::{Backend, CrosstermBackend},
Terminal,
};
use tui_input::{backend::crossterm::EventHandler, Input};
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);
// Limit the number of maximum in-flight requests
const MAX_BUFFERED_REQUESTS: usize = 20;
#[derive(Clone, Debug, Args)]
/// The almost-instant messaging protocol.
pub struct NomosChat {
/// Path to the network config file
#[clap(short, long)]
pub network_config: PathBuf,
/// The data availability protocol to use. Defaults to full replication.
#[clap(flatten)]
pub da_protocol: DaProtocolChoice,
/// The node to connect to to fetch blocks and blobs
#[clap(long)]
pub node: Url,
/// Author for non interactive message formation
#[clap(long, requires("message"))]
pub author: Option<String>,
/// Message for non interactive message formation
#[clap(long, requires("author"))]
pub message: Option<String>,
}
pub struct App {
input: Input,
username: Option<String>,
messages: Vec<ChatMessage>,
message_status: Option<Status>,
message_in_flight: bool,
last_updated: Instant,
payload_sender: UnboundedSender<Box<[u8]>>,
status_updates: Receiver<Status>,
node: Url,
logs: Arc<sync::Mutex<Vec<u8>>>,
scroll_logs: u16,
}
impl NomosChat {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
let network = serde_yaml::from_reader::<
_,
<NetworkService<NetworkBackend> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
let da_protocol = self.da_protocol.clone();
let node_addr = Some(self.node.clone());
let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel();
let (status_sender, status_updates) = std::sync::mpsc::channel();
let shared_writer = Arc::new(sync::Mutex::new(Vec::new()));
let backend = SharedWriter::from_inner(shared_writer.clone());
std::thread::spawn(move || {
OverwatchRunner::<DisseminateApp>::run(
DisseminateAppServiceSettings {
network,
send_blob: Settings {
payload: Arc::new(Mutex::new(payload_receiver)),
timeout: DEFAULT_TIMEOUT,
da_protocol,
status_updates: status_sender,
node_addr,
output: None,
},
logger: LoggerSettings {
backend: LoggerBackend::Writer(backend),
level: tracing::Level::INFO,
..Default::default()
},
},
None,
)
.unwrap()
.wait_finished()
});
if let Some(author) = self.author.as_ref() {
let message = self
.message
.as_ref()
.expect("Should be available if author is set");
return run_once(author, message, payload_sender);
}
// setup terminal
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
let app = App {
input: Input::default(),
username: None,
messages: Vec::new(),
message_status: None,
message_in_flight: false,
last_updated: Instant::now(),
payload_sender,
status_updates,
node: self.node.clone(),
logs: shared_writer,
scroll_logs: 0,
};
run_app(&mut terminal, app);
// restore terminal
disable_raw_mode()?;
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)?;
terminal.show_cursor()?;
Ok(())
}
}
fn run_once(
author: &str,
message: &str,
payload_sender: UnboundedSender<Box<[u8]>>,
) -> Result<(), Box<dyn std::error::Error>> {
payload_sender.send(
wire::serialize(&ChatMessage {
author: author.to_string(),
message: message.to_string(),
_nonce: rand::random(),
})
.unwrap()
.into(),
)?;
Ok(())
}
fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) {
let (message_tx, message_rx) = std::sync::mpsc::channel();
let node = app.node.clone();
std::thread::spawn(move || check_for_messages(message_tx, node));
loop {
terminal.draw(|f| ui::ui(f, &app)).unwrap();
if let Ok(update) = app.status_updates.try_recv() {
if let Status::Done | Status::Err(_) = update {
app.message_in_flight = false;
}
app.message_status = Some(update);
app.last_updated = Instant::now();
}
if let Ok(messages) = message_rx.try_recv() {
app.messages.extend(messages);
}
// Do not block rendering if there's no user input available
if !event::poll(Duration::from_millis(100)).unwrap() {
continue;
}
if let Event::Key(key) = event::read().unwrap() {
match key.code {
KeyCode::Enter => {
if app.username.is_none() {
app.username = Some(app.input.value().into());
} else {
// Do not allow more than one message in flight at a time for simplicity
if !app.message_in_flight && !app.input.value().is_empty() {
app.message_in_flight = true;
app.payload_sender
.send(
wire::serialize(&ChatMessage {
author: app.username.clone().unwrap(),
message: app.input.value().into(),
_nonce: rand::random(),
})
.unwrap()
.into(),
)
.unwrap();
}
}
app.input.reset();
}
KeyCode::Esc => {
return;
}
KeyCode::Left => {
app.scroll_logs = app.scroll_logs.saturating_sub(1);
}
KeyCode::Right => {
app.scroll_logs = app.scroll_logs.saturating_add(1);
}
_ => {
if !app.message_in_flight {
app.input.handle_event(&Event::Key(key));
}
}
}
}
}
}
#[derive(Serialize, Deserialize, Debug)]
struct ChatMessage {
author: String,
message: String,
// Since DA will rightfully ignore duplicated messages, we need to add a nonce to make sure
// every message is unique. This is randomly generated for simplicity.
_nonce: u64,
}
#[tokio::main]
async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
// Should ask for the genesis block to be more robust
let mut last_tip = [0; 32].into();
loop {
if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await {
sender.send(messages).expect("channel closed");
last_tip = new_tip;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
// Process a single block's blobs and return chat messages
async fn process_block_blobs(
node: Url,
block_id: &HeaderId,
da_settings: DaSettings,
) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> {
let blobs = get_block_blobs(&node, block_id).await?;
// Note that number of attestations is ignored here since we only use the da protocol to
// decode the blob data, not to validate the certificate
let mut da_protocol =
<FullReplication<AbsoluteNumber<Attestation, Certificate>> as DaProtocol>::new(da_settings);
let mut messages = Vec::new();
for blob in blobs {
da_protocol.recv_blob(blob);
let bytes = da_protocol.extract().unwrap();
if let Ok(message) = wire::deserialize::<ChatMessage>(&bytes) {
messages.push(message);
}
}
Ok(messages)
}
// Fetch new messages since the last tip
async fn fetch_new_messages(
last_tip: &HeaderId,
node: &Url,
) -> Result<(HeaderId, Vec<ChatMessage>), Box<dyn std::error::Error>> {
// By only specifying the 'to' parameter we get all the blocks since the last tip
let mut new_blocks = get_headers_info(node, None, Some(*last_tip))
.await?
.into_iter()
.collect::<Vec<_>>();
// The first block is the most recent one.
// Note that the 'to' is inclusive so the above request will always return at least one block
// as long as the block exists (which is the case since it was returned by a previous call)
let new_tip = new_blocks[0];
// We already processed the last block so let's remove it
new_blocks.pop();
let da_settings = DaSettings {
num_attestations: 1,
voter: [0; 32],
};
let block_stream = stream::iter(new_blocks.iter().rev());
let results: Vec<_> = block_stream
.map(|block| {
let node = node.clone();
let da_settings = da_settings.clone();
process_block_blobs(node, block, da_settings)
})
.buffered(MAX_BUFFERED_REQUESTS)
.collect::<Vec<_>>()
.await;
let mut new_messages = Vec::new();
for result in results {
new_messages.extend(result?);
}
Ok((new_tip, new_messages))
}

View File

@ -1,203 +0,0 @@
use super::{App, ChatMessage};
use ratatui::prelude::*;
use ratatui::widgets::{Block, Borders, List, ListItem, Paragraph, Wrap};
use std::ops::Deref;
pub fn ui(f: &mut Frame, app: &App) {
if app.username.is_none() {
select_username(f, app)
} else {
chat(f, app)
}
}
fn select_username(f: &mut Frame, app: &App) {
assert!(app.username.is_none());
let block = Block::default()
.title("NomosChat")
.borders(Borders::ALL)
.style(Style::new().white().on_black());
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Min(5),
Constraint::Min(1),
Constraint::Min(5),
Constraint::Min(0),
]
.as_ref(),
)
.split(block.inner(f.size()));
f.render_widget(block, f.size());
let text = vec![
Line::from(vec![
Span::raw("Welcome to "),
Span::styled("NomosChat", Style::new().green().italic()),
"!".into(),
]),
Line::from("The first almost-instant messaging protocol".red()),
];
let welcome = Paragraph::new(text)
.style(Style::new().white().on_black())
.alignment(Alignment::Center)
.wrap(Wrap { trim: true });
f.render_widget(welcome, chunks[0]);
let help_text = Line::from("Press <Enter> to select your username, or <Esc> to quit.");
let help = Paragraph::new(help_text)
.alignment(Alignment::Center)
.style(Style::new().white().on_black());
f.render_widget(help, chunks[1]);
let input = Paragraph::new(app.input.value())
.style(Style::new().white().on_black())
.block(
Block::default()
.borders(Borders::ALL)
.title("Select username")
.border_style(Style::default().fg(Color::Yellow)),
);
f.render_widget(input, centered_rect(chunks[2], 50, 50));
}
fn centered_rect(r: Rect, percent_x: u16, percent_y: u16) -> Rect {
let popup_layout = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Percentage((100 - percent_y) / 2),
Constraint::Percentage(percent_y),
Constraint::Percentage((100 - percent_y) / 2),
]
.as_ref(),
)
.split(r);
Layout::default()
.direction(Direction::Horizontal)
.constraints(
[
Constraint::Percentage((100 - percent_x) / 2),
Constraint::Percentage(percent_x),
Constraint::Percentage((100 - percent_x) / 2),
]
.as_ref(),
)
.split(popup_layout[1])[1]
}
fn chat(f: &mut Frame, app: &App) {
let block = Block::default()
.title("NomosChat")
.borders(Borders::ALL)
.style(Style::new().white().on_black());
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Max(3),
Constraint::Percentage(80),
Constraint::Max(10),
]
.as_ref(),
)
.split(block.inner(f.size()));
f.render_widget(block, f.size());
let messages_rect = centered_rect(chunks[1], 90, 100);
render_messages(f, app, messages_rect);
render_logs(f, app, chunks[2]);
let chunks = Layout::default()
.direction(Direction::Horizontal)
.constraints([Constraint::Percentage(50), Constraint::Percentage(50)].as_ref())
.split(chunks[0]);
render_input(f, app, chunks[0]);
render_status(f, app, chunks[1]);
}
fn render_messages(f: &mut Frame, app: &App, rect: Rect) {
let messages: Vec<ListItem> = app
.messages
.iter()
.map(
|ChatMessage {
author,
message,
_nonce,
}| {
let content = if author == app.username.as_ref().unwrap() {
static MARGIN: usize = 2;
// pad to make it appear aligned on the right
let pad = " ".repeat(
(rect.width as usize)
.saturating_sub(message.len())
.saturating_sub(MARGIN),
);
Line::from(vec![Span::raw(pad), Span::raw(message)])
} else {
Line::from(vec![
Span::styled(format!("{author}: "), Style::new().fg(Color::Yellow).bold()),
Span::raw(message),
])
.alignment(Alignment::Left)
};
ListItem::new(content)
},
)
.collect();
let messages =
List::new(messages).block(Block::default().borders(Borders::ALL).title("Messages"));
f.render_widget(messages, rect);
}
fn render_input(f: &mut Frame, app: &App, rect: Rect) {
let style = if !app.message_in_flight {
Style::default().fg(Color::Yellow)
} else {
Style::default().fg(Color::DarkGray)
};
let input = Paragraph::new(app.input.value())
.style(Style::new().white().on_black())
.block(
Block::default()
.borders(Borders::ALL)
.title("Press <Enter> to send message")
.border_style(style),
);
f.render_widget(input, rect);
}
fn render_status(f: &mut Frame, app: &App, rect: Rect) {
let waiting_animation = std::iter::repeat(".")
.take(app.last_updated.elapsed().as_secs() as usize % 4)
.collect::<Vec<_>>()
.join("");
let status = Paragraph::new(
app.message_status
.as_ref()
.map(|s| format!("{}{}", s, waiting_animation))
.unwrap_or_default(),
)
.block(Block::default().borders(Borders::ALL).title("Status:"));
f.render_widget(status, rect);
}
fn render_logs(f: &mut Frame, app: &App, rect: Rect) {
let logs = String::from_utf8(app.logs.lock().unwrap().deref().clone()).unwrap();
f.render_widget(
Paragraph::new(logs)
.wrap(Wrap { trim: true })
.scroll((app.scroll_logs, 0))
.block(
Block::default()
.borders(Borders::ALL)
.title("Logs: (use ←/→ to scroll up/down"),
),
rect,
);
}

View File

@ -1,135 +0,0 @@
// std
use std::{path::PathBuf, sync::Arc, time::Duration};
// crates
use clap::Args;
use kzgrs_backend::dispersal::Metadata;
use nomos_da_network_service::NetworkService;
use nomos_log::{LoggerBackend, LoggerSettings};
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
use reqwest::Url;
use tokio::sync::Mutex;
// internal
use crate::da::{
disseminate::{DisseminateApp, DisseminateAppServiceSettings, KzgrsSettings, Settings, Status},
NetworkBackend,
};
#[derive(Args, Debug, Default)]
pub struct Disseminate {
// TODO: accept bytes
#[clap(short, long, required_unless_present("file"))]
pub data: Option<String>,
/// Path to the network config file
#[clap(short, long)]
pub network_config: PathBuf,
/// Timeout in seconds. Defaults to 120 seconds.
#[clap(short, long, default_value = "120")]
pub timeout: u64,
/// Address of the node to send the certificate to
/// for block inclusion, if present.
#[clap(long)]
pub node_addr: Option<Url>,
// Application ID for dispersed data.
#[clap(long)]
pub app_id: String,
// Index for the Blob associated with Application ID.
#[clap(long)]
pub index: u64,
// Use Kzg RS cache.
#[clap(long)]
pub with_cache: bool,
// Number of columns to use for encoding.
#[clap(long, default_value = "4096")]
pub columns: usize,
// Duration in seconds to wait before publishing blob info.
#[clap(short, long, default_value = "5")]
pub wait_until_disseminated: u64,
/// File to write the certificate to, if present.
#[clap(long)]
pub output: Option<PathBuf>,
/// File to disseminate
#[clap(short, long)]
pub file: Option<PathBuf>,
// Path to the KzgRs global parameters.
#[clap(long)]
pub global_params_path: String,
}
impl Disseminate {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::new())
.expect("setting tracing default failed");
let network = serde_yaml::from_reader::<
_,
<NetworkService<NetworkBackend> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
let (status_updates, rx) = std::sync::mpsc::channel();
let mut bytes: Vec<u8> = if let Some(data) = &self.data {
data.clone().into_bytes()
} else {
let file_path = self.file.as_ref().unwrap();
std::fs::read(file_path)?
};
let remainder = bytes.len() % 31;
if remainder != 0 {
bytes.resize(bytes.len() + (31 - remainder), 0);
}
let app_id: [u8; 32] = hex::decode(&self.app_id)?
.try_into()
.map_err(|_| "Invalid app_id")?;
let metadata = Metadata::new(app_id, self.index.into());
let timeout = Duration::from_secs(self.timeout);
let node_addr = self.node_addr.clone();
let output = self.output.clone();
let num_columns = self.columns;
let with_cache = self.with_cache;
let wait_until_disseminated = Duration::from_secs(self.wait_until_disseminated);
let global_params_path = self.global_params_path.clone();
let (payload_sender, payload_rx) = tokio::sync::mpsc::unbounded_channel();
payload_sender.send(bytes.into_boxed_slice()).unwrap();
std::thread::spawn(move || {
OverwatchRunner::<DisseminateApp>::run(
DisseminateAppServiceSettings {
network,
send_blob: Settings {
payload: Arc::new(Mutex::new(payload_rx)),
timeout,
kzgrs_settings: KzgrsSettings {
num_columns,
with_cache,
},
metadata,
status_updates,
node_addr,
wait_until_disseminated,
output,
global_params_path,
},
logger: LoggerSettings {
backend: LoggerBackend::None,
..Default::default()
},
},
None,
)
.unwrap()
.wait_finished();
});
// drop to signal we're not going to send more blobs
drop(payload_sender);
tracing::info!("{}", rx.recv().unwrap());
while let Ok(update) = rx.recv() {
if let Status::Err(e) = update {
tracing::error!("{e}");
return Err(e);
}
tracing::info!("{}", update);
}
tracing::info!("done");
Ok(())
}
}

View File

@ -0,0 +1,82 @@
// std
use std::path::PathBuf;
// crates
use clap::Args;
use reqwest::Url;
// internal
use executor_http_client::ExecutorHttpClient;
use kzgrs_backend::{dispersal::Metadata, encoder::DaEncoderParams};
#[derive(Args, Debug)]
pub struct Disseminate {
/// Text to disseminate.
#[clap(short, long, required_unless_present("file"))]
pub data: Option<String>,
/// File to disseminate.
#[clap(short, long)]
pub file: Option<PathBuf>,
/// Application ID for dispersed data.
#[clap(long)]
pub app_id: String,
/// Index for the Blob associated with Application ID.
#[clap(long)]
pub index: u64,
/// Executor address which is responsible for dissemination.
#[clap(long)]
pub addr: Url,
}
impl Disseminate {
pub fn run(self) -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::new())
.expect("setting tracing default failed");
let client = ExecutorHttpClient::new(reqwest::Client::new(), self.addr.clone());
let mut bytes: Vec<u8> = if let Some(data) = &self.data {
data.clone().into_bytes()
} else {
let file_path = self.file.as_ref().unwrap();
std::fs::read(file_path)?
};
let remainder = bytes.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE;
if remainder != 0 {
bytes.resize(
bytes.len() + (DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder),
0,
);
}
let app_id: [u8; 32] = hex::decode(&self.app_id)?
.try_into()
.map_err(|_| "Invalid app_id")?;
let metadata = Metadata::new(app_id, self.index.into());
let (res_sender, res_receiver) = std::sync::mpsc::channel();
tokio::spawn(async move {
let res = client
.publish_blob(bytes, metadata)
.await
.map_err(|err| format!("Failed to publish blob: {:?}", err));
res_sender.send(res).unwrap();
});
match res_receiver.recv() {
Ok(update) => match update {
Ok(_) => tracing::info!("Data successfully disseminated."),
Err(e) => {
tracing::error!("Error disseminating data: {e}");
return Err(e.into());
}
},
Err(e) => {
tracing::error!("Failed to receive from client thread: {e}");
return Err(e.into());
}
}
tracing::info!("Done");
Ok(())
}
}

View File

@ -1,21 +1,23 @@
use clap::Subcommand;
pub mod executor;
pub mod validator;
// pub mod chat;
pub mod disseminate;
// std
// crates
use clap::Subcommand;
// internal
#[derive(Debug, Subcommand)]
pub enum Command {
/// Send a blob to the network and collect attestations to create a DA proof
Disseminate(disseminate::Disseminate),
// /// (Almost) Instant messaging protocol on top of the Nomos network
// Chat(chat::NomosChat),
/// Send data to the executor for encoding and dispersal.
Disseminate(executor::Disseminate),
Retrieve(validator::Retrieve),
}
impl Command {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
pub fn run(self) -> Result<(), Box<dyn std::error::Error>> {
match self {
Command::Disseminate(cmd) => cmd.run(),
// Command::Chat(cmd) => cmd.run(),
Command::Retrieve(cmd) => cmd.run(),
}?;
Ok(())
}

View File

@ -0,0 +1,105 @@
// std
use std::{error::Error, ops::Range};
// crates
use clap::Args;
use reqwest::{Client, Url};
use serde::{de::DeserializeOwned, Serialize};
// internal
use kzgrs_backend::{common::blob::DaBlob, dispersal::Index};
use nomos_core::da::blob::{self, metadata};
use nomos_node::api::{handlers::GetRangeReq, paths};
#[derive(Args, Debug)]
pub struct Retrieve {
/// Application ID of data in Indexer.
#[clap(long)]
pub app_id: String,
/// Retrieve from this Index associated with Application ID.
#[clap(long)]
pub from: u64,
/// Retrieve to this Index associated with Application ID.
#[clap(long)]
pub to: u64,
/// Node address to retrieve appid blobs.
#[clap(long)]
pub addr: Url,
}
impl Retrieve {
pub fn run(self) -> Result<(), Box<dyn std::error::Error>> {
tracing::subscriber::set_global_default(tracing_subscriber::FmtSubscriber::new())
.expect("setting tracing default failed");
let app_id: [u8; 32] = hex::decode(&self.app_id)?
.try_into()
.map_err(|_| "Invalid app_id")?;
let from: Index = self.from.into();
let to: Index = self.to.into();
let (res_sender, res_receiver) = std::sync::mpsc::channel();
tokio::spawn(async move {
let res = get_app_data_range_from_node::<DaBlob, kzgrs_backend::dispersal::Metadata>(
reqwest::Client::new(),
self.addr.clone(),
app_id,
from..to,
)
.await
.map_err(|err| format!("Failed to retrieve data form appid {app_id:?}: {err:?}",));
res_sender.send(res).unwrap();
});
match res_receiver.recv() {
Ok(update) => match update {
Ok(app_blobs) => {
for (index, blobs) in app_blobs.iter() {
tracing::info!("Index {:?} has {:} blobs", (index), blobs.len());
for blob in blobs.iter() {
tracing::info!("Index {:?}; Blob: {blob:?}", index.to_u64());
}
}
}
Err(e) => {
tracing::error!("Error receiving data: {e}");
return Err(e.into());
}
},
Err(e) => {
tracing::error!("Failed to receive from client thread: {e}");
return Err(e.into());
}
}
tracing::info!("Done");
Ok(())
}
}
pub async fn get_app_data_range_from_node<Blob, Metadata>(
client: Client,
url: Url,
app_id: Metadata::AppId,
range: Range<Metadata::Index>,
) -> Result<Vec<(Metadata::Index, Vec<Blob>)>, Box<dyn Error>>
where
Blob: blob::Blob + DeserializeOwned,
Metadata: metadata::Metadata + Serialize,
<Metadata as metadata::Metadata>::Index: Serialize + DeserializeOwned,
<Metadata as metadata::Metadata>::AppId: Serialize + DeserializeOwned,
{
let url = url
.join(paths::DA_GET_RANGE)
.expect("Url should build properly");
let req = &GetRangeReq::<Metadata> { app_id, range };
Ok(client
.post(url)
.header("Content-Type", "application/json")
.json(&req)
.send()
.await
.unwrap()
.json::<Vec<(Metadata::Index, Vec<Blob>)>>()
.await
.unwrap())
}

View File

@ -1,238 +0,0 @@
// std
use std::{
path::PathBuf,
sync::{mpsc::Sender, Arc},
time::Duration,
};
// crates
use clap::Args;
use kzgrs_backend::{
common::build_blob_id,
dispersal::{BlobInfo, Metadata},
encoder::EncodedData as KzgEncodedData,
};
use nomos_core::{
da::{DaDispersal, DaEncoder},
wire,
};
use nomos_da_network_service::NetworkService;
use nomos_log::Logger;
use overwatch_derive::*;
use overwatch_rs::{
services::{
handle::{ServiceHandle, ServiceStateHandle},
relay::{NoMessage, OutboundRelay, Relay},
state::*,
ServiceCore, ServiceData, ServiceId,
},
DynError,
};
use reqwest::Url;
use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
// internal
use super::{network::adapters::libp2p::Libp2pExecutorDispersalAdapter, NetworkBackend};
use crate::api::mempool::send_blob_info;
#[allow(clippy::too_many_arguments)]
pub async fn disseminate_and_wait<E, D>(
encoder: &E,
disperal: &D,
data: Box<[u8]>,
metadata: Metadata,
status_updates: Sender<Status>,
node_addr: Option<&Url>,
wait_until_disseminated: Duration,
output: Option<&PathBuf>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
E: DaEncoder<EncodedData = KzgEncodedData>,
D: DaDispersal<EncodedData = KzgEncodedData>,
<E as nomos_core::da::DaEncoder>::Error: std::error::Error + Send + Sync + 'static,
<D as nomos_core::da::DaDispersal>::Error: std::error::Error + Send + Sync + 'static,
{
// 1) Building blob
status_updates.send(Status::Encoding)?;
let encoded_data = encoder.encode(&data).map_err(Box::new)?;
let blob_hash = build_blob_id(
&encoded_data.aggregated_column_commitment,
&encoded_data.row_commitments,
);
// 2) Send blob to network
status_updates.send(Status::Disseminating)?;
disperal.disperse(encoded_data).await.map_err(Box::new)?;
// 3) Build blob info.
let blob_info = BlobInfo::new(blob_hash, metadata);
if let Some(output) = output {
status_updates.send(Status::SavingBlobInfo)?;
std::fs::write(output, wire::serialize(&blob_info)?)?;
}
// Wait for blobs be replicated in da network.
tokio::time::sleep(wait_until_disseminated).await;
// 4) Send blob info to the mempool.
if let Some(node) = node_addr {
status_updates.send(Status::SendingBlobInfo)?;
let res = send_blob_info(node, &blob_info).await?;
if !res.status().is_success() {
tracing::error!("ERROR: {:?}", res);
return Err(format!("Failed to send blob info to node: {}", res.status()).into());
}
}
status_updates.send(Status::Done)?;
Ok(())
}
pub enum Status {
Encoding,
Disseminating,
WaitingAttestations,
CreatingCert,
SavingBlobInfo,
SendingBlobInfo,
Done,
Err(Box<dyn std::error::Error + Send + Sync>),
}
impl std::fmt::Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Encoding => write!(f, "Encoding message into blob(s)"),
Self::Disseminating => write!(f, "Sending blob(s) to the network"),
Self::WaitingAttestations => write!(f, "Waiting for attestations"),
Self::CreatingCert => write!(f, "Creating certificate"),
Self::SavingBlobInfo => write!(f, "Saving blob info to file"),
Self::SendingBlobInfo => write!(f, "Sending blob info to node"),
Self::Done => write!(f, ""),
Self::Err(e) => write!(f, "Error: {e}"),
}
}
}
// To interact with the network service it's easier to just spawn
// an overwatch app
#[derive(Services)]
pub struct DisseminateApp {
network: ServiceHandle<NetworkService<NetworkBackend>>,
send_blob: ServiceHandle<DisseminateService>,
logger: ServiceHandle<Logger>,
}
#[derive(Clone, Debug)]
pub struct Settings {
// This is wrapped in an Arc just to make the struct Clone
pub payload: Arc<Mutex<UnboundedReceiver<Box<[u8]>>>>,
pub timeout: Duration,
pub kzgrs_settings: KzgrsSettings,
pub metadata: Metadata,
pub status_updates: Sender<Status>,
pub node_addr: Option<Url>,
pub wait_until_disseminated: Duration,
pub output: Option<std::path::PathBuf>,
pub global_params_path: String,
}
pub struct DisseminateService {
service_state: ServiceStateHandle<Self>,
}
impl ServiceData for DisseminateService {
const SERVICE_ID: ServiceId = "Disseminate";
type Settings = Settings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl ServiceCore for DisseminateService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
Ok(Self { service_state })
}
async fn run(self) -> Result<(), DynError> {
let Self { service_state } = self;
let Settings {
payload,
timeout,
kzgrs_settings,
metadata,
status_updates,
node_addr,
wait_until_disseminated,
output,
global_params_path,
} = service_state.settings_reader.get_updated_settings();
let network_relay: Relay<NetworkService<NetworkBackend>> =
service_state.overwatch_handle.relay();
let network_relay: OutboundRelay<_> = network_relay
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
let global_params = kzgrs_backend::global::global_parameters_from_file(&global_params_path)
.expect("Global parameters should be loaded from file");
let params = kzgrs_backend::encoder::DaEncoderParams::new(
kzgrs_settings.num_columns,
kzgrs_settings.with_cache,
global_params,
);
let da_encoder = kzgrs_backend::encoder::DaEncoder::new(params);
let da_dispersal = Libp2pExecutorDispersalAdapter::new(network_relay);
while let Some(data) = payload.lock().await.recv().await {
match tokio::time::timeout(
timeout,
disseminate_and_wait(
&da_encoder,
&da_dispersal,
data,
metadata,
status_updates.clone(),
node_addr.as_ref(),
wait_until_disseminated,
output.as_ref(),
),
)
.await
{
Err(_) => {
tracing::error!("Timeout reached, check the logs for additional details");
let _ = status_updates.send(Status::Err("Timeout reached".into()));
}
Ok(Err(e)) => {
tracing::error!(
"Could not disseminate blob, check logs for additional details"
);
let _ = status_updates.send(Status::Err(e));
}
_ => {}
}
}
service_state.overwatch_handle.shutdown().await;
Ok(())
}
}
#[derive(Debug, Clone, Args)]
pub struct KzgrsSettings {
pub num_columns: usize,
pub with_cache: bool,
}
impl Default for KzgrsSettings {
fn default() -> Self {
Self {
num_columns: 4096,
with_cache: true,
}
}
}

View File

@ -1,8 +0,0 @@
pub mod disseminate;
pub mod network;
// pub mod retrieve;
use network::backend::ExecutorBackend;
use subnetworks_assignations::versions::v1::FillFromNodeList;
pub type NetworkBackend = ExecutorBackend<FillFromNodeList>;

View File

@ -1,116 +0,0 @@
// std
use std::collections::HashSet;
// crates
use futures::{future::join_all, StreamExt};
use kzgrs_backend::{common::blob::DaBlob, encoder::EncodedData as KzgEncodedData};
use nomos_core::da::DaDispersal;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use thiserror::Error;
use tokio::sync::oneshot;
// internal
use crate::da::{
network::{backend::Command, swarm::DispersalEvent},
NetworkBackend,
};
type Relay<T> = OutboundRelay<<NetworkService<T> as ServiceData>::Message>;
#[derive(Debug, Error)]
#[error("{0}")]
pub struct DispersalError(String);
impl From<String> for DispersalError {
fn from(s: String) -> Self {
DispersalError(s)
}
}
pub struct Libp2pExecutorDispersalAdapter {
network_relay: Relay<NetworkBackend>,
}
impl Libp2pExecutorDispersalAdapter {
pub fn new(network_relay: Relay<NetworkBackend>) -> Self {
Self { network_relay }
}
}
#[async_trait::async_trait]
impl DaDispersal for Libp2pExecutorDispersalAdapter {
type EncodedData = KzgEncodedData;
type Error = DispersalError;
async fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error> {
let mut tasks = Vec::new();
let (sender, receiver) = oneshot::channel();
self.network_relay
.send(DaNetworkMsg::Subscribe { kind: (), sender })
.await
.map_err(|(e, _)| e.to_string())?;
let mut event_stream = receiver.await.map_err(|e| e.to_string())?;
let mut expected_acknowledgments = HashSet::new();
for (i, column) in encoded_data.extended_data.columns().enumerate() {
let blob = DaBlob {
column: column.clone(),
column_idx: i
.try_into()
.expect("Column index shouldn't overflow the target type"),
column_commitment: encoded_data.column_commitments[i],
aggregated_column_commitment: encoded_data.aggregated_column_commitment,
aggregated_column_proof: encoded_data.aggregated_column_proofs[i],
rows_commitments: encoded_data.row_commitments.clone(),
rows_proofs: encoded_data
.rows_proofs
.iter()
.map(|proofs| proofs.get(i).cloned().unwrap())
.collect(),
};
expected_acknowledgments.insert((blob.id().clone(), i as SubnetworkId));
let relay = self.network_relay.clone();
let command = DaNetworkMsg::Process(Command::Disperse {
blob,
subnetwork_id: i as u32,
});
let task = async move { relay.send(command).await.map_err(|(e, _)| e.to_string()) };
tasks.push(task);
}
let results: Vec<Result<(), String>> = join_all(tasks).await;
for result in results {
result?;
}
while !expected_acknowledgments.is_empty() {
let event = event_stream.next().await;
match event {
Some(event) => match event {
DispersalEvent::DispersalSuccess {
blob_id,
subnetwork_id,
} => {
expected_acknowledgments.remove(&(blob_id.to_vec(), subnetwork_id));
}
DispersalEvent::DispersalError { error } => {
return Err(DispersalError(format!("Received dispersal error: {error}")));
}
},
None => {
return Err(DispersalError(
"Event stream ended before receiving all acknowledgments".into(),
));
}
}
}
Ok(())
}
}

View File

@ -1,72 +0,0 @@
use std::fmt;
use kzgrs_backend::{common::blob::DaBlob, encoder::EncodedData as KzgEncodedData};
use nomos_core::da::DaDispersal;
use nomos_da_network_service::{
backends::mock::executor::{Command, MockExecutorBackend},
DaNetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
type Relay<T> = OutboundRelay<<NetworkService<T> as ServiceData>::Message>;
#[derive(Debug)]
pub struct DispersalError(String);
impl fmt::Display for DispersalError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::error::Error for DispersalError {}
impl From<String> for DispersalError {
fn from(s: String) -> Self {
DispersalError(s)
}
}
pub struct MockExecutorDispersalAdapter {
network_relay: Relay<MockExecutorBackend>,
}
impl MockExecutorDispersalAdapter {
pub fn new(network_relay: Relay<MockExecutorBackend>) -> Self {
Self { network_relay }
}
}
#[async_trait::async_trait]
impl DaDispersal for MockExecutorDispersalAdapter {
type EncodedData = KzgEncodedData;
type Error = DispersalError;
async fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error> {
for (i, column) in encoded_data.extended_data.columns().enumerate() {
let blob = DaBlob {
column: column.clone(),
column_idx: i
.try_into()
.expect("Column index shouldn't overflow the target type"),
column_commitment: encoded_data.column_commitments[i],
aggregated_column_commitment: encoded_data.aggregated_column_commitment,
aggregated_column_proof: encoded_data.aggregated_column_proofs[i],
rows_commitments: encoded_data.row_commitments.clone(),
rows_proofs: encoded_data
.rows_proofs
.iter()
.map(|proofs| proofs.get(i).cloned().unwrap())
.collect(),
};
self.network_relay
.send(DaNetworkMsg::Process(Command::Disperse {
blob,
subnetwork_id: i as u32,
}))
.await
.map_err(|(e, _)| e.to_string())?
}
Ok(())
}
}

View File

@ -1,2 +0,0 @@
pub mod libp2p;
pub mod mock;

View File

@ -1,189 +0,0 @@
// std
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
// crates
use futures::{Stream, StreamExt};
use kzgrs_backend::common::blob::DaBlob;
use libp2p::{Multiaddr, PeerId};
use log::error;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::backends::NetworkBackend;
use nomos_libp2p::{ed25519, secret_key_serde};
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use serde::{Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;
use tokio::sync::broadcast;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream};
// internal
use super::swarm::{DispersalEvent, ExecutorSwarm};
const BROADCAST_CHANNEL_SIZE: usize = 128;
/// Message that the backend replies to
#[derive(Debug)]
pub enum Command {
/// Disperse a blob to a subnetwork.
Disperse {
subnetwork_id: SubnetworkId,
blob: DaBlob,
},
}
/// DA network backend for nomos cli as an executor.
/// Internally uses a libp2p swarm composed of the [`ExecutorBehaviour`]
/// It forwards network messages to the corresponding subscription channels/streams
pub struct ExecutorBackend<Membership> {
// TODO: this join handles should be cancelable tasks. We should add an stop method for
// the `NetworkBackend` trait so if the service is stopped the backend can gracefully handle open
// sub-tasks as well.
#[allow(dead_code)]
task: JoinHandle<()>,
#[allow(dead_code)]
replies_task: JoinHandle<()>,
dispersal_request_sender: UnboundedSender<(SubnetworkId, DaBlob)>,
dispersal_broadcast_receiver: broadcast::Receiver<DispersalEvent>,
_membership: PhantomData<Membership>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExecutorBackendSettings<Membership> {
// Identification Secp256k1 private key in Hex format (`0x123...abc`). Default random.
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
pub node_key: ed25519::SecretKey,
/// Membership of DA network PoV set
pub membership: Membership,
pub node_addrs: HashMap<PeerId, Multiaddr>,
pub num_subnets: u16,
}
impl<Membership> ExecutorBackend<Membership> {
/// Send the dispersal request to the underlying dispersal behaviour
async fn handle_dispersal_request(&self, subnetwork_id: SubnetworkId, blob: DaBlob) {
if let Err(SendError((subnetwork_id, blob_id))) =
self.dispersal_request_sender.send((subnetwork_id, blob))
{
error!(
"Error requesting sample for subnetwork id : {subnetwork_id}, blob_id: {blob_id:?}"
);
}
}
}
#[async_trait::async_trait]
impl<Membership> NetworkBackend for ExecutorBackend<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
{
type Settings = ExecutorBackendSettings<Membership>;
type State = NoState<Self::Settings>;
type Message = Command;
type EventKind = ();
type NetworkEvent = DispersalEvent;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
let (dispersal_events_sender, dispersal_events_receiver) = unbounded_channel();
let keypair =
libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let mut executor_swarm =
ExecutorSwarm::new(keypair, config.membership.clone(), dispersal_events_sender);
let dispersal_request_sender = executor_swarm.blobs_sender();
let mut connected_peers = HashSet::new();
let local_peer_id = *executor_swarm.local_peer_id();
for subnetwork_id in 0..config.num_subnets {
// Connect to one peer in a subnet.
let mut members = config.membership.members_of(&(subnetwork_id as u32));
members.remove(&local_peer_id);
let peer_id = *members
.iter()
.next()
.expect("Subnet should have at least one node which is not the nomos_cli");
let addr = config
.node_addrs
.get(&peer_id)
.expect("Peer address should be in the list");
executor_swarm
.dial(addr.clone())
.expect("Should schedule the dials");
connected_peers.insert(peer_id);
}
let executor_open_stream_sender = executor_swarm.open_stream_sender();
let task = overwatch_handle
.runtime()
.spawn(async move { executor_swarm.run().await });
for peer_id in connected_peers.iter() {
executor_open_stream_sender.send(*peer_id).unwrap();
}
let (dispersal_broadcast_sender, dispersal_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver);
let replies_task = overwatch_handle
.runtime()
.spawn(handle_dispersal_events_stream(
dispersal_events_receiver,
dispersal_broadcast_sender,
));
Self {
task,
replies_task,
dispersal_request_sender,
dispersal_broadcast_receiver,
_membership: Default::default(),
}
}
async fn process(&self, msg: Self::Message) {
match msg {
Command::Disperse {
subnetwork_id,
blob,
} => {
self.handle_dispersal_request(subnetwork_id, blob).await;
}
}
}
async fn subscribe(
&mut self,
_event: Self::EventKind,
) -> Pin<Box<dyn Stream<Item = Self::NetworkEvent> + Send>> {
Box::pin(
BroadcastStream::new(self.dispersal_broadcast_receiver.resubscribe())
.filter_map(|event| async { event.ok() }),
)
}
}
/// Task that handles forwarding of events to the subscriptions channels/stream
async fn handle_dispersal_events_stream(
mut events_stream: UnboundedReceiverStream<DispersalEvent>,
dispersal_broadcast_sender: broadcast::Sender<DispersalEvent>,
) {
while let Some(dispersal_event) = events_stream.next().await {
if let Err(e) = dispersal_broadcast_sender.send(dispersal_event) {
error!("Error in internal broadcast of dispersal event: {e:?}");
}
}
}

View File

@ -1,3 +0,0 @@
pub mod adapters;
pub mod backend;
mod swarm;

View File

@ -1,275 +0,0 @@
// std
// crates
use kzgrs_backend::common::blob::DaBlob;
use libp2p::futures::StreamExt;
use libp2p::Multiaddr;
use libp2p::{identity::Keypair, swarm::SwarmEvent, PeerId, Swarm};
use nomos_core::da::BlobId;
use nomos_da_network_core::protocols::dispersal::executor::behaviour::{
DispersalError, DispersalExecutorEvent,
};
use nomos_da_network_core::{
protocols::dispersal::executor::behaviour::DispersalExecutorBehaviour, SubnetworkId,
};
use nomos_libp2p::DialError;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error};
// internal
#[derive(Debug, Clone)]
pub enum DispersalEvent {
/// A blob successfully arrived its destination
DispersalSuccess {
blob_id: BlobId,
subnetwork_id: SubnetworkId,
},
/// Something went wrong delivering the blob
DispersalError { error: DispersalError },
}
pub struct ExecutorSwarm<Membership>
where
Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static,
{
swarm: Swarm<DispersalExecutorBehaviour<Membership>>,
dispersal_broadcast_sender: UnboundedSender<DispersalEvent>,
}
impl<Membership> ExecutorSwarm<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId> + Clone + Send,
{
pub fn new(
key: Keypair,
membership: Membership,
dispersal_broadcast_sender: UnboundedSender<DispersalEvent>,
) -> Self {
let swarm = Self::build_swarm(key, membership);
Self {
swarm,
dispersal_broadcast_sender,
}
}
pub fn blobs_sender(&self) -> UnboundedSender<(SubnetworkId, DaBlob)> {
self.swarm.behaviour().blobs_sender()
}
pub fn open_stream_sender(&self) -> UnboundedSender<PeerId> {
self.swarm.behaviour().open_stream_sender()
}
fn build_swarm(
_key: Keypair,
_membership: Membership,
) -> Swarm<DispersalExecutorBehaviour<Membership>> {
todo!("CLI will be removed");
// libp2p::SwarmBuilder::with_existing_identity(key)
// .with_tokio()
// .with_quic()
// .with_behaviour(|_key| DispersalExecutorBehaviour::new(membership))
// .expect("Validator behaviour should build")
// .with_swarm_config(|cfg| {
// cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
// })
// .build()
}
pub fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> {
self.swarm.dial(addr)?;
Ok(())
}
pub fn local_peer_id(&self) -> &PeerId {
self.swarm.local_peer_id()
}
pub async fn run(&mut self) {
loop {
tokio::select! {
Some(event) = self.swarm.next() => {
debug!("Executor received an event: {event:?}");
match event {
SwarmEvent::Behaviour(behaviour_event) => {
self.handle_dispersal_event(behaviour_event).await;
},
SwarmEvent::ConnectionEstablished{ .. } => {}
SwarmEvent::ConnectionClosed{ .. } => {}
SwarmEvent::IncomingConnection{ .. } => {}
SwarmEvent::IncomingConnectionError{ .. } => {}
SwarmEvent::OutgoingConnectionError{ .. } => {}
SwarmEvent::NewListenAddr{ .. } => {}
SwarmEvent::ExpiredListenAddr{ .. } => {}
SwarmEvent::ListenerClosed{ .. } => {}
SwarmEvent::ListenerError{ .. } => {}
SwarmEvent::Dialing{ .. } => {}
SwarmEvent::NewExternalAddrCandidate{ .. } => {}
SwarmEvent::ExternalAddrConfirmed{ .. } => {}
SwarmEvent::ExternalAddrExpired{ .. } => {}
SwarmEvent::NewExternalAddrOfPeer{ .. } => {}
event => {
debug!("Unsupported validator swarm event: {event:?}");
}
}
}
}
}
}
async fn handle_dispersal_event(&mut self, event: DispersalExecutorEvent) {
match event {
DispersalExecutorEvent::DispersalSuccess {
blob_id,
subnetwork_id,
} => {
if let Err(e) =
self.dispersal_broadcast_sender
.send(DispersalEvent::DispersalSuccess {
blob_id,
subnetwork_id,
})
{
error!("Error in internal broadcast of dispersal success: {e:?}");
}
}
DispersalExecutorEvent::DispersalError { error } => {
if let Err(e) = self
.dispersal_broadcast_sender
.send(DispersalEvent::DispersalError { error })
{
error! {"Error in internal broadcast of dispersal error: {e:?}"};
}
}
}
}
}
#[cfg(test)]
pub mod test {
use crate::da::network::swarm::ExecutorSwarm;
use crate::test_utils::AllNeighbours;
use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::Column;
use libp2p::identity::Keypair;
use libp2p::PeerId;
use nomos_da_network_core::address_book::AddressBook;
use nomos_da_network_core::behaviour::validator::ValidatorBehaviourEvent;
use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalExecutorEvent;
use nomos_da_network_core::swarm::validator::ValidatorSwarm;
use nomos_libp2p::{Multiaddr, SwarmEvent};
use std::time::Duration;
use tokio::sync::mpsc::unbounded_channel;
use tokio::time;
use tracing::{error, info};
use tracing_subscriber::fmt::TestWriter;
use tracing_subscriber::EnvFilter;
#[tokio::test]
#[ignore]
async fn test_dispersal_with_swarms() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.with_writer(TestWriter::default())
.try_init();
let k1 = Keypair::generate_ed25519();
let k2 = Keypair::generate_ed25519();
let executor_peer = PeerId::from_public_key(&k1.public());
let validator_peer = PeerId::from_public_key(&k2.public());
let neighbours = AllNeighbours {
neighbours: [
PeerId::from_public_key(&k1.public()),
PeerId::from_public_key(&k2.public()),
]
.into_iter()
.collect(),
};
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap();
let addr2 = addr.clone().with_p2p(validator_peer).unwrap();
let addr2_book = AddressBook::from_iter(vec![(executor_peer, addr2.clone())]);
let (dispersal_events_sender, _) = unbounded_channel();
let mut executor = ExecutorSwarm::new(k1, neighbours.clone(), dispersal_events_sender);
let (mut validator, _) = ValidatorSwarm::new(k2, neighbours.clone(), addr2_book);
let msg_count = 10usize;
let validator_task = async move {
let validator_swarm = validator.protocol_swarm_mut();
validator_swarm.listen_on(addr).unwrap();
let mut res = vec![];
loop {
match validator_swarm.select_next_some().await {
SwarmEvent::Behaviour(ValidatorBehaviourEvent::Dispersal(event)) => {
res.push(event);
}
event => {
info!("Validator event: {event:?}");
}
}
if res.len() == msg_count {
tokio::time::sleep(Duration::from_secs(1)).await;
break;
}
}
res
};
let join_validator = tokio::spawn(validator_task);
executor.dial(addr2).unwrap();
let executor_open_stream_sender = executor.open_stream_sender();
let executor_disperse_blob_sender = executor.blobs_sender();
let executor_poll = async move {
let mut res = vec![];
loop {
tokio::select! {
Some(event) = executor.swarm.next() => {
info!("Executor event: {event:?}");
if let SwarmEvent::Behaviour(DispersalExecutorEvent::DispersalSuccess{blob_id, ..}) = event {
res.push(blob_id);
}
}
_ = time::sleep(Duration::from_secs(2)) => {
if res.len() < msg_count {error!("Executor timeout reached");}
break;
}
}
}
res
};
let executor_task = tokio::spawn(executor_poll);
executor_open_stream_sender.send(validator_peer).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
for i in 0..msg_count {
info!("Sending blob {i}...");
executor_disperse_blob_sender
.send((
0,
DaBlob {
column_idx: 0,
column: Column(vec![]),
column_commitment: Default::default(),
aggregated_column_commitment: Default::default(),
aggregated_column_proof: Default::default(),
rows_commitments: vec![],
rows_proofs: vec![],
},
))
.unwrap();
}
assert_eq!(
executor_task.await.unwrap().len(),
join_validator.await.unwrap().len()
);
}
}

View File

@ -1,34 +0,0 @@
use full_replication::Blob;
use nomos_core::header::HeaderId;
use reqwest::Url;
use thiserror::Error;
use crate::api::storage::get_block_contents;
#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
#[error("Block not found")]
NotFound,
}
/// Return the blobs whose certificate has been included in the provided block.
pub async fn get_block_blobs(node: &Url, block: &HeaderId) -> Result<Vec<Blob>, Error> {
let block = get_block_contents(node, block)
.await?
.ok_or(Error::NotFound)?;
let blobs = block.blobs().map(|cert| cert.blob()).collect::<Vec<_>>();
if blobs.is_empty() {
return Ok(vec![]);
}
let n_blobs = blobs.len();
let resp = get_blobs(node, blobs).await?;
if resp.len() != n_blobs {
tracing::warn!("Only {}/{} blobs returned", resp.len(), n_blobs);
}
Ok(resp)
}

View File

@ -1,9 +1,4 @@
pub mod api;
pub mod cmds;
pub mod da;
#[cfg(test)]
pub mod test_utils;
use clap::Parser;
use cmds::Command;

View File

@ -1,29 +0,0 @@
use libp2p::PeerId;
use std::collections::HashSet;
use subnetworks_assignations::MembershipHandler;
#[derive(Clone)]
pub struct AllNeighbours {
pub neighbours: HashSet<PeerId>,
}
impl MembershipHandler for AllNeighbours {
type NetworkId = u32;
type Id = PeerId;
fn membership(&self, _self_id: &Self::Id) -> HashSet<Self::NetworkId> {
[0].into_iter().collect()
}
fn is_allowed(&self, _id: &Self::Id) -> bool {
true
}
fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet<Self::Id> {
self.neighbours.clone()
}
fn members(&self) -> HashSet<Self::Id> {
self.neighbours.clone()
}
}

View File

@ -50,6 +50,12 @@ impl blob::metadata::Metadata for BlobInfo {
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
pub struct Index([u8; 8]);
impl Index {
pub fn to_u64(self) -> u64 {
u64::from_be_bytes(self.0)
}
}
#[derive(Default, Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Metadata {
app_id: [u8; 32],