Remove old http api implementation (#622)
This commit is contained in:
parent
2677199ed6
commit
de6138fc27
@ -11,7 +11,6 @@ members = [
|
||||
"nomos-services/carnot-consensus",
|
||||
"nomos-services/cryptarchia-consensus",
|
||||
"nomos-services/mempool",
|
||||
"nomos-services/http",
|
||||
"nomos-services/data-availability",
|
||||
"nomos-services/system-sig",
|
||||
"nomos-da/reed-solomon",
|
||||
|
@ -29,7 +29,6 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [
|
||||
"metrics",
|
||||
] }
|
||||
nomos-metrics = { path = "../../nomos-metrics" }
|
||||
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
|
||||
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
|
||||
|
||||
carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = [
|
||||
|
@ -1,47 +0,0 @@
|
||||
[package]
|
||||
name = "nomos-http"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[[example]]
|
||||
name = "axum"
|
||||
path = "examples/axum.rs"
|
||||
required-features = ["http"]
|
||||
|
||||
[[example]]
|
||||
name = "graphql"
|
||||
path = "examples/graphql.rs"
|
||||
required-features = ["http", "gql"]
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
axum = { version = "0.6", optional = true }
|
||||
async-trait = "0.1"
|
||||
# async-graphql does not follow semver, so we pin the version
|
||||
async-graphql = { version = "=5.0.5", optional = true }
|
||||
bytes = "1.3"
|
||||
clap = { version = "4", features = ["derive", "env"], optional = true }
|
||||
futures = "0.3"
|
||||
http = "0.2.9"
|
||||
hyper = { version = "0.14", optional = true }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||
parking_lot = { version = "0.12", optional = true }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = { version = "1.0", optional = true }
|
||||
thiserror = "1"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = { version = "0.3", features = ["json"] }
|
||||
tower = { version = "0.4", optional = true }
|
||||
tower-http = { version = "0.3", features = ["cors", "trace"] }
|
||||
tokio = { version = "1", features = ["sync", "macros"] }
|
||||
tower-service = "0.3.2"
|
||||
|
||||
[dev-dependencies]
|
||||
once_cell = "1.17"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
http = ["clap", "axum", "serde_json", "parking_lot", "hyper", "tower"]
|
||||
gql = ["async-graphql", "serde_json"]
|
@ -1,33 +0,0 @@
|
||||
# Http service examples
|
||||
|
||||
## Axum.rs
|
||||
A simple service to demonstrate how to register http handler for overwatch service.
|
||||
|
||||
To run this example use:
|
||||
```bash
|
||||
cargo run --example axum --features http
|
||||
```
|
||||
|
||||
A GET endpoint will be registered at `http://localhost:8080/dummy/`. An endpoint corresponds with the Service name.
|
||||
|
||||
## Graphql.rs
|
||||
A demonstration of usage from within an overwatch service over the http.
|
||||
|
||||
To run this example use:
|
||||
```bash
|
||||
cargo run --example graphql --features http,gql
|
||||
```
|
||||
|
||||
An endpoint will be registered at `http://localhost:8080/dummygraphqlservice/`. An endpoint corresponds with the Service name.
|
||||
|
||||
To query this endpoint use:
|
||||
```bash
|
||||
curl --location --request POST 'localhost:8080/dummygraphqlservice/' \
|
||||
--data-raw '{"query":"query {val}","variables":{}}'
|
||||
|
||||
```
|
||||
|
||||
Every response should increment the `val` variable.
|
||||
```json
|
||||
{"data":{"val":1}}
|
||||
```
|
@ -1,138 +0,0 @@
|
||||
use std::error::Error;
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap::Parser;
|
||||
use nomos_http::backends::HttpBackend;
|
||||
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner, HttpBridgeService};
|
||||
use nomos_http::{
|
||||
backends::axum::{AxumBackend, AxumBackendSettings},
|
||||
http::*,
|
||||
};
|
||||
use overwatch_rs::services::relay::RelayMessage;
|
||||
use overwatch_rs::services::{
|
||||
handle::ServiceStateHandle,
|
||||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
};
|
||||
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
|
||||
use parking_lot::Mutex;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
pub struct DummyService {
|
||||
counter: Arc<Mutex<i32>>,
|
||||
service_state: ServiceStateHandle<Self>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DummyMsg {
|
||||
reply_channel: oneshot::Sender<i32>,
|
||||
}
|
||||
|
||||
impl RelayMessage for DummyMsg {}
|
||||
|
||||
impl ServiceData for DummyService {
|
||||
const SERVICE_ID: ServiceId = "Dummy";
|
||||
type Settings = ();
|
||||
type State = NoState<()>;
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
type Message = DummyMsg;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ServiceCore for DummyService {
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||
Ok(Self {
|
||||
counter: Default::default(),
|
||||
service_state,
|
||||
})
|
||||
}
|
||||
|
||||
async fn run(self) -> Result<(), overwatch_rs::DynError> {
|
||||
let Self {
|
||||
counter,
|
||||
service_state: ServiceStateHandle {
|
||||
mut inbound_relay, ..
|
||||
},
|
||||
} = self;
|
||||
|
||||
// Handle the http request to dummy service.
|
||||
while let Some(msg) = inbound_relay.recv().await {
|
||||
handle_hello(counter.clone(), msg.reply_channel).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_hello(counter: Arc<Mutex<i32>>, reply_channel: oneshot::Sender<i32>) {
|
||||
*counter.lock() += 1;
|
||||
let count = *counter.lock();
|
||||
|
||||
if let Err(e) = reply_channel.send(count) {
|
||||
tracing::error!("dummy service send error: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
fn dummy_router<B>(handle: overwatch_rs::overwatch::handle::OverwatchHandle) -> HttpBridgeRunner
|
||||
where
|
||||
B: HttpBackend + Send + Sync + 'static,
|
||||
B::Error: Error + Send + Sync + 'static,
|
||||
{
|
||||
Box::new(Box::pin(async move {
|
||||
let (dummy, mut hello_res_rx) =
|
||||
build_http_bridge::<DummyService, B, _>(handle, HttpMethod::GET, "")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(HttpRequest { res_tx, .. }) = hello_res_rx.recv().await {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
dummy
|
||||
.send(DummyMsg {
|
||||
reply_channel: sender,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let value = receiver.await.unwrap();
|
||||
res_tx
|
||||
.send(Ok(format!("Hello, world! {value}").into()))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(overwatch_derive::Services)]
|
||||
struct Services {
|
||||
http: ServiceHandle<HttpService<AxumBackend>>,
|
||||
router: ServiceHandle<HttpBridgeService>,
|
||||
dummy: ServiceHandle<DummyService>,
|
||||
}
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
pub struct Args {
|
||||
#[clap(flatten)]
|
||||
http: AxumBackendSettings,
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
tracing_subscriber::fmt::fmt().with_file(false).init();
|
||||
|
||||
let settings = Args::parse();
|
||||
let app = OverwatchRunner::<Services>::run(
|
||||
ServicesServiceSettings {
|
||||
http: nomos_http::http::HttpServiceSettings {
|
||||
backend: settings.http,
|
||||
},
|
||||
router: nomos_http::bridge::HttpBridgeSettings {
|
||||
bridges: vec![Arc::new(Box::new(dummy_router::<AxumBackend>))],
|
||||
},
|
||||
dummy: (),
|
||||
},
|
||||
None,
|
||||
)?;
|
||||
|
||||
tracing::info!("overwatch ready");
|
||||
app.wait_finished();
|
||||
Ok(())
|
||||
}
|
@ -1,202 +0,0 @@
|
||||
use std::error::Error;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::Bytes;
|
||||
use clap::Parser;
|
||||
use nomos_http::backends::HttpBackend;
|
||||
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner, HttpBridgeService};
|
||||
use nomos_http::{
|
||||
backends::axum::{AxumBackend, AxumBackendSettings},
|
||||
http::*,
|
||||
};
|
||||
use overwatch_rs::services::relay::{OutboundRelay, RelayMessage};
|
||||
use overwatch_rs::services::{
|
||||
handle::ServiceStateHandle,
|
||||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
};
|
||||
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
|
||||
use parking_lot::Mutex;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
async fn handle_count(counter: Arc<Mutex<i32>>, reply_channel: oneshot::Sender<i32>) {
|
||||
*counter.lock() += 1;
|
||||
let count = *counter.lock();
|
||||
|
||||
if let Err(e) = reply_channel.send(count) {
|
||||
tracing::error!("dummy service send error: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
fn dummy_graphql_router<B>(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner
|
||||
where
|
||||
B: HttpBackend + Send + Sync + 'static,
|
||||
B::Error: Error + Send + Sync + 'static,
|
||||
{
|
||||
static SCHEMA: once_cell::sync::Lazy<
|
||||
async_graphql::Schema<
|
||||
DummyGraphql,
|
||||
async_graphql::EmptyMutation,
|
||||
async_graphql::EmptySubscription,
|
||||
>,
|
||||
> = once_cell::sync::Lazy::new(|| {
|
||||
async_graphql::Schema::build(
|
||||
DummyGraphql::default(),
|
||||
async_graphql::EmptyMutation,
|
||||
async_graphql::EmptySubscription,
|
||||
)
|
||||
.finish()
|
||||
});
|
||||
|
||||
Box::new(Box::pin(async move {
|
||||
// TODO: Graphql supports http GET requests, should nomos support that?
|
||||
let (dummy, mut hello_res_rx) =
|
||||
build_http_bridge::<DummyGraphqlService, B, _>(handle, HttpMethod::POST, "")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(HttpRequest {
|
||||
query: _,
|
||||
payload,
|
||||
res_tx,
|
||||
}) = hello_res_rx.recv().await
|
||||
{
|
||||
let res = match handle_graphql_req(&SCHEMA, payload, dummy.clone()).await {
|
||||
Ok(r) => r,
|
||||
Err(err) => {
|
||||
tracing::error!(err);
|
||||
err.to_string()
|
||||
}
|
||||
};
|
||||
|
||||
res_tx.send(Ok(res.into())).await.unwrap();
|
||||
}
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
async fn handle_graphql_req(
|
||||
schema: &async_graphql::Schema<
|
||||
DummyGraphql,
|
||||
async_graphql::EmptyMutation,
|
||||
async_graphql::EmptySubscription,
|
||||
>,
|
||||
payload: Option<Bytes>,
|
||||
dummy: OutboundRelay<DummyGraphqlMsg>,
|
||||
) -> Result<String, overwatch_rs::DynError> {
|
||||
// TODO: Move to the graphql frontend as a helper function?
|
||||
let payload = payload.ok_or("empty payload")?;
|
||||
let req = async_graphql::http::receive_batch_json(&payload[..])
|
||||
.await?
|
||||
.into_single()?;
|
||||
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
dummy
|
||||
.send(DummyGraphqlMsg {
|
||||
reply_channel: sender,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// wait for the dummy service to respond
|
||||
receiver.await.unwrap();
|
||||
let res = serde_json::to_string(&schema.execute(req).await)?;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct DummyGraphql {
|
||||
val: Arc<Mutex<i32>>,
|
||||
}
|
||||
|
||||
#[async_graphql::Object]
|
||||
impl DummyGraphql {
|
||||
async fn val(&self) -> i32 {
|
||||
let mut val = self.val.lock();
|
||||
*val += 1;
|
||||
*val
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DummyGraphqlService {
|
||||
val: Arc<Mutex<i32>>,
|
||||
service_state: ServiceStateHandle<Self>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DummyGraphqlMsg {
|
||||
reply_channel: oneshot::Sender<i32>,
|
||||
}
|
||||
|
||||
impl RelayMessage for DummyGraphqlMsg {}
|
||||
|
||||
impl ServiceData for DummyGraphqlService {
|
||||
const SERVICE_ID: ServiceId = "DummyGraphqlService";
|
||||
type Settings = ();
|
||||
type State = NoState<()>;
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
type Message = DummyGraphqlMsg;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ServiceCore for DummyGraphqlService {
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||
Ok(Self {
|
||||
service_state,
|
||||
val: Arc::new(Mutex::new(0)),
|
||||
})
|
||||
}
|
||||
|
||||
async fn run(self) -> Result<(), overwatch_rs::DynError> {
|
||||
let Self {
|
||||
service_state: ServiceStateHandle {
|
||||
mut inbound_relay, ..
|
||||
},
|
||||
val,
|
||||
} = self;
|
||||
|
||||
// Handle the http request to dummy service.
|
||||
while let Some(msg) = inbound_relay.recv().await {
|
||||
handle_count(val.clone(), msg.reply_channel).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(overwatch_derive::Services)]
|
||||
struct Services {
|
||||
http: ServiceHandle<HttpService<AxumBackend>>,
|
||||
router: ServiceHandle<HttpBridgeService>,
|
||||
dummy_graphql: ServiceHandle<DummyGraphqlService>,
|
||||
}
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
pub struct Args {
|
||||
#[clap(flatten)]
|
||||
http: AxumBackendSettings,
|
||||
}
|
||||
|
||||
fn main() -> Result<(), overwatch_rs::DynError> {
|
||||
tracing_subscriber::fmt::fmt().with_file(false).init();
|
||||
|
||||
let settings = Args::parse();
|
||||
let app = OverwatchRunner::<Services>::run(
|
||||
ServicesServiceSettings {
|
||||
http: nomos_http::http::HttpServiceSettings {
|
||||
backend: settings.http,
|
||||
},
|
||||
router: nomos_http::bridge::HttpBridgeSettings {
|
||||
bridges: vec![Arc::new(Box::new(dummy_graphql_router::<AxumBackend>))],
|
||||
},
|
||||
dummy_graphql: (),
|
||||
},
|
||||
None,
|
||||
)?;
|
||||
|
||||
tracing::info!("overwatch ready");
|
||||
app.wait_finished();
|
||||
Ok(())
|
||||
}
|
@ -1,167 +0,0 @@
|
||||
// std
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
// crates
|
||||
use axum::{
|
||||
body::Bytes,
|
||||
extract::Query,
|
||||
http::HeaderValue,
|
||||
routing::{get, patch, post, put},
|
||||
Router,
|
||||
};
|
||||
use http::StatusCode;
|
||||
use hyper::{
|
||||
header::{CONTENT_TYPE, USER_AGENT},
|
||||
Body, Request,
|
||||
};
|
||||
use overwatch_rs::{services::state::NoState, DynError};
|
||||
use parking_lot::Mutex;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tower::make::Shared;
|
||||
use tower_http::{
|
||||
cors::{Any, CorsLayer},
|
||||
trace::TraceLayer,
|
||||
};
|
||||
use tower_service::Service;
|
||||
|
||||
// internal
|
||||
use super::HttpBackend;
|
||||
use crate::http::{HttpMethod, HttpRequest, Route};
|
||||
|
||||
/// Configuration for the Http Server
|
||||
#[derive(Debug, Clone, clap::Args, serde::Deserialize, serde::Serialize)]
|
||||
pub struct AxumBackendSettings {
|
||||
/// Socket where the server will be listening on for incoming requests.
|
||||
#[arg(
|
||||
short, long = "http-addr",
|
||||
default_value_t = std::net::SocketAddr::new(
|
||||
std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
|
||||
8080,
|
||||
),
|
||||
env = "HTTP_BIND_ADDRESS"
|
||||
)]
|
||||
pub address: std::net::SocketAddr,
|
||||
/// Allowed origins for this server deployment requests.
|
||||
#[arg(long = "http-cors-origin")]
|
||||
pub cors_origins: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum AxumBackendError {
|
||||
#[error("axum backend: send error: {0}")]
|
||||
SendError(#[from] tokio::sync::mpsc::error::SendError<HttpRequest>),
|
||||
|
||||
#[error("axum backend: {0}")]
|
||||
Any(DynError),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AxumBackend {
|
||||
config: AxumBackendSettings,
|
||||
router: Arc<Mutex<Router>>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl HttpBackend for AxumBackend {
|
||||
type Settings = AxumBackendSettings;
|
||||
type State = NoState<AxumBackendSettings>;
|
||||
type Error = AxumBackendError;
|
||||
|
||||
fn new(config: Self::Settings) -> Result<Self, Self::Error>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
let mut builder = CorsLayer::new();
|
||||
if config.cors_origins.is_empty() {
|
||||
builder = builder.allow_origin(Any);
|
||||
}
|
||||
|
||||
for origin in &config.cors_origins {
|
||||
builder = builder.allow_origin(
|
||||
origin
|
||||
.as_str()
|
||||
.parse::<HeaderValue>()
|
||||
.expect("fail to parse origin"),
|
||||
);
|
||||
}
|
||||
|
||||
let router = Arc::new(Mutex::new(
|
||||
Router::new()
|
||||
.layer(
|
||||
builder
|
||||
.allow_headers([CONTENT_TYPE, USER_AGENT])
|
||||
.allow_methods(Any),
|
||||
)
|
||||
.layer(TraceLayer::new_for_http()),
|
||||
));
|
||||
|
||||
Ok(Self { config, router })
|
||||
}
|
||||
|
||||
fn add_route(
|
||||
&self,
|
||||
service_id: overwatch_rs::services::ServiceId,
|
||||
route: Route,
|
||||
req_stream: Sender<HttpRequest>,
|
||||
) {
|
||||
let path = format!("/{}/{}", service_id.to_lowercase(), route.path);
|
||||
tracing::info!("Axum backend: adding route {}", path);
|
||||
self.add_data_route(route.method, &path, req_stream);
|
||||
}
|
||||
|
||||
async fn run(&self) -> Result<(), overwatch_rs::DynError> {
|
||||
let router = self.router.clone();
|
||||
let service = tower::service_fn(move |request: Request<Body>| {
|
||||
let mut router = router.lock().clone();
|
||||
router.call(request)
|
||||
});
|
||||
|
||||
axum::Server::bind(&self.config.address)
|
||||
.serve(Shared::new(service))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl AxumBackend {
|
||||
fn add_data_route(&self, method: HttpMethod, path: &str, req_stream: Sender<HttpRequest>) {
|
||||
let handle_data = |Query(query): Query<HashMap<String, String>>, payload: Option<Bytes>| {
|
||||
handle_req(req_stream, query, payload)
|
||||
};
|
||||
|
||||
let handler = match method {
|
||||
HttpMethod::GET => get(handle_data),
|
||||
HttpMethod::POST => post(handle_data),
|
||||
HttpMethod::PUT => put(handle_data),
|
||||
HttpMethod::PATCH => patch(handle_data),
|
||||
_ => unimplemented!(),
|
||||
};
|
||||
|
||||
let mut router = self.router.lock();
|
||||
*router = router.clone().route(path, handler)
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_req(
|
||||
req_stream: Sender<HttpRequest>,
|
||||
query: HashMap<String, String>,
|
||||
payload: Option<Bytes>,
|
||||
) -> Result<Bytes, (StatusCode, String)> {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
|
||||
match req_stream
|
||||
.send(HttpRequest {
|
||||
query,
|
||||
payload,
|
||||
res_tx: tx,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(_) => rx.recv().await.ok_or_else(|| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"closed response channel".into(),
|
||||
)
|
||||
})?,
|
||||
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
|
||||
}
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
#[cfg(feature = "http")]
|
||||
pub mod axum;
|
||||
|
||||
use std::fmt::Debug;
|
||||
|
||||
use overwatch_rs::services::{state::ServiceState, ServiceId};
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
||||
use crate::http::{HttpRequest, Route};
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait HttpBackend {
|
||||
type Settings: Clone + Debug + Send + Sync + 'static;
|
||||
type State: ServiceState<Settings = Self::Settings> + Clone;
|
||||
type Error: std::fmt::Display;
|
||||
|
||||
fn new(config: Self::Settings) -> Result<Self, Self::Error>
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
fn add_route(&self, service_id: ServiceId, route: Route, req_stream: Sender<HttpRequest>);
|
||||
|
||||
async fn run(&self) -> Result<(), overwatch_rs::DynError>;
|
||||
}
|
@ -1,105 +0,0 @@
|
||||
use std::error::Error;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use overwatch_rs::services::handle::ServiceStateHandle;
|
||||
use overwatch_rs::services::relay::{NoMessage, OutboundRelay};
|
||||
use overwatch_rs::services::{
|
||||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
};
|
||||
use overwatch_rs::DynError;
|
||||
use tokio::sync::mpsc::{channel, Receiver};
|
||||
|
||||
use crate::backends::HttpBackend;
|
||||
use crate::http::{HttpMethod, HttpMsg, HttpRequest, HttpService};
|
||||
|
||||
pub type HttpBridgeRunner =
|
||||
Box<dyn Future<Output = Result<(), overwatch_rs::DynError>> + Send + Unpin + 'static>;
|
||||
|
||||
// TODO: If we can get rid of the clone bound on here remove Arc.
|
||||
// For now as we bind this through the settings we need to keep it.
|
||||
pub type HttpBridge = Arc<
|
||||
Box<
|
||||
dyn Fn(overwatch_rs::overwatch::handle::OverwatchHandle) -> HttpBridgeRunner
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
>,
|
||||
>;
|
||||
|
||||
// TODO: Add error handling
|
||||
pub async fn build_http_bridge<S, B, P>(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
method: HttpMethod,
|
||||
path: P,
|
||||
) -> Result<(OutboundRelay<S::Message>, Receiver<HttpRequest>), overwatch_rs::DynError>
|
||||
where
|
||||
S: ServiceCore + Send + Sync + 'static,
|
||||
B: HttpBackend + Send + Sync + 'static,
|
||||
B::Error: Error + Send + Sync + 'static,
|
||||
P: Into<String> + Send + Sync + 'static,
|
||||
{
|
||||
let http_relay = handle.clone().relay::<HttpService<B>>().connect().await?;
|
||||
|
||||
let service_relay = handle.clone().relay::<S>().connect().await?;
|
||||
|
||||
let (http_sender, http_receiver) = channel(1);
|
||||
|
||||
// Register on http service to receive GET requests.
|
||||
http_relay
|
||||
.send(HttpMsg::add_http_handler(
|
||||
method,
|
||||
S::SERVICE_ID,
|
||||
path,
|
||||
http_sender,
|
||||
))
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
Ok((service_relay, http_receiver))
|
||||
}
|
||||
|
||||
pub struct HttpBridgeService {
|
||||
pub(crate) runners: Vec<HttpBridgeRunner>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HttpBridgeSettings {
|
||||
pub bridges: Vec<HttpBridge>,
|
||||
}
|
||||
|
||||
impl Debug for HttpBridgeSettings {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RouterSettings")
|
||||
.field("runners len", &self.bridges.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl ServiceData for HttpBridgeService {
|
||||
const SERVICE_ID: ServiceId = "Router";
|
||||
type Settings = HttpBridgeSettings;
|
||||
type State = NoState<Self::Settings>;
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
type Message = NoMessage;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ServiceCore for HttpBridgeService {
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
|
||||
let runners = service_state.settings_reader.get_updated_settings().bridges;
|
||||
let runners: Vec<_> = runners
|
||||
.into_iter()
|
||||
.map(|r| (r)(service_state.overwatch_handle.clone()))
|
||||
.collect();
|
||||
Ok(Self { runners })
|
||||
}
|
||||
|
||||
async fn run(self) -> Result<(), DynError> {
|
||||
futures::future::join_all(self.runners).await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,212 +0,0 @@
|
||||
// std
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
error::Error,
|
||||
fmt::{self, Debug},
|
||||
sync::Arc,
|
||||
};
|
||||
// crates
|
||||
use bytes::Bytes;
|
||||
use http::StatusCode;
|
||||
#[cfg(feature = "gql")]
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
use overwatch_rs::services::{
|
||||
handle::ServiceStateHandle,
|
||||
relay::{InboundRelay, RelayMessage},
|
||||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{mpsc::Sender, oneshot};
|
||||
|
||||
// internal
|
||||
use crate::backends::HttpBackend;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct HttpServiceSettings<B: HttpBackend> {
|
||||
pub backend: B::Settings,
|
||||
}
|
||||
|
||||
pub struct HttpService<B: HttpBackend> {
|
||||
backend: B,
|
||||
inbound_relay: InboundRelay<HttpMsg>,
|
||||
}
|
||||
|
||||
impl<B: HttpBackend + 'static> ServiceData for HttpService<B> {
|
||||
const SERVICE_ID: ServiceId = "Http";
|
||||
type Settings = HttpServiceSettings<B>;
|
||||
type State = NoState<Self::Settings>;
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
type Message = HttpMsg;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub enum HttpMethod {
|
||||
GET,
|
||||
POST,
|
||||
PUT,
|
||||
PATCH,
|
||||
DELETE,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct Route {
|
||||
pub method: HttpMethod,
|
||||
pub path: String,
|
||||
}
|
||||
|
||||
impl core::fmt::Debug for Route {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Route")
|
||||
.field("method", &self.method)
|
||||
.field("path", &self.path)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub type HttpResponse = Result<Bytes, (StatusCode, String)>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HttpRequest {
|
||||
pub query: HashMap<String, String>,
|
||||
pub payload: Option<bytes::Bytes>,
|
||||
pub res_tx: Sender<HttpResponse>,
|
||||
}
|
||||
|
||||
// HttpMsg is a message that is sent via the relay to communicate with
|
||||
// the HttpService.
|
||||
pub enum HttpMsg {
|
||||
AddHandler {
|
||||
service_id: ServiceId,
|
||||
route: Route,
|
||||
req_stream: Sender<HttpRequest>,
|
||||
},
|
||||
}
|
||||
|
||||
impl HttpMsg {
|
||||
pub fn add_http_handler<P: Into<String>>(
|
||||
method: HttpMethod,
|
||||
service_id: ServiceId,
|
||||
path: P,
|
||||
req_stream: Sender<HttpRequest>,
|
||||
) -> Self {
|
||||
Self::AddHandler {
|
||||
service_id,
|
||||
route: Route {
|
||||
method,
|
||||
path: path.into(),
|
||||
},
|
||||
req_stream,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RelayMessage for HttpMsg {}
|
||||
|
||||
impl Debug for HttpMsg {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Self::AddHandler {
|
||||
service_id,
|
||||
route,
|
||||
req_stream: _,
|
||||
} => write!(
|
||||
fmt,
|
||||
"HttpMsg::AddHandler {{ sender: {service_id:?}, route: {route:?} }}"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<B> ServiceCore for HttpService<B>
|
||||
where
|
||||
B: HttpBackend + Send + Sync + 'static,
|
||||
<B as HttpBackend>::Error: Error + Send + Sync + 'static,
|
||||
{
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||
let inbound_relay = service_state.inbound_relay;
|
||||
<B as HttpBackend>::new(service_state.settings_reader.get_updated_settings().backend)
|
||||
.map(|backend| Self {
|
||||
backend,
|
||||
inbound_relay,
|
||||
})
|
||||
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
|
||||
}
|
||||
|
||||
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
|
||||
let Self {
|
||||
backend,
|
||||
mut inbound_relay,
|
||||
} = self;
|
||||
|
||||
let backend = Arc::new(backend);
|
||||
let (stop_tx, mut stop_rx) = oneshot::channel();
|
||||
tokio::spawn({
|
||||
let backend = backend.clone();
|
||||
async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(msg) = inbound_relay.recv() => {
|
||||
match msg {
|
||||
HttpMsg::AddHandler {
|
||||
service_id,
|
||||
route,
|
||||
req_stream,
|
||||
} => {
|
||||
backend.add_route(service_id, route, req_stream);
|
||||
},
|
||||
}
|
||||
}
|
||||
_server_exit = &mut stop_rx => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
backend.run().await.map_err(|e| {
|
||||
if stop_tx.send(()).is_err() {
|
||||
tracing::error!("HTTP service: failed to send stop signal to HTTP backend.");
|
||||
}
|
||||
e
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: HttpBackend> Clone for HttpServiceSettings<B> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
backend: self.backend.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: optimize error construct?
|
||||
#[cfg(feature = "gql")]
|
||||
pub async fn handle_graphql_req<M, F>(
|
||||
payload: Option<Bytes>,
|
||||
relay: OutboundRelay<M>,
|
||||
f: F,
|
||||
) -> Result<String, overwatch_rs::DynError>
|
||||
where
|
||||
F: FnOnce(
|
||||
async_graphql::Request,
|
||||
oneshot::Sender<async_graphql::Response>,
|
||||
) -> Result<M, overwatch_rs::DynError>,
|
||||
{
|
||||
let payload = payload.ok_or("empty payload")?;
|
||||
let req = async_graphql::http::receive_batch_json(&payload[..])
|
||||
.await?
|
||||
.into_single()?;
|
||||
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
relay.send(f(req, sender)?).await.map_err(|_| {
|
||||
tracing::error!(err = "failed to send graphql request to the http service");
|
||||
"failed to send graphql request to the frontend"
|
||||
})?;
|
||||
|
||||
let res = receiver.await.unwrap();
|
||||
let res = serde_json::to_string(&res)?;
|
||||
Ok(res)
|
||||
}
|
@ -1,3 +0,0 @@
|
||||
pub mod backends;
|
||||
pub mod bridge;
|
||||
pub mod http;
|
@ -15,7 +15,6 @@ async-trait = "0.1"
|
||||
bytes = "1.3"
|
||||
clap = { version = "4", features = ["derive", "env"], optional = true }
|
||||
futures = "0.3"
|
||||
nomos-http = { path = "../http", optional = true }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||
once_cell = "1.16"
|
||||
@ -31,4 +30,4 @@ thiserror = "1"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
gql = ["clap", "axum", "async-graphql", "tower-http", "nomos-http/gql"]
|
||||
gql = ["clap", "axum", "async-graphql", "tower-http"]
|
||||
|
Loading…
x
Reference in New Issue
Block a user