DA: Executor http api (#801)
* Node api handlers and backend modules * Executor axum api backend * Expose config functions from node * Descriptive generics names in http api * Nomos node metrics feature
This commit is contained in:
parent
9ecd738d6e
commit
b01c4dd8c7
@ -4,14 +4,39 @@ version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
axum = { version = "0.6" }
|
||||
clap = { version = "4.5.13", features = ["derive"] }
|
||||
color-eyre = "0.6.0"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" }
|
||||
nomos-api = { path = "../../nomos-services/api" }
|
||||
nomos-core = { path = "../../nomos-core" }
|
||||
nomos-da-network-core = { path = "../../nomos-da/network/core" }
|
||||
nomos-da-network-service = { path = "../../nomos-services/data-availability/network" }
|
||||
nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling", features = ["rocksdb-backend"] }
|
||||
nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] }
|
||||
nomos-libp2p = { path = "../../nomos-libp2p" }
|
||||
nomos-mempool = { path = "../../nomos-services/mempool", features = [
|
||||
"mock",
|
||||
"libp2p",
|
||||
] }
|
||||
nomos-metrics = { path = "../../nomos-services/metrics" }
|
||||
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
|
||||
nomos-node = { path = "../nomos-node" }
|
||||
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||
clap = { version = "4.5.13", features = ["derive"] }
|
||||
rand = "0.8"
|
||||
rand_chacha = "0.3"
|
||||
serde = "1"
|
||||
serde_yaml = "0.9"
|
||||
subnetworks-assignations = { path = "../../nomos-da/network/subnetworks-assignations" }
|
||||
tower-http = { version = "0.4", features = ["cors", "trace"] }
|
||||
tracing = "0.1.40"
|
||||
utoipa = "4.0"
|
||||
utoipa-swagger-ui = { version = "4.0" }
|
||||
uuid = { version = "1.10.0", features = ["v4"] }
|
||||
serde_yaml = "0.9.34+deprecated"
|
||||
|
||||
[features]
|
||||
default = ["tracing"]
|
||||
|
312
nodes/nomos-executor/src/api/backend.rs
Normal file
312
nodes/nomos-executor/src/api/backend.rs
Normal file
@ -0,0 +1,312 @@
|
||||
// std
|
||||
use std::error::Error;
|
||||
use std::{fmt::Debug, hash::Hash};
|
||||
// crates
|
||||
use axum::{http::HeaderValue, routing, Router, Server};
|
||||
use hyper::header::{CONTENT_TYPE, USER_AGENT};
|
||||
use nomos_api::Backend;
|
||||
use nomos_core::da::blob::info::DispersedBlobInfo;
|
||||
use nomos_core::da::blob::metadata::Metadata;
|
||||
use nomos_core::da::DaVerifier as CoreDaVerifier;
|
||||
use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction};
|
||||
use nomos_da_network_core::SubnetworkId;
|
||||
use nomos_da_sampling::backend::DaSamplingServiceBackend;
|
||||
use nomos_da_verifier::backend::VerifierBackend;
|
||||
use nomos_libp2p::PeerId;
|
||||
use nomos_mempool::{tx::service::openapi::Status, MempoolMetrics};
|
||||
use nomos_node::api::handlers::{
|
||||
add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers,
|
||||
cryptarchia_info, get_metrics, get_range, libp2p_info,
|
||||
};
|
||||
use nomos_storage::backends::StorageSerde;
|
||||
use overwatch_rs::overwatch::handle::OverwatchHandle;
|
||||
use rand::{RngCore, SeedableRng};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use subnetworks_assignations::MembershipHandler;
|
||||
use tower_http::{
|
||||
cors::{Any, CorsLayer},
|
||||
trace::TraceLayer,
|
||||
};
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
// internal
|
||||
|
||||
/// Configuration for the Http Server
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub struct AxumBackendSettings {
|
||||
/// Socket where the server will be listening on for incoming requests.
|
||||
pub address: std::net::SocketAddr,
|
||||
/// Allowed origins for this server deployment requests.
|
||||
pub cors_origins: Vec<String>,
|
||||
}
|
||||
|
||||
pub struct AxumBackend<
|
||||
DaAttestation,
|
||||
DaBlob,
|
||||
DaBlobInfo,
|
||||
Memebership,
|
||||
DaVerifiedBlobInfo,
|
||||
DaVerifierBackend,
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
const SIZE: usize,
|
||||
> {
|
||||
settings: AxumBackendSettings,
|
||||
_attestation: core::marker::PhantomData<DaAttestation>,
|
||||
_blob: core::marker::PhantomData<DaBlob>,
|
||||
_certificate: core::marker::PhantomData<DaBlobInfo>,
|
||||
_membership: core::marker::PhantomData<Memebership>,
|
||||
_vid: core::marker::PhantomData<DaVerifiedBlobInfo>,
|
||||
_verifier_backend: core::marker::PhantomData<DaVerifierBackend>,
|
||||
_tx: core::marker::PhantomData<Tx>,
|
||||
_storage_serde: core::marker::PhantomData<DaStorageSerializer>,
|
||||
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
|
||||
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
|
||||
_sampling_rng: core::marker::PhantomData<SamplingRng>,
|
||||
_sampling_storage: core::marker::PhantomData<SamplingStorage>,
|
||||
}
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
paths(
|
||||
),
|
||||
components(
|
||||
schemas(Status<HeaderId>, MempoolMetrics)
|
||||
),
|
||||
tags(
|
||||
(name = "da", description = "data availibility related APIs")
|
||||
)
|
||||
)]
|
||||
struct ApiDoc;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<
|
||||
DaAttestation,
|
||||
DaBlob,
|
||||
DaBlobInfo,
|
||||
Membership,
|
||||
DaVerifiedBlobInfo,
|
||||
DaVerifierBackend,
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
const SIZE: usize,
|
||||
> Backend
|
||||
for AxumBackend<
|
||||
DaAttestation,
|
||||
DaBlob,
|
||||
DaBlobInfo,
|
||||
Membership,
|
||||
DaVerifiedBlobInfo,
|
||||
DaVerifierBackend,
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>
|
||||
where
|
||||
DaAttestation: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
|
||||
DaBlob: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
|
||||
<DaBlob as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
|
||||
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
|
||||
DaBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<DaBlobInfo as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
|
||||
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
DaVerifiedBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
|
||||
+ From<DaBlobInfo>
|
||||
+ Eq
|
||||
+ Debug
|
||||
+ Metadata
|
||||
+ Hash
|
||||
+ Clone
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
|
||||
<DaVerifiedBlobInfo as Metadata>::AppId:
|
||||
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync,
|
||||
<DaVerifiedBlobInfo as Metadata>::Index:
|
||||
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
|
||||
DaVerifierBackend: VerifierBackend + CoreDaVerifier<DaBlob = DaBlob> + Send + Sync + 'static,
|
||||
<DaVerifierBackend as VerifierBackend>::Settings: Clone,
|
||||
<DaVerifierBackend as CoreDaVerifier>::Error: Error,
|
||||
Tx: Transaction
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Eq
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ for<'de> Deserialize<'de>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<Tx as nomos_core::tx::Transaction>::Hash:
|
||||
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
DaStorageSerializer: StorageSerde + Send + Sync + 'static,
|
||||
SamplingRng: SeedableRng + RngCore + Send + 'static,
|
||||
SamplingBackend: DaSamplingServiceBackend<
|
||||
SamplingRng,
|
||||
BlobId = <DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId,
|
||||
> + Send
|
||||
+ 'static,
|
||||
SamplingBackend::Settings: Clone,
|
||||
SamplingBackend::Blob: Debug + 'static,
|
||||
SamplingBackend::BlobId: Debug + 'static,
|
||||
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
|
||||
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + 'static,
|
||||
{
|
||||
type Error = hyper::Error;
|
||||
type Settings = AxumBackendSettings;
|
||||
|
||||
async fn new(settings: Self::Settings) -> Result<Self, Self::Error>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Ok(Self {
|
||||
settings,
|
||||
_attestation: core::marker::PhantomData,
|
||||
_blob: core::marker::PhantomData,
|
||||
_certificate: core::marker::PhantomData,
|
||||
_membership: core::marker::PhantomData,
|
||||
_vid: core::marker::PhantomData,
|
||||
_verifier_backend: core::marker::PhantomData,
|
||||
_tx: core::marker::PhantomData,
|
||||
_storage_serde: core::marker::PhantomData,
|
||||
_sampling_backend: core::marker::PhantomData,
|
||||
_sampling_network_adapter: core::marker::PhantomData,
|
||||
_sampling_rng: core::marker::PhantomData,
|
||||
_sampling_storage: core::marker::PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
|
||||
let mut builder = CorsLayer::new();
|
||||
if self.settings.cors_origins.is_empty() {
|
||||
builder = builder.allow_origin(Any);
|
||||
}
|
||||
|
||||
for origin in &self.settings.cors_origins {
|
||||
builder = builder.allow_origin(
|
||||
origin
|
||||
.as_str()
|
||||
.parse::<HeaderValue>()
|
||||
.expect("fail to parse origin"),
|
||||
);
|
||||
}
|
||||
|
||||
let app = Router::new()
|
||||
.layer(
|
||||
builder
|
||||
.allow_headers([CONTENT_TYPE, USER_AGENT])
|
||||
.allow_methods(Any),
|
||||
)
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
|
||||
.route("/cl/metrics", routing::get(cl_metrics::<Tx>))
|
||||
.route("/cl/status", routing::post(cl_status::<Tx>))
|
||||
.route(
|
||||
"/cryptarchia/info",
|
||||
routing::get(
|
||||
cryptarchia_info::<
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route(
|
||||
"/cryptarchia/headers",
|
||||
routing::get(
|
||||
cryptarchia_headers::<
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route(
|
||||
"/da/add_blob",
|
||||
routing::post(
|
||||
add_blob::<
|
||||
DaAttestation,
|
||||
DaBlob,
|
||||
Membership,
|
||||
DaVerifierBackend,
|
||||
DaStorageSerializer,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route(
|
||||
"/da/get_range",
|
||||
routing::post(
|
||||
get_range::<
|
||||
Tx,
|
||||
DaBlobInfo,
|
||||
DaVerifiedBlobInfo,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route("/network/info", routing::get(libp2p_info))
|
||||
.route(
|
||||
"/storage/block",
|
||||
routing::post(block::<DaStorageSerializer, Tx>),
|
||||
)
|
||||
.route("/mempool/add/tx", routing::post(add_tx::<Tx>))
|
||||
.route(
|
||||
"/mempool/add/blobinfo",
|
||||
routing::post(
|
||||
add_blob_info::<
|
||||
DaVerifiedBlobInfo,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route("/metrics", routing::get(get_metrics))
|
||||
.with_state(handle);
|
||||
|
||||
Server::bind(&self.settings.address)
|
||||
.serve(app.into_make_service())
|
||||
.await
|
||||
}
|
||||
}
|
1
nodes/nomos-executor/src/api/mod.rs
Normal file
1
nodes/nomos-executor/src/api/mod.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod backend;
|
67
nodes/nomos-executor/src/config.rs
Normal file
67
nodes/nomos-executor/src/config.rs
Normal file
@ -0,0 +1,67 @@
|
||||
// std
|
||||
// crates
|
||||
use color_eyre::eyre::Result;
|
||||
use nomos_da_network_service::backends::libp2p::executor::DaNetworkExecutorBackend;
|
||||
use nomos_da_network_service::NetworkService as DaNetworkService;
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
use nomos_node::{
|
||||
config::{update_cryptarchia_consensus, update_log, update_network},
|
||||
CryptarchiaArgs, HttpArgs, LogArgs, Logger, NetworkArgs, NetworkService, Wire,
|
||||
};
|
||||
use nomos_storage::backends::rocksdb::RocksBackend;
|
||||
use overwatch_rs::services::ServiceData;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use subnetworks_assignations::versions::v1::FillFromNodeList;
|
||||
// internal
|
||||
use crate::ExecutorApiService;
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, Serialize)]
|
||||
pub struct Config {
|
||||
pub log: <Logger as ServiceData>::Settings,
|
||||
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
|
||||
pub da_network:
|
||||
<DaNetworkService<DaNetworkExecutorBackend<FillFromNodeList>> as ServiceData>::Settings,
|
||||
pub da_indexer: <crate::DaIndexer as ServiceData>::Settings,
|
||||
pub da_verifier: <crate::DaVerifier as ServiceData>::Settings,
|
||||
pub da_sampling: <crate::DaSampling as ServiceData>::Settings,
|
||||
pub http: <ExecutorApiService as ServiceData>::Settings,
|
||||
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
|
||||
pub storage: <crate::StorageService<RocksBackend<Wire>> as ServiceData>::Settings,
|
||||
pub wait_online_secs: u64,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn update_from_args(
|
||||
mut self,
|
||||
log_args: LogArgs,
|
||||
network_args: NetworkArgs,
|
||||
http_args: HttpArgs,
|
||||
cryptarchia_args: CryptarchiaArgs,
|
||||
) -> Result<Self> {
|
||||
update_log(&mut self.log, log_args)?;
|
||||
update_network(&mut self.network, network_args)?;
|
||||
update_http(&mut self.http, http_args)?;
|
||||
update_cryptarchia_consensus(&mut self.cryptarchia, cryptarchia_args)?;
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_http(
|
||||
http: &mut <ExecutorApiService as ServiceData>::Settings,
|
||||
http_args: HttpArgs,
|
||||
) -> Result<()> {
|
||||
let HttpArgs {
|
||||
http_addr,
|
||||
cors_origins,
|
||||
} = http_args;
|
||||
|
||||
if let Some(addr) = http_addr {
|
||||
http.backend_settings.address = addr;
|
||||
}
|
||||
|
||||
if let Some(cors) = cors_origins {
|
||||
http.backend_settings.cors_origins = cors;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
@ -1,6 +1,38 @@
|
||||
mod api;
|
||||
pub mod config;
|
||||
|
||||
// std
|
||||
// crates
|
||||
use kzgrs_backend::common::blob::DaBlob;
|
||||
use nomos_api::ApiService;
|
||||
use nomos_da_network_service::backends::libp2p::executor::DaNetworkExecutorBackend;
|
||||
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend;
|
||||
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter;
|
||||
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
|
||||
use nomos_node::*;
|
||||
use overwatch_derive::Services;
|
||||
use overwatch_rs::services::handle::ServiceHandle;
|
||||
use rand_chacha::ChaCha20Rng;
|
||||
// internal
|
||||
use api::backend::AxumBackend;
|
||||
|
||||
pub type ExecutorApiService = ApiService<
|
||||
AxumBackend<
|
||||
(),
|
||||
DaBlob,
|
||||
BlobInfo,
|
||||
NomosDaMembership,
|
||||
BlobInfo,
|
||||
KzgrsDaVerifier,
|
||||
Tx,
|
||||
Wire,
|
||||
KzgrsSamplingBackend<ChaCha20Rng>,
|
||||
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
|
||||
ChaCha20Rng,
|
||||
SamplingStorageAdapter<DaBlob, Wire>,
|
||||
MB16,
|
||||
>,
|
||||
>;
|
||||
|
||||
#[derive(Services)]
|
||||
pub struct NomosExecutor {
|
||||
@ -10,11 +42,11 @@ pub struct NomosExecutor {
|
||||
da_indexer: ServiceHandle<DaIndexer>,
|
||||
da_verifier: ServiceHandle<DaVerifier>,
|
||||
da_sampling: ServiceHandle<DaSampling>,
|
||||
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<NomosDaMembership>>>,
|
||||
da_network: ServiceHandle<DaNetworkService<DaNetworkExecutorBackend<NomosDaMembership>>>,
|
||||
cl_mempool: ServiceHandle<TxMempool>,
|
||||
da_mempool: ServiceHandle<DaMempool>,
|
||||
cryptarchia: ServiceHandle<Cryptarchia>,
|
||||
http: ServiceHandle<NomosApiService>,
|
||||
http: ServiceHandle<ExecutorApiService>,
|
||||
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics: ServiceHandle<Metrics>,
|
||||
|
@ -1,13 +1,20 @@
|
||||
use nomos_node::*;
|
||||
#[cfg(feature = "metrics")]
|
||||
use nomos_node::{MetricsSettings, NomosRegistry};
|
||||
|
||||
// std
|
||||
// crates
|
||||
use clap::Parser;
|
||||
use color_eyre::eyre::{eyre, Result};
|
||||
use nomos_executor::config::Config as ExecutorConfig;
|
||||
use nomos_executor::{NomosExecutor, NomosExecutorServiceSettings};
|
||||
#[cfg(feature = "metrics")]
|
||||
use nomos_node::MetricsSettings;
|
||||
use nomos_node::{
|
||||
BlobInfo, CryptarchiaArgs, DaMempoolSettings, DispersedBlobInfo, HttpArgs, LogArgs,
|
||||
MempoolAdapterSettings, MetricsArgs, NetworkArgs, Transaction, Tx, TxMempoolSettings, CL_TOPIC,
|
||||
DA_TOPIC,
|
||||
};
|
||||
use overwatch_rs::overwatch::*;
|
||||
use tracing::{span, Level};
|
||||
use uuid::Uuid;
|
||||
// internal
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
@ -39,14 +46,15 @@ fn main() -> Result<()> {
|
||||
cryptarchia_args,
|
||||
metrics_args,
|
||||
} = Args::parse();
|
||||
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?
|
||||
.update_log(log_args)?
|
||||
.update_http(http_args)?
|
||||
.update_network(network_args)?
|
||||
.update_cryptarchia_consensus(cryptarchia_args)?;
|
||||
let config = serde_yaml::from_reader::<_, ExecutorConfig>(std::fs::File::open(config)?)?
|
||||
.update_from_args(log_args, network_args, http_args, cryptarchia_args)?;
|
||||
|
||||
let registry = cfg!(feature = "metrics")
|
||||
.then(|| metrics_args.with_metrics.then(NomosRegistry::default))
|
||||
.then(|| {
|
||||
metrics_args
|
||||
.with_metrics
|
||||
.then(nomos_metrics::NomosRegistry::default)
|
||||
})
|
||||
.flatten();
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
|
312
nodes/nomos-node/src/api/backend.rs
Normal file
312
nodes/nomos-node/src/api/backend.rs
Normal file
@ -0,0 +1,312 @@
|
||||
// std
|
||||
use std::error::Error;
|
||||
use std::{fmt::Debug, hash::Hash};
|
||||
// crates
|
||||
use axum::{http::HeaderValue, routing, Router, Server};
|
||||
use hyper::header::{CONTENT_TYPE, USER_AGENT};
|
||||
use nomos_api::Backend;
|
||||
use nomos_core::da::blob::info::DispersedBlobInfo;
|
||||
use nomos_core::da::blob::metadata::Metadata;
|
||||
use nomos_core::da::DaVerifier as CoreDaVerifier;
|
||||
use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction};
|
||||
use nomos_da_network_core::SubnetworkId;
|
||||
use nomos_da_sampling::backend::DaSamplingServiceBackend;
|
||||
use nomos_da_verifier::backend::VerifierBackend;
|
||||
use nomos_libp2p::PeerId;
|
||||
use nomos_mempool::{tx::service::openapi::Status, MempoolMetrics};
|
||||
use nomos_storage::backends::StorageSerde;
|
||||
use overwatch_rs::overwatch::handle::OverwatchHandle;
|
||||
use rand::{RngCore, SeedableRng};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use subnetworks_assignations::MembershipHandler;
|
||||
use tower_http::{
|
||||
cors::{Any, CorsLayer},
|
||||
trace::TraceLayer,
|
||||
};
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
// internal
|
||||
use super::handlers::{
|
||||
add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers,
|
||||
cryptarchia_info, get_metrics, get_range, libp2p_info,
|
||||
};
|
||||
|
||||
/// Configuration for the Http Server
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub struct AxumBackendSettings {
|
||||
/// Socket where the server will be listening on for incoming requests.
|
||||
pub address: std::net::SocketAddr,
|
||||
/// Allowed origins for this server deployment requests.
|
||||
pub cors_origins: Vec<String>,
|
||||
}
|
||||
|
||||
pub struct AxumBackend<
|
||||
DaAttestation,
|
||||
DaBlob,
|
||||
DaBlobInfo,
|
||||
Membership,
|
||||
DaVerifiedBlobInfo,
|
||||
DaVerifierBackend,
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
const SIZE: usize,
|
||||
> {
|
||||
settings: AxumBackendSettings,
|
||||
_attestation: core::marker::PhantomData<DaAttestation>,
|
||||
_blob: core::marker::PhantomData<DaBlob>,
|
||||
_certificate: core::marker::PhantomData<DaBlobInfo>,
|
||||
_membership: core::marker::PhantomData<Membership>,
|
||||
_vid: core::marker::PhantomData<DaVerifiedBlobInfo>,
|
||||
_verifier_backend: core::marker::PhantomData<DaVerifierBackend>,
|
||||
_tx: core::marker::PhantomData<Tx>,
|
||||
_storage_serde: core::marker::PhantomData<DaStorageSerializer>,
|
||||
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
|
||||
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
|
||||
_sampling_rng: core::marker::PhantomData<SamplingRng>,
|
||||
_sampling_storage: core::marker::PhantomData<SamplingStorage>,
|
||||
}
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
paths(
|
||||
),
|
||||
components(
|
||||
schemas(Status<HeaderId>, MempoolMetrics)
|
||||
),
|
||||
tags(
|
||||
(name = "da", description = "data availibility related APIs")
|
||||
)
|
||||
)]
|
||||
struct ApiDoc;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<
|
||||
DaAttestation,
|
||||
DaBlob,
|
||||
DaBlobInfo,
|
||||
Membership,
|
||||
DaVerifiedBlobInfo,
|
||||
DaVerifierBackend,
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
const SIZE: usize,
|
||||
> Backend
|
||||
for AxumBackend<
|
||||
DaAttestation,
|
||||
DaBlob,
|
||||
DaBlobInfo,
|
||||
Membership,
|
||||
DaVerifiedBlobInfo,
|
||||
DaVerifierBackend,
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>
|
||||
where
|
||||
DaAttestation: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
|
||||
DaBlob: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
|
||||
<DaBlob as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
|
||||
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
|
||||
DaBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<DaBlobInfo as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
|
||||
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
DaVerifiedBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
|
||||
+ From<DaBlobInfo>
|
||||
+ Eq
|
||||
+ Debug
|
||||
+ Metadata
|
||||
+ Hash
|
||||
+ Clone
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
|
||||
<DaVerifiedBlobInfo as Metadata>::AppId:
|
||||
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync,
|
||||
<DaVerifiedBlobInfo as Metadata>::Index:
|
||||
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
|
||||
DaVerifierBackend: VerifierBackend + CoreDaVerifier<DaBlob = DaBlob> + Send + Sync + 'static,
|
||||
<DaVerifierBackend as VerifierBackend>::Settings: Clone,
|
||||
<DaVerifierBackend as CoreDaVerifier>::Error: Error,
|
||||
Tx: Transaction
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Eq
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ for<'de> Deserialize<'de>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<Tx as nomos_core::tx::Transaction>::Hash:
|
||||
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
DaStorageSerializer: StorageSerde + Send + Sync + 'static,
|
||||
SamplingRng: SeedableRng + RngCore + Send + 'static,
|
||||
SamplingBackend: DaSamplingServiceBackend<
|
||||
SamplingRng,
|
||||
BlobId = <DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId,
|
||||
> + Send
|
||||
+ 'static,
|
||||
SamplingBackend::Settings: Clone,
|
||||
SamplingBackend::Blob: Debug + 'static,
|
||||
SamplingBackend::BlobId: Debug + 'static,
|
||||
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
|
||||
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + 'static,
|
||||
{
|
||||
type Error = hyper::Error;
|
||||
type Settings = AxumBackendSettings;
|
||||
|
||||
async fn new(settings: Self::Settings) -> Result<Self, Self::Error>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Ok(Self {
|
||||
settings,
|
||||
_attestation: core::marker::PhantomData,
|
||||
_blob: core::marker::PhantomData,
|
||||
_certificate: core::marker::PhantomData,
|
||||
_membership: core::marker::PhantomData,
|
||||
_vid: core::marker::PhantomData,
|
||||
_verifier_backend: core::marker::PhantomData,
|
||||
_tx: core::marker::PhantomData,
|
||||
_storage_serde: core::marker::PhantomData,
|
||||
_sampling_backend: core::marker::PhantomData,
|
||||
_sampling_network_adapter: core::marker::PhantomData,
|
||||
_sampling_rng: core::marker::PhantomData,
|
||||
_sampling_storage: core::marker::PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
|
||||
let mut builder = CorsLayer::new();
|
||||
if self.settings.cors_origins.is_empty() {
|
||||
builder = builder.allow_origin(Any);
|
||||
}
|
||||
|
||||
for origin in &self.settings.cors_origins {
|
||||
builder = builder.allow_origin(
|
||||
origin
|
||||
.as_str()
|
||||
.parse::<HeaderValue>()
|
||||
.expect("fail to parse origin"),
|
||||
);
|
||||
}
|
||||
|
||||
let app = Router::new()
|
||||
.layer(
|
||||
builder
|
||||
.allow_headers([CONTENT_TYPE, USER_AGENT])
|
||||
.allow_methods(Any),
|
||||
)
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
|
||||
.route("/cl/metrics", routing::get(cl_metrics::<Tx>))
|
||||
.route("/cl/status", routing::post(cl_status::<Tx>))
|
||||
.route(
|
||||
"/cryptarchia/info",
|
||||
routing::get(
|
||||
cryptarchia_info::<
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route(
|
||||
"/cryptarchia/headers",
|
||||
routing::get(
|
||||
cryptarchia_headers::<
|
||||
Tx,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route(
|
||||
"/da/add_blob",
|
||||
routing::post(
|
||||
add_blob::<
|
||||
DaAttestation,
|
||||
DaBlob,
|
||||
Membership,
|
||||
DaVerifierBackend,
|
||||
DaStorageSerializer,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route(
|
||||
"/da/get_range",
|
||||
routing::post(
|
||||
get_range::<
|
||||
Tx,
|
||||
DaBlobInfo,
|
||||
DaVerifiedBlobInfo,
|
||||
DaStorageSerializer,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route("/network/info", routing::get(libp2p_info))
|
||||
.route(
|
||||
"/storage/block",
|
||||
routing::post(block::<DaStorageSerializer, Tx>),
|
||||
)
|
||||
.route("/mempool/add/tx", routing::post(add_tx::<Tx>))
|
||||
.route(
|
||||
"/mempool/add/blobinfo",
|
||||
routing::post(
|
||||
add_blob_info::<
|
||||
DaVerifiedBlobInfo,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route("/metrics", routing::get(get_metrics))
|
||||
.with_state(handle);
|
||||
|
||||
Server::bind(&self.settings.address)
|
||||
.serve(app.into_make_service())
|
||||
.await
|
||||
}
|
||||
}
|
@ -7,16 +7,10 @@ use axum::{
|
||||
extract::{Query, State},
|
||||
http::HeaderValue,
|
||||
response::{IntoResponse, Response},
|
||||
routing, Json, Router, Server,
|
||||
};
|
||||
use hyper::{
|
||||
header::{CONTENT_TYPE, USER_AGENT},
|
||||
Body, StatusCode,
|
||||
};
|
||||
use nomos_api::{
|
||||
http::{cl, consensus, da, libp2p, mempool, metrics, storage},
|
||||
Backend,
|
||||
Json,
|
||||
};
|
||||
use hyper::{header::CONTENT_TYPE, Body, StatusCode};
|
||||
use nomos_api::http::{cl, consensus, da, libp2p, mempool, metrics, storage};
|
||||
use nomos_core::da::blob::info::DispersedBlobInfo;
|
||||
use nomos_core::da::blob::metadata::Metadata;
|
||||
use nomos_core::da::{BlobId, DaVerifier as CoreDaVerifier};
|
||||
@ -25,287 +19,15 @@ use nomos_da_network_core::SubnetworkId;
|
||||
use nomos_da_sampling::backend::DaSamplingServiceBackend;
|
||||
use nomos_da_verifier::backend::VerifierBackend;
|
||||
use nomos_libp2p::PeerId;
|
||||
use nomos_mempool::{
|
||||
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
|
||||
tx::service::openapi::Status, MempoolMetrics,
|
||||
};
|
||||
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
use nomos_storage::backends::StorageSerde;
|
||||
use overwatch_rs::overwatch::handle::OverwatchHandle;
|
||||
use rand::{RngCore, SeedableRng};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use subnetworks_assignations::MembershipHandler;
|
||||
use tower_http::{
|
||||
cors::{Any, CorsLayer},
|
||||
trace::TraceLayer,
|
||||
};
|
||||
use utoipa::OpenApi;
|
||||
use utoipa_swagger_ui::SwaggerUi;
|
||||
// internal
|
||||
|
||||
/// Configuration for the Http Server
|
||||
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
|
||||
pub struct AxumBackendSettings {
|
||||
/// Socket where the server will be listening on for incoming requests.
|
||||
pub address: std::net::SocketAddr,
|
||||
/// Allowed origins for this server deployment requests.
|
||||
pub cors_origins: Vec<String>,
|
||||
}
|
||||
|
||||
pub struct AxumBackend<
|
||||
A,
|
||||
B,
|
||||
C,
|
||||
M,
|
||||
V,
|
||||
VB,
|
||||
T,
|
||||
S,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
const SIZE: usize,
|
||||
> {
|
||||
settings: AxumBackendSettings,
|
||||
_attestation: core::marker::PhantomData<A>,
|
||||
_blob: core::marker::PhantomData<B>,
|
||||
_certificate: core::marker::PhantomData<C>,
|
||||
_membership: core::marker::PhantomData<M>,
|
||||
_vid: core::marker::PhantomData<V>,
|
||||
_verifier_backend: core::marker::PhantomData<VB>,
|
||||
_tx: core::marker::PhantomData<T>,
|
||||
_storage_serde: core::marker::PhantomData<S>,
|
||||
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
|
||||
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
|
||||
_sampling_rng: core::marker::PhantomData<SamplingRng>,
|
||||
_sampling_storage: core::marker::PhantomData<SamplingStorage>,
|
||||
}
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
paths(
|
||||
),
|
||||
components(
|
||||
schemas(Status<HeaderId>, MempoolMetrics)
|
||||
),
|
||||
tags(
|
||||
(name = "da", description = "data availibility related APIs")
|
||||
)
|
||||
)]
|
||||
struct ApiDoc;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<
|
||||
A,
|
||||
B,
|
||||
C,
|
||||
M,
|
||||
V,
|
||||
VB,
|
||||
T,
|
||||
S,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
const SIZE: usize,
|
||||
> Backend
|
||||
for AxumBackend<
|
||||
A,
|
||||
B,
|
||||
C,
|
||||
M,
|
||||
V,
|
||||
VB,
|
||||
T,
|
||||
S,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>
|
||||
where
|
||||
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
|
||||
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
|
||||
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
|
||||
<B as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
|
||||
C: DispersedBlobInfo<BlobId = [u8; 32]>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<C as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
|
||||
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
V: DispersedBlobInfo<BlobId = [u8; 32]>
|
||||
+ From<C>
|
||||
+ Eq
|
||||
+ Debug
|
||||
+ Metadata
|
||||
+ Hash
|
||||
+ Clone
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<V as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
|
||||
<V as Metadata>::AppId: AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync,
|
||||
<V as Metadata>::Index:
|
||||
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
|
||||
VB: VerifierBackend + CoreDaVerifier<DaBlob = B> + Send + Sync + 'static,
|
||||
<VB as VerifierBackend>::Settings: Clone,
|
||||
<VB as CoreDaVerifier>::Error: Error,
|
||||
T: Transaction
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Eq
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ for<'de> Deserialize<'de>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
<T as nomos_core::tx::Transaction>::Hash:
|
||||
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
S: StorageSerde + Send + Sync + 'static,
|
||||
SamplingRng: SeedableRng + RngCore + Send + 'static,
|
||||
SamplingBackend: DaSamplingServiceBackend<SamplingRng, BlobId = <V as DispersedBlobInfo>::BlobId>
|
||||
+ Send
|
||||
+ 'static,
|
||||
SamplingBackend::Settings: Clone,
|
||||
SamplingBackend::Blob: Debug + 'static,
|
||||
SamplingBackend::BlobId: Debug + 'static,
|
||||
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
|
||||
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + 'static,
|
||||
{
|
||||
type Error = hyper::Error;
|
||||
type Settings = AxumBackendSettings;
|
||||
|
||||
async fn new(settings: Self::Settings) -> Result<Self, Self::Error>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
Ok(Self {
|
||||
settings,
|
||||
_attestation: core::marker::PhantomData,
|
||||
_blob: core::marker::PhantomData,
|
||||
_certificate: core::marker::PhantomData,
|
||||
_membership: core::marker::PhantomData,
|
||||
_vid: core::marker::PhantomData,
|
||||
_verifier_backend: core::marker::PhantomData,
|
||||
_tx: core::marker::PhantomData,
|
||||
_storage_serde: core::marker::PhantomData,
|
||||
_sampling_backend: core::marker::PhantomData,
|
||||
_sampling_network_adapter: core::marker::PhantomData,
|
||||
_sampling_rng: core::marker::PhantomData,
|
||||
_sampling_storage: core::marker::PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
|
||||
let mut builder = CorsLayer::new();
|
||||
if self.settings.cors_origins.is_empty() {
|
||||
builder = builder.allow_origin(Any);
|
||||
}
|
||||
|
||||
for origin in &self.settings.cors_origins {
|
||||
builder = builder.allow_origin(
|
||||
origin
|
||||
.as_str()
|
||||
.parse::<HeaderValue>()
|
||||
.expect("fail to parse origin"),
|
||||
);
|
||||
}
|
||||
|
||||
let app = Router::new()
|
||||
.layer(
|
||||
builder
|
||||
.allow_headers([CONTENT_TYPE, USER_AGENT])
|
||||
.allow_methods(Any),
|
||||
)
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
|
||||
.route("/cl/metrics", routing::get(cl_metrics::<T>))
|
||||
.route("/cl/status", routing::post(cl_status::<T>))
|
||||
.route(
|
||||
"/cryptarchia/info",
|
||||
routing::get(
|
||||
cryptarchia_info::<
|
||||
T,
|
||||
S,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route(
|
||||
"/cryptarchia/headers",
|
||||
routing::get(
|
||||
cryptarchia_headers::<
|
||||
T,
|
||||
S,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route("/da/add_blob", routing::post(add_blob::<A, B, M, VB, S>))
|
||||
.route(
|
||||
"/da/get_range",
|
||||
routing::post(
|
||||
get_range::<
|
||||
T,
|
||||
C,
|
||||
V,
|
||||
S,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
SIZE,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route("/network/info", routing::get(libp2p_info))
|
||||
.route("/storage/block", routing::post(block::<S, T>))
|
||||
.route("/mempool/add/tx", routing::post(add_tx::<T>))
|
||||
.route(
|
||||
"/mempool/add/blobinfo",
|
||||
routing::post(
|
||||
add_blob_info::<
|
||||
V,
|
||||
SamplingBackend,
|
||||
SamplingNetworkAdapter,
|
||||
SamplingRng,
|
||||
SamplingStorage,
|
||||
>,
|
||||
),
|
||||
)
|
||||
.route("/metrics", routing::get(get_metrics))
|
||||
.with_state(handle);
|
||||
|
||||
Server::bind(&self.settings.address)
|
||||
.serve(app.into_make_service())
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! make_request_and_return_response {
|
||||
($cond:expr) => {{
|
||||
match $cond.await {
|
||||
@ -329,7 +51,7 @@ macro_rules! make_request_and_return_response {
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn cl_metrics<T>(State(handle): State<OverwatchHandle>) -> Response
|
||||
pub async fn cl_metrics<T>(State(handle): State<OverwatchHandle>) -> Response
|
||||
where
|
||||
T: Transaction
|
||||
+ Clone
|
||||
@ -353,7 +75,7 @@ where
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn cl_status<T>(
|
||||
pub async fn cl_status<T>(
|
||||
State(handle): State<OverwatchHandle>,
|
||||
Json(items): Json<Vec<<T as Transaction>::Hash>>,
|
||||
) -> Response
|
||||
@ -366,7 +88,7 @@ where
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
#[allow(dead_code)]
|
||||
struct QueryParams {
|
||||
pub struct CryptarchiaInfoQuery {
|
||||
from: Option<HeaderId>,
|
||||
to: Option<HeaderId>,
|
||||
}
|
||||
@ -379,7 +101,7 @@ struct QueryParams {
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn cryptarchia_info<
|
||||
pub async fn cryptarchia_info<
|
||||
Tx,
|
||||
SS,
|
||||
SamplingBackend,
|
||||
@ -430,7 +152,7 @@ where
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn cryptarchia_headers<
|
||||
pub async fn cryptarchia_headers<
|
||||
Tx,
|
||||
SS,
|
||||
SamplingBackend,
|
||||
@ -440,7 +162,7 @@ async fn cryptarchia_headers<
|
||||
const SIZE: usize,
|
||||
>(
|
||||
State(store): State<OverwatchHandle>,
|
||||
Query(query): Query<QueryParams>,
|
||||
Query(query): Query<CryptarchiaInfoQuery>,
|
||||
) -> Response
|
||||
where
|
||||
Tx: Transaction
|
||||
@ -463,7 +185,7 @@ where
|
||||
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
|
||||
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
|
||||
{
|
||||
let QueryParams { from, to } = query;
|
||||
let CryptarchiaInfoQuery { from, to } = query;
|
||||
make_request_and_return_response!(consensus::cryptarchia_headers::<
|
||||
Tx,
|
||||
SS,
|
||||
@ -483,7 +205,7 @@ where
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn add_blob<A, B, M, VB, SS>(
|
||||
pub async fn add_blob<A, B, M, VB, SS>(
|
||||
State(handle): State<OverwatchHandle>,
|
||||
Json(blob): Json<B>,
|
||||
) -> Response
|
||||
@ -507,7 +229,7 @@ where
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct GetRangeReq<V: Metadata>
|
||||
pub struct GetRangeReq<V: Metadata>
|
||||
where
|
||||
<V as Metadata>::AppId: Serialize + DeserializeOwned,
|
||||
<V as Metadata>::Index: Serialize + DeserializeOwned,
|
||||
@ -524,7 +246,7 @@ where
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn get_range<
|
||||
pub async fn get_range<
|
||||
Tx,
|
||||
C,
|
||||
V,
|
||||
@ -606,7 +328,7 @@ where
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
|
||||
pub async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
|
||||
make_request_and_return_response!(libp2p::libp2p_info(&handle))
|
||||
}
|
||||
|
||||
@ -618,7 +340,10 @@ async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn block<S, Tx>(State(handle): State<OverwatchHandle>, Json(id): Json<HeaderId>) -> Response
|
||||
pub async fn block<S, Tx>(
|
||||
State(handle): State<OverwatchHandle>,
|
||||
Json(id): Json<HeaderId>,
|
||||
) -> Response
|
||||
where
|
||||
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,
|
||||
S: StorageSerde + Send + Sync + 'static,
|
||||
@ -634,7 +359,7 @@ where
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn add_tx<Tx>(State(handle): State<OverwatchHandle>, Json(tx): Json<Tx>) -> Response
|
||||
pub async fn add_tx<Tx>(State(handle): State<OverwatchHandle>, Json(tx): Json<Tx>) -> Response
|
||||
where
|
||||
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
|
||||
@ -655,7 +380,7 @@ where
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn add_blob_info<B, SamplingBackend, SamplingAdapter, SamplingRng, SamplingStorage>(
|
||||
pub async fn add_blob_info<B, SamplingBackend, SamplingAdapter, SamplingRng, SamplingStorage>(
|
||||
State(handle): State<OverwatchHandle>,
|
||||
Json(blob_info): Json<B>,
|
||||
) -> Response
|
||||
@ -700,7 +425,7 @@ where
|
||||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn get_metrics(State(handle): State<OverwatchHandle>) -> Response {
|
||||
pub async fn get_metrics(State(handle): State<OverwatchHandle>) -> Response {
|
||||
match metrics::gather(&handle).await {
|
||||
Ok(encoded_metrics) => Response::builder()
|
||||
.status(StatusCode::OK)
|
2
nodes/nomos-node/src/api/mod.rs
Normal file
2
nodes/nomos-node/src/api/mod.rs
Normal file
@ -0,0 +1,2 @@
|
||||
pub mod backend;
|
||||
pub mod handlers;
|
@ -75,7 +75,7 @@ pub struct NetworkArgs {
|
||||
#[derive(Parser, Debug, Clone)]
|
||||
pub struct HttpArgs {
|
||||
#[clap(long = "http-host", env = "HTTP_HOST")]
|
||||
http_addr: Option<SocketAddr>,
|
||||
pub http_addr: Option<SocketAddr>,
|
||||
|
||||
#[clap(long = "http-cors-origin", env = "HTTP_CORS_ORIGIN")]
|
||||
pub cors_origins: Option<Vec<String>>,
|
||||
@ -126,7 +126,22 @@ pub struct Config {
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn update_log(mut self, log_args: LogArgs) -> Result<Self> {
|
||||
pub fn update_from_args(
|
||||
mut self,
|
||||
log_args: LogArgs,
|
||||
network_args: NetworkArgs,
|
||||
http_args: HttpArgs,
|
||||
cryptarchia_args: CryptarchiaArgs,
|
||||
) -> Result<Self> {
|
||||
update_log(&mut self.log, log_args)?;
|
||||
update_network(&mut self.network, network_args)?;
|
||||
update_http(&mut self.http, http_args)?;
|
||||
update_cryptarchia_consensus(&mut self.cryptarchia, cryptarchia_args)?;
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_log(log: &mut <Logger as ServiceData>::Settings, log_args: LogArgs) -> Result<()> {
|
||||
let LogArgs {
|
||||
backend,
|
||||
log_addr: addr,
|
||||
@ -138,7 +153,7 @@ impl Config {
|
||||
|
||||
// Override the file config with the one from env variables.
|
||||
if let Some(backend) = backend {
|
||||
self.log.backend = match backend {
|
||||
log.backend = match backend {
|
||||
LoggerBackendType::Gelf => LoggerBackend::Gelf {
|
||||
addr: addr
|
||||
.ok_or_else(|| eyre!("Gelf backend requires an address."))?
|
||||
@ -147,8 +162,7 @@ impl Config {
|
||||
.ok_or_else(|| eyre!("Invalid gelf address"))?,
|
||||
},
|
||||
LoggerBackendType::File => LoggerBackend::File {
|
||||
directory: directory
|
||||
.ok_or_else(|| eyre!("File backend requires a directory."))?,
|
||||
directory: directory.ok_or_else(|| eyre!("File backend requires a directory."))?,
|
||||
prefix,
|
||||
},
|
||||
LoggerBackendType::Stdout => LoggerBackend::Stdout,
|
||||
@ -158,22 +172,25 @@ impl Config {
|
||||
|
||||
// Update parts of the config.
|
||||
if let Some(format_str) = format {
|
||||
self.log.format = match format_str.as_str() {
|
||||
log.format = match format_str.as_str() {
|
||||
"Json" => LoggerFormat::Json,
|
||||
"Plain" => LoggerFormat::Plain,
|
||||
_ => return Err(eyre!("Invalid log format provided.")),
|
||||
};
|
||||
}
|
||||
if let Some(level_str) = level {
|
||||
self.log.level = match level_str.as_str() {
|
||||
log.level = match level_str.as_str() {
|
||||
"DEBUG" => Level::DEBUG,
|
||||
_ => return Err(eyre!("Invalid log level provided.")),
|
||||
};
|
||||
}
|
||||
Ok(self)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn update_network(mut self, network_args: NetworkArgs) -> Result<Self> {
|
||||
pub fn update_network(
|
||||
network: &mut <NetworkService<NetworkBackend> as ServiceData>::Settings,
|
||||
network_args: NetworkArgs,
|
||||
) -> Result<()> {
|
||||
let NetworkArgs {
|
||||
host,
|
||||
port,
|
||||
@ -182,48 +199,53 @@ impl Config {
|
||||
} = network_args;
|
||||
|
||||
if let Some(IpAddr::V4(h)) = host {
|
||||
self.network.backend.inner.host = h;
|
||||
network.backend.inner.host = h;
|
||||
} else if host.is_some() {
|
||||
return Err(eyre!("Unsupported ip version"));
|
||||
}
|
||||
|
||||
if let Some(port) = port {
|
||||
self.network.backend.inner.port = port as u16;
|
||||
network.backend.inner.port = port as u16;
|
||||
}
|
||||
|
||||
if let Some(node_key) = node_key {
|
||||
let mut key_bytes = hex::decode(node_key)?;
|
||||
self.network.backend.inner.node_key =
|
||||
SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
|
||||
network.backend.inner.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
|
||||
}
|
||||
|
||||
if let Some(peers) = initial_peers {
|
||||
self.network.backend.initial_peers = peers;
|
||||
network.backend.initial_peers = peers;
|
||||
}
|
||||
|
||||
// TODO: configure mixclient and mixnode if the mixnet feature is enabled
|
||||
|
||||
Ok(self)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn update_http(mut self, http_args: HttpArgs) -> Result<Self> {
|
||||
pub fn update_http(
|
||||
http: &mut <NomosApiService as ServiceData>::Settings,
|
||||
http_args: HttpArgs,
|
||||
) -> Result<()> {
|
||||
let HttpArgs {
|
||||
http_addr,
|
||||
cors_origins,
|
||||
} = http_args;
|
||||
|
||||
if let Some(addr) = http_addr {
|
||||
self.http.backend_settings.address = addr;
|
||||
http.backend_settings.address = addr;
|
||||
}
|
||||
|
||||
if let Some(cors) = cors_origins {
|
||||
self.http.backend_settings.cors_origins = cors;
|
||||
http.backend_settings.cors_origins = cors;
|
||||
}
|
||||
|
||||
Ok(self)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn update_cryptarchia_consensus(mut self, consensus_args: CryptarchiaArgs) -> Result<Self> {
|
||||
pub fn update_cryptarchia_consensus(
|
||||
cryptarchia: &mut <crate::Cryptarchia as ServiceData>::Settings,
|
||||
consensus_args: CryptarchiaArgs,
|
||||
) -> Result<()> {
|
||||
let CryptarchiaArgs {
|
||||
chain_start_time,
|
||||
slot_duration,
|
||||
@ -232,12 +254,11 @@ impl Config {
|
||||
} = consensus_args;
|
||||
|
||||
if let Some(start_time) = chain_start_time {
|
||||
self.cryptarchia.time.chain_start_time =
|
||||
time::OffsetDateTime::from_unix_timestamp(start_time)?;
|
||||
cryptarchia.time.chain_start_time = time::OffsetDateTime::from_unix_timestamp(start_time)?;
|
||||
}
|
||||
|
||||
if let Some(duration) = slot_duration {
|
||||
self.cryptarchia.time.slot_duration = std::time::Duration::from_secs(duration);
|
||||
cryptarchia.time.slot_duration = std::time::Duration::from_secs(duration);
|
||||
}
|
||||
|
||||
if let Some(sk) = note_secret_key {
|
||||
@ -245,12 +266,11 @@ impl Config {
|
||||
|
||||
let value = note_value.expect("Should be available if coin sk provided");
|
||||
|
||||
self.cryptarchia.notes.push(InputWitness::new(
|
||||
cryptarchia.notes.push(InputWitness::new(
|
||||
NoteWitness::basic(value as u64, NMO_UNIT, &mut rand::thread_rng()),
|
||||
NullifierSecret::from_bytes(sk),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1,10 +1,10 @@
|
||||
pub mod api;
|
||||
mod config;
|
||||
pub mod config;
|
||||
mod tx;
|
||||
|
||||
// std
|
||||
// crates
|
||||
use api::AxumBackend;
|
||||
use api::backend::AxumBackend;
|
||||
use bytes::Bytes;
|
||||
use color_eyre::eyre::Result;
|
||||
pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs};
|
||||
@ -81,7 +81,7 @@ pub type NomosApiService = ApiService<
|
||||
|
||||
pub const CL_TOPIC: &str = "cl";
|
||||
pub const DA_TOPIC: &str = "da";
|
||||
const MB16: usize = 1024 * 1024 * 16;
|
||||
pub const MB16: usize = 1024 * 1024 * 16;
|
||||
|
||||
pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
|
||||
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
|
||||
|
@ -47,10 +47,7 @@ fn main() -> Result<()> {
|
||||
metrics_args,
|
||||
} = Args::parse();
|
||||
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?
|
||||
.update_log(log_args)?
|
||||
.update_http(http_args)?
|
||||
.update_network(network_args)?
|
||||
.update_cryptarchia_consensus(cryptarchia_args)?;
|
||||
.update_from_args(log_args, network_args, http_args, cryptarchia_args)?;
|
||||
|
||||
let registry = cfg!(feature = "metrics")
|
||||
.then(|| {
|
||||
|
@ -34,7 +34,7 @@ use nomos_mempool::MempoolMetrics;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_network::backends::libp2p::mixnet::MixnetConfig;
|
||||
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
|
||||
use nomos_node::{api::AxumBackendSettings, Config, Tx};
|
||||
use nomos_node::{api::backend::AxumBackendSettings, Config, Tx};
|
||||
use nomos_storage::backends::rocksdb::RocksBackendSettings;
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::{thread_rng, Rng};
|
||||
|
Loading…
x
Reference in New Issue
Block a user