use std::marker::PhantomData; use async_trait::async_trait; use k8s_openapi::api::core::v1::Service; use kube::{ Api, Client, api::{ListParams, ObjectList}, }; use testing_framework_core::scenario::{ ClusterWaitHandle, DynError, ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, internal::{AttachProvider, AttachProviderError, AttachedNode}, wait_http_readiness, }; use url::Url; use crate::{env::K8sDeployEnv, host::node_host}; #[derive(Debug, thiserror::Error)] enum K8sAttachDiscoveryError { #[error("k8s attach source requires a non-empty label selector")] EmptyLabelSelector, #[error("no services matched label selector '{selector}' in namespace '{namespace}'")] NoMatchingServices { namespace: String, selector: String }, #[error("k8s service has no metadata.name")] MissingServiceName, #[error("service '{service}' has no TCP node ports exposed")] ServiceHasNoNodePorts { service: String }, #[error( "service '{service}' has multiple candidate API node ports ({ports}); explicit API port required" )] ServiceHasMultipleNodePorts { service: String, ports: String }, } pub(super) struct K8sAttachProvider { client: Client, _env: PhantomData, } pub(super) struct K8sAttachedClusterWait { client: Client, source: ExistingCluster, _env: PhantomData, } struct K8sAttachRequest<'a> { namespace: &'a str, label_selector: &'a str, } impl K8sAttachProvider { pub(super) fn new(client: Client) -> Self { Self { client, _env: PhantomData, } } } impl K8sAttachedClusterWait { pub(super) fn try_new(client: Client, source: &ExistingCluster) -> Result { let _ = k8s_wait_request(source)?; Ok(Self { client, source: source.clone(), _env: PhantomData, }) } } #[async_trait] impl AttachProvider for K8sAttachProvider { async fn discover( &self, source: &ExistingCluster, ) -> Result>, AttachProviderError> { let request = k8s_attach_request(source)?; let services = discover_services(&self.client, request.namespace, request.label_selector) .await .map_err(to_discovery_error)?; let host = node_host(); let mut attached = Vec::with_capacity(services.items.len()); for service in services.items { attached.push(build_attached_node::(&host, service).map_err(to_discovery_error)?); } Ok(attached) } } fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } fn k8s_attach_request( source: &ExistingCluster, ) -> Result, AttachProviderError> { let Some(label_selector) = source.k8s_label_selector() else { return Err(AttachProviderError::UnsupportedSource { attach_source: source.clone(), }); }; if label_selector.trim().is_empty() { return Err(AttachProviderError::Discovery { source: K8sAttachDiscoveryError::EmptyLabelSelector.into(), }); } Ok(K8sAttachRequest { namespace: source.k8s_namespace().unwrap_or("default"), label_selector, }) } fn build_attached_node( host: &str, service: Service, ) -> Result, DynError> { let service_name = service .metadata .name .clone() .ok_or(K8sAttachDiscoveryError::MissingServiceName)?; let api_port = extract_api_node_port(&service)?; let endpoint = format!("http://{host}:{api_port}/"); let source = ExternalNodeSource::new(service_name.clone(), endpoint); let client = E::external_node_client(&source)?; Ok(AttachedNode { identity_hint: Some(service_name), client, }) } pub(super) async fn discover_services( client: &Client, namespace: &str, selector: &str, ) -> Result, DynError> { let services: Api = Api::namespaced(client.clone(), namespace); let params = ListParams::default().labels(selector); let services = services.list(¶ms).await?; let services = filter_services_with_tcp_node_ports(services); if services.items.is_empty() { return Err(K8sAttachDiscoveryError::NoMatchingServices { namespace: namespace.to_owned(), selector: selector.to_owned(), } .into()); } Ok(services) } fn filter_services_with_tcp_node_ports(services: ObjectList) -> ObjectList { ObjectList { items: services .items .into_iter() .filter(|service| !tcp_node_ports(service).is_empty()) .collect(), metadata: services.metadata, } } fn tcp_node_ports(service: &Service) -> Vec<(String, u16)> { service .spec .as_ref() .into_iter() .flat_map(|spec| spec.ports.as_ref()) .flat_map(|ports| ports.iter()) .filter_map(|port| { let node_port = port.node_port.and_then(|value| u16::try_from(value).ok())?; let protocol = port.protocol.as_deref().unwrap_or("TCP"); if protocol != "TCP" { return None; } Some((port.name.clone().unwrap_or_default(), node_port)) }) .collect() } pub(super) fn extract_api_node_port(service: &Service) -> Result { let service_name = service .metadata .name .clone() .unwrap_or_else(|| "".to_owned()); let ports = api_port_candidates(tcp_node_ports(service)); match ports.as_slice() { [] => Err(K8sAttachDiscoveryError::ServiceHasNoNodePorts { service: service_name, } .into()), [port] => Ok(*port), _ => Err(K8sAttachDiscoveryError::ServiceHasMultipleNodePorts { service: service_name, ports: ports .iter() .map(ToString::to_string) .collect::>() .join(", "), } .into()), } } fn api_port_candidates(ports: Vec<(String, u16)>) -> Vec { let explicit_api: Vec = ports .iter() .filter_map(|(name, port)| (name == "http" || name == "api").then_some(*port)) .collect(); if !explicit_api.is_empty() { return explicit_api; } let non_testing: Vec = ports .iter() .filter_map(|(name, port)| (!name.contains("testing")).then_some(*port)) .collect(); if !non_testing.is_empty() { return non_testing; } ports.into_iter().map(|(_, port)| port).collect() } #[async_trait] impl ClusterWaitHandle for K8sAttachedClusterWait { async fn wait_network_ready(&self) -> Result<(), DynError> { let request = k8s_wait_request(&self.source)?; let services = discover_services(&self.client, request.namespace, request.label_selector).await?; let host = node_host(); let endpoints = collect_readiness_endpoints::(&host, &services.items)?; wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?; Ok(()) } } fn k8s_wait_request(source: &ExistingCluster) -> Result, DynError> { let label_selector = source.k8s_label_selector().ok_or_else(|| { DynError::from("k8s cluster wait requires a k8s existing-cluster descriptor") })?; if label_selector.trim().is_empty() { return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into()); } Ok(K8sAttachRequest { namespace: source.k8s_namespace().unwrap_or("default"), label_selector, }) } fn collect_readiness_endpoints( host: &str, services: &[Service], ) -> Result, DynError> { let mut endpoints = Vec::with_capacity(services.len()); for service in services { let api_port = extract_api_node_port(service)?; let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?; endpoint.set_path(E::readiness_path()); endpoints.push(endpoint); } Ok(endpoints) } #[cfg(test)] mod tests { use k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec}; use super::extract_api_node_port; #[test] fn extract_api_node_port_returns_single_port() { let service = Service { metadata: Default::default(), spec: Some(ServiceSpec { ports: Some(vec![ServicePort { node_port: Some(31234), ..Default::default() }]), ..Default::default() }), ..Default::default() }; let port = extract_api_node_port(&service).expect("single port should resolve"); assert_eq!(port, 31234); } #[test] fn extract_api_node_port_prefers_http_name() { let service = Service { metadata: Default::default(), spec: Some(ServiceSpec { ports: Some(vec![ ServicePort { name: Some("testing-http".to_owned()), node_port: Some(31234), ..Default::default() }, ServicePort { name: Some("http".to_owned()), node_port: Some(31235), ..Default::default() }, ]), ..Default::default() }), ..Default::default() }; let port = extract_api_node_port(&service).expect("http-named port should resolve"); assert_eq!(port, 31235); } }