Prometheus metrics service (#522)

* A wrapper crate for prometheus client

* Initial integration of metrics for mempool

* Merge mempool metrics imports

* Add cli flag to enable metrics

* Add nomos metrics service for serving metrics

* Use nomos prometheus metrics in the node

* Rename metrics to registry where applicable

* Expose metrics via http

* Featuregate the metrics service

* Style and fail on encode error

* Add metrics cargo feature for mempool
This commit is contained in:
gusto 2024-01-12 16:15:12 +02:00 committed by GitHub
parent c3b5dc98e4
commit 46d53479a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 326 additions and 16 deletions

View File

@ -3,6 +3,7 @@ members = [
"nomos-core", "nomos-core",
"nomos-libp2p", "nomos-libp2p",
"nomos-services/api", "nomos-services/api",
"nomos-metrics",
"nomos-services/log", "nomos-services/log",
"nomos-services/metrics", "nomos-services/metrics",
"nomos-services/network", "nomos-services/network",

View File

@ -23,7 +23,8 @@ nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] } nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-api = { path = "../../nomos-services/api" } nomos-api = { path = "../../nomos-services/api" }
nomos-log = { path = "../../nomos-services/log" } nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] } nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "metrics"] }
nomos-metrics = { path = "../../nomos-metrics" }
nomos-http = { path = "../../nomos-services/http", features = ["http"] } nomos-http = { path = "../../nomos-services/http", features = ["http"] }
carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = ["libp2p"] } carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = ["libp2p"] }
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }

View File

@ -3,10 +3,13 @@ use std::{fmt::Debug, hash::Hash};
use axum::{ use axum::{
extract::{Query, State}, extract::{Query, State},
http::HeaderValue, http::HeaderValue,
response::Response, response::{IntoResponse, Response},
routing, Json, Router, Server, routing, Json, Router, Server,
}; };
use hyper::header::{CONTENT_TYPE, USER_AGENT}; use hyper::{
header::{CONTENT_TYPE, USER_AGENT},
Body, StatusCode,
};
use overwatch_rs::overwatch::handle::OverwatchHandle; use overwatch_rs::overwatch::handle::OverwatchHandle;
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tower_http::{ use tower_http::{
@ -24,7 +27,7 @@ use nomos_network::backends::libp2p::Libp2p;
use nomos_storage::backends::StorageSerde; use nomos_storage::backends::StorageSerde;
use nomos_api::{ use nomos_api::{
http::{cl, consensus, da, libp2p, mempool, storage}, http::{cl, consensus, da, libp2p, mempool, metrics, storage},
Backend, Backend,
}; };
@ -123,6 +126,7 @@ where
.route("/storage/block", routing::post(block::<S, T>)) .route("/storage/block", routing::post(block::<S, T>))
.route("/mempool/add/tx", routing::post(add_tx::<T>)) .route("/mempool/add/tx", routing::post(add_tx::<T>))
.route("/mempool/add/cert", routing::post(add_cert)) .route("/mempool/add/cert", routing::post(add_cert))
.route("/metrics", routing::get(get_metrics))
.with_state(handle); .with_state(handle);
Server::bind(&self.settings.address) Server::bind(&self.settings.address)
@ -350,3 +354,29 @@ async fn add_cert(
nomos_core::da::certificate::Certificate::hash nomos_core::da::certificate::Certificate::hash
)) ))
} }
#[utoipa::path(
get,
path = "/metrics",
responses(
(status = 200, description = "Get all metrics"),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn get_metrics(State(handle): State<OverwatchHandle>) -> Response {
match metrics::gather(&handle).await {
Ok(encoded_metrics) => Response::builder()
.status(StatusCode::OK)
.header(
CONTENT_TYPE,
HeaderValue::from_static("text/plain; version=0.0.4"),
)
.body(Body::from(encoded_metrics))
.unwrap()
.into_response(),
Err(e) => axum::response::IntoResponse::into_response((
hyper::StatusCode::INTERNAL_SERVER_ERROR,
e.to_string(),
)),
}
}

View File

@ -10,8 +10,6 @@ use crate::{Carnot, Tx, Wire, MB16};
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use color_eyre::eyre::{self, eyre, Result}; use color_eyre::eyre::{self, eyre, Result};
use hex::FromHex; use hex::FromHex;
#[cfg(feature = "metrics")]
use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService};
use nomos_api::ApiService; use nomos_api::ApiService;
use nomos_libp2p::{secp256k1::SecretKey, Multiaddr}; use nomos_libp2p::{secp256k1::SecretKey, Multiaddr};
use nomos_log::{Logger, LoggerBackend, LoggerFormat}; use nomos_log::{Logger, LoggerBackend, LoggerFormat};
@ -115,14 +113,18 @@ pub struct DaArgs {
da_voter: Option<String>, da_voter: Option<String>,
} }
#[derive(Parser, Debug, Clone)]
pub struct MetricsArgs {
#[clap(long = "with-metrics", env = "WITH_METRICS")]
pub with_metrics: bool,
}
#[derive(Deserialize, Debug, Clone, Serialize)] #[derive(Deserialize, Debug, Clone, Serialize)]
pub struct Config { pub struct Config {
pub log: <Logger as ServiceData>::Settings, pub log: <Logger as ServiceData>::Settings,
pub network: <NetworkService<Libp2p> as ServiceData>::Settings, pub network: <NetworkService<Libp2p> as ServiceData>::Settings,
pub http: <ApiService<AxumBackend<Tx, Wire, MB16>> as ServiceData>::Settings, pub http: <ApiService<AxumBackend<Tx, Wire, MB16>> as ServiceData>::Settings,
pub consensus: <Carnot as ServiceData>::Settings, pub consensus: <Carnot as ServiceData>::Settings,
#[cfg(feature = "metrics")]
pub metrics: <MetricsService<MapMetricsBackend<MetricsData>> as ServiceData>::Settings,
pub da: <DataAvailability as ServiceData>::Settings, pub da: <DataAvailability as ServiceData>::Settings,
} }

