From a5243c6af260ce3e8685ca19d72d964c85a51b20 Mon Sep 17 00:00:00 2001 From: gusto Date: Thu, 7 Nov 2024 04:25:20 +0200 Subject: [PATCH] Tracing: Otlp metrics (#909) * Metrics layer configuration in nomos-tracing * Add metrics layer to tracing service * Use http/proto for prometheus * Add metrics layer to integration tests * Use metrics layer in cfgsync * Plug metric in node and tests * Remove old metrics * Use otlp metrics in mempool --- Cargo.toml | 1 - README.md | 1 - compose.debug.yml | 1 + compose.static.yml | 1 + nodes/nomos-executor/Cargo.toml | 2 - nodes/nomos-executor/src/api/backend.rs | 3 +- nodes/nomos-executor/src/lib.rs | 4 - nodes/nomos-executor/src/main.rs | 22 +--- nodes/nomos-node/Cargo.toml | 2 - nodes/nomos-node/src/api/backend.rs | 3 +- nodes/nomos-node/src/api/handlers.rs | 32 +---- nodes/nomos-node/src/api/paths.rs | 1 - nodes/nomos-node/src/config.rs | 6 - nodes/nomos-node/src/lib.rs | 7 +- nodes/nomos-node/src/main.rs | 19 +-- nomos-services/api/Cargo.toml | 1 - nomos-services/api/src/http/metrics.rs | 18 --- nomos-services/api/src/http/mod.rs | 1 - .../data-availability/tests/src/common.rs | 3 - nomos-services/mempool/Cargo.toml | 2 - nomos-services/mempool/src/da/service.rs | 20 +-- nomos-services/mempool/src/tx/metrics.rs | 80 ------------ nomos-services/mempool/src/tx/mod.rs | 2 - nomos-services/mempool/src/tx/service.rs | 16 +-- nomos-services/mempool/tests/mock.rs | 1 - nomos-services/metrics/Cargo.toml | 16 --- nomos-services/metrics/src/lib.rs | 119 ------------------ nomos-services/tracing/src/lib.rs | 16 +++ nomos-tracing/Cargo.toml | 7 +- nomos-tracing/src/lib.rs | 1 + nomos-tracing/src/metrics/mod.rs | 1 + nomos-tracing/src/metrics/otlp.rs | 52 ++++++++ testnet/cfgsync.yaml | 1 + testnet/cfgsync/src/bin/cfgsync-server.rs | 2 + testnet/cfgsync/src/config.rs | 15 ++- testnet/cfgsync/src/lib.rs | 1 + testnet/monitoring/prometheus.yml | 9 -- tests/Cargo.toml | 1 - tests/src/topology/configs/tracing.rs | 17 ++- 39 files changed, 117 
insertions(+), 390 deletions(-) delete mode 100644 nomos-services/api/src/http/metrics.rs delete mode 100644 nomos-services/mempool/src/tx/metrics.rs delete mode 100644 nomos-services/metrics/Cargo.toml delete mode 100644 nomos-services/metrics/src/lib.rs create mode 100644 nomos-tracing/src/metrics/mod.rs create mode 100644 nomos-tracing/src/metrics/otlp.rs diff --git a/Cargo.toml b/Cargo.toml index a5bc5b56..91fc6861 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ members = [ "nomos-services/storage", "nomos-services/cryptarchia-consensus", "nomos-services/mempool", - "nomos-services/metrics", "nomos-services/system-sig", "nomos-services/data-availability/indexer", "nomos-services/data-availability/network", diff --git a/README.md b/README.md index dcab9586..92e358f2 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,6 @@ Nomos blockchain node mvp - http - mempool - network - - metrics - `nodes`: Nomos nodes is the collection of nodes that are used to run the Nomos mvp and experimental nodes. - `nomos-node`: main implementation of the Nomos mvp node. - `mockpool-node`: node with single mempool service, used to measure transaction dissemination. 
diff --git a/compose.debug.yml b/compose.debug.yml index 383f163c..aea73219 100644 --- a/compose.debug.yml +++ b/compose.debug.yml @@ -10,6 +10,7 @@ services: command: - --config.file=/etc/prometheus/prometheus.yml - --storage.tsdb.retention.time=7d + - --enable-feature=otlp-write-receiver ports: - 127.0.0.1:9090:9090 restart: on-failure diff --git a/compose.static.yml b/compose.static.yml index 7b385aff..e9dfb168 100644 --- a/compose.static.yml +++ b/compose.static.yml @@ -84,6 +84,7 @@ services: command: - --config.file=/etc/prometheus/prometheus.yml - --storage.tsdb.retention.time=7d + - --enable-feature=otlp-write-receiver ports: - 127.0.0.1:9090:9090 restart: on-failure diff --git a/nodes/nomos-executor/Cargo.toml b/nodes/nomos-executor/Cargo.toml index 6a6a5319..808a6486 100644 --- a/nodes/nomos-executor/Cargo.toml +++ b/nodes/nomos-executor/Cargo.toml @@ -22,7 +22,6 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [ "mock", "libp2p", ] } -nomos-metrics = { path = "../../nomos-services/metrics" } nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] } nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] } nomos-node = { path = "../nomos-node" } @@ -42,5 +41,4 @@ uuid = { version = "1.10.0", features = ["v4"] } [features] default = ["tracing"] -metrics = ["nomos-node/metrics"] tracing = ["nomos-node/tracing"] diff --git a/nodes/nomos-executor/src/api/backend.rs b/nodes/nomos-executor/src/api/backend.rs index 8282ad0d..ce6ad5cf 100644 --- a/nodes/nomos-executor/src/api/backend.rs +++ b/nodes/nomos-executor/src/api/backend.rs @@ -28,7 +28,7 @@ use nomos_libp2p::PeerId; use nomos_mempool::{tx::service::openapi::Status, MempoolMetrics}; use nomos_node::api::handlers::{ add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers, - cryptarchia_info, get_metrics, get_range, libp2p_info, + cryptarchia_info, get_range, libp2p_info, }; use 
nomos_storage::backends::StorageSerde; use overwatch_rs::overwatch::handle::OverwatchHandle; @@ -327,7 +327,6 @@ where >, ), ) - .route(paths::METRICS, routing::get(get_metrics)) .route( paths::DISPERSE_DATA, routing::post( diff --git a/nodes/nomos-executor/src/lib.rs b/nodes/nomos-executor/src/lib.rs index db8f34dc..17ed141c 100644 --- a/nodes/nomos-executor/src/lib.rs +++ b/nodes/nomos-executor/src/lib.rs @@ -24,8 +24,6 @@ use nomos_mix_service::MixService; use nomos_node::DispersedBlobInfo; use nomos_node::HeaderId; use nomos_node::MempoolNetworkAdapter; -#[cfg(feature = "metrics")] -use nomos_node::Metrics; use nomos_node::NetworkBackend; use nomos_node::{ BlobInfo, Cryptarchia, DaIndexer, DaMempool, DaNetworkService, DaSampling, DaVerifier, @@ -101,7 +99,5 @@ pub struct NomosExecutor { cryptarchia: ServiceHandle, http: ServiceHandle, storage: ServiceHandle>>, - #[cfg(feature = "metrics")] - metrics: ServiceHandle, system_sig: ServiceHandle, } diff --git a/nodes/nomos-executor/src/main.rs b/nodes/nomos-executor/src/main.rs index 64b49a3c..310a88b9 100644 --- a/nodes/nomos-executor/src/main.rs +++ b/nodes/nomos-executor/src/main.rs @@ -4,12 +4,10 @@ use clap::Parser; use color_eyre::eyre::{eyre, Result}; use nomos_executor::config::Config as ExecutorConfig; use nomos_executor::{NomosExecutor, NomosExecutorServiceSettings}; -#[cfg(feature = "metrics")] -use nomos_node::MetricsSettings; use nomos_node::{ config::MixArgs, BlobInfo, CryptarchiaArgs, DaMempoolSettings, DispersedBlobInfo, HttpArgs, - LogArgs, MempoolAdapterSettings, MetricsArgs, NetworkArgs, Transaction, Tx, TxMempoolSettings, - CL_TOPIC, DA_TOPIC, + LogArgs, MempoolAdapterSettings, NetworkArgs, Transaction, Tx, TxMempoolSettings, CL_TOPIC, + DA_TOPIC, }; use overwatch_rs::overwatch::*; use tracing::{span, Level}; @@ -35,9 +33,6 @@ struct Args { http_args: HttpArgs, #[clap(flatten)] cryptarchia_args: CryptarchiaArgs, - /// Overrides metrics config. 
- #[clap(flatten)] - metrics_args: MetricsArgs, } fn main() -> Result<()> { @@ -48,7 +43,6 @@ fn main() -> Result<()> { network_args, mix_args, cryptarchia_args, - metrics_args, } = Args::parse(); let config = serde_yaml::from_reader::<_, ExecutorConfig>(std::fs::File::open(config)?)? .update_from_args( @@ -59,14 +53,6 @@ fn main() -> Result<()> { cryptarchia_args, )?; - let registry = cfg!(feature = "metrics") - .then(|| { - metrics_args - .with_metrics - .then(nomos_metrics::NomosRegistry::default) - }) - .flatten(); - #[cfg(debug_assertions)] let debug_span = { let debug_id = Uuid::new_v4(); @@ -87,7 +73,6 @@ fn main() -> Result<()> { topic: String::from(CL_TOPIC), id: ::hash, }, - registry: registry.clone(), }, da_mempool: DaMempoolSettings { backend: (), @@ -95,7 +80,6 @@ fn main() -> Result<()> { topic: String::from(DA_TOPIC), id: ::blob_id, }, - registry: registry.clone(), }, da_dispersal: config.da_dispersal, da_network: config.da_network, @@ -103,8 +87,6 @@ fn main() -> Result<()> { da_sampling: config.da_sampling, da_verifier: config.da_verifier, cryptarchia: config.cryptarchia, - #[cfg(feature = "metrics")] - metrics: MetricsSettings { registry }, storage: config.storage, system_sig: (), }, diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 3688e2f0..3f54f53e 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -36,7 +36,6 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [ "mock", "libp2p", ] } -nomos-metrics = { path = "../../nomos-services/metrics" } nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } cryptarchia-consensus = { path = "../../nomos-services/cryptarchia-consensus", features = ["libp2p"] } nomos-libp2p = { path = "../../nomos-libp2p" } @@ -65,5 +64,4 @@ rand = "0.8" [features] default = ["tracing"] -metrics = [] tracing = [] diff --git a/nodes/nomos-node/src/api/backend.rs b/nodes/nomos-node/src/api/backend.rs index 9d61a649..e94ae6ad 
100644 --- a/nodes/nomos-node/src/api/backend.rs +++ b/nodes/nomos-node/src/api/backend.rs @@ -29,7 +29,7 @@ use utoipa_swagger_ui::SwaggerUi; // internal use super::handlers::{ add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers, - cryptarchia_info, get_metrics, get_range, libp2p_info, + cryptarchia_info, get_range, libp2p_info, }; /// Configuration for the Http Server @@ -303,7 +303,6 @@ where >, ), ) - .route(paths::METRICS, routing::get(get_metrics)) .with_state(handle); Server::bind(&self.settings.address) diff --git a/nodes/nomos-node/src/api/handlers.rs b/nodes/nomos-node/src/api/handlers.rs index 6689f781..c9e3dbfc 100644 --- a/nodes/nomos-node/src/api/handlers.rs +++ b/nodes/nomos-node/src/api/handlers.rs @@ -5,16 +5,14 @@ use std::{fmt::Debug, hash::Hash}; // crates use axum::{ extract::{Query, State}, - http::HeaderValue, - response::{IntoResponse, Response}, + response::Response, Json, }; -use hyper::{header::CONTENT_TYPE, Body, StatusCode}; use rand::{RngCore, SeedableRng}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; // internal use super::paths; -use nomos_api::http::{cl, consensus, da, libp2p, mempool, metrics, storage}; +use nomos_api::http::{cl, consensus, da, libp2p, mempool, storage}; use nomos_core::da::blob::info::DispersedBlobInfo; use nomos_core::da::blob::metadata::Metadata; use nomos_core::da::{BlobId, DaVerifier as CoreDaVerifier}; @@ -418,29 +416,3 @@ where SamplingStorage, >(&handle, blob_info, DispersedBlobInfo::blob_id)) } - -#[utoipa::path( - get, - path = paths::METRICS, - responses( - (status = 200, description = "Get all metrics"), - (status = 500, description = "Internal server error", body = String), - ) -)] -pub async fn get_metrics(State(handle): State) -> Response { - match metrics::gather(&handle).await { - Ok(encoded_metrics) => Response::builder() - .status(StatusCode::OK) - .header( - CONTENT_TYPE, - HeaderValue::from_static("text/plain; version=0.0.4"), - ) - 
.body(Body::from(encoded_metrics)) - .unwrap() - .into_response(), - Err(e) => axum::response::IntoResponse::into_response(( - hyper::StatusCode::INTERNAL_SERVER_ERROR, - e.to_string(), - )), - } -} diff --git a/nodes/nomos-node/src/api/paths.rs b/nodes/nomos-node/src/api/paths.rs index 6f43824b..6357739c 100644 --- a/nodes/nomos-node/src/api/paths.rs +++ b/nodes/nomos-node/src/api/paths.rs @@ -8,4 +8,3 @@ pub const NETWORK_INFO: &str = "/network/info"; pub const STORAGE_BLOCK: &str = "/storage/block"; pub const MEMPOOL_ADD_TX: &str = "/mempool/add/tx"; pub const MEMPOOL_ADD_BLOB_INFO: &str = "/mempool/add/blobinfo"; -pub const METRICS: &str = "/metrics"; diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index a41927ed..78bf97e1 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -129,12 +129,6 @@ pub struct CryptarchiaArgs { note_nonce: Option, } -#[derive(Parser, Debug, Clone)] -pub struct MetricsArgs { - #[clap(long = "with-metrics", env = "WITH_METRICS")] - pub with_metrics: bool, -} - #[derive(Deserialize, Debug, Clone, Serialize)] pub struct Config { pub tracing: ::Settings, diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 0856688e..ea3c8031 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -7,7 +7,7 @@ mod tx; use api::backend::AxumBackend; use bytes::Bytes; use color_eyre::eyre::Result; -pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs}; +pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs}; use kzgrs_backend::common::blob::DaBlob; pub use kzgrs_backend::dispersal::BlobInfo; use nomos_api::ApiService; @@ -35,9 +35,6 @@ pub use nomos_mempool::network::adapters::libp2p::{ }; pub use nomos_mempool::TxMempoolSettings; use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService}; -pub use nomos_metrics::NomosRegistry; -#[cfg(feature = "metrics")] -pub use nomos_metrics::{Metrics, 
MetricsSettings}; pub use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend; pub use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter; pub use nomos_mix_service::MixService; @@ -173,8 +170,6 @@ pub struct Nomos { cryptarchia: ServiceHandle, http: ServiceHandle, storage: ServiceHandle>>, - #[cfg(feature = "metrics")] - metrics: ServiceHandle, system_sig: ServiceHandle, } diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 1f54d228..6b44d9d7 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -1,8 +1,6 @@ use kzgrs_backend::dispersal::BlobInfo; -#[cfg(feature = "metrics")] -use nomos_metrics::MetricsSettings; use nomos_node::{ - config::MixArgs, Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, Nomos, + config::MixArgs, Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, Nomos, NomosServiceSettings, Tx, }; @@ -35,9 +33,6 @@ struct Args { http_args: HttpArgs, #[clap(flatten)] cryptarchia_args: CryptarchiaArgs, - /// Overrides metrics config. - #[clap(flatten)] - metrics_args: MetricsArgs, } fn main() -> Result<()> { @@ -48,7 +43,6 @@ fn main() -> Result<()> { network_args, mix_args, cryptarchia_args, - metrics_args, } = Args::parse(); let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)? 
.update_from_args( @@ -59,13 +53,6 @@ fn main() -> Result<()> { cryptarchia_args, )?; - let registry = cfg!(feature = "metrics") - .then(|| { - metrics_args - .with_metrics - .then(nomos_metrics::NomosRegistry::default) - }) - .flatten(); #[cfg(debug_assertions)] let debug_span = { let debug_id = Uuid::new_v4(); @@ -86,7 +73,6 @@ fn main() -> Result<()> { topic: String::from(nomos_node::CL_TOPIC), id: ::hash, }, - registry: registry.clone(), }, da_mempool: nomos_mempool::DaMempoolSettings { backend: (), @@ -94,15 +80,12 @@ fn main() -> Result<()> { topic: String::from(nomos_node::DA_TOPIC), id: ::blob_id, }, - registry: registry.clone(), }, da_network: config.da_network, da_indexer: config.da_indexer, da_sampling: config.da_sampling, da_verifier: config.da_verifier, cryptarchia: config.cryptarchia, - #[cfg(feature = "metrics")] - metrics: MetricsSettings { registry }, storage: config.storage, system_sig: (), }, diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index bac4ecc0..2b812678 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -22,7 +22,6 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [ "libp2p", "openapi", ] } -nomos-metrics = { path = "../../nomos-services/metrics" } nomos-da-dispersal = { path = "../data-availability/dispersal" } nomos-da-indexer = { path = "../data-availability/indexer", features = ["rocksdb-backend"] } nomos-da-sampling = { path = "../data-availability/sampling" } diff --git a/nomos-services/api/src/http/metrics.rs b/nomos-services/api/src/http/metrics.rs deleted file mode 100644 index c25deb08..00000000 --- a/nomos-services/api/src/http/metrics.rs +++ /dev/null @@ -1,18 +0,0 @@ -use nomos_metrics::{Metrics, MetricsMsg}; -use tokio::sync::oneshot; - -pub async fn gather( - handle: &overwatch_rs::overwatch::handle::OverwatchHandle, -) -> Result { - let relay = handle.relay::().connect().await?; - let (sender, receiver) = oneshot::channel(); - - relay - 
.send(MetricsMsg::Gather { - reply_channel: sender, - }) - .await - .map_err(|(e, _)| e)?; - - Ok(receiver.await?) -} diff --git a/nomos-services/api/src/http/mod.rs b/nomos-services/api/src/http/mod.rs index 6ab13b2a..789aeee9 100644 --- a/nomos-services/api/src/http/mod.rs +++ b/nomos-services/api/src/http/mod.rs @@ -4,5 +4,4 @@ pub mod consensus; pub mod da; pub mod libp2p; pub mod mempool; -pub mod metrics; pub mod storage; diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 90bd74c8..deeb1bd1 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -11,7 +11,6 @@ use std::path::PathBuf; use std::time::Duration; // crates use bytes::Bytes; -use cl::InputWitness; use cryptarchia_consensus::TimeConfig; use kzgrs_backend::common::blob::DaBlob; use kzgrs_backend::dispersal::BlobInfo; @@ -254,7 +253,6 @@ pub fn new_node( topic: String::from(nomos_node::CL_TOPIC), id: ::hash, }, - registry: None, }, da_mempool: DaMempoolSettings { backend: (), @@ -262,7 +260,6 @@ pub fn new_node( topic: String::from(nomos_node::DA_TOPIC), id: ::blob_id, }, - registry: None, }, storage: nomos_storage::backends::rocksdb::RocksBackendSettings { db_path, diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 8a5ec8e9..b0b4051e 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -10,7 +10,6 @@ async-trait = "0.1" bincode = { version = "2.0.0-rc.2", features = ["serde"] } futures = "0.3" linked-hash-map = { version = "0.5.6", optional = true } -nomos-metrics = { path = "../../nomos-services/metrics" } nomos-network = { path = "../network" } nomos-da-network-core = { path = "../../nomos-da/network/core" } nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling/" } @@ -38,7 +37,6 @@ blake2 = "0.10" default = [] mock = ["linked-hash-map", 
"nomos-network/mock", "nomos-core/mock"] libp2p = ["nomos-network/libp2p"] -metrics = [] # enable to help generate OpenAPI openapi = ["dep:utoipa", "serde_json"] diff --git a/nomos-services/mempool/src/da/service.rs b/nomos-services/mempool/src/da/service.rs index 0a1384a4..a9f72738 100644 --- a/nomos-services/mempool/src/da/service.rs +++ b/nomos-services/mempool/src/da/service.rs @@ -8,9 +8,6 @@ pub mod openapi { use std::fmt::Debug; // crates -// TODO: Add again after metrics refactor -// #[cfg(feature = "metrics")] -// use super::metrics::Metrics; use futures::StreamExt; use nomos_da_sampling::storage::DaStorageAdapter; use rand::{RngCore, SeedableRng}; @@ -23,7 +20,6 @@ use nomos_da_sampling::{ backend::DaSamplingServiceBackend, network::NetworkAdapter as DaSamplingNetworkAdapter, DaSamplingService, DaSamplingServiceMsg, }; -use nomos_metrics::NomosRegistry; use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ @@ -55,9 +51,6 @@ where network_relay: Relay>, sampling_relay: Relay>, pool: P, - // TODO: Add again after metrics refactor - // #[cfg(feature = "metrics")] - // metrics: Option, } impl ServiceData for DaMempoolService @@ -113,19 +106,11 @@ where let sampling_relay = service_state.overwatch_handle.relay(); let settings = service_state.settings_reader.get_updated_settings(); - // TODO: Refactor metrics to be reusable then replug it again - // #[cfg(feature = "metrics")] - // let metrics = settings - // .registry - // .map(|reg| Metrics::new(reg, service_state.id())); - Ok(Self { service_state, network_relay, sampling_relay, pool: P::new(settings.backend), - // #[cfg(feature = "metrics")] - // metrics, }) } @@ -160,9 +145,6 @@ where loop { tokio::select! 
{ Some(msg) = service_state.inbound_relay.recv() => { - // TODO: replug metrics once refactor is done - // #[cfg(feature = "metrics")] - // if let Some(metrics) = &self.metrics { metrics.record(&msg) } Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await; } Some((key, item)) = network_items.next() => { @@ -170,6 +152,7 @@ where pool.add_item(key, item).unwrap_or_else(|e| { tracing::debug!("could not add item to the pool due to: {}", e) }); + tracing::info!(counter.da_mempool_pending_items = pool.pending_item_count()); } Some(msg) = lifecycle_stream.next() => { if Self::should_stop_service(msg).await { @@ -294,5 +277,4 @@ where pub struct DaMempoolSettings { pub backend: B, pub network: N, - pub registry: Option, } diff --git a/nomos-services/mempool/src/tx/metrics.rs b/nomos-services/mempool/src/tx/metrics.rs deleted file mode 100644 index 0fbcd1b5..00000000 --- a/nomos-services/mempool/src/tx/metrics.rs +++ /dev/null @@ -1,80 +0,0 @@ -// std -use std::fmt::Debug; -// crates -use nomos_metrics::{ - metrics::{counter::Counter, family::Family}, - prometheus_client::{self, encoding::EncodeLabelSet, encoding::EncodeLabelValue}, - NomosRegistry, -}; -use overwatch_rs::services::ServiceId; -// internal -use crate::MempoolMsg; - -#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelValue)] -enum MempoolMsgType { - Add, - View, - Prune, - MarkInBlock, -} - -impl From<&MempoolMsg> for MempoolMsgType -where - I: 'static + Debug, - K: 'static + Debug, -{ - fn from(event: &MempoolMsg) -> Self { - match event { - MempoolMsg::Add { .. } => MempoolMsgType::Add, - MempoolMsg::View { .. } => MempoolMsgType::View, - MempoolMsg::Prune { .. } => MempoolMsgType::Prune, - MempoolMsg::MarkInBlock { .. 
} => MempoolMsgType::MarkInBlock, - _ => unimplemented!(), - } - } -} - -#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelSet)] -struct MessageLabels { - label: MempoolMsgType, -} - -pub(crate) struct Metrics { - messages: Family, -} - -impl Metrics { - pub(crate) fn new(registry: NomosRegistry, discriminant: ServiceId) -> Self { - let mut registry = registry - .lock() - .expect("should've acquired the lock for registry"); - let sub_registry = registry.sub_registry_with_prefix(discriminant); - - let messages = Family::default(); - sub_registry.register( - "messages", - "Messages emitted by the Mempool", - messages.clone(), - ); - - Self { messages } - } - - pub(crate) fn record(&self, msg: &MempoolMsg) - where - I: 'static + Debug, - K: 'static + Debug, - { - match msg { - MempoolMsg::Add { .. } - | MempoolMsg::View { .. } - | MempoolMsg::Prune { .. } - | MempoolMsg::MarkInBlock { .. } => { - self.messages - .get_or_create(&MessageLabels { label: msg.into() }) - .inc(); - } - _ => {} - } - } -} diff --git a/nomos-services/mempool/src/tx/mod.rs b/nomos-services/mempool/src/tx/mod.rs index 320bcdf1..1f278a4d 100644 --- a/nomos-services/mempool/src/tx/mod.rs +++ b/nomos-services/mempool/src/tx/mod.rs @@ -1,3 +1 @@ -#[cfg(feature = "metrics")] -pub mod metrics; pub mod service; diff --git a/nomos-services/mempool/src/tx/service.rs b/nomos-services/mempool/src/tx/service.rs index a03e3328..fe8fc798 100644 --- a/nomos-services/mempool/src/tx/service.rs +++ b/nomos-services/mempool/src/tx/service.rs @@ -8,10 +8,7 @@ pub mod openapi { use std::fmt::Debug; // crates -#[cfg(feature = "metrics")] -use super::metrics::Metrics; use futures::StreamExt; -use nomos_metrics::NomosRegistry; // internal use crate::backend::MemPool; use crate::network::NetworkAdapter; @@ -38,8 +35,6 @@ where service_state: ServiceStateHandle, network_relay: Relay>, pool: P, - #[cfg(feature = "metrics")] - metrics: Option, } impl ServiceData for TxMempoolService @@ -78,17 +73,10 @@ where let 
network_relay = service_state.overwatch_handle.relay(); let settings = service_state.settings_reader.get_updated_settings(); - #[cfg(feature = "metrics")] - let metrics = settings - .registry - .map(|reg| Metrics::new(reg, service_state.id())); - Ok(Self { service_state, network_relay, pool: P::new(settings.backend), - #[cfg(feature = "metrics")] - metrics, }) } @@ -117,14 +105,13 @@ where loop { tokio::select! { Some(msg) = service_state.inbound_relay.recv() => { - #[cfg(feature = "metrics")] - if let Some(metrics) = &self.metrics { metrics.record(&msg) } Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await; } Some((key, item )) = network_items.next() => { pool.add_item(key, item).unwrap_or_else(|e| { tracing::debug!("could not add item to the pool due to: {}", e) }); + tracing::info!(counter.tx_mempool_pending_items = pool.pending_item_count()); } Some(msg) = lifecycle_stream.next() => { if Self::should_stop_service(msg).await { @@ -241,5 +228,4 @@ where pub struct TxMempoolSettings { pub backend: B, pub network: N, - pub registry: Option, } diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index b9436f82..42676741 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -64,7 +64,6 @@ fn test_mockmempool() { mockpool: TxMempoolSettings { backend: (), network: (), - registry: None, }, logging: TracingSettings::default(), }, diff --git a/nomos-services/metrics/Cargo.toml b/nomos-services/metrics/Cargo.toml deleted file mode 100644 index 1867b0f1..00000000 --- a/nomos-services/metrics/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "nomos-metrics" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = "0.1" -futures = "0.3" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } -overwatch-derive = 
{ git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } -prometheus-client = "0.22.0" -tracing = "0.1" -tokio = { version = "1", features = ["sync", "macros"] } -serde = { version = "1", features = ["derive"] } diff --git a/nomos-services/metrics/src/lib.rs b/nomos-services/metrics/src/lib.rs deleted file mode 100644 index d9461b38..00000000 --- a/nomos-services/metrics/src/lib.rs +++ /dev/null @@ -1,119 +0,0 @@ -pub use prometheus_client::{self, *}; - -// std -use std::fmt::{Debug, Error, Formatter}; -use std::sync::{Arc, Mutex}; -// crates -use futures::StreamExt; -use overwatch_rs::services::life_cycle::LifecycleMessage; -use overwatch_rs::services::{ - handle::ServiceStateHandle, - relay::RelayMessage, - state::{NoOperator, NoState}, - ServiceCore, ServiceData, -}; -use prometheus_client::encoding::text::encode; -use prometheus_client::registry::Registry; -use tokio::sync::oneshot::Sender; -use tracing::error; -// internal - -// A wrapper for prometheus_client Registry. -// Lock is only used during services initialization and prometheus pull query. -pub type NomosRegistry = Arc>; - -pub struct Metrics { - service_state: ServiceStateHandle, - registry: NomosRegistry, -} - -#[derive(Clone, Debug)] -pub struct MetricsSettings { - pub registry: Option, -} - -pub enum MetricsMsg { - Gather { reply_channel: Sender }, -} - -impl RelayMessage for MetricsMsg {} - -impl Debug for MetricsMsg { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { - match self { - Self::Gather { .. 
} => { - write!(f, "MetricsMsg::Gather") - } - } - } -} - -impl ServiceData for Metrics { - const SERVICE_ID: &'static str = "Metrics"; - type Settings = MetricsSettings; - type State = NoState; - type StateOperator = NoOperator; - type Message = MetricsMsg; -} - -#[async_trait::async_trait] -impl ServiceCore for Metrics { - fn init(service_state: ServiceStateHandle) -> Result { - let config = service_state.settings_reader.get_updated_settings(); - - Ok(Self { - service_state, - registry: config.registry.ok_or("No registry provided")?, - }) - } - - async fn run(self) -> Result<(), overwatch_rs::DynError> { - let Self { - mut service_state, - registry, - } = self; - let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); - loop { - tokio::select! { - Some(msg) = service_state.inbound_relay.recv() => { - let MetricsMsg::Gather{reply_channel} = msg; - - let mut buf = String::new(); - { - let reg = registry.lock().unwrap(); - // If encoding fails, we need to stop trying process subsequent metrics gather - // requests. If it succeds, encode method returns empty unit type. 
- _ = encode(&mut buf, &reg).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - } - - reply_channel - .send(buf) - .unwrap_or_else(|_| tracing::debug!("could not send back metrics")); - } - Some(msg) = lifecycle_stream.next() => { - if Self::should_stop_service(msg).await { - break; - } - } - } - } - Ok(()) - } -} - -impl Metrics { - async fn should_stop_service(message: LifecycleMessage) -> bool { - match message { - LifecycleMessage::Shutdown(sender) => { - if sender.send(()).is_err() { - error!( - "Error sending successful shutdown signal from service {}", - Self::SERVICE_ID - ); - } - true - } - LifecycleMessage::Kill => true, - } - } -} diff --git a/nomos-services/tracing/src/lib.rs b/nomos-services/tracing/src/lib.rs index 15d928fa..8b4b475d 100644 --- a/nomos-services/tracing/src/lib.rs +++ b/nomos-services/tracing/src/lib.rs @@ -8,6 +8,7 @@ use nomos_tracing::filter::envfilter::{create_envfilter_layer, EnvFilterConfig}; use nomos_tracing::logging::gelf::{create_gelf_layer, GelfConfig}; use nomos_tracing::logging::local::{create_file_layer, create_writer_layer, FileConfig}; use nomos_tracing::logging::loki::{create_loki_layer, LokiConfig}; +use nomos_tracing::metrics::otlp::{create_otlp_metrics_layer, OtlpMetricsConfig}; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -93,11 +94,18 @@ pub enum FilterLayer { None, } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum MetricsLayer { + Otlp(OtlpMetricsConfig), + None, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TracingSettings { pub logger: LoggerLayer, pub tracing: TracingLayer, pub filter: FilterLayer, + pub metrics: MetricsLayer, #[serde(with = "serde_level")] pub level: Level, } @@ -108,6 +116,7 @@ impl Default for TracingSettings { logger: LoggerLayer::Stdout, tracing: TracingLayer::None, filter: FilterLayer::None, + metrics: MetricsLayer::None, level: Level::DEBUG, } } @@ -119,12 +128,14 @@ impl
TracingSettings { logger: LoggerLayer, tracing: TracingLayer, filter: FilterLayer, + metrics: MetricsLayer, level: Level, ) -> Self { Self { logger, tracing, filter, + metrics, level, } } @@ -194,6 +205,11 @@ impl ServiceCore for Tracing { layers.push(Box::new(filter_layer)); } + if let MetricsLayer::Otlp(config) = config.metrics { + let metrics_layer = create_otlp_metrics_layer(config)?; + layers.push(Box::new(metrics_layer)); + } + // If no layers are created, tracing subscriber is not required. if layers.is_empty() { return Ok(Self { diff --git a/nomos-tracing/Cargo.toml b/nomos-tracing/Cargo.toml index 0c876cdf..9b584c2f 100644 --- a/nomos-tracing/Cargo.toml +++ b/nomos-tracing/Cargo.toml @@ -5,14 +5,17 @@ edition = "2021" [dependencies] opentelemetry = { version = "0.26" } -opentelemetry-otlp = "0.26" +opentelemetry-otlp = { version = "0.26", features = ["grpc-tonic", "http-proto", "opentelemetry-http"] } opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] } +opentelemetry-http = { version = "0.26", features = ["reqwest"] } +opentelemetry-semantic-conventions = "0.26" +reqwest = "0.12" serde = { version = "1.0", features = ["derive"] } tokio = "1" tracing = "0.1" tracing-appender = "0.2" +tracing-gelf = "0.7" tracing-loki = "0.2.5" tracing-opentelemetry = "0.27" tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "registry"] } -tracing-gelf = "0.7" url = { version = "2", features = ["serde"] } diff --git a/nomos-tracing/src/lib.rs b/nomos-tracing/src/lib.rs index 68b36ec4..ac434e96 100644 --- a/nomos-tracing/src/lib.rs +++ b/nomos-tracing/src/lib.rs @@ -1,3 +1,4 @@ pub mod filter; pub mod logging; +pub mod metrics; pub mod tracing; diff --git a/nomos-tracing/src/metrics/mod.rs b/nomos-tracing/src/metrics/mod.rs new file mode 100644 index 00000000..95eb89aa --- /dev/null +++ b/nomos-tracing/src/metrics/mod.rs @@ -0,0 +1 @@ +pub mod otlp; diff --git a/nomos-tracing/src/metrics/otlp.rs b/nomos-tracing/src/metrics/otlp.rs new file 
mode 100644 index 00000000..5774b2c2 --- /dev/null +++ b/nomos-tracing/src/metrics/otlp.rs @@ -0,0 +1,52 @@ +// std +use opentelemetry_otlp::{ExportConfig, Protocol, WithExportConfig}; +use opentelemetry_sdk::{runtime, Resource}; +use reqwest::Client; +use std::error::Error; +// crates +use opentelemetry::{global, KeyValue}; +use serde::{Deserialize, Serialize}; +use tracing::Subscriber; +use tracing_opentelemetry::MetricsLayer; +use tracing_subscriber::registry::LookupSpan; +use url::Url; +// internal + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OtlpMetricsConfig { + pub endpoint: Url, + pub host_identifier: String, +} + +pub fn create_otlp_metrics_layer<S>( + config: OtlpMetricsConfig, +) -> Result<MetricsLayer<S>, Box<dyn Error + Send + Sync>> +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + let resource = Resource::new(vec![KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + config.host_identifier, + )]); + + let export_config = ExportConfig { + endpoint: config.endpoint.into(), + protocol: Protocol::HttpBinary, + ..ExportConfig::default() + }; + + let client = Client::new(); + let meter_provider = opentelemetry_otlp::new_pipeline() + .metrics(runtime::Tokio) + .with_exporter( + opentelemetry_otlp::new_exporter() + .http() + .with_http_client(client) + .with_export_config(export_config), + ) + .with_resource(resource) + .build()?; + + global::set_meter_provider(meter_provider.clone()); + Ok(MetricsLayer::new(meter_provider)) +} diff --git a/testnet/cfgsync.yaml b/testnet/cfgsync.yaml index 750658aa..72ee17d8 100644 --- a/testnet/cfgsync.yaml +++ b/testnet/cfgsync.yaml @@ -18,3 +18,4 @@ global_params_path: "/kzgrs_test_params" # Tracing params tempo_endpoint: "http://tempo:4317" loki_endpoint: "http://loki:3100" +metrics_endpoint: "http://prometheus:9090/api/v1/otlp/v1/metrics" diff --git a/testnet/cfgsync/src/bin/cfgsync-server.rs b/testnet/cfgsync/src/bin/cfgsync-server.rs index 276ef96b..b775f444 100644 ---
a/testnet/cfgsync/src/bin/cfgsync-server.rs +++ b/testnet/cfgsync/src/bin/cfgsync-server.rs @@ -49,6 +49,7 @@ struct CfgSyncConfig { // Tracing params tempo_endpoint: Url, loki_endpoint: Url, + metrics_endpoint: Url, } impl CfgSyncConfig { @@ -83,6 +84,7 @@ impl CfgSyncConfig { TracingParams { tempo_endpoint: self.tempo_endpoint.clone(), loki_endpoint: self.loki_endpoint.clone(), + metrics_endpoint: self.metrics_endpoint.clone(), } } } diff --git a/testnet/cfgsync/src/config.rs b/testnet/cfgsync/src/config.rs index 31a610af..5f40a4d7 100644 --- a/testnet/cfgsync/src/config.rs +++ b/testnet/cfgsync/src/config.rs @@ -4,8 +4,10 @@ use std::{collections::HashMap, net::Ipv4Addr, str::FromStr}; use nomos_libp2p::{Multiaddr, PeerId}; use nomos_mix::membership::Node; use nomos_mix_message::{mock::MockMixMessage, MixMessage}; -use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig}; -use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingSettings}; +use nomos_tracing::{ + logging::loki::LokiConfig, metrics::otlp::OtlpMetricsConfig, tracing::otlp::OtlpTracingConfig, +}; +use nomos_tracing_service::{FilterLayer, LoggerLayer, MetricsLayer, TracingSettings}; use rand::{thread_rng, Rng}; use tests::topology::configs::{ api::GeneralApiConfig, @@ -193,13 +195,19 @@ fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> Gene tracing_settings: TracingSettings { logger: LoggerLayer::Loki(LokiConfig { endpoint: params.loki_endpoint, - host_identifier: identifier, + host_identifier: identifier.clone(), }), tracing: nomos_tracing_service::TracingLayer::Otlp(OtlpTracingConfig { endpoint: params.tempo_endpoint, sample_ratio: 1.0, }), filter: FilterLayer::None, + metrics: MetricsLayer::Otlp(OtlpMetricsConfig { + endpoint: "http://127.0.0.1:9090/api/v1/otlp/v1/metrics" + .try_into() + .unwrap(), + host_identifier: identifier, + }), level: Level::INFO, }, } @@ -249,6 +257,7 @@ mod cfgsync_tests { TracingParams { tempo_endpoint: 
"http://test.com".try_into().unwrap(), loki_endpoint: "http://test.com".try_into().unwrap(), + metrics_endpoint: "http://test.com".try_into().unwrap(), }, hosts, ); diff --git a/testnet/cfgsync/src/lib.rs b/testnet/cfgsync/src/lib.rs index a432324e..c03e0708 100644 --- a/testnet/cfgsync/src/lib.rs +++ b/testnet/cfgsync/src/lib.rs @@ -7,4 +7,5 @@ pub mod repo; pub struct TracingParams { pub tempo_endpoint: Url, pub loki_endpoint: Url, + pub metrics_endpoint: Url, } diff --git a/testnet/monitoring/prometheus.yml b/testnet/monitoring/prometheus.yml index 6772dd7e..ee039b07 100644 --- a/testnet/monitoring/prometheus.yml +++ b/testnet/monitoring/prometheus.yml @@ -3,12 +3,3 @@ global: evaluation_interval: 15s external_labels: monitor: "Monitoring" - -scrape_configs: - - job_name: "nomos" - static_configs: - - targets: - - nomos-node-0:18080 - - nomos-node-1:18080 - - nomos-node-2:18080 - - nomos-node-3:18080 diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 8624f54e..c4ae7b46 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -62,5 +62,4 @@ name = "test_da" path = "src/tests/da.rs" [features] -metrics = ["nomos-node/metrics"] debug = [] diff --git a/tests/src/topology/configs/tracing.rs b/tests/src/topology/configs/tracing.rs index 8e445d21..7c67f3f7 100644 --- a/tests/src/topology/configs/tracing.rs +++ b/tests/src/topology/configs/tracing.rs @@ -1,5 +1,9 @@ -use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig}; -use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingLayer, TracingSettings}; +use nomos_tracing::{ + logging::loki::LokiConfig, metrics::otlp::OtlpMetricsConfig, tracing::otlp::OtlpTracingConfig, +}; +use nomos_tracing_service::{ + FilterLayer, LoggerLayer, MetricsLayer, TracingLayer, TracingSettings, +}; use tracing::Level; #[derive(Clone, Default)] @@ -10,11 +14,12 @@ pub struct GeneralTracingConfig { impl GeneralTracingConfig { #[allow(dead_code)] fn local_debug_tracing(id: usize) -> Self { + let 
host_identifier = format!("node-{id}"); Self { tracing_settings: TracingSettings { logger: LoggerLayer::Loki(LokiConfig { endpoint: "http://localhost:3100".try_into().unwrap(), - host_identifier: format!("node-{id}"), + host_identifier: host_identifier.clone(), }), tracing: TracingLayer::Otlp(OtlpTracingConfig { endpoint: "http://localhost:4317".try_into().unwrap(), @@ -29,6 +34,12 @@ impl GeneralTracingConfig { .map(|(k, v)| (k.to_string(), v.to_string())) .collect(), }), + metrics: MetricsLayer::Otlp(OtlpMetricsConfig { + endpoint: "http://127.0.0.1:9090/api/v1/otlp/v1/metrics" + .try_into() + .unwrap(), + host_identifier, + }), level: Level::DEBUG, }, }