Add Executor client (#811)

* Refactor paths into common modules

* Kickoff http client

* Add blob to send blob

* Refactor publish method. Will reuse older endpoint in further prs

* Use paths in tests

* Fix more tests
This commit is contained in:
Daniel Sanchez 2024-10-08 17:43:58 +02:00 committed by GitHub
parent 07c9096924
commit c983eb2260
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 135 additions and 45 deletions

View File

@ -38,6 +38,7 @@ members = [
"proof_of_leadership/proof_statements",
"testnet/cfgsync",
"tests",
"clients/executor-http-client",
]
exclude = ["proof_of_leadership/risc0/risc0_proofs"]
resolver = "2"

View File

@ -0,0 +1,11 @@
[package]
name = "executor-http-client"
version = "0.1.0"
edition = "2021"
[dependencies]
nomos-core = { path = "../../nomos-core" }
nomos-executor = { path = "../../nodes/nomos-executor" }
reqwest = "0.12"
serde = "1.0"
thiserror = "1.0"

View File

@ -0,0 +1,63 @@
// std
// crates
use reqwest::{Client, ClientBuilder, StatusCode, Url};
// internal
use nomos_executor::api::paths;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Internal server error: {0}")]
Server(String),
#[error(transparent)]
Request(reqwest::Error),
}
#[derive(Clone)]
pub struct ExecutorHttpClient {
client: Client,
executor_address: Url,
}
impl Default for ExecutorHttpClient {
fn default() -> Self {
let client = ClientBuilder::new()
.build()
.expect("Client from default settings should be able to build");
let executor_address = Url::parse("https://127.0.0.1:3333").unwrap();
Self {
client,
executor_address,
}
}
}
impl ExecutorHttpClient {
pub fn new(client: Client, executor_address: Url) -> Self {
Self {
client,
executor_address,
}
}
/// Send a `Blob` to be dispersed
pub async fn publish_blob(&self, blob: Vec<u8>) -> Result<(), Error> {
let url = self
.executor_address
.join(paths::DA_ADD_BLOB)
.expect("Url should build properly");
let response = self
.client
.post(url)
.body(blob)
.send()
.await
.map_err(Error::Request)?;
match response.status() {
StatusCode::OK => Ok(()),
StatusCode::INTERNAL_SERVER_ERROR => Err(Error::Server(
response.text().await.map_err(Error::Request)?,
)),
_ => unreachable!("As per the documentation it can only return 200 or 500 responses"),
}
}
}

View File

@ -4,6 +4,17 @@ use std::{fmt::Debug, hash::Hash};
// crates
use axum::{http::HeaderValue, routing, Router, Server};
use hyper::header::{CONTENT_TYPE, USER_AGENT};
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
// internal
use crate::api::paths;
use nomos_api::Backend;
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata;
@ -20,16 +31,6 @@ use nomos_node::api::handlers::{
};
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
// internal
/// Configuration for the Http Server
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
@ -226,10 +227,10 @@ where
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/cl/metrics", routing::get(cl_metrics::<Tx>))
.route("/cl/status", routing::post(cl_status::<Tx>))
.route(paths::CL_METRICS, routing::get(cl_metrics::<Tx>))
.route(paths::CL_STATUS, routing::post(cl_status::<Tx>))
.route(
"/cryptarchia/info",
paths::CRYPTARCHIA_INFO,
routing::get(
cryptarchia_info::<
Tx,
@ -243,7 +244,7 @@ where
),
)
.route(
"/cryptarchia/headers",
paths::CRYOTARCHIA_HEADERS,
routing::get(
cryptarchia_headers::<
Tx,
@ -257,7 +258,7 @@ where
),
)
.route(
"/da/add_blob",
paths::DA_ADD_BLOB,
routing::post(
add_blob::<
DaAttestation,
@ -269,7 +270,7 @@ where
),
)
.route(
"/da/get_range",
paths::DA_GET_RANGE,
routing::post(
get_range::<
Tx,
@ -284,14 +285,14 @@ where
>,
),
)
.route("/network/info", routing::get(libp2p_info))
.route(paths::NETWORK_INFO, routing::get(libp2p_info))
.route(
"/storage/block",
paths::STORAGE_BLOCK,
routing::post(block::<DaStorageSerializer, Tx>),
)
.route("/mempool/add/tx", routing::post(add_tx::<Tx>))
.route(paths::MEMPOOL_ADD_TX, routing::post(add_tx::<Tx>))
.route(
"/mempool/add/blobinfo",
paths::MEMPOOL_ADD_BLOB_INFO,
routing::post(
add_blob_info::<
DaVerifiedBlobInfo,
@ -302,7 +303,7 @@ where
>,
),
)
.route("/metrics", routing::get(get_metrics))
.route(paths::METRICS, routing::get(get_metrics))
.with_state(handle);
Server::bind(&self.settings.address)

