refactor(tf): streamline example integration helpers

This commit is contained in:
andrussal 2026-04-10 08:44:55 +02:00
parent f73179193b
commit e04b08c004
12 changed files with 207 additions and 13 deletions

2
Cargo.lock generated
View File

@ -1979,6 +1979,7 @@ checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4"
dependencies = [
"log",
"once_cell",
"ring",
"rustls-pki-types",
"rustls-webpki",
"subtle",
@ -2394,6 +2395,7 @@ dependencies = [
"k8s-openapi",
"kube",
"reqwest",
"rustls",
"serde",
"serde_json",
"serde_yaml",

View File

@ -3,7 +3,10 @@ use std::io;
use async_trait::async_trait;
use crate::{
scenario::{DynError, ExternalNodeSource, FeedRuntime, NodeAccess, NodeClients},
scenario::{
DefaultFeed, DefaultFeedRuntime, DynError, ExternalNodeSource, FeedRuntime, NodeAccess,
NodeClients,
},
topology::DeploymentDescriptor,
};
@ -36,8 +39,12 @@ pub trait Application: Send + Sync + 'static {
}
async fn prepare_feed(
node_clients: NodeClients<Self>,
_node_clients: NodeClients<Self>,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError>
where
Self: Sized;
Self: Sized,
{
let _ = (DefaultFeed::default(), DefaultFeedRuntime::default());
Ok((Default::default(), Default::default()))
}
}

View File

@ -17,7 +17,7 @@ use crate::{
expectation::Expectation, runtime::context::RunMetrics, sources::ScenarioSources,
workload::Workload,
},
topology::{DeploymentDescriptor, DeploymentProvider, DeploymentSeed},
topology::{DeploymentDescriptor, DeploymentProvider, DeploymentSeed, FixedDeploymentProvider},
};
/// Scenario builder entry point.
@ -268,6 +268,14 @@ impl<E: Application> ScenarioBuilder<E> {
}
}
#[must_use]
pub fn with_deployment(deployment: E::Deployment) -> Self
where
E::Deployment: Clone + Send + Sync + 'static,
{
Self::new(Box::new(FixedDeploymentProvider::new(deployment)))
}
#[must_use]
pub fn with_node_control(self) -> NodeControlScenarioBuilder<E> {
NodeControlScenarioBuilder {

View File

@ -2,7 +2,7 @@ use async_trait::async_trait;
use super::{DynError, Feed, FeedRuntime};
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct DefaultFeed;
impl Feed for DefaultFeed {
@ -11,6 +11,7 @@ impl Feed for DefaultFeed {
fn subscribe(&self) -> Self::Subscription {}
}
#[derive(Default)]
pub struct DefaultFeedRuntime;
#[async_trait]

View File

@ -29,7 +29,7 @@ use tokio::task::JoinHandle;
use crate::{env::Application, scenario::DynError};
/// Cloneable feed handle exposed to workloads and expectations.
pub trait Feed: Clone + Send + Sync + 'static {
pub trait Feed: Clone + Default + Send + Sync + 'static {
type Subscription: Send + 'static;
fn subscribe(&self) -> Self::Subscription;
@ -37,7 +37,7 @@ pub trait Feed: Clone + Send + Sync + 'static {
/// Background worker driving a cluster feed.
#[async_trait]
pub trait FeedRuntime: Send + 'static {
pub trait FeedRuntime: Default + Send + 'static {
type Feed: Feed;
async fn run(self: Box<Self>);

View File

@ -24,6 +24,18 @@ pub use generated::{DeploymentPlan, RuntimeTopology, SharedTopology};
pub use shape::TopologyShapeBuilder;
pub use simple::{ClusterTopology, NodeCountTopology};
#[derive(Clone)]
pub struct FixedDeploymentProvider<D> {
deployment: D,
}
impl<D> FixedDeploymentProvider<D> {
#[must_use]
pub const fn new(deployment: D) -> Self {
Self { deployment }
}
}
pub trait DeploymentDescriptor: Send + Sync {
fn node_count(&self) -> usize;
}
@ -34,3 +46,12 @@ where
{
fn build(&self, seed: Option<&DeploymentSeed>) -> Result<D, DynTopologyError>;
}
impl<D> DeploymentProvider<D> for FixedDeploymentProvider<D>
where
D: DeploymentDescriptor + Clone + Send + Sync + 'static,
{
fn build(&self, _seed: Option<&DeploymentSeed>) -> Result<D, DynTopologyError> {
Ok(self.deployment.clone())
}
}

View File

@ -20,6 +20,7 @@ cfgsync-core = { workspace = true }
k8s-openapi = { features = ["latest"], version = "0.27" }
kube = { default-features = false, features = ["client", "runtime", "rustls-tls"], version = "3.1" }
reqwest = { features = ["json"], workspace = true }
rustls = { default-features = false, features = ["ring"], version = "0.23" }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }

View File

@ -526,6 +526,7 @@ fn resolve_observability_inputs(
}
async fn init_kube_client() -> Result<Client, K8sRunnerError> {
crate::ensure_rustls_provider_installed();
Client::try_default()
.await
.map_err(|source| K8sRunnerError::ClientInit { source })

View File

@ -10,9 +10,13 @@ use cfgsync_artifacts::ArtifactSet;
use kube::Client;
use reqwest::Url;
use tempfile::TempDir;
use testing_framework_core::scenario::{
Application, DynError, HttpReadinessRequirement, NodeAccess,
wait_for_http_ports_with_host_and_requirement, wait_http_readiness,
use testing_framework_core::{
cfgsync::StaticNodeConfigProvider,
scenario::{
Application, DynError, HttpReadinessRequirement, NodeAccess,
wait_for_http_ports_with_host_and_requirement, wait_http_readiness,
},
topology::DeploymentDescriptor,
};
use crate::{
@ -31,6 +35,20 @@ pub struct RenderedHelmChartAssets {
_tempdir: TempDir,
}
#[derive(Clone, Debug)]
pub struct BinaryConfigK8sSpec {
pub chart_name: String,
pub node_name_prefix: String,
pub binary_path: String,
pub config_container_path: String,
pub container_http_port: u16,
pub service_testing_port: u16,
pub image_env_var: String,
pub fallback_image_env_var: String,
pub default_image: String,
pub image_pull_policy: String,
}
impl HelmReleaseAssets for RenderedHelmChartAssets {
fn release_bundle(&self) -> HelmReleaseBundle {
HelmReleaseBundle::new(self.chart_path.clone())
@ -45,6 +63,127 @@ pub fn standard_port_specs(node_count: usize, api: u16, auxiliary: u16) -> PortS
}
}
impl BinaryConfigK8sSpec {
#[must_use]
pub fn conventional(
chart_name: &str,
node_name_prefix: &str,
binary_path: &str,
config_container_path: &str,
container_http_port: u16,
service_testing_port: u16,
) -> Self {
let binary_name = binary_path
.rsplit('/')
.next()
.unwrap_or(binary_path)
.to_owned();
let env_prefix = binary_name
.strip_suffix("-node")
.unwrap_or(&binary_name)
.replace('-', "_")
.to_ascii_uppercase();
Self {
chart_name: chart_name.to_owned(),
node_name_prefix: node_name_prefix.to_owned(),
binary_path: binary_path.to_owned(),
config_container_path: config_container_path.to_owned(),
container_http_port,
service_testing_port,
image_env_var: format!("{env_prefix}_K8S_IMAGE"),
fallback_image_env_var: format!("{env_prefix}_IMAGE"),
default_image: format!("{binary_name}:local"),
image_pull_policy: "IfNotPresent".to_owned(),
}
}
}
pub fn render_binary_config_node_chart_assets<E>(
deployment: &E::Deployment,
spec: &BinaryConfigK8sSpec,
) -> Result<RenderedHelmChartAssets, DynError>
where
E: StaticNodeConfigProvider,
E::Deployment: DeploymentDescriptor,
{
let manifest = render_binary_config_node_manifest::<E>(deployment, spec)?;
render_single_template_chart_assets(
&spec.chart_name,
&format!("{}.yaml", spec.chart_name),
&manifest,
)
}
pub fn render_binary_config_node_manifest<E>(
deployment: &E::Deployment,
spec: &BinaryConfigK8sSpec,
) -> Result<String, DynError>
where
E: StaticNodeConfigProvider,
E::Deployment: DeploymentDescriptor,
{
let node_count = deployment.node_count();
let mut docs = Vec::with_capacity(node_count * 3);
let hostnames = (0..node_count)
.map(|index| format!("{}-{index}", spec.node_name_prefix))
.collect::<Vec<_>>();
for index in 0..node_count {
let name = &hostnames[index];
let mut config = E::build_node_config(deployment, index)?;
E::rewrite_for_hostnames(deployment, index, &hostnames, &mut config)?;
let config_yaml = E::serialize_node_config(&config)?;
docs.push(render_node_config_map(name, &config_yaml));
docs.push(render_node_deployment(name, spec));
docs.push(render_node_service(name, spec));
}
Ok(docs.join("\n---\n"))
}
fn render_node_config_map(name: &str, config_yaml: &str) -> String {
format!(
"apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: {name}-config\ndata:\n config.yaml: |\n{}",
indent_yaml(config_yaml, 4)
)
}
fn render_node_deployment(name: &str, spec: &BinaryConfigK8sSpec) -> String {
format!(
"apiVersion: apps/v1\nkind: Deployment\nmetadata:\n name: {name}\nspec:\n replicas: 1\n selector:\n matchLabels:\n app: {name}\n template:\n metadata:\n labels:\n app: {name}\n spec:\n containers:\n - name: app\n image: {}\n imagePullPolicy: {}\n args:\n - --config\n - {}\n ports:\n - containerPort: {}\n volumeMounts:\n - name: config\n mountPath: {}\n subPath: config.yaml\n volumes:\n - name: config\n configMap:\n name: {name}-config",
k8s_image(spec),
spec.image_pull_policy,
spec.config_container_path,
spec.container_http_port,
spec.config_container_path,
)
}
fn render_node_service(name: &str, spec: &BinaryConfigK8sSpec) -> String {
format!(
"apiVersion: v1\nkind: Service\nmetadata:\n name: {name}\nspec:\n selector:\n app: {name}\n type: NodePort\n ports:\n - name: api\n port: {api_port}\n targetPort: {api_port}\n protocol: TCP\n - name: testing\n port: {testing_port}\n targetPort: {api_port}\n protocol: TCP",
api_port = spec.container_http_port,
testing_port = spec.service_testing_port
)
}
fn indent_yaml(value: &str, spaces: usize) -> String {
let padding = " ".repeat(spaces);
value
.lines()
.map(|line| format!("{padding}{line}"))
.collect::<Vec<_>>()
.join("\n")
}
fn k8s_image(spec: &BinaryConfigK8sSpec) -> String {
env::var(&spec.image_env_var)
.or_else(|_| env::var(&spec.fallback_image_env_var))
.unwrap_or_else(|_| spec.default_image.clone())
}
pub fn render_single_template_chart_assets(
chart_name: &str,
template_name: &str,

View File

@ -5,14 +5,26 @@ mod infrastructure;
mod lifecycle;
mod manual;
mod workspace;
use std::sync::Once;
pub mod wait {
pub use crate::lifecycle::wait::*;
}
static RUSTLS_PROVIDER: Once = Once::new();
pub(crate) fn ensure_rustls_provider_installed() {
RUSTLS_PROVIDER.call_once(|| {
let _ = rustls::crypto::ring::default_provider().install_default();
});
}
pub use deployer::{K8sDeployer, K8sDeploymentMetadata, K8sRunnerError};
pub use env::{
HelmReleaseAssets, K8sDeployEnv, RenderedHelmChartAssets, discovered_node_access,
install_helm_release_with_cleanup, render_single_template_chart_assets, standard_port_specs,
BinaryConfigK8sSpec, HelmReleaseAssets, K8sDeployEnv, RenderedHelmChartAssets,
discovered_node_access, install_helm_release_with_cleanup,
render_binary_config_node_chart_assets, render_binary_config_node_manifest,
render_single_template_chart_assets, standard_port_specs,
};
pub use infrastructure::{
chart_values::{

View File

@ -133,6 +133,7 @@ where
return Err(ManualClusterError::UnsupportedTopology { nodes });
}
crate::ensure_rustls_provider_installed();
let client = Client::try_default()
.await
.map_err(|source| ManualClusterError::ClientInit { source })?;

View File

@ -605,7 +605,7 @@ mod tests {
static STABLE_CALLS: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone)]
#[derive(Clone, Default)]
struct DummyFeed;
impl Feed for DummyFeed {
@ -614,6 +614,7 @@ mod tests {
fn subscribe(&self) -> Self::Subscription {}
}
#[derive(Default)]
struct DummyFeedRuntime;
#[async_trait]