From 909a56e3be60df263195e91ebce4cc7678f2a2c5 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 29 Mar 2026 11:07:34 +0200 Subject: [PATCH] refactor(tf): simplify app integration surface --- Cargo.lock | 3 + testing-framework/core/src/cfgsync/mod.rs | 94 +++- testing-framework/core/src/env.rs | 12 +- testing-framework/core/src/scenario/client.rs | 65 +++ testing-framework/core/src/scenario/config.rs | 185 +++++++ testing-framework/core/src/scenario/mod.rs | 6 + testing-framework/core/src/scenario/noop.rs | 78 +++ testing-framework/core/src/topology/mod.rs | 2 + testing-framework/core/src/topology/simple.rs | 31 ++ .../compose/src/deployer/attach_provider.rs | 2 +- .../deployers/compose/src/descriptor/mod.rs | 6 +- .../deployers/compose/src/descriptor/node.rs | 120 +++++ .../deployers/compose/src/env.rs | 171 ++++-- .../deployers/compose/src/lib.rs | 8 +- .../k8s/src/deployer/attach_provider.rs | 2 +- testing-framework/deployers/k8s/src/env.rs | 70 ++- testing-framework/deployers/k8s/src/lib.rs | 5 +- testing-framework/deployers/local/Cargo.toml | 3 + testing-framework/deployers/local/src/env.rs | 492 +++++++++++++++++- testing-framework/deployers/local/src/lib.rs | 11 +- .../deployers/local/src/process.rs | 15 + 21 files changed, 1294 insertions(+), 87 deletions(-) create mode 100644 testing-framework/core/src/scenario/client.rs create mode 100644 testing-framework/core/src/scenario/config.rs create mode 100644 testing-framework/core/src/scenario/noop.rs create mode 100644 testing-framework/core/src/topology/simple.rs diff --git a/Cargo.lock b/Cargo.lock index 2edb835..5095835 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2269,12 +2269,15 @@ version = "0.1.0" dependencies = [ "async-trait", "fs_extra", + "serde", + "serde_yaml", "tempfile", "testing-framework-core", "thiserror 2.0.18", "tokio", "tokio-retry", "tracing", + "which", ] [[package]] diff --git a/testing-framework/core/src/cfgsync/mod.rs b/testing-framework/core/src/cfgsync/mod.rs index 85f6b80..add9abc 100644 --- a/testing-framework/core/src/cfgsync/mod.rs +++ b/testing-framework/core/src/cfgsync/mod.rs @@ -7,31 +7,34 @@ pub use cfgsync_core::render::{CfgsyncOutputPaths, RenderedCfgsync}; use serde::Serialize; use thiserror::Error; +use crate::{scenario::Application, topology::DeploymentDescriptor}; + #[doc(hidden)] pub type DynCfgsyncError = Box; #[doc(hidden)] pub trait StaticArtifactRenderer { type Deployment; - type Node; type NodeConfig; type Error: Error + Send + Sync + 'static; - fn nodes(deployment: &Self::Deployment) -> &[Self::Node]; + fn node_count(deployment: &Self::Deployment) -> usize; - fn node_identifier(index: usize, node: &Self::Node) -> String; + fn node_identifier(index: usize) -> String; fn build_node_config( deployment: &Self::Deployment, - node: &Self::Node, + node_index: usize, ) -> Result; fn rewrite_for_hostnames( - deployment: &Self::Deployment, - node_index: usize, - hostnames: &[String], - config: &mut Self::NodeConfig, - ) -> Result<(), Self::Error>; + _deployment: &Self::Deployment, + _node_index: usize, + _hostnames: &[String], + _config: &mut Self::NodeConfig, + ) -> Result<(), Self::Error> { + Ok(()) + } fn serialize_node_config(config: &Self::NodeConfig) -> Result; } @@ -39,6 +42,65 @@ pub trait StaticArtifactRenderer { #[doc(hidden)] pub use StaticArtifactRenderer as CfgsyncEnv; +#[doc(hidden)] +pub trait StaticNodeConfigProvider: Application { + type Error: Error + Send + Sync + 'static; + + fn build_node_config( + deployment: &Self::Deployment, + node_index: usize, + ) -> Result; + + fn rewrite_for_hostnames( + _deployment: &Self::Deployment, + _node_index: usize, + _hostnames: &[String], + _config: &mut Self::NodeConfig, + ) -> Result<(), Self::Error> { + Ok(()) + } + + fn serialize_node_config(config: &Self::NodeConfig) -> Result; +} + +impl StaticArtifactRenderer for T +where + T: StaticNodeConfigProvider, + T::Deployment: DeploymentDescriptor, +{ + type Deployment = T::Deployment; + type NodeConfig = T::NodeConfig; + type Error = T::Error; + + fn node_count(deployment: &Self::Deployment) -> usize { + deployment.node_count() + } + + fn node_identifier(index: usize) -> String { + format!("node-{index}") + } + + fn build_node_config( + deployment: &Self::Deployment, + node_index: usize, + ) -> Result { + T::build_node_config(deployment, node_index) + } + + fn rewrite_for_hostnames( + deployment: &Self::Deployment, + node_index: usize, + hostnames: &[String], + config: &mut Self::NodeConfig, + ) -> Result<(), Self::Error> { + T::rewrite_for_hostnames(deployment, node_index, hostnames, config) + } + + fn serialize_node_config(config: &Self::NodeConfig) -> Result { + T::serialize_node_config(config) + } +} + #[derive(Debug, Error)] pub enum BuildStaticArtifactsError { #[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")] @@ -63,25 +125,25 @@ pub fn build_static_artifacts( deployment: &E::Deployment, hostnames: &[String], ) -> Result { - let nodes = E::nodes(deployment); + let node_count = E::node_count(deployment); - if nodes.len() != hostnames.len() { + if node_count != hostnames.len() { return Err(BuildStaticArtifactsError::HostnameCountMismatch { - nodes: nodes.len(), + nodes: node_count, hostnames: hostnames.len(), }); } - let mut output = std::collections::HashMap::with_capacity(nodes.len()); + let mut output = std::collections::HashMap::with_capacity(node_count); - for (index, node) in nodes.iter().enumerate() { - let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?; + for index in 0..node_count { + let mut node_config = E::build_node_config(deployment, index).map_err(adapter_error)?; E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config) .map_err(adapter_error)?; let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?; output.insert( - E::node_identifier(index, node), + E::node_identifier(index), ArtifactSet::new(vec![ArtifactFile::new( "/config.yaml".to_string(), config_yaml.clone(), diff --git a/testing-framework/core/src/env.rs b/testing-framework/core/src/env.rs index 139f53b..320c69a 100644 --- a/testing-framework/core/src/env.rs +++ b/testing-framework/core/src/env.rs @@ -3,7 +3,7 @@ use std::io; use async_trait::async_trait; use crate::{ - scenario::{DynError, ExternalNodeSource, FeedRuntime, NodeClients}, + scenario::{DynError, ExternalNodeSource, FeedRuntime, NodeAccess, NodeClients}, topology::DeploymentDescriptor, }; @@ -25,6 +25,16 @@ pub trait Application: Send + Sync + 'static { Err(io::Error::other("external node sources are not supported").into()) } + /// Build an application node client from deployer-provided node access. + fn build_node_client(_access: &NodeAccess) -> Result { + Err(io::Error::other("node access is not supported").into()) + } + + /// Path appended by deployers during default readiness probing. + fn node_readiness_path() -> &'static str { + "/" + } + async fn prepare_feed( node_clients: NodeClients, ) -> Result<(::Feed, Self::FeedRuntime), DynError> diff --git a/testing-framework/core/src/scenario/client.rs b/testing-framework/core/src/scenario/client.rs new file mode 100644 index 0000000..bd3f547 --- /dev/null +++ b/testing-framework/core/src/scenario/client.rs @@ -0,0 +1,65 @@ +use std::collections::HashMap; + +use reqwest::Url; + +use super::DynError; + +/// Deployer-neutral node access facts discovered at runtime. +#[derive(Clone, Debug, Default)] +pub struct NodeAccess { + host: String, + api_port: u16, + testing_port: Option, + named_ports: HashMap, +} + +impl NodeAccess { + #[must_use] + pub fn new(host: impl Into, api_port: u16) -> Self { + Self { + host: host.into(), + api_port, + testing_port: None, + named_ports: HashMap::new(), + } + } + + #[must_use] + pub fn with_testing_port(mut self, port: u16) -> Self { + self.testing_port = Some(port); + self + } + + #[must_use] + pub fn with_named_port(mut self, name: impl Into, port: u16) -> Self { + self.named_ports.insert(name.into(), port); + self + } + + #[must_use] + pub fn host(&self) -> &str { + &self.host + } + + #[must_use] + pub fn api_port(&self) -> u16 { + self.api_port + } + + #[must_use] + pub fn testing_port(&self) -> Option { + self.testing_port + } + + #[must_use] + pub fn named_port(&self, name: &str) -> Option { + self.named_ports.get(name).copied() + } + + pub fn api_base_url(&self) -> Result { + Ok(Url::parse(&format!( + "http://{}:{}", + self.host, self.api_port + ))?) + } +} diff --git a/testing-framework/core/src/scenario/config.rs b/testing-framework/core/src/scenario/config.rs new file mode 100644 index 0000000..4a34207 --- /dev/null +++ b/testing-framework/core/src/scenario/config.rs @@ -0,0 +1,185 @@ +use std::{collections::HashMap, error::Error}; + +use super::ScenarioApplication; +use crate::{cfgsync::StaticNodeConfigProvider, topology::DeploymentDescriptor}; + +#[derive(Clone, Debug)] +pub struct ClusterPeerView { + index: usize, + host: String, + network_port: u16, +} + +impl ClusterPeerView { + #[must_use] + pub fn new(index: usize, host: impl Into, network_port: u16) -> Self { + Self { + index, + host: host.into(), + network_port, + } + } + + #[must_use] + pub fn index(&self) -> usize { + self.index + } + + #[must_use] + pub fn host(&self) -> &str { + &self.host + } + + #[must_use] + pub fn network_port(&self) -> u16 { + self.network_port + } + + #[must_use] + pub fn authority(&self) -> String { + format!("{}:{}", self.host, self.network_port) + } +} + +#[derive(Clone, Debug)] +pub struct ClusterNodeView { + index: usize, + host: String, + network_port: u16, + named_ports: HashMap<&'static str, u16>, +} + +impl ClusterNodeView { + #[must_use] + pub fn new(index: usize, host: impl Into, network_port: u16) -> Self { + Self { + index, + host: host.into(), + network_port, + named_ports: HashMap::new(), + } + } + + #[must_use] + pub fn with_named_port(mut self, name: &'static str, port: u16) -> Self { + self.named_ports.insert(name, port); + self + } + + #[must_use] + pub fn index(&self) -> usize { + self.index + } + + #[must_use] + pub fn host(&self) -> &str { + &self.host + } + + #[must_use] + pub fn network_port(&self) -> u16 { + self.network_port + } + + pub fn require_named_port(&self, name: &str) -> Result { + self.named_ports + .get(name) + .copied() + .ok_or_else(|| std::io::Error::other(format!("missing node port '{name}'"))) + } + + #[must_use] + pub fn authority(&self) -> String { + format!("{}:{}", self.host, self.network_port) + } +} + +pub trait ClusterNodeConfigApplication: ScenarioApplication { + type ConfigError: Error + Send + Sync + 'static; + + fn static_network_port() -> u16; + + fn static_named_ports() -> &'static [(&'static str, u16)] { + &[] + } + + fn build_cluster_node_config( + node: &ClusterNodeView, + peers: &[ClusterPeerView], + ) -> Result; + + fn serialize_cluster_node_config( + config: &Self::NodeConfig, + ) -> Result; +} + +impl StaticNodeConfigProvider for T +where + T: ClusterNodeConfigApplication, + T::Deployment: DeploymentDescriptor, +{ + type Error = T::ConfigError; + + fn build_node_config( + deployment: &Self::Deployment, + node_index: usize, + ) -> Result { + build_static_cluster_node_config::(deployment, node_index, None) + } + + fn rewrite_for_hostnames( + deployment: &Self::Deployment, + node_index: usize, + hostnames: &[String], + config: &mut Self::NodeConfig, + ) -> Result<(), Self::Error> { + *config = build_static_cluster_node_config::(deployment, node_index, Some(hostnames))?; + Ok(()) + } + + fn serialize_node_config(config: &Self::NodeConfig) -> Result { + T::serialize_cluster_node_config(config) + } +} + +fn build_static_cluster_node_config( + deployment: &T::Deployment, + node_index: usize, + hostnames: Option<&[String]>, +) -> Result +where + T: ClusterNodeConfigApplication, + T::Deployment: DeploymentDescriptor, +{ + let node = static_node_view::(node_index, hostnames); + let peers = (0..deployment.node_count()) + .filter(|&i| i != node_index) + .map(|i| static_peer_view::(i, hostnames)) + .collect::>(); + + T::build_cluster_node_config(&node, &peers) +} + +fn static_node_view(node_index: usize, hostnames: Option<&[String]>) -> ClusterNodeView +where + T: ClusterNodeConfigApplication, +{ + let host = hostnames + .and_then(|names| names.get(node_index).cloned()) + .unwrap_or_else(|| format!("node-{node_index}")); + let mut node = ClusterNodeView::new(node_index, host, T::static_network_port()); + for (name, port) in T::static_named_ports() { + node = node.with_named_port(name, *port); + } + node +} + +fn static_peer_view(node_index: usize, hostnames: Option<&[String]>) -> ClusterPeerView +where + T: ClusterNodeConfigApplication, +{ + let host = hostnames + .and_then(|names| names.get(node_index).cloned()) + .unwrap_or_else(|| format!("node-{node_index}")); + ClusterPeerView::new(node_index, host, T::static_network_port()) +} diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index 52306de..fd17dca 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -5,12 +5,15 @@ use std::error::Error; mod builder_ext; mod builder_ops; mod capabilities; +mod client; mod common_builder_ext; +mod config; mod control; mod definition; mod deployment_policy; mod expectation; pub mod internal; +mod noop; mod observability; mod runtime; mod sources; @@ -23,11 +26,14 @@ pub use capabilities::{ NodeControlCapability, ObservabilityCapability, PeerSelection, RequiresNodeControl, StartNodeOptions, StartedNode, }; +pub use client::NodeAccess; pub use common_builder_ext::CoreBuilderExt; +pub use config::{ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView}; pub use control::{ClusterWaitHandle, NodeControlHandle}; pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder}; pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; +pub use noop::ScenarioApplication; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ Deployer, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext, diff --git a/testing-framework/core/src/scenario/noop.rs b/testing-framework/core/src/scenario/noop.rs new file mode 100644 index 0000000..18c5a67 --- /dev/null +++ b/testing-framework/core/src/scenario/noop.rs @@ -0,0 +1,78 @@ +use async_trait::async_trait; + +use super::{Application, DynError, Feed, FeedRuntime, NodeAccess, NodeClients}; + +#[derive(Clone)] +pub struct DefaultFeed; + +impl Feed for DefaultFeed { + type Subscription = (); + + fn subscribe(&self) -> Self::Subscription {} +} + +pub struct DefaultFeedRuntime; + +#[async_trait] +impl FeedRuntime for DefaultFeedRuntime { + type Feed = DefaultFeed; + + async fn run(self: Box) {} +} + +/// App surface for the common case where the framework default feed behavior is +/// sufficient and no custom feed runtime is needed. +#[async_trait] +pub trait ScenarioApplication: Send + Sync + 'static { + type Deployment: crate::topology::DeploymentDescriptor + Clone + 'static; + type NodeClient: Clone + Send + Sync + 'static; + type NodeConfig: Clone + Send + Sync + 'static; + + fn external_node_client( + _source: &super::ExternalNodeSource, + ) -> Result { + Err(std::io::Error::other("external node sources are not supported").into()) + } + + fn build_node_client(_access: &NodeAccess) -> Result { + Err(std::io::Error::other("node access is not supported").into()) + } + + fn node_readiness_path() -> &'static str { + "/" + } +} + +#[async_trait] +impl Application for T +where + T: ScenarioApplication, +{ + type Deployment = T::Deployment; + type NodeClient = T::NodeClient; + type NodeConfig = T::NodeConfig; + type FeedRuntime = DefaultFeedRuntime; + + fn external_node_client( + source: &super::ExternalNodeSource, + ) -> Result { + T::external_node_client(source) + } + + fn build_node_client(access: &NodeAccess) -> Result { + T::build_node_client(access) + } + + fn node_readiness_path() -> &'static str { + T::node_readiness_path() + } + + async fn prepare_feed( + _node_clients: NodeClients, + ) -> Result<(::Feed, Self::FeedRuntime), DynError> + where + Self: Sized, + { + Ok((DefaultFeed, DefaultFeedRuntime)) + } +} diff --git a/testing-framework/core/src/topology/mod.rs b/testing-framework/core/src/topology/mod.rs index d6a9491..861d9f8 100644 --- a/testing-framework/core/src/topology/mod.rs +++ b/testing-framework/core/src/topology/mod.rs @@ -19,8 +19,10 @@ pub type DynTopologyError = Box; pub mod generated; pub mod shape; +pub mod simple; pub use generated::{DeploymentPlan, RuntimeTopology, SharedTopology}; pub use shape::TopologyShapeBuilder; +pub use simple::{ClusterTopology, NodeCountTopology}; pub trait DeploymentDescriptor: Send + Sync { fn node_count(&self) -> usize; diff --git a/testing-framework/core/src/topology/simple.rs b/testing-framework/core/src/topology/simple.rs new file mode 100644 index 0000000..954d01a --- /dev/null +++ b/testing-framework/core/src/topology/simple.rs @@ -0,0 +1,31 @@ +use super::DeploymentDescriptor; + +#[derive(Clone, Debug)] +pub struct ClusterTopology { + pub node_count: usize, + node_indices: Vec, +} + +impl ClusterTopology { + #[must_use] + pub fn new(node_count: usize) -> Self { + Self { + node_count, + node_indices: (0..node_count).collect(), + } + } + + #[must_use] + pub fn node_indices(&self) -> &[usize] { + &self.node_indices + } +} + +impl DeploymentDescriptor for ClusterTopology { + fn node_count(&self) -> usize { + self.node_count + } +} + +#[doc(hidden)] +pub type NodeCountTopology = ClusterTopology; diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 2b35207..fc241e5 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -198,7 +198,7 @@ async fn collect_readiness_endpoints( let container_id = discover_service_container_id(project, service).await?; let api_port = discover_api_port(&container_id).await?; let mut endpoint = build_service_endpoint(host, api_port)?; - endpoint.set_path(E::readiness_path()); + endpoint.set_path(::node_readiness_path()); endpoints.push(endpoint); } diff --git a/testing-framework/deployers/compose/src/descriptor/mod.rs b/testing-framework/deployers/compose/src/descriptor/mod.rs index 92523ef..f38e1c2 100644 --- a/testing-framework/deployers/compose/src/descriptor/mod.rs +++ b/testing-framework/deployers/compose/src/descriptor/mod.rs @@ -2,7 +2,11 @@ use serde::Serialize; mod node; -pub use node::{EnvEntry, NodeDescriptor}; +pub use node::{ + BinaryConfigNodeSpec, EnvEntry, LoopbackNodeRuntimeSpec, NodeDescriptor, + binary_config_node_runtime_spec, build_binary_config_node_descriptors, + build_loopback_node_descriptors, +}; /// Top-level docker-compose descriptor built from an environment-specific /// topology. diff --git a/testing-framework/deployers/compose/src/descriptor/node.rs b/testing-framework/deployers/compose/src/descriptor/node.rs index b8adba4..1965da0 100644 --- a/testing-framework/deployers/compose/src/descriptor/node.rs +++ b/testing-framework/deployers/compose/src/descriptor/node.rs @@ -1,5 +1,9 @@ +use std::env; + use serde::Serialize; +use crate::infrastructure::ports::node_identifier; + /// Describes a node container in the compose stack. #[derive(Clone, Debug, Serialize)] pub struct NodeDescriptor { @@ -127,3 +131,119 @@ impl NodeDescriptor { &self.environment } } + +#[derive(Clone, Debug)] +pub struct LoopbackNodeRuntimeSpec { + pub image: String, + pub entrypoint: String, + pub volumes: Vec, + pub extra_hosts: Vec, + pub container_ports: Vec, + pub environment: Vec, + pub platform: Option, +} + +#[derive(Clone, Debug)] +pub struct BinaryConfigNodeSpec { + pub image_env_var: String, + pub default_image: String, + pub platform_env_var: String, + pub binary_path: String, + pub config_container_path: String, + pub config_file_extension: String, + pub container_ports: Vec, + pub rust_log: String, +} + +impl BinaryConfigNodeSpec { + #[must_use] + pub fn conventional( + binary_path: &str, + config_container_path: &str, + container_ports: Vec, + ) -> Self { + let binary_name = binary_path + .rsplit('/') + .next() + .unwrap_or(binary_path) + .to_owned(); + let app_name = binary_name + .strip_suffix("-node") + .unwrap_or(&binary_name) + .to_owned(); + let env_prefix = app_name.replace('-', "_").to_ascii_uppercase(); + let config_file_extension = config_container_path + .rsplit('.') + .next() + .filter(|ext| !ext.contains('/')) + .unwrap_or("yaml") + .to_owned(); + let rust_target = binary_name.replace('-', "_"); + + Self { + image_env_var: format!("{env_prefix}_IMAGE"), + default_image: format!("{binary_name}:local"), + platform_env_var: format!("{env_prefix}_PLATFORM"), + binary_path: binary_path.to_owned(), + config_container_path: config_container_path.to_owned(), + config_file_extension, + container_ports, + rust_log: format!("{rust_target}=info"), + } + } +} + +pub fn build_loopback_node_descriptors( + node_count: usize, + mut spec_for_index: impl FnMut(usize) -> LoopbackNodeRuntimeSpec, +) -> Vec { + (0..node_count) + .map(|index| { + let spec = spec_for_index(index); + NodeDescriptor::with_loopback_ports( + node_identifier(index), + spec.image, + spec.entrypoint, + spec.volumes, + spec.extra_hosts, + spec.container_ports, + spec.environment, + spec.platform, + ) + }) + .collect() +} + +pub fn build_binary_config_node_descriptors( + node_count: usize, + spec: &BinaryConfigNodeSpec, +) -> Vec { + build_loopback_node_descriptors(node_count, |index| { + binary_config_node_runtime_spec(index, spec) + }) +} + +pub fn binary_config_node_runtime_spec( + index: usize, + spec: &BinaryConfigNodeSpec, +) -> LoopbackNodeRuntimeSpec { + let image = env::var(&spec.image_env_var).unwrap_or_else(|_| spec.default_image.clone()); + let platform = env::var(&spec.platform_env_var).ok(); + let entrypoint = format!( + "/bin/sh -c '{} --config {}'", + spec.binary_path, spec.config_container_path + ); + + LoopbackNodeRuntimeSpec { + image, + entrypoint, + volumes: vec![format!( + "./stack/configs/node-{index}.{}:{}:ro", + spec.config_file_extension, spec.config_container_path + )], + extra_hosts: vec![], + container_ports: spec.container_ports.clone(), + environment: vec![EnvEntry::new("RUST_LOG", &spec.rust_log)], + platform, + } +} diff --git a/testing-framework/deployers/compose/src/env.rs b/testing-framework/deployers/compose/src/env.rs index ff9692d..86eb384 100644 --- a/testing-framework/deployers/compose/src/env.rs +++ b/testing-framework/deployers/compose/src/env.rs @@ -1,20 +1,21 @@ -use std::{path::Path, time::Duration}; +use std::{fs, path::Path, time::Duration}; use async_trait::async_trait; use reqwest::Url; use testing_framework_core::{ - cfgsync::{ - CfgsyncOutputPaths, MaterializedArtifacts, RegistrationServerRenderOptions, - StaticArtifactRenderer, render_and_write_registration_server, - }, + cfgsync::{MaterializedArtifacts, StaticArtifactRenderer}, scenario::{ - Application, DynError, HttpReadinessRequirement, NodeClients, + Application, DynError, HttpReadinessRequirement, NodeAccess, NodeClients, wait_for_http_ports_with_host_and_requirement, wait_http_readiness, }, + topology::DeploymentDescriptor, }; use crate::{ - descriptor::{ComposeDescriptor, NodeDescriptor}, + descriptor::{ + BinaryConfigNodeSpec, ComposeDescriptor, LoopbackNodeRuntimeSpec, NodeDescriptor, + binary_config_node_runtime_spec, build_loopback_node_descriptors, + }, docker::config_server::DockerConfigServerSpec, infrastructure::ports::{ HostPortMapping, NodeContainerPorts, NodeHostPorts, compose_runner_host, @@ -33,11 +34,49 @@ pub trait ConfigServerHandle: Send + Sync { /// Compose-specific topology surface needed by the runner. #[async_trait] pub trait ComposeDeployEnv: Application { + /// Write per-node config files or other compose-time assets into the stack + /// workspace before the stack starts. + fn prepare_compose_configs( + _path: &Path, + _topology: &::Deployment, + _metrics_otlp_ingest_url: Option<&Url>, + ) -> Result<(), DynError> { + Ok(()) + } + + /// File name for a static per-node config rendered into the compose stack. + fn static_node_config_file_name(index: usize) -> String { + format!("node-{index}.yaml") + } + + fn loopback_node_runtime_spec( + _topology: &::Deployment, + _index: usize, + ) -> Option { + if let Some(spec) = Self::binary_config_node_spec(_topology, _index) { + return Some(binary_config_node_runtime_spec(_index, &spec)); + } + None + } + + fn binary_config_node_spec( + _topology: &::Deployment, + _index: usize, + ) -> Option { + None + } + /// Produce the compose descriptor for the given topology. fn compose_descriptor( topology: &::Deployment, - cfgsync_port: u16, - ) -> ComposeDescriptor; + _cfgsync_port: u16, + ) -> ComposeDescriptor { + let nodes = build_loopback_node_descriptors(topology.node_count(), |index| { + Self::loopback_node_runtime_spec(topology, index) + .unwrap_or_else(|| panic!("compose_descriptor is not implemented for this app")) + }); + ComposeDescriptor::new(nodes) + } /// Container ports (API/testing) per node, used for docker-compose port /// discovery. @@ -49,12 +88,17 @@ pub trait ComposeDeployEnv: Application { .nodes() .iter() .enumerate() + .take(topology.node_count()) .filter_map(|(index, node)| parse_node_container_ports(index, node)) .collect() } /// Hostnames used when rewriting node configs for cfgsync delivery. - fn cfgsync_hostnames(topology: &::Deployment) -> Vec; + fn cfgsync_hostnames(topology: &::Deployment) -> Vec { + (0..topology.node_count()) + .map(crate::infrastructure::ports::node_identifier) + .collect() + } /// App-specific cfgsync artifact enrichment. fn enrich_cfgsync_artifacts( @@ -74,34 +118,19 @@ pub trait ComposeDeployEnv: Application { where Self: Sized + StaticArtifactRenderer::Deployment>, { - let _ = metrics_otlp_ingest_url; - let options = RegistrationServerRenderOptions { - port: Some(port), - artifacts_path: None, - }; - let artifacts_path = cfgsync_artifacts_path(path); - let output = CfgsyncOutputPaths { - config_path: path, - artifacts_path: &artifacts_path, - }; - - render_and_write_registration_server::( - topology, - &Self::cfgsync_hostnames(topology), - options, - output, - |artifacts| Self::enrich_cfgsync_artifacts(topology, artifacts), - )?; - + write_static_compose_configs::(path, topology, metrics_otlp_ingest_url)?; + write_dummy_cfgsync_config(path, port)?; Ok(()) } /// Build the config server container specification. fn cfgsync_container_spec( - cfgsync_path: &Path, + _cfgsync_path: &Path, port: u16, network: &str, - ) -> Result; + ) -> Result { + Ok(dummy_cfgsync_spec(port, network)) + } /// Timeout used when launching the config server container. fn cfgsync_start_timeout() -> Duration { @@ -112,7 +141,9 @@ pub trait ComposeDeployEnv: Application { fn node_client_from_ports( ports: &NodeHostPorts, host: &str, - ) -> Result; + ) -> Result { + ::build_node_client(&discovered_node_access(host, ports)) + } /// Build node clients from discovered host ports. fn build_node_clients( @@ -132,8 +163,8 @@ pub trait ComposeDeployEnv: Application { } /// Path used by default readiness checks. - fn readiness_path() -> &'static str { - "/" + fn node_readiness_path() -> &'static str { + ::node_readiness_path() } /// Host used by default remote readiness checks. @@ -148,7 +179,11 @@ pub trait ComposeDeployEnv: Application { requirement: HttpReadinessRequirement, ) -> Result<(), DynError> { let host = Self::compose_runner_host(); - let urls = readiness_urls(&host, mapping, Self::readiness_path())?; + let urls = readiness_urls( + &host, + mapping, + ::node_readiness_path(), + )?; wait_http_readiness(&urls, requirement).await?; Ok(()) } @@ -162,7 +197,7 @@ pub trait ComposeDeployEnv: Application { wait_for_http_ports_with_host_and_requirement( ports, host, - Self::readiness_path(), + ::node_readiness_path(), requirement, ) .await?; @@ -180,11 +215,63 @@ impl ComposeCfgsyncEnv for T where { } -fn cfgsync_artifacts_path(config_path: &Path) -> std::path::PathBuf { - config_path +fn write_static_compose_configs( + path: &Path, + topology: &::Deployment, + metrics_otlp_ingest_url: Option<&Url>, +) -> Result<(), DynError> +where + E: ComposeDeployEnv + StaticArtifactRenderer::Deployment>, +{ + E::prepare_compose_configs(path, topology, metrics_otlp_ingest_url)?; + + let hostnames = E::cfgsync_hostnames(topology); + let configs_dir = stack_configs_dir(path)?; + fs::create_dir_all(&configs_dir)?; + + for index in 0..topology.node_count() { + let mut config = E::build_node_config(topology, index)?; + E::rewrite_for_hostnames(topology, index, &hostnames, &mut config)?; + let rendered = E::serialize_node_config(&config)?; + let output_path = configs_dir.join(E::static_node_config_file_name(index)); + fs::write(&output_path, rendered)?; + } + + Ok(()) +} + +fn stack_configs_dir(cfgsync_path: &Path) -> Result { + let stack_dir = cfgsync_path .parent() - .unwrap_or(config_path) - .join("cfgsync.artifacts.yaml") + .ok_or_else(|| anyhow::anyhow!("cfgsync path has no parent"))?; + Ok(stack_dir.join("configs")) +} + +fn write_dummy_cfgsync_config(path: &Path, port: u16) -> Result<(), DynError> { + fs::write( + path, + format!( + "port: {port}\nsource:\n kind: static\n artifacts_path: cfgsync.artifacts.yaml\n" + ), + )?; + Ok(()) +} + +fn dummy_cfgsync_spec(port: u16, network: &str) -> DockerConfigServerSpec { + use crate::docker::config_server::DockerPortBinding; + + DockerConfigServerSpec::new( + "cfgsync".to_owned(), + network.to_owned(), + "sh".to_owned(), + "busybox:1.36".to_owned(), + ) + .with_network_alias("cfgsync".to_owned()) + .with_args(vec![ + "-c".to_owned(), + format!("while true; do nc -l -p {port} >/dev/null 2>&1; done"), + ]) + .with_ports(vec![DockerPortBinding::tcp(port, port)]) } fn parse_node_container_ports(index: usize, node: &NodeDescriptor) -> Option { @@ -199,6 +286,10 @@ fn parse_node_container_ports(index: usize, node: &NodeDescriptor) -> Option NodeAccess { + NodeAccess::new(host, ports.api).with_testing_port(ports.testing) +} + fn readiness_urls( host: &str, mapping: &HostPortMapping, diff --git a/testing-framework/deployers/compose/src/lib.rs b/testing-framework/deployers/compose/src/lib.rs index 7cc118e..82215f7 100644 --- a/testing-framework/deployers/compose/src/lib.rs +++ b/testing-framework/deployers/compose/src/lib.rs @@ -7,7 +7,11 @@ pub mod infrastructure; pub mod lifecycle; pub use deployer::{ComposeDeployer, ComposeDeploymentMetadata}; -pub use descriptor::{ComposeDescriptor, EnvEntry, NodeDescriptor}; +pub use descriptor::{ + BinaryConfigNodeSpec, ComposeDescriptor, EnvEntry, LoopbackNodeRuntimeSpec, NodeDescriptor, + binary_config_node_runtime_spec, build_binary_config_node_descriptors, + build_loopback_node_descriptors, +}; pub use docker::{ commands::{ComposeCommandError, compose_down, compose_up, dump_compose_logs}, config_server::{ @@ -16,7 +20,7 @@ pub use docker::{ }, platform::host_gateway_entry, }; -pub use env::{ComposeDeployEnv, ConfigServerHandle}; +pub use env::{ComposeDeployEnv, ConfigServerHandle, discovered_node_access}; pub use errors::ComposeRunnerError; pub use infrastructure::{ ports::{HostPortMapping, NodeHostPorts, compose_runner_host, node_identifier}, diff --git a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs index 6107d09..e94e70a 100644 --- a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs @@ -263,7 +263,7 @@ fn collect_readiness_endpoints( for service in services { let api_port = extract_api_node_port(service)?; let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?; - endpoint.set_path(E::readiness_path()); + endpoint.set_path(::node_readiness_path()); endpoints.push(endpoint); } diff --git a/testing-framework/deployers/k8s/src/env.rs b/testing-framework/deployers/k8s/src/env.rs index 44cf698..5088820 100644 --- a/testing-framework/deployers/k8s/src/env.rs +++ b/testing-framework/deployers/k8s/src/env.rs @@ -1,14 +1,17 @@ use std::{ - env, process, + env, fs, + path::PathBuf, + process, time::{Duration, SystemTime, UNIX_EPOCH}, }; use async_trait::async_trait; use kube::Client; use reqwest::Url; +use tempfile::TempDir; use testing_framework_core::scenario::{ - Application, DynError, HttpReadinessRequirement, wait_for_http_ports_with_host_and_requirement, - wait_http_readiness, + Application, DynError, HttpReadinessRequirement, NodeAccess, + wait_for_http_ports_with_host_and_requirement, wait_http_readiness, }; use crate::{ @@ -21,6 +24,51 @@ pub trait HelmReleaseAssets { fn release_bundle(&self) -> HelmReleaseBundle; } +#[derive(Debug)] +pub struct RenderedHelmChartAssets { + chart_path: PathBuf, + _tempdir: TempDir, +} + +impl HelmReleaseAssets for RenderedHelmChartAssets { + fn release_bundle(&self) -> HelmReleaseBundle { + HelmReleaseBundle::new(self.chart_path.clone()) + } +} + +pub fn standard_port_specs(node_count: usize, api: u16, auxiliary: u16) -> PortSpecs { + PortSpecs { + nodes: (0..node_count) + .map(|_| crate::wait::NodeConfigPorts { api, auxiliary }) + .collect(), + } +} + +pub fn render_single_template_chart_assets( + chart_name: &str, + template_name: &str, + manifest: &str, +) -> Result { + let tempdir = tempfile::tempdir()?; + let chart_path = tempdir.path().join("chart"); + let templates_path = chart_path.join("templates"); + fs::create_dir_all(&templates_path)?; + fs::write(chart_path.join("Chart.yaml"), render_chart_yaml(chart_name))?; + fs::write(templates_path.join(template_name), manifest)?; + Ok(RenderedHelmChartAssets { + chart_path, + _tempdir: tempdir, + }) +} + +pub fn discovered_node_access(host: &str, api_port: u16, auxiliary_port: u16) -> NodeAccess { + NodeAccess::new(host, api_port).with_testing_port(auxiliary_port) +} + +fn render_chart_yaml(chart_name: &str) -> String { + format!("apiVersion: v2\nname: {chart_name}\nversion: 0.1.0\n") +} + pub async fn install_helm_release_with_cleanup( client: &Client, assets: &A, @@ -85,7 +133,13 @@ pub trait K8sDeployEnv: Application { host: &str, api_port: u16, auxiliary_port: u16, - ) -> Result; + ) -> Result { + ::build_node_client(&discovered_node_access( + host, + api_port, + auxiliary_port, + )) + } /// Build node clients from forwarded ports. fn build_node_clients( @@ -103,8 +157,8 @@ pub trait K8sDeployEnv: Application { } /// Path appended to readiness probe URLs. - fn readiness_path() -> &'static str { - "/" + fn node_readiness_path() -> &'static str { + ::node_readiness_path() } /// Wait for remote readiness using topology + URLs. @@ -118,7 +172,7 @@ pub trait K8sDeployEnv: Application { .iter() .map(|url| { let mut endpoint = url.clone(); - endpoint.set_path(Self::readiness_path()); + endpoint.set_path(::node_readiness_path()); endpoint }) .collect(); @@ -162,7 +216,7 @@ pub trait K8sDeployEnv: Application { wait_for_http_ports_with_host_and_requirement( ports, host, - Self::readiness_path(), + ::node_readiness_path(), requirement, ) .await?; diff --git a/testing-framework/deployers/k8s/src/lib.rs b/testing-framework/deployers/k8s/src/lib.rs index dfbf524..2b9a75f 100644 --- a/testing-framework/deployers/k8s/src/lib.rs +++ b/testing-framework/deployers/k8s/src/lib.rs @@ -9,7 +9,10 @@ pub mod wait { } pub use deployer::{K8sDeployer, K8sDeploymentMetadata, K8sRunnerError}; -pub use env::{HelmReleaseAssets, K8sDeployEnv, install_helm_release_with_cleanup}; +pub use env::{ + HelmReleaseAssets, K8sDeployEnv, RenderedHelmChartAssets, discovered_node_access, + install_helm_release_with_cleanup, render_single_template_chart_assets, standard_port_specs, +}; pub use infrastructure::{ chart_values::{ BootstrapExtraFile, BootstrapFiles, BootstrapScripts, BootstrapValues, NodeGroup, diff --git a/testing-framework/deployers/local/Cargo.toml b/testing-framework/deployers/local/Cargo.toml index cbd0b97..20c5add 100644 --- a/testing-framework/deployers/local/Cargo.toml +++ b/testing-framework/deployers/local/Cargo.toml @@ -15,9 +15,12 @@ workspace = true [dependencies] async-trait = "0.1" fs_extra = "1.3" +serde = { workspace = true } +serde_yaml = { workspace = true } tempfile = { workspace = true } testing-framework-core = { path = "../../core" } thiserror = { workspace = true } tokio = { workspace = true } tokio-retry = "0.3" tracing = { workspace = true } +which = "6.0" diff --git a/testing-framework/deployers/local/src/env.rs b/testing-framework/deployers/local/src/env.rs index 602c661..f95d212 100644 --- a/testing-framework/deployers/local/src/env.rs +++ b/testing-framework/deployers/local/src/env.rs @@ -1,14 +1,20 @@ use std::{ collections::HashMap, + net::{Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, }; -use testing_framework_core::scenario::{ - Application, DynError, HttpReadinessRequirement, ReadinessError, StartNodeOptions, - wait_for_http_ports_with_requirement, +use serde::Serialize; +use testing_framework_core::{ + scenario::{ + Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, + HttpReadinessRequirement, NodeAccess, ReadinessError, StartNodeOptions, + wait_for_http_ports_with_requirement, + }, + topology::DeploymentDescriptor, }; -use crate::process::{LaunchSpec, NodeEndpoints, ProcessNode, ProcessSpawnError}; +use crate::process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessNode, ProcessSpawnError}; pub type Node = ProcessNode<::NodeConfig, ::NodeClient>; @@ -22,18 +28,326 @@ pub struct NodeConfigEntry { pub config: NodeConfigValue, } +pub struct LocalNodePorts { + network_port: u16, + named_ports: HashMap<&'static str, u16>, +} + +impl LocalNodePorts { + #[must_use] + pub fn network_port(&self) -> u16 { + self.network_port + } + + #[must_use] + pub fn get(&self, name: &str) -> Option { + self.named_ports.get(name).copied() + } + + pub fn require(&self, name: &str) -> Result { + self.get(name) + .ok_or_else(|| format!("missing reserved local port '{name}'").into()) + } + + pub fn iter(&self) -> impl Iterator + '_ { + self.named_ports.iter().map(|(name, port)| (*name, *port)) + } +} + +#[derive(Clone, Debug)] +pub struct LocalPeerNode { + index: usize, + network_port: u16, +} + +impl LocalPeerNode { + #[must_use] + pub fn index(&self) -> usize { + self.index + } + + #[must_use] + pub fn network_port(&self) -> u16 { + self.network_port + } + + #[must_use] + pub fn http_address(&self) -> String { + format!("127.0.0.1:{}", self.network_port) + } + + #[must_use] + pub fn authority(&self) -> String { + self.http_address() + } +} + +#[derive(Clone, Default)] +pub struct LocalProcessSpec { + pub binary_env_var: String, + pub binary_name: String, + pub config_file_name: String, + pub config_arg: String, + pub extra_args: Vec, + pub env: Vec, +} + +impl LocalProcessSpec { + #[must_use] + pub fn new(binary_env_var: &str, binary_name: &str) -> Self { + Self { + binary_env_var: binary_env_var.to_owned(), + binary_name: binary_name.to_owned(), + config_file_name: "config.yaml".to_owned(), + config_arg: "--config".to_owned(), + extra_args: Vec::new(), + env: Vec::new(), + } + } + + #[must_use] + pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self { + self.config_file_name = file_name.to_owned(); + self.config_arg = arg.to_owned(); + self + } + + #[must_use] + pub fn with_env(mut self, key: &str, value: &str) -> Self { + self.env.push(crate::process::LaunchEnvVar::new(key, value)); + self + } + + #[must_use] + pub fn with_rust_log(self, value: &str) -> Self { + self.with_env("RUST_LOG", value) + } + + #[must_use] + pub fn with_args(mut self, args: impl IntoIterator) -> Self { + self.extra_args.extend(args); + self + } +} + +pub fn preallocate_ports(count: usize, label: &str) -> Result, ProcessSpawnError> { + (0..count) + .map(|_| crate::process::allocate_available_port()) + .collect::, _>>() + .map_err(|source| ProcessSpawnError::Config { + source: format!("failed to pre-allocate {label} ports: {source}").into(), + }) +} + +pub fn build_indexed_node_configs( + count: usize, + name_prefix: &str, + build: impl FnMut(usize) -> T, +) -> Vec> { + (0..count) + .map(build) + .enumerate() + .map(|(index, config)| NodeConfigEntry { + name: format!("{name_prefix}-{index}"), + config, + }) + .collect() +} + +pub fn reserve_local_node_ports( + count: usize, + names: &[&'static str], + label: &str, +) -> Result, ProcessSpawnError> { + let network_ports = preallocate_ports(count, label)?; + let mut named_by_role = HashMap::new(); + for name in names { + named_by_role.insert(*name, preallocate_ports(count, &format!("{label} {name}"))?); + } + + Ok((0..count) + .map(|index| LocalNodePorts { + network_port: network_ports[index], + named_ports: named_by_role + .iter() + .map(|(name, ports)| (*name, ports[index])) + .collect(), + }) + .collect()) +} + +pub fn single_http_node_endpoints(port: u16) -> NodeEndpoints { + NodeEndpoints::from_api_port(port) +} + +pub fn build_local_cluster_node_config( + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], +) -> Result<::NodeConfig, DynError> +where + E: ClusterNodeConfigApplication, +{ + let mut node = ClusterNodeView::new(index, "127.0.0.1", ports.network_port()); + for (name, port) in ports.iter() { + node = node.with_named_port(name, port); + } + + let peer_views = peers + .iter() + .map(|peer| ClusterPeerView::new(peer.index(), "127.0.0.1", peer.network_port())) + .collect::>(); + + E::build_cluster_node_config(&node, &peer_views).map_err(Into::into) +} + +pub fn discovered_node_access(endpoints: &NodeEndpoints) -> NodeAccess { + let mut access = NodeAccess::new("127.0.0.1", endpoints.api.port()); + + for (key, port) in &endpoints.extra_ports { + match key { + NodeEndpointPort::TestingApi => { + access = access.with_testing_port(*port); + } + NodeEndpointPort::Custom(name) => { + access = access.with_named_port(name.clone(), *port); + } + NodeEndpointPort::Network => {} + } + } + + access +} + +pub fn build_indexed_http_peers( + node_count: usize, + self_index: usize, + peer_ports: &[u16], + mut build_peer: impl FnMut(usize, String) -> T, +) -> Vec { + (0..node_count) + .filter(|&i| i != self_index) + .map(|i| build_peer(i, format!("127.0.0.1:{}", peer_ports[i]))) + .collect() +} + +fn compact_peer_ports(peer_ports: &[u16], self_index: usize) -> Vec { + peer_ports + .iter() + .enumerate() + .filter_map(|(index, port)| (index != self_index).then_some(*port)) + .collect() +} + +pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec { + peer_ports + .iter() + .enumerate() + .filter_map(|(index, port)| { + (index != self_index).then_some(LocalPeerNode { + index, + network_port: *port, + }) + }) + .collect() +} + +pub fn yaml_config_launch_spec( + config: &T, + spec: &LocalProcessSpec, +) -> Result { + let config_yaml = serde_yaml::to_string(config)?; + rendered_config_launch_spec(config_yaml.into_bytes(), spec) +} + +pub fn text_config_launch_spec( + rendered_config: impl Into>, + spec: &LocalProcessSpec, +) -> Result { + rendered_config_launch_spec(rendered_config.into(), spec) +} + +pub fn default_yaml_launch_spec( + config: &T, + binary_env_var: &str, + binary_name: &str, + rust_log: &str, +) -> Result { + yaml_config_launch_spec( + config, + &LocalProcessSpec::new(binary_env_var, binary_name).with_rust_log(rust_log), + ) +} + +pub fn yaml_node_config(config: &T) -> Result, DynError> { + Ok(serde_yaml::to_string(config)?.into_bytes()) +} + +pub fn text_node_config(rendered_config: impl Into>) -> Vec { + rendered_config.into() +} + +fn rendered_config_launch_spec( + rendered_config: Vec, + spec: &LocalProcessSpec, +) -> Result { + let binary = resolve_binary(spec); + let mut args = vec![spec.config_arg.clone(), spec.config_file_name.clone()]; + args.extend(spec.extra_args.iter().cloned()); + + Ok(LaunchSpec { + binary, + files: vec![crate::process::LaunchFile { + relative_path: spec.config_file_name.clone().into(), + contents: rendered_config, + }], + args, + env: spec.env.clone(), + }) +} + +fn resolve_binary(spec: &LocalProcessSpec) -> PathBuf { + std::env::var(&spec.binary_env_var) + .map(PathBuf::from) + .or_else(|_| which::which(&spec.binary_name)) + .unwrap_or_else(|_| { + let mut path = std::env::current_dir().unwrap_or_default(); + let mut debug = path.clone(); + debug.push(format!("target/debug/{}", spec.binary_name)); + if debug.exists() { + return debug; + } + + path.push(format!("target/release/{}", spec.binary_name)); + path + }) +} + #[async_trait::async_trait] pub trait LocalDeployerEnv: Application + Sized where ::NodeConfig: Clone + Send + Sync + 'static, { + fn local_port_names() -> &'static [&'static str] { + Self::initial_local_port_names() + } + fn build_node_config( topology: &Self::Deployment, index: usize, peer_ports_by_name: &HashMap, options: &StartNodeOptions, peer_ports: &[u16], - ) -> Result::NodeConfig>, DynError>; + ) -> Result::NodeConfig>, DynError> { + Self::build_node_config_from_template( + topology, + index, + peer_ports_by_name, + options, + peer_ports, + None, + ) + } fn build_node_config_from_template( topology: &Self::Deployment, @@ -41,14 +355,119 @@ where peer_ports_by_name: &HashMap, options: &StartNodeOptions, peer_ports: &[u16], - _template_config: Option<&::NodeConfig>, + template_config: Option<&::NodeConfig>, ) -> Result::NodeConfig>, DynError> { - Self::build_node_config(topology, index, peer_ports_by_name, options, peer_ports) + let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node") + .map_err(|source| -> DynError { source.into() })?; + let ports = reserved + .pop() + .ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?; + let network_port = ports.network_port(); + let config = Self::build_local_node_config( + topology, + index, + &ports, + peer_ports_by_name, + options, + peer_ports, + template_config, + )?; + + Ok(BuiltNodeConfig { + config, + network_port, + }) } fn build_initial_node_configs( topology: &Self::Deployment, - ) -> Result::NodeConfig>>, ProcessSpawnError>; + ) -> Result::NodeConfig>>, ProcessSpawnError> { + let reserved_ports = reserve_local_node_ports( + topology.node_count(), + Self::initial_local_port_names(), + Self::initial_node_name_prefix(), + )?; + let peer_ports = reserved_ports + .iter() + .map(LocalNodePorts::network_port) + .collect::>(); + + let mut configs = Vec::with_capacity(topology.node_count()); + for (index, ports) in reserved_ports.iter().enumerate() { + let config = Self::build_initial_node_config(topology, index, ports, &peer_ports) + .map_err(|source| ProcessSpawnError::Config { source })?; + configs.push(NodeConfigEntry { + name: format!("{}-{index}", Self::initial_node_name_prefix()), + config, + }); + } + + Ok(configs) + } + + fn initial_node_name_prefix() -> &'static str { + "node" + } + + fn initial_local_port_names() -> &'static [&'static str] { + &[] + } + + fn build_initial_node_config( + topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peer_ports: &[u16], + ) -> Result<::NodeConfig, DynError> { + let compact_peer_ports = compact_peer_ports(peer_ports, index); + let peer_ports_by_name = HashMap::new(); + let options = StartNodeOptions::::default(); + Self::build_local_node_config( + topology, + index, + ports, + &peer_ports_by_name, + &options, + &compact_peer_ports, + None, + ) + } + + fn build_local_node_config( + _topology: &Self::Deployment, + _index: usize, + _ports: &LocalNodePorts, + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + _peer_ports: &[u16], + _template_config: Option<&::NodeConfig>, + ) -> Result<::NodeConfig, DynError> { + let peers = build_local_peer_nodes(_peer_ports, _index); + Self::build_local_node_config_with_peers( + _topology, + _index, + _ports, + &peers, + _peer_ports_by_name, + _options, + _template_config, + ) + } + + fn build_local_node_config_with_peers( + _topology: &Self::Deployment, + _index: usize, + _ports: &LocalNodePorts, + _peers: &[LocalPeerNode], + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + _template_config: Option<&::NodeConfig>, + ) -> Result<::NodeConfig, DynError> { + Err(std::io::Error::other( + "build_local_node_config_with_peers is not implemented for this app", + ) + .into()) + } fn initial_persist_dir( _topology: &Self::Deployment, @@ -66,22 +485,67 @@ where None } + fn local_process_spec() -> Option { + None + } + + fn render_local_config( + _config: &::NodeConfig, + ) -> Result, DynError> { + Err(std::io::Error::other("render_local_config is not implemented for this app").into()) + } + fn build_launch_spec( config: &::NodeConfig, - dir: &Path, - label: &str, - ) -> Result; + _dir: &Path, + _label: &str, + ) -> Result { + let spec = Self::local_process_spec().ok_or_else(|| { + std::io::Error::other("build_launch_spec is not implemented for this app") + })?; + let rendered = Self::render_local_config(config)?; + rendered_config_launch_spec(rendered, &spec) + } - fn node_endpoints(config: &::NodeConfig) -> NodeEndpoints; + fn http_api_port(_config: &::NodeConfig) -> Option { + None + } + + fn node_endpoints(config: &::NodeConfig) -> NodeEndpoints { + if let Some(port) = Self::http_api_port(config) { + return NodeEndpoints { + api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)), + extra_ports: HashMap::new(), + }; + } + + panic!("node_endpoints is not implemented for this app"); + } fn node_peer_port(node: &Node) -> u16 { node.endpoints().api.port() } - fn node_client(endpoints: &NodeEndpoints) -> Self::NodeClient; + fn node_client_from_api_endpoint(_api: SocketAddr) -> Option { + None + } + + fn node_client(endpoints: &NodeEndpoints) -> Self::NodeClient { + if let Ok(client) = + ::build_node_client(&discovered_node_access(endpoints)) + { + return client; + } + + if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) { + return client; + } + + panic!("node_client is not implemented for this app"); + } fn readiness_endpoint_path() -> &'static str { - "/" + ::node_readiness_path() } async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { diff --git a/testing-framework/deployers/local/src/lib.rs b/testing-framework/deployers/local/src/lib.rs index 9531a3b..6014a62 100644 --- a/testing-framework/deployers/local/src/lib.rs +++ b/testing-framework/deployers/local/src/lib.rs @@ -8,12 +8,19 @@ pub mod process; pub use binary::{BinaryConfig, BinaryResolver}; pub use deployer::{ProcessDeployer, ProcessDeployerError}; -pub use env::{BuiltNodeConfig, LocalDeployerEnv, NodeConfigEntry}; +pub use env::{ + BuiltNodeConfig, LocalDeployerEnv, LocalNodePorts, LocalPeerNode, LocalProcessSpec, + NodeConfigEntry, build_indexed_http_peers, build_indexed_node_configs, + build_local_cluster_node_config, build_local_peer_nodes, default_yaml_launch_spec, + discovered_node_access, preallocate_ports, reserve_local_node_ports, + single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec, + yaml_node_config, +}; pub use manual::{ManualCluster, ManualClusterError}; pub use node_control::{NodeManager, NodeManagerError, NodeManagerSeed}; pub use process::{ LaunchEnvVar, LaunchFile, LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessNode, - ProcessSpawnError, + ProcessSpawnError, allocate_available_port, }; const KEEP_LOGS_ENV: &str = "TF_KEEP_LOGS"; diff --git a/testing-framework/deployers/local/src/process.rs b/testing-framework/deployers/local/src/process.rs index 9e3df42..3da6a5b 100644 --- a/testing-framework/deployers/local/src/process.rs +++ b/testing-framework/deployers/local/src/process.rs @@ -41,6 +41,14 @@ impl Default for NodeEndpoints { } impl NodeEndpoints { + #[must_use] + pub fn from_api_port(port: u16) -> Self { + Self { + api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)), + extra_ports: HashMap::new(), + } + } + pub fn insert_port(&mut self, key: NodeEndpointPort, port: u16) { self.extra_ports.insert(key, port); } @@ -353,6 +361,13 @@ fn default_api_socket() -> SocketAddr { SocketAddr::from((Ipv4Addr::LOCALHOST, 0)) } +pub fn allocate_available_port() -> Result { + let listener = std::net::TcpListener::bind("127.0.0.1:0")?; + let port = listener.local_addr()?.port(); + drop(listener); + Ok(port) +} + fn create_tempdir(persist_dir: Option<&Path>) -> Result { match persist_dir { Some(dir) => {