Http service api (#44)
* Cargo http-service folder
* WIP: http server
* Revert comments in network service
* Router service and axum implementation structure
* Move bin contents to examples dir
* Add http service and server traits
* HttpMsg definition
* WIP: axum backend
* fix example in Cargo.toml
* Shared axum router and router modification methods
* Http example with axum and metrics service
* make project compile and add Error associated type
* Axum backend shared router fixes
* Dummy service implementation for http example
* remove unused clone on mutex
* Cargo http-service folder
* WIP: http server
* Revert comments in network service
* Router service and axum implementation structure
* Move bin contents to examples dir
* Add http service and server traits
* HttpMsg definition
* WIP: axum backend
* fix example in Cargo.toml
* Shared axum router and router modification methods
* Http example with axum and metrics service
* make project compile and add Error associated type
* Axum backend shared router fixes
* Dummy service implementation for http example
* remove unused clone on mutex
* Fix typos and remove unused code
* Fix failing tests when feature flags are not set
* Use bytes as a type for payload and response in http service
* Refactored http crate layout into differential services files
* First stab at router service
* Fully piped http bridge system
* Start building bridge helper function
* Refactor bridge builder helper and update example
* impl serialization for metrics data
* Get updated copy of router when processing request
* remove unused code
* fix typo
* [POC]: Http service: support add graphql handler (#47)
* WIP: add graphql endpoint
* support add graphql handler
* remove generic
* fix clippy warnings
* Add post put and patch handlers that expect bytes as body
* Graphql example file
* WIP: Use http post method for graphql related queries
* Parse graphql requests in handler
* Simplify handlers for post and other data methods
* Revert "Simplify handlers for post and other data methods"
This reverts commit 96f2b1821e
.
* add tracing and remove comments
* Pass response bytes without any modifications
* Use receive_batch_json for gql request parsing
* Readme for running examples
* fix conflicts
* add a general helper function for graphql
* remove unused function
* cleanup code
* move schema initialization to handle function
* adapt metrics to http service
* fix clippy warnings
* remove unused fn
* fix clippy
* optimize example
Co-authored-by: gusto <bacvinka@gmail.com>
Co-authored-by: Gusto Bacvinka <augustinas.bacvinka@gmail.com>
* Fix cargo build without features
* Simplify handlers for routes with data
Co-authored-by: al8n <scygliu1@gmail.com>
Co-authored-by: Daniel Sanchez Quiros <sanchez.quiros.daniel@gmail.com>
This commit is contained in:
parent
613d9ac1b2
commit
915ec00b34
|
@ -7,5 +7,6 @@ members = [
|
||||||
"nomos-services/storage",
|
"nomos-services/storage",
|
||||||
"nomos-services/consensus",
|
"nomos-services/consensus",
|
||||||
"nomos-services/mempool",
|
"nomos-services/mempool",
|
||||||
|
"nomos-services/http",
|
||||||
"nomos-node"
|
"nomos-node"
|
||||||
]
|
]
|
|
@ -0,0 +1,49 @@
|
||||||
|
[package]
|
||||||
|
name = "nomos-http"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "axum"
|
||||||
|
path = "examples/axum.rs"
|
||||||
|
required-features = ["http"]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "graphql"
|
||||||
|
path = "examples/graphql.rs"
|
||||||
|
required-features = ["http", "gql"]
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
axum = { version = "0.6", optional = true }
|
||||||
|
async-trait = "0.1"
|
||||||
|
# async-graphql does not follow semver, so we pin the version
|
||||||
|
async-graphql = { version = "=5.0.5", optional = true }
|
||||||
|
bytes = "1.3"
|
||||||
|
clap = { version = "4", features = ["derive", "env"], optional = true }
|
||||||
|
futures = "0.3"
|
||||||
|
hyper = { version = "0.14", optional = true }
|
||||||
|
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||||
|
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||||
|
parking_lot = { version = "0.12", optional = true }
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = { version = "1.0", optional = true }
|
||||||
|
thiserror = "1"
|
||||||
|
tracing = "0.1"
|
||||||
|
tracing-appender = "0.2"
|
||||||
|
tracing-subscriber = { version = "0.3", features = ["json"] }
|
||||||
|
tracing-gelf = "0.7"
|
||||||
|
tower = { version = "0.4", optional = true }
|
||||||
|
tower-http = { version = "0.3", features = ["cors", "trace"] }
|
||||||
|
tokio = { version = "1", features = ["sync", "macros"] }
|
||||||
|
tower-service = "0.3.2"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
metrics = { path = "../metrics", features = ["gql"] }
|
||||||
|
once_cell = "1.17"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = []
|
||||||
|
http = ["clap", "axum", "serde_json", "parking_lot", "hyper", "tower"]
|
||||||
|
gql = ["async-graphql", "serde_json"]
|
|
@ -0,0 +1,33 @@
|
||||||
|
# Http service examples
|
||||||
|
|
||||||
|
## Axum.rs
|
||||||
|
A simple service to demonstrate how to register http handler for overwatch service.
|
||||||
|
|
||||||
|
To run this example use:
|
||||||
|
```bash
|
||||||
|
cargo run --example axum --features http
|
||||||
|
```
|
||||||
|
|
||||||
|
A GET enpoint will be registered at `http://localhost:8080/dummy/`. An endpoint corresponds with the Service name.
|
||||||
|
|
||||||
|
## Graphql.rs
|
||||||
|
A demonstration of usage from within an overwatch service over the http.
|
||||||
|
|
||||||
|
To run this example use:
|
||||||
|
```bash
|
||||||
|
cargo run --example graphql --features http,gql
|
||||||
|
```
|
||||||
|
|
||||||
|
An enpoint will be registered at `http://localhost:8080/dummygraphqlservice/`. An endpoint corresponds with the Service name.
|
||||||
|
|
||||||
|
To query this endpoint use:
|
||||||
|
```bash
|
||||||
|
curl --location --request POST 'localhost:8080/dummygraphqlservice/' \
|
||||||
|
--data-raw '{"query":"query {val}","variables":{}}'
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
Every response should increment the `val` variable.
|
||||||
|
```json
|
||||||
|
{"data":{"val":1}}
|
||||||
|
```
|
|
@ -0,0 +1,138 @@
|
||||||
|
use std::error::Error;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use clap::Parser;
|
||||||
|
use nomos_http::backends::HttpBackend;
|
||||||
|
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner, HttpBridgeService};
|
||||||
|
use nomos_http::{
|
||||||
|
backends::axum::{AxumBackend, AxumBackendSettings},
|
||||||
|
http::*,
|
||||||
|
};
|
||||||
|
use overwatch_rs::services::relay::RelayMessage;
|
||||||
|
use overwatch_rs::services::{
|
||||||
|
handle::ServiceStateHandle,
|
||||||
|
state::{NoOperator, NoState},
|
||||||
|
ServiceCore, ServiceData, ServiceId,
|
||||||
|
};
|
||||||
|
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
|
pub struct DummyService {
|
||||||
|
counter: Arc<Mutex<i32>>,
|
||||||
|
service_state: ServiceStateHandle<Self>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DummyMsg {
|
||||||
|
reply_channel: oneshot::Sender<i32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RelayMessage for DummyMsg {}
|
||||||
|
|
||||||
|
impl ServiceData for DummyService {
|
||||||
|
const SERVICE_ID: ServiceId = "Dummy";
|
||||||
|
type Settings = ();
|
||||||
|
type State = NoState<()>;
|
||||||
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
type Message = DummyMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl ServiceCore for DummyService {
|
||||||
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||||
|
Ok(Self {
|
||||||
|
counter: Default::default(),
|
||||||
|
service_state,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(self) -> Result<(), overwatch_rs::DynError> {
|
||||||
|
let Self {
|
||||||
|
counter,
|
||||||
|
service_state: ServiceStateHandle {
|
||||||
|
mut inbound_relay, ..
|
||||||
|
},
|
||||||
|
} = self;
|
||||||
|
|
||||||
|
// Handle the http request to dummy service.
|
||||||
|
while let Some(msg) = inbound_relay.recv().await {
|
||||||
|
handle_hello(counter.clone(), msg.reply_channel).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_hello(counter: Arc<Mutex<i32>>, reply_channel: oneshot::Sender<i32>) {
|
||||||
|
*counter.lock() += 1;
|
||||||
|
let count = *counter.lock();
|
||||||
|
|
||||||
|
if let Err(e) = reply_channel.send(count) {
|
||||||
|
tracing::error!("dummy service send error: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dummy_router<B>(handle: overwatch_rs::overwatch::handle::OverwatchHandle) -> HttpBridgeRunner
|
||||||
|
where
|
||||||
|
B: HttpBackend + Send + Sync + 'static,
|
||||||
|
B::Error: Error + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
Box::new(Box::pin(async move {
|
||||||
|
let (dummy, mut hello_res_rx) =
|
||||||
|
build_http_bridge::<DummyService, B, _>(handle, HttpMethod::GET, "")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
while let Some(HttpRequest { res_tx, .. }) = hello_res_rx.recv().await {
|
||||||
|
let (sender, receiver) = oneshot::channel();
|
||||||
|
dummy
|
||||||
|
.send(DummyMsg {
|
||||||
|
reply_channel: sender,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let value = receiver.await.unwrap();
|
||||||
|
res_tx
|
||||||
|
.send(format!("Hello, world! {}", value).into())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(overwatch_derive::Services)]
|
||||||
|
struct Services {
|
||||||
|
http: ServiceHandle<HttpService<AxumBackend>>,
|
||||||
|
router: ServiceHandle<HttpBridgeService>,
|
||||||
|
dummy: ServiceHandle<DummyService>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(clap::Parser)]
|
||||||
|
pub struct Args {
|
||||||
|
#[clap(flatten)]
|
||||||
|
http: AxumBackendSettings,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
tracing_subscriber::fmt::fmt().with_file(false).init();
|
||||||
|
|
||||||
|
let settings = Args::parse();
|
||||||
|
let app = OverwatchRunner::<Services>::run(
|
||||||
|
ServicesServiceSettings {
|
||||||
|
http: nomos_http::http::Config {
|
||||||
|
backend: settings.http,
|
||||||
|
},
|
||||||
|
router: nomos_http::bridge::HttpBridgeSettings {
|
||||||
|
runners: vec![Arc::new(Box::new(dummy_router::<AxumBackend>))],
|
||||||
|
},
|
||||||
|
dummy: (),
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
tracing::info!("overwatch ready");
|
||||||
|
app.wait_finished();
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -0,0 +1,202 @@
|
||||||
|
use std::error::Error;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use clap::Parser;
|
||||||
|
use nomos_http::backends::HttpBackend;
|
||||||
|
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner, HttpBridgeService};
|
||||||
|
use nomos_http::{
|
||||||
|
backends::axum::{AxumBackend, AxumBackendSettings},
|
||||||
|
http::*,
|
||||||
|
};
|
||||||
|
use overwatch_rs::services::relay::{OutboundRelay, RelayMessage};
|
||||||
|
use overwatch_rs::services::{
|
||||||
|
handle::ServiceStateHandle,
|
||||||
|
state::{NoOperator, NoState},
|
||||||
|
ServiceCore, ServiceData, ServiceId,
|
||||||
|
};
|
||||||
|
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
|
async fn handle_count(counter: Arc<Mutex<i32>>, reply_channel: oneshot::Sender<i32>) {
|
||||||
|
*counter.lock() += 1;
|
||||||
|
let count = *counter.lock();
|
||||||
|
|
||||||
|
if let Err(e) = reply_channel.send(count) {
|
||||||
|
tracing::error!("dummy service send error: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn dummy_graphql_router<B>(
|
||||||
|
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||||
|
) -> HttpBridgeRunner
|
||||||
|
where
|
||||||
|
B: HttpBackend + Send + Sync + 'static,
|
||||||
|
B::Error: Error + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
static SCHEMA: once_cell::sync::Lazy<
|
||||||
|
async_graphql::Schema<
|
||||||
|
DummyGraphql,
|
||||||
|
async_graphql::EmptyMutation,
|
||||||
|
async_graphql::EmptySubscription,
|
||||||
|
>,
|
||||||
|
> = once_cell::sync::Lazy::new(|| {
|
||||||
|
async_graphql::Schema::build(
|
||||||
|
DummyGraphql::default(),
|
||||||
|
async_graphql::EmptyMutation,
|
||||||
|
async_graphql::EmptySubscription,
|
||||||
|
)
|
||||||
|
.finish()
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::new(Box::pin(async move {
|
||||||
|
// TODO: Graphql supports http GET requests, should nomos support that?
|
||||||
|
let (dummy, mut hello_res_rx) =
|
||||||
|
build_http_bridge::<DummyGraphqlService, B, _>(handle, HttpMethod::POST, "")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
while let Some(HttpRequest {
|
||||||
|
query: _,
|
||||||
|
payload,
|
||||||
|
res_tx,
|
||||||
|
}) = hello_res_rx.recv().await
|
||||||
|
{
|
||||||
|
let res = match handle_graphql_req(&SCHEMA, payload, dummy.clone()).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!(err);
|
||||||
|
err.to_string()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
res_tx.send(res.into()).await.unwrap();
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_graphql_req(
|
||||||
|
schema: &async_graphql::Schema<
|
||||||
|
DummyGraphql,
|
||||||
|
async_graphql::EmptyMutation,
|
||||||
|
async_graphql::EmptySubscription,
|
||||||
|
>,
|
||||||
|
payload: Option<Bytes>,
|
||||||
|
dummy: OutboundRelay<DummyGraphqlMsg>,
|
||||||
|
) -> Result<String, overwatch_rs::DynError> {
|
||||||
|
// TODO: Move to the graphql frontend as a helper function?
|
||||||
|
let payload = payload.ok_or("empty payload")?;
|
||||||
|
let req = async_graphql::http::receive_batch_json(&payload[..])
|
||||||
|
.await?
|
||||||
|
.into_single()?;
|
||||||
|
|
||||||
|
let (sender, receiver) = oneshot::channel();
|
||||||
|
dummy
|
||||||
|
.send(DummyGraphqlMsg {
|
||||||
|
reply_channel: sender,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// wait for the dummy service to respond
|
||||||
|
receiver.await.unwrap();
|
||||||
|
let res = serde_json::to_string(&schema.execute(req).await)?;
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub struct DummyGraphql {
|
||||||
|
val: Arc<Mutex<i32>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_graphql::Object]
|
||||||
|
impl DummyGraphql {
|
||||||
|
async fn val(&self) -> i32 {
|
||||||
|
let mut val = self.val.lock();
|
||||||
|
*val += 1;
|
||||||
|
*val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct DummyGraphqlService {
|
||||||
|
val: Arc<Mutex<i32>>,
|
||||||
|
service_state: ServiceStateHandle<Self>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct DummyGraphqlMsg {
|
||||||
|
reply_channel: oneshot::Sender<i32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RelayMessage for DummyGraphqlMsg {}
|
||||||
|
|
||||||
|
impl ServiceData for DummyGraphqlService {
|
||||||
|
const SERVICE_ID: ServiceId = "DummyGraphqlService";
|
||||||
|
type Settings = ();
|
||||||
|
type State = NoState<()>;
|
||||||
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
type Message = DummyGraphqlMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl ServiceCore for DummyGraphqlService {
|
||||||
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||||
|
Ok(Self {
|
||||||
|
service_state,
|
||||||
|
val: Arc::new(Mutex::new(0)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(self) -> Result<(), overwatch_rs::DynError> {
|
||||||
|
let Self {
|
||||||
|
service_state: ServiceStateHandle {
|
||||||
|
mut inbound_relay, ..
|
||||||
|
},
|
||||||
|
val,
|
||||||
|
} = self;
|
||||||
|
|
||||||
|
// Handle the http request to dummy service.
|
||||||
|
while let Some(msg) = inbound_relay.recv().await {
|
||||||
|
handle_count(val.clone(), msg.reply_channel).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(overwatch_derive::Services)]
|
||||||
|
struct Services {
|
||||||
|
http: ServiceHandle<HttpService<AxumBackend>>,
|
||||||
|
router: ServiceHandle<HttpBridgeService>,
|
||||||
|
dummy_graphql: ServiceHandle<DummyGraphqlService>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(clap::Parser)]
|
||||||
|
pub struct Args {
|
||||||
|
#[clap(flatten)]
|
||||||
|
http: AxumBackendSettings,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() -> Result<(), overwatch_rs::DynError> {
|
||||||
|
tracing_subscriber::fmt::fmt().with_file(false).init();
|
||||||
|
|
||||||
|
let settings = Args::parse();
|
||||||
|
let app = OverwatchRunner::<Services>::run(
|
||||||
|
ServicesServiceSettings {
|
||||||
|
http: nomos_http::http::Config {
|
||||||
|
backend: settings.http,
|
||||||
|
},
|
||||||
|
router: nomos_http::bridge::HttpBridgeSettings {
|
||||||
|
runners: vec![Arc::new(Box::new(dummy_graphql_router::<AxumBackend>))],
|
||||||
|
},
|
||||||
|
dummy_graphql: (),
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
tracing::info!("overwatch ready");
|
||||||
|
app.wait_finished();
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -0,0 +1,161 @@
|
||||||
|
// std
|
||||||
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
|
// crates
|
||||||
|
use axum::{
|
||||||
|
body::Bytes,
|
||||||
|
extract::Query,
|
||||||
|
http::HeaderValue,
|
||||||
|
routing::{get, patch, post, put},
|
||||||
|
Router,
|
||||||
|
};
|
||||||
|
use hyper::{
|
||||||
|
header::{CONTENT_TYPE, USER_AGENT},
|
||||||
|
Body, Request,
|
||||||
|
};
|
||||||
|
use overwatch_rs::{services::state::NoState, DynError};
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
use tower::make::Shared;
|
||||||
|
use tower_http::{
|
||||||
|
cors::{Any, CorsLayer},
|
||||||
|
trace::TraceLayer,
|
||||||
|
};
|
||||||
|
use tower_service::Service;
|
||||||
|
|
||||||
|
// internal
|
||||||
|
use super::HttpBackend;
|
||||||
|
use crate::http::{HttpMethod, HttpRequest, Route};
|
||||||
|
|
||||||
|
/// Configuration for the Http Server
|
||||||
|
#[derive(Debug, Clone, clap::Args, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct AxumBackendSettings {
|
||||||
|
/// Socket where the server will be listening on for incoming requests.
|
||||||
|
#[arg(
|
||||||
|
short, long = "http-addr",
|
||||||
|
default_value_t = std::net::SocketAddr::new(
|
||||||
|
std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
|
||||||
|
8080,
|
||||||
|
),
|
||||||
|
env = "HTTP_BIND_ADDRESS"
|
||||||
|
)]
|
||||||
|
pub address: std::net::SocketAddr,
|
||||||
|
/// Allowed origins for this server deployment requests.
|
||||||
|
#[arg(long = "http-cors-origin")]
|
||||||
|
pub cors_origins: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum AxumBackendError {
|
||||||
|
#[error("axum backend: send error: {0}")]
|
||||||
|
SendError(#[from] tokio::sync::mpsc::error::SendError<HttpRequest>),
|
||||||
|
|
||||||
|
#[error("axum backend: {0}")]
|
||||||
|
Any(DynError),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct AxumBackend {
|
||||||
|
config: AxumBackendSettings,
|
||||||
|
router: Arc<Mutex<Router>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl HttpBackend for AxumBackend {
|
||||||
|
type Config = AxumBackendSettings;
|
||||||
|
type State = NoState<AxumBackendSettings>;
|
||||||
|
type Error = AxumBackendError;
|
||||||
|
|
||||||
|
fn new(config: Self::Config) -> Result<Self, Self::Error>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
{
|
||||||
|
let mut builder = CorsLayer::new();
|
||||||
|
if config.cors_origins.is_empty() {
|
||||||
|
builder = builder.allow_origin(Any);
|
||||||
|
}
|
||||||
|
|
||||||
|
for origin in &config.cors_origins {
|
||||||
|
builder = builder.allow_origin(
|
||||||
|
origin
|
||||||
|
.as_str()
|
||||||
|
.parse::<HeaderValue>()
|
||||||
|
.expect("fail to parse origin"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let router = Arc::new(Mutex::new(
|
||||||
|
Router::new()
|
||||||
|
.layer(
|
||||||
|
builder
|
||||||
|
.allow_headers([CONTENT_TYPE, USER_AGENT])
|
||||||
|
.allow_methods(Any),
|
||||||
|
)
|
||||||
|
.layer(TraceLayer::new_for_http()),
|
||||||
|
));
|
||||||
|
|
||||||
|
Ok(Self { config, router })
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_route(
|
||||||
|
&self,
|
||||||
|
service_id: overwatch_rs::services::ServiceId,
|
||||||
|
route: Route,
|
||||||
|
req_stream: Sender<HttpRequest>,
|
||||||
|
) {
|
||||||
|
let path = format!("/{}/{}", service_id.to_lowercase(), route.path);
|
||||||
|
tracing::info!("Axum backend: adding route {}", path);
|
||||||
|
self.add_data_route(route.method, &path, req_stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(&self) -> Result<(), overwatch_rs::DynError> {
|
||||||
|
let router = self.router.clone();
|
||||||
|
let service = tower::service_fn(move |request: Request<Body>| {
|
||||||
|
let mut router = router.lock().clone();
|
||||||
|
async move { router.call(request).await }
|
||||||
|
});
|
||||||
|
|
||||||
|
axum::Server::bind(&self.config.address)
|
||||||
|
.serve(Shared::new(service))
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AxumBackend {
|
||||||
|
fn add_data_route(&self, method: HttpMethod, path: &str, req_stream: Sender<HttpRequest>) {
|
||||||
|
let handle_data = |Query(query): Query<HashMap<String, String>>, payload: Option<Bytes>| async move {
|
||||||
|
handle_req(req_stream, query, payload).await
|
||||||
|
};
|
||||||
|
|
||||||
|
let handler = match method {
|
||||||
|
HttpMethod::GET => get(handle_data),
|
||||||
|
HttpMethod::POST => post(handle_data),
|
||||||
|
HttpMethod::PUT => put(handle_data),
|
||||||
|
HttpMethod::PATCH => patch(handle_data),
|
||||||
|
_ => unimplemented!(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut router = self.router.lock();
|
||||||
|
*router = router.clone().route(path, handler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_req(
|
||||||
|
req_stream: Sender<HttpRequest>,
|
||||||
|
query: HashMap<String, String>,
|
||||||
|
payload: Option<Bytes>,
|
||||||
|
) -> Result<Bytes, String> {
|
||||||
|
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
|
||||||
|
match req_stream
|
||||||
|
.send(HttpRequest {
|
||||||
|
query,
|
||||||
|
payload,
|
||||||
|
res_tx: tx,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => rx.recv().await.ok_or_else(|| "".into()),
|
||||||
|
Err(e) => Err(AxumBackendError::SendError(e).to_string()),
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
#[cfg(feature = "http")]
|
||||||
|
pub mod axum;
|
||||||
|
|
||||||
|
use std::fmt::Debug;
|
||||||
|
|
||||||
|
use overwatch_rs::services::{state::ServiceState, ServiceId};
|
||||||
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
|
use crate::http::{HttpRequest, Route};
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait HttpBackend {
|
||||||
|
type Config: Clone + Debug + Send + Sync + 'static;
|
||||||
|
type State: ServiceState<Settings = Self::Config> + Clone;
|
||||||
|
type Error: std::fmt::Display;
|
||||||
|
|
||||||
|
fn new(config: Self::Config) -> Result<Self, Self::Error>
|
||||||
|
where
|
||||||
|
Self: Sized;
|
||||||
|
|
||||||
|
fn add_route(&self, service_id: ServiceId, route: Route, req_stream: Sender<HttpRequest>);
|
||||||
|
|
||||||
|
async fn run(&self) -> Result<(), overwatch_rs::DynError>;
|
||||||
|
}
|
|
@ -0,0 +1,105 @@
|
||||||
|
use std::error::Error;
|
||||||
|
use std::fmt::{Debug, Formatter};
|
||||||
|
use std::future::Future;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use overwatch_rs::services::handle::ServiceStateHandle;
|
||||||
|
use overwatch_rs::services::relay::{NoMessage, OutboundRelay};
|
||||||
|
use overwatch_rs::services::{
|
||||||
|
state::{NoOperator, NoState},
|
||||||
|
ServiceCore, ServiceData, ServiceId,
|
||||||
|
};
|
||||||
|
use overwatch_rs::DynError;
|
||||||
|
use tokio::sync::mpsc::{channel, Receiver};
|
||||||
|
|
||||||
|
use crate::backends::HttpBackend;
|
||||||
|
use crate::http::{HttpMethod, HttpMsg, HttpRequest, HttpService};
|
||||||
|
|
||||||
|
pub type HttpBridgeRunner =
|
||||||
|
Box<dyn Future<Output = Result<(), overwatch_rs::DynError>> + Send + Unpin + 'static>;
|
||||||
|
|
||||||
|
// TODO: If we can get rid of the clone bound on here remove Arc.
|
||||||
|
// For now as we bind this through the settings we need to keep it.
|
||||||
|
pub type HttpBridge = Arc<
|
||||||
|
Box<
|
||||||
|
dyn Fn(overwatch_rs::overwatch::handle::OverwatchHandle) -> HttpBridgeRunner
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
>,
|
||||||
|
>;
|
||||||
|
|
||||||
|
// TODO: Add error handling
|
||||||
|
pub async fn build_http_bridge<S, B, P>(
|
||||||
|
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||||
|
method: HttpMethod,
|
||||||
|
path: P,
|
||||||
|
) -> Result<(OutboundRelay<S::Message>, Receiver<HttpRequest>), overwatch_rs::DynError>
|
||||||
|
where
|
||||||
|
S: ServiceCore + Send + Sync + 'static,
|
||||||
|
B: HttpBackend + Send + Sync + 'static,
|
||||||
|
B::Error: Error + Send + Sync + 'static,
|
||||||
|
P: Into<String> + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let http_relay = handle.clone().relay::<HttpService<B>>().connect().await?;
|
||||||
|
|
||||||
|
let service_relay = handle.clone().relay::<S>().connect().await?;
|
||||||
|
|
||||||
|
let (http_sender, http_receiver) = channel(1);
|
||||||
|
|
||||||
|
// Register on http service to receive GET requests.
|
||||||
|
http_relay
|
||||||
|
.send(HttpMsg::add_http_handler(
|
||||||
|
method,
|
||||||
|
S::SERVICE_ID,
|
||||||
|
path,
|
||||||
|
http_sender,
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.map_err(|(e, _)| e)?;
|
||||||
|
|
||||||
|
Ok((service_relay, http_receiver))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct HttpBridgeService {
|
||||||
|
pub(crate) runners: Vec<HttpBridgeRunner>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct HttpBridgeSettings {
|
||||||
|
pub runners: Vec<HttpBridge>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Debug for HttpBridgeSettings {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("RouterSettings")
|
||||||
|
.field("runners len", &self.runners.len())
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServiceData for HttpBridgeService {
|
||||||
|
const SERVICE_ID: ServiceId = "Router";
|
||||||
|
type Settings = HttpBridgeSettings;
|
||||||
|
type State = NoState<Self::Settings>;
|
||||||
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
type Message = NoMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ServiceCore for HttpBridgeService {
|
||||||
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
|
||||||
|
let runners = service_state.settings_reader.get_updated_settings().runners;
|
||||||
|
let runners: Vec<_> = runners
|
||||||
|
.into_iter()
|
||||||
|
.map(|r| (r)(service_state.overwatch_handle.clone()))
|
||||||
|
.collect();
|
||||||
|
Ok(Self { runners })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(self) -> Result<(), DynError> {
|
||||||
|
futures::future::join_all(self.runners).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,209 @@
|
||||||
|
// std
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
error::Error,
|
||||||
|
fmt::{self, Debug},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
// crates
|
||||||
|
use overwatch_rs::services::{
|
||||||
|
handle::ServiceStateHandle,
|
||||||
|
relay::{InboundRelay, OutboundRelay, RelayMessage},
|
||||||
|
state::{NoOperator, NoState},
|
||||||
|
ServiceCore, ServiceData, ServiceId,
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::sync::{mpsc::Sender, oneshot};
|
||||||
|
|
||||||
|
// internal
|
||||||
|
use crate::backends::HttpBackend;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
pub struct Config<B: HttpBackend> {
|
||||||
|
pub backend: B::Config,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct HttpService<B: HttpBackend> {
|
||||||
|
backend: B,
|
||||||
|
inbound_relay: InboundRelay<HttpMsg>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: HttpBackend + 'static> ServiceData for HttpService<B> {
|
||||||
|
const SERVICE_ID: ServiceId = "Http";
|
||||||
|
type Settings = Config<B>;
|
||||||
|
type State = NoState<Self::Settings>;
|
||||||
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
type Message = HttpMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
|
pub enum HttpMethod {
|
||||||
|
GET,
|
||||||
|
POST,
|
||||||
|
PUT,
|
||||||
|
PATCH,
|
||||||
|
DELETE,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
|
pub struct Route {
|
||||||
|
pub method: HttpMethod,
|
||||||
|
pub path: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl core::fmt::Debug for Route {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("Route")
|
||||||
|
.field("method", &self.method)
|
||||||
|
.field("path", &self.path)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct HttpRequest {
|
||||||
|
pub query: HashMap<String, String>,
|
||||||
|
pub payload: Option<bytes::Bytes>,
|
||||||
|
pub res_tx: Sender<bytes::Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// HttpMsg is a message that is sent via the relay to communicate with
|
||||||
|
// the HttpService.
|
||||||
|
pub enum HttpMsg {
|
||||||
|
AddHandler {
|
||||||
|
service_id: ServiceId,
|
||||||
|
route: Route,
|
||||||
|
req_stream: Sender<HttpRequest>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HttpMsg {
|
||||||
|
pub fn add_http_handler<P: Into<String>>(
|
||||||
|
method: HttpMethod,
|
||||||
|
service_id: ServiceId,
|
||||||
|
path: P,
|
||||||
|
req_stream: Sender<HttpRequest>,
|
||||||
|
) -> Self {
|
||||||
|
Self::AddHandler {
|
||||||
|
service_id,
|
||||||
|
route: Route {
|
||||||
|
method,
|
||||||
|
path: path.into(),
|
||||||
|
},
|
||||||
|
req_stream,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RelayMessage for HttpMsg {}
|
||||||
|
|
||||||
|
impl Debug for HttpMsg {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
Self::AddHandler {
|
||||||
|
service_id,
|
||||||
|
route,
|
||||||
|
req_stream: _,
|
||||||
|
} => write!(
|
||||||
|
fmt,
|
||||||
|
"HttpMsg::AddHandler {{ sender: {:?}, route: {:?} }}",
|
||||||
|
service_id, route
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<B> ServiceCore for HttpService<B>
|
||||||
|
where
|
||||||
|
B: HttpBackend + Send + Sync + 'static,
|
||||||
|
<B as HttpBackend>::Error: Error + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||||
|
let inbound_relay = service_state.inbound_relay;
|
||||||
|
<B as HttpBackend>::new(service_state.settings_reader.get_updated_settings().backend)
|
||||||
|
.map(|backend| Self {
|
||||||
|
backend,
|
||||||
|
inbound_relay,
|
||||||
|
})
|
||||||
|
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
|
||||||
|
let Self {
|
||||||
|
backend,
|
||||||
|
mut inbound_relay,
|
||||||
|
} = self;
|
||||||
|
|
||||||
|
let backend = Arc::new(backend);
|
||||||
|
let (stop_tx, mut stop_rx) = oneshot::channel();
|
||||||
|
tokio::spawn({
|
||||||
|
let backend = backend.clone();
|
||||||
|
async move {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(msg) = inbound_relay.recv() => {
|
||||||
|
match msg {
|
||||||
|
HttpMsg::AddHandler {
|
||||||
|
service_id,
|
||||||
|
route,
|
||||||
|
req_stream,
|
||||||
|
} => {
|
||||||
|
backend.add_route(service_id, route, req_stream);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_server_exit = &mut stop_rx => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
backend.run().await.map_err(|e| {
|
||||||
|
if stop_tx.send(()).is_err() {
|
||||||
|
tracing::error!("HTTP service: failed to send stop signal to HTTP backend.");
|
||||||
|
}
|
||||||
|
e
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: HttpBackend> Clone for Config<B> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
backend: self.backend.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: optimize error construct?
|
||||||
|
#[cfg(feature = "gql")]
|
||||||
|
pub async fn handle_graphql_req<M, F>(
|
||||||
|
payload: Option<Bytes>,
|
||||||
|
relay: OutboundRelay<M>,
|
||||||
|
f: F,
|
||||||
|
) -> Result<String, overwatch_rs::DynError>
|
||||||
|
where
|
||||||
|
F: FnOnce(
|
||||||
|
async_graphql::Request,
|
||||||
|
oneshot::Sender<async_graphql::Response>,
|
||||||
|
) -> Result<M, overwatch_rs::DynError>,
|
||||||
|
{
|
||||||
|
let payload = payload.ok_or("empty payload")?;
|
||||||
|
let req = async_graphql::http::receive_batch_json(&payload[..])
|
||||||
|
.await?
|
||||||
|
.into_single()?;
|
||||||
|
|
||||||
|
let (sender, receiver) = oneshot::channel();
|
||||||
|
relay.send(f(req, sender)?).await.map_err(|_| {
|
||||||
|
tracing::error!(err = "failed to send graphql request to the http service");
|
||||||
|
"failed to send graphql request to the frontend"
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let res = receiver.await.unwrap();
|
||||||
|
let res = serde_json::to_string(&res)?;
|
||||||
|
Ok(res)
|
||||||
|
}
|
|
@ -0,0 +1,3 @@
|
||||||
|
pub mod backends;
|
||||||
|
pub mod bridge;
|
||||||
|
pub mod http;
|
|
@ -6,29 +6,32 @@ edition = "2021"
|
||||||
[[example]]
|
[[example]]
|
||||||
name = "graphql"
|
name = "graphql"
|
||||||
path = "examples/graphql.rs"
|
path = "examples/graphql.rs"
|
||||||
features = ["gql"]
|
required-features = ["gql"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
axum = { version = "0.6", optional = true }
|
axum = { version = "0.6", optional = true }
|
||||||
async-graphql = { version = "5", optional = true, features = ["tracing"] }
|
async-graphql = { version = "5", optional = true, features = ["tracing"] }
|
||||||
async-graphql-axum = { version = "5", optional = true }
|
async-graphql-axum = { version = "5", optional = true }
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
|
bytes = "1.3"
|
||||||
clap = { version = "4", features = ["derive", "env"], optional = true }
|
clap = { version = "4", features = ["derive", "env"], optional = true }
|
||||||
|
nomos-http = { path = "../http", optional = true }
|
||||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||||
once_cell = "1.16"
|
once_cell = "1.16"
|
||||||
parking_lot = "0.12"
|
parking_lot = "0.12"
|
||||||
prometheus = "0.13"
|
prometheus = "0.13"
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
tokio = { version = "1", features = ["sync", "macros", "time"] }
|
tokio = { version = "1", features = ["sync", "macros", "time"] }
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-appender = "0.2"
|
tracing-appender = "0.2"
|
||||||
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
|
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
|
||||||
tracing-gelf = "0.7"
|
tracing-gelf = "0.7"
|
||||||
tower-http = { version = "0.3", features = ["cors", "trace"], optional = true }
|
tower-http = { version = "0.3", features = ["cors", "trace"], optional = true }
|
||||||
|
thiserror = "1"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
|
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
gql = ["clap", "axum", "async-graphql", "async-graphql-axum", "tower-http"]
|
gql = ["clap", "axum", "async-graphql", "async-graphql-axum", "tower-http", "nomos-http/gql"]
|
||||||
|
|
|
@ -114,6 +114,19 @@ pub struct MetricsData {
|
||||||
duration: u64,
|
duration: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum ParseMetricsDataError {
|
||||||
|
TryFromSliceError(core::array::TryFromSliceError),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for ParseMetricsDataError {
|
||||||
|
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for ParseMetricsDataError {}
|
||||||
|
|
||||||
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
let settings = Args::parse();
|
let settings = Args::parse();
|
||||||
let graphql = OverwatchRunner::<Services>::run(
|
let graphql = OverwatchRunner::<Services>::run(
|
||||||
|
|
|
@ -1,28 +1,14 @@
|
||||||
// std
|
// std
|
||||||
|
|
||||||
// crates
|
// crates
|
||||||
use async_graphql::{EmptyMutation, EmptySubscription, Schema};
|
use nomos_http::backends::HttpBackend;
|
||||||
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
|
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
|
||||||
use axum::{
|
use nomos_http::http::{handle_graphql_req, HttpMethod, HttpRequest};
|
||||||
extract::State,
|
|
||||||
http::{
|
|
||||||
header::{CONTENT_TYPE, USER_AGENT},
|
|
||||||
HeaderValue,
|
|
||||||
},
|
|
||||||
routing::post,
|
|
||||||
Router, Server,
|
|
||||||
};
|
|
||||||
use tower_http::{
|
|
||||||
cors::{Any, CorsLayer},
|
|
||||||
trace::TraceLayer,
|
|
||||||
};
|
|
||||||
|
|
||||||
// internal
|
// internal
|
||||||
use crate::{MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId};
|
use crate::{MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId};
|
||||||
use overwatch_rs::services::relay::Relay;
|
use overwatch_rs::services::relay::{Relay, RelayMessage};
|
||||||
use overwatch_rs::services::{
|
use overwatch_rs::services::{
|
||||||
handle::ServiceStateHandle,
|
handle::ServiceStateHandle,
|
||||||
relay::NoMessage,
|
|
||||||
state::{NoOperator, NoState},
|
state::{NoOperator, NoState},
|
||||||
ServiceCore, ServiceData, ServiceId,
|
ServiceCore, ServiceData, ServiceId,
|
||||||
};
|
};
|
||||||
|
@ -31,9 +17,6 @@ use overwatch_rs::services::{
|
||||||
#[derive(Debug, Clone, clap::Args, serde::Deserialize, serde::Serialize)]
|
#[derive(Debug, Clone, clap::Args, serde::Deserialize, serde::Serialize)]
|
||||||
#[cfg(feature = "gql")]
|
#[cfg(feature = "gql")]
|
||||||
pub struct GraphqlServerSettings {
|
pub struct GraphqlServerSettings {
|
||||||
/// Socket where the GraphQL will be listening on for incoming requests.
|
|
||||||
#[arg(short, long = "graphql-addr", default_value_t = std::net::SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), 8080), env = "METRICS_GRAPHQL_BIND_ADDRESS")]
|
|
||||||
pub address: std::net::SocketAddr,
|
|
||||||
/// Max query depth allowed
|
/// Max query depth allowed
|
||||||
#[arg(
|
#[arg(
|
||||||
long = "graphql-max-depth",
|
long = "graphql-max-depth",
|
||||||
|
@ -53,25 +36,72 @@ pub struct GraphqlServerSettings {
|
||||||
pub cors_origins: Vec<String>,
|
pub cors_origins: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn graphql_handler<Backend: MetricsBackend + Send + 'static + Sync>(
|
pub fn metrics_graphql_router<MB, B>(
|
||||||
schema: State<Schema<Graphql<Backend>, EmptyMutation, EmptySubscription>>,
|
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||||
req: GraphQLRequest,
|
) -> HttpBridgeRunner
|
||||||
) -> GraphQLResponse
|
|
||||||
where
|
where
|
||||||
Backend::MetricsData: async_graphql::OutputType,
|
MB: MetricsBackend + Clone + Send + 'static + Sync,
|
||||||
|
MB::MetricsData: async_graphql::OutputType,
|
||||||
|
B: HttpBackend + Send + Sync + 'static,
|
||||||
|
B::Error: std::error::Error + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
let request = req.into_inner();
|
Box::new(Box::pin(async move {
|
||||||
let resp = schema.execute(request).await;
|
// TODO: Graphql supports http GET requests, should nomos support that?
|
||||||
GraphQLResponse::from(resp)
|
let (relay, mut res_rx) =
|
||||||
|
build_http_bridge::<Graphql<MB>, B, _>(handle, HttpMethod::POST, "")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
while let Some(HttpRequest {
|
||||||
|
query: _,
|
||||||
|
payload,
|
||||||
|
res_tx,
|
||||||
|
}) = res_rx.recv().await
|
||||||
|
{
|
||||||
|
let res = match handle_graphql_req(payload, relay.clone(), |req, tx| {
|
||||||
|
Ok(GraphqlMetricsMessage {
|
||||||
|
req,
|
||||||
|
reply_channel: tx,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!(err);
|
||||||
|
err.to_string()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
res_tx.send(res.into()).await.unwrap();
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Debug)]
|
||||||
pub struct Graphql<Backend: MetricsBackend + Send + Sync + 'static> {
|
pub struct GraphqlMetricsMessage {
|
||||||
|
req: async_graphql::Request,
|
||||||
|
reply_channel: tokio::sync::oneshot::Sender<async_graphql::Response>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RelayMessage for GraphqlMetricsMessage {}
|
||||||
|
|
||||||
|
pub struct Graphql<Backend>
|
||||||
|
where
|
||||||
|
Backend: MetricsBackend + Send + Sync + 'static,
|
||||||
|
Backend::MetricsData: async_graphql::OutputType,
|
||||||
|
{
|
||||||
|
service_state: Option<ServiceStateHandle<Self>>,
|
||||||
settings: GraphqlServerSettings,
|
settings: GraphqlServerSettings,
|
||||||
backend_channel: Relay<MetricsService<Backend>>,
|
backend_channel: Relay<MetricsService<Backend>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceData for Graphql<Backend> {
|
impl<Backend> ServiceData for Graphql<Backend>
|
||||||
|
where
|
||||||
|
Backend: MetricsBackend + Send + Sync + 'static,
|
||||||
|
Backend::MetricsData: async_graphql::OutputType,
|
||||||
|
{
|
||||||
const SERVICE_ID: ServiceId = "GraphqlMetrics";
|
const SERVICE_ID: ServiceId = "GraphqlMetrics";
|
||||||
|
|
||||||
type Settings = GraphqlServerSettings;
|
type Settings = GraphqlServerSettings;
|
||||||
|
@ -80,7 +110,7 @@ impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceData for Graphql<Ba
|
||||||
|
|
||||||
type StateOperator = NoOperator<Self::State>;
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
|
||||||
type Message = NoMessage;
|
type Message = GraphqlMetricsMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_graphql::Object]
|
#[async_graphql::Object]
|
||||||
|
@ -114,40 +144,25 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl<Backend: MetricsBackend + Clone + Send + Sync + 'static> ServiceCore for Graphql<Backend>
|
impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceCore for Graphql<Backend>
|
||||||
where
|
where
|
||||||
Backend::MetricsData: async_graphql::OutputType,
|
Backend::MetricsData: async_graphql::OutputType,
|
||||||
{
|
{
|
||||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||||
let settings = service_state.settings_reader.get_updated_settings();
|
|
||||||
let backend_channel: Relay<MetricsService<Backend>> =
|
let backend_channel: Relay<MetricsService<Backend>> =
|
||||||
service_state.overwatch_handle.relay();
|
service_state.overwatch_handle.relay();
|
||||||
|
let settings = service_state.settings_reader.get_updated_settings();
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
settings,
|
settings,
|
||||||
|
service_state: Some(service_state),
|
||||||
backend_channel,
|
backend_channel,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(self) -> Result<(), overwatch_rs::DynError> {
|
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
|
||||||
let mut builder = CorsLayer::new();
|
|
||||||
if self.settings.cors_origins.is_empty() {
|
|
||||||
builder = builder.allow_origin(Any);
|
|
||||||
}
|
|
||||||
for origin in &self.settings.cors_origins {
|
|
||||||
builder = builder.allow_origin(
|
|
||||||
origin
|
|
||||||
.as_str()
|
|
||||||
.parse::<HeaderValue>()
|
|
||||||
.expect("fail to parse origin"),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
let cors = builder
|
|
||||||
.allow_headers([CONTENT_TYPE, USER_AGENT])
|
|
||||||
.allow_methods(Any);
|
|
||||||
|
|
||||||
let addr = self.settings.address;
|
|
||||||
let max_complexity = self.settings.max_complexity;
|
let max_complexity = self.settings.max_complexity;
|
||||||
let max_depth = self.settings.max_depth;
|
let max_depth = self.settings.max_depth;
|
||||||
|
let mut inbound_relay = self.service_state.take().unwrap().inbound_relay;
|
||||||
let schema = async_graphql::Schema::build(
|
let schema = async_graphql::Schema::build(
|
||||||
self,
|
self,
|
||||||
async_graphql::EmptyMutation,
|
async_graphql::EmptyMutation,
|
||||||
|
@ -159,16 +174,12 @@ where
|
||||||
.finish();
|
.finish();
|
||||||
|
|
||||||
tracing::info!(schema = %schema.sdl(), "GraphQL schema definition");
|
tracing::info!(schema = %schema.sdl(), "GraphQL schema definition");
|
||||||
let router = Router::new()
|
|
||||||
.route("/", post(graphql_handler::<Backend>))
|
|
||||||
.with_state(schema)
|
|
||||||
.layer(cors)
|
|
||||||
.layer(TraceLayer::new_for_http());
|
|
||||||
|
|
||||||
tracing::info!("Metrics Service GraphQL server listening: {}", addr);
|
while let Some(msg) = inbound_relay.recv().await {
|
||||||
Server::bind(&addr)
|
let res = schema.execute(msg.req).await;
|
||||||
.serve(router.into_make_service())
|
msg.reply_channel.send(res).unwrap();
|
||||||
.await?;
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@ use ::core::ops::{Deref, DerefMut};
|
||||||
use async_graphql::{parser::types::Field, ContextSelectionSet, Positioned, ServerResult, Value};
|
use async_graphql::{parser::types::Field, ContextSelectionSet, Positioned, ServerResult, Value};
|
||||||
use prometheus::HistogramOpts;
|
use prometheus::HistogramOpts;
|
||||||
pub use prometheus::{
|
pub use prometheus::{
|
||||||
core::{self, Atomic},
|
core::{self, Atomic, GenericCounter as PrometheusGenericCounter},
|
||||||
labels, opts, Opts,
|
labels, opts, Opts,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -95,19 +95,21 @@ impl async_graphql::OutputType for MetricDataType {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct GenericGauge<T: Atomic>(core::GenericGauge<T>);
|
pub struct GenericGauge<T: Atomic> {
|
||||||
|
val: core::GenericGauge<T>,
|
||||||
|
}
|
||||||
|
|
||||||
impl<T: Atomic> Deref for GenericGauge<T> {
|
impl<T: Atomic> Deref for GenericGauge<T> {
|
||||||
type Target = core::GenericGauge<T>;
|
type Target = core::GenericGauge<T>;
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
&self.0
|
&self.val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Atomic> DerefMut for GenericGauge<T> {
|
impl<T: Atomic> DerefMut for GenericGauge<T> {
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
&mut self.0
|
&mut self.val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,12 +118,11 @@ impl<T: Atomic> GenericGauge<T> {
|
||||||
name: S1,
|
name: S1,
|
||||||
help: S2,
|
help: S2,
|
||||||
) -> Result<Self, prometheus::Error> {
|
) -> Result<Self, prometheus::Error> {
|
||||||
let opts = Opts::new(name, help);
|
core::GenericGauge::<T>::new(name, help).map(|v| Self { val: v })
|
||||||
core::GenericGauge::<T>::with_opts(opts).map(Self)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||||
core::GenericGauge::<T>::with_opts(opts).map(Self)
|
core::GenericGauge::<T>::with_opts(opts).map(|v| Self { val: v })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,84 +156,26 @@ where
|
||||||
ctx: &ContextSelectionSet<'_>,
|
ctx: &ContextSelectionSet<'_>,
|
||||||
field: &Positioned<Field>,
|
field: &Positioned<Field>,
|
||||||
) -> ServerResult<Value> {
|
) -> ServerResult<Value> {
|
||||||
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Gauge(prometheus::Gauge);
|
pub struct GenericCounter<T: Atomic> {
|
||||||
|
ctr: core::GenericCounter<T>,
|
||||||
impl Deref for Gauge {
|
|
||||||
type Target = prometheus::Gauge;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.0
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DerefMut for Gauge {
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
&mut self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Gauge {
|
|
||||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
|
||||||
name: S1,
|
|
||||||
help: S2,
|
|
||||||
) -> Result<Self, prometheus::Error> {
|
|
||||||
prometheus::Gauge::new(name, help).map(Self)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
|
||||||
prometheus::Gauge::with_opts(opts).map(Self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "async-graphql")]
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl async_graphql::OutputType for Gauge {
|
|
||||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
|
||||||
std::borrow::Cow::Borrowed("Gauge")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
|
||||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
|
||||||
async_graphql::registry::MetaType::Scalar {
|
|
||||||
name: Self::type_name().to_string(),
|
|
||||||
description: None,
|
|
||||||
is_valid: None,
|
|
||||||
visible: None,
|
|
||||||
inaccessible: false,
|
|
||||||
tags: Vec::new(),
|
|
||||||
specified_by_url: None,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn resolve(
|
|
||||||
&self,
|
|
||||||
ctx: &ContextSelectionSet<'_>,
|
|
||||||
field: &Positioned<Field>,
|
|
||||||
) -> ServerResult<Value> {
|
|
||||||
<f64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct GenericCounter<T: Atomic>(core::GenericCounter<T>);
|
|
||||||
|
|
||||||
impl<T: Atomic> Deref for GenericCounter<T> {
|
impl<T: Atomic> Deref for GenericCounter<T> {
|
||||||
type Target = core::GenericCounter<T>;
|
type Target = core::GenericCounter<T>;
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
&self.0
|
&self.ctr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Atomic> DerefMut for GenericCounter<T> {
|
impl<T: Atomic> DerefMut for GenericCounter<T> {
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
&mut self.0
|
&mut self.ctr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,11 +184,11 @@ impl<T: Atomic> GenericCounter<T> {
|
||||||
name: S1,
|
name: S1,
|
||||||
help: S2,
|
help: S2,
|
||||||
) -> Result<Self, prometheus::Error> {
|
) -> Result<Self, prometheus::Error> {
|
||||||
core::GenericCounter::<T>::new(name, help).map(Self)
|
core::GenericCounter::<T>::new(name, help).map(|ctr| Self { ctr })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||||
core::GenericCounter::<T>::with_opts(opts).map(Self)
|
core::GenericCounter::<T>::with_opts(opts).map(|ctr| Self { ctr })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,76 +222,18 @@ where
|
||||||
ctx: &ContextSelectionSet<'_>,
|
ctx: &ContextSelectionSet<'_>,
|
||||||
field: &Positioned<Field>,
|
field: &Positioned<Field>,
|
||||||
) -> ServerResult<Value> {
|
) -> ServerResult<Value> {
|
||||||
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
<<T as Atomic>::T as async_graphql::OutputType>::resolve(&self.ctr.get(), ctx, field).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct Counter(prometheus::Counter);
|
pub struct Histogram {
|
||||||
|
val: prometheus::Histogram,
|
||||||
impl Deref for Counter {
|
|
||||||
type Target = prometheus::Counter;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.0
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DerefMut for Counter {
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
&mut self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Counter {
|
|
||||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
|
||||||
name: S1,
|
|
||||||
help: S2,
|
|
||||||
) -> Result<Self, prometheus::Error> {
|
|
||||||
prometheus::Counter::new(name, help).map(Self)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
|
||||||
prometheus::Counter::with_opts(opts).map(Self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "async-graphql")]
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl async_graphql::OutputType for Counter {
|
|
||||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
|
||||||
std::borrow::Cow::Borrowed("Counter")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
|
||||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
|
||||||
async_graphql::registry::MetaType::Scalar {
|
|
||||||
name: Self::type_name().to_string(),
|
|
||||||
description: None,
|
|
||||||
is_valid: None,
|
|
||||||
visible: None,
|
|
||||||
inaccessible: false,
|
|
||||||
tags: Vec::new(),
|
|
||||||
specified_by_url: None,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn resolve(
|
|
||||||
&self,
|
|
||||||
ctx: &ContextSelectionSet<'_>,
|
|
||||||
field: &Positioned<Field>,
|
|
||||||
) -> ServerResult<Value> {
|
|
||||||
<f64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Histogram(prometheus::Histogram);
|
|
||||||
|
|
||||||
impl Histogram {
|
impl Histogram {
|
||||||
pub fn with_opts(opts: HistogramOpts) -> Result<Self, prometheus::Error> {
|
pub fn with_opts(opts: HistogramOpts) -> Result<Self, prometheus::Error> {
|
||||||
prometheus::Histogram::with_opts(opts).map(Self)
|
prometheus::Histogram::with_opts(opts).map(|val| Self { val })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -356,18 +241,18 @@ impl Deref for Histogram {
|
||||||
type Target = prometheus::Histogram;
|
type Target = prometheus::Histogram;
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
&self.0
|
&self.val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DerefMut for Histogram {
|
impl DerefMut for Histogram {
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
&mut self.0
|
&mut self.val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "async-graphql")]
|
#[cfg(feature = "async-graphql")]
|
||||||
#[derive(async_graphql::SimpleObject)]
|
#[derive(async_graphql::SimpleObject, Debug, Clone, Copy)]
|
||||||
#[graphql(name = "Histogram")]
|
#[graphql(name = "Histogram")]
|
||||||
struct HistogramSample {
|
struct HistogramSample {
|
||||||
count: u64,
|
count: u64,
|
||||||
|
@ -391,131 +276,95 @@ impl async_graphql::OutputType for Histogram {
|
||||||
field: &Positioned<Field>,
|
field: &Positioned<Field>,
|
||||||
) -> ServerResult<Value> {
|
) -> ServerResult<Value> {
|
||||||
let sample = HistogramSample {
|
let sample = HistogramSample {
|
||||||
count: self.0.get_sample_count(),
|
count: self.val.get_sample_count(),
|
||||||
sum: self.0.get_sample_sum(),
|
sum: self.val.get_sample_sum(),
|
||||||
};
|
};
|
||||||
|
|
||||||
<HistogramSample as async_graphql::OutputType>::resolve(&sample, ctx, field).await
|
<HistogramSample as async_graphql::OutputType>::resolve(&sample, ctx, field).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
macro_rules! metric_typ {
|
||||||
pub struct IntCounter(prometheus::IntCounter);
|
($($ty: ident::$setter:ident($primitive:ident)::$name: literal), +$(,)?) => {
|
||||||
|
$(
|
||||||
impl IntCounter {
|
#[derive(Clone)]
|
||||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
pub struct $ty {
|
||||||
name: S1,
|
val: prometheus::$ty,
|
||||||
help: S2,
|
|
||||||
) -> Result<Self, prometheus::Error> {
|
|
||||||
prometheus::IntCounter::new(name, help).map(Self)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
|
||||||
prometheus::IntCounter::with_opts(opts).map(Self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Deref for IntCounter {
|
|
||||||
type Target = prometheus::IntCounter;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DerefMut for IntCounter {
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
&mut self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "async-graphql")]
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl async_graphql::OutputType for IntCounter {
|
|
||||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
|
||||||
std::borrow::Cow::Borrowed("IntCounter")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
|
||||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
|
||||||
async_graphql::registry::MetaType::Scalar {
|
|
||||||
name: Self::type_name().to_string(),
|
|
||||||
description: None,
|
|
||||||
is_valid: None,
|
|
||||||
visible: None,
|
|
||||||
inaccessible: false,
|
|
||||||
tags: Vec::new(),
|
|
||||||
specified_by_url: None,
|
|
||||||
}
|
}
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn resolve(
|
impl std::fmt::Debug for $ty {
|
||||||
&self,
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
ctx: &ContextSelectionSet<'_>,
|
<prometheus::$ty as std::fmt::Debug>::fmt(&self.val, f)
|
||||||
field: &Positioned<Field>,
|
}
|
||||||
) -> ServerResult<Value> {
|
|
||||||
<u64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
#[repr(transparent)]
|
|
||||||
pub struct IntGauge(prometheus::IntGauge);
|
|
||||||
|
|
||||||
impl Deref for IntGauge {
|
|
||||||
type Target = prometheus::IntGauge;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DerefMut for IntGauge {
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
&mut self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IntGauge {
|
|
||||||
pub fn new<S1: Into<String>, S2: Into<String>>(
|
|
||||||
name: S1,
|
|
||||||
help: S2,
|
|
||||||
) -> Result<Self, prometheus::Error> {
|
|
||||||
prometheus::IntGauge::new(name, help).map(Self)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
|
||||||
prometheus::IntGauge::with_opts(opts).map(Self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "async-graphql")]
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl async_graphql::OutputType for IntGauge {
|
|
||||||
fn type_name() -> std::borrow::Cow<'static, str> {
|
|
||||||
std::borrow::Cow::Borrowed("IntGauge")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
|
||||||
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
|
||||||
async_graphql::registry::MetaType::Scalar {
|
|
||||||
name: Self::type_name().to_string(),
|
|
||||||
description: None,
|
|
||||||
is_valid: None,
|
|
||||||
visible: None,
|
|
||||||
inaccessible: false,
|
|
||||||
tags: Vec::new(),
|
|
||||||
specified_by_url: None,
|
|
||||||
}
|
}
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn resolve(
|
impl $ty {
|
||||||
&self,
|
pub fn new<S1: Into<String>, S2: Into<String>>(
|
||||||
ctx: &ContextSelectionSet<'_>,
|
name: S1,
|
||||||
field: &Positioned<Field>,
|
help: S2,
|
||||||
) -> ServerResult<Value> {
|
) -> Result<Self, prometheus::Error> {
|
||||||
<i64 as async_graphql::OutputType>::resolve(&self.0.get(), ctx, field).await
|
prometheus::$ty::new(name, help).map(|val| Self {
|
||||||
}
|
val,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_opts(opts: Opts) -> Result<Self, prometheus::Error> {
|
||||||
|
prometheus::$ty::with_opts(opts.clone()).map(|val| Self {
|
||||||
|
val,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for $ty {
|
||||||
|
type Target = prometheus::$ty;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DerefMut for $ty {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "async-graphql")]
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl async_graphql::OutputType for $ty {
|
||||||
|
fn type_name() -> std::borrow::Cow<'static, str> {
|
||||||
|
std::borrow::Cow::Borrowed($name)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_type_info(registry: &mut async_graphql::registry::Registry) -> String {
|
||||||
|
registry.create_output_type::<Self, _>(async_graphql::registry::MetaTypeId::Scalar, |_| {
|
||||||
|
async_graphql::registry::MetaType::Scalar {
|
||||||
|
name: Self::type_name().to_string(),
|
||||||
|
description: None,
|
||||||
|
is_valid: None,
|
||||||
|
visible: None,
|
||||||
|
inaccessible: false,
|
||||||
|
tags: Vec::new(),
|
||||||
|
specified_by_url: None,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve(
|
||||||
|
&self,
|
||||||
|
ctx: &ContextSelectionSet<'_>,
|
||||||
|
field: &Positioned<Field>,
|
||||||
|
) -> ServerResult<Value> {
|
||||||
|
<$primitive as async_graphql::OutputType>::resolve(&self.val.get(), ctx, field).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)*
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
metric_typ! {
|
||||||
|
IntCounter::inc_by(u64)::"IntCounter",
|
||||||
|
Counter::inc_by(f64)::"Counter",
|
||||||
|
IntGauge::set(i64)::"IntGauge",
|
||||||
|
Gauge::set(f64)::"Gauge",
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue