From ba90ed1b5525059588fa923ec2cde80399671ff0 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Wed, 1 Nov 2023 17:14:58 +0800 Subject: [PATCH] Nomos http API (#476) * Info api * Da blob api (#487) * Add storage api for new http backend (#488) * Mempool add APIs (#489) --- consensus-engine/Cargo.toml | 2 +- consensus-engine/src/types.rs | 10 + consensus-engine/src/types/committee.rs | 1 + consensus-engine/src/types/node_id.rs | 1 + consensus-engine/src/types/view.rs | 1 + nodes/nomos-node-api/Cargo.toml | 22 +- nodes/nomos-node-api/src/http/backend/axum.rs | 217 +++++++++++++----- nodes/nomos-node-api/src/http/consensus.rs | 53 +++++ nodes/nomos-node-api/src/http/da.rs | 31 ++- nodes/nomos-node-api/src/http/libp2p.rs | 19 ++ nodes/nomos-node-api/src/http/mempool.rs | 42 ++++ nodes/nomos-node-api/src/http/mod.rs | 5 + nodes/nomos-node-api/src/http/storage.rs | 24 ++ nomos-da/full-replication/src/lib.rs | 1 + nomos-services/consensus/Cargo.toml | 4 + nomos-services/consensus/src/lib.rs | 1 + nomos-services/network/Cargo.toml | 6 +- .../network/src/backends/libp2p/command.rs | 1 + 18 files changed, 376 insertions(+), 65 deletions(-) create mode 100644 nodes/nomos-node-api/src/http/consensus.rs create mode 100644 nodes/nomos-node-api/src/http/libp2p.rs create mode 100644 nodes/nomos-node-api/src/http/mempool.rs create mode 100644 nodes/nomos-node-api/src/http/storage.rs diff --git a/consensus-engine/Cargo.toml b/consensus-engine/Cargo.toml index 3723c8cb..c7147112 100644 --- a/consensus-engine/Cargo.toml +++ b/consensus-engine/Cargo.toml @@ -25,7 +25,7 @@ default = [] serde = ["dep:serde", "nomos-utils/serde"] simulation = [] -openapi = ["dep:utoipa", "serde_json"] +openapi = ["dep:utoipa", "serde_json", "serde"] [dev-dependencies] proptest = "1.2.0" diff --git a/consensus-engine/src/types.rs b/consensus-engine/src/types.rs index ab970115..d165ef32 100644 --- a/consensus-engine/src/types.rs +++ b/consensus-engine/src/types.rs @@ -20,6 +20,7 @@ pub use view::View; /// can't be directly used in the network as they lack things like cryptographic signatures. #[derive(Debug, Clone, Eq, PartialEq)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub enum Payload { /// Vote for a block in a view Vote(Vote), @@ -32,6 +33,7 @@ pub enum Payload { /// Returned #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct Vote { pub view: View, pub block: BlockId, @@ -39,6 +41,7 @@ pub struct Vote { #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct Timeout { pub view: View, pub sender: NodeId, @@ -50,6 +53,7 @@ pub struct Timeout { // We should consider to remove the TimoutQc from the NewView message and use a hash or id instead. #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct NewView { pub view: View, pub sender: NodeId, @@ -59,6 +63,7 @@ pub struct NewView { #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct TimeoutQc { view: View, high_qc: StandardQc, @@ -96,6 +101,7 @@ impl TimeoutQc { #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct Block { #[cfg_attr(feature = "serde", serde(skip))] pub id: BlockId, @@ -106,6 +112,7 @@ pub struct Block { #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub enum LeaderProof { LeaderId { leader_id: NodeId }, } @@ -136,6 +143,7 @@ pub struct Send { #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct StandardQc { pub view: View, pub id: BlockId, @@ -152,6 +160,7 @@ impl StandardQc { #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct AggregateQc { pub high_qc: StandardQc, pub view: View, @@ -159,6 +168,7 @@ pub struct AggregateQc { #[derive(Debug, Clone, Eq, PartialEq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub enum Qc { Standard(StandardQc), Aggregated(AggregateQc), diff --git a/consensus-engine/src/types/committee.rs b/consensus-engine/src/types/committee.rs index 4ee8c270..939213fd 100644 --- a/consensus-engine/src/types/committee.rs +++ b/consensus-engine/src/types/committee.rs @@ -3,6 +3,7 @@ use std::collections::BTreeSet; use crate::NodeId; #[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct CommitteeId(pub(crate) [u8; 32]); #[cfg(feature = "serde")] diff --git a/consensus-engine/src/types/node_id.rs b/consensus-engine/src/types/node_id.rs index 48748049..23a59e01 100644 --- a/consensus-engine/src/types/node_id.rs +++ b/consensus-engine/src/types/node_id.rs @@ -1,4 +1,5 @@ #[derive(Clone, Copy, Default, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct NodeId(pub(crate) [u8; 32]); #[cfg(feature = "serde")] diff --git a/consensus-engine/src/types/view.rs b/consensus-engine/src/types/view.rs index 497f5668..e26380d3 100644 --- a/consensus-engine/src/types/view.rs +++ b/consensus-engine/src/types/view.rs @@ -17,6 +17,7 @@ use derive_more::{Add, AddAssign, Sub, SubAssign}; )] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "serde", serde(transparent))] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct View(pub(crate) i64); impl View { diff --git a/nodes/nomos-node-api/Cargo.toml b/nodes/nomos-node-api/Cargo.toml index c8252875..b04db982 100644 --- a/nodes/nomos-node-api/Cargo.toml +++ b/nodes/nomos-node-api/Cargo.toml @@ -13,16 +13,26 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } tracing = "0.1" + +consensus-engine = { path = "../../consensus-engine" } +nomos-core = { path = "../../nomos-core" } +nomos-consensus = { path = "../../nomos-services/consensus" } +nomos-network = { path = "../../nomos-services/network" } +nomos-da = { path = "../../nomos-services/data-availability" } +nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] } +nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } +nomos-libp2p = { path = "../../nomos-libp2p" } +full-replication = { path = "../../nomos-da/full-replication" } +serde = { version = "1", features = ["derive"] } +tokio = { version = "1.33", default-features = false, features = ["sync"] } + + # axum related dependencies axum = { version = "0.6", optional = true } hyper = { version = "0.14", features = ["full"], optional = true } -nomos-core = { path = "../../nomos-core" } -nomos-da = { path = "../../nomos-services/data-availability" } -nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] } -full-replication = { path = "../../nomos-da/full-replication", features = ["openapi"] } -serde = { version = "1", features = ["derive"] } -tokio = { version = "1.33", default-features = false, features = ["sync"] } + +# openapi related dependencies utoipa = "4.0" utoipa-swagger-ui = { version = "4.0" } diff --git a/nodes/nomos-node-api/src/http/backend/axum.rs b/nodes/nomos-node-api/src/http/backend/axum.rs index 1d572600..55e1d03b 100644 --- a/nodes/nomos-node-api/src/http/backend/axum.rs +++ b/nodes/nomos-node-api/src/http/backend/axum.rs @@ -1,17 +1,20 @@ -use std::{fmt::Debug, hash::Hash, net::SocketAddr, sync::Arc}; +use std::{fmt::Debug, hash::Hash, net::SocketAddr}; -use axum::{extract::State, response::IntoResponse, routing, Json, Router, Server}; -use full_replication::Blob; -use hyper::StatusCode; -use nomos_core::{da::blob, tx::Transaction}; -use nomos_mempool::{openapi::Status, MempoolMetrics}; +use axum::{extract::State, response::Response, routing, Json, Router, Server}; use overwatch_rs::overwatch::handle::OverwatchHandle; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; +use consensus_engine::BlockId; +use full_replication::{Blob, Certificate}; +use nomos_core::{da::blob, tx::Transaction}; +use nomos_mempool::{network::adapters::libp2p::Libp2pAdapter, openapi::Status, MempoolMetrics}; +use nomos_network::backends::libp2p::Libp2p; +use nomos_storage::backends::StorageSerde; + use crate::{ - http::{cl::*, da::*}, + http::{cl, consensus, da, libp2p, mempool, storage}, Backend, }; @@ -21,9 +24,10 @@ pub struct AxumBackendSettings { pub handle: OverwatchHandle, } -pub struct AxumBackend { - settings: Arc, - _cl: core::marker::PhantomData, +pub struct AxumBackend { + settings: AxumBackendSettings, + _tx: core::marker::PhantomData, + _storage_serde: core::marker::PhantomData, } #[derive(OpenApi)] @@ -41,22 +45,22 @@ pub struct AxumBackend { )] struct ApiDoc; -type Store = Arc; - #[async_trait::async_trait] -impl Backend for AxumBackend +impl Backend for AxumBackend where - ClTransaction: Transaction + T: Transaction + Clone + Debug + + Eq + Hash + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, - ::Hash: + ::Hash: Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static, + S: StorageSerde + Send + Sync + 'static, { type Error = hyper::Error; type Settings = AxumBackendSettings; @@ -66,20 +70,26 @@ where Self: Sized, { Ok(Self { - settings: Arc::new(settings), - _cl: core::marker::PhantomData, + settings, + _tx: core::marker::PhantomData, + _storage_serde: core::marker::PhantomData, }) } async fn serve(self) -> Result<(), Self::Error> { - let store = self.settings.clone(); let app = Router::new() .merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi())) .route("/da/metrics", routing::get(da_metrics)) .route("/da/status", routing::post(da_status)) - .route("/cl/metrics", routing::get(cl_metrics::)) - .route("/cl/status", routing::post(cl_status::)) - .with_state(store); + .route("/da/blobs", routing::post(da_blobs)) + .route("/cl/metrics", routing::get(cl_metrics::)) + .route("/cl/status", routing::post(cl_status::)) + .route("/carnot/info", routing::get(carnot_info::)) + .route("/network/info", routing::get(libp2p_info)) + .route("/storage/block", routing::post(block::)) + .route("/mempool/add/tx", routing::post(add_tx::)) + .route("/mempool/add/cert", routing::post(add_cert)) + .with_state(self.settings.handle); Server::bind(&self.settings.addr) .serve(app.into_make_service()) @@ -87,6 +97,21 @@ where } } +macro_rules! make_request_and_return_response { + ($cond:expr) => {{ + match $cond.await { + ::std::result::Result::Ok(val) => ::axum::response::IntoResponse::into_response(( + ::hyper::StatusCode::OK, + ::axum::Json(val), + )), + ::std::result::Result::Err(e) => ::axum::response::IntoResponse::into_response(( + ::hyper::StatusCode::INTERNAL_SERVER_ERROR, + e.to_string(), + )), + } + }}; +} + #[utoipa::path( get, path = "/da/metrics", @@ -95,11 +120,8 @@ where (status = 500, description = "Internal server error", body = String), ) )] -async fn da_metrics(State(store): State) -> impl IntoResponse { - match da_mempool_metrics(&store.handle).await { - Ok(metrics) => (StatusCode::OK, Json(metrics)).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), - } +async fn da_metrics(State(store): State) -> Response { + make_request_and_return_response!(da::da_mempool_metrics(&store)) } #[utoipa::path( @@ -111,13 +133,25 @@ async fn da_metrics(State(store): State) -> impl IntoResponse { ) )] async fn da_status( - State(store): State, + State(store): State, Json(items): Json::Hash>>, -) -> impl IntoResponse { - match da_mempool_status(&store.handle, items).await { - Ok(status) => (StatusCode::OK, Json(status)).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), - } +) -> Response { + make_request_and_return_response!(da::da_mempool_status(&store, items)) +} + +#[utoipa::path( + post, + path = "/da/blobs", + responses( + (status = 200, description = "Get pending blobs", body = Vec), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn da_blobs( + State(store): State, + Json(items): Json::Hash>>, +) -> Response { + make_request_and_return_response!(da::da_blobs(&store, items)) } #[utoipa::path( @@ -128,7 +162,7 @@ async fn da_status( (status = 500, description = "Internal server error", body = String), ) )] -async fn cl_metrics(State(store): State) -> impl IntoResponse +async fn cl_metrics(State(store): State) -> Response where T: Transaction + Clone @@ -141,10 +175,7 @@ where + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, { - match cl_mempool_metrics::(&store.handle).await { - Ok(metrics) => (StatusCode::OK, Json(metrics)).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), - } + make_request_and_return_response!(cl::cl_mempool_metrics::(&store)) } #[utoipa::path( @@ -156,24 +187,102 @@ where ) )] async fn cl_status( - State(store): State, + State(store): State, Json(items): Json::Hash>>, -) -> impl IntoResponse +) -> Response where - T: Transaction - + Clone - + Debug - + Hash - + Serialize - + serde::de::DeserializeOwned - + Send - + Sync - + 'static, + T: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, ::Hash: - Serialize + serde::de::DeserializeOwned + std::cmp::Ord + Debug + Send + Sync + 'static, + Serialize + DeserializeOwned + std::cmp::Ord + Debug + Send + Sync + 'static, { - match cl_mempool_status::(&store.handle, items).await { - Ok(status) => (StatusCode::OK, Json(status)).into_response(), - Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(), - } + make_request_and_return_response!(cl::cl_mempool_status::(&store, items)) +} + +#[utoipa::path( + get, + path = "/carnot/info", + responses( + (status = 200, description = "Query the carnot information", body = nomos_consensus::CarnotInfo), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn carnot_info(State(store): State) -> Response +where + Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, + ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, + SS: StorageSerde + Send + Sync + 'static, +{ + make_request_and_return_response!(consensus::carnot_info::(&store)) +} + +#[utoipa::path( + get, + path = "/network/info", + responses( + (status = 200, description = "Query the network information", body = nomos_network::backends::libp2p::Libp2pInfo), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn libp2p_info(State(store): State) -> Response { + make_request_and_return_response!(libp2p::libp2p_info(&store)) +} + +#[utoipa::path( + get, + path = "/storage/block", + responses( + (status = 200, description = "Get the block by block id", body = Block), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn block(State(store): State, Json(id): Json) -> Response +where + Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash, + S: StorageSerde + Send + Sync + 'static, +{ + make_request_and_return_response!(storage::block_req::(&store, id)) +} + +#[utoipa::path( + post, + path = "/mempool/add/tx", + responses( + (status = 200, description = "Add transaction to the mempool"), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn add_tx(State(store): State, Json(tx): Json) -> Response +where + Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, + ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, +{ + make_request_and_return_response!(mempool::add::< + Libp2p, + Libp2pAdapter::Hash>, + nomos_mempool::Transaction, + Tx, + ::Hash, + >(&store, tx, Transaction::hash)) +} + +#[utoipa::path( + post, + path = "/mempool/add/tx", + responses( + (status = 200, description = "Add certificate to the mempool"), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn add_cert(State(store): State, Json(cert): Json) -> Response { + make_request_and_return_response!(mempool::add::< + Libp2p, + Libp2pAdapter::Hash>, + nomos_mempool::Certificate, + Certificate, + ::Hash, + >( + &store, + cert, + nomos_core::da::certificate::Certificate::hash + )) } diff --git a/nodes/nomos-node-api/src/http/consensus.rs b/nodes/nomos-node-api/src/http/consensus.rs new file mode 100644 index 00000000..dc61ccd2 --- /dev/null +++ b/nodes/nomos-node-api/src/http/consensus.rs @@ -0,0 +1,53 @@ +use std::{fmt::Debug, hash::Hash}; + +use consensus_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay}; +use full_replication::Certificate; +use nomos_consensus::{ + network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter, CarnotConsensus, + CarnotInfo, ConsensusMsg, +}; +use nomos_core::{ + da::{ + blob, + certificate::{self, select::FillSize as FillSizeWithBlobsCertificate}, + }, + tx::{select::FillSize as FillSizeWithTx, Transaction}, +}; +use nomos_mempool::{ + backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter, +}; +use nomos_storage::backends::{sled::SledBackend, StorageSerde}; +use serde::{de::DeserializeOwned, Serialize}; +use tokio::sync::oneshot; + +pub type Carnot = CarnotConsensus< + ConsensusLibp2pAdapter, + MockPool::Hash>, + MempoolLibp2pAdapter::Hash>, + MockPool::Blob as blob::Blob>::Hash>, + MempoolLibp2pAdapter< + Certificate, + <::Blob as blob::Blob>::Hash, + >, + TreeOverlay, + FillSizeWithTx, + FillSizeWithBlobsCertificate, + SledBackend, +>; + +pub async fn carnot_info( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, +) -> Result +where + Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static, + ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, + SS: StorageSerde + Send + Sync + 'static, +{ + let relay = handle.relay::>().connect().await?; + let (sender, receiver) = oneshot::channel(); + relay + .send(ConsensusMsg::Info { tx: sender }) + .await + .map_err(|(e, _)| e)?; + Ok(receiver.await?) +} diff --git a/nodes/nomos-node-api/src/http/da.rs b/nodes/nomos-node-api/src/http/da.rs index 22d45c0e..918bb854 100644 --- a/nodes/nomos-node-api/src/http/da.rs +++ b/nodes/nomos-node-api/src/http/da.rs @@ -1,5 +1,9 @@ -use full_replication::{Blob, Certificate}; +use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication}; use nomos_core::da::blob; +use nomos_da::{ + backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, + DaMsg, DataAvailabilityService, +}; use nomos_mempool::{ backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter, @@ -8,12 +12,18 @@ use nomos_mempool::{ }; use tokio::sync::oneshot; -type DaMempoolService = MempoolService< +pub type DaMempoolService = MempoolService< Libp2pAdapter::Hash>, MockPool::Hash>, CertDiscriminant, >; +pub type DataAvailability = DataAvailabilityService< + FullReplication>, + BlobCache<::Hash, Blob>, + DaLibp2pAdapter, +>; + pub async fn da_mempool_metrics( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, ) -> Result { @@ -45,3 +55,20 @@ pub async fn da_mempool_status( Ok(receiver.await.unwrap()) } + +pub async fn da_blobs( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, + ids: Vec<::Hash>, +) -> Result, super::DynError> { + let relay = handle.relay::().connect().await?; + let (reply_channel, receiver) = oneshot::channel(); + relay + .send(DaMsg::Get { + ids: Box::new(ids.into_iter()), + reply_channel, + }) + .await + .map_err(|(e, _)| e)?; + + Ok(receiver.await?) +} diff --git a/nodes/nomos-node-api/src/http/libp2p.rs b/nodes/nomos-node-api/src/http/libp2p.rs new file mode 100644 index 00000000..c43706b8 --- /dev/null +++ b/nodes/nomos-node-api/src/http/libp2p.rs @@ -0,0 +1,19 @@ +use nomos_network::{ + backends::libp2p::{Command, Libp2p, Libp2pInfo}, + NetworkMsg, NetworkService, +}; +use tokio::sync::oneshot; + +pub async fn libp2p_info( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, +) -> Result { + let relay = handle.relay::>().connect().await?; + let (sender, receiver) = oneshot::channel(); + + relay + .send(NetworkMsg::Process(Command::Info { reply: sender })) + .await + .map_err(|(e, _)| e)?; + + Ok(receiver.await?) +} diff --git a/nodes/nomos-node-api/src/http/mempool.rs b/nodes/nomos-node-api/src/http/mempool.rs new file mode 100644 index 00000000..a5b7009f --- /dev/null +++ b/nodes/nomos-node-api/src/http/mempool.rs @@ -0,0 +1,42 @@ +use core::{fmt::Debug, hash::Hash}; + +use nomos_mempool::{ + backend::mockpool::MockPool, network::NetworkAdapter, Discriminant, MempoolMsg, MempoolService, +}; +use nomos_network::backends::NetworkBackend; +use tokio::sync::oneshot; + +pub async fn add( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, + item: Item, + converter: impl Fn(&Item) -> Key, +) -> Result<(), super::DynError> +where + N: NetworkBackend, + A: NetworkAdapter + Send + Sync + 'static, + A::Settings: Send + Sync, + D: Discriminant, + Item: Clone + Debug + Send + Sync + 'static + Hash, + Key: Clone + Debug + Ord + Hash, +{ + let relay = handle + .relay::, D>>() + .connect() + .await?; + let (sender, receiver) = oneshot::channel(); + + relay + .send(MempoolMsg::Add { + key: converter(&item), + item, + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + match receiver.await { + Ok(Ok(())) => Ok(()), + Ok(Err(())) => Err("mempool error".into()), + Err(e) => Err(e.into()), + } +} diff --git a/nodes/nomos-node-api/src/http/mod.rs b/nodes/nomos-node-api/src/http/mod.rs index 22f67af6..369b16b3 100644 --- a/nodes/nomos-node-api/src/http/mod.rs +++ b/nodes/nomos-node-api/src/http/mod.rs @@ -1,5 +1,10 @@ pub type DynError = Box; pub mod backend; + pub mod cl; +pub mod consensus; pub mod da; +pub mod libp2p; +pub mod mempool; +pub mod storage; diff --git a/nodes/nomos-node-api/src/http/storage.rs b/nodes/nomos-node-api/src/http/storage.rs new file mode 100644 index 00000000..b35c7162 --- /dev/null +++ b/nodes/nomos-node-api/src/http/storage.rs @@ -0,0 +1,24 @@ +use consensus_engine::BlockId; +use nomos_core::block::Block; +use nomos_storage::{ + backends::{sled::SledBackend, StorageSerde}, + StorageMsg, StorageService, +}; + +pub async fn block_req( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, + id: BlockId, +) -> Result>, super::DynError> +where + Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash, + S: StorageSerde + Send + Sync + 'static, +{ + let relay = handle + .relay::>>() + .connect() + .await?; + let (msg, receiver) = StorageMsg::new_load_message(id); + relay.send(msg).await.map_err(|(e, _)| e)?; + + Ok(receiver.recv().await?) +} diff --git a/nomos-da/full-replication/src/lib.rs b/nomos-da/full-replication/src/lib.rs index 762c5357..c8c122a6 100644 --- a/nomos-da/full-replication/src/lib.rs +++ b/nomos-da/full-replication/src/lib.rs @@ -93,6 +93,7 @@ impl CertificateStrategy for AbsoluteNumber { } #[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct Blob { data: Bytes, diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index 0c564def..7424dffc 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -29,10 +29,14 @@ serde_with = "3.0.0" nomos-libp2p = { path = "../../nomos-libp2p", optional = true } blake2 = "0.10" +utoipa = { version = "4.0", optional = true } +serde_json = { version = "1", optional = true } + [features] default = [] mock = ["nomos-network/mock"] libp2p = ["nomos-network/libp2p", "nomos-libp2p"] +openapi = ["dep:utoipa", "serde_json"] [dev-dependencies] serde_json = "1.0.96" diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 60c0c13d..c2d08be6 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -1080,6 +1080,7 @@ impl RelayMessage for ConsensusMsg {} #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct CarnotInfo { pub id: NodeId, pub current_view: View, diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index cd809d5b..1d14aabc 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -3,8 +3,6 @@ name = "nomos-network" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1" bytes = "1.2" @@ -26,6 +24,9 @@ nomos-core = { path = "../../nomos-core" } nomos-libp2p = { path = "../../nomos-libp2p", optional = true } mixnet-client = { path = "../../mixnet/client" } +utoipa = { version = "4.0", optional = true } +serde_json = { version = "1", optional = true } + [dev-dependencies] tokio = { version = "1", features = ["full"] } @@ -33,3 +34,4 @@ tokio = { version = "1", features = ["full"] } default = [] libp2p = ["nomos-libp2p", "rand", "humantime-serde"] mock = ["rand", "chrono"] +openapi = ["dep:utoipa", "serde_json",] diff --git a/nomos-services/network/src/backends/libp2p/command.rs b/nomos-services/network/src/backends/libp2p/command.rs index c33709c1..51108880 100644 --- a/nomos-services/network/src/backends/libp2p/command.rs +++ b/nomos-services/network/src/backends/libp2p/command.rs @@ -33,6 +33,7 @@ pub struct Dial { pub type Topic = String; #[derive(Debug, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] pub struct Libp2pInfo { pub listen_addresses: Vec, pub n_peers: usize,