Tracing: Otlp metrics (#909)

* Metrics layer configuration in nomos-tracing

* Add metrics layer to tracing service

* Use http/proto for prometheus

* Add metrics layer to integration tests

* Use metrics layer in cfgsync

* Plug metrics into node and tests

* Remove old metrics

* Use otlp metrics in mempool
This commit is contained in:
gusto 2024-11-07 04:25:20 +02:00 committed by GitHub
parent e0959644a9
commit a5243c6af2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 117 additions and 390 deletions

View File

@ -16,7 +16,6 @@ members = [
"nomos-services/storage",
"nomos-services/cryptarchia-consensus",
"nomos-services/mempool",
"nomos-services/metrics",
"nomos-services/system-sig",
"nomos-services/data-availability/indexer",
"nomos-services/data-availability/network",

View File

@ -11,7 +11,6 @@ Nomos blockchain node mvp
- http
- mempool
- network
- metrics
- `nodes`: Nomos nodes is the collection of nodes that are used to run the Nomos mvp and experimental nodes.
- `nomos-node`: main implementation of the Nomos mvp node.
- `mockpool-node`: node with single mempool service, used to measure transaction dissemination.

View File

@ -10,6 +10,7 @@ services:
command:
- --config.file=/etc/prometheus/prometheus.yml
- --storage.tsdb.retention.time=7d
- --enable-feature=otlp-write-receiver
ports:
- 127.0.0.1:9090:9090
restart: on-failure

View File

@ -84,6 +84,7 @@ services:
command:
- --config.file=/etc/prometheus/prometheus.yml
- --storage.tsdb.retention.time=7d
- --enable-feature=otlp-write-receiver
ports:
- 127.0.0.1:9090:9090
restart: on-failure

View File

@ -22,7 +22,6 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [
"mock",
"libp2p",
] }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] }
nomos-node = { path = "../nomos-node" }
@ -42,5 +41,4 @@ uuid = { version = "1.10.0", features = ["v4"] }
[features]
default = ["tracing"]
metrics = ["nomos-node/metrics"]
tracing = ["nomos-node/tracing"]

View File

