From 625179b0e95c7492949f72ef21f22c31d38b343d Mon Sep 17 00:00:00 2001
From: andrussal
Date: Wed, 14 Jan 2026 12:44:31 +0100
Subject: [PATCH] testing: add dynamic node control and config helpers

Allow scenarios to start new validators and executors while a run is in
progress. The local deployer implements the new NodeControlHandle start
hooks, NodeClients becomes a shared, lock-guarded collection that can
grow at runtime, and the config crate gains per-node helpers for
building network and general configs. An ignored example test exercises
a node joining a running cluster.

---
 .gitignore                                    |   1 +
 Cargo.lock                                    |   4 +
 examples/Cargo.toml                           |   3 +
 .../custom_workload_example_expectation.rs    |  17 +-
 .../src/custom_workload_example_workload.rs   |  17 +-
 examples/tests/dynamic_join.rs                |  75 ++++++
 .../configs/src/topology/configs/mod.rs       |   1 +
 .../configs/src/topology/configs/network.rs   |  30 +++
 .../configs/src/topology/configs/runtime.rs   | 147 ++++++++++++
 testing-framework/core/src/nodes/executor.rs  |   5 +-
 testing-framework/core/src/nodes/validator.rs |   5 +-
 .../core/src/scenario/capabilities.rs         |  16 ++
 testing-framework/core/src/scenario/mod.rs    |   1 +
 .../core/src/scenario/runtime/context.rs      |   2 +-
 .../core/src/scenario/runtime/node_clients.rs |  87 +++++--
 .../core/src/topology/deployment.rs           |   6 +-
 .../compose/src/lifecycle/block_feed.rs       |   1 -
 .../k8s/src/deployer/orchestrator.rs          |   6 +-
 .../deployers/k8s/src/lifecycle/block_feed.rs |   4 +-
 testing-framework/deployers/local/Cargo.toml  |  11 +-
 .../deployers/local/src/runner.rs             | 225 +++++++++++++++++-
 .../src/expectations/consensus_liveness.rs    |   4 +-
 .../workflows/src/workloads/util.rs           |   4 +-
 23 files changed, 603 insertions(+), 69 deletions(-)
 create mode 100644 examples/tests/dynamic_join.rs
 create mode 100644 testing-framework/configs/src/topology/configs/runtime.rs

diff --git a/.gitignore b/.gitignore
index d8126b9..0275d82 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,4 @@ testing-framework/assets/stack/kzgrs_test_params/
 # Local test artifacts (kept when NOMOS_TESTS_KEEP_LOGS=1)
 tests/workflows/.tmp*
 tests/workflows/.tmp*/
+examples/.tmp*/
diff --git a/Cargo.lock b/Cargo.lock
index 930e3a2..f1e1f14 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6188,6 +6188,7 @@ name = "runner-examples"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-trait",
  "testing-framework-core",
  "testing-framework-runner-compose",
  "testing-framework-runner-k8s",
@@ -7091,6 +7092,9 @@ name = "testing-framework-runner-local"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "logos-blockchain-utils",
+ "rand 0.8.5",
+ "testing-framework-config",
  "testing-framework-core",
  "thiserror 2.0.18",
  "tracing",
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index ba28e95..4dee650 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -20,5 +20,8 @@ tokio = { workspace = true, features = ["macros", "ne
 tracing = { workspace = true }
 tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
 
+[dev-dependencies]
+async-trait = { workspace = true }
+
 [lints]
 workspace = true
diff --git a/examples/doc-snippets/src/custom_workload_example_expectation.rs b/examples/doc-snippets/src/custom_workload_example_expectation.rs
index 243027e..0278438 100644
--- a/examples/doc-snippets/src/custom_workload_example_expectation.rs
+++ b/examples/doc-snippets/src/custom_workload_example_expectation.rs
@@ -18,16 +18,13 @@ impl Expectation for ReachabilityExpectation {
     }
 
     async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> {
-        let client = ctx
-            .node_clients()
-            .validator_clients()
-            .get(self.target_idx)
-            .ok_or_else(|| {
-                Box::new(std::io::Error::new(
-                    std::io::ErrorKind::Other,
-                    "missing target client",
-                )) as DynError
-            })?;
+        let validators = ctx.node_clients().validator_clients();
+        let client = validators.get(self.target_idx).ok_or_else(|| {
+            Box::new(std::io::Error::new(
+                std::io::ErrorKind::Other,
+                "missing target client",
client", + )) as DynError + })?; client .consensus_info() diff --git a/examples/doc-snippets/src/custom_workload_example_workload.rs b/examples/doc-snippets/src/custom_workload_example_workload.rs index cd17837..4b545a4 100644 --- a/examples/doc-snippets/src/custom_workload_example_workload.rs +++ b/examples/doc-snippets/src/custom_workload_example_workload.rs @@ -43,16 +43,13 @@ impl Workload for ReachabilityWorkload { } async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { - let client = ctx - .node_clients() - .validator_clients() - .get(self.target_idx) - .ok_or_else(|| { - Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - "missing target client", - )) as DynError - })?; + let validators = ctx.node_clients().validator_clients(); + let client = validators.get(self.target_idx).ok_or_else(|| { + Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + "missing target client", + )) as DynError + })?; // Lightweight API call to prove reachability. client diff --git a/examples/tests/dynamic_join.rs b/examples/tests/dynamic_join.rs new file mode 100644 index 0000000..f176918 --- /dev/null +++ b/examples/tests/dynamic_join.rs @@ -0,0 +1,75 @@ +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use testing_framework_core::scenario::{Deployer, DynError, RunContext, ScenarioBuilder, Workload}; +use testing_framework_runner_local::LocalDeployer; +use testing_framework_workflows::ScenarioBuilderExt; +use tokio::time::{sleep, timeout}; +use tracing_subscriber::fmt::try_init; + +const START_DELAY: Duration = Duration::from_secs(5); +const READY_TIMEOUT: Duration = Duration::from_secs(60); +const READY_POLL_INTERVAL: Duration = Duration::from_secs(2); + +struct JoinNodeWorkload { + name: String, +} + +impl JoinNodeWorkload { + fn new(name: impl Into) -> Self { + Self { name: name.into() } + } +} + +#[async_trait] +impl Workload for JoinNodeWorkload { + fn name(&self) -> &str { + "dynamic_join" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let handle = ctx + .node_control() + .ok_or_else(|| "dynamic join workload requires node control".to_owned())?; + + sleep(START_DELAY).await; + + let node = handle.start_validator(&self.name).await?; + let client = node.api; + + timeout(READY_TIMEOUT, async { + loop { + match client.consensus_info().await { + Ok(info) if info.height > 0 => break, + Ok(_) | Err(_) => sleep(READY_POLL_INTERVAL).await, + } + } + }) + .await + .map_err(|_| "dynamic join node did not become ready in time")?; + + sleep(ctx.run_duration()).await; + Ok(()) + } +} + +#[tokio::test] +#[ignore = "run manually with `cargo test -p runner-examples -- --ignored`"] +async fn dynamic_join_reaches_consensus_liveness() -> Result<()> { + let _ = try_init(); + + let mut scenario = + ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(0)) + .enable_node_control() + .with_workload(JoinNodeWorkload::new("joiner")) + .expect_consensus_liveness() + .with_run_duration(Duration::from_secs(60)) + .build()?; + + let deployer = LocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + let _handle = runner.run(&mut scenario).await?; + + Ok(()) +} diff --git a/testing-framework/configs/src/topology/configs/mod.rs b/testing-framework/configs/src/topology/configs/mod.rs index a1d2d3d..b328316 100644 --- a/testing-framework/configs/src/topology/configs/mod.rs +++ b/testing-framework/configs/src/topology/configs/mod.rs @@ -5,6 +5,7 @@ pub mod bootstrap; pub mod consensus; pub mod da; pub mod 
+pub mod runtime;
 pub mod time;
 pub mod tracing;
 pub mod wallet;
diff --git a/testing-framework/configs/src/topology/configs/network.rs b/testing-framework/configs/src/topology/configs/network.rs
index 8d56a66..49387e4 100644
--- a/testing-framework/configs/src/topology/configs/network.rs
+++ b/testing-framework/configs/src/topology/configs/network.rs
@@ -108,6 +108,36 @@ pub fn create_network_configs(
         .collect())
 }
 
+pub fn build_network_config_for_node(
+    id: [u8; 32],
+    port: u16,
+    initial_peers: Vec<Multiaddr>,
+) -> Result<GeneralNetworkConfig, NetworkConfigError> {
+    let mut node_key_bytes = id;
+    let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes).map_err(|err| {
+        NetworkConfigError::NodeKeyFromBytes {
+            message: err.to_string(),
+        }
+    })?;
+
+    let swarm_config = SwarmConfig {
+        node_key,
+        port,
+        chain_sync_config: cryptarchia_sync::Config {
+            peer_response_timeout: PEER_RESPONSE_TIMEOUT,
+        },
+        nat_config: nat_settings(port)?,
+        ..default_swarm_config()
+    };
+
+    Ok(GeneralNetworkConfig {
+        backend: BackendSettings {
+            initial_peers,
+            swarm: swarm_config,
+        },
+    })
+}
+
 fn initial_peers_by_network_layout(
     swarm_configs: &[SwarmConfig],
     network_params: &NetworkParams,
diff --git a/testing-framework/configs/src/topology/configs/runtime.rs b/testing-framework/configs/src/topology/configs/runtime.rs
new file mode 100644
index 0000000..cdd1fde
--- /dev/null
+++ b/testing-framework/configs/src/topology/configs/runtime.rs
@@ -0,0 +1,147 @@
+use std::collections::HashMap;
+
+use key_management_system_service::{backend::preload::PreloadKMSBackendSettings, keys::Key};
+use nomos_libp2p::Multiaddr;
+
+use crate::{
+    node_address_from_port,
+    nodes::kms::key_id_for_preload_backend,
+    topology::configs::{
+        GeneralConfig, GeneralConfigError, api, blend, bootstrap, consensus,
+        consensus::{ConsensusParams, GeneralConsensusConfig},
+        da,
+        da::DaParams,
+        network,
+        network::{Libp2pNetworkLayout, NetworkParams},
+        time, tracing,
+        wallet::WalletConfig,
+    },
+};
+
+pub fn build_general_config_for_node(
+    id: [u8; 32],
+    network_port: u16,
+    initial_peers: Vec<Multiaddr>,
+    da_port: u16,
+    blend_port: u16,
+    consensus_params: &ConsensusParams,
+    da_params: &DaParams,
+    wallet_config: &WalletConfig,
+    base_consensus: &GeneralConsensusConfig,
+    time_config: &time::GeneralTimeConfig,
+) -> Result<GeneralConfig, GeneralConfigError> {
+    let consensus_config =
+        build_consensus_config_for_node(id, consensus_params, wallet_config, base_consensus)?;
+
+    let bootstrap_config =
+        bootstrap::create_bootstrap_configs(&[id], bootstrap::SHORT_PROLONGED_BOOTSTRAP_PERIOD)
+            .into_iter()
+            .next()
+            .ok_or(GeneralConfigError::EmptyParticipants)?;
+
+    let da_config = da::try_create_da_configs(&[id], da_params, &[da_port])?
+        .into_iter()
+        .next()
+        .ok_or(GeneralConfigError::EmptyParticipants)?;
+
+    let blend_config = blend::create_blend_configs(&[id], &[blend_port])
+        .into_iter()
+        .next()
+        .ok_or(GeneralConfigError::EmptyParticipants)?;
+
+    let network_config = network::build_network_config_for_node(id, network_port, initial_peers)?;
+
+    let api_config = api::create_api_configs(&[id])?
+        .into_iter()
+        .next()
+        .ok_or(GeneralConfigError::EmptyParticipants)?;
+
+    let tracing_config = tracing::create_tracing_configs(&[id])
+        .into_iter()
+        .next()
+        .ok_or(GeneralConfigError::EmptyParticipants)?;
+
+    let kms_config = build_kms_config_for_node(&blend_config, &da_config, wallet_config);
+
+    Ok(GeneralConfig {
+        consensus_config,
+        bootstrapping_config: bootstrap_config,
+        da_config,
+        network_config,
+        blend_config,
+        api_config,
+        tracing_config,
+        time_config: time_config.clone(),
+        kms_config,
+    })
+}
+
+pub fn build_consensus_config_for_node(
+    id: [u8; 32],
+    consensus_params: &ConsensusParams,
+    wallet_config: &WalletConfig,
+    base: &GeneralConsensusConfig,
+) -> Result<GeneralConsensusConfig, GeneralConfigError> {
+    let mut config = consensus::create_consensus_configs(&[id], consensus_params, wallet_config)?
+        .into_iter()
+        .next()
+        .ok_or(GeneralConfigError::EmptyParticipants)?;
+
+    config.genesis_tx = base.genesis_tx.clone();
+    config.utxos = base.utxos.clone();
+    config.da_notes = base.da_notes.clone();
+    config.blend_notes = base.blend_notes.clone();
+    config.wallet_accounts = base.wallet_accounts.clone();
+
+    Ok(config)
+}
+
+pub fn build_initial_peers(network_params: &NetworkParams, peer_ports: &[u16]) -> Vec<Multiaddr> {
+    match network_params.libp2p_network_layout {
+        Libp2pNetworkLayout::Star => peer_ports
+            .first()
+            .map(|port| vec![node_address_from_port(*port)])
+            .unwrap_or_default(),
+        Libp2pNetworkLayout::Chain => peer_ports
+            .last()
+            .map(|port| vec![node_address_from_port(*port)])
+            .unwrap_or_default(),
+        Libp2pNetworkLayout::Full => peer_ports
+            .iter()
+            .map(|port| node_address_from_port(*port))
+            .collect(),
+    }
+}
+
+fn build_kms_config_for_node(
+    blend_config: &blend::GeneralBlendConfig,
+    da_config: &da::GeneralDaConfig,
+    wallet_config: &WalletConfig,
+) -> PreloadKMSBackendSettings {
+    let mut keys = HashMap::from([
+        (
+            key_id_for_preload_backend(&Key::Ed25519(blend_config.signer.clone())),
+            Key::Ed25519(blend_config.signer.clone()),
+        ),
+        (
+            key_id_for_preload_backend(&Key::Zk(blend_config.secret_zk_key.clone())),
+            Key::Zk(blend_config.secret_zk_key.clone()),
+        ),
+        (
+            key_id_for_preload_backend(&Key::Ed25519(da_config.signer.clone())),
+            Key::Ed25519(da_config.signer.clone()),
+        ),
+        (
+            key_id_for_preload_backend(&Key::Zk(da_config.secret_zk_key.clone())),
+            Key::Zk(da_config.secret_zk_key.clone()),
+        ),
+    ]);
+
+    for account in &wallet_config.accounts {
+        let key = Key::Zk(account.secret_key.clone());
+        let key_id = key_id_for_preload_backend(&key);
+        keys.entry(key_id).or_insert(key);
+    }
+
+    PreloadKMSBackendSettings { keys }
+}
diff --git a/testing-framework/core/src/nodes/executor.rs b/testing-framework/core/src/nodes/executor.rs
index 39485e1..cb589ae 100644
--- a/testing-framework/core/src/nodes/executor.rs
+++ b/testing-framework/core/src/nodes/executor.rs
@@ -60,10 +60,11 @@ impl Drop for Executor {
 }
 
 impl Executor {
-    pub async fn spawn(config: Config) -> Result {
+    pub async fn spawn(config: Config, label: &str) -> Result {
+        let log_prefix = format!("{LOGS_PREFIX}-{label}");
         let handle = spawn_node(
             config,
-            LOGS_PREFIX,
+            &log_prefix,
             "executor.yaml",
             binary_path(),
             !*IS_DEBUG_TRACING,
diff --git a/testing-framework/core/src/nodes/validator.rs b/testing-framework/core/src/nodes/validator.rs
index 7f972ca..c183e0b 100644
--- a/testing-framework/core/src/nodes/validator.rs
+++ b/testing-framework/core/src/nodes/validator.rs
@@ -72,10 +72,11 @@ impl Validator {
         self.handle.wait_for_exit(timeout).await
     }
 
-    pub async fn spawn(config: Config) -> Result {
+    pub async fn spawn(config: Config, label: &str) -> Result {
+        let log_prefix = format!("{LOGS_PREFIX}-{label}");
         let handle = spawn_node(
             config,
-            LOGS_PREFIX,
+            &log_prefix,
             "validator.yaml",
             binary_path(),
             !*IS_DEBUG_TRACING,
diff --git a/testing-framework/core/src/scenario/capabilities.rs b/testing-framework/core/src/scenario/capabilities.rs
index 3af53e8..d643999 100644
--- a/testing-framework/core/src/scenario/capabilities.rs
+++ b/testing-framework/core/src/scenario/capabilities.rs
@@ -2,6 +2,7 @@ use async_trait::async_trait;
 use reqwest::Url;
 
 use super::DynError;
+use crate::{nodes::ApiClient, topology::generation::NodeRole};
 
 /// Marker type used by scenario builders to request node control support.
 #[derive(Clone, Copy, Debug, Default)]
@@ -45,4 +46,19 @@ pub trait NodeControlHandle: Send + Sync {
     async fn restart_validator(&self, index: usize) -> Result<(), DynError>;
 
     async fn restart_executor(&self, index: usize) -> Result<(), DynError>;
+
+    async fn start_validator(&self, _name: &str) -> Result<StartedNode, DynError> {
+        Err("start_validator not supported by this deployer".into())
+    }
+
+    async fn start_executor(&self, _name: &str) -> Result<StartedNode, DynError> {
+        Err("start_executor not supported by this deployer".into())
+    }
 }
+
+#[derive(Clone)]
+pub struct StartedNode {
+    pub name: String,
+    pub role: NodeRole,
+    pub api: ApiClient,
+}
diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs
index 6cee1cc..648084e 100644
--- a/testing-framework/core/src/scenario/mod.rs
+++ b/testing-framework/core/src/scenario/mod.rs
@@ -13,6 +13,7 @@ pub type DynError = Box<dyn std::error::Error + Send + Sync>;
 
 pub use capabilities::{
     NodeControlCapability, NodeControlHandle, ObservabilityCapability, RequiresNodeControl,
+    StartedNode,
 };
 pub use definition::{
     Builder, Scenario, ScenarioBuildError, ScenarioBuilder, TopologyConfigurator,
diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs
index 30f2a44..5a40f39 100644
--- a/testing-framework/core/src/scenario/runtime/context.rs
+++ b/testing-framework/core/src/scenario/runtime/context.rs
@@ -63,7 +63,7 @@ impl RunContext {
     }
 
     #[must_use]
-    pub fn random_node_client(&self) -> Option<&ApiClient> {
+    pub fn random_node_client(&self) -> Option<ApiClient> {
         self.node_clients.any_client()
     }
 
diff --git a/testing-framework/core/src/scenario/runtime/node_clients.rs b/testing-framework/core/src/scenario/runtime/node_clients.rs
index 32ed57f..657b4e0 100644
--- a/testing-framework/core/src/scenario/runtime/node_clients.rs
+++ b/testing-framework/core/src/scenario/runtime/node_clients.rs
@@ -1,4 +1,7 @@
-use std::pin::Pin;
+use std::{
+    pin::Pin,
+    sync::{Arc, RwLock},
+};
 
 use rand::{Rng as _, seq::SliceRandom as _, thread_rng};
 
@@ -11,6 +14,11 @@ use crate::{
 /// Collection of API clients for the validator and executor set.
 #[derive(Clone, Default)]
 pub struct NodeClients {
+    inner: Arc<RwLock<NodeClientsInner>>,
+}
+
+#[derive(Default)]
+struct NodeClientsInner {
     validators: Vec<ApiClient>,
     executors: Vec<ApiClient>,
 }
@@ -18,10 +26,12 @@ pub struct NodeClients {
 impl NodeClients {
     #[must_use]
     /// Build clients from preconstructed vectors.
-    pub const fn new(validators: Vec<ApiClient>, executors: Vec<ApiClient>) -> Self {
+    pub fn new(validators: Vec<ApiClient>, executors: Vec<ApiClient>) -> Self {
         Self {
-            validators,
-            executors,
+            inner: Arc::new(RwLock::new(NodeClientsInner {
+                validators,
+                executors,
+            })),
         }
     }
@@ -43,48 +53,65 @@ impl NodeClients {
     #[must_use]
     /// Validator API clients.
-    pub fn validator_clients(&self) -> &[ApiClient] {
-        &self.validators
+    pub fn validator_clients(&self) -> Vec<ApiClient> {
+        self.inner
+            .read()
+            .expect("node clients lock poisoned")
+            .validators
+            .clone()
     }
 
     #[must_use]
     /// Executor API clients.
-    pub fn executor_clients(&self) -> &[ApiClient] {
-        &self.executors
+    pub fn executor_clients(&self) -> Vec<ApiClient> {
+        self.inner
+            .read()
+            .expect("node clients lock poisoned")
+            .executors
+            .clone()
     }
 
     #[must_use]
     /// Choose a random validator client if present.
-    pub fn random_validator(&self) -> Option<&ApiClient> {
-        if self.validators.is_empty() {
+    pub fn random_validator(&self) -> Option<ApiClient> {
+        let validators = self.validator_clients();
+        if validators.is_empty() {
             return None;
         }
 
         let mut rng = thread_rng();
-        let idx = rng.gen_range(0..self.validators.len());
-        self.validators.get(idx)
+        let idx = rng.gen_range(0..validators.len());
+        validators.get(idx).cloned()
     }
 
     #[must_use]
     /// Choose a random executor client if present.
-    pub fn random_executor(&self) -> Option<&ApiClient> {
-        if self.executors.is_empty() {
+    pub fn random_executor(&self) -> Option<ApiClient> {
+        let executors = self.executor_clients();
+        if executors.is_empty() {
             return None;
         }
 
         let mut rng = thread_rng();
-        let idx = rng.gen_range(0..self.executors.len());
-        self.executors.get(idx)
+        let idx = rng.gen_range(0..executors.len());
+        executors.get(idx).cloned()
     }
 
     /// Iterator over all clients.
-    pub fn all_clients(&self) -> impl Iterator<Item = &ApiClient> {
-        self.validators.iter().chain(self.executors.iter())
+    pub fn all_clients(&self) -> Vec<ApiClient> {
+        let guard = self.inner.read().expect("node clients lock poisoned");
+        guard
+            .validators
+            .iter()
+            .chain(guard.executors.iter())
+            .cloned()
+            .collect()
     }
 
     #[must_use]
     /// Choose any random client from validators+executors.
-    pub fn any_client(&self) -> Option<&ApiClient> {
-        let validator_count = self.validators.len();
-        let executor_count = self.executors.len();
+    pub fn any_client(&self) -> Option<ApiClient> {
+        let guard = self.inner.read().expect("node clients lock poisoned");
+        let validator_count = guard.validators.len();
+        let executor_count = guard.executors.len();
         let total = validator_count + executor_count;
         if total == 0 {
             return None;
@@ -92,9 +119,9 @@ impl NodeClients {
         let mut rng = thread_rng();
         let choice = rng.gen_range(0..total);
         if choice < validator_count {
-            self.validators.get(choice)
+            guard.validators.get(choice).cloned()
         } else {
-            self.executors.get(choice - validator_count)
+            guard.executors.get(choice - validator_count).cloned()
         }
     }
 
@@ -103,6 +130,16 @@ impl NodeClients {
     pub const fn cluster_client(&self) -> ClusterClient<'_> {
         ClusterClient::new(self)
     }
+
+    pub fn add_validator(&self, client: ApiClient) {
+        let mut guard = self.inner.write().expect("node clients lock poisoned");
+        guard.validators.push(client);
+    }
+
+    pub fn add_executor(&self, client: ApiClient) {
+        let mut guard = self.inner.write().expect("node clients lock poisoned");
+        guard.executors.push(client);
+    }
 }
 
 pub struct ClusterClient<'a> {
@@ -127,7 +164,7 @@ impl<'a> ClusterClient<'a> {
     where
         E: Into<DynError>,
     {
-        let mut clients: Vec<&ApiClient> = self.node_clients.all_clients().collect();
+        let mut clients = self.node_clients.all_clients();
         if clients.is_empty() {
             return Err("cluster client has no api clients".into());
         }
 
         clients.shuffle(&mut thread_rng());
 
         let mut last_err = None;
-        for client in clients {
+        for client in &clients {
             match f(client).await {
                 Ok(value) => return Ok(value),
                 Err(err) => last_err = Some(err.into()),
diff --git a/testing-framework/core/src/topology/deployment.rs b/testing-framework/core/src/topology/deployment.rs
index 7395e26..e8126d1 100644
--- a/testing-framework/core/src/topology/deployment.rs
+++ b/testing-framework/core/src/topology/deployment.rs
@@ -91,13 +91,15 @@ impl Topology {
         let mut validators = Vec::new();
         for i in 0..n_validators {
             let config = create_validator_config(config[i].clone());
-            validators.push(Validator::spawn(config).await?);
+            let label = format!("validator-{i}");
+            validators.push(Validator::spawn(config, &label).await?);
         }
 
         let mut executors = Vec::new();
         for i in 0..n_executors {
             let config = create_executor_config(config[n_validators + i].clone());
-            executors.push(Executor::spawn(config).await?);
+            let label = format!("executor-{i}");
+            executors.push(Executor::spawn(config, &label).await?);
         }
 
         Ok((validators, executors))
diff --git a/testing-framework/deployers/compose/src/lifecycle/block_feed.rs b/testing-framework/deployers/compose/src/lifecycle/block_feed.rs
index c86f869..8f69ab0 100644
--- a/testing-framework/deployers/compose/src/lifecycle/block_feed.rs
+++ b/testing-framework/deployers/compose/src/lifecycle/block_feed.rs
@@ -20,7 +20,6 @@ async fn spawn_block_feed_with(
     let block_source_client = node_clients
         .random_validator()
-        .cloned()
         .ok_or(ComposeRunnerError::BlockFeedMissing)?;
 
     spawn_block_feed(block_source_client)
diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs
index cc55100..1dca17f 100644
--- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs
+++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs
@@ -338,7 +338,8 @@ fn maybe_print_endpoints(
             .unwrap_or_else(|| "".to_string())
     );
 
-    for (idx, client) in node_clients.validator_clients().iter().enumerate() {
+    let validator_clients = node_clients.validator_clients();
+    for (idx, client) in validator_clients.iter().enumerate() {
         println!(
             "TESTNET_PPROF validator_{}={}/debug/pprof/profile?seconds=15&format=proto",
             idx,
@@ -346,7 +347,8 @@
         );
     }
 
-    for (idx, client) in node_clients.executor_clients().iter().enumerate() {
+    let executor_clients = node_clients.executor_clients();
+    for (idx, client) in executor_clients.iter().enumerate() {
         println!(
             "TESTNET_PPROF executor_{}={}/debug/pprof/profile?seconds=15&format=proto",
             idx,
diff --git a/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs b/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs
index 5380bcc..e72f9c0 100644
--- a/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs
+++ b/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs
@@ -14,9 +14,9 @@ pub async fn spawn_block_feed_with(
     let block_source_client = node_clients
         .validator_clients()
-        .first()
+        .into_iter()
+        .next()
         .or_else(|| node_clients.any_client())
-        .cloned()
         .ok_or(K8sRunnerError::BlockFeedMissing)?;
 
     info!("starting block feed");
diff --git a/testing-framework/deployers/local/Cargo.toml b/testing-framework/deployers/local/Cargo.toml
index e29a929..a5efe73 100644
--- a/testing-framework/deployers/local/Cargo.toml
+++ b/testing-framework/deployers/local/Cargo.toml
@@ -13,7 +13,10 @@ version = "0.1.0"
 workspace = true
 
 [dependencies]
-async-trait = "0.1"
-testing-framework-core = { path = "../../core" }
-thiserror = { workspace = true }
-tracing = { workspace = true }
+async-trait = "0.1"
+nomos-utils = { workspace = true }
+rand = { workspace = true }
+testing-framework-config = { workspace = true }
+testing-framework-core = { path = "../../core" }
+thiserror = { workspace = true }
+tracing = { workspace = true }
diff --git a/testing-framework/deployers/local/src/runner.rs b/testing-framework/deployers/local/src/runner.rs
index 4e08aaa..61c9ffd 100644
--- a/testing-framework/deployers/local/src/runner.rs
+++ b/testing-framework/deployers/local/src/runner.rs
@@ -1,11 +1,23 @@
+use std::sync::{Arc, Mutex};
+
 use async_trait::async_trait;
+use nomos_utils::net::get_available_udp_port;
+use rand::Rng as _;
+use testing_framework_config::topology::configs::{
+    consensus,
+    runtime::{build_general_config_for_node, build_initial_peers},
+    time,
+};
 use testing_framework_core::{
+    nodes::{ApiClient, executor::Executor, validator::Validator},
     scenario::{
-        BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, RunContext, Runner,
-        Scenario, ScenarioError, spawn_block_feed,
+        BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability,
+        NodeControlHandle, RunContext, Runner, Scenario, ScenarioError, StartedNode,
+        spawn_block_feed,
     },
     topology::{
         deployment::{SpawnTopologyError, Topology},
+        generation::{GeneratedTopology, NodeRole},
         readiness::ReadinessError,
     },
 };
@@ -82,6 +94,43 @@ impl Deployer<()> for LocalDeployer {
     }
 }
 
+#[async_trait]
+impl Deployer<NodeControlCapability> for LocalDeployer {
+    type Error = LocalDeployerError;
+
+    async fn deploy(
+        &self,
+        scenario: &Scenario<NodeControlCapability>,
+    ) -> Result<Runner, Self::Error> {
+        info!(
+            validators = scenario.topology().validators().len(),
+            executors = scenario.topology().executors().len(),
+            "starting local deployment with node control"
+        );
+
+        let topology = Self::prepare_topology(scenario).await?;
+        let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
+        let node_control = Arc::new(LocalNodeControl::new(
+            scenario.topology().clone(),
+            node_clients.clone(),
+        ));
+
+        let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;
+
+        let context = RunContext::new(
+            scenario.topology().clone(),
+            Some(topology),
+            node_clients,
+            scenario.duration(),
+            Metrics::empty(),
+            block_feed,
+            Some(node_control),
+        );
+
+        Ok(Runner::new(context, Some(Box::new(block_feed_guard))))
+    }
+}
+
 impl LocalDeployer {
     #[must_use]
     /// Construct a local deployer.
     pub fn new() -> Self {
         Self::default()
     }
 
-    async fn prepare_topology(scenario: &Scenario<()>) -> Result<Topology, LocalDeployerError> {
+    async fn prepare_topology<Caps>(scenario: &Scenario<Caps>) -> Result<Topology, LocalDeployerError> {
         let descriptors = scenario.topology();
         info!(
             validators = descriptors.validators().len(),
@@ -133,7 +182,7 @@ async fn spawn_block_feed_with(
         "selecting validator client for local block feed"
     );
 
-    let Some(block_source_client) = node_clients.random_validator().cloned() else {
+    let Some(block_source_client) = node_clients.random_validator() else {
        return Err(LocalDeployerError::WorkloadFailed {
            source: "block feed requires at least one validator".into(),
        });
@@ -151,3 +200,171 @@ fn workload_error(source: impl Into<DynError>) -> LocalDeployerError {
         source: source.into(),
     }
 }
+
+struct LocalNodeControl {
+    descriptors: GeneratedTopology,
+    node_clients: NodeClients,
+    base_consensus: consensus::GeneralConsensusConfig,
+    base_time: time::GeneralTimeConfig,
+    state: Mutex<LocalNodeControlState>,
+}
+
+struct LocalNodeControlState {
+    validator_count: usize,
+    executor_count: usize,
+    peer_ports: Vec<u16>,
+    validators: Vec<Validator>,
+    executors: Vec<Executor>,
+}
+
+#[async_trait]
+impl NodeControlHandle for LocalNodeControl {
+    async fn restart_validator(&self, _index: usize) -> Result<(), DynError> {
+        Err("local deployer does not support restart_validator".into())
+    }
+
+    async fn restart_executor(&self, _index: usize) -> Result<(), DynError> {
+        Err("local deployer does not support restart_executor".into())
+    }
+
+    async fn start_validator(&self, name: &str) -> Result<StartedNode, DynError> {
+        self.start_node(NodeRole::Validator, name).await
+    }
+
+    async fn start_executor(&self, name: &str) -> Result<StartedNode, DynError> {
+        self.start_node(NodeRole::Executor, name).await
+    }
+}
+
+impl LocalNodeControl {
+    fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self {
+        let base_node = descriptors
+            .validators()
+            .first()
+            .or_else(|| descriptors.executors().first())
+            .expect("generated topology must contain at least one node");
+
+        let base_consensus = base_node.general.consensus_config.clone();
+        let base_time = base_node.general.time_config.clone();
+
+        let peer_ports = descriptors
+            .nodes()
+            .map(|node| node.network_port())
+            .collect::<Vec<_>>();
+
+        let state = LocalNodeControlState {
+            validator_count: descriptors.validators().len(),
+            executor_count: descriptors.executors().len(),
+            peer_ports,
+            validators: Vec::new(),
+            executors: Vec::new(),
+        };
+
+        Self {
+            descriptors,
+            node_clients,
+            base_consensus,
+            base_time,
+            state: Mutex::new(state),
+        }
+    }
+
+    async fn start_node(&self, role: NodeRole, name: &str) -> Result<StartedNode, DynError> {
+        let (peer_ports, node_name) = {
+            let state = self.state.lock().expect("local node control lock poisoned");
+            let index = match role {
+                NodeRole::Validator => state.validator_count,
+                NodeRole::Executor => state.executor_count,
+            };
+
+            let role_label = match role {
+                NodeRole::Validator => "validator",
+                NodeRole::Executor => "executor",
+            };
+
+            let label = if name.trim().is_empty() {
+                format!("{role_label}-{index}")
+            } else {
+                format!("{role_label}-{name}")
+            };
+
+            (state.peer_ports.clone(), label)
+        };
+
+        let id = random_node_id();
+        let network_port = allocate_udp_port("network port")?;
+        let da_port = allocate_udp_port("DA port")?;
+        let blend_port = allocate_udp_port("Blend port")?;
+
+        let topology = self.descriptors.config();
+        let initial_peers = build_initial_peers(&topology.network_params, &peer_ports);
+
+        let general_config = build_general_config_for_node(
+            id,
+            network_port,
+            initial_peers,
+            da_port,
+            blend_port,
+            &topology.consensus_params,
+            &topology.da_params,
+            &topology.wallet_config,
+            &self.base_consensus,
+            &self.base_time,
+        )?;
+
+        let api_client = match role {
+            NodeRole::Validator => {
+                let config = testing_framework_core::nodes::validator::create_validator_config(
+                    general_config,
+                );
+
+                let node = Validator::spawn(config, &node_name).await?;
+                let client = ApiClient::from_urls(node.url(), node.testing_url());
+
+                self.node_clients.add_validator(client.clone());
+
+                let mut state = self.state.lock().expect("local node control lock poisoned");
+
+                state.peer_ports.push(network_port);
+                state.validator_count += 1;
+                state.validators.push(node);
+
+                client
+            }
+            NodeRole::Executor => {
+                let config =
+                    testing_framework_core::nodes::executor::create_executor_config(general_config);
+
+                let node = Executor::spawn(config, &node_name).await?;
+                let client = ApiClient::from_urls(node.url(), node.testing_url());
+
+                self.node_clients.add_executor(client.clone());
+
+                let mut state = self.state.lock().expect("local node control lock poisoned");
+
+                state.peer_ports.push(network_port);
+                state.executor_count += 1;
+                state.executors.push(node);
+
+                client
+            }
+        };
+
+        Ok(StartedNode {
+            name: node_name,
+            role,
+            api: api_client,
+        })
+    }
+}
+
+fn random_node_id() -> [u8; 32] {
+    let mut id = [0u8; 32];
+    rand::thread_rng().fill(&mut id);
+    id
+}
+
+fn allocate_udp_port(label: &'static str) -> Result<u16, DynError> {
+    get_available_udp_port()
+        .ok_or_else(|| format!("failed to allocate free UDP port for {label}").into())
+}
diff --git a/testing-framework/workflows/src/expectations/consensus_liveness.rs b/testing-framework/workflows/src/expectations/consensus_liveness.rs
index 6788635..571320e 100644
--- a/testing-framework/workflows/src/expectations/consensus_liveness.rs
+++ b/testing-framework/workflows/src/expectations/consensus_liveness.rs
@@ -90,7 +90,7 @@ impl ConsensusLiveness {
     }
 
     fn ensure_participants(ctx: &RunContext) -> Result<(), DynError> {
-        if ctx.node_clients().all_clients().count() == 0 {
+        if ctx.node_clients().all_clients().is_empty() {
             Err(Box::new(ConsensusLivenessError::MissingParticipants))
         } else {
             Ok(())
@@ -98,7 +98,7 @@
     }
 
     async fn collect_results(ctx: &RunContext) -> LivenessCheck {
-        let clients: Vec<_> = ctx.node_clients().all_clients().collect();
+        let clients = ctx.node_clients().all_clients();
         let mut samples = Vec::with_capacity(clients.len());
         let mut issues = Vec::new();
diff --git a/testing-framework/workflows/src/workloads/util.rs b/testing-framework/workflows/src/workloads/util.rs
index 2e9f9a5..0a48d4e 100644
--- a/testing-framework/workflows/src/workloads/util.rs
+++ b/testing-framework/workflows/src/workloads/util.rs
@@ -48,8 +48,8 @@ pub async fn submit_transaction_via_cluster(
     );
 
     let node_clients = ctx.node_clients();
-    let mut validator_clients: Vec<_> = node_clients.validator_clients().iter().collect();
-    let mut executor_clients: Vec<_> = node_clients.executor_clients().iter().collect();
+    let mut validator_clients = node_clients.validator_clients();
+    let mut executor_clients = node_clients.executor_clients();
 
     validator_clients.shuffle(&mut thread_rng());
     executor_clients.shuffle(&mut thread_rng());