From c172714a2fb7de154bce2ecc54b8f11b4de45e26 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sat, 11 Apr 2026 06:42:28 +0200 Subject: [PATCH] refactor(deployers): split simple app traits from advanced hooks --- .../deployers/compose/src/env.rs | 978 ++++++------------ .../deployers/compose/src/lib.rs | 6 +- testing-framework/deployers/k8s/src/env.rs | 555 ++++------ testing-framework/deployers/k8s/src/lib.rs | 4 +- testing-framework/deployers/k8s/src/manual.rs | 46 +- .../deployers/local/src/env/mod.rs | 424 ++++++-- .../deployers/local/src/env/runtime.rs | 438 -------- .../deployers/local/src/env/tests.rs | 61 +- testing-framework/deployers/local/src/lib.rs | 11 +- 9 files changed, 975 insertions(+), 1548 deletions(-) delete mode 100644 testing-framework/deployers/local/src/env/runtime.rs diff --git a/testing-framework/deployers/compose/src/env.rs b/testing-framework/deployers/compose/src/env.rs index ba4c254..81af8cd 100644 --- a/testing-framework/deployers/compose/src/env.rs +++ b/testing-framework/deployers/compose/src/env.rs @@ -4,6 +4,7 @@ use std::{ time::Duration, }; +use async_trait::async_trait; use reqwest::Url; use testing_framework_core::{ cfgsync::{ @@ -24,7 +25,7 @@ use tokio::{ use crate::{ descriptor::{ BinaryConfigNodeSpec, ComposeDescriptor, LoopbackNodeRuntimeSpec, NodeDescriptor, - build_binary_config_node_descriptors_with_file_name, + binary_config_node_runtime_spec, build_loopback_node_descriptors, }, docker::config_server::DockerConfigServerSpec, infrastructure::ports::{ @@ -53,28 +54,6 @@ pub enum ComposeReadinessProbe { Tcp, } -pub type ComposeWorkspacePrep = - fn(&Path, &::Deployment, Option<&Url>) -> Result<(), DynError>; -pub type ComposeLoopbackSpecBuilder = - fn(&::Deployment, usize) -> Result; -pub type ComposeExtraServices = - fn(&::Deployment) -> Result, DynError>; -pub type ComposeDescriptorBuilder = - fn(&::Deployment, u16) -> Result; -pub type ComposeNodeClientBuilder = - fn(&NodeHostPorts, &str) -> Result<::NodeClient, DynError>; -pub type ComposeContainerPortsResolver = fn( - &::Deployment, - &ComposeDescriptor, -) -> Result, DynError>; -pub type ComposeCfgsyncHostnames = fn(&::Deployment) -> Vec; -pub type ComposeCfgsyncEnricher = - fn(&::Deployment, &mut MaterializedArtifacts) -> Result<(), DynError>; -pub type ComposeConfigWriter = for<'a> fn(ComposeConfigContext<'a, E>) -> Result<(), DynError>; -pub type ComposeConfigServerSpecBuilder = - fn(&Path, u16, &str) -> Result; -pub type ComposeRunnerHost = fn() -> String; - #[derive(Clone, Copy)] pub enum ComposeNodeConfigFileName { FixedExtension(&'static str), @@ -91,413 +70,56 @@ impl ComposeNodeConfigFileName { } } +/// Advanced compose deployer integration. +#[async_trait] pub trait ComposeDeployEnv: Application + Sized { - fn compose_runtime() -> ComposeRuntime; -} - -pub struct ComposeRuntime { - stack: ComposeStack, - configs: ComposeConfigs, - access: ComposeAccess, - cfgsync: ComposeCfgsync, - node_config_file_name: ComposeNodeConfigFileName, -} - -impl ComposeRuntime { - #[must_use] - pub fn new(stack: ComposeStack) -> Self { - Self { - stack, - configs: ComposeConfigs::disabled(), - access: ComposeAccess::default(), - cfgsync: ComposeCfgsync::default(), - node_config_file_name: ComposeNodeConfigFileName::FixedExtension("yaml"), - } - } - - #[must_use] - pub fn binary_config(spec: BinaryConfigNodeSpec) -> Self { - let node_config_file_name = - make_extension_node_config_file_name(&spec.config_file_extension); - Self { - stack: ComposeStack::nodes(ComposeNodes::binary_config(spec)), - configs: ComposeConfigs::disabled(), - access: ComposeAccess::default(), - cfgsync: ComposeCfgsync::default(), - node_config_file_name, - } - } - - #[must_use] - pub fn loopback(runtime_spec: ComposeLoopbackSpecBuilder) -> Self { - Self::new(ComposeStack::nodes(ComposeNodes::loopback(runtime_spec))) - } - - #[must_use] - pub fn custom_descriptor(build_descriptor: ComposeDescriptorBuilder) -> Self { - Self::new(ComposeStack::custom(build_descriptor)) - } - - #[must_use] - pub fn with_node_config_file_name( - mut self, - node_config_file_name: ComposeNodeConfigFileName, - ) -> Self { - self.node_config_file_name = node_config_file_name; - self - } - - #[must_use] - pub fn with_configs(mut self, configs: ComposeConfigs) -> Self { - self.configs = configs; - self - } - - #[must_use] - pub fn with_static_configs(mut self) -> Self - where - E: ComposeDeployEnv, - E: StaticArtifactRenderer::Deployment>, - { - self.configs = ComposeConfigs::static_node_configs(); - self - } - - #[must_use] - pub fn with_registration_server_configs(mut self) -> Self - where - E: ComposeDeployEnv, - E: StaticArtifactRenderer::Deployment>, - { - self.configs = ComposeConfigs::registration_server(); - self - } - - #[must_use] - pub fn with_access(mut self, access: ComposeAccess) -> Self { - self.access = access; - self - } - - #[must_use] - pub fn with_cfgsync(mut self, cfgsync: ComposeCfgsync) -> Self { - self.cfgsync = cfgsync; - self - } - - #[must_use] - pub fn with_prepare_workspace(mut self, prepare_workspace: ComposeWorkspacePrep) -> Self { - self.stack = self.stack.with_prepare_workspace(prepare_workspace); - self - } - - #[must_use] - pub fn with_extra_services(mut self, extra_services: ComposeExtraServices) -> Self { - self.stack = self.stack.with_extra_services(extra_services); - self - } -} - -pub enum ComposeStack { - Nodes(ComposeNodes), - Custom { - build_descriptor: ComposeDescriptorBuilder, - prepare_workspace: Option>, - }, -} - -impl ComposeStack { - #[must_use] - pub fn nodes(nodes: ComposeNodes) -> Self { - Self::Nodes(nodes) - } - - #[must_use] - pub fn custom(build_descriptor: ComposeDescriptorBuilder) -> Self { - Self::Custom { - build_descriptor, - prepare_workspace: None, - } - } - - #[must_use] - pub fn with_prepare_workspace(mut self, prepare_workspace: ComposeWorkspacePrep) -> Self { - match &mut self { - Self::Nodes(nodes) => nodes.prepare_workspace = Some(prepare_workspace), - Self::Custom { - prepare_workspace: slot, - .. - } => *slot = Some(prepare_workspace), - } - self - } - - #[must_use] - pub fn with_extra_services(mut self, extra_services: ComposeExtraServices) -> Self { - if let Self::Nodes(nodes) = &mut self { - nodes.extra_services = Some(extra_services); - } - self - } - - fn prepare_workspace( - &self, - path: &Path, - topology: &E::Deployment, - metrics_otlp_ingest_url: Option<&Url>, + fn prepare_compose_configs( + _path: &Path, + _topology: &::Deployment, + _cfgsync_port: u16, + _metrics_otlp_ingest_url: Option<&Url>, ) -> Result<(), DynError> { - match self { - Self::Nodes(nodes) => nodes - .prepare_workspace - .map(|prepare| prepare(path, topology, metrics_otlp_ingest_url)) - .unwrap_or(Ok(())), - Self::Custom { - prepare_workspace, .. - } => prepare_workspace - .map(|prepare| prepare(path, topology, metrics_otlp_ingest_url)) - .unwrap_or(Ok(())), - } - } - - fn build_descriptor( - &self, - topology: &E::Deployment, - cfgsync_port: u16, - node_config_file_name: ComposeNodeConfigFileName, - ) -> Result { - match self { - Self::Nodes(nodes) => nodes.build_descriptor(topology, &node_config_file_name), - Self::Custom { - build_descriptor, .. - } => build_descriptor(topology, cfgsync_port), - } - } -} - -pub struct ComposeNodes { - runtime: ComposeNodeRuntime, - extra_services: Option>, - prepare_workspace: Option>, -} - -impl ComposeNodes { - #[must_use] - pub fn binary_config(spec: BinaryConfigNodeSpec) -> Self { - Self { - runtime: ComposeNodeRuntime::BinaryConfig(spec), - extra_services: None, - prepare_workspace: None, - } - } - - #[must_use] - pub fn loopback(runtime_spec: ComposeLoopbackSpecBuilder) -> Self { - Self { - runtime: ComposeNodeRuntime::Loopback(runtime_spec), - extra_services: None, - prepare_workspace: None, - } - } - - fn build_descriptor( - &self, - topology: &E::Deployment, - node_config_file_name: &ComposeNodeConfigFileName, - ) -> Result { - let mut nodes = match &self.runtime { - ComposeNodeRuntime::BinaryConfig(spec) => { - build_binary_config_node_descriptors_with_file_name( - topology.node_count(), - spec, - |index| node_config_file_name.resolve(index), - ) - } - ComposeNodeRuntime::Loopback(build_runtime) => (0..topology.node_count()) - .map(|index| { - let spec = build_runtime(topology, index)?; - Ok(NodeDescriptor::with_loopback_ports( - crate::infrastructure::ports::node_identifier(index), - spec.image, - spec.entrypoint, - spec.volumes, - spec.extra_hosts, - spec.container_ports, - spec.environment, - spec.platform, - )) - }) - .collect::, DynError>>()?, - }; - - if let Some(extra_services) = self.extra_services { - nodes.extend(extra_services(topology)?); - } - - Ok(ComposeDescriptor::new(nodes)) - } -} - -enum ComposeNodeRuntime { - BinaryConfig(BinaryConfigNodeSpec), - Loopback(ComposeLoopbackSpecBuilder), -} - -pub struct ComposeConfigContext<'a, E: Application> { - path: &'a Path, - topology: &'a E::Deployment, - cfgsync_port: u16, - metrics_otlp_ingest_url: Option<&'a Url>, - node_config_file_name: ComposeNodeConfigFileName, -} - -impl<'a, E: Application> ComposeConfigContext<'a, E> { - #[must_use] - pub fn path(&self) -> &'a Path { - self.path - } - - #[must_use] - pub fn topology(&self) -> &'a E::Deployment { - self.topology - } - - #[must_use] - pub fn cfgsync_port(&self) -> u16 { - self.cfgsync_port - } - - #[must_use] - pub fn metrics_otlp_ingest_url(&self) -> Option<&'a Url> { - self.metrics_otlp_ingest_url - } - - pub fn node_config_path(&self, index: usize) -> Result { - Ok(stack_configs_dir(self.path)?.join(self.node_config_file_name.resolve(index))) - } -} - -pub struct ComposeConfigs { - writer: Option>, -} - -impl ComposeConfigs { - #[must_use] - pub fn disabled() -> Self { - Self { writer: None } - } - - #[must_use] - pub fn custom(writer: ComposeConfigWriter) -> Self { - Self { - writer: Some(writer), - } - } - - #[must_use] - pub fn static_node_configs() -> Self - where - E: ComposeDeployEnv, - E: StaticArtifactRenderer::Deployment>, - { - Self { - writer: Some(write_static_compose_configs::), - } - } - - #[must_use] - pub fn registration_server() -> Self - where - E: ComposeDeployEnv, - E: StaticArtifactRenderer::Deployment>, - { - Self { - writer: Some(write_registration_server_compose_configs::), - } - } - - fn write( - &self, - path: &Path, - topology: &E::Deployment, - cfgsync_port: u16, - metrics_otlp_ingest_url: Option<&Url>, - node_config_file_name: ComposeNodeConfigFileName, - ) -> Result<(), DynError> { - if let Some(writer) = self.writer { - writer(ComposeConfigContext { - path, - topology, - cfgsync_port, - metrics_otlp_ingest_url, - node_config_file_name, - })?; - } Ok(()) } -} -pub struct ComposeAccess { - container_ports: Option>, - node_client_from_ports: Option>, - readiness_probe: ComposeReadinessProbe, - runner_host: ComposeRunnerHost, -} + fn static_node_config_file_name(index: usize) -> String { + format!("node-{index}.yaml") + } -impl Default for ComposeAccess { - fn default() -> Self { - Self { - container_ports: None, - node_client_from_ports: None, - readiness_probe: ComposeReadinessProbe::Http { - path: E::node_readiness_path(), - }, - runner_host: compose_runner_host, + fn loopback_node_runtime_spec( + topology: &::Deployment, + index: usize, + ) -> Result, DynError> { + if let Some(spec) = Self::binary_config_node_spec(topology, index)? { + return Ok(Some(binary_config_node_runtime_spec(index, &spec))); } - } -} - -impl ComposeAccess { - #[must_use] - pub fn new() -> Self { - Self::default() + Ok(None) } - #[must_use] - pub fn with_container_ports( - mut self, - container_ports: ComposeContainerPortsResolver, - ) -> Self { - self.container_ports = Some(container_ports); - self + fn binary_config_node_spec( + _topology: &::Deployment, + _index: usize, + ) -> Result, DynError> { + Ok(None) } - #[must_use] - pub fn with_node_client(mut self, node_client_from_ports: ComposeNodeClientBuilder) -> Self { - self.node_client_from_ports = Some(node_client_from_ports); - self - } - - #[must_use] - pub fn with_readiness_probe(mut self, readiness_probe: ComposeReadinessProbe) -> Self { - self.readiness_probe = readiness_probe; - self - } - - #[must_use] - pub fn with_runner_host(mut self, runner_host: ComposeRunnerHost) -> Self { - self.runner_host = runner_host; - self + fn compose_descriptor( + topology: &::Deployment, + _cfgsync_port: u16, + ) -> Result { + let nodes = build_loopback_node_descriptors(topology.node_count(), |index| { + Self::loopback_node_runtime_spec(topology, index) + .ok() + .flatten() + .unwrap_or_else(|| panic!("compose_descriptor is not implemented for this app")) + }); + Ok(ComposeDescriptor::new(nodes)) } fn node_container_ports( - &self, - topology: &E::Deployment, - descriptor: &ComposeDescriptor, + topology: &::Deployment, ) -> Result, DynError> { - if let Some(container_ports) = self.container_ports { - return container_ports(topology, descriptor); - } - + let descriptor = Self::compose_descriptor(topology, 0)?; Ok(descriptor .nodes() .iter() @@ -507,268 +129,94 @@ impl ComposeAccess { .collect()) } + fn cfgsync_hostnames(topology: &::Deployment) -> Vec { + (0..topology.node_count()) + .map(crate::infrastructure::ports::node_identifier) + .collect() + } + + fn enrich_cfgsync_artifacts( + _topology: &::Deployment, + _artifacts: &mut MaterializedArtifacts, + ) -> Result<(), DynError> { + Ok(()) + } + + fn cfgsync_server_mode() -> ComposeConfigServerMode { + ComposeConfigServerMode::Disabled + } + + fn cfgsync_container_spec( + _cfgsync_path: &Path, + _port: u16, + _network: &str, + ) -> Result { + Err(std::io::Error::other("cfgsync_container_spec is not implemented for this app").into()) + } + + fn cfgsync_start_timeout() -> Duration { + Duration::from_secs(180) + } + fn node_client_from_ports( - &self, ports: &NodeHostPorts, host: &str, - ) -> Result { - if let Some(node_client_from_ports) = self.node_client_from_ports { - return node_client_from_ports(ports, host); - } - - ::build_node_client(&discovered_node_access(host, ports)) + ) -> Result { + ::build_node_client(&discovered_node_access(host, ports)) } fn build_node_clients( - &self, - _topology: &E::Deployment, + _topology: &::Deployment, host_ports: &HostPortMapping, host: &str, - ) -> Result, DynError> { + ) -> Result, DynError> { let clients = host_ports .nodes .iter() - .map(|ports| self.node_client_from_ports(ports, host)) + .map(|ports| Self::node_client_from_ports(ports, host)) .collect::>()?; Ok(NodeClients::new(clients)) } - fn runner_host(&self) -> String { - (self.runner_host)() - } - - fn readiness_probe(&self) -> ComposeReadinessProbe { - self.readiness_probe - } -} - -pub struct ComposeCfgsync { - server_mode: ComposeConfigServerMode, - hostnames: ComposeCfgsyncHostnames, - enrich_artifacts: Option>, - container_spec: Option, - start_timeout: Duration, -} - -impl Default for ComposeCfgsync { - fn default() -> Self { - Self { - server_mode: ComposeConfigServerMode::Disabled, - hostnames: default_cfgsync_hostnames::, - enrich_artifacts: None, - container_spec: None, - start_timeout: Duration::from_secs(180), + fn readiness_probe() -> ComposeReadinessProbe { + ComposeReadinessProbe::Http { + path: ::node_readiness_path(), } } -} -impl ComposeCfgsync { - #[must_use] - pub fn new() -> Self { - Self::default() + fn compose_runner_host() -> String { + compose_runner_host() } - #[must_use] - pub fn with_server_mode(mut self, server_mode: ComposeConfigServerMode) -> Self { - self.server_mode = server_mode; - self - } - - #[must_use] - pub fn with_hostnames(mut self, hostnames: ComposeCfgsyncHostnames) -> Self { - self.hostnames = hostnames; - self - } - - #[must_use] - pub fn with_enrich_artifacts(mut self, enrich_artifacts: ComposeCfgsyncEnricher) -> Self { - self.enrich_artifacts = Some(enrich_artifacts); - self - } - - #[must_use] - pub fn with_container_spec(mut self, container_spec: ComposeConfigServerSpecBuilder) -> Self { - self.container_spec = Some(container_spec); - self - } - - #[must_use] - pub fn with_start_timeout(mut self, start_timeout: Duration) -> Self { - self.start_timeout = start_timeout; - self - } - - fn server_mode(&self) -> ComposeConfigServerMode { - self.server_mode - } - - fn hostnames(&self, topology: &E::Deployment) -> Vec { - (self.hostnames)(topology) - } - - fn enrich_artifacts( - &self, - topology: &E::Deployment, - artifacts: &mut MaterializedArtifacts, + async fn wait_remote_readiness( + _topology: &::Deployment, + mapping: &HostPortMapping, + requirement: HttpReadinessRequirement, ) -> Result<(), DynError> { - self.enrich_artifacts - .map(|enrich| enrich(topology, artifacts)) - .unwrap_or(Ok(())) - } - - fn container_spec( - &self, - cfgsync_path: &Path, - port: u16, - network: &str, - ) -> Result { - let container_spec = self.container_spec.ok_or_else(|| { - DynError::from(std::io::Error::other( - "cfgsync container spec is not configured", - )) - })?; - container_spec(cfgsync_path, port, network) - } - - fn start_timeout(&self) -> Duration { - self.start_timeout - } -} - -pub(crate) fn runtime_for() -> ComposeRuntime { - E::compose_runtime() -} - -pub(crate) fn prepare_compose_configs( - path: &Path, - topology: &E::Deployment, - cfgsync_port: u16, - metrics_otlp_ingest_url: Option<&Url>, -) -> Result<(), DynError> { - let runtime = runtime_for::(); - runtime - .stack - .prepare_workspace(path, topology, metrics_otlp_ingest_url)?; - runtime.configs.write( - path, - topology, - cfgsync_port, - metrics_otlp_ingest_url, - runtime.node_config_file_name, - )?; - Ok(()) -} - -pub(crate) fn compose_descriptor( - topology: &E::Deployment, - cfgsync_port: u16, -) -> Result { - let runtime = runtime_for::(); - runtime - .stack - .build_descriptor(topology, cfgsync_port, runtime.node_config_file_name) -} - -pub(crate) fn node_container_ports( - topology: &E::Deployment, -) -> Result, DynError> { - let runtime = runtime_for::(); - let descriptor = runtime - .stack - .build_descriptor(topology, 0, runtime.node_config_file_name)?; - runtime.access.node_container_ports(topology, &descriptor) -} - -pub(crate) fn cfgsync_hostnames(topology: &E::Deployment) -> Vec { - runtime_for::().cfgsync.hostnames(topology) -} - -pub(crate) fn enrich_cfgsync_artifacts( - topology: &E::Deployment, - artifacts: &mut MaterializedArtifacts, -) -> Result<(), DynError> { - runtime_for::() - .cfgsync - .enrich_artifacts(topology, artifacts) -} - -pub(crate) fn cfgsync_container_spec( - cfgsync_path: &Path, - port: u16, - network: &str, -) -> Result { - runtime_for::() - .cfgsync - .container_spec(cfgsync_path, port, network) -} - -pub(crate) fn cfgsync_start_timeout() -> Duration { - runtime_for::().cfgsync.start_timeout() -} - -pub(crate) fn cfgsync_server_mode() -> ComposeConfigServerMode { - runtime_for::().cfgsync.server_mode() -} - -pub(crate) fn readiness_http_path() -> &'static str { - match runtime_for::().access.readiness_probe() { - ComposeReadinessProbe::Http { path } => path, - ComposeReadinessProbe::Tcp => E::node_readiness_path(), - } -} - -pub(crate) fn build_node_clients( - topology: &E::Deployment, - host_ports: &HostPortMapping, - host: &str, -) -> Result, DynError> { - runtime_for::() - .access - .build_node_clients(topology, host_ports, host) -} - -pub(crate) fn wait_remote_readiness( - _topology: &E::Deployment, - mapping: &HostPortMapping, - requirement: HttpReadinessRequirement, -) -> Result>, DynError> { - let runtime = runtime_for::(); - let host = runtime.access.runner_host(); - let probe = runtime.access.readiness_probe(); - Ok(async move { - match probe { + match Self::readiness_probe() { ComposeReadinessProbe::Http { path } => { + let host = Self::compose_runner_host(); let urls = readiness_urls(&host, mapping, path)?; wait_http_readiness(&urls, requirement).await?; Ok(()) } ComposeReadinessProbe::Tcp => wait_for_tcp_readiness(&mapping.nodes, requirement).await, } - }) -} + } -pub(crate) fn wait_for_nodes( - ports: &[u16], - host: &str, - requirement: HttpReadinessRequirement, -) -> Result>, DynError> { - let probe = runtime_for::().access.readiness_probe(); - let host = host.to_owned(); - let node_ports = ports.to_vec(); - Ok(async move { - match probe { + async fn wait_for_nodes( + ports: &[u16], + host: &str, + requirement: HttpReadinessRequirement, + ) -> Result<(), DynError> { + match Self::readiness_probe() { ComposeReadinessProbe::Http { path } => { - wait_for_http_ports_with_host_and_requirement( - &node_ports, - &host, - path, - requirement, - ) - .await?; + wait_for_http_ports_with_host_and_requirement(ports, host, path, requirement) + .await?; Ok(()) } ComposeReadinessProbe::Tcp => { - let ports = node_ports + let ports = ports .iter() .copied() .map(|port| NodeHostPorts { @@ -779,54 +227,244 @@ pub(crate) fn wait_for_nodes( wait_for_tcp_readiness(&ports, requirement).await } } - }) + } } -fn write_static_compose_configs(context: ComposeConfigContext<'_, E>) -> Result<(), DynError> -where - E: ComposeDeployEnv + StaticArtifactRenderer::Deployment>, +/// Common compose binary-app path. +pub trait ComposeBinaryApp: + Application + Sized + StaticArtifactRenderer::Deployment> { - let hostnames = cfgsync_hostnames::(context.topology()); - let configs_dir = stack_configs_dir(context.path())?; + fn compose_node_spec() -> BinaryConfigNodeSpec; + + fn prepare_compose_workspace( + _path: &Path, + _topology: &::Deployment, + _metrics_otlp_ingest_url: Option<&Url>, + ) -> Result<(), DynError> { + Ok(()) + } + + fn compose_extra_services( + _topology: &::Deployment, + ) -> Result, DynError> { + Ok(Vec::new()) + } + + fn static_node_config_file_name(index: usize) -> String { + make_extension_node_config_file_name(&Self::compose_node_spec().config_file_extension) + .resolve(index) + } + + fn node_client_from_ports( + ports: &NodeHostPorts, + host: &str, + ) -> Result { + ::build_node_client(&discovered_node_access(host, ports)) + } + + fn readiness_probe() -> ComposeReadinessProbe { + ComposeReadinessProbe::Http { + path: ::node_readiness_path(), + } + } + + fn compose_runner_host() -> String { + compose_runner_host() + } +} + +impl ComposeDeployEnv for T +where + T: ComposeBinaryApp, +{ + fn prepare_compose_configs( + path: &Path, + topology: &::Deployment, + _cfgsync_port: u16, + metrics_otlp_ingest_url: Option<&Url>, + ) -> Result<(), DynError> { + T::prepare_compose_workspace(path, topology, metrics_otlp_ingest_url)?; + write_static_compose_configs::(path, topology) + } + + fn static_node_config_file_name(index: usize) -> String { + T::static_node_config_file_name(index) + } + + fn binary_config_node_spec( + _topology: &::Deployment, + _index: usize, + ) -> Result, DynError> { + Ok(Some(T::compose_node_spec())) + } + + fn compose_descriptor( + topology: &::Deployment, + _cfgsync_port: u16, + ) -> Result { + let spec = T::compose_node_spec(); + let mut nodes = (0..topology.node_count()) + .map(|index| { + let file_name = T::static_node_config_file_name(index); + let runtime = binary_config_node_runtime_spec(index, &spec); + NodeDescriptor::with_loopback_ports( + crate::infrastructure::ports::node_identifier(index), + runtime.image, + runtime.entrypoint, + vec![format!( + "./stack/configs/{file_name}:{}:ro", + spec.config_container_path + )], + runtime.extra_hosts, + runtime.container_ports, + runtime.environment, + runtime.platform, + ) + }) + .collect::>(); + nodes.extend(T::compose_extra_services(topology)?); + Ok(ComposeDescriptor::new(nodes)) + } + + fn node_client_from_ports( + ports: &NodeHostPorts, + host: &str, + ) -> Result { + T::node_client_from_ports(ports, host) + } + + fn readiness_probe() -> ComposeReadinessProbe { + T::readiness_probe() + } + + fn compose_runner_host() -> String { + T::compose_runner_host() + } +} + +pub(crate) fn prepare_compose_configs( + path: &Path, + topology: &E::Deployment, + cfgsync_port: u16, + metrics_otlp_ingest_url: Option<&Url>, +) -> Result<(), DynError> { + E::prepare_compose_configs(path, topology, cfgsync_port, metrics_otlp_ingest_url) +} + +pub(crate) fn compose_descriptor( + topology: &E::Deployment, + cfgsync_port: u16, +) -> Result { + E::compose_descriptor(topology, cfgsync_port) +} + +pub(crate) fn node_container_ports( + topology: &E::Deployment, +) -> Result, DynError> { + E::node_container_ports(topology) +} + +pub(crate) fn cfgsync_container_spec( + cfgsync_path: &Path, + port: u16, + network: &str, +) -> Result { + E::cfgsync_container_spec(cfgsync_path, port, network) +} + +pub(crate) fn cfgsync_start_timeout() -> Duration { + E::cfgsync_start_timeout() +} + +pub(crate) fn cfgsync_server_mode() -> ComposeConfigServerMode { + E::cfgsync_server_mode() +} + +pub(crate) fn readiness_http_path() -> &'static str { + match E::readiness_probe() { + ComposeReadinessProbe::Http { path } => path, + ComposeReadinessProbe::Tcp => ::node_readiness_path(), + } +} + +pub(crate) fn build_node_clients( + topology: &E::Deployment, + host_ports: &HostPortMapping, + host: &str, +) -> Result, DynError> { + E::build_node_clients(topology, host_ports, host) +} + +pub(crate) fn wait_remote_readiness( + topology: &E::Deployment, + mapping: &HostPortMapping, + requirement: HttpReadinessRequirement, +) -> Result>, DynError> { + let topology = topology.clone(); + let mapping = mapping.clone(); + Ok(async move { E::wait_remote_readiness(&topology, &mapping, requirement).await }) +} + +pub(crate) fn wait_for_nodes( + ports: &[u16], + host: &str, + requirement: HttpReadinessRequirement, +) -> Result>, DynError> { + let node_ports = ports.to_vec(); + let host = host.to_owned(); + Ok(async move { E::wait_for_nodes(&node_ports, &host, requirement).await }) +} + +fn write_static_compose_configs( + path: &Path, + topology: &::Deployment, +) -> Result<(), DynError> +where + E: ComposeBinaryApp, +{ + let hostnames = E::cfgsync_hostnames(topology); + let configs_dir = stack_configs_dir(path)?; fs::create_dir_all(&configs_dir)?; - for index in 0..context.topology().node_count() { - let mut config = E::build_node_config(context.topology(), index)?; - E::rewrite_for_hostnames(context.topology(), index, &hostnames, &mut config)?; + for index in 0..topology.node_count() { + let mut config = E::build_node_config(topology, index)?; + E::rewrite_for_hostnames(topology, index, &hostnames, &mut config)?; let rendered = E::serialize_node_config(&config)?; - fs::write(context.node_config_path(index)?, rendered)?; + fs::write( + configs_dir.join(E::static_node_config_file_name(index)), + rendered, + )?; } Ok(()) } -fn write_registration_server_compose_configs( - context: ComposeConfigContext<'_, E>, +pub fn write_registration_server_compose_configs( + path: &Path, + topology: &::Deployment, + cfgsync_port: u16, ) -> Result<(), DynError> where E: ComposeDeployEnv + StaticArtifactRenderer::Deployment>, { - let stack_dir = context - .path() + let artifacts_path = path .parent() - .ok_or_else(|| anyhow::anyhow!("cfgsync path has no parent"))?; - let artifacts_path = stack_dir.join("cfgsync.artifacts.yaml"); - let hostnames = cfgsync_hostnames::(context.topology()); + .ok_or_else(|| anyhow::anyhow!("cfgsync path has no parent"))? + .join("cfgsync.artifacts.yaml"); + let hostnames = E::cfgsync_hostnames(topology); render_and_write_registration_server::( - context.topology(), + topology, &hostnames, RegistrationServerRenderOptions { - port: Some(context.cfgsync_port()), + port: Some(cfgsync_port), artifacts_path: Some("cfgsync.artifacts.yaml".to_owned()), }, CfgsyncOutputPaths { - config_path: context.path(), + config_path: path, artifacts_path: &artifacts_path, }, - |artifacts| { - enrich_cfgsync_artifacts::(context.topology(), artifacts).map_err(Into::into) - }, + |artifacts| E::enrich_cfgsync_artifacts(topology, artifacts).map_err(Into::into), )?; Ok(()) @@ -922,12 +560,6 @@ fn readiness_url(host: &str, api_port: u16, endpoint_path: &str) -> Result(topology: &E::Deployment) -> Vec { - (0..topology.node_count()) - .map(crate::infrastructure::ports::node_identifier) - .collect() -} - fn make_extension_node_config_file_name(extension: &str) -> ComposeNodeConfigFileName { match extension { "yaml" => ComposeNodeConfigFileName::FixedExtension("yaml"), diff --git a/testing-framework/deployers/compose/src/lib.rs b/testing-framework/deployers/compose/src/lib.rs index 098ce12..19d522e 100644 --- a/testing-framework/deployers/compose/src/lib.rs +++ b/testing-framework/deployers/compose/src/lib.rs @@ -21,9 +21,9 @@ pub use docker::{ platform::host_gateway_entry, }; pub use env::{ - ComposeAccess, ComposeCfgsync, ComposeConfigContext, ComposeConfigServerMode, ComposeConfigs, - ComposeDeployEnv, ComposeNodeConfigFileName, ComposeNodes, ComposeReadinessProbe, - ComposeRuntime, ComposeStack, ConfigServerHandle, discovered_node_access, + ComposeBinaryApp, ComposeConfigServerMode, ComposeDeployEnv, ComposeNodeConfigFileName, + ComposeReadinessProbe, ConfigServerHandle, discovered_node_access, + write_registration_server_compose_configs, }; pub use errors::ComposeRunnerError; pub use infrastructure::{ diff --git a/testing-framework/deployers/k8s/src/env.rs b/testing-framework/deployers/k8s/src/env.rs index efed11d..3928635 100644 --- a/testing-framework/deployers/k8s/src/env.rs +++ b/testing-framework/deployers/k8s/src/env.rs @@ -308,307 +308,260 @@ where } } -type K8sPortSpecsBuilder = - Box::Deployment) -> PortSpecs + Send + Sync>; -type K8sPreparedStackBuilder = Box< - dyn Fn( - &::Deployment, - Option<&Url>, - ) -> Result, DynError> - + Send - + Sync, ->; -type K8sClusterIdentifiers = Box (String, String) + Send + Sync>; -type K8sIndexedName = Box String + Send + Sync>; -type K8sAttachSelector = Box String + Send + Sync>; -type K8sNodeClientsBuilder = Box< - dyn Fn(&str, &[u16], &[u16]) -> Result::NodeClient>, DynError> - + Send - + Sync, ->; -type K8sNodeBaseUrl = - Box::NodeClient) -> Option + Send + Sync>; -type K8sCfgsyncService = Box Option<(String, u16)> + Send + Sync>; -type K8sCfgsyncHostnames = Box Vec + Send + Sync>; -type K8sCfgsyncOverrideBuilder = Box< - dyn Fn( - &::Deployment, - usize, - &[String], - &testing_framework_core::scenario::StartNodeOptions, - ) -> Result, DynError> - + Send - + Sync, ->; +/// Advanced k8s deployer integration. +#[async_trait] +pub trait K8sDeployEnv: Application + Sized { + type Assets: PreparedK8sStack + Send + Sync + 'static; -pub struct K8sRuntime { - install: K8sInstall, - access: K8sAccess, - manual: K8sManual, -} + fn collect_port_specs(topology: &Self::Deployment) -> PortSpecs; -pub struct K8sInstall { - collect_port_specs: K8sPortSpecsBuilder, - prepare_stack: K8sPreparedStackBuilder, - cluster_identifiers: K8sClusterIdentifiers, - node_deployment_name: K8sIndexedName, - node_service_name: K8sIndexedName, - attach_node_service_selector: K8sAttachSelector, -} + fn prepare_assets( + topology: &Self::Deployment, + metrics_otlp_ingest_url: Option<&Url>, + ) -> Result; -pub struct K8sAccess { - build_node_clients: K8sNodeClientsBuilder, - readiness_path: &'static str, - node_role: &'static str, - node_base_url: K8sNodeBaseUrl, -} - -pub struct K8sManual { - cfgsync_service: K8sCfgsyncService, - cfgsync_hostnames: Option, - build_cfgsync_override_artifacts: K8sCfgsyncOverrideBuilder, -} - -impl K8sRuntime { - #[must_use] - pub fn new(install: K8sInstall) -> Self { - Self { - install, - access: K8sAccess::default(), - manual: K8sManual::default(), - } + async fn install_stack( + client: &Client, + assets: &Self::Assets, + namespace: &str, + release: &str, + nodes: usize, + ) -> Result + where + Self::Assets: PreparedK8sStack, + { + assets.install(client, namespace, release, nodes).await } - #[must_use] - pub fn with_access(mut self, access: K8sAccess) -> Self { - self.access = access; - self + fn cluster_identifiers() -> (String, String) { + default_cluster_identifiers() } - #[must_use] - pub fn with_manual(mut self, manual: K8sManual) -> Self { - self.manual = manual; - self + fn node_client_from_ports( + host: &str, + api_port: u16, + auxiliary_port: u16, + ) -> Result { + ::build_node_client(&discovered_node_access( + host, + api_port, + auxiliary_port, + )) + } + + fn build_node_clients( + host: &str, + node_api_ports: &[u16], + node_auxiliary_ports: &[u16], + ) -> Result, DynError> { + node_api_ports + .iter() + .zip(node_auxiliary_ports.iter()) + .map(|(&api_port, &auxiliary_port)| { + Self::node_client_from_ports(host, api_port, auxiliary_port) + }) + .collect() + } + + fn node_readiness_path() -> &'static str { + ::node_readiness_path() + } + + async fn wait_remote_readiness( + _deployment: &Self::Deployment, + urls: &[Url], + requirement: HttpReadinessRequirement, + ) -> Result<(), DynError> { + let readiness_urls: Vec<_> = urls + .iter() + .map(|url| { + let mut endpoint = url.clone(); + endpoint.set_path(::node_readiness_path()); + endpoint + }) + .collect(); + wait_http_readiness(&readiness_urls, requirement).await?; + Ok(()) + } + + fn node_role() -> &'static str { + "node" + } + + fn node_deployment_name(release: &str, index: usize) -> String { + default_node_name(release, index) + } + + fn node_service_name(release: &str, index: usize) -> String { + default_node_name(release, index) + } + + fn attach_node_service_selector(release: &str) -> String { + default_attach_node_service_selector(release) + } + + async fn wait_for_node_http( + ports: &[u16], + role: &'static str, + host: &str, + timeout: Duration, + poll_interval: Duration, + requirement: HttpReadinessRequirement, + ) -> Result<(), DynError> { + let _ = role; + let _ = timeout; + let _ = poll_interval; + wait_for_http_ports_with_host_and_requirement( + ports, + host, + ::node_readiness_path(), + requirement, + ) + .await?; + Ok(()) + } + + fn node_base_url(_client: &Self::NodeClient) -> Option { + None + } + + fn cfgsync_service(_release: &str) -> Option<(String, u16)> { + None + } + + fn cfgsync_hostnames(release: &str, node_count: usize) -> Vec { + (0..node_count) + .map(|index| Self::node_service_name(release, index)) + .collect() + } + + fn build_cfgsync_override_artifacts( + _deployment: &Self::Deployment, + _node_index: usize, + _hostnames: &[String], + _options: &testing_framework_core::scenario::StartNodeOptions, + ) -> Result, DynError> { + Ok(None) } } -impl K8sRuntime +/// Common binary+config k8s path. +pub trait K8sBinaryApp: Application + StaticNodeConfigProvider + Sized where - E: Application + StaticNodeConfigProvider, - E::Deployment: DeploymentDescriptor, + Self::Deployment: DeploymentDescriptor, { - #[must_use] - pub fn binary_config(spec: BinaryConfigK8sSpec) -> Self { - let prepare_spec = spec.clone(); - let name_prefix = spec.node_name_prefix.clone(); - let container_http_port = spec.container_http_port; - let service_testing_port = spec.service_testing_port; + fn k8s_binary_spec() -> BinaryConfigK8sSpec; - Self::new( - K8sInstall::new( - move |topology: &E::Deployment| { - standard_port_specs( - topology.node_count(), - container_http_port, - service_testing_port, - ) - }, - move |topology, _metrics_otlp_ingest_url| { - let assets = - render_binary_config_node_chart_assets::(topology, &prepare_spec)?; - Ok(Box::new(assets) as Box) - }, - ) - .with_node_name_prefix(name_prefix), + fn extend_k8s_manifest( + _deployment: &Self::Deployment, + _manifest: &mut HelmManifest, + ) -> Result<(), DynError> { + Ok(()) + } + + fn build_node_clients( + host: &str, + node_api_ports: &[u16], + node_auxiliary_ports: &[u16], + ) -> Result, DynError> { + node_api_ports + .iter() + .zip(node_auxiliary_ports.iter()) + .map(|(&api_port, &auxiliary_port)| { + ::build_node_client(&discovered_node_access( + host, + api_port, + auxiliary_port, + )) + }) + .collect() + } + + fn node_role() -> &'static str { + "node" + } + + fn node_base_url(_client: &Self::NodeClient) -> Option { + None + } +} + +#[async_trait] +impl K8sDeployEnv for T +where + T: K8sBinaryApp, + T::Deployment: DeploymentDescriptor, +{ + type Assets = RenderedHelmChartAssets; + + fn collect_port_specs(topology: &Self::Deployment) -> PortSpecs { + let spec = T::k8s_binary_spec(); + standard_port_specs( + topology.node_count(), + spec.container_http_port, + spec.service_testing_port, ) } -} -impl K8sInstall { - #[must_use] - pub fn new(collect_port_specs: FP, prepare_stack: FA) -> Self - where - FP: Fn(&E::Deployment) -> PortSpecs + Send + Sync + 'static, - FA: Fn(&E::Deployment, Option<&Url>) -> Result, DynError> - + Send - + Sync - + 'static, - { - Self { - collect_port_specs: Box::new(collect_port_specs), - prepare_stack: Box::new(prepare_stack), - cluster_identifiers: Box::new(default_cluster_identifiers), - node_deployment_name: Box::new(default_node_name), - node_service_name: Box::new(default_node_name), - attach_node_service_selector: Box::new(default_attach_node_service_selector), - } + fn prepare_assets( + topology: &Self::Deployment, + _metrics_otlp_ingest_url: Option<&Url>, + ) -> Result { + let spec = T::k8s_binary_spec(); + let mut manifest = HelmManifest::new(); + manifest.push_raw_yaml(&render_binary_config_node_manifest::(topology, &spec)?); + T::extend_k8s_manifest(topology, &mut manifest)?; + render_manifest_chart_assets( + &spec.chart_name, + &format!("{}.yaml", spec.chart_name), + &manifest, + ) } - #[must_use] - pub fn with_cluster_identifiers(mut self, cluster_identifiers: F) -> Self - where - F: Fn() -> (String, String) + Send + Sync + 'static, - { - self.cluster_identifiers = Box::new(cluster_identifiers); - self + fn build_node_clients( + host: &str, + node_api_ports: &[u16], + node_auxiliary_ports: &[u16], + ) -> Result, DynError> { + T::build_node_clients(host, node_api_ports, node_auxiliary_ports) } - #[must_use] - pub fn with_node_name_prefix(mut self, prefix: impl Into) -> Self { - let prefix = prefix.into(); - self.node_deployment_name = Box::new(named_resource(prefix.clone())); - self.node_service_name = Box::new(named_resource(prefix)); - self + fn node_role() -> &'static str { + T::node_role() } - #[must_use] - pub fn with_resource_names( - mut self, - node_deployment_name: FD, - node_service_name: FS, - ) -> Self - where - FD: Fn(&str, usize) -> String + Send + Sync + 'static, - FS: Fn(&str, usize) -> String + Send + Sync + 'static, - { - self.node_deployment_name = Box::new(node_deployment_name); - self.node_service_name = Box::new(node_service_name); - self + fn node_base_url(client: &Self::NodeClient) -> Option { + T::node_base_url(client) } - #[must_use] - pub fn with_attach_node_service_selector(mut self, attach_node_service_selector: F) -> Self - where - F: Fn(&str) -> String + Send + Sync + 'static, - { - self.attach_node_service_selector = Box::new(attach_node_service_selector); - self - } -} - -impl Default for K8sAccess { - fn default() -> Self { - Self { - build_node_clients: Box::new(default_build_node_clients::), - readiness_path: E::node_readiness_path(), - node_role: "node", - node_base_url: Box::new(|_client| None), - } - } -} - -impl K8sAccess { - #[must_use] - pub fn new() -> Self { - Self::default() + fn node_deployment_name(_release: &str, index: usize) -> String { + format!("{}-{index}", T::k8s_binary_spec().node_name_prefix) } - #[must_use] - pub fn with_node_clients(mut self, build_node_clients: F) -> Self - where - F: Fn(&str, &[u16], &[u16]) -> Result, DynError> + Send + Sync + 'static, - { - self.build_node_clients = Box::new(build_node_clients); - self + fn node_service_name(_release: &str, index: usize) -> String { + format!("{}-{index}", T::k8s_binary_spec().node_name_prefix) } - - #[must_use] - pub fn with_readiness_path(mut self, readiness_path: &'static str) -> Self { - self.readiness_path = readiness_path; - self - } - - #[must_use] - pub fn with_node_role(mut self, node_role: &'static str) -> Self { - self.node_role = node_role; - self - } - - #[must_use] - pub fn with_node_base_url(mut self, node_base_url: F) -> Self - where - F: Fn(&E::NodeClient) -> Option + Send + Sync + 'static, - { - self.node_base_url = Box::new(node_base_url); - self - } -} - -impl Default for K8sManual { - fn default() -> Self { - Self { - cfgsync_service: Box::new(|_release| None), - cfgsync_hostnames: None, - build_cfgsync_override_artifacts: Box::new( - |_topology, _node_index, _hostnames, _options| Ok(None), - ), - } - } -} - -impl K8sManual { - #[must_use] - pub fn new() -> Self { - Self::default() - } - - #[must_use] - pub fn with_cfgsync_service(mut self, cfgsync_service: F) -> Self - where - F: Fn(&str) -> Option<(String, u16)> + Send + Sync + 'static, - { - self.cfgsync_service = Box::new(cfgsync_service); - self - } - - #[must_use] - pub fn with_cfgsync_hostnames(mut self, cfgsync_hostnames: F) -> Self - where - F: Fn(&str, usize) -> Vec + Send + Sync + 'static, - { - self.cfgsync_hostnames = Some(Box::new(cfgsync_hostnames)); - self - } - - #[must_use] - pub fn with_cfgsync_override_artifacts(mut self, build_override_artifacts: F) -> Self - where - F: Fn( - &E::Deployment, - usize, - &[String], - &testing_framework_core::scenario::StartNodeOptions, - ) -> Result, DynError> - + Send - + Sync - + 'static, - { - self.build_cfgsync_override_artifacts = Box::new(build_override_artifacts); - self - } -} - -pub trait K8sDeployEnv: Application + Sized { - fn k8s_runtime() -> K8sRuntime; -} - -pub(crate) fn runtime_for() -> K8sRuntime { - E::k8s_runtime() } pub(crate) fn collect_port_specs(deployment: &E::Deployment) -> PortSpecs { - (runtime_for::().install.collect_port_specs)(deployment) + E::collect_port_specs(deployment) } pub(crate) fn prepare_stack( deployment: &E::Deployment, metrics_otlp_ingest_url: Option<&Url>, -) -> Result, DynError> { - (runtime_for::().install.prepare_stack)(deployment, metrics_otlp_ingest_url) +) -> Result, DynError> +where + E::Assets: PreparedK8sStack + 'static, +{ + Ok(Box::new(E::prepare_assets( + deployment, + metrics_otlp_ingest_url, + )?)) } pub(crate) fn cluster_identifiers() -> (String, String) { - (runtime_for::().install.cluster_identifiers)() + E::cluster_identifiers() } pub(crate) fn build_node_clients( @@ -616,44 +569,35 @@ pub(crate) fn build_node_clients( node_api_ports: &[u16], node_auxiliary_ports: &[u16], ) -> Result, DynError> { - (runtime_for::().access.build_node_clients)(host, node_api_ports, node_auxiliary_ports) + E::build_node_clients(host, node_api_ports, node_auxiliary_ports) } pub(crate) fn node_readiness_path() -> &'static str { - runtime_for::().access.readiness_path + ::node_readiness_path() } pub(crate) async fn wait_remote_readiness( - _deployment: &E::Deployment, + deployment: &E::Deployment, urls: &[Url], requirement: HttpReadinessRequirement, ) -> Result<(), DynError> { - let readiness_urls: Vec<_> = urls - .iter() - .map(|url| { - let mut endpoint = url.clone(); - endpoint.set_path(node_readiness_path::()); - endpoint - }) - .collect(); - wait_http_readiness(&readiness_urls, requirement).await?; - Ok(()) + E::wait_remote_readiness(deployment, urls, requirement).await } pub(crate) fn node_role() -> &'static str { - runtime_for::().access.node_role + E::node_role() } pub(crate) fn node_deployment_name(release: &str, index: usize) -> String { - (runtime_for::().install.node_deployment_name)(release, index) + E::node_deployment_name(release, index) } pub(crate) fn node_service_name(release: &str, index: usize) -> String { - (runtime_for::().install.node_service_name)(release, index) + E::node_service_name(release, index) } pub(crate) fn attach_node_service_selector(release: &str) -> String { - (runtime_for::().install.attach_node_service_selector)(release) + E::attach_node_service_selector(release) } pub(crate) async fn wait_for_node_http( @@ -664,36 +608,19 @@ pub(crate) async fn wait_for_node_http( poll_interval: Duration, requirement: HttpReadinessRequirement, ) -> Result<(), DynError> { - let _ = role; - let _ = timeout; - let _ = poll_interval; - wait_for_http_ports_with_host_and_requirement( - ports, - host, - node_readiness_path::(), - requirement, - ) - .await?; - Ok(()) + E::wait_for_node_http(ports, role, host, timeout, poll_interval, requirement).await } pub(crate) fn node_base_url(client: &E::NodeClient) -> Option { - (runtime_for::().access.node_base_url)(client) + E::node_base_url(client) } pub(crate) fn cfgsync_service(release: &str) -> Option<(String, u16)> { - (runtime_for::().manual.cfgsync_service)(release) + E::cfgsync_service(release) } pub(crate) fn cfgsync_hostnames(release: &str, node_count: usize) -> Vec { - let runtime = runtime_for::(); - if let Some(cfgsync_hostnames) = runtime.manual.cfgsync_hostnames { - return cfgsync_hostnames(release, node_count); - } - - (0..node_count) - .map(|index| (runtime.install.node_service_name)(release, index)) - .collect() + E::cfgsync_hostnames(release, node_count) } pub(crate) fn build_cfgsync_override_artifacts( @@ -702,9 +629,7 @@ pub(crate) fn build_cfgsync_override_artifacts( hostnames: &[String], options: &testing_framework_core::scenario::StartNodeOptions, ) -> Result, DynError> { - (runtime_for::().manual.build_cfgsync_override_artifacts)( - deployment, node_index, hostnames, options, - ) + E::build_cfgsync_override_artifacts(deployment, node_index, hostnames, options) } fn default_cluster_identifiers() -> (String, String) { @@ -716,24 +641,6 @@ fn default_cluster_identifiers() -> (String, String) { (format!("tf-testnet-{suffix}"), String::from("tf-runner")) } -fn default_build_node_clients( - host: &str, - node_api_ports: &[u16], - node_auxiliary_ports: &[u16], -) -> Result, DynError> { - node_api_ports - .iter() - .zip(node_auxiliary_ports.iter()) - .map(|(&api_port, &auxiliary_port)| { - ::build_node_client(&discovered_node_access( - host, - api_port, - auxiliary_port, - )) - }) - .collect() -} - fn default_node_name(release: &str, index: usize) -> String { format!("{release}-node-{index}") } @@ -741,7 +648,3 @@ fn default_node_name(release: &str, index: usize) -> String { fn default_attach_node_service_selector(release: &str) -> String { format!("app.kubernetes.io/instance={release}") } - -fn named_resource(prefix: String) -> impl Fn(&str, usize) -> String + Send + Sync + 'static { - move |_release, index| format!("{prefix}-{index}") -} diff --git a/testing-framework/deployers/k8s/src/lib.rs b/testing-framework/deployers/k8s/src/lib.rs index 94c3f35..2186d49 100644 --- a/testing-framework/deployers/k8s/src/lib.rs +++ b/testing-framework/deployers/k8s/src/lib.rs @@ -23,8 +23,8 @@ pub(crate) fn ensure_rustls_provider_installed() { pub use deployer::{K8sDeployer, K8sDeploymentMetadata, K8sRunnerError}; pub use env::{ - BinaryConfigK8sSpec, HelmManifest, HelmReleaseAssets, K8sAccess, K8sDeployEnv, K8sInstall, - K8sManual, K8sRuntime, PreparedK8sStack, RenderedHelmChartAssets, discovered_node_access, + BinaryConfigK8sSpec, HelmManifest, HelmReleaseAssets, K8sBinaryApp, K8sDeployEnv, + PreparedK8sStack, RenderedHelmChartAssets, discovered_node_access, install_helm_release_with_cleanup, render_binary_config_node_chart_assets, render_binary_config_node_manifest, render_manifest_chart_assets, render_single_template_chart_assets, standard_port_specs, diff --git a/testing-framework/deployers/k8s/src/manual.rs b/testing-framework/deployers/k8s/src/manual.rs index fd8ecb4..6791a6d 100644 --- a/testing-framework/deployers/k8s/src/manual.rs +++ b/testing-framework/deployers/k8s/src/manual.rs @@ -684,25 +684,33 @@ mod tests { #[async_trait::async_trait] impl K8sDeployEnv for DummyEnv { - fn k8s_runtime() -> crate::env::K8sRuntime { - crate::env::K8sRuntime::new(crate::env::K8sInstall::new( - |_topology| standard_port_specs(1, 8080, 8081), - |_topology, _metrics_otlp_ingest_url| { - let assets: RenderedHelmChartAssets = - render_single_template_chart_assets("dummy", "dummy.yaml", "")?; - Ok(Box::new(assets) as Box) - }, - )) - .with_manual( - crate::env::K8sManual::new() - .with_cfgsync_service(|release| Some((format!("{release}-cfgsync"), 4400))) - .with_cfgsync_override_artifacts(|topology, node_index, hostnames, options| { - build_node_artifact_override::( - topology, node_index, hostnames, options, - ) - .map_err(Into::into) - }), - ) + type Assets = RenderedHelmChartAssets; + + fn collect_port_specs( + _topology: &Self::Deployment, + ) -> crate::infrastructure::cluster::PortSpecs { + standard_port_specs(1, 8080, 8081) + } + + fn prepare_assets( + _topology: &Self::Deployment, + _metrics_otlp_ingest_url: Option<&reqwest::Url>, + ) -> Result { + render_single_template_chart_assets("dummy", "dummy.yaml", "") + } + + fn cfgsync_service(release: &str) -> Option<(String, u16)> { + Some((format!("{release}-cfgsync"), 4400)) + } + + fn build_cfgsync_override_artifacts( + topology: &Self::Deployment, + node_index: usize, + hostnames: &[String], + options: &testing_framework_core::scenario::StartNodeOptions, + ) -> Result, DynError> { + build_node_artifact_override::(topology, node_index, hostnames, options) + .map_err(Into::into) } } diff --git a/testing-framework/deployers/local/src/env/mod.rs b/testing-framework/deployers/local/src/env/mod.rs index 626e773..e3df1fe 100644 --- a/testing-framework/deployers/local/src/env/mod.rs +++ b/testing-framework/deployers/local/src/env/mod.rs @@ -1,5 +1,12 @@ +use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddr}, + path::{Path, PathBuf}, +}; + +use async_trait::async_trait; use testing_framework_core::scenario::{ - Application, DynError, HttpReadinessRequirement, ReadinessError, + Application, DynError, HttpReadinessRequirement, ReadinessError, StartNodeOptions, wait_for_http_ports_with_requirement, }; @@ -9,7 +16,6 @@ use crate::{ }; mod helpers; -mod runtime; #[cfg(test)] mod tests; @@ -20,108 +26,405 @@ pub use helpers::{ reserve_local_node_ports, single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec, yaml_node_config, }; -pub use runtime::{ - LocalAccess, LocalBuildContext, LocalLifecycle, LocalProcess, LocalRuntime, - LocalStableReadinessFuture, cluster_node_config_from_context, -}; +/// Context passed while building a local node config. +pub struct LocalBuildContext<'a, E: Application> { + pub topology: &'a E::Deployment, + pub index: usize, + pub ports: &'a LocalNodePorts, + pub peers: &'a [LocalPeerNode], + pub peer_ports: &'a [u16], + pub peer_ports_by_name: &'a HashMap, + pub options: &'a StartNodeOptions, + pub template_config: Option<&'a E::NodeConfig>, +} + +/// Spawned local process node for a concrete application environment. pub type Node = ProcessNode<::NodeConfig, ::NodeClient>; +/// Advanced local deployer integration. +/// +/// This is the full-control path. It exposes runner-facing hooks directly and +/// is intended for applications that need custom startup, endpoint discovery, +/// or lifecycle behavior. +#[async_trait] pub trait LocalDeployerEnv: Application + Sized where ::NodeConfig: Clone + Send + Sync + 'static, { - fn local_runtime() -> LocalRuntime; + fn local_port_names() -> &'static [&'static str] { + Self::initial_local_port_names() + } + + fn build_node_config( + topology: &Self::Deployment, + index: usize, + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + peer_ports: &[u16], + ) -> Result::NodeConfig>, DynError> { + Self::build_node_config_from_template( + topology, + index, + peer_ports_by_name, + options, + peer_ports, + None, + ) + } + + fn build_node_config_from_template( + topology: &Self::Deployment, + index: usize, + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + peer_ports: &[u16], + template_config: Option<&::NodeConfig>, + ) -> Result::NodeConfig>, DynError> { + let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node") + .map_err(|source| -> DynError { source.into() })?; + let ports = reserved + .pop() + .ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?; + let network_port = ports.network_port(); + let config = Self::build_local_node_config( + topology, + index, + &ports, + peer_ports_by_name, + options, + peer_ports, + template_config, + )?; + + Ok(BuiltNodeConfig { + config, + network_port, + }) + } + + fn build_initial_node_configs( + topology: &Self::Deployment, + ) -> Result::NodeConfig>>, ProcessSpawnError> { + helpers::build_generated_initial_nodes::( + topology, + Self::initial_node_name_prefix(), + Self::initial_local_port_names(), + |context| { + Self::build_node_config_from_template( + context.topology, + context.index, + context.peer_ports_by_name, + context.options, + context.peer_ports, + context.template_config, + ) + }, + ) + } + + fn initial_node_name_prefix() -> &'static str { + "node" + } + + fn initial_local_port_names() -> &'static [&'static str] { + &[] + } + + fn build_initial_node_config( + topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peer_ports: &[u16], + ) -> Result<::NodeConfig, DynError> { + let peer_ports = helpers::compact_peer_ports(peer_ports, index); + let peer_ports_by_name = HashMap::new(); + let options = StartNodeOptions::::default(); + Self::build_local_node_config( + topology, + index, + ports, + &peer_ports_by_name, + &options, + &peer_ports, + None, + ) + } + + fn build_local_node_config( + topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + peer_ports: &[u16], + template_config: Option<&::NodeConfig>, + ) -> Result<::NodeConfig, DynError> { + let peers = build_local_peer_nodes(peer_ports, index); + Self::build_local_node_config_with_peers( + topology, + index, + ports, + &peers, + peer_ports_by_name, + options, + template_config, + ) + } + + fn build_local_node_config_with_peers( + _topology: &Self::Deployment, + _index: usize, + _ports: &LocalNodePorts, + _peers: &[LocalPeerNode], + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + _template_config: Option<&::NodeConfig>, + ) -> Result<::NodeConfig, DynError> { + Err(std::io::Error::other( + "build_local_node_config_with_peers is not implemented for this app", + ) + .into()) + } + + fn initial_persist_dir( + _topology: &Self::Deployment, + _node_name: &str, + _index: usize, + ) -> Option { + None + } + + fn initial_snapshot_dir( + _topology: &Self::Deployment, + _node_name: &str, + _index: usize, + ) -> Option { + None + } + + fn local_process_spec() -> Option { + None + } + + fn render_local_config( + _config: &::NodeConfig, + ) -> Result, DynError> { + Err(std::io::Error::other("render_local_config is not implemented for this app").into()) + } + + fn build_launch_spec( + config: &::NodeConfig, + _dir: &Path, + _label: &str, + ) -> Result { + let spec = Self::local_process_spec().ok_or_else(|| { + std::io::Error::other("build_launch_spec is not implemented for this app") + })?; + let rendered = Self::render_local_config(config)?; + helpers::rendered_config_launch_spec(rendered, &spec) + } + + fn http_api_port(_config: &::NodeConfig) -> Option { + None + } + + fn node_endpoints( + config: &::NodeConfig, + ) -> Result { + if let Some(port) = Self::http_api_port(config) { + return Ok(NodeEndpoints { + api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)), + extra_ports: HashMap::new(), + }); + } + + Err(std::io::Error::other("node_endpoints is not implemented for this app").into()) + } + + fn node_peer_port(node: &Node) -> u16 { + node.endpoints().api.port() + } + + fn node_client_from_api_endpoint(_api: SocketAddr) -> Option { + None + } + + fn node_client(endpoints: &NodeEndpoints) -> Result { + if let Ok(client) = + ::build_node_client(&discovered_node_access(endpoints)) + { + return Ok(client); + } + + if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) { + return Ok(client); + } + + Err(std::io::Error::other("node_client is not implemented for this app").into()) + } + + fn readiness_endpoint_path() -> &'static str { + ::node_readiness_path() + } + + async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { + Ok(()) + } } -pub(crate) fn runtime_for() -> LocalRuntime { - E::local_runtime() +/// Common local binary-app path. +/// +/// This is the compact path for apps that: +/// - launch one local binary per node +/// - materialize one config file per node +/// - expose an HTTP API port used for readiness and discovery +#[async_trait] +pub trait LocalBinaryApp: Application + Sized +where + ::NodeConfig: Clone + Send + Sync + 'static, +{ + fn initial_node_name_prefix() -> &'static str; + + fn initial_local_port_names() -> &'static [&'static str] { + &[] + } + + fn build_local_node_config_with_peers( + topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + template_config: Option<&::NodeConfig>, + ) -> Result<::NodeConfig, DynError>; + + fn local_process_spec() -> LocalProcessSpec; + + fn render_local_config(config: &::NodeConfig) + -> Result, DynError>; + + fn http_api_port(config: &::NodeConfig) -> u16; + + fn readiness_endpoint_path() -> &'static str { + ::node_readiness_path() + } + + async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { + Ok(()) + } +} + +#[async_trait] +impl LocalDeployerEnv for T +where + T: LocalBinaryApp, + ::NodeConfig: Clone + Send + Sync + 'static, +{ + fn initial_node_name_prefix() -> &'static str { + T::initial_node_name_prefix() + } + + fn initial_local_port_names() -> &'static [&'static str] { + T::initial_local_port_names() + } + + fn build_local_node_config_with_peers( + topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + template_config: Option<&::NodeConfig>, + ) -> Result<::NodeConfig, DynError> { + T::build_local_node_config_with_peers( + topology, + index, + ports, + peers, + peer_ports_by_name, + options, + template_config, + ) + } + + fn local_process_spec() -> Option { + Some(T::local_process_spec()) + } + + fn render_local_config( + config: &::NodeConfig, + ) -> Result, DynError> { + T::render_local_config(config) + } + + fn http_api_port(config: &::NodeConfig) -> Option { + Some(T::http_api_port(config)) + } + + fn readiness_endpoint_path() -> &'static str { + T::readiness_endpoint_path() + } + + async fn wait_readiness_stable(nodes: &[Node]) -> Result<(), DynError> { + T::wait_readiness_stable(nodes).await + } } pub(crate) fn build_node_from_template( topology: &E::Deployment, index: usize, - peer_ports_by_name: &std::collections::HashMap, - options: &testing_framework_core::scenario::StartNodeOptions, + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, peer_ports: &[u16], template_config: Option<&E::NodeConfig>, ) -> Result, DynError> { - let runtime = runtime_for::(); - let mut reserved = reserve_local_node_ports(1, runtime.process.port_names, "node") - .map_err(|source| -> DynError { source.into() })?; - let ports = reserved - .pop() - .ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?; - let peers = build_local_peer_nodes(peer_ports, index); - - runtime.process.build_node(LocalBuildContext { + E::build_node_config_from_template( topology, index, - ports: &ports, - peers: &peers, - peer_ports, peer_ports_by_name, options, + peer_ports, template_config, - }) + ) } pub(crate) fn build_initial_node_configs( topology: &E::Deployment, ) -> Result>, ProcessSpawnError> { - runtime_for::().process.build_initial_nodes(topology) + E::build_initial_node_configs(topology) } pub(crate) fn initial_persist_dir( topology: &E::Deployment, node_name: &str, index: usize, -) -> Option { - runtime_for::() - .lifecycle - .initial_persist_dir(topology, node_name, index) +) -> Option { + E::initial_persist_dir(topology, node_name, index) } pub(crate) fn initial_snapshot_dir( topology: &E::Deployment, node_name: &str, index: usize, -) -> Option { - runtime_for::() - .lifecycle - .initial_snapshot_dir(topology, node_name, index) -} - -pub(crate) fn build_launch_spec( - config: &E::NodeConfig, - dir: &std::path::Path, - label: &str, -) -> Result { - runtime_for::() - .process - .build_launch_spec(config, dir, label) -} - -pub(crate) fn node_endpoints( - config: &E::NodeConfig, -) -> Result { - runtime_for::().access.node_endpoints(config) +) -> Option { + E::initial_snapshot_dir(topology, node_name, index) } pub(crate) fn node_client( endpoints: &NodeEndpoints, ) -> Result { - runtime_for::().access.node_client(endpoints) + E::node_client(endpoints) } pub(crate) fn node_peer_port(node: &Node) -> u16 { - runtime_for::() - .access - .node_peer_port(node.config(), node.endpoints()) + E::node_peer_port(node) } pub(crate) fn readiness_endpoint_path() -> &'static str { - runtime_for::().access.readiness_path() + E::readiness_endpoint_path() } pub async fn wait_local_http_readiness( @@ -133,12 +436,9 @@ pub async fn wait_local_http_readiness( .map(|node| node.endpoints().api.port()) .collect(); - wait_for_http_ports_with_requirement(&ports, readiness_endpoint_path::(), requirement) - .await?; + wait_for_http_ports_with_requirement(&ports, E::readiness_endpoint_path(), requirement).await?; - runtime_for::() - .lifecycle - .wait_stable(nodes) + E::wait_readiness_stable(nodes) .await .map_err(|source| ReadinessError::ClusterStable { source }) } @@ -153,12 +453,12 @@ pub async fn spawn_node_from_config( ProcessNode::spawn( &label, config, - build_launch_spec::, - node_endpoints::, + E::build_launch_spec, + E::node_endpoints, keep_tempdir, persist_dir, snapshot_dir, - node_client::, + E::node_client, ) .await } diff --git a/testing-framework/deployers/local/src/env/runtime.rs b/testing-framework/deployers/local/src/env/runtime.rs deleted file mode 100644 index 28e72d2..0000000 --- a/testing-framework/deployers/local/src/env/runtime.rs +++ /dev/null @@ -1,438 +0,0 @@ -use std::{ - collections::HashMap, - future::Future, - path::{Path, PathBuf}, - pin::Pin, -}; - -use serde::Serialize; -use testing_framework_core::scenario::{ - Application, ClusterNodeConfigApplication, DynError, NodeAccess, StartNodeOptions, -}; - -use crate::{ - env::{ - BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, Node, NodeConfigEntry, - NodeEndpoints, build_local_cluster_node_config, discovered_node_access, yaml_node_config, - }, - process::{LaunchEnvVar, LaunchSpec, ProcessSpawnError}, -}; - -pub struct LocalBuildContext<'a, E: Application> { - pub topology: &'a E::Deployment, - pub index: usize, - pub ports: &'a LocalNodePorts, - pub peers: &'a [LocalPeerNode], - pub peer_ports: &'a [u16], - pub peer_ports_by_name: &'a HashMap, - pub options: &'a StartNodeOptions, - pub template_config: Option<&'a E::NodeConfig>, -} - -pub type LocalConfigBuilder = - for<'a> fn(LocalBuildContext<'a, E>) -> Result<::NodeConfig, DynError>; - -pub type LocalDynamicNodeBuilder = - for<'a> fn( - LocalBuildContext<'a, E>, - ) -> Result::NodeConfig>, DynError>; - -pub type LocalConfigRenderer = fn(&::NodeConfig) -> Result, DynError>; - -pub type LocalInitialNodesBuilder = - fn( - &::Deployment, - ) -> Result::NodeConfig>>, ProcessSpawnError>; - -pub type LocalLaunchSpecBuilder = - fn(&::NodeConfig, &Path, &str) -> Result; - -pub type LocalApiPort = fn(&::NodeConfig) -> u16; -pub type LocalEndpoints = fn(&::NodeConfig) -> Result; -pub type LocalClientBuilder = - fn(&NodeAccess) -> Result<::NodeClient, DynError>; -pub type LocalPeerPort = fn(&::NodeConfig, &NodeEndpoints) -> u16; -pub type LocalPersistDir = fn(&::Deployment, &str, usize) -> Option; -pub type LocalSnapshotDir = fn(&::Deployment, &str, usize) -> Option; -pub type LocalStableReadinessFuture<'a> = - Pin> + Send + 'a>>; -pub type LocalStableReadiness = for<'a> fn(&'a [Node]) -> LocalStableReadinessFuture<'a>; - -#[derive(Clone)] -enum LocalDynamicNode { - Standard { build_config: LocalConfigBuilder }, - Custom(LocalDynamicNodeBuilder), -} - -impl LocalDynamicNode { - fn build( - &self, - context: LocalBuildContext<'_, E>, - ) -> Result, DynError> { - match self { - Self::Standard { build_config } => { - let network_port = context.ports.network_port(); - Ok(BuiltNodeConfig { - config: build_config(context)?, - network_port, - }) - } - Self::Custom(build) => build(context), - } - } -} - -#[derive(Clone)] -enum LocalInitialNodes { - Generated, - Custom(LocalInitialNodesBuilder), -} - -#[derive(Clone)] -enum LocalLaunch { - Rendered { - spec: LocalProcessSpec, - render_config: LocalConfigRenderer, - }, - Custom(LocalLaunchSpecBuilder), -} - -#[derive(Clone)] -pub struct LocalProcess { - pub(crate) node_name_prefix: &'static str, - pub(crate) port_names: &'static [&'static str], - dynamic_node: LocalDynamicNode, - initial_nodes: LocalInitialNodes, - launch: LocalLaunch, -} - -impl LocalProcess { - #[must_use] - pub fn new( - binary_env_var: &'static str, - binary_name: &'static str, - build_config: LocalConfigBuilder, - render_config: LocalConfigRenderer, - ) -> Self { - Self { - node_name_prefix: "node", - port_names: &[], - dynamic_node: LocalDynamicNode::Standard { build_config }, - initial_nodes: LocalInitialNodes::Generated, - launch: LocalLaunch::Rendered { - spec: LocalProcessSpec::new(binary_env_var, binary_name), - render_config, - }, - } - } - - #[must_use] - pub fn custom( - build_node: LocalDynamicNodeBuilder, - build_launch_spec: LocalLaunchSpecBuilder, - ) -> Self { - Self { - node_name_prefix: "node", - port_names: &[], - dynamic_node: LocalDynamicNode::Custom(build_node), - initial_nodes: LocalInitialNodes::Generated, - launch: LocalLaunch::Custom(build_launch_spec), - } - } - - #[must_use] - pub fn with_node_name_prefix(mut self, value: &'static str) -> Self { - self.node_name_prefix = value; - self - } - - #[must_use] - pub fn with_port_names(mut self, value: &'static [&'static str]) -> Self { - self.port_names = value; - self - } - - #[must_use] - pub fn with_initial_nodes(mut self, build_initial_nodes: LocalInitialNodesBuilder) -> Self { - self.initial_nodes = LocalInitialNodes::Custom(build_initial_nodes); - self - } - - #[must_use] - pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self { - if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { - *spec = spec.clone().with_config_file(file_name, arg); - } - self - } - - #[must_use] - pub fn with_env(mut self, key: &str, value: &str) -> Self { - if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { - *spec = spec.clone().with_env(key, value); - } - self - } - - #[must_use] - pub fn with_rust_log(mut self, value: &str) -> Self { - if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { - *spec = spec.clone().with_rust_log(value); - } - self - } - - #[must_use] - pub fn with_args(mut self, args: impl IntoIterator) -> Self { - if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { - *spec = spec.clone().with_args(args); - } - self - } - - #[must_use] - pub fn with_launch_env(mut self, vars: impl IntoIterator) -> Self { - if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { - spec.env.extend(vars); - } - self - } - - pub(crate) fn build_node( - &self, - context: LocalBuildContext<'_, E>, - ) -> Result, DynError> { - self.dynamic_node.build(context) - } - - pub(crate) fn build_initial_nodes( - &self, - topology: &E::Deployment, - ) -> Result>, ProcessSpawnError> - where - E::NodeConfig: Clone, - { - match self.initial_nodes { - LocalInitialNodes::Generated => super::helpers::build_generated_initial_nodes::( - topology, - self.node_name_prefix, - self.port_names, - |context| self.build_node(context), - ), - LocalInitialNodes::Custom(build) => build(topology), - } - } - - pub(crate) fn build_launch_spec( - &self, - config: &E::NodeConfig, - dir: &Path, - label: &str, - ) -> Result { - match &self.launch { - LocalLaunch::Rendered { - spec, - render_config, - } => super::helpers::rendered_config_launch_spec(render_config(config)?, spec), - LocalLaunch::Custom(build) => build(config, dir, label), - } - } -} - -impl LocalProcess -where - E: Application, - E::NodeConfig: Serialize, -{ - #[must_use] - pub fn yaml( - binary_env_var: &'static str, - binary_name: &'static str, - build_config: LocalConfigBuilder, - ) -> Self { - Self::new( - binary_env_var, - binary_name, - build_config, - yaml_node_config::, - ) - } -} - -#[derive(Clone)] -pub struct LocalAccess { - api_port: Option>, - endpoints: Option>, - client: Option>, - peer_port: Option>, - readiness_path: &'static str, -} - -impl LocalAccess { - #[must_use] - pub fn http(api_port: LocalApiPort) -> Self { - Self { - api_port: Some(api_port), - endpoints: None, - client: None, - peer_port: None, - readiness_path: E::node_readiness_path(), - } - } - - #[must_use] - pub fn custom(endpoints: LocalEndpoints) -> Self { - Self { - api_port: None, - endpoints: Some(endpoints), - client: None, - peer_port: None, - readiness_path: E::node_readiness_path(), - } - } - - #[must_use] - pub fn with_client(mut self, client: LocalClientBuilder) -> Self { - self.client = Some(client); - self - } - - #[must_use] - pub fn with_peer_port(mut self, peer_port: LocalPeerPort) -> Self { - self.peer_port = Some(peer_port); - self - } - - #[must_use] - pub fn with_readiness_path(mut self, readiness_path: &'static str) -> Self { - self.readiness_path = readiness_path; - self - } - - pub(crate) fn node_endpoints(&self, config: &E::NodeConfig) -> Result { - if let Some(endpoints) = self.endpoints { - return endpoints(config); - } - - if let Some(api_port) = self.api_port { - return Ok(NodeEndpoints::from_api_port(api_port(config))); - } - - Err(std::io::Error::other("node endpoints are not configured").into()) - } - - pub(crate) fn node_client(&self, endpoints: &NodeEndpoints) -> Result { - if let Some(client) = self.client { - return client(&discovered_node_access(endpoints)); - } - - E::build_node_client(&discovered_node_access(endpoints)) - } - - pub(crate) fn node_peer_port(&self, config: &E::NodeConfig, endpoints: &NodeEndpoints) -> u16 { - self.peer_port - .map(|peer_port| peer_port(config, endpoints)) - .unwrap_or_else(|| endpoints.api.port()) - } - - pub(crate) fn readiness_path(&self) -> &'static str { - self.readiness_path - } -} - -#[derive(Clone)] -pub struct LocalLifecycle { - initial_persist_dir: Option>, - initial_snapshot_dir: Option>, - stable_readiness: Option>, -} - -impl LocalLifecycle { - #[must_use] - pub fn new() -> Self { - Self { - initial_persist_dir: None, - initial_snapshot_dir: None, - stable_readiness: None, - } - } - - #[must_use] - pub fn with_initial_persist_dir(mut self, persist_dir: LocalPersistDir) -> Self { - self.initial_persist_dir = Some(persist_dir); - self - } - - #[must_use] - pub fn with_initial_snapshot_dir(mut self, snapshot_dir: LocalSnapshotDir) -> Self { - self.initial_snapshot_dir = Some(snapshot_dir); - self - } - - #[must_use] - pub fn with_stable_readiness(mut self, stable_readiness: LocalStableReadiness) -> Self { - self.stable_readiness = Some(stable_readiness); - self - } - - pub(crate) fn initial_persist_dir( - &self, - topology: &E::Deployment, - node_name: &str, - index: usize, - ) -> Option { - self.initial_persist_dir - .and_then(|persist_dir| persist_dir(topology, node_name, index)) - } - - pub(crate) fn initial_snapshot_dir( - &self, - topology: &E::Deployment, - node_name: &str, - index: usize, - ) -> Option { - self.initial_snapshot_dir - .and_then(|snapshot_dir| snapshot_dir(topology, node_name, index)) - } - - pub(crate) async fn wait_stable(&self, nodes: &[Node]) -> Result<(), DynError> { - if let Some(stable_readiness) = self.stable_readiness { - return stable_readiness(nodes).await; - } - - Ok(()) - } -} - -#[derive(Clone)] -pub struct LocalRuntime { - pub(crate) process: LocalProcess, - pub(crate) access: LocalAccess, - pub(crate) lifecycle: LocalLifecycle, -} - -impl LocalRuntime { - #[must_use] - pub fn new(process: LocalProcess, access: LocalAccess) -> Self { - Self { - process, - access, - lifecycle: LocalLifecycle::new(), - } - } - - #[must_use] - pub fn with_lifecycle(mut self, lifecycle: LocalLifecycle) -> Self { - self.lifecycle = lifecycle; - self - } -} - -pub fn cluster_node_config_from_context( - context: LocalBuildContext<'_, E>, -) -> Result<::NodeConfig, DynError> -where - E: Application + ClusterNodeConfigApplication, -{ - build_local_cluster_node_config::(context.index, context.ports, context.peers) -} diff --git a/testing-framework/deployers/local/src/env/tests.rs b/testing-framework/deployers/local/src/env/tests.rs index a9a6c69..9d1769b 100644 --- a/testing-framework/deployers/local/src/env/tests.rs +++ b/testing-framework/deployers/local/src/env/tests.rs @@ -56,26 +56,51 @@ impl Application for DummyEnv { } } +#[async_trait::async_trait] impl LocalDeployerEnv for DummyEnv { - fn local_runtime() -> LocalRuntime { - LocalRuntime::new( - LocalProcess::custom(build_dummy_node, build_dummy_launch_spec) - .with_initial_nodes(build_dummy_initial_nodes), - LocalAccess::custom(dummy_endpoints).with_client(|_| Ok(())), - ) - .with_lifecycle(LocalLifecycle::new().with_stable_readiness(dummy_wait_stable)) + fn build_node_config( + _topology: &Self::Deployment, + _index: usize, + _peer_ports_by_name: &std::collections::HashMap, + _options: &testing_framework_core::scenario::StartNodeOptions, + _peer_ports: &[u16], + ) -> Result, DynError> { + build_dummy_node() + } + + fn build_initial_node_configs( + _topology: &Self::Deployment, + ) -> Result>, crate::process::ProcessSpawnError> { + build_dummy_initial_nodes() + } + + fn build_launch_spec( + config: &DummyConfig, + dir: &std::path::Path, + label: &str, + ) -> Result { + build_dummy_launch_spec(config, dir, label) + } + + fn node_endpoints(_config: &DummyConfig) -> Result { + dummy_endpoints() + } + + fn node_client(_endpoints: &NodeEndpoints) -> Result { + Ok(()) + } + + async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { + dummy_wait_stable().await } } -fn build_dummy_node( - _context: LocalBuildContext<'_, DummyEnv>, -) -> Result, DynError> { +fn build_dummy_node() -> Result, DynError> { unreachable!("not used in this test") } -fn build_dummy_initial_nodes( - _topology: &DummyTopology, -) -> Result>, crate::process::ProcessSpawnError> { +fn build_dummy_initial_nodes() +-> Result>, crate::process::ProcessSpawnError> { unreachable!("not used in this test") } @@ -87,15 +112,13 @@ fn build_dummy_launch_spec( Ok(crate::process::LaunchSpec::default()) } -fn dummy_endpoints(_config: &DummyConfig) -> Result { +fn dummy_endpoints() -> Result { Ok(NodeEndpoints::default()) } -fn dummy_wait_stable<'a>(_nodes: &'a [Node]) -> runtime::LocalStableReadinessFuture<'a> { - Box::pin(async { - STABLE_CALLS.fetch_add(1, Ordering::SeqCst); - Ok(()) - }) +async fn dummy_wait_stable() -> Result<(), DynError> { + STABLE_CALLS.fetch_add(1, Ordering::SeqCst); + Ok(()) } #[tokio::test] diff --git a/testing-framework/deployers/local/src/lib.rs b/testing-framework/deployers/local/src/lib.rs index a2d7317..33c73b7 100644 --- a/testing-framework/deployers/local/src/lib.rs +++ b/testing-framework/deployers/local/src/lib.rs @@ -9,13 +9,12 @@ pub mod process; pub use binary::{BinaryConfig, BinaryResolver}; pub use deployer::{ProcessDeployer, ProcessDeployerError}; pub use env::{ - BuiltNodeConfig, LocalAccess, LocalBuildContext, LocalDeployerEnv, LocalLifecycle, - LocalNodePorts, LocalPeerNode, LocalProcess, LocalProcessSpec, LocalRuntime, - LocalStableReadinessFuture, NodeConfigEntry, build_indexed_http_peers, + BuiltNodeConfig, LocalBinaryApp, LocalBuildContext, LocalDeployerEnv, LocalNodePorts, + LocalPeerNode, LocalProcessSpec, NodeConfigEntry, build_indexed_http_peers, build_indexed_node_configs, build_local_cluster_node_config, build_local_peer_nodes, - cluster_node_config_from_context, default_yaml_launch_spec, discovered_node_access, - preallocate_ports, reserve_local_node_ports, single_http_node_endpoints, - text_config_launch_spec, text_node_config, yaml_config_launch_spec, yaml_node_config, + default_yaml_launch_spec, discovered_node_access, preallocate_ports, reserve_local_node_ports, + single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec, + yaml_node_config, }; pub use manual::{ManualCluster, ManualClusterError}; pub use node_control::{NodeManager, NodeManagerError, NodeManagerSeed};