From 7bcd0fac3d7f9ae4ce131f6376398da8cad37e04 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sat, 7 Mar 2026 08:58:09 +0100 Subject: [PATCH] Refine k8s attach readability --- .../k8s/src/deployer/attach_provider.rs | 153 +++++++++++------- .../k8s/src/deployer/orchestrator.rs | 28 ++-- 2 files changed, 110 insertions(+), 71 deletions(-) diff --git a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs index 7063992..b10a24a 100644 --- a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs @@ -20,6 +20,8 @@ enum K8sAttachDiscoveryError { EmptyLabelSelector, #[error("no services matched label selector '{selector}' in namespace '{namespace}'")] NoMatchingServices { namespace: String, selector: String }, + #[error("k8s service has no metadata.name")] + MissingServiceName, #[error("service '{service}' has no TCP node ports exposed")] ServiceHasNoNodePorts { service: String }, #[error( @@ -39,6 +41,11 @@ pub(super) struct K8sAttachedClusterWait { _env: PhantomData, } +struct K8sAttachRequest<'a> { + namespace: &'a str, + label_selector: &'a str, +} + impl K8sAttachProvider { pub(super) fn new(client: Client) -> Self { Self { @@ -64,50 +71,15 @@ impl AttachProvider for K8sAttachProvider { &self, source: &AttachSource, ) -> Result>, AttachProviderError> { - let (namespace, label_selector) = match source { - AttachSource::K8s { - namespace, - label_selector, - } => (namespace, label_selector), - _ => { - return Err(AttachProviderError::UnsupportedSource { - attach_source: source.clone(), - }); - } - }; - - if label_selector.trim().is_empty() { - return Err(AttachProviderError::Discovery { - source: K8sAttachDiscoveryError::EmptyLabelSelector.into(), - }); - } - - let namespace = namespace.as_deref().unwrap_or("default"); - let services = discover_services(&self.client, namespace, label_selector) + let request = k8s_attach_request(source)?; + let services = discover_services(&self.client, request.namespace, request.label_selector) .await .map_err(to_discovery_error)?; let host = node_host(); let mut attached = Vec::with_capacity(services.items.len()); for service in services.items { - let service_name = - service - .metadata - .name - .clone() - .ok_or_else(|| AttachProviderError::Discovery { - source: "k8s service has no metadata.name".into(), - })?; - - let api_port = extract_api_node_port(&service).map_err(to_discovery_error)?; - let endpoint = format!("http://{host}:{api_port}/"); - let source = ExternalNodeSource::new(service_name.clone(), endpoint); - let client = E::external_node_client(&source).map_err(to_discovery_error)?; - - attached.push(AttachedNode { - identity_hint: Some(service_name), - client, - }); + attached.push(build_attached_node::(&host, service).map_err(to_discovery_error)?); } Ok(attached) @@ -118,6 +90,50 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } +fn k8s_attach_request(source: &AttachSource) -> Result, AttachProviderError> { + let AttachSource::K8s { + namespace, + label_selector, + } = source + else { + return Err(AttachProviderError::UnsupportedSource { + attach_source: source.clone(), + }); + }; + + if label_selector.trim().is_empty() { + return Err(AttachProviderError::Discovery { + source: K8sAttachDiscoveryError::EmptyLabelSelector.into(), + }); + } + + Ok(K8sAttachRequest { + namespace: namespace.as_deref().unwrap_or("default"), + label_selector, + }) +} + +fn build_attached_node( + host: &str, + service: Service, +) -> Result, DynError> { + let service_name = service + .metadata + .name + .clone() + .ok_or(K8sAttachDiscoveryError::MissingServiceName)?; + + let api_port = extract_api_node_port(&service)?; + let endpoint = format!("http://{host}:{api_port}/"); + let source = ExternalNodeSource::new(service_name.clone(), endpoint); + let client = E::external_node_client(&source)?; + + Ok(AttachedNode { + identity_hint: Some(service_name), + client, + }) +} + pub(super) async fn discover_services( client: &Client, namespace: &str, @@ -218,29 +234,11 @@ fn api_port_candidates(ports: Vec<(String, u16)>) -> Vec { #[async_trait] impl ClusterWaitHandle for K8sAttachedClusterWait { async fn wait_network_ready(&self) -> Result<(), DynError> { - let AttachSource::K8s { - namespace, - label_selector, - } = &self.source - else { - return Err("k8s cluster wait requires a k8s attach source".into()); - }; - - if label_selector.trim().is_empty() { - return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into()); - } - - let namespace = namespace.as_deref().unwrap_or("default"); - let services = discover_services(&self.client, namespace, label_selector).await?; + let request = k8s_wait_request(&self.source)?; + let services = + discover_services(&self.client, request.namespace, request.label_selector).await?; let host = node_host(); - let mut endpoints = Vec::with_capacity(services.items.len()); - - for service in &services.items { - let api_port = extract_api_node_port(service)?; - let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?; - endpoint.set_path(E::readiness_path()); - endpoints.push(endpoint); - } + let endpoints = collect_readiness_endpoints::(&host, &services.items)?; wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?; @@ -248,6 +246,41 @@ impl ClusterWaitHandle for K8sAttachedClusterWait { } } +fn k8s_wait_request(source: &AttachSource) -> Result, DynError> { + let AttachSource::K8s { + namespace, + label_selector, + } = source + else { + return Err("k8s cluster wait requires a k8s attach source".into()); + }; + + if label_selector.trim().is_empty() { + return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into()); + } + + Ok(K8sAttachRequest { + namespace: namespace.as_deref().unwrap_or("default"), + label_selector, + }) +} + +fn collect_readiness_endpoints( + host: &str, + services: &[Service], +) -> Result, DynError> { + let mut endpoints = Vec::with_capacity(services.len()); + + for service in services { + let api_port = extract_api_node_port(service)?; + let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?; + endpoint.set_path(E::readiness_path()); + endpoints.push(endpoint); + } + + Ok(endpoints) +} + #[cfg(test)] mod tests { use k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec}; diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 72539a3..eb39b33 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -195,6 +195,7 @@ where let mut cluster = Some(deployment.cluster); let mut runtime = build_runtime_artifacts::(&mut cluster, &observability).await?; + let cluster_wait = managed_cluster_wait::(&cluster, &metadata)?; let source_providers = source_providers::( client_from_cluster(&cluster)?, @@ -204,17 +205,7 @@ where runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?; ensure_non_empty_node_clients(&runtime.node_clients)?; - let parts = build_runner_parts( - scenario, - deployment.node_count, - runtime, - Arc::new(K8sAttachedClusterWait::::new( - client_from_cluster(&cluster)?, - metadata - .attach_source() - .map_err(|source| K8sRunnerError::SourceOrchestration { source })?, - )), - ); + let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait); log_configured_observability(&observability); maybe_print_endpoints::(&observability, &parts.node_clients); @@ -298,6 +289,21 @@ where ))) } +fn managed_cluster_wait( + cluster: &Option, + metadata: &K8sDeploymentMetadata, +) -> Result>, K8sRunnerError> { + let client = client_from_cluster(cluster)?; + let attach_source = metadata + .attach_source() + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; + + Ok(Arc::new(K8sAttachedClusterWait::::new( + client, + attach_source, + ))) +} + fn client_from_cluster(cluster: &Option) -> Result { let client = cluster .as_ref()