diff --git a/Cargo.toml b/Cargo.toml index dd1ae02d..580def01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,14 +3,13 @@ members = [ "nomos-core", "nomos-libp2p", "nomos-services/api", - "nomos-metrics", "nomos-services/log", - "nomos-services/metrics", "nomos-services/network", "nomos-services/storage", "nomos-services/carnot-consensus", "nomos-services/cryptarchia-consensus", "nomos-services/mempool", + "nomos-services/metrics", "nomos-services/data-availability", "nomos-services/system-sig", "nomos-da/reed-solomon", diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index a51cd092..433e453a 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -26,9 +26,8 @@ nomos-log = { path = "../../nomos-services/log" } nomos-mempool = { path = "../../nomos-services/mempool", features = [ "mock", "libp2p", - "metrics", ] } -nomos-metrics = { path = "../../nomos-metrics" } +nomos-metrics = { path = "../../nomos-services/metrics" } nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = [ @@ -40,7 +39,6 @@ nomos-da = { path = "../../nomos-services/data-availability", features = [ "libp2p", ] } nomos-system-sig = { path = "../../nomos-services/system-sig" } -metrics = { path = "../../nomos-services/metrics", optional = true } tracing-subscriber = "0.3" carnot-engine = { path = "../../consensus/carnot-engine" } tokio = { version = "1.24", features = ["sync"] } @@ -60,3 +58,4 @@ tower-http = { version = "0.4", features = ["cors", "trace"] } [features] default = [] mixnet = ["nomos-network/mixnet"] +metrics = [] diff --git a/nomos-metrics/Cargo.toml b/nomos-metrics/Cargo.toml deleted file mode 100644 index 1867b0f1..00000000 --- a/nomos-metrics/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "nomos-metrics" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -async-trait = "0.1" -futures = "0.3" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } -overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } -prometheus-client = "0.22.0" -tracing = "0.1" -tokio = { version = "1", features = ["sync", "macros"] } -serde = { version = "1", features = ["derive"] } diff --git a/nomos-metrics/src/lib.rs b/nomos-metrics/src/lib.rs deleted file mode 100644 index d9461b38..00000000 --- a/nomos-metrics/src/lib.rs +++ /dev/null @@ -1,119 +0,0 @@ -pub use prometheus_client::{self, *}; - -// std -use std::fmt::{Debug, Error, Formatter}; -use std::sync::{Arc, Mutex}; -// crates -use futures::StreamExt; -use overwatch_rs::services::life_cycle::LifecycleMessage; -use overwatch_rs::services::{ - handle::ServiceStateHandle, - relay::RelayMessage, - state::{NoOperator, NoState}, - ServiceCore, ServiceData, -}; -use prometheus_client::encoding::text::encode; -use prometheus_client::registry::Registry; -use tokio::sync::oneshot::Sender; -use tracing::error; -// internal - -// A wrapper for prometheus_client Registry. -// Lock is only used during services initialization and prometheus pull query. -pub type NomosRegistry = Arc>; - -pub struct Metrics { - service_state: ServiceStateHandle, - registry: NomosRegistry, -} - -#[derive(Clone, Debug)] -pub struct MetricsSettings { - pub registry: Option, -} - -pub enum MetricsMsg { - Gather { reply_channel: Sender }, -} - -impl RelayMessage for MetricsMsg {} - -impl Debug for MetricsMsg { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { - match self { - Self::Gather { .. 
} => { - write!(f, "MetricsMsg::Gather") - } - } - } -} - -impl ServiceData for Metrics { - const SERVICE_ID: &'static str = "Metrics"; - type Settings = MetricsSettings; - type State = NoState; - type StateOperator = NoOperator; - type Message = MetricsMsg; -} - -#[async_trait::async_trait] -impl ServiceCore for Metrics { - fn init(service_state: ServiceStateHandle) -> Result { - let config = service_state.settings_reader.get_updated_settings(); - - Ok(Self { - service_state, - registry: config.registry.ok_or("No registry provided")?, - }) - } - - async fn run(self) -> Result<(), overwatch_rs::DynError> { - let Self { - mut service_state, - registry, - } = self; - let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); - loop { - tokio::select! { - Some(msg) = service_state.inbound_relay.recv() => { - let MetricsMsg::Gather{reply_channel} = msg; - - let mut buf = String::new(); - { - let reg = registry.lock().unwrap(); - // If encoding fails, we need to stop trying process subsequent metrics gather - // requests. If it succeds, encode method returns empty unit type. 
- _ = encode(&mut buf, &reg).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - } - - reply_channel - .send(buf) - .unwrap_or_else(|_| tracing::debug!("could not send back metrics")); - } - Some(msg) = lifecycle_stream.next() => { - if Self::should_stop_service(msg).await { - break; - } - } - } - } - Ok(()) - } -} - -impl Metrics { - async fn should_stop_service(message: LifecycleMessage) -> bool { - match message { - LifecycleMessage::Shutdown(sender) => { - if sender.send(()).is_err() { - error!( - "Error sending successful shutdown signal from service {}", - Self::SERVICE_ID - ); - } - true - } - LifecycleMessage::Kill => true, - } - } -} diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index 9eafe3fc..64be5db0 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -23,7 +23,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [ "libp2p", "openapi", ] } -nomos-metrics = { path = "../../nomos-metrics" } +nomos-metrics = { path = "../../nomos-services/metrics" } nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } nomos-libp2p = { path = "../../nomos-libp2p" } full-replication = { path = "../../nomos-da/full-replication" } diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index d1a4e895..a31a531d 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -10,7 +10,7 @@ async-trait = "0.1" bincode = { version = "2.0.0-rc.2", features = ["serde"] } futures = "0.3" linked-hash-map = { version = "0.5.6", optional = true } -nomos-metrics = { path = "../../nomos-metrics" } +nomos-metrics = { path = "../../nomos-services/metrics" } nomos-network = { path = "../network" } nomos-core = { path = "../../nomos-core" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } diff --git a/nomos-services/metrics/Cargo.toml b/nomos-services/metrics/Cargo.toml index 
868f44e4..1867b0f1 100644 --- a/nomos-services/metrics/Cargo.toml +++ b/nomos-services/metrics/Cargo.toml @@ -1,33 +1,16 @@ [package] -name = "metrics" +name = "nomos-metrics" version = "0.1.0" edition = "2021" -[[example]] -name = "graphql" -path = "examples/graphql.rs" -required-features = ["gql"] +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -axum = { version = "0.6", optional = true } -async-graphql = { version = "5", optional = true, features = ["tracing"] } async-trait = "0.1" -bytes = "1.3" -clap = { version = "4", features = ["derive", "env"], optional = true } futures = "0.3" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } -once_cell = "1.16" -parking_lot = "0.12" -prometheus = "0.13" -serde = { version = "1", features = ["derive"] } -serde_json = "1" -tokio = { version = "1", features = ["sync", "macros", "time"] } +prometheus-client = "0.22.0" tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] } -tower-http = { version = "0.3", features = ["cors", "trace"], optional = true } -thiserror = "1" - -[features] -default = [] -gql = ["clap", "axum", "async-graphql", "tower-http"] +tokio = { version = "1", features = ["sync", "macros"] } +serde = { version = "1", features = ["derive"] } diff --git a/nomos-services/metrics/README.md b/nomos-services/metrics/README.md deleted file mode 100644 index e69de29b..00000000 diff --git a/nomos-services/metrics/examples/graphql.rs b/nomos-services/metrics/examples/graphql.rs deleted file mode 100644 index c5fe0c06..00000000 --- a/nomos-services/metrics/examples/graphql.rs +++ /dev/null @@ -1,148 +0,0 @@ -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; - -use clap::Parser; -use metrics::{ - frontend::graphql::{Graphql, GraphqlServerSettings}, - MetricsBackend, MetricsMessage, 
MetricsService, OwnedServiceId, -}; -use overwatch_rs::{ - overwatch::OverwatchRunner, - services::{ - handle::{ServiceHandle, ServiceStateHandle}, - relay::{NoMessage, Relay}, - state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, - }, -}; - -#[derive(Debug, Clone)] -pub struct ConcurrentMapMetricsBackend(Arc>>); - -#[async_trait::async_trait] -impl MetricsBackend for ConcurrentMapMetricsBackend { - type MetricsData = MetricsData; - type Error = (); - type Settings = &'static [ServiceId]; - - fn init(config: Self::Settings) -> Self { - let mut map = HashMap::with_capacity(config.len()); - for service_id in config { - map.insert(*service_id, MetricsData::default()); - } - Self(Arc::new(Mutex::new(map))) - } - - async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData) { - self.0.lock().unwrap().insert(service_id, data); - } - - async fn load(&self, service_id: &OwnedServiceId) -> Option { - self.0.lock().unwrap().get(service_id.as_ref()).cloned() - } -} - -#[derive(Clone)] -pub struct MetricsUpdater { - backend_channel: Relay>, -} - -impl ServiceData for MetricsUpdater { - const SERVICE_ID: ServiceId = "MetricsUpdater"; - - type Settings = (); - - type State = NoState<()>; - - type StateOperator = NoOperator; - - type Message = NoMessage; -} - -#[async_trait::async_trait] -impl + Clone + Send + Sync + 'static> ServiceCore - for MetricsUpdater -where - Backend::MetricsData: async_graphql::OutputType, -{ - fn init(service_state: ServiceStateHandle) -> Result { - let backend_channel: Relay> = - service_state.overwatch_handle.relay(); - Ok(Self { backend_channel }) - } - - async fn run(self) -> Result<(), overwatch_rs::DynError> { - let replay = self.backend_channel.connect().await.map_err(|e| { - tracing::error!(err = ?e, "Metrics Updater: relay connect error"); - e - })?; - let tags = ["foo", "bar", "baz"].iter().cycle(); - for (duration, service_id) in tags.enumerate() { - let message = MetricsMessage::Update { - service_id, - data: 
MetricsData { - duration: duration as u64, - }, - }; - replay.send(message).await.map_err(|(e, _)| { - tracing::error!(err = ?e, "Metrics Updater: relay send error"); - e - })?; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - Ok(()) - } -} - -#[derive(overwatch_derive::Services)] -struct Services { - graphql: ServiceHandle>, - metrics: ServiceHandle>, - updater: ServiceHandle>, -} - -#[derive(clap::Parser)] -pub struct Args { - #[clap(flatten)] - graphql: GraphqlServerSettings, -} - -#[derive(Debug, Default, Clone, async_graphql::SimpleObject)] -pub struct MetricsData { - duration: u64, -} - -#[derive(Debug, Clone)] -pub enum ParseMetricsDataError { - TryFromSliceError(core::array::TryFromSliceError), -} - -impl std::fmt::Display for ParseMetricsDataError { - fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Ok(()) - } -} - -impl std::error::Error for ParseMetricsDataError {} - -fn main() -> Result<(), Box> { - let settings = Args::parse(); - let graphql = OverwatchRunner::::run( - ServicesServiceSettings { - graphql: settings.graphql, - metrics: &["Foo", "Bar"], - updater: (), - }, - None, - )?; - - tracing_subscriber::fmt::fmt() - .with_env_filter(std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned())) - .with_file(false) - .init(); - - graphql.wait_finished(); - Ok(()) -} diff --git a/nomos-services/metrics/src/backend/map.rs b/nomos-services/metrics/src/backend/map.rs deleted file mode 100644 index 7246b11c..00000000 --- a/nomos-services/metrics/src/backend/map.rs +++ /dev/null @@ -1,31 +0,0 @@ -// std -use std::collections::HashMap; -use std::fmt::Debug; -// crates -// internal -use crate::{MetricsBackend, OwnedServiceId}; -use overwatch_rs::services::ServiceId; - -#[derive(Debug, Clone)] -pub struct MapMetricsBackend(HashMap); - -#[async_trait::async_trait] -impl MetricsBackend - for MapMetricsBackend -{ - type MetricsData = MetricsData; - type Error = (); - type Settings = (); - - fn init(_config: 
Self::Settings) -> Self { - Self(HashMap::new()) - } - - async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData) { - self.0.insert(service_id, data); - } - - async fn load(&self, service_id: &OwnedServiceId) -> Option { - self.0.get(service_id.as_ref()).cloned() - } -} diff --git a/nomos-services/metrics/src/backend/mod.rs b/nomos-services/metrics/src/backend/mod.rs deleted file mode 100644 index 1d7f53b0..00000000 --- a/nomos-services/metrics/src/backend/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod map; diff --git a/nomos-services/metrics/src/frontend/graphql/mod.rs b/nomos-services/metrics/src/frontend/graphql/mod.rs deleted file mode 100644 index 39ffd60f..00000000 --- a/nomos-services/metrics/src/frontend/graphql/mod.rs +++ /dev/null @@ -1,185 +0,0 @@ -// std - -// crates -use nomos_http::backends::HttpBackend; -use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; -use nomos_http::http::{handle_graphql_req, HttpMethod, HttpRequest}; -// internal -use crate::{MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId}; -use overwatch_rs::services::relay::{Relay, RelayMessage}; -use overwatch_rs::services::{ - handle::ServiceStateHandle, - state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, -}; - -/// Configuration for the GraphQl Server -#[derive(Debug, Clone, clap::Args, serde::Deserialize, serde::Serialize)] -#[cfg(feature = "gql")] -pub struct GraphqlServerSettings { - /// Max query depth allowed - #[arg( - long = "graphql-max-depth", - default_value_t = 20, - env = "METRICS_GRAPHQL_MAX_DEPTH" - )] - pub max_depth: usize, - /// Max query complexity allowed - #[arg( - long = "graphql-max-complexity", - default_value_t = 1000, - env = "METRICS_GRAPHQL_MAX_COMPLEXITY" - )] - pub max_complexity: usize, - /// Allowed origins for this server deployment requests. 
- #[arg(long = "graphql-cors-origin")] - pub cors_origins: Vec, -} - -pub fn metrics_graphql_router( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner -where - MB: MetricsBackend + Clone + Send + 'static + Sync, - MB::MetricsData: async_graphql::OutputType, - B: HttpBackend + Send + Sync + 'static, - B::Error: std::error::Error + Send + Sync + 'static, -{ - Box::new(Box::pin(async move { - // TODO: Graphql supports http GET requests, should nomos support that? - let (relay, mut res_rx) = - build_http_bridge::, B, _>(handle, HttpMethod::POST, "") - .await - .unwrap(); - - while let Some(HttpRequest { - query: _, - payload, - res_tx, - }) = res_rx.recv().await - { - let res = match handle_graphql_req(payload, relay.clone(), |req, tx| { - Ok(GraphqlMetricsMessage { - req, - reply_channel: tx, - }) - }) - .await - { - Ok(r) => r, - Err(err) => { - tracing::error!(err); - err.to_string() - } - }; - - res_tx.send(Ok(res.into())).await.unwrap(); - } - Ok(()) - })) -} - -#[derive(Debug)] -pub struct GraphqlMetricsMessage { - req: async_graphql::Request, - reply_channel: tokio::sync::oneshot::Sender, -} - -impl RelayMessage for GraphqlMetricsMessage {} - -pub struct Graphql -where - Backend: MetricsBackend + Send + Sync + 'static, - Backend::MetricsData: async_graphql::OutputType, -{ - service_state: Option>, - settings: GraphqlServerSettings, - backend_channel: Relay>, -} - -impl ServiceData for Graphql -where - Backend: MetricsBackend + Send + Sync + 'static, - Backend::MetricsData: async_graphql::OutputType, -{ - const SERVICE_ID: ServiceId = "GraphqlMetrics"; - - type Settings = GraphqlServerSettings; - - type State = NoState; - - type StateOperator = NoOperator; - - type Message = GraphqlMetricsMessage; -} - -#[async_graphql::Object] -impl Graphql -where - Backend::MetricsData: async_graphql::OutputType, -{ - pub async fn load( - &self, - service_id: OwnedServiceId, - ) -> async_graphql::Result::MetricsData>> { - let replay = self - 
.backend_channel - .clone() - .connect() - .await - .map_err(|e| async_graphql::Error::new(e.to_string()))?; - let (tx, rx) = tokio::sync::oneshot::channel(); - replay - .send(MetricsMessage::Load { - service_id, - reply_channel: tx, - }) - .await - .map_err(|(e, _)| async_graphql::Error::new(e.to_string()))?; - rx.await.map_err(|e| { - tracing::error!(err = ?e, "GraphqlMetrics: recv error"); - async_graphql::Error::new("GraphqlMetrics: recv error") - }) - } -} - -#[async_trait::async_trait] -impl ServiceCore for Graphql -where - Backend::MetricsData: async_graphql::OutputType, -{ - fn init(service_state: ServiceStateHandle) -> Result { - let backend_channel: Relay> = - service_state.overwatch_handle.relay(); - let settings = service_state.settings_reader.get_updated_settings(); - Ok(Self { - settings, - service_state: Some(service_state), - backend_channel, - }) - } - - async fn run(mut self) -> Result<(), overwatch_rs::DynError> { - let max_complexity = self.settings.max_complexity; - let max_depth = self.settings.max_depth; - let mut inbound_relay = self.service_state.take().unwrap().inbound_relay; - let schema = async_graphql::Schema::build( - self, - async_graphql::EmptyMutation, - async_graphql::EmptySubscription, - ) - .limit_complexity(max_complexity) - .limit_depth(max_depth) - .extension(async_graphql::extensions::Tracing) - .finish(); - - tracing::info!(schema = %schema.sdl(), "GraphQL schema definition"); - - while let Some(msg) = inbound_relay.recv().await { - let res = schema.execute(msg.req).await; - msg.reply_channel.send(res).unwrap(); - } - - Ok(()) - } -} diff --git a/nomos-services/metrics/src/frontend/mod.rs b/nomos-services/metrics/src/frontend/mod.rs deleted file mode 100644 index a36a1b55..00000000 --- a/nomos-services/metrics/src/frontend/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[cfg(feature = "gql")] -pub mod graphql; diff --git a/nomos-services/metrics/src/lib.rs b/nomos-services/metrics/src/lib.rs index bfa99564..d9461b38 100644 --- 
a/nomos-services/metrics/src/lib.rs +++ b/nomos-services/metrics/src/lib.rs @@ -1,155 +1,107 @@ -// std -use std::fmt::Debug; +pub use prometheus_client::{self, *}; +// std +use std::fmt::{Debug, Error, Formatter}; +use std::sync::{Arc, Mutex}; // crates use futures::StreamExt; -use tracing::error; - -// internal use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ handle::ServiceStateHandle, relay::RelayMessage, state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, + ServiceCore, ServiceData, }; +use prometheus_client::encoding::text::encode; +use prometheus_client::registry::Registry; +use tokio::sync::oneshot::Sender; +use tracing::error; +// internal -pub mod backend; -pub mod frontend; -pub mod types; +// A wrapper for prometheus_client Registry. +// Lock is only used during services initialization and prometheus pull query. +pub type NomosRegistry = Arc>; -#[async_trait::async_trait] -pub trait MetricsBackend { - type MetricsData: Clone + Send + Sync + Debug + 'static; - type Error: Send + Sync; - type Settings: Clone + Send + Sync + 'static; - fn init(config: Self::Settings) -> Self; - async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData); - async fn load(&self, service_id: &OwnedServiceId) -> Option; -} - -pub struct MetricsService { +pub struct Metrics { service_state: ServiceStateHandle, - backend: Backend, + registry: NomosRegistry, } -#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)] -pub struct OwnedServiceId { - id: String, +#[derive(Clone, Debug)] +pub struct MetricsSettings { + pub registry: Option, } -impl From for OwnedServiceId { - fn from(id: ServiceId) -> Self { - Self { id: id.into() } +pub enum MetricsMsg { + Gather { reply_channel: Sender }, +} + +impl RelayMessage for MetricsMsg {} + +impl Debug for MetricsMsg { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + match self { + Self::Gather { .. 
} => { + write!(f, "MetricsMsg::Gather") + } + } } } -impl From for OwnedServiceId { - fn from(id: String) -> Self { - Self { id } - } -} - -impl core::fmt::Display for OwnedServiceId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.id) - } -} - -impl AsRef for OwnedServiceId { - fn as_ref(&self) -> &str { - &self.id - } -} - -#[cfg(feature = "async-graphql")] -impl async_graphql::InputType for OwnedServiceId { - type RawValueType = Self; - - fn type_name() -> std::borrow::Cow<'static, str> { - ::type_name() - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - ::create_type_info(registry) - } - - fn parse( - value: Option, - ) -> async_graphql::InputValueResult { - ::parse(value) - .map(Self::from) - .map_err(async_graphql::InputValueError::propagate) - } - - fn to_value(&self) -> async_graphql::Value { - ::to_value(&self.id) - } - - fn as_raw_value(&self) -> Option<&Self::RawValueType> { - Some(self) - } -} - -#[derive(Debug)] -pub enum MetricsMessage { - Load { - service_id: OwnedServiceId, - reply_channel: tokio::sync::oneshot::Sender>, - }, - Update { - service_id: ServiceId, - data: Data, - }, -} - -impl RelayMessage for MetricsMessage {} - -impl ServiceData for MetricsService { - const SERVICE_ID: ServiceId = "Metrics"; - type Settings = Backend::Settings; +impl ServiceData for Metrics { + const SERVICE_ID: &'static str = "Metrics"; + type Settings = MetricsSettings; type State = NoState; type StateOperator = NoOperator; - type Message = MetricsMessage; + type Message = MetricsMsg; } -impl MetricsService { - async fn handle_load( - backend: &Backend, - service_id: &OwnedServiceId, - reply_channel: tokio::sync::oneshot::Sender>, - ) { - let metrics = backend.load(service_id).await; - if let Err(e) = reply_channel.send(metrics) { - tracing::error!( - "Failed to send metric data for service: {service_id}. 
data: {:?}", - e - ); - } +#[async_trait::async_trait] +impl ServiceCore for Metrics { + fn init(service_state: ServiceStateHandle) -> Result { + let config = service_state.settings_reader.get_updated_settings(); + + Ok(Self { + service_state, + registry: config.registry.ok_or("No registry provided")?, + }) } - async fn handle_update( - backend: &mut Backend, - service_id: &ServiceId, - metrics: Backend::MetricsData, - ) { - backend.update(service_id, metrics).await; - } + async fn run(self) -> Result<(), overwatch_rs::DynError> { + let Self { + mut service_state, + registry, + } = self; + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); + loop { + tokio::select! { + Some(msg) = service_state.inbound_relay.recv() => { + let MetricsMsg::Gather{reply_channel} = msg; - async fn handle_message(message: MetricsMessage, backend: &mut Backend) { - match message { - MetricsMessage::Load { - service_id, - reply_channel, - } => { - MetricsService::handle_load(backend, &service_id, reply_channel).await; - } - MetricsMessage::Update { service_id, data } => { - MetricsService::handle_update(backend, &service_id, data).await; + let mut buf = String::new(); + { + let reg = registry.lock().unwrap(); + // If encoding fails, we need to stop trying process subsequent metrics gather + // requests. If it succeds, encode method returns empty unit type. 
+ _ = encode(&mut buf, &reg).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + } + + reply_channel + .send(buf) + .unwrap_or_else(|_| tracing::debug!("could not send back metrics")); + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } } } + Ok(()) } +} +impl Metrics { async fn should_stop_service(message: LifecycleMessage) -> bool { match message { LifecycleMessage::Shutdown(sender) => { @@ -165,41 +117,3 @@ impl MetricsService { } } } - -#[async_trait::async_trait] -impl ServiceCore for MetricsService { - fn init(service_state: ServiceStateHandle) -> Result { - let settings = service_state.settings_reader.get_updated_settings(); - let backend = Backend::init(settings); - Ok(Self { - service_state, - backend, - }) - } - - async fn run(self) -> Result<(), overwatch_rs::DynError> { - let Self { - service_state: - ServiceStateHandle { - mut inbound_relay, - lifecycle_handle, - .. - }, - mut backend, - } = self; - let mut lifecycle_stream = lifecycle_handle.message_stream(); - loop { - tokio::select! 
{ - Some(message) = inbound_relay.recv() => { - Self::handle_message(message, &mut backend).await; - } - Some(message) = lifecycle_stream.next() => { - if Self::should_stop_service(message).await { - break; - } - } - } - } - Ok(()) - } -} diff --git a/nomos-services/metrics/src/types.rs b/nomos-services/metrics/src/types.rs deleted file mode 100644 index bc3d0ab0..00000000 --- a/nomos-services/metrics/src/types.rs +++ /dev/null @@ -1,370 +0,0 @@ -use ::core::ops::{Deref, DerefMut}; -#[cfg(feature = "async-graphql")] -use async_graphql::{parser::types::Field, ContextSelectionSet, Positioned, ServerResult, Value}; -use prometheus::HistogramOpts; -pub use prometheus::{ - core::{self, Atomic, GenericCounter as PrometheusGenericCounter}, - labels, opts, Opts, -}; - -#[derive(Debug, Clone)] -pub struct MetricsData { - ty: MetricDataType, - id: String, -} - -impl MetricsData { - #[inline] - pub fn new(ty: MetricDataType, id: String) -> Self { - Self { ty, id } - } - - #[inline] - pub fn ty(&self) -> &MetricDataType { - &self.ty - } - - #[inline] - pub fn id(&self) -> &str { - &self.id - } -} - -#[derive(Debug, Clone)] -pub enum MetricDataType { - IntCounter(IntCounter), - Counter(Counter), - IntGauge(IntGauge), - Gauge(Gauge), - Histogram(Histogram), -} - -#[cfg(feature = "async-graphql")] -#[async_trait::async_trait] -impl async_graphql::OutputType for MetricDataType { - fn type_name() -> std::borrow::Cow<'static, str> { - std::borrow::Cow::Borrowed("MetricDataType") - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - registry.create_output_type::( - async_graphql::registry::MetaTypeId::Union, - |registry| async_graphql::registry::MetaType::Union { - name: Self::type_name().to_string(), - description: None, - possible_types: { - let mut map = async_graphql::indexmap::IndexSet::new(); - map.insert(::create_type_info( - registry, - )); - map.insert(::create_type_info( - registry, - )); - map.insert(::create_type_info( - registry, - 
)); - map.insert(::create_type_info( - registry, - )); - map.insert(::create_type_info( - registry, - )); - map - }, - visible: None, - inaccessible: false, - tags: Vec::new(), - rust_typename: Some(std::any::type_name::()), - }, - ) - } - - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> ServerResult { - match self { - Self::IntCounter(v) => v.resolve(ctx, field).await, - Self::Counter(v) => v.resolve(ctx, field).await, - Self::IntGauge(v) => v.resolve(ctx, field).await, - Self::Gauge(v) => v.resolve(ctx, field).await, - Self::Histogram(v) => v.resolve(ctx, field).await, - } - } -} - -#[derive(Debug, Clone)] -pub struct GenericGauge { - val: core::GenericGauge, -} - -impl Deref for GenericGauge { - type Target = core::GenericGauge; - - fn deref(&self) -> &Self::Target { - &self.val - } -} - -impl DerefMut for GenericGauge { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.val - } -} - -impl GenericGauge { - pub fn new, S2: Into>( - name: S1, - help: S2, - ) -> Result { - core::GenericGauge::::new(name, help).map(|v| Self { val: v }) - } - - pub fn with_opts(opts: Opts) -> Result { - core::GenericGauge::::with_opts(opts).map(|v| Self { val: v }) - } -} - -#[cfg(feature = "async-graphql")] -#[async_trait::async_trait] -impl async_graphql::OutputType for GenericGauge -where - T: Atomic, - ::T: async_graphql::OutputType, -{ - fn type_name() -> std::borrow::Cow<'static, str> { - std::borrow::Cow::Borrowed("GenericGauge") - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - registry.create_output_type::(async_graphql::registry::MetaTypeId::Scalar, |_| { - async_graphql::registry::MetaType::Scalar { - name: Self::type_name().to_string(), - description: None, - is_valid: None, - visible: None, - inaccessible: false, - tags: Vec::new(), - specified_by_url: None, - } - }) - } - - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> ServerResult { - 
<::T as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await - } -} - -#[derive(Debug, Clone)] -pub struct GenericCounter { - ctr: core::GenericCounter, -} - -impl Deref for GenericCounter { - type Target = core::GenericCounter; - - fn deref(&self) -> &Self::Target { - &self.ctr - } -} - -impl DerefMut for GenericCounter { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.ctr - } -} - -impl GenericCounter { - pub fn new, S2: Into>( - name: S1, - help: S2, - ) -> Result { - core::GenericCounter::::new(name, help).map(|ctr| Self { ctr }) - } - - pub fn with_opts(opts: Opts) -> Result { - core::GenericCounter::::with_opts(opts).map(|ctr| Self { ctr }) - } -} - -#[cfg(feature = "async-graphql")] -#[async_trait::async_trait] -impl async_graphql::OutputType for GenericCounter -where - T: Atomic, - ::T: async_graphql::OutputType, -{ - fn type_name() -> std::borrow::Cow<'static, str> { - std::borrow::Cow::Borrowed("GenericCounter") - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - registry.create_output_type::(async_graphql::registry::MetaTypeId::Scalar, |_| { - async_graphql::registry::MetaType::Scalar { - name: Self::type_name().to_string(), - description: None, - is_valid: None, - visible: None, - inaccessible: false, - tags: Vec::new(), - specified_by_url: None, - } - }) - } - - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> ServerResult { - <::T as async_graphql::OutputType>::resolve(&self.ctr.get(), ctx, field).await - } -} - -#[derive(Debug, Clone)] -pub struct Histogram { - val: prometheus::Histogram, -} - -impl Histogram { - pub fn with_opts(opts: HistogramOpts) -> Result { - prometheus::Histogram::with_opts(opts).map(|val| Self { val }) - } -} - -impl Deref for Histogram { - type Target = prometheus::Histogram; - - fn deref(&self) -> &Self::Target { - &self.val - } -} - -impl DerefMut for Histogram { - fn deref_mut(&mut self) -> &mut Self::Target { - 
&mut self.val - } -} - -#[cfg(feature = "async-graphql")] -#[derive(async_graphql::SimpleObject, Debug, Clone, Copy)] -#[graphql(name = "Histogram")] -struct HistogramSample { - count: u64, - sum: f64, -} - -#[cfg(feature = "async-graphql")] -#[async_trait::async_trait] -impl async_graphql::OutputType for Histogram { - fn type_name() -> std::borrow::Cow<'static, str> { - std::borrow::Cow::Borrowed("Histogram") - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - ::create_type_info(registry) - } - - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> ServerResult { - let sample = HistogramSample { - count: self.val.get_sample_count(), - sum: self.val.get_sample_sum(), - }; - - ::resolve(&sample, ctx, field).await - } -} - -macro_rules! metric_typ { - ($($ty: ident::$setter:ident($primitive:ident)::$name: literal), +$(,)?) => { - $( - #[derive(Clone)] - pub struct $ty { - val: prometheus::$ty, - } - - impl std::fmt::Debug for $ty { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - ::fmt(&self.val, f) - } - } - - impl $ty { - pub fn new, S2: Into>( - name: S1, - help: S2, - ) -> Result { - prometheus::$ty::new(name, help).map(|val| Self { - val, - }) - } - - pub fn with_opts(opts: Opts) -> Result { - prometheus::$ty::with_opts(opts.clone()).map(|val| Self { - val, - }) - } - } - - impl Deref for $ty { - type Target = prometheus::$ty; - - fn deref(&self) -> &Self::Target { - &self.val - } - } - - impl DerefMut for $ty { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.val - } - } - - #[cfg(feature = "async-graphql")] - #[async_trait::async_trait] - impl async_graphql::OutputType for $ty { - fn type_name() -> std::borrow::Cow<'static, str> { - std::borrow::Cow::Borrowed($name) - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - registry.create_output_type::(async_graphql::registry::MetaTypeId::Scalar, |_| { - 
async_graphql::registry::MetaType::Scalar { - name: Self::type_name().to_string(), - description: None, - is_valid: None, - visible: None, - inaccessible: false, - tags: Vec::new(), - specified_by_url: None, - } - }) - } - - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> ServerResult { - <$primitive as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await - } - } - )* - }; -} - -metric_typ! { - IntCounter::inc_by(u64)::"IntCounter", - Counter::inc_by(f64)::"Counter", - IntGauge::set(i64)::"IntGauge", - Gauge::set(f64)::"Gauge", -} diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 27440b49..dbd95198 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -52,4 +52,3 @@ path = "src/tests/cli.rs" [features] mixnet = ["nomos-network/mixnet"] -metrics = ["nomos-node/metrics"]