@ -28,7 +28,7 @@ use nomos_libp2p::PeerId;
use nomos_mempool::{tx::service::openapi::Status, MempoolMetrics};
use nomos_node::api::handlers::{
add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers,
cryptarchia_info, get_metrics, get_range, libp2p_info,
cryptarchia_info, get_range, libp2p_info,
};
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
@ -327,7 +327,6 @@ where
>,
),
)
.route(paths::METRICS, routing::get(get_metrics))
.route(
paths::DISPERSE_DATA,
routing::post(

View File

@ -24,8 +24,6 @@ use nomos_mix_service::MixService;
use nomos_node::DispersedBlobInfo;
use nomos_node::HeaderId;
use nomos_node::MempoolNetworkAdapter;
#[cfg(feature = "metrics")]
use nomos_node::Metrics;
use nomos_node::NetworkBackend;
use nomos_node::{
BlobInfo, Cryptarchia, DaIndexer, DaMempool, DaNetworkService, DaSampling, DaVerifier,
@ -101,7 +99,5 @@ pub struct NomosExecutor {
cryptarchia: ServiceHandle<ExecutorCryptarchia>,
http: ServiceHandle<ExecutorApiService>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<Metrics>,
system_sig: ServiceHandle<SystemSig>,
}

View File

@ -4,12 +4,10 @@ use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_executor::config::Config as ExecutorConfig;
use nomos_executor::{NomosExecutor, NomosExecutorServiceSettings};
#[cfg(feature = "metrics")]
use nomos_node::MetricsSettings;
use nomos_node::{
config::MixArgs, BlobInfo, CryptarchiaArgs, DaMempoolSettings, DispersedBlobInfo, HttpArgs,
LogArgs, MempoolAdapterSettings, MetricsArgs, NetworkArgs, Transaction, Tx, TxMempoolSettings,
CL_TOPIC, DA_TOPIC,
LogArgs, MempoolAdapterSettings, NetworkArgs, Transaction, Tx, TxMempoolSettings, CL_TOPIC,
DA_TOPIC,
};
use overwatch_rs::overwatch::*;
use tracing::{span, Level};
@ -35,9 +33,6 @@ struct Args {
http_args: HttpArgs,
#[clap(flatten)]
cryptarchia_args: CryptarchiaArgs,
/// Overrides metrics config.
#[clap(flatten)]
metrics_args: MetricsArgs,
}
fn main() -> Result<()> {
@ -48,7 +43,6 @@ fn main() -> Result<()> {
network_args,
mix_args,
cryptarchia_args,
metrics_args,
} = Args::parse();
let config = serde_yaml::from_reader::<_, ExecutorConfig>(std::fs::File::open(config)?)?
.update_from_args(
@ -59,14 +53,6 @@ fn main() -> Result<()> {
cryptarchia_args,
)?;
let registry = cfg!(feature = "metrics")
.then(|| {
metrics_args
.with_metrics
.then(nomos_metrics::NomosRegistry::default)
})
.flatten();
#[cfg(debug_assertions)]
let debug_span = {
let debug_id = Uuid::new_v4();
@ -87,7 +73,6 @@ fn main() -> Result<()> {
topic: String::from(CL_TOPIC),
id: <Tx as Transaction>::hash,
},
registry: registry.clone(),
},
da_mempool: DaMempoolSettings {
backend: (),
@ -95,7 +80,6 @@ fn main() -> Result<()> {
topic: String::from(DA_TOPIC),
id: <BlobInfo as DispersedBlobInfo>::blob_id,
},
registry: registry.clone(),
},
da_dispersal: config.da_dispersal,
da_network: config.da_network,
@ -103,8 +87,6 @@ fn main() -> Result<()> {
da_sampling: config.da_sampling,
da_verifier: config.da_verifier,
cryptarchia: config.cryptarchia,
#[cfg(feature = "metrics")]
metrics: MetricsSettings { registry },
storage: config.storage,
system_sig: (),
},

View File

@ -36,7 +36,6 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [
"mock",
"libp2p",
] }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
cryptarchia-consensus = { path = "../../nomos-services/cryptarchia-consensus", features = ["libp2p"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
@ -65,5 +64,4 @@ rand = "0.8"
[features]
default = ["tracing"]
metrics = []
tracing = []

View File

@ -29,7 +29,7 @@ use utoipa_swagger_ui::SwaggerUi;
// internal
use super::handlers::{
add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers,
cryptarchia_info, get_metrics, get_range, libp2p_info,
cryptarchia_info, get_range, libp2p_info,
};
/// Configuration for the Http Server
@ -303,7 +303,6 @@ where
>,
),
)
.route(paths::METRICS, routing::get(get_metrics))
.with_state(handle);
Server::bind(&self.settings.address)

View File

@ -5,16 +5,14 @@ use std::{fmt::Debug, hash::Hash};
// crates
use axum::{
extract::{Query, State},
http::HeaderValue,
response::{IntoResponse, Response},
response::Response,
Json,
};
use hyper::{header::CONTENT_TYPE, Body, StatusCode};
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
// internal
use super::paths;
use nomos_api::http::{cl, consensus, da, libp2p, mempool, metrics, storage};
use nomos_api::http::{cl, consensus, da, libp2p, mempool, storage};
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata;
use nomos_core::da::{BlobId, DaVerifier as CoreDaVerifier};
@ -418,29 +416,3 @@ where
SamplingStorage,
>(&handle, blob_info, DispersedBlobInfo::blob_id))
}
#[utoipa::path(
get,
path = paths::METRICS,
responses(
(status = 200, description = "Get all metrics"),
(status = 500, description = "Internal server error", body = String),
)
)]
pub async fn get_metrics(State(handle): State<OverwatchHandle>) -> Response {
match metrics::gather(&handle).await {
Ok(encoded_metrics) => Response::builder()
.status(StatusCode::OK)
.header(
CONTENT_TYPE,
HeaderValue::from_static("text/plain; version=0.0.4"),
)
.body(Body::from(encoded_metrics))
.unwrap()
.into_response(),
Err(e) => axum::response::IntoResponse::into_response((
hyper::StatusCode::INTERNAL_SERVER_ERROR,
e.to_string(),
)),
}
}

View File

