diff --git a/logos/examples/tests/k8s_attach_node_control.rs b/logos/examples/tests/k8s_attach_node_control.rs
new file mode 100644
index 0000000..8e8e34d
--- /dev/null
+++ b/logos/examples/tests/k8s_attach_node_control.rs
@@ -0,0 +1,65 @@
+use std::time::{Duration, Instant};
+
+use anyhow::{Error, Result, anyhow};
+use lb_ext::{CoreBuilderExt as _, LbcExtEnv, LbcK8sDeployer, ScenarioBuilder};
+use lb_framework::NodeHttpClient;
+use testing_framework_core::scenario::{Deployer as _, Runner};
+use testing_framework_runner_k8s::{K8sDeploymentMetadata, K8sRunnerError};
+use tokio::time::sleep;
+
+#[tokio::test]
+#[ignore = "requires k8s cluster access and mutates k8s runtime state"]
+async fn k8s_attach_mode_queries_node_api_opt_in() -> Result<()> {
+    let managed = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
+        .with_run_duration(Duration::from_secs(5))
+        .build()?;
+
+    let deployer = LbcK8sDeployer::default();
+    let (_managed_runner, metadata): (Runner<LbcExtEnv>, K8sDeploymentMetadata) =
+        match deployer.deploy_with_metadata(&managed).await {
+            Ok(result) => result,
+            Err(K8sRunnerError::ClientInit { .. }) => return Ok(()),
+            Err(error) => return Err(Error::new(error)),
+        };
+
+    let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?;
+    let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
+        .with_run_duration(Duration::from_secs(5))
+        .with_attach_source(attach_source)
+        .build()?;
+
+    let attached_runner: Runner<LbcExtEnv> = match deployer.deploy(&attached).await {
+        Ok(runner) => runner,
+        Err(K8sRunnerError::ClientInit { .. }) => return Ok(()),
+        Err(error) => return Err(Error::new(error)),
+    };
+
+    if attached_runner.context().node_clients().is_empty() {
+        return Err(anyhow!("k8s attach resolved no node clients"));
+    }
+
+    for node_client in attached_runner.context().node_clients().snapshot() {
+        wait_until_node_api_ready(&node_client, Duration::from_secs(20)).await?;
+    }
+
+    Ok(())
+}
+
+async fn wait_until_node_api_ready(client: &NodeHttpClient, timeout: Duration) -> Result<()> {
+    let deadline = Instant::now() + timeout;
+
+    loop {
+        if client.consensus_info().await.is_ok() {
+            return Ok(());
+        }
+
+        if Instant::now() >= deadline {
+            return Err(anyhow!(
+                "timed out waiting for attached node api readiness at {}",
+                client.base_url()
+            ));
+        }
+
+        sleep(Duration::from_millis(500)).await;
+    }
+}
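
Note: the test above always derives its AttachSource from a prior managed
deployment. When targeting an already-running cluster directly, the source can
presumably be built by hand with the AttachSource::k8s(..).with_namespace(..)
builder introduced later in this diff; a minimal sketch, where the namespace
and selector values are hypothetical:

    use testing_framework_core::scenario::AttachSource;

    // Hypothetical values; they must match the services of the target deployment.
    let attach_source = AttachSource::k8s("app.kubernetes.io/instance=logos".to_owned())
        .with_namespace("logos-e2e".to_owned());
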
required" + )] + ServiceHasMultipleNodePorts { service: String, ports: String }, +} + +pub(super) struct K8sAttachProvider { + client: Client, + _env: PhantomData, +} + +impl K8sAttachProvider { + pub(super) fn new(client: Client) -> Self { + Self { + client, + _env: PhantomData, + } + } +} + +#[async_trait] +impl AttachProvider for K8sAttachProvider { + async fn discover( + &self, + source: &AttachSource, + ) -> Result>, AttachProviderError> { + let (namespace, label_selector) = match source { + AttachSource::K8s { + namespace, + label_selector, + } => (namespace, label_selector), + _ => { + return Err(AttachProviderError::UnsupportedSource { + attach_source: source.clone(), + }); + } + }; + + if label_selector.trim().is_empty() { + return Err(AttachProviderError::Discovery { + source: K8sAttachDiscoveryError::EmptyLabelSelector.into(), + }); + } + + let namespace = namespace.as_deref().unwrap_or("default"); + let services = discover_services(&self.client, namespace, label_selector) + .await + .map_err(to_discovery_error)?; + let host = node_host(); + let mut attached = Vec::with_capacity(services.items.len()); + + for service in services.items { + let service_name = + service + .metadata + .name + .clone() + .ok_or_else(|| AttachProviderError::Discovery { + source: "k8s service has no metadata.name".into(), + })?; + + let api_port = extract_api_node_port(&service).map_err(to_discovery_error)?; + let endpoint = format!("http://{host}:{api_port}/"); + let source = ExternalNodeSource::new(service_name.clone(), endpoint); + let client = E::external_node_client(&source).map_err(to_discovery_error)?; + + attached.push(AttachedNode { + identity_hint: Some(service_name), + client, + }); + } + + Ok(attached) + } +} + +fn to_discovery_error(source: DynError) -> AttachProviderError { + AttachProviderError::Discovery { source } +} + +async fn discover_services( + client: &Client, + namespace: &str, + selector: &str, +) -> Result, DynError> { + let services: Api = Api::namespaced(client.clone(), namespace); + let params = ListParams::default().labels(selector); + let services = services.list(¶ms).await?; + let services = filter_services_with_tcp_node_ports(services); + + if services.items.is_empty() { + return Err(K8sAttachDiscoveryError::NoMatchingServices { + namespace: namespace.to_owned(), + selector: selector.to_owned(), + } + .into()); + } + + Ok(services) +} + +fn filter_services_with_tcp_node_ports(services: ObjectList) -> ObjectList { + ObjectList { + items: services + .items + .into_iter() + .filter(|service| !tcp_node_ports(service).is_empty()) + .collect(), + metadata: services.metadata, + } +} + +fn tcp_node_ports(service: &Service) -> Vec<(String, u16)> { + service + .spec + .as_ref() + .into_iter() + .flat_map(|spec| spec.ports.as_ref()) + .flat_map(|ports| ports.iter()) + .filter_map(|port| { + let node_port = port.node_port.and_then(|value| u16::try_from(value).ok())?; + let protocol = port.protocol.as_deref().unwrap_or("TCP"); + if protocol != "TCP" { + return None; + } + + Some((port.name.clone().unwrap_or_default(), node_port)) + }) + .collect() +} + +fn extract_api_node_port(service: &Service) -> Result { + let service_name = service + .metadata + .name + .clone() + .unwrap_or_else(|| "".to_owned()); + let ports = api_port_candidates(tcp_node_ports(service)); + + match ports.as_slice() { + [] => Err(K8sAttachDiscoveryError::ServiceHasNoNodePorts { + service: service_name, + } + .into()), + [port] => Ok(*port), + _ => Err(K8sAttachDiscoveryError::ServiceHasMultipleNodePorts { + 
diff --git a/testing-framework/deployers/k8s/src/deployer/mod.rs b/testing-framework/deployers/k8s/src/deployer/mod.rs
index e42a6da..e16ec45 100644
--- a/testing-framework/deployers/k8s/src/deployer/mod.rs
+++ b/testing-framework/deployers/k8s/src/deployer/mod.rs
@@ -1,3 +1,46 @@
+mod attach_provider;
 mod orchestrator;
 
 pub use orchestrator::{K8sDeployer, K8sRunnerError};
+use testing_framework_core::scenario::{AttachSource, DynError};
+
+/// Kubernetes deployment metadata returned by k8s-specific deployment APIs.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct K8sDeploymentMetadata {
+    /// Namespace used for this deployment when available.
+    pub namespace: Option<String>,
+    /// Attach selector used to discover node services.
+    pub label_selector: Option<String>,
+}
+
+#[derive(Debug, thiserror::Error)]
+enum K8sMetadataError {
+    #[error("k8s deployment metadata has no namespace")]
+    MissingNamespace,
+    #[error("k8s deployment metadata has no label selector")]
+    MissingLabelSelector,
+}
+
+impl K8sDeploymentMetadata {
+    /// Returns namespace when deployment is bound to a specific namespace.
+    #[must_use]
+    pub fn namespace(&self) -> Option<&str> {
+        self.namespace.as_deref()
+    }
+
+    /// Returns attach label selector when available.
+    #[must_use]
+    pub fn label_selector(&self) -> Option<&str> {
+        self.label_selector.as_deref()
+    }
+
+    /// Builds an attach source for the same k8s deployment scope.
+    pub fn attach_source(&self) -> Result<AttachSource, DynError> {
+        let namespace = self.namespace().ok_or(K8sMetadataError::MissingNamespace)?;
+        let label_selector = self
+            .label_selector()
+            .ok_or(K8sMetadataError::MissingLabelSelector)?;
+
+        Ok(AttachSource::k8s(label_selector.to_owned()).with_namespace(namespace.to_owned()))
+    }
+}
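
The metadata-to-attach-source round trip is the intended handoff from a
managed deployment to a later attached run. A minimal sketch with hypothetical
field values (in practice both come from deploy_with_metadata):

    let metadata = K8sDeploymentMetadata {
        namespace: Some("logos-e2e".to_owned()),
        label_selector: Some("app.kubernetes.io/instance=logos".to_owned()),
    };

    // Errors with K8sMetadataError if either field is missing.
    let attach_source = metadata.attach_source()?;
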
diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs
index c6172ee..4089ecf 100644
--- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs
+++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs
@@ -1,20 +1,22 @@
-use std::{env, fmt::Debug, marker::PhantomData, time::Duration};
+use std::{env, fmt::Debug, marker::PhantomData, sync::Arc, time::Duration};
 
 use async_trait::async_trait;
 use kube::Client;
 use reqwest::Url;
 use testing_framework_core::{
     scenario::{
-        Application, CleanupGuard, Deployer, DynError, FeedHandle, FeedRuntime,
-        HttpReadinessRequirement, Metrics, MetricsError, NodeClients,
+        Application, ApplicationExternalProvider, AttachSource, CleanupGuard, Deployer, DynError,
+        FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, NodeClients,
         ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
-        Runner, Scenario, build_source_orchestration_plan, orchestrate_sources,
+        Runner, Scenario, ScenarioSources, SourceOrchestrationPlan, SourceProviders,
+        StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
     },
     topology::DeploymentDescriptor,
 };
 use tracing::{error, info};
 
 use crate::{
+    deployer::{K8sDeploymentMetadata, attach_provider::K8sAttachProvider},
    env::K8sDeployEnv,
     infrastructure::cluster::{
         ClusterEnvironment, ClusterEnvironmentError, NodeClientError, PortSpecs,
@@ -56,6 +58,17 @@ impl<E: K8sDeployEnv> K8sDeployer<E> {
         self.readiness_checks = enabled;
         self
     }
+
+    /// Deploy and return k8s-specific metadata alongside the generic runner.
+    pub async fn deploy_with_metadata<Caps>(
+        &self,
+        scenario: &Scenario<Caps>,
+    ) -> Result<(Runner<Caps>, K8sDeploymentMetadata), K8sRunnerError>
+    where
+        Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
+    {
+        deploy_with_observability(self, scenario).await
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -115,7 +128,9 @@ where
     type Error = K8sRunnerError;
 
     async fn deploy(&self, scenario: &Scenario<Caps>) -> Result<Runner<Caps>, Self::Error> {
-        deploy_with_observability(self, scenario).await
+        self.deploy_with_metadata(scenario)
+            .await
+            .map(|(runner, _)| runner)
    }
 }
@@ -147,10 +162,10 @@ fn ensure_supported_topology(
 async fn deploy_with_observability<E, Caps>(
     deployer: &K8sDeployer<E>,
     scenario: &Scenario<Caps>,
-) -> Result<Runner<Caps>, K8sRunnerError>
+) -> Result<(Runner<Caps>, K8sDeploymentMetadata), K8sRunnerError>
 where
     E: K8sDeployEnv,
-    Caps: ObservabilityCapabilityProvider,
+    Caps: ObservabilityCapabilityProvider + Send + Sync,
 {
     // Source planning is currently resolved here before deployer-specific setup.
     let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
@@ -160,21 +175,132 @@ where
     })?;
 
     let observability = resolve_observability_inputs(scenario.capabilities())?;
+
+    if scenario.sources().is_attached() {
+        let runner = deploy_attached_only::<E, Caps>(scenario, source_plan, observability).await?;
+        return Ok((runner, attached_metadata::<E, Caps>(scenario)));
+    }
+
     let deployment = build_k8s_deployment::<E>(deployer, scenario, &observability).await?;
+    let metadata = K8sDeploymentMetadata {
+        namespace: Some(deployment.cluster.namespace().to_owned()),
+        label_selector: Some(E::attach_node_service_selector(
+            deployment.cluster.release(),
+        )),
+    };
     let mut cluster = Some(deployment.cluster);
     let mut runtime = build_runtime_artifacts::<E>(&mut cluster, &observability).await?;
 
-    // Source orchestration currently runs here after managed clients are prepared.
-    runtime.node_clients = orchestrate_sources(&source_plan, runtime.node_clients)
-        .await
-        .map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
+    let source_providers = source_providers::<E>(
+        client_from_cluster(&cluster)?,
+        runtime.node_clients.snapshot(),
+    );
+
+    runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?;
+    ensure_non_empty_node_clients(&runtime.node_clients)?;
 
     let parts = build_runner_parts(scenario, deployment.node_count, runtime);
 
     log_configured_observability(&observability);
     maybe_print_endpoints::<E>(&observability, &parts.node_clients);
 
-    finalize_runner::<E>(&mut cluster, parts)
+    let runner = finalize_runner::<E>(&mut cluster, parts)?;
+    Ok((runner, metadata))
+}
+
+async fn deploy_attached_only<E, Caps>(
+    scenario: &Scenario<Caps>,
+    source_plan: SourceOrchestrationPlan,
+    observability: ObservabilityInputs,
+) -> Result<Runner<Caps>, K8sRunnerError>
+where
+    E: K8sDeployEnv,
+    Caps: ObservabilityCapabilityProvider + Send + Sync,
+{
+    let client = init_kube_client().await?;
+    let source_providers = source_providers::<E>(client, Vec::new());
+    let node_clients = resolve_node_clients(&source_plan, source_providers).await?;
+
+    ensure_non_empty_node_clients(&node_clients)?;
+
+    let telemetry = observability.telemetry_handle()?;
+    let (feed, feed_task) = spawn_block_feed_with::<E>(&node_clients).await?;
+    let context = RunContext::new(
+        scenario.deployment().clone(),
+        node_clients,
+        scenario.duration(),
+        scenario.expectation_cooldown(),
+        telemetry,
+        feed,
+        None,
+    );
+
+    Ok(Runner::new(context, Some(Box::new(feed_task))))
+}
+
+fn attached_metadata<E, Caps>(scenario: &Scenario<Caps>) -> K8sDeploymentMetadata
+where
+    E: K8sDeployEnv,
+    Caps: Send + Sync,
+{
+    match scenario.sources() {
+        ScenarioSources::Attached {
+            attach:
+                AttachSource::K8s {
+                    namespace,
+                    label_selector,
+                },
+            ..
+        } => K8sDeploymentMetadata {
+            namespace: namespace.clone(),
+            label_selector: Some(label_selector.clone()),
+        },
+        _ => K8sDeploymentMetadata {
+            namespace: None,
+            label_selector: None,
+        },
+    }
+}
+
+fn client_from_cluster(cluster: &Option<ClusterEnvironment>) -> Result<Client, K8sRunnerError> {
+    let client = cluster
+        .as_ref()
+        .ok_or_else(|| K8sRunnerError::InternalInvariant {
+            message: "cluster must exist while resolving source providers".to_owned(),
+        })?
+        .client()
+        .clone();
+
+    Ok(client)
+}
+
+fn source_providers<E: K8sDeployEnv>(
+    client: Client,
+    managed_clients: Vec<E::NodeClient>,
+) -> SourceProviders<E::NodeClient> {
+    SourceProviders::default()
+        .with_managed(Arc::new(StaticManagedProvider::new(managed_clients)))
+        .with_attach(Arc::new(K8sAttachProvider::<E>::new(client)))
+        .with_external(Arc::new(ApplicationExternalProvider))
+}
+
+async fn resolve_node_clients<C>(
+    source_plan: &SourceOrchestrationPlan,
+    source_providers: SourceProviders<C>,
+) -> Result<NodeClients<C>, K8sRunnerError> {
+    orchestrate_sources_with_providers(source_plan, source_providers)
+        .await
+        .map_err(|source| K8sRunnerError::SourceOrchestration { source })
+}
+
+fn ensure_non_empty_node_clients<C>(
+    node_clients: &NodeClients<C>,
+) -> Result<(), K8sRunnerError> {
+    if node_clients.is_empty() {
+        return Err(K8sRunnerError::RuntimePreflight);
+    }
+
+    Ok(())
+}
 
 struct BuiltK8sDeployment {
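
With the branch above, a scenario whose sources are attached never provisions
a cluster: deploy only initializes a kube client and resolves node clients
through K8sAttachProvider. A usage sketch, reusing the builder API from the
test at the top of this diff (attach_source as built there):

    let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
        .with_run_duration(Duration::from_secs(5))
        .with_attach_source(attach_source)
        .build()?;

    // Discovers services by label selector; provisions nothing.
    let runner = LbcK8sDeployer::default().deploy(&attached).await?;
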
diff --git a/testing-framework/deployers/k8s/src/env.rs b/testing-framework/deployers/k8s/src/env.rs
index ecd7972..4ec7fd2 100644
--- a/testing-framework/deployers/k8s/src/env.rs
+++ b/testing-framework/deployers/k8s/src/env.rs
@@ -106,6 +106,11 @@ pub trait K8sDeployEnv: Application {
         format!("{release}-node-{index}")
     }
 
+    /// Label selector used to discover managed node services in attached mode.
+    fn attach_node_service_selector(release: &str) -> String {
+        format!("app.kubernetes.io/instance={release}")
+    }
+
     /// Wait for HTTP readiness on provided ports for a given host.
     async fn wait_for_node_http(
         ports: &[u16],
diff --git a/testing-framework/deployers/k8s/src/infrastructure/cluster.rs b/testing-framework/deployers/k8s/src/infrastructure/cluster.rs
index d1048fd..fa3ee24 100644
--- a/testing-framework/deployers/k8s/src/infrastructure/cluster.rs
+++ b/testing-framework/deployers/k8s/src/infrastructure/cluster.rs
@@ -94,6 +94,10 @@ impl ClusterEnvironment {
         &self.release
     }
 
+    pub fn client(&self) -> &Client {
+        &self.client
+    }
+
     pub fn node_ports(&self) -> (&[u16], &[u16]) {
         (&self.node_api_ports, &self.node_testing_ports)
     }
diff --git a/testing-framework/deployers/k8s/src/lib.rs b/testing-framework/deployers/k8s/src/lib.rs
index 90251c3..0c331a5 100644
--- a/testing-framework/deployers/k8s/src/lib.rs
+++ b/testing-framework/deployers/k8s/src/lib.rs
@@ -7,7 +7,7 @@ pub mod wait {
     pub use crate::lifecycle::wait::*;
 }
 
-pub use deployer::{K8sDeployer, K8sRunnerError};
+pub use deployer::{K8sDeployer, K8sDeploymentMetadata, K8sRunnerError};
 pub use env::K8sDeployEnv;
 pub use infrastructure::cluster::PortSpecs;
 pub use lifecycle::cleanup::RunnerCleanup;
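
For reference, the default attach_node_service_selector above produces the
standard app.kubernetes.io/instance label, so attached discovery matches the
services installed by a managed deploy. For a hypothetical release named
"logos" (MyEnv standing in for any K8sDeployEnv implementor):

    assert_eq!(
        <MyEnv as K8sDeployEnv>::attach_node_service_selector("logos"),
        "app.kubernetes.io/instance=logos"
    );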