From 915ec00b341db14da57e40388104367a0011fd2b Mon Sep 17 00:00:00 2001 From: gusto Date: Thu, 19 Jan 2023 16:51:30 +0200 Subject: [PATCH] Http service api (#44) * Cargo http-service folder * WIP: http server * Revert comments in network service * Router service and axum implementation structure * Move bin contents to examples dir * Add http service and server traits * HttpMsg definition * WIP: axum backend * fix example in Cargo.toml * Shared axum router and router modification methods * Http example with axum and metrics service * make project compile and add Error associated type * Axum backend shared router fixes * Dummy service implementation for http example * remove unused clone on mutex * Cargo http-service folder * WIP: http server * Revert comments in network service * Router service and axum implementation structure * Move bin contents to examples dir * Add http service and server traits * HttpMsg definition * WIP: axum backend * fix example in Cargo.toml * Shared axum router and router modification methods * Http example with axum and metrics service * make project compile and add Error associated type * Axum backend shared router fixes * Dummy service implementation for http example * remove unused clone on mutex * Fix typos and remove unused code * Fix failing tests when feature flags are not set * Use bytes as a type for payload and response in http service * Refactored http crate layout into differential services files * First stab at router service * Fully piped http bridge system * Start building bridge helper function * Refactor bridge builder helper and update example * impl serialization for metrics data * Get updated copy of router when processing request * remove unused code * fix typo * [POC]: Http service: support add graphql handler (#47) * WIP: add graphql endpoint * support add graphql handler * remove generic * fix clippy warnings * Add post put and patch handlers that expect bytes as body * Graphql example file * WIP: Use http post method for graphql related queries * Parse graphql requests in handler * Simplify handlers for post and other data methods * Revert "Simplify handlers for post and other data methods" This reverts commit 96f2b1821e5cfe90be81baac8a6b4a2f9780d477. * add tracing and remove comments * Pass response bytes without any modifications * Use receive_batch_json for gql request parsing * Readme for running examples * fix conflicts * add a general helper function for graphql * remove unused function * cleanup code * move schema initialization to handle function * adapt metrics to http service * fix clippy warnings * remove unused fn * fix clippy * optimize example Co-authored-by: gusto Co-authored-by: Gusto Bacvinka * Fix cargo build without features * Simplify handlers for routes with data Co-authored-by: al8n Co-authored-by: Daniel Sanchez Quiros --- Cargo.toml | 3 +- nomos-services/http/Cargo.toml | 49 +++ nomos-services/http/examples/README.md | 33 ++ nomos-services/http/examples/axum.rs | 138 +++++++ nomos-services/http/examples/graphql.rs | 202 ++++++++++ nomos-services/http/src/backends/axum.rs | 161 ++++++++ nomos-services/http/src/backends/mod.rs | 24 ++ nomos-services/http/src/bridge.rs | 105 ++++++ nomos-services/http/src/http.rs | 209 ++++++++++ nomos-services/http/src/lib.rs | 3 + nomos-services/metrics/Cargo.toml | 11 +- nomos-services/metrics/examples/graphql.rs | 13 + .../metrics/src/frontend/graphql/mod.rs | 135 ++++--- nomos-services/metrics/src/types.rs | 357 +++++------------- 14 files changed, 1122 insertions(+), 321 deletions(-) create mode 100644 nomos-services/http/Cargo.toml create mode 100644 nomos-services/http/examples/README.md create mode 100644 nomos-services/http/examples/axum.rs create mode 100644 nomos-services/http/examples/graphql.rs create mode 100644 nomos-services/http/src/backends/axum.rs create mode 100644 nomos-services/http/src/backends/mod.rs create mode 100644 nomos-services/http/src/bridge.rs create mode 100644 nomos-services/http/src/http.rs create mode 100644 nomos-services/http/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 89b92406..1e9e6ae1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,5 +7,6 @@ members = [ "nomos-services/storage", "nomos-services/consensus", "nomos-services/mempool", + "nomos-services/http", "nomos-node" -] \ No newline at end of file +] diff --git a/nomos-services/http/Cargo.toml b/nomos-services/http/Cargo.toml new file mode 100644 index 00000000..14aea1d3 --- /dev/null +++ b/nomos-services/http/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "nomos-http" +version = "0.1.0" +edition = "2021" + +[[example]] +name = "axum" +path = "examples/axum.rs" +required-features = ["http"] + +[[example]] +name = "graphql" +path = "examples/graphql.rs" +required-features = ["http", "gql"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +axum = { version = "0.6", optional = true } +async-trait = "0.1" +# async-graphql does not follow semver, so we pin the version +async-graphql = { version = "=5.0.5", optional = true } +bytes = "1.3" +clap = { version = "4", features = ["derive", "env"], optional = true } +futures = "0.3" +hyper = { version = "0.14", optional = true } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +parking_lot = { version = "0.12", optional = true } +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0", optional = true } +thiserror = "1" +tracing = "0.1" +tracing-appender = "0.2" +tracing-subscriber = { version = "0.3", features = ["json"] } +tracing-gelf = "0.7" +tower = { version = "0.4", optional = true } +tower-http = { version = "0.3", features = ["cors", "trace"] } +tokio = { version = "1", features = ["sync", "macros"] } +tower-service = "0.3.2" + +[dev-dependencies] +metrics = { path = "../metrics", features = ["gql"] } +once_cell = "1.17" + +[features] +default = [] +http = ["clap", "axum", "serde_json", "parking_lot", "hyper", "tower"] +gql = ["async-graphql", "serde_json"] diff --git a/nomos-services/http/examples/README.md b/nomos-services/http/examples/README.md new file mode 100644 index 00000000..982f0bb4 --- /dev/null +++ b/nomos-services/http/examples/README.md @@ -0,0 +1,33 @@ +# Http service examples + +## Axum.rs +A simple service to demonstrate how to register http handler for overwatch service. + +To run this example use: +```bash +cargo run --example axum --features http +``` + +A GET enpoint will be registered at `http://localhost:8080/dummy/`. An endpoint corresponds with the Service name. + +## Graphql.rs +A demonstration of usage from within an overwatch service over the http. + +To run this example use: +```bash +cargo run --example graphql --features http,gql +``` + +An enpoint will be registered at `http://localhost:8080/dummygraphqlservice/`. An endpoint corresponds with the Service name. + +To query this endpoint use: +```bash +curl --location --request POST 'localhost:8080/dummygraphqlservice/' \ +--data-raw '{"query":"query {val}","variables":{}}' + +``` + +Every response should increment the `val` variable. +```json +{"data":{"val":1}} +``` diff --git a/nomos-services/http/examples/axum.rs b/nomos-services/http/examples/axum.rs new file mode 100644 index 00000000..16767660 --- /dev/null +++ b/nomos-services/http/examples/axum.rs @@ -0,0 +1,138 @@ +use std::error::Error; +use std::sync::Arc; + +use clap::Parser; +use nomos_http::backends::HttpBackend; +use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner, HttpBridgeService}; +use nomos_http::{ + backends::axum::{AxumBackend, AxumBackendSettings}, + http::*, +}; +use overwatch_rs::services::relay::RelayMessage; +use overwatch_rs::services::{ + handle::ServiceStateHandle, + state::{NoOperator, NoState}, + ServiceCore, ServiceData, ServiceId, +}; +use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle}; +use parking_lot::Mutex; +use tokio::sync::oneshot; + +pub struct DummyService { + counter: Arc>, + service_state: ServiceStateHandle, +} + +#[derive(Debug)] +pub struct DummyMsg { + reply_channel: oneshot::Sender, +} + +impl RelayMessage for DummyMsg {} + +impl ServiceData for DummyService { + const SERVICE_ID: ServiceId = "Dummy"; + type Settings = (); + type State = NoState<()>; + type StateOperator = NoOperator; + type Message = DummyMsg; +} + +#[async_trait::async_trait] +impl ServiceCore for DummyService { + fn init(service_state: ServiceStateHandle) -> Result { + Ok(Self { + counter: Default::default(), + service_state, + }) + } + + async fn run(self) -> Result<(), overwatch_rs::DynError> { + let Self { + counter, + service_state: ServiceStateHandle { + mut inbound_relay, .. + }, + } = self; + + // Handle the http request to dummy service. + while let Some(msg) = inbound_relay.recv().await { + handle_hello(counter.clone(), msg.reply_channel).await; + } + + Ok(()) + } +} + +async fn handle_hello(counter: Arc>, reply_channel: oneshot::Sender) { + *counter.lock() += 1; + let count = *counter.lock(); + + if let Err(e) = reply_channel.send(count) { + tracing::error!("dummy service send error: {e}"); + } +} + +fn dummy_router(handle: overwatch_rs::overwatch::handle::OverwatchHandle) -> HttpBridgeRunner +where + B: HttpBackend + Send + Sync + 'static, + B::Error: Error + Send + Sync + 'static, +{ + Box::new(Box::pin(async move { + let (dummy, mut hello_res_rx) = + build_http_bridge::(handle, HttpMethod::GET, "") + .await + .unwrap(); + + while let Some(HttpRequest { res_tx, .. }) = hello_res_rx.recv().await { + let (sender, receiver) = oneshot::channel(); + dummy + .send(DummyMsg { + reply_channel: sender, + }) + .await + .unwrap(); + let value = receiver.await.unwrap(); + res_tx + .send(format!("Hello, world! {}", value).into()) + .await + .unwrap(); + } + Ok(()) + })) +} + +#[derive(overwatch_derive::Services)] +struct Services { + http: ServiceHandle>, + router: ServiceHandle, + dummy: ServiceHandle, +} + +#[derive(clap::Parser)] +pub struct Args { + #[clap(flatten)] + http: AxumBackendSettings, +} + +fn main() -> Result<(), Box> { + tracing_subscriber::fmt::fmt().with_file(false).init(); + + let settings = Args::parse(); + let app = OverwatchRunner::::run( + ServicesServiceSettings { + http: nomos_http::http::Config { + backend: settings.http, + }, + router: nomos_http::bridge::HttpBridgeSettings { + runners: vec![Arc::new(Box::new(dummy_router::))], + }, + dummy: (), + }, + None, + )?; + + tracing::info!("overwatch ready"); + app.wait_finished(); + Ok(()) +} diff --git a/nomos-services/http/examples/graphql.rs b/nomos-services/http/examples/graphql.rs new file mode 100644 index 00000000..28fb3a27 --- /dev/null +++ b/nomos-services/http/examples/graphql.rs @@ -0,0 +1,202 @@ +use std::error::Error; +use std::sync::Arc; + +use bytes::Bytes; +use clap::Parser; +use nomos_http::backends::HttpBackend; +use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner, HttpBridgeService}; +use nomos_http::{ + backends::axum::{AxumBackend, AxumBackendSettings}, + http::*, +}; +use overwatch_rs::services::relay::{OutboundRelay, RelayMessage}; +use overwatch_rs::services::{ + handle::ServiceStateHandle, + state::{NoOperator, NoState}, + ServiceCore, ServiceData, ServiceId, +}; +use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle}; +use parking_lot::Mutex; +use tokio::sync::oneshot; + +async fn handle_count(counter: Arc>, reply_channel: oneshot::Sender) { + *counter.lock() += 1; + let count = *counter.lock(); + + if let Err(e) = reply_channel.send(count) { + tracing::error!("dummy service send error: {e}"); + } +} + +fn dummy_graphql_router( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner +where + B: HttpBackend + Send + Sync + 'static, + B::Error: Error + Send + Sync + 'static, +{ + static SCHEMA: once_cell::sync::Lazy< + async_graphql::Schema< + DummyGraphql, + async_graphql::EmptyMutation, + async_graphql::EmptySubscription, + >, + > = once_cell::sync::Lazy::new(|| { + async_graphql::Schema::build( + DummyGraphql::default(), + async_graphql::EmptyMutation, + async_graphql::EmptySubscription, + ) + .finish() + }); + + Box::new(Box::pin(async move { + // TODO: Graphql supports http GET requests, should nomos support that? + let (dummy, mut hello_res_rx) = + build_http_bridge::(handle, HttpMethod::POST, "") + .await + .unwrap(); + + while let Some(HttpRequest { + query: _, + payload, + res_tx, + }) = hello_res_rx.recv().await + { + let res = match handle_graphql_req(&SCHEMA, payload, dummy.clone()).await { + Ok(r) => r, + Err(err) => { + tracing::error!(err); + err.to_string() + } + }; + + res_tx.send(res.into()).await.unwrap(); + } + Ok(()) + })) +} + +async fn handle_graphql_req( + schema: &async_graphql::Schema< + DummyGraphql, + async_graphql::EmptyMutation, + async_graphql::EmptySubscription, + >, + payload: Option, + dummy: OutboundRelay, +) -> Result { + // TODO: Move to the graphql frontend as a helper function? + let payload = payload.ok_or("empty payload")?; + let req = async_graphql::http::receive_batch_json(&payload[..]) + .await? + .into_single()?; + + let (sender, receiver) = oneshot::channel(); + dummy + .send(DummyGraphqlMsg { + reply_channel: sender, + }) + .await + .unwrap(); + + // wait for the dummy service to respond + receiver.await.unwrap(); + let res = serde_json::to_string(&schema.execute(req).await)?; + Ok(res) +} + +#[derive(Debug, Clone, Default)] +pub struct DummyGraphql { + val: Arc>, +} + +#[async_graphql::Object] +impl DummyGraphql { + async fn val(&self) -> i32 { + let mut val = self.val.lock(); + *val += 1; + *val + } +} + +pub struct DummyGraphqlService { + val: Arc>, + service_state: ServiceStateHandle, +} + +#[derive(Debug)] +pub struct DummyGraphqlMsg { + reply_channel: oneshot::Sender, +} + +impl RelayMessage for DummyGraphqlMsg {} + +impl ServiceData for DummyGraphqlService { + const SERVICE_ID: ServiceId = "DummyGraphqlService"; + type Settings = (); + type State = NoState<()>; + type StateOperator = NoOperator; + type Message = DummyGraphqlMsg; +} + +#[async_trait::async_trait] +impl ServiceCore for DummyGraphqlService { + fn init(service_state: ServiceStateHandle) -> Result { + Ok(Self { + service_state, + val: Arc::new(Mutex::new(0)), + }) + } + + async fn run(self) -> Result<(), overwatch_rs::DynError> { + let Self { + service_state: ServiceStateHandle { + mut inbound_relay, .. + }, + val, + } = self; + + // Handle the http request to dummy service. + while let Some(msg) = inbound_relay.recv().await { + handle_count(val.clone(), msg.reply_channel).await; + } + + Ok(()) + } +} + +#[derive(overwatch_derive::Services)] +struct Services { + http: ServiceHandle>, + router: ServiceHandle, + dummy_graphql: ServiceHandle, +} + +#[derive(clap::Parser)] +pub struct Args { + #[clap(flatten)] + http: AxumBackendSettings, +} + +fn main() -> Result<(), overwatch_rs::DynError> { + tracing_subscriber::fmt::fmt().with_file(false).init(); + + let settings = Args::parse(); + let app = OverwatchRunner::::run( + ServicesServiceSettings { + http: nomos_http::http::Config { + backend: settings.http, + }, + router: nomos_http::bridge::HttpBridgeSettings { + runners: vec![Arc::new(Box::new(dummy_graphql_router::))], + }, + dummy_graphql: (), + }, + None, + )?; + + tracing::info!("overwatch ready"); + app.wait_finished(); + Ok(()) +} diff --git a/nomos-services/http/src/backends/axum.rs b/nomos-services/http/src/backends/axum.rs new file mode 100644 index 00000000..9fe4aa7d --- /dev/null +++ b/nomos-services/http/src/backends/axum.rs @@ -0,0 +1,161 @@ +// std +use std::{collections::HashMap, sync::Arc}; + +// crates +use axum::{ + body::Bytes, + extract::Query, + http::HeaderValue, + routing::{get, patch, post, put}, + Router, +}; +use hyper::{ + header::{CONTENT_TYPE, USER_AGENT}, + Body, Request, +}; +use overwatch_rs::{services::state::NoState, DynError}; +use parking_lot::Mutex; +use tokio::sync::mpsc::Sender; +use tower::make::Shared; +use tower_http::{ + cors::{Any, CorsLayer}, + trace::TraceLayer, +}; +use tower_service::Service; + +// internal +use super::HttpBackend; +use crate::http::{HttpMethod, HttpRequest, Route}; + +/// Configuration for the Http Server +#[derive(Debug, Clone, clap::Args, serde::Deserialize, serde::Serialize)] +pub struct AxumBackendSettings { + /// Socket where the server will be listening on for incoming requests. + #[arg( + short, long = "http-addr", + default_value_t = std::net::SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), + 8080, + ), + env = "HTTP_BIND_ADDRESS" + )] + pub address: std::net::SocketAddr, + /// Allowed origins for this server deployment requests. + #[arg(long = "http-cors-origin")] + pub cors_origins: Vec, +} + +#[derive(Debug, thiserror::Error)] +pub enum AxumBackendError { + #[error("axum backend: send error: {0}")] + SendError(#[from] tokio::sync::mpsc::error::SendError), + + #[error("axum backend: {0}")] + Any(DynError), +} + +#[derive(Debug, Clone)] +pub struct AxumBackend { + config: AxumBackendSettings, + router: Arc>, +} + +#[async_trait::async_trait] +impl HttpBackend for AxumBackend { + type Config = AxumBackendSettings; + type State = NoState; + type Error = AxumBackendError; + + fn new(config: Self::Config) -> Result + where + Self: Sized, + { + let mut builder = CorsLayer::new(); + if config.cors_origins.is_empty() { + builder = builder.allow_origin(Any); + } + + for origin in &config.cors_origins { + builder = builder.allow_origin( + origin + .as_str() + .parse::() + .expect("fail to parse origin"), + ); + } + + let router = Arc::new(Mutex::new( + Router::new() + .layer( + builder + .allow_headers([CONTENT_TYPE, USER_AGENT]) + .allow_methods(Any), + ) + .layer(TraceLayer::new_for_http()), + )); + + Ok(Self { config, router }) + } + + fn add_route( + &self, + service_id: overwatch_rs::services::ServiceId, + route: Route, + req_stream: Sender, + ) { + let path = format!("/{}/{}", service_id.to_lowercase(), route.path); + tracing::info!("Axum backend: adding route {}", path); + self.add_data_route(route.method, &path, req_stream); + } + + async fn run(&self) -> Result<(), overwatch_rs::DynError> { + let router = self.router.clone(); + let service = tower::service_fn(move |request: Request| { + let mut router = router.lock().clone(); + async move { router.call(request).await } + }); + + axum::Server::bind(&self.config.address) + .serve(Shared::new(service)) + .await?; + Ok(()) + } +} + +impl AxumBackend { + fn add_data_route(&self, method: HttpMethod, path: &str, req_stream: Sender) { + let handle_data = |Query(query): Query>, payload: Option| async move { + handle_req(req_stream, query, payload).await + }; + + let handler = match method { + HttpMethod::GET => get(handle_data), + HttpMethod::POST => post(handle_data), + HttpMethod::PUT => put(handle_data), + HttpMethod::PATCH => patch(handle_data), + _ => unimplemented!(), + }; + + let mut router = self.router.lock(); + *router = router.clone().route(path, handler) + } +} + +async fn handle_req( + req_stream: Sender, + query: HashMap, + payload: Option, +) -> Result { + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + match req_stream + .send(HttpRequest { + query, + payload, + res_tx: tx, + }) + .await + { + Ok(_) => rx.recv().await.ok_or_else(|| "".into()), + Err(e) => Err(AxumBackendError::SendError(e).to_string()), + } +} diff --git a/nomos-services/http/src/backends/mod.rs b/nomos-services/http/src/backends/mod.rs new file mode 100644 index 00000000..163aa965 --- /dev/null +++ b/nomos-services/http/src/backends/mod.rs @@ -0,0 +1,24 @@ +#[cfg(feature = "http")] +pub mod axum; + +use std::fmt::Debug; + +use overwatch_rs::services::{state::ServiceState, ServiceId}; +use tokio::sync::mpsc::Sender; + +use crate::http::{HttpRequest, Route}; + +#[async_trait::async_trait] +pub trait HttpBackend { + type Config: Clone + Debug + Send + Sync + 'static; + type State: ServiceState + Clone; + type Error: std::fmt::Display; + + fn new(config: Self::Config) -> Result + where + Self: Sized; + + fn add_route(&self, service_id: ServiceId, route: Route, req_stream: Sender); + + async fn run(&self) -> Result<(), overwatch_rs::DynError>; +} diff --git a/nomos-services/http/src/bridge.rs b/nomos-services/http/src/bridge.rs new file mode 100644 index 00000000..d987205a --- /dev/null +++ b/nomos-services/http/src/bridge.rs @@ -0,0 +1,105 @@ +use std::error::Error; +use std::fmt::{Debug, Formatter}; +use std::future::Future; +use std::sync::Arc; + +use async_trait::async_trait; +use overwatch_rs::services::handle::ServiceStateHandle; +use overwatch_rs::services::relay::{NoMessage, OutboundRelay}; +use overwatch_rs::services::{ + state::{NoOperator, NoState}, + ServiceCore, ServiceData, ServiceId, +}; +use overwatch_rs::DynError; +use tokio::sync::mpsc::{channel, Receiver}; + +use crate::backends::HttpBackend; +use crate::http::{HttpMethod, HttpMsg, HttpRequest, HttpService}; + +pub type HttpBridgeRunner = + Box> + Send + Unpin + 'static>; + +// TODO: If we can get rid of the clone bound on here remove Arc. +// For now as we bind this through the settings we need to keep it. +pub type HttpBridge = Arc< + Box< + dyn Fn(overwatch_rs::overwatch::handle::OverwatchHandle) -> HttpBridgeRunner + + Send + + Sync + + 'static, + >, +>; + +// TODO: Add error handling +pub async fn build_http_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, + method: HttpMethod, + path: P, +) -> Result<(OutboundRelay, Receiver), overwatch_rs::DynError> +where + S: ServiceCore + Send + Sync + 'static, + B: HttpBackend + Send + Sync + 'static, + B::Error: Error + Send + Sync + 'static, + P: Into + Send + Sync + 'static, +{ + let http_relay = handle.clone().relay::>().connect().await?; + + let service_relay = handle.clone().relay::().connect().await?; + + let (http_sender, http_receiver) = channel(1); + + // Register on http service to receive GET requests. + http_relay + .send(HttpMsg::add_http_handler( + method, + S::SERVICE_ID, + path, + http_sender, + )) + .await + .map_err(|(e, _)| e)?; + + Ok((service_relay, http_receiver)) +} + +pub struct HttpBridgeService { + pub(crate) runners: Vec, +} + +#[derive(Clone)] +pub struct HttpBridgeSettings { + pub runners: Vec, +} + +impl Debug for HttpBridgeSettings { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RouterSettings") + .field("runners len", &self.runners.len()) + .finish() + } +} + +impl ServiceData for HttpBridgeService { + const SERVICE_ID: ServiceId = "Router"; + type Settings = HttpBridgeSettings; + type State = NoState; + type StateOperator = NoOperator; + type Message = NoMessage; +} + +#[async_trait] +impl ServiceCore for HttpBridgeService { + fn init(service_state: ServiceStateHandle) -> Result { + let runners = service_state.settings_reader.get_updated_settings().runners; + let runners: Vec<_> = runners + .into_iter() + .map(|r| (r)(service_state.overwatch_handle.clone())) + .collect(); + Ok(Self { runners }) + } + + async fn run(self) -> Result<(), DynError> { + futures::future::join_all(self.runners).await; + Ok(()) + } +} diff --git a/nomos-services/http/src/http.rs b/nomos-services/http/src/http.rs new file mode 100644 index 00000000..9ed3aa8d --- /dev/null +++ b/nomos-services/http/src/http.rs @@ -0,0 +1,209 @@ +// std +use std::{ + collections::HashMap, + error::Error, + fmt::{self, Debug}, + sync::Arc, +}; + +use bytes::Bytes; +// crates +use overwatch_rs::services::{ + handle::ServiceStateHandle, + relay::{InboundRelay, OutboundRelay, RelayMessage}, + state::{NoOperator, NoState}, + ServiceCore, ServiceData, ServiceId, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc::Sender, oneshot}; + +// internal +use crate::backends::HttpBackend; + +#[derive(Serialize, Deserialize, Debug)] +pub struct Config { + pub backend: B::Config, +} + +pub struct HttpService { + backend: B, + inbound_relay: InboundRelay, +} + +impl ServiceData for HttpService { + const SERVICE_ID: ServiceId = "Http"; + type Settings = Config; + type State = NoState; + type StateOperator = NoOperator; + type Message = HttpMsg; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum HttpMethod { + GET, + POST, + PUT, + PATCH, + DELETE, +} + +#[derive(PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Route { + pub method: HttpMethod, + pub path: String, +} + +impl core::fmt::Debug for Route { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Route") + .field("method", &self.method) + .field("path", &self.path) + .finish() + } +} + +#[derive(Debug, Clone)] +pub struct HttpRequest { + pub query: HashMap, + pub payload: Option, + pub res_tx: Sender, +} + +// HttpMsg is a message that is sent via the relay to communicate with +// the HttpService. +pub enum HttpMsg { + AddHandler { + service_id: ServiceId, + route: Route, + req_stream: Sender, + }, +} + +impl HttpMsg { + pub fn add_http_handler>( + method: HttpMethod, + service_id: ServiceId, + path: P, + req_stream: Sender, + ) -> Self { + Self::AddHandler { + service_id, + route: Route { + method, + path: path.into(), + }, + req_stream, + } + } +} + +impl RelayMessage for HttpMsg {} + +impl Debug for HttpMsg { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::AddHandler { + service_id, + route, + req_stream: _, + } => write!( + fmt, + "HttpMsg::AddHandler {{ sender: {:?}, route: {:?} }}", + service_id, route + ), + } + } +} + +#[async_trait::async_trait] +impl ServiceCore for HttpService +where + B: HttpBackend + Send + Sync + 'static, + ::Error: Error + Send + Sync + 'static, +{ + fn init(service_state: ServiceStateHandle) -> Result { + let inbound_relay = service_state.inbound_relay; + ::new(service_state.settings_reader.get_updated_settings().backend) + .map(|backend| Self { + backend, + inbound_relay, + }) + .map_err(|e| Box::new(e) as Box) + } + + async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + let Self { + backend, + mut inbound_relay, + } = self; + + let backend = Arc::new(backend); + let (stop_tx, mut stop_rx) = oneshot::channel(); + tokio::spawn({ + let backend = backend.clone(); + async move { + loop { + tokio::select! { + Some(msg) = inbound_relay.recv() => { + match msg { + HttpMsg::AddHandler { + service_id, + route, + req_stream, + } => { + backend.add_route(service_id, route, req_stream); + }, + } + } + _server_exit = &mut stop_rx => { + break; + } + } + } + } + }); + backend.run().await.map_err(|e| { + if stop_tx.send(()).is_err() { + tracing::error!("HTTP service: failed to send stop signal to HTTP backend."); + } + e + }) + } +} + +impl Clone for Config { + fn clone(&self) -> Self { + Self { + backend: self.backend.clone(), + } + } +} + +// TODO: optimize error construct? +#[cfg(feature = "gql")] +pub async fn handle_graphql_req( + payload: Option, + relay: OutboundRelay, + f: F, +) -> Result +where + F: FnOnce( + async_graphql::Request, + oneshot::Sender, + ) -> Result, +{ + let payload = payload.ok_or("empty payload")?; + let req = async_graphql::http::receive_batch_json(&payload[..]) + .await? + .into_single()?; + + let (sender, receiver) = oneshot::channel(); + relay.send(f(req, sender)?).await.map_err(|_| { + tracing::error!(err = "failed to send graphql request to the http service"); + "failed to send graphql request to the frontend" + })?; + + let res = receiver.await.unwrap(); + let res = serde_json::to_string(&res)?; + Ok(res) +} diff --git a/nomos-services/http/src/lib.rs b/nomos-services/http/src/lib.rs new file mode 100644 index 00000000..1b9eccbe --- /dev/null +++ b/nomos-services/http/src/lib.rs @@ -0,0 +1,3 @@ +pub mod backends; +pub mod bridge; +pub mod http; diff --git a/nomos-services/metrics/Cargo.toml b/nomos-services/metrics/Cargo.toml index 463e12c8..47e9a149 100644 --- a/nomos-services/metrics/Cargo.toml +++ b/nomos-services/metrics/Cargo.toml @@ -6,29 +6,32 @@ edition = "2021" [[example]] name = "graphql" path = "examples/graphql.rs" -features = ["gql"] +required-features = ["gql"] [dependencies] axum = { version = "0.6", optional = true } async-graphql = { version = "5", optional = true, features = ["tracing"] } async-graphql-axum = { version = "5", optional = true } async-trait = "0.1" +bytes = "1.3" clap = { version = "4", features = ["derive", "env"], optional = true } +nomos-http = { path = "../http", optional = true } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } once_cell = "1.16" parking_lot = "0.12" -prometheus = "0.13" +prometheus = "0.13" serde = { version = "1", features = ["derive"] } +serde_json = "1" tokio = { version = "1", features = ["sync", "macros", "time"] } tracing = "0.1" tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] } tracing-gelf = "0.7" tower-http = { version = "0.3", features = ["cors", "trace"], optional = true } +thiserror = "1" futures = "0.3" - [features] default = [] -gql = ["clap", "axum", "async-graphql", "async-graphql-axum", "tower-http"] \ No newline at end of file +gql = ["clap", "axum", "async-graphql", "async-graphql-axum", "tower-http", "nomos-http/gql"] diff --git a/nomos-services/metrics/examples/graphql.rs b/nomos-services/metrics/examples/graphql.rs index beed6156..c5fe0c06 100644 --- a/nomos-services/metrics/examples/graphql.rs +++ b/nomos-services/metrics/examples/graphql.rs @@ -114,6 +114,19 @@ pub struct MetricsData { duration: u64, } +#[derive(Debug, Clone)] +pub enum ParseMetricsDataError { + TryFromSliceError(core::array::TryFromSliceError), +} + +impl std::fmt::Display for ParseMetricsDataError { + fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Ok(()) + } +} + +impl std::error::Error for ParseMetricsDataError {} + fn main() -> Result<(), Box> { let settings = Args::parse(); let graphql = OverwatchRunner::::run( diff --git a/nomos-services/metrics/src/frontend/graphql/mod.rs b/nomos-services/metrics/src/frontend/graphql/mod.rs index ac992664..8b7461ed 100644 --- a/nomos-services/metrics/src/frontend/graphql/mod.rs +++ b/nomos-services/metrics/src/frontend/graphql/mod.rs @@ -1,28 +1,14 @@ // std // crates -use async_graphql::{EmptyMutation, EmptySubscription, Schema}; -use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; -use axum::{ - extract::State, - http::{ - header::{CONTENT_TYPE, USER_AGENT}, - HeaderValue, - }, - routing::post, - Router, Server, -}; -use tower_http::{ - cors::{Any, CorsLayer}, - trace::TraceLayer, -}; - +use nomos_http::backends::HttpBackend; +use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; +use nomos_http::http::{handle_graphql_req, HttpMethod, HttpRequest}; // internal use crate::{MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId}; -use overwatch_rs::services::relay::Relay; +use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::{ handle::ServiceStateHandle, - relay::NoMessage, state::{NoOperator, NoState}, ServiceCore, ServiceData, ServiceId, }; @@ -31,9 +17,6 @@ use overwatch_rs::services::{ #[derive(Debug, Clone, clap::Args, serde::Deserialize, serde::Serialize)] #[cfg(feature = "gql")] pub struct GraphqlServerSettings { - /// Socket where the GraphQL will be listening on for incoming requests. - #[arg(short, long = "graphql-addr", default_value_t = std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), 8080), env = "METRICS_GRAPHQL_BIND_ADDRESS")] - pub address: std::net::SocketAddr, /// Max query depth allowed #[arg( long = "graphql-max-depth", @@ -53,25 +36,72 @@ pub struct GraphqlServerSettings { pub cors_origins: Vec, } -async fn graphql_handler( - schema: State, EmptyMutation, EmptySubscription>>, - req: GraphQLRequest, -) -> GraphQLResponse +pub fn metrics_graphql_router( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner where - Backend::MetricsData: async_graphql::OutputType, + MB: MetricsBackend + Clone + Send + 'static + Sync, + MB::MetricsData: async_graphql::OutputType, + B: HttpBackend + Send + Sync + 'static, + B::Error: std::error::Error + Send + Sync + 'static, { - let request = req.into_inner(); - let resp = schema.execute(request).await; - GraphQLResponse::from(resp) + Box::new(Box::pin(async move { + // TODO: Graphql supports http GET requests, should nomos support that? + let (relay, mut res_rx) = + build_http_bridge::, B, _>(handle, HttpMethod::POST, "") + .await + .unwrap(); + + while let Some(HttpRequest { + query: _, + payload, + res_tx, + }) = res_rx.recv().await + { + let res = match handle_graphql_req(payload, relay.clone(), |req, tx| { + Ok(GraphqlMetricsMessage { + req, + reply_channel: tx, + }) + }) + .await + { + Ok(r) => r, + Err(err) => { + tracing::error!(err); + err.to_string() + } + }; + + res_tx.send(res.into()).await.unwrap(); + } + Ok(()) + })) } -#[derive(Clone)] -pub struct Graphql { +#[derive(Debug)] +pub struct GraphqlMetricsMessage { + req: async_graphql::Request, + reply_channel: tokio::sync::oneshot::Sender, +} + +impl RelayMessage for GraphqlMetricsMessage {} + +pub struct Graphql +where + Backend: MetricsBackend + Send + Sync + 'static, + Backend::MetricsData: async_graphql::OutputType, +{ + service_state: Option>, settings: GraphqlServerSettings, backend_channel: Relay>, } -impl ServiceData for Graphql { +impl ServiceData for Graphql +where + Backend: MetricsBackend + Send + Sync + 'static, + Backend::MetricsData: async_graphql::OutputType, +{ const SERVICE_ID: ServiceId = "GraphqlMetrics"; type Settings = GraphqlServerSettings; @@ -80,7 +110,7 @@ impl ServiceData for Graphql; - type Message = NoMessage; + type Message = GraphqlMetricsMessage; } #[async_graphql::Object] @@ -114,40 +144,25 @@ where } #[async_trait::async_trait] -impl ServiceCore for Graphql +impl ServiceCore for Graphql where Backend::MetricsData: async_graphql::OutputType, { fn init(service_state: ServiceStateHandle) -> Result { - let settings = service_state.settings_reader.get_updated_settings(); let backend_channel: Relay> = service_state.overwatch_handle.relay(); + let settings = service_state.settings_reader.get_updated_settings(); Ok(Self { settings, + service_state: Some(service_state), backend_channel, }) } - async fn run(self) -> Result<(), overwatch_rs::DynError> { - let mut builder = CorsLayer::new(); - if self.settings.cors_origins.is_empty() { - builder = builder.allow_origin(Any); - } - for origin in &self.settings.cors_origins { - builder = builder.allow_origin( - origin - .as_str() - .parse::() - .expect("fail to parse origin"), - ); - } - let cors = builder - .allow_headers([CONTENT_TYPE, USER_AGENT]) - .allow_methods(Any); - - let addr = self.settings.address; + async fn run(mut self) -> Result<(), overwatch_rs::DynError> { let max_complexity = self.settings.max_complexity; let max_depth = self.settings.max_depth; + let mut inbound_relay = self.service_state.take().unwrap().inbound_relay; let schema = async_graphql::Schema::build( self, async_graphql::EmptyMutation, @@ -159,16 +174,12 @@ where .finish(); tracing::info!(schema = %schema.sdl(), "GraphQL schema definition"); - let router = Router::new() - .route("/", post(graphql_handler::)) - .with_state(schema) - .layer(cors) - .layer(TraceLayer::new_for_http()); - tracing::info!("Metrics Service GraphQL server listening: {}", addr); - Server::bind(&addr) - .serve(router.into_make_service()) - .await?; + while let Some(msg) = inbound_relay.recv().await { + let res = schema.execute(msg.req).await; + msg.reply_channel.send(res).unwrap(); + } + Ok(()) } } diff --git a/nomos-services/metrics/src/types.rs b/nomos-services/metrics/src/types.rs index 11293b96..bc3d0ab0 100644 --- a/nomos-services/metrics/src/types.rs +++ b/nomos-services/metrics/src/types.rs @@ -3,7 +3,7 @@ use ::core::ops::{Deref, DerefMut}; use async_graphql::{parser::types::Field, ContextSelectionSet, Positioned, ServerResult, Value}; use prometheus::HistogramOpts; pub use prometheus::{ - core::{self, Atomic}, + core::{self, Atomic, GenericCounter as PrometheusGenericCounter}, labels, opts, Opts, }; @@ -95,19 +95,21 @@ impl async_graphql::OutputType for MetricDataType { } #[derive(Debug, Clone)] -pub struct GenericGauge(core::GenericGauge); +pub struct GenericGauge { + val: core::GenericGauge, +} impl Deref for GenericGauge { type Target = core::GenericGauge; fn deref(&self) -> &Self::Target { - &self.0 + &self.val } } impl DerefMut for GenericGauge { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + &mut self.val } } @@ -116,12 +118,11 @@ impl GenericGauge { name: S1, help: S2, ) -> Result { - let opts = Opts::new(name, help); - core::GenericGauge::::with_opts(opts).map(Self) + core::GenericGauge::::new(name, help).map(|v| Self { val: v }) } pub fn with_opts(opts: Opts) -> Result { - core::GenericGauge::::with_opts(opts).map(Self) + core::GenericGauge::::with_opts(opts).map(|v| Self { val: v }) } } @@ -155,84 +156,26 @@ where ctx: &ContextSelectionSet<'_>, field: &Positioned, ) -> ServerResult { - <::T as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await + <::T as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await } } #[derive(Debug, Clone)] -pub struct Gauge(prometheus::Gauge); - -impl Deref for Gauge { - type Target = prometheus::Gauge; - - fn deref(&self) -> &Self::Target { - &self.0 - } +pub struct GenericCounter { + ctr: core::GenericCounter, } -impl DerefMut for Gauge { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl Gauge { - pub fn new, S2: Into>( - name: S1, - help: S2, - ) -> Result { - prometheus::Gauge::new(name, help).map(Self) - } - - pub fn with_opts(opts: Opts) -> Result { - prometheus::Gauge::with_opts(opts).map(Self) - } -} - -#[cfg(feature = "async-graphql")] -#[async_trait::async_trait] -impl async_graphql::OutputType for Gauge { - fn type_name() -> std::borrow::Cow<'static, str> { - std::borrow::Cow::Borrowed("Gauge") - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - registry.create_output_type::(async_graphql::registry::MetaTypeId::Scalar, |_| { - async_graphql::registry::MetaType::Scalar { - name: Self::type_name().to_string(), - description: None, - is_valid: None, - visible: None, - inaccessible: false, - tags: Vec::new(), - specified_by_url: None, - } - }) - } - - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> ServerResult { - ::resolve(&self.0.get(), ctx, field).await - } -} - -#[derive(Debug, Clone)] -pub struct GenericCounter(core::GenericCounter); - impl Deref for GenericCounter { type Target = core::GenericCounter; fn deref(&self) -> &Self::Target { - &self.0 + &self.ctr } } impl DerefMut for GenericCounter { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + &mut self.ctr } } @@ -241,11 +184,11 @@ impl GenericCounter { name: S1, help: S2, ) -> Result { - core::GenericCounter::::new(name, help).map(Self) + core::GenericCounter::::new(name, help).map(|ctr| Self { ctr }) } pub fn with_opts(opts: Opts) -> Result { - core::GenericCounter::::with_opts(opts).map(Self) + core::GenericCounter::::with_opts(opts).map(|ctr| Self { ctr }) } } @@ -279,76 +222,18 @@ where ctx: &ContextSelectionSet<'_>, field: &Positioned, ) -> ServerResult { - <::T as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await + <::T as async_graphql::OutputType>::resolve(&self.ctr.get(), ctx, field).await } } #[derive(Debug, Clone)] -pub struct Counter(prometheus::Counter); - -impl Deref for Counter { - type Target = prometheus::Counter; - - fn deref(&self) -> &Self::Target { - &self.0 - } +pub struct Histogram { + val: prometheus::Histogram, } -impl DerefMut for Counter { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl Counter { - pub fn new, S2: Into>( - name: S1, - help: S2, - ) -> Result { - prometheus::Counter::new(name, help).map(Self) - } - - pub fn with_opts(opts: Opts) -> Result { - prometheus::Counter::with_opts(opts).map(Self) - } -} - -#[cfg(feature = "async-graphql")] -#[async_trait::async_trait] -impl async_graphql::OutputType for Counter { - fn type_name() -> std::borrow::Cow<'static, str> { - std::borrow::Cow::Borrowed("Counter") - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - registry.create_output_type::(async_graphql::registry::MetaTypeId::Scalar, |_| { - async_graphql::registry::MetaType::Scalar { - name: Self::type_name().to_string(), - description: None, - is_valid: None, - visible: None, - inaccessible: false, - tags: Vec::new(), - specified_by_url: None, - } - }) - } - - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> ServerResult { - ::resolve(&self.0.get(), ctx, field).await - } -} - -#[derive(Debug, Clone)] -pub struct Histogram(prometheus::Histogram); - impl Histogram { pub fn with_opts(opts: HistogramOpts) -> Result { - prometheus::Histogram::with_opts(opts).map(Self) + prometheus::Histogram::with_opts(opts).map(|val| Self { val }) } } @@ -356,18 +241,18 @@ impl Deref for Histogram { type Target = prometheus::Histogram; fn deref(&self) -> &Self::Target { - &self.0 + &self.val } } impl DerefMut for Histogram { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + &mut self.val } } #[cfg(feature = "async-graphql")] -#[derive(async_graphql::SimpleObject)] +#[derive(async_graphql::SimpleObject, Debug, Clone, Copy)] #[graphql(name = "Histogram")] struct HistogramSample { count: u64, @@ -391,131 +276,95 @@ impl async_graphql::OutputType for Histogram { field: &Positioned, ) -> ServerResult { let sample = HistogramSample { - count: self.0.get_sample_count(), - sum: self.0.get_sample_sum(), + count: self.val.get_sample_count(), + sum: self.val.get_sample_sum(), }; ::resolve(&sample, ctx, field).await } } -#[derive(Debug, Clone)] -pub struct IntCounter(prometheus::IntCounter); - -impl IntCounter { - pub fn new, S2: Into>( - name: S1, - help: S2, - ) -> Result { - prometheus::IntCounter::new(name, help).map(Self) - } - - pub fn with_opts(opts: Opts) -> Result { - prometheus::IntCounter::with_opts(opts).map(Self) - } -} - -impl Deref for IntCounter { - type Target = prometheus::IntCounter; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for IntCounter { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -#[cfg(feature = "async-graphql")] -#[async_trait::async_trait] -impl async_graphql::OutputType for IntCounter { - fn type_name() -> std::borrow::Cow<'static, str> { - std::borrow::Cow::Borrowed("IntCounter") - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - registry.create_output_type::(async_graphql::registry::MetaTypeId::Scalar, |_| { - async_graphql::registry::MetaType::Scalar { - name: Self::type_name().to_string(), - description: None, - is_valid: None, - visible: None, - inaccessible: false, - tags: Vec::new(), - specified_by_url: None, +macro_rules! metric_typ { + ($($ty: ident::$setter:ident($primitive:ident)::$name: literal), +$(,)?) => { + $( + #[derive(Clone)] + pub struct $ty { + val: prometheus::$ty, } - }) - } - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> ServerResult { - ::resolve(&self.0.get(), ctx, field).await - } -} - -#[derive(Debug, Clone)] -#[repr(transparent)] -pub struct IntGauge(prometheus::IntGauge); - -impl Deref for IntGauge { - type Target = prometheus::IntGauge; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for IntGauge { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl IntGauge { - pub fn new, S2: Into>( - name: S1, - help: S2, - ) -> Result { - prometheus::IntGauge::new(name, help).map(Self) - } - - pub fn with_opts(opts: Opts) -> Result { - prometheus::IntGauge::with_opts(opts).map(Self) - } -} - -#[cfg(feature = "async-graphql")] -#[async_trait::async_trait] -impl async_graphql::OutputType for IntGauge { - fn type_name() -> std::borrow::Cow<'static, str> { - std::borrow::Cow::Borrowed("IntGauge") - } - - fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { - registry.create_output_type::(async_graphql::registry::MetaTypeId::Scalar, |_| { - async_graphql::registry::MetaType::Scalar { - name: Self::type_name().to_string(), - description: None, - is_valid: None, - visible: None, - inaccessible: false, - tags: Vec::new(), - specified_by_url: None, + impl std::fmt::Debug for $ty { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + ::fmt(&self.val, f) + } } - }) - } - async fn resolve( - &self, - ctx: &ContextSelectionSet<'_>, - field: &Positioned, - ) -> ServerResult { - ::resolve(&self.0.get(), ctx, field).await - } + impl $ty { + pub fn new, S2: Into>( + name: S1, + help: S2, + ) -> Result { + prometheus::$ty::new(name, help).map(|val| Self { + val, + }) + } + + pub fn with_opts(opts: Opts) -> Result { + prometheus::$ty::with_opts(opts.clone()).map(|val| Self { + val, + }) + } + } + + impl Deref for $ty { + type Target = prometheus::$ty; + + fn deref(&self) -> &Self::Target { + &self.val + } + } + + impl DerefMut for $ty { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.val + } + } + + #[cfg(feature = "async-graphql")] + #[async_trait::async_trait] + impl async_graphql::OutputType for $ty { + fn type_name() -> std::borrow::Cow<'static, str> { + std::borrow::Cow::Borrowed($name) + } + + fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String { + registry.create_output_type::(async_graphql::registry::MetaTypeId::Scalar, |_| { + async_graphql::registry::MetaType::Scalar { + name: Self::type_name().to_string(), + description: None, + is_valid: None, + visible: None, + inaccessible: false, + tags: Vec::new(), + specified_by_url: None, + } + }) + } + + async fn resolve( + &self, + ctx: &ContextSelectionSet<'_>, + field: &Positioned, + ) -> ServerResult { + <$primitive as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await + } + } + )* + }; +} + +metric_typ! { + IntCounter::inc_by(u64)::"IntCounter", + Counter::inc_by(f64)::"Counter", + IntGauge::set(i64)::"IntGauge", + Gauge::set(f64)::"Gauge", }