@ -8,4 +8,3 @@ pub const NETWORK_INFO: &str = "/network/info";
pub const STORAGE_BLOCK: &str = "/storage/block";
pub const MEMPOOL_ADD_TX: &str = "/mempool/add/tx";
pub const MEMPOOL_ADD_BLOB_INFO: &str = "/mempool/add/blobinfo";
pub const METRICS: &str = "/metrics";

View File

@ -129,12 +129,6 @@ pub struct CryptarchiaArgs {
note_nonce: Option<String>,
}
#[derive(Parser, Debug, Clone)]
pub struct MetricsArgs {
#[clap(long = "with-metrics", env = "WITH_METRICS")]
pub with_metrics: bool,
}
#[derive(Deserialize, Debug, Clone, Serialize)]
pub struct Config {
pub tracing: <Tracing as ServiceData>::Settings,

View File

@ -7,7 +7,7 @@ mod tx;
use api::backend::AxumBackend;
use bytes::Bytes;
use color_eyre::eyre::Result;
pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs};
pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs};
use kzgrs_backend::common::blob::DaBlob;
pub use kzgrs_backend::dispersal::BlobInfo;
use nomos_api::ApiService;
@ -35,9 +35,6 @@ pub use nomos_mempool::network::adapters::libp2p::{
};
pub use nomos_mempool::TxMempoolSettings;
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
pub use nomos_metrics::NomosRegistry;
#[cfg(feature = "metrics")]
pub use nomos_metrics::{Metrics, MetricsSettings};
pub use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend;
pub use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
pub use nomos_mix_service::MixService;
@ -173,8 +170,6 @@ pub struct Nomos {
cryptarchia: ServiceHandle<NodeCryptarchia>,
http: ServiceHandle<NomosApiService>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<Metrics>,
system_sig: ServiceHandle<SystemSig>,
}

View File

@ -1,8 +1,6 @@
use kzgrs_backend::dispersal::BlobInfo;
#[cfg(feature = "metrics")]
use nomos_metrics::MetricsSettings;
use nomos_node::{
config::MixArgs, Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, Nomos,
config::MixArgs, Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, Nomos,
NomosServiceSettings, Tx,
};
@ -35,9 +33,6 @@ struct Args {
http_args: HttpArgs,
#[clap(flatten)]
cryptarchia_args: CryptarchiaArgs,
/// Overrides metrics config.
#[clap(flatten)]
metrics_args: MetricsArgs,
}
fn main() -> Result<()> {
@ -48,7 +43,6 @@ fn main() -> Result<()> {
network_args,
mix_args,
cryptarchia_args,
metrics_args,
} = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?
.update_from_args(
@ -59,13 +53,6 @@ fn main() -> Result<()> {
cryptarchia_args,
)?;
let registry = cfg!(feature = "metrics")
.then(|| {
metrics_args
.with_metrics
.then(nomos_metrics::NomosRegistry::default)
})
.flatten();
#[cfg(debug_assertions)]
let debug_span = {
let debug_id = Uuid::new_v4();
@ -86,7 +73,6 @@ fn main() -> Result<()> {
topic: String::from(nomos_node::CL_TOPIC),
id: <Tx as Transaction>::hash,
},
registry: registry.clone(),
},
da_mempool: nomos_mempool::DaMempoolSettings {
backend: (),
@ -94,15 +80,12 @@ fn main() -> Result<()> {
topic: String::from(nomos_node::DA_TOPIC),
id: <BlobInfo as DispersedBlobInfo>::blob_id,
},
registry: registry.clone(),
},
da_network: config.da_network,
da_indexer: config.da_indexer,
da_sampling: config.da_sampling,
da_verifier: config.da_verifier,
cryptarchia: config.cryptarchia,
#[cfg(feature = "metrics")]
metrics: MetricsSettings { registry },
storage: config.storage,
system_sig: (),
},

View File

@ -22,7 +22,6 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [
"libp2p",
"openapi",
] }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-da-dispersal = { path = "../data-availability/dispersal" }
nomos-da-indexer = { path = "../data-availability/indexer", features = ["rocksdb-backend"] }
nomos-da-sampling = { path = "../data-availability/sampling" }

View File

