use std::error::Error; use std::sync::Arc; use bytes::Bytes; use clap::Parser; use nomos_http::backends::HttpBackend; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner, HttpBridgeService}; use nomos_http::{ backends::axum::{AxumBackend, AxumBackendSettings}, http::*, }; use overwatch_rs::services::relay::{OutboundRelay, RelayMessage}; use overwatch_rs::services::{ handle::ServiceStateHandle, state::{NoOperator, NoState}, ServiceCore, ServiceData, ServiceId, }; use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle}; use parking_lot::Mutex; use tokio::sync::oneshot; async fn handle_count(counter: Arc>, reply_channel: oneshot::Sender) { *counter.lock() += 1; let count = *counter.lock(); if let Err(e) = reply_channel.send(count) { tracing::error!("dummy service send error: {e}"); } } fn dummy_graphql_router( handle: overwatch_rs::overwatch::handle::OverwatchHandle, ) -> HttpBridgeRunner where B: HttpBackend + Send + Sync + 'static, B::Error: Error + Send + Sync + 'static, { static SCHEMA: once_cell::sync::Lazy< async_graphql::Schema< DummyGraphql, async_graphql::EmptyMutation, async_graphql::EmptySubscription, >, > = once_cell::sync::Lazy::new(|| { async_graphql::Schema::build( DummyGraphql::default(), async_graphql::EmptyMutation, async_graphql::EmptySubscription, ) .finish() }); Box::new(Box::pin(async move { // TODO: Graphql supports http GET requests, should nomos support that? let (dummy, mut hello_res_rx) = build_http_bridge::(handle, HttpMethod::POST, "") .await .unwrap(); while let Some(HttpRequest { query: _, payload, res_tx, }) = hello_res_rx.recv().await { let res = match handle_graphql_req(&SCHEMA, payload, dummy.clone()).await { Ok(r) => r, Err(err) => { tracing::error!(err); err.to_string() } }; res_tx.send(Ok(res.into())).await.unwrap(); } Ok(()) })) } async fn handle_graphql_req( schema: &async_graphql::Schema< DummyGraphql, async_graphql::EmptyMutation, async_graphql::EmptySubscription, >, payload: Option, dummy: OutboundRelay, ) -> Result { // TODO: Move to the graphql frontend as a helper function? let payload = payload.ok_or("empty payload")?; let req = async_graphql::http::receive_batch_json(&payload[..]) .await? .into_single()?; let (sender, receiver) = oneshot::channel(); dummy .send(DummyGraphqlMsg { reply_channel: sender, }) .await .unwrap(); // wait for the dummy service to respond receiver.await.unwrap(); let res = serde_json::to_string(&schema.execute(req).await)?; Ok(res) } #[derive(Debug, Clone, Default)] pub struct DummyGraphql { val: Arc>, } #[async_graphql::Object] impl DummyGraphql { async fn val(&self) -> i32 { let mut val = self.val.lock(); *val += 1; *val } } pub struct DummyGraphqlService { val: Arc>, service_state: ServiceStateHandle, } #[derive(Debug)] pub struct DummyGraphqlMsg { reply_channel: oneshot::Sender, } impl RelayMessage for DummyGraphqlMsg {} impl ServiceData for DummyGraphqlService { const SERVICE_ID: ServiceId = "DummyGraphqlService"; type Settings = (); type State = NoState<()>; type StateOperator = NoOperator; type Message = DummyGraphqlMsg; } #[async_trait::async_trait] impl ServiceCore for DummyGraphqlService { fn init(service_state: ServiceStateHandle) -> Result { Ok(Self { service_state, val: Arc::new(Mutex::new(0)), }) } async fn run(self) -> Result<(), overwatch_rs::DynError> { let Self { service_state: ServiceStateHandle { mut inbound_relay, .. }, val, } = self; // Handle the http request to dummy service. while let Some(msg) = inbound_relay.recv().await { handle_count(val.clone(), msg.reply_channel).await; } Ok(()) } } #[derive(overwatch_derive::Services)] struct Services { http: ServiceHandle>, router: ServiceHandle, dummy_graphql: ServiceHandle, } #[derive(clap::Parser)] pub struct Args { #[clap(flatten)] http: AxumBackendSettings, } fn main() -> Result<(), overwatch_rs::DynError> { tracing_subscriber::fmt::fmt().with_file(false).init(); let settings = Args::parse(); let app = OverwatchRunner::::run( ServicesServiceSettings { http: nomos_http::http::HttpServiceSettings { backend: settings.http, }, router: nomos_http::bridge::HttpBridgeSettings { bridges: vec![Arc::new(Box::new(dummy_graphql_router::))], }, dummy_graphql: (), }, None, )?; tracing::info!("overwatch ready"); app.wait_finished(); Ok(()) }