View File

@ -1 +1,2 @@
pub mod backend;
pub mod paths;

View File

@ -0,0 +1 @@
pub use nomos_node::api::paths::*;

View File

@ -1,4 +1,4 @@
mod api;
pub mod api;
pub mod config;
// std

View File

@ -2,6 +2,7 @@
use std::error::Error;
use std::{fmt::Debug, hash::Hash};
// crates
use crate::api::paths;
use axum::{http::HeaderValue, routing, Router, Server};
use hyper::header::{CONTENT_TYPE, USER_AGENT};
use nomos_api::Backend;
@ -226,10 +227,10 @@ where
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/cl/metrics", routing::get(cl_metrics::<Tx>))
.route("/cl/status", routing::post(cl_status::<Tx>))
.route(paths::CL_METRICS, routing::get(cl_metrics::<Tx>))
.route(paths::CL_STATUS, routing::post(cl_status::<Tx>))
.route(
"/cryptarchia/info",
paths::CRYPTARCHIA_INFO,
routing::get(
cryptarchia_info::<
Tx,
@ -243,7 +244,7 @@ where
),
)
.route(
"/cryptarchia/headers",
paths::CRYOTARCHIA_HEADERS,
routing::get(
cryptarchia_headers::<
Tx,
@ -257,7 +258,7 @@ where
),
)
.route(
"/da/add_blob",
paths::DA_ADD_BLOB,
routing::post(
add_blob::<
DaAttestation,
@ -269,7 +270,7 @@ where
),
)
.route(
"/da/get_range",
paths::DA_GET_RANGE,
routing::post(
get_range::<
Tx,
@ -284,14 +285,14 @@ where
>,
),
)
.route("/network/info", routing::get(libp2p_info))
.route(paths::NETWORK_INFO, routing::get(libp2p_info))
.route(
"/storage/block",
paths::STORAGE_BLOCK,
routing::post(block::<DaStorageSerializer, Tx>),
)
.route("/mempool/add/tx", routing::post(add_tx::<Tx>))
.route(paths::MEMPOOL_ADD_TX, routing::post(add_tx::<Tx>))
.route(
"/mempool/add/blobinfo",
paths::MEMPOOL_ADD_BLOB_INFO,
routing::post(
add_blob_info::<
DaVerifiedBlobInfo,
@ -302,7 +303,7 @@ where
>,
),
)
.route("/metrics", routing::get(get_metrics))
.route(paths::METRICS, routing::get(get_metrics))
.with_state(handle);
Server::bind(&self.settings.address)

View File

@ -201,7 +201,7 @@ where
post,
path = "/da/add_blob",
responses(
(status = 200, description = "Attestation for DA blob", body = Option<Attestation>),
(status = 200, description = "Blob to be published received"),
(status = 500, description = "Internal server error", body = String),
)
)]

View File

@ -1,2 +1,3 @@
pub mod backend;
pub mod handlers;
pub mod paths;

View File