@ -1,18 +0,0 @@
use nomos_metrics::{Metrics, MetricsMsg};
use tokio::sync::oneshot;
pub async fn gather(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
) -> Result<String, super::DynError> {
let relay = handle.relay::<Metrics>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MetricsMsg::Gather {
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await?)
}

View File

@ -4,5 +4,4 @@ pub mod consensus;
pub mod da;
pub mod libp2p;
pub mod mempool;
pub mod metrics;
pub mod storage;

View File

@ -11,7 +11,6 @@ use std::path::PathBuf;
use std::time::Duration;
// crates
use bytes::Bytes;
use cl::InputWitness;
use cryptarchia_consensus::TimeConfig;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::dispersal::BlobInfo;
@ -254,7 +253,6 @@ pub fn new_node(
topic: String::from(nomos_node::CL_TOPIC),
id: <Tx as Transaction>::hash,
},
registry: None,
},
da_mempool: DaMempoolSettings {
backend: (),
@ -262,7 +260,6 @@ pub fn new_node(
topic: String::from(nomos_node::DA_TOPIC),
id: <BlobInfo as DispersedBlobInfo>::blob_id,
},
registry: None,
},
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path,

View File

@ -10,7 +10,6 @@ async-trait = "0.1"
bincode = { version = "2.0.0-rc.2", features = ["serde"] }
futures = "0.3"
linked-hash-map = { version = "0.5.6", optional = true }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-network = { path = "../network" }
nomos-da-network-core = { path = "../../nomos-da/network/core" }
nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling/" }
@ -38,7 +37,6 @@ blake2 = "0.10"
default = []
mock = ["linked-hash-map", "nomos-network/mock", "nomos-core/mock"]
libp2p = ["nomos-network/libp2p"]
metrics = []
# enable to help generate OpenAPI
openapi = ["dep:utoipa", "serde_json"]

View File

