diff --git a/clients/executor-http-client/src/lib.rs b/clients/executor-http-client/src/lib.rs
index d310bede..52ea7525 100644
--- a/clients/executor-http-client/src/lib.rs
+++ b/clients/executor-http-client/src/lib.rs
@@ -2,7 +2,8 @@
// crates
use reqwest::{Client, ClientBuilder, StatusCode, Url};
// internal
-use nomos_executor::api::paths;
+use nomos_executor::api::{handlers::DispersalRequest, paths};
+use serde::Serialize;
#[derive(thiserror::Error, Debug)]
pub enum Error {
@@ -40,15 +41,20 @@ impl ExecutorHttpClient {
    }
    /// Send a `Blob` to be dispersed
-    pub async fn publish_blob(&self, blob: Vec) -> Result<(), Error> {
+    pub async fn publish_blob(
+        &self,
+        data: Vec,
+        metadata: Metadata,
+    ) -> Result<(), Error> {
+        let req = DispersalRequest { data, metadata };
        let url = self
            .executor_address
-            .join(paths::DA_ADD_BLOB)
+            .join(paths::DISPERSE_DATA)
            .expect("Url should build properly");
        let response = self
            .client
            .post(url)
-            .body(blob)
+            .json(&req)
            .send()
            .await
            .map_err(Error::Request)?;
diff --git a/nodes/nomos-executor/src/lib.rs b/nodes/nomos-executor/src/lib.rs
index 2daa852e..7642dbae 100644
--- a/nodes/nomos-executor/src/lib.rs
+++ b/nodes/nomos-executor/src/lib.rs
@@ -21,7 +21,17 @@
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend;
use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
use nomos_mix_service::MixService;
-use nomos_node::*;
+use nomos_node::DispersedBlobInfo;
+use nomos_node::HeaderId;
+use nomos_node::MempoolNetworkAdapter;
+#[cfg(feature = "metrics")]
+use nomos_node::Metrics;
+use nomos_node::NetworkBackend;
+use nomos_node::{
+    BlobInfo, Cryptarchia, DaIndexer, DaMempool, DaNetworkService, DaSampling, DaVerifier, Logger,
+    NetworkService, NomosDaMembership, RocksBackend, StorageService, SystemSig, Tx, TxMempool,
+    Wire, MB16,
+};
use overwatch_derive::Services;
use overwatch_rs::services::handle::ServiceHandle;
diff --git a/nomos-da/network/core/src/swarm/executor.rs b/nomos-da/network/core/src/swarm/executor.rs
index b0da6a0d..406ddf1a 100644
--- a/nomos-da/network/core/src/swarm/executor.rs
+++ b/nomos-da/network/core/src/swarm/executor.rs
@@ -178,7 +178,7 @@ where
    pub async fn run(mut self) {
        loop {
            if let Some(event) = self.swarm.next().await {
-                debug!("Da swarm event received: {event:?}");
+                tracing::info!("Da swarm event received: {event:?}");
                match event {
                    SwarmEvent::Behaviour(behaviour_event) => {
                        self.handle_behaviour_event(behaviour_event).await;
diff --git a/nomos-services/data-availability/dispersal/src/backend/mod.rs b/nomos-services/data-availability/dispersal/src/backend/mod.rs
index 3143019a..c09d59cb 100644
--- a/nomos-services/data-availability/dispersal/src/backend/mod.rs
+++ b/nomos-services/data-availability/dispersal/src/backend/mod.rs
@@ -1,4 +1,5 @@
use crate::adapters::{mempool::DaMempoolAdapter, network::DispersalNetworkAdapter};
+use std::time::Duration;
use nomos_core::da::{blob::metadata, DaDispersal, DaEncoder};
use overwatch_rs::DynError;
@@ -42,6 +43,8 @@
    ) -> Result<(), DynError> {
        let (blob_id, encoded_data) = self.encode(data).await?;
        self.disperse(encoded_data).await?;
+        // let disperse and replication happen before pushing to mempool
+        tokio::time::sleep(Duration::from_secs(1)).await;
        self.publish_to_mempool(blob_id, metadata).await?;
        Ok(())
    }
diff --git a/nomos-services/data-availability/network/src/backends/libp2p/executor.rs
b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs index e52a1853..f8c11629 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/executor.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs @@ -19,6 +19,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::marker::PhantomData; use std::pin::Pin; +use std::time::Duration; use subnetworks_assignations::MembershipHandler; use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedSender; @@ -148,6 +149,7 @@ where let task = overwatch_handle.runtime().spawn(executor_swarm.run()); + std::thread::sleep(Duration::from_secs(1)); // open streams to dispersal peers for peer_id in dispersal_peers.iter() { executor_open_stream_sender.send(*peer_id).unwrap(); diff --git a/testnet/cfgsync/Cargo.toml b/testnet/cfgsync/Cargo.toml index e8a8de07..5cfa94a1 100644 --- a/testnet/cfgsync/Cargo.toml +++ b/testnet/cfgsync/Cargo.toml @@ -6,8 +6,10 @@ edition = "2021" [dependencies] axum = { version = "0.6" } clap = { version = "4", features = ["derive"] } +nomos-executor = { path = "../../nodes/nomos-executor" } nomos-libp2p = { path = "../../nomos-libp2p" } nomos-node = { path = "../../nodes/nomos-node" } +rand = "0.8" reqwest = { version = "0.11", features = ["json", "rustls-tls"] } tests = { path = "../../tests" } tokio = { version = "1.24", features = ["rt-multi-thread"] } diff --git a/testnet/cfgsync/src/bin/cfgsync-client.rs b/testnet/cfgsync/src/bin/cfgsync-client.rs index b83e70fe..931e51b7 100644 --- a/testnet/cfgsync/src/bin/cfgsync-client.rs +++ b/testnet/cfgsync/src/bin/cfgsync-client.rs @@ -1,7 +1,8 @@ // std use std::{env, fs, net::Ipv4Addr, process}; // crates -use nomos_node::Config as NodeConfig; +use nomos_executor::config::Config as ExecutorConfig; +use nomos_node::Config as ValidatorConfig; use reqwest::Client; use serde::{de::DeserializeOwned, Serialize}; // internal @@ -57,9 +58,22 @@ async fn main() { let server_addr = env::var("CFG_SERVER_ADDR").unwrap_or("http://127.0.0.1:4400".to_string()); let ip = parse_ip(env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_string())); - let node_config_endpoint = format!("{}/node", server_addr); + let host_kind = env::var("CFG_HOST_KIND").unwrap_or_else(|_| "validator".to_string()); - if let Err(err) = get_config::(ip, &node_config_endpoint, &config_file_path).await { + let node_config_endpoint = match host_kind.as_str() { + "executor" => format!("{}/executor", server_addr), + _ => format!("{}/validator", server_addr), + }; + + let config_result = match host_kind.as_str() { + "executor" => { + get_config::(ip, &node_config_endpoint, &config_file_path).await + } + _ => get_config::(ip, &node_config_endpoint, &config_file_path).await, + }; + + // Handle error if the config request fails + if let Err(err) = config_result { eprintln!("Error: {}", err); process::exit(1); } diff --git a/testnet/cfgsync/src/bin/cfgsync-server.rs b/testnet/cfgsync/src/bin/cfgsync-server.rs index 7b8ec4fb..0a38bbfb 100644 --- a/testnet/cfgsync/src/bin/cfgsync-server.rs +++ b/testnet/cfgsync/src/bin/cfgsync-server.rs @@ -12,7 +12,10 @@ use cfgsync::config::Host; use cfgsync::repo::{ConfigRepo, RepoResponse}; use clap::Parser; use serde::{Deserialize, Serialize}; -use tests::{ConsensusConfig, DaConfig}; +use tests::nodes::executor::create_executor_config; +use tests::nodes::validator::create_validator_config; +use tests::topology::configs::consensus::ConsensusParams; +use tests::topology::configs::da::DaParams; use 
tokio::sync::oneshot::channel; // internal @@ -50,16 +53,16 @@ impl CfgSyncConfig { .map_err(|err| format!("Failed to parse config file: {}", err)) } - fn to_consensus_config(&self) -> ConsensusConfig { - ConsensusConfig { + fn to_consensus_params(&self) -> ConsensusParams { + ConsensusParams { n_participants: self.n_hosts, security_param: self.security_param, active_slot_coeff: self.active_slot_coeff, } } - fn to_da_config(&self) -> DaConfig { - DaConfig { + fn to_da_params(&self) -> DaParams { + DaParams { subnetwork_size: self.subnetwork_size, dispersal_factor: self.dispersal_factor, num_samples: self.num_samples, @@ -76,21 +79,43 @@ struct ClientIp { ip: Ipv4Addr, } -async fn node_config( +async fn validator_config( State(config_repo): State>, Json(payload): Json, ) -> impl IntoResponse { let ClientIp { ip } = payload; let (reply_tx, reply_rx) = channel(); - config_repo.register(Host::default_node_from_ip(ip), reply_tx); + config_repo.register(Host::default_validator_from_ip(ip), reply_tx); match reply_rx.await { Ok(config_response) => match config_response { - RepoResponse::Config(config) => (StatusCode::OK, Json(config)).into_response(), - RepoResponse::Timeout => { - (StatusCode::REQUEST_TIMEOUT, Json(RepoResponse::Timeout)).into_response() + RepoResponse::Config(config) => { + let config = create_validator_config(*config); + (StatusCode::OK, Json(config)).into_response() } + RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(), + }, + Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(), + } +} + +async fn executor_config( + State(config_repo): State>, + Json(payload): Json, +) -> impl IntoResponse { + let ClientIp { ip } = payload; + + let (reply_tx, reply_rx) = channel(); + config_repo.register(Host::default_executor_from_ip(ip), reply_tx); + + match reply_rx.await { + Ok(config_response) => match config_response { + RepoResponse::Config(config) => { + let config = create_executor_config(*config); + (StatusCode::OK, Json(config)).into_response() + } + RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(), }, Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(), } @@ -104,8 +129,8 @@ async fn main() { eprintln!("{}", err); process::exit(1); }); - let consensus_config = config.to_consensus_config(); - let da_config = config.to_da_config(); + let consensus_config = config.to_consensus_params(); + let da_config = config.to_da_params(); let config_repo = ConfigRepo::new( config.n_hosts, @@ -114,7 +139,8 @@ async fn main() { Duration::from_secs(config.timeout), ); let app = Router::new() - .route("/node", post(node_config)) + .route("/validator", post(validator_config)) + .route("/executor", post(executor_config)) .with_state(config_repo.clone()); println!("Server running on http://0.0.0.0:{}", config.port); diff --git a/testnet/cfgsync/src/config.rs b/testnet/cfgsync/src/config.rs index acf1d0a3..ccfdf8e3 100644 --- a/testnet/cfgsync/src/config.rs +++ b/testnet/cfgsync/src/config.rs @@ -1,17 +1,25 @@ // std use std::{collections::HashMap, net::Ipv4Addr, str::FromStr}; // crates -use nomos_libp2p::{Multiaddr, PeerId}; -use nomos_node::Config as NodeConfig; -use tests::{ConsensusConfig, DaConfig, Node, NomosNode}; +use nomos_libp2p::{Multiaddr, PeerId, Protocol}; +use rand::{thread_rng, Rng}; +use tests::topology::configs::{ + consensus::{create_consensus_configs, ConsensusParams}, + da::{create_da_configs, DaParams}, + mix::create_mix_configs, + network::create_network_configs, + 
GeneralConfig, +}; // internal -const DEFAULT_NETWORK_PORT: u16 = 3000; +const DEFAULT_LIBP2P_NETWORK_PORT: u16 = 3000; const DEFAULT_DA_NETWORK_PORT: u16 = 3300; +const DEFAULT_MIX_PORT: u16 = 3400; #[derive(Eq, PartialEq, Hash, Clone)] pub enum HostKind { - Nomos, + Validator, + Executor, } #[derive(Eq, PartialEq, Hash, Clone)] @@ -20,31 +28,53 @@ pub struct Host { pub ip: Ipv4Addr, pub network_port: u16, pub da_network_port: u16, + pub mix_port: u16, } impl Host { - pub fn default_node_from_ip(ip: Ipv4Addr) -> Self { + pub fn default_validator_from_ip(ip: Ipv4Addr) -> Self { Self { - kind: HostKind::Nomos, + kind: HostKind::Validator, ip, - network_port: DEFAULT_NETWORK_PORT, + network_port: DEFAULT_LIBP2P_NETWORK_PORT, da_network_port: DEFAULT_DA_NETWORK_PORT, + mix_port: DEFAULT_MIX_PORT, + } + } + + pub fn default_executor_from_ip(ip: Ipv4Addr) -> Self { + Self { + kind: HostKind::Executor, + ip, + network_port: DEFAULT_LIBP2P_NETWORK_PORT, + da_network_port: DEFAULT_DA_NETWORK_PORT, + mix_port: DEFAULT_MIX_PORT, } } } pub fn create_node_configs( - consensus: ConsensusConfig, - da: DaConfig, + consensus_params: ConsensusParams, + da_params: DaParams, hosts: Vec, -) -> HashMap { - let mut configs = NomosNode::create_node_configs(consensus, da); +) -> HashMap { + let mut ids = vec![[0; 32]; consensus_params.n_participants]; + for id in &mut ids { + thread_rng().fill(id); + } + + let consensus_configs = create_consensus_configs(&ids, consensus_params); + let da_configs = create_da_configs(&ids, da_params); + let network_configs = create_network_configs(&ids, Default::default()); + let mix_configs = create_mix_configs(&ids); let mut configured_hosts = HashMap::new(); // Rebuild DA address lists. - let peer_addresses = configs[0].da_network.backend.addresses.clone(); + let peer_addresses = da_configs[0].addresses.clone(); let host_network_init_peers = update_network_init_peers(hosts.clone()); let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses); + let host_mix_membership = + update_mix_membership(hosts.clone(), mix_configs[0].backend.membership.clone()); let new_peer_addresses: HashMap = host_da_peer_addresses .clone() @@ -52,22 +82,39 @@ pub fn create_node_configs( .map(|(peer_id, (multiaddr, _))| (peer_id, multiaddr)) .collect(); - for (config, host) in configs.iter_mut().zip(hosts.into_iter()) { - config.da_network.backend.addresses = new_peer_addresses.clone(); - - // Libp2p network config. - config.network.backend.inner.host = Ipv4Addr::from_str("0.0.0.0").unwrap(); - config.network.backend.inner.port = host.network_port; - config.network.backend.initial_peers = host_network_init_peers.clone(); + for (i, host) in hosts.into_iter().enumerate() { + let consensus_config = consensus_configs[i].to_owned(); // DA Libp2p network config. - config.da_network.backend.listening_address = Multiaddr::from_str(&format!( + let mut da_config = da_configs[i].to_owned(); + da_config.addresses = new_peer_addresses.clone(); + da_config.listening_address = Multiaddr::from_str(&format!( "/ip4/0.0.0.0/udp/{}/quic-v1", host.da_network_port, )) .unwrap(); - configured_hosts.insert(host.clone(), config.clone()); + // Libp2p network config. + let mut network_config = network_configs[i].to_owned(); + network_config.swarm_config.host = Ipv4Addr::from_str("0.0.0.0").unwrap(); + network_config.swarm_config.port = host.network_port; + network_config.initial_peers = host_network_init_peers.clone(); + + // Mix config. 
+ let mut mix_config = mix_configs[i].to_owned(); + mix_config.backend.listening_address = + Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap(); + mix_config.backend.membership = host_mix_membership.clone(); + + configured_hosts.insert( + host.clone(), + GeneralConfig { + consensus_config, + da_config, + network_config, + mix_config, + }, + ); } configured_hosts @@ -99,13 +146,40 @@ fn update_da_peer_addresses( .collect() } +fn update_mix_membership(hosts: Vec, membership: Vec) -> Vec { + membership + .into_iter() + .zip(hosts) + .map(|(addr, host)| { + Multiaddr::from_str(&format!( + "/ip4/{}/udp/{}/quic-v1/p2p/{}", + host.ip, + host.mix_port, + extract_peer_id(&addr).unwrap(), + )) + .unwrap() + }) + .collect() +} + +fn extract_peer_id(multiaddr: &Multiaddr) -> Option { + multiaddr.iter().find_map(|protocol| { + if let Protocol::P2p(peer_id) = protocol { + Some(peer_id) + } else { + None + } + }) +} + #[cfg(test)] mod cfgsync_tests { use std::str::FromStr; use std::{net::Ipv4Addr, time::Duration}; - use nomos_libp2p::Protocol; - use tests::{ConsensusConfig, DaConfig}; + use nomos_libp2p::{Multiaddr, Protocol}; + use tests::topology::configs::consensus::ConsensusParams; + use tests::topology::configs::da::DaParams; use super::{create_node_configs, Host, HostKind}; @@ -113,20 +187,21 @@ mod cfgsync_tests { fn basic_ip_list() { let hosts = (0..10) .map(|i| Host { - kind: HostKind::Nomos, + kind: HostKind::Validator, ip: Ipv4Addr::from_str(&format!("10.1.1.{i}")).unwrap(), network_port: 3000, da_network_port: 4044, + mix_port: 5000, }) .collect(); let configs = create_node_configs( - ConsensusConfig { + ConsensusParams { n_participants: 10, security_param: 10, active_slot_coeff: 0.9, }, - DaConfig { + DaParams { subnetwork_size: 2, dispersal_factor: 1, num_samples: 1, @@ -139,19 +214,23 @@ mod cfgsync_tests { ); for (host, config) in configs.iter() { - let network_port = config.network.backend.inner.port; - - let da_network_addr = config.da_network.backend.listening_address.clone(); - let da_network_port = da_network_addr - .iter() - .find_map(|protocol| match protocol { - Protocol::Udp(port) => Some(port), - _ => None, - }) - .unwrap(); + let network_port = config.network_config.swarm_config.port; + let da_network_port = extract_port(&config.da_config.listening_address); + let mix_port = extract_port(&config.mix_config.backend.listening_address); assert_eq!(network_port, host.network_port); assert_eq!(da_network_port, host.da_network_port); + assert_eq!(mix_port, host.mix_port); } } + + fn extract_port(multiaddr: &Multiaddr) -> u16 { + multiaddr + .iter() + .find_map(|protocol| match protocol { + Protocol::Udp(port) => Some(port), + _ => None, + }) + .unwrap() + } } diff --git a/testnet/cfgsync/src/repo.rs b/testnet/cfgsync/src/repo.rs index f5e82d38..ee0fffc0 100644 --- a/testnet/cfgsync/src/repo.rs +++ b/testnet/cfgsync/src/repo.rs @@ -3,41 +3,40 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::Duration; // crates -use nomos_node::Config as NodeConfig; -use serde::{Deserialize, Serialize}; -use tests::{ConsensusConfig, DaConfig}; +use tests::topology::configs::consensus::ConsensusParams; +use tests::topology::configs::da::DaParams; +use tests::topology::configs::GeneralConfig; use tokio::sync::oneshot::Sender; use tokio::time::timeout; // internal use crate::config::{create_node_configs, Host}; -#[derive(Serialize, Deserialize)] pub enum RepoResponse { - Config(NodeConfig), + Config(Box), Timeout, } pub struct ConfigRepo { 
waiting_hosts: Mutex>>, n_hosts: usize, - consensus_config: ConsensusConfig, - da_config: DaConfig, + consensus_params: ConsensusParams, + da_params: DaParams, timeout_duration: Duration, } impl ConfigRepo { pub fn new( n_hosts: usize, - consensus_config: ConsensusConfig, - da_config: DaConfig, + consensus_params: ConsensusParams, + da_params: DaParams, timeout_duration: Duration, ) -> Arc { let repo = Arc::new(Self { waiting_hosts: Mutex::new(HashMap::new()), n_hosts, - consensus_config, - da_config, + consensus_params, + da_params, timeout_duration, }); @@ -69,14 +68,14 @@ impl ConfigRepo { .collect(); let configs = create_node_configs( - self.consensus_config.clone(), - self.da_config.clone(), + self.consensus_params.clone(), + self.da_params.clone(), hosts, ); for (host, sender) in waiting_hosts.drain() { let config = configs.get(&host).expect("host should have a config"); - let _ = sender.send(RepoResponse::Config(config.to_owned())); + let _ = sender.send(RepoResponse::Config(Box::new(config.to_owned()))); } } Err(_) => { diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 91a5896d..5438c85e 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -6,7 +6,9 @@ publish = false [dependencies] blst = { version = "0.3.11" } +executor-http-client = { path = "../clients/executor-http-client" } nomos-node = { path = "../nodes/nomos-node", default-features = false } +nomos-executor = { path = "../nodes/nomos-executor", default-features = false } nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] } cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" } @@ -19,6 +21,7 @@ cryptarchia-ledger = { path = "../ledger/cryptarchia-ledger", features = ["serde cl = { path = "../cl/cl" } nomos-mempool = { path = "../nomos-services/mempool", features = ["mock", "libp2p"] } nomos-da-network-service = { path = "../nomos-services/data-availability/network" } +nomos-da-dispersal = { path = "../nomos-services/data-availability/dispersal" } nomos-da-indexer = { path = "../nomos-services/data-availability/indexer" } nomos-da-verifier = { path = "../nomos-services/data-availability/verifier" } nomos-da-sampling = { path = "../nomos-services/data-availability/sampling" } @@ -30,7 +33,7 @@ kzgrs-backend = { path = "../nomos-da/kzgrs-backend" } rand = "0.8" once_cell = "1" secp256k1 = { version = "0.26", features = ["rand"] } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.12", features = ["json"] } nomos-libp2p = { path = "../nomos-libp2p" } tempfile = "3.6" serde = { version = "1", features = ["derive"] } @@ -44,14 +47,15 @@ ntest = "0.9.0" criterion = { version = "0.5", features = ["async_tokio"] } nomos-cli = { path = "../nomos-cli" } time = "0.3" +tracing = "0.1" [[test]] name = "test_cryptarchia_happy_path" path = "src/tests/cryptarchia/happy.rs" [[test]] -name = "test_cli" -path = "src/tests/cli.rs" +name = "test_da" +path = "src/tests/da.rs" [features] metrics = ["nomos-node/metrics"] diff --git a/tests/src/lib.rs b/tests/src/lib.rs index 25821071..7c5530f3 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -1,17 +1,16 @@ pub mod nodes; -pub use nodes::NomosNode; -use once_cell::sync::Lazy; +pub mod topology; // std use std::env; use std::net::TcpListener; use std::ops::Mul; +use std::sync::Mutex; use std::time::Duration; -use std::{fmt::Debug, sync::Mutex}; //crates -use nomos_libp2p::{Multiaddr, Swarm}; -use nomos_node::Config; +use 
nomos_libp2p::{Multiaddr, PeerId, Swarm}; +use once_cell::sync::Lazy; use rand::{thread_rng, Rng}; static NET_PORT: Lazy> = Lazy::new(|| Mutex::new(thread_rng().gen_range(8000..10000))); @@ -46,136 +45,14 @@ pub fn adjust_timeout(d: Duration) -> Duration { } } -#[async_trait::async_trait] -pub trait Node: Sized { - type ConsensusInfo: Debug + Clone + PartialEq; - async fn spawn(mut config: Config) -> Self; - async fn spawn_nodes(config: SpawnConfig) -> Vec { - let mut nodes = Vec::new(); - for conf in Self::node_configs(config) { - nodes.push(Self::spawn(conf).await); - } - nodes - } - fn node_configs(config: SpawnConfig) -> Vec { - match config { - SpawnConfig::Star { consensus, da } => { - let mut configs = Self::create_node_configs(consensus, da); - let next_leader_config = configs.remove(0); - let first_node_addr = node_address(&next_leader_config); - let mut node_configs = vec![next_leader_config]; - for mut conf in configs { - conf.network - .backend - .initial_peers - .push(first_node_addr.clone()); - - node_configs.push(conf); - } - node_configs - } - SpawnConfig::Chain { consensus, da } => { - let mut configs = Self::create_node_configs(consensus, da); - let next_leader_config = configs.remove(0); - let mut prev_node_addr = node_address(&next_leader_config); - let mut node_configs = vec![next_leader_config]; - for mut conf in configs { - conf.network.backend.initial_peers.push(prev_node_addr); - prev_node_addr = node_address(&conf); - - node_configs.push(conf); - } - node_configs - } - } - } - fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec; - async fn consensus_info(&self) -> Self::ConsensusInfo; - fn stop(&mut self); +fn node_address_from_port(port: u16) -> Multiaddr { + Swarm::multiaddr(std::net::Ipv4Addr::new(127, 0, 0, 1), port) } -#[derive(Clone)] -pub enum SpawnConfig { - // Star topology: Every node is initially connected to a single node. - Star { - consensus: ConsensusConfig, - da: DaConfig, - }, - // Chain topology: Every node is chained to the node next to it. - Chain { - consensus: ConsensusConfig, - da: DaConfig, - }, -} - -impl SpawnConfig { - // Returns a SpawnConfig::Chain with proper configurations for happy-path tests - pub fn chain_happy(n_participants: usize, da: DaConfig) -> Self { - Self::Chain { - consensus: ConsensusConfig { - n_participants, - // by setting the active slot coeff close to 1, we also increase the probability of multiple blocks (forks) - // being produced in the same slot (epoch). Setting the security parameter to some value > 1 - // ensures nodes have some time to sync before deciding on the longest chain. - security_param: 10, - // a block should be produced (on average) every slot - active_slot_coeff: 0.9, - }, - da, - } - } - - pub fn star_happy(n_participants: usize, da: DaConfig) -> Self { - Self::Star { - consensus: ConsensusConfig { - n_participants, - // by setting the slot coeff to 1, we also increase the probability of multiple blocks (forks) - // being produced in the same slot (epoch). Setting the security parameter to some value > 1 - // ensures nodes have some time to sync before deciding on the longest chain. 
- security_param: 10, - // a block should be produced (on average) every slot - active_slot_coeff: 0.9, - }, - da, - } - } -} - -fn node_address(config: &Config) -> Multiaddr { - Swarm::multiaddr( - std::net::Ipv4Addr::new(127, 0, 0, 1), - config.network.backend.inner.port, +fn secret_key_to_peer_id(node_key: nomos_libp2p::ed25519::SecretKey) -> PeerId { + PeerId::from_public_key( + &nomos_libp2p::ed25519::Keypair::from(node_key) + .public() + .into(), ) } - -#[derive(Clone)] -pub struct ConsensusConfig { - pub n_participants: usize, - pub security_param: u32, - pub active_slot_coeff: f64, -} - -#[derive(Clone)] -pub struct DaConfig { - pub subnetwork_size: usize, - pub dispersal_factor: usize, - pub num_samples: u16, - pub num_subnets: u16, - pub old_blobs_check_interval: Duration, - pub blobs_validity_duration: Duration, - pub global_params_path: String, -} - -impl Default for DaConfig { - fn default() -> Self { - Self { - subnetwork_size: 2, - dispersal_factor: 1, - num_samples: 1, - num_subnets: 2, - old_blobs_check_interval: Duration::from_secs(5), - blobs_validity_duration: Duration::from_secs(u64::MAX), - global_params_path: GLOBAL_PARAMS_PATH.to_string(), - } - } -} diff --git a/tests/src/nodes/executor.rs b/tests/src/nodes/executor.rs new file mode 100644 index 00000000..a8152968 --- /dev/null +++ b/tests/src/nodes/executor.rs @@ -0,0 +1,237 @@ +use std::ops::Range; +use std::process::{Command, Stdio}; +use std::time::Duration; +use std::{net::SocketAddr, process::Child}; + +use cryptarchia_consensus::CryptarchiaSettings; +use nomos_da_dispersal::backend::kzgrs::{DispersalKZGRSBackendSettings, EncoderSettings}; +use nomos_da_dispersal::DispersalServiceSettings; +use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings; +use nomos_da_indexer::IndexerSettings; +use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; +use nomos_da_network_service::{ + backends::libp2p::executor::DaNetworkExecutorBackendSettings, NetworkConfig as DaNetworkConfig, +}; +use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; +use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings; +use nomos_da_sampling::DaSamplingServiceSettings; +use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; +use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings; +use nomos_da_verifier::DaVerifierServiceSettings; +use nomos_executor::api::backend::AxumBackendSettings; +use nomos_executor::config::Config; +use nomos_log::{LoggerBackend, LoggerFormat}; +use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; +use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE}; +use nomos_node::RocksBackendSettings; +use tempfile::NamedTempFile; + +use crate::nodes::LOGS_PREFIX; +use crate::topology::configs::GeneralConfig; +use crate::{adjust_timeout, get_available_port}; + +use super::{create_tempdir, persist_tempdir, GetRangeReq, CLIENT}; + +const BIN_PATH: &str = "../target/debug/nomos-executor"; + +pub struct Executor { + addr: SocketAddr, + tempdir: tempfile::TempDir, + child: Child, + config: Config, +} + +impl Drop for Executor { + fn drop(&mut self) { + if std::thread::panicking() { + if let Err(e) = persist_tempdir(&mut self.tempdir, "nomos-executor") { + println!("failed to persist tempdir: {e}"); + } + } + + if let Err(e) = self.child.kill() { + println!("failed to kill the child process: {e}"); + } + } +} + +impl Executor 
{ + pub async fn spawn(mut config: Config) -> Self { + let dir = create_tempdir().unwrap(); + let mut file = NamedTempFile::new().unwrap(); + let config_path = file.path().to_owned(); + + // setup logging so that we can intercept it later in testing + config.log.backend = LoggerBackend::File { + directory: dir.path().to_owned(), + prefix: Some(LOGS_PREFIX.into()), + }; + config.log.format = LoggerFormat::Json; + + config.storage.db_path = dir.path().join("db"); + config + .da_sampling + .storage_adapter_settings + .blob_storage_directory = dir.path().to_owned(); + config + .da_verifier + .storage_adapter_settings + .blob_storage_directory = dir.path().to_owned(); + config.da_indexer.storage.blob_storage_directory = dir.path().to_owned(); + + serde_yaml::to_writer(&mut file, &config).unwrap(); + let child = Command::new(std::env::current_dir().unwrap().join(BIN_PATH)) + .arg(&config_path) + .current_dir(dir.path()) + .stdout(Stdio::inherit()) + .spawn() + .unwrap(); + let node = Self { + addr: config.http.backend_settings.address, + child, + tempdir: dir, + config, + }; + tokio::time::timeout(adjust_timeout(Duration::from_secs(10)), async { + node.wait_online().await + }) + .await + .unwrap(); + + node + } + + pub async fn get_indexer_range( + &self, + app_id: [u8; 32], + range: Range<[u8; 8]>, + ) -> Vec<([u8; 8], Vec>)> { + CLIENT + .post(format!("http://{}{}", self.addr, DA_GET_RANGE)) + .header("Content-Type", "application/json") + .body(serde_json::to_string(&GetRangeReq { app_id, range }).unwrap()) + .send() + .await + .unwrap() + .json::>)>>() + .await + .unwrap() + } + + async fn wait_online(&self) { + loop { + let res = self.get(CL_METRICS).await; + if res.is_ok() && res.unwrap().status().is_success() { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + + async fn get(&self, path: &str) -> reqwest::Result { + CLIENT + .get(format!("http://{}{}", self.addr, path)) + .send() + .await + } + + pub fn config(&self) -> &Config { + &self.config + } +} + +pub fn create_executor_config(config: GeneralConfig) -> Config { + Config { + network: NetworkConfig { + backend: Libp2pConfig { + inner: config.network_config.swarm_config, + initial_peers: config.network_config.initial_peers, + }, + }, + mix: nomos_mix_service::MixConfig { + backend: config.mix_config.backend, + }, + cryptarchia: CryptarchiaSettings { + notes: config.consensus_config.notes, + config: config.consensus_config.ledger_config, + genesis_state: config.consensus_config.genesis_state, + time: config.consensus_config.time, + transaction_selector_settings: (), + blob_selector_settings: (), + network_adapter_settings: + cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings { + topic: String::from(nomos_node::CONSENSUS_TOPIC), + }, + mix_adapter_settings: + cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings { + broadcast_settings: + nomos_mix_service::network::libp2p::Libp2pBroadcastSettings { + topic: String::from(nomos_node::CONSENSUS_TOPIC), + }, + }, + }, + da_network: DaNetworkConfig { + backend: DaNetworkExecutorBackendSettings { + validator_settings: DaNetworkBackendSettings { + node_key: config.da_config.node_key, + membership: config.da_config.membership, + addresses: config.da_config.addresses, + listening_address: config.da_config.listening_address, + }, + num_subnets: config.da_config.num_subnets, + }, + }, + da_indexer: IndexerSettings { + storage: IndexerStorageAdapterSettings { + blob_storage_directory: "./".into(), + }, + }, + da_verifier: 
DaVerifierServiceSettings { + verifier_settings: KzgrsDaVerifierSettings { + sk: config.da_config.verifier_sk, + index: config.da_config.verifier_index, + global_params_path: config.da_config.global_params_path.clone(), + }, + network_adapter_settings: (), + storage_adapter_settings: VerifierStorageAdapterSettings { + blob_storage_directory: "./".into(), + }, + }, + log: Default::default(), + http: nomos_api::ApiServiceSettings { + backend_settings: AxumBackendSettings { + address: format!("127.0.0.1:{}", get_available_port()) + .parse() + .unwrap(), + cors_origins: vec![], + }, + }, + da_sampling: DaSamplingServiceSettings { + sampling_settings: KzgrsSamplingBackendSettings { + num_samples: config.da_config.num_samples, + num_subnets: config.da_config.num_subnets, + old_blobs_check_interval: config.da_config.old_blobs_check_interval, + blobs_validity_duration: config.da_config.blobs_validity_duration, + }, + storage_adapter_settings: SamplingStorageAdapterSettings { + blob_storage_directory: "./".into(), + }, + network_adapter_settings: (), + }, + storage: RocksBackendSettings { + db_path: "./db".into(), + read_only: false, + column_family: Some("blocks".into()), + }, + da_dispersal: DispersalServiceSettings { + backend: DispersalKZGRSBackendSettings { + encoder_settings: EncoderSettings { + num_columns: config.da_config.num_subnets as usize, + with_cache: false, + global_params_path: config.da_config.global_params_path, + }, + dispersal_timeout: Duration::from_secs(u64::MAX), + }, + }, + } +} diff --git a/tests/src/nodes/mod.rs b/tests/src/nodes/mod.rs index fa1985c1..f07087b8 100644 --- a/tests/src/nodes/mod.rs +++ b/tests/src/nodes/mod.rs @@ -1,9 +1,15 @@ -pub mod nomos; -pub use nomos::NomosNode; +pub mod executor; +pub mod validator; +use std::ops::Range; + +use once_cell::sync::Lazy; +use reqwest::Client; +use serde::{Deserialize, Serialize}; use tempfile::TempDir; const LOGS_PREFIX: &str = "__logs"; +static CLIENT: Lazy = Lazy::new(Client::new); fn create_tempdir() -> std::io::Result { // It's easier to use the current location instead of OS-default tempfile location @@ -24,3 +30,9 @@ fn persist_tempdir(tempdir: &mut TempDir, label: &str) -> std::io::Result<()> { let _ = dir.into_path(); Ok(()) } + +#[derive(Serialize, Deserialize)] +struct GetRangeReq { + pub app_id: [u8; 32], + pub range: Range<[u8; 8]>, +} diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs deleted file mode 100644 index 7bbe19ee..00000000 --- a/tests/src/nodes/nomos.rs +++ /dev/null @@ -1,499 +0,0 @@ -// std -use std::net::SocketAddr; -use std::ops::Range; -use std::process::{Child, Command, Stdio}; -use std::str::FromStr; -use std::time::Duration; -// crates -use blst::min_sig::SecretKey; -use cl::{InputWitness, NoteWitness, NullifierSecret}; -use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig}; -use cryptarchia_ledger::LedgerState; -use kzgrs_backend::dispersal::BlobInfo; -use nomos_core::{block::Block, header::HeaderId, staking::NMO_UNIT}; -use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings; -use nomos_da_indexer::IndexerSettings; -use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; -use nomos_da_network_service::NetworkConfig as DaNetworkConfig; -use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; -use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings; -use nomos_da_sampling::DaSamplingServiceSettings; -use 
nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; -use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings; -use nomos_da_verifier::DaVerifierServiceSettings; -use nomos_libp2p::{ed25519, Multiaddr, PeerId, SwarmConfig}; -use nomos_log::{LoggerBackend, LoggerFormat}; -use nomos_mempool::MempoolMetrics; -use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings; -use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; -use nomos_node::api::paths::{ - CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK, -}; -use nomos_node::{api::backend::AxumBackendSettings, Config, Tx}; -use nomos_storage::backends::rocksdb::RocksBackendSettings; -use once_cell::sync::Lazy; -use rand::{thread_rng, Rng}; -use reqwest::{Client, Url}; -use serde::{Deserialize, Serialize}; -use subnetworks_assignations::versions::v1::FillFromNodeList; -use subnetworks_assignations::MembershipHandler; -use tempfile::NamedTempFile; -use time::OffsetDateTime; -// internal -use super::{create_tempdir, persist_tempdir, LOGS_PREFIX}; -use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node}; - -static CLIENT: Lazy = Lazy::new(Client::new); -const NOMOS_BIN: &str = "../target/debug/nomos-node"; -const DEFAULT_SLOT_TIME: u64 = 2; -const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME"; - -pub struct NomosNode { - addr: SocketAddr, - _tempdir: tempfile::TempDir, - child: Child, - config: Config, -} - -impl Drop for NomosNode { - fn drop(&mut self) { - if std::thread::panicking() { - if let Err(e) = persist_tempdir(&mut self._tempdir, "nomos-node") { - println!("failed to persist tempdir: {e}"); - } - } - - if let Err(e) = self.child.kill() { - println!("failed to kill the child process: {e}"); - } - } -} -impl NomosNode { - pub async fn spawn_inner(mut config: Config) -> Self { - // Waku stores the messages in a db file in the current dir, we need a different - // directory for each node to avoid conflicts - let dir = create_tempdir().unwrap(); - let mut file = NamedTempFile::new().unwrap(); - let config_path = file.path().to_owned(); - - // setup logging so that we can intercept it later in testing - config.log.backend = LoggerBackend::File { - directory: dir.path().to_owned(), - prefix: Some(LOGS_PREFIX.into()), - }; - config.log.format = LoggerFormat::Json; - - config.storage.db_path = dir.path().join("db"); - config - .da_sampling - .storage_adapter_settings - .blob_storage_directory = dir.path().to_owned(); - config - .da_verifier - .storage_adapter_settings - .blob_storage_directory = dir.path().to_owned(); - config.da_indexer.storage.blob_storage_directory = dir.path().to_owned(); - - serde_yaml::to_writer(&mut file, &config).unwrap(); - let child = Command::new(std::env::current_dir().unwrap().join(NOMOS_BIN)) - .arg(&config_path) - .current_dir(dir.path()) - .stdout(Stdio::inherit()) - .spawn() - .unwrap(); - let node = Self { - addr: config.http.backend_settings.address, - child, - _tempdir: dir, - config, - }; - tokio::time::timeout(adjust_timeout(Duration::from_secs(10)), async { - node.wait_online().await - }) - .await - .unwrap(); - - node - } - - async fn get(&self, path: &str) -> reqwest::Result { - CLIENT - .get(format!("http://{}{}", self.addr, path)) - .send() - .await - } - - pub fn url(&self) -> Url { - format!("http://{}", self.addr).parse().unwrap() - } - - async fn wait_online(&self) { - loop { - let res = self.get(CL_METRICS).await; - if res.is_ok() && 
res.unwrap().status().is_success() { - break; - } - tokio::time::sleep(Duration::from_millis(100)).await; - } - } - - pub async fn get_block(&self, id: HeaderId) -> Option> { - CLIENT - .post(format!("http://{}{}", self.addr, STORAGE_BLOCK)) - .header("Content-Type", "application/json") - .body(serde_json::to_string(&id).unwrap()) - .send() - .await - .unwrap() - .json::>>() - .await - .unwrap() - } - - pub async fn get_mempoool_metrics(&self, pool: Pool) -> MempoolMetrics { - let discr = match pool { - Pool::Cl => "cl", - Pool::Da => "da", - }; - let addr = format!("/{}/metrics", discr); - let res = self - .get(&addr) - .await - .unwrap() - .json::() - .await - .unwrap(); - MempoolMetrics { - pending_items: res["pending_items"].as_u64().unwrap() as usize, - last_item_timestamp: res["last_item_timestamp"].as_u64().unwrap(), - } - } - - pub async fn get_indexer_range( - &self, - app_id: [u8; 32], - range: Range<[u8; 8]>, - ) -> Vec<([u8; 8], Vec>)> { - CLIENT - .post(format!("http://{}{}", self.addr, DA_GET_RANGE)) - .header("Content-Type", "application/json") - .body(serde_json::to_string(&GetRangeReq { app_id, range }).unwrap()) - .send() - .await - .unwrap() - .json::>)>>() - .await - .unwrap() - } - - // not async so that we can use this in `Drop` - pub fn get_logs_from_file(&self) -> String { - println!( - "fetching logs from dir {}...", - self._tempdir.path().display() - ); - // std::thread::sleep(std::time::Duration::from_secs(50)); - std::fs::read_dir(self._tempdir.path()) - .unwrap() - .filter_map(|entry| { - let entry = entry.unwrap(); - let path = entry.path(); - if path.is_file() && path.to_str().unwrap().contains(LOGS_PREFIX) { - Some(path) - } else { - None - } - }) - .map(|f| std::fs::read_to_string(f).unwrap()) - .collect::() - } - - pub fn config(&self) -> &Config { - &self.config - } - - pub async fn get_headers(&self, from: Option, to: Option) -> Vec { - let mut req = CLIENT.get(format!("http://{}{}", self.addr, CRYPTARCHIA_HEADERS)); - - if let Some(from) = from { - req = req.query(&[("from", from)]); - } - - if let Some(to) = to { - req = req.query(&[("to", to)]); - } - - let res = req.send().await; - - println!("res: {res:?}"); - - res.unwrap().json::>().await.unwrap() - } -} - -#[async_trait::async_trait] -impl Node for NomosNode { - type ConsensusInfo = CryptarchiaInfo; - - async fn spawn(config: Config) -> Self { - Self::spawn_inner(config).await - } - - async fn consensus_info(&self) -> Self::ConsensusInfo { - let res = self.get(CRYPTARCHIA_INFO).await; - println!("{:?}", res); - res.unwrap().json().await.unwrap() - } - - fn stop(&mut self) { - self.child.kill().unwrap(); - } - - /// Depending on the network topology, the next leader must be spawned first, - /// so the leader can receive votes from all other nodes that will be subsequently spawned. - /// If not, the leader will miss votes from nodes spawned before itself. - /// This issue will be resolved by devising the block catch-up mechanism in the future. 
- fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec { - // we use the same random bytes for: - // * da id - // * coin sk - // * coin nonce - let mut ids = vec![[0; 32]; consensus.n_participants]; - for id in &mut ids { - thread_rng().fill(id); - } - - let notes = ids - .iter() - .map(|&id| { - let mut sk = [0; 16]; - sk.copy_from_slice(&id[0..16]); - InputWitness::new( - NoteWitness::basic(1, NMO_UNIT, &mut thread_rng()), - NullifierSecret(sk), - ) - }) - .collect::>(); - // no commitments for now, proofs are not checked anyway - let genesis_state = LedgerState::from_commitments( - notes.iter().map(|n| n.note_commitment()), - (ids.len() as u32).into(), - ); - let ledger_config = cryptarchia_ledger::Config { - epoch_stake_distribution_stabilization: 3, - epoch_period_nonce_buffer: 3, - epoch_period_nonce_stabilization: 4, - consensus_config: cryptarchia_engine::Config { - security_param: consensus.security_param, - active_slot_coeff: consensus.active_slot_coeff, - }, - }; - let slot_duration = std::env::var(CONSENSUS_SLOT_TIME_VAR) - .map(|s| ::from_str(&s).unwrap()) - .unwrap_or(DEFAULT_SLOT_TIME); - let time_config = TimeConfig { - slot_duration: Duration::from_secs(slot_duration), - chain_start_time: OffsetDateTime::now_utc(), - }; - - #[allow(unused_mut, unused_variables)] - let mut configs = ids - .into_iter() - .zip(notes) - .enumerate() - .map(|(i, (da_id, coin))| { - create_node_config( - da_id, - genesis_state.clone(), - ledger_config.clone(), - vec![coin], - time_config.clone(), - da.clone(), - ) - }) - .collect::>(); - - // Build Mix membership - let mix_addresses = build_mix_peer_list(&configs); - for config in &mut configs { - config.mix.backend.membership = mix_addresses.clone(); - } - - // Build DA memberships and address lists. 
- let peer_addresses = build_da_peer_list(&configs); - let peer_ids = peer_addresses.iter().map(|(p, _)| *p).collect::>(); - - for config in &mut configs { - let membership = - FillFromNodeList::new(&peer_ids, da.subnetwork_size, da.dispersal_factor); - let local_peer_id = secret_key_to_peer_id(config.da_network.backend.node_key.clone()); - let subnetwork_ids = membership.membership(&local_peer_id); - config.da_verifier.verifier_settings.index = subnetwork_ids; - config.da_network.backend.membership = membership; - config.da_network.backend.addresses = peer_addresses.iter().cloned().collect(); - } - - configs - } -} - -pub enum Pool { - Da, - Cl, -} - -#[derive(Serialize, Deserialize)] -struct GetRangeReq { - pub app_id: [u8; 32], - pub range: Range<[u8; 8]>, -} - -fn secret_key_to_peer_id(node_key: nomos_libp2p::ed25519::SecretKey) -> PeerId { - PeerId::from_public_key( - &nomos_libp2p::ed25519::Keypair::from(node_key) - .public() - .into(), - ) -} - -fn build_da_peer_list(configs: &[Config]) -> Vec<(PeerId, Multiaddr)> { - configs - .iter() - .map(|c| { - ( - secret_key_to_peer_id(c.da_network.backend.node_key.clone()), - c.da_network.backend.listening_address.clone(), - ) - }) - .collect() -} - -fn build_mix_peer_list(configs: &[Config]) -> Vec { - configs - .iter() - .map(|c| { - let peer_id = secret_key_to_peer_id(c.mix.backend.node_key.clone()); - c.mix - .backend - .listening_address - .clone() - .with_p2p(peer_id) - .unwrap_or_else(|orig_addr| orig_addr) - }) - .collect() -} - -#[allow(clippy::too_many_arguments)] -fn create_node_config( - id: [u8; 32], - genesis_state: LedgerState, - config: cryptarchia_ledger::Config, - notes: Vec, - time: TimeConfig, - da_config: DaConfig, -) -> Config { - let swarm_config: SwarmConfig = Default::default(); - let node_key = swarm_config.node_key.clone(); - - let verifier_sk = SecretKey::key_gen(&id, &[]).unwrap(); - let verifier_sk_bytes = verifier_sk.to_bytes(); - - let mut config = Config { - network: NetworkConfig { - backend: Libp2pConfig { - inner: swarm_config, - initial_peers: vec![], - }, - }, - mix: nomos_mix_service::MixConfig { - backend: Libp2pMixBackendSettings { - listening_address: Multiaddr::from_str(&format!( - "/ip4/127.0.0.1/udp/{}/quic-v1", - get_available_port(), - )) - .unwrap(), - node_key: ed25519::SecretKey::generate(), - membership: Vec::new(), - peering_degree: 1, - num_mix_layers: 1, - }, - }, - cryptarchia: CryptarchiaSettings { - notes, - config, - genesis_state, - time, - transaction_selector_settings: (), - blob_selector_settings: (), - network_adapter_settings: - cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings { - topic: String::from(nomos_node::CONSENSUS_TOPIC), - }, - mix_adapter_settings: - cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings { - broadcast_settings: - nomos_mix_service::network::libp2p::Libp2pBroadcastSettings { - topic: String::from(nomos_node::CONSENSUS_TOPIC), - }, - }, - }, - da_network: DaNetworkConfig { - backend: DaNetworkBackendSettings { - node_key, - listening_address: Multiaddr::from_str(&format!( - "/ip4/127.0.0.1/udp/{}/quic-v1", - get_available_port(), - )) - .unwrap(), - addresses: Default::default(), - membership: Default::default(), - }, - }, - da_indexer: IndexerSettings { - storage: IndexerStorageAdapterSettings { - blob_storage_directory: "./".into(), - }, - }, - da_verifier: DaVerifierServiceSettings { - verifier_settings: KzgrsDaVerifierSettings { - sk: hex::encode(verifier_sk_bytes), - index: Default::default(), - 
global_params_path: da_config.global_params_path, - }, - network_adapter_settings: (), - storage_adapter_settings: VerifierStorageAdapterSettings { - blob_storage_directory: "./".into(), - }, - }, - log: Default::default(), - http: nomos_api::ApiServiceSettings { - backend_settings: AxumBackendSettings { - address: format!("127.0.0.1:{}", get_available_port()) - .parse() - .unwrap(), - cors_origins: vec![], - }, - }, - da_sampling: DaSamplingServiceSettings { - sampling_settings: KzgrsSamplingBackendSettings { - num_samples: da_config.num_samples, - num_subnets: da_config.num_subnets, - old_blobs_check_interval: da_config.old_blobs_check_interval, - blobs_validity_duration: da_config.blobs_validity_duration, - }, - storage_adapter_settings: SamplingStorageAdapterSettings { - blob_storage_directory: "./".into(), - }, - network_adapter_settings: (), - }, - storage: RocksBackendSettings { - db_path: "./db".into(), - read_only: false, - column_family: Some("blocks".into()), - }, - }; - - config.network.backend.inner.port = get_available_port(); - - config -} diff --git a/tests/src/nodes/validator.rs b/tests/src/nodes/validator.rs new file mode 100644 index 00000000..f65b5c1d --- /dev/null +++ b/tests/src/nodes/validator.rs @@ -0,0 +1,309 @@ +use std::ops::Range; +use std::process::{Command, Stdio}; +use std::time::Duration; +use std::{net::SocketAddr, process::Child}; + +use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings}; +use nomos_core::block::Block; +use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings; +use nomos_da_indexer::IndexerSettings; +use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; +use nomos_da_network_service::NetworkConfig as DaNetworkConfig; +use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings; +use nomos_da_sampling::{backend::kzgrs::KzgrsSamplingBackendSettings, DaSamplingServiceSettings}; +use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings; +use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings}; +use nomos_log::{LoggerBackend, LoggerFormat}; +use nomos_mempool::MempoolMetrics; +use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; +use nomos_node::api::paths::{ + CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK, +}; +use nomos_node::{api::backend::AxumBackendSettings, Config, RocksBackendSettings}; +use nomos_node::{BlobInfo, HeaderId, Tx}; +use reqwest::Url; +use tempfile::NamedTempFile; + +use crate::nodes::LOGS_PREFIX; +use crate::topology::configs::GeneralConfig; +use crate::{adjust_timeout, get_available_port}; + +use super::{create_tempdir, persist_tempdir, GetRangeReq, CLIENT}; + +const BIN_PATH: &str = "../target/debug/nomos-node"; + +pub enum Pool { + Da, + Cl, +} + +pub struct Validator { + addr: SocketAddr, + tempdir: tempfile::TempDir, + child: Child, + config: Config, +} + +impl Drop for Validator { + fn drop(&mut self) { + if std::thread::panicking() { + if let Err(e) = persist_tempdir(&mut self.tempdir, "nomos-node") { + println!("failed to persist tempdir: {e}"); + } + } + + if let Err(e) = self.child.kill() { + println!("failed to kill the child process: {e}"); + } + } +} + +impl Validator { + pub async fn spawn(mut config: Config) -> Self { + let dir = create_tempdir().unwrap(); + let mut file = NamedTempFile::new().unwrap(); + let config_path = file.path().to_owned(); + 
+ // setup logging so that we can intercept it later in testing + config.log.backend = LoggerBackend::File { + directory: dir.path().to_owned(), + prefix: Some(LOGS_PREFIX.into()), + }; + config.log.format = LoggerFormat::Json; + + config.storage.db_path = dir.path().join("db"); + config + .da_sampling + .storage_adapter_settings + .blob_storage_directory = dir.path().to_owned(); + config + .da_verifier + .storage_adapter_settings + .blob_storage_directory = dir.path().to_owned(); + config.da_indexer.storage.blob_storage_directory = dir.path().to_owned(); + + serde_yaml::to_writer(&mut file, &config).unwrap(); + let child = Command::new(std::env::current_dir().unwrap().join(BIN_PATH)) + .arg(&config_path) + .current_dir(dir.path()) + .stdout(Stdio::inherit()) + .spawn() + .unwrap(); + let node = Self { + addr: config.http.backend_settings.address, + child, + tempdir: dir, + config, + }; + tokio::time::timeout(adjust_timeout(Duration::from_secs(10)), async { + node.wait_online().await + }) + .await + .unwrap(); + + node + } + + async fn get(&self, path: &str) -> reqwest::Result { + CLIENT + .get(format!("http://{}{}", self.addr, path)) + .send() + .await + } + + pub fn url(&self) -> Url { + format!("http://{}", self.addr).parse().unwrap() + } + + async fn wait_online(&self) { + loop { + let res = self.get(CL_METRICS).await; + if res.is_ok() && res.unwrap().status().is_success() { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + + pub async fn get_block(&self, id: HeaderId) -> Option> { + CLIENT + .post(format!("http://{}{}", self.addr, STORAGE_BLOCK)) + .header("Content-Type", "application/json") + .body(serde_json::to_string(&id).unwrap()) + .send() + .await + .unwrap() + .json::>>() + .await + .unwrap() + } + + pub async fn get_mempoool_metrics(&self, pool: Pool) -> MempoolMetrics { + let discr = match pool { + Pool::Cl => "cl", + Pool::Da => "da", + }; + let addr = format!("/{}/metrics", discr); + let res = self + .get(&addr) + .await + .unwrap() + .json::() + .await + .unwrap(); + MempoolMetrics { + pending_items: res["pending_items"].as_u64().unwrap() as usize, + last_item_timestamp: res["last_item_timestamp"].as_u64().unwrap(), + } + } + + pub async fn get_indexer_range( + &self, + app_id: [u8; 32], + range: Range<[u8; 8]>, + ) -> Vec<([u8; 8], Vec>)> { + CLIENT + .post(format!("http://{}{}", self.addr, DA_GET_RANGE)) + .header("Content-Type", "application/json") + .body(serde_json::to_string(&GetRangeReq { app_id, range }).unwrap()) + .send() + .await + .unwrap() + .json::>)>>() + .await + .unwrap() + } + + // not async so that we can use this in `Drop` + pub fn get_logs_from_file(&self) -> String { + println!( + "fetching logs from dir {}...", + self.tempdir.path().display() + ); + // std::thread::sleep(std::time::Duration::from_secs(50)); + std::fs::read_dir(self.tempdir.path()) + .unwrap() + .filter_map(|entry| { + let entry = entry.unwrap(); + let path = entry.path(); + if path.is_file() && path.to_str().unwrap().contains(LOGS_PREFIX) { + Some(path) + } else { + None + } + }) + .map(|f| std::fs::read_to_string(f).unwrap()) + .collect::() + } + + pub fn config(&self) -> &Config { + &self.config + } + + pub async fn get_headers(&self, from: Option, to: Option) -> Vec { + let mut req = CLIENT.get(format!("http://{}{}", self.addr, CRYPTARCHIA_HEADERS)); + + if let Some(from) = from { + req = req.query(&[("from", from)]); + } + + if let Some(to) = to { + req = req.query(&[("to", to)]); + } + + let res = req.send().await; + + println!("res: {res:?}"); + + 
res.unwrap().json::>().await.unwrap() + } + + pub async fn consensus_info(&self) -> CryptarchiaInfo { + let res = self.get(CRYPTARCHIA_INFO).await; + println!("{:?}", res); + res.unwrap().json().await.unwrap() + } +} + +pub fn create_validator_config(config: GeneralConfig) -> Config { + Config { + network: NetworkConfig { + backend: Libp2pConfig { + inner: config.network_config.swarm_config, + initial_peers: config.network_config.initial_peers, + }, + }, + mix: nomos_mix_service::MixConfig { + backend: config.mix_config.backend, + }, + cryptarchia: CryptarchiaSettings { + notes: config.consensus_config.notes, + config: config.consensus_config.ledger_config, + genesis_state: config.consensus_config.genesis_state, + time: config.consensus_config.time, + transaction_selector_settings: (), + blob_selector_settings: (), + network_adapter_settings: + cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings { + topic: String::from(nomos_node::CONSENSUS_TOPIC), + }, + mix_adapter_settings: + cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings { + broadcast_settings: + nomos_mix_service::network::libp2p::Libp2pBroadcastSettings { + topic: String::from(nomos_node::CONSENSUS_TOPIC), + }, + }, + }, + da_network: DaNetworkConfig { + backend: DaNetworkBackendSettings { + node_key: config.da_config.node_key, + membership: config.da_config.membership, + addresses: config.da_config.addresses, + listening_address: config.da_config.listening_address, + }, + }, + da_indexer: IndexerSettings { + storage: IndexerStorageAdapterSettings { + blob_storage_directory: "./".into(), + }, + }, + da_verifier: DaVerifierServiceSettings { + verifier_settings: KzgrsDaVerifierSettings { + sk: config.da_config.verifier_sk, + index: config.da_config.verifier_index, + global_params_path: config.da_config.global_params_path, + }, + network_adapter_settings: (), + storage_adapter_settings: VerifierStorageAdapterSettings { + blob_storage_directory: "./".into(), + }, + }, + log: Default::default(), + http: nomos_api::ApiServiceSettings { + backend_settings: AxumBackendSettings { + address: format!("127.0.0.1:{}", get_available_port()) + .parse() + .unwrap(), + cors_origins: vec![], + }, + }, + da_sampling: DaSamplingServiceSettings { + sampling_settings: KzgrsSamplingBackendSettings { + num_samples: config.da_config.num_samples, + num_subnets: config.da_config.num_subnets, + old_blobs_check_interval: config.da_config.old_blobs_check_interval, + blobs_validity_duration: config.da_config.blobs_validity_duration, + }, + storage_adapter_settings: SamplingStorageAdapterSettings { + blob_storage_directory: "./".into(), + }, + network_adapter_settings: (), + }, + storage: RocksBackendSettings { + db_path: "./db".into(), + read_only: false, + column_family: Some("blocks".into()), + }, + } +} diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs deleted file mode 100644 index 8f122180..00000000 --- a/tests/src/tests/cli.rs +++ /dev/null @@ -1,162 +0,0 @@ -use nomos_cli::cmds::disseminate::Disseminate; -use nomos_cli::da::network::backend::ExecutorBackend; -use nomos_cli::da::network::backend::ExecutorBackendSettings; -use nomos_da_network_service::NetworkConfig; -use nomos_libp2p::ed25519; -use nomos_libp2p::libp2p; -use nomos_libp2p::Multiaddr; -use nomos_libp2p::PeerId; -use std::collections::HashMap; -use std::time::Duration; -use subnetworks_assignations::versions::v1::FillFromNodeList; -use tempfile::NamedTempFile; -use tests::nodes::NomosNode; -use tests::Node; -use tests::SpawnConfig; -use 
-use tests::GLOBAL_PARAMS_PATH;
-
-const CLI_BIN: &str = "../target/debug/nomos-cli";
-const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715";
-
-use std::process::Command;
-
-fn run_disseminate(disseminate: &Disseminate) {
-    let mut binding = Command::new(CLI_BIN);
-    let c = binding
-        .args(["disseminate", "--network-config"])
-        .arg(disseminate.network_config.as_os_str())
-        .arg("--app-id")
-        .arg(&disseminate.app_id)
-        .arg("--index")
-        .arg(disseminate.index.to_string())
-        .arg("--columns")
-        .arg(disseminate.columns.to_string())
-        .arg("--timeout")
-        .arg(disseminate.timeout.to_string())
-        .arg("--wait-until-disseminated")
-        .arg(disseminate.wait_until_disseminated.to_string())
-        .arg("--node-addr")
-        .arg(disseminate.node_addr.as_ref().unwrap().as_str())
-        .arg("--global-params-path")
-        .arg(GLOBAL_PARAMS_PATH.to_string());
-
-    match (&disseminate.data, &disseminate.file) {
-        (Some(data), None) => c.args(["--data", &data]),
-        (None, Some(file)) => c.args(["--file", file.as_os_str().to_str().unwrap()]),
-        (_, _) => panic!("Either data or file needs to be provided, but not both"),
-    };
-
-    c.status().expect("failed to execute nomos cli");
-}
-
-async fn disseminate(nodes: &Vec<NomosNode>, config: &mut Disseminate) {
-    // Nomos Cli is acting as the first node when dispersing the data by using the key associated
-    // with that Nomos Node.
-    let first_config = nodes[0].config();
-    let node_key = first_config.da_network.backend.node_key.clone();
-    let node_addrs: HashMap<PeerId, Multiaddr> = nodes
-        .iter()
-        .map(|n| {
-            let libp2p_config = &n.config().network.backend.inner;
-            let keypair = libp2p::identity::Keypair::from(ed25519::Keypair::from(
-                libp2p_config.node_key.clone(),
-            ));
-            let peer_id = PeerId::from(keypair.public());
-            let address = n
-                .config()
-                .da_network
-                .backend
-                .listening_address
-                .clone()
-                .with_p2p(peer_id)
-                .unwrap();
-            (peer_id, address)
-        })
-        .collect();
-    let membership = first_config.da_network.backend.membership.clone();
-    let num_subnets = first_config.da_sampling.sampling_settings.num_subnets;
-
-    let da_network_config: NetworkConfig<ExecutorBackend<FillFromNodeList>> = NetworkConfig {
-        backend: ExecutorBackendSettings {
-            node_key,
-            membership,
-            node_addrs,
-            num_subnets,
-        },
-    };
-
-    let mut file = NamedTempFile::new().unwrap();
-    let config_path = file.path().to_owned();
-    serde_yaml::to_writer(&mut file, &da_network_config).unwrap();
-
-    config.network_config = config_path;
-    config.node_addr = Some(
-        format!(
-            "http://{}",
-            nodes[0].config().http.backend_settings.address.clone()
-        )
-        .parse()
-        .unwrap(),
-    );
-
-    run_disseminate(&config);
-}
-
-#[tokio::test]
-async fn disseminate_and_retrieve() {
-    let mut config = Disseminate {
-        data: Some("hello world".to_string()),
-        timeout: 60,
-        wait_until_disseminated: 5,
-        app_id: APP_ID.into(),
-        index: 0,
-        columns: 2,
-        ..Default::default()
-    };
-
-    let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
-        2,
-        tests::DaConfig {
-            dispersal_factor: 2,
-            subnetwork_size: 2,
-            num_subnets: 2,
-            ..Default::default()
-        },
-    ))
-    .await;
-
-    disseminate(&nodes, &mut config).await;
-    tokio::time::sleep(Duration::from_secs(10)).await;
-
-    let from = 0u64.to_be_bytes();
-    let to = 1u64.to_be_bytes();
-    let app_id = hex::decode(APP_ID).unwrap();
-
-    let node1_blobs = nodes[0]
-        .get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
-        .await;
-    let node2_blobs = nodes[1]
-        .get_indexer_range(app_id.try_into().unwrap(), from..to)
-        .await;
-
-    let node1_idx_0_blobs: Vec<_> = node1_blobs
-        .iter()
-        .filter(|(i, _)| i == &from)
-        .flat_map(|(_, blobs)| blobs)
-        .collect();
-    let node2_idx_0_blobs: Vec<_> = node2_blobs
-        .iter()
-        .filter(|(i, _)| i == &from)
-        .flat_map(|(_, blobs)| blobs)
-        .collect();
-
-    // Index zero shouldn't be empty, node 2 replicated both blobs to node 1 because they both
-    // are in the same subnetwork.
-    for b in node1_idx_0_blobs.iter() {
-        assert!(!b.is_empty())
-    }
-
-    for b in node2_idx_0_blobs.iter() {
-        assert!(!b.is_empty())
-    }
-}
diff --git a/tests/src/tests/cryptarchia/happy.rs b/tests/src/tests/cryptarchia/happy.rs
index 0500f793..cc8d86f2 100644
--- a/tests/src/tests/cryptarchia/happy.rs
+++ b/tests/src/tests/cryptarchia/happy.rs
@@ -1,14 +1,18 @@
 use futures::stream::{self, StreamExt};
 use std::collections::HashSet;
 use std::time::Duration;
-use tests::{adjust_timeout, nodes::NomosNode, Node, SpawnConfig};
+use tests::{
+    adjust_timeout,
+    topology::{Topology, TopologyConfig},
+};
 
 // how many times more than the expected time to produce a predefined number of blocks we wait before timing out
 const TIMEOUT_MULTIPLIER: f64 = 3.0;
 // how long we let the chain grow before checking the block at tip - k is the same in all chains
 const CHAIN_LENGTH_MULTIPLIER: u32 = 2;
 
-async fn happy_test(nodes: &[NomosNode]) {
+async fn happy_test(topology: &Topology) {
+    let nodes = topology.validators();
     let config = nodes[0].config();
     let security_param = config.cryptarchia.config.consensus_config.security_param;
     let n_blocks = security_param * CHAIN_LENGTH_MULTIPLIER;
@@ -52,6 +56,6 @@ async fn happy_test(nodes: &[NomosNode]) {
 
 #[tokio::test]
 async fn two_nodes_happy() {
-    let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2, Default::default())).await;
-    happy_test(&nodes).await;
+    let topology = Topology::spawn(TopologyConfig::two_validators()).await;
+    happy_test(&topology).await;
 }
diff --git a/tests/src/tests/da.rs b/tests/src/tests/da.rs
new file mode 100644
index 00000000..58201484
--- /dev/null
+++ b/tests/src/tests/da.rs
@@ -0,0 +1,73 @@
+use executor_http_client::ExecutorHttpClient;
+
+use reqwest::ClientBuilder;
+use reqwest::Url;
+use std::time::Duration;
+use tests::nodes::executor::Executor;
+use tests::topology::Topology;
+use tests::topology::TopologyConfig;
+
+const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715";
+
+async fn disseminate(executor: &Executor) {
+    let executor_config = executor.config();
+
+    let client = ClientBuilder::new()
+        .build()
+        .expect("Client from default settings should be able to build");
+
+    let backend_address = executor_config.http.backend_settings.address;
+    let exec_url = Url::parse(&format!("http://{}", backend_address)).unwrap();
+    let client = ExecutorHttpClient::new(client, exec_url);
+
+    let data = [1u8; 31];
+
+    let app_id = hex::decode(APP_ID).unwrap();
+    let metadata = kzgrs_backend::dispersal::Metadata::new(app_id.try_into().unwrap(), 0u64.into());
+    client.publish_blob(data.to_vec(), metadata).await.unwrap();
+}
+
+#[tokio::test]
+async fn disseminate_and_retrieve() {
+    let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;
+    let executor = &topology.executors()[0];
+    let validator = &topology.validators()[0];
+
+    tokio::time::sleep(Duration::from_secs(15)).await;
+    disseminate(executor).await;
+    tokio::time::sleep(Duration::from_secs(20)).await;
+
+    let from = 0u64.to_be_bytes();
+    let to = 1u64.to_be_bytes();
+    let app_id = hex::decode(APP_ID).unwrap();
+
+    let executor_blobs = executor
+        .get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
+        .await;
+    let validator_blobs = validator
+        .get_indexer_range(app_id.try_into().unwrap(), from..to)
+        .await;
+
+    let executor_idx_0_blobs: Vec<_> = executor_blobs
+        .iter()
+        .filter(|(i, _)| i == &from)
+        .flat_map(|(_, blobs)| blobs)
+        .collect();
+    let validator_idx_0_blobs: Vec<_> = validator_blobs
+        .iter()
+        .filter(|(i, _)| i == &from)
+        .flat_map(|(_, blobs)| blobs)
+        .collect();
+
+    // Index zero shouldn't be empty, validator replicated both blobs to executor because they both
+    // are in the same subnetwork.
+    assert!(executor_idx_0_blobs.len() == 2);
+    assert!(validator_idx_0_blobs.len() == 2);
+    for b in executor_idx_0_blobs.iter() {
+        assert!(!b.is_empty())
+    }
+
+    for b in validator_idx_0_blobs.iter() {
+        assert!(!b.is_empty())
+    }
+}
diff --git a/tests/src/topology/configs/consensus.rs b/tests/src/topology/configs/consensus.rs
new file mode 100644
index 00000000..fc3d4360
--- /dev/null
+++ b/tests/src/topology/configs/consensus.rs
@@ -0,0 +1,92 @@
+use std::str::FromStr;
+use std::time::Duration;
+
+use cl::{InputWitness, NoteWitness, NullifierSecret};
+use cryptarchia_consensus::TimeConfig;
+use cryptarchia_ledger::LedgerState;
+use nomos_core::staking::NMO_UNIT;
+use rand::thread_rng;
+use time::OffsetDateTime;
+
+const DEFAULT_SLOT_TIME: u64 = 2;
+const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME";
+
+#[derive(Clone)]
+pub struct ConsensusParams {
+    pub n_participants: usize,
+    pub security_param: u32,
+    pub active_slot_coeff: f64,
+}
+
+impl ConsensusParams {
+    pub fn default_for_participants(n_participants: usize) -> Self {
+        ConsensusParams {
+            n_participants,
+            // by setting the slot coeff to 1, we also increase the probability of multiple blocks (forks)
+            // being produced in the same slot (epoch). Setting the security parameter to some value > 1
+            // ensures nodes have some time to sync before deciding on the longest chain.
+            security_param: 10,
+            // a block should be produced (on average) every slot
+            active_slot_coeff: 0.9,
+        }
+    }
+}
+
+/// General consensus configuration for a chosen participant, that later could be converted into a
+/// specific service or services configuration.
+#[derive(Clone)]
+pub struct GeneralConsensusConfig {
+    pub notes: Vec<InputWitness>,
+    pub ledger_config: cryptarchia_ledger::Config,
+    pub genesis_state: LedgerState,
+    pub time: TimeConfig,
+}
+
+pub fn create_consensus_configs(
+    ids: &[[u8; 32]],
+    consensus_params: ConsensusParams,
+) -> Vec<GeneralConsensusConfig> {
+    let notes = ids
+        .iter()
+        .map(|&id| {
+            let mut sk = [0; 16];
+            sk.copy_from_slice(&id[0..16]);
+            InputWitness::new(
+                NoteWitness::basic(1, NMO_UNIT, &mut thread_rng()),
+                NullifierSecret(sk),
+            )
+        })
+        .collect::<Vec<_>>();
+
+    // no commitments for now, proofs are not checked anyway
+    let genesis_state = LedgerState::from_commitments(
+        notes.iter().map(|n| n.note_commitment()),
+        (ids.len() as u32).into(),
+    );
+    let ledger_config = cryptarchia_ledger::Config {
+        epoch_stake_distribution_stabilization: 3,
+        epoch_period_nonce_buffer: 3,
+        epoch_period_nonce_stabilization: 4,
+        consensus_config: cryptarchia_engine::Config {
+            security_param: consensus_params.security_param,
+            active_slot_coeff: consensus_params.active_slot_coeff,
+        },
+    };
+    let slot_duration = std::env::var(CONSENSUS_SLOT_TIME_VAR)
+        .map(|s| <u64>::from_str(&s).unwrap())
+        .unwrap_or(DEFAULT_SLOT_TIME);
+    let time_config = TimeConfig {
+        slot_duration: Duration::from_secs(slot_duration),
+        chain_start_time: OffsetDateTime::now_utc(),
+    };
+
+    notes
+        .into_iter()
+        .map(|note| GeneralConsensusConfig {
+            notes: vec![note],
+            ledger_config: ledger_config.clone(),
+            genesis_state: genesis_state.clone(),
+            time: time_config.clone(),
+        })
+        .collect()
+}
diff --git a/tests/src/topology/configs/da.rs b/tests/src/topology/configs/da.rs
new file mode 100644
index 00000000..2abb9d84
--- /dev/null
+++ b/tests/src/topology/configs/da.rs
@@ -0,0 +1,141 @@
+use std::{
+    collections::{HashMap, HashSet},
+    env,
+    path::PathBuf,
+    str::FromStr,
+    time::Duration,
+};
+
+use nomos_libp2p::{ed25519, Multiaddr, PeerId};
+use nomos_node::NomosDaMembership;
+use once_cell::sync::Lazy;
+use subnetworks_assignations::MembershipHandler;
+
+use crate::{get_available_port, secret_key_to_peer_id};
+
+pub static GLOBAL_PARAMS_PATH: Lazy<String> = Lazy::new(|| {
+    let relative_path = "./kzgrs/kzgrs_test_params";
+    let current_dir = env::current_dir().expect("Failed to get current directory");
+    current_dir
+        .join(relative_path)
+        .canonicalize()
+        .expect("Failed to resolve absolute path")
+        .to_string_lossy()
+        .to_string()
+});
+
+#[derive(Clone)]
+pub struct DaParams {
+    pub subnetwork_size: usize,
+    pub dispersal_factor: usize,
+    pub num_samples: u16,
+    pub num_subnets: u16,
+    pub old_blobs_check_interval: Duration,
+    pub blobs_validity_duration: Duration,
+    pub global_params_path: String,
+}
+
+impl Default for DaParams {
+    fn default() -> Self {
+        Self {
+            subnetwork_size: 2,
+            dispersal_factor: 1,
+            num_samples: 1,
+            num_subnets: 2,
+            old_blobs_check_interval: Duration::from_secs(5),
+            blobs_validity_duration: Duration::from_secs(u64::MAX),
+            global_params_path: GLOBAL_PARAMS_PATH.to_string(),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct GeneralDaConfig {
+    pub node_key: ed25519::SecretKey,
+    pub peer_id: PeerId,
+    pub membership: NomosDaMembership,
+    pub addresses: HashMap<PeerId, Multiaddr>,
+    pub listening_address: Multiaddr,
+    pub blob_storage_directory: PathBuf,
+    pub global_params_path: String,
+    pub verifier_sk: String,
+    pub verifier_index: HashSet<u32>,
+    pub num_samples: u16,
+    pub num_subnets: u16,
+    pub old_blobs_check_interval: Duration,
+    pub blobs_validity_duration: Duration,
+}
+
+pub fn create_da_configs(ids: &[[u8; 32]], da_params: DaParams) -> Vec<GeneralDaConfig> {
+    let mut node_keys = vec![];
+    let mut peer_ids = vec![];
+    let mut listening_addresses = vec![];
+
+    for id in ids {
+        let mut node_key_bytes = *id;
+        let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes)
+            .expect("Failed to generate secret key from bytes");
+        node_keys.push(node_key.clone());
+
+        let peer_id = secret_key_to_peer_id(node_key);
+        peer_ids.push(peer_id);
+
+        let listening_address = Multiaddr::from_str(&format!(
+            "/ip4/127.0.0.1/udp/{}/quic-v1",
+            get_available_port(),
+        ))
+        .expect("Failed to create multiaddr");
+        listening_addresses.push(listening_address);
+    }
+
+    let membership = NomosDaMembership::new(
+        &peer_ids,
+        da_params.subnetwork_size,
+        da_params.dispersal_factor,
+    );
+
+    let addresses = build_da_peer_list(&peer_ids, &listening_addresses);
+
+    ids.iter()
+        .zip(node_keys)
+        .enumerate()
+        .map(|(i, (id, node_key))| {
+            let blob_storage_directory = PathBuf::from(format!("/tmp/blob_storage_{}", i));
+            let verifier_sk = blst::min_sig::SecretKey::key_gen(id, &[]).unwrap();
+            let verifier_sk_bytes = verifier_sk.to_bytes();
+            let peer_id = peer_ids[i];
+
+            let subnetwork_ids = membership.membership(&peer_id);
+
+            GeneralDaConfig {
+                node_key: node_key.clone(),
+                peer_id,
+                membership: membership.clone(),
+                addresses: addresses.clone(),
+                listening_address: listening_addresses[i].clone(),
+                blob_storage_directory,
+                global_params_path: da_params.global_params_path.clone(),
+                verifier_sk: hex::encode(verifier_sk_bytes),
+                verifier_index: subnetwork_ids,
+                num_samples: da_params.num_samples,
+                num_subnets: da_params.num_subnets,
+                old_blobs_check_interval: da_params.old_blobs_check_interval,
+                blobs_validity_duration: da_params.blobs_validity_duration,
+            }
+        })
+        .collect()
+}
+
+fn build_da_peer_list(
+    peer_ids: &[PeerId],
+    listening_addresses: &[Multiaddr],
+) -> HashMap<PeerId, Multiaddr> {
+    peer_ids
+        .iter()
+        .zip(listening_addresses.iter())
+        .map(|(peer_id, listening_address)| {
+            let p2p_addr = listening_address.clone().with_p2p(*peer_id).unwrap();
+            (*peer_id, p2p_addr)
+        })
+        .collect()
+}
diff --git a/tests/src/topology/configs/mix.rs b/tests/src/topology/configs/mix.rs
new file mode 100644
index 00000000..abc396c9
--- /dev/null
+++ b/tests/src/topology/configs/mix.rs
@@ -0,0 +1,59 @@
+use std::str::FromStr;
+
+use nomos_libp2p::{ed25519, Multiaddr};
+use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings;
+
+use crate::{get_available_port, secret_key_to_peer_id};
+
+#[derive(Clone)]
+pub struct GeneralMixConfig {
+    pub backend: Libp2pMixBackendSettings,
+}
+
+pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
+    let mut configs: Vec<GeneralMixConfig> = ids
+        .iter()
+        .map(|id| {
+            let mut node_key_bytes = *id;
+            let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes)
+                .expect("Failed to generate secret key from bytes");
+
+            GeneralMixConfig {
+                backend: Libp2pMixBackendSettings {
+                    listening_address: Multiaddr::from_str(&format!(
+                        "/ip4/127.0.0.1/udp/{}/quic-v1",
+                        get_available_port(),
+                    ))
+                    .unwrap(),
+                    node_key,
+                    membership: Vec::new(),
+                    peering_degree: 1,
+                    num_mix_layers: 1,
+                },
+            }
+        })
+        .collect();
+
+    let membership = mix_membership(&configs);
+
+    configs.iter_mut().for_each(|config| {
+        config.backend.membership = membership.clone();
+    });
+
+    configs
+}
+
+fn mix_membership(configs: &[GeneralMixConfig]) -> Vec<Multiaddr> {
+    configs
+        .iter()
+        .map(|config| {
+            let peer_id = secret_key_to_peer_id(config.backend.node_key.clone());
+            config
+                .backend
+                .listening_address
+                .clone()
+                .with_p2p(peer_id)
+                .unwrap_or_else(|orig_addr| orig_addr)
+        })
+        .collect()
+}
diff --git a/tests/src/topology/configs/mod.rs b/tests/src/topology/configs/mod.rs
new file mode 100644
index 00000000..ebd1562a
--- /dev/null
+++ b/tests/src/topology/configs/mod.rs
@@ -0,0 +1,17 @@
+pub mod consensus;
+pub mod da;
+pub mod mix;
+pub mod network;
+
+use consensus::GeneralConsensusConfig;
+use da::GeneralDaConfig;
+use mix::GeneralMixConfig;
+use network::GeneralNetworkConfig;
+
+#[derive(Clone)]
+pub struct GeneralConfig {
+    pub consensus_config: GeneralConsensusConfig,
+    pub da_config: GeneralDaConfig,
+    pub network_config: GeneralNetworkConfig,
+    pub mix_config: GeneralMixConfig,
+}
diff --git a/tests/src/topology/configs/network.rs b/tests/src/topology/configs/network.rs
new file mode 100644
index 00000000..b4768a86
--- /dev/null
+++ b/tests/src/topology/configs/network.rs
@@ -0,0 +1,83 @@
+use nomos_libp2p::{ed25519, Multiaddr, SwarmConfig};
+
+use crate::{get_available_port, node_address_from_port};
+
+#[derive(Default)]
+pub enum Libp2pNetworkLayout {
+    #[default]
+    Star,
+    Chain,
+}
+
+#[derive(Default)]
+pub struct NetworkParams {
+    pub libp2p_network_layout: Libp2pNetworkLayout,
+}
+
+#[derive(Clone)]
+pub struct GeneralNetworkConfig {
+    pub swarm_config: SwarmConfig,
+    pub initial_peers: Vec<Multiaddr>,
+}
+
+pub fn create_network_configs(
+    ids: &[[u8; 32]],
+    network_params: NetworkParams,
+) -> Vec<GeneralNetworkConfig> {
+    let swarm_configs: Vec<SwarmConfig> = ids
+        .iter()
+        .map(|id| {
+            let mut node_key_bytes = *id;
+            let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes)
+                .expect("Failed to generate secret key from bytes");
+
+            SwarmConfig {
+                node_key,
+                port: get_available_port(),
+                ..Default::default()
+            }
+        })
+        .collect();
+
+    let all_initial_peers = initial_peers_by_network_layout(swarm_configs.clone(), network_params);
+
+    swarm_configs
+        .iter()
+        .zip(all_initial_peers)
+        .map(|(swarm_config, initial_peers)| GeneralNetworkConfig {
+            swarm_config: swarm_config.to_owned(),
+            initial_peers,
+        })
+        .collect()
+}
+
+fn initial_peers_by_network_layout(
+    mut swarm_configs: Vec<SwarmConfig>,
+    network_params: NetworkParams,
+) -> Vec<Vec<Multiaddr>> {
+    let mut all_initial_peers = vec![];
+    let first_swarm = swarm_configs.remove(0);
+    let first_addr = node_address_from_port(first_swarm.port);
+
+    match network_params.libp2p_network_layout {
+        Libp2pNetworkLayout::Star => {
+            let other_initial_peers = vec![first_addr];
+            all_initial_peers.push(vec![]); // First node has no initial peers.
+
+            for _ in swarm_configs {
+                all_initial_peers.push(other_initial_peers.clone());
+            }
+        }
+        Libp2pNetworkLayout::Chain => {
+            let mut prev_addr = first_addr;
+            all_initial_peers.push(vec![]); // First node has no initial peers.
+
+            for swarm in swarm_configs {
+                all_initial_peers.push(vec![prev_addr]);
+                prev_addr = node_address_from_port(swarm.port);
+            }
+        }
+    }
+
+    all_initial_peers
+}
diff --git a/tests/src/topology/mod.rs b/tests/src/topology/mod.rs
new file mode 100644
index 00000000..5681c703
--- /dev/null
+++ b/tests/src/topology/mod.rs
@@ -0,0 +1,115 @@
+pub mod configs;
+
+use configs::{
+    da::{create_da_configs, DaParams},
+    network::{create_network_configs, NetworkParams},
+    GeneralConfig,
+};
+use rand::{thread_rng, Rng};
+
+use crate::{
+    nodes::{
+        executor::{create_executor_config, Executor},
+        validator::{create_validator_config, Validator},
+    },
+    topology::configs::{
+        consensus::{create_consensus_configs, ConsensusParams},
+        mix::create_mix_configs,
+    },
+};
+
+pub struct TopologyConfig {
+    n_validators: usize,
+    n_executors: usize,
+    consensus_params: ConsensusParams,
+    da_params: DaParams,
+    network_params: NetworkParams,
+}
+
+impl TopologyConfig {
+    pub fn two_validators() -> TopologyConfig {
+        TopologyConfig {
+            n_validators: 2,
+            n_executors: 0,
+            consensus_params: ConsensusParams::default_for_participants(2),
+            da_params: Default::default(),
+            network_params: Default::default(),
+        }
+    }
+
+    pub fn validator_and_executor() -> TopologyConfig {
+        TopologyConfig {
+            n_validators: 1,
+            n_executors: 1,
+            consensus_params: ConsensusParams::default_for_participants(2),
+            da_params: DaParams {
+                dispersal_factor: 2,
+                subnetwork_size: 2,
+                num_subnets: 2,
+                ..Default::default()
+            },
+            network_params: Default::default(),
+        }
+    }
+}
+
+pub struct Topology {
+    validators: Vec<Validator>,
+    executors: Vec<Executor>,
+}
+
+impl Topology {
+    pub async fn spawn(config: TopologyConfig) -> Self {
+        let n_participants = config.n_validators + config.n_executors;
+
+        // we use the same random bytes for:
+        // * da id
+        // * coin sk
+        // * coin nonce
+        // * libp2p node key
+        let mut ids = vec![[0; 32]; n_participants];
+        for id in &mut ids {
+            thread_rng().fill(id);
+        }
+
+        let consensus_configs = create_consensus_configs(&ids, config.consensus_params);
+        let da_configs = create_da_configs(&ids, config.da_params);
+        let network_configs = create_network_configs(&ids, config.network_params);
+        let mix_configs = create_mix_configs(&ids);
+
+        let mut validators = Vec::new();
+        for i in 0..config.n_validators {
+            let config = create_validator_config(GeneralConfig {
+                consensus_config: consensus_configs[i].to_owned(),
+                da_config: da_configs[i].to_owned(),
+                network_config: network_configs[i].to_owned(),
+                mix_config: mix_configs[i].to_owned(),
+            });
+            validators.push(Validator::spawn(config).await)
+        }
+
+        let mut executors = Vec::new();
+        for i in config.n_validators..n_participants {
+            let config = create_executor_config(GeneralConfig {
+                consensus_config: consensus_configs[i].to_owned(),
+                da_config: da_configs[i].to_owned(),
+                network_config: network_configs[i].to_owned(),
+                mix_config: mix_configs[i].to_owned(),
+            });
+            executors.push(Executor::spawn(config).await)
+        }
+
+        Self {
+            validators,
+            executors,
+        }
+    }
+
+    pub fn validators(&self) -> &[Validator] {
+        &self.validators
+    }
+
+    pub fn executors(&self) -> &[Executor] {
+        &self.executors
+    }
+}
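Since the `TopologyConfig` fields are private, tests outside `tests::topology` compose topologies only through the constructors above. The following is a minimal sketch of how a further shape could be added inside `tests/src/topology/mod.rs`; the constructor name and participant counts are hypothetical and not part of the patch above.

    impl TopologyConfig {
        // Hypothetical constructor (illustration only): two validators plus one
        // executor, reusing the DA parameters of `validator_and_executor`.
        pub fn two_validators_and_executor() -> TopologyConfig {
            TopologyConfig {
                n_validators: 2,
                n_executors: 1,
                consensus_params: ConsensusParams::default_for_participants(3),
                da_params: DaParams {
                    dispersal_factor: 2,
                    subnetwork_size: 2,
                    num_subnets: 2,
                    ..Default::default()
                },
                network_params: Default::default(),
            }
        }
    }

A test would then spawn it the same way as the existing configurations, e.g. `Topology::spawn(TopologyConfig::two_validators_and_executor()).await`, and reach the nodes through `validators()` and `executors()`.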