use anyhow::Error;
use async_trait::async_trait;
use kube::Client;
use reqwest::Url;
use testing_framework_core::{
    scenario::{
        BlockFeedTask, CleanupGuard, Deployer, MetricsError, ObservabilityCapability, RunContext,
        Runner, Scenario,
    },
    topology::generation::GeneratedTopology,
};
use tracing::{error, info};

use crate::{
    infrastructure::{
        assets::{AssetsError, prepare_assets},
        cluster::{
            ClusterEnvironment, NodeClientError, PortSpecs, RemoteReadinessError,
            build_node_clients, cluster_identifiers, collect_port_specs,
            ensure_cluster_readiness, install_stack, kill_port_forwards,
            metrics_handle_from_endpoint, metrics_handle_from_url, wait_for_ports_or_cleanup,
        },
        helm::HelmError,
    },
    lifecycle::{block_feed::spawn_block_feed_with, cleanup::RunnerCleanup},
    wait::{ClusterWaitError, HostPort, PortForwardHandle},
};

/// Deploys a scenario into Kubernetes using Helm charts and port-forwards.
#[derive(Clone, Copy)]
pub struct K8sDeployer {
    readiness_checks: bool,
}

impl Default for K8sDeployer {
    fn default() -> Self {
        Self::new()
    }
}

impl K8sDeployer {
    /// Create a k8s deployer with readiness checks enabled.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            readiness_checks: true,
        }
    }

    /// Enable/disable readiness probes before handing control to workloads.
    #[must_use]
    pub const fn with_readiness(mut self, enabled: bool) -> Self {
        self.readiness_checks = enabled;
        self
    }
}
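// Usage sketch (illustrative only; the scenario value comes from the test
// harness, not from this module):
//
//     let deployer = K8sDeployer::new().with_readiness(false);
//     let runner = deployer.deploy(&scenario).await?;
//
// Disabling readiness probes is useful when redeploying into a cluster that
// is already known to be healthy and the probe round-trips are pure overhead.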
/// High-level runner failures returned to the scenario harness.
#[derive(Debug, thiserror::Error)]
pub enum K8sRunnerError {
    #[error(
        "kubernetes runner requires at least one validator and one executor (validators={validators}, executors={executors})"
    )]
    UnsupportedTopology { validators: usize, executors: usize },
    #[error("failed to initialize kubernetes client: {source}")]
    ClientInit {
        #[source]
        source: kube::Error,
    },
    #[error(transparent)]
    Assets(#[from] AssetsError),
    #[error(transparent)]
    Helm(#[from] HelmError),
    #[error(transparent)]
    Cluster(#[from] Box<ClusterWaitError>),
    #[error(transparent)]
    Readiness(#[from] RemoteReadinessError),
    #[error(transparent)]
    NodeClients(#[from] NodeClientError),
    #[error(transparent)]
    Telemetry(#[from] MetricsError),
    #[error("k8s runner requires at least one node client to follow blocks")]
    BlockFeedMissing,
    #[error("failed to initialize block feed: {source}")]
    BlockFeed {
        #[source]
        source: Error,
    },
}

#[async_trait]
impl Deployer for K8sDeployer {
    type Error = K8sRunnerError;

    async fn deploy(&self, scenario: &Scenario) -> Result<Runner, Self::Error> {
        deploy_with_observability(self, scenario, None, None, None).await
    }
}

#[async_trait]
impl Deployer<ObservabilityCapability> for K8sDeployer {
    type Error = K8sRunnerError;

    async fn deploy(
        &self,
        scenario: &Scenario<ObservabilityCapability>,
    ) -> Result<Runner, Self::Error> {
        deploy_with_observability(
            self,
            scenario,
            scenario.capabilities().metrics_query_url.clone(),
            scenario.capabilities().metrics_query_grafana_url.clone(),
            scenario.capabilities().metrics_otlp_ingest_url.clone(),
        )
        .await
    }
}

fn cluster_prometheus_endpoint(cluster: &Option<ClusterEnvironment>) -> Option<&HostPort> {
    cluster
        .as_ref()
        .expect("cluster must be available")
        .prometheus_endpoint()
}

fn cluster_grafana_endpoint(cluster: &Option<ClusterEnvironment>) -> Option<&HostPort> {
    cluster
        .as_ref()
        .expect("cluster must be available")
        .grafana_endpoint()
}

async fn fail_cluster(cluster: &mut Option<ClusterEnvironment>, reason: &str) {
    if let Some(env) = cluster.as_mut() {
        env.fail(reason).await;
    }
}

impl From<ClusterWaitError> for K8sRunnerError {
    fn from(value: ClusterWaitError) -> Self {
        Self::Cluster(Box::new(value))
    }
}

fn ensure_supported_topology(descriptors: &GeneratedTopology) -> Result<(), K8sRunnerError> {
    let validators = descriptors.validators().len();
    let executors = descriptors.executors().len();
    if validators == 0 || executors == 0 {
        return Err(K8sRunnerError::UnsupportedTopology {
            validators,
            executors,
        });
    }
    Ok(())
}
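// Each observability URL below is resolved in the same order: an explicit
// capability value wins, then the current environment variables, then the
// legacy "back-compat" names. For example (illustrative value):
//
//     K8S_RUNNER_METRICS_QUERY_URL=http://localhost:9090
//
// points metrics queries at an external Prometheus instead of the
// port-forwarded in-cluster instance.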
async fn deploy_with_observability<Caps>(
    deployer: &K8sDeployer,
    scenario: &Scenario<Caps>,
    metrics_query_url: Option<Url>,
    metrics_query_grafana_url: Option<Url>,
    metrics_otlp_ingest_url: Option<Url>,
) -> Result<Runner, K8sRunnerError> {
    let external_prometheus = match metrics_query_url {
        Some(url) => Some(url),
        None => match std::env::var("K8S_RUNNER_METRICS_QUERY_URL")
            .ok()
            .or_else(|| std::env::var("NOMOS_METRICS_QUERY_URL").ok())
            // Back-compat:
            .or_else(|| std::env::var("K8S_RUNNER_EXTERNAL_PROMETHEUS_URL").ok())
            .or_else(|| std::env::var("NOMOS_EXTERNAL_PROMETHEUS_URL").ok())
        {
            Some(raw) if !raw.trim().is_empty() => Some(Url::parse(raw.trim()).map_err(|err| {
                MetricsError::new(format!("invalid metrics query url: {err}"))
            })?),
            _ => None,
        },
    };
    let external_prometheus_grafana_url = match metrics_query_grafana_url {
        Some(url) => Some(url),
        None => match std::env::var("K8S_RUNNER_METRICS_QUERY_GRAFANA_URL")
            .ok()
            .or_else(|| std::env::var("NOMOS_METRICS_QUERY_GRAFANA_URL").ok())
            // Back-compat:
            .or_else(|| std::env::var("K8S_RUNNER_EXTERNAL_PROMETHEUS_GRAFANA_URL").ok())
            .or_else(|| std::env::var("NOMOS_EXTERNAL_PROMETHEUS_GRAFANA_URL").ok())
        {
            Some(raw) if !raw.trim().is_empty() => Some(Url::parse(raw.trim()).map_err(|err| {
                MetricsError::new(format!("invalid metrics query grafana url: {err}"))
            })?),
            _ => None,
        },
    };
    let external_otlp_metrics_endpoint = match metrics_otlp_ingest_url {
        Some(url) => Some(url),
        None => match std::env::var("K8S_RUNNER_METRICS_OTLP_INGEST_URL")
            .ok()
            .or_else(|| std::env::var("NOMOS_METRICS_OTLP_INGEST_URL").ok())
            // Back-compat:
            .or_else(|| std::env::var("K8S_RUNNER_EXTERNAL_OTLP_METRICS_ENDPOINT").ok())
            .or_else(|| std::env::var("NOMOS_EXTERNAL_OTLP_METRICS_ENDPOINT").ok())
        {
            Some(raw) if !raw.trim().is_empty() => Some(Url::parse(raw.trim()).map_err(|err| {
                MetricsError::new(format!("invalid metrics OTLP ingest url: {err}"))
            })?),
            _ => None,
        },
    };

    let descriptors = scenario.topology().clone();
    let validator_count = descriptors.validators().len();
    let executor_count = descriptors.executors().len();
    ensure_supported_topology(&descriptors)?;

    let client = Client::try_default()
        .await
        .map_err(|source| K8sRunnerError::ClientInit { source })?;

    info!(
        validators = validator_count,
        executors = executor_count,
        duration_secs = scenario.duration().as_secs(),
        readiness_checks = deployer.readiness_checks,
        metrics_query_url = external_prometheus.as_ref().map(|u| u.as_str()),
        metrics_query_grafana_url =
            external_prometheus_grafana_url.as_ref().map(|u| u.as_str()),
        metrics_otlp_ingest_url = external_otlp_metrics_endpoint.as_ref().map(|u| u.as_str()),
        "starting k8s deployment"
    );

    let port_specs = collect_port_specs(&descriptors);
    let mut cluster = Some(
        setup_cluster(
            &client,
            &port_specs,
            &descriptors,
            deployer.readiness_checks,
            external_prometheus.as_ref(),
            external_prometheus_grafana_url.as_ref(),
            external_otlp_metrics_endpoint.as_ref(),
        )
        .await?,
    );

    info!("building node clients");
    let node_clients = match build_node_clients(
        cluster
            .as_ref()
            .expect("cluster must be available while building clients"),
    ) {
        Ok(clients) => clients,
        Err(err) => {
            fail_cluster(&mut cluster, "failed to construct node api clients").await;
            error!(error = ?err, "failed to build k8s node clients");
            return Err(err.into());
        }
    };

    let telemetry = match external_prometheus.clone() {
        Some(url) => metrics_handle_from_url(url),
        None => cluster
            .as_ref()
            .and_then(|cluster| cluster.prometheus_endpoint())
            .ok_or_else(|| MetricsError::new("prometheus endpoint unavailable"))
            .and_then(metrics_handle_from_endpoint),
    };
    let telemetry = match telemetry {
        Ok(handle) => handle,
        Err(err) => {
            fail_cluster(
                &mut cluster,
                "failed to configure prometheus metrics handle",
            )
            .await;
            error!(error = ?err, "failed to configure prometheus metrics handle");
            return Err(err.into());
        }
    };

    let (block_feed, block_feed_guard) = match spawn_block_feed_with(&node_clients).await {
        Ok(pair) => pair,
        Err(err) => {
            fail_cluster(&mut cluster, "failed to initialize block feed").await;
            error!(error = ?err, "failed to initialize block feed");
            return Err(err);
        }
    };

    if let Some(url) = external_prometheus.as_ref() {
        info!(prometheus_url = %url.as_str(), "using external prometheus endpoint");
    } else if let Some(prometheus) = cluster_prometheus_endpoint(&cluster) {
        info!(
            prometheus_url = %format!("http://{}:{}/", prometheus.host, prometheus.port),
            "prometheus endpoint available on host"
        );
    }

    if let Some(grafana) = cluster_grafana_endpoint(&cluster) {
        info!(
            grafana_url = %format!("http://{}:{}/", grafana.host, grafana.port),
            "grafana dashboard available on host"
        );
    }
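    // When TESTNET_PRINT_ENDPOINTS is set, print one machine-readable line per
    // endpoint so harness scripts can scrape them, e.g. (illustrative values):
    //
    //     TESTNET_ENDPOINTS prometheus=http://127.0.0.1:9090/ grafana=http://127.0.0.1:3000/
    //     TESTNET_PPROF validator_0=http://127.0.0.1:18080/debug/pprof/profile?seconds=15&format=proto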
    if std::env::var("TESTNET_PRINT_ENDPOINTS").is_ok() {
        let prometheus = external_prometheus
            .as_ref()
            .map(|u| u.as_str().to_string())
            .or_else(|| {
                cluster_prometheus_endpoint(&cluster)
                    .map(|endpoint| format!("http://{}:{}/", endpoint.host, endpoint.port))
            })
            .unwrap_or_else(|| "".to_string());
        let grafana = cluster_grafana_endpoint(&cluster);
        println!(
            "TESTNET_ENDPOINTS prometheus={} grafana={}",
            prometheus,
            grafana
                .map(|endpoint| format!("http://{}:{}/", endpoint.host, endpoint.port))
                .unwrap_or_else(|| "".to_string())
        );
        for (idx, client) in node_clients.validator_clients().iter().enumerate() {
            println!(
                "TESTNET_PPROF validator_{}={}/debug/pprof/profile?seconds=15&format=proto",
                idx,
                client.base_url()
            );
        }
        for (idx, client) in node_clients.executor_clients().iter().enumerate() {
            println!(
                "TESTNET_PPROF executor_{}={}/debug/pprof/profile?seconds=15&format=proto",
                idx,
                client.base_url()
            );
        }
    }

    let (cleanup, port_forwards) = cluster
        .take()
        .expect("cluster should still be available")
        .into_cleanup();
    let cleanup_guard: Box<dyn CleanupGuard> = Box::new(K8sCleanupGuard::new(
        cleanup,
        block_feed_guard,
        port_forwards,
    ));

    let context = RunContext::new(
        descriptors,
        None,
        node_clients,
        scenario.duration(),
        telemetry,
        block_feed,
        None,
    );

    info!(
        validators = validator_count,
        executors = executor_count,
        duration_secs = scenario.duration().as_secs(),
        "k8s deployment ready; handing control to scenario runner"
    );

    Ok(Runner::new(context, Some(cleanup_guard)))
}

async fn setup_cluster(
    client: &Client,
    specs: &PortSpecs,
    descriptors: &GeneratedTopology,
    readiness_checks: bool,
    external_prometheus: Option<&Url>,
    external_prometheus_grafana_url: Option<&Url>,
    external_otlp_metrics_endpoint: Option<&Url>,
) -> Result<ClusterEnvironment, K8sRunnerError> {
    let assets = prepare_assets(
        descriptors,
        external_prometheus,
        external_prometheus_grafana_url,
        external_otlp_metrics_endpoint,
    )?;
    let validators = descriptors.validators().len();
    let executors = descriptors.executors().len();
    let (namespace, release) = cluster_identifiers();
    info!(%namespace, %release, validators, executors, "preparing k8s assets and namespace");

    let mut cleanup_guard =
        Some(install_stack(client, &assets, &namespace, &release, validators, executors).await?);

    info!("waiting for helm-managed services to become ready");
    let cluster_ready = wait_for_ports_or_cleanup(
        client,
        &namespace,
        &release,
        specs,
        external_prometheus.is_none() && external_prometheus_grafana_url.is_none(),
        &mut cleanup_guard,
    )
    .await?;

    if let Some(prometheus) = cluster_ready.ports.prometheus.as_ref() {
        info!(prometheus = ?prometheus, "discovered prometheus endpoint");
    }

    let environment = ClusterEnvironment::new(
        client.clone(),
        namespace,
        release,
        cleanup_guard
            .take()
            .expect("cleanup guard must exist after successful cluster startup"),
        &cluster_ready.ports,
        cluster_ready.port_forwards,
    );

    if readiness_checks {
        info!("probing cluster readiness");
        ensure_cluster_readiness(descriptors, &environment).await?;
        info!("cluster readiness probes passed");
    }

    Ok(environment)
}

struct K8sCleanupGuard {
    cleanup: RunnerCleanup,
    block_feed: Option<BlockFeedTask>,
    port_forwards: Vec<PortForwardHandle>,
}

impl K8sCleanupGuard {
    const fn new(
        cleanup: RunnerCleanup,
        block_feed: BlockFeedTask,
        port_forwards: Vec<PortForwardHandle>,
    ) -> Self {
        Self {
            cleanup,
            block_feed: Some(block_feed),
            port_forwards,
        }
    }
}

impl CleanupGuard for K8sCleanupGuard {
    fn cleanup(mut self: Box<Self>) {
        // Stop the block feed first so it does not observe endpoints
        // disappearing, then tear down the port-forwards, and finally run the
        // cluster cleanup itself.
        if let Some(block_feed) = self.block_feed.take() {
            CleanupGuard::cleanup(Box::new(block_feed));
        }
        kill_port_forwards(&mut self.port_forwards);
        CleanupGuard::cleanup(Box::new(self.cleanup));
    }
}
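// End-to-end sketch (hypothetical harness code, for orientation only;
// `build_scenario` is not part of this module):
//
//     let scenario = build_scenario();
//     let runner = K8sDeployer::default().deploy(&scenario).await?;
//
// The returned `Runner` owns the `K8sCleanupGuard` constructed above, so the
// block feed, port-forwards, and Helm-managed cluster are torn down when the
// harness finishes with it.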