From 29637acadf774a8ec3942e6a9be7d8279c168596 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 10 Apr 2026 16:59:40 +0200 Subject: [PATCH] refactor(compose): redesign runtime around stack access cfgsync --- .../compose/src/deployer/attach_provider.rs | 4 +- .../deployers/compose/src/deployer/mod.rs | 10 +- .../compose/src/deployer/orchestrator.rs | 10 +- .../deployers/compose/src/deployer/ports.rs | 4 +- .../compose/src/deployer/readiness.rs | 7 +- .../deployers/compose/src/deployer/setup.rs | 8 +- .../deployers/compose/src/descriptor/mod.rs | 2 +- .../deployers/compose/src/descriptor/node.rs | 29 + .../deployers/compose/src/env.rs | 924 ++++++++++++++---- .../compose/src/infrastructure/environment.rs | 40 +- .../deployers/compose/src/lib.rs | 7 +- .../compose/src/lifecycle/readiness.rs | 7 +- 12 files changed, 819 insertions(+), 233 deletions(-) diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index fc241e5..19921c2 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -13,7 +13,7 @@ use crate::{ discover_attachable_services, discover_service_container_id, inspect_api_container_port_label, inspect_mapped_tcp_ports, }, - env::ComposeDeployEnv, + env::{ComposeDeployEnv, readiness_http_path}, }; pub(super) struct ComposeAttachProvider { @@ -198,7 +198,7 @@ async fn collect_readiness_endpoints( let container_id = discover_service_container_id(project, service).await?; let api_port = discover_api_port(&container_id).await?; let mut endpoint = build_service_endpoint(host, api_port)?; - endpoint.set_path(::node_readiness_path()); + endpoint.set_path(readiness_http_path::()); endpoints.push(endpoint); } diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 03ccbe3..ad73efc 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -14,11 +14,7 @@ use testing_framework_core::scenario::{ internal::{CleanupGuard, FeedHandle}, }; -use crate::{ - env::{ComposeCfgsyncEnv, ComposeDeployEnv}, - errors::ComposeRunnerError, - lifecycle::cleanup::RunnerCleanup, -}; +use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup}; /// Docker Compose-based deployer for test scenarios. #[derive(Clone, Copy)] @@ -145,7 +141,7 @@ impl ComposeDeployer { scenario: &Scenario, ) -> Result<(Runner, ComposeDeploymentMetadata), ComposeRunnerError> where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { let deployer = Self { @@ -163,7 +159,7 @@ impl ComposeDeployer { impl Deployer for ComposeDeployer where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { type Error = ComposeRunnerError; diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index aced978..15e7432 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -28,7 +28,7 @@ use super::{ }; use crate::{ docker::control::{ComposeAttachedNodeControl, ComposeNodeControl}, - env::{ComposeCfgsyncEnv, ComposeDeployEnv}, + env::ComposeDeployEnv, errors::ComposeRunnerError, infrastructure::{ environment::StackEnvironment, @@ -41,14 +41,14 @@ const PRINT_ENDPOINTS_ENV: &str = "TESTNET_PRINT_ENDPOINTS"; pub struct DeploymentOrchestrator where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { deployer: ComposeDeployer, } impl DeploymentOrchestrator where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { pub const fn new(deployer: ComposeDeployer) -> Self { Self { deployer } @@ -656,7 +656,7 @@ fn profiling_url(host: &str, api_port: u16) -> String { struct PreparedDeployment where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { environment: StackEnvironment, descriptors: ::Deployment, @@ -667,7 +667,7 @@ async fn prepare_deployment( observability: &ObservabilityInputs, ) -> Result, ComposeRunnerError> where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { let DeploymentContext { environment, diff --git a/testing-framework/deployers/compose/src/deployer/ports.rs b/testing-framework/deployers/compose/src/deployer/ports.rs index 21e9397..d4d2aea 100644 --- a/testing-framework/deployers/compose/src/deployer/ports.rs +++ b/testing-framework/deployers/compose/src/deployer/ports.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use tracing::{debug, info, warn}; use crate::{ - env::ComposeDeployEnv, + env::{ComposeDeployEnv, node_container_ports}, errors::ComposeRunnerError, infrastructure::{ environment::StackEnvironment, @@ -20,7 +20,7 @@ impl PortManager { environment: &mut StackEnvironment, descriptors: &E::Deployment, ) -> Result { - let nodes = E::node_container_ports(descriptors).map_err(|source| { + let nodes = node_container_ports::(descriptors).map_err(|source| { ComposeRunnerError::Config(crate::errors::ConfigError::Descriptor { source }) })?; debug!( diff --git a/testing-framework/deployers/compose/src/deployer/readiness.rs b/testing-framework/deployers/compose/src/deployer/readiness.rs index 0faef3c..90593be 100644 --- a/testing-framework/deployers/compose/src/deployer/readiness.rs +++ b/testing-framework/deployers/compose/src/deployer/readiness.rs @@ -4,7 +4,7 @@ use testing_framework_core::scenario::HttpReadinessRequirement; use tracing::{info, warn}; use crate::{ - env::ComposeDeployEnv, + env::{ComposeDeployEnv, wait_remote_readiness as remote_readiness_future}, errors::ComposeRunnerError, infrastructure::{environment::StackEnvironment, ports::HostPortMapping}, lifecycle::readiness::ensure_nodes_ready_with_ports, @@ -55,7 +55,10 @@ async fn wait_remote_readiness( run_readiness_check( environment, "remote readiness probe failed", - E::wait_remote_readiness(descriptors, host_ports, requirement) + remote_readiness_future::(descriptors, host_ports, requirement) + .map_err(|source| { + ComposeRunnerError::Readiness(crate::errors::StackReadinessError::Remote { source }) + })? .await .map_err(|source| { ComposeRunnerError::Readiness(crate::errors::StackReadinessError::Remote { source }) diff --git a/testing-framework/deployers/compose/src/deployer/setup.rs b/testing-framework/deployers/compose/src/deployer/setup.rs index 15152eb..114ff23 100644 --- a/testing-framework/deployers/compose/src/deployer/setup.rs +++ b/testing-framework/deployers/compose/src/deployer/setup.rs @@ -6,7 +6,7 @@ use tracing::info; use crate::{ docker::ensure_docker_available, - env::ComposeCfgsyncEnv, + env::ComposeDeployEnv, errors::ComposeRunnerError, infrastructure::environment::{ StackEnvironment, ensure_supported_topology, prepare_environment, @@ -15,14 +15,14 @@ use crate::{ pub struct DeploymentSetup<'a, E> where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { descriptors: &'a ::Deployment, } pub struct DeploymentContext<'a, E> where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { pub descriptors: &'a ::Deployment, pub environment: StackEnvironment, @@ -30,7 +30,7 @@ where impl<'a, E> DeploymentSetup<'a, E> where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { pub fn new(descriptors: &'a ::Deployment) -> Self { Self { descriptors } diff --git a/testing-framework/deployers/compose/src/descriptor/mod.rs b/testing-framework/deployers/compose/src/descriptor/mod.rs index f38e1c2..d196f14 100644 --- a/testing-framework/deployers/compose/src/descriptor/mod.rs +++ b/testing-framework/deployers/compose/src/descriptor/mod.rs @@ -5,7 +5,7 @@ mod node; pub use node::{ BinaryConfigNodeSpec, EnvEntry, LoopbackNodeRuntimeSpec, NodeDescriptor, binary_config_node_runtime_spec, build_binary_config_node_descriptors, - build_loopback_node_descriptors, + build_binary_config_node_descriptors_with_file_name, build_loopback_node_descriptors, }; /// Top-level docker-compose descriptor built from an environment-specific diff --git a/testing-framework/deployers/compose/src/descriptor/node.rs b/testing-framework/deployers/compose/src/descriptor/node.rs index 46cb9bf..f93ada5 100644 --- a/testing-framework/deployers/compose/src/descriptor/node.rs +++ b/testing-framework/deployers/compose/src/descriptor/node.rs @@ -223,6 +223,35 @@ pub fn build_binary_config_node_descriptors( }) } +pub fn build_binary_config_node_descriptors_with_file_name( + node_count: usize, + spec: &BinaryConfigNodeSpec, + file_name_for_index: impl Fn(usize) -> String, +) -> Vec { + (0..node_count) + .map(|index| { + let file_name = file_name_for_index(index); + NodeDescriptor::with_loopback_ports( + node_identifier(index), + env::var(&spec.image_env_var).unwrap_or_else(|_| spec.default_image.clone()), + vec![ + spec.binary_path.clone(), + "--config".to_owned(), + spec.config_container_path.clone(), + ], + vec![format!( + "./stack/configs/{file_name}:{}:ro", + spec.config_container_path + )], + vec![], + spec.container_ports.clone(), + vec![EnvEntry::new("RUST_LOG", &spec.rust_log)], + env::var(&spec.platform_env_var).ok(), + ) + }) + .collect() +} + pub fn binary_config_node_runtime_spec( index: usize, spec: &BinaryConfigNodeSpec, diff --git a/testing-framework/deployers/compose/src/env.rs b/testing-framework/deployers/compose/src/env.rs index 8d1157e..ba4c254 100644 --- a/testing-framework/deployers/compose/src/env.rs +++ b/testing-framework/deployers/compose/src/env.rs @@ -1,9 +1,15 @@ -use std::{fs, path::Path, time::Duration}; +use std::{ + fs, + path::{Path, PathBuf}, + time::Duration, +}; -use async_trait::async_trait; use reqwest::Url; use testing_framework_core::{ - cfgsync::{MaterializedArtifacts, StaticArtifactRenderer}, + cfgsync::{ + CfgsyncOutputPaths, MaterializedArtifacts, RegistrationServerRenderOptions, + StaticArtifactRenderer, render_and_write_registration_server, + }, scenario::{ Application, DynError, HttpReadinessRequirement, NodeAccess, NodeClients, wait_for_http_ports_with_host_and_requirement, wait_http_readiness, @@ -18,7 +24,7 @@ use tokio::{ use crate::{ descriptor::{ BinaryConfigNodeSpec, ComposeDescriptor, LoopbackNodeRuntimeSpec, NodeDescriptor, - binary_config_node_runtime_spec, + build_binary_config_node_descriptors_with_file_name, }, docker::config_server::DockerConfigServerSpec, infrastructure::ports::{ @@ -41,71 +47,457 @@ pub enum ComposeConfigServerMode { Docker, } -/// Compose-specific topology surface needed by the runner. -#[async_trait] -pub trait ComposeDeployEnv: Application { - /// Write per-node config files or other compose-time assets into the stack - /// workspace before the stack starts. - fn prepare_compose_configs( - _path: &Path, - _topology: &::Deployment, - _metrics_otlp_ingest_url: Option<&Url>, +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum ComposeReadinessProbe { + Http { path: &'static str }, + Tcp, +} + +pub type ComposeWorkspacePrep = + fn(&Path, &::Deployment, Option<&Url>) -> Result<(), DynError>; +pub type ComposeLoopbackSpecBuilder = + fn(&::Deployment, usize) -> Result; +pub type ComposeExtraServices = + fn(&::Deployment) -> Result, DynError>; +pub type ComposeDescriptorBuilder = + fn(&::Deployment, u16) -> Result; +pub type ComposeNodeClientBuilder = + fn(&NodeHostPorts, &str) -> Result<::NodeClient, DynError>; +pub type ComposeContainerPortsResolver = fn( + &::Deployment, + &ComposeDescriptor, +) -> Result, DynError>; +pub type ComposeCfgsyncHostnames = fn(&::Deployment) -> Vec; +pub type ComposeCfgsyncEnricher = + fn(&::Deployment, &mut MaterializedArtifacts) -> Result<(), DynError>; +pub type ComposeConfigWriter = for<'a> fn(ComposeConfigContext<'a, E>) -> Result<(), DynError>; +pub type ComposeConfigServerSpecBuilder = + fn(&Path, u16, &str) -> Result; +pub type ComposeRunnerHost = fn() -> String; + +#[derive(Clone, Copy)] +pub enum ComposeNodeConfigFileName { + FixedExtension(&'static str), + Custom(fn(usize) -> String), +} + +impl ComposeNodeConfigFileName { + #[must_use] + pub fn resolve(&self, index: usize) -> String { + match self { + Self::FixedExtension(extension) => format!("node-{index}.{extension}"), + Self::Custom(build) => build(index), + } + } +} + +pub trait ComposeDeployEnv: Application + Sized { + fn compose_runtime() -> ComposeRuntime; +} + +pub struct ComposeRuntime { + stack: ComposeStack, + configs: ComposeConfigs, + access: ComposeAccess, + cfgsync: ComposeCfgsync, + node_config_file_name: ComposeNodeConfigFileName, +} + +impl ComposeRuntime { + #[must_use] + pub fn new(stack: ComposeStack) -> Self { + Self { + stack, + configs: ComposeConfigs::disabled(), + access: ComposeAccess::default(), + cfgsync: ComposeCfgsync::default(), + node_config_file_name: ComposeNodeConfigFileName::FixedExtension("yaml"), + } + } + + #[must_use] + pub fn binary_config(spec: BinaryConfigNodeSpec) -> Self { + let node_config_file_name = + make_extension_node_config_file_name(&spec.config_file_extension); + Self { + stack: ComposeStack::nodes(ComposeNodes::binary_config(spec)), + configs: ComposeConfigs::disabled(), + access: ComposeAccess::default(), + cfgsync: ComposeCfgsync::default(), + node_config_file_name, + } + } + + #[must_use] + pub fn loopback(runtime_spec: ComposeLoopbackSpecBuilder) -> Self { + Self::new(ComposeStack::nodes(ComposeNodes::loopback(runtime_spec))) + } + + #[must_use] + pub fn custom_descriptor(build_descriptor: ComposeDescriptorBuilder) -> Self { + Self::new(ComposeStack::custom(build_descriptor)) + } + + #[must_use] + pub fn with_node_config_file_name( + mut self, + node_config_file_name: ComposeNodeConfigFileName, + ) -> Self { + self.node_config_file_name = node_config_file_name; + self + } + + #[must_use] + pub fn with_configs(mut self, configs: ComposeConfigs) -> Self { + self.configs = configs; + self + } + + #[must_use] + pub fn with_static_configs(mut self) -> Self + where + E: ComposeDeployEnv, + E: StaticArtifactRenderer::Deployment>, + { + self.configs = ComposeConfigs::static_node_configs(); + self + } + + #[must_use] + pub fn with_registration_server_configs(mut self) -> Self + where + E: ComposeDeployEnv, + E: StaticArtifactRenderer::Deployment>, + { + self.configs = ComposeConfigs::registration_server(); + self + } + + #[must_use] + pub fn with_access(mut self, access: ComposeAccess) -> Self { + self.access = access; + self + } + + #[must_use] + pub fn with_cfgsync(mut self, cfgsync: ComposeCfgsync) -> Self { + self.cfgsync = cfgsync; + self + } + + #[must_use] + pub fn with_prepare_workspace(mut self, prepare_workspace: ComposeWorkspacePrep) -> Self { + self.stack = self.stack.with_prepare_workspace(prepare_workspace); + self + } + + #[must_use] + pub fn with_extra_services(mut self, extra_services: ComposeExtraServices) -> Self { + self.stack = self.stack.with_extra_services(extra_services); + self + } +} + +pub enum ComposeStack { + Nodes(ComposeNodes), + Custom { + build_descriptor: ComposeDescriptorBuilder, + prepare_workspace: Option>, + }, +} + +impl ComposeStack { + #[must_use] + pub fn nodes(nodes: ComposeNodes) -> Self { + Self::Nodes(nodes) + } + + #[must_use] + pub fn custom(build_descriptor: ComposeDescriptorBuilder) -> Self { + Self::Custom { + build_descriptor, + prepare_workspace: None, + } + } + + #[must_use] + pub fn with_prepare_workspace(mut self, prepare_workspace: ComposeWorkspacePrep) -> Self { + match &mut self { + Self::Nodes(nodes) => nodes.prepare_workspace = Some(prepare_workspace), + Self::Custom { + prepare_workspace: slot, + .. + } => *slot = Some(prepare_workspace), + } + self + } + + #[must_use] + pub fn with_extra_services(mut self, extra_services: ComposeExtraServices) -> Self { + if let Self::Nodes(nodes) = &mut self { + nodes.extra_services = Some(extra_services); + } + self + } + + fn prepare_workspace( + &self, + path: &Path, + topology: &E::Deployment, + metrics_otlp_ingest_url: Option<&Url>, ) -> Result<(), DynError> { - Ok(()) - } - - /// File name for a static per-node config rendered into the compose stack. - fn static_node_config_file_name(index: usize) -> String { - format!("node-{index}.yaml") - } - - fn loopback_node_runtime_spec( - _topology: &::Deployment, - _index: usize, - ) -> Option { - if let Some(spec) = Self::binary_config_node_spec(_topology, _index) { - return Some(binary_config_node_runtime_spec(_index, &spec)); + match self { + Self::Nodes(nodes) => nodes + .prepare_workspace + .map(|prepare| prepare(path, topology, metrics_otlp_ingest_url)) + .unwrap_or(Ok(())), + Self::Custom { + prepare_workspace, .. + } => prepare_workspace + .map(|prepare| prepare(path, topology, metrics_otlp_ingest_url)) + .unwrap_or(Ok(())), } - None } - fn binary_config_node_spec( - _topology: &::Deployment, - _index: usize, - ) -> Option { - None - } - - /// Produce the compose descriptor for the given topology. - fn compose_descriptor( - topology: &::Deployment, - _cfgsync_port: u16, + fn build_descriptor( + &self, + topology: &E::Deployment, + cfgsync_port: u16, + node_config_file_name: ComposeNodeConfigFileName, ) -> Result { - let mut nodes = Vec::with_capacity(topology.node_count()); - for index in 0..topology.node_count() { - let spec = Self::loopback_node_runtime_spec(topology, index).ok_or_else(|| { - std::io::Error::other("compose_descriptor is not implemented for this app") - })?; - nodes.push(NodeDescriptor::with_loopback_ports( - crate::infrastructure::ports::node_identifier(index), - spec.image, - spec.entrypoint, - spec.volumes, - spec.extra_hosts, - spec.container_ports, - spec.environment, - spec.platform, - )); + match self { + Self::Nodes(nodes) => nodes.build_descriptor(topology, &node_config_file_name), + Self::Custom { + build_descriptor, .. + } => build_descriptor(topology, cfgsync_port), } + } +} + +pub struct ComposeNodes { + runtime: ComposeNodeRuntime, + extra_services: Option>, + prepare_workspace: Option>, +} + +impl ComposeNodes { + #[must_use] + pub fn binary_config(spec: BinaryConfigNodeSpec) -> Self { + Self { + runtime: ComposeNodeRuntime::BinaryConfig(spec), + extra_services: None, + prepare_workspace: None, + } + } + + #[must_use] + pub fn loopback(runtime_spec: ComposeLoopbackSpecBuilder) -> Self { + Self { + runtime: ComposeNodeRuntime::Loopback(runtime_spec), + extra_services: None, + prepare_workspace: None, + } + } + + fn build_descriptor( + &self, + topology: &E::Deployment, + node_config_file_name: &ComposeNodeConfigFileName, + ) -> Result { + let mut nodes = match &self.runtime { + ComposeNodeRuntime::BinaryConfig(spec) => { + build_binary_config_node_descriptors_with_file_name( + topology.node_count(), + spec, + |index| node_config_file_name.resolve(index), + ) + } + ComposeNodeRuntime::Loopback(build_runtime) => (0..topology.node_count()) + .map(|index| { + let spec = build_runtime(topology, index)?; + Ok(NodeDescriptor::with_loopback_ports( + crate::infrastructure::ports::node_identifier(index), + spec.image, + spec.entrypoint, + spec.volumes, + spec.extra_hosts, + spec.container_ports, + spec.environment, + spec.platform, + )) + }) + .collect::, DynError>>()?, + }; + + if let Some(extra_services) = self.extra_services { + nodes.extend(extra_services(topology)?); + } + Ok(ComposeDescriptor::new(nodes)) } +} + +enum ComposeNodeRuntime { + BinaryConfig(BinaryConfigNodeSpec), + Loopback(ComposeLoopbackSpecBuilder), +} + +pub struct ComposeConfigContext<'a, E: Application> { + path: &'a Path, + topology: &'a E::Deployment, + cfgsync_port: u16, + metrics_otlp_ingest_url: Option<&'a Url>, + node_config_file_name: ComposeNodeConfigFileName, +} + +impl<'a, E: Application> ComposeConfigContext<'a, E> { + #[must_use] + pub fn path(&self) -> &'a Path { + self.path + } + + #[must_use] + pub fn topology(&self) -> &'a E::Deployment { + self.topology + } + + #[must_use] + pub fn cfgsync_port(&self) -> u16 { + self.cfgsync_port + } + + #[must_use] + pub fn metrics_otlp_ingest_url(&self) -> Option<&'a Url> { + self.metrics_otlp_ingest_url + } + + pub fn node_config_path(&self, index: usize) -> Result { + Ok(stack_configs_dir(self.path)?.join(self.node_config_file_name.resolve(index))) + } +} + +pub struct ComposeConfigs { + writer: Option>, +} + +impl ComposeConfigs { + #[must_use] + pub fn disabled() -> Self { + Self { writer: None } + } + + #[must_use] + pub fn custom(writer: ComposeConfigWriter) -> Self { + Self { + writer: Some(writer), + } + } + + #[must_use] + pub fn static_node_configs() -> Self + where + E: ComposeDeployEnv, + E: StaticArtifactRenderer::Deployment>, + { + Self { + writer: Some(write_static_compose_configs::), + } + } + + #[must_use] + pub fn registration_server() -> Self + where + E: ComposeDeployEnv, + E: StaticArtifactRenderer::Deployment>, + { + Self { + writer: Some(write_registration_server_compose_configs::), + } + } + + fn write( + &self, + path: &Path, + topology: &E::Deployment, + cfgsync_port: u16, + metrics_otlp_ingest_url: Option<&Url>, + node_config_file_name: ComposeNodeConfigFileName, + ) -> Result<(), DynError> { + if let Some(writer) = self.writer { + writer(ComposeConfigContext { + path, + topology, + cfgsync_port, + metrics_otlp_ingest_url, + node_config_file_name, + })?; + } + Ok(()) + } +} + +pub struct ComposeAccess { + container_ports: Option>, + node_client_from_ports: Option>, + readiness_probe: ComposeReadinessProbe, + runner_host: ComposeRunnerHost, +} + +impl Default for ComposeAccess { + fn default() -> Self { + Self { + container_ports: None, + node_client_from_ports: None, + readiness_probe: ComposeReadinessProbe::Http { + path: E::node_readiness_path(), + }, + runner_host: compose_runner_host, + } + } +} + +impl ComposeAccess { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + #[must_use] + pub fn with_container_ports( + mut self, + container_ports: ComposeContainerPortsResolver, + ) -> Self { + self.container_ports = Some(container_ports); + self + } + + #[must_use] + pub fn with_node_client(mut self, node_client_from_ports: ComposeNodeClientBuilder) -> Self { + self.node_client_from_ports = Some(node_client_from_ports); + self + } + + #[must_use] + pub fn with_readiness_probe(mut self, readiness_probe: ComposeReadinessProbe) -> Self { + self.readiness_probe = readiness_probe; + self + } + + #[must_use] + pub fn with_runner_host(mut self, runner_host: ComposeRunnerHost) -> Self { + self.runner_host = runner_host; + self + } - /// Container ports (API/testing) per node, used for docker-compose port - /// discovery. fn node_container_ports( - topology: &::Deployment, + &self, + topology: &E::Deployment, + descriptor: &ComposeDescriptor, ) -> Result, DynError> { - let descriptor = Self::compose_descriptor(topology, 0)?; + if let Some(container_ports) = self.container_ports { + return container_ports(topology, descriptor); + } + Ok(descriptor .nodes() .iter() @@ -115,126 +507,268 @@ pub trait ComposeDeployEnv: Application { .collect()) } - /// Hostnames used when rewriting node configs for cfgsync delivery. - fn cfgsync_hostnames(topology: &::Deployment) -> Vec { - (0..topology.node_count()) - .map(crate::infrastructure::ports::node_identifier) - .collect() - } - - /// App-specific cfgsync artifact enrichment. - fn enrich_cfgsync_artifacts( - _topology: &::Deployment, - _artifacts: &mut MaterializedArtifacts, - ) -> Result<(), DynError> { - Ok(()) - } - - /// Render and write cfgsync runtime files for the current topology. - fn write_cfgsync_config( - path: &Path, - topology: &::Deployment, - port: u16, - metrics_otlp_ingest_url: Option<&Url>, - ) -> Result<(), DynError> - where - Self: Sized + StaticArtifactRenderer::Deployment>, - { - write_static_compose_configs::(path, topology, metrics_otlp_ingest_url)?; - write_dummy_cfgsync_config(path, port)?; - Ok(()) - } - - /// Build the config server container specification. - fn cfgsync_container_spec( - _cfgsync_path: &Path, - _port: u16, - _network: &str, - ) -> Result { - Err(std::io::Error::other("cfgsync_container_spec is not implemented for this app").into()) - } - - /// Timeout used when launching the config server container. - fn cfgsync_start_timeout() -> Duration { - Duration::from_secs(180) - } - - fn cfgsync_server_mode() -> ComposeConfigServerMode { - ComposeConfigServerMode::Disabled - } - - /// Build node clients from discovered host ports. fn node_client_from_ports( + &self, ports: &NodeHostPorts, host: &str, - ) -> Result { - ::build_node_client(&discovered_node_access(host, ports)) + ) -> Result { + if let Some(node_client_from_ports) = self.node_client_from_ports { + return node_client_from_ports(ports, host); + } + + ::build_node_client(&discovered_node_access(host, ports)) } - /// Build node clients from discovered host ports. fn build_node_clients( - _topology: &::Deployment, + &self, + _topology: &E::Deployment, host_ports: &HostPortMapping, host: &str, - ) -> Result, DynError> - where - Self: Sized, - { + ) -> Result, DynError> { let clients = host_ports .nodes .iter() - .map(|ports| Self::node_client_from_ports(ports, host)) + .map(|ports| self.node_client_from_ports(ports, host)) .collect::>()?; Ok(NodeClients::new(clients)) } - /// Path used by default readiness checks. - fn node_readiness_path() -> &'static str { - ::node_readiness_path() + fn runner_host(&self) -> String { + (self.runner_host)() } - fn node_readiness_probe() -> ComposeReadinessProbe { - ComposeReadinessProbe::Http { - path: ::node_readiness_path(), + fn readiness_probe(&self) -> ComposeReadinessProbe { + self.readiness_probe + } +} + +pub struct ComposeCfgsync { + server_mode: ComposeConfigServerMode, + hostnames: ComposeCfgsyncHostnames, + enrich_artifacts: Option>, + container_spec: Option, + start_timeout: Duration, +} + +impl Default for ComposeCfgsync { + fn default() -> Self { + Self { + server_mode: ComposeConfigServerMode::Disabled, + hostnames: default_cfgsync_hostnames::, + enrich_artifacts: None, + container_spec: None, + start_timeout: Duration::from_secs(180), } } +} - /// Host used by default remote readiness checks. - fn compose_runner_host() -> String { - compose_runner_host() +impl ComposeCfgsync { + #[must_use] + pub fn new() -> Self { + Self::default() } - /// Remote readiness probe for node APIs. - async fn wait_remote_readiness( - _topology: &::Deployment, - mapping: &HostPortMapping, - requirement: HttpReadinessRequirement, + #[must_use] + pub fn with_server_mode(mut self, server_mode: ComposeConfigServerMode) -> Self { + self.server_mode = server_mode; + self + } + + #[must_use] + pub fn with_hostnames(mut self, hostnames: ComposeCfgsyncHostnames) -> Self { + self.hostnames = hostnames; + self + } + + #[must_use] + pub fn with_enrich_artifacts(mut self, enrich_artifacts: ComposeCfgsyncEnricher) -> Self { + self.enrich_artifacts = Some(enrich_artifacts); + self + } + + #[must_use] + pub fn with_container_spec(mut self, container_spec: ComposeConfigServerSpecBuilder) -> Self { + self.container_spec = Some(container_spec); + self + } + + #[must_use] + pub fn with_start_timeout(mut self, start_timeout: Duration) -> Self { + self.start_timeout = start_timeout; + self + } + + fn server_mode(&self) -> ComposeConfigServerMode { + self.server_mode + } + + fn hostnames(&self, topology: &E::Deployment) -> Vec { + (self.hostnames)(topology) + } + + fn enrich_artifacts( + &self, + topology: &E::Deployment, + artifacts: &mut MaterializedArtifacts, ) -> Result<(), DynError> { - match ::node_readiness_probe() { + self.enrich_artifacts + .map(|enrich| enrich(topology, artifacts)) + .unwrap_or(Ok(())) + } + + fn container_spec( + &self, + cfgsync_path: &Path, + port: u16, + network: &str, + ) -> Result { + let container_spec = self.container_spec.ok_or_else(|| { + DynError::from(std::io::Error::other( + "cfgsync container spec is not configured", + )) + })?; + container_spec(cfgsync_path, port, network) + } + + fn start_timeout(&self) -> Duration { + self.start_timeout + } +} + +pub(crate) fn runtime_for() -> ComposeRuntime { + E::compose_runtime() +} + +pub(crate) fn prepare_compose_configs( + path: &Path, + topology: &E::Deployment, + cfgsync_port: u16, + metrics_otlp_ingest_url: Option<&Url>, +) -> Result<(), DynError> { + let runtime = runtime_for::(); + runtime + .stack + .prepare_workspace(path, topology, metrics_otlp_ingest_url)?; + runtime.configs.write( + path, + topology, + cfgsync_port, + metrics_otlp_ingest_url, + runtime.node_config_file_name, + )?; + Ok(()) +} + +pub(crate) fn compose_descriptor( + topology: &E::Deployment, + cfgsync_port: u16, +) -> Result { + let runtime = runtime_for::(); + runtime + .stack + .build_descriptor(topology, cfgsync_port, runtime.node_config_file_name) +} + +pub(crate) fn node_container_ports( + topology: &E::Deployment, +) -> Result, DynError> { + let runtime = runtime_for::(); + let descriptor = runtime + .stack + .build_descriptor(topology, 0, runtime.node_config_file_name)?; + runtime.access.node_container_ports(topology, &descriptor) +} + +pub(crate) fn cfgsync_hostnames(topology: &E::Deployment) -> Vec { + runtime_for::().cfgsync.hostnames(topology) +} + +pub(crate) fn enrich_cfgsync_artifacts( + topology: &E::Deployment, + artifacts: &mut MaterializedArtifacts, +) -> Result<(), DynError> { + runtime_for::() + .cfgsync + .enrich_artifacts(topology, artifacts) +} + +pub(crate) fn cfgsync_container_spec( + cfgsync_path: &Path, + port: u16, + network: &str, +) -> Result { + runtime_for::() + .cfgsync + .container_spec(cfgsync_path, port, network) +} + +pub(crate) fn cfgsync_start_timeout() -> Duration { + runtime_for::().cfgsync.start_timeout() +} + +pub(crate) fn cfgsync_server_mode() -> ComposeConfigServerMode { + runtime_for::().cfgsync.server_mode() +} + +pub(crate) fn readiness_http_path() -> &'static str { + match runtime_for::().access.readiness_probe() { + ComposeReadinessProbe::Http { path } => path, + ComposeReadinessProbe::Tcp => E::node_readiness_path(), + } +} + +pub(crate) fn build_node_clients( + topology: &E::Deployment, + host_ports: &HostPortMapping, + host: &str, +) -> Result, DynError> { + runtime_for::() + .access + .build_node_clients(topology, host_ports, host) +} + +pub(crate) fn wait_remote_readiness( + _topology: &E::Deployment, + mapping: &HostPortMapping, + requirement: HttpReadinessRequirement, +) -> Result>, DynError> { + let runtime = runtime_for::(); + let host = runtime.access.runner_host(); + let probe = runtime.access.readiness_probe(); + Ok(async move { + match probe { ComposeReadinessProbe::Http { path } => { - let host = Self::compose_runner_host(); let urls = readiness_urls(&host, mapping, path)?; wait_http_readiness(&urls, requirement).await?; Ok(()) } ComposeReadinessProbe::Tcp => wait_for_tcp_readiness(&mapping.nodes, requirement).await, } - } + }) +} - /// Wait for HTTP readiness on node ports. - async fn wait_for_nodes( - ports: &[u16], - host: &str, - requirement: HttpReadinessRequirement, - ) -> Result<(), DynError> { - match ::node_readiness_probe() { +pub(crate) fn wait_for_nodes( + ports: &[u16], + host: &str, + requirement: HttpReadinessRequirement, +) -> Result>, DynError> { + let probe = runtime_for::().access.readiness_probe(); + let host = host.to_owned(); + let node_ports = ports.to_vec(); + Ok(async move { + match probe { ComposeReadinessProbe::Http { path } => { - wait_for_http_ports_with_host_and_requirement(ports, host, path, requirement) - .await?; + wait_for_http_ports_with_host_and_requirement( + &node_ports, + &host, + path, + requirement, + ) + .await?; Ok(()) } ComposeReadinessProbe::Tcp => { - let ports = ports + let ports = node_ports .iter() .copied() .map(|port| NodeHostPorts { @@ -245,61 +779,66 @@ pub trait ComposeDeployEnv: Application { wait_for_tcp_readiness(&ports, requirement).await } } + }) +} + +fn write_static_compose_configs(context: ComposeConfigContext<'_, E>) -> Result<(), DynError> +where + E: ComposeDeployEnv + StaticArtifactRenderer::Deployment>, +{ + let hostnames = cfgsync_hostnames::(context.topology()); + let configs_dir = stack_configs_dir(context.path())?; + fs::create_dir_all(&configs_dir)?; + + for index in 0..context.topology().node_count() { + let mut config = E::build_node_config(context.topology(), index)?; + E::rewrite_for_hostnames(context.topology(), index, &hostnames, &mut config)?; + let rendered = E::serialize_node_config(&config)?; + fs::write(context.node_config_path(index)?, rendered)?; } + + Ok(()) } -pub trait ComposeCfgsyncEnv: - ComposeDeployEnv + StaticArtifactRenderer::Deployment> -{ -} - -impl ComposeCfgsyncEnv for T where - T: ComposeDeployEnv + StaticArtifactRenderer::Deployment> -{ -} - -fn write_static_compose_configs( - path: &Path, - topology: &::Deployment, - metrics_otlp_ingest_url: Option<&Url>, +fn write_registration_server_compose_configs( + context: ComposeConfigContext<'_, E>, ) -> Result<(), DynError> where E: ComposeDeployEnv + StaticArtifactRenderer::Deployment>, { - E::prepare_compose_configs(path, topology, metrics_otlp_ingest_url)?; + let stack_dir = context + .path() + .parent() + .ok_or_else(|| anyhow::anyhow!("cfgsync path has no parent"))?; + let artifacts_path = stack_dir.join("cfgsync.artifacts.yaml"); + let hostnames = cfgsync_hostnames::(context.topology()); - let hostnames = E::cfgsync_hostnames(topology); - let configs_dir = stack_configs_dir(path)?; - fs::create_dir_all(&configs_dir)?; - - for index in 0..topology.node_count() { - let mut config = E::build_node_config(topology, index)?; - E::rewrite_for_hostnames(topology, index, &hostnames, &mut config)?; - let rendered = E::serialize_node_config(&config)?; - let output_path = configs_dir.join(E::static_node_config_file_name(index)); - fs::write(&output_path, rendered)?; - } + render_and_write_registration_server::( + context.topology(), + &hostnames, + RegistrationServerRenderOptions { + port: Some(context.cfgsync_port()), + artifacts_path: Some("cfgsync.artifacts.yaml".to_owned()), + }, + CfgsyncOutputPaths { + config_path: context.path(), + artifacts_path: &artifacts_path, + }, + |artifacts| { + enrich_cfgsync_artifacts::(context.topology(), artifacts).map_err(Into::into) + }, + )?; Ok(()) } -fn stack_configs_dir(cfgsync_path: &Path) -> Result { +fn stack_configs_dir(cfgsync_path: &Path) -> Result { let stack_dir = cfgsync_path .parent() .ok_or_else(|| anyhow::anyhow!("cfgsync path has no parent"))?; Ok(stack_dir.join("configs")) } -fn write_dummy_cfgsync_config(path: &Path, port: u16) -> Result<(), DynError> { - fs::write( - path, - format!( - "port: {port}\nsource:\n kind: static\n artifacts_path: cfgsync.artifacts.yaml\n" - ), - )?; - Ok(()) -} - fn parse_node_container_ports(index: usize, node: &NodeDescriptor) -> Option { let mut ports = node.container_ports().iter().copied(); let api = ports.next()?; @@ -312,12 +851,6 @@ fn parse_node_container_ports(index: usize, node: &NodeDescriptor) -> Option Result(topology: &E::Deployment) -> Vec { + (0..topology.node_count()) + .map(crate::infrastructure::ports::node_identifier) + .collect() +} + +fn make_extension_node_config_file_name(extension: &str) -> ComposeNodeConfigFileName { + match extension { + "yaml" => ComposeNodeConfigFileName::FixedExtension("yaml"), + "yml" => ComposeNodeConfigFileName::FixedExtension("yml"), + "conf" => ComposeNodeConfigFileName::FixedExtension("conf"), + "nats" => ComposeNodeConfigFileName::FixedExtension("nats"), + other => { + let leaked: &'static str = Box::leak(other.to_owned().into_boxed_str()); + ComposeNodeConfigFileName::FixedExtension(leaked) + } + } +} diff --git a/testing-framework/deployers/compose/src/infrastructure/environment.rs b/testing-framework/deployers/compose/src/infrastructure/environment.rs index c796fa6..e7e3973 100644 --- a/testing-framework/deployers/compose/src/infrastructure/environment.rs +++ b/testing-framework/deployers/compose/src/infrastructure/environment.rs @@ -9,7 +9,6 @@ use anyhow::anyhow; use reqwest::Url; use testing_framework_core::{ adjust_timeout, - cfgsync::StaticArtifactRenderer, scenario::{Application, internal::CleanupGuard}, topology::DeploymentDescriptor, }; @@ -25,7 +24,10 @@ use crate::{ ensure_image_present, workspace::ComposeWorkspace, }, - env::{ComposeCfgsyncEnv, ComposeConfigServerMode, ComposeDeployEnv, ConfigServerHandle}, + env::{ + ComposeConfigServerMode, ComposeDeployEnv, ConfigServerHandle, cfgsync_container_spec, + cfgsync_server_mode, cfgsync_start_timeout, compose_descriptor, prepare_compose_configs, + }, errors::{ComposeRunnerError, ConfigError, WorkspaceError}, infrastructure::template::write_compose_file, lifecycle::cleanup::RunnerCleanup, @@ -197,7 +199,7 @@ pub fn update_cfgsync_logged( metrics_otlp_ingest_url: Option<&Url>, ) -> Result<(), ComposeRunnerError> where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { info!(cfgsync_port, "updating cfgsync configuration"); @@ -217,24 +219,26 @@ pub async fn start_cfgsync_stage( cfgsync_port: u16, project_name: &str, ) -> Result>, ComposeRunnerError> { - if matches!(E::cfgsync_server_mode(), ComposeConfigServerMode::Disabled) { + if matches!( + cfgsync_server_mode::(), + ComposeConfigServerMode::Disabled + ) { return Ok(None); } info!(cfgsync_port = cfgsync_port, "launching cfgsync server"); let network = compose_network_name(project_name); - let spec = E::cfgsync_container_spec(&workspace.cfgsync_path, cfgsync_port, &network).map_err( - |source| { + let spec = cfgsync_container_spec::(&workspace.cfgsync_path, cfgsync_port, &network) + .map_err(|source| { ComposeRunnerError::Config(ConfigError::CfgsyncStart { port: cfgsync_port, source, }) - }, - )?; + })?; let handle = start_docker_config_server( &spec, - adjust_timeout(E::cfgsync_start_timeout()), + adjust_timeout(cfgsync_start_timeout::()), "docker run cfgsync server", ) .await @@ -259,9 +263,9 @@ pub fn configure_cfgsync( metrics_otlp_ingest_url: Option<&Url>, ) -> Result<(), ConfigError> where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { - E::write_cfgsync_config( + prepare_compose_configs::( &workspace.cfgsync_path, descriptors, cfgsync_port, @@ -303,7 +307,7 @@ pub fn write_compose_artifacts( workspace_root = %workspace.root.display(), "building compose descriptor" ); - let descriptor = E::compose_descriptor(descriptors, cfgsync_port) + let descriptor = compose_descriptor::(descriptors, cfgsync_port) .map_err(|source| ConfigError::Descriptor { source })?; let compose_path = workspace.root.join("compose.generated.yml"); @@ -360,7 +364,7 @@ pub async fn prepare_environment( metrics_otlp_ingest_url: Option<&Url>, ) -> Result where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { let prepared = prepare_stack_artifacts::(descriptors, metrics_otlp_ingest_url).await?; let mut cfgsync_handle = start_cfgsync_for_prepared::(&prepared).await?; @@ -376,7 +380,7 @@ pub async fn prepare_environment_manual( metrics_otlp_ingest_url: Option<&Url>, ) -> Result where - E: ComposeCfgsyncEnv, + E: ComposeDeployEnv, { let prepared = prepare_stack_artifacts::(descriptors, metrics_otlp_ingest_url).await?; let cfgsync_handle = start_cfgsync_for_prepared::(&prepared).await?; @@ -391,7 +395,7 @@ async fn prepare_stack_artifacts( metrics_otlp_ingest_url: Option<&Url>, ) -> Result where - E: ComposeDeployEnv + StaticArtifactRenderer::Deployment>, + E: ComposeDeployEnv, { let workspace = prepare_workspace_logged()?; let cfgsync_port = allocate_cfgsync_port()?; @@ -419,15 +423,15 @@ async fn ensure_compose_images_present( descriptors: &E::Deployment, cfgsync_port: u16, ) -> Result<(), ComposeRunnerError> { - let descriptor = E::compose_descriptor(descriptors, 0) + let descriptor = compose_descriptor::(descriptors, 0) .map_err(|source| ComposeRunnerError::Config(ConfigError::Descriptor { source }))?; let mut images = descriptor .nodes() .iter() .map(|node| (node.image().to_owned(), node.platform().map(str::to_owned))) .collect::>(); - if matches!(E::cfgsync_server_mode(), ComposeConfigServerMode::Docker) { - let cfgsync_spec = E::cfgsync_container_spec( + if matches!(cfgsync_server_mode::(), ComposeConfigServerMode::Docker) { + let cfgsync_spec = cfgsync_container_spec::( &workspace.cfgsync_path, cfgsync_port, &compose_network_name("compose-image-check"), diff --git a/testing-framework/deployers/compose/src/lib.rs b/testing-framework/deployers/compose/src/lib.rs index 3c22921..098ce12 100644 --- a/testing-framework/deployers/compose/src/lib.rs +++ b/testing-framework/deployers/compose/src/lib.rs @@ -10,7 +10,7 @@ pub use deployer::{ComposeDeployer, ComposeDeploymentMetadata}; pub use descriptor::{ BinaryConfigNodeSpec, ComposeDescriptor, EnvEntry, LoopbackNodeRuntimeSpec, NodeDescriptor, binary_config_node_runtime_spec, build_binary_config_node_descriptors, - build_loopback_node_descriptors, + build_binary_config_node_descriptors_with_file_name, build_loopback_node_descriptors, }; pub use docker::{ commands::{ComposeCommandError, compose_down, compose_up, dump_compose_logs}, @@ -21,8 +21,9 @@ pub use docker::{ platform::host_gateway_entry, }; pub use env::{ - ComposeConfigServerMode, ComposeDeployEnv, ComposeReadinessProbe, ConfigServerHandle, - discovered_node_access, + ComposeAccess, ComposeCfgsync, ComposeConfigContext, ComposeConfigServerMode, ComposeConfigs, + ComposeDeployEnv, ComposeNodeConfigFileName, ComposeNodes, ComposeReadinessProbe, + ComposeRuntime, ComposeStack, ConfigServerHandle, discovered_node_access, }; pub use errors::ComposeRunnerError; pub use infrastructure::{ diff --git a/testing-framework/deployers/compose/src/lifecycle/readiness.rs b/testing-framework/deployers/compose/src/lifecycle/readiness.rs index a3ab08b..92a7fe1 100644 --- a/testing-framework/deployers/compose/src/lifecycle/readiness.rs +++ b/testing-framework/deployers/compose/src/lifecycle/readiness.rs @@ -4,7 +4,7 @@ use testing_framework_core::scenario::{HttpReadinessRequirement, NodeClients}; use tokio::time::sleep; use crate::{ - env::ComposeDeployEnv, + env::{ComposeDeployEnv, build_node_clients, wait_for_nodes}, errors::{NodeClientError, StackReadinessError}, infrastructure::ports::{HostPortMapping, compose_runner_host}, }; @@ -21,7 +21,8 @@ pub async fn ensure_nodes_ready_with_ports( } let host = compose_runner_host(); - E::wait_for_nodes(ports, &host, requirement) + wait_for_nodes::(ports, &host, requirement) + .map_err(|source| StackReadinessError::Remote { source })? .await .map_err(|source| StackReadinessError::Remote { source }) } @@ -41,6 +42,6 @@ pub fn build_node_clients_with_ports( mapping: &HostPortMapping, host: &str, ) -> Result, NodeClientError> { - E::build_node_clients(descriptors, mapping, host) + build_node_clients::(descriptors, mapping, host) .map_err(|source| NodeClientError::Build { source }) }