refactor: explicit retry policies + RAII port-forwards

This commit is contained in:
andrussal 2025-12-15 23:13:38 +01:00
parent 6e619d4c03
commit 26dfa1b795
17 changed files with 166 additions and 122 deletions

1
Cargo.lock generated
View File

@ -6015,6 +6015,7 @@ dependencies = [
name = "runner-examples"
version = "0.1.0"
dependencies = [
"anyhow",
"testing-framework-core",
"testing-framework-runner-compose",
"testing-framework-runner-k8s",

View File

@ -10,6 +10,7 @@ repository.workspace = true
version = "0.1.0"
[dependencies]
anyhow = "1"
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-k8s = { workspace = true }

View File

@ -1,6 +1,7 @@
use std::{env, error::Error, process, str::FromStr, time::Duration};
use std::{env, process, time::Duration};
use runner_examples::{ChaosBuilderExt as _, ScenarioBuilderExt as _};
use anyhow::{Context as _, Result};
use runner_examples::{ChaosBuilderExt as _, ScenarioBuilderExt as _, read_env_any};
use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
use testing_framework_runner_compose::{ComposeDeployer, ComposeRunnerError};
use tracing::{info, warn};
@ -16,6 +17,7 @@ const TRANSACTION_WALLETS: usize = 500;
const CHAOS_MIN_DELAY_SECS: u64 = 120;
const CHAOS_MAX_DELAY_SECS: u64 = 180;
const CHAOS_COOLDOWN_SECS: u64 = 240;
const CHAOS_DELAY_HEADROOM_SECS: u64 = 1;
// DA Testing Constants
const DA_CHANNEL_RATE: u64 = 1;
@ -66,7 +68,7 @@ async fn run_compose_case(
validators: usize,
executors: usize,
run_duration: Duration,
) -> Result<(), Box<dyn Error>> {
) -> Result<()> {
info!(
validators,
executors,
@ -74,6 +76,11 @@ async fn run_compose_case(
"building scenario plan"
);
let chaos_min_delay = Duration::from_secs(CHAOS_MIN_DELAY_SECS)
.max(run_duration + Duration::from_secs(CHAOS_DELAY_HEADROOM_SECS));
let chaos_max_delay =
Duration::from_secs(CHAOS_MAX_DELAY_SECS).max(chaos_min_delay);
let mut plan = ScenarioBuilder::topology_with(|t| {
t.network_star()
.validators(validators)
@ -83,8 +90,8 @@ async fn run_compose_case(
.chaos_with(|c| {
c.restart()
// Keep chaos restarts outside the test run window to avoid crash loops on restart.
.min_delay(Duration::from_secs(CHAOS_MIN_DELAY_SECS))
.max_delay(Duration::from_secs(CHAOS_MAX_DELAY_SECS))
.min_delay(chaos_min_delay)
.max_delay(chaos_max_delay)
.target_cooldown(Duration::from_secs(CHAOS_COOLDOWN_SECS))
.apply()
})
@ -110,7 +117,7 @@ async fn run_compose_case(
warn!("Docker is unavailable; cannot run compose demo");
return Ok(());
}
Err(err) => return Err(err.into()),
Err(err) => return Err(anyhow::Error::new(err)).context("deploying compose stack failed"),
};
if !runner.context().telemetry().is_configured() {
@ -118,14 +125,9 @@ async fn run_compose_case(
}
info!("running scenario");
runner.run(&mut plan).await.map(|_| ()).map_err(Into::into)
}
fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: FromStr + Copy,
{
keys.iter()
.find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
.unwrap_or(default)
runner
.run(&mut plan)
.await
.context("running compose scenario failed")?;
Ok(())
}

View File

@ -1,6 +1,7 @@
use std::{env, error::Error, process, str::FromStr, time::Duration};
use std::{process, time::Duration};
use runner_examples::ScenarioBuilderExt as _;
use anyhow::{Context as _, Result, ensure};
use runner_examples::{ScenarioBuilderExt as _, read_env_any};
use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
use testing_framework_runner_k8s::{K8sDeployer, K8sRunnerError};
use tracing::{info, warn};
@ -43,7 +44,7 @@ async fn run_k8s_case(
validators: usize,
executors: usize,
run_duration: Duration,
) -> Result<(), Box<dyn Error>> {
) -> Result<()> {
info!(
validators,
executors,
@ -76,7 +77,7 @@ async fn run_k8s_case(
warn!("Kubernetes cluster unavailable ({source}); skipping");
return Ok(());
}
Err(err) => return Err(err.into()),
Err(err) => return Err(anyhow::Error::new(err)).context("deploying k8s stack failed"),
};
if !runner.context().telemetry().is_configured() {
@ -91,20 +92,18 @@ async fn run_k8s_case(
let handle = runner
.run(&mut plan)
.await
.map_err(|err| format!("k8s scenario failed: {err}"))?;
.context("running k8s scenario failed")?;
for (idx, client) in validator_clients.iter().enumerate() {
let info = client
.consensus_info()
.await
.map_err(|err| format!("validator {idx} consensus_info failed: {err}"))?;
if info.height < MIN_CONSENSUS_HEIGHT {
return Err(format!(
"validator {idx} height {} should reach at least {MIN_CONSENSUS_HEIGHT} blocks",
info.height
)
.into());
}
.with_context(|| format!("validator {idx} consensus_info failed"))?;
ensure!(
info.height >= MIN_CONSENSUS_HEIGHT,
"validator {idx} height {} should reach at least {MIN_CONSENSUS_HEIGHT} blocks",
info.height
);
}
// Explicitly drop after checks, allowing cleanup to proceed.
@ -112,12 +111,3 @@ async fn run_k8s_case(
Ok(())
}
fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: FromStr + Copy,
{
keys.iter()
.find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
.unwrap_or(default)
}

View File

@ -1,6 +1,7 @@
use std::{env, error::Error, process, str::FromStr, time::Duration};
use std::{env, process, time::Duration};
use runner_examples::ScenarioBuilderExt as _;
use anyhow::{Context as _, Result};
use runner_examples::{ScenarioBuilderExt as _, read_env_any};
use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
use testing_framework_runner_local::LocalDeployer;
use tracing::{info, warn};
@ -46,11 +47,7 @@ async fn main() {
}
}
async fn run_local_case(
validators: usize,
executors: usize,
run_duration: Duration,
) -> Result<(), Box<dyn Error>> {
async fn run_local_case(validators: usize, executors: usize, run_duration: Duration) -> Result<()> {
info!(
validators,
executors,
@ -71,20 +68,17 @@ async fn run_local_case(
let deployer = LocalDeployer::default().with_membership_check(true);
info!("deploying local nodes");
let runner: Runner = deployer.deploy(&plan).await?;
let runner: Runner = deployer
.deploy(&plan)
.await
.context("deploying local nodes failed")?;
info!("running scenario");
runner.run(&mut plan).await.map(|_| ())?;
runner
.run(&mut plan)
.await
.context("running local scenario failed")?;
info!("scenario complete");
Ok(())
}
fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: FromStr + Copy,
{
keys.iter()
.find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
.unwrap_or(default)
}

10
examples/src/env.rs Normal file
View File

@ -0,0 +1,10 @@
use std::{env, str::FromStr};
pub fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: FromStr + Copy,
{
keys.iter()
.find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
.unwrap_or(default)
}

View File

@ -4,6 +4,10 @@ pub use testing_framework_workflows::{
expectations, util, workloads,
};
pub mod env;
pub use env::read_env_any;
/// Metrics are currently disabled in this branch; return a stub handle.
#[must_use]
pub const fn configure_prometheus_metrics() -> Metrics {

View File

@ -55,6 +55,8 @@ impl NodeDescriptor {
) -> Self {
let mut environment = base_environment(cfgsync_port);
let identifier = kind.instance_name(index);
let api_port = node.general.api_config.address.port();
let testing_port = node.general.api_config.testing_http_address.port();
environment.extend([
EnvEntry::new(
"CFG_NETWORK_PORT",
@ -62,28 +64,14 @@ impl NodeDescriptor {
),
EnvEntry::new("CFG_DA_PORT", node.da_port.to_string()),
EnvEntry::new("CFG_BLEND_PORT", node.blend_port.to_string()),
EnvEntry::new(
"CFG_API_PORT",
node.general.api_config.address.port().to_string(),
),
EnvEntry::new(
"CFG_TESTING_HTTP_PORT",
node.general
.api_config
.testing_http_address
.port()
.to_string(),
),
EnvEntry::new("CFG_API_PORT", api_port.to_string()),
EnvEntry::new("CFG_TESTING_HTTP_PORT", testing_port.to_string()),
EnvEntry::new("CFG_HOST_IDENTIFIER", identifier),
]);
let ports = vec![
node.general.api_config.address.port().to_string(),
node.general
.api_config
.testing_http_address
.port()
.to_string(),
format!("127.0.0.1:{api_port}:{api_port}"),
format!("127.0.0.1:{testing_port}:{testing_port}"),
];
Self {

View File

@ -18,7 +18,7 @@ use crate::{
helm::HelmError,
},
lifecycle::{block_feed::spawn_block_feed_with, cleanup::RunnerCleanup},
wait::ClusterWaitError,
wait::{ClusterWaitError, PortForwardHandle},
};
/// Deploys a scenario into Kubernetes using Helm charts and port-forwards.
@ -285,14 +285,14 @@ async fn setup_cluster(
struct K8sCleanupGuard {
cleanup: RunnerCleanup,
block_feed: Option<BlockFeedTask>,
port_forwards: Vec<std::process::Child>,
port_forwards: Vec<PortForwardHandle>,
}
impl K8sCleanupGuard {
const fn new(
cleanup: RunnerCleanup,
block_feed: BlockFeedTask,
port_forwards: Vec<std::process::Child>,
port_forwards: Vec<PortForwardHandle>,
) -> Self {
Self {
cleanup,

View File

@ -15,7 +15,9 @@ use crate::{
host::node_host,
infrastructure::assets::RunnerAssets,
lifecycle::{cleanup::RunnerCleanup, logs::dump_namespace_logs},
wait::{ClusterPorts, ClusterReady, NodeConfigPorts, wait_for_cluster_ready},
wait::{
ClusterPorts, ClusterReady, NodeConfigPorts, PortForwardHandle, wait_for_cluster_ready,
},
};
#[derive(Default)]
@ -35,7 +37,7 @@ pub struct ClusterEnvironment {
executor_api_ports: Vec<u16>,
executor_testing_ports: Vec<u16>,
prometheus_port: u16,
port_forwards: Vec<std::process::Child>,
port_forwards: Vec<PortForwardHandle>,
}
impl ClusterEnvironment {
@ -45,7 +47,7 @@ impl ClusterEnvironment {
release: String,
cleanup: RunnerCleanup,
ports: &ClusterPorts,
port_forwards: Vec<std::process::Child>,
port_forwards: Vec<PortForwardHandle>,
) -> Self {
let validator_api_ports = ports.validators.iter().map(|ports| ports.api).collect();
let validator_testing_ports = ports.validators.iter().map(|ports| ports.testing).collect();
@ -80,7 +82,7 @@ impl ClusterEnvironment {
}
}
pub fn into_cleanup(self) -> (RunnerCleanup, Vec<std::process::Child>) {
pub fn into_cleanup(self) -> (RunnerCleanup, Vec<PortForwardHandle>) {
(
self.cleanup.expect("cleanup guard should be available"),
self.port_forwards,
@ -316,10 +318,9 @@ pub async fn wait_for_ports_or_cleanup(
}
}
pub fn kill_port_forwards(handles: &mut Vec<std::process::Child>) {
pub fn kill_port_forwards(handles: &mut Vec<PortForwardHandle>) {
for handle in handles.iter_mut() {
let _ = handle.kill();
let _ = handle.wait();
handle.shutdown();
}
handles.clear();
}

View File

@ -183,11 +183,14 @@ async fn delete_namespace_via_cli(namespace: &str) -> bool {
}
async fn wait_for_namespace_termination(namespaces: &Api<Namespace>, namespace: &str) {
for attempt in 0..60 {
const NAMESPACE_TERMINATION_POLL_ATTEMPTS: u32 = 60;
const NAMESPACE_TERMINATION_POLL_INTERVAL: Duration = Duration::from_secs(1);
for attempt in 0..NAMESPACE_TERMINATION_POLL_ATTEMPTS {
if namespace_deleted(namespaces, namespace, attempt).await {
return;
}
sleep(Duration::from_secs(1)).await;
sleep(NAMESPACE_TERMINATION_POLL_INTERVAL).await;
}
warn!(

View File

@ -9,31 +9,67 @@ use anyhow::{Result as AnyhowResult, anyhow};
use super::{ClusterWaitError, NodeConfigPorts, NodePortAllocation};
const PORT_FORWARD_READY_ATTEMPTS: u32 = 20;
const PORT_FORWARD_READY_POLL_INTERVAL: Duration = Duration::from_millis(250);
pub struct PortForwardHandle {
child: Child,
}
impl std::fmt::Debug for PortForwardHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PortForwardHandle").finish_non_exhaustive()
}
}
impl PortForwardHandle {
pub fn shutdown(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
}
}
impl Drop for PortForwardHandle {
fn drop(&mut self) {
self.shutdown();
}
}
pub struct PortForwardSpawn {
pub local_port: u16,
pub handle: PortForwardHandle,
}
pub fn port_forward_group(
namespace: &str,
release: &str,
kind: &str,
ports: &[NodeConfigPorts],
allocations: &mut Vec<NodePortAllocation>,
) -> Result<Vec<Child>, ClusterWaitError> {
) -> Result<Vec<PortForwardHandle>, ClusterWaitError> {
let mut forwards = Vec::new();
for (index, ports) in ports.iter().enumerate() {
let service = format!("{release}-{kind}-{index}");
let (api_port, api_forward) = match port_forward_service(namespace, &service, ports.api) {
let PortForwardSpawn {
local_port: api_port,
handle: api_forward,
} = match port_forward_service(namespace, &service, ports.api) {
Ok(forward) => forward,
Err(err) => {
kill_port_forwards(&mut forwards);
return Err(err);
}
};
let PortForwardSpawn {
local_port: testing_port,
handle: testing_forward,
} = match port_forward_service(namespace, &service, ports.testing) {
Ok(forward) => forward,
Err(err) => {
kill_port_forwards(&mut forwards);
return Err(err);
}
};
let (testing_port, testing_forward) =
match port_forward_service(namespace, &service, ports.testing) {
Ok(forward) => forward,
Err(err) => {
kill_port_forwards(&mut forwards);
return Err(err);
}
};
allocations.push(NodePortAllocation {
api: api_port,
testing: testing_port,
@ -48,7 +84,7 @@ pub fn port_forward_service(
namespace: &str,
service: &str,
remote_port: u16,
) -> Result<(u16, Child), ClusterWaitError> {
) -> Result<PortForwardSpawn, ClusterWaitError> {
let local_port = allocate_local_port().map_err(|source| ClusterWaitError::PortForward {
service: service.to_owned(),
port: remote_port,
@ -70,7 +106,7 @@ pub fn port_forward_service(
source: source.into(),
})?;
for _ in 0..20 {
for _ in 0..PORT_FORWARD_READY_ATTEMPTS {
if let Ok(Some(status)) = child.try_wait() {
return Err(ClusterWaitError::PortForward {
service: service.to_owned(),
@ -79,9 +115,12 @@ pub fn port_forward_service(
});
}
if TcpStream::connect((Ipv4Addr::LOCALHOST, local_port)).is_ok() {
return Ok((local_port, child));
return Ok(PortForwardSpawn {
local_port,
handle: PortForwardHandle { child },
});
}
thread::sleep(Duration::from_millis(250));
thread::sleep(PORT_FORWARD_READY_POLL_INTERVAL);
}
let _ = child.kill();
@ -92,10 +131,9 @@ pub fn port_forward_service(
})
}
pub fn kill_port_forwards(handles: &mut Vec<Child>) {
pub fn kill_port_forwards(handles: &mut Vec<PortForwardHandle>) {
for handle in handles.iter_mut() {
let _ = handle.kill();
let _ = handle.wait();
handle.shutdown();
}
handles.clear();
}

View File

@ -19,6 +19,7 @@ mod orchestrator;
mod ports;
mod prometheus;
pub use forwarding::PortForwardHandle;
pub use orchestrator::wait_for_cluster_ready;
/// Container and host-side HTTP ports for a node in the Helm chart values.
@ -47,7 +48,7 @@ pub struct ClusterPorts {
#[derive(Debug)]
pub struct ClusterReady {
pub ports: ClusterPorts,
pub port_forwards: Vec<std::process::Child>,
pub port_forwards: Vec<PortForwardHandle>,
}
#[derive(Debug, Error)]

View File

@ -7,7 +7,10 @@ use super::{
};
use crate::lifecycle::wait::{
deployment::wait_for_deployment_ready,
forwarding::{kill_port_forwards, port_forward_group, port_forward_service},
forwarding::{
PortForwardHandle, PortForwardSpawn, kill_port_forwards, port_forward_group,
port_forward_service,
},
http_probe::{wait_for_node_http_nodeport, wait_for_node_http_port_forward},
ports::{discover_node_ports, find_node_port},
prometheus::{wait_for_prometheus_http_nodeport, wait_for_prometheus_http_port_forward},
@ -33,7 +36,7 @@ pub async fn wait_for_cluster_ready(
validator_allocations.push(allocation);
}
let mut port_forwards = Vec::new();
let mut port_forwards: Vec<PortForwardHandle> = Vec::new();
let validator_api_ports: Vec<u16> = validator_allocations
.iter()
@ -86,18 +89,14 @@ pub async fn wait_for_cluster_ready(
&mut executor_allocations,
) {
Ok(forwards) => port_forwards.extend(forwards),
Err(err) => {
kill_port_forwards(&mut port_forwards);
return Err(err);
}
Err(err) => return Err(cleanup_port_forwards(&mut port_forwards, err)),
}
let executor_api_ports: Vec<u16> =
executor_allocations.iter().map(|ports| ports.api).collect();
if let Err(err) =
wait_for_node_http_port_forward(&executor_api_ports, NodeRole::Executor).await
{
kill_port_forwards(&mut port_forwards);
return Err(err);
return Err(cleanup_port_forwards(&mut port_forwards, err));
}
}
@ -112,17 +111,16 @@ pub async fn wait_for_cluster_ready(
.await
.is_err()
{
let (local_port, forward) =
let PortForwardSpawn { local_port, handle } =
port_forward_service(namespace, PROMETHEUS_SERVICE_NAME, PROMETHEUS_HTTP_PORT)
.map_err(|err| {
kill_port_forwards(&mut port_forwards);
err
})?;
prometheus_port = local_port;
port_forwards.push(forward);
port_forwards.push(handle);
if let Err(err) = wait_for_prometheus_http_port_forward(prometheus_port).await {
kill_port_forwards(&mut port_forwards);
return Err(err);
return Err(cleanup_port_forwards(&mut port_forwards, err));
}
}
@ -135,3 +133,11 @@ pub async fn wait_for_cluster_ready(
port_forwards,
})
}
fn cleanup_port_forwards(
port_forwards: &mut Vec<PortForwardHandle>,
error: ClusterWaitError,
) -> ClusterWaitError {
kill_port_forwards(port_forwards);
error
}

View File

@ -4,14 +4,16 @@ use tokio::time::sleep;
use super::{ClusterWaitError, NodeConfigPorts, NodePortAllocation};
const NODE_PORT_LOOKUP_ATTEMPTS: u32 = 120;
const NODE_PORT_LOOKUP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
pub async fn find_node_port(
client: &Client,
namespace: &str,
service_name: &str,
service_port: u16,
) -> Result<u16, ClusterWaitError> {
let interval = std::time::Duration::from_secs(1);
for _ in 0..120 {
for _ in 0..NODE_PORT_LOOKUP_ATTEMPTS {
match Api::<Service>::namespaced(client.clone(), namespace)
.get(service_name)
.await
@ -36,7 +38,7 @@ pub async fn find_node_port(
});
}
}
sleep(interval).await;
sleep(NODE_PORT_LOOKUP_INTERVAL).await;
}
Err(ClusterWaitError::NodePortUnavailable {

View File

@ -3,6 +3,8 @@ use tokio::time::sleep;
use super::{ClusterWaitError, prometheus_http_timeout};
use crate::host::node_host;
const PROMETHEUS_HTTP_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
pub async fn wait_for_prometheus_http_nodeport(
port: u16,
timeout: std::time::Duration,
@ -23,13 +25,14 @@ async fn wait_for_prometheus_http(
let client = reqwest::Client::new();
let url = format!("http://{host}:{port}/-/ready");
for _ in 0..timeout.as_secs() {
let attempts = timeout.as_secs();
for _ in 0..attempts {
if let Ok(resp) = client.get(&url).send().await
&& resp.status().is_success()
{
return Ok(());
}
sleep(std::time::Duration::from_secs(1)).await;
sleep(PROMETHEUS_HTTP_POLL_INTERVAL).await;
}
Err(ClusterWaitError::PrometheusTimeout { port })

View File

@ -26,7 +26,7 @@ impl Default for ConsensusLiveness {
const LAG_ALLOWANCE: u64 = 2;
const MIN_PROGRESS_BLOCKS: u64 = 5;
const REQUEST_RETRIES: usize = 5;
const REQUEST_RETRIES: usize = 15;
const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(2);
const MAX_LAG_ALLOWANCE: u64 = 5;