From 26dfa1b79538beab40359269c8b804ab2340a794 Mon Sep 17 00:00:00 2001
From: andrussal
Date: Mon, 15 Dec 2025 23:13:38 +0100
Subject: [PATCH] refactor: explicit retry policies + RAII port-forwards

---
 Cargo.lock                                    |  1 +
 examples/Cargo.toml                           |  1 +
 examples/src/bin/compose_runner.rs            | 34 ++++-----
 examples/src/bin/k8s_runner.rs                | 34 ++++-----
 examples/src/bin/local_runner.rs              | 30 ++++----
 examples/src/env.rs                           | 10 +++
 examples/src/lib.rs                           |  4 ++
 .../runners/compose/src/descriptor/node.rs    | 24 ++-----
 .../runners/k8s/src/deployer/orchestrator.rs  |  6 +-
 .../runners/k8s/src/infrastructure/cluster.rs | 15 ++--
 .../runners/k8s/src/lifecycle/cleanup.rs      |  7 +-
 .../k8s/src/lifecycle/wait/forwarding.rs      | 72 ++++++++++++++-----
 .../runners/k8s/src/lifecycle/wait/mod.rs     |  3 +-
 .../k8s/src/lifecycle/wait/orchestrator.rs    | 30 ++++----
 .../runners/k8s/src/lifecycle/wait/ports.rs   |  8 ++-
 .../k8s/src/lifecycle/wait/prometheus.rs      |  7 +-
 .../src/expectations/consensus_liveness.rs    |  2 +-
 17 files changed, 166 insertions(+), 122 deletions(-)
 create mode 100644 examples/src/env.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1e0602f..7539748 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6015,6 +6015,7 @@ dependencies = [
 name = "runner-examples"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "testing-framework-core",
  "testing-framework-runner-compose",
  "testing-framework-runner-k8s",
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 5e278dc..ba28e95 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -10,6 +10,7 @@ repository.workspace = true
 version = "0.1.0"
 
 [dependencies]
+anyhow = "1"
 testing-framework-core = { workspace = true }
 testing-framework-runner-compose = { workspace = true }
 testing-framework-runner-k8s = { workspace = true }
diff --git a/examples/src/bin/compose_runner.rs b/examples/src/bin/compose_runner.rs
index 1917f0c..8b6944a 100644
--- a/examples/src/bin/compose_runner.rs
+++ b/examples/src/bin/compose_runner.rs
@@ -1,6 +1,7 @@
-use std::{env, error::Error, process, str::FromStr, time::Duration};
+use std::{env, process, time::Duration};
 
-use runner_examples::{ChaosBuilderExt as _, ScenarioBuilderExt as _};
+use anyhow::{Context as _, Result};
+use runner_examples::{ChaosBuilderExt as _, ScenarioBuilderExt as _, read_env_any};
 use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
 use testing_framework_runner_compose::{ComposeDeployer, ComposeRunnerError};
 use tracing::{info, warn};
@@ -16,6 +17,7 @@ const TRANSACTION_WALLETS: usize = 500;
 const CHAOS_MIN_DELAY_SECS: u64 = 120;
 const CHAOS_MAX_DELAY_SECS: u64 = 180;
 const CHAOS_COOLDOWN_SECS: u64 = 240;
+const CHAOS_DELAY_HEADROOM_SECS: u64 = 1;
 
 // DA Testing Constants
 const DA_CHANNEL_RATE: u64 = 1;
@@ -66,7 +68,7 @@ async fn run_compose_case(
     validators: usize,
     executors: usize,
     run_duration: Duration,
-) -> Result<(), Box<dyn Error>> {
+) -> Result<()> {
     info!(
         validators,
         executors,
@@ -74,6 +76,11 @@
         "building scenario plan"
     );
 
+    let chaos_min_delay = Duration::from_secs(CHAOS_MIN_DELAY_SECS)
+        .max(run_duration + Duration::from_secs(CHAOS_DELAY_HEADROOM_SECS));
+    let chaos_max_delay =
+        Duration::from_secs(CHAOS_MAX_DELAY_SECS).max(chaos_min_delay);
+
     let mut plan = ScenarioBuilder::topology_with(|t| {
         t.network_star()
             .validators(validators)
@@ -83,8 +90,8 @@
         .chaos_with(|c| {
             c.restart()
                 // Keep chaos restarts outside the test run window to avoid crash loops on restart.
-                .min_delay(Duration::from_secs(CHAOS_MIN_DELAY_SECS))
-                .max_delay(Duration::from_secs(CHAOS_MAX_DELAY_SECS))
+                .min_delay(chaos_min_delay)
+                .max_delay(chaos_max_delay)
                 .target_cooldown(Duration::from_secs(CHAOS_COOLDOWN_SECS))
                 .apply()
         })
@@ -110,7 +117,7 @@
             warn!("Docker is unavailable; cannot run compose demo");
             return Ok(());
         }
-        Err(err) => return Err(err.into()),
+        Err(err) => return Err(anyhow::Error::new(err)).context("deploying compose stack failed"),
     };
 
     if !runner.context().telemetry().is_configured() {
@@ -118,14 +125,9 @@
     }
 
     info!("running scenario");
-    runner.run(&mut plan).await.map(|_| ()).map_err(Into::into)
-}
-
-fn read_env_any<T>(keys: &[&str], default: T) -> T
-where
-    T: FromStr + Copy,
-{
-    keys.iter()
-        .find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
-        .unwrap_or(default)
+    runner
+        .run(&mut plan)
+        .await
+        .context("running compose scenario failed")?;
+    Ok(())
 }
diff --git a/examples/src/bin/k8s_runner.rs b/examples/src/bin/k8s_runner.rs
index c20128e..399faa5 100644
--- a/examples/src/bin/k8s_runner.rs
+++ b/examples/src/bin/k8s_runner.rs
@@ -1,6 +1,7 @@
-use std::{env, error::Error, process, str::FromStr, time::Duration};
+use std::{process, time::Duration};
 
-use runner_examples::ScenarioBuilderExt as _;
+use anyhow::{Context as _, Result, ensure};
+use runner_examples::{ScenarioBuilderExt as _, read_env_any};
 use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
 use testing_framework_runner_k8s::{K8sDeployer, K8sRunnerError};
 use tracing::{info, warn};
@@ -43,7 +44,7 @@ async fn run_k8s_case(
     validators: usize,
     executors: usize,
     run_duration: Duration,
-) -> Result<(), Box<dyn Error>> {
+) -> Result<()> {
     info!(
         validators,
         executors,
@@ -76,7 +77,7 @@
             warn!("Kubernetes cluster unavailable ({source}); skipping");
             return Ok(());
         }
-        Err(err) => return Err(err.into()),
+        Err(err) => return Err(anyhow::Error::new(err)).context("deploying k8s stack failed"),
     };
 
     if !runner.context().telemetry().is_configured() {
@@ -91,20 +92,18 @@
     let handle = runner
         .run(&mut plan)
         .await
-        .map_err(|err| format!("k8s scenario failed: {err}"))?;
+        .context("running k8s scenario failed")?;
 
     for (idx, client) in validator_clients.iter().enumerate() {
         let info = client
             .consensus_info()
             .await
-            .map_err(|err| format!("validator {idx} consensus_info failed: {err}"))?;
-        if info.height < MIN_CONSENSUS_HEIGHT {
-            return Err(format!(
-                "validator {idx} height {} should reach at least {MIN_CONSENSUS_HEIGHT} blocks",
-                info.height
-            )
-            .into());
-        }
+            .with_context(|| format!("validator {idx} consensus_info failed"))?;
+        ensure!(
+            info.height >= MIN_CONSENSUS_HEIGHT,
+            "validator {idx} height {} should reach at least {MIN_CONSENSUS_HEIGHT} blocks",
+            info.height
+        );
     }
 
     // Explicitly drop after checks, allowing cleanup to proceed.
@@ -112,12 +111,3 @@
 
     Ok(())
 }
-
-fn read_env_any<T>(keys: &[&str], default: T) -> T
-where
-    T: FromStr + Copy,
-{
-    keys.iter()
-        .find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
-        .unwrap_or(default)
-}
diff --git a/examples/src/bin/local_runner.rs b/examples/src/bin/local_runner.rs
index bc72247..512cfb5 100644
--- a/examples/src/bin/local_runner.rs
+++ b/examples/src/bin/local_runner.rs
@@ -1,6 +1,7 @@
-use std::{env, error::Error, process, str::FromStr, time::Duration};
+use std::{env, process, time::Duration};
 
-use runner_examples::ScenarioBuilderExt as _;
+use anyhow::{Context as _, Result};
+use runner_examples::{ScenarioBuilderExt as _, read_env_any};
 use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
 use testing_framework_runner_local::LocalDeployer;
 use tracing::{info, warn};
@@ -46,11 +47,7 @@
     }
 }
 
-async fn run_local_case(
-    validators: usize,
-    executors: usize,
-    run_duration: Duration,
-) -> Result<(), Box<dyn Error>> {
+async fn run_local_case(validators: usize, executors: usize, run_duration: Duration) -> Result<()> {
     info!(
         validators,
         executors,
@@ -71,20 +68,17 @@
     let deployer = LocalDeployer::default().with_membership_check(true);
 
     info!("deploying local nodes");
-    let runner: Runner = deployer.deploy(&plan).await?;
+    let runner: Runner = deployer
+        .deploy(&plan)
+        .await
+        .context("deploying local nodes failed")?;
 
     info!("running scenario");
-    runner.run(&mut plan).await.map(|_| ())?;
+    runner
+        .run(&mut plan)
+        .await
+        .context("running local scenario failed")?;
 
     info!("scenario complete");
     Ok(())
 }
-
-fn read_env_any<T>(keys: &[&str], default: T) -> T
-where
-    T: FromStr + Copy,
-{
-    keys.iter()
-        .find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
-        .unwrap_or(default)
-}
diff --git a/examples/src/env.rs b/examples/src/env.rs
new file mode 100644
index 0000000..f6a1ed0
--- /dev/null
+++ b/examples/src/env.rs
@@ -0,0 +1,10 @@
+use std::{env, str::FromStr};
+
+pub fn read_env_any<T>(keys: &[&str], default: T) -> T
+where
+    T: FromStr + Copy,
+{
+    keys.iter()
+        .find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
+        .unwrap_or(default)
+}
diff --git a/examples/src/lib.rs b/examples/src/lib.rs
index 8738278..bf0bcae 100644
--- a/examples/src/lib.rs
+++ b/examples/src/lib.rs
@@ -4,6 +4,10 @@ pub use testing_framework_workflows::{
     expectations, util, workloads,
 };
 
+pub mod env;
+
+pub use env::read_env_any;
+
 /// Metrics are currently disabled in this branch; return a stub handle.
 #[must_use]
 pub const fn configure_prometheus_metrics() -> Metrics {
diff --git a/testing-framework/runners/compose/src/descriptor/node.rs b/testing-framework/runners/compose/src/descriptor/node.rs
index 462cf85..61c19f1 100644
--- a/testing-framework/runners/compose/src/descriptor/node.rs
+++ b/testing-framework/runners/compose/src/descriptor/node.rs
@@ -55,6 +55,8 @@
     ) -> Self {
         let mut environment = base_environment(cfgsync_port);
         let identifier = kind.instance_name(index);
+        let api_port = node.general.api_config.address.port();
+        let testing_port = node.general.api_config.testing_http_address.port();
         environment.extend([
             EnvEntry::new(
                 "CFG_NETWORK_PORT",
@@ -62,28 +64,14 @@
             ),
             EnvEntry::new("CFG_DA_PORT", node.da_port.to_string()),
             EnvEntry::new("CFG_BLEND_PORT", node.blend_port.to_string()),
-            EnvEntry::new(
-                "CFG_API_PORT",
-                node.general.api_config.address.port().to_string(),
-            ),
-            EnvEntry::new(
-                "CFG_TESTING_HTTP_PORT",
-                node.general
-                    .api_config
-                    .testing_http_address
-                    .port()
-                    .to_string(),
-            ),
+            EnvEntry::new("CFG_API_PORT", api_port.to_string()),
+            EnvEntry::new("CFG_TESTING_HTTP_PORT", testing_port.to_string()),
             EnvEntry::new("CFG_HOST_IDENTIFIER", identifier),
         ]);
 
         let ports = vec![
-            node.general.api_config.address.port().to_string(),
-            node.general
-                .api_config
-                .testing_http_address
-                .port()
-                .to_string(),
+            format!("127.0.0.1:{api_port}:{api_port}"),
+            format!("127.0.0.1:{testing_port}:{testing_port}"),
         ];
 
         Self {
diff --git a/testing-framework/runners/k8s/src/deployer/orchestrator.rs b/testing-framework/runners/k8s/src/deployer/orchestrator.rs
index f09b9a6..4fc1674 100644
--- a/testing-framework/runners/k8s/src/deployer/orchestrator.rs
+++ b/testing-framework/runners/k8s/src/deployer/orchestrator.rs
@@ -18,7 +18,7 @@ use crate::{
         helm::HelmError,
     },
     lifecycle::{block_feed::spawn_block_feed_with, cleanup::RunnerCleanup},
-    wait::ClusterWaitError,
+    wait::{ClusterWaitError, PortForwardHandle},
 };
 
 /// Deploys a scenario into Kubernetes using Helm charts and port-forwards.
@@ -285,14 +285,14 @@ async fn setup_cluster(
 struct K8sCleanupGuard {
     cleanup: RunnerCleanup,
     block_feed: Option<BlockFeedTask>,
-    port_forwards: Vec<Child>,
+    port_forwards: Vec<PortForwardHandle>,
 }
 
 impl K8sCleanupGuard {
     const fn new(
         cleanup: RunnerCleanup,
         block_feed: BlockFeedTask,
-        port_forwards: Vec<Child>,
+        port_forwards: Vec<PortForwardHandle>,
     ) -> Self {
         Self {
             cleanup,
diff --git a/testing-framework/runners/k8s/src/infrastructure/cluster.rs b/testing-framework/runners/k8s/src/infrastructure/cluster.rs
index 1186d47..4670e3d 100644
--- a/testing-framework/runners/k8s/src/infrastructure/cluster.rs
+++ b/testing-framework/runners/k8s/src/infrastructure/cluster.rs
@@ -15,7 +15,9 @@ use crate::{
     host::node_host,
     infrastructure::assets::RunnerAssets,
     lifecycle::{cleanup::RunnerCleanup, logs::dump_namespace_logs},
-    wait::{ClusterPorts, ClusterReady, NodeConfigPorts, wait_for_cluster_ready},
+    wait::{
+        ClusterPorts, ClusterReady, NodeConfigPorts, PortForwardHandle, wait_for_cluster_ready,
+    },
 };
 
 #[derive(Default)]
@@ -35,7 +37,7 @@ pub struct ClusterEnvironment {
     executor_api_ports: Vec<u16>,
     executor_testing_ports: Vec<u16>,
     prometheus_port: u16,
-    port_forwards: Vec<Child>,
+    port_forwards: Vec<PortForwardHandle>,
 }
 
 impl ClusterEnvironment {
@@ -45,7 +47,7 @@
         release: String,
         cleanup: RunnerCleanup,
         ports: &ClusterPorts,
-        port_forwards: Vec<Child>,
+        port_forwards: Vec<PortForwardHandle>,
     ) -> Self {
         let validator_api_ports = ports.validators.iter().map(|ports| ports.api).collect();
         let validator_testing_ports = ports.validators.iter().map(|ports| ports.testing).collect();
@@ -80,7 +82,7 @@
         }
     }
 
-    pub fn into_cleanup(self) -> (RunnerCleanup, Vec<Child>) {
+    pub fn into_cleanup(self) -> (RunnerCleanup, Vec<PortForwardHandle>) {
         (
             self.cleanup.expect("cleanup guard should be available"),
             self.port_forwards,
@@ -316,10 +318,9 @@ pub async fn wait_for_ports_or_cleanup(
     }
 }
 
-pub fn kill_port_forwards(handles: &mut Vec<Child>) {
+pub fn kill_port_forwards(handles: &mut Vec<PortForwardHandle>) {
     for handle in handles.iter_mut() {
-        let _ = handle.kill();
-        let _ = handle.wait();
+        handle.shutdown();
     }
     handles.clear();
 }
diff --git a/testing-framework/runners/k8s/src/lifecycle/cleanup.rs b/testing-framework/runners/k8s/src/lifecycle/cleanup.rs
index 6552fc9..5a2de39 100644
--- a/testing-framework/runners/k8s/src/lifecycle/cleanup.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/cleanup.rs
@@ -183,11 +183,14 @@ async fn delete_namespace_via_cli(namespace: &str) -> bool {
 }
 
 async fn wait_for_namespace_termination(namespaces: &Api<Namespace>, namespace: &str) {
-    for attempt in 0..60 {
+    const NAMESPACE_TERMINATION_POLL_ATTEMPTS: u32 = 60;
+    const NAMESPACE_TERMINATION_POLL_INTERVAL: Duration = Duration::from_secs(1);
+
+    for attempt in 0..NAMESPACE_TERMINATION_POLL_ATTEMPTS {
         if namespace_deleted(namespaces, namespace, attempt).await {
             return;
         }
-        sleep(Duration::from_secs(1)).await;
+        sleep(NAMESPACE_TERMINATION_POLL_INTERVAL).await;
     }
 
     warn!(
diff --git a/testing-framework/runners/k8s/src/lifecycle/wait/forwarding.rs b/testing-framework/runners/k8s/src/lifecycle/wait/forwarding.rs
index b6620a2..e9da949 100644
--- a/testing-framework/runners/k8s/src/lifecycle/wait/forwarding.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/wait/forwarding.rs
@@ -9,31 +9,67 @@ use anyhow::{Result as AnyhowResult, anyhow};
 
 use super::{ClusterWaitError, NodeConfigPorts, NodePortAllocation};
 
+const PORT_FORWARD_READY_ATTEMPTS: u32 = 20;
+const PORT_FORWARD_READY_POLL_INTERVAL: Duration = Duration::from_millis(250);
+
+pub struct PortForwardHandle {
+    child: Child,
+}
+
+impl std::fmt::Debug for PortForwardHandle {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PortForwardHandle").finish_non_exhaustive()
+    }
+}
+
+impl PortForwardHandle {
+    pub fn shutdown(&mut self) {
+        let _ = self.child.kill();
+        let _ = self.child.wait();
+    }
+}
+
+impl Drop for PortForwardHandle {
+    fn drop(&mut self) {
+        self.shutdown();
+    }
+}
+
+pub struct PortForwardSpawn {
+    pub local_port: u16,
+    pub handle: PortForwardHandle,
+}
+
 pub fn port_forward_group(
     namespace: &str,
     release: &str,
     kind: &str,
     ports: &[NodeConfigPorts],
     allocations: &mut Vec<NodePortAllocation>,
-) -> Result<Vec<Child>, ClusterWaitError> {
+) -> Result<Vec<PortForwardHandle>, ClusterWaitError> {
     let mut forwards = Vec::new();
     for (index, ports) in ports.iter().enumerate() {
         let service = format!("{release}-{kind}-{index}");
-        let (api_port, api_forward) = match port_forward_service(namespace, &service, ports.api) {
+        let PortForwardSpawn {
+            local_port: api_port,
+            handle: api_forward,
+        } = match port_forward_service(namespace, &service, ports.api) {
+            Ok(forward) => forward,
+            Err(err) => {
+                kill_port_forwards(&mut forwards);
+                return Err(err);
+            }
+        };
+        let PortForwardSpawn {
+            local_port: testing_port,
+            handle: testing_forward,
+        } = match port_forward_service(namespace, &service, ports.testing) {
             Ok(forward) => forward,
             Err(err) => {
                 kill_port_forwards(&mut forwards);
                 return Err(err);
             }
         };
-        let (testing_port, testing_forward) =
-            match port_forward_service(namespace, &service, ports.testing) {
-                Ok(forward) => forward,
-                Err(err) => {
-                    kill_port_forwards(&mut forwards);
-                    return Err(err);
-                }
-            };
         allocations.push(NodePortAllocation {
             api: api_port,
             testing: testing_port,
@@ -48,7 +84,7 @@ pub fn port_forward_service(
     namespace: &str,
     service: &str,
     remote_port: u16,
-) -> Result<(u16, Child), ClusterWaitError> {
+) -> Result<PortForwardSpawn, ClusterWaitError> {
     let local_port = allocate_local_port().map_err(|source| ClusterWaitError::PortForward {
         service: service.to_owned(),
         port: remote_port,
@@ -70,7 +106,7 @@
         source: source.into(),
     })?;
 
-    for _ in 0..20 {
+    for _ in 0..PORT_FORWARD_READY_ATTEMPTS {
         if let Ok(Some(status)) = child.try_wait() {
             return Err(ClusterWaitError::PortForward {
                 service: service.to_owned(),
@@ -79,9 +115,12 @@
             });
         }
         if TcpStream::connect((Ipv4Addr::LOCALHOST, local_port)).is_ok() {
-            return Ok((local_port, child));
+            return Ok(PortForwardSpawn {
+                local_port,
+                handle: PortForwardHandle { child },
+            });
         }
-        thread::sleep(Duration::from_millis(250));
+        thread::sleep(PORT_FORWARD_READY_POLL_INTERVAL);
     }
 
     let _ = child.kill();
@@ -92,10 +131,9 @@
     })
 }
 
-pub fn kill_port_forwards(handles: &mut Vec<Child>) {
+pub fn kill_port_forwards(handles: &mut Vec<PortForwardHandle>) {
     for handle in handles.iter_mut() {
-        let _ = handle.kill();
-        let _ = handle.wait();
+        handle.shutdown();
     }
     handles.clear();
 }
diff --git a/testing-framework/runners/k8s/src/lifecycle/wait/mod.rs b/testing-framework/runners/k8s/src/lifecycle/wait/mod.rs
index 2f32865..89a9d77 100644
--- a/testing-framework/runners/k8s/src/lifecycle/wait/mod.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/wait/mod.rs
@@ -19,6 +19,7 @@ mod orchestrator;
 mod ports;
 mod prometheus;
 
+pub use forwarding::PortForwardHandle;
 pub use orchestrator::wait_for_cluster_ready;
 
 /// Container and host-side HTTP ports for a node in the Helm chart values.
@@ -47,7 +48,7 @@ pub struct ClusterPorts {
 #[derive(Debug)]
 pub struct ClusterReady {
     pub ports: ClusterPorts,
-    pub port_forwards: Vec<Child>,
+    pub port_forwards: Vec<PortForwardHandle>,
 }
 
 #[derive(Debug, Error)]
diff --git a/testing-framework/runners/k8s/src/lifecycle/wait/orchestrator.rs b/testing-framework/runners/k8s/src/lifecycle/wait/orchestrator.rs
index fb875dd..04c7420 100644
--- a/testing-framework/runners/k8s/src/lifecycle/wait/orchestrator.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/wait/orchestrator.rs
@@ -7,7 +7,10 @@ use super::{
 };
 use crate::lifecycle::wait::{
     deployment::wait_for_deployment_ready,
-    forwarding::{kill_port_forwards, port_forward_group, port_forward_service},
+    forwarding::{
+        PortForwardHandle, PortForwardSpawn, kill_port_forwards, port_forward_group,
+        port_forward_service,
+    },
     http_probe::{wait_for_node_http_nodeport, wait_for_node_http_port_forward},
     ports::{discover_node_ports, find_node_port},
     prometheus::{wait_for_prometheus_http_nodeport, wait_for_prometheus_http_port_forward},
@@ -33,7 +36,7 @@ pub async fn wait_for_cluster_ready(
         validator_allocations.push(allocation);
     }
 
-    let mut port_forwards = Vec::new();
+    let mut port_forwards: Vec<PortForwardHandle> = Vec::new();
 
     let validator_api_ports: Vec<u16> = validator_allocations
         .iter()
@@ -86,18 +89,14 @@
             &mut executor_allocations,
         ) {
             Ok(forwards) => port_forwards.extend(forwards),
-            Err(err) => {
-                kill_port_forwards(&mut port_forwards);
-                return Err(err);
-            }
+            Err(err) => return Err(cleanup_port_forwards(&mut port_forwards, err)),
         }
 
         let executor_api_ports: Vec<u16> =
             executor_allocations.iter().map(|ports| ports.api).collect();
         if let Err(err) =
             wait_for_node_http_port_forward(&executor_api_ports, NodeRole::Executor).await
         {
-            kill_port_forwards(&mut port_forwards);
-            return Err(err);
+            return Err(cleanup_port_forwards(&mut port_forwards, err));
         }
     }
 
@@ -112,17 +111,16 @@
         .await
         .is_err()
     {
-        let (local_port, forward) =
+        let PortForwardSpawn { local_port, handle } =
             port_forward_service(namespace, PROMETHEUS_SERVICE_NAME, PROMETHEUS_HTTP_PORT)
                 .map_err(|err| {
                     kill_port_forwards(&mut port_forwards);
                     err
                 })?;
         prometheus_port = local_port;
-        port_forwards.push(forward);
+        port_forwards.push(handle);
         if let Err(err) = wait_for_prometheus_http_port_forward(prometheus_port).await {
-            kill_port_forwards(&mut port_forwards);
-            return Err(err);
+            return Err(cleanup_port_forwards(&mut port_forwards, err));
         }
     }
 
@@ -135,3 +133,11 @@
         port_forwards,
     })
 }
+
+fn cleanup_port_forwards(
+    port_forwards: &mut Vec<PortForwardHandle>,
+    error: ClusterWaitError,
+) -> ClusterWaitError {
+    kill_port_forwards(port_forwards);
+    error
+}
diff --git a/testing-framework/runners/k8s/src/lifecycle/wait/ports.rs b/testing-framework/runners/k8s/src/lifecycle/wait/ports.rs
index 0d44b1b..2e045df 100644
--- a/testing-framework/runners/k8s/src/lifecycle/wait/ports.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/wait/ports.rs
@@ -4,14 +4,16 @@ use tokio::time::sleep;
 
 use super::{ClusterWaitError, NodeConfigPorts, NodePortAllocation};
 
+const NODE_PORT_LOOKUP_ATTEMPTS: u32 = 120;
+const NODE_PORT_LOOKUP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
+
 pub async fn find_node_port(
     client: &Client,
     namespace: &str,
     service_name: &str,
     service_port: u16,
 ) -> Result<u16, ClusterWaitError> {
-    let interval = std::time::Duration::from_secs(1);
-    for _ in 0..120 {
+    for _ in 0..NODE_PORT_LOOKUP_ATTEMPTS {
         match Api::<Service>::namespaced(client.clone(), namespace)
             .get(service_name)
             .await
@@ -36,7 +38,7 @@ pub async fn find_node_port(
                 });
             }
         }
-        sleep(interval).await;
+        sleep(NODE_PORT_LOOKUP_INTERVAL).await;
     }
 
     Err(ClusterWaitError::NodePortUnavailable {
diff --git a/testing-framework/runners/k8s/src/lifecycle/wait/prometheus.rs b/testing-framework/runners/k8s/src/lifecycle/wait/prometheus.rs
index 4719f00..328eab3 100644
--- a/testing-framework/runners/k8s/src/lifecycle/wait/prometheus.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/wait/prometheus.rs
@@ -3,6 +3,8 @@ use tokio::time::sleep;
 use super::{ClusterWaitError, prometheus_http_timeout};
 use crate::host::node_host;
 
+const PROMETHEUS_HTTP_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
+
 pub async fn wait_for_prometheus_http_nodeport(
     port: u16,
     timeout: std::time::Duration,
@@ -23,13 +25,14 @@
     let client = reqwest::Client::new();
     let url = format!("http://{host}:{port}/-/ready");
 
-    for _ in 0..timeout.as_secs() {
+    let attempts = timeout.as_secs();
+    for _ in 0..attempts {
         if let Ok(resp) = client.get(&url).send().await
             && resp.status().is_success()
         {
             return Ok(());
         }
-        sleep(std::time::Duration::from_secs(1)).await;
+        sleep(PROMETHEUS_HTTP_POLL_INTERVAL).await;
    }
 
     Err(ClusterWaitError::PrometheusTimeout { port })
diff --git a/testing-framework/workflows/src/expectations/consensus_liveness.rs b/testing-framework/workflows/src/expectations/consensus_liveness.rs
index 2ba1a03..27b4017 100644
--- a/testing-framework/workflows/src/expectations/consensus_liveness.rs
+++ b/testing-framework/workflows/src/expectations/consensus_liveness.rs
@@ -26,7 +26,7 @@ impl Default for ConsensusLiveness {
 }
 
 const LAG_ALLOWANCE: u64 = 2;
 const MIN_PROGRESS_BLOCKS: u64 = 5;
-const REQUEST_RETRIES: usize = 5;
+const REQUEST_RETRIES: usize = 15;
 const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(2);
 const MAX_LAG_ALLOWANCE: u64 = 5;
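
Note, appended below the final hunk so `git am` ignores it: the heart of this
patch is the Drop-based guard around each spawned `kubectl port-forward`
child. The sketch below is a minimal, self-contained illustration of that
pattern, not the patched module itself. It reuses the `PortForwardHandle` and
`shutdown` names from the diff; the `sleep` child is a hypothetical stand-in
for the real `kubectl` process.

use std::process::{Child, Command};

/// RAII guard: the child is killed and reaped when this handle is dropped,
/// so early-return error paths can no longer leak forwarder processes.
struct PortForwardHandle {
    child: Child,
}

impl PortForwardHandle {
    /// Safe to call more than once: `kill()` on an already-exited child
    /// returns an error we ignore, and `wait()` reaps the zombie so the
    /// PID is released.
    fn shutdown(&mut self) {
        let _ = self.child.kill();
        let _ = self.child.wait();
    }
}

impl Drop for PortForwardHandle {
    fn drop(&mut self) {
        self.shutdown();
    }
}

fn main() -> std::io::Result<()> {
    // Hypothetical stand-in for `kubectl port-forward svc/example 8080:80`.
    let child = Command::new("sleep").arg("30").spawn()?;
    let _forward = PortForwardHandle { child };

    // Probe the forwarded port here; a `?` early return after this point
    // still tears the child down via Drop.

    Ok(()) // `_forward` dropped here: child killed and reaped
}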