@ -8,9 +8,6 @@ pub mod openapi {
use std::fmt::Debug;
// crates
// TODO: Add again after metrics refactor
// #[cfg(feature = "metrics")]
// use super::metrics::Metrics;
use futures::StreamExt;
use nomos_da_sampling::storage::DaStorageAdapter;
use rand::{RngCore, SeedableRng};
@ -23,7 +20,6 @@ use nomos_da_sampling::{
backend::DaSamplingServiceBackend, network::NetworkAdapter as DaSamplingNetworkAdapter,
DaSamplingService, DaSamplingServiceMsg,
};
use nomos_metrics::NomosRegistry;
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
@ -55,9 +51,6 @@ where
network_relay: Relay<NetworkService<N::Backend>>,
sampling_relay: Relay<DaSamplingService<DB, DN, R, SamplingStorage>>,
pool: P,
// TODO: Add again after metrics refactor
// #[cfg(feature = "metrics")]
// metrics: Option<Metrics>,
}
impl<N, P, DB, DN, R, DaStorage> ServiceData for DaMempoolService<N, P, DB, DN, R, DaStorage>
@ -113,19 +106,11 @@ where
let sampling_relay = service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
// TODO: Refactor metrics to be reusable then replug it again
// #[cfg(feature = "metrics")]
// let metrics = settings
// .registry
// .map(|reg| Metrics::new(reg, service_state.id()));
Ok(Self {
service_state,
network_relay,
sampling_relay,
pool: P::new(settings.backend),
// #[cfg(feature = "metrics")]
// metrics,
})
}
@ -160,9 +145,6 @@ where
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
// TODO: replug metrics once refactor is done
// #[cfg(feature = "metrics")]
// if let Some(metrics) = &self.metrics { metrics.record(&msg) }
Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await;
}
Some((key, item)) = network_items.next() => {
@ -170,6 +152,7 @@ where
pool.add_item(key, item).unwrap_or_else(|e| {
tracing::debug!("could not add item to the pool due to: {}", e)
});
tracing::info!(counter.da_mempool_pending_items = pool.pending_item_count());
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
@ -294,5 +277,4 @@ where
pub struct DaMempoolSettings<B, N> {
pub backend: B,
pub network: N,
pub registry: Option<NomosRegistry>,
}

View File

@ -1,80 +0,0 @@
// std
use std::fmt::Debug;
// crates
use nomos_metrics::{
metrics::{counter::Counter, family::Family},
prometheus_client::{self, encoding::EncodeLabelSet, encoding::EncodeLabelValue},
NomosRegistry,
};
use overwatch_rs::services::ServiceId;
// internal
use crate::MempoolMsg;
#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelValue)]
enum MempoolMsgType {
Add,
View,
Prune,
MarkInBlock,
}
impl<BlockId, P, I, K> From<&MempoolMsg<BlockId, P, I, K>> for MempoolMsgType
where
I: 'static + Debug,
K: 'static + Debug,
{
fn from(event: &MempoolMsg<BlockId, P, I, K>) -> Self {
match event {
MempoolMsg::Add { .. } => MempoolMsgType::Add,
MempoolMsg::View { .. } => MempoolMsgType::View,
MempoolMsg::Prune { .. } => MempoolMsgType::Prune,
MempoolMsg::MarkInBlock { .. } => MempoolMsgType::MarkInBlock,
_ => unimplemented!(),
}
}
}
#[derive(Debug, Clone, Hash, PartialEq, Eq, EncodeLabelSet)]
struct MessageLabels {
label: MempoolMsgType,
}
pub(crate) struct Metrics {
messages: Family<MessageLabels, Counter>,
}
impl Metrics {
pub(crate) fn new(registry: NomosRegistry, discriminant: ServiceId) -> Self {
let mut registry = registry
.lock()
.expect("should've acquired the lock for registry");
let sub_registry = registry.sub_registry_with_prefix(discriminant);
let messages = Family::default();
sub_registry.register(
"messages",
"Messages emitted by the Mempool",
messages.clone(),
);
Self { messages }
}
pub(crate) fn record<BlockId, P, I, K>(&self, msg: &MempoolMsg<BlockId, P, I, K>)
where
I: 'static + Debug,
K: 'static + Debug,
{
match msg {
MempoolMsg::Add { .. }
| MempoolMsg::View { .. }
| MempoolMsg::Prune { .. }
| MempoolMsg::MarkInBlock { .. } => {
self.messages
.get_or_create(&MessageLabels { label: msg.into() })
.inc();
}
_ => {}
}
}
}

View File

@ -1,3 +1 @@
#[cfg(feature = "metrics")]
pub mod metrics;
pub mod service;

View File

@ -8,10 +8,7 @@ pub mod openapi {
use std::fmt::Debug;
// crates
#[cfg(feature = "metrics")]
use super::metrics::Metrics;
use futures::StreamExt;
use nomos_metrics::NomosRegistry;
// internal
use crate::backend::MemPool;
use crate::network::NetworkAdapter;
@ -38,8 +35,6 @@ where
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N::Backend>>,
pool: P,
#[cfg(feature = "metrics")]
metrics: Option<Metrics>,
}
impl<N, P> ServiceData for TxMempoolService<N, P>
@ -78,17 +73,10 @@ where
let network_relay = service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
#[cfg(feature = "metrics")]
let metrics = settings
.registry
.map(|reg| Metrics::new(reg, service_state.id()));
Ok(Self {
service_state,
network_relay,
pool: P::new(settings.backend),
#[cfg(feature = "metrics")]
metrics,
})
}
@ -117,14 +105,13 @@ where
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
#[cfg(feature = "metrics")]
if let Some(metrics) = &self.metrics { metrics.record(&msg) }
Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await;
}
Some((key, item )) = network_items.next() => {
pool.add_item(key, item).unwrap_or_else(|e| {
tracing::debug!("could not add item to the pool due to: {}", e)
});
tracing::info!(counter.tx_mempool_pending_items = pool.pending_item_count());
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
@ -241,5 +228,4 @@ where
pub struct TxMempoolSettings<B, N> {
pub backend: B,
pub network: N,
pub registry: Option<NomosRegistry>,
}

View File

@ -64,7 +64,6 @@ fn test_mockmempool() {
mockpool: TxMempoolSettings {
backend: (),
network: (),
registry: None,
},
logging: TracingSettings::default(),
},

View File

@ -1,16 +0,0 @@
[package]
name = "nomos-metrics"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
futures = "0.3"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
prometheus-client = "0.22.0"
tracing = "0.1"
tokio = { version = "1", features = ["sync", "macros"] }
serde = { version = "1", features = ["derive"] }

View File

@ -1,119 +0,0 @@
pub use prometheus_client::{self, *};
// std
use std::fmt::{Debug, Error, Formatter};
use std::sync::{Arc, Mutex};
// crates
use futures::StreamExt;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::RelayMessage,
state::{NoOperator, NoState},
ServiceCore, ServiceData,
};
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use tokio::sync::oneshot::Sender;
use tracing::error;
// internal
// A wrapper for prometheus_client Registry.
// Lock is only used during services initialization and prometheus pull query.
pub type NomosRegistry = Arc<Mutex<Registry>>;
pub struct Metrics {
service_state: ServiceStateHandle<Self>,
registry: NomosRegistry,
}
#[derive(Clone, Debug)]
pub struct MetricsSettings {
pub registry: Option<NomosRegistry>,
}
pub enum MetricsMsg {
Gather { reply_channel: Sender<String> },
}
impl RelayMessage for MetricsMsg {}
impl Debug for MetricsMsg {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
match self {
Self::Gather { .. } => {
write!(f, "MetricsMsg::Gather")
}
}
}
}
impl ServiceData for Metrics {
const SERVICE_ID: &'static str = "Metrics";
type Settings = MetricsSettings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MetricsMsg;
}
#[async_trait::async_trait]
impl ServiceCore for Metrics {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let config = service_state.settings_reader.get_updated_settings();
Ok(Self {
service_state,
registry: config.registry.ok_or("No registry provided")?,
})
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
let Self {
mut service_state,
registry,
} = self;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
let MetricsMsg::Gather{reply_channel} = msg;
let mut buf = String::new();
{
let reg = registry.lock().unwrap();
// If encoding fails, we need to stop trying process subsequent metrics gather
// requests. If it succeds, encode method returns empty unit type.
_ = encode(&mut buf, &reg).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
}
reply_channel
.send(buf)
.unwrap_or_else(|_| tracing::debug!("could not send back metrics"));
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
impl Metrics {
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
LifecycleMessage::Shutdown(sender) => {
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
LifecycleMessage::Kill => true,
}
}
}

