Merge branch 'master' into chore-da-integration-tests

This commit is contained in:
Roman Zajic 2024-10-03 09:52:10 +08:00 committed by GitHub
commit 455093f825
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
41 changed files with 1506 additions and 950 deletions

View File

@ -1,9 +1,2 @@
# Environment variables for compose.yml file config.
DOCKER_COMPOSE_LIBP2P_REPLICAS=1
DOCKER_COMPOSE_LIBP2P_NODE_KEY_MASK=2000000000000000000000000000000000000000000000000000000000000000
DOCKER_COMPOSE_SUPER_MAJORITY_THRESHOLD=1
DOCKER_COMPOSE_ETCDCTL_ENDPOINTS=etcd:2379
DOCKER_COMPOSE_ETCDCTL_API=3
DOCKER_COMPOSE_BOOSTRAP_NET_NODE_KEY=1000000000000000000000000000000000000000000000000000000000000000
DOCKER_COMPOSE_OVERLAY_NODES=$DOCKER_COMPOSE_BOOSTRAP_NET_NODE_KEY
DOCKER_COMPOSE_NET_INITIAL_PEERS=/dns/bootstrap/udp/3000/quic-v1

View File

@ -1,12 +1,2 @@
# Environment variables for compose.yml file config.
DOCKER_COMPOSE_LIBP2P_REPLICAS=3
DOCKER_COMPOSE_LIBP2P_NODE_KEY_MASK=2000000000000000000000000000000000000000000000000000000000000000
DOCKER_COMPOSE_SUPER_MAJORITY_THRESHOLD=1
DOCKER_COMPOSE_ETCDCTL_ENDPOINTS=etcd:2379
DOCKER_COMPOSE_ETCDCTL_API=3
DOCKER_COMPOSE_BOOSTRAP_NET_NODE_KEY=1000000000000000000000000000000000000000000000000000000000000000
DOCKER_COMPOSE_OVERLAY_NODES=1000000000000000000000000000000000000000000000000000000000000000
DOCKER_COMPOSE_NET_INITIAL_PEERS=/dns/bootstrap/udp/3000/quic-v1
GRAYLOG_PASSWORD_SECRET="Jcjw7g22kJw3aSjjnCQ7DiQvlSJJ38WZ2MvuIyZ4RTILUoxGEQb5EsmAAdcp3lnnlwdSKaZTDFcvh4Xq2h4aTsa4HLx3SZxM"
GRAYLOG_ROOT_PASSWORD_SHA2="7092a9ec7c94ba6c452a3937a380b9cfdac8e2d0b342c034ea9e306d41ce6d89"

View File

@ -35,6 +35,7 @@ members = [
"ledger/cryptarchia-ledger",
"cl/cl",
"proof_of_leadership/proof_statements",
"testnet/cfgsync",
"tests",
]
exclude = ["proof_of_leadership/risc0/risc0_proofs"]

View File

@ -1,8 +1,19 @@
version: '3.8'
services:
bootstrap:
container_name: bootstrap
cfgsync:
container_name: cfgsync
build:
context: .
dockerfile: testnet/Dockerfile
image: nomos:latest
volumes:
- ./testnet:/etc/nomos
entrypoint: /etc/nomos/scripts/run_cfgsync.sh
nomos-node-0:
container_name: nomos_node_0
build:
context: .
dockerfile: testnet/Dockerfile
@ -12,110 +23,63 @@ services:
- "18080:18080/tcp"
volumes:
- ./testnet:/etc/nomos
- ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z
depends_on:
- cfgsync
- graylog
environment:
- BOOTSTRAP_NODE_KEY=${DOCKER_COMPOSE_BOOSTRAP_NET_NODE_KEY:-1000000000000000000000000000000000000000000000000000000000000000}
- LIBP2P_NODE_MASK=${DOCKER_COMPOSE_LIBP2P_NODE_KEY_MASK:-2000000000000000000000000000000000000000000000000000000000000000}
- LIBP2P_REPLICAS=3
- OVERLAY_NODES=${DOCKER_COMPOSE_OVERLAY_NODES:-1000000000000000000000000000000000000000000000000000000000000000}
entrypoint: /etc/nomos/scripts/run_bootstrap_node.sh
entrypoint: /etc/nomos/scripts/run_nomos_node.sh
libp2p-node-1:
container_name: libp2p_node_1
nomos-node-1:
container_name: nomos_node_1
build:
context: .
dockerfile: testnet/Dockerfile
image: nomos:latest
volumes:
- ./testnet:/etc/nomos
- ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z
depends_on:
- bootstrap
- etcd
- cfgsync
- graylog
ports:
- "3001:3000/udp"
- "18081:18080/tcp"
environment:
- LIBP2P_REPLICAS=3
- ETCDCTL_ENDPOINTS=${DOCKER_COMPOSE_ETCDCTL_ENDPOINTS:-etcd:2379}
- ETCDCTL_API=${DOCKER_COMPOSE_ETCDCTL_API:-3}
- LIBP2P_NODE_MASK=${DOCKER_COMPOSE_LIBP2P_NODE_KEY_MASK:-2000000000000000000000000000000000000000000000000000000000000000}
- OVERLAY_NODES=${DOCKER_COMPOSE_OVERLAY_NODES:-1000000000000000000000000000000000000000000000000000000000000000}
- OVERLAY_SUPER_MAJORITY_THRESHOLD=${DOCKER_COMPOSE_SUPER_MAJORITY_THRESHOLD:-1}
- NET_INITIAL_PEERS=${DOCKER_COMPOSE_NET_INITIAL_PEERS:-/dns/bootstrap/udp/3000/quic-v1}
entrypoint: /etc/nomos/scripts/run_nomos_node.sh
libp2p-node-2:
container_name: libp2p_node_2
nomos-node-2:
container_name: nomos_node_2
build:
context: .
dockerfile: testnet/Dockerfile
image: nomos:latest
volumes:
- ./testnet:/etc/nomos
- ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z
depends_on:
- bootstrap
- etcd
- cfgsync
- graylog
ports:
- "3002:3000/udp"
- "18082:18080/tcp"
environment:
- LIBP2P_REPLICAS=3
- ETCDCTL_ENDPOINTS=${DOCKER_COMPOSE_ETCDCTL_ENDPOINTS:-etcd:2379}
- ETCDCTL_API=${DOCKER_COMPOSE_ETCDCTL_API:-3}
- LIBP2P_NODE_MASK=${DOCKER_COMPOSE_LIBP2P_NODE_KEY_MASK:-2000000000000000000000000000000000000000000000000000000000000000}
- OVERLAY_NODES=${DOCKER_COMPOSE_OVERLAY_NODES:-1000000000000000000000000000000000000000000000000000000000000000}
- OVERLAY_SUPER_MAJORITY_THRESHOLD=${DOCKER_COMPOSE_SUPER_MAJORITY_THRESHOLD:-1}
- NET_INITIAL_PEERS=${DOCKER_COMPOSE_NET_INITIAL_PEERS:-/dns/bootstrap/udp/3000/quic-v1}
entrypoint: /etc/nomos/scripts/run_nomos_node.sh
libp2p-node-3:
container_name: libp2p_node_3
nomos-node-3:
container_name: nomos_node_3
build:
context: .
dockerfile: testnet/Dockerfile
image: nomos:latest
volumes:
- ./testnet:/etc/nomos
- ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z
depends_on:
- bootstrap
- etcd
- cfgsync
- graylog
ports:
- "3003:3000/udp"
- "18083:18080/tcp"
environment:
- LIBP2P_REPLICAS=3
- ETCDCTL_ENDPOINTS=${DOCKER_COMPOSE_ETCDCTL_ENDPOINTS:-etcd:2379}
- ETCDCTL_API=${DOCKER_COMPOSE_ETCDCTL_API:-3}
- LIBP2P_NODE_MASK=${DOCKER_COMPOSE_LIBP2P_NODE_KEY_MASK:-2000000000000000000000000000000000000000000000000000000000000000}
- OVERLAY_NODES=${DOCKER_COMPOSE_OVERLAY_NODES:-1000000000000000000000000000000000000000000000000000000000000000}
- OVERLAY_SUPER_MAJORITY_THRESHOLD=${DOCKER_COMPOSE_SUPER_MAJORITY_THRESHOLD:-1}
- NET_INITIAL_PEERS=${DOCKER_COMPOSE_NET_INITIAL_PEERS:-/dns/bootstrap/udp/3000/quic-v1}
entrypoint: /etc/nomos/scripts/run_nomos_node.sh
chatbot:
container_name: chatbot
build:
context: .
dockerfile: testnet/Dockerfile
image: nomos:latest
volumes:
- ./testnet:/etc/nomos
entrypoint: /etc/nomos/scripts/run_nomos_bot.sh
etcd:
container_name: etcd
image: quay.io/coreos/etcd:v3.4.15
ports:
- "2379:2379/tcp"
command:
- /usr/local/bin/etcd
- --advertise-client-urls=http://etcd:2379
- --listen-client-urls=http://0.0.0.0:2379
prometheus:
container_name: prometheus
image: prom/prometheus:latest

View File

@ -1,22 +1,16 @@
services:
bootstrap:
cfgsync:
container_name: cfgsync
build:
context: .
dockerfile: testnet/Dockerfile
ports:
- "3000:3000/udp"
- "18080:18080/tcp"
image: nomos:latest
volumes:
- ./testnet:/etc/nomos
environment:
- BOOTSTRAP_NODE_KEY=${DOCKER_COMPOSE_BOOSTRAP_NET_NODE_KEY:-1000000000000000000000000000000000000000000000000000000000000000}
- LIBP2P_NODE_MASK=${DOCKER_COMPOSE_LIBP2P_NODE_KEY_MASK:-2000000000000000000000000000000000000000000000000000000000000000}
- LIBP2P_REPLICAS=${DOCKER_COMPOSE_LIBP2P_REPLICAS:-1}
- OVERLAY_NODES=${DOCKER_COMPOSE_OVERLAY_NODES:-1000000000000000000000000000000000000000000000000000000000000000}
entrypoint: /etc/nomos/scripts/run_bootstrap_node.sh
entrypoint: /etc/nomos/scripts/run_cfgsync.sh
libp2p-node:
nomos-node:
build:
context: .
dockerfile: testnet/Dockerfile
@ -25,23 +19,5 @@ services:
deploy:
replicas: ${DOCKER_COMPOSE_LIBP2P_REPLICAS:-1}
depends_on:
- bootstrap
- etcd
environment:
- LIBP2P_REPLICAS=${DOCKER_COMPOSE_LIBP2P_REPLICAS:-1}
- ETCDCTL_ENDPOINTS=${DOCKER_COMPOSE_ETCDCTL_ENDPOINTS:-etcd:2379}
- ETCDCTL_API=${DOCKER_COMPOSE_ETCDCTL_API:-3}
- LIBP2P_NODE_MASK=${DOCKER_COMPOSE_LIBP2P_NODE_KEY_MASK:-2000000000000000000000000000000000000000000000000000000000000000}
- OVERLAY_NODES=${DOCKER_COMPOSE_OVERLAY_NODES:-1000000000000000000000000000000000000000000000000000000000000000}
- OVERLAY_SUPER_MAJORITY_THRESHOLD=${DOCKER_COMPOSE_SUPER_MAJORITY_THRESHOLD:-1}
- NET_INITIAL_PEERS=${DOCKER_COMPOSE_NET_INITIAL_PEERS:-/dns/bootstrap/udp/3000/quic-v1}
- cfgsync
entrypoint: /etc/nomos/scripts/run_nomos_node.sh
etcd:
image: quay.io/coreos/etcd:v3.4.15
ports:
- "2379:2379/tcp"
command:
- /usr/local/bin/etcd
- --advertise-client-urls=http://etcd:2379
- --listen-client-urls=http://0.0.0.0:2379

