From 1c2f734bca9b37d08307891ebbd7e20bb09adbdb Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 10 Apr 2026 17:13:15 +0200 Subject: [PATCH] refactor(k8s): redesign runtime around install access manual --- .../k8s/src/deployer/attach_provider.rs | 7 +- .../k8s/src/deployer/orchestrator.rs | 30 +- testing-framework/deployers/k8s/src/env.rs | 617 +++++++++++++----- .../k8s/src/infrastructure/cluster.rs | 13 +- testing-framework/deployers/k8s/src/lib.rs | 9 +- .../k8s/src/lifecycle/wait/http_probe.rs | 7 +- .../k8s/src/lifecycle/wait/orchestrator.rs | 8 +- testing-framework/deployers/k8s/src/manual.rs | 101 ++- 8 files changed, 538 insertions(+), 254 deletions(-) diff --git a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs index a78bbbc..bea906f 100644 --- a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs @@ -13,7 +13,10 @@ use testing_framework_core::scenario::{ }; use url::Url; -use crate::{env::K8sDeployEnv, host::node_host}; +use crate::{ + env::{K8sDeployEnv, node_readiness_path}, + host::node_host, +}; #[derive(Debug, thiserror::Error)] enum K8sAttachDiscoveryError { @@ -264,7 +267,7 @@ fn collect_readiness_endpoints( for service in services { let api_port = extract_api_node_port(service)?; let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?; - endpoint.set_path(::node_readiness_path()); + endpoint.set_path(node_readiness_path::()); endpoints.push(endpoint); } diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 26ddcae..d7f76f0 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -24,7 +24,10 @@ use crate::{ K8sDeploymentMetadata, attach_provider::{K8sAttachProvider, K8sAttachedClusterWait}, }, - env::{HelmReleaseAssets, K8sDeployEnv}, + env::{ + K8sDeployEnv, attach_node_service_selector, cluster_identifiers, node_base_url, + prepare_stack, + }, infrastructure::cluster::{ ClusterEnvironment, ClusterEnvironmentError, NodeClientError, PortSpecs, RemoteReadinessError, build_node_clients, collect_port_specs, ensure_cluster_readiness, @@ -72,7 +75,6 @@ impl K8sDeployer { scenario: &Scenario, ) -> Result<(Runner, K8sDeploymentMetadata), K8sRunnerError> where - E::Assets: HelmReleaseAssets, Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { deploy_with_observability(self, scenario).await @@ -131,7 +133,6 @@ pub enum K8sRunnerError { impl Deployer for K8sDeployer where E: K8sDeployEnv, - E::Assets: HelmReleaseAssets, Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { type Error = K8sRunnerError; @@ -174,7 +175,6 @@ async fn deploy_with_observability( ) -> Result<(Runner, K8sDeploymentMetadata), K8sRunnerError> where E: K8sDeployEnv, - E::Assets: HelmReleaseAssets, Caps: ObservabilityCapabilityProvider + Send + Sync, { validate_supported_cluster_mode(scenario) @@ -198,7 +198,7 @@ where let deployment = build_k8s_deployment::(deployer, scenario, &observability).await?; let metadata = K8sDeploymentMetadata { namespace: Some(deployment.cluster.namespace().to_owned()), - label_selector: Some(E::attach_node_service_selector( + label_selector: Some(attach_node_service_selector::( deployment.cluster.release(), )), }; @@ -397,7 +397,6 @@ async fn build_k8s_deployment( ) -> Result where E: K8sDeployEnv, - E::Assets: HelmReleaseAssets, Caps: ObservabilityCapabilityProvider, { let descriptors = scenario.deployment(); @@ -439,10 +438,7 @@ async fn setup_cluster( readiness_checks: bool, readiness_requirement: HttpReadinessRequirement, observability: &ObservabilityInputs, -) -> Result -where - E::Assets: HelmReleaseAssets, -{ +) -> Result { let (setup, cleanup) = prepare_cluster_setup::(client, descriptors, observability).await?; let mut cleanup_guard = Some(cleanup); @@ -475,17 +471,15 @@ async fn prepare_cluster_setup( client: &Client, descriptors: &E::Deployment, observability: &ObservabilityInputs, -) -> Result<(ClusterSetup, RunnerCleanup), K8sRunnerError> -where - E::Assets: HelmReleaseAssets, -{ - let assets = E::prepare_assets(descriptors, observability.metrics_otlp_ingest_url.as_ref()) +) -> Result<(ClusterSetup, RunnerCleanup), K8sRunnerError> { + let assets = prepare_stack::(descriptors, observability.metrics_otlp_ingest_url.as_ref()) .map_err(|source| K8sRunnerError::Assets { source })?; let nodes = descriptors.node_count(); - let (namespace, release) = E::cluster_identifiers(); + let (namespace, release) = cluster_identifiers::(); info!(%namespace, %release, nodes, "preparing k8s assets and namespace"); - let cleanup = E::install_stack(client, &assets, &namespace, &release, nodes) + let cleanup = assets + .install(client, &namespace, &release, nodes) .await .map_err(|source| K8sRunnerError::InstallStack { source })?; @@ -749,7 +743,7 @@ fn print_node_pprof_endpoints(node_clients: &NodeClients) { let nodes = node_clients.snapshot(); for (idx, client) in nodes.iter().enumerate() { - if let Some(base_url) = E::node_base_url(client) { + if let Some(base_url) = node_base_url::(client) { println!( "TESTNET_PPROF node_{}={}/debug/pprof/profile?seconds=15&format=proto", idx, base_url diff --git a/testing-framework/deployers/k8s/src/env.rs b/testing-framework/deployers/k8s/src/env.rs index bd610de..efed11d 100644 --- a/testing-framework/deployers/k8s/src/env.rs +++ b/testing-framework/deployers/k8s/src/env.rs @@ -281,170 +281,467 @@ pub async fn install_helm_release_with_cleanup( } #[async_trait] -pub trait K8sDeployEnv: Application + Sized { - type Assets: Send + Sync; - - /// Collect container port specs from the topology. - fn collect_port_specs(topology: &Self::Deployment) -> PortSpecs; - - /// Build deploy-time assets (charts, config payloads, scripts). - fn prepare_assets( - topology: &Self::Deployment, - metrics_otlp_ingest_url: Option<&Url>, - ) -> Result; - - /// Install the k8s stack using the prepared assets. - async fn install_stack( +pub trait PreparedK8sStack: Send + Sync { + async fn install( + &self, client: &Client, - assets: &Self::Assets, namespace: &str, release: &str, nodes: usize, - ) -> Result - where - Self::Assets: HelmReleaseAssets, - { + ) -> Result; +} + +#[async_trait] +impl PreparedK8sStack for T +where + T: HelmReleaseAssets + Send + Sync, +{ + async fn install( + &self, + client: &Client, + namespace: &str, + release: &str, + nodes: usize, + ) -> Result { let _ = nodes; - install_helm_release_with_cleanup(client, assets, namespace, release).await - } - - /// Provide a namespace/release identifier pair. - fn cluster_identifiers() -> (String, String) { - let stamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|d| d.as_millis()) - .unwrap_or_default(); - let suffix = format!("{stamp:x}-{:x}", process::id()); - (format!("tf-testnet-{suffix}"), String::from("tf-runner")) - } - - /// Build a single node client from forwarded ports. - fn node_client_from_ports( - host: &str, - api_port: u16, - auxiliary_port: u16, - ) -> Result { - ::build_node_client(&discovered_node_access( - host, - api_port, - auxiliary_port, - )) - } - - /// Build node clients from forwarded ports. - fn build_node_clients( - host: &str, - node_api_ports: &[u16], - node_auxiliary_ports: &[u16], - ) -> Result, DynError> { - node_api_ports - .iter() - .zip(node_auxiliary_ports.iter()) - .map(|(&api_port, &auxiliary_port)| { - Self::node_client_from_ports(host, api_port, auxiliary_port) - }) - .collect() - } - - /// Path appended to readiness probe URLs. - fn node_readiness_path() -> &'static str { - ::node_readiness_path() - } - - /// Wait for remote readiness using topology + URLs. - async fn wait_remote_readiness( - topology: &Self::Deployment, - urls: &[Url], - requirement: HttpReadinessRequirement, - ) -> Result<(), DynError> { - let _ = topology; - let readiness_urls: Vec<_> = urls - .iter() - .map(|url| { - let mut endpoint = url.clone(); - endpoint.set_path(::node_readiness_path()); - endpoint - }) - .collect(); - wait_http_readiness(&readiness_urls, requirement).await?; - Ok(()) - } - - /// Label used for readiness probe logging. - fn node_role() -> &'static str { - "node" - } - - /// Deployment resource name for a node index. - fn node_deployment_name(release: &str, index: usize) -> String { - format!("{release}-node-{index}") - } - - /// Service resource name for a node index. - fn node_service_name(release: &str, index: usize) -> String { - format!("{release}-node-{index}") - } - - /// Label selector used to discover managed node services in - /// existing-cluster mode. - fn attach_node_service_selector(release: &str) -> String { - format!("app.kubernetes.io/instance={release}") - } - - /// Wait for HTTP readiness on provided ports for a given host. - async fn wait_for_node_http( - ports: &[u16], - role: &'static str, - host: &str, - timeout: Duration, - poll_interval: Duration, - requirement: HttpReadinessRequirement, - ) -> Result<(), DynError> { - let _ = role; - let _ = timeout; - let _ = poll_interval; - wait_for_http_ports_with_host_and_requirement( - ports, - host, - ::node_readiness_path(), - requirement, - ) - .await?; - Ok(()) - } - - /// Optional base URL for node client diagnostics. - fn node_base_url(_client: &Self::NodeClient) -> Option { - None - } - - /// Optional cfgsync/bootstrap service reachable from inside the cluster. - /// - /// Manual cluster uses this to update one node's served config before - /// start. - fn cfgsync_service(_release: &str) -> Option<(String, u16)> { - None - } - - /// Hostnames that should be rendered into cfgsync-served node configs. - fn cfgsync_hostnames(release: &str, node_count: usize) -> Vec { - (0..node_count) - .map(|index| Self::node_service_name(release, index)) - .collect() - } - - /// Optional node-local artifact override for manual cluster startup - /// options. - /// - /// Return `Some(..)` when options require a node-specific config - /// replacement before the node starts. Return `None` to keep the - /// original cfgsync artifact set. - fn build_cfgsync_override_artifacts( - _topology: &Self::Deployment, - _node_index: usize, - _hostnames: &[String], - _options: &testing_framework_core::scenario::StartNodeOptions, - ) -> Result, DynError> { - Ok(None) + install_helm_release_with_cleanup(client, self, namespace, release).await } } + +type K8sPortSpecsBuilder = + Box::Deployment) -> PortSpecs + Send + Sync>; +type K8sPreparedStackBuilder = Box< + dyn Fn( + &::Deployment, + Option<&Url>, + ) -> Result, DynError> + + Send + + Sync, +>; +type K8sClusterIdentifiers = Box (String, String) + Send + Sync>; +type K8sIndexedName = Box String + Send + Sync>; +type K8sAttachSelector = Box String + Send + Sync>; +type K8sNodeClientsBuilder = Box< + dyn Fn(&str, &[u16], &[u16]) -> Result::NodeClient>, DynError> + + Send + + Sync, +>; +type K8sNodeBaseUrl = + Box::NodeClient) -> Option + Send + Sync>; +type K8sCfgsyncService = Box Option<(String, u16)> + Send + Sync>; +type K8sCfgsyncHostnames = Box Vec + Send + Sync>; +type K8sCfgsyncOverrideBuilder = Box< + dyn Fn( + &::Deployment, + usize, + &[String], + &testing_framework_core::scenario::StartNodeOptions, + ) -> Result, DynError> + + Send + + Sync, +>; + +pub struct K8sRuntime { + install: K8sInstall, + access: K8sAccess, + manual: K8sManual, +} + +pub struct K8sInstall { + collect_port_specs: K8sPortSpecsBuilder, + prepare_stack: K8sPreparedStackBuilder, + cluster_identifiers: K8sClusterIdentifiers, + node_deployment_name: K8sIndexedName, + node_service_name: K8sIndexedName, + attach_node_service_selector: K8sAttachSelector, +} + +pub struct K8sAccess { + build_node_clients: K8sNodeClientsBuilder, + readiness_path: &'static str, + node_role: &'static str, + node_base_url: K8sNodeBaseUrl, +} + +pub struct K8sManual { + cfgsync_service: K8sCfgsyncService, + cfgsync_hostnames: Option, + build_cfgsync_override_artifacts: K8sCfgsyncOverrideBuilder, +} + +impl K8sRuntime { + #[must_use] + pub fn new(install: K8sInstall) -> Self { + Self { + install, + access: K8sAccess::default(), + manual: K8sManual::default(), + } + } + + #[must_use] + pub fn with_access(mut self, access: K8sAccess) -> Self { + self.access = access; + self + } + + #[must_use] + pub fn with_manual(mut self, manual: K8sManual) -> Self { + self.manual = manual; + self + } +} + +impl K8sRuntime +where + E: Application + StaticNodeConfigProvider, + E::Deployment: DeploymentDescriptor, +{ + #[must_use] + pub fn binary_config(spec: BinaryConfigK8sSpec) -> Self { + let prepare_spec = spec.clone(); + let name_prefix = spec.node_name_prefix.clone(); + let container_http_port = spec.container_http_port; + let service_testing_port = spec.service_testing_port; + + Self::new( + K8sInstall::new( + move |topology: &E::Deployment| { + standard_port_specs( + topology.node_count(), + container_http_port, + service_testing_port, + ) + }, + move |topology, _metrics_otlp_ingest_url| { + let assets = + render_binary_config_node_chart_assets::(topology, &prepare_spec)?; + Ok(Box::new(assets) as Box) + }, + ) + .with_node_name_prefix(name_prefix), + ) + } +} + +impl K8sInstall { + #[must_use] + pub fn new(collect_port_specs: FP, prepare_stack: FA) -> Self + where + FP: Fn(&E::Deployment) -> PortSpecs + Send + Sync + 'static, + FA: Fn(&E::Deployment, Option<&Url>) -> Result, DynError> + + Send + + Sync + + 'static, + { + Self { + collect_port_specs: Box::new(collect_port_specs), + prepare_stack: Box::new(prepare_stack), + cluster_identifiers: Box::new(default_cluster_identifiers), + node_deployment_name: Box::new(default_node_name), + node_service_name: Box::new(default_node_name), + attach_node_service_selector: Box::new(default_attach_node_service_selector), + } + } + + #[must_use] + pub fn with_cluster_identifiers(mut self, cluster_identifiers: F) -> Self + where + F: Fn() -> (String, String) + Send + Sync + 'static, + { + self.cluster_identifiers = Box::new(cluster_identifiers); + self + } + + #[must_use] + pub fn with_node_name_prefix(mut self, prefix: impl Into) -> Self { + let prefix = prefix.into(); + self.node_deployment_name = Box::new(named_resource(prefix.clone())); + self.node_service_name = Box::new(named_resource(prefix)); + self + } + + #[must_use] + pub fn with_resource_names( + mut self, + node_deployment_name: FD, + node_service_name: FS, + ) -> Self + where + FD: Fn(&str, usize) -> String + Send + Sync + 'static, + FS: Fn(&str, usize) -> String + Send + Sync + 'static, + { + self.node_deployment_name = Box::new(node_deployment_name); + self.node_service_name = Box::new(node_service_name); + self + } + + #[must_use] + pub fn with_attach_node_service_selector(mut self, attach_node_service_selector: F) -> Self + where + F: Fn(&str) -> String + Send + Sync + 'static, + { + self.attach_node_service_selector = Box::new(attach_node_service_selector); + self + } +} + +impl Default for K8sAccess { + fn default() -> Self { + Self { + build_node_clients: Box::new(default_build_node_clients::), + readiness_path: E::node_readiness_path(), + node_role: "node", + node_base_url: Box::new(|_client| None), + } + } +} + +impl K8sAccess { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + #[must_use] + pub fn with_node_clients(mut self, build_node_clients: F) -> Self + where + F: Fn(&str, &[u16], &[u16]) -> Result, DynError> + Send + Sync + 'static, + { + self.build_node_clients = Box::new(build_node_clients); + self + } + + #[must_use] + pub fn with_readiness_path(mut self, readiness_path: &'static str) -> Self { + self.readiness_path = readiness_path; + self + } + + #[must_use] + pub fn with_node_role(mut self, node_role: &'static str) -> Self { + self.node_role = node_role; + self + } + + #[must_use] + pub fn with_node_base_url(mut self, node_base_url: F) -> Self + where + F: Fn(&E::NodeClient) -> Option + Send + Sync + 'static, + { + self.node_base_url = Box::new(node_base_url); + self + } +} + +impl Default for K8sManual { + fn default() -> Self { + Self { + cfgsync_service: Box::new(|_release| None), + cfgsync_hostnames: None, + build_cfgsync_override_artifacts: Box::new( + |_topology, _node_index, _hostnames, _options| Ok(None), + ), + } + } +} + +impl K8sManual { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + #[must_use] + pub fn with_cfgsync_service(mut self, cfgsync_service: F) -> Self + where + F: Fn(&str) -> Option<(String, u16)> + Send + Sync + 'static, + { + self.cfgsync_service = Box::new(cfgsync_service); + self + } + + #[must_use] + pub fn with_cfgsync_hostnames(mut self, cfgsync_hostnames: F) -> Self + where + F: Fn(&str, usize) -> Vec + Send + Sync + 'static, + { + self.cfgsync_hostnames = Some(Box::new(cfgsync_hostnames)); + self + } + + #[must_use] + pub fn with_cfgsync_override_artifacts(mut self, build_override_artifacts: F) -> Self + where + F: Fn( + &E::Deployment, + usize, + &[String], + &testing_framework_core::scenario::StartNodeOptions, + ) -> Result, DynError> + + Send + + Sync + + 'static, + { + self.build_cfgsync_override_artifacts = Box::new(build_override_artifacts); + self + } +} + +pub trait K8sDeployEnv: Application + Sized { + fn k8s_runtime() -> K8sRuntime; +} + +pub(crate) fn runtime_for() -> K8sRuntime { + E::k8s_runtime() +} + +pub(crate) fn collect_port_specs(deployment: &E::Deployment) -> PortSpecs { + (runtime_for::().install.collect_port_specs)(deployment) +} + +pub(crate) fn prepare_stack( + deployment: &E::Deployment, + metrics_otlp_ingest_url: Option<&Url>, +) -> Result, DynError> { + (runtime_for::().install.prepare_stack)(deployment, metrics_otlp_ingest_url) +} + +pub(crate) fn cluster_identifiers() -> (String, String) { + (runtime_for::().install.cluster_identifiers)() +} + +pub(crate) fn build_node_clients( + host: &str, + node_api_ports: &[u16], + node_auxiliary_ports: &[u16], +) -> Result, DynError> { + (runtime_for::().access.build_node_clients)(host, node_api_ports, node_auxiliary_ports) +} + +pub(crate) fn node_readiness_path() -> &'static str { + runtime_for::().access.readiness_path +} + +pub(crate) async fn wait_remote_readiness( + _deployment: &E::Deployment, + urls: &[Url], + requirement: HttpReadinessRequirement, +) -> Result<(), DynError> { + let readiness_urls: Vec<_> = urls + .iter() + .map(|url| { + let mut endpoint = url.clone(); + endpoint.set_path(node_readiness_path::()); + endpoint + }) + .collect(); + wait_http_readiness(&readiness_urls, requirement).await?; + Ok(()) +} + +pub(crate) fn node_role() -> &'static str { + runtime_for::().access.node_role +} + +pub(crate) fn node_deployment_name(release: &str, index: usize) -> String { + (runtime_for::().install.node_deployment_name)(release, index) +} + +pub(crate) fn node_service_name(release: &str, index: usize) -> String { + (runtime_for::().install.node_service_name)(release, index) +} + +pub(crate) fn attach_node_service_selector(release: &str) -> String { + (runtime_for::().install.attach_node_service_selector)(release) +} + +pub(crate) async fn wait_for_node_http( + ports: &[u16], + role: &'static str, + host: &str, + timeout: Duration, + poll_interval: Duration, + requirement: HttpReadinessRequirement, +) -> Result<(), DynError> { + let _ = role; + let _ = timeout; + let _ = poll_interval; + wait_for_http_ports_with_host_and_requirement( + ports, + host, + node_readiness_path::(), + requirement, + ) + .await?; + Ok(()) +} + +pub(crate) fn node_base_url(client: &E::NodeClient) -> Option { + (runtime_for::().access.node_base_url)(client) +} + +pub(crate) fn cfgsync_service(release: &str) -> Option<(String, u16)> { + (runtime_for::().manual.cfgsync_service)(release) +} + +pub(crate) fn cfgsync_hostnames(release: &str, node_count: usize) -> Vec { + let runtime = runtime_for::(); + if let Some(cfgsync_hostnames) = runtime.manual.cfgsync_hostnames { + return cfgsync_hostnames(release, node_count); + } + + (0..node_count) + .map(|index| (runtime.install.node_service_name)(release, index)) + .collect() +} + +pub(crate) fn build_cfgsync_override_artifacts( + deployment: &E::Deployment, + node_index: usize, + hostnames: &[String], + options: &testing_framework_core::scenario::StartNodeOptions, +) -> Result, DynError> { + (runtime_for::().manual.build_cfgsync_override_artifacts)( + deployment, node_index, hostnames, options, + ) +} + +fn default_cluster_identifiers() -> (String, String) { + let stamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default(); + let suffix = format!("{stamp:x}-{:x}", process::id()); + (format!("tf-testnet-{suffix}"), String::from("tf-runner")) +} + +fn default_build_node_clients( + host: &str, + node_api_ports: &[u16], + node_auxiliary_ports: &[u16], +) -> Result, DynError> { + node_api_ports + .iter() + .zip(node_auxiliary_ports.iter()) + .map(|(&api_port, &auxiliary_port)| { + ::build_node_client(&discovered_node_access( + host, + api_port, + auxiliary_port, + )) + }) + .collect() +} + +fn default_node_name(release: &str, index: usize) -> String { + format!("{release}-node-{index}") +} + +fn default_attach_node_service_selector(release: &str) -> String { + format!("app.kubernetes.io/instance={release}") +} + +fn named_resource(prefix: String) -> impl Fn(&str, usize) -> String + Send + Sync + 'static { + move |_release, index| format!("{prefix}-{index}") +} diff --git a/testing-framework/deployers/k8s/src/infrastructure/cluster.rs b/testing-framework/deployers/k8s/src/infrastructure/cluster.rs index 7bc4298..6e00640 100644 --- a/testing-framework/deployers/k8s/src/infrastructure/cluster.rs +++ b/testing-framework/deployers/k8s/src/infrastructure/cluster.rs @@ -7,7 +7,10 @@ use tracing::{debug, info}; use url::ParseError; use crate::{ - env::K8sDeployEnv, + env::{ + K8sDeployEnv, build_node_clients as build_k8s_node_clients, + collect_port_specs as collect_k8s_port_specs, node_role, wait_remote_readiness, + }, lifecycle::{cleanup::RunnerCleanup, logs::dump_namespace_logs}, wait::{ ClusterPorts, ClusterReady, NodeConfigPorts, PortForwardHandle, wait_for_cluster_ready, @@ -131,7 +134,7 @@ pub enum RemoteReadinessError { } pub fn collect_port_specs(descriptors: &E::Deployment) -> PortSpecs { - let specs = E::collect_port_specs(descriptors); + let specs = collect_k8s_port_specs::(descriptors); debug!(nodes = specs.nodes.len(), "collected k8s port specs"); specs } @@ -139,7 +142,7 @@ pub fn collect_port_specs(descriptors: &E::Deployment) -> PortS pub fn build_node_clients( cluster: &ClusterEnvironment, ) -> Result, NodeClientError> { - let nodes = E::build_node_clients( + let nodes = build_k8s_node_clients::( &cluster.node_host, &cluster.node_api_ports, &cluster.node_auxiliary_ports, @@ -159,9 +162,9 @@ pub async fn ensure_cluster_readiness( info!("waiting for remote readiness (API + membership)"); let (node_api, _node_auxiliary) = cluster.node_ports(); - let node_urls = readiness_urls(node_api, E::node_role(), &cluster.node_host)?; + let node_urls = readiness_urls(node_api, node_role::(), &cluster.node_host)?; - E::wait_remote_readiness(descriptors, &node_urls, requirement) + wait_remote_readiness::(descriptors, &node_urls, requirement) .await .map_err(|source| RemoteReadinessError::Remote { source })?; diff --git a/testing-framework/deployers/k8s/src/lib.rs b/testing-framework/deployers/k8s/src/lib.rs index 0adbe4a..94c3f35 100644 --- a/testing-framework/deployers/k8s/src/lib.rs +++ b/testing-framework/deployers/k8s/src/lib.rs @@ -23,10 +23,11 @@ pub(crate) fn ensure_rustls_provider_installed() { pub use deployer::{K8sDeployer, K8sDeploymentMetadata, K8sRunnerError}; pub use env::{ - BinaryConfigK8sSpec, HelmManifest, HelmReleaseAssets, K8sDeployEnv, RenderedHelmChartAssets, - discovered_node_access, install_helm_release_with_cleanup, - render_binary_config_node_chart_assets, render_binary_config_node_manifest, - render_manifest_chart_assets, render_single_template_chart_assets, standard_port_specs, + BinaryConfigK8sSpec, HelmManifest, HelmReleaseAssets, K8sAccess, K8sDeployEnv, K8sInstall, + K8sManual, K8sRuntime, PreparedK8sStack, RenderedHelmChartAssets, discovered_node_access, + install_helm_release_with_cleanup, render_binary_config_node_chart_assets, + render_binary_config_node_manifest, render_manifest_chart_assets, + render_single_template_chart_assets, standard_port_specs, }; pub use infrastructure::{ chart_values::{ diff --git a/testing-framework/deployers/k8s/src/lifecycle/wait/http_probe.rs b/testing-framework/deployers/k8s/src/lifecycle/wait/http_probe.rs index 585c34c..00d5849 100644 --- a/testing-framework/deployers/k8s/src/lifecycle/wait/http_probe.rs +++ b/testing-framework/deployers/k8s/src/lifecycle/wait/http_probe.rs @@ -3,7 +3,10 @@ use std::time::Duration; use testing_framework_core::scenario::HttpReadinessRequirement; use super::{ClusterWaitError, http_poll_interval, node_http_probe_timeout, node_http_timeout}; -use crate::{env::K8sDeployEnv, host::node_host}; +use crate::{ + env::{K8sDeployEnv, wait_for_node_http}, + host::node_host, +}; const LOCALHOST: &str = "127.0.0.1"; const READINESS_REQUIREMENT: HttpReadinessRequirement = HttpReadinessRequirement::AllNodesReady; @@ -29,7 +32,7 @@ async fn wait_for_node_http_on_host( host: &str, timeout: Duration, ) -> Result<(), ClusterWaitError> { - E::wait_for_node_http( + wait_for_node_http::( ports, role, host, diff --git a/testing-framework/deployers/k8s/src/lifecycle/wait/orchestrator.rs b/testing-framework/deployers/k8s/src/lifecycle/wait/orchestrator.rs index 5f0f954..4bb0ff9 100644 --- a/testing-framework/deployers/k8s/src/lifecycle/wait/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/lifecycle/wait/orchestrator.rs @@ -2,7 +2,7 @@ use kube::Client; use super::{ClusterPorts, ClusterReady, ClusterWaitError, NodeConfigPorts, NodePortAllocation}; use crate::{ - env::K8sDeployEnv, + env::{K8sDeployEnv, node_deployment_name, node_role, node_service_name}, lifecycle::wait::{ deployment::wait_for_deployment_ready, forwarding::{ @@ -27,7 +27,7 @@ pub async fn wait_for_cluster_ready( let node_allocations = discover_ready_node_allocations::(client, namespace, release, node_ports).await?; - let role = E::node_role(); + let role = node_role::(); let (readiness, node_allocations) = if needs_port_forward_fallback::(&node_allocations, role).await { @@ -112,7 +112,7 @@ async fn discover_ready_node_allocations( let mut allocations = Vec::with_capacity(node_ports.len()); for (index, ports) in node_ports.iter().enumerate() { - let deployment_name = E::node_deployment_name(release, index); + let deployment_name = node_deployment_name::(release, index); wait_for_deployment_ready(client, namespace, &deployment_name).await?; let allocation = discover_node_ports(client, namespace, &deployment_name, *ports).await?; allocations.push(allocation); @@ -131,7 +131,7 @@ async fn spawn_port_forwards( let mut forwards = Vec::new(); for (index, ports) in node_ports.iter().enumerate() { - let service = E::node_service_name(&release, index); + let service = node_service_name::(&release, index); let api_forward = port_forward_service(&namespace, &service, ports.api)?; let auxiliary_forward = port_forward_service(&namespace, &service, ports.auxiliary)?; register_forward_pair( diff --git a/testing-framework/deployers/k8s/src/manual.rs b/testing-framework/deployers/k8s/src/manual.rs index 411cfa6..fd8ecb4 100644 --- a/testing-framework/deployers/k8s/src/manual.rs +++ b/testing-framework/deployers/k8s/src/manual.rs @@ -22,7 +22,11 @@ use tokio_retry::{RetryIf, strategy::FixedInterval}; use crate::{ K8sDeployer, - env::{HelmReleaseAssets, K8sDeployEnv, discovered_node_access}, + env::{ + K8sDeployEnv, build_cfgsync_override_artifacts, cfgsync_hostnames, cfgsync_service, + cluster_identifiers, collect_port_specs, discovered_node_access, node_deployment_name, + node_readiness_path, node_service_name, prepare_stack, + }, host::node_host, lifecycle::{ cleanup::RunnerCleanup, @@ -107,10 +111,7 @@ struct ManualClusterState { known_clients: Vec>, } -pub struct ManualCluster -where - E::Assets: HelmReleaseAssets, -{ +pub struct ManualCluster { client: Client, namespace: String, release: String, @@ -122,10 +123,7 @@ where state: Arc>>, } -impl ManualCluster -where - E::Assets: HelmReleaseAssets, -{ +impl ManualCluster { pub async fn from_topology(topology: E::Deployment) -> Result { let nodes = testing_framework_core::topology::DeploymentDescriptor::node_count(&topology); if nodes == 0 { @@ -136,14 +134,15 @@ where let client = Client::try_default() .await .map_err(|source| ManualClusterError::ClientInit { source })?; - let assets = E::prepare_assets(&topology, None) + let assets = prepare_stack::(&topology, None) .map_err(|source| ManualClusterError::Assets { source })?; - let (namespace, release) = E::cluster_identifiers(); - let cleanup = E::install_stack(&client, &assets, &namespace, &release, nodes) + let (namespace, release) = cluster_identifiers::(); + let cleanup = assets + .install(&client, &namespace, &release, nodes) .await .map_err(|source| ManualClusterError::InstallStack { source })?; - let node_ports = E::collect_port_specs(&topology).nodes; + let node_ports = collect_port_specs::(&topology).nodes; let node_allocations = discover_all_node_ports::(&client, &namespace, &release, &node_ports).await?; scale_all_nodes::(&client, &namespace, &release, nodes, 0).await?; @@ -296,7 +295,7 @@ where testing_framework_core::scenario::wait_for_http_ports_with_host_and_requirement( &ports, &self.node_host, - ::node_readiness_path(), + node_readiness_path::(), HttpReadinessRequirement::AllNodesReady, ) .await @@ -311,7 +310,7 @@ where testing_framework_core::scenario::wait_for_http_ports_with_host_and_requirement( &[port], &self.node_host, - ::node_readiness_path(), + node_readiness_path::(), HttpReadinessRequirement::AllNodesReady, ) .await @@ -393,13 +392,13 @@ where index: usize, options: &StartNodeOptions, ) -> Result<(), ManualClusterError> { - let Some((service, port)) = E::cfgsync_service(&self.release) else { + let Some((service, port)) = cfgsync_service::(&self.release) else { return ensure_default_cfgsync_options(options); }; - let hostnames = E::cfgsync_hostnames(&self.release, self.node_count); + let hostnames = cfgsync_hostnames::(&self.release, self.node_count); let artifacts = - E::build_cfgsync_override_artifacts(&self.topology, index, &hostnames, options) + build_cfgsync_override_artifacts::(&self.topology, index, &hostnames, options) .map_err(|source| ManualClusterError::CfgsyncUpdate { name: canonical_node_name(index), source, @@ -431,7 +430,6 @@ where impl Drop for ManualCluster where E: K8sDeployEnv, - E::Assets: HelmReleaseAssets, { fn drop(&mut self) { self.stop_all(); @@ -445,7 +443,6 @@ where impl NodeControlHandle for ManualCluster where E: K8sDeployEnv, - E::Assets: HelmReleaseAssets, { async fn restart_node(&self, name: &str) -> Result<(), DynError> { Self::restart_node(self, name).await.map_err(Into::into) @@ -478,7 +475,6 @@ where impl ClusterWaitHandle for ManualCluster where E: K8sDeployEnv, - E::Assets: HelmReleaseAssets, { async fn wait_network_ready(&self) -> Result<(), DynError> { Self::wait_network_ready(self).await.map_err(Into::into) @@ -486,17 +482,11 @@ where } #[async_trait::async_trait] -impl ManualClusterHandle for ManualCluster -where - E: K8sDeployEnv, - E::Assets: HelmReleaseAssets, -{ -} +impl ManualClusterHandle for ManualCluster where E: K8sDeployEnv {} impl K8sDeployer where E: K8sDeployEnv, - E::Assets: HelmReleaseAssets, { pub async fn manual_cluster_from_descriptors( &self, @@ -515,7 +505,7 @@ async fn discover_all_node_ports( ) -> Result, ManualClusterError> { let mut allocations = Vec::with_capacity(node_ports.len()); for (index, ports) in node_ports.iter().enumerate() { - let service_name = E::node_service_name(release, index); + let service_name = node_service_name::(release, index); allocations.push(discover_node_ports(client, namespace, &service_name, *ports).await?); } Ok(allocations) @@ -541,7 +531,7 @@ async fn scale_node( index: usize, replicas: i32, ) -> Result<(), ManualClusterError> { - let name = E::node_deployment_name(release, index); + let name = node_deployment_name::(release, index); let deployments = Api::::namespaced(client.clone(), namespace); let patch = serde_json::json!({"spec": {"replicas": replicas}}); deployments @@ -663,8 +653,7 @@ mod tests { use super::*; use crate::{ - PortSpecs, RenderedHelmChartAssets, render_single_template_chart_assets, - standard_port_specs, + RenderedHelmChartAssets, render_single_template_chart_assets, standard_port_specs, }; struct DummyEnv; @@ -695,31 +684,25 @@ mod tests { #[async_trait::async_trait] impl K8sDeployEnv for DummyEnv { - type Assets = RenderedHelmChartAssets; - - fn collect_port_specs(_topology: &Self::Deployment) -> PortSpecs { - standard_port_specs(1, 8080, 8081) - } - - fn prepare_assets( - _topology: &Self::Deployment, - _metrics_otlp_ingest_url: Option<&reqwest::Url>, - ) -> Result { - render_single_template_chart_assets("dummy", "dummy.yaml", "") - } - - fn cfgsync_service(release: &str) -> Option<(String, u16)> { - Some((format!("{release}-cfgsync"), 4400)) - } - - fn build_cfgsync_override_artifacts( - topology: &Self::Deployment, - node_index: usize, - hostnames: &[String], - options: &StartNodeOptions, - ) -> Result, DynError> { - build_node_artifact_override::(topology, node_index, hostnames, options) - .map_err(Into::into) + fn k8s_runtime() -> crate::env::K8sRuntime { + crate::env::K8sRuntime::new(crate::env::K8sInstall::new( + |_topology| standard_port_specs(1, 8080, 8081), + |_topology, _metrics_otlp_ingest_url| { + let assets: RenderedHelmChartAssets = + render_single_template_chart_assets("dummy", "dummy.yaml", "")?; + Ok(Box::new(assets) as Box) + }, + )) + .with_manual( + crate::env::K8sManual::new() + .with_cfgsync_service(|release| Some((format!("{release}-cfgsync"), 4400))) + .with_cfgsync_override_artifacts(|topology, node_index, hostnames, options| { + build_node_artifact_override::( + topology, node_index, hostnames, options, + ) + .map_err(Into::into) + }), + ) } } @@ -827,7 +810,7 @@ mod tests { let options = StartNodeOptions::::default() .with_peers(PeerSelection::Named(vec!["node-0".to_owned()])); - let artifacts = DummyEnv::build_cfgsync_override_artifacts( + let artifacts = crate::env::build_cfgsync_override_artifacts::( &topology, 1, &["node-0".to_owned(), "node-1".to_owned()], @@ -846,7 +829,7 @@ mod tests { let options = StartNodeOptions::::default().with_config_override("override".to_owned()); - let artifacts = DummyEnv::build_cfgsync_override_artifacts( + let artifacts = crate::env::build_cfgsync_override_artifacts::( &topology, 1, &["node-0".to_owned(), "node-1".to_owned()],