nomos-node/nomos-services/metrics/examples/graphql.rs

149 lines
4.3 KiB
Rust
Raw Normal View History

use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use clap::Parser;
use metrics::{
frontend::graphql::{Graphql, GraphqlServerSettings},
MetricsBackend, MetricsMessage, MetricsService, OwnedServiceId,
};
use overwatch_rs::{
overwatch::OverwatchRunner,
services::{
handle::{ServiceHandle, ServiceStateHandle},
relay::{NoMessage, Relay},
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
},
};
/// Example metrics backend: a mutex-guarded map from service id to the last
/// [`MetricsData`] stored for that id.
///
/// The map lives behind an `Arc`, so clones of the backend share the same map.
#[derive(Debug, Clone)]
pub struct ConcurrentMapMetricsBackend(Arc<Mutex<HashMap<ServiceId, MetricsData>>>);

#[async_trait::async_trait]
impl MetricsBackend for ConcurrentMapMetricsBackend {
    type MetricsData = MetricsData;
    type Error = ();
    type Settings = &'static [ServiceId];

    /// Creates the backend with one default-valued entry per service id in `config`.
    fn init(config: Self::Settings) -> Self {
        let seeded: HashMap<ServiceId, MetricsData> = config
            .iter()
            .map(|&id| (id, MetricsData::default()))
            .collect();
        Self(Arc::new(Mutex::new(seeded)))
    }

    /// Stores `data` under `service_id`, replacing whatever was stored before.
    ///
    /// # Panics
    /// Panics if the inner mutex is poisoned.
    async fn update(&mut self, service_id: ServiceId, data: Self::MetricsData) {
        let mut entries = self.0.lock().unwrap();
        entries.insert(service_id, data);
    }