@ -0,0 +1,11 @@
pub const CL_METRICS: &str = "/cl/metrics";
pub const CL_STATUS: &str = "/cl/status";
pub const CRYPTARCHIA_INFO: &str = "/cryptarchia/info";
pub const CRYOTARCHIA_HEADERS: &str = "/cryptarchia/headers";
pub const DA_ADD_BLOB: &str = "/da/add-blob";
pub const DA_GET_RANGE: &str = "/da/get-range";
pub const NETWORK_INFO: &str = "/network/info";
pub const STORAGE_BLOCK: &str = "/storage/block";
pub const MEMPOOL_ADD_TX: &str = "/mempool/add/tx";
pub const MEMPOOL_ADD_BLOB_INFO: &str = "/mempool/add/blobinfo";
pub const METRICS: &str = "/metrics";

View File

@ -34,6 +34,9 @@ use nomos_mempool::MempoolMetrics;
#[cfg(feature = "mixnet")]
use nomos_network::backends::libp2p::mixnet::MixnetConfig;
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::api::paths::{
CL_METRICS, CRYOTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK,
};
use nomos_node::{api::backend::AxumBackendSettings, Config, Tx};
use nomos_storage::backends::rocksdb::RocksBackendSettings;
use once_cell::sync::Lazy;
@ -49,11 +52,7 @@ use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node};
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const CRYPTARCHIA_INFO_API: &str = "cryptarchia/info";
const GET_HEADERS_INFO: &str = "cryptarchia/headers";
const NOMOS_BIN: &str = "../target/debug/nomos-node";
const STORAGE_BLOCKS_API: &str = "storage/block";
const INDEXER_RANGE_API: &str = "da/get_range";
const DEFAULT_SLOT_TIME: u64 = 2;
const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME";
#[cfg(feature = "mixnet")]
@ -129,7 +128,7 @@ impl NomosNode {
async fn get(&self, path: &str) -> reqwest::Result<reqwest::Response> {
CLIENT
.get(format!("http://{}/{}", self.addr, path))
.get(format!("http://{}{}", self.addr, path))
.send()
.await
}
@ -140,7 +139,7 @@ impl NomosNode {
async fn wait_online(&self) {
loop {
let res = self.get("cl/metrics").await;
let res = self.get(CL_METRICS).await;
if res.is_ok() && res.unwrap().status().is_success() {
break;
}
@ -150,7 +149,7 @@ impl NomosNode {
pub async fn get_block(&self, id: HeaderId) -> Option<Block<Tx, BlobInfo>> {
CLIENT
.post(format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
.post(format!("http://{}{}", self.addr, STORAGE_BLOCK))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&id).unwrap())
.send()
@ -166,7 +165,7 @@ impl NomosNode {
Pool::Cl => "cl",
Pool::Da => "da",
};
let addr = format!("{}/metrics", discr);
let addr = format!("/{}/metrics", discr);
let res = self
.get(&addr)
.await
@ -186,7 +185,7 @@ impl NomosNode {
range: Range<[u8; 8]>,
) -> Vec<([u8; 8], Vec<Vec<u8>>)> {
CLIENT
.post(format!("http://{}/{}", self.addr, INDEXER_RANGE_API))
.post(format!("http://{}{}", self.addr, DA_GET_RANGE))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&GetRangeReq { app_id, range }).unwrap())
.send()
@ -224,7 +223,7 @@ impl NomosNode {
}
pub async fn get_headers(&self, from: Option<HeaderId>, to: Option<HeaderId>) -> Vec<HeaderId> {
let mut req = CLIENT.get(format!("http://{}/{}", self.addr, GET_HEADERS_INFO));
let mut req = CLIENT.get(format!("http://{}{}", self.addr, CRYOTARCHIA_HEADERS));
if let Some(from) = from {
req = req.query(&[("from", from)]);
@ -251,7 +250,7 @@ impl Node for NomosNode {
}
async fn consensus_info(&self) -> Self::ConsensusInfo {
let res = self.get(CRYPTARCHIA_INFO_API).await;
let res = self.get(CRYPTARCHIA_INFO).await;
println!("{:?}", res);
res.unwrap().json().await.unwrap()
}