1
0
mirror of synced 2025-02-23 13:08:19 +00:00

New http api to nomos-node integration (#490)

* Integrate new http api to nomos-node
This commit is contained in:
Al Liu 2023-11-08 16:55:47 +08:00 committed by GitHub
parent ccc85904d9
commit c3422c196c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 116 additions and 548 deletions

View File

@ -15,4 +15,4 @@ serde = "1"
serde_yaml = "0.9"
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = "1.29.1"
tokio = { version = "1.33", features = ["macros"] }

View File

@ -5,7 +5,7 @@ edition = "2021"
[features]
default = ["axum"]
axum = ["dep:axum", "dep:hyper", "utoipa-swagger-ui/axum"]
axum = ["dep:axum", "dep:hyper", "dep:tower-http", "utoipa-swagger-ui/axum"]
[dependencies]
async-trait = "0.1"
@ -13,7 +13,6 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806"
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
tracing = "0.1"
consensus-engine = { path = "../../consensus-engine" }
nomos-core = { path = "../../nomos-core" }
nomos-consensus = { path = "../../nomos-services/consensus" }
@ -30,6 +29,7 @@ tokio = { version = "1.33", default-features = false, features = ["sync"] }
# axum related dependencies
axum = { version = "0.6", optional = true }
hyper = { version = "0.14", features = ["full"], optional = true }
tower-http = { version = "0.4", optional = true, features = ["cors", "trace"] }
# openapi related dependencies

View File

@ -1,12 +1,18 @@
use std::{fmt::Debug, hash::Hash, net::SocketAddr};
use std::{fmt::Debug, hash::Hash};
use axum::{
extract::{Query, State},
http::HeaderValue,
response::Response,
routing, Json, Router, Server,
};
use hyper::header::{CONTENT_TYPE, USER_AGENT};
use overwatch_rs::overwatch::handle::OverwatchHandle;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
@ -22,10 +28,13 @@ use crate::{
Backend,
};
#[derive(Clone)]
/// Configuration for the Http Server
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct AxumBackendSettings {
pub addr: SocketAddr,
pub handle: OverwatchHandle,
/// Socket where the server will be listening on for incoming requests.
pub address: std::net::SocketAddr,
/// Allowed origins for this server deployment requests.
pub cors_origins: Vec<String>,
}
pub struct AxumBackend<T, S, const SIZE: usize> {
@ -80,8 +89,28 @@ where
})
}
async fn serve(self) -> Result<(), Self::Error> {
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
let mut builder = CorsLayer::new();
if self.settings.cors_origins.is_empty() {
builder = builder.allow_origin(Any);
}
for origin in &self.settings.cors_origins {
builder = builder.allow_origin(
origin
.as_str()
.parse::<HeaderValue>()
.expect("fail to parse origin"),
);
}
let app = Router::new()
.layer(
builder
.allow_headers([CONTENT_TYPE, USER_AGENT])
.allow_methods(Any),
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/da/metrics", routing::get(da_metrics))
.route("/da/status", routing::post(da_status))
@ -94,9 +123,9 @@ where
.route("/storage/block", routing::post(block::<S, T>))
.route("/mempool/add/tx", routing::post(add_tx::<T>))
.route("/mempool/add/cert", routing::post(add_cert))
.with_state(self.settings.handle);
.with_state(handle);
Server::bind(&self.settings.addr)
Server::bind(&self.settings.address)
.serve(app.into_make_service())
.await
}
@ -125,8 +154,8 @@ macro_rules! make_request_and_return_response {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn da_metrics(State(store): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(da::da_mempool_metrics(&store))
async fn da_metrics(State(handle): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(da::da_mempool_metrics(&handle))
}
#[utoipa::path(
@ -138,10 +167,10 @@ async fn da_metrics(State(store): State<OverwatchHandle>) -> Response {
)
)]
async fn da_status(
State(store): State<OverwatchHandle>,
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> Response {
make_request_and_return_response!(da::da_mempool_status(&store, items))
make_request_and_return_response!(da::da_mempool_status(&handle, items))
}
#[utoipa::path(
@ -153,10 +182,10 @@ async fn da_status(
)
)]
async fn da_blobs(
State(store): State<OverwatchHandle>,
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> Response {
make_request_and_return_response!(da::da_blobs(&store, items))
make_request_and_return_response!(da::da_blobs(&handle, items))
}
#[utoipa::path(
@ -167,7 +196,7 @@ async fn da_blobs(
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cl_metrics<T>(State(store): State<OverwatchHandle>) -> Response
async fn cl_metrics<T>(State(handle): State<OverwatchHandle>) -> Response
where
T: Transaction
+ Clone
@ -180,7 +209,7 @@ where
+ 'static,
<T as nomos_core::tx::Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
{
make_request_and_return_response!(cl::cl_mempool_metrics::<T>(&store))
make_request_and_return_response!(cl::cl_mempool_metrics::<T>(&handle))
}
#[utoipa::path(
@ -192,7 +221,7 @@ where
)
)]
async fn cl_status<T>(
State(store): State<OverwatchHandle>,
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<T as Transaction>::Hash>>,
) -> Response
where
@ -200,7 +229,7 @@ where
<T as nomos_core::tx::Transaction>::Hash:
Serialize + DeserializeOwned + std::cmp::Ord + Debug + Send + Sync + 'static,
{
make_request_and_return_response!(cl::cl_mempool_status::<T>(&store, items))
make_request_and_return_response!(cl::cl_mempool_status::<T>(&handle, items))
}
#[utoipa::path(
@ -211,13 +240,13 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn carnot_info<Tx, SS, const SIZE: usize>(State(store): State<OverwatchHandle>) -> Response
async fn carnot_info<Tx, SS, const SIZE: usize>(State(handle): State<OverwatchHandle>) -> Response
where
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
SS: StorageSerde + Send + Sync + 'static,
{
make_request_and_return_response!(consensus::carnot_info::<Tx, SS, SIZE>(&store))
make_request_and_return_response!(consensus::carnot_info::<Tx, SS, SIZE>(&handle))
}
#[derive(Deserialize)]
@ -255,8 +284,8 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn libp2p_info(State(store): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(libp2p::libp2p_info(&store))
async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(libp2p::libp2p_info(&handle))
}
#[utoipa::path(
@ -267,12 +296,12 @@ async fn libp2p_info(State(store): State<OverwatchHandle>) -> Response {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn block<S, Tx>(State(store): State<OverwatchHandle>, Json(id): Json<BlockId>) -> Response
async fn block<S, Tx>(State(handle): State<OverwatchHandle>, Json(id): Json<BlockId>) -> Response
where
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,
S: StorageSerde + Send + Sync + 'static,
{
make_request_and_return_response!(storage::block_req::<S, Tx>(&store, id))
make_request_and_return_response!(storage::block_req::<S, Tx>(&handle, id))
}
#[utoipa::path(
@ -283,7 +312,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_tx<Tx>(State(store): State<OverwatchHandle>, Json(tx): Json<Tx>) -> Response
async fn add_tx<Tx>(State(handle): State<OverwatchHandle>, Json(tx): Json<Tx>) -> Response
where
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
@ -294,18 +323,21 @@ where
nomos_mempool::Transaction,
Tx,
<Tx as Transaction>::Hash,
>(&store, tx, Transaction::hash))
>(&handle, tx, Transaction::hash))
}
#[utoipa::path(
post,
path = "/mempool/add/tx",
path = "/mempool/add/cert",
responses(
(status = 200, description = "Add certificate to the mempool"),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_cert(State(store): State<OverwatchHandle>, Json(cert): Json<Certificate>) -> Response {
async fn add_cert(
State(handle): State<OverwatchHandle>,
Json(cert): Json<Certificate>,
) -> Response {
make_request_and_return_response!(mempool::add::<
Libp2p,
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
@ -313,7 +345,7 @@ async fn add_cert(State(store): State<OverwatchHandle>, Json(cert): Json<Certifi
Certificate,
<Blob as blob::Blob>::Hash,
>(
&store,
&handle,
cert,
nomos_core::da::certificate::Certificate::hash
))

View File

@ -1,5 +1,9 @@
use std::{fmt::Debug, hash::Hash};
use overwatch_rs::overwatch::handle::OverwatchHandle;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::oneshot;
use consensus_engine::{
overlay::{RandomBeaconState, RoundRobin, TreeOverlay},
Block, BlockId,
@ -20,9 +24,6 @@ use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter,
};
use nomos_storage::backends::{sled::SledBackend, StorageSerde};
use overwatch_rs::overwatch::handle::OverwatchHandle;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::oneshot;
pub type Carnot<Tx, SS, const SIZE: usize> = CarnotConsensus<
ConsensusLibp2pAdapter,
@ -53,6 +54,7 @@ where
.send(ConsensusMsg::Info { tx: sender })
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await?)
}
@ -76,5 +78,6 @@ where
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await?)
}

