From 2d9ab968ad3c58412a05bcfe5af2709afe2fd627 Mon Sep 17 00:00:00 2001
From: andrussal
Date: Wed, 10 Dec 2025 10:25:51 +0100
Subject: [PATCH] Refactor k8s wait logic into modular orchestrator

---
 testing-framework/runners/k8s/src/wait.rs     | 496 ------------------
 .../runners/k8s/src/wait/deployment.rs        |  52 ++
 .../runners/k8s/src/wait/forwarding.rs        | 106 ++++
 .../runners/k8s/src/wait/http_probe.rs        |  38 ++
 testing-framework/runners/k8s/src/wait/mod.rs |  97 ++++
 .../runners/k8s/src/wait/orchestrator.rs      | 138 +++++
 .../runners/k8s/src/wait/ports.rs             |  62 +++
 .../runners/k8s/src/wait/prometheus.rs        |  36 ++
 8 files changed, 529 insertions(+), 496 deletions(-)
 delete mode 100644 testing-framework/runners/k8s/src/wait.rs
 create mode 100644 testing-framework/runners/k8s/src/wait/deployment.rs
 create mode 100644 testing-framework/runners/k8s/src/wait/forwarding.rs
 create mode 100644 testing-framework/runners/k8s/src/wait/http_probe.rs
 create mode 100644 testing-framework/runners/k8s/src/wait/mod.rs
 create mode 100644 testing-framework/runners/k8s/src/wait/orchestrator.rs
 create mode 100644 testing-framework/runners/k8s/src/wait/ports.rs
 create mode 100644 testing-framework/runners/k8s/src/wait/prometheus.rs

diff --git a/testing-framework/runners/k8s/src/wait.rs b/testing-framework/runners/k8s/src/wait.rs
deleted file mode 100644
index 8cd176b..0000000
--- a/testing-framework/runners/k8s/src/wait.rs
+++ /dev/null
@@ -1,496 +0,0 @@
-use std::{
-    net::{Ipv4Addr, TcpListener, TcpStream},
-    process::{Command as StdCommand, Stdio},
-    thread,
-    time::Duration,
-};
-
-use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service};
-use kube::{Api, Client, Error as KubeError};
-use testing_framework_core::scenario::http_probe::{self, HttpReadinessError, NodeRole};
-use thiserror::Error;
-use tokio::time::sleep;
-
-use crate::host::node_host;
-
-const DEPLOYMENT_TIMEOUT: Duration = Duration::from_secs(180);
-const NODE_HTTP_TIMEOUT: Duration = Duration::from_secs(240);
-const NODE_HTTP_PROBE_TIMEOUT: Duration = Duration::from_secs(30);
-const HTTP_POLL_INTERVAL: Duration = Duration::from_secs(1);
-const PROMETHEUS_HTTP_PORT: u16 = 9090;
-const PROMETHEUS_HTTP_TIMEOUT: Duration = Duration::from_secs(240);
-const PROMETHEUS_HTTP_PROBE_TIMEOUT: Duration = Duration::from_secs(30);
-const PROMETHEUS_SERVICE_NAME: &str = "prometheus";
-
-/// Container and host-side HTTP ports for a node in the Helm chart values.
-#[derive(Clone, Copy, Debug)]
-pub struct NodeConfigPorts {
-    pub api: u16,
-    pub testing: u16,
-}
-
-/// Host-facing NodePorts for a node.
-#[derive(Clone, Copy, Debug)]
-pub struct NodePortAllocation {
-    pub api: u16,
-    pub testing: u16,
-}
-
-/// All port assignments for the cluster plus Prometheus.
-#[derive(Debug)]
-pub struct ClusterPorts {
-    pub validators: Vec<NodePortAllocation>,
-    pub executors: Vec<NodePortAllocation>,
-    pub prometheus: u16,
-}
-
-/// Success result from waiting for the cluster: host ports and forward handles.
-#[derive(Debug)]
-pub struct ClusterReady {
-    pub ports: ClusterPorts,
-    pub port_forwards: Vec<std::process::Child>,
-}
-
-#[derive(Debug, Error)]
-/// Failures while waiting for Kubernetes deployments or endpoints.
-pub enum ClusterWaitError {
-    #[error("deployment {name} in namespace {namespace} did not become ready within {timeout:?}")]
-    DeploymentTimeout {
-        name: String,
-        namespace: String,
-        timeout: Duration,
-    },
-    #[error("failed to fetch deployment {name}: {source}")]
-    DeploymentFetch {
-        name: String,
-        #[source]
-        source: KubeError,
-    },
-    #[error("failed to fetch service {service}: {source}")]
-    ServiceFetch {
-        service: String,
-        #[source]
-        source: KubeError,
-    },
-    #[error("service {service} did not allocate a node port for {port}")]
-    NodePortUnavailable { service: String, port: u16 },
-    #[error("cluster must have at least one validator")]
-    MissingValidator,
-    #[error(
-        "timeout waiting for {role} HTTP endpoint on port {port} after {timeout:?}",
-        role = role.label()
-    )]
-    NodeHttpTimeout {
-        role: NodeRole,
-        port: u16,
-        timeout: Duration,
-    },
-    #[error("timeout waiting for prometheus readiness on NodePort {port}")]
-    PrometheusTimeout { port: u16 },
-    #[error("failed to start port-forward for service {service} port {port}: {source}")]
-    PortForward {
-        service: String,
-        port: u16,
-        #[source]
-        source: anyhow::Error,
-    },
-}
-
-pub async fn wait_for_deployment_ready(
-    client: &Client,
-    namespace: &str,
-    name: &str,
-    timeout: Duration,
-) -> Result<(), ClusterWaitError> {
-    let mut elapsed = Duration::ZERO;
-    let interval = Duration::from_secs(2);
-
-    while elapsed <= timeout {
-        match Api::<Deployment>::namespaced(client.clone(), namespace)
-            .get(name)
-            .await
-        {
-            Ok(deployment) => {
-                let desired = deployment
-                    .spec
-                    .as_ref()
-                    .and_then(|spec| spec.replicas)
-                    .unwrap_or(1);
-                let ready = deployment
-                    .status
-                    .as_ref()
-                    .and_then(|status| status.ready_replicas)
-                    .unwrap_or(0);
-                if ready >= desired {
-                    return Ok(());
-                }
-            }
-            Err(err) => {
-                return Err(ClusterWaitError::DeploymentFetch {
-                    name: name.to_owned(),
-                    source: err,
-                });
-            }
-        }
-
-        sleep(interval).await;
-        elapsed += interval;
-    }
-
-    Err(ClusterWaitError::DeploymentTimeout {
-        name: name.to_owned(),
-        namespace: namespace.to_owned(),
-        timeout,
-    })
-}
-
-pub async fn find_node_port(
-    client: &Client,
-    namespace: &str,
-    service_name: &str,
-    service_port: u16,
-) -> Result<u16, ClusterWaitError> {
-    let interval = Duration::from_secs(1);
-    for _ in 0..120 {
-        match Api::<Service>::namespaced(client.clone(), namespace)
-            .get(service_name)
-            .await
-        {
-            Ok(service) => {
-                if let Some(spec) = service.spec.clone()
-                    && let Some(ports) = spec.ports
-                {
-                    for port in ports {
-                        if port.port == i32::from(service_port)
-                            && let Some(node_port) = port.node_port
-                        {
-                            return Ok(node_port as u16);
-                        }
-                    }
-                }
-            }
-            Err(err) => {
-                return Err(ClusterWaitError::ServiceFetch {
-                    service: service_name.to_owned(),
-                    source: err,
-                });
-            }
-        }
-        sleep(interval).await;
-    }
-
-    Err(ClusterWaitError::NodePortUnavailable {
-        service: service_name.to_owned(),
-        port: service_port,
-    })
-}
-
-pub async fn wait_for_cluster_ready(
-    client: &Client,
-    namespace: &str,
-    release: &str,
-    validator_ports: &[NodeConfigPorts],
-    executor_ports: &[NodeConfigPorts],
-) -> Result<ClusterReady, ClusterWaitError> {
-    if validator_ports.is_empty() {
-        return Err(ClusterWaitError::MissingValidator);
-    }
-
-    let mut validator_allocations = Vec::with_capacity(validator_ports.len());
-
-    for (index, ports) in validator_ports.iter().enumerate() {
-        let name = format!("{release}-validator-{index}");
-        wait_for_deployment_ready(client, namespace, &name, DEPLOYMENT_TIMEOUT).await?;
-        let api_port = find_node_port(client, namespace, &name, ports.api).await?;
-        let testing_port = find_node_port(client, namespace, &name, ports.testing).await?;
-        validator_allocations.push(NodePortAllocation {
-            api: api_port,
-            testing: testing_port,
-        });
-    }
-
-    let mut port_forwards = Vec::new();
-
-    let validator_api_ports: Vec<u16> = validator_allocations
-        .iter()
-        .map(|ports| ports.api)
-        .collect();
-    if wait_for_node_http_nodeport(
-        &validator_api_ports,
-        NodeRole::Validator,
-        NODE_HTTP_PROBE_TIMEOUT,
-    )
-    .await
-    .is_err()
-    {
-        // Fall back to port-forwarding when NodePorts are unreachable from the host.
-        validator_allocations.clear();
-        port_forwards = port_forward_group(
-            namespace,
-            release,
-            "validator",
-            validator_ports,
-            &mut validator_allocations,
-        )?;
-        let validator_api_ports: Vec<u16> = validator_allocations
-            .iter()
-            .map(|ports| ports.api)
-            .collect();
-        if let Err(err) =
-            wait_for_node_http_port_forward(&validator_api_ports, NodeRole::Validator).await
-        {
-            kill_port_forwards(&mut port_forwards);
-            return Err(err);
-        }
-    }
-
-    let mut executor_allocations = Vec::with_capacity(executor_ports.len());
-    for (index, ports) in executor_ports.iter().enumerate() {
-        let name = format!("{release}-executor-{index}");
-        wait_for_deployment_ready(client, namespace, &name, DEPLOYMENT_TIMEOUT).await?;
-        let api_port = find_node_port(client, namespace, &name, ports.api).await?;
-        let testing_port = find_node_port(client, namespace, &name, ports.testing).await?;
-        executor_allocations.push(NodePortAllocation {
-            api: api_port,
-            testing: testing_port,
-        });
-    }
-
-    let executor_api_ports: Vec<u16> = executor_allocations.iter().map(|ports| ports.api).collect();
-    if !executor_allocations.is_empty()
-        && wait_for_node_http_nodeport(
-            &executor_api_ports,
-            NodeRole::Executor,
-            NODE_HTTP_PROBE_TIMEOUT,
-        )
-        .await
-        .is_err()
-    {
-        executor_allocations.clear();
-        match port_forward_group(
-            namespace,
-            release,
-            "executor",
-            executor_ports,
-            &mut executor_allocations,
-        ) {
-            Ok(forwards) => port_forwards.extend(forwards),
-            Err(err) => {
-                kill_port_forwards(&mut port_forwards);
-                return Err(err);
-            }
-        }
-        let executor_api_ports: Vec<u16> =
-            executor_allocations.iter().map(|ports| ports.api).collect();
-        if let Err(err) =
-            wait_for_node_http_port_forward(&executor_api_ports, NodeRole::Executor).await
-        {
-            kill_port_forwards(&mut port_forwards);
-            return Err(err);
-        }
-    }
-
-    let mut prometheus_port = find_node_port(
-        client,
-        namespace,
-        PROMETHEUS_SERVICE_NAME,
-        PROMETHEUS_HTTP_PORT,
-    )
-    .await?;
-    if wait_for_prometheus_http_nodeport(prometheus_port, PROMETHEUS_HTTP_PROBE_TIMEOUT)
-        .await
-        .is_err()
-    {
-        let (local_port, forward) =
-            port_forward_service(namespace, PROMETHEUS_SERVICE_NAME, PROMETHEUS_HTTP_PORT)
-                .map_err(|err| {
-                    kill_port_forwards(&mut port_forwards);
-                    err
-                })?;
-        prometheus_port = local_port;
-        port_forwards.push(forward);
-        if let Err(err) =
-            wait_for_prometheus_http_port_forward(prometheus_port, PROMETHEUS_HTTP_TIMEOUT).await
-        {
-            kill_port_forwards(&mut port_forwards);
-            return Err(err);
-        }
-    }
-
-    Ok(ClusterReady {
-        ports: ClusterPorts {
-            validators: validator_allocations,
-            executors: executor_allocations,
-            prometheus: prometheus_port,
-        },
-        port_forwards,
-    })
-}
-
-async fn wait_for_node_http_nodeport(
-    ports: &[u16],
-    role: NodeRole,
-    timeout: Duration,
-) -> Result<(), ClusterWaitError> {
-    let host = node_host();
-    wait_for_node_http_on_host(ports, role, &host, timeout).await
-}
-
-async fn wait_for_node_http_port_forward(
-    ports: &[u16],
-    role: NodeRole,
-) -> Result<(), ClusterWaitError> {
-    wait_for_node_http_on_host(ports, role, "127.0.0.1", NODE_HTTP_TIMEOUT).await
-}
-
-async fn wait_for_node_http_on_host(
-    ports: &[u16],
-    role: NodeRole,
-    host: &str,
-    timeout: Duration,
-) -> Result<(), ClusterWaitError> {
-    http_probe::wait_for_http_ports_with_host(ports, role, host, timeout, HTTP_POLL_INTERVAL)
-        .await
-        .map_err(map_http_error)
-}
-
-const fn map_http_error(error: HttpReadinessError) -> ClusterWaitError {
-    ClusterWaitError::NodeHttpTimeout {
-        role: error.role(),
-        port: error.port(),
-        timeout: error.timeout(),
-    }
-}
-
-pub async fn wait_for_prometheus_http_nodeport(
-    port: u16,
-    timeout: Duration,
-) -> Result<(), ClusterWaitError> {
-    let host = node_host();
-    wait_for_prometheus_http(&host, port, timeout).await
-}
-
-pub async fn wait_for_prometheus_http_port_forward(
-    port: u16,
-    timeout: Duration,
-) -> Result<(), ClusterWaitError> {
-    wait_for_prometheus_http("127.0.0.1", port, timeout).await
-}
-
-pub async fn wait_for_prometheus_http(
-    host: &str,
-    port: u16,
-    timeout: Duration,
-) -> Result<(), ClusterWaitError> {
-    let client = reqwest::Client::new();
-    let url = format!("http://{host}:{port}/-/ready");
-
-    for _ in 0..timeout.as_secs() {
-        if let Ok(resp) = client.get(&url).send().await
-            && resp.status().is_success()
-        {
-            return Ok(());
-        }
-        sleep(Duration::from_secs(1)).await;
-    }
-
-    Err(ClusterWaitError::PrometheusTimeout { port })
-}
-
-fn port_forward_group(
-    namespace: &str,
-    release: &str,
-    kind: &str,
-    ports: &[NodeConfigPorts],
-    allocations: &mut Vec<NodePortAllocation>,
-) -> Result<Vec<std::process::Child>, ClusterWaitError> {
-    let mut forwards = Vec::new();
-    for (index, ports) in ports.iter().enumerate() {
-        let service = format!("{release}-{kind}-{index}");
-        let (api_port, api_forward) = match port_forward_service(namespace, &service, ports.api) {
-            Ok(forward) => forward,
-            Err(err) => {
-                kill_port_forwards(&mut forwards);
-                return Err(err);
-            }
-        };
-        let (testing_port, testing_forward) =
-            match port_forward_service(namespace, &service, ports.testing) {
-                Ok(forward) => forward,
-                Err(err) => {
-                    kill_port_forwards(&mut forwards);
-                    return Err(err);
-                }
-            };
-        allocations.push(NodePortAllocation {
-            api: api_port,
-            testing: testing_port,
-        });
-        forwards.push(api_forward);
-        forwards.push(testing_forward);
-    }
-    Ok(forwards)
-}
-
-fn port_forward_service(
-    namespace: &str,
-    service: &str,
-    remote_port: u16,
-) -> Result<(u16, std::process::Child), ClusterWaitError> {
-    let local_port = allocate_local_port().map_err(|source| ClusterWaitError::PortForward {
-        service: service.to_owned(),
-        port: remote_port,
-        source,
-    })?;
-
-    let mut child = StdCommand::new("kubectl")
-        .arg("port-forward")
-        .arg("-n")
-        .arg(namespace)
-        .arg(format!("svc/{service}"))
-        .arg(format!("{local_port}:{remote_port}"))
-        .stdout(Stdio::null())
-        .stderr(Stdio::null())
-        .spawn()
-        .map_err(|source| ClusterWaitError::PortForward {
-            service: service.to_owned(),
-            port: remote_port,
-            source: source.into(),
-        })?;
-
-    for _ in 0..20 {
-        if let Ok(Some(status)) = child.try_wait() {
-            return Err(ClusterWaitError::PortForward {
-                service: service.to_owned(),
-                port: remote_port,
-                source: anyhow::anyhow!("kubectl exited with {status}"),
-            });
-        }
-        if TcpStream::connect((Ipv4Addr::LOCALHOST, local_port)).is_ok() {
-            return Ok((local_port, child));
-        }
-        thread::sleep(Duration::from_millis(250));
-    }
-
-    let _ = child.kill();
-    Err(ClusterWaitError::PortForward {
-        service: service.to_owned(),
-        port: remote_port,
-        source: anyhow::anyhow!("port-forward did not become ready"),
-    })
-}
-
-fn allocate_local_port() -> anyhow::Result<u16> {
-    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
-    let port = listener.local_addr()?.port();
-    drop(listener);
-    Ok(port)
-}
-
-fn kill_port_forwards(handles: &mut Vec<std::process::Child>) {
-    for handle in handles.iter_mut() {
-        let _ = handle.kill();
-        let _ = handle.wait();
-    }
-    handles.clear();
-}
diff --git a/testing-framework/runners/k8s/src/wait/deployment.rs b/testing-framework/runners/k8s/src/wait/deployment.rs
new file mode 100644
index 0000000..7ac713f
--- /dev/null
+++ b/testing-framework/runners/k8s/src/wait/deployment.rs
@@ -0,0 +1,52 @@
+use k8s_openapi::api::apps::v1::Deployment;
+use kube::{Api, Client};
+use tokio::time::sleep;
+
+use super::{ClusterWaitError, DEPLOYMENT_TIMEOUT};
+
+pub async fn wait_for_deployment_ready(
+    client: &Client,
+    namespace: &str,
+    name: &str,
+) -> Result<(), ClusterWaitError> {
+    let mut elapsed = std::time::Duration::ZERO;
+    let interval = std::time::Duration::from_secs(2);
+
+    while elapsed <= DEPLOYMENT_TIMEOUT {
+        match Api::<Deployment>::namespaced(client.clone(), namespace)
+            .get(name)
+            .await
+        {
+            Ok(deployment) => {
+                let desired = deployment
+                    .spec
+                    .as_ref()
+                    .and_then(|spec| spec.replicas)
+                    .unwrap_or(1);
+                let ready = deployment
+                    .status
+                    .as_ref()
+                    .and_then(|status| status.ready_replicas)
+                    .unwrap_or(0);
+                if ready >= desired {
+                    return Ok(());
+                }
+            }
+            Err(err) => {
+                return Err(ClusterWaitError::DeploymentFetch {
+                    name: name.to_owned(),
+                    source: err,
+                });
+            }
+        }
+
+        sleep(interval).await;
+        elapsed += interval;
+    }
+
+    Err(ClusterWaitError::DeploymentTimeout {
+        name: name.to_owned(),
+        namespace: namespace.to_owned(),
+        timeout: DEPLOYMENT_TIMEOUT,
+    })
+}
diff --git a/testing-framework/runners/k8s/src/wait/forwarding.rs b/testing-framework/runners/k8s/src/wait/forwarding.rs
new file mode 100644
index 0000000..5120f55
--- /dev/null
+++ b/testing-framework/runners/k8s/src/wait/forwarding.rs
@@ -0,0 +1,106 @@
+use std::{
+    net::{Ipv4Addr, TcpListener, TcpStream},
+    process::{Child, Command as StdCommand, Stdio},
+    thread,
+    time::Duration,
+};
+
+use super::{ClusterWaitError, NodeConfigPorts, NodePortAllocation};
+
+pub fn port_forward_group(
+    namespace: &str,
+    release: &str,
+    kind: &str,
+    ports: &[NodeConfigPorts],
+    allocations: &mut Vec<NodePortAllocation>,
+) -> Result<Vec<Child>, ClusterWaitError> {
+    let mut forwards = Vec::new();
+    for (index, ports) in ports.iter().enumerate() {
+        let service = format!("{release}-{kind}-{index}");
+        let (api_port, api_forward) = match port_forward_service(namespace, &service, ports.api) {
+            Ok(forward) => forward,
+            Err(err) => {
+                kill_port_forwards(&mut forwards);
+                return Err(err);
+            }
+        };
+        let (testing_port, testing_forward) =
+            match port_forward_service(namespace, &service, ports.testing) {
+                Ok(forward) => forward,
+                Err(err) => {
+                    kill_port_forwards(&mut forwards);
+                    return Err(err);
+                }
+            };
+        allocations.push(NodePortAllocation {
+            api: api_port,
+            testing: testing_port,
+        });
+        forwards.push(api_forward);
+        forwards.push(testing_forward);
+    }
+    Ok(forwards)
+}
+
+pub fn port_forward_service(
+    namespace: &str,
+    service: &str,
+    remote_port: u16,
+) -> Result<(u16, Child), ClusterWaitError> {
+    let local_port = allocate_local_port().map_err(|source| ClusterWaitError::PortForward {
+        service: service.to_owned(),
+        port: remote_port,
+        source,
+    })?;
+
+    let mut child = StdCommand::new("kubectl")
+        .arg("port-forward")
+        .arg("-n")
+        .arg(namespace)
+        .arg(format!("svc/{service}"))
+        .arg(format!("{local_port}:{remote_port}"))
+        .stdout(Stdio::null())
+        .stderr(Stdio::null())
+        .spawn()
+        .map_err(|source| ClusterWaitError::PortForward {
+            service: service.to_owned(),
+            port: remote_port,
+            source: source.into(),
+        })?;
+
+    for _ in 0..20 {
+        if let Ok(Some(status)) = child.try_wait() {
+            return Err(ClusterWaitError::PortForward {
+                service: service.to_owned(),
+                port: remote_port,
+                source: anyhow::anyhow!("kubectl exited with {status}"),
+            });
+        }
+        if TcpStream::connect((Ipv4Addr::LOCALHOST, local_port)).is_ok() {
+            return Ok((local_port, child));
+        }
+        thread::sleep(Duration::from_millis(250));
+    }
+
+    let _ = child.kill();
+    Err(ClusterWaitError::PortForward {
+        service: service.to_owned(),
+        port: remote_port,
+        source: anyhow::anyhow!("port-forward did not become ready"),
+    })
+}
+
+pub fn kill_port_forwards(handles: &mut Vec<Child>) {
+    for handle in handles.iter_mut() {
+        let _ = handle.kill();
+        let _ = handle.wait();
+    }
+    handles.clear();
+}
+
+fn allocate_local_port() -> anyhow::Result<u16> {
+    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
+    let port = listener.local_addr()?.port();
+    drop(listener);
+    Ok(port)
+}
diff --git a/testing-framework/runners/k8s/src/wait/http_probe.rs b/testing-framework/runners/k8s/src/wait/http_probe.rs
new file mode 100644
index 0000000..cd0f6b8
--- /dev/null
+++ b/testing-framework/runners/k8s/src/wait/http_probe.rs
@@ -0,0 +1,38 @@
+use testing_framework_core::scenario::http_probe::{self, HttpReadinessError, NodeRole};
+
+use super::{ClusterWaitError, HTTP_POLL_INTERVAL, NODE_HTTP_PROBE_TIMEOUT, NODE_HTTP_TIMEOUT};
+use crate::host::node_host;
+
+pub async fn wait_for_node_http_nodeport(
+    ports: &[u16],
+    role: NodeRole,
+) -> Result<(), ClusterWaitError> {
+    let host = node_host();
+    wait_for_node_http_on_host(ports, role, &host, NODE_HTTP_PROBE_TIMEOUT).await
+}
+
+pub async fn wait_for_node_http_port_forward(
+    ports: &[u16],
+    role: NodeRole,
+) -> Result<(), ClusterWaitError> {
+    wait_for_node_http_on_host(ports, role, "127.0.0.1", NODE_HTTP_TIMEOUT).await
+}
+
+async fn wait_for_node_http_on_host(
+    ports: &[u16],
+    role: NodeRole,
+    host: &str,
+    timeout: std::time::Duration,
+) -> Result<(), ClusterWaitError> {
+    http_probe::wait_for_http_ports_with_host(ports, role, host, timeout, HTTP_POLL_INTERVAL)
+        .await
+        .map_err(map_http_error)
+}
+
+const fn map_http_error(error: HttpReadinessError) -> ClusterWaitError {
+    ClusterWaitError::NodeHttpTimeout {
+        role: error.role(),
+        port: error.port(),
+        timeout: error.timeout(),
+    }
+}
diff --git a/testing-framework/runners/k8s/src/wait/mod.rs b/testing-framework/runners/k8s/src/wait/mod.rs
new file mode 100644
index 0000000..392564a
--- /dev/null
+++ b/testing-framework/runners/k8s/src/wait/mod.rs
@@ -0,0 +1,97 @@
+use std::time::Duration;
+
+use kube::Error as KubeError;
+use testing_framework_core::scenario::http_probe::NodeRole;
+use thiserror::Error;
+
+mod deployment;
+mod forwarding;
+mod http_probe;
+mod orchestrator;
+mod ports;
+mod prometheus;
+
+pub use orchestrator::wait_for_cluster_ready;
+
+/// Container and host-side HTTP ports for a node in the Helm chart values.
+#[derive(Clone, Copy, Debug)]
+pub struct NodeConfigPorts {
+    pub api: u16,
+    pub testing: u16,
+}
+
+/// Host-facing NodePorts for a node.
+#[derive(Clone, Copy, Debug)]
+pub struct NodePortAllocation {
+    pub api: u16,
+    pub testing: u16,
+}
+
+/// All port assignments for the cluster plus Prometheus.
+#[derive(Debug)]
+pub struct ClusterPorts {
+    pub validators: Vec<NodePortAllocation>,
+    pub executors: Vec<NodePortAllocation>,
+    pub prometheus: u16,
+}
+
+/// Success result from waiting for the cluster: host ports and forward handles.
+#[derive(Debug)]
+pub struct ClusterReady {
+    pub ports: ClusterPorts,
+    pub port_forwards: Vec<std::process::Child>,
+}
+
+#[derive(Debug, Error)]
+/// Failures while waiting for Kubernetes deployments or endpoints.
+pub enum ClusterWaitError {
+    #[error("deployment {name} in namespace {namespace} did not become ready within {timeout:?}")]
+    DeploymentTimeout {
+        name: String,
+        namespace: String,
+        timeout: Duration,
+    },
+    #[error("failed to fetch deployment {name}: {source}")]
+    DeploymentFetch {
+        name: String,
+        #[source]
+        source: KubeError,
+    },
+    #[error("failed to fetch service {service}: {source}")]
+    ServiceFetch {
+        service: String,
+        #[source]
+        source: KubeError,
+    },
+    #[error("service {service} did not allocate a node port for {port}")]
+    NodePortUnavailable { service: String, port: u16 },
+    #[error("cluster must have at least one validator")]
+    MissingValidator,
+    #[error(
+        "timeout waiting for {role} HTTP endpoint on port {port} after {timeout:?}",
+        role = role.label()
+    )]
+    NodeHttpTimeout {
+        role: NodeRole,
+        port: u16,
+        timeout: Duration,
+    },
+    #[error("timeout waiting for prometheus readiness on NodePort {port}")]
+    PrometheusTimeout { port: u16 },
+    #[error("failed to start port-forward for service {service} port {port}: {source}")]
+    PortForward {
+        service: String,
+        port: u16,
+        #[source]
+        source: anyhow::Error,
+    },
+}
+
+pub(crate) const DEPLOYMENT_TIMEOUT: Duration = Duration::from_secs(180);
+pub(crate) const NODE_HTTP_TIMEOUT: Duration = Duration::from_secs(240);
+pub(crate) const NODE_HTTP_PROBE_TIMEOUT: Duration = Duration::from_secs(30);
+pub(crate) const HTTP_POLL_INTERVAL: Duration = Duration::from_secs(1);
+pub(crate) const PROMETHEUS_HTTP_PORT: u16 = 9090;
+pub(crate) const PROMETHEUS_HTTP_TIMEOUT: Duration = Duration::from_secs(240);
+pub(crate) const PROMETHEUS_HTTP_PROBE_TIMEOUT: Duration = Duration::from_secs(30);
+pub(crate) const PROMETHEUS_SERVICE_NAME: &str = "prometheus";
diff --git a/testing-framework/runners/k8s/src/wait/orchestrator.rs b/testing-framework/runners/k8s/src/wait/orchestrator.rs
new file mode 100644
index 0000000..c4f46e4
--- /dev/null
+++ b/testing-framework/runners/k8s/src/wait/orchestrator.rs
@@ -0,0 +1,138 @@
+use kube::Client;
+use testing_framework_core::scenario::http_probe::NodeRole;
+
+use super::{
+    ClusterPorts, ClusterReady, ClusterWaitError, NodeConfigPorts, PROMETHEUS_HTTP_PORT,
+    PROMETHEUS_HTTP_PROBE_TIMEOUT, PROMETHEUS_SERVICE_NAME,
+};
+use crate::wait::{
+    deployment::wait_for_deployment_ready,
+    forwarding::{kill_port_forwards, port_forward_group, port_forward_service},
+    http_probe::{wait_for_node_http_nodeport, wait_for_node_http_port_forward},
+    ports::{discover_node_ports, find_node_port},
+    prometheus::{wait_for_prometheus_http_nodeport, wait_for_prometheus_http_port_forward},
+};
+
+pub async fn wait_for_cluster_ready(
+    client: &Client,
+    namespace: &str,
+    release: &str,
+    validator_ports: &[NodeConfigPorts],
+    executor_ports: &[NodeConfigPorts],
+) -> Result<ClusterReady, ClusterWaitError> {
+    if validator_ports.is_empty() {
+        return Err(ClusterWaitError::MissingValidator);
+    }
+
+    let mut validator_allocations = Vec::with_capacity(validator_ports.len());
+
+    for (index, ports) in validator_ports.iter().enumerate() {
+        let name = format!("{release}-validator-{index}");
+        wait_for_deployment_ready(client, namespace, &name).await?;
+        let allocation = discover_node_ports(client, namespace, &name, *ports).await?;
+        validator_allocations.push(allocation);
+    }
+
+    let mut port_forwards = Vec::new();
+
+    let validator_api_ports: Vec<u16> = validator_allocations
+        .iter()
+        .map(|ports| ports.api)
+        .collect();
+    if wait_for_node_http_nodeport(&validator_api_ports, NodeRole::Validator)
+        .await
+        .is_err()
+    {
+        // Fall back to port-forwarding when NodePorts are unreachable from the host.
+        validator_allocations.clear();
+        port_forwards = port_forward_group(
+            namespace,
+            release,
+            "validator",
+            validator_ports,
+            &mut validator_allocations,
+        )?;
+        let validator_api_ports: Vec<u16> = validator_allocations
+            .iter()
+            .map(|ports| ports.api)
+            .collect();
+        if let Err(err) =
+            wait_for_node_http_port_forward(&validator_api_ports, NodeRole::Validator).await
+        {
+            kill_port_forwards(&mut port_forwards);
+            return Err(err);
+        }
+    }
+
+    let mut executor_allocations = Vec::with_capacity(executor_ports.len());
+    for (index, ports) in executor_ports.iter().enumerate() {
+        let name = format!("{release}-executor-{index}");
+        wait_for_deployment_ready(client, namespace, &name).await?;
+        let allocation = discover_node_ports(client, namespace, &name, *ports).await?;
+        executor_allocations.push(allocation);
+    }
+
+    let executor_api_ports: Vec<u16> = executor_allocations.iter().map(|ports| ports.api).collect();
+    if !executor_allocations.is_empty()
+        && wait_for_node_http_nodeport(&executor_api_ports, NodeRole::Executor)
+            .await
+            .is_err()
+    {
+        executor_allocations.clear();
+        match port_forward_group(
+            namespace,
+            release,
+            "executor",
+            executor_ports,
+            &mut executor_allocations,
+        ) {
+            Ok(forwards) => port_forwards.extend(forwards),
+            Err(err) => {
+                kill_port_forwards(&mut port_forwards);
+                return Err(err);
+            }
+        }
+        let executor_api_ports: Vec<u16> =
+            executor_allocations.iter().map(|ports| ports.api).collect();
+        if let Err(err) =
+            wait_for_node_http_port_forward(&executor_api_ports, NodeRole::Executor).await
+        {
+            kill_port_forwards(&mut port_forwards);
+            return Err(err);
+        }
+    }
+
+    let mut prometheus_port = find_node_port(
+        client,
+        namespace,
+        PROMETHEUS_SERVICE_NAME,
+        PROMETHEUS_HTTP_PORT,
+    )
+    .await?;
+    if wait_for_prometheus_http_nodeport(prometheus_port, PROMETHEUS_HTTP_PROBE_TIMEOUT)
+        .await
+        .is_err()
+    {
+        let (local_port, forward) =
+            port_forward_service(namespace, PROMETHEUS_SERVICE_NAME, PROMETHEUS_HTTP_PORT)
+                .map_err(|err| {
+                    kill_port_forwards(&mut port_forwards);
+                    err
+                })?;
+        prometheus_port = local_port;
+        port_forwards.push(forward);
+        if let Err(err) = wait_for_prometheus_http_port_forward(prometheus_port).await {
+            kill_port_forwards(&mut port_forwards);
+            return Err(err);
+        }
+    }
+
+    Ok(ClusterReady {
+        ports: ClusterPorts {
+            validators: validator_allocations,
+            executors: executor_allocations,
+            prometheus: prometheus_port,
+        },
+        port_forwards,
+    })
+}
diff --git a/testing-framework/runners/k8s/src/wait/ports.rs b/testing-framework/runners/k8s/src/wait/ports.rs
new file mode 100644
index 0000000..0d44b1b
--- /dev/null
+++ b/testing-framework/runners/k8s/src/wait/ports.rs
@@ -0,0 +1,62 @@
+use k8s_openapi::api::core::v1::Service;
+use kube::{Api, Client};
+use tokio::time::sleep;
+
+use super::{ClusterWaitError, NodeConfigPorts, NodePortAllocation};
+
+pub async fn find_node_port(
+    client: &Client,
+    namespace: &str,
+    service_name: &str,
+    service_port: u16,
+) -> Result<u16, ClusterWaitError> {
+    let interval = std::time::Duration::from_secs(1);
+    for _ in 0..120 {
+        match Api::<Service>::namespaced(client.clone(), namespace)
+            .get(service_name)
+            .await
+        {
+            Ok(service) => {
+                if let Some(spec) = service.spec.clone()
+                    && let Some(ports) = spec.ports
+                {
+                    for port in ports {
+                        if port.port == i32::from(service_port)
+                            && let Some(node_port) = port.node_port
+                        {
+                            return Ok(node_port as u16);
+                        }
+                    }
+                }
+            }
+            Err(err) => {
+                return Err(ClusterWaitError::ServiceFetch {
+                    service: service_name.to_owned(),
+                    source: err,
+                });
+            }
+        }
+        sleep(interval).await;
+    }
+
+    Err(ClusterWaitError::NodePortUnavailable {
+        service: service_name.to_owned(),
+        port: service_port,
+    })
+}
+
+pub async fn discover_node_ports(
+    client: &Client,
+    namespace: &str,
+    service_name: &str,
+    config_ports: NodeConfigPorts,
+) -> Result<NodePortAllocation, ClusterWaitError> {
+    let api_port = find_node_port(client, namespace, service_name, config_ports.api).await?;
+    let testing_port =
+        find_node_port(client, namespace, service_name, config_ports.testing).await?;
+
+    Ok(NodePortAllocation {
+        api: api_port,
+        testing: testing_port,
+    })
+}
diff --git a/testing-framework/runners/k8s/src/wait/prometheus.rs b/testing-framework/runners/k8s/src/wait/prometheus.rs
new file mode 100644
index 0000000..a962de5
--- /dev/null
+++ b/testing-framework/runners/k8s/src/wait/prometheus.rs
@@ -0,0 +1,36 @@
+use tokio::time::sleep;
+
+use super::{ClusterWaitError, PROMETHEUS_HTTP_TIMEOUT};
+use crate::host::node_host;
+
+pub async fn wait_for_prometheus_http_nodeport(
+    port: u16,
+    timeout: std::time::Duration,
+) -> Result<(), ClusterWaitError> {
+    let host = node_host();
+    wait_for_prometheus_http(&host, port, timeout).await
+}
+
+pub async fn wait_for_prometheus_http_port_forward(port: u16) -> Result<(), ClusterWaitError> {
+    wait_for_prometheus_http("127.0.0.1", port, PROMETHEUS_HTTP_TIMEOUT).await
+}
+
+async fn wait_for_prometheus_http(
+    host: &str,
+    port: u16,
+    timeout: std::time::Duration,
+) -> Result<(), ClusterWaitError> {
+    let client = reqwest::Client::new();
+    let url = format!("http://{host}:{port}/-/ready");
+
+    for _ in 0..timeout.as_secs() {
+        if let Ok(resp) = client.get(&url).send().await
+            && resp.status().is_success()
+        {
+            return Ok(());
+        }
+        sleep(std::time::Duration::from_secs(1)).await;
+    }
+
+    Err(ClusterWaitError::PrometheusTimeout { port })
+}
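
For reviewers, a minimal sketch of how the re-exported entry point is driven after this refactor. This is not part of the patch: the function name, namespace, release, and port numbers below are illustrative, and a tokio runtime is assumed; only `wait_for_cluster_ready`, `NodeConfigPorts`, and the `ClusterReady` fields come from the module above.

    use kube::Client;

    use crate::wait::{NodeConfigPorts, wait_for_cluster_ready};

    // Hypothetical call site inside this crate; everything except the `wait`
    // API itself is made up for illustration.
    async fn smoke_wait(namespace: &str, release: &str) -> anyhow::Result<()> {
        let client = Client::try_default().await?;
        // Container-side HTTP ports as declared in the Helm chart values.
        let validators = [NodeConfigPorts { api: 18080, testing: 18081 }];
        let executors = [NodeConfigPorts { api: 18090, testing: 18091 }];

        let mut ready =
            wait_for_cluster_ready(&client, namespace, release, &validators, &executors).await?;

        // All returned ports are host-reachable: NodePorts when they answered,
        // otherwise the local ends of kubectl port-forwards kept alive in
        // `ready.port_forwards`.
        println!("validators: {:?}", ready.ports.validators);
        println!("prometheus: {}", ready.ports.prometheus);

        // The caller owns the forward processes and must reap them on teardown.
        for forward in ready.port_forwards.iter_mut() {
            let _ = forward.kill();
            let _ = forward.wait();
        }
        Ok(())
    }

The NodePort-first, port-forward-second fallback is what lets the same runner work both in clusters whose NodePorts are reachable from the test host and in environments where they are not routable.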