From 347d72950dbf5a13f8c634e2d47c57b06517630e Mon Sep 17 00:00:00 2001 From: andrussal Date: Thu, 18 Dec 2025 21:21:07 +0100 Subject: [PATCH] k8s: avoid blocking port-forwarding in async --- .../deployers/k8s/src/lifecycle/wait/mod.rs | 5 ++ .../k8s/src/lifecycle/wait/orchestrator.rs | 55 +++++++++++++------ 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs b/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs index 53353f6..3b7edda 100644 --- a/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs +++ b/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs @@ -97,6 +97,11 @@ pub enum ClusterWaitError { #[source] source: anyhow::Error, }, + #[error("port-forward task failed: {source}")] + PortForwardTask { + #[source] + source: anyhow::Error, + }, } static DEPLOYMENT_TIMEOUT: LazyLock = LazyLock::new(|| { diff --git a/testing-framework/deployers/k8s/src/lifecycle/wait/orchestrator.rs b/testing-framework/deployers/k8s/src/lifecycle/wait/orchestrator.rs index 3db58e0..02fc01c 100644 --- a/testing-framework/deployers/k8s/src/lifecycle/wait/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/lifecycle/wait/orchestrator.rs @@ -42,13 +42,21 @@ pub async fn wait_for_cluster_ready( { validator_allocations.clear(); validator_host = "127.0.0.1".to_owned(); - port_forwards = port_forward_group( - namespace, - release, - "validator", - validator_ports, - &mut validator_allocations, - )?; + let namespace = namespace.to_owned(); + let release = release.to_owned(); + let ports = validator_ports.to_vec(); + let (forwards, allocations) = tokio::task::spawn_blocking(move || { + let mut allocations = Vec::with_capacity(ports.len()); + let forwards = + port_forward_group(&namespace, &release, "validator", &ports, &mut allocations)?; + Ok::<_, ClusterWaitError>((forwards, allocations)) + }) + .await + .map_err(|source| ClusterWaitError::PortForwardTask { + source: source.into(), + })??; + port_forwards = forwards; + validator_allocations = allocations; let validator_api_ports: Vec = validator_allocations .iter() .map(|ports| ports.api) @@ -78,16 +86,29 @@ pub async fn wait_for_cluster_ready( { executor_allocations.clear(); executor_host = "127.0.0.1".to_owned(); - match port_forward_group( - namespace, - release, - "executor", - executor_ports, - &mut executor_allocations, - ) { - Ok(forwards) => port_forwards.extend(forwards), - Err(err) => return Err(cleanup_port_forwards(&mut port_forwards, err)), - } + let namespace = namespace.to_owned(); + let release = release.to_owned(); + let ports = executor_ports.to_vec(); + let (forwards, allocations) = match tokio::task::spawn_blocking(move || { + let mut allocations = Vec::with_capacity(ports.len()); + let forwards = + port_forward_group(&namespace, &release, "executor", &ports, &mut allocations)?; + Ok::<_, ClusterWaitError>((forwards, allocations)) + }) + .await + { + Ok(result) => result?, + Err(source) => { + return Err(cleanup_port_forwards( + &mut port_forwards, + ClusterWaitError::PortForwardTask { + source: source.into(), + }, + )); + } + }; + port_forwards.extend(forwards); + executor_allocations = allocations; let executor_api_ports: Vec = executor_allocations.iter().map(|ports| ports.api).collect(); if let Err(err) =