Metrics to nomos services (#623)

* Remove unused metrics crate from services

* Move prometheus metrics to nomos-services
This commit is contained in:
gusto 2024-03-22 14:03:35 +02:00 committed by GitHub
parent de6138fc27
commit b341a11eb2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 84 additions and 1062 deletions

View File

@ -3,14 +3,13 @@ members = [
"nomos-core",
"nomos-libp2p",
"nomos-services/api",
"nomos-metrics",
"nomos-services/log",
"nomos-services/metrics",
"nomos-services/network",
"nomos-services/storage",
"nomos-services/carnot-consensus",
"nomos-services/cryptarchia-consensus",
"nomos-services/mempool",
"nomos-services/metrics",
"nomos-services/data-availability",
"nomos-services/system-sig",
"nomos-da/reed-solomon",

View File

@ -26,9 +26,8 @@ nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = [
"mock",
"libp2p",
"metrics",
] }
nomos-metrics = { path = "../../nomos-metrics" }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = [
@ -40,7 +39,6 @@ nomos-da = { path = "../../nomos-services/data-availability", features = [
"libp2p",
] }
nomos-system-sig = { path = "../../nomos-services/system-sig" }
metrics = { path = "../../nomos-services/metrics", optional = true }
tracing-subscriber = "0.3"
carnot-engine = { path = "../../consensus/carnot-engine" }
tokio = { version = "1.24", features = ["sync"] }
@ -60,3 +58,4 @@ tower-http = { version = "0.4", features = ["cors", "trace"] }
[features]
default = []
mixnet = ["nomos-network/mixnet"]
metrics = []

View File

@ -1,16 +0,0 @@
[package]
name = "nomos-metrics"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
futures = "0.3"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
prometheus-client = "0.22.0"
tracing = "0.1"
tokio = { version = "1", features = ["sync", "macros"] }
serde = { version = "1", features = ["derive"] }

View File

@ -1,119 +0,0 @@
pub use prometheus_client::{self, *};
// std
use std::fmt::{Debug, Error, Formatter};
use std::sync::{Arc, Mutex};
// crates
use futures::StreamExt;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::RelayMessage,
state::{NoOperator, NoState},
ServiceCore, ServiceData,
};
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use tokio::sync::oneshot::Sender;
use tracing::error;
// internal
// A wrapper for prometheus_client Registry.
// Lock is only used during services initialization and prometheus pull query.
pub type NomosRegistry = Arc<Mutex<Registry>>;
pub struct Metrics {
service_state: ServiceStateHandle<Self>,
registry: NomosRegistry,
}
#[derive(Clone, Debug)]
pub struct MetricsSettings {
pub registry: Option<NomosRegistry>,
}
pub enum MetricsMsg {
Gather { reply_channel: Sender<String> },
}
impl RelayMessage for MetricsMsg {}
impl Debug for MetricsMsg {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
match self {
Self::Gather { .. } => {
write!(f, "MetricsMsg::Gather")
}
}
}
}
impl ServiceData for Metrics {
const SERVICE_ID: &'static str = "Metrics";
type Settings = MetricsSettings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MetricsMsg;
}
#[async_trait::async_trait]
impl ServiceCore for Metrics {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let config = service_state.settings_reader.get_updated_settings();
Ok(Self {
service_state,
registry: config.registry.ok_or("No registry provided")?,
})
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
let Self {
mut service_state,
registry,
} = self;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
let MetricsMsg::Gather{reply_channel} = msg;
let mut buf = String::new();
{
let reg = registry.lock().unwrap();
// If encoding fails, we need to stop trying process subsequent metrics gather
// requests. If it succeds, encode method returns empty unit type.
_ = encode(&mut buf, &reg).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
}
reply_channel
.send(buf)
.unwrap_or_else(|_| tracing::debug!("could not send back metrics"));
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
impl Metrics {
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
LifecycleMessage::Shutdown(sender) => {
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
LifecycleMessage::Kill => true,
}
}
}

View File

@ -23,7 +23,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [
"libp2p",
"openapi",
] }
nomos-metrics = { path = "../../nomos-metrics" }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
full-replication = { path = "../../nomos-da/full-replication" }

View File

