use new APIs
This commit is contained in:
parent
61f06436c8
commit
b21747e66a
|
@ -24,7 +24,6 @@ nomos-consensus = { path = "../nomos-services/consensus" }
|
||||||
nomos-libp2p = { path = "../nomos-libp2p"}
|
nomos-libp2p = { path = "../nomos-libp2p"}
|
||||||
nomos-core = { path = "../nomos-core" }
|
nomos-core = { path = "../nomos-core" }
|
||||||
nomos-node = { path = "../nodes/nomos-node" }
|
nomos-node = { path = "../nodes/nomos-node" }
|
||||||
nomos-consensus = { path = "../nomos-services/consensus" }
|
|
||||||
full-replication = { path = "../nomos-da/full-replication" }
|
full-replication = { path = "../nomos-da/full-replication" }
|
||||||
reqwest = "0.11"
|
reqwest = "0.11"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use super::CLIENT;
|
use super::CLIENT;
|
||||||
|
use consensus_engine::{Block, BlockId};
|
||||||
use nomos_consensus::CarnotInfo;
|
use nomos_consensus::CarnotInfo;
|
||||||
use reqwest::Url;
|
use reqwest::Url;
|
||||||
|
|
||||||
|
@ -11,3 +12,20 @@ pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> {
|
||||||
.json::<CarnotInfo>()
|
.json::<CarnotInfo>()
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn get_blocks_info(
|
||||||
|
node: &Url,
|
||||||
|
from: Option<BlockId>,
|
||||||
|
to: Option<BlockId>,
|
||||||
|
) -> Result<Vec<Block>, reqwest::Error> {
|
||||||
|
const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks";
|
||||||
|
let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap());
|
||||||
|
if let Some(from) = from {
|
||||||
|
req = req.query(&[("from", from)]);
|
||||||
|
}
|
||||||
|
if let Some(to) = to {
|
||||||
|
req = req.query(&[("to", to)]);
|
||||||
|
}
|
||||||
|
|
||||||
|
req.send().await?.json().await
|
||||||
|
}
|
||||||
|
|
|
@ -4,24 +4,25 @@
|
||||||
///
|
///
|
||||||
mod ui;
|
mod ui;
|
||||||
|
|
||||||
use crate::da::{
|
use crate::{
|
||||||
|
api::consensus::get_blocks_info,
|
||||||
|
da::{
|
||||||
disseminate::{
|
disseminate::{
|
||||||
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
|
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
|
||||||
},
|
},
|
||||||
retrieve::get_block_blobs,
|
retrieve::get_block_blobs,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use clap::Args;
|
use clap::Args;
|
||||||
use full_replication::{
|
use full_replication::{
|
||||||
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
|
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
|
||||||
};
|
};
|
||||||
use nomos_consensus::CarnotInfo;
|
|
||||||
use nomos_core::{block::BlockId, da::DaProtocol, wire};
|
use nomos_core::{block::BlockId, da::DaProtocol, wire};
|
||||||
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
|
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
|
||||||
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
|
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
|
||||||
use reqwest::Url;
|
use reqwest::Url;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashSet,
|
|
||||||
io,
|
io,
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
sync::{
|
sync::{
|
||||||
|
@ -205,56 +206,58 @@ struct ChatMessage {
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
|
async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
|
||||||
let mut old_blocks = HashSet::new();
|
// Should ask for the genesis block to be more robust
|
||||||
|
let mut last_tip = BlockId::zeros();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok(messages) = fetch_new_messages(&mut old_blocks, node.clone()).await {
|
if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await {
|
||||||
sender.send(messages).expect("channel closed");
|
sender.send(messages).expect("channel closed");
|
||||||
|
last_tip = new_tip;
|
||||||
}
|
}
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_new_messages(
|
async fn fetch_new_messages(
|
||||||
old_blocks: &mut HashSet<BlockId>,
|
last_tip: &BlockId,
|
||||||
node: Url,
|
node: &Url,
|
||||||
) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> {
|
) -> Result<(BlockId, Vec<ChatMessage>), Box<dyn std::error::Error>> {
|
||||||
const NODE_CARNOT_INFO_PATH: &str = "carnot/info";
|
|
||||||
|
|
||||||
let mut new_messages = Vec::new();
|
let mut new_messages = Vec::new();
|
||||||
|
// By only specifying the 'to' parameter we get all the blocks since the last tip
|
||||||
let info = reqwest::get(node.join(NODE_CARNOT_INFO_PATH).unwrap())
|
let mut new_blocks = get_blocks_info(node, None, Some(*last_tip))
|
||||||
.await?
|
.await?
|
||||||
.json::<CarnotInfo>()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let new_blocks = info
|
|
||||||
.committed_blocks
|
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|id| !old_blocks.contains(id) && id != &BlockId::zeros())
|
.map(|block| block.id)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// note that number of attestations is ignored here since we only use the da protocol to
|
// The first block is the most recent one.
|
||||||
|
// Note that the 'to' is inclusive so the above request will always return at least one block
|
||||||
|
// as long as the block exists (which is the case since it was returned by a previous call)
|
||||||
|
let new_tip = new_blocks[0];
|
||||||
|
// We already processed the last block so let's remove it
|
||||||
|
new_blocks.pop();
|
||||||
|
|
||||||
|
// Note that number of attestations is ignored here since we only use the da protocol to
|
||||||
// decode the blob data, not to validate the certificate
|
// decode the blob data, not to validate the certificate
|
||||||
let mut da_protocol =
|
let mut da_protocol =
|
||||||
<FullReplication<AbsoluteNumber<Attestation, Certificate>> as DaProtocol>::new(
|
<FullReplication<AbsoluteNumber<Attestation, Certificate>> as DaProtocol>::new(
|
||||||
DaSettings {
|
DaSettings {
|
||||||
num_attestations: 1,
|
num_attestations: 1,
|
||||||
|
voter: [0; 32], // voter is ignored as well
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
for block in new_blocks {
|
for block in new_blocks {
|
||||||
let blobs = get_block_blobs(node.clone(), block).await?;
|
let blobs = get_block_blobs(node, &block).await?;
|
||||||
for blob in blobs {
|
for blob in blobs {
|
||||||
da_protocol.recv_blob(blob);
|
da_protocol.recv_blob(blob);
|
||||||
// full replication only needs one blob to decode the data, so the unwrap is safe
|
// Full replication only needs one blob to decode the data, so the unwrap is safe
|
||||||
let bytes = da_protocol.extract().unwrap();
|
let bytes = da_protocol.extract().unwrap();
|
||||||
if let Ok(message) = wire::deserialize::<ChatMessage>(&bytes) {
|
if let Ok(message) = wire::deserialize::<ChatMessage>(&bytes) {
|
||||||
new_messages.push(message);
|
new_messages.push(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
old_blocks.insert(block);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(new_messages)
|
Ok((new_tip, new_messages))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue