1
0
mirror of synced 2025-02-12 15:56:44 +00:00

DA: Use executor in tests (#818)

* Use executor in tests instead of nomos node

* Executor config from node config

* Bring generics to the testing game

* Fill in missing gaps in test

* Implement testnode wrapper

* Use sleep on dispersal service instead

* Fix cfgsync

* Clippy happy

* Clippy happy tests

* Mixnet config in tests for validator

* Tests: General config and multiple nodes (#832)

* Use executor in tests instead of nomos node

* Bring generics to the testing game

* Fill in missing gaps in test

* Clippy happy

* Mixnet config in tests for validator

* Derive different types of configs from general in tests

* Validator and executor in cfgsync

---------

Co-authored-by: danielSanchezQ <3danimanimal@gmail.com>

* Tests executor node mix config (#834)

* Merge branch 'master' into tests-executor-node-mix-config

* add mix configs

---------

Co-authored-by: danielSanchezQ <3danimanimal@gmail.com>
Co-authored-by: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com>
This commit is contained in:
gusto 2024-10-21 15:38:56 +03:00 committed by GitHub
parent 83d9ef7738
commit 328398ca68
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 1380 additions and 877 deletions

View File

@ -2,7 +2,8 @@
// crates
use reqwest::{Client, ClientBuilder, StatusCode, Url};
// internal
use nomos_executor::api::paths;
use nomos_executor::api::{handlers::DispersalRequest, paths};
use serde::Serialize;
#[derive(thiserror::Error, Debug)]
pub enum Error {
@ -40,15 +41,20 @@ impl ExecutorHttpClient {
}
/// Send a `Blob` to be dispersed
pub async fn publish_blob(&self, blob: Vec<u8>) -> Result<(), Error> {
pub async fn publish_blob<Metadata: Serialize>(
&self,
data: Vec<u8>,
metadata: Metadata,
) -> Result<(), Error> {
let req = DispersalRequest { data, metadata };
let url = self
.executor_address
.join(paths::DA_ADD_BLOB)
.join(paths::DISPERSE_DATA)
.expect("Url should build properly");
let response = self
.client
.post(url)
.body(blob)
.json(&req)
.send()
.await
.map_err(Error::Request)?;

View File

@ -21,7 +21,17 @@ use nomos_mempool::backend::mockpool::MockPool;
use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend;
use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
use nomos_mix_service::MixService;
use nomos_node::*;
use nomos_node::DispersedBlobInfo;
use nomos_node::HeaderId;
use nomos_node::MempoolNetworkAdapter;
#[cfg(feature = "metrics")]
use nomos_node::Metrics;
use nomos_node::NetworkBackend;
use nomos_node::{
BlobInfo, Cryptarchia, DaIndexer, DaMempool, DaNetworkService, DaSampling, DaVerifier, Logger,
NetworkService, NomosDaMembership, RocksBackend, StorageService, SystemSig, Tx, TxMempool,
Wire, MB16,
};
use overwatch_derive::Services;
use overwatch_rs::services::handle::ServiceHandle;

View File

@ -178,7 +178,7 @@ where
pub async fn run(mut self) {
loop {
if let Some(event) = self.swarm.next().await {
debug!("Da swarm event received: {event:?}");
tracing::info!("Da swarm event received: {event:?}");
match event {
SwarmEvent::Behaviour(behaviour_event) => {
self.handle_behaviour_event(behaviour_event).await;

View File

@ -1,4 +1,5 @@
use crate::adapters::{mempool::DaMempoolAdapter, network::DispersalNetworkAdapter};
use std::time::Duration;
use nomos_core::da::{blob::metadata, DaDispersal, DaEncoder};
use overwatch_rs::DynError;
@ -42,6 +43,8 @@ pub trait DispersalBackend {
) -> Result<(), DynError> {
let (blob_id, encoded_data) = self.encode(data).await?;
self.disperse(encoded_data).await?;
// let disperse and replication happen before pushing to mempool
tokio::time::sleep(Duration::from_secs(1)).await;
self.publish_to_mempool(blob_id, metadata).await?;
Ok(())
}

View File

@ -19,6 +19,7 @@ use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
use std::time::Duration;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
@ -148,6 +149,7 @@ where
let task = overwatch_handle.runtime().spawn(executor_swarm.run());
std::thread::sleep(Duration::from_secs(1));
// open streams to dispersal peers
for peer_id in dispersal_peers.iter() {
executor_open_stream_sender.send(*peer_id).unwrap();

View File

@ -6,8 +6,10 @@ edition = "2021"
[dependencies]
axum = { version = "0.6" }
clap = { version = "4", features = ["derive"] }
nomos-executor = { path = "../../nodes/nomos-executor" }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-node = { path = "../../nodes/nomos-node" }
rand = "0.8"
reqwest = { version = "0.11", features = ["json", "rustls-tls"] }
tests = { path = "../../tests" }
tokio = { version = "1.24", features = ["rt-multi-thread"] }

View File

@ -1,7 +1,8 @@
// std
use std::{env, fs, net::Ipv4Addr, process};
// crates
use nomos_node::Config as NodeConfig;
use nomos_executor::config::Config as ExecutorConfig;
use nomos_node::Config as ValidatorConfig;
use reqwest::Client;
use serde::{de::DeserializeOwned, Serialize};
// internal
@ -57,9 +58,22 @@ async fn main() {
let server_addr = env::var("CFG_SERVER_ADDR").unwrap_or("http://127.0.0.1:4400".to_string());
let ip = parse_ip(env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_string()));
let node_config_endpoint = format!("{}/node", server_addr);
let host_kind = env::var("CFG_HOST_KIND").unwrap_or_else(|_| "validator".to_string());
if let Err(err) = get_config::<NodeConfig>(ip, &node_config_endpoint, &config_file_path).await {
let node_config_endpoint = match host_kind.as_str() {
"executor" => format!("{}/executor", server_addr),
_ => format!("{}/validator", server_addr),
};
let config_result = match host_kind.as_str() {
"executor" => {
get_config::<ExecutorConfig>(ip, &node_config_endpoint, &config_file_path).await
}
_ => get_config::<ValidatorConfig>(ip, &node_config_endpoint, &config_file_path).await,
};
// Handle error if the config request fails
if let Err(err) = config_result {
eprintln!("Error: {}", err);
process::exit(1);
}

View File

@ -12,7 +12,10 @@ use cfgsync::config::Host;
use cfgsync::repo::{ConfigRepo, RepoResponse};
use clap::Parser;
use serde::{Deserialize, Serialize};
use tests::{ConsensusConfig, DaConfig};
use tests::nodes::executor::create_executor_config;
use tests::nodes::validator::create_validator_config;
use tests::topology::configs::consensus::ConsensusParams;
use tests::topology::configs::da::DaParams;
use tokio::sync::oneshot::channel;
// internal
@ -50,16 +53,16 @@ impl CfgSyncConfig {
.map_err(|err| format!("Failed to parse config file: {}", err))
}
fn to_consensus_config(&self) -> ConsensusConfig {
ConsensusConfig {
fn to_consensus_params(&self) -> ConsensusParams {
ConsensusParams {
n_participants: self.n_hosts,
security_param: self.security_param,
active_slot_coeff: self.active_slot_coeff,
}
}
fn to_da_config(&self) -> DaConfig {
DaConfig {
fn to_da_params(&self) -> DaParams {
DaParams {
subnetwork_size: self.subnetwork_size,
dispersal_factor: self.dispersal_factor,
num_samples: self.num_samples,
@ -76,21 +79,43 @@ struct ClientIp {
ip: Ipv4Addr,
}
async fn node_config(
async fn validator_config(
State(config_repo): State<Arc<ConfigRepo>>,
Json(payload): Json<ClientIp>,
) -> impl IntoResponse {
let ClientIp { ip } = payload;
let (reply_tx, reply_rx) = channel();
config_repo.register(Host::default_node_from_ip(ip), reply_tx);
config_repo.register(Host::default_validator_from_ip(ip), reply_tx);
match reply_rx.await {
Ok(config_response) => match config_response {
RepoResponse::Config(config) => (StatusCode::OK, Json(config)).into_response(),
RepoResponse::Timeout => {
(StatusCode::REQUEST_TIMEOUT, Json(RepoResponse::Timeout)).into_response()
RepoResponse::Config(config) => {
let config = create_validator_config(*config);
(StatusCode::OK, Json(config)).into_response()
}
RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
},
Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(),
}
}
async fn executor_config(
State(config_repo): State<Arc<ConfigRepo>>,
Json(payload): Json<ClientIp>,
) -> impl IntoResponse {
let ClientIp { ip } = payload;
let (reply_tx, reply_rx) = channel();
config_repo.register(Host::default_executor_from_ip(ip), reply_tx);
match reply_rx.await {
Ok(config_response) => match config_response {
RepoResponse::Config(config) => {
let config = create_executor_config(*config);
(StatusCode::OK, Json(config)).into_response()
}
RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
},
Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(),
}
@ -104,8 +129,8 @@ async fn main() {
eprintln!("{}", err);
process::exit(1);
});
let consensus_config = config.to_consensus_config();
let da_config = config.to_da_config();
let consensus_config = config.to_consensus_params();
let da_config = config.to_da_params();
let config_repo = ConfigRepo::new(
config.n_hosts,
@ -114,7 +139,8 @@ async fn main() {
Duration::from_secs(config.timeout),
);
let app = Router::new()
.route("/node", post(node_config))
.route("/validator", post(validator_config))
.route("/executor", post(executor_config))
.with_state(config_repo.clone());
println!("Server running on http://0.0.0.0:{}", config.port);

View File

@ -1,17 +1,25 @@
// std
use std::{collections::HashMap, net::Ipv4Addr, str::FromStr};
// crates
use nomos_libp2p::{Multiaddr, PeerId};
use nomos_node::Config as NodeConfig;
use tests::{ConsensusConfig, DaConfig, Node, NomosNode};
use nomos_libp2p::{Multiaddr, PeerId, Protocol};
use rand::{thread_rng, Rng};
use tests::topology::configs::{
consensus::{create_consensus_configs, ConsensusParams},
da::{create_da_configs, DaParams},
mix::create_mix_configs,
network::create_network_configs,
GeneralConfig,
};
// internal
const DEFAULT_NETWORK_PORT: u16 = 3000;
const DEFAULT_LIBP2P_NETWORK_PORT: u16 = 3000;
const DEFAULT_DA_NETWORK_PORT: u16 = 3300;
const DEFAULT_MIX_PORT: u16 = 3400;
#[derive(Eq, PartialEq, Hash, Clone)]
pub enum HostKind {
Nomos,
Validator,
Executor,
}
#[derive(Eq, PartialEq, Hash, Clone)]
@ -20,31 +28,53 @@ pub struct Host {
pub ip: Ipv4Addr,
pub network_port: u16,
pub da_network_port: u16,
pub mix_port: u16,
}
impl Host {
pub fn default_node_from_ip(ip: Ipv4Addr) -> Self {
pub fn default_validator_from_ip(ip: Ipv4Addr) -> Self {
Self {
kind: HostKind::Nomos,
kind: HostKind::Validator,
ip,
network_port: DEFAULT_NETWORK_PORT,
network_port: DEFAULT_LIBP2P_NETWORK_PORT,
da_network_port: DEFAULT_DA_NETWORK_PORT,
mix_port: DEFAULT_MIX_PORT,
}
}
pub fn default_executor_from_ip(ip: Ipv4Addr) -> Self {
Self {
kind: HostKind::Executor,
ip,
network_port: DEFAULT_LIBP2P_NETWORK_PORT,
da_network_port: DEFAULT_DA_NETWORK_PORT,
mix_port: DEFAULT_MIX_PORT,
}
}
}
pub fn create_node_configs(
consensus: ConsensusConfig,
da: DaConfig,
consensus_params: ConsensusParams,
da_params: DaParams,
hosts: Vec<Host>,
) -> HashMap<Host, NodeConfig> {
let mut configs = NomosNode::create_node_configs(consensus, da);
) -> HashMap<Host, GeneralConfig> {
let mut ids = vec![[0; 32]; consensus_params.n_participants];
for id in &mut ids {
thread_rng().fill(id);
}
let consensus_configs = create_consensus_configs(&ids, consensus_params);
let da_configs = create_da_configs(&ids, da_params);
let network_configs = create_network_configs(&ids, Default::default());
let mix_configs = create_mix_configs(&ids);
let mut configured_hosts = HashMap::new();
// Rebuild DA address lists.
let peer_addresses = configs[0].da_network.backend.addresses.clone();
let peer_addresses = da_configs[0].addresses.clone();
let host_network_init_peers = update_network_init_peers(hosts.clone());
let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses);
let host_mix_membership =
update_mix_membership(hosts.clone(), mix_configs[0].backend.membership.clone());
let new_peer_addresses: HashMap<PeerId, Multiaddr> = host_da_peer_addresses
.clone()
@ -52,22 +82,39 @@ pub fn create_node_configs(
.map(|(peer_id, (multiaddr, _))| (peer_id, multiaddr))
.collect();
for (config, host) in configs.iter_mut().zip(hosts.into_iter()) {
config.da_network.backend.addresses = new_peer_addresses.clone();
// Libp2p network config.
config.network.backend.inner.host = Ipv4Addr::from_str("0.0.0.0").unwrap();
config.network.backend.inner.port = host.network_port;
config.network.backend.initial_peers = host_network_init_peers.clone();
for (i, host) in hosts.into_iter().enumerate() {
let consensus_config = consensus_configs[i].to_owned();
// DA Libp2p network config.
config.da_network.backend.listening_address = Multiaddr::from_str(&format!(
let mut da_config = da_configs[i].to_owned();
da_config.addresses = new_peer_addresses.clone();
da_config.listening_address = Multiaddr::from_str(&format!(
"/ip4/0.0.0.0/udp/{}/quic-v1",
host.da_network_port,
))
.unwrap();
configured_hosts.insert(host.clone(), config.clone());
// Libp2p network config.
let mut network_config = network_configs[i].to_owned();
network_config.swarm_config.host = Ipv4Addr::from_str("0.0.0.0").unwrap();
network_config.swarm_config.port = host.network_port;
network_config.initial_peers = host_network_init_peers.clone();
// Mix config.
let mut mix_config = mix_configs[i].to_owned();
mix_config.backend.listening_address =
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap();
mix_config.backend.membership = host_mix_membership.clone();
configured_hosts.insert(
host.clone(),
GeneralConfig {
consensus_config,
da_config,
network_config,
mix_config,
},
);
}
configured_hosts
@ -99,13 +146,40 @@ fn update_da_peer_addresses(
.collect()
}
fn update_mix_membership(hosts: Vec<Host>, membership: Vec<Multiaddr>) -> Vec<Multiaddr> {
membership
.into_iter()
.zip(hosts)
.map(|(addr, host)| {
Multiaddr::from_str(&format!(
"/ip4/{}/udp/{}/quic-v1/p2p/{}",
host.ip,
host.mix_port,
extract_peer_id(&addr).unwrap(),
))
.unwrap()
})
.collect()
}
fn extract_peer_id(multiaddr: &Multiaddr) -> Option<PeerId> {
multiaddr.iter().find_map(|protocol| {
if let Protocol::P2p(peer_id) = protocol {
Some(peer_id)
} else {
None
}
})
}
#[cfg(test)]
mod cfgsync_tests {
use std::str::FromStr;
use std::{net::Ipv4Addr, time::Duration};
use nomos_libp2p::Protocol;
use tests::{ConsensusConfig, DaConfig};
use nomos_libp2p::{Multiaddr, Protocol};
use tests::topology::configs::consensus::ConsensusParams;
use tests::topology::configs::da::DaParams;
use super::{create_node_configs, Host, HostKind};
@ -113,20 +187,21 @@ mod cfgsync_tests {
fn basic_ip_list() {
let hosts = (0..10)
.map(|i| Host {
kind: HostKind::Nomos,
kind: HostKind::Validator,
ip: Ipv4Addr::from_str(&format!("10.1.1.{i}")).unwrap(),
network_port: 3000,
da_network_port: 4044,
mix_port: 5000,
})
.collect();
let configs = create_node_configs(
ConsensusConfig {
ConsensusParams {
n_participants: 10,
security_param: 10,
active_slot_coeff: 0.9,
},
DaConfig {
DaParams {
subnetwork_size: 2,
dispersal_factor: 1,
num_samples: 1,
@ -139,19 +214,23 @@ mod cfgsync_tests {
);
for (host, config) in configs.iter() {
let network_port = config.network.backend.inner.port;
let da_network_addr = config.da_network.backend.listening_address.clone();
let da_network_port = da_network_addr
.iter()
.find_map(|protocol| match protocol {
Protocol::Udp(port) => Some(port),
_ => None,
})
.unwrap();
let network_port = config.network_config.swarm_config.port;
let da_network_port = extract_port(&config.da_config.listening_address);
let mix_port = extract_port(&config.mix_config.backend.listening_address);
assert_eq!(network_port, host.network_port);
assert_eq!(da_network_port, host.da_network_port);
assert_eq!(mix_port, host.mix_port);
}
}
fn extract_port(multiaddr: &Multiaddr) -> u16 {
multiaddr
.iter()
.find_map(|protocol| match protocol {
Protocol::Udp(port) => Some(port),
_ => None,
})
.unwrap()
}
}

View File

@ -3,41 +3,40 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
// crates
use nomos_node::Config as NodeConfig;
use serde::{Deserialize, Serialize};
use tests::{ConsensusConfig, DaConfig};
use tests::topology::configs::consensus::ConsensusParams;
use tests::topology::configs::da::DaParams;
use tests::topology::configs::GeneralConfig;
use tokio::sync::oneshot::Sender;
use tokio::time::timeout;
// internal
use crate::config::{create_node_configs, Host};
#[derive(Serialize, Deserialize)]
pub enum RepoResponse {
Config(NodeConfig),
Config(Box<GeneralConfig>),
Timeout,
}
pub struct ConfigRepo {
waiting_hosts: Mutex<HashMap<Host, Sender<RepoResponse>>>,
n_hosts: usize,
consensus_config: ConsensusConfig,
da_config: DaConfig,
consensus_params: ConsensusParams,
da_params: DaParams,
timeout_duration: Duration,
}
impl ConfigRepo {
pub fn new(
n_hosts: usize,
consensus_config: ConsensusConfig,
da_config: DaConfig,
consensus_params: ConsensusParams,
da_params: DaParams,
timeout_duration: Duration,
) -> Arc<Self> {
let repo = Arc::new(Self {
waiting_hosts: Mutex::new(HashMap::new()),
n_hosts,
consensus_config,
da_config,
consensus_params,
da_params,
timeout_duration,
});
@ -69,14 +68,14 @@ impl ConfigRepo {
.collect();
let configs = create_node_configs(
self.consensus_config.clone(),
self.da_config.clone(),
self.consensus_params.clone(),
self.da_params.clone(),
hosts,
);
for (host, sender) in waiting_hosts.drain() {
let config = configs.get(&host).expect("host should have a config");
let _ = sender.send(RepoResponse::Config(config.to_owned()));
let _ = sender.send(RepoResponse::Config(Box::new(config.to_owned())));
}
}
Err(_) => {

View File

@ -6,7 +6,9 @@ publish = false
[dependencies]
blst = { version = "0.3.11" }
executor-http-client = { path = "../clients/executor-http-client" }
nomos-node = { path = "../nodes/nomos-node", default-features = false }
nomos-executor = { path = "../nodes/nomos-executor", default-features = false }
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] }
cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" }
@ -19,6 +21,7 @@ cryptarchia-ledger = { path = "../ledger/cryptarchia-ledger", features = ["serde
cl = { path = "../cl/cl" }
nomos-mempool = { path = "../nomos-services/mempool", features = ["mock", "libp2p"] }
nomos-da-network-service = { path = "../nomos-services/data-availability/network" }
nomos-da-dispersal = { path = "../nomos-services/data-availability/dispersal" }
nomos-da-indexer = { path = "../nomos-services/data-availability/indexer" }
nomos-da-verifier = { path = "../nomos-services/data-availability/verifier" }
nomos-da-sampling = { path = "../nomos-services/data-availability/sampling" }
@ -30,7 +33,7 @@ kzgrs-backend = { path = "../nomos-da/kzgrs-backend" }
rand = "0.8"
once_cell = "1"
secp256k1 = { version = "0.26", features = ["rand"] }
reqwest = { version = "0.11", features = ["json"] }
reqwest = { version = "0.12", features = ["json"] }
nomos-libp2p = { path = "../nomos-libp2p" }
tempfile = "3.6"
serde = { version = "1", features = ["derive"] }
@ -44,14 +47,15 @@ ntest = "0.9.0"
criterion = { version = "0.5", features = ["async_tokio"] }
nomos-cli = { path = "../nomos-cli" }
time = "0.3"
tracing = "0.1"
[[test]]
name = "test_cryptarchia_happy_path"
path = "src/tests/cryptarchia/happy.rs"
[[test]]
name = "test_cli"
path = "src/tests/cli.rs"
name = "test_da"
path = "src/tests/da.rs"
[features]
metrics = ["nomos-node/metrics"]

View File

@ -1,17 +1,16 @@
pub mod nodes;
pub use nodes::NomosNode;
use once_cell::sync::Lazy;
pub mod topology;
// std
use std::env;
use std::net::TcpListener;
use std::ops::Mul;
use std::sync::Mutex;
use std::time::Duration;
use std::{fmt::Debug, sync::Mutex};
//crates
use nomos_libp2p::{Multiaddr, Swarm};
use nomos_node::Config;
use nomos_libp2p::{Multiaddr, PeerId, Swarm};
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng};
static NET_PORT: Lazy<Mutex<u16>> = Lazy::new(|| Mutex::new(thread_rng().gen_range(8000..10000)));
@ -46,136 +45,14 @@ pub fn adjust_timeout(d: Duration) -> Duration {
}
}
#[async_trait::async_trait]
pub trait Node: Sized {
type ConsensusInfo: Debug + Clone + PartialEq;
async fn spawn(mut config: Config) -> Self;
async fn spawn_nodes(config: SpawnConfig) -> Vec<Self> {
let mut nodes = Vec::new();
for conf in Self::node_configs(config) {
nodes.push(Self::spawn(conf).await);
}
nodes
}
fn node_configs(config: SpawnConfig) -> Vec<Config> {
match config {
SpawnConfig::Star { consensus, da } => {
let mut configs = Self::create_node_configs(consensus, da);
let next_leader_config = configs.remove(0);
let first_node_addr = node_address(&next_leader_config);
let mut node_configs = vec![next_leader_config];
for mut conf in configs {
conf.network
.backend
.initial_peers
.push(first_node_addr.clone());
node_configs.push(conf);
}
node_configs
}
SpawnConfig::Chain { consensus, da } => {
let mut configs = Self::create_node_configs(consensus, da);
let next_leader_config = configs.remove(0);
let mut prev_node_addr = node_address(&next_leader_config);
let mut node_configs = vec![next_leader_config];
for mut conf in configs {
conf.network.backend.initial_peers.push(prev_node_addr);
prev_node_addr = node_address(&conf);
node_configs.push(conf);
}
node_configs
}
}
}
fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Config>;
async fn consensus_info(&self) -> Self::ConsensusInfo;
fn stop(&mut self);
fn node_address_from_port(port: u16) -> Multiaddr {
Swarm::multiaddr(std::net::Ipv4Addr::new(127, 0, 0, 1), port)
}
#[derive(Clone)]
pub enum SpawnConfig {
// Star topology: Every node is initially connected to a single node.
Star {
consensus: ConsensusConfig,
da: DaConfig,
},
// Chain topology: Every node is chained to the node next to it.
Chain {
consensus: ConsensusConfig,
da: DaConfig,
},
}
impl SpawnConfig {
// Returns a SpawnConfig::Chain with proper configurations for happy-path tests
pub fn chain_happy(n_participants: usize, da: DaConfig) -> Self {
Self::Chain {
consensus: ConsensusConfig {
n_participants,
// by setting the active slot coeff close to 1, we also increase the probability of multiple blocks (forks)
// being produced in the same slot (epoch). Setting the security parameter to some value > 1
// ensures nodes have some time to sync before deciding on the longest chain.
security_param: 10,
// a block should be produced (on average) every slot
active_slot_coeff: 0.9,
},
da,
}
}
pub fn star_happy(n_participants: usize, da: DaConfig) -> Self {
Self::Star {
consensus: ConsensusConfig {
n_participants,
// by setting the slot coeff to 1, we also increase the probability of multiple blocks (forks)
// being produced in the same slot (epoch). Setting the security parameter to some value > 1
// ensures nodes have some time to sync before deciding on the longest chain.
security_param: 10,
// a block should be produced (on average) every slot
active_slot_coeff: 0.9,
},
da,
}
}
}
fn node_address(config: &Config) -> Multiaddr {
Swarm::multiaddr(
std::net::Ipv4Addr::new(127, 0, 0, 1),
config.network.backend.inner.port,
fn secret_key_to_peer_id(node_key: nomos_libp2p::ed25519::SecretKey) -> PeerId {
PeerId::from_public_key(
&nomos_libp2p::ed25519::Keypair::from(node_key)
.public()
.into(),
)
}
#[derive(Clone)]
pub struct ConsensusConfig {
pub n_participants: usize,
pub security_param: u32,
pub active_slot_coeff: f64,
}
#[derive(Clone)]
pub struct DaConfig {
pub subnetwork_size: usize,
pub dispersal_factor: usize,
pub num_samples: u16,
pub num_subnets: u16,
pub old_blobs_check_interval: Duration,
pub blobs_validity_duration: Duration,
pub global_params_path: String,
}
impl Default for DaConfig {
fn default() -> Self {
Self {
subnetwork_size: 2,
dispersal_factor: 1,
num_samples: 1,
num_subnets: 2,
old_blobs_check_interval: Duration::from_secs(5),
blobs_validity_duration: Duration::from_secs(u64::MAX),
global_params_path: GLOBAL_PARAMS_PATH.to_string(),
}
}
}

237
tests/src/nodes/executor.rs Normal file
View File

@ -0,0 +1,237 @@
use std::ops::Range;
use std::process::{Command, Stdio};
use std::time::Duration;
use std::{net::SocketAddr, process::Child};
use cryptarchia_consensus::CryptarchiaSettings;
use nomos_da_dispersal::backend::kzgrs::{DispersalKZGRSBackendSettings, EncoderSettings};
use nomos_da_dispersal::DispersalServiceSettings;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_da_network_service::{
backends::libp2p::executor::DaNetworkExecutorBackendSettings, NetworkConfig as DaNetworkConfig,
};
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings;
use nomos_da_sampling::DaSamplingServiceSettings;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
use nomos_da_verifier::DaVerifierServiceSettings;
use nomos_executor::api::backend::AxumBackendSettings;
use nomos_executor::config::Config;
use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE};
use nomos_node::RocksBackendSettings;
use tempfile::NamedTempFile;
use crate::nodes::LOGS_PREFIX;
use crate::topology::configs::GeneralConfig;
use crate::{adjust_timeout, get_available_port};
use super::{create_tempdir, persist_tempdir, GetRangeReq, CLIENT};
const BIN_PATH: &str = "../target/debug/nomos-executor";
pub struct Executor {
addr: SocketAddr,
tempdir: tempfile::TempDir,
child: Child,
config: Config,
}
impl Drop for Executor {
fn drop(&mut self) {
if std::thread::panicking() {
if let Err(e) = persist_tempdir(&mut self.tempdir, "nomos-executor") {
println!("failed to persist tempdir: {e}");
}
}
if let Err(e) = self.child.kill() {
println!("failed to kill the child process: {e}");
}
}
}
impl Executor {
pub async fn spawn(mut config: Config) -> Self {
let dir = create_tempdir().unwrap();
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
// setup logging so that we can intercept it later in testing
config.log.backend = LoggerBackend::File {
directory: dir.path().to_owned(),
prefix: Some(LOGS_PREFIX.into()),
};
config.log.format = LoggerFormat::Json;
config.storage.db_path = dir.path().join("db");
config
.da_sampling
.storage_adapter_settings
.blob_storage_directory = dir.path().to_owned();
config
.da_verifier
.storage_adapter_settings
.blob_storage_directory = dir.path().to_owned();
config.da_indexer.storage.blob_storage_directory = dir.path().to_owned();
serde_yaml::to_writer(&mut file, &config).unwrap();
let child = Command::new(std::env::current_dir().unwrap().join(BIN_PATH))
.arg(&config_path)
.current_dir(dir.path())
.stdout(Stdio::inherit())
.spawn()
.unwrap();
let node = Self {
addr: config.http.backend_settings.address,
child,
tempdir: dir,
config,
};
tokio::time::timeout(adjust_timeout(Duration::from_secs(10)), async {
node.wait_online().await
})
.await
.unwrap();
node
}
pub async fn get_indexer_range(
&self,
app_id: [u8; 32],
range: Range<[u8; 8]>,
) -> Vec<([u8; 8], Vec<Vec<u8>>)> {
CLIENT
.post(format!("http://{}{}", self.addr, DA_GET_RANGE))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&GetRangeReq { app_id, range }).unwrap())
.send()
.await
.unwrap()
.json::<Vec<([u8; 8], Vec<Vec<u8>>)>>()
.await
.unwrap()
}
async fn wait_online(&self) {
loop {
let res = self.get(CL_METRICS).await;
if res.is_ok() && res.unwrap().status().is_success() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
async fn get(&self, path: &str) -> reqwest::Result<reqwest::Response> {
CLIENT
.get(format!("http://{}{}", self.addr, path))
.send()
.await
}
pub fn config(&self) -> &Config {
&self.config
}
}
pub fn create_executor_config(config: GeneralConfig) -> Config {
Config {
network: NetworkConfig {
backend: Libp2pConfig {
inner: config.network_config.swarm_config,
initial_peers: config.network_config.initial_peers,
},
},
mix: nomos_mix_service::MixConfig {
backend: config.mix_config.backend,
},
cryptarchia: CryptarchiaSettings {
notes: config.consensus_config.notes,
config: config.consensus_config.ledger_config,
genesis_state: config.consensus_config.genesis_state,
time: config.consensus_config.time,
transaction_selector_settings: (),
blob_selector_settings: (),
network_adapter_settings:
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
mix_adapter_settings:
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings {
broadcast_settings:
nomos_mix_service::network::libp2p::Libp2pBroadcastSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
},
},
da_network: DaNetworkConfig {
backend: DaNetworkExecutorBackendSettings {
validator_settings: DaNetworkBackendSettings {
node_key: config.da_config.node_key,
membership: config.da_config.membership,
addresses: config.da_config.addresses,
listening_address: config.da_config.listening_address,
},
num_subnets: config.da_config.num_subnets,
},
},
da_indexer: IndexerSettings {
storage: IndexerStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
},
da_verifier: DaVerifierServiceSettings {
verifier_settings: KzgrsDaVerifierSettings {
sk: config.da_config.verifier_sk,
index: config.da_config.verifier_index,
global_params_path: config.da_config.global_params_path.clone(),
},
network_adapter_settings: (),
storage_adapter_settings: VerifierStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
},
log: Default::default(),
http: nomos_api::ApiServiceSettings {
backend_settings: AxumBackendSettings {
address: format!("127.0.0.1:{}", get_available_port())
.parse()
.unwrap(),
cors_origins: vec![],
},
},
da_sampling: DaSamplingServiceSettings {
sampling_settings: KzgrsSamplingBackendSettings {
num_samples: config.da_config.num_samples,
num_subnets: config.da_config.num_subnets,
old_blobs_check_interval: config.da_config.old_blobs_check_interval,
blobs_validity_duration: config.da_config.blobs_validity_duration,
},
storage_adapter_settings: SamplingStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
network_adapter_settings: (),
},
storage: RocksBackendSettings {
db_path: "./db".into(),
read_only: false,
column_family: Some("blocks".into()),
},
da_dispersal: DispersalServiceSettings {
backend: DispersalKZGRSBackendSettings {
encoder_settings: EncoderSettings {
num_columns: config.da_config.num_subnets as usize,
with_cache: false,
global_params_path: config.da_config.global_params_path,
},
dispersal_timeout: Duration::from_secs(u64::MAX),
},
},
}
}

View File

@ -1,9 +1,15 @@
pub mod nomos;
pub use nomos::NomosNode;
pub mod executor;
pub mod validator;
use std::ops::Range;
use once_cell::sync::Lazy;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tempfile::TempDir;
const LOGS_PREFIX: &str = "__logs";
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
fn create_tempdir() -> std::io::Result<TempDir> {
// It's easier to use the current location instead of OS-default tempfile location
@ -24,3 +30,9 @@ fn persist_tempdir(tempdir: &mut TempDir, label: &str) -> std::io::Result<()> {
let _ = dir.into_path();
Ok(())
}
#[derive(Serialize, Deserialize)]
struct GetRangeReq {
pub app_id: [u8; 32],
pub range: Range<[u8; 8]>,
}

View File

@ -1,499 +0,0 @@
// std
use std::net::SocketAddr;
use std::ops::Range;
use std::process::{Child, Command, Stdio};
use std::str::FromStr;
use std::time::Duration;
// crates
use blst::min_sig::SecretKey;
use cl::{InputWitness, NoteWitness, NullifierSecret};
use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig};
use cryptarchia_ledger::LedgerState;
use kzgrs_backend::dispersal::BlobInfo;
use nomos_core::{block::Block, header::HeaderId, staking::NMO_UNIT};
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings;
use nomos_da_sampling::DaSamplingServiceSettings;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
use nomos_da_verifier::DaVerifierServiceSettings;
use nomos_libp2p::{ed25519, Multiaddr, PeerId, SwarmConfig};
use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_mempool::MempoolMetrics;
use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings;
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::api::paths::{
CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK,
};
use nomos_node::{api::backend::AxumBackendSettings, Config, Tx};
use nomos_storage::backends::rocksdb::RocksBackendSettings;
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng};
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use subnetworks_assignations::MembershipHandler;
use tempfile::NamedTempFile;
use time::OffsetDateTime;
// internal
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node};
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const NOMOS_BIN: &str = "../target/debug/nomos-node";
const DEFAULT_SLOT_TIME: u64 = 2;
const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME";
pub struct NomosNode {
addr: SocketAddr,
_tempdir: tempfile::TempDir,
child: Child,
config: Config,
}
impl Drop for NomosNode {
fn drop(&mut self) {
if std::thread::panicking() {
if let Err(e) = persist_tempdir(&mut self._tempdir, "nomos-node") {
println!("failed to persist tempdir: {e}");
}
}
if let Err(e) = self.child.kill() {
println!("failed to kill the child process: {e}");
}
}
}
impl NomosNode {
pub async fn spawn_inner(mut config: Config) -> Self {
// Waku stores the messages in a db file in the current dir, we need a different
// directory for each node to avoid conflicts
let dir = create_tempdir().unwrap();
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
// setup logging so that we can intercept it later in testing
config.log.backend = LoggerBackend::File {
directory: dir.path().to_owned(),
prefix: Some(LOGS_PREFIX.into()),
};
config.log.format = LoggerFormat::Json;
config.storage.db_path = dir.path().join("db");
config
.da_sampling
.storage_adapter_settings
.blob_storage_directory = dir.path().to_owned();
config
.da_verifier
.storage_adapter_settings
.blob_storage_directory = dir.path().to_owned();
config.da_indexer.storage.blob_storage_directory = dir.path().to_owned();
serde_yaml::to_writer(&mut file, &config).unwrap();
let child = Command::new(std::env::current_dir().unwrap().join(NOMOS_BIN))
.arg(&config_path)
.current_dir(dir.path())
.stdout(Stdio::inherit())
.spawn()
.unwrap();
let node = Self {
addr: config.http.backend_settings.address,
child,
_tempdir: dir,
config,
};
tokio::time::timeout(adjust_timeout(Duration::from_secs(10)), async {
node.wait_online().await
})
.await
.unwrap();
node
}
async fn get(&self, path: &str) -> reqwest::Result<reqwest::Response> {
CLIENT
.get(format!("http://{}{}", self.addr, path))
.send()
.await
}
pub fn url(&self) -> Url {
format!("http://{}", self.addr).parse().unwrap()
}
async fn wait_online(&self) {
loop {
let res = self.get(CL_METRICS).await;
if res.is_ok() && res.unwrap().status().is_success() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub async fn get_block(&self, id: HeaderId) -> Option<Block<Tx, BlobInfo>> {
CLIENT
.post(format!("http://{}{}", self.addr, STORAGE_BLOCK))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&id).unwrap())
.send()
.await
.unwrap()
.json::<Option<Block<Tx, BlobInfo>>>()
.await
.unwrap()
}
pub async fn get_mempoool_metrics(&self, pool: Pool) -> MempoolMetrics {
let discr = match pool {
Pool::Cl => "cl",
Pool::Da => "da",
};
let addr = format!("/{}/metrics", discr);
let res = self
.get(&addr)
.await
.unwrap()
.json::<serde_json::Value>()
.await
.unwrap();
MempoolMetrics {
pending_items: res["pending_items"].as_u64().unwrap() as usize,
last_item_timestamp: res["last_item_timestamp"].as_u64().unwrap(),
}
}
pub async fn get_indexer_range(
&self,
app_id: [u8; 32],
range: Range<[u8; 8]>,
) -> Vec<([u8; 8], Vec<Vec<u8>>)> {
CLIENT
.post(format!("http://{}{}", self.addr, DA_GET_RANGE))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&GetRangeReq { app_id, range }).unwrap())
.send()
.await
.unwrap()
.json::<Vec<([u8; 8], Vec<Vec<u8>>)>>()
.await
.unwrap()
}
// not async so that we can use this in `Drop`
pub fn get_logs_from_file(&self) -> String {
println!(
"fetching logs from dir {}...",
self._tempdir.path().display()
);
// std::thread::sleep(std::time::Duration::from_secs(50));
std::fs::read_dir(self._tempdir.path())
.unwrap()
.filter_map(|entry| {
let entry = entry.unwrap();
let path = entry.path();
if path.is_file() && path.to_str().unwrap().contains(LOGS_PREFIX) {
Some(path)
} else {
None
}
})
.map(|f| std::fs::read_to_string(f).unwrap())
.collect::<String>()
}
pub fn config(&self) -> &Config {
&self.config
}
pub async fn get_headers(&self, from: Option<HeaderId>, to: Option<HeaderId>) -> Vec<HeaderId> {
let mut req = CLIENT.get(format!("http://{}{}", self.addr, CRYPTARCHIA_HEADERS));
if let Some(from) = from {
req = req.query(&[("from", from)]);
}
if let Some(to) = to {
req = req.query(&[("to", to)]);
}
let res = req.send().await;
println!("res: {res:?}");
res.unwrap().json::<Vec<HeaderId>>().await.unwrap()
}
}
#[async_trait::async_trait]
impl Node for NomosNode {
type ConsensusInfo = CryptarchiaInfo;
async fn spawn(config: Config) -> Self {
Self::spawn_inner(config).await
}
async fn consensus_info(&self) -> Self::ConsensusInfo {
let res = self.get(CRYPTARCHIA_INFO).await;
println!("{:?}", res);
res.unwrap().json().await.unwrap()
}
fn stop(&mut self) {
self.child.kill().unwrap();
}
/// Depending on the network topology, the next leader must be spawned first,
/// so the leader can receive votes from all other nodes that will be subsequently spawned.
/// If not, the leader will miss votes from nodes spawned before itself.
/// This issue will be resolved by devising the block catch-up mechanism in the future.
fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Config> {
// we use the same random bytes for:
// * da id
// * coin sk
// * coin nonce
let mut ids = vec![[0; 32]; consensus.n_participants];
for id in &mut ids {
thread_rng().fill(id);
}
let notes = ids
.iter()
.map(|&id| {
let mut sk = [0; 16];
sk.copy_from_slice(&id[0..16]);
InputWitness::new(
NoteWitness::basic(1, NMO_UNIT, &mut thread_rng()),
NullifierSecret(sk),
)
})
.collect::<Vec<_>>();
// no commitments for now, proofs are not checked anyway
let genesis_state = LedgerState::from_commitments(
notes.iter().map(|n| n.note_commitment()),
(ids.len() as u32).into(),
);
let ledger_config = cryptarchia_ledger::Config {
epoch_stake_distribution_stabilization: 3,
epoch_period_nonce_buffer: 3,
epoch_period_nonce_stabilization: 4,
consensus_config: cryptarchia_engine::Config {
security_param: consensus.security_param,
active_slot_coeff: consensus.active_slot_coeff,
},
};
let slot_duration = std::env::var(CONSENSUS_SLOT_TIME_VAR)
.map(|s| <u64>::from_str(&s).unwrap())
.unwrap_or(DEFAULT_SLOT_TIME);
let time_config = TimeConfig {
slot_duration: Duration::from_secs(slot_duration),
chain_start_time: OffsetDateTime::now_utc(),
};
#[allow(unused_mut, unused_variables)]
let mut configs = ids
.into_iter()
.zip(notes)
.enumerate()
.map(|(i, (da_id, coin))| {
create_node_config(
da_id,
genesis_state.clone(),
ledger_config.clone(),
vec![coin],
time_config.clone(),
da.clone(),
)
})
.collect::<Vec<_>>();
// Build Mix membership
let mix_addresses = build_mix_peer_list(&configs);
for config in &mut configs {
config.mix.backend.membership = mix_addresses.clone();
}
// Build DA memberships and address lists.
let peer_addresses = build_da_peer_list(&configs);
let peer_ids = peer_addresses.iter().map(|(p, _)| *p).collect::<Vec<_>>();
for config in &mut configs {
let membership =
FillFromNodeList::new(&peer_ids, da.subnetwork_size, da.dispersal_factor);
let local_peer_id = secret_key_to_peer_id(config.da_network.backend.node_key.clone());
let subnetwork_ids = membership.membership(&local_peer_id);
config.da_verifier.verifier_settings.index = subnetwork_ids;
config.da_network.backend.membership = membership;
config.da_network.backend.addresses = peer_addresses.iter().cloned().collect();
}
configs
}
}
pub enum Pool {
Da,
Cl,
}
#[derive(Serialize, Deserialize)]
struct GetRangeReq {
pub app_id: [u8; 32],
pub range: Range<[u8; 8]>,
}
fn secret_key_to_peer_id(node_key: nomos_libp2p::ed25519::SecretKey) -> PeerId {
PeerId::from_public_key(
&nomos_libp2p::ed25519::Keypair::from(node_key)
.public()
.into(),
)
}
fn build_da_peer_list(configs: &[Config]) -> Vec<(PeerId, Multiaddr)> {
configs
.iter()
.map(|c| {
(
secret_key_to_peer_id(c.da_network.backend.node_key.clone()),
c.da_network.backend.listening_address.clone(),
)
})
.collect()
}
fn build_mix_peer_list(configs: &[Config]) -> Vec<Multiaddr> {
configs
.iter()
.map(|c| {
let peer_id = secret_key_to_peer_id(c.mix.backend.node_key.clone());
c.mix
.backend
.listening_address
.clone()
.with_p2p(peer_id)
.unwrap_or_else(|orig_addr| orig_addr)
})
.collect()
}
#[allow(clippy::too_many_arguments)]
fn create_node_config(
id: [u8; 32],
genesis_state: LedgerState,
config: cryptarchia_ledger::Config,
notes: Vec<InputWitness>,
time: TimeConfig,
da_config: DaConfig,
) -> Config {
let swarm_config: SwarmConfig = Default::default();
let node_key = swarm_config.node_key.clone();
let verifier_sk = SecretKey::key_gen(&id, &[]).unwrap();
let verifier_sk_bytes = verifier_sk.to_bytes();
let mut config = Config {
network: NetworkConfig {
backend: Libp2pConfig {
inner: swarm_config,
initial_peers: vec![],
},
},
mix: nomos_mix_service::MixConfig {
backend: Libp2pMixBackendSettings {
listening_address: Multiaddr::from_str(&format!(
"/ip4/127.0.0.1/udp/{}/quic-v1",
get_available_port(),
))
.unwrap(),
node_key: ed25519::SecretKey::generate(),
membership: Vec::new(),
peering_degree: 1,
num_mix_layers: 1,
},
},
cryptarchia: CryptarchiaSettings {
notes,
config,
genesis_state,
time,
transaction_selector_settings: (),
blob_selector_settings: (),
network_adapter_settings:
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
mix_adapter_settings:
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings {
broadcast_settings:
nomos_mix_service::network::libp2p::Libp2pBroadcastSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
},
},
da_network: DaNetworkConfig {
backend: DaNetworkBackendSettings {
node_key,
listening_address: Multiaddr::from_str(&format!(
"/ip4/127.0.0.1/udp/{}/quic-v1",
get_available_port(),
))
.unwrap(),
addresses: Default::default(),
membership: Default::default(),
},
},
da_indexer: IndexerSettings {
storage: IndexerStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
},
da_verifier: DaVerifierServiceSettings {
verifier_settings: KzgrsDaVerifierSettings {
sk: hex::encode(verifier_sk_bytes),
index: Default::default(),
global_params_path: da_config.global_params_path,
},
network_adapter_settings: (),
storage_adapter_settings: VerifierStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
},
log: Default::default(),
http: nomos_api::ApiServiceSettings {
backend_settings: AxumBackendSettings {
address: format!("127.0.0.1:{}", get_available_port())
.parse()
.unwrap(),
cors_origins: vec![],
},
},
da_sampling: DaSamplingServiceSettings {
sampling_settings: KzgrsSamplingBackendSettings {
num_samples: da_config.num_samples,
num_subnets: da_config.num_subnets,
old_blobs_check_interval: da_config.old_blobs_check_interval,
blobs_validity_duration: da_config.blobs_validity_duration,
},
storage_adapter_settings: SamplingStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
network_adapter_settings: (),
},
storage: RocksBackendSettings {
db_path: "./db".into(),
read_only: false,
column_family: Some("blocks".into()),
},
};
config.network.backend.inner.port = get_available_port();
config
}

View File

@ -0,0 +1,309 @@
use std::ops::Range;
use std::process::{Command, Stdio};
use std::time::Duration;
use std::{net::SocketAddr, process::Child};
use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings};
use nomos_core::block::Block;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings;
use nomos_da_sampling::{backend::kzgrs::KzgrsSamplingBackendSettings, DaSamplingServiceSettings};
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings};
use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_mempool::MempoolMetrics;
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::api::paths::{
CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK,
};
use nomos_node::{api::backend::AxumBackendSettings, Config, RocksBackendSettings};
use nomos_node::{BlobInfo, HeaderId, Tx};
use reqwest::Url;
use tempfile::NamedTempFile;
use crate::nodes::LOGS_PREFIX;
use crate::topology::configs::GeneralConfig;
use crate::{adjust_timeout, get_available_port};
use super::{create_tempdir, persist_tempdir, GetRangeReq, CLIENT};
const BIN_PATH: &str = "../target/debug/nomos-node";
pub enum Pool {
Da,
Cl,
}
pub struct Validator {
addr: SocketAddr,
tempdir: tempfile::TempDir,
child: Child,
config: Config,
}
impl Drop for Validator {
fn drop(&mut self) {
if std::thread::panicking() {
if let Err(e) = persist_tempdir(&mut self.tempdir, "nomos-node") {
println!("failed to persist tempdir: {e}");
}
}
if let Err(e) = self.child.kill() {
println!("failed to kill the child process: {e}");
}
}
}
impl Validator {
pub async fn spawn(mut config: Config) -> Self {
let dir = create_tempdir().unwrap();
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
// setup logging so that we can intercept it later in testing
config.log.backend = LoggerBackend::File {
directory: dir.path().to_owned(),
prefix: Some(LOGS_PREFIX.into()),
};
config.log.format = LoggerFormat::Json;
config.storage.db_path = dir.path().join("db");
config
.da_sampling
.storage_adapter_settings
.blob_storage_directory = dir.path().to_owned();
config
.da_verifier
.storage_adapter_settings
.blob_storage_directory = dir.path().to_owned();
config.da_indexer.storage.blob_storage_directory = dir.path().to_owned();
serde_yaml::to_writer(&mut file, &config).unwrap();
let child = Command::new(std::env::current_dir().unwrap().join(BIN_PATH))
.arg(&config_path)
.current_dir(dir.path())
.stdout(Stdio::inherit())
.spawn()
.unwrap();
let node = Self {
addr: config.http.backend_settings.address,
child,
tempdir: dir,
config,
};
tokio::time::timeout(adjust_timeout(Duration::from_secs(10)), async {
node.wait_online().await
})
.await
.unwrap();
node
}
async fn get(&self, path: &str) -> reqwest::Result<reqwest::Response> {
CLIENT
.get(format!("http://{}{}", self.addr, path))
.send()
.await
}
pub fn url(&self) -> Url {
format!("http://{}", self.addr).parse().unwrap()
}
async fn wait_online(&self) {
loop {
let res = self.get(CL_METRICS).await;
if res.is_ok() && res.unwrap().status().is_success() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub async fn get_block(&self, id: HeaderId) -> Option<Block<Tx, BlobInfo>> {
CLIENT
.post(format!("http://{}{}", self.addr, STORAGE_BLOCK))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&id).unwrap())
.send()
.await
.unwrap()
.json::<Option<Block<Tx, BlobInfo>>>()
.await
.unwrap()
}
pub async fn get_mempoool_metrics(&self, pool: Pool) -> MempoolMetrics {
let discr = match pool {
Pool::Cl => "cl",
Pool::Da => "da",
};
let addr = format!("/{}/metrics", discr);
let res = self
.get(&addr)
.await
.unwrap()
.json::<serde_json::Value>()
.await
.unwrap();
MempoolMetrics {
pending_items: res["pending_items"].as_u64().unwrap() as usize,
last_item_timestamp: res["last_item_timestamp"].as_u64().unwrap(),
}
}
pub async fn get_indexer_range(
&self,
app_id: [u8; 32],
range: Range<[u8; 8]>,
) -> Vec<([u8; 8], Vec<Vec<u8>>)> {
CLIENT
.post(format!("http://{}{}", self.addr, DA_GET_RANGE))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&GetRangeReq { app_id, range }).unwrap())
.send()
.await
.unwrap()
.json::<Vec<([u8; 8], Vec<Vec<u8>>)>>()
.await
.unwrap()
}
// not async so that we can use this in `Drop`
pub fn get_logs_from_file(&self) -> String {
println!(
"fetching logs from dir {}...",
self.tempdir.path().display()
);
// std::thread::sleep(std::time::Duration::from_secs(50));
std::fs::read_dir(self.tempdir.path())
.unwrap()
.filter_map(|entry| {
let entry = entry.unwrap();
let path = entry.path();
if path.is_file() && path.to_str().unwrap().contains(LOGS_PREFIX) {
Some(path)
} else {
None
}
})
.map(|f| std::fs::read_to_string(f).unwrap())
.collect::<String>()
}
pub fn config(&self) -> &Config {
&self.config
}
pub async fn get_headers(&self, from: Option<HeaderId>, to: Option<HeaderId>) -> Vec<HeaderId> {
let mut req = CLIENT.get(format!("http://{}{}", self.addr, CRYPTARCHIA_HEADERS));
if let Some(from) = from {
req = req.query(&[("from", from)]);
}
if let Some(to) = to {
req = req.query(&[("to", to)]);
}
let res = req.send().await;
println!("res: {res:?}");
res.unwrap().json::<Vec<HeaderId>>().await.unwrap()
}
pub async fn consensus_info(&self) -> CryptarchiaInfo {
let res = self.get(CRYPTARCHIA_INFO).await;
println!("{:?}", res);
res.unwrap().json().await.unwrap()
}
}
pub fn create_validator_config(config: GeneralConfig) -> Config {
Config {
network: NetworkConfig {
backend: Libp2pConfig {
inner: config.network_config.swarm_config,
initial_peers: config.network_config.initial_peers,
},
},
mix: nomos_mix_service::MixConfig {
backend: config.mix_config.backend,
},
cryptarchia: CryptarchiaSettings {
notes: config.consensus_config.notes,
config: config.consensus_config.ledger_config,
genesis_state: config.consensus_config.genesis_state,
time: config.consensus_config.time,
transaction_selector_settings: (),
blob_selector_settings: (),
network_adapter_settings:
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
mix_adapter_settings:
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings {
broadcast_settings:
nomos_mix_service::network::libp2p::Libp2pBroadcastSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
},
},
da_network: DaNetworkConfig {
backend: DaNetworkBackendSettings {
node_key: config.da_config.node_key,
membership: config.da_config.membership,
addresses: config.da_config.addresses,
listening_address: config.da_config.listening_address,
},
},
da_indexer: IndexerSettings {
storage: IndexerStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
},
da_verifier: DaVerifierServiceSettings {
verifier_settings: KzgrsDaVerifierSettings {
sk: config.da_config.verifier_sk,
index: config.da_config.verifier_index,
global_params_path: config.da_config.global_params_path,
},
network_adapter_settings: (),
storage_adapter_settings: VerifierStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
},
log: Default::default(),
http: nomos_api::ApiServiceSettings {
backend_settings: AxumBackendSettings {
address: format!("127.0.0.1:{}", get_available_port())
.parse()
.unwrap(),
cors_origins: vec![],
},
},
da_sampling: DaSamplingServiceSettings {
sampling_settings: KzgrsSamplingBackendSettings {
num_samples: config.da_config.num_samples,
num_subnets: config.da_config.num_subnets,
old_blobs_check_interval: config.da_config.old_blobs_check_interval,
blobs_validity_duration: config.da_config.blobs_validity_duration,
},
storage_adapter_settings: SamplingStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
network_adapter_settings: (),
},
storage: RocksBackendSettings {
db_path: "./db".into(),
read_only: false,
column_family: Some("blocks".into()),
},
}
}

View File

@ -1,162 +0,0 @@
use nomos_cli::cmds::disseminate::Disseminate;
use nomos_cli::da::network::backend::ExecutorBackend;
use nomos_cli::da::network::backend::ExecutorBackendSettings;
use nomos_da_network_service::NetworkConfig;
use nomos_libp2p::ed25519;
use nomos_libp2p::libp2p;
use nomos_libp2p::Multiaddr;
use nomos_libp2p::PeerId;
use std::collections::HashMap;
use std::time::Duration;
use subnetworks_assignations::versions::v1::FillFromNodeList;
use tempfile::NamedTempFile;
use tests::nodes::NomosNode;
use tests::Node;
use tests::SpawnConfig;
use tests::GLOBAL_PARAMS_PATH;
const CLI_BIN: &str = "../target/debug/nomos-cli";
const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715";
use std::process::Command;
fn run_disseminate(disseminate: &Disseminate) {
let mut binding = Command::new(CLI_BIN);
let c = binding
.args(["disseminate", "--network-config"])
.arg(disseminate.network_config.as_os_str())
.arg("--app-id")
.arg(&disseminate.app_id)
.arg("--index")
.arg(disseminate.index.to_string())
.arg("--columns")
.arg(disseminate.columns.to_string())
.arg("--timeout")
.arg(disseminate.timeout.to_string())
.arg("--wait-until-disseminated")
.arg(disseminate.wait_until_disseminated.to_string())
.arg("--node-addr")
.arg(disseminate.node_addr.as_ref().unwrap().as_str())
.arg("--global-params-path")
.arg(GLOBAL_PARAMS_PATH.to_string());
match (&disseminate.data, &disseminate.file) {
(Some(data), None) => c.args(["--data", &data]),
(None, Some(file)) => c.args(["--file", file.as_os_str().to_str().unwrap()]),
(_, _) => panic!("Either data or file needs to be provided, but not both"),
};
c.status().expect("failed to execute nomos cli");
}
async fn disseminate(nodes: &Vec<NomosNode>, config: &mut Disseminate) {
// Nomos Cli is acting as the first node when dispersing the data by using the key associated
// with that Nomos Node.
let first_config = nodes[0].config();
let node_key = first_config.da_network.backend.node_key.clone();
let node_addrs: HashMap<PeerId, Multiaddr> = nodes
.iter()
.map(|n| {
let libp2p_config = &n.config().network.backend.inner;
let keypair = libp2p::identity::Keypair::from(ed25519::Keypair::from(
libp2p_config.node_key.clone(),
));
let peer_id = PeerId::from(keypair.public());
let address = n
.config()
.da_network
.backend
.listening_address
.clone()
.with_p2p(peer_id)
.unwrap();
(peer_id, address)
})
.collect();
let membership = first_config.da_network.backend.membership.clone();
let num_subnets = first_config.da_sampling.sampling_settings.num_subnets;
let da_network_config: NetworkConfig<ExecutorBackend<FillFromNodeList>> = NetworkConfig {
backend: ExecutorBackendSettings {
node_key,
membership,
node_addrs,
num_subnets,
},
};
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
serde_yaml::to_writer(&mut file, &da_network_config).unwrap();
config.network_config = config_path;
config.node_addr = Some(
format!(
"http://{}",
nodes[0].config().http.backend_settings.address.clone()
)
.parse()
.unwrap(),
);
run_disseminate(&config);
}
#[tokio::test]
async fn disseminate_and_retrieve() {
let mut config = Disseminate {
data: Some("hello world".to_string()),
timeout: 60,
wait_until_disseminated: 5,
app_id: APP_ID.into(),
index: 0,
columns: 2,
..Default::default()
};
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
2,
tests::DaConfig {
dispersal_factor: 2,
subnetwork_size: 2,
num_subnets: 2,
..Default::default()
},
))
.await;
disseminate(&nodes, &mut config).await;
tokio::time::sleep(Duration::from_secs(10)).await;
let from = 0u64.to_be_bytes();
let to = 1u64.to_be_bytes();
let app_id = hex::decode(APP_ID).unwrap();
let node1_blobs = nodes[0]
.get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
.await;
let node2_blobs = nodes[1]
.get_indexer_range(app_id.try_into().unwrap(), from..to)
.await;
let node1_idx_0_blobs: Vec<_> = node1_blobs
.iter()
.filter(|(i, _)| i == &from)
.flat_map(|(_, blobs)| blobs)
.collect();
let node2_idx_0_blobs: Vec<_> = node2_blobs
.iter()
.filter(|(i, _)| i == &from)
.flat_map(|(_, blobs)| blobs)
.collect();
// Index zero shouldn't be empty, node 2 replicated both blobs to node 1 because they both
// are in the same subnetwork.
for b in node1_idx_0_blobs.iter() {
assert!(!b.is_empty())
}
for b in node2_idx_0_blobs.iter() {
assert!(!b.is_empty())
}
}

View File

@ -1,14 +1,18 @@
use futures::stream::{self, StreamExt};
use std::collections::HashSet;
use std::time::Duration;
use tests::{adjust_timeout, nodes::NomosNode, Node, SpawnConfig};
use tests::{
adjust_timeout,
topology::{Topology, TopologyConfig},
};
// how many times more than the expected time to produce a predefined number of blocks we wait before timing out
const TIMEOUT_MULTIPLIER: f64 = 3.0;
// how long we let the chain grow before checking the block at tip - k is the same in all chains
const CHAIN_LENGTH_MULTIPLIER: u32 = 2;
async fn happy_test(nodes: &[NomosNode]) {
async fn happy_test(topology: &Topology) {
let nodes = topology.validators();
let config = nodes[0].config();
let security_param = config.cryptarchia.config.consensus_config.security_param;
let n_blocks = security_param * CHAIN_LENGTH_MULTIPLIER;
@ -52,6 +56,6 @@ async fn happy_test(nodes: &[NomosNode]) {
#[tokio::test]
async fn two_nodes_happy() {
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2, Default::default())).await;
happy_test(&nodes).await;
let topology = Topology::spawn(TopologyConfig::two_validators()).await;
happy_test(&topology).await;
}

73
tests/src/tests/da.rs Normal file
View File

@ -0,0 +1,73 @@
use executor_http_client::ExecutorHttpClient;
use reqwest::ClientBuilder;
use reqwest::Url;
use std::time::Duration;
use tests::nodes::executor::Executor;
use tests::topology::Topology;
use tests::topology::TopologyConfig;
const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715";
async fn disseminate(executor: &Executor) {
let executor_config = executor.config();
let client = ClientBuilder::new()
.build()
.expect("Client from default settings should be able to build");
let backend_address = executor_config.http.backend_settings.address;
let exec_url = Url::parse(&format!("http://{}", backend_address)).unwrap();
let client = ExecutorHttpClient::new(client, exec_url);
let data = [1u8; 31];
let app_id = hex::decode(APP_ID).unwrap();
let metadata = kzgrs_backend::dispersal::Metadata::new(app_id.try_into().unwrap(), 0u64.into());
client.publish_blob(data.to_vec(), metadata).await.unwrap();
}
#[tokio::test]
async fn disseminate_and_retrieve() {
let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;
let executor = &topology.executors()[0];
let validator = &topology.validators()[0];
tokio::time::sleep(Duration::from_secs(15)).await;
disseminate(executor).await;
tokio::time::sleep(Duration::from_secs(20)).await;
let from = 0u64.to_be_bytes();
let to = 1u64.to_be_bytes();
let app_id = hex::decode(APP_ID).unwrap();
let executor_blobs = executor
.get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
.await;
let validator_blobs = validator
.get_indexer_range(app_id.try_into().unwrap(), from..to)
.await;
let executor_idx_0_blobs: Vec<_> = executor_blobs
.iter()
.filter(|(i, _)| i == &from)
.flat_map(|(_, blobs)| blobs)
.collect();
let validator_idx_0_blobs: Vec<_> = validator_blobs
.iter()
.filter(|(i, _)| i == &from)
.flat_map(|(_, blobs)| blobs)
.collect();
// Index zero shouldn't be empty, validator replicated both blobs to executor because they both
// are in the same subnetwork.
assert!(executor_idx_0_blobs.len() == 2);
assert!(validator_idx_0_blobs.len() == 2);
for b in executor_idx_0_blobs.iter() {
assert!(!b.is_empty())
}
for b in validator_idx_0_blobs.iter() {
assert!(!b.is_empty())
}
}

View File

@ -0,0 +1,92 @@
use std::str::FromStr;
use std::time::Duration;
use cl::{InputWitness, NoteWitness, NullifierSecret};
use cryptarchia_consensus::TimeConfig;
use cryptarchia_ledger::LedgerState;
use nomos_core::staking::NMO_UNIT;
use rand::thread_rng;
use time::OffsetDateTime;
const DEFAULT_SLOT_TIME: u64 = 2;
const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME";
#[derive(Clone)]
pub struct ConsensusParams {
pub n_participants: usize,
pub security_param: u32,
pub active_slot_coeff: f64,
}
impl ConsensusParams {
pub fn default_for_participants(n_participants: usize) -> Self {
ConsensusParams {
n_participants,
// by setting the slot coeff to 1, we also increase the probability of multiple blocks (forks)
// being produced in the same slot (epoch). Setting the security parameter to some value > 1
// ensures nodes have some time to sync before deciding on the longest chain.
security_param: 10,
// a block should be produced (on average) every slot
active_slot_coeff: 0.9,
}
}
}
/// General consensus configuration for a chosen participant, that later could be converted into a
/// specific service or services configuration.
#[derive(Clone)]
pub struct GeneralConsensusConfig {
pub notes: Vec<InputWitness>,
pub ledger_config: cryptarchia_ledger::Config,
pub genesis_state: LedgerState,
pub time: TimeConfig,
}
pub fn create_consensus_configs(
ids: &[[u8; 32]],
consensus_params: ConsensusParams,
) -> Vec<GeneralConsensusConfig> {
let notes = ids
.iter()
.map(|&id| {
let mut sk = [0; 16];
sk.copy_from_slice(&id[0..16]);
InputWitness::new(
NoteWitness::basic(1, NMO_UNIT, &mut thread_rng()),
NullifierSecret(sk),
)
})
.collect::<Vec<_>>();
// no commitments for now, proofs are not checked anyway
let genesis_state = LedgerState::from_commitments(
notes.iter().map(|n| n.note_commitment()),
(ids.len() as u32).into(),
);
let ledger_config = cryptarchia_ledger::Config {
epoch_stake_distribution_stabilization: 3,
epoch_period_nonce_buffer: 3,
epoch_period_nonce_stabilization: 4,
consensus_config: cryptarchia_engine::Config {
security_param: consensus_params.security_param,
active_slot_coeff: consensus_params.active_slot_coeff,
},
};
let slot_duration = std::env::var(CONSENSUS_SLOT_TIME_VAR)
.map(|s| <u64>::from_str(&s).unwrap())
.unwrap_or(DEFAULT_SLOT_TIME);
let time_config = TimeConfig {
slot_duration: Duration::from_secs(slot_duration),
chain_start_time: OffsetDateTime::now_utc(),
};
notes
.into_iter()
.map(|note| GeneralConsensusConfig {
notes: vec![note],
ledger_config: ledger_config.clone(),
genesis_state: genesis_state.clone(),
time: time_config.clone(),
})
.collect()
}

View File

@ -0,0 +1,141 @@
use std::{
collections::{HashMap, HashSet},
env,
path::PathBuf,
str::FromStr,
time::Duration,
};
use nomos_libp2p::{ed25519, Multiaddr, PeerId};
use nomos_node::NomosDaMembership;
use once_cell::sync::Lazy;
use subnetworks_assignations::MembershipHandler;
use crate::{get_available_port, secret_key_to_peer_id};
pub static GLOBAL_PARAMS_PATH: Lazy<String> = Lazy::new(|| {
let relative_path = "./kzgrs/kzgrs_test_params";
let current_dir = env::current_dir().expect("Failed to get current directory");
current_dir
.join(relative_path)
.canonicalize()
.expect("Failed to resolve absolute path")
.to_string_lossy()
.to_string()
});
#[derive(Clone)]
pub struct DaParams {
pub subnetwork_size: usize,
pub dispersal_factor: usize,
pub num_samples: u16,
pub num_subnets: u16,
pub old_blobs_check_interval: Duration,
pub blobs_validity_duration: Duration,
pub global_params_path: String,
}
impl Default for DaParams {
fn default() -> Self {
Self {
subnetwork_size: 2,
dispersal_factor: 1,
num_samples: 1,
num_subnets: 2,
old_blobs_check_interval: Duration::from_secs(5),
blobs_validity_duration: Duration::from_secs(u64::MAX),
global_params_path: GLOBAL_PARAMS_PATH.to_string(),
}
}
}
#[derive(Clone)]
pub struct GeneralDaConfig {
pub node_key: ed25519::SecretKey,
pub peer_id: PeerId,
pub membership: NomosDaMembership,
pub addresses: HashMap<PeerId, Multiaddr>,
pub listening_address: Multiaddr,
pub blob_storage_directory: PathBuf,
pub global_params_path: String,
pub verifier_sk: String,
pub verifier_index: HashSet<u32>,
pub num_samples: u16,
pub num_subnets: u16,
pub old_blobs_check_interval: Duration,
pub blobs_validity_duration: Duration,
}
pub fn create_da_configs(ids: &[[u8; 32]], da_params: DaParams) -> Vec<GeneralDaConfig> {
let mut node_keys = vec![];
let mut peer_ids = vec![];
let mut listening_addresses = vec![];
for id in ids {
let mut node_key_bytes = *id;
let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes)
.expect("Failed to generate secret key from bytes");
node_keys.push(node_key.clone());
let peer_id = secret_key_to_peer_id(node_key);
peer_ids.push(peer_id);
let listening_address = Multiaddr::from_str(&format!(
"/ip4/127.0.0.1/udp/{}/quic-v1",
get_available_port(),
))
.expect("Failed to create multiaddr");
listening_addresses.push(listening_address);
}
let membership = NomosDaMembership::new(
&peer_ids,
da_params.subnetwork_size,
da_params.dispersal_factor,
);
let addresses = build_da_peer_list(&peer_ids, &listening_addresses);
ids.iter()
.zip(node_keys)
.enumerate()
.map(|(i, (id, node_key))| {
let blob_storage_directory = PathBuf::from(format!("/tmp/blob_storage_{}", i));
let verifier_sk = blst::min_sig::SecretKey::key_gen(id, &[]).unwrap();
let verifier_sk_bytes = verifier_sk.to_bytes();
let peer_id = peer_ids[i];
let subnetwork_ids = membership.membership(&peer_id);
GeneralDaConfig {
node_key: node_key.clone(),
peer_id,
membership: membership.clone(),
addresses: addresses.clone(),
listening_address: listening_addresses[i].clone(),
blob_storage_directory,
global_params_path: da_params.global_params_path.clone(),
verifier_sk: hex::encode(verifier_sk_bytes),
verifier_index: subnetwork_ids,
num_samples: da_params.num_samples,
num_subnets: da_params.num_subnets,
old_blobs_check_interval: da_params.old_blobs_check_interval,
blobs_validity_duration: da_params.blobs_validity_duration,
}
})
.collect()
}
fn build_da_peer_list(
peer_ids: &[PeerId],
listening_addresses: &[Multiaddr],
) -> HashMap<PeerId, Multiaddr> {
peer_ids
.iter()
.zip(listening_addresses.iter())
.map(|(peer_id, listening_address)| {
let p2p_addr = listening_address.clone().with_p2p(*peer_id).unwrap();
(*peer_id, p2p_addr)
})
.collect()
}

View File

@ -0,0 +1,59 @@
use std::str::FromStr;
use nomos_libp2p::{ed25519, Multiaddr};
use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings;
use crate::{get_available_port, secret_key_to_peer_id};
#[derive(Clone)]
pub struct GeneralMixConfig {
pub backend: Libp2pMixBackendSettings,
}
pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
let mut configs: Vec<GeneralMixConfig> = ids
.iter()
.map(|id| {
let mut node_key_bytes = *id;
let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes)
.expect("Failed to generate secret key from bytes");
GeneralMixConfig {
backend: Libp2pMixBackendSettings {
listening_address: Multiaddr::from_str(&format!(
"/ip4/127.0.0.1/udp/{}/quic-v1",
get_available_port(),
))
.unwrap(),
node_key,
membership: Vec::new(),
peering_degree: 1,
num_mix_layers: 1,
},
}
})
.collect();
let membership = mix_membership(&configs);
configs.iter_mut().for_each(|config| {
config.backend.membership = membership.clone();
});
configs
}
fn mix_membership(configs: &[GeneralMixConfig]) -> Vec<Multiaddr> {
configs
.iter()
.map(|config| {
let peer_id = secret_key_to_peer_id(config.backend.node_key.clone());
config
.backend
.listening_address
.clone()
.with_p2p(peer_id)
.unwrap_or_else(|orig_addr| orig_addr)
})
.collect()
}

View File

@ -0,0 +1,17 @@
pub mod consensus;
pub mod da;
pub mod mix;
pub mod network;
use consensus::GeneralConsensusConfig;
use da::GeneralDaConfig;
use mix::GeneralMixConfig;
use network::GeneralNetworkConfig;
#[derive(Clone)]
pub struct GeneralConfig {
pub consensus_config: GeneralConsensusConfig,
pub da_config: GeneralDaConfig,
pub network_config: GeneralNetworkConfig,
pub mix_config: GeneralMixConfig,
}

View File

@ -0,0 +1,83 @@
use nomos_libp2p::{ed25519, Multiaddr, SwarmConfig};
use crate::{get_available_port, node_address_from_port};
#[derive(Default)]
pub enum Libp2pNetworkLayout {
#[default]
Star,
Chain,
}
#[derive(Default)]
pub struct NetworkParams {
pub libp2p_network_layout: Libp2pNetworkLayout,
}
#[derive(Clone)]
pub struct GeneralNetworkConfig {
pub swarm_config: SwarmConfig,
pub initial_peers: Vec<Multiaddr>,
}
pub fn create_network_configs(
ids: &[[u8; 32]],
network_params: NetworkParams,
) -> Vec<GeneralNetworkConfig> {
let swarm_configs: Vec<SwarmConfig> = ids
.iter()
.map(|id| {
let mut node_key_bytes = *id;
let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes)
.expect("Failed to generate secret key from bytes");
SwarmConfig {
node_key,
port: get_available_port(),
..Default::default()
}
})
.collect();
let all_initial_peers = initial_peers_by_network_layout(swarm_configs.clone(), network_params);
swarm_configs
.iter()
.zip(all_initial_peers)
.map(|(swarm_config, initial_peers)| GeneralNetworkConfig {
swarm_config: swarm_config.to_owned(),
initial_peers,
})
.collect()
}
fn initial_peers_by_network_layout(
mut swarm_configs: Vec<SwarmConfig>,
network_params: NetworkParams,
) -> Vec<Vec<Multiaddr>> {
let mut all_initial_peers = vec![];
let first_swarm = swarm_configs.remove(0);
let first_addr = node_address_from_port(first_swarm.port);
match network_params.libp2p_network_layout {
Libp2pNetworkLayout::Star => {
let other_initial_peers = vec![first_addr];
all_initial_peers.push(vec![]); // First node has no initial peers.
for _ in swarm_configs {
all_initial_peers.push(other_initial_peers.clone());
}
}
Libp2pNetworkLayout::Chain => {
let mut prev_addr = first_addr;
all_initial_peers.push(vec![]); // First node has no initial peers.
for swarm in swarm_configs {
all_initial_peers.push(vec![prev_addr]);
prev_addr = node_address_from_port(swarm.port);
}
}
}
all_initial_peers
}

115
tests/src/topology/mod.rs Normal file
View File

@ -0,0 +1,115 @@
pub mod configs;
use configs::{
da::{create_da_configs, DaParams},
network::{create_network_configs, NetworkParams},
GeneralConfig,
};
use rand::{thread_rng, Rng};
use crate::{
nodes::{
executor::{create_executor_config, Executor},
validator::{create_validator_config, Validator},
},
topology::configs::{
consensus::{create_consensus_configs, ConsensusParams},
mix::create_mix_configs,
},
};
pub struct TopologyConfig {
n_validators: usize,
n_executors: usize,
consensus_params: ConsensusParams,
da_params: DaParams,
network_params: NetworkParams,
}
impl TopologyConfig {
pub fn two_validators() -> TopologyConfig {
TopologyConfig {
n_validators: 2,
n_executors: 0,
consensus_params: ConsensusParams::default_for_participants(2),
da_params: Default::default(),
network_params: Default::default(),
}
}
pub fn validator_and_executor() -> TopologyConfig {
TopologyConfig {
n_validators: 1,
n_executors: 1,
consensus_params: ConsensusParams::default_for_participants(2),
da_params: DaParams {
dispersal_factor: 2,
subnetwork_size: 2,
num_subnets: 2,
..Default::default()
},
network_params: Default::default(),
}
}
}
pub struct Topology {
validators: Vec<Validator>,
executors: Vec<Executor>,
}
impl Topology {
pub async fn spawn(config: TopologyConfig) -> Self {
let n_participants = config.n_validators + config.n_executors;
// we use the same random bytes for:
// * da id
// * coin sk
// * coin nonce
// * libp2p node key
let mut ids = vec![[0; 32]; n_participants];
for id in &mut ids {
thread_rng().fill(id);
}
let consensus_configs = create_consensus_configs(&ids, config.consensus_params);
let da_configs = create_da_configs(&ids, config.da_params);
let network_configs = create_network_configs(&ids, config.network_params);
let mix_configs = create_mix_configs(&ids);
let mut validators = Vec::new();
for i in 0..config.n_validators {
let config = create_validator_config(GeneralConfig {
consensus_config: consensus_configs[i].to_owned(),
da_config: da_configs[i].to_owned(),
network_config: network_configs[i].to_owned(),
mix_config: mix_configs[i].to_owned(),
});
validators.push(Validator::spawn(config).await)
}
let mut executors = Vec::new();
for i in config.n_validators..n_participants {
let config = create_executor_config(GeneralConfig {
consensus_config: consensus_configs[i].to_owned(),
da_config: da_configs[i].to_owned(),
network_config: network_configs[i].to_owned(),
mix_config: mix_configs[i].to_owned(),
});
executors.push(Executor::spawn(config).await)
}
Self {
validators,
executors,
}
}
pub fn validators(&self) -> &[Validator] {
&self.validators
}
pub fn executors(&self) -> &[Executor] {
&self.executors
}
}