@ -10,7 +10,7 @@ async-trait = "0.1"
bincode = { version = "2.0.0-rc.2", features = ["serde"] }
futures = "0.3"
linked-hash-map = { version = "0.5.6", optional = true }
nomos-metrics = { path = "../../nomos-metrics" }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-network = { path = "../network" }
nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }

View File

@ -1,33 +1,16 @@
[package]
name = "metrics"
name = "nomos-metrics"
version = "0.1.0"
edition = "2021"
[[example]]
name = "graphql"
path = "examples/graphql.rs"
required-features = ["gql"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
axum = { version = "0.6", optional = true }
async-graphql = { version = "5", optional = true, features = ["tracing"] }
async-trait = "0.1"
bytes = "1.3"
clap = { version = "4", features = ["derive", "env"], optional = true }
futures = "0.3"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
once_cell = "1.16"
parking_lot = "0.12"
prometheus = "0.13"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["sync", "macros", "time"] }
prometheus-client = "0.22.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
tower-http = { version = "0.3", features = ["cors", "trace"], optional = true }
thiserror = "1"
[features]
default = []
gql = ["clap", "axum", "async-graphql", "tower-http"]
tokio = { version = "1", features = ["sync", "macros"] }
serde = { version = "1", features = ["derive"] }

View File

@ -1,148 +0,0 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use clap::Parser;
use metrics::{
frontend::graphql::{Graphql, GraphqlServerSettings},
MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId,
};
use overwatch_rs::{
overwatch::OverwatchRunner,
services::{
handle::{ServiceHandle, ServiceStateHandle},
relay::{NoMessage, Relay},
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
},
};
#[derive(Debug, Clone)]
pub struct ConcurrentMapMetricsBackend(Arc<Mutex<HashMap<ServiceId, MetricsData>>>);
#[async_trait::async_trait]
impl MetricsBackend for ConcurrentMapMetricsBackend {
type MetricsData = MetricsData;
type Error = ();
type Settings = &'static [ServiceId];
fn init(config: Self::Settings) -> Self {
let mut map = HashMap::with_capacity(config.len());
for service_id in config {
map.insert(*service_id, MetricsData::default());
}
Self(Arc::new(Mutex::new(map)))
}
async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData) {
self.0.lock().unwrap().insert(service_id, data);
}
async fn load(&self, service_id: &OwnedServiceId) -> Option<Self::MetricsData> {
self.0.lock().unwrap().get(service_id.as_ref()).cloned()
}
}
#[derive(Clone)]
pub struct MetricsUpdater<Backend: MetricsBackend + Send + Sync + 'static> {
backend_channel: Relay<MetricsService<Backend>>,
}
impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceData for MetricsUpdater<Backend> {
const SERVICE_ID: ServiceId = "MetricsUpdater";
type Settings = ();
type State = NoState<()>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl<Backend: MetricsBackend<MetricsData = MetricsData> + Clone + Send + Sync + 'static> ServiceCore
for MetricsUpdater<Backend>
where
Backend::MetricsData: async_graphql::OutputType,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let backend_channel: Relay<MetricsService<Backend>> =
service_state.overwatch_handle.relay();
Ok(Self { backend_channel })
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
let replay = self.backend_channel.connect().await.map_err(|e| {
tracing::error!(err = ?e, "Metrics Updater: relay connect error");
e
})?;
let tags = ["foo", "bar", "baz"].iter().cycle();
for (duration, service_id) in tags.enumerate() {
let message = MetricsMessage::Update {
service_id,
data: MetricsData {
duration: duration as u64,
},
};
replay.send(message).await.map_err(|(e, _)| {
tracing::error!(err = ?e, "Metrics Updater: relay send error");
e
})?;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
}
}
#[derive(overwatch_derive::Services)]
struct Services {
graphql: ServiceHandle<Graphql<ConcurrentMapMetricsBackend>>,
metrics: ServiceHandle<MetricsService<ConcurrentMapMetricsBackend>>,
updater: ServiceHandle<MetricsUpdater<ConcurrentMapMetricsBackend>>,
}
#[derive(clap::Parser)]
pub struct Args {
#[clap(flatten)]
graphql: GraphqlServerSettings,
}
#[derive(Debug, Default, Clone, async_graphql::SimpleObject)]
pub struct MetricsData {
duration: u64,
}
#[derive(Debug, Clone)]
pub enum ParseMetricsDataError {
TryFromSliceError(core::array::TryFromSliceError),
}
impl std::fmt::Display for ParseMetricsDataError {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Ok(())
}
}
impl std::error::Error for ParseMetricsDataError {}
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let settings = Args::parse();
let graphql = OverwatchRunner::<Services>::run(
ServicesServiceSettings {
graphql: settings.graphql,
metrics: &["Foo", "Bar"],
updater: (),
},
None,
)?;
tracing_subscriber::fmt::fmt()
.with_env_filter(std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned()))
.with_file(false)
.init();
graphql.wait_finished();
Ok(())
}

