k8s: avoid blocking port-forwarding in async

This commit is contained in:
andrussal 2025-12-18 21:21:07 +01:00
parent 9392280127
commit 347d72950d
2 changed files with 43 additions and 17 deletions

View File

@ -97,6 +97,11 @@ pub enum ClusterWaitError {
#[source] #[source]
source: anyhow::Error, source: anyhow::Error,
}, },
#[error("port-forward task failed: {source}")]
PortForwardTask {
#[source]
source: anyhow::Error,
},
} }
static DEPLOYMENT_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| { static DEPLOYMENT_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {

View File

@ -42,13 +42,21 @@ pub async fn wait_for_cluster_ready(
{ {
validator_allocations.clear(); validator_allocations.clear();
validator_host = "127.0.0.1".to_owned(); validator_host = "127.0.0.1".to_owned();
port_forwards = port_forward_group( let namespace = namespace.to_owned();
namespace, let release = release.to_owned();
release, let ports = validator_ports.to_vec();
"validator", let (forwards, allocations) = tokio::task::spawn_blocking(move || {
validator_ports, let mut allocations = Vec::with_capacity(ports.len());
&mut validator_allocations, let forwards =
)?; port_forward_group(&namespace, &release, "validator", &ports, &mut allocations)?;
Ok::<_, ClusterWaitError>((forwards, allocations))
})
.await
.map_err(|source| ClusterWaitError::PortForwardTask {
source: source.into(),
})??;
port_forwards = forwards;
validator_allocations = allocations;
let validator_api_ports: Vec<u16> = validator_allocations let validator_api_ports: Vec<u16> = validator_allocations
.iter() .iter()
.map(|ports| ports.api) .map(|ports| ports.api)
@ -78,16 +86,29 @@ pub async fn wait_for_cluster_ready(
{ {
executor_allocations.clear(); executor_allocations.clear();
executor_host = "127.0.0.1".to_owned(); executor_host = "127.0.0.1".to_owned();
match port_forward_group( let namespace = namespace.to_owned();
namespace, let release = release.to_owned();
release, let ports = executor_ports.to_vec();
"executor", let (forwards, allocations) = match tokio::task::spawn_blocking(move || {
executor_ports, let mut allocations = Vec::with_capacity(ports.len());
&mut executor_allocations, let forwards =
) { port_forward_group(&namespace, &release, "executor", &ports, &mut allocations)?;
Ok(forwards) => port_forwards.extend(forwards), Ok::<_, ClusterWaitError>((forwards, allocations))
Err(err) => return Err(cleanup_port_forwards(&mut port_forwards, err)), })
} .await
{
Ok(result) => result?,
Err(source) => {
return Err(cleanup_port_forwards(
&mut port_forwards,
ClusterWaitError::PortForwardTask {
source: source.into(),
},
));
}
};
port_forwards.extend(forwards);
executor_allocations = allocations;
let executor_api_ports: Vec<u16> = let executor_api_ports: Vec<u16> =
executor_allocations.iter().map(|ports| ports.api).collect(); executor_allocations.iter().map(|ports| ports.api).collect();
if let Err(err) = if let Err(err) =