Metrics refactor (#20)
* metrics service Co-authored-by: al8n <scygliu1@gmail.com>
This commit is contained in:
parent
539c986f69
commit
52708b8253
@ -1,8 +1,8 @@
|
||||
[workspace]
|
||||
|
||||
members = [
|
||||
"nomos-core",
|
||||
"nomos-services/log",
|
||||
"nomos-services/metrics",
|
||||
"nomos-services/network",
|
||||
"nomos-services/storage",
|
||||
"nomos-services/consensus",
|
||||
|
34
nomos-services/metrics/Cargo.toml
Normal file
34
nomos-services/metrics/Cargo.toml
Normal file
@ -0,0 +1,34 @@
|
||||
[package]
|
||||
name = "metrics"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[[example]]
|
||||
name = "graphql"
|
||||
path = "examples/graphql.rs"
|
||||
features = ["gql"]
|
||||
|
||||
[dependencies]
|
||||
axum = { version = "0.6", optional = true }
|
||||
async-graphql = { version = "5", optional = true, features = ["tracing"] }
|
||||
async-graphql-axum = { version = "5", optional = true }
|
||||
async-trait = "0.1"
|
||||
clap = { version = "4", features = ["derive", "env"], optional = true }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
once_cell = "1.16"
|
||||
parking_lot = "0.12"
|
||||
prometheus = "0.13"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
tokio = { version = "1", features = ["sync", "macros", "time"] }
|
||||
tracing = "0.1"
|
||||
tracing-appender = "0.2"
|
||||
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
|
||||
tracing-gelf = "0.7"
|
||||
tower-http = { version = "0.3", features = ["cors", "trace"], optional = true }
|
||||
futures = "0.3"
|
||||
|
||||
|
||||
[features]
|
||||
default = []
|
||||
gql = ["clap", "axum", "async-graphql", "async-graphql-axum", "tower-http"]
|
0
nomos-services/metrics/README.md
Normal file
0
nomos-services/metrics/README.md
Normal file
135
nomos-services/metrics/examples/graphql.rs
Normal file
135
nomos-services/metrics/examples/graphql.rs
Normal file
@ -0,0 +1,135 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use clap::Parser;
|
||||
use metrics::{
|
||||
frontend::graphql::{Graphql, GraphqlServerSettings},
|
||||
MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId,
|
||||
};
|
||||
use overwatch_rs::{
|
||||
overwatch::OverwatchRunner,
|
||||
services::{
|
||||
handle::{ServiceHandle, ServiceStateHandle},
|
||||
relay::{NoMessage, Relay},
|
||||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ConcurrentMapMetricsBackend(Arc<Mutex<HashMap<ServiceId, MetricsData>>>);
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MetricsBackend for ConcurrentMapMetricsBackend {
|
||||
type MetricsData = MetricsData;
|
||||
type Error = ();
|
||||
type Settings = &'static [ServiceId];
|
||||
|
||||
fn init(config: Self::Settings) -> Self {
|
||||
let mut map = HashMap::with_capacity(config.len());
|
||||
for service_id in config {
|
||||
map.insert(*service_id, MetricsData::default());
|
||||
}
|
||||
Self(Arc::new(Mutex::new(map)))
|
||||
}
|
||||
|
||||
async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData) {
|
||||
self.0.lock().unwrap().insert(service_id, data);
|
||||
}
|
||||
|
||||
async fn load(&self, service_id: &OwnedServiceId) -> Option<Self::MetricsData> {
|
||||
self.0.lock().unwrap().get(service_id.as_ref()).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MetricsUpdater<Backend: MetricsBackend + Send + Sync + 'static> {
|
||||
backend_channel: Relay<MetricsService<Backend>>,
|
||||
}
|
||||
|
||||
impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceData for MetricsUpdater<Backend> {
|
||||
const SERVICE_ID: ServiceId = "MetricsUpdater";
|
||||
|
||||
type Settings = ();
|
||||
|
||||
type State = NoState<()>;
|
||||
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
|
||||
type Message = NoMessage;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<Backend: MetricsBackend<MetricsData = MetricsData> + Clone + Send + Sync + 'static> ServiceCore
|
||||
for MetricsUpdater<Backend>
|
||||
where
|
||||
Backend::MetricsData: async_graphql::OutputType,
|
||||
{
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||
let backend_channel: Relay<MetricsService<Backend>> =
|
||||
service_state.overwatch_handle.relay();
|
||||
Ok(Self { backend_channel })
|
||||
}
|
||||
|
||||
async fn run(self) -> Result<(), overwatch_rs::DynError> {
|
||||
let replay = self.backend_channel.connect().await.map_err(|e| {
|
||||
tracing::error!(err = ?e, "Metrics Updater: relay connect error");
|
||||
e
|
||||
})?;
|
||||
let tags = ["foo", "bar", "baz"].iter().cycle();
|
||||
for (duration, service_id) in tags.enumerate() {
|
||||
let message = MetricsMessage::Update {
|
||||
service_id,
|
||||
data: MetricsData {
|
||||
duration: duration as u64,
|
||||
},
|
||||
};
|
||||
replay.send(message).await.map_err(|(e, _)| {
|
||||
tracing::error!(err = ?e, "Metrics Updater: relay send error");
|
||||
e
|
||||
})?;
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(overwatch_derive::Services)]
|
||||
struct Services {
|
||||
graphql: ServiceHandle<Graphql<ConcurrentMapMetricsBackend>>,
|
||||
metrics: ServiceHandle<MetricsService<ConcurrentMapMetricsBackend>>,
|
||||
updater: ServiceHandle<MetricsUpdater<ConcurrentMapMetricsBackend>>,
|
||||
}
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
pub struct Args {
|
||||
#[clap(flatten)]
|
||||
graphql: GraphqlServerSettings,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, async_graphql::SimpleObject)]
|
||||
pub struct MetricsData {
|
||||
duration: u64,
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let settings = Args::parse();
|
||||
let graphql = OverwatchRunner::<Services>::run(
|
||||
ServicesServiceSettings {
|
||||
graphql: settings.graphql,
|
||||
metrics: &["Foo", "Bar"],
|
||||
updater: (),
|
||||
},
|
||||
None,
|
||||
)?;
|
||||
|
||||
tracing_subscriber::fmt::fmt()
|
||||
.with_env_filter(std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned()))
|
||||
.with_file(false)
|
||||
.init();
|
||||
|
||||
graphql.wait_finished();
|
||||
Ok(())
|
||||
}
|
31
nomos-services/metrics/src/backend/map.rs
Normal file
31
nomos-services/metrics/src/backend/map.rs
Normal file
@ -0,0 +1,31 @@
|
||||
// std
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
// crates
|
||||
// internal
|
||||
use crate::{MetricsBackend, OwnedServiceId};
|
||||
use overwatch_rs::services::ServiceId;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MapMetricsBackend<MetricsData>(HashMap<ServiceId, MetricsData>);
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<MetricsData: Clone + Debug + Send + Sync + 'static> MetricsBackend
|
||||
for MapMetricsBackend<MetricsData>
|
||||
{
|
||||
type MetricsData = MetricsData;
|
||||
type Error = ();
|
||||
type Settings = ();
|
||||
|
||||
fn init(_config: Self::Settings) -> Self {
|
||||
Self(HashMap::new())
|
||||
}
|
||||
|
||||
async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData) {
|
||||
self.0.insert(service_id, data);
|
||||
}
|
||||
|
||||
async fn load(&self, service_id: &OwnedServiceId) -> Option<Self::MetricsData> {
|
||||
self.0.get(service_id.as_ref()).cloned()
|
||||
}
|
||||
}
|
1
nomos-services/metrics/src/backend/mod.rs
Normal file
1
nomos-services/metrics/src/backend/mod.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod map;
|
174
nomos-services/metrics/src/frontend/graphql/mod.rs
Normal file
174
nomos-services/metrics/src/frontend/graphql/mod.rs
Normal file
@ -0,0 +1,174 @@
|
||||
// std
|
||||
|
||||
// crates
|
||||
use async_graphql::{EmptyMutation, EmptySubscription, Schema};
|
||||
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
|
||||
use axum::{
|
||||
extract::State,
|
||||
http::{
|
||||
header::{CONTENT_TYPE, USER_AGENT},
|
||||
HeaderValue,
|
||||
},
|
||||
routing::post,
|
||||
Router, Server,
|
||||
};
|
||||
use tower_http::{
|
||||
cors::{Any, CorsLayer},
|
||||
trace::TraceLayer,
|
||||
};
|
||||
|
||||
// internal
|
||||
use crate::{MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId};
|
||||
use overwatch_rs::services::relay::Relay;
|
||||
use overwatch_rs::services::{
|
||||
handle::ServiceStateHandle,
|
||||
relay::NoMessage,
|
||||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
};
|
||||
|
||||
/// Configuration for the GraphQl Server
|
||||
#[derive(Debug, Clone, clap::Args, serde::Deserialize, serde::Serialize)]
|
||||
#[cfg(feature = "gql")]
|
||||
pub struct GraphqlServerSettings {
|
||||
/// Socket where the GraphQL will be listening on for incoming requests.
|
||||
#[arg(short, long = "graphql-addr", default_value_t = std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), 8080), env = "METRICS_GRAPHQL_BIND_ADDRESS")]
|
||||
pub address: std::net::SocketAddr,
|
||||
/// Max query depth allowed
|
||||
#[arg(
|
||||
long = "graphql-max-depth",
|
||||
default_value_t = 20,
|
||||
env = "METRICS_GRAPHQL_MAX_DEPTH"
|
||||
)]
|
||||
pub max_depth: usize,
|
||||
/// Max query complexity allowed
|
||||
#[arg(
|
||||
long = "graphql-max-complexity",
|
||||
default_value_t = 1000,
|
||||
env = "METRICS_GRAPHQL_MAX_COMPLEXITY"
|
||||
)]
|
||||
pub max_complexity: usize,
|
||||
/// Allowed origins for this server deployment requests.
|
||||
#[arg(long = "graphql-cors-origin")]
|
||||
pub cors_origins: Vec<String>,
|
||||
}
|
||||
|
||||
async fn graphql_handler<Backend: MetricsBackend + Send + 'static + Sync>(
|
||||
schema: State<Schema<Graphql<Backend>, EmptyMutation, EmptySubscription>>,
|
||||
req: GraphQLRequest,
|
||||
) -> GraphQLResponse
|
||||
where
|
||||
Backend::MetricsData: async_graphql::OutputType,
|
||||
{
|
||||
let request = req.into_inner();
|
||||
let resp = schema.execute(request).await;
|
||||
GraphQLResponse::from(resp)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Graphql<Backend: MetricsBackend + Send + Sync + 'static> {
|
||||
settings: GraphqlServerSettings,
|
||||
backend_channel: Relay<MetricsService<Backend>>,
|
||||
}
|
||||
|
||||
impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceData for Graphql<Backend> {
|
||||
const SERVICE_ID: ServiceId = "GraphqlMetrics";
|
||||
|
||||
type Settings = GraphqlServerSettings;
|
||||
|
||||
type State = NoState<GraphqlServerSettings>;
|
||||
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
|
||||
type Message = NoMessage;
|
||||
}
|
||||
|
||||
#[async_graphql::Object]
|
||||
impl<Backend: MetricsBackend + Send + Sync + 'static> Graphql<Backend>
|
||||
where
|
||||
Backend::MetricsData: async_graphql::OutputType,
|
||||
{
|
||||
pub async fn load(
|
||||
&self,
|
||||
service_id: OwnedServiceId,
|
||||
) -> async_graphql::Result<Option<<Backend as MetricsBackend>::MetricsData>> {
|
||||
let replay = self
|
||||
.backend_channel
|
||||
.clone()
|
||||
.connect()
|
||||
.await
|
||||
.map_err(|e| async_graphql::Error::new(e.to_string()))?;
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
replay
|
||||
.send(MetricsMessage::Load {
|
||||
service_id,
|
||||
reply_channel: tx,
|
||||
})
|
||||
.await
|
||||
.map_err(|(e, _)| async_graphql::Error::new(e.to_string()))?;
|
||||
rx.await.map_err(|e| {
|
||||
tracing::error!(err = ?e, "GraphqlMetrics: recv error");
|
||||
async_graphql::Error::new("GraphqlMetrics: recv error")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<Backend: MetricsBackend + Clone + Send + Sync + 'static> ServiceCore for Graphql<Backend>
|
||||
where
|
||||
Backend::MetricsData: async_graphql::OutputType,
|
||||
{
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||
let settings = service_state.settings_reader.get_updated_settings();
|
||||
let backend_channel: Relay<MetricsService<Backend>> =
|
||||
service_state.overwatch_handle.relay();
|
||||
Ok(Self {
|
||||
settings,
|
||||
backend_channel,
|
||||
})
|
||||
}
|
||||
|
||||
async fn run(self) -> Result<(), overwatch_rs::DynError> {
|
||||
let mut builder = CorsLayer::new();
|
||||
if self.settings.cors_origins.is_empty() {
|
||||
builder = builder.allow_origin(Any);
|
||||
}
|
||||
for origin in &self.settings.cors_origins {
|
||||
builder = builder.allow_origin(
|
||||
origin
|
||||
.as_str()
|
||||
.parse::<HeaderValue>()
|
||||
.expect("fail to parse origin"),
|
||||
);
|
||||
}
|
||||
let cors = builder
|
||||
.allow_headers([CONTENT_TYPE, USER_AGENT])
|
||||
.allow_methods(Any);
|
||||
|
||||
let addr = self.settings.address;
|
||||
let max_complexity = self.settings.max_complexity;
|
||||
let max_depth = self.settings.max_depth;
|
||||
let schema = async_graphql::Schema::build(
|
||||
self,
|
||||
async_graphql::EmptyMutation,
|
||||
async_graphql::EmptySubscription,
|
||||
)
|
||||
.limit_complexity(max_complexity)
|
||||
.limit_depth(max_depth)
|
||||
.extension(async_graphql::extensions::Tracing)
|
||||
.finish();
|
||||
|
||||
tracing::info!(schema = %schema.sdl(), "GraphQL schema definition");
|
||||
let router = Router::new()
|
||||
.route("/", post(graphql_handler::<Backend>))
|
||||
.with_state(schema)
|
||||
.layer(cors)
|
||||
.layer(TraceLayer::new_for_http());
|
||||
|
||||
tracing::info!("Metrics Service GraphQL server listening: {}", addr);
|
||||
Server::bind(&addr)
|
||||
.serve(router.into_make_service())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
2
nomos-services/metrics/src/frontend/mod.rs
Normal file
2
nomos-services/metrics/src/frontend/mod.rs
Normal file
@ -0,0 +1,2 @@
|
||||
#[cfg(feature = "gql")]
|
||||
pub mod graphql;
|
165
nomos-services/metrics/src/lib.rs
Normal file
165
nomos-services/metrics/src/lib.rs
Normal file
@ -0,0 +1,165 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
pub mod backend;
|
||||
pub mod frontend;
|
||||
pub mod types;
|
||||
|
||||
use overwatch_rs::services::{
|
||||
handle::ServiceStateHandle,
|
||||
relay::{InboundRelay, RelayMessage},
|
||||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait MetricsBackend {
|
||||
type MetricsData: Clone + Send + Sync + Debug + 'static;
|
||||
type Error: Send + Sync;
|
||||
type Settings: Clone + Send + Sync + 'static;
|
||||
fn init(config: Self::Settings) -> Self;
|
||||
async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData);
|
||||
async fn load(&self, service_id: &OwnedServiceId) -> Option<Self::MetricsData>;
|
||||
}
|
||||
|
||||
pub struct MetricsService<Backend: MetricsBackend> {
|
||||
inbound_relay: InboundRelay<MetricsMessage<Backend::MetricsData>>,
|
||||
backend: Backend,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Ord, PartialOrd)]
|
||||
pub struct OwnedServiceId {
|
||||
id: String,
|
||||
}
|
||||
|
||||
impl From<ServiceId> for OwnedServiceId {
|
||||
fn from(id: ServiceId) -> Self {
|
||||
Self { id: id.into() }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for OwnedServiceId {
|
||||
fn from(id: String) -> Self {
|
||||
Self { id }
|
||||
}
|
||||
}
|
||||
|
||||
impl core::fmt::Display for OwnedServiceId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.id)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<str> for OwnedServiceId {
|
||||
fn as_ref(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
impl async_graphql::InputType for OwnedServiceId {
|
||||
type RawValueType = Self;
|
||||
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
<String as async_graphql::InputType>::type_name()
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
<String as async_graphql::InputType>::create_type_info(registry)
|
||||
}
|
||||
|
||||
fn parse(
|
||||
value: Option<async_graphql::Value>,
|
||||
) -> async_graphql::InputValueResult<OwnedServiceId> {
|
||||
<String as async_graphql::InputType>::parse(value)
|
||||
.map(Self::from)
|
||||
.map_err(async_graphql::InputValueError::propagate)
|
||||
}
|
||||
|
||||
fn to_value(&self) -> async_graphql::Value {
|
||||
<String as async_graphql::InputType>::to_value(&self.id)
|
||||
}
|
||||
|
||||
fn as_raw_value(&self) -> Option<&Self::RawValueType> {
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MetricsMessage<Data> {
|
||||
Load {
|
||||
service_id: OwnedServiceId,
|
||||
reply_channel: tokio::sync::oneshot::Sender<Option<Data>>,
|
||||
},
|
||||
Update {
|
||||
service_id: ServiceId,
|
||||
data: Data,
|
||||
},
|
||||
}
|
||||
|
||||
impl<Data: Send + Sync + 'static> RelayMessage for MetricsMessage<Data> {}
|
||||
|
||||
impl<Backend: MetricsBackend> ServiceData for MetricsService<Backend> {
|
||||
const SERVICE_ID: ServiceId = "Metrics";
|
||||
type Settings = Backend::Settings;
|
||||
type State = NoState<Self::Settings>;
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
type Message = MetricsMessage<Backend::MetricsData>;
|
||||
}
|
||||
|
||||
impl<Backend: MetricsBackend> MetricsService<Backend> {
|
||||
async fn handle_load(
|
||||
backend: &mut Backend,
|
||||
service_id: &OwnedServiceId,
|
||||
reply_channel: tokio::sync::oneshot::Sender<Option<Backend::MetricsData>>,
|
||||
) {
|
||||
let metrics = backend.load(service_id).await;
|
||||
if let Err(e) = reply_channel.send(metrics) {
|
||||
tracing::error!(
|
||||
"Failed to send metric data for service: {service_id}. data: {:?}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_update(
|
||||
backend: &mut Backend,
|
||||
service_id: &ServiceId,
|
||||
metrics: Backend::MetricsData,
|
||||
) {
|
||||
backend.update(service_id, metrics).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceCore for MetricsService<Backend> {
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||
let settings = service_state.settings_reader.get_updated_settings();
|
||||
let backend = Backend::init(settings);
|
||||
let inbound_relay = service_state.inbound_relay;
|
||||
Ok(Self {
|
||||
inbound_relay,
|
||||
backend,
|
||||
})
|
||||
}
|
||||
|
||||
async fn run(self) -> Result<(), overwatch_rs::DynError> {
|
||||
let Self {
|
||||
mut inbound_relay,
|
||||
mut backend,
|
||||
} = self;
|
||||
while let Some(message) = inbound_relay.recv().await {
|
||||
match message {
|
||||
MetricsMessage::Load {
|
||||
service_id,
|
||||
reply_channel,
|
||||
} => {
|
||||
MetricsService::handle_load(&mut backend, &service_id, reply_channel).await;
|
||||
}
|
||||
MetricsMessage::Update { service_id, data } => {
|
||||
MetricsService::handle_update(&mut backend, &service_id, data).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
521
nomos-services/metrics/src/types.rs
Normal file
521
nomos-services/metrics/src/types.rs
Normal file
@ -0,0 +1,521 @@
|
||||
use ::core::ops::{Deref, DerefMut};
|
||||
#[cfg(feature = "async-graphql")]
|
||||
use async_graphql::{parser::types::Field, ContextSelectionSet, Positioned, ServerResult, Value};
|
||||
use prometheus::HistogramOpts;
|
||||
pub use prometheus::{
|
||||
core::{self, Atomic},
|
||||
labels, opts, Opts,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MetricsData {
|
||||
ty: MetricDataType,
|
||||
id: String,
|
||||
}
|
||||
|
||||
impl MetricsData {
|
||||
#[inline]
|
||||
pub fn new(ty: MetricDataType, id: String) -> Self {
|
||||
Self { ty, id }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn ty(&self) -> &MetricDataType {
|
||||
&self.ty
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum MetricDataType {
|
||||
IntCounter(IntCounter),
|
||||
Counter(Counter),
|
||||
IntGauge(IntGauge),
|
||||
Gauge(Gauge),
|
||||
Histogram(Histogram),
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for MetricDataType {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("MetricDataType")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(
|
||||
async_graphql::registry::MetaTypeId::Union,
|
||||
|registry| async_graphql::registry::MetaType::Union {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
possible_types: {
|
||||
let mut map = async_graphql::indexmap::IndexSet::new();
|
||||
map.insert(<IntCounter as async_graphql::OutputType>::create_type_info(
|
||||
registry,
|
||||
));
|
||||
map.insert(<Counter as async_graphql::OutputType>::create_type_info(
|
||||
registry,
|
||||
));
|
||||
map.insert(<IntGauge as async_graphql::OutputType>::create_type_info(
|
||||
registry,
|
||||
));
|
||||
map.insert(<Gauge as async_graphql::OutputType>::create_type_info(
|
||||
registry,
|
||||
));
|
||||
map.insert(<Histogram as async_graphql::OutputType>::create_type_info(
|
||||
registry,
|
||||
));
|
||||
map
|
||||
},
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
rust_typename: Some(std::any::type_name::<Self>()),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
match self {
|
||||
Self::IntCounter(v) => v.resolve(ctx, field).await,
|
||||
Self::Counter(v) => v.resolve(ctx, field).await,
|
||||
Self::IntGauge(v) => v.resolve(ctx, field).await,
|
||||
Self::Gauge(v) => v.resolve(ctx, field).await,
|
||||
Self::Histogram(v) => v.resolve(ctx, field).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GenericGauge<T: Atomic>(core::GenericGauge<T>);
|
||||
|
||||
impl<T: Atomic> Deref for GenericGauge<T> {
|
||||
type Target = core::GenericGauge<T>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Atomic> DerefMut for GenericGauge<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Atomic> GenericGauge<T> {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
let opts = Opts::new(name, help);
|
||||
core::GenericGauge::<T>::with_opts(opts).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
core::GenericGauge::<T>::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl<T> async_graphql::OutputType for GenericGauge<T>
|
||||
where
|
||||
T: Atomic,
|
||||
<T as Atomic>::T: async_graphql::OutputType,
|
||||
{
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("GenericGauge")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Gauge(prometheus::Gauge);
|
||||
|
||||
impl Deref for Gauge {
|
||||
type Target = prometheus::Gauge;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Gauge {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Gauge {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Gauge::new(name, help).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Gauge::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for Gauge {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("Gauge")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<f64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GenericCounter<T: Atomic>(core::GenericCounter<T>);
|
||||
|
||||
impl<T: Atomic> Deref for GenericCounter<T> {
|
||||
type Target = core::GenericCounter<T>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Atomic> DerefMut for GenericCounter<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Atomic> GenericCounter<T> {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
core::GenericCounter::<T>::new(name, help).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
core::GenericCounter::<T>::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl<T> async_graphql::OutputType for GenericCounter<T>
|
||||
where
|
||||
T: Atomic,
|
||||
<T as Atomic>::T: async_graphql::OutputType,
|
||||
{
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("GenericCounter")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Counter(prometheus::Counter);
|
||||
|
||||
impl Deref for Counter {
|
||||
type Target = prometheus::Counter;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Counter {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Counter {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Counter::new(name, help).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Counter::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for Counter {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("Counter")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<f64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Histogram(prometheus::Histogram);
|
||||
|
||||
impl Histogram {
|
||||
pub fn with_opts(opts: HistogramOpts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Histogram::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Histogram {
|
||||
type Target = prometheus::Histogram;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Histogram {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[derive(async_graphql::SimpleObject)]
|
||||
#[graphql(name = "Histogram")]
|
||||
struct HistogramSample {
|
||||
count: u64,
|
||||
sum: f64,
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for Histogram {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("Histogram")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
<HistogramSample as async_graphql::OutputType>::create_type_info(registry)
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
let sample = HistogramSample {
|
||||
count: self.0.get_sample_count(),
|
||||
sum: self.0.get_sample_sum(),
|
||||
};
|
||||
|
||||
<HistogramSample as async_graphql::OutputType>::resolve(&sample, ctx, field).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IntCounter(prometheus::IntCounter);
|
||||
|
||||
impl IntCounter {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
prometheus::IntCounter::new(name, help).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::IntCounter::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for IntCounter {
|
||||
type Target = prometheus::IntCounter;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for IntCounter {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for IntCounter {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("IntCounter")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<u64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[repr(transparent)]
|
||||
pub struct IntGauge(prometheus::IntGauge);
|
||||
|
||||
impl Deref for IntGauge {
|
||||
type Target = prometheus::IntGauge;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for IntGauge {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl IntGauge {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
prometheus::IntGauge::new(name, help).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::IntGauge::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for IntGauge {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("IntGauge")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<i64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user