View File

@ -8,6 +8,7 @@ use nomos_tracing::filter::envfilter::{create_envfilter_layer, EnvFilterConfig};
use nomos_tracing::logging::gelf::{create_gelf_layer, GelfConfig};
use nomos_tracing::logging::local::{create_file_layer, create_writer_layer, FileConfig};
use nomos_tracing::logging::loki::{create_loki_layer, LokiConfig};
use nomos_tracing::metrics::otlp::{create_otlp_metrics_layer, OtlpMetricsConfig};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
@ -93,11 +94,18 @@ pub enum FilterLayer {
None,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum MetricsLayer {
Otlp(OtlpMetricsConfig),
None,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TracingSettings {
pub logger: LoggerLayer,
pub tracing: TracingLayer,
pub filter: FilterLayer,
pub metrics: MetricsLayer,
#[serde(with = "serde_level")]
pub level: Level,
}
@ -108,6 +116,7 @@ impl Default for TracingSettings {
logger: LoggerLayer::Stdout,
tracing: TracingLayer::None,
filter: FilterLayer::None,
metrics: MetricsLayer::None,
level: Level::DEBUG,
}
}
@ -119,12 +128,14 @@ impl TracingSettings {
logger: LoggerLayer,
tracing: TracingLayer,
filter: FilterLayer,
metrics: MetricsLayer,
level: Level,
) -> Self {
Self {
logger,
tracing,
filter,
metrics,
level,
}
}
@ -194,6 +205,11 @@ impl ServiceCore for Tracing {
layers.push(Box::new(filter_layer));
}
if let MetricsLayer::Otlp(config) = config.metrics {
let metrics_layer = create_otlp_metrics_layer(config)?;
layers.push(Box::new(metrics_layer));
}
// If no layers are created, tracing subscriber is not required.
if layers.is_empty() {
return Ok(Self {

View File

@ -5,14 +5,17 @@ edition = "2021"
[dependencies]
opentelemetry = { version = "0.26" }
opentelemetry-otlp = "0.26"
opentelemetry-otlp = { version = "0.26", features = ["grpc-tonic", "http-proto", "opentelemetry-http"] }
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] }
opentelemetry-http = { version = "0.26", features = ["reqwest"] }
opentelemetry-semantic-conventions = "0.26"
reqwest = "0.12"
serde = { version = "1.0", features = ["derive"] }
tokio = "1"
tracing = "0.1"
tracing-appender = "0.2"
tracing-gelf = "0.7"
tracing-loki = "0.2.5"
tracing-opentelemetry = "0.27"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "registry"] }
tracing-gelf = "0.7"
url = { version = "2", features = ["serde"] }

