use std::{env, process::Child};

use kube::Client;
use reqwest::Url;
use testing_framework_core::{
    nodes::ApiClient,
    scenario::{CleanupGuard, Metrics, MetricsError, NodeClients, http_probe::NodeRole},
    topology::{GeneratedTopology, ReadinessError},
};
use tracing::info;
use url::ParseError;
use uuid::Uuid;

use crate::{
    assets::RunnerAssets,
    cleanup::RunnerCleanup,
    host::node_host,
    logs::dump_namespace_logs,
    wait::{ClusterPorts, ClusterReady, NodeConfigPorts, wait_for_cluster_ready},
};

/// API and testing HTTP ports for each validator and executor node.
#[derive(Default)]
pub struct PortSpecs {
    pub validators: Vec<NodeConfigPorts>,
    pub executors: Vec<NodeConfigPorts>,
}

/// Holds the k8s namespace, Helm release, port forwards, and cleanup guard.
pub struct ClusterEnvironment {
    client: Client,
    namespace: String,
    release: String,
    cleanup: Option<RunnerCleanup>,
    validator_api_ports: Vec<u16>,
    validator_testing_ports: Vec<u16>,
    executor_api_ports: Vec<u16>,
    executor_testing_ports: Vec<u16>,
    prometheus_port: u16,
    /// Child processes backing the `kubectl port-forward` tunnels.
    port_forwards: Vec<Child>,
}

impl ClusterEnvironment {
    pub fn new(
        client: Client,
        namespace: String,
        release: String,
        cleanup: RunnerCleanup,
        ports: &ClusterPorts,
        port_forwards: Vec<Child>,
    ) -> Self {
        Self {
            client,
            namespace,
            release,
            cleanup: Some(cleanup),
            validator_api_ports: ports.validators.iter().map(|ports| ports.api).collect(),
            validator_testing_ports: ports.validators.iter().map(|ports| ports.testing).collect(),
            executor_api_ports: ports.executors.iter().map(|ports| ports.api).collect(),
            executor_testing_ports: ports.executors.iter().map(|ports| ports.testing).collect(),
            prometheus_port: ports.prometheus,
            port_forwards,
        }
    }

    /// Collects diagnostics, kills the port-forwards, and runs cleanup after a
    /// failure.
    pub async fn fail(&mut self, reason: &str) {
        tracing::error!(
            reason = reason,
            namespace = %self.namespace,
            release = %self.release,
            "k8s stack failure; collecting diagnostics"
        );
        dump_namespace_logs(&self.client, &self.namespace).await;
        kill_port_forwards(&mut self.port_forwards);
        if let Some(guard) = self.cleanup.take() {
            Box::new(guard).cleanup();
        }
    }

    pub fn into_cleanup(self) -> (RunnerCleanup, Vec<Child>) {
        (
            self.cleanup.expect("cleanup guard should be available"),
            self.port_forwards,
        )
    }

    pub fn prometheus_port(&self) -> u16 {
        self.prometheus_port
    }

    /// Returns the forwarded validator (API, testing) ports.
    pub fn validator_ports(&self) -> (&[u16], &[u16]) {
        (&self.validator_api_ports, &self.validator_testing_ports)
    }

    /// Returns the forwarded executor (API, testing) ports.
    pub fn executor_ports(&self) -> (&[u16], &[u16]) {
        (&self.executor_api_ports, &self.executor_testing_ports)
    }
}

/// Failures while building node clients against forwarded ports.
#[derive(Debug, thiserror::Error)]
pub enum NodeClientError {
    #[error(
        "failed to build {endpoint} client URL for {role} port {port}: {source}",
        role = role.label()
    )]
    Endpoint {
        role: NodeRole,
        endpoint: &'static str,
        port: u16,
        #[source]
        source: ParseError,
    },
}

/// Readiness check failures for the remote cluster endpoints.
#[derive(Debug, thiserror::Error)]
pub enum RemoteReadinessError {
    #[error(
        "failed to build readiness URL for {role} port {port}: {source}",
        role = role.label()
    )]
    Endpoint {
        role: NodeRole,
        port: u16,
        #[source]
        source: ParseError,
    },
    #[error("remote readiness probe failed: {source}")]
    Remote {
        #[source]
        source: ReadinessError,
    },
}

/// Extracts the API and testing HTTP ports of every node in the topology.
pub fn collect_port_specs(descriptors: &GeneratedTopology) -> PortSpecs {
    let validators = descriptors
        .validators()
        .iter()
        .map(|node| NodeConfigPorts {
            api: node.general.api_config.address.port(),
            testing: node.general.api_config.testing_http_address.port(),
        })
        .collect();
    let executors = descriptors
        .executors()
        .iter()
        .map(|node| NodeConfigPorts {
            api: node.general.api_config.address.port(),
            testing: node.general.api_config.testing_http_address.port(),
        })
        .collect();

    PortSpecs {
        validators,
        executors,
    }
}

/// Builds an API client per node from the forwarded port pairs.
pub fn build_node_clients(cluster: &ClusterEnvironment) -> Result<NodeClients, NodeClientError> {
    let validators = cluster
        .validator_api_ports
        .iter()
        .copied()
        .zip(cluster.validator_testing_ports.iter().copied())
        .map(|(api_port, testing_port)| {
            api_client_from_ports(NodeRole::Validator, api_port, testing_port)
        })
        .collect::<Result<Vec<_>, _>>()?;
    let executors = cluster
        .executor_api_ports
        .iter()
        .copied()
        .zip(cluster.executor_testing_ports.iter().copied())
        .map(|(api_port, testing_port)| {
            api_client_from_ports(NodeRole::Executor, api_port, testing_port)
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(NodeClients::new(validators, executors))
}

/// Creates a Prometheus-backed metrics handle for the forwarded port.
pub fn metrics_handle_from_port(port: u16) -> Result<Metrics, MetricsError> {
    let url = cluster_host_url(port)
        .map_err(|err| MetricsError::new(format!("invalid prometheus url: {err}")))?;
    Metrics::from_prometheus(url)
}

/// Waits until every node's API and membership endpoints report ready.
pub async fn ensure_cluster_readiness(
    descriptors: &GeneratedTopology,
    cluster: &ClusterEnvironment,
) -> Result<(), RemoteReadinessError> {
    info!("waiting for remote readiness (API + membership)");
    let (validator_api, validator_testing) = cluster.validator_ports();
    let (executor_api, executor_testing) = cluster.executor_ports();

    let validator_urls = readiness_urls(validator_api, NodeRole::Validator)?;
    let executor_urls = readiness_urls(executor_api, NodeRole::Executor)?;
    let validator_membership_urls = readiness_urls(validator_testing, NodeRole::Validator)?;
    let executor_membership_urls = readiness_urls(executor_testing, NodeRole::Executor)?;

    descriptors
        .wait_remote_readiness(
            &validator_urls,
            &executor_urls,
            Some(&validator_membership_urls),
            Some(&executor_membership_urls),
        )
        .await
        .map_err(|source| RemoteReadinessError::Remote { source })
}

/// Generates a unique namespace and Helm release name; both currently share
/// the same `nomos-k8s-<run id>` value.
pub fn cluster_identifiers() -> (String, String) {
    let run_id = Uuid::new_v4().simple().to_string();
    let namespace = format!("nomos-k8s-{run_id}");
    (namespace.clone(), namespace)
}

/// Installs the Helm release and returns a cleanup guard that uninstalls it
/// unless `K8S_RUNNER_PRESERVE` is set.
// NOTE: the error type is assumed to be `anyhow::Error` here and in
// `wait_for_ports_or_cleanup`; the concrete type is not visible in this file.
pub async fn install_stack(
    client: &Client,
    assets: &RunnerAssets,
    namespace: &str,
    release: &str,
    validators: usize,
    executors: usize,
) -> anyhow::Result<RunnerCleanup> {
    tracing::info!(
        release = %release,
        namespace = %namespace,
        "installing helm release"
    );
    crate::helm::install_release(assets, release, namespace, validators, executors).await?;
    tracing::info!(release = %release, "helm install succeeded");

    let preserve = env::var("K8S_RUNNER_PRESERVE").is_ok();
    Ok(RunnerCleanup::new(
        client.clone(),
        namespace.to_owned(),
        release.to_owned(),
        preserve,
    ))
}

/// Waits for the cluster port-forwards; on failure, dumps namespace logs and
/// runs the pending cleanup guard before returning the error.
pub async fn wait_for_ports_or_cleanup(
    client: &Client,
    namespace: &str,
    release: &str,
    specs: &PortSpecs,
    cleanup_guard: &mut Option<RunnerCleanup>,
) -> anyhow::Result<ClusterReady> {
    info!(
        validators = specs.validators.len(),
        executors = specs.executors.len(),
        %namespace,
        %release,
        "waiting for cluster port-forwards"
    );

    match wait_for_cluster_ready(
        client,
        namespace,
        release,
        &specs.validators,
        &specs.executors,
    )
    .await
    {
        Ok(ports) => {
            info!(
                prometheus_port = ports.ports.prometheus,
                validator_ports = ?ports.ports.validators,
                executor_ports = ?ports.ports.executors,
                "cluster port-forwards established"
            );
            Ok(ports)
        }
        Err(err) => {
            cleanup_pending(client, namespace, cleanup_guard).await;
            Err(err.into())
        }
    }
}

/// Terminates and reaps every port-forward child process.
pub fn kill_port_forwards(handles: &mut Vec<Child>) {
    for handle in handles.iter_mut() {
        let _ = handle.kill();
        let _ = handle.wait();
    }
    handles.clear();
}

async fn cleanup_pending(client: &Client, namespace: &str, guard: &mut Option<RunnerCleanup>) {
    crate::logs::dump_namespace_logs(client, namespace).await;
    if let Some(guard) = guard.take() {
        Box::new(guard).cleanup();
    }
}

fn readiness_urls(ports: &[u16], role: NodeRole) -> Result<Vec<Url>, RemoteReadinessError> {
    ports
        .iter()
        .copied()
        .map(|port| readiness_url(role, port))
        .collect()
}

fn readiness_url(role: NodeRole, port: u16) -> Result<Url, RemoteReadinessError> {
    cluster_host_url(port).map_err(|source| RemoteReadinessError::Endpoint { role, port, source })
}

/// Builds the base URL for a forwarded port on the local node host.
fn cluster_host_url(port: u16) -> Result<Url, ParseError> {
    Url::parse(&format!("http://{}:{port}/", node_host()))
}

fn api_client_from_ports(
    role: NodeRole,
    api_port: u16,
    testing_port: u16,
) -> Result<ApiClient, NodeClientError> {
    let base_endpoint = cluster_host_url(api_port).map_err(|source| NodeClientError::Endpoint {
        role,
        endpoint: "api",
        port: api_port,
        source,
    })?;
    let testing_endpoint = Some(
        cluster_host_url(testing_port).map_err(|source| NodeClientError::Endpoint {
            role,
            endpoint: "testing",
            port: testing_port,
            source,
        })?,
    );

    Ok(ApiClient::from_urls(base_endpoint, testing_endpoint))
}
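
// --- Illustrative composition sketch (not part of the runner API) ---
//
// A minimal end-to-end bring-up showing how the helpers above are meant to
// compose, assuming the `anyhow::Error` error types noted at `install_stack`.
// `ready.port_forwards` is a hypothetical field name: `ClusterReady` lives in
// `crate::wait`, and this module only shows that it exposes `ports`.
#[allow(dead_code)]
async fn bring_up_sketch(
    assets: &RunnerAssets,
    descriptors: &GeneratedTopology,
) -> anyhow::Result<()> {
    let client = Client::try_default().await?;
    let (namespace, release) = cluster_identifiers();
    let specs = collect_port_specs(descriptors);

    // Install the Helm release; the guard tears the stack down on cleanup
    // unless K8S_RUNNER_PRESERVE is set.
    let mut cleanup_guard = Some(
        install_stack(
            &client,
            assets,
            &namespace,
            &release,
            specs.validators.len(),
            specs.executors.len(),
        )
        .await?,
    );

    // On failure this dumps namespace logs and consumes the guard.
    let ready =
        wait_for_ports_or_cleanup(&client, &namespace, &release, &specs, &mut cleanup_guard)
            .await?;

    let cluster = ClusterEnvironment::new(
        client,
        namespace,
        release,
        cleanup_guard.expect("guard is only consumed on failure"),
        &ready.ports,
        ready.port_forwards, // hypothetical field; see note above
    );

    ensure_cluster_readiness(descriptors, &cluster).await?;
    let _clients = build_node_clients(&cluster)?;
    let _metrics = metrics_handle_from_port(cluster.prometheus_port());
    Ok(())
}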
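
#[cfg(test)]
mod tests {
    use super::*;

    // Cluster-free checks over the pure helpers. These assume `node_host()`
    // yields a host that parses inside a URL, which holds for the usual
    // localhost-style values.
    #[test]
    fn identifiers_share_the_run_id() {
        let (namespace, release) = cluster_identifiers();
        assert!(namespace.starts_with("nomos-k8s-"));
        assert_eq!(namespace, release);
    }

    #[test]
    fn host_url_carries_the_port() {
        let url = cluster_host_url(18080).expect("forward URL should parse");
        assert_eq!(url.port(), Some(18080));
        assert_eq!(url.path(), "/");
    }
}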