View File

@ -29,13 +29,17 @@ use nomos_mempool::{
backend::mockpool::MockPool, Certificate as CertDiscriminant, MempoolService, backend::mockpool::MockPool, Certificate as CertDiscriminant, MempoolService,
Transaction as TxDiscriminant, Transaction as TxDiscriminant,
}; };
#[cfg(feature = "metrics")]
use nomos_metrics::Metrics;
use nomos_network::backends::libp2p::Libp2p; use nomos_network::backends::libp2p::Libp2p;
use nomos_storage::{ use nomos_storage::{
backends::{sled::SledBackend, StorageSerde}, backends::{sled::SledBackend, StorageSerde},
StorageService, StorageService,
}; };
pub use config::{Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs}; pub use config::{
Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, OverlayArgs,
};
use nomos_core::{ use nomos_core::{
da::certificate::select::FillSize as FillSizeWithBlobsCertificate, da::certificate::select::FillSize as FillSizeWithBlobsCertificate,
tx::select::FillSize as FillSizeWithTx, tx::select::FillSize as FillSizeWithTx,
@ -89,10 +93,10 @@ pub struct Nomos {
>, >,
consensus: ServiceHandle<Carnot>, consensus: ServiceHandle<Carnot>,
http: ServiceHandle<ApiService<AxumBackend<Tx, Wire, MB16>>>, http: ServiceHandle<ApiService<AxumBackend<Tx, Wire, MB16>>>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<MetricsService<MapMetricsBackend<MetricsData>>>,
da: ServiceHandle<DataAvailability>, da: ServiceHandle<DataAvailability>,
storage: ServiceHandle<StorageService<SledBackend<Wire>>>, storage: ServiceHandle<StorageService<SledBackend<Wire>>>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<Metrics>,
system_sig: ServiceHandle<SystemSig>, system_sig: ServiceHandle<SystemSig>,
} }

View File

@ -1,7 +1,9 @@
use full_replication::{Blob, Certificate}; use full_replication::{Blob, Certificate};
#[cfg(feature = "metrics")]
use nomos_metrics::MetricsSettings;
use nomos_node::{ use nomos_node::{
Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, NetworkArgs, Nomos, NomosServiceSettings, Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, Nomos,
OverlayArgs, Tx, NomosServiceSettings, OverlayArgs, Tx,
}; };
use clap::Parser; use clap::Parser;
@ -40,6 +42,9 @@ struct Args {
/// Overrides da config. /// Overrides da config.
#[clap(flatten)] #[clap(flatten)]
da_args: DaArgs, da_args: DaArgs,
/// Overrides metrics config.
#[clap(flatten)]
metrics_args: MetricsArgs,
} }
fn main() -> Result<()> { fn main() -> Result<()> {
@ -51,6 +56,7 @@ fn main() -> Result<()> {
network_args, network_args,
consensus_args, consensus_args,
overlay_args, overlay_args,
metrics_args,
} = Args::parse(); } = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)? let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?
.update_da(da_args)? .update_da(da_args)?
@ -60,6 +66,14 @@ fn main() -> Result<()> {
.update_overlay(overlay_args)? .update_overlay(overlay_args)?
.update_network(network_args)?; .update_network(network_args)?;
let registry = cfg!(feature = "metrics")
.then(|| {
metrics_args
.with_metrics
.then(nomos_metrics::NomosRegistry::default)
})
.flatten();
let app = OverwatchRunner::<Nomos>::run( let app = OverwatchRunner::<Nomos>::run(
NomosServiceSettings { NomosServiceSettings {
network: config.network, network: config.network,
@ -71,6 +85,7 @@ fn main() -> Result<()> {
topic: String::from(nomos_node::CL_TOPIC), topic: String::from(nomos_node::CL_TOPIC),
id: <Tx as Transaction>::hash, id: <Tx as Transaction>::hash,
}, },
registry: registry.clone(),
}, },
da_mempool: nomos_mempool::Settings { da_mempool: nomos_mempool::Settings {
backend: (), backend: (),
@ -78,10 +93,11 @@ fn main() -> Result<()> {
topic: String::from(nomos_node::DA_TOPIC), topic: String::from(nomos_node::DA_TOPIC),
id: cert_id, id: cert_id,
}, },
registry: registry.clone(),
}, },
consensus: config.consensus, consensus: config.consensus,
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
metrics: config.metrics, metrics: MetricsSettings { registry },
da: config.da, da: config.da,
storage: nomos_storage::backends::sled::SledBackendSettings { storage: nomos_storage::backends::sled::SledBackendSettings {
db_path: std::path::PathBuf::from(DEFAULT_DB_PATH), db_path: std::path::PathBuf::from(DEFAULT_DB_PATH),

16
nomos-metrics/Cargo.toml Normal file
View File

@ -0,0 +1,16 @@
[package]
name = "nomos-metrics"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
futures = "0.3"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
prometheus-client = "0.22.0"
tracing = "0.1"
tokio = { version = "1", features = ["sync", "macros"] }
serde = { version = "1", features = ["derive"] }

119
nomos-metrics/src/lib.rs Normal file
View File

@ -0,0 +1,119 @@
pub use prometheus_client::{self, *};
// std
use std::fmt::{Debug, Error, Formatter};
use std::sync::{Arc, Mutex};
// crates
use futures::StreamExt;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::RelayMessage,
state::{NoOperator, NoState},
ServiceCore, ServiceData,
};
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use tokio::sync::oneshot::Sender;
use tracing::error;
// internal
// A wrapper for prometheus_client Registry.
// Lock is only used during services initialization and prometheus pull query.
pub type NomosRegistry = Arc<Mutex<Registry>>;
pub struct Metrics {
service_state: ServiceStateHandle<Self>,
registry: NomosRegistry,
}
#[derive(Clone, Debug)]
pub struct MetricsSettings {
pub registry: Option<NomosRegistry>,
}
pub enum MetricsMsg {
Gather { reply_channel: Sender<String> },
}
impl RelayMessage for MetricsMsg {}
impl Debug for MetricsMsg {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
match self {
Self::Gather { .. } => {
write!(f, "MetricsMsg::Gather")
}
}
}
}
impl ServiceData for Metrics {
const SERVICE_ID: &'static str = "Metrics";
type Settings = MetricsSettings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MetricsMsg;
}
#[async_trait::async_trait]
impl ServiceCore for Metrics {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let config = service_state.settings_reader.get_updated_settings();
Ok(Self {
service_state,
registry: config.registry.ok_or("No registry provided")?,
})
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
let Self {
mut service_state,
registry,
} = self;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
let MetricsMsg::Gather{reply_channel} = msg;
let mut buf = String::new();
{
let reg = registry.lock().unwrap();
// If encoding fails, we need to stop trying process subsequent metrics gather
// requests. If it succeds, encode method returns empty unit type.
_ = encode(&mut buf, &reg).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
}
reply_channel
.send(buf)
.unwrap_or_else(|_| tracing::debug!("could not send back metrics"));
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
impl Metrics {
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
LifecycleMessage::Shutdown(sender) => {
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
LifecycleMessage::Kill => true,
}
}
}

View File

@ -19,6 +19,7 @@ carnot-consensus = { path = "../carnot-consensus" }
nomos-network = { path = "../../nomos-services/network" } nomos-network = { path = "../../nomos-services/network" }
nomos-da = { path = "../../nomos-services/data-availability" } nomos-da = { path = "../../nomos-services/data-availability" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] } nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] }
nomos-metrics = { path = "../../nomos-metrics" }
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
nomos-libp2p = { path = "../../nomos-libp2p" } nomos-libp2p = { path = "../../nomos-libp2p" }
full-replication = { path = "../../nomos-da/full-replication" } full-replication = { path = "../../nomos-da/full-replication" }

