impl serialization for metrics data
This commit is contained in:
parent
3bf84340f4
commit
a87d685641
@ -13,19 +13,22 @@ axum = { version = "0.6", optional = true }
|
||||
async-graphql = { version = "5", optional = true, features = ["tracing"] }
|
||||
async-graphql-axum = { version = "5", optional = true }
|
||||
async-trait = "0.1"
|
||||
bytes = "1.3"
|
||||
clap = { version = "4", features = ["derive", "env"], optional = true }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
once_cell = "1.16"
|
||||
parking_lot = "0.12"
|
||||
prometheus = "0.13"
|
||||
prometheus = "0.13"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
tokio = { version = "1", features = ["sync", "macros", "time"] }
|
||||
tracing = "0.1"
|
||||
tracing-appender = "0.2"
|
||||
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
|
||||
tracing-gelf = "0.7"
|
||||
tower-http = { version = "0.3", features = ["cors", "trace"], optional = true }
|
||||
thiserror = "1"
|
||||
futures = "0.3"
|
||||
|
||||
[features]
|
||||
|
@ -6,7 +6,7 @@ use std::{
|
||||
use clap::Parser;
|
||||
use metrics::{
|
||||
frontend::graphql::{Graphql, GraphqlServerSettings},
|
||||
MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId,
|
||||
GraphqlData, MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId,
|
||||
};
|
||||
use overwatch_rs::{
|
||||
overwatch::OverwatchRunner,
|
||||
@ -114,6 +114,35 @@ pub struct MetricsData {
|
||||
duration: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ParseMetricsDataError {
|
||||
TryFromSliceError(core::array::TryFromSliceError),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ParseMetricsDataError {
|
||||
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ParseMetricsDataError {}
|
||||
|
||||
impl GraphqlData for MetricsData {
|
||||
type Error = ParseMetricsDataError;
|
||||
|
||||
fn try_from_bytes(src: &[u8]) -> Result<Self, Self::Error> {
|
||||
src.try_into()
|
||||
.map(|v| Self {
|
||||
duration: u64::from_be_bytes(v),
|
||||
})
|
||||
.map_err(ParseMetricsDataError::TryFromSliceError)
|
||||
}
|
||||
|
||||
fn try_into_bytes(self) -> Result<bytes::Bytes, Self::Error> {
|
||||
Ok(self.duration.to_be_bytes().to_vec().into())
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let settings = Args::parse();
|
||||
let graphql = OverwatchRunner::<Services>::run(
|
||||
|
@ -3,13 +3,14 @@ use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
// crates
|
||||
// internal
|
||||
use crate::{MetricsBackend, OwnedServiceId};
|
||||
use crate::{GraphqlData, MetricsBackend, OwnedServiceId};
|
||||
use overwatch_rs::services::ServiceId;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MapMetricsBackend<MetricsData>(HashMap<ServiceId, MetricsData>);
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[cfg(not(feature = "gql"))]
|
||||
impl<MetricsData: Clone + Debug + Send + Sync + 'static> MetricsBackend
|
||||
for MapMetricsBackend<MetricsData>
|
||||
{
|
||||
@ -29,3 +30,25 @@ impl<MetricsData: Clone + Debug + Send + Sync + 'static> MetricsBackend
|
||||
self.0.get(service_id.as_ref()).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[cfg(feature = "gql")]
|
||||
impl<MetricsData: GraphqlData + Clone + Debug + Send + Sync + 'static> MetricsBackend
|
||||
for MapMetricsBackend<MetricsData>
|
||||
{
|
||||
type MetricsData = MetricsData;
|
||||
type Error = ();
|
||||
type Settings = ();
|
||||
|
||||
fn init(_config: Self::Settings) -> Self {
|
||||
Self(HashMap::new())
|
||||
}
|
||||
|
||||
async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData) {
|
||||
self.0.insert(service_id, data);
|
||||
}
|
||||
|
||||
async fn load(&self, service_id: &OwnedServiceId) -> Option<Self::MetricsData> {
|
||||
self.0.get(service_id.as_ref()).cloned()
|
||||
}
|
||||
}
|
||||
|
@ -11,9 +11,20 @@ use overwatch_rs::services::{
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
};
|
||||
|
||||
pub trait GraphqlData: Sized {
|
||||
type Error: std::error::Error + Send + Sync + 'static;
|
||||
|
||||
fn try_from_bytes(src: &[u8]) -> Result<Self, Self::Error>;
|
||||
|
||||
fn try_into_bytes(self) -> Result<bytes::Bytes, Self::Error>;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait MetricsBackend {
|
||||
#[cfg(not(feature = "gql"))]
|
||||
type MetricsData: Clone + Send + Sync + Debug + 'static;
|
||||
#[cfg(feature = "gql")]
|
||||
type MetricsData: GraphqlData + Clone + Send + Sync + Debug + 'static;
|
||||
type Error: Send + Sync;
|
||||
type Settings: Clone + Send + Sync + 'static;
|
||||
fn init(config: Self::Settings) -> Self;
|
||||
|
@ -3,16 +3,46 @@ use ::core::ops::{Deref, DerefMut};
|
||||
use async_graphql::{parser::types::Field, ContextSelectionSet, Positioned, ServerResult, Value};
|
||||
use prometheus::HistogramOpts;
|
||||
pub use prometheus::{
|
||||
core::{self, Atomic},
|
||||
core::{self, Atomic, GenericCounter as PrometheusGenericCounter},
|
||||
labels, opts, Opts,
|
||||
};
|
||||
use serde::{
|
||||
de::{DeserializeOwned, MapAccess, Visitor},
|
||||
ser::{SerializeStruct, SerializeTupleStruct},
|
||||
Deserialize, Serialize,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
use crate::GraphqlData;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct MetricsData {
|
||||
ty: MetricDataType,
|
||||
id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ParseMetricsDataError {
|
||||
#[error("fail to encode metrics data: {0}")]
|
||||
EncodeError(serde_json::Error),
|
||||
#[error("fail to decode metrics data: {0}")]
|
||||
DecodeError(serde_json::Error),
|
||||
}
|
||||
|
||||
#[cfg(feature = "gql")]
|
||||
impl GraphqlData for MetricsData {
|
||||
type Error = ParseMetricsDataError;
|
||||
|
||||
fn try_from_bytes(src: &[u8]) -> Result<Self, Self::Error> {
|
||||
serde_json::from_slice(src).map_err(Self::Error::DecodeError)
|
||||
}
|
||||
|
||||
fn try_into_bytes(self) -> Result<bytes::Bytes, Self::Error> {
|
||||
serde_json::to_vec(&self)
|
||||
.map(bytes::Bytes::from)
|
||||
.map_err(Self::Error::EncodeError)
|
||||
}
|
||||
}
|
||||
|
||||
impl MetricsData {
|
||||
#[inline]
|
||||
pub fn new(ty: MetricDataType, id: String) -> Self {
|
||||
@ -30,7 +60,7 @@ impl MetricsData {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum MetricDataType {
|
||||
IntCounter(IntCounter),
|
||||
Counter(Counter),
|
||||
@ -95,19 +125,22 @@ impl async_graphql::OutputType for MetricDataType {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GenericGauge<T: Atomic>(core::GenericGauge<T>);
|
||||
pub struct GenericGauge<T: Atomic> {
|
||||
val: core::GenericGauge<T>,
|
||||
opts: Opts,
|
||||
}
|
||||
|
||||
impl<T: Atomic> Deref for GenericGauge<T> {
|
||||
type Target = core::GenericGauge<T>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
&self.val
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Atomic> DerefMut for GenericGauge<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
&mut self.val
|
||||
}
|
||||
}
|
||||
|
||||
@ -117,11 +150,11 @@ impl<T: Atomic> GenericGauge<T> {
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
let opts = Opts::new(name, help);
|
||||
core::GenericGauge::<T>::with_opts(opts).map(Self)
|
||||
core::GenericGauge::<T>::with_opts(opts.clone()).map(|val| Self { val, opts })
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
core::GenericGauge::<T>::with_opts(opts).map(Self)
|
||||
core::GenericGauge::<T>::with_opts(opts.clone()).map(|val| Self { val, opts })
|
||||
}
|
||||
}
|
||||
|
||||
@ -155,84 +188,65 @@ where
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Gauge(prometheus::Gauge);
|
||||
|
||||
impl Deref for Gauge {
|
||||
type Target = prometheus::Gauge;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
impl<T: Atomic> Serialize for GenericGauge<T>
|
||||
where
|
||||
T::T: Serialize + DeserializeOwned,
|
||||
{
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut ser = serializer.serialize_tuple_struct("GenericCounter", 1)?;
|
||||
let opts = SerializableOpts {
|
||||
val: self.val.get(),
|
||||
opts: self.opts.clone(),
|
||||
};
|
||||
ser.serialize_field(&opts)?;
|
||||
ser.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Gauge {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Gauge {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Gauge::new(name, help).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Gauge::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for Gauge {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("Gauge")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
impl<'de, T: Atomic> Deserialize<'de> for GenericGauge<T>
|
||||
where
|
||||
T::T: Serialize + DeserializeOwned,
|
||||
{
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::de::Deserializer<'de>,
|
||||
{
|
||||
<SerializableOpts<T::T> as Deserialize<'de>>::deserialize(deserializer).map(|opt| {
|
||||
// unwrap safe here, because ctr is always valid
|
||||
let val = prometheus::core::GenericGauge::with_opts(opt.opts.clone()).unwrap();
|
||||
val.set(opt.val);
|
||||
Self {
|
||||
val,
|
||||
opts: opt.opts,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<f64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GenericCounter<T: Atomic>(core::GenericCounter<T>);
|
||||
pub struct GenericCounter<T: Atomic> {
|
||||
ctr: core::GenericCounter<T>,
|
||||
opts: Opts,
|
||||
}
|
||||
|
||||
impl<T: Atomic> Deref for GenericCounter<T> {
|
||||
type Target = core::GenericCounter<T>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
&self.ctr
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Atomic> DerefMut for GenericCounter<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
&mut self.ctr
|
||||
}
|
||||
}
|
||||
|
||||
@ -241,11 +255,12 @@ impl<T: Atomic> GenericCounter<T> {
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
core::GenericCounter::<T>::new(name, help).map(Self)
|
||||
let opts = Opts::new(name, help);
|
||||
core::GenericCounter::<T>::with_opts(opts.clone()).map(|ctr| Self { ctr, opts })
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
core::GenericCounter::<T>::with_opts(opts).map(Self)
|
||||
core::GenericCounter::<T>::with_opts(opts.clone()).map(|ctr| Self { ctr, opts })
|
||||
}
|
||||
}
|
||||
|
||||
@ -279,76 +294,210 @@ where
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.ctr.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Counter(prometheus::Counter);
|
||||
|
||||
impl Deref for Counter {
|
||||
type Target = prometheus::Counter;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
impl<T: Atomic> Serialize for GenericCounter<T>
|
||||
where
|
||||
T::T: Serialize + DeserializeOwned,
|
||||
{
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut ser = serializer.serialize_tuple_struct("GenericCounter", 1)?;
|
||||
let opts = SerializableOpts {
|
||||
val: self.ctr.get(),
|
||||
opts: self.opts.clone(),
|
||||
};
|
||||
ser.serialize_field(&opts)?;
|
||||
ser.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Counter {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Counter {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Counter::new(name, help).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Counter::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for Counter {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("Counter")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
impl<'de, T: Atomic> Deserialize<'de> for GenericCounter<T>
|
||||
where
|
||||
T::T: Serialize + DeserializeOwned,
|
||||
{
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::de::Deserializer<'de>,
|
||||
{
|
||||
<SerializableOpts<T::T> as Deserialize<'de>>::deserialize(deserializer).map(|opt| {
|
||||
// unwrap safe here, because ctr is always valid
|
||||
let ctr = prometheus::core::GenericCounter::with_opts(opt.opts.clone()).unwrap();
|
||||
ctr.inc_by(opt.val);
|
||||
Self {
|
||||
ctr,
|
||||
opts: opt.opts,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<f64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
#[derive(Debug, Clone)]
|
||||
struct SerializableHistogramOpts {
|
||||
val: HistogramSample,
|
||||
opts: HistogramOpts,
|
||||
}
|
||||
|
||||
impl Serialize for SerializableHistogramOpts {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut ser = serializer.serialize_struct("Opts", 8)?;
|
||||
ser.serialize_field("val", &self.val)?;
|
||||
ser.serialize_field("namespace", &self.opts.common_opts.namespace)?;
|
||||
ser.serialize_field("subsystem", &self.opts.common_opts.subsystem)?;
|
||||
ser.serialize_field("name", &self.opts.common_opts.name)?;
|
||||
ser.serialize_field("help", &self.opts.common_opts.help)?;
|
||||
ser.serialize_field("const_labels", &self.opts.common_opts.const_labels)?;
|
||||
ser.serialize_field("variable_labels", &self.opts.common_opts.variable_labels)?;
|
||||
ser.serialize_field("buckets", &self.opts.buckets)?;
|
||||
ser.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for SerializableHistogramOpts {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
#[derive(Deserialize)]
|
||||
#[serde(field_identifier, rename_all = "lowercase")]
|
||||
enum Field {
|
||||
Val,
|
||||
Namespace,
|
||||
Subsystem,
|
||||
Name,
|
||||
Help,
|
||||
ConstLabels,
|
||||
VariableLabels,
|
||||
Buckets,
|
||||
}
|
||||
|
||||
struct SerializableHistogramOptsVisitor;
|
||||
|
||||
impl<'de> Visitor<'de> for SerializableHistogramOptsVisitor {
|
||||
type Value = SerializableHistogramOpts;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("struct SerializableHistogramOpts")
|
||||
}
|
||||
|
||||
fn visit_map<V>(self, mut map: V) -> Result<SerializableHistogramOpts, V::Error>
|
||||
where
|
||||
V: MapAccess<'de>,
|
||||
{
|
||||
let mut val = None;
|
||||
let mut namespace = None;
|
||||
let mut subsystem = None;
|
||||
let mut name = None;
|
||||
let mut help = None;
|
||||
let mut const_labels = None;
|
||||
let mut variable_labels = None;
|
||||
let mut buckets = None;
|
||||
|
||||
while let Some(key) = map.next_key()? {
|
||||
match key {
|
||||
Field::Val => {
|
||||
if val.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("ctr"));
|
||||
}
|
||||
val = Some(map.next_value()?);
|
||||
}
|
||||
Field::Namespace => {
|
||||
if namespace.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("namespace"));
|
||||
}
|
||||
namespace = Some(map.next_value()?);
|
||||
}
|
||||
Field::Subsystem => {
|
||||
if subsystem.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("subsystem"));
|
||||
}
|
||||
subsystem = Some(map.next_value()?);
|
||||
}
|
||||
Field::Name => {
|
||||
if name.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("name"));
|
||||
}
|
||||
name = Some(map.next_value()?);
|
||||
}
|
||||
Field::Help => {
|
||||
if help.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("help"));
|
||||
}
|
||||
help = Some(map.next_value()?);
|
||||
}
|
||||
Field::ConstLabels => {
|
||||
if const_labels.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("const_labels"));
|
||||
}
|
||||
const_labels = Some(map.next_value()?);
|
||||
}
|
||||
Field::VariableLabels => {
|
||||
if variable_labels.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("variable_labels"));
|
||||
}
|
||||
variable_labels = Some(map.next_value()?);
|
||||
}
|
||||
Field::Buckets => {
|
||||
if buckets.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("buckets"));
|
||||
}
|
||||
buckets = Some(map.next_value()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let val = val.ok_or_else(|| serde::de::Error::missing_field("val"))?;
|
||||
Ok(SerializableHistogramOpts {
|
||||
val,
|
||||
opts: HistogramOpts {
|
||||
buckets: buckets.unwrap_or_default(),
|
||||
common_opts: Opts {
|
||||
namespace: namespace.unwrap_or_default(),
|
||||
subsystem: subsystem.unwrap_or_default(),
|
||||
name: name.unwrap_or_default(),
|
||||
help: help.unwrap_or_default(),
|
||||
const_labels: const_labels.unwrap_or_default(),
|
||||
variable_labels: variable_labels.unwrap_or_default(),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const FIELDS: &[&str] = &[
|
||||
"ctr",
|
||||
"namespace",
|
||||
"subsystem",
|
||||
"name",
|
||||
"help",
|
||||
"const_labels",
|
||||
"variable_label",
|
||||
"buckets",
|
||||
];
|
||||
deserializer.deserialize_struct(
|
||||
"SerializableOpts",
|
||||
FIELDS,
|
||||
SerializableHistogramOptsVisitor,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Histogram(prometheus::Histogram);
|
||||
pub struct Histogram {
|
||||
val: prometheus::Histogram,
|
||||
opts: HistogramOpts,
|
||||
}
|
||||
|
||||
impl Histogram {
|
||||
pub fn with_opts(opts: HistogramOpts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::Histogram::with_opts(opts).map(Self)
|
||||
prometheus::Histogram::with_opts(opts.clone()).map(|val| Self { val, opts })
|
||||
}
|
||||
}
|
||||
|
||||
@ -356,18 +505,53 @@ impl Deref for Histogram {
|
||||
type Target = prometheus::Histogram;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
&self.val
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Histogram {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
&mut self.val
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Histogram {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut ser = serializer.serialize_tuple_struct("Histogram", 1)?;
|
||||
let sample = HistogramSample {
|
||||
count: self.val.get_sample_count(),
|
||||
sum: self.val.get_sample_sum(),
|
||||
};
|
||||
let opts = SerializableHistogramOpts {
|
||||
val: sample,
|
||||
opts: self.opts.clone(),
|
||||
};
|
||||
ser.serialize_field(&opts)?;
|
||||
ser.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Histogram {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::de::Deserializer<'de>,
|
||||
{
|
||||
// TODO: this implementation is not correct because we cannot set samples for histogram,
|
||||
// need to wait prometheus support serde.
|
||||
SerializableHistogramOpts::deserialize(deserializer).map(
|
||||
|SerializableHistogramOpts { val, opts }| {
|
||||
let x = prometheus::Histogram::with_opts(opts.clone()).unwrap();
|
||||
Self { val: x, opts }
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[derive(async_graphql::SimpleObject)]
|
||||
#[derive(async_graphql::SimpleObject, Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
#[graphql(name = "Histogram")]
|
||||
struct HistogramSample {
|
||||
count: u64,
|
||||
@ -391,8 +575,8 @@ impl async_graphql::OutputType for Histogram {
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
let sample = HistogramSample {
|
||||
count: self.0.get_sample_count(),
|
||||
sum: self.0.get_sample_sum(),
|
||||
count: self.val.get_sample_count(),
|
||||
sum: self.val.get_sample_sum(),
|
||||
};
|
||||
|
||||
<HistogramSample as async_graphql::OutputType>::resolve(&sample, ctx, field).await
|
||||
@ -400,122 +584,260 @@ impl async_graphql::OutputType for Histogram {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IntCounter(prometheus::IntCounter);
|
||||
struct SerializableOpts<T> {
|
||||
val: T,
|
||||
opts: Opts,
|
||||
}
|
||||
|
||||
impl IntCounter {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
prometheus::IntCounter::new(name, help).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::IntCounter::with_opts(opts).map(Self)
|
||||
impl<T: Serialize + DeserializeOwned> Serialize for SerializableOpts<T> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut ser = serializer.serialize_struct("Opts", 7)?;
|
||||
ser.serialize_field("val", &self.val)?;
|
||||
ser.serialize_field("namespace", &self.opts.namespace)?;
|
||||
ser.serialize_field("subsystem", &self.opts.subsystem)?;
|
||||
ser.serialize_field("name", &self.opts.name)?;
|
||||
ser.serialize_field("help", &self.opts.help)?;
|
||||
ser.serialize_field("const_labels", &self.opts.const_labels)?;
|
||||
ser.serialize_field("variable_labels", &self.opts.variable_labels)?;
|
||||
ser.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for IntCounter {
|
||||
type Target = prometheus::IntCounter;
|
||||
impl<'de, T: Serialize + DeserializeOwned> Deserialize<'de> for SerializableOpts<T> {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
#[derive(Deserialize)]
|
||||
#[serde(field_identifier, rename_all = "lowercase")]
|
||||
enum Field {
|
||||
Val,
|
||||
Namespace,
|
||||
Subsystem,
|
||||
Name,
|
||||
Help,
|
||||
ConstLabels,
|
||||
VariableLabels,
|
||||
}
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
struct SerializableOptsVisitor<T: Serialize + DeserializeOwned>(
|
||||
std::marker::PhantomData<T>,
|
||||
);
|
||||
|
||||
impl DerefMut for IntCounter {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
impl<'de, T: Serialize + DeserializeOwned> Visitor<'de> for SerializableOptsVisitor<T> {
|
||||
type Value = SerializableOpts<T>;
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for IntCounter {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("IntCounter")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("struct SerializableOpts")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<u64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
fn visit_map<V>(self, mut map: V) -> Result<SerializableOpts<T>, V::Error>
|
||||
where
|
||||
V: MapAccess<'de>,
|
||||
{
|
||||
let mut val = None;
|
||||
let mut namespace = None;
|
||||
let mut subsystem = None;
|
||||
let mut name = None;
|
||||
let mut help = None;
|
||||
let mut const_labels = None;
|
||||
let mut variable_labels = None;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[repr(transparent)]
|
||||
pub struct IntGauge(prometheus::IntGauge);
|
||||
while let Some(key) = map.next_key()? {
|
||||
match key {
|
||||
Field::Val => {
|
||||
if val.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("ctr"));
|
||||
}
|
||||
val = Some(map.next_value()?);
|
||||
}
|
||||
Field::Namespace => {
|
||||
if namespace.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("namespace"));
|
||||
}
|
||||
namespace = Some(map.next_value()?);
|
||||
}
|
||||
Field::Subsystem => {
|
||||
if subsystem.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("subsystem"));
|
||||
}
|
||||
subsystem = Some(map.next_value()?);
|
||||
}
|
||||
Field::Name => {
|
||||
if name.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("name"));
|
||||
}
|
||||
name = Some(map.next_value()?);
|
||||
}
|
||||
Field::Help => {
|
||||
if help.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("help"));
|
||||
}
|
||||
help = Some(map.next_value()?);
|
||||
}
|
||||
Field::ConstLabels => {
|
||||
if const_labels.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("const_labels"));
|
||||
}
|
||||
const_labels = Some(map.next_value()?);
|
||||
}
|
||||
Field::VariableLabels => {
|
||||
if variable_labels.is_some() {
|
||||
return Err(serde::de::Error::duplicate_field("variable_labels"));
|
||||
}
|
||||
variable_labels = Some(map.next_value()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for IntGauge {
|
||||
type Target = prometheus::IntGauge;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for IntGauge {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl IntGauge {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
prometheus::IntGauge::new(name, help).map(Self)
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::IntGauge::with_opts(opts).map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for IntGauge {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed("IntGauge")
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
let val = val.ok_or_else(|| serde::de::Error::missing_field("val"))?;
|
||||
Ok(SerializableOpts {
|
||||
val,
|
||||
opts: Opts {
|
||||
namespace: namespace.unwrap_or_default(),
|
||||
subsystem: subsystem.unwrap_or_default(),
|
||||
name: name.unwrap_or_default(),
|
||||
help: help.unwrap_or_default(),
|
||||
const_labels: const_labels.unwrap_or_default(),
|
||||
variable_labels: variable_labels.unwrap_or_default(),
|
||||
},
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<i64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
||||
const FIELDS: &[&str] = &[
|
||||
"ctr",
|
||||
"namespace",
|
||||
"subsystem",
|
||||
"name",
|
||||
"help",
|
||||
"const_labels",
|
||||
"variable_label",
|
||||
];
|
||||
deserializer.deserialize_struct(
|
||||
"SerializableOpts",
|
||||
FIELDS,
|
||||
SerializableOptsVisitor::<T>(std::marker::PhantomData),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! metric_typ {
|
||||
($($ty: ident::$setter:ident($primitive:ident)::$name: literal), +$(,)?) => {
|
||||
$(
|
||||
#[derive(Clone)]
|
||||
pub struct $ty {
|
||||
val: prometheus::$ty,
|
||||
opts: Opts,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for $ty {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
<prometheus::$ty as std::fmt::Debug>::fmt(&self.val, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for $ty {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut ser = serializer.serialize_tuple_struct($name, 1)?;
|
||||
let opts = SerializableOpts {
|
||||
val: self.val.get(),
|
||||
opts: self.opts.clone(),
|
||||
};
|
||||
ser.serialize_field(&opts)?;
|
||||
ser.end()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for $ty {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de> {
|
||||
<SerializableOpts<$primitive> as Deserialize<'de>>::deserialize(deserializer).map(|opt| {
|
||||
// unwrap safe here, because ctr is always valid
|
||||
let val = prometheus::$ty::with_opts(opt.opts.clone()).unwrap();
|
||||
val.$setter(opt.val);
|
||||
Self { val, opts: opt.opts }
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl $ty {
|
||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||
name: S1,
|
||||
help: S2,
|
||||
) -> Result<Self, prometheus::Error> {
|
||||
let opts = Opts::new(name, help);
|
||||
prometheus::$ty::with_opts(opts.clone()).map(|val| Self {
|
||||
val,
|
||||
opts,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||
prometheus::$ty::with_opts(opts.clone()).map(|val| Self {
|
||||
val,
|
||||
opts,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for $ty {
|
||||
type Target = prometheus::$ty;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.val
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for $ty {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.val
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-graphql")]
|
||||
#[async_trait::async_trait]
|
||||
impl async_graphql::OutputType for $ty {
|
||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||
std::borrow::Cow::Borrowed($name)
|
||||
}
|
||||
|
||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||
async_graphql::registry::MetaType::Scalar {
|
||||
name: Self::type_name().to_string(),
|
||||
description: None,
|
||||
is_valid: None,
|
||||
visible: None,
|
||||
inaccessible: false,
|
||||
tags: Vec::new(),
|
||||
specified_by_url: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve(
|
||||
&self,
|
||||
ctx: &ContextSelectionSet<'_>,
|
||||
field: &Positioned<Field>,
|
||||
) -> ServerResult<Value> {
|
||||
<$primitive as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await
|
||||
}
|
||||
}
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
metric_typ! {
|
||||
IntCounter::inc_by(u64)::"IntCounter",
|
||||
Counter::inc_by(f64)::"Counter",
|
||||
IntGauge::set(i64)::"IntGauge",
|
||||
Gauge::set(f64)::"Gauge",
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user