View File

@ -4,14 +4,39 @@ version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
axum = { version = "0.6" }
clap = { version = "4.5.13", features = ["derive"] }
color-eyre = "0.6.0"
hyper = { version = "0.14", features = ["full"] }
kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" }
nomos-api = { path = "../../nomos-services/api" }
nomos-core = { path = "../../nomos-core" }
nomos-da-network-core = { path = "../../nomos-da/network/core" }
nomos-da-network-service = { path = "../../nomos-services/data-availability/network" }
nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling", features = ["rocksdb-backend"] }
nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-mempool = { path = "../../nomos-services/mempool", features = [
"mock",
"libp2p",
] }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-node = { path = "../nomos-node" }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
clap = { version = "4.5.13", features = ["derive"] }
rand = "0.8"
rand_chacha = "0.3"
serde = "1"
serde_yaml = "0.9"
subnetworks-assignations = { path = "../../nomos-da/network/subnetworks-assignations" }
tower-http = { version = "0.4", features = ["cors", "trace"] }
tracing = "0.1.40"
utoipa = "4.0"
utoipa-swagger-ui = { version = "4.0" }
uuid = { version = "1.10.0", features = ["v4"] }
serde_yaml = "0.9.34+deprecated"
[features]
default = ["tracing"]

View File