View File

@ -0,0 +1,18 @@
use nomos_metrics::{Metrics, MetricsMsg};
use tokio::sync::oneshot;
pub async fn gather(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
) -> Result<String, super::DynError> {
let relay = handle.relay::<Metrics>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MetricsMsg::Gather {
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await?)
}

View File

@ -4,4 +4,5 @@ pub mod consensus;
pub mod da; pub mod da;
pub mod libp2p; pub mod libp2p;
pub mod mempool; pub mod mempool;
pub mod metrics;
pub mod storage; pub mod storage;

View File

@ -10,6 +10,7 @@ async-trait = "0.1"
bincode = { version = "2.0.0-rc.2", features = ["serde"] } bincode = { version = "2.0.0-rc.2", features = ["serde"] }
futures = "0.3" futures = "0.3"
linked-hash-map = { version = "0.5.6", optional = true } linked-hash-map = { version = "0.5.6", optional = true }
nomos-metrics = { path = "../../nomos-metrics" }
nomos-network = { path = "../network" } nomos-network = { path = "../network" }
nomos-core = { path = "../../nomos-core" } nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
@ -34,6 +35,7 @@ blake2 = "0.10"
default = [] default = []
mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"] mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"]
libp2p = ["nomos-network/libp2p"] libp2p = ["nomos-network/libp2p"]
metrics = []
# enable to help generate OpenAPI # enable to help generate OpenAPI
openapi = ["dep:utoipa", "serde_json"] openapi = ["dep:utoipa", "serde_json"]