View File

@ -1,3 +1,4 @@
pub mod filter;
pub mod logging;
pub mod metrics;
pub mod tracing;

View File

@ -0,0 +1 @@
pub mod otlp;

View File

@ -0,0 +1,52 @@
//! OTLP metrics layer for `tracing`.
//!
//! Builds an OpenTelemetry SDK meter provider that exports metrics over
//! OTLP/HTTP (binary protobuf) and bridges `tracing` events into it via
//! `tracing_opentelemetry::MetricsLayer`.
use opentelemetry_otlp::{ExportConfig, Protocol, WithExportConfig};
use opentelemetry_sdk::{runtime, Resource};
use reqwest::Client;
use std::error::Error;
use opentelemetry::{global, KeyValue};
use serde::{Deserialize, Serialize};
use tracing::Subscriber;
use tracing_opentelemetry::MetricsLayer;
use tracing_subscriber::registry::LookupSpan;
use url::Url;
/// Configuration for the OTLP/HTTP metrics exporter.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OtlpMetricsConfig {
// Full metrics endpoint URL, path included
// (e.g. `http://prometheus:9090/api/v1/otlp/v1/metrics`) — the URL is
// passed through verbatim as the export endpoint, so the caller must
// supply the complete path.
pub endpoint: Url,
// Reported as the `service.name` resource attribute so each node's
// metrics can be told apart in the backend.
pub host_identifier: String,
}
/// Creates a `tracing_subscriber` layer that records `tracing` events with
/// metric fields (e.g. `counter.*` / `histogram.*`) and exports them via
/// OTLP over HTTP using the Tokio runtime.
///
/// NOTE: this also installs the built meter provider as the process-wide
/// global (`global::set_meter_provider`), so calling it more than once per
/// process replaces the previous global provider.
///
/// # Errors
/// Returns an error if the OTLP metrics pipeline cannot be built.
pub fn create_otlp_metrics_layer<S>(
config: OtlpMetricsConfig,
) -> Result<MetricsLayer<S>, Box<dyn Error + Send + Sync>>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
// Attach `service.name` so exported metrics are attributable to this node.
let resource = Resource::new(vec![KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
config.host_identifier,
)]);
// HttpBinary = OTLP over HTTP with protobuf payloads; matches the
// Prometheus `otlp-write-receiver` endpoint enabled elsewhere in this
// commit.
let export_config = ExportConfig {
endpoint: config.endpoint.into(),
protocol: Protocol::HttpBinary,
..ExportConfig::default()
};
let client = Client::new();
// Periodic-reader pipeline driven by the Tokio runtime; export interval
// uses the opentelemetry-otlp defaults.
let meter_provider = opentelemetry_otlp::new_pipeline()
.metrics(runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
.http()
.with_http_client(client)
.with_export_config(export_config),
)
.with_resource(resource)
.build()?;
// Global side effect: code that uses `opentelemetry::global::meter(...)`
// anywhere in the process will now export through this provider.
global::set_meter_provider(meter_provider.clone());
Ok(MetricsLayer::new(meter_provider))
}

View File

@ -18,3 +18,4 @@ global_params_path: "/kzgrs_test_params"
# Tracing params
tempo_endpoint: "http://tempo:4317"
loki_endpoint: "http://loki:3100"
metrics_endpoint: "http://prometheus:9090/api/v1/otlp/v1/metrics"

View File

