integrate carnot explorer

This commit is contained in:
Al Liu 2024-02-05 12:59:55 +08:00
parent 959cbfd646
commit fecd0c5f69
No known key found for this signature in database
GPG Key ID: C8AE9A6E0166923E
6 changed files with 36 additions and 57 deletions

View File

@ -26,7 +26,7 @@ nomos-libp2p = { path = "../nomos-libp2p"}
nomos-core = { path = "../nomos-core" } nomos-core = { path = "../nomos-core" }
nomos-node = { path = "../nodes/nomos-node" } nomos-node = { path = "../nodes/nomos-node" }
full-replication = { path = "../nomos-da/full-replication" } full-replication = { path = "../nomos-da/full-replication" }
reqwest = "0.11" reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
thiserror = "1.0" thiserror = "1.0"

View File

@ -1,6 +1,5 @@
use super::CLIENT; use super::CLIENT;
use carnot_consensus::CarnotInfo; use carnot_consensus::CarnotInfo;
use carnot_engine::{Block, BlockId};
use reqwest::Url; use reqwest::Url;
pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> { pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> {
@ -12,20 +11,3 @@ pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> {
.json::<CarnotInfo>() .json::<CarnotInfo>()
.await .await
} }
pub async fn get_blocks_info(
node: &Url,
from: Option<BlockId>,
to: Option<BlockId>,
) -> Result<Vec<Block>, reqwest::Error> {
const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks";
let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap());
if let Some(from) = from {
req = req.query(&[("from", from)]);
}
if let Some(to) = to {
req = req.query(&[("to", to)]);
}
req.send().await?.json().await
}

View File

