diff --git a/Cargo.lock b/Cargo.lock index 5095835..03e40e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1305,6 +1305,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -1805,6 +1811,19 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.52.0", +] + [[package]] name = "rustix" version = "1.1.3" @@ -1814,7 +1833,7 @@ dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.11.0", "windows-sys 0.61.2", ] @@ -2176,7 +2195,7 @@ dependencies = [ "fastrand", "getrandom 0.3.4", "once_cell", - "rustix", + "rustix 1.1.3", "windows-sys 0.61.2", ] @@ -2253,6 +2272,7 @@ dependencies = [ "kube", "reqwest", "serde", + "serde_json", "serde_yaml", "tempfile", "testing-framework-core", @@ -2750,6 +2770,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "which" +version = "6.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ee928febd44d98f2f459a4a79bd4d928591333a494a10a868418ac1b39cf1f" +dependencies = [ + "either", + "home", + "rustix 0.38.44", + "winsafe", +] + [[package]] name = "winapi-util" version = "0.1.11" @@ -2974,6 +3006,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winsafe" +version = "0.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/testing-framework/core/src/cfgsync/mod.rs b/testing-framework/core/src/cfgsync/mod.rs index add9abc..b769e43 100644 --- a/testing-framework/core/src/cfgsync/mod.rs +++ b/testing-framework/core/src/cfgsync/mod.rs @@ -39,9 +39,6 @@ pub trait StaticArtifactRenderer { fn serialize_node_config(config: &Self::NodeConfig) -> Result; } -#[doc(hidden)] -pub use StaticArtifactRenderer as CfgsyncEnv; - #[doc(hidden)] pub trait StaticNodeConfigProvider: Application { type Error: Error + Send + Sync + 'static; diff --git a/testing-framework/core/src/scenario/config.rs b/testing-framework/core/src/scenario/config.rs index 4a34207..c7d73bb 100644 --- a/testing-framework/core/src/scenario/config.rs +++ b/testing-framework/core/src/scenario/config.rs @@ -1,7 +1,6 @@ use std::{collections::HashMap, error::Error}; -use super::ScenarioApplication; -use crate::{cfgsync::StaticNodeConfigProvider, topology::DeploymentDescriptor}; +use crate::{cfgsync::StaticNodeConfigProvider, env::Application, topology::DeploymentDescriptor}; #[derive(Clone, Debug)] pub struct ClusterPeerView { @@ -94,7 +93,7 @@ impl ClusterNodeView { } } -pub trait ClusterNodeConfigApplication: ScenarioApplication { +pub trait ClusterNodeConfigApplication: Application { type ConfigError: Error + Send + Sync + 'static; fn static_network_port() -> u16; diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index fd17dca..bc54944 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -33,7 +33,7 @@ pub use control::{ClusterWaitHandle, NodeControlHandle}; pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder}; pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; -pub use noop::ScenarioApplication; +pub use noop::{DefaultFeed, DefaultFeedRuntime, default_feed_result}; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ Deployer, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext, diff --git a/testing-framework/core/src/scenario/noop.rs b/testing-framework/core/src/scenario/noop.rs index 18c5a67..fb4e243 100644 --- a/testing-framework/core/src/scenario/noop.rs +++ b/testing-framework/core/src/scenario/noop.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -use super::{Application, DynError, Feed, FeedRuntime, NodeAccess, NodeClients}; +use super::{DynError, Feed, FeedRuntime}; #[derive(Clone)] pub struct DefaultFeed; @@ -20,59 +20,6 @@ impl FeedRuntime for DefaultFeedRuntime { async fn run(self: Box) {} } -/// App surface for the common case where the framework default feed behavior is -/// sufficient and no custom feed runtime is needed. -#[async_trait] -pub trait ScenarioApplication: Send + Sync + 'static { - type Deployment: crate::topology::DeploymentDescriptor + Clone + 'static; - type NodeClient: Clone + Send + Sync + 'static; - type NodeConfig: Clone + Send + Sync + 'static; - - fn external_node_client( - _source: &super::ExternalNodeSource, - ) -> Result { - Err(std::io::Error::other("external node sources are not supported").into()) - } - - fn build_node_client(_access: &NodeAccess) -> Result { - Err(std::io::Error::other("node access is not supported").into()) - } - - fn node_readiness_path() -> &'static str { - "/" - } -} - -#[async_trait] -impl Application for T -where - T: ScenarioApplication, -{ - type Deployment = T::Deployment; - type NodeClient = T::NodeClient; - type NodeConfig = T::NodeConfig; - type FeedRuntime = DefaultFeedRuntime; - - fn external_node_client( - source: &super::ExternalNodeSource, - ) -> Result { - T::external_node_client(source) - } - - fn build_node_client(access: &NodeAccess) -> Result { - T::build_node_client(access) - } - - fn node_readiness_path() -> &'static str { - T::node_readiness_path() - } - - async fn prepare_feed( - _node_clients: NodeClients, - ) -> Result<(::Feed, Self::FeedRuntime), DynError> - where - Self: Sized, - { - Ok((DefaultFeed, DefaultFeedRuntime)) - } +pub fn default_feed_result() -> Result<(DefaultFeed, DefaultFeedRuntime), DynError> { + Ok((DefaultFeed, DefaultFeedRuntime)) } diff --git a/testing-framework/deployers/k8s/Cargo.toml b/testing-framework/deployers/k8s/Cargo.toml index f78e442..5ecffb6 100644 --- a/testing-framework/deployers/k8s/Cargo.toml +++ b/testing-framework/deployers/k8s/Cargo.toml @@ -19,6 +19,7 @@ k8s-openapi = { features = ["latest"], version = "0.20" } kube = { default-features = false, features = ["client", "runtime", "rustls-tls"], version = "0.87" } reqwest = { features = ["json"], workspace = true } serde = { workspace = true } +serde_json = { workspace = true } serde_yaml = { workspace = true } tempfile = { workspace = true } testing-framework-core = { path = "../../core" } diff --git a/testing-framework/deployers/k8s/src/lib.rs b/testing-framework/deployers/k8s/src/lib.rs index 2b9a75f..4c4ee0a 100644 --- a/testing-framework/deployers/k8s/src/lib.rs +++ b/testing-framework/deployers/k8s/src/lib.rs @@ -3,6 +3,7 @@ mod env; mod host; mod infrastructure; mod lifecycle; +mod manual; mod workspace; pub mod wait { pub use crate::lifecycle::wait::*; @@ -27,6 +28,7 @@ pub use infrastructure::{ runtime_spec::{NodeRuntimeSpec, RuntimeSpecError, SharedServiceFileSpec, SharedServiceSpec}, }; pub use lifecycle::cleanup::RunnerCleanup; +pub use manual::{ManualCluster, ManualClusterError}; pub use workspace::{ RequiredPathError, bundled_runner_chart_path, create_temp_workspace, require_existing_paths, resolve_optional_relative_dir, resolve_workspace_root, write_temp_file, diff --git a/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs b/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs index 63f3d31..d13a709 100644 --- a/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs +++ b/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs @@ -3,11 +3,11 @@ use std::{env, sync::LazyLock, time::Duration}; use kube::Error as KubeError; use thiserror::Error; -mod deployment; +pub(crate) mod deployment; mod forwarding; mod http_probe; mod orchestrator; -mod ports; +pub(crate) mod ports; pub use forwarding::PortForwardHandle; const DEFAULT_HTTP_POLL_INTERVAL: Duration = Duration::from_secs(1); diff --git a/testing-framework/deployers/k8s/src/manual.rs b/testing-framework/deployers/k8s/src/manual.rs new file mode 100644 index 0000000..9d83a8d --- /dev/null +++ b/testing-framework/deployers/k8s/src/manual.rs @@ -0,0 +1,689 @@ +use std::{ + collections::HashSet, + sync::{Arc, Mutex}, +}; + +use k8s_openapi::api::apps::v1::Deployment; +use kube::{ + Api, Client, + api::{Patch, PatchParams}, +}; +use testing_framework_core::{ + manual::ManualClusterHandle, + scenario::{ + ClusterWaitHandle, DynError, ExternalNodeSource, HttpReadinessRequirement, NodeClients, + NodeControlHandle, StartNodeOptions, StartedNode, + }, +}; +use thiserror::Error; +use tokio_retry::{RetryIf, strategy::FixedInterval}; + +use crate::{ + K8sDeployer, + env::{HelmReleaseAssets, K8sDeployEnv, discovered_node_access}, + host::node_host, + infrastructure::helm::install_release, + lifecycle::{ + cleanup::RunnerCleanup, + wait::{ + ClusterWaitError, NodeConfigPorts, deployment::wait_for_deployment_ready, + ports::discover_node_ports, + }, + }, +}; + +#[derive(Debug, Error)] +pub enum ManualClusterError { + #[error("kubernetes runner requires at least one node (nodes={nodes})")] + UnsupportedTopology { nodes: usize }, + #[error("failed to initialise kubernetes client: {source}")] + ClientInit { + #[source] + source: kube::Error, + }, + #[error("failed to prepare k8s assets: {source}")] + Assets { + #[source] + source: DynError, + }, + #[error("failed to install k8s stack: {source}")] + InstallStack { + #[source] + source: DynError, + }, + #[error(transparent)] + NodePorts(#[from] ClusterWaitError), + #[error("unsupported start options for k8s manual cluster: {message}")] + UnsupportedStartOptions { message: String }, + #[error("invalid node name '{name}'; expected node-")] + InvalidNodeName { name: String }, + #[error("node index {index} is out of range for topology with {nodes} nodes")] + NodeIndexOutOfRange { index: usize, nodes: usize }, + #[error("node '{name}' is already running")] + NodeAlreadyRunning { name: String }, + #[error("node '{name}' is not running")] + NodeNotRunning { name: String }, + #[error("failed to patch deployment {name}: {source}")] + PatchDeployment { + name: String, + #[source] + source: kube::Error, + }, + #[error("failed to delete pods for deployment {name}: {source}")] + DeletePods { + name: String, + #[source] + source: kube::Error, + }, + #[error("failed to discover node client for '{name}': {source}")] + NodeClient { + name: String, + #[source] + source: DynError, + }, + #[error("node readiness failed for '{name}': {source}")] + NodeReadiness { + name: String, + #[source] + source: DynError, + }, + #[error("cluster network readiness failed: {source}")] + NetworkReadiness { + #[source] + source: DynError, + }, +} + +struct ManualClusterState { + running: HashSet, + node_clients: NodeClients, + known_clients: Vec>, +} + +pub struct ManualCluster +where + E::Assets: HelmReleaseAssets, +{ + client: Client, + namespace: String, + release: String, + node_count: usize, + node_host: String, + node_allocations: Vec, + cleanup: Option, + state: Arc>>, +} + +impl ManualCluster +where + E::Assets: HelmReleaseAssets, +{ + pub async fn from_topology(topology: E::Deployment) -> Result { + let nodes = testing_framework_core::topology::DeploymentDescriptor::node_count(&topology); + if nodes == 0 { + return Err(ManualClusterError::UnsupportedTopology { nodes }); + } + + let client = Client::try_default() + .await + .map_err(|source| ManualClusterError::ClientInit { source })?; + let assets = E::prepare_assets(&topology, None) + .map_err(|source| ManualClusterError::Assets { source })?; + let (namespace, release) = E::cluster_identifiers(); + let install_spec = assets + .release_bundle() + .install_spec(release.clone(), namespace.clone()); + install_release(&install_spec).await.map_err(|source| { + ManualClusterError::InstallStack { + source: source.into(), + } + })?; + let cleanup = RunnerCleanup::new( + client.clone(), + namespace.clone(), + release.clone(), + std::env::var("K8S_RUNNER_PRESERVE").is_ok(), + ); + + let node_ports = E::collect_port_specs(&topology).nodes; + let node_allocations = + discover_all_node_ports::(&client, &namespace, &release, &node_ports).await?; + scale_all_nodes::(&client, &namespace, &release, nodes, 0).await?; + + Ok(Self { + client, + namespace, + release, + node_count: nodes, + node_host: node_host(), + node_allocations, + cleanup: Some(cleanup), + state: Arc::new(Mutex::new(ManualClusterState { + running: HashSet::new(), + node_clients: NodeClients::default(), + known_clients: vec![None; nodes], + })), + }) + } + + #[must_use] + pub fn node_client(&self, name: &str) -> Option { + let index = parse_node_index(name)?; + let state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state + .known_clients + .get(index) + .and_then(|client| client.clone()) + } + + #[must_use] + pub fn node_pid(&self, _name: &str) -> Option { + None + } + + pub async fn start_node(&self, name: &str) -> Result, ManualClusterError> { + self.start_node_with(name, StartNodeOptions::::default()) + .await + } + + pub async fn start_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result, ManualClusterError> { + validate_start_options(&options)?; + let index = self.require_node_index(name)?; + { + let state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if state.running.contains(&index) { + return Err(ManualClusterError::NodeAlreadyRunning { + name: name.to_owned(), + }); + } + } + + scale_node::(&self.client, &self.namespace, &self.release, index, 1).await?; + self.wait_node_ready(name).await?; + let client = self.build_client(index, name)?; + + let mut state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.running.insert(index); + state.known_clients[index] = Some(client.clone()); + state.node_clients.add_node(client.clone()); + + Ok(StartedNode { + name: canonical_node_name(index), + client, + }) + } + + pub fn stop_all(&self) { + block_on_best_effort(self.stop_all_async()); + } + + async fn stop_all_async(&self) -> Result<(), ManualClusterError> { + let indices = { + let state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.running.iter().copied().collect::>() + }; + + for index in indices { + let name = canonical_node_name(index); + self.stop_node(&name).await?; + } + + Ok(()) + } + + pub async fn restart_node(&self, name: &str) -> Result<(), ManualClusterError> { + let index = self.require_running_node_index(name)?; + scale_node::(&self.client, &self.namespace, &self.release, index, 0).await?; + scale_node::(&self.client, &self.namespace, &self.release, index, 1).await?; + self.wait_node_ready(name).await?; + let client = self.build_client(index, name)?; + + let mut state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.known_clients[index] = Some(client.clone()); + state.node_clients.add_node(client); + Ok(()) + } + + pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> { + let index = self.require_running_node_index(name)?; + scale_node::(&self.client, &self.namespace, &self.release, index, 0).await?; + let mut state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.running.remove(&index); + Ok(()) + } + + pub async fn wait_network_ready(&self) -> Result<(), ManualClusterError> { + let running_ports = { + let state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state + .running + .iter() + .copied() + .map(|index| self.node_allocations[index].api) + .collect::>() + }; + + if running_ports.is_empty() { + return Ok(()); + } + + let ports = running_ports; + testing_framework_core::scenario::wait_for_http_ports_with_host_and_requirement( + &ports, + &self.node_host, + ::node_readiness_path(), + HttpReadinessRequirement::AllNodesReady, + ) + .await + .map_err(|source| ManualClusterError::NetworkReadiness { + source: source.into(), + }) + } + + pub async fn wait_node_ready(&self, name: &str) -> Result<(), ManualClusterError> { + let index = self.require_node_index(name)?; + let port = self.node_allocations[index].api; + testing_framework_core::scenario::wait_for_http_ports_with_host_and_requirement( + &[port], + &self.node_host, + ::node_readiness_path(), + HttpReadinessRequirement::AllNodesReady, + ) + .await + .map_err(|source| ManualClusterError::NodeReadiness { + name: canonical_node_name(index), + source: source.into(), + }) + } + + #[must_use] + pub fn node_clients(&self) -> NodeClients { + let state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + state.node_clients.clone() + } + + pub fn add_external_sources( + &self, + external_sources: impl IntoIterator, + ) -> Result<(), DynError> { + let node_clients = self.node_clients(); + for source in external_sources { + node_clients.add_node(E::external_node_client(&source)?); + } + Ok(()) + } + + pub fn add_external_clients(&self, clients: impl IntoIterator) { + let node_clients = self.node_clients(); + for client in clients { + node_clients.add_node(client); + } + } + + fn build_client(&self, index: usize, name: &str) -> Result { + let allocation = self.node_allocations[index]; + E::build_node_client(&discovered_node_access( + &self.node_host, + allocation.api, + allocation.auxiliary, + )) + .map_err(|source| ManualClusterError::NodeClient { + name: name.to_owned(), + source, + }) + } + + fn require_node_index(&self, name: &str) -> Result { + let index = parse_node_index(name).ok_or_else(|| ManualClusterError::InvalidNodeName { + name: name.to_owned(), + })?; + if index >= self.node_count { + return Err(ManualClusterError::NodeIndexOutOfRange { + index, + nodes: self.node_count, + }); + } + Ok(index) + } + + fn require_running_node_index(&self, name: &str) -> Result { + let index = self.require_node_index(name)?; + let state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + if !state.running.contains(&index) { + return Err(ManualClusterError::NodeNotRunning { + name: canonical_node_name(index), + }); + } + Ok(index) + } +} + +impl Drop for ManualCluster +where + E: K8sDeployEnv, + E::Assets: HelmReleaseAssets, +{ + fn drop(&mut self) { + self.stop_all(); + if let Some(cleanup) = self.cleanup.take() { + testing_framework_core::scenario::internal::CleanupGuard::cleanup(Box::new(cleanup)); + } + } +} + +#[async_trait::async_trait] +impl NodeControlHandle for ManualCluster +where + E: K8sDeployEnv, + E::Assets: HelmReleaseAssets, +{ + async fn restart_node(&self, name: &str) -> Result<(), DynError> { + Self::restart_node(self, name).await.map_err(Into::into) + } + + async fn start_node(&self, name: &str) -> Result, DynError> { + Self::start_node(self, name).await.map_err(Into::into) + } + + async fn start_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result, DynError> { + Self::start_node_with(self, name, options) + .await + .map_err(Into::into) + } + + async fn stop_node(&self, name: &str) -> Result<(), DynError> { + Self::stop_node(self, name).await.map_err(Into::into) + } + + fn node_client(&self, name: &str) -> Option { + Self::node_client(self, name) + } +} + +#[async_trait::async_trait] +impl ClusterWaitHandle for ManualCluster +where + E: K8sDeployEnv, + E::Assets: HelmReleaseAssets, +{ + async fn wait_network_ready(&self) -> Result<(), DynError> { + Self::wait_network_ready(self).await.map_err(Into::into) + } +} + +#[async_trait::async_trait] +impl ManualClusterHandle for ManualCluster +where + E: K8sDeployEnv, + E::Assets: HelmReleaseAssets, +{ +} + +impl K8sDeployer +where + E: K8sDeployEnv, + E::Assets: HelmReleaseAssets, +{ + pub async fn manual_cluster_from_descriptors( + &self, + descriptors: E::Deployment, + ) -> Result, ManualClusterError> { + let _ = self; + ManualCluster::from_topology(descriptors).await + } +} + +async fn discover_all_node_ports( + client: &Client, + namespace: &str, + release: &str, + node_ports: &[NodeConfigPorts], +) -> Result, ManualClusterError> { + let mut allocations = Vec::with_capacity(node_ports.len()); + for (index, ports) in node_ports.iter().enumerate() { + let service_name = E::node_service_name(release, index); + allocations.push(discover_node_ports(client, namespace, &service_name, *ports).await?); + } + Ok(allocations) +} + +async fn scale_all_nodes( + client: &Client, + namespace: &str, + release: &str, + node_count: usize, + replicas: i32, +) -> Result<(), ManualClusterError> { + for index in 0..node_count { + scale_node::(client, namespace, release, index, replicas).await?; + } + Ok(()) +} + +async fn scale_node( + client: &Client, + namespace: &str, + release: &str, + index: usize, + replicas: i32, +) -> Result<(), ManualClusterError> { + let name = E::node_deployment_name(release, index); + let deployments = Api::::namespaced(client.clone(), namespace); + let patch = serde_json::json!({"spec": {"replicas": replicas}}); + deployments + .patch(&name, &PatchParams::default(), &Patch::Merge(&patch)) + .await + .map_err(|source| ManualClusterError::PatchDeployment { + name: name.clone(), + source, + })?; + + wait_for_replicas(client, namespace, &name, replicas).await +} + +async fn wait_for_replicas( + client: &Client, + namespace: &str, + name: &str, + replicas: i32, +) -> Result<(), ManualClusterError> { + if replicas > 0 { + return wait_for_deployment_ready(client, namespace, name) + .await + .map_err(Into::into); + } + + let deployments = Api::::namespaced(client.clone(), namespace); + RetryIf::spawn( + FixedInterval::from_millis(500).take(240), + || async { + let deployment = deployments.get(name).await.map_err(|source| { + ManualClusterError::PatchDeployment { + name: name.to_owned(), + source, + } + })?; + let ready = deployment + .status + .as_ref() + .and_then(|status| status.ready_replicas) + .unwrap_or(0); + let current = deployment + .spec + .as_ref() + .and_then(|spec| spec.replicas) + .unwrap_or(1); + if ready == 0 && current == 0 { + Ok(()) + } else { + Err(ManualClusterError::NodeAlreadyRunning { + name: name.to_owned(), + }) + } + }, + |error: &ManualClusterError| matches!(error, ManualClusterError::NodeAlreadyRunning { .. }), + ) + .await +} + +fn validate_start_options( + options: &StartNodeOptions, +) -> Result<(), ManualClusterError> { + if !matches!( + options.peers, + testing_framework_core::scenario::PeerSelection::DefaultLayout + ) { + return Err(ManualClusterError::UnsupportedStartOptions { + message: "custom peer selection is not supported".to_owned(), + }); + } + if options.config_override.is_some() || options.config_patch.is_some() { + return Err(ManualClusterError::UnsupportedStartOptions { + message: "config overrides/patches are not supported".to_owned(), + }); + } + if options.persist_dir.is_some() || options.snapshot_dir.is_some() { + return Err(ManualClusterError::UnsupportedStartOptions { + message: "persist/snapshot directories are not supported".to_owned(), + }); + } + Ok(()) +} + +fn parse_node_index(name: &str) -> Option { + name.strip_prefix("node-")?.parse().ok() +} + +fn canonical_node_name(index: usize) -> String { + format!("node-{index}") +} + +fn block_on_best_effort(fut: impl std::future::Future>) { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + tokio::task::block_in_place(|| { + let _ = handle.block_on(fut); + }); + return; + } + + if let Ok(runtime) = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + let _ = runtime.block_on(fut); + } +} + +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::{ + Application, DefaultFeedRuntime, NodeAccess, NodeClients, PeerSelection, + default_feed_result, + }; + + use super::*; + use crate::{ + PortSpecs, RenderedHelmChartAssets, render_single_template_chart_assets, + standard_port_specs, + }; + + struct DummyEnv; + + #[async_trait::async_trait] + impl Application for DummyEnv { + type Deployment = testing_framework_core::topology::ClusterTopology; + type NodeClient = String; + type NodeConfig = (); + type FeedRuntime = DefaultFeedRuntime; + + fn build_node_client(access: &NodeAccess) -> Result { + Ok(access.api_base_url()?.to_string()) + } + + async fn prepare_feed( + _node_clients: NodeClients, + ) -> Result< + ( + testing_framework_core::scenario::DefaultFeed, + Self::FeedRuntime, + ), + DynError, + > { + default_feed_result() + } + } + + #[async_trait::async_trait] + impl K8sDeployEnv for DummyEnv { + type Assets = RenderedHelmChartAssets; + + fn collect_port_specs(_topology: &Self::Deployment) -> PortSpecs { + standard_port_specs(1, 8080, 8081) + } + + fn prepare_assets( + _topology: &Self::Deployment, + _metrics_otlp_ingest_url: Option<&reqwest::Url>, + ) -> Result { + render_single_template_chart_assets("dummy", "dummy.yaml", "") + } + } + + #[test] + fn parse_node_index_accepts_node_labels() { + assert_eq!(parse_node_index("node-0"), Some(0)); + assert_eq!(parse_node_index("node-12"), Some(12)); + assert_eq!(parse_node_index("validator-0"), None); + } + + #[test] + fn validate_start_options_rejects_non_default_inputs() { + let peers = StartNodeOptions::::default().with_peers(PeerSelection::None); + assert!(matches!( + validate_start_options(&peers), + Err(ManualClusterError::UnsupportedStartOptions { .. }) + )); + + let persist = StartNodeOptions::::default() + .with_persist_dir(std::path::PathBuf::from("/tmp/demo")); + assert!(matches!( + validate_start_options(&persist), + Err(ManualClusterError::UnsupportedStartOptions { .. }) + )); + } +}