1
0
mirror of synced 2025-01-23 14:09:53 +00:00

Metrics api (#466)

* Add metrics API
This commit is contained in:
Al Liu 2023-10-31 17:20:04 +08:00 committed by GitHub
parent 5e520ae194
commit 9b3c675b3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 372 additions and 54 deletions

View File

@ -14,10 +14,10 @@ members = [
"nomos-da/reed-solomon",
"nomos-da/kzg",
"nomos-da/full-replication",
"nomos-http-api",
"nomos-cli",
"nomos-utils",
"nodes/nomos-node",
"nodes/nomos-node-api",
"nodes/mixnode",
"simulations",
"consensus-engine",

View File

@ -17,11 +17,16 @@ thiserror = "1"
fraction = { version = "0.13" }
nomos-utils = { path = "../nomos-utils", optional = true }
utoipa = { version = "4.0", optional = true }
serde_json = { version = "1.0", optional = true }
[features]
default = []
serde = ["dep:serde", "nomos-utils/serde"]
simulation = []
openapi = ["dep:utoipa", "serde_json"]
[dev-dependencies]
proptest = "1.2.0"
proptest-state-machine = "0.1.0"

View File

@ -5,6 +5,12 @@ mod types;
pub use overlay::Overlay;
pub use types::*;
/// Re-export of the OpenAPI types
#[cfg(feature = "openapi")]
pub mod openapi {
pub use crate::types::BlockId;
}
#[derive(Clone, Debug, PartialEq)]
pub struct Carnot<O: Overlay> {
id: NodeId,

View File

@ -1,4 +1,6 @@
/// The block id
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq, Ord, PartialOrd)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct BlockId(pub(crate) [u8; 32]);
#[cfg(feature = "serde")]

View File

@ -1,9 +1,10 @@
[package]
name = "nomos-http-api"
name = "nomos-node-api"
version = "0.1.0"
edition = "2021"
[features]
default = ["axum"]
axum = ["dep:axum", "dep:hyper", "utoipa-swagger-ui/axum"]
[dependencies]
@ -11,13 +12,20 @@ async-trait = "0.1"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
tracing = "0.1"
utoipa = "4.0"
utoipa-swagger-ui = { version = "4.0" }
# axum related dependencies
axum = { version = "0.6", optional = true }
hyper = { version = "0.14", features = ["full"], optional = true }
nomos-core = { path = "../../nomos-core" }
nomos-da = { path = "../../nomos-services/data-availability" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] }
full-replication = { path = "../../nomos-da/full-replication", features = ["openapi"] }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.33", default-features = false, features = ["sync"] }
utoipa = "4.0"
utoipa-swagger-ui = { version = "4.0" }
[dev-dependencies]
axum = "0.6"
hyper = { version = "0.14", features = ["full"] }

View File

@ -0,0 +1,179 @@
use std::{fmt::Debug, hash::Hash, net::SocketAddr, sync::Arc};
use axum::{extract::State, response::IntoResponse, routing, Json, Router, Server};
use full_replication::Blob;
use hyper::StatusCode;
use nomos_core::{da::blob, tx::Transaction};
use nomos_mempool::{openapi::Status, MempoolMetrics};
use overwatch_rs::overwatch::handle::OverwatchHandle;
use serde::{Deserialize, Serialize};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::{
http::{cl::*, da::*},
Backend,
};
#[derive(Clone)]
pub struct AxumBackendSettings {
pub addr: SocketAddr,
pub handle: OverwatchHandle,
}
pub struct AxumBackend<ClTransaction> {
settings: Arc<AxumBackendSettings>,
_cl: core::marker::PhantomData<ClTransaction>,
}
#[derive(OpenApi)]
#[openapi(
paths(
da_metrics,
da_status,
),
components(
schemas(Status, MempoolMetrics)
),
tags(
(name = "da", description = "data availibility related APIs")
)
)]
struct ApiDoc;
type Store = Arc<AxumBackendSettings>;
#[async_trait::async_trait]
impl<ClTransaction> Backend for AxumBackend<ClTransaction>
where
ClTransaction: Transaction
+ Clone
+ Debug
+ Hash
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static,
<ClTransaction as nomos_core::tx::Transaction>::Hash:
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
{
type Error = hyper::Error;
type Settings = AxumBackendSettings;
async fn new(settings: Self::Settings) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(Self {
settings: Arc::new(settings),
_cl: core::marker::PhantomData,
})
}
async fn serve(self) -> Result<(), Self::Error> {
let store = self.settings.clone();
let app = Router::new()
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/da/metrics", routing::get(da_metrics))
.route("/da/status", routing::post(da_status))
.route("/cl/metrics", routing::get(cl_metrics::<ClTransaction>))
.route("/cl/status", routing::post(cl_status::<ClTransaction>))
.with_state(store);
Server::bind(&self.settings.addr)
.serve(app.into_make_service())
.await
}
}
#[utoipa::path(
get,
path = "/da/metrics",
responses(
(status = 200, description = "Get the mempool metrics of the da service", body = MempoolMetrics),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn da_metrics(State(store): State<Store>) -> impl IntoResponse {
match da_mempool_metrics(&store.handle).await {
Ok(metrics) => (StatusCode::OK, Json(metrics)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
#[utoipa::path(
post,
path = "/da/status",
responses(
(status = 200, description = "Query the mempool status of the da service", body = Vec<<Blob as blob::Blob>::Hash>),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn da_status(
State(store): State<Store>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> impl IntoResponse {
match da_mempool_status(&store.handle, items).await {
Ok(status) => (StatusCode::OK, Json(status)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
#[utoipa::path(
get,
path = "/cl/metrics",
responses(
(status = 200, description = "Get the mempool metrics of the cl service", body = MempoolMetrics),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cl_metrics<T>(State(store): State<Store>) -> impl IntoResponse
where
T: Transaction
+ Clone
+ Debug
+ Hash
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static,
<T as nomos_core::tx::Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
{
match cl_mempool_metrics::<T>(&store.handle).await {
Ok(metrics) => (StatusCode::OK, Json(metrics)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
#[utoipa::path(
post,
path = "/cl/status",
responses(
(status = 200, description = "Query the mempool status of the cl service", body = Vec<<T as Transaction>::Hash>),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cl_status<T>(
State(store): State<Store>,
Json(items): Json<Vec<<T as Transaction>::Hash>>,
) -> impl IntoResponse
where
T: Transaction
+ Clone
+ Debug
+ Hash
+ Serialize
+ serde::de::DeserializeOwned
+ Send
+ Sync
+ 'static,
<T as nomos_core::tx::Transaction>::Hash:
Serialize + serde::de::DeserializeOwned + std::cmp::Ord + Debug + Send + Sync + 'static,
{
match cl_mempool_status::<T>(&store.handle, items).await {
Ok(status) => (StatusCode::OK, Json(status)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}

View File

@ -0,0 +1,73 @@
use core::{fmt::Debug, hash::Hash};
use nomos_core::tx::Transaction;
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::libp2p::Libp2pAdapter,
openapi::{MempoolMetrics, Status},
MempoolMsg, MempoolService, Transaction as TxDiscriminant,
};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
type ClMempoolService<T> = MempoolService<
Libp2pAdapter<T, <T as Transaction>::Hash>,
MockPool<T, <T as Transaction>::Hash>,
TxDiscriminant,
>;
pub async fn cl_mempool_metrics<T>(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
) -> Result<MempoolMetrics, super::DynError>
where
T: Transaction
+ Clone
+ Debug
+ Hash
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static,
<T as nomos_core::tx::Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
{
let relay = handle.relay::<ClMempoolService<T>>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Metrics {
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await?)
}
pub async fn cl_mempool_status<T>(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
items: Vec<<T as Transaction>::Hash>,
) -> Result<Vec<Status>, super::DynError>
where
T: Transaction
+ Clone
+ Debug
+ Hash
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static,
<T as nomos_core::tx::Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
{
let relay = handle.relay::<ClMempoolService<T>>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Status {
items,
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await?)
}

View File

@ -0,0 +1,47 @@
use full_replication::{Blob, Certificate};
use nomos_core::da::blob;
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::libp2p::Libp2pAdapter,
openapi::{MempoolMetrics, Status},
Certificate as CertDiscriminant, MempoolMsg, MempoolService,
};
use tokio::sync::oneshot;
type DaMempoolService = MempoolService<
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
MockPool<Certificate, <Blob as blob::Blob>::Hash>,
CertDiscriminant,
>;
pub async fn da_mempool_metrics(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
) -> Result<MempoolMetrics, super::DynError> {
let relay = handle.relay::<DaMempoolService>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Metrics {
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await.unwrap())
}
pub async fn da_mempool_status(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
items: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Status>, super::DynError> {
let relay = handle.relay::<DaMempoolService>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Status {
items,
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await.unwrap())
}

View File

@ -0,0 +1,5 @@
pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub mod backend;
pub mod cl;
pub mod da;

View File

@ -6,7 +6,7 @@ use std::{
use axum::{routing, Router, Server};
use hyper::Error;
use nomos_http_api::{ApiService, ApiServiceSettings, Backend};
use nomos_node_api::{ApiService, ApiServiceSettings, Backend};
use overwatch_derive::Services;
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use utoipa::{

View File

@ -3,10 +3,15 @@ name = "full-replication"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = []
openapi = ["dep:utoipa", "serde_json"]
[dependencies]
blake2 = { version = "0.10" }
bytes = { version = "1.3", features = ["serde"] }
nomos-core = { path = "../../nomos-core" }
serde = { version = "1.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
utoipa = { version = "4.0", optional = true }
serde_json = { version = "1.0", optional = true }

View File

@ -16,6 +16,12 @@ use bytes::Bytes;
use nomos_core::wire;
use serde::{Deserialize, Serialize};
/// Re-export the types for OpenAPI
#[cfg(feature = "openapi")]
pub mod openapi {
pub use super::{Attestation, Certificate};
}
#[derive(Debug, Clone)]
pub struct FullReplication<CertificateStrategy> {
certificate_strategy: CertificateStrategy,
@ -110,6 +116,7 @@ impl blob::Blob for Blob {
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Attestation {
blob: [u8; 32],
voter: [u8; 32],
@ -135,6 +142,7 @@ impl attestation::Attestation for Attestation {
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Certificate {
attestations: Vec<Attestation>,
}

View File

@ -1,46 +0,0 @@
use std::{net::SocketAddr, sync::Arc};
use axum::{Router, Server};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use crate::Backend;
#[derive(Clone)]
pub struct AxumBackendSettings {
pub addr: SocketAddr,
}
pub struct AxumBackend {
settings: Arc<AxumBackendSettings>,
}
#[derive(OpenApi)]
#[openapi(paths(), components(), tags())]
struct ApiDoc;
#[async_trait::async_trait]
impl Backend for AxumBackend {
type Error = hyper::Error;
type Settings = AxumBackendSettings;
async fn new(settings: Self::Settings) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(Self {
settings: Arc::new(settings),
})
}
async fn serve(self) -> Result<(), Self::Error> {
let store = self.settings.clone();
let app = Router::new()
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.with_state(store);
Server::bind(&self.settings.addr)
.serve(app.into_make_service())
.await
}
}

View File

@ -1 +0,0 @@
pub mod backend;

View File

@ -21,6 +21,9 @@ tokio = { version = "1", features = ["sync", "macros"] }
tokio-stream = "0.1"
chrono = "0.4"
utoipa = { version = "4.0", optional = true }
serde_json = { version = "1", optional = true }
[dev-dependencies]
nomos-log = { path = "../log" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
@ -31,3 +34,6 @@ blake2 = "0.10"
default = []
mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"]
libp2p = ["nomos-network/libp2p"]
# enable to help generate OpenAPI
openapi = ["dep:utoipa", "serde_json"]

View File

@ -50,9 +50,22 @@ pub trait MemPool {
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum Status {
/// Unknown status
Unknown,
/// Pending status
Pending,
/// Rejected status
Rejected,
/// Accepted status
///
/// The block id of the block that contains the item
#[cfg_attr(
feature = "openapi",
schema(
example = "e.g. 0x000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f"
)
)]
InBlock { block: BlockId },
}

View File

@ -1,6 +1,12 @@
pub mod backend;
pub mod network;
/// Re-export for OpenAPI
#[cfg(feature = "openapi")]
pub mod openapi {
pub use super::{backend::Status, MempoolMetrics};
}
// std
use std::{
fmt::{Debug, Error, Formatter},
@ -43,6 +49,8 @@ where
_d: PhantomData<D>,
}
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(serde::Serialize, serde::Deserialize)]
pub struct MempoolMetrics {
pub pending_items: usize,
pub last_item_timestamp: u64,