@ -0,0 +1,312 @@
// std
use std::error::Error;
use std::{fmt::Debug, hash::Hash};
// crates
use axum::{http::HeaderValue, routing, Router, Server};
use hyper::header::{CONTENT_TYPE, USER_AGENT};
use nomos_api::Backend;
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata;
use nomos_core::da::DaVerifier as CoreDaVerifier;
use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction};
use nomos_da_network_core::SubnetworkId;
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_da_verifier::backend::VerifierBackend;
use nomos_libp2p::PeerId;
use nomos_mempool::{tx::service::openapi::Status, MempoolMetrics};
use nomos_node::api::handlers::{
add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers,
cryptarchia_info, get_metrics, get_range, libp2p_info,
};
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
// internal
/// Configuration for the Http Server
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct AxumBackendSettings {
/// Socket where the server will be listening on for incoming requests.
pub address: std::net::SocketAddr,
/// Allowed origins for this server deployment requests.
pub cors_origins: Vec<String>,
}
pub struct AxumBackend<
DaAttestation,
DaBlob,
DaBlobInfo,
Memebership,
DaVerifiedBlobInfo,
DaVerifierBackend,
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> {
settings: AxumBackendSettings,
_attestation: core::marker::PhantomData<DaAttestation>,
_blob: core::marker::PhantomData<DaBlob>,
_certificate: core::marker::PhantomData<DaBlobInfo>,
_membership: core::marker::PhantomData<Memebership>,
_vid: core::marker::PhantomData<DaVerifiedBlobInfo>,
_verifier_backend: core::marker::PhantomData<DaVerifierBackend>,
_tx: core::marker::PhantomData<Tx>,
_storage_serde: core::marker::PhantomData<DaStorageSerializer>,
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
_sampling_rng: core::marker::PhantomData<SamplingRng>,
_sampling_storage: core::marker::PhantomData<SamplingStorage>,
}
#[derive(OpenApi)]
#[openapi(
paths(
),
components(
schemas(Status<HeaderId>, MempoolMetrics)
),
tags(
(name = "da", description = "data availibility related APIs")
)
)]
struct ApiDoc;
#[async_trait::async_trait]
impl<
DaAttestation,
DaBlob,
DaBlobInfo,
Membership,
DaVerifiedBlobInfo,
DaVerifierBackend,
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> Backend
for AxumBackend<
DaAttestation,
DaBlob,
DaBlobInfo,
Membership,
DaVerifiedBlobInfo,
DaVerifierBackend,
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>
where
DaAttestation: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
DaBlob: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<DaBlob as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
DaBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
+ Clone
+ Debug
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
<DaBlobInfo as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
DaVerifiedBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
+ From<DaBlobInfo>
+ Eq
+ Debug
+ Metadata
+ Hash
+ Clone
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
<DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
<DaVerifiedBlobInfo as Metadata>::AppId:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync,
<DaVerifiedBlobInfo as Metadata>::Index:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
DaVerifierBackend: VerifierBackend + CoreDaVerifier<DaBlob = DaBlob> + Send + Sync + 'static,
<DaVerifierBackend as VerifierBackend>::Settings: Clone,
<DaVerifierBackend as CoreDaVerifier>::Error: Error,
Tx: Transaction
+ Clone
+ Debug
+ Eq
+ Hash
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static,
<Tx as nomos_core::tx::Transaction>::Hash:
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
DaStorageSerializer: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore + Send + 'static,
SamplingBackend: DaSamplingServiceBackend<
SamplingRng,
BlobId = <DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId,
> + Send
+ 'static,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + 'static,
{
type Error = hyper::Error;
type Settings = AxumBackendSettings;
async fn new(settings: Self::Settings) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(Self {
settings,
_attestation: core::marker::PhantomData,
_blob: core::marker::PhantomData,
_certificate: core::marker::PhantomData,
_membership: core::marker::PhantomData,
_vid: core::marker::PhantomData,
_verifier_backend: core::marker::PhantomData,
_tx: core::marker::PhantomData,
_storage_serde: core::marker::PhantomData,
_sampling_backend: core::marker::PhantomData,
_sampling_network_adapter: core::marker::PhantomData,
_sampling_rng: core::marker::PhantomData,
_sampling_storage: core::marker::PhantomData,
})
}
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
let mut builder = CorsLayer::new();
if self.settings.cors_origins.is_empty() {
builder = builder.allow_origin(Any);
}
for origin in &self.settings.cors_origins {
builder = builder.allow_origin(
origin
.as_str()
.parse::<HeaderValue>()
.expect("fail to parse origin"),
);
}
let app = Router::new()
.layer(
builder
.allow_headers([CONTENT_TYPE, USER_AGENT])
.allow_methods(Any),
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/cl/metrics", routing::get(cl_metrics::<Tx>))
.route("/cl/status", routing::post(cl_status::<Tx>))
.route(
"/cryptarchia/info",
routing::get(
cryptarchia_info::<
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route(
"/cryptarchia/headers",
routing::get(
cryptarchia_headers::<
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route(
"/da/add_blob",
routing::post(
add_blob::<
DaAttestation,
DaBlob,
Membership,
DaVerifierBackend,
DaStorageSerializer,
>,
),
)
.route(
"/da/get_range",
routing::post(
get_range::<
Tx,
DaBlobInfo,
DaVerifiedBlobInfo,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route("/network/info", routing::get(libp2p_info))
.route(
"/storage/block",
routing::post(block::<DaStorageSerializer, Tx>),
)
.route("/mempool/add/tx", routing::post(add_tx::<Tx>))
.route(
"/mempool/add/blobinfo",
routing::post(
add_blob_info::<
DaVerifiedBlobInfo,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>,
),
)
.route("/metrics", routing::get(get_metrics))
.with_state(handle);
Server::bind(&self.settings.address)
.serve(app.into_make_service())
.await
}
}

View File

@ -0,0 +1 @@
pub mod backend;

View File

@ -0,0 +1,67 @@
// std
// crates
use color_eyre::eyre::Result;
use nomos_da_network_service::backends::libp2p::executor::DaNetworkExecutorBackend;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_node::{
config::{update_cryptarchia_consensus, update_log, update_network},
CryptarchiaArgs, HttpArgs, LogArgs, Logger, NetworkArgs, NetworkService, Wire,
};
use nomos_storage::backends::rocksdb::RocksBackend;
use overwatch_rs::services::ServiceData;
use serde::{Deserialize, Serialize};
use subnetworks_assignations::versions::v1::FillFromNodeList;
// internal
use crate::ExecutorApiService;
#[derive(Deserialize, Debug, Clone, Serialize)]
pub struct Config {
pub log: <Logger as ServiceData>::Settings,
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
pub da_network:
<DaNetworkService<DaNetworkExecutorBackend<FillFromNodeList>> as ServiceData>::Settings,
pub da_indexer: <crate::DaIndexer as ServiceData>::Settings,
pub da_verifier: <crate::DaVerifier as ServiceData>::Settings,
pub da_sampling: <crate::DaSampling as ServiceData>::Settings,
pub http: <ExecutorApiService as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
pub storage: <crate::StorageService<RocksBackend<Wire>> as ServiceData>::Settings,
pub wait_online_secs: u64,
}
impl Config {
pub fn update_from_args(
mut self,
log_args: LogArgs,
network_args: NetworkArgs,
http_args: HttpArgs,
cryptarchia_args: CryptarchiaArgs,
) -> Result<Self> {
update_log(&mut self.log, log_args)?;
update_network(&mut self.network, network_args)?;
update_http(&mut self.http, http_args)?;
update_cryptarchia_consensus(&mut self.cryptarchia, cryptarchia_args)?;
Ok(self)
}
}
pub fn update_http(
http: &mut <ExecutorApiService as ServiceData>::Settings,
http_args: HttpArgs,
) -> Result<()> {
let HttpArgs {
http_addr,
cors_origins,
} = http_args;
if let Some(addr) = http_addr {
http.backend_settings.address = addr;
}
if let Some(cors) = cors_origins {
http.backend_settings.cors_origins = cors;
}
Ok(())
}

View File

@ -1,6 +1,38 @@
mod api;
pub mod config;
// std
// crates
use kzgrs_backend::common::blob::DaBlob;
use nomos_api::ApiService;
use nomos_da_network_service::backends::libp2p::executor::DaNetworkExecutorBackend;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
use nomos_node::*;
use overwatch_derive::Services;
use overwatch_rs::services::handle::ServiceHandle;
use rand_chacha::ChaCha20Rng;
// internal
use api::backend::AxumBackend;
pub type ExecutorApiService = ApiService<
AxumBackend<
(),
DaBlob,
BlobInfo,
NomosDaMembership,
BlobInfo,
KzgrsDaVerifier,
Tx,
Wire,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
MB16,
>,
>;
#[derive(Services)]
pub struct NomosExecutor {
@ -10,11 +42,11 @@ pub struct NomosExecutor {
da_indexer: ServiceHandle<DaIndexer>,
da_verifier: ServiceHandle<DaVerifier>,
da_sampling: ServiceHandle<DaSampling>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<NomosDaMembership>>>,
da_network: ServiceHandle<DaNetworkService<DaNetworkExecutorBackend<NomosDaMembership>>>,
cl_mempool: ServiceHandle<TxMempool>,
da_mempool: ServiceHandle<DaMempool>,
cryptarchia: ServiceHandle<Cryptarchia>,
http: ServiceHandle<NomosApiService>,
http: ServiceHandle<ExecutorApiService>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<Metrics>,

View File

@ -1,13 +1,20 @@
use nomos_node::*;
#[cfg(feature = "metrics")]
use nomos_node::{MetricsSettings, NomosRegistry};
// std
// crates
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_executor::config::Config as ExecutorConfig;
use nomos_executor::{NomosExecutor, NomosExecutorServiceSettings};
#[cfg(feature = "metrics")]
use nomos_node::MetricsSettings;
use nomos_node::{
BlobInfo, CryptarchiaArgs, DaMempoolSettings, DispersedBlobInfo, HttpArgs, LogArgs,
MempoolAdapterSettings, MetricsArgs, NetworkArgs, Transaction, Tx, TxMempoolSettings, CL_TOPIC,
DA_TOPIC,
};
use overwatch_rs::overwatch::*;
use tracing::{span, Level};
use uuid::Uuid;
// internal
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
@ -39,14 +46,15 @@ fn main() -> Result<()> {
cryptarchia_args,
metrics_args,
} = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?
.update_log(log_args)?
.update_http(http_args)?
.update_network(network_args)?
.update_cryptarchia_consensus(cryptarchia_args)?;
let config = serde_yaml::from_reader::<_, ExecutorConfig>(std::fs::File::open(config)?)?
.update_from_args(log_args, network_args, http_args, cryptarchia_args)?;
let registry = cfg!(feature = "metrics")
.then(|| metrics_args.with_metrics.then(NomosRegistry::default))
.then(|| {
metrics_args
.with_metrics
.then(nomos_metrics::NomosRegistry::default)
})
.flatten();
#[cfg(debug_assertions)]

View File

@ -0,0 +1,312 @@
// std
use std::error::Error;
use std::{fmt::Debug, hash::Hash};
// crates
use axum::{http::HeaderValue, routing, Router, Server};
use hyper::header::{CONTENT_TYPE, USER_AGENT};
use nomos_api::Backend;
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata;
use nomos_core::da::DaVerifier as CoreDaVerifier;
use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction};
use nomos_da_network_core::SubnetworkId;
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_da_verifier::backend::VerifierBackend;
use nomos_libp2p::PeerId;
use nomos_mempool::{tx::service::openapi::Status, MempoolMetrics};
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
// internal
use super::handlers::{
add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers,
cryptarchia_info, get_metrics, get_range, libp2p_info,
};
/// Configuration for the Http Server
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct AxumBackendSettings {
/// Socket where the server will be listening on for incoming requests.
pub address: std::net::SocketAddr,
/// Allowed origins for this server deployment requests.
pub cors_origins: Vec<String>,
}
pub struct AxumBackend<
DaAttestation,
DaBlob,
DaBlobInfo,
Membership,
DaVerifiedBlobInfo,
DaVerifierBackend,
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> {
settings: AxumBackendSettings,
_attestation: core::marker::PhantomData<DaAttestation>,
_blob: core::marker::PhantomData<DaBlob>,
_certificate: core::marker::PhantomData<DaBlobInfo>,
_membership: core::marker::PhantomData<Membership>,
_vid: core::marker::PhantomData<DaVerifiedBlobInfo>,
_verifier_backend: core::marker::PhantomData<DaVerifierBackend>,
_tx: core::marker::PhantomData<Tx>,
_storage_serde: core::marker::PhantomData<DaStorageSerializer>,
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
_sampling_rng: core::marker::PhantomData<SamplingRng>,
_sampling_storage: core::marker::PhantomData<SamplingStorage>,
}
#[derive(OpenApi)]
#[openapi(
paths(
),
components(
schemas(Status<HeaderId>, MempoolMetrics)
),
tags(
(name = "da", description = "data availibility related APIs")
)
)]
struct ApiDoc;
#[async_trait::async_trait]
impl<
DaAttestation,
DaBlob,
DaBlobInfo,
Membership,
DaVerifiedBlobInfo,
DaVerifierBackend,
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> Backend
for AxumBackend<
DaAttestation,
DaBlob,
DaBlobInfo,
Membership,
DaVerifiedBlobInfo,
DaVerifierBackend,
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>
where
DaAttestation: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
DaBlob: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<DaBlob as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
DaBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
+ Clone
+ Debug
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
<DaBlobInfo as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
DaVerifiedBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
+ From<DaBlobInfo>
+ Eq
+ Debug
+ Metadata
+ Hash
+ Clone
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
<DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
<DaVerifiedBlobInfo as Metadata>::AppId:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync,
<DaVerifiedBlobInfo as Metadata>::Index:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
DaVerifierBackend: VerifierBackend + CoreDaVerifier<DaBlob = DaBlob> + Send + Sync + 'static,
<DaVerifierBackend as VerifierBackend>::Settings: Clone,
<DaVerifierBackend as CoreDaVerifier>::Error: Error,
Tx: Transaction
+ Clone
+ Debug
+ Eq
+ Hash
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static,
<Tx as nomos_core::tx::Transaction>::Hash:
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
DaStorageSerializer: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore + Send + 'static,
SamplingBackend: DaSamplingServiceBackend<
SamplingRng,
BlobId = <DaVerifiedBlobInfo as DispersedBlobInfo>::BlobId,
> + Send
+ 'static,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + 'static,
{
type Error = hyper::Error;
type Settings = AxumBackendSettings;
async fn new(settings: Self::Settings) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(Self {
settings,
_attestation: core::marker::PhantomData,
_blob: core::marker::PhantomData,
_certificate: core::marker::PhantomData,
_membership: core::marker::PhantomData,
_vid: core::marker::PhantomData,
_verifier_backend: core::marker::PhantomData,
_tx: core::marker::PhantomData,
_storage_serde: core::marker::PhantomData,
_sampling_backend: core::marker::PhantomData,
_sampling_network_adapter: core::marker::PhantomData,
_sampling_rng: core::marker::PhantomData,
_sampling_storage: core::marker::PhantomData,
})
}
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
let mut builder = CorsLayer::new();
if self.settings.cors_origins.is_empty() {
builder = builder.allow_origin(Any);
}
for origin in &self.settings.cors_origins {
builder = builder.allow_origin(
origin
.as_str()
.parse::<HeaderValue>()
.expect("fail to parse origin"),
);
}
let app = Router::new()
.layer(
builder
.allow_headers([CONTENT_TYPE, USER_AGENT])
.allow_methods(Any),
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/cl/metrics", routing::get(cl_metrics::<Tx>))
.route("/cl/status", routing::post(cl_status::<Tx>))
.route(
"/cryptarchia/info",
routing::get(
cryptarchia_info::<
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route(
"/cryptarchia/headers",
routing::get(
cryptarchia_headers::<
Tx,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route(
"/da/add_blob",
routing::post(
add_blob::<
DaAttestation,
DaBlob,
Membership,
DaVerifierBackend,
DaStorageSerializer,
>,
),
)
.route(
"/da/get_range",
routing::post(
get_range::<
Tx,
DaBlobInfo,
DaVerifiedBlobInfo,
DaStorageSerializer,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route("/network/info", routing::get(libp2p_info))
.route(
"/storage/block",
routing::post(block::<DaStorageSerializer, Tx>),
)
.route("/mempool/add/tx", routing::post(add_tx::<Tx>))
.route(
"/mempool/add/blobinfo",
routing::post(
add_blob_info::<
DaVerifiedBlobInfo,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>,
),
)
.route("/metrics", routing::get(get_metrics))
.with_state(handle);
Server::bind(&self.settings.address)
.serve(app.into_make_service())
.await
}
}

View File

@ -7,16 +7,10 @@ use axum::{
extract::{Query, State},
http::HeaderValue,
response::{IntoResponse, Response},
routing, Json, Router, Server,
};
use hyper::{
header::{CONTENT_TYPE, USER_AGENT},
Body, StatusCode,
};
use nomos_api::{
http::{cl, consensus, da, libp2p, mempool, metrics, storage},
Backend,
Json,
};
use hyper::{header::CONTENT_TYPE, Body, StatusCode};
use nomos_api::http::{cl, consensus, da, libp2p, mempool, metrics, storage};
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata;
use nomos_core::da::{BlobId, DaVerifier as CoreDaVerifier};
@ -25,287 +19,15 @@ use nomos_da_network_core::SubnetworkId;
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_da_verifier::backend::VerifierBackend;
use nomos_libp2p::PeerId;
use nomos_mempool::{
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
tx::service::openapi::Status, MempoolMetrics,
};
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;
use tower_http::{
cors::{Any, CorsLayer},
trace::TraceLayer,
};
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
// internal
/// Configuration for the Http Server
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct AxumBackendSettings {
/// Socket where the server will be listening on for incoming requests.
pub address: std::net::SocketAddr,
/// Allowed origins for this server deployment requests.
pub cors_origins: Vec<String>,
}
pub struct AxumBackend<
A,
B,
C,
M,
V,
VB,
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> {
settings: AxumBackendSettings,
_attestation: core::marker::PhantomData<A>,
_blob: core::marker::PhantomData<B>,
_certificate: core::marker::PhantomData<C>,
_membership: core::marker::PhantomData<M>,
_vid: core::marker::PhantomData<V>,
_verifier_backend: core::marker::PhantomData<VB>,
_tx: core::marker::PhantomData<T>,
_storage_serde: core::marker::PhantomData<S>,
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
_sampling_rng: core::marker::PhantomData<SamplingRng>,
_sampling_storage: core::marker::PhantomData<SamplingStorage>,
}
#[derive(OpenApi)]
#[openapi(
paths(
),
components(
schemas(Status<HeaderId>, MempoolMetrics)
),
tags(
(name = "da", description = "data availibility related APIs")
)
)]
struct ApiDoc;
#[async_trait::async_trait]
impl<
A,
B,
C,
M,
V,
VB,
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> Backend
for AxumBackend<
A,
B,
C,
M,
V,
VB,
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>
where
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
<B as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
C: DispersedBlobInfo<BlobId = [u8; 32]>
+ Clone
+ Debug
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
<C as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
V: DispersedBlobInfo<BlobId = [u8; 32]>
+ From<C>
+ Eq
+ Debug
+ Metadata
+ Hash
+ Clone
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
<V as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
<V as Metadata>::AppId: AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync,
<V as Metadata>::Index:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
VB: VerifierBackend + CoreDaVerifier<DaBlob = B> + Send + Sync + 'static,
<VB as VerifierBackend>::Settings: Clone,
<VB as CoreDaVerifier>::Error: Error,
T: Transaction
+ Clone
+ Debug
+ Eq
+ Hash
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static,
<T as nomos_core::tx::Transaction>::Hash:
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
S: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore + Send + 'static,
SamplingBackend: DaSamplingServiceBackend<SamplingRng, BlobId = <V as DispersedBlobInfo>::BlobId>
+ Send
+ 'static,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + 'static,
{
type Error = hyper::Error;
type Settings = AxumBackendSettings;
async fn new(settings: Self::Settings) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(Self {
settings,
_attestation: core::marker::PhantomData,
_blob: core::marker::PhantomData,
_certificate: core::marker::PhantomData,
_membership: core::marker::PhantomData,
_vid: core::marker::PhantomData,
_verifier_backend: core::marker::PhantomData,
_tx: core::marker::PhantomData,
_storage_serde: core::marker::PhantomData,
_sampling_backend: core::marker::PhantomData,
_sampling_network_adapter: core::marker::PhantomData,
_sampling_rng: core::marker::PhantomData,
_sampling_storage: core::marker::PhantomData,
})
}
async fn serve(self, handle: OverwatchHandle) -> Result<(), Self::Error> {
let mut builder = CorsLayer::new();
if self.settings.cors_origins.is_empty() {
builder = builder.allow_origin(Any);
}
for origin in &self.settings.cors_origins {
builder = builder.allow_origin(
origin
.as_str()
.parse::<HeaderValue>()
.expect("fail to parse origin"),
);
}
let app = Router::new()
.layer(
builder
.allow_headers([CONTENT_TYPE, USER_AGENT])
.allow_methods(Any),
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/cl/metrics", routing::get(cl_metrics::<T>))
.route("/cl/status", routing::post(cl_status::<T>))
.route(
"/cryptarchia/info",
routing::get(
cryptarchia_info::<
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route(
"/cryptarchia/headers",
routing::get(
cryptarchia_headers::<
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route("/da/add_blob", routing::post(add_blob::<A, B, M, VB, S>))
.route(
"/da/get_range",
routing::post(
get_range::<
T,
C,
V,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
)
.route("/network/info", routing::get(libp2p_info))
.route("/storage/block", routing::post(block::<S, T>))
.route("/mempool/add/tx", routing::post(add_tx::<T>))
.route(
"/mempool/add/blobinfo",
routing::post(
add_blob_info::<
V,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>,
),
)
.route("/metrics", routing::get(get_metrics))
.with_state(handle);
Server::bind(&self.settings.address)
.serve(app.into_make_service())
.await
}
}
macro_rules! make_request_and_return_response {
($cond:expr) => {{
match $cond.await {
@ -329,7 +51,7 @@ macro_rules! make_request_and_return_response {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cl_metrics<T>(State(handle): State<OverwatchHandle>) -> Response
pub async fn cl_metrics<T>(State(handle): State<OverwatchHandle>) -> Response
where
T: Transaction
+ Clone
@ -353,7 +75,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cl_status<T>(
pub async fn cl_status<T>(
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<T as Transaction>::Hash>>,
) -> Response
@ -366,7 +88,7 @@ where
}
#[derive(Deserialize)]
#[allow(dead_code)]
struct QueryParams {
pub struct CryptarchiaInfoQuery {
from: Option<HeaderId>,
to: Option<HeaderId>,
}
@ -379,7 +101,7 @@ struct QueryParams {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cryptarchia_info<
pub async fn cryptarchia_info<
Tx,
SS,
SamplingBackend,
@ -430,7 +152,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cryptarchia_headers<
pub async fn cryptarchia_headers<
Tx,
SS,
SamplingBackend,
@ -440,7 +162,7 @@ async fn cryptarchia_headers<
const SIZE: usize,
>(
State(store): State<OverwatchHandle>,
Query(query): Query<QueryParams>,
Query(query): Query<CryptarchiaInfoQuery>,
) -> Response
where
Tx: Transaction
@ -463,7 +185,7 @@ where
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
let QueryParams { from, to } = query;
let CryptarchiaInfoQuery { from, to } = query;
make_request_and_return_response!(consensus::cryptarchia_headers::<
Tx,
SS,
@ -483,7 +205,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_blob<A, B, M, VB, SS>(
pub async fn add_blob<A, B, M, VB, SS>(
State(handle): State<OverwatchHandle>,
Json(blob): Json<B>,
) -> Response
@ -507,7 +229,7 @@ where
}
#[derive(Serialize, Deserialize)]
struct GetRangeReq<V: Metadata>
pub struct GetRangeReq<V: Metadata>
where
<V as Metadata>::AppId: Serialize + DeserializeOwned,
<V as Metadata>::Index: Serialize + DeserializeOwned,
@ -524,7 +246,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn get_range<
pub async fn get_range<
Tx,
C,
V,
@ -606,7 +328,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
pub async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(libp2p::libp2p_info(&handle))
}
@ -618,7 +340,10 @@ async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn block<S, Tx>(State(handle): State<OverwatchHandle>, Json(id): Json<HeaderId>) -> Response
pub async fn block<S, Tx>(
State(handle): State<OverwatchHandle>,
Json(id): Json<HeaderId>,
) -> Response
where
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,
S: StorageSerde + Send + Sync + 'static,
@ -634,7 +359,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_tx<Tx>(State(handle): State<OverwatchHandle>, Json(tx): Json<Tx>) -> Response
pub async fn add_tx<Tx>(State(handle): State<OverwatchHandle>, Json(tx): Json<Tx>) -> Response
where
Tx: Transaction + Clone + Debug + Hash + Serialize + DeserializeOwned + Send + Sync + 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
@ -655,7 +380,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_blob_info<B, SamplingBackend, SamplingAdapter, SamplingRng, SamplingStorage>(
pub async fn add_blob_info<B, SamplingBackend, SamplingAdapter, SamplingRng, SamplingStorage>(
State(handle): State<OverwatchHandle>,
Json(blob_info): Json<B>,
) -> Response
@ -700,7 +425,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn get_metrics(State(handle): State<OverwatchHandle>) -> Response {
pub async fn get_metrics(State(handle): State<OverwatchHandle>) -> Response {
match metrics::gather(&handle).await {
Ok(encoded_metrics) => Response::builder()
.status(StatusCode::OK)

View File

@ -0,0 +1,2 @@
pub mod backend;
pub mod handlers;

View File

@ -75,7 +75,7 @@ pub struct NetworkArgs {
#[derive(Parser, Debug, Clone)]
pub struct HttpArgs {
#[clap(long = "http-host", env = "HTTP_HOST")]
http_addr: Option<SocketAddr>,
pub http_addr: Option<SocketAddr>,
#[clap(long = "http-cors-origin", env = "HTTP_CORS_ORIGIN")]
pub cors_origins: Option<Vec<String>>,
@ -122,135 +122,154 @@ pub struct Config {
pub http: <NomosApiService as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
pub storage: <crate::StorageService<RocksBackend<Wire>> as ServiceData>::Settings,
pub wait_online_secs: u64,
}
impl Config {
pub fn update_log(mut self, log_args: LogArgs) -> Result<Self> {
let LogArgs {
backend,
log_addr: addr,
directory,
prefix,
format,
level,
} = log_args;
// Override the file config with the one from env variables.
if let Some(backend) = backend {
self.log.backend = match backend {
LoggerBackendType::Gelf => LoggerBackend::Gelf {
addr: addr
.ok_or_else(|| eyre!("Gelf backend requires an address."))?
.to_socket_addrs()?
.next()
.ok_or_else(|| eyre!("Invalid gelf address"))?,
},
LoggerBackendType::File => LoggerBackend::File {
directory: directory
.ok_or_else(|| eyre!("File backend requires a directory."))?,
prefix,
},
LoggerBackendType::Stdout => LoggerBackend::Stdout,
LoggerBackendType::Stderr => LoggerBackend::Stderr,
}
};
// Update parts of the config.
if let Some(format_str) = format {
self.log.format = match format_str.as_str() {
"Json" => LoggerFormat::Json,
"Plain" => LoggerFormat::Plain,
_ => return Err(eyre!("Invalid log format provided.")),
};
}
if let Some(level_str) = level {
self.log.level = match level_str.as_str() {
"DEBUG" => Level::DEBUG,
_ => return Err(eyre!("Invalid log level provided.")),
};
}
Ok(self)
}
pub fn update_network(mut self, network_args: NetworkArgs) -> Result<Self> {
let NetworkArgs {
host,
port,
node_key,
initial_peers,
} = network_args;
if let Some(IpAddr::V4(h)) = host {
self.network.backend.inner.host = h;
} else if host.is_some() {
return Err(eyre!("Unsupported ip version"));
}
if let Some(port) = port {
self.network.backend.inner.port = port as u16;
}
if let Some(node_key) = node_key {
let mut key_bytes = hex::decode(node_key)?;
self.network.backend.inner.node_key =
SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
}
if let Some(peers) = initial_peers {
self.network.backend.initial_peers = peers;
}
// TODO: configure mixclient and mixnode if the mixnet feature is enabled
Ok(self)
}
pub fn update_http(mut self, http_args: HttpArgs) -> Result<Self> {
let HttpArgs {
http_addr,
cors_origins,
} = http_args;
if let Some(addr) = http_addr {
self.http.backend_settings.address = addr;
}
if let Some(cors) = cors_origins {
self.http.backend_settings.cors_origins = cors;
}
Ok(self)
}
pub fn update_cryptarchia_consensus(mut self, consensus_args: CryptarchiaArgs) -> Result<Self> {
let CryptarchiaArgs {
chain_start_time,
slot_duration,
note_secret_key,
note_value,
} = consensus_args;
if let Some(start_time) = chain_start_time {
self.cryptarchia.time.chain_start_time =
time::OffsetDateTime::from_unix_timestamp(start_time)?;
}
if let Some(duration) = slot_duration {
self.cryptarchia.time.slot_duration = std::time::Duration::from_secs(duration);
}
if let Some(sk) = note_secret_key {
let sk = <[u8; 16]>::from_hex(sk)?;
let value = note_value.expect("Should be available if coin sk provided");
self.cryptarchia.notes.push(InputWitness::new(
NoteWitness::basic(value as u64, NMO_UNIT, &mut rand::thread_rng()),
NullifierSecret::from_bytes(sk),
));
}
pub fn update_from_args(
mut self,
log_args: LogArgs,
network_args: NetworkArgs,
http_args: HttpArgs,
cryptarchia_args: CryptarchiaArgs,
) -> Result<Self> {
update_log(&mut self.log, log_args)?;
update_network(&mut self.network, network_args)?;
update_http(&mut self.http, http_args)?;
update_cryptarchia_consensus(&mut self.cryptarchia, cryptarchia_args)?;
Ok(self)
}
}
pub fn update_log(log: &mut <Logger as ServiceData>::Settings, log_args: LogArgs) -> Result<()> {
let LogArgs {
backend,
log_addr: addr,
directory,
prefix,
format,
level,
} = log_args;
// Override the file config with the one from env variables.
if let Some(backend) = backend {
log.backend = match backend {
LoggerBackendType::Gelf => LoggerBackend::Gelf {
addr: addr
.ok_or_else(|| eyre!("Gelf backend requires an address."))?
.to_socket_addrs()?
.next()
.ok_or_else(|| eyre!("Invalid gelf address"))?,
},
LoggerBackendType::File => LoggerBackend::File {
directory: directory.ok_or_else(|| eyre!("File backend requires a directory."))?,
prefix,
},
LoggerBackendType::Stdout => LoggerBackend::Stdout,
LoggerBackendType::Stderr => LoggerBackend::Stderr,
}
};
// Update parts of the config.
if let Some(format_str) = format {
log.format = match format_str.as_str() {
"Json" => LoggerFormat::Json,
"Plain" => LoggerFormat::Plain,
_ => return Err(eyre!("Invalid log format provided.")),
};
}
if let Some(level_str) = level {
log.level = match level_str.as_str() {
"DEBUG" => Level::DEBUG,
_ => return Err(eyre!("Invalid log level provided.")),
};
}
Ok(())
}
pub fn update_network(
network: &mut <NetworkService<NetworkBackend> as ServiceData>::Settings,
network_args: NetworkArgs,
) -> Result<()> {
let NetworkArgs {
host,
port,
node_key,
initial_peers,
} = network_args;
if let Some(IpAddr::V4(h)) = host {
network.backend.inner.host = h;
} else if host.is_some() {
return Err(eyre!("Unsupported ip version"));
}
if let Some(port) = port {
network.backend.inner.port = port as u16;
}
if let Some(node_key) = node_key {
let mut key_bytes = hex::decode(node_key)?;
network.backend.inner.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
}
if let Some(peers) = initial_peers {
network.backend.initial_peers = peers;
}
// TODO: configure mixclient and mixnode if the mixnet feature is enabled
Ok(())
}
pub fn update_http(
http: &mut <NomosApiService as ServiceData>::Settings,
http_args: HttpArgs,
) -> Result<()> {
let HttpArgs {
http_addr,
cors_origins,
} = http_args;
if let Some(addr) = http_addr {
http.backend_settings.address = addr;
}
if let Some(cors) = cors_origins {
http.backend_settings.cors_origins = cors;
}
Ok(())
}
pub fn update_cryptarchia_consensus(
cryptarchia: &mut <crate::Cryptarchia as ServiceData>::Settings,
consensus_args: CryptarchiaArgs,
) -> Result<()> {
let CryptarchiaArgs {
chain_start_time,
slot_duration,
note_secret_key,
note_value,
} = consensus_args;
if let Some(start_time) = chain_start_time {
cryptarchia.time.chain_start_time = time::OffsetDateTime::from_unix_timestamp(start_time)?;
}
if let Some(duration) = slot_duration {
cryptarchia.time.slot_duration = std::time::Duration::from_secs(duration);
}
if let Some(sk) = note_secret_key {
let sk = <[u8; 16]>::from_hex(sk)?;
let value = note_value.expect("Should be available if coin sk provided");
cryptarchia.notes.push(InputWitness::new(
NoteWitness::basic(value as u64, NMO_UNIT, &mut rand::thread_rng()),
NullifierSecret::from_bytes(sk),
));
}
Ok(())
}

View File

@ -1,10 +1,10 @@
pub mod api;
mod config;
pub mod config;
mod tx;
// std
// crates
use api::AxumBackend;
use api::backend::AxumBackend;
use bytes::Bytes;
use color_eyre::eyre::Result;
pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs};
@ -81,7 +81,7 @@ pub type NomosApiService = ApiService<
pub const CL_TOPIC: &str = "cl";
pub const DA_TOPIC: &str = "da";
const MB16: usize = 1024 * 1024 * 16;
pub const MB16: usize = 1024 * 1024 * 16;
pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,

View File

@ -47,10 +47,7 @@ fn main() -> Result<()> {
metrics_args,
} = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?
.update_log(log_args)?
.update_http(http_args)?
.update_network(network_args)?
.update_cryptarchia_consensus(cryptarchia_args)?;
.update_from_args(log_args, network_args, http_args, cryptarchia_args)?;
let registry = cfg!(feature = "metrics")
.then(|| {

View File

@ -1,13 +1,13 @@
# BUILD IMAGE ---------------------------------------------------------
FROM rust:1.80.0-slim-bookworm AS builder
FROM rust:1.81.0-slim-bookworm AS builder
WORKDIR /nomos
COPY . .
# Install dependencies needed for building RocksDB and etcd.
RUN apt-get update && apt-get install -yq \
git gcc g++ clang etcd-client libssl-dev \
git gcc g++ clang libssl-dev \
pkg-config protobuf-compiler
RUN cargo install cargo-binstall
@ -18,7 +18,7 @@ RUN cargo build --release --all --features metrics
# NODE IMAGE ----------------------------------------------------------
FROM bitnami/minideb:latest
FROM bitnami/minideb:bookworm
LABEL maintainer="augustinas@status.im" \
source="https://github.com/logos-co/nomos-node" \
@ -27,9 +27,11 @@ LABEL maintainer="augustinas@status.im" \
# nomos default ports
EXPOSE 3000 8080 9000 60000
RUN apt-get update && apt-get install -y libssl3
COPY --from=builder /nomos/target/release/nomos-node /usr/bin/nomos-node
COPY --from=builder /nomos/target/release/nomos-cli /usr/bin/nomos-cli
COPY --from=builder /usr/bin/etcdctl /usr/bin/etcdctl
COPY nodes/nomos-node/config.yaml /etc/nomos/config.yaml
COPY --from=builder /nomos/target/release/cfgsync-server /usr/bin/cfgsync-server
COPY --from=builder /nomos/target/release/cfgsync-client /usr/bin/cfgsync-client
ENTRYPOINT ["/usr/bin/nomos-node"]

View File

@ -2,9 +2,7 @@
The Nomos Docker Compose Testnet contains four distinct service types:
- **Bootstrap Node Service**: A singular Nomos node with its own service and a deterministic DNS address. Other nodes utilize this as their initial peer.
- **Libp2p Node Services**: Multiple dynamically spawned Nomos nodes that announce their existence through etcd.
- **Etcd Service**: A container running an etcd instance.
- **Nomos Node Services**: Multiple dynamically spawned Nomos nodes that synchronizes their configuration via cfgsync utility.
## Building
@ -41,15 +39,15 @@ docker compose up -d
Followed by:
```bash
docker compose logs -f {bootstrap,libp2p-node,etcd}
docker compose logs -f nomos-node
```
## Using testnet
Bootstrap node is accessible from the host via `3000` and `18080` ports. To expose other nomos nodes, please update `libp2p-node` service in the `compose.yml` file with this configuration:
Bootstrap node is accessible from the host via `3000` and `18080` ports. To expose other nomos nodes, please update `nomos-node` service in the `compose.yml` file with this configuration:
```bash
libp2p-node:
nomos-node-0:
ports:
- "3001-3010:3000" # Use range depending on the number of nomos node replicas.
- "18081-18190:18080"

View File

@ -1,68 +0,0 @@
log:
backend: "Stdout"
format: "Json"
level: "info"
cryptarchia:
config:
epoch_stake_distribution_stabilization: 3
epoch_period_nonce_buffer: 3
epoch_period_nonce_stabilization: 4
consensus_config:
security_param: 10
active_slot_coeff: 0.9
time:
slot_duration:
secs: 5
nanos: 0
chain_start_time: [2024, 115, 6, 45, 44, 159214915, 0, 0, 0]
coins:
genesis_state:
lead_commitments:
- 20345e93cc65057a391893cbd88d86568efd3073156564797e4a912e4ae1c3ab
- 1594ef82f13d0b64284a9134f2f2ed3b30bca26812a69886a3f9ed737f117bd5
- 76721421649fbf175aff27470e40f44ade69bac844abcf27215f5c0d79d2ec46
- 06f7f2078ba6b24af7c5aae6f24889f6c609195ad796fb11b42ad6e0a3f8c10f
spend_commitments:
- 20345e93cc65057a391893cbd88d86568efd3073156564797e4a912e4ae1c3ab
- 1594ef82f13d0b64284a9134f2f2ed3b30bca26812a69886a3f9ed737f117bd5
- 76721421649fbf175aff27470e40f44ade69bac844abcf27215f5c0d79d2ec46
- 06f7f2078ba6b24af7c5aae6f24889f6c609195ad796fb11b42ad6e0a3f8c10f
nullifiers: []
nonce: '0000000000000000000000000000000000000000000000000000000000000000'
slot: 0
next_epoch_state:
epoch: 1
nonce: '0000000000000000000000000000000000000000000000000000000000000000'
commitments: []
total_stake: 4
epoch_state:
epoch: 0
nonce: '0000000000000000000000000000000000000000000000000000000000000000'
commitments: []
total_stake: 4
network:
backend:
host: 0.0.0.0
port: 3000
log_level: "fatal"
node_key: "0000000000000000000000000000000000000000000000000000000000000001"
discV5BootstrapNodes: []
initial_peers: []
relayTopics: []
http:
backend_settings:
address: 0.0.0.0:18080
cors_origins: []
da:
da_protocol:
voter: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
num_attestations: 1
backend:
max_capacity: 10000
evicting_period:
secs: 3600
nanos: 0

16
testnet/cfgsync.yaml Normal file
View File

@ -0,0 +1,16 @@
port: 4400
n_hosts: 4
timeout: 10
# ConsensusConfig related parameters
security_param: 10
active_slot_coeff: 0.9
# DaConfig related parameters
subnetwork_size: 2
dispersal_factor: 2
num_samples: 1
num_subnets: 2
old_blobs_check_interval_secs: 5
blobs_validity_duration_secs: 60
global_params_path: "/kzgrs_test_params"

View File

@ -0,0 +1,16 @@
[package]
name = "cfgsync"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = { version = "0.6" }
clap = { version = "4", features = ["derive"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-node = { path = "../../nodes/nomos-node" }
reqwest = { version = "0.11", features = ["json", "rustls-tls"] }
tests = { path = "../../tests" }
tokio = { version = "1.24", features = ["rt-multi-thread"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"

View File

@ -0,0 +1,66 @@
// std
use std::{env, fs, net::Ipv4Addr, process};
// crates
use nomos_node::Config as NodeConfig;
use reqwest::Client;
use serde::{de::DeserializeOwned, Serialize};
// internal
#[derive(Serialize)]
struct ClientIp {
ip: Ipv4Addr,
}
fn parse_ip(ip_str: String) -> Ipv4Addr {
ip_str.parse().unwrap_or_else(|_| {
eprintln!("Invalid IP format, defaulting to 127.0.0.1");
Ipv4Addr::new(127, 0, 0, 1)
})
}
async fn get_config<Config: Serialize + DeserializeOwned>(
ip: Ipv4Addr,
url: &str,
config_file: &str,
) -> Result<(), String> {
let client = Client::new();
let response = client
.post(url)
.json(&ClientIp { ip })
.send()
.await
.map_err(|err| format!("Failed to send IP announcement: {}", err))?;
if !response.status().is_success() {
return Err(format!("Server error: {:?}", response.status()));
}
let config = response
.json::<Config>()
.await
.map_err(|err| format!("Failed to parse response: {}", err))?;
let yaml = serde_yaml::to_string(&config)
.map_err(|err| format!("Failed to serialize config to YAML: {}", err))?;
fs::write(config_file, yaml)
.map_err(|err| format!("Failed to write config to file: {}", err))?;
println!("Config saved to {}", config_file);
Ok(())
}
#[tokio::main]
async fn main() {
let config_file_path = env::var("CFG_FILE_PATH").unwrap_or("config.yaml".to_string());
let server_addr = env::var("CFG_SERVER_ADDR").unwrap_or("http://127.0.0.1:4400".to_string());
let ip = parse_ip(env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_string()));
let node_config_endpoint = format!("{}/node", server_addr);
if let Err(err) = get_config::<NodeConfig>(ip, &node_config_endpoint, &config_file_path).await {
eprintln!("Error: {}", err);
process::exit(1);
}
}

View File

@ -0,0 +1,125 @@
// std
use std::net::Ipv4Addr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::{fs, process};
// crates
use axum::extract::State;
use axum::Json;
use axum::{http::StatusCode, response::IntoResponse, routing::post, Router};
use cfgsync::config::Host;
use cfgsync::repo::{ConfigRepo, RepoResponse};
use clap::Parser;
use serde::{Deserialize, Serialize};
use tests::{ConsensusConfig, DaConfig};
use tokio::sync::oneshot::channel;
// internal
#[derive(Parser, Debug)]
#[command(about = "CfgSync")]
struct Args {
config: PathBuf,
}
#[derive(Debug, Deserialize)]
struct CfgSyncConfig {
port: u16,
n_hosts: usize,
timeout: u64,
// ConsensusConfig related parameters
security_param: u32,
active_slot_coeff: f64,
// DaConfig related parameters
subnetwork_size: usize,
dispersal_factor: usize,
num_samples: u16,
num_subnets: u16,
old_blobs_check_interval_secs: u64,
blobs_validity_duration_secs: u64,
global_params_path: String,
}
impl CfgSyncConfig {
fn load_from_file(file_path: &PathBuf) -> Result<Self, String> {
let config_content = fs::read_to_string(file_path)
.map_err(|err| format!("Failed to read config file: {}", err))?;
serde_yaml::from_str(&config_content)
.map_err(|err| format!("Failed to parse config file: {}", err))
}
fn to_consensus_config(&self) -> ConsensusConfig {
ConsensusConfig {
n_participants: self.n_hosts,
security_param: self.security_param,
active_slot_coeff: self.active_slot_coeff,
}
}
fn to_da_config(&self) -> DaConfig {
DaConfig {
subnetwork_size: self.subnetwork_size,
dispersal_factor: self.dispersal_factor,
num_samples: self.num_samples,
num_subnets: self.num_subnets,
old_blobs_check_interval: Duration::from_secs(self.old_blobs_check_interval_secs),
blobs_validity_duration: Duration::from_secs(self.blobs_validity_duration_secs),
global_params_path: self.global_params_path.clone(),
}
}
}
#[derive(Serialize, Deserialize)]
struct ClientIp {
ip: Ipv4Addr,
}
async fn node_config(
State(config_repo): State<Arc<ConfigRepo>>,
Json(payload): Json<ClientIp>,
) -> impl IntoResponse {
let ClientIp { ip } = payload;
let (reply_tx, reply_rx) = channel();
config_repo.register(Host::default_node_from_ip(ip), reply_tx);
match reply_rx.await {
Ok(config_response) => match config_response {
RepoResponse::Config(config) => (StatusCode::OK, Json(config)).into_response(),
RepoResponse::Timeout => {
(StatusCode::REQUEST_TIMEOUT, Json(RepoResponse::Timeout)).into_response()
}
},
Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(),
}
}
#[tokio::main]
async fn main() {
let cli = Args::parse();
let config = CfgSyncConfig::load_from_file(&cli.config).unwrap_or_else(|err| {
eprintln!("{}", err);
process::exit(1);
});
let consensus_config = config.to_consensus_config();
let da_config = config.to_da_config();
let config_repo = ConfigRepo::new(
config.n_hosts,
consensus_config,
da_config,
Duration::from_secs(config.timeout),
);
let app = Router::new()
.route("/node", post(node_config))
.with_state(config_repo.clone());
println!("Server running on http://0.0.0.0:{}", config.port);
axum::Server::bind(&format!("0.0.0.0:{}", config.port).parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}

View File

@ -0,0 +1,157 @@
// std
use std::{collections::HashMap, net::Ipv4Addr, str::FromStr};
// crates
use nomos_libp2p::{Multiaddr, PeerId};
use nomos_node::Config as NodeConfig;
use tests::{ConsensusConfig, DaConfig, Node, NomosNode};
// internal
const DEFAULT_NETWORK_PORT: u16 = 3000;
const DEFAULT_DA_NETWORK_PORT: u16 = 3300;
#[derive(Eq, PartialEq, Hash, Clone)]
pub enum HostKind {
Nomos,
}
#[derive(Eq, PartialEq, Hash, Clone)]
pub struct Host {
pub kind: HostKind,
pub ip: Ipv4Addr,
pub network_port: u16,
pub da_network_port: u16,
}
impl Host {
pub fn default_node_from_ip(ip: Ipv4Addr) -> Self {
Self {
kind: HostKind::Nomos,
ip,
network_port: DEFAULT_NETWORK_PORT,
da_network_port: DEFAULT_DA_NETWORK_PORT,
}
}
}
pub fn create_node_configs(
consensus: ConsensusConfig,
da: DaConfig,
hosts: Vec<Host>,
) -> HashMap<Host, NodeConfig> {
let mut configs = NomosNode::create_node_configs(consensus, da);
let mut configured_hosts = HashMap::new();
// Rebuild DA address lists.
let peer_addresses = configs[0].da_network.backend.addresses.clone();
let host_network_init_peers = update_network_init_peers(hosts.clone());
let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses);
let new_peer_addresses: HashMap<PeerId, Multiaddr> = host_da_peer_addresses
.clone()
.into_iter()
.map(|(peer_id, (multiaddr, _))| (peer_id, multiaddr))
.collect();
for (config, host) in configs.iter_mut().zip(hosts.into_iter()) {
config.da_network.backend.addresses = new_peer_addresses.clone();
// Libp2p network config.
config.network.backend.inner.host = Ipv4Addr::from_str("0.0.0.0").unwrap();
config.network.backend.inner.port = host.network_port;
config.network.backend.initial_peers = host_network_init_peers.clone();
// DA Libp2p network config.
config.da_network.backend.listening_address = Multiaddr::from_str(&format!(
"/ip4/0.0.0.0/udp/{}/quic-v1",
host.da_network_port,
))
.unwrap();
configured_hosts.insert(host.clone(), config.clone());
}
configured_hosts
}
fn update_network_init_peers(hosts: Vec<Host>) -> Vec<Multiaddr> {
hosts
.iter()
.map(|h| nomos_libp2p::Swarm::multiaddr(h.ip, h.network_port))
.collect()
}
fn update_da_peer_addresses(
hosts: Vec<Host>,
peer_addresses: HashMap<PeerId, Multiaddr>,
) -> HashMap<PeerId, (Multiaddr, Ipv4Addr)> {
peer_addresses
.into_iter()
.zip(hosts)
.map(|((peer_id, _), host)| {
let new_multiaddr = Multiaddr::from_str(&format!(
"/ip4/{}/udp/{}/quic-v1",
host.ip, host.da_network_port,
))
.unwrap();
(peer_id, (new_multiaddr, host.ip))
})
.collect()
}
#[cfg(test)]
mod cfgsync_tests {
use std::str::FromStr;
use std::{net::Ipv4Addr, time::Duration};
use nomos_libp2p::Protocol;
use tests::{ConsensusConfig, DaConfig};
use super::{create_node_configs, Host, HostKind};
#[test]
fn basic_ip_list() {
let hosts = (0..10)
.map(|i| Host {
kind: HostKind::Nomos,
ip: Ipv4Addr::from_str(&format!("10.1.1.{i}")).unwrap(),
network_port: 3000,
da_network_port: 4044,
})
.collect();
let configs = create_node_configs(
ConsensusConfig {
n_participants: 10,
security_param: 10,
active_slot_coeff: 0.9,
},
DaConfig {
subnetwork_size: 2,
dispersal_factor: 1,
num_samples: 1,
num_subnets: 2,
old_blobs_check_interval: Duration::from_secs(5),
blobs_validity_duration: Duration::from_secs(u64::MAX),
global_params_path: "".into(),
},
hosts,
);
for (host, config) in configs.iter() {
let network_port = config.network.backend.inner.port;
let da_network_addr = config.da_network.backend.listening_address.clone();
let da_network_port = da_network_addr
.iter()
.find_map(|protocol| match protocol {
Protocol::Udp(port) => Some(port),
_ => None,
})
.unwrap();
assert_eq!(network_port, host.network_port);
assert_eq!(da_network_port, host.da_network_port);
}
}
}

View File

@ -0,0 +1,2 @@
pub mod config;
pub mod repo;

101
testnet/cfgsync/src/repo.rs Normal file
View File

@ -0,0 +1,101 @@
// std
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
// crates
use nomos_node::Config as NodeConfig;
use serde::{Deserialize, Serialize};
use tests::{ConsensusConfig, DaConfig};
use tokio::sync::oneshot::Sender;
use tokio::time::timeout;
// internal
use crate::config::{create_node_configs, Host};
#[derive(Serialize, Deserialize)]
pub enum RepoResponse {
Config(NodeConfig),
Timeout,
}
pub struct ConfigRepo {
waiting_hosts: Mutex<HashMap<Host, Sender<RepoResponse>>>,
n_hosts: usize,
consensus_config: ConsensusConfig,
da_config: DaConfig,
timeout_duration: Duration,
}
impl ConfigRepo {
pub fn new(
n_hosts: usize,
consensus_config: ConsensusConfig,
da_config: DaConfig,
timeout_duration: Duration,
) -> Arc<Self> {
let repo = Arc::new(Self {
waiting_hosts: Mutex::new(HashMap::new()),
n_hosts,
consensus_config,
da_config,
timeout_duration,
});
let repo_clone = repo.clone();
tokio::spawn(async move {
repo_clone.run().await;
});
repo
}
pub fn register(&self, host: Host, reply_tx: Sender<RepoResponse>) {
let mut waiting_hosts = self.waiting_hosts.lock().unwrap();
waiting_hosts.insert(host, reply_tx);
}
async fn run(&self) {
let timeout_duration = self.timeout_duration;
match timeout(timeout_duration, self.wait_for_hosts()).await {
Ok(_) => {
println!("All hosts have announced their IPs");
let mut waiting_hosts = self.waiting_hosts.lock().unwrap();
let hosts = waiting_hosts
.iter()
.map(|(host, _)| host)
.cloned()
.collect();
let configs = create_node_configs(
self.consensus_config.clone(),
self.da_config.clone(),
hosts,
);
for (host, sender) in waiting_hosts.drain() {
let config = configs.get(&host).expect("host should have a config");
let _ = sender.send(RepoResponse::Config(config.to_owned()));
}
}
Err(_) => {
println!("Timeout: Not all hosts announced within the time limit");
let mut waiting_hosts = self.waiting_hosts.lock().unwrap();
for (_, sender) in waiting_hosts.drain() {
let _ = sender.send(RepoResponse::Timeout);
}
}
}
}
async fn wait_for_hosts(&self) {
loop {
if self.waiting_hosts.lock().unwrap().len() >= self.n_hosts {
break;
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}

View File

@ -1,8 +0,0 @@
backend:
host: 0.0.0.0
port: 4007
log_level: "fatal"
node_key: "0000000000000000000000000000000000000000000000000000000000000667"
discV5BootstrapNodes: []
initial_peers: ["/dns/bootstrap/udp/3000/quic-v1"]
relayTopics: []

View File

@ -1,10 +0,0 @@
log:
backend: "Stdout"
format: "Json"
level: "info"
api:
backend_settings:
address: 0.0.0.0:9090
cors_origins: []

View File

@ -1,68 +0,0 @@
log:
backend: "Stdout"
format: "Json"
level: "info"
cryptarchia:
config:
epoch_stake_distribution_stabilization: 3
epoch_period_nonce_buffer: 3
epoch_period_nonce_stabilization: 4
consensus_config:
security_param: 10
active_slot_coeff: 0.9
time:
slot_duration:
secs: 5
nanos: 0
chain_start_time: [2024, 115, 6, 45, 44, 159214915, 0, 0, 0]
coins:
genesis_state:
lead_commitments:
- 20345e93cc65057a391893cbd88d86568efd3073156564797e4a912e4ae1c3ab
- 1594ef82f13d0b64284a9134f2f2ed3b30bca26812a69886a3f9ed737f117bd5
- 76721421649fbf175aff27470e40f44ade69bac844abcf27215f5c0d79d2ec46
- 06f7f2078ba6b24af7c5aae6f24889f6c609195ad796fb11b42ad6e0a3f8c10f
spend_commitments:
- 20345e93cc65057a391893cbd88d86568efd3073156564797e4a912e4ae1c3ab
- 1594ef82f13d0b64284a9134f2f2ed3b30bca26812a69886a3f9ed737f117bd5
- 76721421649fbf175aff27470e40f44ade69bac844abcf27215f5c0d79d2ec46
- 06f7f2078ba6b24af7c5aae6f24889f6c609195ad796fb11b42ad6e0a3f8c10f
nullifiers: []
nonce: '0000000000000000000000000000000000000000000000000000000000000000'
slot: 0
next_epoch_state:
epoch: 1
nonce: '0000000000000000000000000000000000000000000000000000000000000000'
commitments: []
total_stake: 4
epoch_state:
epoch: 0
nonce: '0000000000000000000000000000000000000000000000000000000000000000'
commitments: []
total_stake: 4
network:
backend:
host: 0.0.0.0
port: 3000
log_level: "fatal"
node_key: "0000000000000000000000000000000000000000000000000000000000000001"
discV5BootstrapNodes: []
initial_peers: ["/dns/bootstrap/udp/3000/quic-v1"]
relayTopics: []
http:
backend_settings:
address: 0.0.0.0:18080
cors_origins: []
da:
da_protocol:
voter: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
num_attestations: 1
backend:
max_capacity: 10000
evicting_period:
secs: 3600
nanos: 0

View File

@ -1,22 +0,0 @@
#!/bin/sh
set -e
# This node id will be used to generate consensus node list.
tmp_node_id=0
# OVERLAY_NODES might be set in compose.yml.
tmp_overlay_nodes=$OVERLAY_NODES
# All spawned nodes should be added to consensus configuration.
for i in $(seq 1 $LIBP2P_REPLICAS); do
tmp_node_id=$((tmp_node_id + 1))
node_key=$(/etc/nomos/scripts/node_key_from_id.sh "$LIBP2P_NODE_MASK" "$tmp_node_id")
if [ -z "$tmp_overlay_nodes" ]; then
tmp_overlay_nodes=$node_key
else
tmp_overlay_nodes="${tmp_overlay_nodes},${node_key}"
fi
done
echo "${tmp_overlay_nodes}"

View File

@ -1,17 +0,0 @@
#!/bin/sh
set -e
if [ -z "$1" ] || [ -z "$2" ]; then
echo "Usage: $0 <libp2p_node_mask> <node_id>"
exit 1
fi
libp2p_node_mask=$1
node_id=$2
node_key_from_id() {
echo "${libp2p_node_mask}" | sed "s/.\{${#node_id}\}$/${node_id}/"
}
node_key_from_id

View File

@ -1,45 +0,0 @@
#!/bin/sh
# LIBP2P_NODE_MASK is set via compose.yml file.
node_key_from_id() {
echo "${LIBP2P_NODE_MASK}" | sed "s/.\{${#NODE_ID}\}$/${NODE_ID}/"
}
END=$LIBP2P_REPLICAS
NODE_ID=1
NODE_IP=$(hostname -i)
NODE_KEY=$(node_key_from_id)
register_node() {
## Conditional transaction to set node config key if it doesn't exist.
## Newlines in EOF block are important, more info here:
## https://github.com/etcd-io/etcd/tree/main/etcdctl#examples-3
etcdctl txn <<EOF
mod("/node/${NODE_ID}") = "0"
put /node/${NODE_ID} "${NODE_ID}"
put /config/node/${NODE_ID}/key "${NODE_KEY}"
put /config/node/${NODE_ID}/ip "${NODE_IP}"
EOF
}
while [ "${NODE_ID}" -le "${END}" ]; do
result=$(register_node)
# Check if the key was registered or already exists
if [ "${result}" != "FAILURE" ]; then
break
else
NODE_ID=$((NODE_ID + 1))
NODE_KEY=$(node_key_from_id)
fi
done
if [ "${NODE_ID}" -gt "${END}" ]; then
echo "Reached the limit without registering a ${NODE_ID}."
return 1
fi
echo "${NODE_KEY}"

View File

@ -1,28 +0,0 @@
#!/bin/sh
set -e
CONSENSUS_CHAIN_START=$(date +%s)
CONSENSUS_COIN_SK=$BOOTSTRAP_NODE_KEY
CONSENSUS_COIN_NONCE=$BOOTSTRAP_NODE_KEY
CONSENSUS_COIN_VALUE=1
DA_VOTER=$BOOTSTRAP_NODE_KEY
NET_NODE_KEY=$BOOTSTRAP_NODE_KEY
OVERLAY_NODES=$(/etc/nomos/scripts/consensus_node_list.sh)
export CONSENSUS_COIN_SK \
CONSENSUS_COIN_NONCE \
CONSENSUS_COIN_VALUE \
CONSENSUS_CHAIN_START \
DA_VOTER \
OVERLAY_NODES \
NET_NODE_KEY
echo "I am a container ${HOSTNAME} node ${NET_NODE_KEY}"
echo "CONSENSUS_COIN_SK: ${CONSENSUS_COIN_SK}"
echo "CONSENSUS_COIN_NONCE: ${CONSENSUS_COIN_NONCE}"
echo "CONSENSUS_COIN_VALUE: ${CONSENSUS_COIN_VALUE}"
echo "DA_VOTER: ${DA_VOTER}"
echo "OVERLAY_NODES: ${OVERLAY_NODES}"
exec /usr/bin/nomos-node /etc/nomos/bootstrap_config.yaml --with-metrics --log-backend gelf --log-addr graylog:12201

5
testnet/scripts/run_cfgsync.sh Executable file
View File

@ -0,0 +1,5 @@
#!/bin/sh
set -e
exec /usr/bin/cfgsync-server /etc/nomos/cfgsync.yaml

View File

@ -1,9 +0,0 @@
#!/bin/sh
echo "I am a container ${HOSTNAME} bot"
while true
do
/usr/bin/nomos-cli chat --author nomos-ghost --message "$(date +%H:%M:%S) ~ ping" --network-config /etc/nomos/cli_config.yaml --node http://bootstrap:18080
sleep 10
done

View File

@ -2,43 +2,10 @@
set -e
# Set env variables for nomos-node.
NET_NODE_KEY=$(/etc/nomos/scripts/register_node.sh)
CONSENSUS_CHAIN_START=$(date +%s)
CONSENSUS_COIN_SK=$NET_NODE_KEY
CONSENSUS_COIN_NONCE=$NET_NODE_KEY
CONSENSUS_COIN_VALUE=1
DA_VOTER=$NET_NODE_KEY
OVERLAY_NODES=$(/etc/nomos/scripts/consensus_node_list.sh)
export CFG_FILE_PATH="/config.yaml" \
CFG_SERVER_ADDR="http://cfgsync:4400" \
CFG_HOST_IP=$(hostname -i) \
RISC0_DEV_MODE=true
node_ids=$(etcdctl get "/node/" --prefix --keys-only)
for node_id in $node_ids; do
node_key=$(etcdctl get "/config${node_id}/key" --print-value-only)
node_ip=$(etcdctl get "/config${node_id}/ip" --print-value-only)
node_multiaddr="/ip4/${node_ip}/udp/3000/quic-v1"
if [ -z "$NET_INITIAL_PEERS" ]; then
NET_INITIAL_PEERS=$node_multiaddr
else
NET_INITIAL_PEERS="${NET_INITIAL_PEERS},${node_multiaddr}"
fi
done
export CONSENSUS_COIN_SK \
CONSENSUS_COIN_NONCE \
CONSENSUS_COIN_VALUE \
CONSENSUS_CHAIN_START \
DA_VOTER \
OVERLAY_NODES \
NET_NODE_KEY \
NET_INITIAL_PEERS
echo "I am a container ${HOSTNAME} node ${NET_NODE_KEY}"
echo "CONSENSUS_COIN_SK: ${CONSENSUS_COIN_SK}"
echo "CONSENSUS_COIN_NONCE: ${CONSENSUS_COIN_NONCE}"
echo "CONSENSUS_COIN_VALUE: ${CONSENSUS_COIN_VALUE}"
echo "DA_VOTER: ${DA_VOTER}"
echo "OVERLAY_NODES: ${OVERLAY_NODES}"
echo "NET_INITIAL_PEERS: ${NET_INITIAL_PEERS}"
exec /usr/bin/nomos-node /etc/nomos/libp2p_config.yaml --with-metrics --log-backend gelf --log-addr graylog:12201
/usr/bin/cfgsync-client && \
exec /usr/bin/nomos-node /config.yaml --with-metrics --log-backend gelf --log-addr graylog:12201

View File

@ -10,7 +10,7 @@ use std::time::Duration;
use std::{fmt::Debug, sync::Mutex};
//crates
use nomos_libp2p::{Multiaddr, PeerId, Swarm};
use nomos_libp2p::{Multiaddr, Swarm};
use nomos_node::Config;
use rand::{thread_rng, Rng};
@ -59,12 +59,8 @@ pub trait Node: Sized {
}
fn node_configs(config: SpawnConfig) -> Vec<Config> {
match config {
SpawnConfig::Star {
consensus,
da,
test,
} => {
let mut configs = Self::create_node_configs(consensus, da, test);
SpawnConfig::Star { consensus, da } => {
let mut configs = Self::create_node_configs(consensus, da);
let next_leader_config = configs.remove(0);
let first_node_addr = node_address(&next_leader_config);
let mut node_configs = vec![next_leader_config];
@ -78,12 +74,8 @@ pub trait Node: Sized {
}
node_configs
}
SpawnConfig::Chain {
consensus,
da,
test,
} => {
let mut configs = Self::create_node_configs(consensus, da, test);
SpawnConfig::Chain { consensus, da } => {
let mut configs = Self::create_node_configs(consensus, da);
let next_leader_config = configs.remove(0);
let mut prev_node_addr = node_address(&next_leader_config);
let mut node_configs = vec![next_leader_config];
@ -97,11 +89,7 @@ pub trait Node: Sized {
}
}
}
fn create_node_configs(
consensus: ConsensusConfig,
da: DaConfig,
test: TestConfig,
) -> Vec<Config>;
fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Config>;
async fn consensus_info(&self) -> Self::ConsensusInfo;
fn stop(&mut self);
}
@ -112,19 +100,17 @@ pub enum SpawnConfig {
Star {
consensus: ConsensusConfig,
da: DaConfig,
test: TestConfig,
},
// Chain topology: Every node is chained to the node next to it.
Chain {
consensus: ConsensusConfig,
da: DaConfig,
test: TestConfig,
},
}
impl SpawnConfig {
// Returns a SpawnConfig::Chain with proper configurations for happy-path tests
pub fn chain_happy(n_participants: usize, da: DaConfig, test: TestConfig) -> Self {
pub fn chain_happy(n_participants: usize, da: DaConfig) -> Self {
Self::Chain {
consensus: ConsensusConfig {
n_participants,
@ -136,11 +122,10 @@ impl SpawnConfig {
active_slot_coeff: 0.9,
},
da,
test,
}
}
pub fn star_happy(n_participants: usize, da: DaConfig, test: TestConfig) -> Self {
pub fn star_happy(n_participants: usize, da: DaConfig) -> Self {
Self::Star {
consensus: ConsensusConfig {
n_participants,
@ -152,7 +137,6 @@ impl SpawnConfig {
active_slot_coeff: 0.9,
},
da,
test,
}
}
}
@ -175,7 +159,6 @@ pub struct ConsensusConfig {
pub struct DaConfig {
pub subnetwork_size: usize,
pub dispersal_factor: usize,
pub executor_peer_ids: Vec<PeerId>,
pub num_samples: u16,
pub num_subnets: u16,
pub old_blobs_check_interval: Duration,
@ -188,7 +171,6 @@ impl Default for DaConfig {
Self {
subnetwork_size: 2,
dispersal_factor: 1,
executor_peer_ids: vec![],
num_samples: 1,
num_subnets: 2,
old_blobs_check_interval: Duration::from_secs(5),
@ -197,16 +179,3 @@ impl Default for DaConfig {
}
}
}
#[derive(Clone)]
pub struct TestConfig {
pub wait_online_secs: u64,
}
impl Default for TestConfig {
fn default() -> Self {
Self {
wait_online_secs: 10,
}
}
}

View File

@ -34,7 +34,7 @@ use nomos_mempool::MempoolMetrics;
#[cfg(feature = "mixnet")]
use nomos_network::backends::libp2p::mixnet::MixnetConfig;
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::{api::AxumBackendSettings, Config, Tx};
use nomos_node::{api::backend::AxumBackendSettings, Config, Tx};
use nomos_storage::backends::rocksdb::RocksBackendSettings;
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng};
@ -46,7 +46,7 @@ use tempfile::NamedTempFile;
use time::OffsetDateTime;
// internal
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node, TestConfig};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node};
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const CRYPTARCHIA_INFO_API: &str = "cryptarchia/info";
@ -86,7 +86,6 @@ impl NomosNode {
let dir = create_tempdir().unwrap();
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
let wait_online_secs = config.wait_online_secs;
// setup logging so that we can intercept it later in testing
config.log.backend = LoggerBackend::File {
@ -119,10 +118,9 @@ impl NomosNode {
_tempdir: dir,
config,
};
tokio::time::timeout(
adjust_timeout(Duration::from_secs(wait_online_secs)),
async { node.wait_online().await },
)
tokio::time::timeout(adjust_timeout(Duration::from_secs(10)), async {
node.wait_online().await
})
.await
.unwrap();
@ -266,11 +264,7 @@ impl Node for NomosNode {
/// so the leader can receive votes from all other nodes that will be subsequently spawned.
/// If not, the leader will miss votes from nodes spawned before itself.
/// This issue will be resolved by devising the block catch-up mechanism in the future.
fn create_node_configs(
consensus: ConsensusConfig,
da: DaConfig,
test: TestConfig,
) -> Vec<Config> {
fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Config> {
// we use the same random bytes for:
// * da id
// * coin sk
@ -329,7 +323,6 @@ impl Node for NomosNode {
vec![coin],
time_config.clone(),
da.clone(),
test.wait_online_secs,
#[cfg(feature = "mixnet")]
MixnetConfig {
mixclient: mixclient_config.clone(),
@ -341,8 +334,7 @@ impl Node for NomosNode {
// Build DA memberships and address lists.
let peer_addresses = build_da_peer_list(&configs);
let mut peer_ids = peer_addresses.iter().map(|(p, _)| *p).collect::<Vec<_>>();
peer_ids.extend(da.executor_peer_ids);
let peer_ids = peer_addresses.iter().map(|(p, _)| *p).collect::<Vec<_>>();
for config in &mut configs {
let membership =
@ -459,10 +451,10 @@ fn create_node_config(
notes: Vec<InputWitness>,
time: TimeConfig,
da_config: DaConfig,
wait_online_secs: u64,
#[cfg(feature = "mixnet")] mixnet_config: MixnetConfig,
) -> Config {
let swarm_config: SwarmConfig = Default::default();
let node_key = swarm_config.node_key.clone();
let verifier_sk = SecretKey::key_gen(&id, &[]).unwrap();
let verifier_sk_bytes = verifier_sk.to_bytes();
@ -470,7 +462,7 @@ fn create_node_config(
let mut config = Config {
network: NetworkConfig {
backend: Libp2pConfig {
inner: swarm_config.clone(),
inner: swarm_config,
initial_peers: vec![],
#[cfg(feature = "mixnet")]
mixnet: mixnet_config,
@ -486,7 +478,7 @@ fn create_node_config(
},
da_network: DaNetworkConfig {
backend: DaNetworkBackendSettings {
node_key: swarm_config.node_key,
node_key,
listening_address: Multiaddr::from_str(&format!(
"/ip4/127.0.0.1/udp/{}/quic-v1",
get_available_port(),
@ -538,7 +530,6 @@ fn create_node_config(
read_only: false,
column_family: Some("blocks".into()),
},
wait_online_secs,
};
config.network.backend.inner.port = get_available_port();

View File

@ -128,9 +128,6 @@ async fn disseminate_and_retrieve() {
num_subnets: 2,
..Default::default()
},
tests::TestConfig {
wait_online_secs: 50,
},
))
.await;

View File

@ -52,11 +52,6 @@ async fn happy_test(nodes: &[NomosNode]) {
#[tokio::test]
async fn two_nodes_happy() {
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
2,
Default::default(),
Default::default(),
))
.await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2, Default::default())).await;
happy_test(&nodes).await;
}