View File

@ -17,7 +17,7 @@ where
A::Settings: Send + Sync,
D: Discriminant,
Item: Clone + Debug + Send + Sync + 'static + Hash,
Key: Clone + Debug + Ord + Hash,
Key: Clone + Debug + Ord + Hash + 'static,
{
let relay = handle
.relay::<MempoolService<A, MockPool<Item, Key>, D>>()

View File

@ -1,4 +1,5 @@
use overwatch_rs::{
overwatch::handle::OverwatchHandle,
services::{
handle::ServiceStateHandle,
relay::NoMessage,
@ -21,16 +22,17 @@ pub trait Backend {
where
Self: Sized;
async fn serve(self) -> Result<(), Self::Error>;
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error>;
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ApiServiceSettings<S> {
pub backend_settings: S,
}
pub struct ApiService<B: Backend> {
settings: ApiServiceSettings<B::Settings>,
handle: OverwatchHandle,
}
impl<B: Backend> ServiceData for ApiService<B> {
@ -53,13 +55,16 @@ where
/// Initialize the service with the given state
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let settings = service_state.settings_reader.get_updated_settings();
Ok(Self { settings })
Ok(Self {
settings,
handle: service_state.overwatch_handle,
})
}
/// Service main loop
async fn run(mut self) -> Result<(), DynError> {
let endpoint = B::new(self.settings.backend_settings).await?;
endpoint.serve().await?;
endpoint.serve(self.handle).await?;
Ok(())
}
}

View File

@ -8,7 +8,10 @@ use axum::{routing, Router, Server};
use hyper::Error;
use nomos_node_api::{ApiService, ApiServiceSettings, Backend};
use overwatch_derive::Services;
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use overwatch_rs::{
overwatch::{handle::OverwatchHandle, OverwatchRunner},
services::handle::ServiceHandle,
};
use utoipa::{
openapi::security::{ApiKey, ApiKeyValue, SecurityScheme},
Modify, OpenApi,
@ -71,7 +74,7 @@ impl Backend for WebServer {
Ok(Self { addr: settings })
}
async fn serve(self) -> Result<(), Self::Error> {
async fn serve(self, _handle: OverwatchHandle) -> Result<(), Self::Error> {
let store = Arc::new(Store::default());
let app = Router::new()
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))

View File

@ -9,7 +9,7 @@ edition = "2021"
blake2 = "0.10"
bincode = "2.0.0-rc.2"
bytes = "1.3"
clap = { version = "4", features = ["derive"] }
clap = { version = "4", features = ["derive", "env"] }
chrono = "0.4"
futures = "0.3"
http = "0.2.9"
@ -20,6 +20,7 @@ tracing = "0.1"
multiaddr = "0.18"
nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-node-api = { path = "../nomos-node-api" }
nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }

View File

@ -51,7 +51,7 @@ network:
end: "0ms"
http:
backend:
backend_settings:
address: 0.0.0.0:8080
cors_origins: []

View File

@ -1,26 +0,0 @@
// std
// crates
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
// internal
use nomos_http::http::HttpResponse;
use nomos_network::backends::libp2p::{Command, Libp2p};
use nomos_network::NetworkMsg;
use overwatch_rs::services::relay::OutboundRelay;
pub(super) async fn handle_libp2p_info_req(
channel: &OutboundRelay<NetworkMsg<Libp2p>>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let (sender, receiver) = oneshot::channel();
channel
.send(NetworkMsg::Process(Command::Info { reply: sender }))
.await
.map_err(|(e, _)| e)?;
let info = receiver.await.unwrap();
res_tx.send(Ok(serde_json::to_vec(&info)?.into())).await?;
Ok(())
}

View File

@ -1,426 +0,0 @@
mod libp2p;
use consensus_engine::BlockId;
use libp2p::*;
use std::collections::HashMap;
// std
// crates
use bytes::Bytes;
use http::StatusCode;
use nomos_consensus::{CarnotInfo, ConsensusMsg};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tracing::error;
// internal
use full_replication::{Blob, Certificate};
use nomos_core::{
block::Block,
da::{blob, certificate::Certificate as _},
tx::Transaction,
};
use nomos_da::DaMsg;
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
use nomos_mempool::network::NetworkAdapter;
use nomos_mempool::{Certificate as CertDiscriminant, Transaction as TxDiscriminant};
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
use nomos_network::backends::libp2p::Libp2p;
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use nomos_node::{Carnot, Tx};
use nomos_node::{DataAvailability as DataAvailabilityService, Wire};
use nomos_storage::{backends::sled::SledBackend, StorageMsg, StorageService};
use overwatch_rs::services::relay::OutboundRelay;
type DaMempoolService = MempoolService<
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
MockPool<Certificate, <Blob as blob::Blob>::Hash>,
CertDiscriminant,
>;
type ClMempoolService = MempoolService<
Libp2pAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<Tx, <Tx as Transaction>::Hash>,
TxDiscriminant,
>;
macro_rules! get_handler {
($handle:expr, $service:ty, $path:expr => $handler:tt) => {{
let (channel, mut http_request_channel) =
build_http_bridge::<$service, AxumBackend, _>($handle, HttpMethod::GET, $path)
.await
.unwrap();
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
if let Err(e) = $handler(&channel, res_tx).await {
error!(e);
}
}
Ok(())
}};
}
macro_rules! post_handler {
($handle:expr, $service:ty, $path:expr => $handler:tt) => {{
let (channel, mut http_request_channel) =
build_http_bridge::<$service, AxumBackend, _>($handle, HttpMethod::POST, $path)
.await
.unwrap();
while let Some(HttpRequest {
res_tx, payload, ..
}) = http_request_channel.recv().await
{
if let Err(e) = $handler(&channel, payload, res_tx).await {
error!(e);
}
}
Ok(())
}};
}
pub fn carnot_info_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
get_handler!(handle, Carnot, "info" => handle_carnot_info_req)
}))
}
pub fn block_info_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (channel, mut http_request_channel) =
build_http_bridge::<Carnot, AxumBackend, _>(handle, HttpMethod::GET, "blocks")
.await
.unwrap();
while let Some(HttpRequest { res_tx, query, .. }) = http_request_channel.recv().await {
if let Err(e) = handle_block_info_req(&channel, query, res_tx).await {
error!(e);
}
}
Ok(())
}))
}
pub fn cl_mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
get_handler!(handle, ClMempoolService, "metrics" => handle_mempool_metrics_req)
}))
}
pub fn da_mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
get_handler!(handle, DaMempoolService, "metrics" => handle_mempool_metrics_req)
}))
}
pub fn da_mempool_status_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
post_handler!(handle, DaMempoolService, "status" => handle_mempool_status_req)
}))
}
pub fn cl_mempool_status_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
post_handler!(handle, ClMempoolService, "status" => handle_mempool_status_req)
}))
}
pub fn storage_get_blocks_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
post_handler!(handle, StorageService<SledBackend<Wire>>, "block" => handle_block_get_req)
}))
}
pub fn network_info_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
get_handler!(handle, NetworkService<Libp2p>, "info" => handle_libp2p_info_req)
}))
}
pub fn da_blob_get_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
post_handler!(handle, DataAvailabilityService, "blobs" => handle_da_blobs_req)
}))
}
pub async fn handle_da_blobs_req<B>(
da_channel: &OutboundRelay<DaMsg<B>>,
payload: Option<Bytes>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError>
where
B: blob::Blob + Serialize,
B::Hash: DeserializeOwned + Send + 'static,
{
let (reply_channel, receiver) = oneshot::channel();
let ids: Vec<B::Hash> = serde_json::from_slice(payload.unwrap_or_default().as_ref())?;
da_channel
.send(DaMsg::Get {
ids: Box::new(ids.into_iter()),
reply_channel,
})
.await
.map_err(|(e, _)| e)?;
let blobs = receiver.await.unwrap();
res_tx
.send(Ok(serde_json::to_string(&blobs).unwrap().into()))
.await?;
Ok(())
}
pub async fn handle_block_get_req(
storage_channel: &OutboundRelay<StorageMsg<SledBackend<Wire>>>,
payload: Option<Bytes>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let key: BlockId = serde_json::from_slice(payload.unwrap_or_default().as_ref())?;
let (msg, receiver) = StorageMsg::new_load_message(key);
storage_channel.send(msg).await.map_err(|(e, _)| e)?;
let block: Option<Block<Tx, Certificate>> = receiver.recv().await?;
res_tx
.send(Ok(serde_json::to_string(&block).unwrap().into()))
.await?;
Ok(())
}
pub async fn handle_block_info_req(
carnot_channel: &OutboundRelay<ConsensusMsg>,
query: HashMap<String, String>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
fn parse_block_id(field: Option<&String>) -> Result<Option<BlockId>, overwatch_rs::DynError> {
field
.map(|id| {
hex::decode(id)
.map_err(|e| e.into())
.and_then(|bytes| {
<[u8; 32]>::try_from(bytes)
.map_err(|e| format!("expected 32 bytes found {}", e.len()).into())
})
.map(BlockId::from)
})
.transpose()
}
const QUERY_FROM: &str = "from";
const QUERY_TO: &str = "to";
let (sender, receiver) = oneshot::channel();
carnot_channel
.send(ConsensusMsg::GetBlocks {
from: parse_block_id(query.get(QUERY_FROM))?,
to: parse_block_id(query.get(QUERY_TO))?,
tx: sender,
})
.await
.map_err(|(e, _)| e)?;
let blocks = receiver.await.unwrap();
res_tx.send(Ok(serde_json::to_vec(&blocks)?.into())).await?;
Ok(())
}
pub async fn handle_mempool_status_req<K, V>(
mempool_channel: &OutboundRelay<MempoolMsg<V, K>>,
payload: Option<Bytes>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError>
where
K: DeserializeOwned,
{
let (sender, receiver) = oneshot::channel();
let items: Vec<K> = serde_json::from_slice(payload.unwrap_or_default().as_ref())?;
mempool_channel
.send(MempoolMsg::Status {
items,
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
let status = receiver.await.unwrap();
res_tx
.send(Ok(serde_json::to_string(&status).unwrap().into()))
.await?;
Ok(())
}
pub fn mempool_add_tx_bridge<N, A>(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner
where
N: NetworkBackend,
A: NetworkAdapter<Backend = N, Item = Tx, Key = <Tx as Transaction>::Hash>
+ Send
+ Sync
+ 'static,
A::Settings: Send + Sync,
{
Box::new(Box::pin(async move {
let (mempool_channel, mut http_request_channel) =
build_http_bridge::<
MempoolService<A, MockPool<Tx, <Tx as Transaction>::Hash>, TxDiscriminant>,
AxumBackend,
_,
>(handle.clone(), HttpMethod::POST, "add")
.await
.unwrap();
while let Some(HttpRequest {
res_tx, payload, ..
}) = http_request_channel.recv().await
{
if let Err(e) = handle_mempool_add_req(
&mempool_channel,
res_tx,
payload.unwrap_or_default(),
|tx| tx.hash(),
)
.await
{
error!(e);
}
}
Ok(())
}))
}
pub fn mempool_add_cert_bridge<N, A>(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner
where
N: NetworkBackend,
A: NetworkAdapter<Backend = N, Item = Certificate, Key = <Blob as blob::Blob>::Hash>
+ Send
+ Sync
+ 'static,
A::Settings: Send + Sync,
{
Box::new(Box::pin(async move {
let (mempool_channel, mut http_request_channel) = build_http_bridge::<
MempoolService<A, MockPool<Certificate, <Blob as blob::Blob>::Hash>, CertDiscriminant>,
AxumBackend,
_,
>(
handle.clone(), HttpMethod::POST, "add"
)
.await
.unwrap();
while let Some(HttpRequest {
res_tx, payload, ..
}) = http_request_channel.recv().await
{
if let Err(e) = handle_mempool_add_req(
&mempool_channel,
res_tx,
payload.unwrap_or_default(),
|cert| cert.hash(),
)
.await
{
error!(e);
}
}
Ok(())
}))
}
async fn handle_carnot_info_req(
carnot_channel: &OutboundRelay<ConsensusMsg>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let (sender, receiver) = oneshot::channel();
carnot_channel
.send(ConsensusMsg::Info { tx: sender })
.await
.map_err(|(e, _)| e)?;
let carnot_info: CarnotInfo = receiver.await.unwrap();
res_tx
.send(Ok(serde_json::to_vec(&carnot_info)?.into()))
.await?;
Ok(())
}
async fn handle_mempool_metrics_req<K, V>(
mempool_channel: &OutboundRelay<MempoolMsg<K, V>>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let (sender, receiver) = oneshot::channel();
mempool_channel
.send(MempoolMsg::Metrics {
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
let metrics: MempoolMetrics = receiver.await.unwrap();
res_tx
// TODO: use serde to serialize metrics
.send(Ok(format!(
"{{\"pending_items\": {}, \"last_item\": {}}}",
metrics.pending_items, metrics.last_item_timestamp
)
.into()))
.await?;
Ok(())
}
pub(super) async fn handle_mempool_add_req<K, V>(
mempool_channel: &OutboundRelay<MempoolMsg<K, V>>,
res_tx: Sender<HttpResponse>,
wire_item: Bytes,
key: impl Fn(&K) -> V,
) -> Result<(), overwatch_rs::DynError>
where
K: DeserializeOwned,
{
let item: K = serde_json::from_slice(&wire_item)?;
let (sender, receiver) = oneshot::channel();
let key = key(&item);
mempool_channel
.send(MempoolMsg::Add {
item,
key,
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
match receiver.await {
Ok(Ok(())) => Ok(res_tx.send(Ok(b"".to_vec().into())).await?),
Ok(Err(())) => Ok(res_tx
.send(Err((
StatusCode::CONFLICT,
"error: unable to add tx".into(),
)))
.await?),
Err(err) => Ok(res_tx
.send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())))
.await?),
}
}

View File

@ -4,18 +4,18 @@ use std::{
time::Duration,
};
use crate::Carnot;
use crate::DataAvailability;
use crate::{Carnot, Tx, Wire, MB16};
use clap::{Parser, ValueEnum};
use color_eyre::eyre::{self, eyre, Result};
use hex::FromHex;
#[cfg(feature = "metrics")]
use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService};
use nomos_http::{backends::axum::AxumBackend, http::HttpService};
use nomos_libp2p::{secp256k1::SecretKey, Multiaddr};
use nomos_log::{Logger, LoggerBackend, LoggerFormat};
use nomos_network::backends::libp2p::Libp2p;
use nomos_network::NetworkService;
use nomos_node_api::{http::backend::axum::AxumBackend, ApiService};
use overwatch_rs::services::ServiceData;
use serde::{Deserialize, Serialize};
use tracing::Level;
@ -118,7 +118,7 @@ pub struct DaArgs {
pub struct Config {
pub log: <Logger as ServiceData>::Settings,
pub network: <NetworkService<Libp2p> as ServiceData>::Settings,
pub http: <HttpService<AxumBackend> as ServiceData>::Settings,
pub http: <ApiService<AxumBackend<Tx, Wire, MB16>> as ServiceData>::Settings,
pub consensus: <Carnot as ServiceData>::Settings,
#[cfg(feature = "metrics")]
pub metrics: <MetricsService<MapMetricsBackend<MetricsData>> as ServiceData>::Settings,
@ -206,11 +206,11 @@ impl Config {
} = http_args;
if let Some(addr) = http_addr {
self.http.backend.address = addr;
self.http.backend_settings.address = addr;
}
if let Some(cors) = cors_origins {
self.http.backend.cors_origins = cors;
self.http.backend_settings.cors_origins = cors;
}
Ok(self)

View File

@ -20,9 +20,6 @@ use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter,
DataAvailabilityService,
};
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::HttpBridgeService;
use nomos_http::http::HttpService;
use nomos_log::Logger;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter;
use nomos_mempool::{
@ -30,6 +27,8 @@ use nomos_mempool::{
Transaction as TxDiscriminant,
};
use nomos_network::backends::libp2p::Libp2p;
use nomos_node_api::http::backend::axum::AxumBackend;
use nomos_node_api::ApiService;
use nomos_storage::{
backends::{sled::SledBackend, StorageSerde},
StorageService,
@ -88,8 +87,7 @@ pub struct Nomos {
>,
>,
consensus: ServiceHandle<Carnot>,
http: ServiceHandle<HttpService<AxumBackend>>,
bridges: ServiceHandle<HttpBridgeService>,
http: ServiceHandle<ApiService<AxumBackend<Tx, Wire, MB16>>>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<MetricsService<MapMetricsBackend<MetricsData>>>,
da: ServiceHandle<DataAvailability>,

View File

@ -4,19 +4,16 @@ use nomos_node::{
OverlayArgs, Tx,
};
mod bridges;
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_core::{
da::{blob, certificate},
tx::Transaction,
};
use nomos_http::bridge::{HttpBridge, HttpBridgeSettings};
use nomos_mempool::network::adapters::libp2p::{Libp2pAdapter, Settings as AdapterSettings};
use nomos_network::backends::libp2p::Libp2p;
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
use overwatch_rs::overwatch::*;
use std::sync::Arc;
const DEFAULT_DB_PATH: &str = "./db";
@ -63,28 +60,6 @@ fn main() -> Result<()> {
.update_overlay(overlay_args)?
.update_network(network_args)?;
let bridges: Vec<HttpBridge> = vec![
Arc::new(Box::new(bridges::carnot_info_bridge)),
Arc::new(Box::new(bridges::block_info_bridge)),
// Due to a limitation in the current api system, we can't connect a single endopint to multiple services
// which means we need two different paths for complete mempool metrics.
Arc::new(Box::new(bridges::cl_mempool_metrics_bridge)),
Arc::new(Box::new(bridges::da_mempool_metrics_bridge)),
Arc::new(Box::new(bridges::cl_mempool_status_bridge)),
Arc::new(Box::new(bridges::da_mempool_status_bridge)),
Arc::new(Box::new(bridges::da_blob_get_bridge)),
Arc::new(Box::new(bridges::storage_get_blocks_bridge)),
Arc::new(Box::new(bridges::network_info_bridge)),
Arc::new(Box::new(
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>>,
)),
Arc::new(Box::new(
bridges::mempool_add_cert_bridge::<
Libp2p,
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
>,
)),
];
let app = OverwatchRunner::<Nomos>::run(
NomosServiceSettings {
network: config.network,
@ -105,7 +80,6 @@ fn main() -> Result<()> {
},
},
consensus: config.consensus,
bridges: HttpBridgeSettings { bridges },
#[cfg(feature = "metrics")]
metrics: config.metrics,
da: config.da,

View File

@ -10,6 +10,7 @@ pub async fn get_blobs(
const BLOBS_PATH: &str = "da/blobs";
CLIENT
.post(node.join(BLOBS_PATH).unwrap())
.header("Content-Type", "application/json")
.body(serde_json::to_string(&ids).unwrap())
.send()
.await?

View File

@ -6,9 +6,10 @@ pub async fn send_certificate<C>(node: &Url, cert: &C) -> Result<Response, Error
where
C: Serialize,
{
const NODE_CERT_PATH: &str = "mempool-da/add";
const NODE_CERT_PATH: &str = "mempool/add/cert";
CLIENT
.post(node.join(NODE_CERT_PATH).unwrap())
.header("Content-Type", "application/json")
.body(serde_json::to_string(cert).unwrap())
.send()
.await

View File

@ -95,7 +95,6 @@ impl CertificateStrategy for AbsoluteNumber<Attestation, Certificate> {
}
}
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub type Voter = [u8; 32];
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]

View File

@ -9,7 +9,7 @@ nomos-node = { path = "../nodes/nomos-node", default-features = false }
nomos-consensus = { path = "../nomos-services/consensus" }
nomos-network = { path = "../nomos-services/network", features = ["libp2p"]}
nomos-log = { path = "../nomos-services/log" }
nomos-http = { path = "../nomos-services/http", features = ["http"] }
nomos-node-api = { path = "../nodes/nomos-node-api", features = ["axum"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
nomos-core = { path = "../nomos-core" }
consensus-engine = { path = "../consensus-engine", features = ["serde"] }

View File

@ -12,13 +12,13 @@ use mixnet_node::MixnetNodeConfig;
use mixnet_topology::MixnetTopology;
use nomos_consensus::{CarnotInfo, CarnotSettings};
use nomos_core::block::Block;
use nomos_http::backends::axum::AxumBackendSettings;
use nomos_libp2p::{multiaddr, Multiaddr};
use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_mempool::MempoolMetrics;
use nomos_network::backends::libp2p::Libp2pConfig;
use nomos_network::NetworkConfig;
use nomos_node::{Config, Tx};
use nomos_node_api::http::backend::axum::AxumBackendSettings;
// crates
use fraction::Fraction;
use once_cell::sync::Lazy;
@ -30,7 +30,6 @@ static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const NOMOS_BIN: &str = "../target/debug/nomos-node";
const CARNOT_INFO_API: &str = "carnot/info";
const STORAGE_BLOCKS_API: &str = "storage/block";
const MEMPOOL_API: &str = "mempool-";
const LOGS_PREFIX: &str = "__logs";
const GET_BLOCKS_INFO: &str = "carnot/blocks";
@ -82,11 +81,11 @@ impl NomosNode {
let child = Command::new(std::env::current_dir().unwrap().join(NOMOS_BIN))
.arg(&config_path)
.current_dir(dir.path())
.stdout(Stdio::null())
.stdout(Stdio::inherit())
.spawn()
.unwrap();
let node = Self {
addr: config.http.backend.address,
addr: config.http.backend_settings.address,
child,
_tempdir: dir,
config,
@ -120,6 +119,7 @@ impl NomosNode {
pub async fn get_block(&self, id: BlockId) -> Option<Block<Tx, Certificate>> {
CLIENT
.post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&id).unwrap())
.send()
.await
@ -134,7 +134,7 @@ impl NomosNode {
Pool::Cl => "cl",
Pool::Da => "da",
};
let addr = format!("{}{}/metrics", MEMPOOL_API, discr);
let addr = format!("{}/metrics", discr);
let res = self
.get(&addr)
.await
@ -144,7 +144,7 @@ impl NomosNode {
.unwrap();
MempoolMetrics {
pending_items: res["pending_items"].as_u64().unwrap() as usize,
last_item_timestamp: res["last_item"].as_u64().unwrap(),
last_item_timestamp: res["last_item_timestamp"].as_u64().unwrap(),
}
}
@ -334,8 +334,8 @@ fn create_node_config(
blob_selector_settings: (),
},
log: Default::default(),
http: nomos_http::http::HttpServiceSettings {
backend: AxumBackendSettings {
http: nomos_node_api::ApiServiceSettings {
backend_settings: AxumBackendSettings {
address: format!("127.0.0.1:{}", get_available_port())
.parse()
.unwrap(),

View File

@ -37,9 +37,12 @@ async fn disseminate_blob() {
},
},
node_addr: Some(
format!("http://{}", nodes[0].config().http.backend.address.clone())
.parse()
.unwrap(),
format!(
"http://{}",
nodes[0].config().http.backend_settings.address.clone()
)
.parse()
.unwrap(),
),
output: None,
});