Add raw api module to nomos-cli (#511)
* Add raw api module to nomos-cli Add a separate module which contains all raw api calls and make them reuse the same reqwest client for increased efficiency * Use json instead of wire format in mempool add api Uniform all http api to use json as the encoding
This commit is contained in:
parent
196f395992
commit
0730e05a6f
|
@ -14,7 +14,6 @@ use tokio::sync::oneshot;
|
||||||
use tracing::error;
|
use tracing::error;
|
||||||
// internal
|
// internal
|
||||||
use full_replication::{Blob, Certificate};
|
use full_replication::{Blob, Certificate};
|
||||||
use nomos_core::wire;
|
|
||||||
use nomos_core::{
|
use nomos_core::{
|
||||||
block::Block,
|
block::Block,
|
||||||
da::{blob, certificate::Certificate as _},
|
da::{blob, certificate::Certificate as _},
|
||||||
|
@ -400,7 +399,7 @@ pub(super) async fn handle_mempool_add_req<K, V>(
|
||||||
where
|
where
|
||||||
K: DeserializeOwned,
|
K: DeserializeOwned,
|
||||||
{
|
{
|
||||||
let item = wire::deserialize::<K>(&wire_item)?;
|
let item: K = serde_json::from_slice(&wire_item)?;
|
||||||
let (sender, receiver) = oneshot::channel();
|
let (sender, receiver) = oneshot::channel();
|
||||||
let key = key(&item);
|
let key = key(&item);
|
||||||
mempool_channel
|
mempool_channel
|
||||||
|
|
|
@ -20,6 +20,7 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806"
|
||||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||||
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
|
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
|
||||||
nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] }
|
nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] }
|
||||||
|
nomos-consensus = { path = "../nomos-services/consensus" }
|
||||||
nomos-libp2p = { path = "../nomos-libp2p"}
|
nomos-libp2p = { path = "../nomos-libp2p"}
|
||||||
nomos-core = { path = "../nomos-core" }
|
nomos-core = { path = "../nomos-core" }
|
||||||
nomos-node = { path = "../nodes/nomos-node" }
|
nomos-node = { path = "../nodes/nomos-node" }
|
||||||
|
@ -29,3 +30,4 @@ serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
|
once_cell = "1"
|
|
@ -0,0 +1,13 @@
|
||||||
|
use super::CLIENT;
|
||||||
|
use nomos_consensus::CarnotInfo;
|
||||||
|
use reqwest::Url;
|
||||||
|
|
||||||
|
pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> {
|
||||||
|
const NODE_CARNOT_INFO_PATH: &str = "carnot/info";
|
||||||
|
CLIENT
|
||||||
|
.get(node.join(NODE_CARNOT_INFO_PATH).unwrap())
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.json::<CarnotInfo>()
|
||||||
|
.await
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
use super::CLIENT;
|
||||||
|
use full_replication::Blob;
|
||||||
|
use nomos_core::da::blob;
|
||||||
|
use reqwest::Url;
|
||||||
|
|
||||||
|
pub async fn get_blobs(
|
||||||
|
node: &Url,
|
||||||
|
ids: Vec<<Blob as blob::Blob>::Hash>,
|
||||||
|
) -> Result<Vec<Blob>, reqwest::Error> {
|
||||||
|
const BLOBS_PATH: &str = "da/blobs";
|
||||||
|
CLIENT
|
||||||
|
.post(node.join(BLOBS_PATH).unwrap())
|
||||||
|
.body(serde_json::to_string(&ids).unwrap())
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.json()
|
||||||
|
.await
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
use super::CLIENT;
|
||||||
|
use reqwest::{Error, Response, Url};
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
pub async fn send_certificate<C>(node: &Url, cert: &C) -> Result<Response, Error>
|
||||||
|
where
|
||||||
|
C: Serialize,
|
||||||
|
{
|
||||||
|
const NODE_CERT_PATH: &str = "mempool-da/add";
|
||||||
|
CLIENT
|
||||||
|
.post(node.join(NODE_CERT_PATH).unwrap())
|
||||||
|
.body(serde_json::to_string(cert).unwrap())
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
}
|
|
@ -0,0 +1,9 @@
|
||||||
|
pub mod consensus;
|
||||||
|
pub mod da;
|
||||||
|
pub mod mempool;
|
||||||
|
pub mod storage;
|
||||||
|
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use reqwest::Client;
|
||||||
|
|
||||||
|
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
|
|
@ -0,0 +1,21 @@
|
||||||
|
use super::CLIENT;
|
||||||
|
use consensus_engine::BlockId;
|
||||||
|
use full_replication::Certificate;
|
||||||
|
use nomos_core::block::Block;
|
||||||
|
use nomos_node::Tx;
|
||||||
|
use reqwest::Url;
|
||||||
|
|
||||||
|
pub async fn get_block_contents(
|
||||||
|
node: &Url,
|
||||||
|
block: &BlockId,
|
||||||
|
) -> Result<Option<Block<Tx, Certificate>>, reqwest::Error> {
|
||||||
|
const BLOCK_PATH: &str = "storage/block";
|
||||||
|
let block = CLIENT
|
||||||
|
.post(node.join(BLOCK_PATH).unwrap())
|
||||||
|
.body(serde_json::to_string(block).unwrap())
|
||||||
|
.send()
|
||||||
|
.await?
|
||||||
|
.json()
|
||||||
|
.await?;
|
||||||
|
Ok(block)
|
||||||
|
}
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::api::mempool::send_certificate;
|
||||||
use clap::{Args, ValueEnum};
|
use clap::{Args, ValueEnum};
|
||||||
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication, Voter};
|
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication, Voter};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
@ -15,7 +16,7 @@ use overwatch_rs::{
|
||||||
},
|
},
|
||||||
DynError,
|
DynError,
|
||||||
};
|
};
|
||||||
use reqwest::{Client, Url};
|
use reqwest::Url;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::{
|
use std::{
|
||||||
error::Error,
|
error::Error,
|
||||||
|
@ -25,8 +26,6 @@ use std::{
|
||||||
};
|
};
|
||||||
use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
|
use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
|
||||||
|
|
||||||
const NODE_CERT_PATH: &str = "mempool-da/add";
|
|
||||||
|
|
||||||
pub async fn disseminate_and_wait<D, B, N, A, C>(
|
pub async fn disseminate_and_wait<D, B, N, A, C>(
|
||||||
mut da: D,
|
mut da: D,
|
||||||
data: Box<[u8]>,
|
data: Box<[u8]>,
|
||||||
|
@ -69,12 +68,7 @@ where
|
||||||
|
|
||||||
if let Some(node) = node_addr {
|
if let Some(node) = node_addr {
|
||||||
status_updates.send(Status::SendingCert)?;
|
status_updates.send(Status::SendingCert)?;
|
||||||
let client = Client::new();
|
let res = send_certificate(node, &cert).await?;
|
||||||
let res = client
|
|
||||||
.post(node.join(NODE_CERT_PATH).unwrap())
|
|
||||||
.body(wire::serialize(&cert).unwrap())
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
tracing::info!("Response: {:?}", res);
|
tracing::info!("Response: {:?}", res);
|
||||||
if !res.status().is_success() {
|
if !res.status().is_success() {
|
||||||
|
|
|
@ -1,15 +1,10 @@
|
||||||
use consensus_engine::BlockId;
|
use consensus_engine::BlockId;
|
||||||
use full_replication::{Blob, Certificate};
|
use full_replication::Blob;
|
||||||
use nomos_core::{
|
use nomos_core::da::certificate::Certificate;
|
||||||
block::Block,
|
|
||||||
da::{blob, certificate::Certificate as _},
|
|
||||||
};
|
|
||||||
use nomos_node::Tx;
|
|
||||||
use reqwest::Url;
|
use reqwest::Url;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
const BLOCK_PATH: &str = "storage/block";
|
use crate::api::{da::get_blobs, storage::get_block_contents};
|
||||||
const BLOBS_PATH: &str = "da/blobs";
|
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
@ -20,37 +15,10 @@ pub enum Error {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the blobs whose certificate has been included in the provided block.
|
/// Return the blobs whose certificate has been included in the provided block.
|
||||||
pub async fn get_block_blobs(node: Url, block: BlockId) -> Result<Vec<Blob>, Error> {
|
pub async fn get_block_blobs(node: &Url, block: &BlockId) -> Result<Vec<Blob>, Error> {
|
||||||
let block = get_block_contents(&node, block)
|
let block = get_block_contents(node, block)
|
||||||
.await?
|
.await?
|
||||||
.ok_or(Error::NotFound)?;
|
.ok_or(Error::NotFound)?;
|
||||||
|
|
||||||
Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?)
|
Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_block_contents(
|
|
||||||
node: &Url,
|
|
||||||
block: BlockId,
|
|
||||||
) -> Result<Option<Block<Tx, Certificate>>, reqwest::Error> {
|
|
||||||
let block = reqwest::Client::new()
|
|
||||||
.post(node.join(BLOCK_PATH).unwrap())
|
|
||||||
.body(serde_json::to_string(&block).unwrap())
|
|
||||||
.send()
|
|
||||||
.await?
|
|
||||||
.json()
|
|
||||||
.await?;
|
|
||||||
Ok(block)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_blobs(
|
|
||||||
node: Url,
|
|
||||||
ids: Vec<<Blob as blob::Blob>::Hash>,
|
|
||||||
) -> Result<Vec<Blob>, reqwest::Error> {
|
|
||||||
reqwest::Client::new()
|
|
||||||
.post(node.join(BLOBS_PATH).unwrap())
|
|
||||||
.body(serde_json::to_string(&ids).unwrap())
|
|
||||||
.send()
|
|
||||||
.await?
|
|
||||||
.json()
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
pub mod api;
|
||||||
pub mod cmds;
|
pub mod cmds;
|
||||||
pub mod da;
|
pub mod da;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue