diff --git a/Cargo.lock b/Cargo.lock
index 9dac85d..08b6e48 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7088,12 +7088,16 @@
 name = "testing-framework-runner-local"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "logos-blockchain-executor",
  "logos-blockchain-libp2p",
+ "logos-blockchain-network-service",
+ "logos-blockchain-node",
  "logos-blockchain-utils",
  "rand 0.8.5",
  "testing-framework-config",
  "testing-framework-core",
  "thiserror 2.0.17",
+ "tokio",
  "tracing",
 ]
diff --git a/examples/tests/dynamic_join.rs b/examples/tests/dynamic_join.rs
index 6400fee..7522a62 100644
--- a/examples/tests/dynamic_join.rs
+++ b/examples/tests/dynamic_join.rs
@@ -3,7 +3,7 @@ use std::time::Duration;
 use anyhow::Result;
 use async_trait::async_trait;
 use testing_framework_core::scenario::{
-    Deployer, DynError, RunContext, ScenarioBuilder, StartNodeOptions, Workload,
+    Deployer, DynError, PeerSelection, RunContext, ScenarioBuilder, StartNodeOptions, Workload,
 };
 use testing_framework_runner_local::LocalDeployer;
 use testing_framework_workflows::ScenarioBuilderExt;
@@ -84,7 +84,7 @@ impl Workload for JoinNodeWithPeersWorkload {
         sleep(START_DELAY).await;
 
         let options = StartNodeOptions {
-            peer_names: self.peers.clone(),
+            peers: PeerSelection::Named(self.peers.clone()),
         };
         let node = handle.start_validator_with(&self.name, options).await?;
         let client = node.api;
diff --git a/examples/tests/manual_cluster.rs b/examples/tests/manual_cluster.rs
new file mode 100644
index 0000000..290d3a3
--- /dev/null
+++ b/examples/tests/manual_cluster.rs
@@ -0,0 +1,78 @@
+use std::{env, time::Duration};
+
+use anyhow::Result;
+use testing_framework_core::{
+    scenario::{PeerSelection, StartNodeOptions},
+    topology::config::TopologyConfig,
+};
+use testing_framework_runner_local::ManualClusterGuard;
+use tokio::time::sleep;
+use tracing_subscriber::fmt::try_init;
+
+const MAX_HEIGHT_DIFF: u64 = 5;
+
+#[tokio::test]
+#[ignore = "run manually with `cargo test -p runner-examples -- --ignored manual_cluster_two_clusters_merge`"]
+async fn manual_cluster_two_clusters_merge() -> Result<()> {
+    let _ = try_init();
+    let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
+        .parent()
+        .expect("examples crate should live under the workspace root");
+    let circuits_dir = workspace_root.join("testing-framework/assets/stack/circuits");
+    unsafe {
+        env::set_var("LOGOS_BLOCKCHAIN_CIRCUITS", circuits_dir);
+    }
+    // Required env vars (set on the command line when running this test):
+    // - `POL_PROOF_DEV_MODE=true`
+    // - `RUST_LOG=info` (optional)
+    let config = TopologyConfig::with_node_numbers(2, 0);
+    let cluster = ManualClusterGuard::from_config(config)?;
+    // Nodes are stopped automatically when the guard is dropped.
+
+    println!("starting validator a");
+
+    let validator_a = cluster
+        .start_validator_with(
+            "a",
+            StartNodeOptions {
+                peers: PeerSelection::None,
+            },
+        )
+        .await?
+        .api;
+
+    println!("waiting briefly before starting c");
+    sleep(Duration::from_secs(30)).await;
+
+    println!("starting validator c -> a");
+    let validator_c = cluster
+        .start_validator_with(
+            "c",
+            StartNodeOptions {
+                peers: PeerSelection::Named(vec!["validator-a".to_owned()]),
+            },
+        )
+        .await?
+        .api;
+
+    println!("waiting for network readiness: cluster a,c");
+    cluster.wait_network_ready().await?;
+
+    sleep(Duration::from_secs(5)).await;
+
+    let a_info = validator_a.consensus_info().await?;
+    let c_info = validator_c.consensus_info().await?;
+    let height_diff = a_info.height.abs_diff(c_info.height);
+
+    println!(
+        "final heights: validator-a={}, validator-c={}, diff={}",
+        a_info.height, c_info.height, height_diff
+    );
+
+    if height_diff > MAX_HEIGHT_DIFF {
+        return Err(anyhow::anyhow!(
+            "height diff too large: {height_diff} > {MAX_HEIGHT_DIFF}"
+        ));
+    }
+    Ok(())
+}
diff --git a/scripts/build/build-bundle.sh b/scripts/build/build-bundle.sh
index 692bad4..b3169a2 100755
--- a/scripts/build/build-bundle.sh
+++ b/scripts/build/build-bundle.sh
@@ -327,10 +327,7 @@ build_bundle::prepare_circuits() {
 }
 
 build_bundle::build_binaries() {
-  FEATURES="testing"
-  if [ -n "${NOMOS_EXTRA_FEATURES:-}" ]; then
-    FEATURES="${FEATURES},${NOMOS_EXTRA_FEATURES}"
-  fi
+  BUILD_FEATURES_LABEL="all"
   echo "==> Building binaries (platform=${PLATFORM})"
   mkdir -p "${NODE_SRC}"
   (
@@ -362,13 +359,13 @@ build_bundle::build_binaries() {
     RUSTFLAGS='--cfg feature="pol-dev-mode"' NOMOS_CIRCUITS="${CIRCUITS_DIR}" \
       LOGOS_BLOCKCHAIN_CIRCUITS="${CIRCUITS_DIR}" \
       RUSTUP_TOOLCHAIN="${BUNDLE_RUSTUP_TOOLCHAIN}" \
-      cargo build --features "${FEATURES}" \
+      cargo build --all-features \
       -p logos-blockchain-node -p logos-blockchain-executor -p logos-blockchain-cli \
       --target-dir "${NODE_TARGET}"
   else
     RUSTFLAGS='--cfg feature="pol-dev-mode"' NOMOS_CIRCUITS="${CIRCUITS_DIR}" \
       LOGOS_BLOCKCHAIN_CIRCUITS="${CIRCUITS_DIR}" \
-      cargo build --features "${FEATURES}" \
+      cargo build --all-features \
       -p logos-blockchain-node -p logos-blockchain-executor -p logos-blockchain-cli \
       --target-dir "${NODE_TARGET}"
   fi
@@ -392,7 +389,7 @@ build_bundle::package_bundle() {
       echo "nomos_node_git_head=$(git -C "${NODE_SRC}" rev-parse HEAD 2>/dev/null || true)"
     fi
     echo "platform=${PLATFORM}"
-    echo "features=${FEATURES}"
+    echo "features=${BUILD_FEATURES_LABEL}"
   } > "${bundle_dir}/artifacts/nomos-bundle-meta.env"
 
   mkdir -p "$(dirname "${OUTPUT}")"
@@ -405,7 +402,7 @@ build_bundle::package_bundle() {
   fi
 
   echo "Bundle created at ${OUTPUT}"
-  if [[ "${FEATURES}" == *profiling* ]]; then
+  if [[ "${BUILD_FEATURES_LABEL}" == "all" ]] || [[ "${BUILD_FEATURES_LABEL}" == *profiling* ]]; then
     cat <<'EOF_PROF'
 Profiling endpoints (enabled by --features profiling):
   CPU pprof (SVG):   curl "http://<host>:8722/debug/pprof/profile?seconds=15&format=svg" -o profile.svg
diff --git a/testing-framework/core/src/scenario/capabilities.rs b/testing-framework/core/src/scenario/capabilities.rs
index 4f80cb7..d6784bc 100644
--- a/testing-framework/core/src/scenario/capabilities.rs
+++ b/testing-framework/core/src/scenario/capabilities.rs
@@ -22,11 +22,30 @@ pub struct ObservabilityCapability {
     pub grafana_url: Option<String>,
 }
 
+/// Peer selection strategy for dynamically started nodes.
+#[derive(Clone, Debug)]
+pub enum PeerSelection {
+    /// Use the topology default (star/chain/full).
+    DefaultLayout,
+    /// Start without any initial peers.
+    None,
+    /// Connect to the named peers.
+    Named(Vec<String>),
+}
+
 /// Options for dynamically starting a node.
-#[derive(Clone, Debug, Default)]
+#[derive(Clone, Debug)]
 pub struct StartNodeOptions {
-    /// Names of nodes to connect to on startup (implementation-defined).
-    pub peer_names: Vec<String>,
+    /// How to select initial peers on startup.
+    pub peers: PeerSelection,
+}
+
+impl Default for StartNodeOptions {
+    fn default() -> Self {
+        Self {
+            peers: PeerSelection::DefaultLayout,
+        }
+    }
 }
 
 /// Trait implemented by scenario capability markers to signal whether node
@@ -77,6 +96,10 @@ pub trait NodeControlHandle: Send + Sync {
     ) -> Result<StartedNode, DynError> {
         Err("start_executor_with not supported by this deployer".into())
     }
+
+    fn node_client(&self, _name: &str) -> Option<ApiClient> {
+        None
+    }
 }
 
 #[derive(Clone)]
diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs
index 2b79383..47c7de3 100644
--- a/testing-framework/core/src/scenario/mod.rs
+++ b/testing-framework/core/src/scenario/mod.rs
@@ -12,8 +12,8 @@ mod workload;
 pub type DynError = Box<dyn std::error::Error + Send + Sync>;
 
 pub use capabilities::{
-    NodeControlCapability, NodeControlHandle, ObservabilityCapability, RequiresNodeControl,
-    StartNodeOptions, StartedNode,
+    NodeControlCapability, NodeControlHandle, ObservabilityCapability, PeerSelection,
+    RequiresNodeControl, StartNodeOptions, StartedNode,
 };
 pub use definition::{
     Builder, Scenario, ScenarioBuildError, ScenarioBuilder, TopologyConfigurator,
diff --git a/testing-framework/core/src/scenario/runtime/node_clients.rs b/testing-framework/core/src/scenario/runtime/node_clients.rs
index 657b4e0..ff7365e 100644
--- a/testing-framework/core/src/scenario/runtime/node_clients.rs
+++ b/testing-framework/core/src/scenario/runtime/node_clients.rs
@@ -140,6 +140,12 @@ impl NodeClients {
         let mut guard = self.inner.write().expect("node clients lock poisoned");
         guard.executors.push(client);
     }
+
+    pub fn clear(&self) {
+        let mut guard = self.inner.write().expect("node clients lock poisoned");
+        guard.validators.clear();
+        guard.executors.clear();
+    }
 }
 
 pub struct ClusterClient<'a> {
diff --git a/testing-framework/deployers/local/Cargo.toml b/testing-framework/deployers/local/Cargo.toml
index df0eb47..4a49a04 100644
--- a/testing-framework/deployers/local/Cargo.toml
+++ b/testing-framework/deployers/local/Cargo.toml
@@ -14,10 +14,14 @@ workspace = true
 
 [dependencies]
 async-trait = "0.1"
+logos-blockchain-executor = { workspace = true }
 nomos-libp2p = { workspace = true }
+nomos-network = { workspace = true }
+nomos-node = { workspace = true }
 nomos-utils = { workspace = true }
 rand = { workspace = true }
 testing-framework-config = { workspace = true }
 testing-framework-core = { path = "../../core" }
 thiserror = { workspace = true }
+tokio = { workspace = true }
 tracing = { workspace = true }
diff --git a/testing-framework/deployers/local/src/lib.rs b/testing-framework/deployers/local/src/lib.rs
index c304ecb..1a7d15e 100644
--- a/testing-framework/deployers/local/src/lib.rs
+++ b/testing-framework/deployers/local/src/lib.rs
@@ -1,3 +1,7 @@
+mod manual;
+mod node_control;
 mod runner;
 
+pub use manual::{ManualCluster, ManualClusterError, ManualClusterGuard};
+pub use node_control::{LocalDynamicError, LocalDynamicNodes, LocalDynamicSeed};
 pub use runner::{LocalDeployer, LocalDeployerError};
diff --git a/testing-framework/deployers/local/src/manual/mod.rs b/testing-framework/deployers/local/src/manual/mod.rs
new file mode 100644
index 0000000..734c313
--- /dev/null
+++ b/testing-framework/deployers/local/src/manual/mod.rs
@@ -0,0 +1,129 @@
+use std::ops::Deref;
+
+use testing_framework_core::{
+    nodes::ApiClient,
+    scenario::{StartNodeOptions, StartedNode},
+    topology::{
+        config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
+        readiness::{ReadinessCheck, ReadinessError},
+    },
+};
+use thiserror::Error;
+
+use crate::node_control::{LocalDynamicError, LocalDynamicNodes, ReadinessNode};
+
+mod readiness;
+
+use readiness::ManualNetworkReadiness;
+
+#[derive(Debug, Error)]
+pub enum ManualClusterError {
+    #[error("failed to build topology: {source}")]
+    Build {
+        #[source]
+        source: TopologyBuildError,
+    },
+    #[error(transparent)]
+    Dynamic(#[from] LocalDynamicError),
+}
+
+/// Imperative, in-process cluster that can start nodes on demand.
+pub struct ManualCluster {
+    nodes: LocalDynamicNodes,
+}
+
+pub struct ManualClusterGuard {
+    cluster: ManualCluster,
+}
+
+impl ManualClusterGuard {
+    pub fn from_config(config: TopologyConfig) -> Result<Self, ManualClusterError> {
+        Ok(Self {
+            cluster: ManualCluster::from_config(config)?,
+        })
+    }
+}
+
+impl Deref for ManualClusterGuard {
+    type Target = ManualCluster;
+
+    fn deref(&self) -> &Self::Target {
+        &self.cluster
+    }
+}
+
+impl Drop for ManualClusterGuard {
+    fn drop(&mut self) {
+        self.cluster.stop_all();
+    }
+}
+
+impl ManualCluster {
+    pub fn from_config(config: TopologyConfig) -> Result<Self, ManualClusterError> {
+        let builder = TopologyBuilder::new(config);
+        let descriptors = builder
+            .build()
+            .map_err(|source| ManualClusterError::Build { source })?;
+        let nodes = LocalDynamicNodes::new(
+            descriptors,
+            testing_framework_core::scenario::NodeClients::default(),
+        );
+        Ok(Self { nodes })
+    }
+
+    #[must_use]
+    pub fn node_client(&self, name: &str) -> Option<ApiClient> {
+        self.nodes.node_client(name)
+    }
+
+    pub async fn start_validator(&self, name: &str) -> Result<StartedNode, ManualClusterError> {
+        Ok(self
+            .nodes
+            .start_validator_with(name, StartNodeOptions::default())
+            .await?)
+    }
+
+    pub async fn start_executor(&self, name: &str) -> Result<StartedNode, ManualClusterError> {
+        Ok(self
+            .nodes
+            .start_executor_with(name, StartNodeOptions::default())
+            .await?)
+    }
+
+    pub async fn start_validator_with(
+        &self,
+        name: &str,
+        options: StartNodeOptions,
+    ) -> Result<StartedNode, ManualClusterError> {
+        Ok(self.nodes.start_validator_with(name, options).await?)
+    }
+
+    pub async fn start_executor_with(
+        &self,
+        name: &str,
+        options: StartNodeOptions,
+    ) -> Result<StartedNode, ManualClusterError> {
+        Ok(self.nodes.start_executor_with(name, options).await?)
+    }
+
+    pub fn stop_all(&self) {
+        self.nodes.stop_all();
+    }
+
+    pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> {
+        let nodes = self.nodes.readiness_nodes();
+        if self.is_singleton(&nodes) {
+            return Ok(());
+        }
+
+        self.wait_nodes_ready(nodes).await
+    }
+
+    fn is_singleton(&self, nodes: &[ReadinessNode]) -> bool {
+        nodes.len() <= 1
+    }
+
+    async fn wait_nodes_ready(&self, nodes: Vec<ReadinessNode>) -> Result<(), ReadinessError> {
+        ManualNetworkReadiness::new(nodes).wait().await
+    }
+}
diff --git a/testing-framework/deployers/local/src/manual/readiness.rs b/testing-framework/deployers/local/src/manual/readiness.rs
new file mode 100644
index 0000000..327a2af
--- /dev/null
+++ b/testing-framework/deployers/local/src/manual/readiness.rs
@@ -0,0 +1,74 @@
+use std::time::Duration;
+
+use nomos_network::backends::libp2p::Libp2pInfo;
+use testing_framework_core::topology::readiness::ReadinessCheck;
+use tokio::time::timeout;
+
+use crate::node_control::ReadinessNode;
+
+const NETWORK_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub(super) struct ManualNetworkReadiness {
+    nodes: Vec<ReadinessNode>,
+}
+
+impl ManualNetworkReadiness {
+    pub(super) fn new(nodes: Vec<ReadinessNode>) -> Self {
+        Self { nodes }
+    }
+}
+
+#[async_trait::async_trait]
+impl<'a> ReadinessCheck<'a> for ManualNetworkReadiness {
+    type Data = Vec<ManualNetworkStatus>;
+
+    async fn collect(&'a self) -> Self::Data {
+        let mut statuses = Vec::with_capacity(self.nodes.len());
+        for node in &self.nodes {
+            let result = timeout(NETWORK_REQUEST_TIMEOUT, node.api.network_info())
+                .await
+                .map_err(|_| "network_info request timed out".to_owned())
+                .and_then(|res| res.map_err(|err| err.to_string()));
+
+            statuses.push(ManualNetworkStatus {
+                label: node.label.clone(),
+                expected_peers: node.expected_peers,
+                result,
+            });
+        }
+        statuses
+    }
+
+    fn is_ready(&self, data: &Self::Data) -> bool {
+        data.iter().all(
+            |status| match (status.expected_peers, status.result.as_ref()) {
+                (Some(expected), Ok(info)) => info.n_peers >= expected,
+                _ => false,
+            },
+        )
+    }
+
+    fn timeout_message(&self, data: Self::Data) -> String {
+        let summary = data
+            .into_iter()
+            .map(|entry| match entry.result {
+                Ok(info) => format!(
+                    "{} (peers {}/{})",
+                    entry.label,
+                    info.n_peers,
+                    entry.expected_peers.unwrap_or(0)
+                ),
+                Err(err) => format!("{} (error: {err})", entry.label),
+            })
+            .collect::<Vec<_>>()
+            .join(", ");
+
+        format!("timed out waiting for network readiness: {summary}")
+    }
+}
+
+pub(super) struct ManualNetworkStatus {
+    label: String,
+    expected_peers: Option<usize>,
+    result: Result<Libp2pInfo, String>,
+}
diff --git a/testing-framework/deployers/local/src/node_control/config.rs b/testing-framework/deployers/local/src/node_control/config.rs
new file mode 100644
index 0000000..0afa369
--- /dev/null
+++ b/testing-framework/deployers/local/src/node_control/config.rs
@@ -0,0 +1,130 @@
+use std::collections::HashMap;
+
+use nomos_libp2p::Multiaddr;
+use nomos_utils::net::get_available_udp_port;
+use rand::Rng as _;
+use testing_framework_config::topology::configs::{
+    consensus,
+    runtime::{build_general_config_for_node, build_initial_peers},
+    time::GeneralTimeConfig,
+};
+use testing_framework_core::{
+    scenario::{PeerSelection, StartNodeOptions},
+    topology::{
+        config::TopologyConfig,
+        configs::GeneralConfig,
+        generation::{GeneratedNodeConfig, GeneratedTopology, NodeRole},
+    },
+};
+
+use super::LocalDynamicError;
+
+pub(super) fn build_general_config_for(
+    descriptors: &GeneratedTopology,
+    base_consensus: &consensus::GeneralConsensusConfig,
+    base_time: &GeneralTimeConfig,
+    role: NodeRole,
+    index: usize,
+    peer_ports_by_name: &HashMap<String, u16>,
+    options: &StartNodeOptions,
+    peer_ports: &[u16],
+) -> Result<(GeneralConfig, u16), LocalDynamicError> {
+    if let Some(node) = descriptor_for(descriptors, role, index) {
+        let mut config = node.general.clone();
+        let initial_peers = resolve_initial_peers(
+            peer_ports_by_name,
+            options,
+            &config.network_config.backend.initial_peers,
+            descriptors,
+            peer_ports,
+        )?;
+
+        config.network_config.backend.initial_peers = initial_peers;
+
+        return Ok((config, node.network_port()));
+    }
+
+    let id = random_node_id();
+    let network_port = allocate_udp_port("network port")?;
+    let da_port = allocate_udp_port("DA port")?;
+    let blend_port = allocate_udp_port("Blend port")?;
+    let topology = descriptors.config();
+    let initial_peers =
+        resolve_initial_peers(peer_ports_by_name, options, &[], descriptors, peer_ports)?;
+    let general_config = build_general_config_for_node(
+        id,
+        network_port,
+        initial_peers,
+        da_port,
+        blend_port,
+        &topology.consensus_params,
+        &topology.da_params,
+        &topology.wallet_config,
+        base_consensus,
+        base_time,
+    )
+    .map_err(|source| LocalDynamicError::Config { source })?;
+
+    Ok((general_config, network_port))
+}
+
+fn descriptor_for(
+    descriptors: &GeneratedTopology,
+    role: NodeRole,
+    index: usize,
+) -> Option<&GeneratedNodeConfig> {
+    match role {
+        NodeRole::Validator => descriptors.validators().get(index),
+        NodeRole::Executor => descriptors.executors().get(index),
+    }
+}
+
+fn resolve_peer_names(
+    peer_ports_by_name: &HashMap<String, u16>,
+    peer_names: &[String],
+) -> Result<Vec<Multiaddr>, LocalDynamicError> {
+    let mut peers = Vec::with_capacity(peer_names.len());
+    for name in peer_names {
+        let port =
+            peer_ports_by_name
+                .get(name)
+                .ok_or_else(|| LocalDynamicError::InvalidArgument {
+                    message: format!("unknown peer name '{name}'"),
+                })?;
+        peers.push(testing_framework_config::node_address_from_port(*port));
+    }
+    Ok(peers)
+}
+
+fn resolve_initial_peers(
+    peer_ports_by_name: &HashMap<String, u16>,
+    options: &StartNodeOptions,
+    default_peers: &[Multiaddr],
+    descriptors: &GeneratedTopology,
+    peer_ports: &[u16],
+) -> Result<Vec<Multiaddr>, LocalDynamicError> {
+    match &options.peers {
+        PeerSelection::Named(names) => resolve_peer_names(peer_ports_by_name, names),
+        PeerSelection::DefaultLayout => {
+            if !default_peers.is_empty() {
+                Ok(default_peers.to_vec())
+            } else {
+                let topology: &TopologyConfig = descriptors.config();
+                Ok(build_initial_peers(&topology.network_params, peer_ports))
+            }
+        }
+        PeerSelection::None => Ok(Vec::new()),
+    }
+}
+
+fn random_node_id() -> [u8; 32] {
+    let mut id = [0u8; 32];
+    rand::thread_rng().fill(&mut id);
+    id
+}
+
+fn allocate_udp_port(label: &'static str) -> Result<u16, LocalDynamicError> {
+    get_available_udp_port().ok_or_else(|| LocalDynamicError::PortAllocation {
+        message: format!("failed to allocate free UDP port for {label}"),
+    })
+}
diff --git a/testing-framework/deployers/local/src/node_control/mod.rs b/testing-framework/deployers/local/src/node_control/mod.rs
new file mode 100644
index 0000000..b031fd8
--- /dev/null
+++ b/testing-framework/deployers/local/src/node_control/mod.rs
@@ -0,0 +1,389 @@
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Mutex,
+};
+
+use logos_blockchain_executor::config::Config as ExecutorConfig;
+use nomos_node::Config as ValidatorConfig;
+use testing_framework_config::topology::configs::{consensus, time};
+use testing_framework_core::{
+    nodes::{
+        ApiClient,
+        executor::{Executor, create_executor_config},
+        validator::{Validator, create_validator_config},
+    },
+    scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode},
+    topology::{
+        generation::{GeneratedTopology, NodeRole, find_expected_peer_counts},
+        utils::multiaddr_port,
+    },
+};
+use thiserror::Error;
+
+mod config;
+mod state;
+
+use config::build_general_config_for;
+use state::LocalDynamicState;
+use testing_framework_core::scenario::NodeClients;
+
+#[derive(Debug, Error)]
+pub enum LocalDynamicError {
+    #[error("failed to generate node config: {source}")]
+    Config {
+        #[source]
+        source: testing_framework_config::topology::configs::GeneralConfigError,
+    },
+    #[error("failed to spawn node: {source}")]
+    Spawn {
+        #[source]
+        source: testing_framework_core::nodes::common::node::SpawnNodeError,
+    },
+    #[error("{message}")]
+    InvalidArgument { message: String },
+    #[error("{message}")]
+    PortAllocation { message: String },
+}
+
+pub struct LocalDynamicNodes {
+    descriptors: GeneratedTopology,
+    base_consensus: consensus::GeneralConsensusConfig,
+    base_time: time::GeneralTimeConfig,
+    node_clients: NodeClients,
+    seed: LocalDynamicSeed,
+    state: Mutex<LocalDynamicState>,
+}
+
+#[derive(Clone, Default)]
+pub struct LocalDynamicSeed {
+    pub validator_count: usize,
+    pub executor_count: usize,
+    pub peer_ports: Vec<u16>,
+    pub peer_ports_by_name: HashMap<String, u16>,
+}
+
+impl LocalDynamicSeed {
+    #[must_use]
+    pub fn from_topology(descriptors: &GeneratedTopology) -> Self {
+        let peer_ports = descriptors
+            .nodes()
+            .map(|node| node.network_port())
+            .collect::<Vec<_>>();
+
+        let peer_ports_by_name = descriptors
+            .validators()
+            .iter()
+            .map(|node| (format!("validator-{}", node.index()), node.network_port()))
+            .chain(
+                descriptors
+                    .executors()
+                    .iter()
+                    .map(|node| (format!("executor-{}", node.index()), node.network_port())),
+            )
+            .collect();
+
+        Self {
+            validator_count: descriptors.validators().len(),
+            executor_count: descriptors.executors().len(),
+            peer_ports,
+            peer_ports_by_name,
+        }
+    }
+}
+
+pub(crate) struct ReadinessNode {
+    pub(crate) label: String,
+    pub(crate) expected_peers: Option<usize>,
+    pub(crate) api: ApiClient,
+}
+
+impl LocalDynamicNodes {
+    pub fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self {
+        Self::new_with_seed(descriptors, node_clients, LocalDynamicSeed::default())
+    }
+
+    pub fn new_with_seed(
+        descriptors: GeneratedTopology,
+        node_clients: NodeClients,
+        seed: LocalDynamicSeed,
+    ) -> Self {
+        let base_node = descriptors
+            .validators()
+            .first()
+            .or_else(|| descriptors.executors().first())
+            .expect("generated topology must include at least one node");
+
+        let base_consensus = base_node.general.consensus_config.clone();
+        let base_time = base_node.general.time_config.clone();
+
+        let state = LocalDynamicState {
+            validator_count: seed.validator_count,
+            executor_count: seed.executor_count,
+            peer_ports: seed.peer_ports.clone(),
+            peer_ports_by_name: seed.peer_ports_by_name.clone(),
+            clients_by_name: HashMap::new(),
+            validators: Vec::new(),
+            executors: Vec::new(),
+        };
+
+        Self {
+            descriptors,
+            base_consensus,
+            base_time,
+            node_clients,
+            seed,
+            state: Mutex::new(state),
+        }
+    }
+
+    #[must_use]
+    pub fn node_client(&self, name: &str) -> Option<ApiClient> {
+        let state = self.state.lock().expect("local dynamic lock poisoned");
+        state.clients_by_name.get(name).cloned()
+    }
+
+    pub fn stop_all(&self) {
+        let mut state = self.state.lock().expect("local dynamic lock poisoned");
+        state.validators.clear();
+        state.executors.clear();
+        state.peer_ports.clone_from(&self.seed.peer_ports);
+        state
+            .peer_ports_by_name
+            .clone_from(&self.seed.peer_ports_by_name);
+        state.clients_by_name.clear();
+        state.validator_count = self.seed.validator_count;
+        state.executor_count = self.seed.executor_count;
+        self.node_clients.clear();
+    }
+
+    pub async fn start_validator_with(
+        &self,
+        name: &str,
+        options: StartNodeOptions,
+    ) -> Result<StartedNode, LocalDynamicError> {
+        self.start_node(NodeRole::Validator, name, options).await
+    }
+
+    pub async fn start_executor_with(
+        &self,
+        name: &str,
+        options: StartNodeOptions,
+    ) -> Result<StartedNode, LocalDynamicError> {
+        self.start_node(NodeRole::Executor, name, options).await
+    }
+
+    pub(crate) fn readiness_nodes(&self) -> Vec<ReadinessNode> {
+        let state = self.state.lock().expect("local dynamic lock poisoned");
+
+        let listen_ports = state
+            .validators
+            .iter()
+            .map(|node| node.config().network.backend.swarm.port)
+            .chain(
+                state
+                    .executors
+                    .iter()
+                    .map(|node| node.config().network.backend.swarm.port),
+            )
+            .collect::<Vec<_>>();
+
+        let initial_peer_ports = state
+            .validators
+            .iter()
+            .map(|node| {
+                node.config()
+                    .network
+                    .backend
+                    .initial_peers
+                    .iter()
+                    .filter_map(multiaddr_port)
+                    .collect::<Vec<_>>()
+            })
+            .chain(state.executors.iter().map(|node| {
+                node.config()
+                    .network
+                    .backend
+                    .initial_peers
+                    .iter()
+                    .filter_map(multiaddr_port)
+                    .collect::<Vec<_>>()
+            }))
+            .collect::<Vec<_>>();
+
+        let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports);
+
+        state
+            .validators
+            .iter()
+            .enumerate()
+            .map(|(idx, node)| ReadinessNode {
+                label: format!(
+                    "validator#{idx}@{}",
+                    node.config().network.backend.swarm.port
+                ),
+                expected_peers: expected_peer_counts.get(idx).copied(),
+                api: node.api().clone(),
+            })
+            .chain(state.executors.iter().enumerate().map(|(idx, node)| {
+                let global_idx = state.validators.len() + idx;
+                ReadinessNode {
+                    label: format!(
+                        "executor#{idx}@{}",
+                        node.config().network.backend.swarm.port
+                    ),
+                    expected_peers: expected_peer_counts.get(global_idx).copied(),
+                    api: node.api().clone(),
+                }
+            }))
+            .collect::<Vec<_>>()
+    }
+
+    async fn start_node(
+        &self,
+        role: NodeRole,
+        name: &str,
+        options: StartNodeOptions,
+    ) -> Result<StartedNode, LocalDynamicError> {
+        let (peer_ports, peer_ports_by_name, node_name, index) = {
+            let state = self.state.lock().expect("local dynamic lock poisoned");
+            let (index, role_label) = match role {
+                NodeRole::Validator => (state.validator_count, "validator"),
+                NodeRole::Executor => (state.executor_count, "executor"),
+            };
+
+            let label = if name.trim().is_empty() {
+                format!("{role_label}-{index}")
+            } else {
+                format!("{role_label}-{name}")
+            };
+
+            if state.peer_ports_by_name.contains_key(&label) {
+                return Err(LocalDynamicError::InvalidArgument {
+                    message: format!("node name '{label}' already exists"),
+                });
+            }
+
+            (
+                state.peer_ports.clone(),
+                state.peer_ports_by_name.clone(),
+                label,
+                index,
+            )
+        };
+
+        let (general_config, network_port) = build_general_config_for(
+            &self.descriptors,
+            &self.base_consensus,
+            &self.base_time,
+            role,
+            index,
+            &peer_ports_by_name,
+            &options,
+            &peer_ports,
+        )?;
+
+        let api_client = match role {
+            NodeRole::Validator => {
+                let config = create_validator_config(general_config);
+                self.spawn_and_register_validator(&node_name, network_port, config)
+                    .await?
+            }
+            NodeRole::Executor => {
+                let config = create_executor_config(general_config);
+                self.spawn_and_register_executor(&node_name, network_port, config)
+                    .await?
+            }
+        };
+
+        Ok(StartedNode {
+            name: node_name,
+            role,
+            api: api_client,
+        })
+    }
+
+    async fn spawn_and_register_validator(
+        &self,
+        node_name: &str,
+        network_port: u16,
+        config: ValidatorConfig,
+    ) -> Result<ApiClient, LocalDynamicError> {
+        let node = Validator::spawn(config, node_name)
+            .await
+            .map_err(|source| LocalDynamicError::Spawn { source })?;
+        let client = node.api().clone();
+
+        self.node_clients.add_validator(client.clone());
+
+        let mut state = self.state.lock().expect("local dynamic lock poisoned");
+        state.register_validator(node_name, network_port, client.clone(), node);
+
+        Ok(client)
+    }
+
+    async fn spawn_and_register_executor(
+        &self,
+        node_name: &str,
+        network_port: u16,
+        config: ExecutorConfig,
+    ) -> Result<ApiClient, LocalDynamicError> {
+        let node = Executor::spawn(config, node_name)
+            .await
+            .map_err(|source| LocalDynamicError::Spawn { source })?;
+        let client = node.api().clone();
+
+        self.node_clients.add_executor(client.clone());
+
+        let mut state = self.state.lock().expect("local dynamic lock poisoned");
+        state.register_executor(node_name, network_port, client.clone(), node);
+
+        Ok(client)
+    }
+}
+
+#[async_trait::async_trait]
+impl NodeControlHandle for LocalDynamicNodes {
+    async fn restart_validator(&self, _index: usize) -> Result<(), DynError> {
+        Err("local deployer does not support restart_validator".into())
+    }
+
+    async fn restart_executor(&self, _index: usize) -> Result<(), DynError> {
+        Err("local deployer does not support restart_executor".into())
+    }
+
+    async fn start_validator(&self, name: &str) -> Result<StartedNode, DynError> {
+        self.start_validator_with(name, StartNodeOptions::default())
+            .await
+            .map_err(|err| err.into())
+    }
+
+    async fn start_executor(&self, name: &str) -> Result<StartedNode, DynError> {
+        self.start_executor_with(name, StartNodeOptions::default())
+            .await
+            .map_err(|err| err.into())
+    }
+
+    async fn start_validator_with(
+        &self,
+        name: &str,
+        options: StartNodeOptions,
+    ) -> Result<StartedNode, DynError> {
+        self.start_validator_with(name, options)
+            .await
+            .map_err(|err| err.into())
+    }
+
+    async fn start_executor_with(
+        &self,
+        name: &str,
+        options: StartNodeOptions,
+    ) -> Result<StartedNode, DynError> {
+        self.start_executor_with(name, options)
+            .await
+            .map_err(|err| err.into())
+    }
+
+    fn node_client(&self, name: &str) -> Option<ApiClient> {
+        self.node_client(name)
+    }
+}
diff --git a/testing-framework/deployers/local/src/node_control/state.rs b/testing-framework/deployers/local/src/node_control/state.rs
new file mode 100644
index 0000000..738cbc7
--- /dev/null
+++ b/testing-framework/deployers/local/src/node_control/state.rs
@@ -0,0 +1,46 @@
+use std::collections::HashMap;
+
+use testing_framework_core::nodes::{ApiClient, executor::Executor, validator::Validator};
+
+pub(crate) struct LocalDynamicState {
+    pub(crate) validator_count: usize,
+    pub(crate) executor_count: usize,
+    pub(crate) peer_ports: Vec<u16>,
+    pub(crate) peer_ports_by_name: HashMap<String, u16>,
+    pub(crate) clients_by_name: HashMap<String, ApiClient>,
+    pub(crate) validators: Vec<Validator>,
+    pub(crate) executors: Vec<Executor>,
+}
+
+impl LocalDynamicState {
+    fn register_common(&mut self, node_name: &str, network_port: u16, client: ApiClient) {
+        self.peer_ports.push(network_port);
+        self.peer_ports_by_name
+            .insert(node_name.to_string(), network_port);
+        self.clients_by_name.insert(node_name.to_string(), client);
+    }
+
+    pub(super) fn register_validator(
+        &mut self,
+        node_name: &str,
+        network_port: u16,
+        client: ApiClient,
+        node: Validator,
+    ) {
+        self.register_common(node_name, network_port, client);
+        self.validator_count += 1;
+        self.validators.push(node);
+    }
+
+    pub(super) fn register_executor(
+        &mut self,
+        node_name: &str,
+        network_port: u16,
+        client: ApiClient,
+        node: Executor,
+    ) {
+        self.register_common(node_name, network_port, client);
+        self.executor_count += 1;
+        self.executors.push(node);
+    }
+}
diff --git a/testing-framework/deployers/local/src/runner.rs b/testing-framework/deployers/local/src/runner.rs
index c9c7b2b..c440357 100644
--- a/testing-framework/deployers/local/src/runner.rs
+++ b/testing-framework/deployers/local/src/runner.rs
@@ -1,34 +1,20 @@
-use std::{
-    collections::HashMap,
-    sync::{Arc, Mutex},
-};
+use std::sync::Arc;
 
 use async_trait::async_trait;
-use nomos_libp2p::Multiaddr;
-use nomos_utils::net::get_available_udp_port;
-use rand::Rng as _;
-use testing_framework_config::topology::configs::{
-    consensus,
-    runtime::{build_general_config_for_node, build_initial_peers},
-    time,
-};
 use testing_framework_core::{
-    node_address_from_port,
-    nodes::{ApiClient, executor::Executor, validator::Validator},
     scenario::{
         BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability,
-        NodeControlHandle, RunContext, Runner, Scenario, ScenarioError, StartNodeOptions,
-        StartedNode, spawn_block_feed,
+        RunContext, Runner, Scenario, ScenarioError, spawn_block_feed,
     },
     topology::{
        deployment::{SpawnTopologyError, Topology},
-        generation::{GeneratedTopology, NodeRole},
        readiness::ReadinessError,
    },
 };
 use thiserror::Error;
 use tracing::{debug, info};
 
+use crate::node_control::{LocalDynamicNodes, LocalDynamicSeed};
 /// Spawns validators and executors as local processes, reusing the existing
 /// integration harness.
 #[derive(Clone)]
@@ -115,9 +101,10 @@ impl Deployer for LocalDeployer {
         let topology = Self::prepare_topology(scenario).await?;
         let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
 
-        let node_control = Arc::new(LocalNodeControl::new(
+        let node_control = Arc::new(LocalDynamicNodes::new_with_seed(
             scenario.topology().clone(),
             node_clients.clone(),
+            LocalDynamicSeed::from_topology(scenario.topology()),
         ));
 
         let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;
@@ -205,240 +192,3 @@ fn workload_error(source: impl Into<DynError>) -> LocalDeployerError {
         source: source.into(),
     }
 }
-
-struct LocalNodeControl {
-    descriptors: GeneratedTopology,
-    node_clients: NodeClients,
-    base_consensus: consensus::GeneralConsensusConfig,
-    base_time: time::GeneralTimeConfig,
-    state: Mutex<LocalNodeControlState>,
-}
-
-struct LocalNodeControlState {
-    validator_count: usize,
-    executor_count: usize,
-    peer_ports: Vec<u16>,
-    peer_ports_by_name: HashMap<String, u16>,
-    validators: Vec<Validator>,
-    executors: Vec<Executor>,
-}
-
-#[async_trait]
-impl NodeControlHandle for LocalNodeControl {
-    async fn restart_validator(&self, _index: usize) -> Result<(), DynError> {
-        Err("local deployer does not support restart_validator".into())
-    }
-
-    async fn restart_executor(&self, _index: usize) -> Result<(), DynError> {
-        Err("local deployer does not support restart_executor".into())
-    }
-
-    async fn start_validator(&self, name: &str) -> Result<StartedNode, DynError> {
-        self.start_node(NodeRole::Validator, name, StartNodeOptions::default())
-            .await
-    }
-
-    async fn start_executor(&self, name: &str) -> Result<StartedNode, DynError> {
-        self.start_node(NodeRole::Executor, name, StartNodeOptions::default())
-            .await
-    }
-
-    async fn start_validator_with(
-        &self,
-        name: &str,
-        options: StartNodeOptions,
-    ) -> Result<StartedNode, DynError> {
-        self.start_node(NodeRole::Validator, name, options).await
-    }
-
-    async fn start_executor_with(
-        &self,
-        name: &str,
-        options: StartNodeOptions,
-    ) -> Result<StartedNode, DynError> {
-        self.start_node(NodeRole::Executor, name, options).await
-    }
-}
-
-impl LocalNodeControl {
-    fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self {
-        let base_node = descriptors
-            .validators()
-            .first()
-            .or_else(|| descriptors.executors().first())
-            .expect("generated topology must contain at least one node");
-
-        let base_consensus = base_node.general.consensus_config.clone();
-        let base_time = base_node.general.time_config.clone();
-
-        let peer_ports = descriptors
-            .nodes()
-            .map(|node| node.network_port())
-            .collect::<Vec<_>>();
-
-        let peer_ports_by_name = descriptors
-            .validators()
-            .iter()
-            .map(|node| (format!("validator-{}", node.index()), node.network_port()))
-            .chain(
-                descriptors
-                    .executors()
-                    .iter()
-                    .map(|node| (format!("executor-{}", node.index()), node.network_port())),
-            )
-            .collect();
-
-        let state = LocalNodeControlState {
-            validator_count: descriptors.validators().len(),
-            executor_count: descriptors.executors().len(),
-            peer_ports,
-            peer_ports_by_name,
-            validators: Vec::new(),
-            executors: Vec::new(),
-        };
-
-        Self {
-            descriptors,
-            node_clients,
-            base_consensus,
-            base_time,
-            state: Mutex::new(state),
-        }
-    }
-
-    async fn start_node(
-        &self,
-        role: NodeRole,
-        name: &str,
-        options: StartNodeOptions,
-    ) -> Result<StartedNode, DynError> {
-        let (peer_ports, peer_ports_by_name, node_name) = {
-            let state = self.state.lock().expect("local node control lock poisoned");
-            let index = match role {
-                NodeRole::Validator => state.validator_count,
-                NodeRole::Executor => state.executor_count,
-            };
-
-            let role_label = match role {
-                NodeRole::Validator => "validator",
-                NodeRole::Executor => "executor",
-            };
-
-            let label = if name.trim().is_empty() {
-                format!("{role_label}-{index}")
-            } else {
-                format!("{role_label}-{name}")
-            };
-
-            if state.peer_ports_by_name.contains_key(&label) {
-                return Err(format!("node name '{label}' already exists").into());
-            }
-
-            (
-                state.peer_ports.clone(),
-                state.peer_ports_by_name.clone(),
-                label,
-            )
-        };
-
-        let id = random_node_id();
-        let network_port = allocate_udp_port("network port")?;
-        let da_port = allocate_udp_port("DA port")?;
-        let blend_port = allocate_udp_port("Blend port")?;
-
-        let topology = self.descriptors.config();
-        let initial_peers = if options.peer_names.is_empty() {
-            build_initial_peers(&topology.network_params, &peer_ports)
-        } else {
-            resolve_peer_names(&peer_ports_by_name, &options.peer_names)?
-        };
-
-        let general_config = build_general_config_for_node(
-            id,
-            network_port,
-            initial_peers,
-            da_port,
-            blend_port,
-            &topology.consensus_params,
-            &topology.da_params,
-            &topology.wallet_config,
-            &self.base_consensus,
-            &self.base_time,
-        )?;
-
-        let api_client = match role {
-            NodeRole::Validator => {
-                let config = testing_framework_core::nodes::validator::create_validator_config(
-                    general_config,
-                );
-
-                let node = Validator::spawn(config, &node_name).await?;
-                let client = ApiClient::from_urls(node.url(), node.testing_url());
-
-                self.node_clients.add_validator(client.clone());
-
-                let mut state = self.state.lock().expect("local node control lock poisoned");
-
-                state.peer_ports.push(network_port);
-                state
-                    .peer_ports_by_name
-                    .insert(node_name.clone(), network_port);
-                state.validator_count += 1;
-                state.validators.push(node);
-
-                client
-            }
-            NodeRole::Executor => {
-                let config =
-                    testing_framework_core::nodes::executor::create_executor_config(general_config);
-
-                let node = Executor::spawn(config, &node_name).await?;
-                let client = ApiClient::from_urls(node.url(), node.testing_url());
-
-                self.node_clients.add_executor(client.clone());
-
-                let mut state = self.state.lock().expect("local node control lock poisoned");
-
-                state.peer_ports.push(network_port);
-                state
-                    .peer_ports_by_name
-                    .insert(node_name.clone(), network_port);
-                state.executor_count += 1;
-                state.executors.push(node);
-
-                client
-            }
-        };
-
-        Ok(StartedNode {
-            name: node_name,
-            role,
-            api: api_client,
-        })
-    }
-}
-
-fn resolve_peer_names(
-    peer_ports_by_name: &HashMap<String, u16>,
-    peer_names: &[String],
-) -> Result<Vec<Multiaddr>, DynError> {
-    let mut peers = Vec::with_capacity(peer_names.len());
-    for name in peer_names {
-        let port = peer_ports_by_name
-            .get(name)
-            .ok_or_else(|| format!("unknown peer name '{name}'"))?;
-        peers.push(node_address_from_port(*port));
-    }
-    Ok(peers)
-}
-
-fn random_node_id() -> [u8; 32] {
-    let mut id = [0u8; 32];
-    rand::thread_rng().fill(&mut id);
-    id
-}
-
-fn allocate_udp_port(label: &'static str) -> Result<u16, DynError> {
-    get_available_udp_port()
-        .ok_or_else(|| format!("failed to allocate free UDP port for {label}").into())
-}
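
Usage sketch (illustration only, not part of the patch): the ManualClusterGuard API introduced above can be driven straight from a tokio test. This is a minimal example distilled from examples/tests/manual_cluster.rs; the test name and two-validator shape are hypothetical, and it assumes the same environment setup (circuits directory, POL_PROOF_DEV_MODE) that the example test performs before spawning nodes.

use testing_framework_core::{
    scenario::{PeerSelection, StartNodeOptions},
    topology::config::TopologyConfig,
};
use testing_framework_runner_local::ManualClusterGuard;

#[tokio::test]
async fn manual_cluster_sketch() -> anyhow::Result<()> {
    // Reserve descriptors for two validators and no executors; nothing runs yet.
    let cluster = ManualClusterGuard::from_config(TopologyConfig::with_node_numbers(2, 0))?;

    // First node starts with an empty peer list; it is registered as "validator-a".
    let a = cluster
        .start_validator_with(
            "a",
            StartNodeOptions {
                peers: PeerSelection::None,
            },
        )
        .await?;

    // Second node dials the first one by its registered name.
    let b = cluster
        .start_validator_with(
            "b",
            StartNodeOptions {
                peers: PeerSelection::Named(vec![a.name.clone()]),
            },
        )
        .await?;

    // Block until every running node reports its expected peer count.
    cluster.wait_network_ready().await?;

    println!("height: {}", b.api.consensus_info().await?.height);
    // Dropping the guard stops all spawned nodes.
    Ok(())
}

Omitting the options (start_validator / StartNodeOptions::default()) falls back to PeerSelection::DefaultLayout, which wires the node according to the topology's configured layout instead.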