Refine k8s attach readability

This commit is contained in:
andrussal 2026-03-07 08:58:09 +01:00
parent 315286ec0d
commit 7bcd0fac3d
2 changed files with 110 additions and 71 deletions

View File

@ -20,6 +20,8 @@ enum K8sAttachDiscoveryError {
EmptyLabelSelector,
#[error("no services matched label selector '{selector}' in namespace '{namespace}'")]
NoMatchingServices { namespace: String, selector: String },
#[error("k8s service has no metadata.name")]
MissingServiceName,
#[error("service '{service}' has no TCP node ports exposed")]
ServiceHasNoNodePorts { service: String },
#[error(
@ -39,6 +41,11 @@ pub(super) struct K8sAttachedClusterWait<E: K8sDeployEnv> {
_env: PhantomData<E>,
}
struct K8sAttachRequest<'a> {
namespace: &'a str,
label_selector: &'a str,
}
impl<E: K8sDeployEnv> K8sAttachProvider<E> {
pub(super) fn new(client: Client) -> Self {
Self {
@ -64,50 +71,15 @@ impl<E: K8sDeployEnv> AttachProvider<E> for K8sAttachProvider<E> {
&self,
source: &AttachSource,
) -> Result<Vec<AttachedNode<E>>, AttachProviderError> {
let (namespace, label_selector) = match source {
AttachSource::K8s {
namespace,
label_selector,
} => (namespace, label_selector),
_ => {
return Err(AttachProviderError::UnsupportedSource {
attach_source: source.clone(),
});
}
};
if label_selector.trim().is_empty() {
return Err(AttachProviderError::Discovery {
source: K8sAttachDiscoveryError::EmptyLabelSelector.into(),
});
}
let namespace = namespace.as_deref().unwrap_or("default");
let services = discover_services(&self.client, namespace, label_selector)
let request = k8s_attach_request(source)?;
let services = discover_services(&self.client, request.namespace, request.label_selector)
.await
.map_err(to_discovery_error)?;
let host = node_host();
let mut attached = Vec::with_capacity(services.items.len());
for service in services.items {
let service_name =
service
.metadata
.name
.clone()
.ok_or_else(|| AttachProviderError::Discovery {
source: "k8s service has no metadata.name".into(),
})?;
let api_port = extract_api_node_port(&service).map_err(to_discovery_error)?;
let endpoint = format!("http://{host}:{api_port}/");
let source = ExternalNodeSource::new(service_name.clone(), endpoint);
let client = E::external_node_client(&source).map_err(to_discovery_error)?;
attached.push(AttachedNode {
identity_hint: Some(service_name),
client,
});
attached.push(build_attached_node::<E>(&host, service).map_err(to_discovery_error)?);
}
Ok(attached)
@ -118,6 +90,50 @@ fn to_discovery_error(source: DynError) -> AttachProviderError {
AttachProviderError::Discovery { source }
}
fn k8s_attach_request(source: &AttachSource) -> Result<K8sAttachRequest<'_>, AttachProviderError> {
let AttachSource::K8s {
namespace,
label_selector,
} = source
else {
return Err(AttachProviderError::UnsupportedSource {
attach_source: source.clone(),
});
};
if label_selector.trim().is_empty() {
return Err(AttachProviderError::Discovery {
source: K8sAttachDiscoveryError::EmptyLabelSelector.into(),
});
}
Ok(K8sAttachRequest {
namespace: namespace.as_deref().unwrap_or("default"),
label_selector,
})
}
fn build_attached_node<E: K8sDeployEnv>(
host: &str,
service: Service,
) -> Result<AttachedNode<E>, DynError> {
let service_name = service
.metadata
.name
.clone()
.ok_or(K8sAttachDiscoveryError::MissingServiceName)?;
let api_port = extract_api_node_port(&service)?;
let endpoint = format!("http://{host}:{api_port}/");
let source = ExternalNodeSource::new(service_name.clone(), endpoint);
let client = E::external_node_client(&source)?;
Ok(AttachedNode {
identity_hint: Some(service_name),
client,
})
}
pub(super) async fn discover_services(
client: &Client,
namespace: &str,
@ -218,29 +234,11 @@ fn api_port_candidates(ports: Vec<(String, u16)>) -> Vec<u16> {
#[async_trait]
impl<E: K8sDeployEnv> ClusterWaitHandle<E> for K8sAttachedClusterWait<E> {
async fn wait_network_ready(&self) -> Result<(), DynError> {
let AttachSource::K8s {
namespace,
label_selector,
} = &self.source
else {
return Err("k8s cluster wait requires a k8s attach source".into());
};
if label_selector.trim().is_empty() {
return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into());
}
let namespace = namespace.as_deref().unwrap_or("default");
let services = discover_services(&self.client, namespace, label_selector).await?;
let request = k8s_wait_request(&self.source)?;
let services =
discover_services(&self.client, request.namespace, request.label_selector).await?;
let host = node_host();
let mut endpoints = Vec::with_capacity(services.items.len());
for service in &services.items {
let api_port = extract_api_node_port(service)?;
let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?;
endpoint.set_path(E::readiness_path());
endpoints.push(endpoint);
}
let endpoints = collect_readiness_endpoints::<E>(&host, &services.items)?;
wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?;
@ -248,6 +246,41 @@ impl<E: K8sDeployEnv> ClusterWaitHandle<E> for K8sAttachedClusterWait<E> {
}
}
fn k8s_wait_request(source: &AttachSource) -> Result<K8sAttachRequest<'_>, DynError> {
let AttachSource::K8s {
namespace,
label_selector,
} = source
else {
return Err("k8s cluster wait requires a k8s attach source".into());
};
if label_selector.trim().is_empty() {
return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into());
}
Ok(K8sAttachRequest {
namespace: namespace.as_deref().unwrap_or("default"),
label_selector,
})
}
fn collect_readiness_endpoints<E: K8sDeployEnv>(
host: &str,
services: &[Service],
) -> Result<Vec<Url>, DynError> {
let mut endpoints = Vec::with_capacity(services.len());
for service in services {
let api_port = extract_api_node_port(service)?;
let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?;
endpoint.set_path(E::readiness_path());
endpoints.push(endpoint);
}
Ok(endpoints)
}
#[cfg(test)]
mod tests {
use k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec};

View File

@ -195,6 +195,7 @@ where
let mut cluster = Some(deployment.cluster);
let mut runtime = build_runtime_artifacts::<E>(&mut cluster, &observability).await?;
let cluster_wait = managed_cluster_wait::<E>(&cluster, &metadata)?;
let source_providers = source_providers::<E>(
client_from_cluster(&cluster)?,
@ -204,17 +205,7 @@ where
runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?;
ensure_non_empty_node_clients(&runtime.node_clients)?;
let parts = build_runner_parts(
scenario,
deployment.node_count,
runtime,
Arc::new(K8sAttachedClusterWait::<E>::new(
client_from_cluster(&cluster)?,
metadata
.attach_source()
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?,
)),
);
let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait);
log_configured_observability(&observability);
maybe_print_endpoints::<E>(&observability, &parts.node_clients);
@ -298,6 +289,21 @@ where
)))
}
fn managed_cluster_wait<E: K8sDeployEnv>(
cluster: &Option<ClusterEnvironment>,
metadata: &K8sDeploymentMetadata,
) -> Result<Arc<dyn ClusterWaitHandle<E>>, K8sRunnerError> {
let client = client_from_cluster(cluster)?;
let attach_source = metadata
.attach_source()
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
Ok(Arc::new(K8sAttachedClusterWait::<E>::new(
client,
attach_source,
)))
}
fn client_from_cluster(cluster: &Option<ClusterEnvironment>) -> Result<Client, K8sRunnerError> {
let client = cluster
.as_ref()