View File

@ -1,31 +0,0 @@
// std
use std::collections::HashMap;
use std::fmt::Debug;
// crates
// internal
use crate::{MetricsBackend, OwnedServiceId};
use overwatch_rs::services::ServiceId;
#[derive(Debug, Clone)]
pub struct MapMetricsBackend<MetricsData>(HashMap<ServiceId, MetricsData>);
#[async_trait::async_trait]
impl<MetricsData: Clone + Debug + Send + Sync + 'static> MetricsBackend
for MapMetricsBackend<MetricsData>
{
type MetricsData = MetricsData;
type Error = ();
type Settings = ();
fn init(_config: Self::Settings) -> Self {
Self(HashMap::new())
}
async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData) {
self.0.insert(service_id, data);
}
async fn load(&self, service_id: &OwnedServiceId) -> Option<Self::MetricsData> {
self.0.get(service_id.as_ref()).cloned()
}
}

View File

@ -1 +0,0 @@
pub mod map;

View File

@ -1,185 +0,0 @@
// std
// crates
use nomos_http::backends::HttpBackend;
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
use nomos_http::http::{handle_graphql_req, HttpMethod, HttpRequest};
// internal
use crate::{MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId};
use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::{
handle::ServiceStateHandle,
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
/// Configuration for the GraphQl Server
#[derive(Debug, Clone, clap::Args, serde::Deserialize, serde::Serialize)]
#[cfg(feature = "gql")]
pub struct GraphqlServerSettings {
/// Max query depth allowed
#[arg(
long = "graphql-max-depth",
default_value_t = 20,
env = "METRICS_GRAPHQL_MAX_DEPTH"
)]
pub max_depth: usize,
/// Max query complexity allowed
#[arg(
long = "graphql-max-complexity",
default_value_t = 1000,
env = "METRICS_GRAPHQL_MAX_COMPLEXITY"
)]
pub max_complexity: usize,
/// Allowed origins for this server deployment requests.
#[arg(long = "graphql-cors-origin")]
pub cors_origins: Vec<String>,
}
pub fn metrics_graphql_router<MB, B>(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner
where
MB: MetricsBackend + Clone + Send + 'static + Sync,
MB::MetricsData: async_graphql::OutputType,
B: HttpBackend + Send + Sync + 'static,
B::Error: std::error::Error + Send + Sync + 'static,
{
Box::new(Box::pin(async move {
// TODO: Graphql supports http GET requests, should nomos support that?
let (relay, mut res_rx) =
build_http_bridge::<Graphql<MB>, B, _>(handle, HttpMethod::POST, "")
.await
.unwrap();
while let Some(HttpRequest {
query: _,
payload,
res_tx,
}) = res_rx.recv().await
{
let res = match handle_graphql_req(payload, relay.clone(), |req, tx| {
Ok(GraphqlMetricsMessage {
req,
reply_channel: tx,
})
})
.await
{
Ok(r) => r,
Err(err) => {
tracing::error!(err);
err.to_string()
}
};
res_tx.send(Ok(res.into())).await.unwrap();
}
Ok(())
}))
}
#[derive(Debug)]
pub struct GraphqlMetricsMessage {
req: async_graphql::Request,
reply_channel: tokio::sync::oneshot::Sender<async_graphql::Response>,
}
impl RelayMessage for GraphqlMetricsMessage {}
pub struct Graphql<Backend>
where
Backend: MetricsBackend + Send + Sync + 'static,
Backend::MetricsData: async_graphql::OutputType,
{
service_state: Option<ServiceStateHandle<Self>>,
settings: GraphqlServerSettings,
backend_channel: Relay<MetricsService<Backend>>,
}
impl<Backend> ServiceData for Graphql<Backend>
where
Backend: MetricsBackend + Send + Sync + 'static,
Backend::MetricsData: async_graphql::OutputType,
{
const SERVICE_ID: ServiceId = "GraphqlMetrics";
type Settings = GraphqlServerSettings;
type State = NoState<GraphqlServerSettings>;
type StateOperator = NoOperator<Self::State>;
type Message = GraphqlMetricsMessage;
}
#[async_graphql::Object]
impl<Backend: MetricsBackend + Send + Sync + 'static> Graphql<Backend>
where
Backend::MetricsData: async_graphql::OutputType,
{
pub async fn load(
&self,
service_id: OwnedServiceId,
) -> async_graphql::Result<Option<<Backend as MetricsBackend>::MetricsData>> {
let replay = self
.backend_channel
.clone()
.connect()
.await
.map_err(|e| async_graphql::Error::new(e.to_string()))?;
let (tx, rx) = tokio::sync::oneshot::channel();
replay
.send(MetricsMessage::Load {
service_id,
reply_channel: tx,
})
.await
.map_err(|(e, _)| async_graphql::Error::new(e.to_string()))?;
rx.await.map_err(|e| {
tracing::error!(err = ?e, "GraphqlMetrics: recv error");
async_graphql::Error::new("GraphqlMetrics: recv error")
})
}
}
#[async_trait::async_trait]
impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceCore for Graphql<Backend>
where
Backend::MetricsData: async_graphql::OutputType,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let backend_channel: Relay<MetricsService<Backend>> =
service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
Ok(Self {
settings,
service_state: Some(service_state),
backend_channel,
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let max_complexity = self.settings.max_complexity;
let max_depth = self.settings.max_depth;
let mut inbound_relay = self.service_state.take().unwrap().inbound_relay;
let schema = async_graphql::Schema::build(
self,
async_graphql::EmptyMutation,
async_graphql::EmptySubscription,
)
.limit_complexity(max_complexity)
.limit_depth(max_depth)
.extension(async_graphql::extensions::Tracing)
.finish();
tracing::info!(schema = %schema.sdl(), "GraphQL schema definition");
while let Some(msg) = inbound_relay.recv().await {
let res = schema.execute(msg.req).await;
msg.reply_channel.send(res).unwrap();
}
Ok(())
}
}

View File

@ -1,2 +0,0 @@
#[cfg(feature = "gql")]
pub mod graphql;

View File

@ -1,155 +1,107 @@
// std
use std::fmt::Debug;
pub use prometheus_client::{self, *};
// std
use std::fmt::{Debug, Error, Formatter};
use std::sync::{Arc, Mutex};
// crates
use futures::StreamExt;
use tracing::error;
// internal
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::RelayMessage,
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
ServiceCore, ServiceData,
};
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use tokio::sync::oneshot::Sender;
use tracing::error;
// internal
pub mod backend;
pub mod frontend;
pub mod types;
// A wrapper for prometheus_client Registry.
// Lock is only used during services initialization and prometheus pull query.
pub type NomosRegistry = Arc<Mutex<Registry>>;
#[async_trait::async_trait]
pub trait MetricsBackend {
type MetricsData: Clone + Send + Sync + Debug + 'static;
type Error: Send + Sync;
type Settings: Clone + Send + Sync + 'static;
fn init(config: Self::Settings) -> Self;
async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData);
async fn load(&self, service_id: &OwnedServiceId) -> Option<Self::MetricsData>;
}
pub struct MetricsService<Backend: MetricsBackend> {
pub struct Metrics {
service_state: ServiceStateHandle<Self>,
backend: Backend,
registry: NomosRegistry,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct OwnedServiceId {
id: String,
#[derive(Clone, Debug)]
pub struct MetricsSettings {
pub registry: Option<NomosRegistry>,
}
impl From<ServiceId> for OwnedServiceId {
fn from(id: ServiceId) -> Self {
Self { id: id.into() }
pub enum MetricsMsg {
Gather { reply_channel: Sender<String> },
}
impl RelayMessage for MetricsMsg {}
impl Debug for MetricsMsg {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
match self {
Self::Gather { .. } => {
write!(f, "MetricsMsg::Gather")
}
}
}
}
impl From<String> for OwnedServiceId {
fn from(id: String) -> Self {
Self { id }
}
}
impl core::fmt::Display for OwnedServiceId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.id)
}
}
impl AsRef<str> for OwnedServiceId {
fn as_ref(&self) -> &str {
&self.id
}
}
#[cfg(feature = "async-graphql")]
impl async_graphql::InputType for OwnedServiceId {
type RawValueType = Self;
fn type_name() -> std::borrow::Cow<'static, str> {
<String as async_graphql::InputType>::type_name()
}
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
<String as async_graphql::InputType>::create_type_info(registry)
}
fn parse(
value: Option<async_graphql::Value>,
) -> async_graphql::InputValueResult<OwnedServiceId> {
<String as async_graphql::InputType>::parse(value)
.map(Self::from)
.map_err(async_graphql::InputValueError::propagate)
}
fn to_value(&self) -> async_graphql::Value {
<String as async_graphql::InputType>::to_value(&self.id)
}
fn as_raw_value(&self) -> Option<&Self::RawValueType> {
Some(self)
}
}
#[derive(Debug)]
pub enum MetricsMessage<Data> {
Load {
service_id: OwnedServiceId,
reply_channel: tokio::sync::oneshot::Sender<Option<Data>>,
},
Update {
service_id: ServiceId,
data: Data,
},
}
impl<Data: Send + Sync + 'static> RelayMessage for MetricsMessage<Data> {}
impl<Backend: MetricsBackend> ServiceData for MetricsService<Backend> {
const SERVICE_ID: ServiceId = "Metrics";
type Settings = Backend::Settings;
impl ServiceData for Metrics {
const SERVICE_ID: &'static str = "Metrics";
type Settings = MetricsSettings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MetricsMessage<Backend::MetricsData>;
type Message = MetricsMsg;
}
impl<Backend: MetricsBackend> MetricsService<Backend> {
async fn handle_load(
backend: &Backend,
service_id: &OwnedServiceId,
reply_channel: tokio::sync::oneshot::Sender<Option<Backend::MetricsData>>,
) {
let metrics = backend.load(service_id).await;
if let Err(e) = reply_channel.send(metrics) {
tracing::error!(
"Failed to send metric data for service: {service_id}. data: {:?}",
e
);
}
#[async_trait::async_trait]
impl ServiceCore for Metrics {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let config = service_state.settings_reader.get_updated_settings();
Ok(Self {
service_state,
registry: config.registry.ok_or("No registry provided")?,
})
}
async fn handle_update(
backend: &mut Backend,
service_id: &ServiceId,
metrics: Backend::MetricsData,
) {
backend.update(service_id, metrics).await;
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
let Self {
mut service_state,
registry,
} = self;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
let MetricsMsg::Gather{reply_channel} = msg;
async fn handle_message(message: MetricsMessage<Backend::MetricsData>, backend: &mut Backend) {
match message {
MetricsMessage::Load {
service_id,
reply_channel,
} => {
MetricsService::handle_load(backend, &service_id, reply_channel).await;
}
MetricsMessage::Update { service_id, data } => {
MetricsService::handle_update(backend, &service_id, data).await;
let mut buf = String::new();
{
let reg = registry.lock().unwrap();
// If encoding fails, we need to stop trying process subsequent metrics gather
// requests. If it succeds, encode method returns empty unit type.
_ = encode(&mut buf, &reg).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
}
reply_channel
.send(buf)
.unwrap_or_else(|_| tracing::debug!("could not send back metrics"));
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
impl Metrics {
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
LifecycleMessage::Shutdown(sender) => {
@ -165,41 +117,3 @@ impl<Backend: MetricsBackend> MetricsService<Backend> {
}
}
}
#[async_trait::async_trait]
impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceCore for MetricsService<Backend> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let settings = service_state.settings_reader.get_updated_settings();
let backend = Backend::init(settings);
Ok(Self {
service_state,
backend,
})
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
let Self {
service_state:
ServiceStateHandle {
mut inbound_relay,
lifecycle_handle,
..
},
mut backend,
} = self;
let mut lifecycle_stream = lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(message) = inbound_relay.recv() => {
Self::handle_message(message, &mut backend).await;
}
Some(message) = lifecycle_stream.next() => {
if Self::should_stop_service(message).await {
break;
}
}
}
}
Ok(())
}
}

View File

@ -1,370 +0,0 @@
use ::core::ops::{Deref, DerefMut};
#[cfg(feature = "async-graphql")]
use async_graphql::{parser::types::Field, ContextSelectionSet, Positioned, ServerResult, Value};
use prometheus::HistogramOpts;
pub use prometheus::{
core::{self, Atomic, GenericCounter as PrometheusGenericCounter},
labels, opts, Opts,
};
#[derive(Debug, Clone)]
pub struct MetricsData {
ty: MetricDataType,
id: String,
}
impl MetricsData {
#[inline]
pub fn new(ty: MetricDataType, id: String) -> Self {
Self { ty, id }
}
#[inline]
pub fn ty(&self) -> &MetricDataType {
&self.ty
}
#[inline]
pub fn id(&self) -> &str {
&self.id
}
}
#[derive(Debug, Clone)]
pub enum MetricDataType {
IntCounter(IntCounter),
Counter(Counter),
IntGauge(IntGauge),
Gauge(Gauge),
Histogram(Histogram),
}
#[cfg(feature = "async-graphql")]
#[async_trait::async_trait]
impl async_graphql::OutputType for MetricDataType {
fn type_name() -> std::borrow::Cow<'static, str> {
std::borrow::Cow::Borrowed("MetricDataType")
}
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
registry.create_output_type::<Self, _>(
async_graphql::registry::MetaTypeId::Union,
|registry| async_graphql::registry::MetaType::Union {
name: Self::type_name().to_string(),
description: None,
possible_types: {
let mut map = async_graphql::indexmap::IndexSet::new();
map.insert(<IntCounter as async_graphql::OutputType>::create_type_info(
registry,
));
map.insert(<Counter as async_graphql::OutputType>::create_type_info(
registry,
));
map.insert(<IntGauge as async_graphql::OutputType>::create_type_info(
registry,
));
map.insert(<Gauge as async_graphql::OutputType>::create_type_info(
registry,
));
map.insert(<Histogram as async_graphql::OutputType>::create_type_info(
registry,
));
map
},
visible: None,
inaccessible: false,
tags: Vec::new(),
rust_typename: Some(std::any::type_name::<Self>()),
},
)
}
async fn resolve(
&self,
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
match self {
Self::IntCounter(v) => v.resolve(ctx, field).await,
Self::Counter(v) => v.resolve(ctx, field).await,
Self::IntGauge(v) => v.resolve(ctx, field).await,
Self::Gauge(v) => v.resolve(ctx, field).await,
Self::Histogram(v) => v.resolve(ctx, field).await,
}
}
}
#[derive(Debug, Clone)]
pub struct GenericGauge<T: Atomic> {
val: core::GenericGauge<T>,
}
impl<T: Atomic> Deref for GenericGauge<T> {
type Target = core::GenericGauge<T>;
fn deref(&self) -> &Self::Target {
&self.val
}
}
impl<T: Atomic> DerefMut for GenericGauge<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.val
}
}
impl<T: Atomic> GenericGauge<T> {
pub fn new<S1: Into<String>, S2: Into<String>>(
name: S1,
help: S2,
) -> Result<Self, prometheus::Error> {
core::GenericGauge::<T>::new(name, help).map(|v| Self { val: v })
}
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
core::GenericGauge::<T>::with_opts(opts).map(|v| Self { val: v })
}
}
#[cfg(feature = "async-graphql")]
#[async_trait::async_trait]
impl<T> async_graphql::OutputType for GenericGauge<T>
where
T: Atomic,
<T as Atomic>::T: async_graphql::OutputType,
{
fn type_name() -> std::borrow::Cow<'static, str> {
std::borrow::Cow::Borrowed("GenericGauge")
}
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
async_graphql::registry::MetaType::Scalar {
name: Self::type_name().to_string(),
description: None,
is_valid: None,
visible: None,
inaccessible: false,
tags: Vec::new(),
specified_by_url: None,
}
})
}
async fn resolve(
&self,
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await
}
}
#[derive(Debug, Clone)]
pub struct GenericCounter<T: Atomic> {
ctr: core::GenericCounter<T>,
}
impl<T: Atomic> Deref for GenericCounter<T> {
type Target = core::GenericCounter<T>;
fn deref(&self) -> &Self::Target {
&self.ctr
}
}
impl<T: Atomic> DerefMut for GenericCounter<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.ctr
}
}
impl<T: Atomic> GenericCounter<T> {
pub fn new<S1: Into<String>, S2: Into<String>>(
name: S1,
help: S2,
) -> Result<Self, prometheus::Error> {
core::GenericCounter::<T>::new(name, help).map(|ctr| Self { ctr })
}
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
core::GenericCounter::<T>::with_opts(opts).map(|ctr| Self { ctr })
}
}
#[cfg(feature = "async-graphql")]
#[async_trait::async_trait]
impl<T> async_graphql::OutputType for GenericCounter<T>
where
T: Atomic,
<T as Atomic>::T: async_graphql::OutputType,
{
fn type_name() -> std::borrow::Cow<'static, str> {
std::borrow::Cow::Borrowed("GenericCounter")
}
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
async_graphql::registry::MetaType::Scalar {
name: Self::type_name().to_string(),
description: None,
is_valid: None,
visible: None,
inaccessible: false,
tags: Vec::new(),
specified_by_url: None,
}
})
}
async fn resolve(
&self,
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.ctr.get(), ctx, field).await
}
}
#[derive(Debug, Clone)]
pub struct Histogram {
val: prometheus::Histogram,
}
impl Histogram {
pub fn with_opts(opts: HistogramOpts) -> Result<Self, prometheus::Error> {
prometheus::Histogram::with_opts(opts).map(|val| Self { val })
}
}
impl Deref for Histogram {
type Target = prometheus::Histogram;
fn deref(&self) -> &Self::Target {
&self.val
}
}
impl DerefMut for Histogram {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.val
}
}
#[cfg(feature = "async-graphql")]
#[derive(async_graphql::SimpleObject, Debug, Clone, Copy)]
#[graphql(name = "Histogram")]
struct HistogramSample {
count: u64,
sum: f64,
}
#[cfg(feature = "async-graphql")]
#[async_trait::async_trait]
impl async_graphql::OutputType for Histogram {
fn type_name() -> std::borrow::Cow<'static, str> {
std::borrow::Cow::Borrowed("Histogram")
}
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
<HistogramSample as async_graphql::OutputType>::create_type_info(registry)
}
async fn resolve(
&self,
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
let sample = HistogramSample {
count: self.val.get_sample_count(),
sum: self.val.get_sample_sum(),
};
<HistogramSample as async_graphql::OutputType>::resolve(&sample, ctx, field).await
}
}
macro_rules! metric_typ {
($($ty: ident::$setter:ident($primitive:ident)::$name: literal), +$(,)?) => {
$(
#[derive(Clone)]
pub struct $ty {
val: prometheus::$ty,
}
impl std::fmt::Debug for $ty {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
<prometheus::$ty as std::fmt::Debug>::fmt(&self.val, f)
}
}
impl $ty {
pub fn new<S1: Into<String>, S2: Into<String>>(
name: S1,
help: S2,
) -> Result<Self, prometheus::Error> {
prometheus::$ty::new(name, help).map(|val| Self {
val,
})
}
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
prometheus::$ty::with_opts(opts.clone()).map(|val| Self {
val,
})
}
}
impl Deref for $ty {
type Target = prometheus::$ty;
fn deref(&self) -> &Self::Target {
&self.val
}
}
impl DerefMut for $ty {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.val
}
}
#[cfg(feature = "async-graphql")]
#[async_trait::async_trait]
impl async_graphql::OutputType for $ty {
fn type_name() -> std::borrow::Cow<'static, str> {
std::borrow::Cow::Borrowed($name)
}
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
async_graphql::registry::MetaType::Scalar {
name: Self::type_name().to_string(),
description: None,
is_valid: None,
visible: None,
inaccessible: false,
tags: Vec::new(),
specified_by_url: None,
}
})
}
async fn resolve(
&self,
ctx: &ContextSelectionSet<'_>,
field: &Positioned<Field>,
) -> ServerResult<Value> {
<$primitive as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await
}
}
)*
};
}
metric_typ! {
IntCounter::inc_by(u64)::"IntCounter",
Counter::inc_by(f64)::"Counter",
IntGauge::set(i64)::"IntGauge",
Gauge::set(f64)::"Gauge",
}

View File

@ -52,4 +52,3 @@ path = "src/tests/cli.rs"
[features]
mixnet = ["nomos-network/mixnet"]
metrics = ["nomos-node/metrics"]