From ab026f67faf6e1da68922356822f45dde4bb241d Mon Sep 17 00:00:00 2001 From: andrussal Date: Wed, 10 Dec 2025 07:41:14 +0100 Subject: [PATCH] Refactor topology module into separated config/generation/deployment/readiness --- testing-framework/core/src/topology/config.rs | 357 ++++ .../core/src/topology/deployment.rs | 235 +++ .../core/src/topology/generation.rs | 293 ++++ testing-framework/core/src/topology/mod.rs | 1478 +---------------- .../core/src/topology/readiness/mod.rs | 425 +++++ testing-framework/core/src/topology/utils.rs | 106 ++ 6 files changed, 1429 insertions(+), 1465 deletions(-) create mode 100644 testing-framework/core/src/topology/config.rs create mode 100644 testing-framework/core/src/topology/deployment.rs create mode 100644 testing-framework/core/src/topology/generation.rs create mode 100644 testing-framework/core/src/topology/readiness/mod.rs create mode 100644 testing-framework/core/src/topology/utils.rs diff --git a/testing-framework/core/src/topology/config.rs b/testing-framework/core/src/topology/config.rs new file mode 100644 index 0000000..c9022c9 --- /dev/null +++ b/testing-framework/core/src/topology/config.rs @@ -0,0 +1,357 @@ +use std::time::Duration; + +use nomos_core::{ + mantle::GenesisTx as _, + sdp::{Locator, ServiceType}, +}; +use nomos_da_network_core::swarm::DAConnectionPolicySettings; +use testing_framework_config::topology::configs::{ + api::create_api_configs, + blend::create_blend_configs, + bootstrap::{SHORT_PROLONGED_BOOTSTRAP_PERIOD, create_bootstrap_configs}, + consensus::{ + ConsensusParams, ProviderInfo, create_consensus_configs, + create_genesis_tx_with_declarations, + }, + da::{DaParams, create_da_configs}, + network::{Libp2pNetworkLayout, NetworkParams, create_network_configs}, + tracing::create_tracing_configs, + wallet::WalletConfig, +}; + +use crate::topology::{ + GeneratedNodeConfig, GeneratedTopology, NodeRole, + configs::{GeneralConfig, time::default_time_config}, + create_kms_configs, resolve_ids, resolve_ports, +}; + +/// High-level topology settings used to generate node configs for a scenario. +#[derive(Clone)] +pub struct TopologyConfig { + pub n_validators: usize, + pub n_executors: usize, + pub consensus_params: ConsensusParams, + pub da_params: DaParams, + pub network_params: NetworkParams, + pub wallet_config: WalletConfig, +} + +impl TopologyConfig { + /// Create a config with zero nodes; counts must be set before building. + #[must_use] + pub fn empty() -> Self { + Self { + n_validators: 0, + n_executors: 0, + consensus_params: ConsensusParams::default_for_participants(1), + da_params: DaParams::default(), + network_params: NetworkParams::default(), + wallet_config: WalletConfig::default(), + } + } + + #[must_use] + /// Convenience config with two validators for consensus-only scenarios. + pub fn two_validators() -> Self { + Self { + n_validators: 2, + n_executors: 0, + consensus_params: ConsensusParams::default_for_participants(2), + da_params: DaParams::default(), + network_params: NetworkParams::default(), + wallet_config: WalletConfig::default(), + } + } + + #[must_use] + /// Single validator + single executor config for minimal dual-role setups. 
+ pub fn validator_and_executor() -> Self { + Self { + n_validators: 1, + n_executors: 1, + consensus_params: ConsensusParams::default_for_participants(2), + da_params: DaParams { + dispersal_factor: 2, + subnetwork_size: 2, + num_subnets: 2, + policy_settings: DAConnectionPolicySettings { + min_dispersal_peers: 1, + min_replication_peers: 1, + max_dispersal_failures: 0, + max_sampling_failures: 0, + max_replication_failures: 0, + malicious_threshold: 0, + }, + balancer_interval: Duration::from_secs(1), + ..Default::default() + }, + network_params: NetworkParams::default(), + wallet_config: WalletConfig::default(), + } + } + + #[must_use] + /// Build a topology with explicit validator and executor counts. + pub fn with_node_numbers(validators: usize, executors: usize) -> Self { + let participants = validators + executors; + assert!(participants > 0, "topology must include at least one node"); + + let mut da_params = DaParams::default(); + let da_nodes = participants; + if da_nodes <= 1 { + da_params.subnetwork_size = 1; + da_params.num_subnets = 1; + da_params.dispersal_factor = 1; + da_params.policy_settings.min_dispersal_peers = 0; + da_params.policy_settings.min_replication_peers = 0; + } else { + let dispersal = da_nodes.min(da_params.dispersal_factor.max(2)); + da_params.dispersal_factor = dispersal; + da_params.subnetwork_size = da_params.subnetwork_size.max(dispersal); + da_params.num_subnets = da_params.subnetwork_size as u16; + let min_peers = dispersal.saturating_sub(1).max(1); + da_params.policy_settings.min_dispersal_peers = min_peers; + da_params.policy_settings.min_replication_peers = min_peers; + da_params.balancer_interval = Duration::from_secs(1); + } + + Self { + n_validators: validators, + n_executors: executors, + consensus_params: ConsensusParams::default_for_participants(participants), + da_params, + network_params: NetworkParams::default(), + wallet_config: WalletConfig::default(), + } + } + + #[must_use] + /// Build a topology with one executor and a configurable validator set. + pub fn validators_and_executor( + num_validators: usize, + num_subnets: usize, + dispersal_factor: usize, + ) -> Self { + Self { + n_validators: num_validators, + n_executors: 1, + consensus_params: ConsensusParams::default_for_participants(num_validators + 1), + da_params: DaParams { + dispersal_factor, + subnetwork_size: num_subnets, + num_subnets: num_subnets as u16, + policy_settings: DAConnectionPolicySettings { + min_dispersal_peers: num_subnets, + min_replication_peers: dispersal_factor - 1, + max_dispersal_failures: 0, + max_sampling_failures: 0, + max_replication_failures: 0, + malicious_threshold: 0, + }, + balancer_interval: Duration::from_secs(5), + ..Default::default() + }, + network_params: NetworkParams::default(), + wallet_config: WalletConfig::default(), + } + } + + #[must_use] + pub const fn wallet(&self) -> &WalletConfig { + &self.wallet_config + } +} + +/// Builder that produces `GeneratedTopology` instances from a `TopologyConfig`. +#[derive(Clone)] +pub struct TopologyBuilder { + config: TopologyConfig, + ids: Option>, + da_ports: Option>, + blend_ports: Option>, +} + +impl TopologyBuilder { + #[must_use] + /// Create a builder from a base topology config. + pub const fn new(config: TopologyConfig) -> Self { + Self { + config, + ids: None, + da_ports: None, + blend_ports: None, + } + } + + #[must_use] + /// Provide deterministic node IDs. 
+ pub fn with_ids(mut self, ids: Vec<[u8; 32]>) -> Self { + self.ids = Some(ids); + self + } + + #[must_use] + /// Override DA ports for nodes in order. + pub fn with_da_ports(mut self, ports: Vec) -> Self { + self.da_ports = Some(ports); + self + } + + #[must_use] + /// Override blend ports for nodes in order. + pub fn with_blend_ports(mut self, ports: Vec) -> Self { + self.blend_ports = Some(ports); + self + } + + #[must_use] + pub const fn with_validator_count(mut self, validators: usize) -> Self { + self.config.n_validators = validators; + self + } + + #[must_use] + /// Set executor count. + pub const fn with_executor_count(mut self, executors: usize) -> Self { + self.config.n_executors = executors; + self + } + + #[must_use] + /// Set validator and executor counts together. + pub const fn with_node_counts(mut self, validators: usize, executors: usize) -> Self { + self.config.n_validators = validators; + self.config.n_executors = executors; + self + } + + #[must_use] + /// Configure the libp2p network layout. + pub const fn with_network_layout(mut self, layout: Libp2pNetworkLayout) -> Self { + self.config.network_params.libp2p_network_layout = layout; + self + } + + /// Override wallet configuration used in genesis. + pub fn with_wallet_config(mut self, wallet: WalletConfig) -> Self { + self.config.wallet_config = wallet; + self + } + + #[must_use] + /// Finalize and generate topology and node descriptors. + pub fn build(self) -> GeneratedTopology { + let Self { + config, + ids, + da_ports, + blend_ports, + } = self; + + let n_participants = config.n_validators + config.n_executors; + assert!(n_participants > 0, "topology must have at least one node"); + + let ids = resolve_ids(ids, n_participants); + let da_ports = resolve_ports(da_ports, n_participants, "DA"); + let blend_ports = resolve_ports(blend_ports, n_participants, "Blend"); + + let mut consensus_configs = + create_consensus_configs(&ids, &config.consensus_params, &config.wallet_config); + let bootstrapping_config = create_bootstrap_configs(&ids, SHORT_PROLONGED_BOOTSTRAP_PERIOD); + let da_configs = create_da_configs(&ids, &config.da_params, &da_ports); + let network_configs = create_network_configs(&ids, &config.network_params); + let blend_configs = create_blend_configs(&ids, &blend_ports); + let api_configs = create_api_configs(&ids); + let tracing_configs = create_tracing_configs(&ids); + let time_config = default_time_config(); + + let mut providers: Vec<_> = da_configs + .iter() + .enumerate() + .map(|(i, da_conf)| ProviderInfo { + service_type: ServiceType::DataAvailability, + provider_sk: da_conf.signer.clone(), + zk_sk: da_conf.secret_zk_key.clone(), + locator: Locator(da_conf.listening_address.clone()), + note: consensus_configs[0].da_notes[i].clone(), + }) + .collect(); + providers.extend( + blend_configs + .iter() + .enumerate() + .map(|(i, blend_conf)| ProviderInfo { + service_type: ServiceType::BlendNetwork, + provider_sk: blend_conf.signer.clone(), + zk_sk: blend_conf.secret_zk_key.clone(), + locator: Locator(blend_conf.backend_core.listening_address.clone()), + note: consensus_configs[0].blend_notes[i].clone(), + }), + ); + + let ledger_tx = consensus_configs[0] + .genesis_tx + .mantle_tx() + .ledger_tx + .clone(); + let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers); + for c in &mut consensus_configs { + c.genesis_tx = genesis_tx.clone(); + } + + let kms_configs = + create_kms_configs(&blend_configs, &da_configs, &config.wallet_config.accounts); + + let mut validators = 
Vec::with_capacity(config.n_validators); + let mut executors = Vec::with_capacity(config.n_executors); + + for i in 0..n_participants { + let general = GeneralConfig { + consensus_config: consensus_configs[i].clone(), + bootstrapping_config: bootstrapping_config[i].clone(), + da_config: da_configs[i].clone(), + network_config: network_configs[i].clone(), + blend_config: blend_configs[i].clone(), + api_config: api_configs[i].clone(), + tracing_config: tracing_configs[i].clone(), + time_config: time_config.clone(), + kms_config: kms_configs[i].clone(), + }; + + let role = if i < config.n_validators { + NodeRole::Validator + } else { + NodeRole::Executor + }; + let index = match role { + NodeRole::Validator => i, + NodeRole::Executor => i - config.n_validators, + }; + + let descriptor = GeneratedNodeConfig { + role, + index, + id: ids[i], + general, + da_port: da_ports[i], + blend_port: blend_ports[i], + }; + + match role { + NodeRole::Validator => validators.push(descriptor), + NodeRole::Executor => executors.push(descriptor), + } + } + + GeneratedTopology { + config, + validators, + executors, + } + } + + #[must_use] + pub const fn config(&self) -> &TopologyConfig { + &self.config + } +} diff --git a/testing-framework/core/src/topology/deployment.rs b/testing-framework/core/src/topology/deployment.rs new file mode 100644 index 0000000..58b1b17 --- /dev/null +++ b/testing-framework/core/src/topology/deployment.rs @@ -0,0 +1,235 @@ +use std::collections::HashSet; + +use nomos_core::sdp::SessionNumber; + +use crate::{ + nodes::{ + executor::{Executor, create_executor_config}, + validator::{Validator, create_validator_config}, + }, + topology::{ + TopologyBuilder, TopologyConfig, + configs::GeneralConfig, + generation::find_expected_peer_counts, + readiness::{ + DaBalancerReadiness, MembershipReadiness, NetworkReadiness, ReadinessCheck, + ReadinessError, + }, + utils::multiaddr_port, + }, +}; + +/// Runtime representation of a spawned topology with running nodes. 
+pub struct Topology { + pub(crate) validators: Vec, + pub(crate) executors: Vec, +} + +impl Topology { + pub async fn spawn(config: TopologyConfig) -> Self { + let generated = TopologyBuilder::new(config.clone()).build(); + let n_validators = config.n_validators; + let n_executors = config.n_executors; + let node_configs = generated + .nodes() + .map(|node| node.general.clone()) + .collect::>(); + + let (validators, executors) = + Self::spawn_validators_executors(node_configs, n_validators, n_executors).await; + + Self { + validators, + executors, + } + } + + pub async fn spawn_with_empty_membership( + config: TopologyConfig, + ids: &[[u8; 32]], + da_ports: &[u16], + blend_ports: &[u16], + ) -> Self { + let generated = TopologyBuilder::new(config.clone()) + .with_ids(ids.to_vec()) + .with_da_ports(da_ports.to_vec()) + .with_blend_ports(blend_ports.to_vec()) + .build(); + + let node_configs = generated + .nodes() + .map(|node| node.general.clone()) + .collect::>(); + + let (validators, executors) = + Self::spawn_validators_executors(node_configs, config.n_validators, config.n_executors) + .await; + + Self { + validators, + executors, + } + } + + pub(crate) async fn spawn_validators_executors( + config: Vec, + n_validators: usize, + n_executors: usize, + ) -> (Vec, Vec) { + let mut validators = Vec::new(); + for i in 0..n_validators { + let config = create_validator_config(config[i].clone()); + validators.push(Validator::spawn(config).await.unwrap()); + } + + let mut executors = Vec::new(); + for i in 0..n_executors { + let config = create_executor_config(config[n_validators + i].clone()); + executors.push(Executor::spawn(config).await); + } + + (validators, executors) + } + + #[must_use] + pub fn validators(&self) -> &[Validator] { + &self.validators + } + + #[must_use] + pub fn executors(&self) -> &[Executor] { + &self.executors + } + + pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> { + let listen_ports = self.node_listen_ports(); + if listen_ports.len() <= 1 { + return Ok(()); + } + + let initial_peer_ports = self.node_initial_peer_ports(); + let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports); + let labels = self.node_labels(); + + let check = NetworkReadiness { + topology: self, + expected_peer_counts: &expected_peer_counts, + labels: &labels, + }; + + check.wait().await?; + Ok(()) + } + + pub async fn wait_da_balancer_ready(&self) -> Result<(), ReadinessError> { + if self.validators.is_empty() && self.executors.is_empty() { + return Ok(()); + } + + let labels = self.node_labels(); + let check = DaBalancerReadiness { + topology: self, + labels: &labels, + }; + + check.wait().await?; + Ok(()) + } + + pub async fn wait_membership_ready(&self) -> Result<(), ReadinessError> { + self.wait_membership_ready_for_session(SessionNumber::from(0u64)) + .await + } + + pub async fn wait_membership_ready_for_session( + &self, + session: SessionNumber, + ) -> Result<(), ReadinessError> { + self.wait_membership_assignations(session, true).await + } + + pub async fn wait_membership_empty_for_session( + &self, + session: SessionNumber, + ) -> Result<(), ReadinessError> { + self.wait_membership_assignations(session, false).await + } + + async fn wait_membership_assignations( + &self, + session: SessionNumber, + expect_non_empty: bool, + ) -> Result<(), ReadinessError> { + let total_nodes = self.validators.len() + self.executors.len(); + + if total_nodes == 0 { + return Ok(()); + } + + let labels = self.node_labels(); + let check = 
MembershipReadiness { + topology: self, + session, + labels: &labels, + expect_non_empty, + }; + + check.wait().await?; + Ok(()) + } + + fn node_listen_ports(&self) -> Vec { + self.validators + .iter() + .map(|node| node.config().network.backend.inner.port) + .chain( + self.executors + .iter() + .map(|node| node.config().network.backend.inner.port), + ) + .collect() + } + + fn node_initial_peer_ports(&self) -> Vec> { + self.validators + .iter() + .map(|node| { + node.config() + .network + .backend + .initial_peers + .iter() + .filter_map(multiaddr_port) + .collect::>() + }) + .chain(self.executors.iter().map(|node| { + node.config() + .network + .backend + .initial_peers + .iter() + .filter_map(multiaddr_port) + .collect::>() + })) + .collect() + } + + fn node_labels(&self) -> Vec { + self.validators + .iter() + .enumerate() + .map(|(idx, node)| { + format!( + "validator#{idx}@{}", + node.config().network.backend.inner.port + ) + }) + .chain(self.executors.iter().enumerate().map(|(idx, node)| { + format!( + "executor#{idx}@{}", + node.config().network.backend.inner.port + ) + })) + .collect() + } +} diff --git a/testing-framework/core/src/topology/generation.rs b/testing-framework/core/src/topology/generation.rs new file mode 100644 index 0000000..067dfb4 --- /dev/null +++ b/testing-framework/core/src/topology/generation.rs @@ -0,0 +1,293 @@ +use std::{collections::HashSet, time::Duration}; + +use reqwest::{Client, Url}; + +use crate::topology::{ + Topology, TopologyConfig, + configs::{GeneralConfig, wallet::WalletAccount}, + readiness::{HttpMembershipReadiness, HttpNetworkReadiness, ReadinessCheck, ReadinessError}, + utils::multiaddr_port, +}; + +/// Node role within the generated topology. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum NodeRole { + Validator, + Executor, +} + +/// Fully generated configuration for an individual node. +#[derive(Clone)] +pub struct GeneratedNodeConfig { + pub role: NodeRole, + pub index: usize, + pub id: [u8; 32], + pub general: GeneralConfig, + pub da_port: u16, + pub blend_port: u16, +} + +impl GeneratedNodeConfig { + #[must_use] + /// Logical role of the node. + pub const fn role(&self) -> NodeRole { + self.role + } + + #[must_use] + /// Zero-based index within its role group. + pub const fn index(&self) -> usize { + self.index + } + + #[must_use] + pub const fn network_port(&self) -> u16 { + self.general.network_config.backend.inner.port + } + + #[must_use] + pub const fn api_port(&self) -> u16 { + self.general.api_config.address.port() + } + + #[must_use] + pub const fn testing_http_port(&self) -> u16 { + self.general.api_config.testing_http_address.port() + } +} + +/// Collection of generated node configs and helpers to spawn or probe the +/// stack. +#[derive(Clone)] +pub struct GeneratedTopology { + pub(crate) config: TopologyConfig, + pub(crate) validators: Vec, + pub(crate) executors: Vec, +} + +impl GeneratedTopology { + #[must_use] + /// Underlying configuration used to derive the generated nodes. + pub const fn config(&self) -> &TopologyConfig { + &self.config + } + + #[must_use] + /// All validator configs. + pub fn validators(&self) -> &[GeneratedNodeConfig] { + &self.validators + } + + #[must_use] + /// All executor configs. + pub fn executors(&self) -> &[GeneratedNodeConfig] { + &self.executors + } + + /// Iterator over all node configs in role order. 
+ pub fn nodes(&self) -> impl Iterator { + self.validators.iter().chain(self.executors.iter()) + } + + #[must_use] + /// Slot duration from the first node (assumes homogeneous configs). + pub fn slot_duration(&self) -> Option { + self.validators + .first() + .map(|node| node.general.time_config.slot_duration) + } + + #[must_use] + /// Wallet accounts configured for this topology. + pub fn wallet_accounts(&self) -> &[WalletAccount] { + &self.config.wallet_config.accounts + } + + pub async fn spawn_local(&self) -> Topology { + let configs = self + .nodes() + .map(|node| node.general.clone()) + .collect::>(); + + let (validators, executors) = Topology::spawn_validators_executors( + configs, + self.config.n_validators, + self.config.n_executors, + ) + .await; + + Topology { + validators, + executors, + } + } + + pub async fn wait_remote_readiness( + &self, + validator_endpoints: &[Url], + executor_endpoints: &[Url], + validator_membership_endpoints: Option<&[Url]>, + executor_membership_endpoints: Option<&[Url]>, + ) -> Result<(), ReadinessError> { + let total_nodes = self.validators.len() + self.executors.len(); + if total_nodes == 0 { + return Ok(()); + } + + assert_eq!( + self.validators.len(), + validator_endpoints.len(), + "validator endpoints must match topology" + ); + assert_eq!( + self.executors.len(), + executor_endpoints.len(), + "executor endpoints must match topology" + ); + + let mut endpoints = Vec::with_capacity(total_nodes); + endpoints.extend_from_slice(validator_endpoints); + endpoints.extend_from_slice(executor_endpoints); + + let labels = self.labels(); + let client = Client::new(); + let make_testing_base_url = |port: u16| -> Url { + Url::parse(&format!("http://127.0.0.1:{port}/")) + .expect("failed to construct local testing base url") + }; + + if endpoints.len() > 1 { + let listen_ports = self.listen_ports(); + let initial_peer_ports = self.initial_peer_ports(); + let expected_peer_counts = + find_expected_peer_counts(&listen_ports, &initial_peer_ports); + let network_check = HttpNetworkReadiness { + client: &client, + endpoints: &endpoints, + expected_peer_counts: &expected_peer_counts, + labels: &labels, + }; + + network_check.wait().await?; + } + + let mut membership_endpoints = Vec::with_capacity(total_nodes); + if let Some(urls) = validator_membership_endpoints { + assert_eq!( + self.validators.len(), + urls.len(), + "validator membership endpoints must match topology" + ); + membership_endpoints.extend_from_slice(urls); + } else { + membership_endpoints.extend( + self.validators + .iter() + .map(|node| make_testing_base_url(node.testing_http_port())), + ); + } + + if let Some(urls) = executor_membership_endpoints { + assert_eq!( + self.executors.len(), + urls.len(), + "executor membership endpoints must match topology" + ); + membership_endpoints.extend_from_slice(urls); + } else { + membership_endpoints.extend( + self.executors + .iter() + .map(|node| make_testing_base_url(node.testing_http_port())), + ); + } + + let membership_check = HttpMembershipReadiness { + client: &client, + endpoints: &membership_endpoints, + session: nomos_core::sdp::SessionNumber::from(0u64), + labels: &labels, + expect_non_empty: true, + }; + + membership_check.wait().await + } + + fn listen_ports(&self) -> Vec { + self.validators + .iter() + .map(|node| node.general.network_config.backend.inner.port) + .chain( + self.executors + .iter() + .map(|node| node.general.network_config.backend.inner.port), + ) + .collect() + } + + fn initial_peer_ports(&self) -> Vec> { + self.validators + 
.iter() + .map(|node| { + node.general + .network_config + .backend + .initial_peers + .iter() + .filter_map(multiaddr_port) + .collect::>() + }) + .chain(self.executors.iter().map(|node| { + node.general + .network_config + .backend + .initial_peers + .iter() + .filter_map(multiaddr_port) + .collect::>() + })) + .collect() + } + + fn labels(&self) -> Vec { + self.validators + .iter() + .enumerate() + .map(|(idx, node)| { + format!( + "validator#{idx}@{}", + node.general.network_config.backend.inner.port + ) + }) + .chain(self.executors.iter().enumerate().map(|(idx, node)| { + format!( + "executor#{idx}@{}", + node.general.network_config.backend.inner.port + ) + })) + .collect() + } +} + +pub fn find_expected_peer_counts( + listen_ports: &[u16], + initial_peer_ports: &[HashSet], +) -> Vec { + let mut expected: Vec> = vec![HashSet::new(); initial_peer_ports.len()]; + + for (idx, ports) in initial_peer_ports.iter().enumerate() { + for port in ports { + let Some(peer_idx) = listen_ports.iter().position(|p| p == port) else { + continue; + }; + if peer_idx == idx { + continue; + } + + expected[idx].insert(peer_idx); + expected[peer_idx].insert(idx); + } + } + + expected.into_iter().map(|set| set.len()).collect() +} diff --git a/testing-framework/core/src/topology/mod.rs b/testing-framework/core/src/topology/mod.rs index 6884e65..c93a843 100644 --- a/testing-framework/core/src/topology/mod.rs +++ b/testing-framework/core/src/topology/mod.rs @@ -2,1469 +2,17 @@ pub mod configs { pub use testing_framework_config::topology::configs::*; } -use std::{ - collections::{HashMap, HashSet}, - iter, - time::Duration, +mod config; +mod deployment; +pub mod generation; +pub mod readiness; +mod utils; + +pub use config::{TopologyBuilder, TopologyConfig}; +pub use deployment::Topology; +pub use generation::{GeneratedNodeConfig, GeneratedTopology, NodeRole, find_expected_peer_counts}; +pub use readiness::{ + DaBalancerReadiness, HttpMembershipReadiness, HttpNetworkReadiness, MembershipReadiness, + NetworkReadiness, ReadinessCheck, ReadinessError, }; - -use configs::{ - GeneralConfig, - consensus::{ProviderInfo, create_genesis_tx_with_declarations}, - da::{DaParams, create_da_configs}, - network::{Libp2pNetworkLayout, NetworkParams, create_network_configs}, - tracing::create_tracing_configs, - wallet::{WalletAccount, WalletConfig}, -}; -use futures::future::join_all; -use groth16::fr_to_bytes; -use key_management_system_service::{ - backend::preload::PreloadKMSBackendSettings, - keys::{Ed25519Key, Key, ZkKey}, -}; -use nomos_core::{ - mantle::GenesisTx as _, - sdp::{Locator, ServiceType, SessionNumber}, -}; -use nomos_da_network_core::swarm::{BalancerStats, DAConnectionPolicySettings}; -use nomos_da_network_service::MembershipResponse; -use nomos_http_api_common::paths; -use nomos_network::backends::libp2p::Libp2pInfo; -use nomos_utils::net::get_available_udp_port; -use rand::{Rng as _, thread_rng}; -use reqwest::{Client, Url}; -use thiserror::Error; -use tokio::time::{sleep, timeout}; -use tracing::warn; - -use crate::{ - adjust_timeout, - nodes::{ - executor::{Executor, create_executor_config}, - validator::{Validator, create_validator_config}, - }, - topology::configs::{ - api::create_api_configs, - blend::{GeneralBlendConfig, create_blend_configs}, - bootstrap::{SHORT_PROLONGED_BOOTSTRAP_PERIOD, create_bootstrap_configs}, - consensus::{ConsensusParams, create_consensus_configs}, - da::GeneralDaConfig, - time::default_time_config, - }, -}; - -#[derive(Clone)] -/// High-level topology settings used to generate 
node configs for a scenario. -pub struct TopologyConfig { - pub n_validators: usize, - pub n_executors: usize, - pub consensus_params: ConsensusParams, - pub da_params: DaParams, - pub network_params: NetworkParams, - pub wallet_config: WalletConfig, -} - -impl TopologyConfig { - /// Create a config with zero nodes; counts must be set before building. - #[must_use] - pub fn empty() -> Self { - Self { - n_validators: 0, - n_executors: 0, - consensus_params: ConsensusParams::default_for_participants(1), - da_params: DaParams::default(), - network_params: NetworkParams::default(), - wallet_config: WalletConfig::default(), - } - } - - #[must_use] - /// Convenience config with two validators for consensus-only scenarios. - pub fn two_validators() -> Self { - Self { - n_validators: 2, - n_executors: 0, - consensus_params: ConsensusParams::default_for_participants(2), - da_params: DaParams::default(), - network_params: NetworkParams::default(), - wallet_config: WalletConfig::default(), - } - } - - #[must_use] - /// Single validator + single executor config for minimal dual-role setups. - pub fn validator_and_executor() -> Self { - Self { - n_validators: 1, - n_executors: 1, - consensus_params: ConsensusParams::default_for_participants(2), - da_params: DaParams { - dispersal_factor: 2, - subnetwork_size: 2, - num_subnets: 2, - policy_settings: DAConnectionPolicySettings { - min_dispersal_peers: 1, - min_replication_peers: 1, - max_dispersal_failures: 0, - max_sampling_failures: 0, - max_replication_failures: 0, - malicious_threshold: 0, - }, - balancer_interval: Duration::from_secs(1), - ..Default::default() - }, - network_params: NetworkParams::default(), - wallet_config: WalletConfig::default(), - } - } - - #[must_use] - /// Build a topology with explicit validator and executor counts. - pub fn with_node_numbers(validators: usize, executors: usize) -> Self { - let participants = validators + executors; - assert!(participants > 0, "topology must include at least one node"); - - let mut da_params = DaParams::default(); - let da_nodes = participants; - if da_nodes <= 1 { - da_params.subnetwork_size = 1; - da_params.num_subnets = 1; - da_params.dispersal_factor = 1; - da_params.policy_settings.min_dispersal_peers = 0; - da_params.policy_settings.min_replication_peers = 0; - } else { - let dispersal = da_nodes.min(da_params.dispersal_factor.max(2)); - da_params.dispersal_factor = dispersal; - da_params.subnetwork_size = da_params.subnetwork_size.max(dispersal); - da_params.num_subnets = da_params.subnetwork_size as u16; - let min_peers = dispersal.saturating_sub(1).max(1); - da_params.policy_settings.min_dispersal_peers = min_peers; - da_params.policy_settings.min_replication_peers = min_peers; - da_params.balancer_interval = Duration::from_secs(1); - } - - Self { - n_validators: validators, - n_executors: executors, - consensus_params: ConsensusParams::default_for_participants(participants), - da_params, - network_params: NetworkParams::default(), - wallet_config: WalletConfig::default(), - } - } - - #[must_use] - /// Build a topology with one executor and a configurable validator set. 
- pub fn validators_and_executor( - num_validators: usize, - num_subnets: usize, - dispersal_factor: usize, - ) -> Self { - Self { - n_validators: num_validators, - n_executors: 1, - consensus_params: ConsensusParams::default_for_participants(num_validators + 1), - da_params: DaParams { - dispersal_factor, - subnetwork_size: num_subnets, - num_subnets: num_subnets as u16, - policy_settings: DAConnectionPolicySettings { - min_dispersal_peers: num_subnets, - min_replication_peers: dispersal_factor - 1, - max_dispersal_failures: 0, - max_sampling_failures: 0, - max_replication_failures: 0, - malicious_threshold: 0, - }, - balancer_interval: Duration::from_secs(5), - ..Default::default() - }, - network_params: NetworkParams::default(), - wallet_config: WalletConfig::default(), - } - } - - #[must_use] - pub const fn wallet(&self) -> &WalletConfig { - &self.wallet_config - } -} - -/// Node role within the generated topology. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum NodeRole { - Validator, - Executor, -} - -/// Fully generated configuration for an individual node. -#[derive(Clone)] -pub struct GeneratedNodeConfig { - pub role: NodeRole, - pub index: usize, - pub id: [u8; 32], - pub general: GeneralConfig, - pub da_port: u16, - pub blend_port: u16, -} - -impl GeneratedNodeConfig { - #[must_use] - /// Logical role of the node. - pub const fn role(&self) -> NodeRole { - self.role - } - - #[must_use] - /// Zero-based index within its role group. - pub const fn index(&self) -> usize { - self.index - } - - #[must_use] - pub const fn network_port(&self) -> u16 { - self.general.network_config.backend.inner.port - } - - #[must_use] - pub const fn api_port(&self) -> u16 { - self.general.api_config.address.port() - } - - #[must_use] - pub const fn testing_http_port(&self) -> u16 { - self.general.api_config.testing_http_address.port() - } -} - -/// Collection of generated node configs and helpers to spawn or probe the -/// stack. -#[derive(Clone)] -pub struct GeneratedTopology { - config: TopologyConfig, - validators: Vec, - executors: Vec, -} - -impl GeneratedTopology { - #[must_use] - /// Underlying configuration used to derive the generated nodes. - pub const fn config(&self) -> &TopologyConfig { - &self.config - } - - #[must_use] - /// All validator configs. - pub fn validators(&self) -> &[GeneratedNodeConfig] { - &self.validators - } - - #[must_use] - /// All executor configs. - pub fn executors(&self) -> &[GeneratedNodeConfig] { - &self.executors - } - - /// Iterator over all node configs in role order. - pub fn nodes(&self) -> impl Iterator { - self.validators.iter().chain(self.executors.iter()) - } - - #[must_use] - /// Slot duration from the first node (assumes homogeneous configs). - pub fn slot_duration(&self) -> Option { - self.validators - .first() - .map(|node| node.general.time_config.slot_duration) - } - - #[must_use] - /// Wallet accounts configured for this topology. 
- pub fn wallet_accounts(&self) -> &[WalletAccount] { - &self.config.wallet_config.accounts - } - - pub async fn spawn_local(&self) -> Topology { - let configs = self - .nodes() - .map(|node| node.general.clone()) - .collect::>(); - - let (validators, executors) = Topology::spawn_validators_executors( - configs, - self.config.n_validators, - self.config.n_executors, - ) - .await; - - Topology { - validators, - executors, - } - } - - pub async fn wait_remote_readiness( - &self, - validator_endpoints: &[Url], - executor_endpoints: &[Url], - validator_membership_endpoints: Option<&[Url]>, - executor_membership_endpoints: Option<&[Url]>, - ) -> Result<(), ReadinessError> { - let total_nodes = self.validators.len() + self.executors.len(); - if total_nodes == 0 { - return Ok(()); - } - - assert_eq!( - self.validators.len(), - validator_endpoints.len(), - "validator endpoints must match topology" - ); - assert_eq!( - self.executors.len(), - executor_endpoints.len(), - "executor endpoints must match topology" - ); - - let mut endpoints = Vec::with_capacity(total_nodes); - endpoints.extend_from_slice(validator_endpoints); - endpoints.extend_from_slice(executor_endpoints); - - let labels = self.labels(); - let client = Client::new(); - let make_testing_base_url = |port: u16| -> Url { - Url::parse(&format!("http://127.0.0.1:{port}/")) - .expect("failed to construct local testing base url") - }; - - if endpoints.len() > 1 { - let listen_ports = self.listen_ports(); - let initial_peer_ports = self.initial_peer_ports(); - let expected_peer_counts = - find_expected_peer_counts(&listen_ports, &initial_peer_ports); - let network_check = HttpNetworkReadiness { - client: &client, - endpoints: &endpoints, - expected_peer_counts: &expected_peer_counts, - labels: &labels, - }; - - network_check.wait().await?; - } - - let mut membership_endpoints = Vec::with_capacity(total_nodes); - if let Some(urls) = validator_membership_endpoints { - assert_eq!( - self.validators.len(), - urls.len(), - "validator membership endpoints must match topology" - ); - membership_endpoints.extend_from_slice(urls); - } else { - membership_endpoints.extend( - self.validators - .iter() - .map(|node| make_testing_base_url(node.testing_http_port())), - ); - } - - if let Some(urls) = executor_membership_endpoints { - assert_eq!( - self.executors.len(), - urls.len(), - "executor membership endpoints must match topology" - ); - membership_endpoints.extend_from_slice(urls); - } else { - membership_endpoints.extend( - self.executors - .iter() - .map(|node| make_testing_base_url(node.testing_http_port())), - ); - } - - let membership_check = HttpMembershipReadiness { - client: &client, - endpoints: &membership_endpoints, - session: SessionNumber::from(0u64), - labels: &labels, - expect_non_empty: true, - }; - - membership_check.wait().await - } - - fn listen_ports(&self) -> Vec { - self.validators - .iter() - .map(|node| node.general.network_config.backend.inner.port) - .chain( - self.executors - .iter() - .map(|node| node.general.network_config.backend.inner.port), - ) - .collect() - } - - fn initial_peer_ports(&self) -> Vec> { - self.validators - .iter() - .map(|node| { - node.general - .network_config - .backend - .initial_peers - .iter() - .filter_map(multiaddr_port) - .collect::>() - }) - .chain(self.executors.iter().map(|node| { - node.general - .network_config - .backend - .initial_peers - .iter() - .filter_map(multiaddr_port) - .collect::>() - })) - .collect() - } - - fn labels(&self) -> Vec { - self.validators - .iter() - .enumerate() 
- .map(|(idx, node)| { - format!( - "validator#{idx}@{}", - node.general.network_config.backend.inner.port - ) - }) - .chain(self.executors.iter().enumerate().map(|(idx, node)| { - format!( - "executor#{idx}@{}", - node.general.network_config.backend.inner.port - ) - })) - .collect() - } -} - -/// Builder that produces `GeneratedTopology` instances from a `TopologyConfig`. -#[derive(Clone)] -pub struct TopologyBuilder { - config: TopologyConfig, - ids: Option>, - da_ports: Option>, - blend_ports: Option>, -} - -impl TopologyBuilder { - #[must_use] - /// Create a builder from a base topology config. - pub const fn new(config: TopologyConfig) -> Self { - Self { - config, - ids: None, - da_ports: None, - blend_ports: None, - } - } - - #[must_use] - /// Provide deterministic node IDs. - pub fn with_ids(mut self, ids: Vec<[u8; 32]>) -> Self { - self.ids = Some(ids); - self - } - - #[must_use] - /// Override DA ports for nodes in order. - pub fn with_da_ports(mut self, ports: Vec) -> Self { - self.da_ports = Some(ports); - self - } - - #[must_use] - /// Override blend ports for nodes in order. - pub fn with_blend_ports(mut self, ports: Vec) -> Self { - self.blend_ports = Some(ports); - self - } - - #[must_use] - pub const fn with_validator_count(mut self, validators: usize) -> Self { - self.config.n_validators = validators; - self - } - - #[must_use] - /// Set executor count. - pub const fn with_executor_count(mut self, executors: usize) -> Self { - self.config.n_executors = executors; - self - } - - #[must_use] - /// Set validator and executor counts together. - pub const fn with_node_counts(mut self, validators: usize, executors: usize) -> Self { - self.config.n_validators = validators; - self.config.n_executors = executors; - self - } - - #[must_use] - /// Configure the libp2p network layout. - pub const fn with_network_layout(mut self, layout: Libp2pNetworkLayout) -> Self { - self.config.network_params.libp2p_network_layout = layout; - self - } - - #[must_use] - /// Override wallet configuration used in genesis. - pub fn with_wallet_config(mut self, wallet: WalletConfig) -> Self { - self.config.wallet_config = wallet; - self - } - - #[must_use] - /// Finalize and generate topology and node descriptors. 
- pub fn build(self) -> GeneratedTopology { - let Self { - config, - ids, - da_ports, - blend_ports, - } = self; - - let n_participants = config.n_validators + config.n_executors; - assert!(n_participants > 0, "topology must have at least one node"); - - let ids = resolve_ids(ids, n_participants); - let da_ports = resolve_ports(da_ports, n_participants, "DA"); - let blend_ports = resolve_ports(blend_ports, n_participants, "Blend"); - - let mut consensus_configs = - create_consensus_configs(&ids, &config.consensus_params, &config.wallet_config); - let bootstrapping_config = create_bootstrap_configs(&ids, SHORT_PROLONGED_BOOTSTRAP_PERIOD); - let da_configs = create_da_configs(&ids, &config.da_params, &da_ports); - let network_configs = create_network_configs(&ids, &config.network_params); - let blend_configs = create_blend_configs(&ids, &blend_ports); - let api_configs = create_api_configs(&ids); - let tracing_configs = create_tracing_configs(&ids); - let time_config = default_time_config(); - - let mut providers: Vec<_> = da_configs - .iter() - .enumerate() - .map(|(i, da_conf)| ProviderInfo { - service_type: ServiceType::DataAvailability, - provider_sk: da_conf.signer.clone(), - zk_sk: da_conf.secret_zk_key.clone(), - locator: Locator(da_conf.listening_address.clone()), - note: consensus_configs[0].da_notes[i].clone(), - }) - .collect(); - providers.extend( - blend_configs - .iter() - .enumerate() - .map(|(i, blend_conf)| ProviderInfo { - service_type: ServiceType::BlendNetwork, - provider_sk: blend_conf.signer.clone(), - zk_sk: blend_conf.secret_zk_key.clone(), - locator: Locator(blend_conf.backend_core.listening_address.clone()), - note: consensus_configs[0].blend_notes[i].clone(), - }), - ); - - let ledger_tx = consensus_configs[0] - .genesis_tx - .mantle_tx() - .ledger_tx - .clone(); - let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers); - for c in &mut consensus_configs { - c.genesis_tx = genesis_tx.clone(); - } - - let kms_configs = - create_kms_configs(&blend_configs, &da_configs, &config.wallet_config.accounts); - - let mut validators = Vec::with_capacity(config.n_validators); - let mut executors = Vec::with_capacity(config.n_executors); - - for i in 0..n_participants { - let general = GeneralConfig { - consensus_config: consensus_configs[i].clone(), - bootstrapping_config: bootstrapping_config[i].clone(), - da_config: da_configs[i].clone(), - network_config: network_configs[i].clone(), - blend_config: blend_configs[i].clone(), - api_config: api_configs[i].clone(), - tracing_config: tracing_configs[i].clone(), - time_config: time_config.clone(), - kms_config: kms_configs[i].clone(), - }; - - let role = if i < config.n_validators { - NodeRole::Validator - } else { - NodeRole::Executor - }; - let index = match role { - NodeRole::Validator => i, - NodeRole::Executor => i - config.n_validators, - }; - - let descriptor = GeneratedNodeConfig { - role, - index, - id: ids[i], - general, - da_port: da_ports[i], - blend_port: blend_ports[i], - }; - - match role { - NodeRole::Validator => validators.push(descriptor), - NodeRole::Executor => executors.push(descriptor), - } - } - - GeneratedTopology { - config, - validators, - executors, - } - } -} - -/// Runtime representation of a spawned topology with running nodes. 
-pub struct Topology { - validators: Vec, - executors: Vec, -} - -impl Topology { - pub async fn spawn(config: TopologyConfig) -> Self { - let n_participants = config.n_validators + config.n_executors; - - // we use the same random bytes for: - // * da id - // * coin sk - // * coin nonce - // * libp2p node key - let mut ids = vec![[0; 32]; n_participants]; - let mut da_ports = vec![]; - let mut blend_ports = vec![]; - for id in &mut ids { - thread_rng().fill(id); - da_ports.push(get_available_udp_port().unwrap()); - blend_ports.push(get_available_udp_port().unwrap()); - } - - let mut consensus_configs = - create_consensus_configs(&ids, &config.consensus_params, &config.wallet_config); - let bootstrapping_config = create_bootstrap_configs(&ids, SHORT_PROLONGED_BOOTSTRAP_PERIOD); - let da_configs = create_da_configs(&ids, &config.da_params, &da_ports); - let network_configs = create_network_configs(&ids, &config.network_params); - let blend_configs = create_blend_configs(&ids, &blend_ports); - let api_configs = create_api_configs(&ids); - let tracing_configs = create_tracing_configs(&ids); - let time_config = default_time_config(); - - // Setup genesis TX with Blend and DA service declarationse - let mut providers: Vec<_> = da_configs - .iter() - .enumerate() - .map(|(i, da_conf)| ProviderInfo { - service_type: ServiceType::DataAvailability, - provider_sk: da_conf.signer.clone(), - zk_sk: da_conf.secret_zk_key.clone(), - locator: Locator(da_conf.listening_address.clone()), - note: consensus_configs[0].da_notes[i].clone(), - }) - .collect(); - providers.extend( - blend_configs - .iter() - .enumerate() - .map(|(i, blend_conf)| ProviderInfo { - service_type: ServiceType::BlendNetwork, - provider_sk: blend_conf.signer.clone(), - zk_sk: blend_conf.secret_zk_key.clone(), - locator: Locator(blend_conf.backend_core.listening_address.clone()), - note: consensus_configs[0].blend_notes[i].clone(), - }), - ); - - // Update genesis TX to contain Blend and DA providers. - let ledger_tx = consensus_configs[0] - .genesis_tx - .mantle_tx() - .ledger_tx - .clone(); - let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers); - for c in &mut consensus_configs { - c.genesis_tx = genesis_tx.clone(); - } - - // Set Blend and DA keys in KMS of each node config. 
- let kms_configs = - create_kms_configs(&blend_configs, &da_configs, &config.wallet_config.accounts); - - let mut node_configs = vec![]; - - for i in 0..n_participants { - node_configs.push(GeneralConfig { - consensus_config: consensus_configs[i].clone(), - bootstrapping_config: bootstrapping_config[i].clone(), - da_config: da_configs[i].clone(), - network_config: network_configs[i].clone(), - blend_config: blend_configs[i].clone(), - api_config: api_configs[i].clone(), - tracing_config: tracing_configs[i].clone(), - time_config: time_config.clone(), - kms_config: kms_configs[i].clone(), - }); - } - - let (validators, executors) = - Self::spawn_validators_executors(node_configs, config.n_validators, config.n_executors) - .await; - - Self { - validators, - executors, - } - } - - pub async fn spawn_with_empty_membership( - config: TopologyConfig, - ids: &[[u8; 32]], - da_ports: &[u16], - blend_ports: &[u16], - ) -> Self { - let n_participants = config.n_validators + config.n_executors; - - let consensus_configs = - create_consensus_configs(ids, &config.consensus_params, &config.wallet_config); - let bootstrapping_config = create_bootstrap_configs(ids, SHORT_PROLONGED_BOOTSTRAP_PERIOD); - let da_configs = create_da_configs(ids, &config.da_params, da_ports); - let network_configs = create_network_configs(ids, &config.network_params); - let blend_configs = create_blend_configs(ids, blend_ports); - let api_configs = create_api_configs(ids); - // Create membership configs without DA nodes. - let tracing_configs = create_tracing_configs(ids); - let time_config = default_time_config(); - - let kms_config = PreloadKMSBackendSettings { - keys: HashMap::new(), - }; - - let mut node_configs = vec![]; - - for i in 0..n_participants { - node_configs.push(GeneralConfig { - consensus_config: consensus_configs[i].clone(), - bootstrapping_config: bootstrapping_config[i].clone(), - da_config: da_configs[i].clone(), - network_config: network_configs[i].clone(), - blend_config: blend_configs[i].clone(), - api_config: api_configs[i].clone(), - tracing_config: tracing_configs[i].clone(), - time_config: time_config.clone(), - kms_config: kms_config.clone(), - }); - } - let (validators, executors) = - Self::spawn_validators_executors(node_configs, config.n_validators, config.n_executors) - .await; - - Self { - validators, - executors, - } - } - - async fn spawn_validators_executors( - config: Vec, - n_validators: usize, - n_executors: usize, - ) -> (Vec, Vec) { - let mut validators = Vec::new(); - for i in 0..n_validators { - let config = create_validator_config(config[i].clone()); - validators.push(Validator::spawn(config).await.unwrap()); - } - - let mut executors = Vec::new(); - for i in n_validators..(n_validators + n_executors) { - let config = create_executor_config(config[i].clone()); - executors.push(Executor::spawn(config).await); - } - - (validators, executors) - } - - #[must_use] - pub fn validators(&self) -> &[Validator] { - &self.validators - } - - #[must_use] - pub fn executors(&self) -> &[Executor] { - &self.executors - } - - pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> { - let listen_ports = self.node_listen_ports(); - if listen_ports.len() <= 1 { - return Ok(()); - } - - let initial_peer_ports = self.node_initial_peer_ports(); - let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports); - let labels = self.node_labels(); - - let check = NetworkReadiness { - topology: self, - expected_peer_counts: &expected_peer_counts, - labels: &labels, - }; 
- - check.wait().await?; - Ok(()) - } - - pub async fn wait_da_balancer_ready(&self) -> Result<(), ReadinessError> { - if self.validators.is_empty() && self.executors.is_empty() { - return Ok(()); - } - - let labels = self.node_labels(); - let check = DaBalancerReadiness { - topology: self, - labels: &labels, - }; - - check.wait().await?; - Ok(()) - } - - pub async fn wait_membership_ready(&self) -> Result<(), ReadinessError> { - self.wait_membership_ready_for_session(SessionNumber::from(0u64)) - .await - } - - pub async fn wait_membership_ready_for_session( - &self, - session: SessionNumber, - ) -> Result<(), ReadinessError> { - self.wait_membership_assignations(session, true).await - } - - pub async fn wait_membership_empty_for_session( - &self, - session: SessionNumber, - ) -> Result<(), ReadinessError> { - self.wait_membership_assignations(session, false).await - } - - async fn wait_membership_assignations( - &self, - session: SessionNumber, - expect_non_empty: bool, - ) -> Result<(), ReadinessError> { - let total_nodes = self.validators.len() + self.executors.len(); - - if total_nodes == 0 { - return Ok(()); - } - - let labels = self.node_labels(); - let check = MembershipReadiness { - topology: self, - session, - labels: &labels, - expect_non_empty, - }; - - check.wait().await?; - Ok(()) - } - - fn node_listen_ports(&self) -> Vec { - self.validators - .iter() - .map(|node| node.config().network.backend.inner.port) - .chain( - self.executors - .iter() - .map(|node| node.config().network.backend.inner.port), - ) - .collect() - } - - fn node_initial_peer_ports(&self) -> Vec> { - self.validators - .iter() - .map(|node| { - node.config() - .network - .backend - .initial_peers - .iter() - .filter_map(multiaddr_port) - .collect::>() - }) - .chain(self.executors.iter().map(|node| { - node.config() - .network - .backend - .initial_peers - .iter() - .filter_map(multiaddr_port) - .collect::>() - })) - .collect() - } - - fn node_labels(&self) -> Vec { - self.validators - .iter() - .enumerate() - .map(|(idx, node)| { - format!( - "validator#{idx}@{}", - node.config().network.backend.inner.port - ) - }) - .chain(self.executors.iter().enumerate().map(|(idx, node)| { - format!( - "executor#{idx}@{}", - node.config().network.backend.inner.port - ) - })) - .collect() - } -} - -/// Errors emitted while waiting for node readiness. 
-#[derive(Debug, Error)] -pub enum ReadinessError { - #[error("{message}")] - Timeout { message: String }, -} - -#[async_trait::async_trait] -trait ReadinessCheck<'a> { - type Data: Send; - - async fn collect(&'a self) -> Self::Data; - - fn is_ready(&self, data: &Self::Data) -> bool; - - fn timeout_message(&self, data: Self::Data) -> String; - - fn poll_interval(&self) -> Duration { - Duration::from_millis(200) - } - - async fn wait(&'a self) -> Result<(), ReadinessError> { - let timeout_duration = adjust_timeout(Duration::from_secs(60)); - let poll_interval = self.poll_interval(); - let mut data = self.collect().await; - - let wait_result = timeout(timeout_duration, async { - loop { - if self.is_ready(&data) { - return; - } - - sleep(poll_interval).await; - - data = self.collect().await; - } - }) - .await; - - if wait_result.is_err() { - let message = self.timeout_message(data); - return Err(ReadinessError::Timeout { message }); - } - - Ok(()) - } -} - -struct NetworkReadiness<'a> { - topology: &'a Topology, - expected_peer_counts: &'a [usize], - labels: &'a [String], -} - -#[async_trait::async_trait] -impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> { - type Data = Vec; - - async fn collect(&'a self) -> Self::Data { - let (validator_infos, executor_infos) = tokio::join!( - join_all(self.topology.validators.iter().map(Validator::network_info)), - join_all(self.topology.executors.iter().map(Executor::network_info)) - ); - - validator_infos.into_iter().chain(executor_infos).collect() - } - - fn is_ready(&self, data: &Self::Data) -> bool { - data.iter() - .enumerate() - .all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx]) - } - - fn timeout_message(&self, data: Self::Data) -> String { - let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts); - format!("timed out waiting for network readiness: {summary}") - } -} - -struct HttpNetworkReadiness<'a> { - client: &'a Client, - endpoints: &'a [Url], - expected_peer_counts: &'a [usize], - labels: &'a [String], -} - -#[async_trait::async_trait] -impl<'a> ReadinessCheck<'a> for HttpNetworkReadiness<'a> { - type Data = Vec; - - async fn collect(&'a self) -> Self::Data { - let futures = self - .endpoints - .iter() - .map(|endpoint| fetch_network_info(self.client, endpoint)); - join_all(futures).await - } - - fn is_ready(&self, data: &Self::Data) -> bool { - data.iter() - .enumerate() - .all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx]) - } - - fn timeout_message(&self, data: Self::Data) -> String { - let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts); - format!("timed out waiting for network readiness: {summary}") - } -} - -struct MembershipReadiness<'a> { - topology: &'a Topology, - session: SessionNumber, - labels: &'a [String], - expect_non_empty: bool, -} - -#[async_trait::async_trait] -impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> { - type Data = Vec>; - - async fn collect(&'a self) -> Self::Data { - let (validator_responses, executor_responses) = tokio::join!( - join_all( - self.topology - .validators - .iter() - .map(|node| node.da_get_membership(self.session)), - ), - join_all( - self.topology - .executors - .iter() - .map(|node| node.da_get_membership(self.session)), - ) - ); - - validator_responses - .into_iter() - .chain(executor_responses) - .collect() - } - - fn is_ready(&self, data: &Self::Data) -> bool { - self.assignation_statuses(data) - .into_iter() - .all(|ready| ready) - } - - fn timeout_message(&self, data: Self::Data) -> String { - 
let statuses = self.assignation_statuses(&data); - let description = if self.expect_non_empty { - "non-empty assignations" - } else { - "empty assignations" - }; - let summary = build_membership_summary(self.labels, &statuses, description); - format!("timed out waiting for DA membership readiness ({description}): {summary}") - } -} - -impl MembershipReadiness<'_> { - fn assignation_statuses( - &self, - responses: &[Result], - ) -> Vec { - responses - .iter() - .map(|res| { - res.as_ref() - .map(|resp| { - let is_non_empty = !resp.assignations.is_empty(); - if self.expect_non_empty { - is_non_empty - } else { - !is_non_empty - } - }) - .unwrap_or(false) - }) - .collect() - } -} - -struct HttpMembershipReadiness<'a> { - client: &'a Client, - endpoints: &'a [Url], - session: SessionNumber, - labels: &'a [String], - expect_non_empty: bool, -} - -#[async_trait::async_trait] -impl<'a> ReadinessCheck<'a> for HttpMembershipReadiness<'a> { - type Data = Vec>; - - async fn collect(&'a self) -> Self::Data { - let futures = self - .endpoints - .iter() - .map(|endpoint| fetch_membership(self.client, endpoint, self.session)); - join_all(futures).await - } - - fn is_ready(&self, data: &Self::Data) -> bool { - assignation_statuses(data, self.expect_non_empty) - .into_iter() - .all(|ready| ready) - } - - fn timeout_message(&self, data: Self::Data) -> String { - let statuses = assignation_statuses(&data, self.expect_non_empty); - let description = if self.expect_non_empty { - "non-empty assignations" - } else { - "empty assignations" - }; - let summary = build_membership_summary(self.labels, &statuses, description); - format!("timed out waiting for DA membership readiness ({description}): {summary}") - } -} - -struct DaBalancerReadiness<'a> { - topology: &'a Topology, - labels: &'a [String], -} - -#[async_trait::async_trait] -impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> { - type Data = Vec<(String, usize, BalancerStats)>; - - async fn collect(&'a self) -> Self::Data { - let mut data = Vec::new(); - for (idx, validator) in self.topology.validators.iter().enumerate() { - data.push(( - self.labels[idx].clone(), - validator.config().da_network.subnet_threshold, - validator.balancer_stats().await, - )); - } - for (offset, executor) in self.topology.executors.iter().enumerate() { - let label_index = self.topology.validators.len() + offset; - data.push(( - self.labels[label_index].clone(), - executor.config().da_network.subnet_threshold, - executor.balancer_stats().await, - )); - } - data - } - - fn is_ready(&self, data: &Self::Data) -> bool { - data.iter().all(|(_, threshold, stats)| { - if *threshold == 0 { - return true; - } - connected_subnetworks(stats) >= *threshold - }) - } - - fn timeout_message(&self, data: Self::Data) -> String { - let summary = data - .into_iter() - .map(|(label, threshold, stats)| { - let connected = connected_subnetworks(&stats); - let details = format_balancer_stats(&stats); - format!("{label}: connected={connected}, required={threshold}, stats={details}") - }) - .collect::>() - .join(", "); - format!("timed out waiting for DA balancer readiness: {summary}") - } - - fn poll_interval(&self) -> Duration { - Duration::from_secs(1) - } -} - -fn connected_subnetworks(stats: &BalancerStats) -> usize { - stats - .values() - .filter(|stat| stat.inbound > 0 || stat.outbound > 0) - .count() -} - -fn format_balancer_stats(stats: &BalancerStats) -> String { - if stats.is_empty() { - return "empty".into(); - } - stats - .iter() - .map(|(subnet, stat)| format!("{}:in={},out={}", subnet, 
stat.inbound, stat.outbound)) - .collect::>() - .join(";") -} - -fn build_timeout_summary( - labels: &[String], - infos: Vec, - expected_counts: &[usize], -) -> String { - infos - .into_iter() - .zip(expected_counts.iter()) - .zip(labels.iter()) - .map(|((info, expected), label)| { - format!("{}: peers={}, expected={}", label, info.n_peers, expected) - }) - .collect::>() - .join(", ") -} - -fn build_membership_summary(labels: &[String], statuses: &[bool], description: &str) -> String { - statuses - .iter() - .zip(labels.iter()) - .map(|(ready, label)| { - let status = if *ready { "ready" } else { "waiting" }; - format!("{label}: status={status}, expected {description}") - }) - .collect::>() - .join(", ") -} - -async fn fetch_network_info(client: &Client, base: &Url) -> Libp2pInfo { - let url = join_path(base, paths::NETWORK_INFO); - let response = match client.get(url).send().await { - Ok(resp) => resp, - Err(err) => { - return log_network_warning(base, err, "failed to reach network info endpoint"); - } - }; - - let response = match response.error_for_status() { - Ok(resp) => resp, - Err(err) => { - return log_network_warning(base, err, "network info endpoint returned error"); - } - }; - - match response.json::().await { - Ok(info) => info, - Err(err) => log_network_warning(base, err, "failed to decode network info response"), - } -} - -async fn fetch_membership( - client: &Client, - base: &Url, - session: SessionNumber, -) -> Result { - let url = join_path(base, paths::DA_GET_MEMBERSHIP); - client - .post(url) - .json(&session) - .send() - .await? - .error_for_status()? - .json() - .await -} - -fn log_network_warning(base: &Url, err: impl std::fmt::Display, message: &str) -> Libp2pInfo { - warn!(target: "readiness", url = %base, error = %err, "{message}"); - empty_libp2p_info() -} - -fn empty_libp2p_info() -> Libp2pInfo { - Libp2pInfo { - listen_addresses: Vec::with_capacity(0), - n_peers: 0, - n_connections: 0, - n_pending_connections: 0, - } -} - -fn join_path(base: &Url, path: &str) -> Url { - base.join(path.trim_start_matches('/')) - .unwrap_or_else(|err| panic!("failed to join url {base} with path {path}: {err}")) -} - -fn assignation_statuses( - responses: &[Result], - expect_non_empty: bool, -) -> Vec { - responses - .iter() - .map(|res| { - res.as_ref() - .map(|resp| { - let is_non_empty = !resp.assignations.is_empty(); - if expect_non_empty { - is_non_empty - } else { - !is_non_empty - } - }) - .unwrap_or(false) - }) - .collect() -} - -fn multiaddr_port(addr: &nomos_libp2p::Multiaddr) -> Option { - for protocol in addr { - match protocol { - nomos_libp2p::Protocol::Udp(port) | nomos_libp2p::Protocol::Tcp(port) => { - return Some(port); - } - _ => {} - } - } - None -} - -fn find_expected_peer_counts( - listen_ports: &[u16], - initial_peer_ports: &[HashSet], -) -> Vec { - let mut expected: Vec> = vec![HashSet::new(); initial_peer_ports.len()]; - - for (idx, ports) in initial_peer_ports.iter().enumerate() { - for port in ports { - let Some(peer_idx) = listen_ports.iter().position(|p| p == port) else { - continue; - }; - if peer_idx == idx { - continue; - } - - expected[idx].insert(peer_idx); - expected[peer_idx].insert(idx); - } - } - - expected.into_iter().map(|set| set.len()).collect() -} - -#[must_use] -/// Build preload KMS configs for blend/DA and wallet keys for every node. 
-pub fn create_kms_configs(
-    blend_configs: &[GeneralBlendConfig],
-    da_configs: &[GeneralDaConfig],
-    wallet_accounts: &[WalletAccount],
-) -> Vec<PreloadKMSBackendSettings> {
-    da_configs
-        .iter()
-        .zip(blend_configs.iter())
-        .map(|(da_conf, blend_conf)| {
-            let mut keys = HashMap::from([
-                (
-                    hex::encode(blend_conf.signer.verifying_key().as_bytes()),
-                    Key::Ed25519(Ed25519Key::new(blend_conf.signer.clone())),
-                ),
-                (
-                    hex::encode(fr_to_bytes(
-                        &blend_conf.secret_zk_key.to_public_key().into_inner(),
-                    )),
-                    Key::Zk(ZkKey::new(blend_conf.secret_zk_key.clone())),
-                ),
-                (
-                    hex::encode(da_conf.signer.verifying_key().as_bytes()),
-                    Key::Ed25519(Ed25519Key::new(da_conf.signer.clone())),
-                ),
-                (
-                    hex::encode(fr_to_bytes(
-                        &da_conf.secret_zk_key.to_public_key().into_inner(),
-                    )),
-                    Key::Zk(ZkKey::new(da_conf.secret_zk_key.clone())),
-                ),
-            ]);
-
-            for account in wallet_accounts {
-                let key_id = hex::encode(fr_to_bytes(&account.public_key().into_inner()));
-                keys.entry(key_id)
-                    .or_insert_with(|| Key::Zk(ZkKey::new(account.secret_key.clone())));
-            }
-
-            PreloadKMSBackendSettings { keys }
-        })
-        .collect()
-}
-
-fn resolve_ids(ids: Option<Vec<[u8; 32]>>, count: usize) -> Vec<[u8; 32]> {
-    ids.map_or_else(
-        || {
-            let mut generated = vec![[0; 32]; count];
-            for id in &mut generated {
-                thread_rng().fill(id);
-            }
-            generated
-        },
-        |ids| {
-            assert_eq!(
-                ids.len(),
-                count,
-                "expected {count} ids but got {}",
-                ids.len()
-            );
-            ids
-        },
-    )
-}
-
-fn resolve_ports(ports: Option<Vec<u16>>, count: usize, label: &str) -> Vec<u16> {
-    let resolved = ports.unwrap_or_else(|| {
-        iter::repeat_with(|| get_available_udp_port().unwrap())
-            .take(count)
-            .collect()
-    });
-    assert_eq!(
-        resolved.len(),
-        count,
-        "expected {count} {label} ports but got {}",
-        resolved.len()
-    );
-    resolved
-}
+pub use utils::{create_kms_configs, multiaddr_port, resolve_ids, resolve_ports};
diff --git a/testing-framework/core/src/topology/readiness/mod.rs b/testing-framework/core/src/topology/readiness/mod.rs
new file mode 100644
index 0000000..3ec04c9
--- /dev/null
+++ b/testing-framework/core/src/topology/readiness/mod.rs
@@ -0,0 +1,425 @@
+use std::time::Duration;
+
+use futures::future::join_all;
+use nomos_core::sdp::SessionNumber;
+use nomos_da_network_core::swarm::BalancerStats;
+use nomos_da_network_service::MembershipResponse;
+use nomos_http_api_common::paths;
+use nomos_network::backends::libp2p::Libp2pInfo;
+use reqwest::{Client, Url};
+use thiserror::Error;
+use tokio::time::{sleep, timeout};
+use tracing::warn;
+
+use crate::{adjust_timeout, topology::Topology};
+
+#[derive(Debug, Error)]
+pub enum ReadinessError {
+    #[error("{message}")]
+    Timeout { message: String },
+}
+
+#[async_trait::async_trait]
+pub trait ReadinessCheck<'a> {
+    type Data: Send;
+
+    async fn collect(&'a self) -> Self::Data;
+
+    fn is_ready(&self, data: &Self::Data) -> bool;
+
+    fn timeout_message(&self, data: Self::Data) -> String;
+
+    fn poll_interval(&self) -> Duration {
+        Duration::from_millis(200)
+    }
+
+    async fn wait(&'a self) -> Result<(), ReadinessError> {
+        let timeout_duration = adjust_timeout(Duration::from_secs(60));
+        let poll_interval = self.poll_interval();
+        let mut data = self.collect().await;
+
+        let wait_result = timeout(timeout_duration, async {
+            loop {
+                if self.is_ready(&data) {
+                    return;
+                }
+
+                sleep(poll_interval).await;
+
+                data = self.collect().await;
+            }
+        })
+        .await;
+
+        if wait_result.is_err() {
+            let message = self.timeout_message(data);
+            return Err(ReadinessError::Timeout { message });
+        }
+
+        Ok(())
+    }
+}
+
+pub struct NetworkReadiness<'a> {
+    pub(crate) topology: &'a Topology,
+    pub(crate) expected_peer_counts: &'a [usize],
+    pub(crate) labels: &'a [String],
+}
+
+#[async_trait::async_trait]
+impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> {
+    type Data = Vec<Libp2pInfo>;
+
+    async fn collect(&'a self) -> Self::Data {
+        let (validator_infos, executor_infos) = tokio::join!(
+            join_all(
+                self.topology
+                    .validators
+                    .iter()
+                    .map(crate::nodes::validator::Validator::network_info)
+            ),
+            join_all(
+                self.topology
+                    .executors
+                    .iter()
+                    .map(crate::nodes::executor::Executor::network_info)
+            )
+        );
+
+        validator_infos.into_iter().chain(executor_infos).collect()
+    }
+
+    fn is_ready(&self, data: &Self::Data) -> bool {
+        data.iter()
+            .enumerate()
+            .all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx])
+    }
+
+    fn timeout_message(&self, data: Self::Data) -> String {
+        let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts);
+        format!("timed out waiting for network readiness: {summary}")
+    }
+}
+
+pub struct HttpNetworkReadiness<'a> {
+    pub(crate) client: &'a Client,
+    pub(crate) endpoints: &'a [Url],
+    pub(crate) expected_peer_counts: &'a [usize],
+    pub(crate) labels: &'a [String],
+}
+
+#[async_trait::async_trait]
+impl<'a> ReadinessCheck<'a> for HttpNetworkReadiness<'a> {
+    type Data = Vec<Libp2pInfo>;
+
+    async fn collect(&'a self) -> Self::Data {
+        let futures = self
+            .endpoints
+            .iter()
+            .map(|endpoint| fetch_network_info(self.client, endpoint));
+        join_all(futures).await
+    }
+
+    fn is_ready(&self, data: &Self::Data) -> bool {
+        data.iter()
+            .enumerate()
+            .all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx])
+    }
+
+    fn timeout_message(&self, data: Self::Data) -> String {
+        let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts);
+        format!("timed out waiting for network readiness: {summary}")
+    }
+}
+
+pub struct MembershipReadiness<'a> {
+    pub(crate) topology: &'a Topology,
+    pub(crate) session: SessionNumber,
+    pub(crate) labels: &'a [String],
+    pub(crate) expect_non_empty: bool,
+}
+
+#[async_trait::async_trait]
+impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> {
+    type Data = Vec<Result<MembershipResponse, reqwest::Error>>;
+
+    async fn collect(&'a self) -> Self::Data {
+        let (validator_responses, executor_responses) = tokio::join!(
+            join_all(
+                self.topology
+                    .validators
+                    .iter()
+                    .map(|node| node.da_get_membership(self.session)),
+            ),
+            join_all(
+                self.topology
+                    .executors
+                    .iter()
+                    .map(|node| node.da_get_membership(self.session)),
+            )
+        );
+
+        validator_responses
+            .into_iter()
+            .chain(executor_responses)
+            .collect()
+    }
+
+    fn is_ready(&self, data: &Self::Data) -> bool {
+        self.assignation_statuses(data)
+            .into_iter()
+            .all(|ready| ready)
+    }
+
+    fn timeout_message(&self, data: Self::Data) -> String {
+        let statuses = self.assignation_statuses(&data);
+        let description = if self.expect_non_empty {
+            "non-empty assignations"
+        } else {
+            "empty assignations"
+        };
+        let summary = build_membership_summary(self.labels, &statuses, description);
+        format!("timed out waiting for DA membership readiness ({description}): {summary}")
+    }
+}
+
+impl MembershipReadiness<'_> {
+    fn assignation_statuses(
+        &self,
+        responses: &[Result<MembershipResponse, reqwest::Error>],
+    ) -> Vec<bool> {
+        responses
+            .iter()
+            .map(|res| {
+                res.as_ref()
+                    .map(|resp| {
+                        let is_non_empty = !resp.assignations.is_empty();
+                        if self.expect_non_empty {
+                            is_non_empty
+                        } else {
+                            !is_non_empty
+                        }
+                    })
+                    .unwrap_or(false)
+            })
+            .collect()
+    }
+}
+
+pub struct HttpMembershipReadiness<'a> {
+    pub(crate) client: &'a Client,
+    pub(crate) endpoints: &'a [Url],
+    pub(crate) session: SessionNumber,
+    pub(crate) labels: &'a [String],
+    pub(crate) expect_non_empty: bool,
+}
+
+#[async_trait::async_trait]
+impl<'a> ReadinessCheck<'a> for HttpMembershipReadiness<'a> {
+    type Data = Vec<Result<MembershipResponse, reqwest::Error>>;
+
+    async fn collect(&'a self) -> Self::Data {
+        let futures = self
+            .endpoints
+            .iter()
+            .map(|endpoint| fetch_membership(self.client, endpoint, self.session));
+        futures::future::join_all(futures).await
+    }
+
+    fn is_ready(&self, data: &Self::Data) -> bool {
+        assignation_statuses(data, self.expect_non_empty)
+            .into_iter()
+            .all(|ready| ready)
+    }
+
+    fn timeout_message(&self, data: Self::Data) -> String {
+        let statuses = assignation_statuses(&data, self.expect_non_empty);
+        let description = if self.expect_non_empty {
+            "non-empty assignations"
+        } else {
+            "empty assignations"
+        };
+        let summary = build_membership_summary(self.labels, &statuses, description);
+        format!("timed out waiting for DA membership readiness ({description}): {summary}")
+    }
+}
+
+pub struct DaBalancerReadiness<'a> {
+    pub(crate) topology: &'a Topology,
+    pub(crate) labels: &'a [String],
+}
+
+#[async_trait::async_trait]
+impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> {
+    type Data = Vec<(String, usize, BalancerStats)>;
+
+    async fn collect(&'a self) -> Self::Data {
+        let mut data = Vec::new();
+        for (idx, validator) in self.topology.validators.iter().enumerate() {
+            data.push((
+                self.labels[idx].clone(),
+                validator.config().da_network.subnet_threshold,
+                validator.balancer_stats().await,
+            ));
+        }
+        for (offset, executor) in self.topology.executors.iter().enumerate() {
+            let label_index = self.topology.validators.len() + offset;
+            data.push((
+                self.labels[label_index].clone(),
+                executor.config().da_network.subnet_threshold,
+                executor.balancer_stats().await,
+            ));
+        }
+        data
+    }
+
+    fn is_ready(&self, data: &Self::Data) -> bool {
+        data.iter().all(|(_, threshold, stats)| {
+            if *threshold == 0 {
+                return true;
+            }
+            connected_subnetworks(stats) >= *threshold
+        })
+    }
+
+    fn timeout_message(&self, data: Self::Data) -> String {
+        let summary = data
+            .into_iter()
+            .map(|(label, threshold, stats)| {
+                let connected = connected_subnetworks(&stats);
+                let details = format_balancer_stats(&stats);
+                format!("{label}: connected={connected}, required={threshold}, stats={details}")
+            })
+            .collect::<Vec<_>>()
+            .join(", ");
+        format!("timed out waiting for DA balancer readiness: {summary}")
+    }
+
+    fn poll_interval(&self) -> Duration {
+        Duration::from_secs(1)
+    }
+}
+
+fn connected_subnetworks(stats: &BalancerStats) -> usize {
+    stats
+        .values()
+        .filter(|stat| stat.inbound > 0 || stat.outbound > 0)
+        .count()
+}
+
+fn format_balancer_stats(stats: &BalancerStats) -> String {
+    if stats.is_empty() {
+        return "empty".into();
+    }
+    stats
+        .iter()
+        .map(|(subnet, stat)| format!("{}:in={},out={}", subnet, stat.inbound, stat.outbound))
+        .collect::<Vec<_>>()
+        .join(";")
+}
+
+fn build_timeout_summary(
+    labels: &[String],
+    infos: Vec<Libp2pInfo>,
+    expected_counts: &[usize],
+) -> String {
+    infos
+        .into_iter()
+        .zip(expected_counts.iter())
+        .zip(labels.iter())
+        .map(|((info, expected), label)| {
+            format!("{}: peers={}, expected={}", label, info.n_peers, expected)
+        })
+        .collect::<Vec<_>>()
+        .join(", ")
+}
+
+fn build_membership_summary(labels: &[String], statuses: &[bool], description: &str) -> String {
+    statuses
+        .iter()
+        .zip(labels.iter())
+        .map(|(ready, label)| {
+            let status = if *ready { "ready" } else { "waiting" };
+            format!("{label}: status={status}, expected {description}")
+        })
+        .collect::<Vec<_>>()
+        .join(", ")
+}
+
+async fn fetch_network_info(client: &Client, base: &Url) -> Libp2pInfo {
+    let url = join_path(base, paths::NETWORK_INFO);
+    let response = match client.get(url).send().await {
+        Ok(resp) => resp,
+        Err(err) => {
+            return log_network_warning(base, err, "failed to reach network info endpoint");
+        }
+    };
+
+    let response = match response.error_for_status() {
+        Ok(resp) => resp,
+        Err(err) => {
+            return log_network_warning(base, err, "network info endpoint returned error");
+        }
+    };
+
+    match response.json::<Libp2pInfo>().await {
+        Ok(info) => info,
+        Err(err) => log_network_warning(base, err, "failed to decode network info response"),
+    }
+}
+
+async fn fetch_membership(
+    client: &Client,
+    base: &Url,
+    session: SessionNumber,
+) -> Result<MembershipResponse, reqwest::Error> {
+    let url = join_path(base, paths::DA_GET_MEMBERSHIP);
+    client
+        .post(url)
+        .json(&session)
+        .send()
+        .await?
+        .error_for_status()?
+        .json()
+        .await
+}
+
+fn log_network_warning(base: &Url, err: impl std::fmt::Display, message: &str) -> Libp2pInfo {
+    warn!(target: "readiness", url = %base, error = %err, "{message}");
+    empty_libp2p_info()
+}
+
+fn empty_libp2p_info() -> Libp2pInfo {
+    Libp2pInfo {
+        listen_addresses: Vec::with_capacity(0),
+        n_peers: 0,
+        n_connections: 0,
+        n_pending_connections: 0,
+    }
+}
+
+fn join_path(base: &Url, path: &str) -> Url {
+    base.join(path.trim_start_matches('/'))
+        .unwrap_or_else(|err| panic!("failed to join url {base} with path {path}: {err}"))
+}
+
+fn assignation_statuses(
+    responses: &[Result<MembershipResponse, reqwest::Error>],
+    expect_non_empty: bool,
+) -> Vec<bool> {
+    responses
+        .iter()
+        .map(|res| {
+            res.as_ref()
+                .map(|resp| {
+                    let is_non_empty = !resp.assignations.is_empty();
+                    if expect_non_empty {
+                        is_non_empty
+                    } else {
+                        !is_non_empty
+                    }
+                })
+                .unwrap_or(false)
+        })
+        .collect()
+}
diff --git a/testing-framework/core/src/topology/utils.rs b/testing-framework/core/src/topology/utils.rs
new file mode 100644
index 0000000..9c5e5ca
--- /dev/null
+++ b/testing-framework/core/src/topology/utils.rs
@@ -0,0 +1,106 @@
+use std::{collections::HashMap, iter};
+
+use groth16::fr_to_bytes;
+use key_management_system_service::{
+    backend::preload::PreloadKMSBackendSettings,
+    keys::{Ed25519Key, Key, ZkKey},
+};
+use nomos_utils::net::get_available_udp_port;
+use rand::{Rng, thread_rng};
+
+use crate::topology::configs::{
+    blend::GeneralBlendConfig, da::GeneralDaConfig, wallet::WalletAccount,
+};
+
+#[must_use]
+/// Build preload KMS configs for blend/DA and wallet keys for every node.
+pub fn create_kms_configs(
+    blend_configs: &[GeneralBlendConfig],
+    da_configs: &[GeneralDaConfig],
+    wallet_accounts: &[WalletAccount],
+) -> Vec<PreloadKMSBackendSettings> {
+    da_configs
+        .iter()
+        .zip(blend_configs.iter())
+        .map(|(da_conf, blend_conf)| {
+            let mut keys = HashMap::from([
+                (
+                    hex::encode(blend_conf.signer.verifying_key().as_bytes()),
+                    Key::Ed25519(Ed25519Key::new(blend_conf.signer.clone())),
+                ),
+                (
+                    hex::encode(fr_to_bytes(
+                        &blend_conf.secret_zk_key.to_public_key().into_inner(),
+                    )),
+                    Key::Zk(ZkKey::new(blend_conf.secret_zk_key.clone())),
+                ),
+                (
+                    hex::encode(da_conf.signer.verifying_key().as_bytes()),
+                    Key::Ed25519(Ed25519Key::new(da_conf.signer.clone())),
+                ),
+                (
+                    hex::encode(fr_to_bytes(
+                        &da_conf.secret_zk_key.to_public_key().into_inner(),
+                    )),
+                    Key::Zk(ZkKey::new(da_conf.secret_zk_key.clone())),
+                ),
+            ]);
+
+            for account in wallet_accounts {
+                let key_id = hex::encode(fr_to_bytes(&account.public_key().into_inner()));
+                keys.entry(key_id)
+                    .or_insert_with(|| Key::Zk(ZkKey::new(account.secret_key.clone())));
+            }
+
+            PreloadKMSBackendSettings { keys }
+        })
+        .collect()
+}
+
+pub fn resolve_ids(ids: Option<Vec<[u8; 32]>>, count: usize) -> Vec<[u8; 32]> {
+    ids.map_or_else(
+        || {
+            let mut generated = vec![[0; 32]; count];
+            for id in &mut generated {
+                thread_rng().fill(id);
+            }
+            generated
+        },
+        |ids| {
+            assert_eq!(
+                ids.len(),
+                count,
+                "expected {count} ids but got {}",
+                ids.len()
+            );
+            ids
+        },
+    )
+}
+
+pub fn resolve_ports(ports: Option<Vec<u16>>, count: usize, label: &str) -> Vec<u16> {
+    let resolved = ports.unwrap_or_else(|| {
+        iter::repeat_with(|| get_available_udp_port().unwrap())
+            .take(count)
+            .collect()
+    });
+    assert_eq!(
+        resolved.len(),
+        count,
+        "expected {count} {label} ports but got {}",
+        resolved.len()
+    );
+    resolved
+}
+
+pub fn multiaddr_port(addr: &nomos_libp2p::Multiaddr) -> Option<u16> {
+    for protocol in addr {
+        match protocol {
+            nomos_libp2p::Protocol::Udp(port) | nomos_libp2p::Protocol::Tcp(port) => {
+                return Some(port);
+            }
+            _ => {}
+        }
+    }
+    None
+}
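
Note for reviewers: the readiness checks introduced above share a single polling loop. `ReadinessCheck::wait` takes an initial snapshot via `collect`, tests `is_ready`, and retries at `poll_interval` until the adjusted 60-second timeout fires, at which point `timeout_message` is rendered into `ReadinessError::Timeout`. As a rough, crate-internal usage sketch (not part of this patch), a caller holding a `Topology` might drive the network check as below; the label scheme, the full-mesh peer expectation, and the import paths for the `readiness` module are illustrative assumptions, not code from this change.

use crate::topology::{
    Topology,
    readiness::{NetworkReadiness, ReadinessCheck as _, ReadinessError},
};

// Illustrative sketch: wait until every node reports at least `n - 1` peers.
async fn wait_for_full_mesh(topology: &Topology) -> Result<(), ReadinessError> {
    let node_count = topology.validators.len() + topology.executors.len();
    // Hypothetical labels; the framework normally derives these per node role.
    let labels: Vec<String> = (0..node_count).map(|i| format!("node-{i}")).collect();
    // Assumed expectation: a fully connected mesh, so each node sees all others.
    let expected_peer_counts = vec![node_count.saturating_sub(1); node_count];

    NetworkReadiness {
        topology,
        expected_peer_counts: &expected_peer_counts,
        labels: &labels,
    }
    .wait()
    .await
}

The HTTP variants follow the same pattern but take a `reqwest::Client` and a slice of node API `Url`s instead of the in-process `Topology`.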