diff --git a/.cargo-deny.toml b/.cargo-deny.toml index 0b79d1b..687e5e2 100644 --- a/.cargo-deny.toml +++ b/.cargo-deny.toml @@ -6,8 +6,7 @@ exclude-dev = true no-default-features = true [advisories] -ignore = [ -] +ignore = [] yanked = "deny" [bans] diff --git a/testing-framework/core/src/cfgsync/mod.rs b/testing-framework/core/src/cfgsync/mod.rs index cb9452e..c2fbd25 100644 --- a/testing-framework/core/src/cfgsync/mod.rs +++ b/testing-framework/core/src/cfgsync/mod.rs @@ -72,6 +72,17 @@ pub trait StaticNodeConfigProvider: Application + Sized { } } +pub fn serialize_yaml_config(config: &T) -> Result +where + T: Serialize, +{ + serde_yaml::to_string(config) +} + +pub fn serialize_plain_text_config(config: &str) -> Result { + Ok(config.to_owned()) +} + impl StaticArtifactRenderer for T where T: StaticNodeConfigProvider, diff --git a/testing-framework/core/src/scenario/config.rs b/testing-framework/core/src/scenario/config.rs index be20e05..813dd95 100644 --- a/testing-framework/core/src/scenario/config.rs +++ b/testing-framework/core/src/scenario/config.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, error::Error, io}; use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; +use serde::Serialize; use crate::{ cfgsync::StaticNodeConfigProvider, @@ -119,6 +120,13 @@ pub trait ClusterNodeConfigApplication: Application { ) -> Result; } +pub fn serialize_cluster_yaml_config(config: &T) -> Result +where + T: Serialize, +{ + serde_yaml::to_string(config) +} + impl StaticNodeConfigProvider for T where T: ClusterNodeConfigApplication, diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index bc54944..a89380b 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -28,7 +28,9 @@ pub use capabilities::{ }; pub use client::NodeAccess; pub use common_builder_ext::CoreBuilderExt; -pub use config::{ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView}; +pub use config::{ + ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, serialize_cluster_yaml_config, +}; pub use control::{ClusterWaitHandle, NodeControlHandle}; pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder}; pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; diff --git a/testing-framework/deployers/compose/assets/docker-compose.yml.tera b/testing-framework/deployers/compose/assets/docker-compose.yml.tera index ba21922..464f659 100644 --- a/testing-framework/deployers/compose/assets/docker-compose.yml.tera +++ b/testing-framework/deployers/compose/assets/docker-compose.yml.tera @@ -3,7 +3,10 @@ services: {{ node.name }}: image: {{ node.image }} {% if node.platform %} platform: {{ node.platform }} -{% endif %} entrypoint: {{ node.entrypoint }} +{% endif %} entrypoint: +{% for arg in node.entrypoint %} + - {{ arg }} +{% endfor %} volumes: {% for volume in node.volumes %} - {{ volume }} diff --git a/testing-framework/deployers/compose/src/deployer/ports.rs b/testing-framework/deployers/compose/src/deployer/ports.rs index ea46aff..21e9397 100644 --- a/testing-framework/deployers/compose/src/deployer/ports.rs +++ b/testing-framework/deployers/compose/src/deployer/ports.rs @@ -20,7 +20,9 @@ impl PortManager { environment: &mut StackEnvironment, descriptors: &E::Deployment, ) -> Result { - let nodes = E::node_container_ports(descriptors); + let nodes = E::node_container_ports(descriptors).map_err(|source| { + ComposeRunnerError::Config(crate::errors::ConfigError::Descriptor { source }) + })?; debug!( nodes = nodes.len(), "resolving host ports for compose services" diff --git a/testing-framework/deployers/compose/src/descriptor/node.rs b/testing-framework/deployers/compose/src/descriptor/node.rs index 1965da0..46cb9bf 100644 --- a/testing-framework/deployers/compose/src/descriptor/node.rs +++ b/testing-framework/deployers/compose/src/descriptor/node.rs @@ -9,7 +9,7 @@ use crate::infrastructure::ports::node_identifier; pub struct NodeDescriptor { name: String, image: String, - entrypoint: String, + entrypoint: Vec, volumes: Vec, extra_hosts: Vec, ports: Vec, @@ -51,7 +51,7 @@ impl NodeDescriptor { pub fn new( name: impl Into, image: impl Into, - entrypoint: impl Into, + entrypoint: Vec, volumes: Vec, extra_hosts: Vec, ports: Vec, @@ -62,7 +62,7 @@ impl NodeDescriptor { Self { name: name.into(), image: image.into(), - entrypoint: entrypoint.into(), + entrypoint, volumes, extra_hosts, ports, @@ -76,7 +76,7 @@ impl NodeDescriptor { pub fn with_loopback_ports( name: impl Into, image: impl Into, - entrypoint: impl Into, + entrypoint: Vec, volumes: Vec, extra_hosts: Vec, container_ports: Vec, @@ -135,7 +135,7 @@ impl NodeDescriptor { #[derive(Clone, Debug)] pub struct LoopbackNodeRuntimeSpec { pub image: String, - pub entrypoint: String, + pub entrypoint: Vec, pub volumes: Vec, pub extra_hosts: Vec, pub container_ports: Vec, @@ -229,10 +229,11 @@ pub fn binary_config_node_runtime_spec( ) -> LoopbackNodeRuntimeSpec { let image = env::var(&spec.image_env_var).unwrap_or_else(|_| spec.default_image.clone()); let platform = env::var(&spec.platform_env_var).ok(); - let entrypoint = format!( - "/bin/sh -c '{} --config {}'", - spec.binary_path, spec.config_container_path - ); + let entrypoint = vec![ + spec.binary_path.clone(), + "--config".to_owned(), + spec.config_container_path.clone(), + ]; LoopbackNodeRuntimeSpec { image, diff --git a/testing-framework/deployers/compose/src/env.rs b/testing-framework/deployers/compose/src/env.rs index 86eb384..8d1157e 100644 --- a/testing-framework/deployers/compose/src/env.rs +++ b/testing-framework/deployers/compose/src/env.rs @@ -10,11 +10,15 @@ use testing_framework_core::{ }, topology::DeploymentDescriptor, }; +use tokio::{ + net::TcpStream, + time::{Instant, sleep}, +}; use crate::{ descriptor::{ BinaryConfigNodeSpec, ComposeDescriptor, LoopbackNodeRuntimeSpec, NodeDescriptor, - binary_config_node_runtime_spec, build_loopback_node_descriptors, + binary_config_node_runtime_spec, }, docker::config_server::DockerConfigServerSpec, infrastructure::ports::{ @@ -31,6 +35,12 @@ pub trait ConfigServerHandle: Send + Sync { } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ComposeConfigServerMode { + Disabled, + Docker, +} + /// Compose-specific topology surface needed by the runner. #[async_trait] pub trait ComposeDeployEnv: Application { @@ -70,27 +80,39 @@ pub trait ComposeDeployEnv: Application { fn compose_descriptor( topology: &::Deployment, _cfgsync_port: u16, - ) -> ComposeDescriptor { - let nodes = build_loopback_node_descriptors(topology.node_count(), |index| { - Self::loopback_node_runtime_spec(topology, index) - .unwrap_or_else(|| panic!("compose_descriptor is not implemented for this app")) - }); - ComposeDescriptor::new(nodes) + ) -> Result { + let mut nodes = Vec::with_capacity(topology.node_count()); + for index in 0..topology.node_count() { + let spec = Self::loopback_node_runtime_spec(topology, index).ok_or_else(|| { + std::io::Error::other("compose_descriptor is not implemented for this app") + })?; + nodes.push(NodeDescriptor::with_loopback_ports( + crate::infrastructure::ports::node_identifier(index), + spec.image, + spec.entrypoint, + spec.volumes, + spec.extra_hosts, + spec.container_ports, + spec.environment, + spec.platform, + )); + } + Ok(ComposeDescriptor::new(nodes)) } /// Container ports (API/testing) per node, used for docker-compose port /// discovery. fn node_container_ports( topology: &::Deployment, - ) -> Vec { - let descriptor = Self::compose_descriptor(topology, 0); - descriptor + ) -> Result, DynError> { + let descriptor = Self::compose_descriptor(topology, 0)?; + Ok(descriptor .nodes() .iter() .enumerate() .take(topology.node_count()) .filter_map(|(index, node)| parse_node_container_ports(index, node)) - .collect() + .collect()) } /// Hostnames used when rewriting node configs for cfgsync delivery. @@ -126,10 +148,10 @@ pub trait ComposeDeployEnv: Application { /// Build the config server container specification. fn cfgsync_container_spec( _cfgsync_path: &Path, - port: u16, - network: &str, + _port: u16, + _network: &str, ) -> Result { - Ok(dummy_cfgsync_spec(port, network)) + Err(std::io::Error::other("cfgsync_container_spec is not implemented for this app").into()) } /// Timeout used when launching the config server container. @@ -137,6 +159,10 @@ pub trait ComposeDeployEnv: Application { Duration::from_secs(180) } + fn cfgsync_server_mode() -> ComposeConfigServerMode { + ComposeConfigServerMode::Disabled + } + /// Build node clients from discovered host ports. fn node_client_from_ports( ports: &NodeHostPorts, @@ -167,6 +193,12 @@ pub trait ComposeDeployEnv: Application { ::node_readiness_path() } + fn node_readiness_probe() -> ComposeReadinessProbe { + ComposeReadinessProbe::Http { + path: ::node_readiness_path(), + } + } + /// Host used by default remote readiness checks. fn compose_runner_host() -> String { compose_runner_host() @@ -178,14 +210,15 @@ pub trait ComposeDeployEnv: Application { mapping: &HostPortMapping, requirement: HttpReadinessRequirement, ) -> Result<(), DynError> { - let host = Self::compose_runner_host(); - let urls = readiness_urls( - &host, - mapping, - ::node_readiness_path(), - )?; - wait_http_readiness(&urls, requirement).await?; - Ok(()) + match ::node_readiness_probe() { + ComposeReadinessProbe::Http { path } => { + let host = Self::compose_runner_host(); + let urls = readiness_urls(&host, mapping, path)?; + wait_http_readiness(&urls, requirement).await?; + Ok(()) + } + ComposeReadinessProbe::Tcp => wait_for_tcp_readiness(&mapping.nodes, requirement).await, + } } /// Wait for HTTP readiness on node ports. @@ -194,14 +227,24 @@ pub trait ComposeDeployEnv: Application { host: &str, requirement: HttpReadinessRequirement, ) -> Result<(), DynError> { - wait_for_http_ports_with_host_and_requirement( - ports, - host, - ::node_readiness_path(), - requirement, - ) - .await?; - Ok(()) + match ::node_readiness_probe() { + ComposeReadinessProbe::Http { path } => { + wait_for_http_ports_with_host_and_requirement(ports, host, path, requirement) + .await?; + Ok(()) + } + ComposeReadinessProbe::Tcp => { + let ports = ports + .iter() + .copied() + .map(|port| NodeHostPorts { + api: port, + testing: port, + }) + .collect::>(); + wait_for_tcp_readiness(&ports, requirement).await + } + } } } @@ -257,27 +300,10 @@ fn write_dummy_cfgsync_config(path: &Path, port: u16) -> Result<(), DynError> { Ok(()) } -fn dummy_cfgsync_spec(port: u16, network: &str) -> DockerConfigServerSpec { - use crate::docker::config_server::DockerPortBinding; - - DockerConfigServerSpec::new( - "cfgsync".to_owned(), - network.to_owned(), - "sh".to_owned(), - "busybox:1.36".to_owned(), - ) - .with_network_alias("cfgsync".to_owned()) - .with_args(vec![ - "-c".to_owned(), - format!("while true; do nc -l -p {port} >/dev/null 2>&1; done"), - ]) - .with_ports(vec![DockerPortBinding::tcp(port, port)]) -} - fn parse_node_container_ports(index: usize, node: &NodeDescriptor) -> Option { let mut ports = node.container_ports().iter().copied(); let api = ports.next()?; - let testing = ports.next()?; + let testing = ports.next().unwrap_or(api); Some(NodeContainerPorts { index, @@ -286,6 +312,52 @@ fn parse_node_container_ports(index: usize, node: &NodeDescriptor) -> Option Result<(), DynError> { + let timeout = Duration::from_secs(60); + let deadline = Instant::now() + timeout; + + loop { + let mut ready = 0; + for node in ports { + if TcpStream::connect(("127.0.0.1", node.testing)) + .await + .is_ok() + { + ready += 1; + } + } + + let total = ports.len(); + let satisfied = match requirement { + HttpReadinessRequirement::AllNodesReady => ready == total, + HttpReadinessRequirement::AnyNodeReady => ready >= 1, + HttpReadinessRequirement::AtLeast(min_ready) => ready >= min_ready, + }; + + if satisfied { + return Ok(()); + } + + if Instant::now() >= deadline { + return Err(format!( + "tcp readiness timed out: ready={ready}, total={total}, requirement={requirement:?}" + ) + .into()); + } + + sleep(Duration::from_millis(200)).await; + } +} + pub fn discovered_node_access(host: &str, ports: &NodeHostPorts) -> NodeAccess { NodeAccess::new(host, ports.api).with_testing_port(ports.testing) } diff --git a/testing-framework/deployers/compose/src/errors.rs b/testing-framework/deployers/compose/src/errors.rs index f30fae7..de3ccbd 100644 --- a/testing-framework/deployers/compose/src/errors.rs +++ b/testing-framework/deployers/compose/src/errors.rs @@ -73,6 +73,11 @@ impl WorkspaceError { #[derive(Debug, thiserror::Error)] /// Configuration-related failures while preparing compose runs. pub enum ConfigError { + #[error("failed to build compose descriptor: {source}")] + Descriptor { + #[source] + source: DynError, + }, #[error("failed to update cfgsync configuration at {path:?}: {source}")] Cfgsync { path: PathBuf, diff --git a/testing-framework/deployers/compose/src/infrastructure/environment.rs b/testing-framework/deployers/compose/src/infrastructure/environment.rs index 5612aba..c796fa6 100644 --- a/testing-framework/deployers/compose/src/infrastructure/environment.rs +++ b/testing-framework/deployers/compose/src/infrastructure/environment.rs @@ -25,7 +25,7 @@ use crate::{ ensure_image_present, workspace::ComposeWorkspace, }, - env::{ComposeCfgsyncEnv, ComposeDeployEnv, ConfigServerHandle}, + env::{ComposeCfgsyncEnv, ComposeConfigServerMode, ComposeDeployEnv, ConfigServerHandle}, errors::{ComposeRunnerError, ConfigError, WorkspaceError}, infrastructure::template::write_compose_file, lifecycle::cleanup::RunnerCleanup, @@ -216,7 +216,11 @@ pub async fn start_cfgsync_stage( workspace: &WorkspaceState, cfgsync_port: u16, project_name: &str, -) -> Result, ComposeRunnerError> { +) -> Result>, ComposeRunnerError> { + if matches!(E::cfgsync_server_mode(), ComposeConfigServerMode::Disabled) { + return Ok(None); + } + info!(cfgsync_port = cfgsync_port, "launching cfgsync server"); let network = compose_network_name(project_name); @@ -244,7 +248,7 @@ pub async fn start_cfgsync_stage( wait_for_cfgsync_ready(cfgsync_port, Some(&handle)).await?; log_cfgsync_started(&handle); - Ok(Box::new(handle)) + Ok(Some(Box::new(handle))) } /// Write cfgsync YAML from topology data. @@ -299,7 +303,8 @@ pub fn write_compose_artifacts( workspace_root = %workspace.root.display(), "building compose descriptor" ); - let descriptor = E::compose_descriptor(descriptors, cfgsync_port); + let descriptor = E::compose_descriptor(descriptors, cfgsync_port) + .map_err(|source| ConfigError::Descriptor { source })?; let compose_path = workspace.root.join("compose.generated.yml"); write_compose_file(&descriptor, &compose_path) @@ -326,10 +331,12 @@ pub async fn bring_up_stack( compose_path: &Path, project_name: &str, workspace_root: &Path, - cfgsync_handle: &mut dyn ConfigServerHandle, + cfgsync_handle: &mut Option>, ) -> Result<(), ComposeRunnerError> { if let Err(err) = compose_up(compose_path, project_name, workspace_root).await { - cfgsync_handle.shutdown(); + if let Some(cfgsync_handle) = cfgsync_handle.as_deref_mut() { + cfgsync_handle.shutdown(); + } return Err(ComposeRunnerError::Compose(err)); } debug!(project = %project_name, "docker compose up completed"); @@ -341,7 +348,7 @@ pub async fn bring_up_stack_logged( compose_path: &Path, project_name: &str, workspace_root: &Path, - cfgsync_handle: &mut dyn ConfigServerHandle, + cfgsync_handle: &mut Option>, ) -> Result<(), ComposeRunnerError> { info!(project = %project_name, "bringing up docker compose stack"); bring_up_stack(compose_path, project_name, workspace_root, cfgsync_handle).await @@ -357,13 +364,10 @@ where { let prepared = prepare_stack_artifacts::(descriptors, metrics_otlp_ingest_url).await?; let mut cfgsync_handle = start_cfgsync_for_prepared::(&prepared).await?; - start_compose_stack(&prepared, cfgsync_handle.as_mut()).await?; + start_compose_stack(&prepared, &mut cfgsync_handle).await?; log_compose_environment_ready(&prepared, "compose stack is up"); - Ok(stack_environment_from_prepared( - prepared, - Some(cfgsync_handle), - )) + Ok(stack_environment_from_prepared(prepared, cfgsync_handle)) } /// Prepare workspace, cfgsync, and compose artifacts without starting services. @@ -379,10 +383,7 @@ where log_compose_environment_ready(&prepared, "compose manual environment prepared"); - Ok(stack_environment_from_prepared( - prepared, - Some(cfgsync_handle), - )) + Ok(stack_environment_from_prepared(prepared, cfgsync_handle)) } async fn prepare_stack_artifacts( @@ -418,25 +419,28 @@ async fn ensure_compose_images_present( descriptors: &E::Deployment, cfgsync_port: u16, ) -> Result<(), ComposeRunnerError> { - let descriptor = E::compose_descriptor(descriptors, 0); + let descriptor = E::compose_descriptor(descriptors, 0) + .map_err(|source| ComposeRunnerError::Config(ConfigError::Descriptor { source }))?; let mut images = descriptor .nodes() .iter() .map(|node| (node.image().to_owned(), node.platform().map(str::to_owned))) .collect::>(); - let cfgsync_spec = E::cfgsync_container_spec( - &workspace.cfgsync_path, - cfgsync_port, - &compose_network_name("compose-image-check"), - ) - .map_err(|source| { - ComposeRunnerError::Config(ConfigError::CfgsyncStart { - port: cfgsync_port, - source, - }) - })?; + if matches!(E::cfgsync_server_mode(), ComposeConfigServerMode::Docker) { + let cfgsync_spec = E::cfgsync_container_spec( + &workspace.cfgsync_path, + cfgsync_port, + &compose_network_name("compose-image-check"), + ) + .map_err(|source| { + ComposeRunnerError::Config(ConfigError::CfgsyncStart { + port: cfgsync_port, + source, + }) + })?; - images.insert((cfgsync_spec.image, cfgsync_spec.platform)); + images.insert((cfgsync_spec.image, cfgsync_spec.platform)); + } for (image, platform) in images { ensure_image_present(&image, platform.as_deref()).await?; @@ -451,7 +455,7 @@ fn create_project_name() -> String { async fn start_cfgsync_for_prepared( prepared: &PreparedEnvironment, -) -> Result, ComposeRunnerError> { +) -> Result>, ComposeRunnerError> { start_cfgsync_stage::( &prepared.workspace, prepared.cfgsync_port, @@ -462,7 +466,7 @@ async fn start_cfgsync_for_prepared( async fn handle_compose_start_failure( prepared: &PreparedEnvironment, - cfgsync_handle: &mut dyn ConfigServerHandle, + cfgsync_handle: &mut Option>, ) { dump_compose_logs( &prepared.compose_path, @@ -470,7 +474,9 @@ async fn handle_compose_start_failure( &prepared.workspace.root, ) .await; - cfgsync_handle.shutdown(); + if let Some(cfgsync_handle) = cfgsync_handle.as_deref_mut() { + cfgsync_handle.shutdown(); + } } fn stack_environment_from_prepared( @@ -487,7 +493,7 @@ fn stack_environment_from_prepared( async fn start_compose_stack( prepared: &PreparedEnvironment, - cfgsync_handle: &mut dyn ConfigServerHandle, + cfgsync_handle: &mut Option>, ) -> Result<(), ComposeRunnerError> { if let Err(error) = bring_up_stack_logged( &prepared.compose_path, diff --git a/testing-framework/deployers/compose/src/lib.rs b/testing-framework/deployers/compose/src/lib.rs index 82215f7..3c22921 100644 --- a/testing-framework/deployers/compose/src/lib.rs +++ b/testing-framework/deployers/compose/src/lib.rs @@ -20,7 +20,10 @@ pub use docker::{ }, platform::host_gateway_entry, }; -pub use env::{ComposeDeployEnv, ConfigServerHandle, discovered_node_access}; +pub use env::{ + ComposeConfigServerMode, ComposeDeployEnv, ComposeReadinessProbe, ConfigServerHandle, + discovered_node_access, +}; pub use errors::ComposeRunnerError; pub use infrastructure::{ ports::{HostPortMapping, NodeHostPorts, compose_runner_host, node_identifier}, diff --git a/testing-framework/deployers/k8s/src/env.rs b/testing-framework/deployers/k8s/src/env.rs index a975bfb..bd610de 100644 --- a/testing-framework/deployers/k8s/src/env.rs +++ b/testing-framework/deployers/k8s/src/env.rs @@ -9,6 +9,7 @@ use async_trait::async_trait; use cfgsync_artifacts::ArtifactSet; use kube::Client; use reqwest::Url; +use serde::Serialize; use tempfile::TempDir; use testing_framework_core::{ cfgsync::StaticNodeConfigProvider, @@ -35,6 +36,11 @@ pub struct RenderedHelmChartAssets { _tempdir: TempDir, } +#[derive(Clone, Debug, Default)] +pub struct HelmManifest { + documents: Vec, +} + #[derive(Clone, Debug)] pub struct BinaryConfigK8sSpec { pub chart_name: String, @@ -55,6 +61,38 @@ impl HelmReleaseAssets for RenderedHelmChartAssets { } } +impl HelmManifest { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + pub fn push_yaml(&mut self, value: &T) -> Result<(), DynError> + where + T: Serialize, + { + self.documents + .push(normalize_yaml_document(&serde_yaml::to_string(value)?)); + Ok(()) + } + + pub fn push_raw_yaml(&mut self, yaml: &str) { + let yaml = yaml.trim(); + if !yaml.is_empty() { + self.documents.push(yaml.to_owned()); + } + } + + pub fn extend(&mut self, other: Self) { + self.documents.extend(other.documents); + } + + #[must_use] + pub fn render(&self) -> String { + self.documents.join("\n---\n") + } +} + pub fn standard_port_specs(node_count: usize, api: u16, auxiliary: u16) -> PortSpecs { PortSpecs { nodes: (0..node_count) @@ -201,6 +239,14 @@ pub fn render_single_template_chart_assets( }) } +pub fn render_manifest_chart_assets( + chart_name: &str, + template_name: &str, + manifest: &HelmManifest, +) -> Result { + render_single_template_chart_assets(chart_name, template_name, &manifest.render()) +} + pub fn discovered_node_access(host: &str, api_port: u16, auxiliary_port: u16) -> NodeAccess { NodeAccess::new(host, api_port).with_testing_port(auxiliary_port) } @@ -209,6 +255,10 @@ fn render_chart_yaml(chart_name: &str) -> String { format!("apiVersion: v2\nname: {chart_name}\nversion: 0.1.0\n") } +fn normalize_yaml_document(yaml: &str) -> String { + yaml.trim_start_matches("---\n").trim().to_owned() +} + pub async fn install_helm_release_with_cleanup( client: &Client, assets: &A, diff --git a/testing-framework/deployers/k8s/src/lib.rs b/testing-framework/deployers/k8s/src/lib.rs index 2cc3c38..0adbe4a 100644 --- a/testing-framework/deployers/k8s/src/lib.rs +++ b/testing-framework/deployers/k8s/src/lib.rs @@ -7,6 +7,8 @@ mod manual; mod workspace; use std::sync::Once; +pub use k8s_openapi; + pub mod wait { pub use crate::lifecycle::wait::*; } @@ -21,10 +23,10 @@ pub(crate) fn ensure_rustls_provider_installed() { pub use deployer::{K8sDeployer, K8sDeploymentMetadata, K8sRunnerError}; pub use env::{ - BinaryConfigK8sSpec, HelmReleaseAssets, K8sDeployEnv, RenderedHelmChartAssets, + BinaryConfigK8sSpec, HelmManifest, HelmReleaseAssets, K8sDeployEnv, RenderedHelmChartAssets, discovered_node_access, install_helm_release_with_cleanup, render_binary_config_node_chart_assets, render_binary_config_node_manifest, - render_single_template_chart_assets, standard_port_specs, + render_manifest_chart_assets, render_single_template_chart_assets, standard_port_specs, }; pub use infrastructure::{ chart_values::{ diff --git a/testing-framework/deployers/k8s/src/lifecycle/wait/forwarding.rs b/testing-framework/deployers/k8s/src/lifecycle/wait/forwarding.rs index 7fd3c67..1324012 100644 --- a/testing-framework/deployers/k8s/src/lifecycle/wait/forwarding.rs +++ b/testing-framework/deployers/k8s/src/lifecycle/wait/forwarding.rs @@ -1,5 +1,6 @@ use std::{ fmt, io, + io::Read, net::{Ipv4Addr, TcpListener, TcpStream}, process::{Child, Command as StdCommand, ExitStatus, Stdio}, thread, @@ -86,7 +87,7 @@ fn spawn_kubectl_port_forward( .arg(format!("svc/{service}")) .arg(format!("{local_port}:{remote_port}")) .stdout(Stdio::null()) - .stderr(Stdio::null()) + .stderr(Stdio::piped()) .spawn() } @@ -107,7 +108,13 @@ fn wait_until_port_forward_ready( } let _ = child.kill(); - Err(port_forward_ready_timeout_error(service, remote_port)) + let _ = child.wait(); + let details = read_port_forward_stderr(child); + Err(port_forward_ready_timeout_error( + service, + remote_port, + details.as_deref(), + )) } fn ensure_port_forward_running( @@ -122,7 +129,7 @@ fn ensure_port_forward_running( Err(port_forward_error( service, remote_port, - anyhow!("kubectl exited with {status}"), + port_forward_process_error(status, read_port_forward_stderr(child)), )) } @@ -134,11 +141,18 @@ fn port_forward_error(service: &str, remote_port: u16, source: anyhow::Error) -> } } -fn port_forward_ready_timeout_error(service: &str, remote_port: u16) -> ClusterWaitError { +fn port_forward_ready_timeout_error( + service: &str, + remote_port: u16, + details: Option<&str>, +) -> ClusterWaitError { port_forward_error( service, remote_port, - anyhow!("port-forward did not become ready"), + anyhow!( + "port-forward did not become ready{}", + format_port_forward_details(details) + ), ) } @@ -146,6 +160,30 @@ fn exited_status(child: &mut Child) -> Option { child.try_wait().ok().flatten() } +fn read_port_forward_stderr(child: &mut Child) -> Option { + let mut stderr = child.stderr.take()?; + let mut output = String::new(); + if stderr.read_to_string(&mut output).is_err() { + return None; + } + let trimmed = output.trim(); + (!trimmed.is_empty()).then(|| trimmed.to_owned()) +} + +fn port_forward_process_error(status: ExitStatus, details: Option) -> anyhow::Error { + anyhow!( + "kubectl exited with {status}{}", + format_port_forward_details(details.as_deref()) + ) +} + +fn format_port_forward_details(details: Option<&str>) -> String { + match details { + Some(details) => format!(": {details}"), + None => String::new(), + } +} + fn local_port_reachable(local_port: u16) -> bool { TcpStream::connect(localhost_addr(local_port)).is_ok() } diff --git a/testing-framework/deployers/k8s/src/manual.rs b/testing-framework/deployers/k8s/src/manual.rs index 186f528..411cfa6 100644 --- a/testing-framework/deployers/k8s/src/manual.rs +++ b/testing-framework/deployers/k8s/src/manual.rs @@ -24,7 +24,6 @@ use crate::{ K8sDeployer, env::{HelmReleaseAssets, K8sDeployEnv, discovered_node_access}, host::node_host, - infrastructure::helm::install_release, lifecycle::{ cleanup::RunnerCleanup, wait::{ @@ -140,20 +139,9 @@ where let assets = E::prepare_assets(&topology, None) .map_err(|source| ManualClusterError::Assets { source })?; let (namespace, release) = E::cluster_identifiers(); - let install_spec = assets - .release_bundle() - .install_spec(release.clone(), namespace.clone()); - install_release(&install_spec).await.map_err(|source| { - ManualClusterError::InstallStack { - source: source.into(), - } - })?; - let cleanup = RunnerCleanup::new( - client.clone(), - namespace.clone(), - release.clone(), - std::env::var("K8S_RUNNER_PRESERVE").is_ok(), - ); + let cleanup = E::install_stack(&client, &assets, &namespace, &release, nodes) + .await + .map_err(|source| ManualClusterError::InstallStack { source })?; let node_ports = E::collect_port_specs(&topology).nodes; let node_allocations = diff --git a/testing-framework/deployers/local/src/env.rs b/testing-framework/deployers/local/src/env.rs index 8abbb1f..8af34e9 100644 --- a/testing-framework/deployers/local/src/env.rs +++ b/testing-framework/deployers/local/src/env.rs @@ -511,15 +511,17 @@ where None } - fn node_endpoints(config: &::NodeConfig) -> NodeEndpoints { + fn node_endpoints( + config: &::NodeConfig, + ) -> Result { if let Some(port) = Self::http_api_port(config) { - return NodeEndpoints { + return Ok(NodeEndpoints { api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)), extra_ports: HashMap::new(), - }; + }); } - panic!("node_endpoints is not implemented for this app"); + Err(std::io::Error::other("node_endpoints is not implemented for this app").into()) } fn node_peer_port(node: &Node) -> u16 { @@ -530,18 +532,18 @@ where None } - fn node_client(endpoints: &NodeEndpoints) -> Self::NodeClient { + fn node_client(endpoints: &NodeEndpoints) -> Result { if let Ok(client) = ::build_node_client(&discovered_node_access(endpoints)) { - return client; + return Ok(client); } if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) { - return client; + return Ok(client); } - panic!("node_client is not implemented for this app"); + Err(std::io::Error::other("node_client is not implemented for this app").into()) } fn readiness_endpoint_path() -> &'static str { @@ -680,11 +682,15 @@ mod tests { Ok(LaunchSpec::default()) } - fn node_endpoints(_config: &::NodeConfig) -> NodeEndpoints { - NodeEndpoints::default() + fn node_endpoints( + _config: &::NodeConfig, + ) -> Result { + Ok(NodeEndpoints::default()) } - fn node_client(_endpoints: &NodeEndpoints) -> Self::NodeClient {} + fn node_client(_endpoints: &NodeEndpoints) -> Result { + Ok(()) + } async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { STABLE_CALLS.fetch_add(1, Ordering::SeqCst); diff --git a/testing-framework/deployers/local/src/external.rs b/testing-framework/deployers/local/src/external.rs index 696d05e..09de43d 100644 --- a/testing-framework/deployers/local/src/external.rs +++ b/testing-framework/deployers/local/src/external.rs @@ -31,7 +31,7 @@ pub fn build_external_client( let api = resolve_api_socket(source)?; let mut endpoints = NodeEndpoints::default(); endpoints.api = api; - Ok(E::node_client(&endpoints)) + E::node_client(&endpoints) } fn resolve_api_socket(source: &ExternalNodeSource) -> Result { diff --git a/testing-framework/deployers/local/src/process.rs b/testing-framework/deployers/local/src/process.rs index 3da6a5b..ced4959 100644 --- a/testing-framework/deployers/local/src/process.rs +++ b/testing-framework/deployers/local/src/process.rs @@ -203,11 +203,11 @@ impl Result, - endpoints_from_config: impl FnOnce(&Config) -> NodeEndpoints, + endpoints_from_config: impl FnOnce(&Config) -> Result, keep_tempdir: bool, persist_dir: Option<&Path>, snapshot_dir: Option<&Path>, - client_from_endpoints: impl FnOnce(&NodeEndpoints) -> Client, + client_from_endpoints: impl FnOnce(&NodeEndpoints) -> Result, ) -> Result { let tempdir = create_tempdir(persist_dir)?; if let Some(snapshot_dir) = snapshot_dir { @@ -217,8 +217,10 @@ impl