View File

@ -1,4 +1,6 @@
pub mod backend; pub mod backend;
#[cfg(feature = "metrics")]
pub mod metrics;
pub mod network; pub mod network;
/// Re-export for OpenAPI /// Re-export for OpenAPI
@ -15,6 +17,9 @@ use std::{
// crates // crates
use futures::StreamExt; use futures::StreamExt;
#[cfg(feature = "metrics")]
use metrics::Metrics;
use nomos_metrics::NomosRegistry;
use tokio::sync::oneshot::Sender; use tokio::sync::oneshot::Sender;
// internal // internal
use crate::network::NetworkAdapter; use crate::network::NetworkAdapter;
@ -42,6 +47,8 @@ where
service_state: ServiceStateHandle<Self>, service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N::Backend>>, network_relay: Relay<NetworkService<N::Backend>>,
pool: P, pool: P,
#[cfg(feature = "metrics")]
metrics: Option<Metrics>,
// This is an hack because SERVICE_ID has to be univoque and associated const // This is an hack because SERVICE_ID has to be univoque and associated const
// values can't depend on generic parameters. // values can't depend on generic parameters.
// Unfortunately, this means that the mempools for certificates and transactions // Unfortunately, this means that the mempools for certificates and transactions
@ -162,10 +169,18 @@ where
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> { fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay(); let network_relay = service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings(); let settings = service_state.settings_reader.get_updated_settings();
#[cfg(feature = "metrics")]
let metrics = settings
.registry
.map(|reg| Metrics::new(reg, service_state.id()));
Ok(Self { Ok(Self {
service_state, service_state,
network_relay, network_relay,
pool: P::new(settings.backend), pool: P::new(settings.backend),
#[cfg(feature = "metrics")]
metrics,
_d: PhantomData, _d: PhantomData,
}) })
} }
@ -195,6 +210,8 @@ where
loop { loop {
tokio::select! { tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => { Some(msg) = service_state.inbound_relay.recv() => {
#[cfg(feature = "metrics")]
if let Some(metrics) = &self.metrics { metrics.record(&msg) }
Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await; Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await;
} }
Some((key, item )) = network_items.next() => { Some((key, item )) = network_items.next() => {
@ -317,4 +334,5 @@ where
pub struct Settings<B, N> { pub struct Settings<B, N> {
pub backend: B, pub backend: B,
pub network: N, pub network: N,
pub registry: Option<NomosRegistry>,
} }

View File

@ -0,0 +1,80 @@
// std
use std::fmt::Debug;
// crates
use nomos_metrics::{
metrics::{counter::Counter, family::Family},
prometheus_client::{self, encoding::EncodeLabelSet, encoding::EncodeLabelValue},
NomosRegistry,
};
use overwatch_rs::services::ServiceId;
// internal
use crate::MempoolMsg;
#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelValue)]
enum MempoolMsgType {
Add,
View,
Prune,
MarkInBlock,
}
impl<I, K> From<&MempoolMsg<I, K>> for MempoolMsgType
where
I: 'static + Debug,
K: 'static + Debug,
{
fn from(event: &MempoolMsg<I, K>) -> Self {
match event {
MempoolMsg::Add { .. } => MempoolMsgType::Add,
MempoolMsg::View { .. } => MempoolMsgType::View,
MempoolMsg::Prune { .. } => MempoolMsgType::Prune,
MempoolMsg::MarkInBlock { .. } => MempoolMsgType::MarkInBlock,
_ => unimplemented!(),
}
}
}
#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelSet)]
struct MessageLabels {
label: MempoolMsgType,
}
pub(crate) struct Metrics {
messages: Family<MessageLabels, Counter>,
}
impl Metrics {
pub(crate) fn new(registry: NomosRegistry, discriminant: ServiceId) -> Self {
let mut registry = registry
.lock()
.expect("should've acquired the lock for registry");
let sub_registry = registry.sub_registry_with_prefix(discriminant);
let messages = Family::default();
sub_registry.register(
"messages",
"Messages emitted by the Mempool",
messages.clone(),
);
Self { messages }
}
pub(crate) fn record<I, K>(&self, msg: &MempoolMsg<I, K>)
where
I: 'static + Debug,
K: 'static + Debug,
{
match msg {
MempoolMsg::Add { .. }
| MempoolMsg::View { .. }
| MempoolMsg::Prune { .. }
| MempoolMsg::MarkInBlock { .. } => {
self.messages
.get_or_create(&MessageLabels { label: msg.into() })
.inc();
}
_ => {}
}
}
}

View File

@ -64,6 +64,7 @@ fn test_mockmempool() {
mockpool: Settings { mockpool: Settings {
backend: (), backend: (),
network: (), network: (),
registry: None,
}, },
logging: LoggerSettings::default(), logging: LoggerSettings::default(),
}, },