Nomos http API (#476)
* Info api * Da blob api (#487) * Add storage api for new http backend (#488) * Mempool add APIs (#489)
This commit is contained in:
parent
ff8fc577a0
commit
ba90ed1b55
|
@ -25,7 +25,7 @@ default = []
|
|||
serde = ["dep:serde", "nomos-utils/serde"]
|
||||
simulation = []
|
||||
|
||||
openapi = ["dep:utoipa", "serde_json"]
|
||||
openapi = ["dep:utoipa", "serde_json", "serde"]
|
||||
|
||||
[dev-dependencies]
|
||||
proptest = "1.2.0"
|
||||
|
|
|
@ -20,6 +20,7 @@ pub use view::View;
|
|||
/// can't be directly used in the network as they lack things like cryptographic signatures.
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub enum Payload {
|
||||
/// Vote for a block in a view
|
||||
Vote(Vote),
|
||||
|
@ -32,6 +33,7 @@ pub enum Payload {
|
|||
/// Returned
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct Vote {
|
||||
pub view: View,
|
||||
pub block: BlockId,
|
||||
|
@ -39,6 +41,7 @@ pub struct Vote {
|
|||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct Timeout {
|
||||
pub view: View,
|
||||
pub sender: NodeId,
|
||||
|
@ -50,6 +53,7 @@ pub struct Timeout {
|
|||
// We should consider to remove the TimoutQc from the NewView message and use a hash or id instead.
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct NewView {
|
||||
pub view: View,
|
||||
pub sender: NodeId,
|
||||
|
@ -59,6 +63,7 @@ pub struct NewView {
|
|||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct TimeoutQc {
|
||||
view: View,
|
||||
high_qc: StandardQc,
|
||||
|
@ -96,6 +101,7 @@ impl TimeoutQc {
|
|||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct Block {
|
||||
#[cfg_attr(feature = "serde", serde(skip))]
|
||||
pub id: BlockId,
|
||||
|
@ -106,6 +112,7 @@ pub struct Block {
|
|||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub enum LeaderProof {
|
||||
LeaderId { leader_id: NodeId },
|
||||
}
|
||||
|
@ -136,6 +143,7 @@ pub struct Send {
|
|||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct StandardQc {
|
||||
pub view: View,
|
||||
pub id: BlockId,
|
||||
|
@ -152,6 +160,7 @@ impl StandardQc {
|
|||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct AggregateQc {
|
||||
pub high_qc: StandardQc,
|
||||
pub view: View,
|
||||
|
@ -159,6 +168,7 @@ pub struct AggregateQc {
|
|||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub enum Qc {
|
||||
Standard(StandardQc),
|
||||
Aggregated(AggregateQc),
|
||||
|
|
|
@ -3,6 +3,7 @@ use std::collections::BTreeSet;
|
|||
use crate::NodeId;
|
||||
|
||||
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct CommitteeId(pub(crate) [u8; 32]);
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
#[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct NodeId(pub(crate) [u8; 32]);
|
||||
|
||||
#[cfg(feature = "serde")]
|
||||
|
|
|
@ -17,6 +17,7 @@ use derive_more::{Add, AddAssign, Sub, SubAssign};
|
|||
)]
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[cfg_attr(feature = "serde", serde(transparent))]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct View(pub(crate) i64);
|
||||
|
||||
impl View {
|
||||
|
|
|
@ -13,16 +13,26 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806"
|
|||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||
tracing = "0.1"
|
||||
|
||||
|
||||
consensus-engine = { path = "../../consensus-engine" }
|
||||
nomos-core = { path = "../../nomos-core" }
|
||||
nomos-consensus = { path = "../../nomos-services/consensus" }
|
||||
nomos-network = { path = "../../nomos-services/network" }
|
||||
nomos-da = { path = "../../nomos-services/data-availability" }
|
||||
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] }
|
||||
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
|
||||
nomos-libp2p = { path = "../../nomos-libp2p" }
|
||||
full-replication = { path = "../../nomos-da/full-replication" }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
tokio = { version = "1.33", default-features = false, features = ["sync"] }
|
||||
|
||||
|
||||
# axum related dependencies
|
||||
axum = { version = "0.6", optional = true }
|
||||
hyper = { version = "0.14", features = ["full"], optional = true }
|
||||
|
||||
nomos-core = { path = "../../nomos-core" }
|
||||
nomos-da = { path = "../../nomos-services/data-availability" }
|
||||
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] }
|
||||
full-replication = { path = "../../nomos-da/full-replication", features = ["openapi"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
tokio = { version = "1.33", default-features = false, features = ["sync"] }
|
||||
|
||||
# openapi related dependencies
|
||||
utoipa = "4.0"
|
||||
utoipa-swagger-ui = { version = "4.0" }
|
||||
|
||||
|
|
|
@ -1,17 +1,20 @@
|
|||
use std::{fmt::Debug, hash::Hash, net::SocketAddr, sync::Arc};
|
||||
use std::{fmt::Debug, hash::Hash, net::SocketAddr};
|
||||
|
||||
use axum::{extract::State, response::IntoResponse, routing, Json, Router, Server};
|
||||
use full_replication::Blob;
|
||||
use hyper::StatusCode;
|
||||
use nomos_core::{da::blob, tx::Transaction};
|
||||
use nomos_mempool::{openapi::Status, MempoolMetrics};
|
||||
use axum::{extract::State, response::Response, routing, Json, Router, Server};
|
||||
use overwatch_rs::overwatch::handle::OverwatchHandle;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
|
||||
use consensus_engine::BlockId;
|
||||
use full_replication::{Blob, Certificate};
|
||||
use nomos_core::{da::blob, tx::Transaction};
|
||||
use nomos_mempool::{network::adapters::libp2p::Libp2pAdapter, openapi::Status, MempoolMetrics};
|
||||
use nomos_network::backends::libp2p::Libp2p;
|
||||
use nomos_storage::backends::StorageSerde;
|
||||
|
||||
use crate::{
|
||||
http::{cl::*, da::*},
|
||||
http::{cl, consensus, da, libp2p, mempool, storage},
|
||||
Backend,
|
||||
};
|
||||
|
||||
|
@ -21,9 +24,10 @@ pub struct AxumBackendSettings {
|
|||
pub handle: OverwatchHandle,
|
||||
}
|
||||
|
||||
pub struct AxumBackend<ClTransaction> {
|
||||
settings: Arc<AxumBackendSettings>,
|
||||
_cl: core::marker::PhantomData<ClTransaction>,
|
||||
pub struct AxumBackend<T, S, const SIZE: usize> {
|
||||
settings: AxumBackendSettings,
|
||||
_tx: core::marker::PhantomData<T>,
|
||||
_storage_serde: core::marker::PhantomData<S>,
|
||||
}
|
||||
|
||||
#[derive(OpenApi)]
|
||||
|
@ -41,22 +45,22 @@ pub struct AxumBackend<ClTransaction> {
|
|||
)]
|
||||
struct ApiDoc;
|
||||
|
||||
type Store = Arc<AxumBackendSettings>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<ClTransaction> Backend for AxumBackend<ClTransaction>
|
||||
impl<T, S, const SIZE: usize> Backend for AxumBackend<T, S, SIZE>
|
||||
where
|
||||
ClTransaction: Transaction
|
||||
T: Transaction
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Eq
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ for<'de> Deserialize<'de>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<ClTransaction as nomos_core::tx::Transaction>::Hash:
|
||||
<T as nomos_core::tx::Transaction>::Hash:
|
||||
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
S: StorageSerde + Send + Sync + 'static,
|
||||
{
|
||||
type Error = hyper::Error;
|
||||
type Settings = AxumBackendSettings;
|
||||
|
@ -66,20 +70,26 @@ where
|
|||
Self: Sized,
|
||||
{
|
||||
Ok(Self {
|
||||
settings: Arc::new(settings),
|
||||
_cl: core::marker::PhantomData,
|
||||
settings,
|
||||
_tx: core::marker::PhantomData,
|
||||
_storage_serde: core::marker::PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
async fn serve(self) -> Result<(), Self::Error> {
|
||||
let store = self.settings.clone();
|
||||
let app = Router::new()
|
||||
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
|
||||
.route("/da/metrics", routing::get(da_metrics))
|
||||
.route("/da/status", routing::post(da_status))
|
||||
.route("/cl/metrics", routing::get(cl_metrics::<ClTransaction>))
|
||||
.route("/cl/status", routing::post(cl_status::<ClTransaction>))
|
||||
.with_state(store);
|
||||
.route("/da/blobs", routing::post(da_blobs))
|
||||
.route("/cl/metrics", routing::get(cl_metrics::<T>))
|
||||
.route("/cl/status", routing::post(cl_status::<T>))
|
||||
.route("/carnot/info", routing::get(carnot_info::<T, S, SIZE>))
|
||||
.route("/network/info", routing::get(libp2p_info))
|
||||
.route("/storage/block", routing::post(block::<S, T>))
|
||||
.route("/mempool/add/tx", routing::post(add_tx::<T>))
|
||||
.route("/mempool/add/cert", routing::post(add_cert))
|
||||
.with_state(self.settings.handle);
|
||||
|
||||
Server::bind(&self.settings.addr)
|
||||
.serve(app.into_make_service())
|
||||
|
@ -87,6 +97,21 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
macro_rules! make_request_and_return_response {
|
||||
($cond:expr) => {{
|
||||
match $cond.await {
|
||||
::std::result::Result::Ok(val) => ::axum::response::IntoResponse::into_response((
|
||||
::hyper::StatusCode::OK,
|
||||
::axum::Json(val),
|
||||
)),
|
||||
::std::result::Result::Err(e) => ::axum::response::IntoResponse::into_response((
|
||||
::hyper::StatusCode::INTERNAL_SERVER_ERROR,
|
||||
e.to_string(),
|
||||
)),
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/da/metrics",
|
||||
|
@ -95,11 +120,8 @@ where
|
|||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn da_metrics(State(store): State<Store>) -> impl IntoResponse {
|
||||
match da_mempool_metrics(&store.handle).await {
|
||||
Ok(metrics) => (StatusCode::OK, Json(metrics)).into_response(),
|
||||
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
|
||||
}
|
||||
async fn da_metrics(State(store): State<OverwatchHandle>) -> Response {
|
||||
make_request_and_return_response!(da::da_mempool_metrics(&store))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
|
@ -111,13 +133,25 @@ async fn da_metrics(State(store): State<Store>) -> impl IntoResponse {
|
|||
)
|
||||
)]
|
||||
async fn da_status(
|
||||
State(store): State<Store>,
|
||||
State(store): State<OverwatchHandle>,
|
||||
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
|
||||
) -> impl IntoResponse {
|
||||
match da_mempool_status(&store.handle, items).await {
|
||||
Ok(status) => (StatusCode::OK, Json(status)).into_response(),
|
||||
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
|
||||
}
|
||||
) -> Response {
|
||||
make_request_and_return_response!(da::da_mempool_status(&store, items))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/da/blobs",
|
||||
responses(
|
||||
(status = 200, description = "Get pending blobs", body = Vec<Blob>),
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn da_blobs(
|
||||
State(store): State<OverwatchHandle>,
|
||||
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
|
||||
) -> Response {
|
||||
make_request_and_return_response!(da::da_blobs(&store, items))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
|
@ -128,7 +162,7 @@ async fn da_status(
|
|||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn cl_metrics<T>(State(store): State<Store>) -> impl IntoResponse
|
||||
async fn cl_metrics<T>(State(store): State<OverwatchHandle>) -> Response
|
||||
where
|
||||
T: Transaction
|
||||
+ Clone
|
||||
|
@ -141,10 +175,7 @@ where
|
|||
+ 'static,
|
||||
<T as nomos_core::tx::Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
{
|
||||
match cl_mempool_metrics::<T>(&store.handle).await {
|
||||
Ok(metrics) => (StatusCode::OK, Json(metrics)).into_response(),
|
||||
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
|
||||
}
|
||||
make_request_and_return_response!(cl::cl_mempool_metrics::<T>(&store))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
|
@ -156,24 +187,102 @@ where
|
|||
)
|
||||
)]
|
||||
async fn cl_status<T>(
|
||||
State(store): State<Store>,
|
||||
State(store): State<OverwatchHandle>,
|
||||
Json(items): Json<Vec<<T as Transaction>::Hash>>,
|
||||
) -> impl IntoResponse
|
||||
) -> Response
|
||||
where
|
||||
T: Transaction
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ serde::de::DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
T: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
<T as nomos_core::tx::Transaction>::Hash:
|
||||
Serialize + serde::de::DeserializeOwned + std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
Serialize + DeserializeOwned + std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
{
|
||||
match cl_mempool_status::<T>(&store.handle, items).await {
|
||||
Ok(status) => (StatusCode::OK, Json(status)).into_response(),
|
||||
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
|
||||
}
|
||||
make_request_and_return_response!(cl::cl_mempool_status::<T>(&store, items))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/carnot/info",
|
||||
responses(
|
||||
(status = 200, description = "Query the carnot information", body = nomos_consensus::CarnotInfo),
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn carnot_info<Tx, SS, const SIZE: usize>(State(store): State<OverwatchHandle>) -> Response
|
||||
where
|
||||
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
SS: StorageSerde + Send + Sync + 'static,
|
||||
{
|
||||
make_request_and_return_response!(consensus::carnot_info::<Tx, SS, SIZE>(&store))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/network/info",
|
||||
responses(
|
||||
(status = 200, description = "Query the network information", body = nomos_network::backends::libp2p::Libp2pInfo),
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn libp2p_info(State(store): State<OverwatchHandle>) -> Response {
|
||||
make_request_and_return_response!(libp2p::libp2p_info(&store))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
get,
|
||||
path = "/storage/block",
|
||||
responses(
|
||||
(status = 200, description = "Get the block by block id", body = Block<Tx, full_replication::Certificate>),
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn block<S, Tx>(State(store): State<OverwatchHandle>, Json(id): Json<BlockId>) -> Response
|
||||
where
|
||||
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,
|
||||
S: StorageSerde + Send + Sync + 'static,
|
||||
{
|
||||
make_request_and_return_response!(storage::block_req::<S, Tx>(&store, id))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/mempool/add/tx",
|
||||
responses(
|
||||
(status = 200, description = "Add transaction to the mempool"),
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn add_tx<Tx>(State(store): State<OverwatchHandle>, Json(tx): Json<Tx>) -> Response
|
||||
where
|
||||
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
{
|
||||
make_request_and_return_response!(mempool::add::<
|
||||
Libp2p,
|
||||
Libp2pAdapter<Tx, <Tx as Transaction>::Hash>,
|
||||
nomos_mempool::Transaction,
|
||||
Tx,
|
||||
<Tx as Transaction>::Hash,
|
||||
>(&store, tx, Transaction::hash))
|
||||
}
|
||||
|
||||
#[utoipa::path(
|
||||
post,
|
||||
path = "/mempool/add/tx",
|
||||
responses(
|
||||
(status = 200, description = "Add certificate to the mempool"),
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn add_cert(State(store): State<OverwatchHandle>, Json(cert): Json<Certificate>) -> Response {
|
||||
make_request_and_return_response!(mempool::add::<
|
||||
Libp2p,
|
||||
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
|
||||
nomos_mempool::Certificate,
|
||||
Certificate,
|
||||
<Blob as blob::Blob>::Hash,
|
||||
>(
|
||||
&store,
|
||||
cert,
|
||||
nomos_core::da::certificate::Certificate::hash
|
||||
))
|
||||
}
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
use std::{fmt::Debug, hash::Hash};
|
||||
|
||||
use consensus_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay};
|
||||
use full_replication::Certificate;
|
||||
use nomos_consensus::{
|
||||
network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter, CarnotConsensus,
|
||||
CarnotInfo, ConsensusMsg,
|
||||
};
|
||||
use nomos_core::{
|
||||
da::{
|
||||
blob,
|
||||
certificate::{self, select::FillSize as FillSizeWithBlobsCertificate},
|
||||
},
|
||||
tx::{select::FillSize as FillSizeWithTx, Transaction},
|
||||
};
|
||||
use nomos_mempool::{
|
||||
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter,
|
||||
};
|
||||
use nomos_storage::backends::{sled::SledBackend, StorageSerde};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
pub type Carnot<Tx, SS, const SIZE: usize> = CarnotConsensus<
|
||||
ConsensusLibp2pAdapter,
|
||||
MockPool<Tx, <Tx as Transaction>::Hash>,
|
||||
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
|
||||
MockPool<Certificate, <<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash>,
|
||||
MempoolLibp2pAdapter<
|
||||
Certificate,
|
||||
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
|
||||
>,
|
||||
TreeOverlay<RoundRobin, RandomBeaconState>,
|
||||
FillSizeWithTx<SIZE, Tx>,
|
||||
FillSizeWithBlobsCertificate<SIZE, Certificate>,
|
||||
SledBackend<SS>,
|
||||
>;
|
||||
|
||||
pub async fn carnot_info<Tx, SS, const SIZE: usize>(
|
||||
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> Result<CarnotInfo, overwatch_rs::DynError>
|
||||
where
|
||||
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
SS: StorageSerde + Send + Sync + 'static,
|
||||
{
|
||||
let relay = handle.relay::<Carnot<Tx, SS, SIZE>>().connect().await?;
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
relay
|
||||
.send(ConsensusMsg::Info { tx: sender })
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
Ok(receiver.await?)
|
||||
}
|
|
@ -1,5 +1,9 @@
|
|||
use full_replication::{Blob, Certificate};
|
||||
use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication};
|
||||
use nomos_core::da::blob;
|
||||
use nomos_da::{
|
||||
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter,
|
||||
DaMsg, DataAvailabilityService,
|
||||
};
|
||||
use nomos_mempool::{
|
||||
backend::mockpool::MockPool,
|
||||
network::adapters::libp2p::Libp2pAdapter,
|
||||
|
@ -8,12 +12,18 @@ use nomos_mempool::{
|
|||
};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
type DaMempoolService = MempoolService<
|
||||
pub type DaMempoolService = MempoolService<
|
||||
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
|
||||
MockPool<Certificate, <Blob as blob::Blob>::Hash>,
|
||||
CertDiscriminant,
|
||||
>;
|
||||
|
||||
pub type DataAvailability = DataAvailabilityService<
|
||||
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
|
||||
BlobCache<<Blob as nomos_core::da::blob::Blob>::Hash, Blob>,
|
||||
DaLibp2pAdapter<Blob, Attestation>,
|
||||
>;
|
||||
|
||||
pub async fn da_mempool_metrics(
|
||||
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> Result<MempoolMetrics, super::DynError> {
|
||||
|
@ -45,3 +55,20 @@ pub async fn da_mempool_status(
|
|||
|
||||
Ok(receiver.await.unwrap())
|
||||
}
|
||||
|
||||
pub async fn da_blobs(
|
||||
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
ids: Vec<<Blob as blob::Blob>::Hash>,
|
||||
) -> Result<Vec<Blob>, super::DynError> {
|
||||
let relay = handle.relay::<DataAvailability>().connect().await?;
|
||||
let (reply_channel, receiver) = oneshot::channel();
|
||||
relay
|
||||
.send(DaMsg::Get {
|
||||
ids: Box::new(ids.into_iter()),
|
||||
reply_channel,
|
||||
})
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
Ok(receiver.await?)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
use nomos_network::{
|
||||
backends::libp2p::{Command, Libp2p, Libp2pInfo},
|
||||
NetworkMsg, NetworkService,
|
||||
};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
pub async fn libp2p_info(
|
||||
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> Result<Libp2pInfo, overwatch_rs::DynError> {
|
||||
let relay = handle.relay::<NetworkService<Libp2p>>().connect().await?;
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
relay
|
||||
.send(NetworkMsg::Process(Command::Info { reply: sender }))
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
Ok(receiver.await?)
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
use core::{fmt::Debug, hash::Hash};
|
||||
|
||||
use nomos_mempool::{
|
||||
backend::mockpool::MockPool, network::NetworkAdapter, Discriminant, MempoolMsg, MempoolService,
|
||||
};
|
||||
use nomos_network::backends::NetworkBackend;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
pub async fn add<N, A, D, Item, Key>(
|
||||
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
item: Item,
|
||||
converter: impl Fn(&Item) -> Key,
|
||||
) -> Result<(), super::DynError>
|
||||
where
|
||||
N: NetworkBackend,
|
||||
A: NetworkAdapter<Backend = N, Item = Item, Key = Key> + Send + Sync + 'static,
|
||||
A::Settings: Send + Sync,
|
||||
D: Discriminant,
|
||||
Item: Clone + Debug + Send + Sync + 'static + Hash,
|
||||
Key: Clone + Debug + Ord + Hash,
|
||||
{
|
||||
let relay = handle
|
||||
.relay::<MempoolService<A, MockPool<Item, Key>, D>>()
|
||||
.connect()
|
||||
.await?;
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
relay
|
||||
.send(MempoolMsg::Add {
|
||||
key: converter(&item),
|
||||
item,
|
||||
reply_channel: sender,
|
||||
})
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
match receiver.await {
|
||||
Ok(Ok(())) => Ok(()),
|
||||
Ok(Err(())) => Err("mempool error".into()),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
|
@ -1,5 +1,10 @@
|
|||
pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
pub mod backend;
|
||||
|
||||
pub mod cl;
|
||||
pub mod consensus;
|
||||
pub mod da;
|
||||
pub mod libp2p;
|
||||
pub mod mempool;
|
||||
pub mod storage;
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
use consensus_engine::BlockId;
|
||||
use nomos_core::block::Block;
|
||||
use nomos_storage::{
|
||||
backends::{sled::SledBackend, StorageSerde},
|
||||
StorageMsg, StorageService,
|
||||
};
|
||||
|
||||
pub async fn block_req<S, Tx>(
|
||||
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
id: BlockId,
|
||||
) -> Result<Option<Block<Tx, full_replication::Certificate>>, super::DynError>
|
||||
where
|
||||
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,
|
||||
S: StorageSerde + Send + Sync + 'static,
|
||||
{
|
||||
let relay = handle
|
||||
.relay::<StorageService<SledBackend<S>>>()
|
||||
.connect()
|
||||
.await?;
|
||||
let (msg, receiver) = StorageMsg::new_load_message(id);
|
||||
relay.send(msg).await.map_err(|(e, _)| e)?;
|
||||
|
||||
Ok(receiver.recv().await?)
|
||||
}
|
|
@ -93,6 +93,7 @@ impl CertificateStrategy for AbsoluteNumber<Attestation, Certificate> {
|
|||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
|
||||
pub struct Blob {
|
||||
data: Bytes,
|
||||
|
|
|
@ -29,10 +29,14 @@ serde_with = "3.0.0"
|
|||
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
|
||||
blake2 = "0.10"
|
||||
|
||||
utoipa = { version = "4.0", optional = true }
|
||||
serde_json = { version = "1", optional = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
mock = ["nomos-network/mock"]
|
||||
libp2p = ["nomos-network/libp2p", "nomos-libp2p"]
|
||||
openapi = ["dep:utoipa", "serde_json"]
|
||||
|
||||
[dev-dependencies]
|
||||
serde_json = "1.0.96"
|
||||
|
|
|
@ -1080,6 +1080,7 @@ impl RelayMessage for ConsensusMsg {}
|
|||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct CarnotInfo {
|
||||
pub id: NodeId,
|
||||
pub current_view: View,
|
||||
|
|
|
@ -3,8 +3,6 @@ name = "nomos-network"
|
|||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
bytes = "1.2"
|
||||
|
@ -26,6 +24,9 @@ nomos-core = { path = "../../nomos-core" }
|
|||
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
|
||||
mixnet-client = { path = "../../mixnet/client" }
|
||||
|
||||
utoipa = { version = "4.0", optional = true }
|
||||
serde_json = { version = "1", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
|
||||
|
@ -33,3 +34,4 @@ tokio = { version = "1", features = ["full"] }
|
|||
default = []
|
||||
libp2p = ["nomos-libp2p", "rand", "humantime-serde"]
|
||||
mock = ["rand", "chrono"]
|
||||
openapi = ["dep:utoipa", "serde_json",]
|
||||
|
|
|
@ -33,6 +33,7 @@ pub struct Dial {
|
|||
pub type Topic = String;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct Libp2pInfo {
|
||||
pub listen_addresses: Vec<Multiaddr>,
|
||||
pub n_peers: usize,
|
||||
|
|
Loading…
Reference in New Issue