@ -49,6 +49,7 @@ struct CfgSyncConfig {
// Tracing params
tempo_endpoint: Url,
loki_endpoint: Url,
metrics_endpoint: Url,
}
impl CfgSyncConfig {
@ -83,6 +84,7 @@ impl CfgSyncConfig {
TracingParams {
tempo_endpoint: self.tempo_endpoint.clone(),
loki_endpoint: self.loki_endpoint.clone(),
metrics_endpoint: self.metrics_endpoint.clone(),
}
}
}

View File

@ -4,8 +4,10 @@ use std::{collections::HashMap, net::Ipv4Addr, str::FromStr};
use nomos_libp2p::{Multiaddr, PeerId};
use nomos_mix::membership::Node;
use nomos_mix_message::{mock::MockMixMessage, MixMessage};
use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig};
use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingSettings};
use nomos_tracing::{
logging::loki::LokiConfig, metrics::otlp::OtlpMetricsConfig, tracing::otlp::OtlpTracingConfig,
};
use nomos_tracing_service::{FilterLayer, LoggerLayer, MetricsLayer, TracingSettings};
use rand::{thread_rng, Rng};
use tests::topology::configs::{
api::GeneralApiConfig,
@ -193,13 +195,19 @@ fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> Gene
tracing_settings: TracingSettings {
logger: LoggerLayer::Loki(LokiConfig {
endpoint: params.loki_endpoint,
host_identifier: identifier,
host_identifier: identifier.clone(),
}),
tracing: nomos_tracing_service::TracingLayer::Otlp(OtlpTracingConfig {
endpoint: params.tempo_endpoint,
sample_ratio: 1.0,
}),
filter: FilterLayer::None,
metrics: MetricsLayer::Otlp(OtlpMetricsConfig {
endpoint: "http://127.0.0.1:9090/api/v1/otlp/v1/metrics"
.try_into()
.unwrap(),
host_identifier: identifier,
}),
level: Level::INFO,
},
}
@ -249,6 +257,7 @@ mod cfgsync_tests {
TracingParams {
tempo_endpoint: "http://test.com".try_into().unwrap(),
loki_endpoint: "http://test.com".try_into().unwrap(),
metrics_endpoint: "http://test.com".try_into().unwrap(),
},
hosts,
);

View File

@ -7,4 +7,5 @@ pub mod repo;
pub struct TracingParams {
pub tempo_endpoint: Url,
pub loki_endpoint: Url,
pub metrics_endpoint: Url,
}

View File

@ -3,12 +3,3 @@ global:
evaluation_interval: 15s
external_labels:
monitor: "Monitoring"
scrape_configs:
- job_name: "nomos"
static_configs:
- targets:
- nomos-node-0:18080
- nomos-node-1:18080
- nomos-node-2:18080
- nomos-node-3:18080

View File

@ -62,5 +62,4 @@ name = "test_da"
path = "src/tests/da.rs"
[features]
metrics = ["nomos-node/metrics"]
debug = []

View File

@ -1,5 +1,9 @@
use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig};
use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingLayer, TracingSettings};
use nomos_tracing::{
logging::loki::LokiConfig, metrics::otlp::OtlpMetricsConfig, tracing::otlp::OtlpTracingConfig,
};
use nomos_tracing_service::{
FilterLayer, LoggerLayer, MetricsLayer, TracingLayer, TracingSettings,
};
use tracing::Level;
#[derive(Clone, Default)]
@ -10,11 +14,12 @@ pub struct GeneralTracingConfig {
impl GeneralTracingConfig {
#[allow(dead_code)]
fn local_debug_tracing(id: usize) -> Self {
let host_identifier = format!("node-{id}");
Self {
tracing_settings: TracingSettings {
logger: LoggerLayer::Loki(LokiConfig {
endpoint: "http://localhost:3100".try_into().unwrap(),
host_identifier: format!("node-{id}"),
host_identifier: host_identifier.clone(),
}),
tracing: TracingLayer::Otlp(OtlpTracingConfig {
endpoint: "http://localhost:4317".try_into().unwrap(),
@ -29,6 +34,12 @@ impl GeneralTracingConfig {
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
}),
metrics: MetricsLayer::Otlp(OtlpMetricsConfig {
endpoint: "http://127.0.0.1:9090/api/v1/otlp/v1/metrics"
.try_into()
.unwrap(),
host_identifier,
}),
level: Level::DEBUG,
},
}