@ -4,12 +4,12 @@ use nomos_core::da::blob;
use reqwest::Url; use reqwest::Url;
pub async fn get_blobs( pub async fn get_blobs(
node: &Url, explorer: &Url,
ids: Vec<<Blob as blob::Blob>::Hash>, ids: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Blob>, reqwest::Error> { ) -> Result<Vec<Blob>, reqwest::Error> {
const BLOBS_PATH: &str = "da/blobs"; const BLOBS_PATH: &str = "da/blobs";
CLIENT CLIENT
.post(node.join(BLOBS_PATH).unwrap()) .post(explorer.join(BLOBS_PATH).unwrap())
.json(&ids) .json(&ids)
.send() .send()
.await? .await?

View File

@ -6,13 +6,13 @@ use nomos_node::Tx;
use reqwest::Url; use reqwest::Url;
pub async fn get_block_contents( pub async fn get_block_contents(
node: &Url, explorer: &Url,
block: &BlockId, block: &BlockId,
) -> Result<Option<Block<Tx, Certificate>>, reqwest::Error> { ) -> Result<Option<Block<Tx, Certificate>>, reqwest::Error> {
const BLOCK_PATH: &str = "storage/block"; const BLOCK_PATH: &str = "explorer/blocks/depth";
CLIENT CLIENT
.post(node.join(BLOCK_PATH).unwrap()) .get(explorer.join(BLOCK_PATH).unwrap())
.json(block) .query(&[("from", block)])
.send() .send()
.await? .await?
.json() .json()

View File

@ -5,7 +5,7 @@
mod ui; mod ui;
use crate::{ use crate::{
api::consensus::get_blocks_info, api::{consensus::carnot_info, storage::get_block_contents},
da::{ da::{
disseminate::{ disseminate::{
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
@ -18,7 +18,7 @@ use full_replication::{
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings, AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
}; };
use futures::{stream, StreamExt}; use futures::{stream, StreamExt};
use nomos_core::{block::BlockId, da::DaProtocol, wire}; use nomos_core::{da::DaProtocol, wire};
use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter}; use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter};
use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
@ -58,9 +58,12 @@ pub struct NomosChat {
/// The data availability protocol to use. Defaults to full replication. /// The data availability protocol to use. Defaults to full replication.
#[clap(flatten)] #[clap(flatten)]
pub da_protocol: DaProtocolChoice, pub da_protocol: DaProtocolChoice,
/// The node to connect to to fetch blocks and blobs /// The node to connect to to fetch carnot info
#[clap(long)] #[clap(long)]
pub node: Url, pub node: Url,
/// The explorer to connect to to fetch blocks and blobs
#[clap(long)]
pub explorer: Url,
} }
pub struct App { pub struct App {
@ -73,6 +76,7 @@ pub struct App {
payload_sender: UnboundedSender<Box<[u8]>>, payload_sender: UnboundedSender<Box<[u8]>>,
status_updates: Receiver<Status>, status_updates: Receiver<Status>,
node: Url, node: Url,
explorer: Url,
logs: Arc<sync::Mutex<Vec<u8>>>, logs: Arc<sync::Mutex<Vec<u8>>>,
scroll_logs: u16, scroll_logs: u16,
} }
@ -133,6 +137,7 @@ impl NomosChat {
payload_sender, payload_sender,
status_updates, status_updates,
node: self.node.clone(), node: self.node.clone(),
explorer: self.explorer.clone(),
logs: shared_writer, logs: shared_writer,
scroll_logs: 0, scroll_logs: 0,
}; };
@ -155,7 +160,8 @@ impl NomosChat {
fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) { fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) {
let (message_tx, message_rx) = std::sync::mpsc::channel(); let (message_tx, message_rx) = std::sync::mpsc::channel();
let node = app.node.clone(); let node = app.node.clone();
std::thread::spawn(move || check_for_messages(message_tx, node)); let explorer = app.explorer.clone();
std::thread::spawn(move || check_for_messages(message_tx, node, explorer));
loop { loop {
terminal.draw(|f| ui::ui(f, &app)).unwrap(); terminal.draw(|f| ui::ui(f, &app)).unwrap();
@ -229,14 +235,10 @@ struct ChatMessage {
} }
#[tokio::main] #[tokio::main]
async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) { async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url, explorer: Url) {
// Should ask for the genesis block to be more robust
let mut last_tip = BlockId::zeros();
loop { loop {
if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await { if let Ok(messages) = fetch_new_messages(&explorer, &node).await {
sender.send(messages).expect("channel closed"); sender.send(messages).expect("channel closed");
last_tip = new_tip;
} }
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} }
@ -245,10 +247,10 @@ async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
// Process a single block's blobs and return chat messages // Process a single block's blobs and return chat messages
async fn process_block_blobs( async fn process_block_blobs(
node: Url, node: Url,
block_id: &BlockId, block: &nomos_core::block::Block<nomos_node::Tx, full_replication::Certificate>,
da_settings: DaSettings, da_settings: DaSettings,
) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> { ) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> {
let blobs = get_block_blobs(&node, block_id).await?; let blobs = get_block_blobs(&node, block).await?;
// Note that number of attestations is ignored here since we only use the da protocol to // Note that number of attestations is ignored here since we only use the da protocol to
// decode the blob data, not to validate the certificate // decode the blob data, not to validate the certificate
@ -269,29 +271,25 @@ async fn process_block_blobs(
// Fetch new messages since the last tip // Fetch new messages since the last tip
async fn fetch_new_messages( async fn fetch_new_messages(
last_tip: &BlockId, explorer: &Url,
node: &Url, node: &Url,
) -> Result<(BlockId, Vec<ChatMessage>), Box<dyn std::error::Error>> { ) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> {
let info = carnot_info(node).await?;
// By only specifying the 'to' parameter we get all the blocks since the last tip // By only specifying the 'to' parameter we get all the blocks since the last tip
let mut new_blocks = get_blocks_info(node, None, Some(*last_tip)) let mut blocks = get_block_contents(explorer, &info.tip.id)
.await? .await?
.into_iter() .into_iter()
.map(|block| block.id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// The first block is the most recent one.
// Note that the 'to' is inclusive so the above request will always return at least one block
// as long as the block exists (which is the case since it was returned by a previous call)
let new_tip = new_blocks[0];
// We already processed the last block so let's remove it // We already processed the last block so let's remove it
new_blocks.pop(); blocks.pop();
let da_settings = DaSettings { let da_settings = DaSettings {
num_attestations: 1, num_attestations: 1,
voter: [0; 32], voter: [0; 32],
}; };
let block_stream = stream::iter(new_blocks.iter().rev()); let block_stream = stream::iter(blocks.iter());
let results: Vec<_> = block_stream let results: Vec<_> = block_stream
.map(|block| { .map(|block| {
let node = node.clone(); let node = node.clone();
@ -299,7 +297,7 @@ async fn fetch_new_messages(
process_block_blobs(node, block, da_settings) process_block_blobs(node, block, da_settings)
}) })
.buffer_unordered(new_blocks.len()) .buffer_unordered(blocks.len())
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await; .await;
@ -308,5 +306,5 @@ async fn fetch_new_messages(
new_messages.extend(result?); new_messages.extend(result?);
} }
Ok((new_tip, new_messages)) Ok(new_messages)
} }

View File

@ -1,10 +1,10 @@
use carnot_engine::BlockId;
use full_replication::Blob; use full_replication::Blob;
use nomos_core::da::certificate::Certificate; use nomos_core::{block::Block, da::certificate::Certificate};
use nomos_node::Tx;
use reqwest::Url; use reqwest::Url;
use thiserror::Error; use thiserror::Error;
use crate::api::{da::get_blobs, storage::get_block_contents}; use crate::api::da::get_blobs;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum Error { pub enum Error {
@ -15,10 +15,9 @@ pub enum Error {
} }
/// Return the blobs whose certificate has been included in the provided block. /// Return the blobs whose certificate has been included in the provided block.
pub async fn get_block_blobs(node: &Url, block: &BlockId) -> Result<Vec<Blob>, Error> { pub async fn get_block_blobs(
let block = get_block_contents(node, block) node: &Url,
.await? block: &Block<Tx, full_replication::Certificate>,
.ok_or(Error::NotFound)?; ) -> Result<Vec<Blob>, Error> {
Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?) Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?)
} }