diff --git a/Cargo.toml b/Cargo.toml index f96f4514..44bcdbd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "nomos-core", "nomos-libp2p", "nomos-services/api", + "nomos-metrics", "nomos-services/log", "nomos-services/metrics", "nomos-services/network", @@ -27,4 +28,4 @@ members = [ "mixnet/protocol", "mixnet/topology", ] -resolver = "2" \ No newline at end of file +resolver = "2" diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 27a0c1d5..71168c8c 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -23,7 +23,8 @@ nomos-core = { path = "../../nomos-core" } nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] } nomos-api = { path = "../../nomos-services/api" } nomos-log = { path = "../../nomos-services/log" } -nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] } +nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "metrics"] } +nomos-metrics = { path = "../../nomos-metrics" } nomos-http = { path = "../../nomos-services/http", features = ["http"] } carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = ["libp2p"] } nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } @@ -45,4 +46,4 @@ utoipa-swagger-ui = { version = "4.0" } # axum related dependencies axum = { version = "0.6" } hyper = { version = "0.14", features = ["full"] } -tower-http = { version = "0.4", features = ["cors", "trace"] } \ No newline at end of file +tower-http = { version = "0.4", features = ["cors", "trace"] } diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index 5bcb3a6b..20e65903 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -3,10 +3,13 @@ use std::{fmt::Debug, hash::Hash}; use axum::{ extract::{Query, State}, http::HeaderValue, - response::Response, + response::{IntoResponse, Response}, routing, Json, Router, Server, }; 
-use hyper::header::{CONTENT_TYPE, USER_AGENT}; +use hyper::{ + header::{CONTENT_TYPE, USER_AGENT}, + Body, StatusCode, +}; use overwatch_rs::overwatch::handle::OverwatchHandle; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tower_http::{ @@ -24,7 +27,7 @@ use nomos_network::backends::libp2p::Libp2p; use nomos_storage::backends::StorageSerde; use nomos_api::{ - http::{cl, consensus, da, libp2p, mempool, storage}, + http::{cl, consensus, da, libp2p, mempool, metrics, storage}, Backend, }; @@ -123,6 +126,7 @@ where .route("/storage/block", routing::post(block::)) .route("/mempool/add/tx", routing::post(add_tx::)) .route("/mempool/add/cert", routing::post(add_cert)) + .route("/metrics", routing::get(get_metrics)) .with_state(handle); Server::bind(&self.settings.address) @@ -350,3 +354,29 @@ async fn add_cert( nomos_core::da::certificate::Certificate::hash )) } + +#[utoipa::path( + get, + path = "/metrics", + responses( + (status = 200, description = "Get all metrics"), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn get_metrics(State(handle): State) -> Response { + match metrics::gather(&handle).await { + Ok(encoded_metrics) => Response::builder() + .status(StatusCode::OK) + .header( + CONTENT_TYPE, + HeaderValue::from_static("text/plain; version=0.0.4"), + ) + .body(Body::from(encoded_metrics)) + .unwrap() + .into_response(), + Err(e) => axum::response::IntoResponse::into_response(( + hyper::StatusCode::INTERNAL_SERVER_ERROR, + e.to_string(), + )), + } +} diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 3388a71f..04db7eba 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -10,8 +10,6 @@ use crate::{Carnot, Tx, Wire, MB16}; use clap::{Parser, ValueEnum}; use color_eyre::eyre::{self, eyre, Result}; use hex::FromHex; -#[cfg(feature = "metrics")] -use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService}; use 
nomos_api::ApiService; use nomos_libp2p::{secp256k1::SecretKey, Multiaddr}; use nomos_log::{Logger, LoggerBackend, LoggerFormat}; @@ -115,14 +113,18 @@ pub struct DaArgs { da_voter: Option, } +#[derive(Parser, Debug, Clone)] +pub struct MetricsArgs { + #[clap(long = "with-metrics", env = "WITH_METRICS")] + pub with_metrics: bool, +} + #[derive(Deserialize, Debug, Clone, Serialize)] pub struct Config { pub log: ::Settings, pub network: as ServiceData>::Settings, pub http: > as ServiceData>::Settings, pub consensus: ::Settings, - #[cfg(feature = "metrics")] - pub metrics: > as ServiceData>::Settings, pub da: ::Settings, } diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 4db22e5d..cebd8886 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -29,13 +29,17 @@ use nomos_mempool::{ backend::mockpool::MockPool, Certificate as CertDiscriminant, MempoolService, Transaction as TxDiscriminant, }; +#[cfg(feature = "metrics")] +use nomos_metrics::Metrics; use nomos_network::backends::libp2p::Libp2p; use nomos_storage::{ backends::{sled::SledBackend, StorageSerde}, StorageService, }; -pub use config::{Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs}; +pub use config::{ + Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, OverlayArgs, +}; use nomos_core::{ da::certificate::select::FillSize as FillSizeWithBlobsCertificate, tx::select::FillSize as FillSizeWithTx, @@ -89,10 +93,10 @@ pub struct Nomos { >, consensus: ServiceHandle, http: ServiceHandle>>, - #[cfg(feature = "metrics")] - metrics: ServiceHandle>>, da: ServiceHandle, storage: ServiceHandle>>, + #[cfg(feature = "metrics")] + metrics: ServiceHandle, system_sig: ServiceHandle, } diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index a7441b52..25bc5787 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -1,7 +1,9 @@ use full_replication::{Blob, Certificate}; 
+#[cfg(feature = "metrics")] +use nomos_metrics::MetricsSettings; use nomos_node::{ - Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, NetworkArgs, Nomos, NomosServiceSettings, - OverlayArgs, Tx, + Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, Nomos, + NomosServiceSettings, OverlayArgs, Tx, }; use clap::Parser; @@ -40,6 +42,9 @@ struct Args { /// Overrides da config. #[clap(flatten)] da_args: DaArgs, + /// Overrides metrics config. + #[clap(flatten)] + metrics_args: MetricsArgs, } fn main() -> Result<()> { @@ -51,6 +56,7 @@ fn main() -> Result<()> { network_args, consensus_args, overlay_args, + metrics_args, } = Args::parse(); let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)? .update_da(da_args)? @@ -60,6 +66,14 @@ fn main() -> Result<()> { .update_overlay(overlay_args)? .update_network(network_args)?; + let registry = cfg!(feature = "metrics") + .then(|| { + metrics_args + .with_metrics + .then(nomos_metrics::NomosRegistry::default) + }) + .flatten(); + let app = OverwatchRunner::::run( NomosServiceSettings { network: config.network, @@ -71,6 +85,7 @@ fn main() -> Result<()> { topic: String::from(nomos_node::CL_TOPIC), id: ::hash, }, + registry: registry.clone(), }, da_mempool: nomos_mempool::Settings { backend: (), @@ -78,10 +93,11 @@ fn main() -> Result<()> { topic: String::from(nomos_node::DA_TOPIC), id: cert_id, }, + registry: registry.clone(), }, consensus: config.consensus, #[cfg(feature = "metrics")] - metrics: config.metrics, + metrics: MetricsSettings { registry }, da: config.da, storage: nomos_storage::backends::sled::SledBackendSettings { db_path: std::path::PathBuf::from(DEFAULT_DB_PATH), diff --git a/nomos-metrics/Cargo.toml b/nomos-metrics/Cargo.toml new file mode 100644 index 00000000..1867b0f1 --- /dev/null +++ b/nomos-metrics/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "nomos-metrics" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +futures = "0.3" +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } +prometheus-client = "0.22.0" +tracing = "0.1" +tokio = { version = "1", features = ["sync", "macros"] } +serde = { version = "1", features = ["derive"] } diff --git a/nomos-metrics/src/lib.rs b/nomos-metrics/src/lib.rs new file mode 100644 index 00000000..d9461b38 --- /dev/null +++ b/nomos-metrics/src/lib.rs @@ -0,0 +1,119 @@ +pub use prometheus_client::{self, *}; + +// std +use std::fmt::{Debug, Error, Formatter}; +use std::sync::{Arc, Mutex}; +// crates +use futures::StreamExt; +use overwatch_rs::services::life_cycle::LifecycleMessage; +use overwatch_rs::services::{ + handle::ServiceStateHandle, + relay::RelayMessage, + state::{NoOperator, NoState}, + ServiceCore, ServiceData, +}; +use prometheus_client::encoding::text::encode; +use prometheus_client::registry::Registry; +use tokio::sync::oneshot::Sender; +use tracing::error; +// internal + +// A wrapper for prometheus_client Registry. +// Lock is only used during services initialization and prometheus pull query. +pub type NomosRegistry = Arc>; + +pub struct Metrics { + service_state: ServiceStateHandle, + registry: NomosRegistry, +} + +#[derive(Clone, Debug)] +pub struct MetricsSettings { + pub registry: Option, +} + +pub enum MetricsMsg { + Gather { reply_channel: Sender }, +} + +impl RelayMessage for MetricsMsg {} + +impl Debug for MetricsMsg { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + match self { + Self::Gather { .. 
} => { + write!(f, "MetricsMsg::Gather") + } + } + } +} + +impl ServiceData for Metrics { + const SERVICE_ID: &'static str = "Metrics"; + type Settings = MetricsSettings; + type State = NoState; + type StateOperator = NoOperator; + type Message = MetricsMsg; +} + +#[async_trait::async_trait] +impl ServiceCore for Metrics { + fn init(service_state: ServiceStateHandle) -> Result { + let config = service_state.settings_reader.get_updated_settings(); + + Ok(Self { + service_state, + registry: config.registry.ok_or("No registry provided")?, + }) + } + + async fn run(self) -> Result<(), overwatch_rs::DynError> { + let Self { + mut service_state, + registry, + } = self; + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); + loop { + tokio::select! { + Some(msg) = service_state.inbound_relay.recv() => { + let MetricsMsg::Gather{reply_channel} = msg; + + let mut buf = String::new(); + { + let reg = registry.lock().unwrap(); + // If encoding fails, we need to stop trying to process subsequent metrics gather + // requests. If it succeeds, the encode method returns the empty unit type. 
+ _ = encode(&mut buf, ®).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + } + + reply_channel + .send(buf) + .unwrap_or_else(|_| tracing::debug!("could not send back metrics")); + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } + } + } + Ok(()) + } +} + +impl Metrics { + async fn should_stop_service(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } +} diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index 5c557026..0e7aa4e1 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -19,6 +19,7 @@ carnot-consensus = { path = "../carnot-consensus" } nomos-network = { path = "../../nomos-services/network" } nomos-da = { path = "../../nomos-services/data-availability" } nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] } +nomos-metrics = { path = "../../nomos-metrics" } nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } nomos-libp2p = { path = "../../nomos-libp2p" } full-replication = { path = "../../nomos-da/full-replication" } diff --git a/nomos-services/api/src/http/metrics.rs b/nomos-services/api/src/http/metrics.rs new file mode 100644 index 00000000..c25deb08 --- /dev/null +++ b/nomos-services/api/src/http/metrics.rs @@ -0,0 +1,18 @@ +use nomos_metrics::{Metrics, MetricsMsg}; +use tokio::sync::oneshot; + +pub async fn gather( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, +) -> Result { + let relay = handle.relay::().connect().await?; + let (sender, receiver) = oneshot::channel(); + + relay + .send(MetricsMsg::Gather { + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + Ok(receiver.await?) 
+} diff --git a/nomos-services/api/src/http/mod.rs b/nomos-services/api/src/http/mod.rs index 789aeee9..6ab13b2a 100644 --- a/nomos-services/api/src/http/mod.rs +++ b/nomos-services/api/src/http/mod.rs @@ -4,4 +4,5 @@ pub mod consensus; pub mod da; pub mod libp2p; pub mod mempool; +pub mod metrics; pub mod storage; diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 30834d8a..d1a4e895 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -10,6 +10,7 @@ async-trait = "0.1" bincode = { version = "2.0.0-rc.2", features = ["serde"] } futures = "0.3" linked-hash-map = { version = "0.5.6", optional = true } +nomos-metrics = { path = "../../nomos-metrics" } nomos-network = { path = "../network" } nomos-core = { path = "../../nomos-core" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } @@ -34,6 +35,7 @@ blake2 = "0.10" default = [] mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"] libp2p = ["nomos-network/libp2p"] +metrics = [] # enable to help generate OpenAPI openapi = ["dep:utoipa", "serde_json"] diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index ca887d8b..41eadb27 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -1,4 +1,6 @@ pub mod backend; +#[cfg(feature = "metrics")] +pub mod metrics; pub mod network; /// Re-export for OpenAPI @@ -15,6 +17,9 @@ use std::{ // crates use futures::StreamExt; +#[cfg(feature = "metrics")] +use metrics::Metrics; +use nomos_metrics::NomosRegistry; use tokio::sync::oneshot::Sender; // internal use crate::network::NetworkAdapter; @@ -42,6 +47,8 @@ where service_state: ServiceStateHandle, network_relay: Relay>, pool: P, + #[cfg(feature = "metrics")] + metrics: Option, // This is an hack because SERVICE_ID has to be univoque and associated const // values can't depend on generic parameters. 
// Unfortunately, this means that the mempools for certificates and transactions @@ -162,10 +169,18 @@ where fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); let settings = service_state.settings_reader.get_updated_settings(); + + #[cfg(feature = "metrics")] + let metrics = settings + .registry + .map(|reg| Metrics::new(reg, service_state.id())); + Ok(Self { service_state, network_relay, pool: P::new(settings.backend), + #[cfg(feature = "metrics")] + metrics, _d: PhantomData, }) } @@ -195,6 +210,8 @@ where loop { tokio::select! { Some(msg) = service_state.inbound_relay.recv() => { + #[cfg(feature = "metrics")] + if let Some(metrics) = &self.metrics { metrics.record(&msg) } Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await; } Some((key, item )) = network_items.next() => { @@ -317,4 +334,5 @@ where pub struct Settings { pub backend: B, pub network: N, + pub registry: Option, } diff --git a/nomos-services/mempool/src/metrics.rs b/nomos-services/mempool/src/metrics.rs new file mode 100644 index 00000000..6073d818 --- /dev/null +++ b/nomos-services/mempool/src/metrics.rs @@ -0,0 +1,80 @@ +// std +use std::fmt::Debug; +// crates +use nomos_metrics::{ + metrics::{counter::Counter, family::Family}, + prometheus_client::{self, encoding::EncodeLabelSet, encoding::EncodeLabelValue}, + NomosRegistry, +}; +use overwatch_rs::services::ServiceId; +// internal +use crate::MempoolMsg; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelValue)] +enum MempoolMsgType { + Add, + View, + Prune, + MarkInBlock, +} + +impl From<&MempoolMsg> for MempoolMsgType +where + I: 'static + Debug, + K: 'static + Debug, +{ + fn from(event: &MempoolMsg) -> Self { + match event { + MempoolMsg::Add { .. } => MempoolMsgType::Add, + MempoolMsg::View { .. } => MempoolMsgType::View, + MempoolMsg::Prune { .. } => MempoolMsgType::Prune, + MempoolMsg::MarkInBlock { .. 
} => MempoolMsgType::MarkInBlock, + _ => unimplemented!(), + } + } +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelSet)] +struct MessageLabels { + label: MempoolMsgType, +} + +pub(crate) struct Metrics { + messages: Family, +} + +impl Metrics { + pub(crate) fn new(registry: NomosRegistry, discriminant: ServiceId) -> Self { + let mut registry = registry + .lock() + .expect("should've acquired the lock for registry"); + let sub_registry = registry.sub_registry_with_prefix(discriminant); + + let messages = Family::default(); + sub_registry.register( + "messages", + "Messages emitted by the Mempool", + messages.clone(), + ); + + Self { messages } + } + + pub(crate) fn record(&self, msg: &MempoolMsg) + where + I: 'static + Debug, + K: 'static + Debug, + { + match msg { + MempoolMsg::Add { .. } + | MempoolMsg::View { .. } + | MempoolMsg::Prune { .. } + | MempoolMsg::MarkInBlock { .. } => { + self.messages + .get_or_create(&MessageLabels { label: msg.into() }) + .inc(); + } + _ => {} + } + } +} diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index f1c94cfc..c86bed99 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -64,6 +64,7 @@ fn test_mockmempool() { mockpool: Settings { backend: (), network: (), + registry: None, }, logging: LoggerSettings::default(), },