    /// Returns a clone of the data stored for `service_id`, or `None` if there is no entry.
    ///
    /// # Panics
    /// Panics if the inner mutex is poisoned.
    async fn load(&self, service_id: &OwnedServiceId) -> Option<Self::MetricsData> {
        let entries = self.0.lock().unwrap();
        entries.get(service_id.as_ref()).cloned()
    }
}
/// Example service that feeds the metrics service with generated data.
///
/// It holds the relay through which update messages are sent to
/// `MetricsService<Backend>`.
#[derive(Clone)]
pub struct MetricsUpdater<Backend>
where
    Backend: MetricsBackend + Send + Sync + 'static,
{
    /// Relay towards the metrics service backed by `Backend`.
    backend_channel: Relay<MetricsService<Backend>>,
}
/// Service declaration for [`MetricsUpdater`]: it takes no settings, keeps no
/// state and accepts no inbound messages.
impl<Backend> ServiceData for MetricsUpdater<Backend>
where
    Backend: MetricsBackend + Send + Sync + 'static,
{
    /// Service identifier constant required by `ServiceData`.
    const SERVICE_ID: ServiceId = "MetricsUpdater";
    type Settings = ();
    type State = NoState<()>;
    type StateOperator = NoOperator<Self::State>;
    type Message = NoMessage;
}
#[async_trait::async_trait]
impl<Backend> ServiceCore for MetricsUpdater<Backend>
where
    Backend: MetricsBackend<MetricsData = MetricsData> + Clone + Send + Sync + 'static,
    Backend::MetricsData: async_graphql::OutputType,
{
    /// Builds the updater by requesting a relay to the metrics service from
    /// the overwatch handle.
    fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
        Ok(Self {
            backend_channel: service_state.overwatch_handle.relay(),
        })
    }

    /// Connects to the metrics service, then sends one `Update` message per
    /// second, cycling through the tags `foo`, `bar` and `baz`.
    ///
    /// The `duration` field of each message is the running message counter.
    ///
    /// # Errors
    /// Returns the relay error if connecting or sending fails; the failure is
    /// logged through `tracing` before being returned.
    async fn run(self) -> Result<(), overwatch_rs::DynError> {
        let outbound = self.backend_channel.connect().await.map_err(|e| {
            tracing::error!(err = ?e, "Metrics Updater: relay connect error");
            e
        })?;

        let pause = std::time::Duration::from_secs(1);

        // `cycle()` never ends, so the loop only exits through the `?` below.
        for (tick, tag) in ["foo", "bar", "baz"].iter().cycle().enumerate() {
            let data = MetricsData {
                duration: tick as u64,
            };
            let update = MetricsMessage::Update {
                service_id: tag,
                data,
            };
            outbound.send(update).await.map_err(|(e, _)| {
                tracing::error!(err = ?e, "Metrics Updater: relay send error");
                e
            })?;
            tokio::time::sleep(pause).await;
        }

        Ok(())
    }
}
// Set of services that make up this example application; `main` passes this
// type to `OverwatchRunner::run`.
#[derive(overwatch_derive::Services)]
struct Services {
// GraphQL frontend over the map-based metrics backend.
graphql: ServiceHandle<Graphql<ConcurrentMapMetricsBackend>>,
// Metrics service using the same backend type.
metrics: ServiceHandle<MetricsService<ConcurrentMapMetricsBackend>>,
// Example producer that sends generated updates to the metrics service.
updater: ServiceHandle<MetricsUpdater<ConcurrentMapMetricsBackend>>,
}
// Command-line arguments of the example.
// (Plain `//` comments on purpose: clap turns `///` doc comments into help text.)
#[derive(clap::Parser)]
pub struct Args {
// GraphQL server settings; their options are flattened into the top-level CLI.
#[clap(flatten)]
graphql: GraphqlServerSettings,
}
// Metrics payload stored per service and exposed as a GraphQL object.
// (Plain `//` comments on purpose: async_graphql turns `///` doc comments into
// schema descriptions.)
#[derive(Debug, Default, Clone, async_graphql::SimpleObject)]
pub struct MetricsData {
// In this example the updater fills this with its running message counter;
// no unit is defined in this file.
duration: u64,
}
Http service api (#44) * Cargo http-service folder * WIP: http server * Revert comments in network service * Router service and axum implementation structure * Move bin contents to examples dir * Add http service and server traits * HttpMsg definition * WIP: axum backend * fix example in Cargo.toml * Shared axum router and router modification methods * Http example with axum and metrics service * make project compile and add Error associated type * Axum backend shared router fixes * Dummy service implementation for http example * remove unused clone on mutex * Cargo http-service folder * WIP: http server * Revert comments in network service * Router service and axum implementation structure * Move bin contents to examples dir * Add http service and server traits * HttpMsg definition * WIP: axum backend * fix example in Cargo.toml * Shared axum router and router modification methods * Http example with axum and metrics service * make project compile and add Error associated type * Axum backend shared router fixes * Dummy service implementation for http example * remove unused clone on mutex * Fix typos and remove unused code * Fix failing tests when feature flags are not set * Use bytes as a type for payload and response in http service * Refactored http crate layout into differential services files * First stab at router service * Fully piped http bridge system * Start building bridge helper function * Refactor bridge builder helper and update example * impl serialization for metrics data * Get updated copy of router when processing request * remove unused code * fix typo * [POC]: Http service: support add graphql handler (#47) * WIP: add graphql endpoint * support add graphql handler * remove generic * fix clippy warnings * Add post put and patch handlers that expect bytes as body * Graphql example file * WIP: Use http post method for graphql related queries * Parse graphql requests in handler * Simplify handlers for post and other data methods * Revert "Simplify 
handlers for post and other data methods" This reverts commit 96f2b1821e5cfe90be81baac8a6b4a2f9780d477. * add tracing and remove comments * Pass response bytes without any modifications * Use receive_batch_json for gql request parsing * Readme for running examples * fix conflicts * add a general helper function for graphql * remove unused function * cleanup code * move schema initialization to handle function * adapt metrics to http service * fix clippy warnings * remove unused fn * fix clippy * optimize example Co-authored-by: gusto <bacvinka@gmail.com> Co-authored-by: Gusto Bacvinka <augustinas.bacvinka@gmail.com> * Fix cargo build without features * Simplify handlers for routes with data Co-authored-by: al8n <scygliu1@gmail.com> Co-authored-by: Daniel Sanchez Quiros <sanchez.quiros.daniel@gmail.com>
2023-01-19 14:51:30 +00:00
/// Error describing a failure to parse metrics data.
#[derive(Debug, Clone)]
pub enum ParseMetricsDataError {
    /// A slice could not be converted into a fixed-size array.
    TryFromSliceError(core::array::TryFromSliceError),
}

impl std::fmt::Display for ParseMetricsDataError {
    /// Writes a human-readable message that includes the wrapped error.
    ///
    /// An empty rendering would make the error invisible in logs and in
    /// `to_string()` output, so every variant must produce some text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive match: adding a variant forces a message to be written for it.
        match self {
            Self::TryFromSliceError(inner) => {
                write!(f, "failed to parse metrics data: {}", inner)
            }
        }
    }
}

impl std::error::Error for ParseMetricsDataError {}
/// Entry point of the example: parses the CLI arguments, installs the tracing
/// subscriber, starts the overwatch application and blocks until it finishes.
///
/// # Errors
/// Returns the error reported by `OverwatchRunner::run` if the application
/// cannot be started.
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let settings = Args::parse();

    // Install the subscriber before any service is started: tracing events
    // emitted while no subscriber is registered are dropped, so initialising
    // it after `run` would lose the start-up logs.
    tracing_subscriber::fmt::fmt()
        .with_env_filter(std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned()))
        .with_file(false)
        .init();

    let app = OverwatchRunner::<Services>::run(
        ServicesServiceSettings {
            graphql: settings.graphql,
            // NOTE(review): the backend is seeded with "Foo"/"Bar" while the
            // updater sends "foo"/"bar"/"baz"; the seeded entries are never
            // updated. Confirm whether the differing case is intentional.
            metrics: &["Foo", "Bar"],
            updater: (),
        },
        None,
    )?;

    app.wait_finished();
    Ok(())
}