refactor(k8s): redesign runtime around install/access/manual split

This commit is contained in:
andrussal 2026-04-10 17:13:15 +02:00
parent 29637acadf
commit 1c2f734bca
8 changed files with 538 additions and 254 deletions

View File

@ -13,7 +13,10 @@ use testing_framework_core::scenario::{
};
use url::Url;
use crate::{env::K8sDeployEnv, host::node_host};
use crate::{
env::{K8sDeployEnv, node_readiness_path},
host::node_host,
};
#[derive(Debug, thiserror::Error)]
enum K8sAttachDiscoveryError {
@ -264,7 +267,7 @@ fn collect_readiness_endpoints<E: K8sDeployEnv>(
for service in services {
let api_port = extract_api_node_port(service)?;
let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?;
endpoint.set_path(<E as K8sDeployEnv>::node_readiness_path());
endpoint.set_path(node_readiness_path::<E>());
endpoints.push(endpoint);
}

View File

@ -24,7 +24,10 @@ use crate::{
K8sDeploymentMetadata,
attach_provider::{K8sAttachProvider, K8sAttachedClusterWait},
},
env::{HelmReleaseAssets, K8sDeployEnv},
env::{
K8sDeployEnv, attach_node_service_selector, cluster_identifiers, node_base_url,
prepare_stack,
},
infrastructure::cluster::{
ClusterEnvironment, ClusterEnvironmentError, NodeClientError, PortSpecs,
RemoteReadinessError, build_node_clients, collect_port_specs, ensure_cluster_readiness,
@ -72,7 +75,6 @@ impl<E: K8sDeployEnv> K8sDeployer<E> {
scenario: &Scenario<E, Caps>,
) -> Result<(Runner<E>, K8sDeploymentMetadata), K8sRunnerError>
where
E::Assets: HelmReleaseAssets,
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
deploy_with_observability(self, scenario).await
@ -131,7 +133,6 @@ pub enum K8sRunnerError {
impl<E, Caps> Deployer<E, Caps> for K8sDeployer<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
type Error = K8sRunnerError;
@ -174,7 +175,6 @@ async fn deploy_with_observability<E, Caps>(
) -> Result<(Runner<E>, K8sDeploymentMetadata), K8sRunnerError>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
Caps: ObservabilityCapabilityProvider + Send + Sync,
{
validate_supported_cluster_mode(scenario)
@ -198,7 +198,7 @@ where
let deployment = build_k8s_deployment::<E, Caps>(deployer, scenario, &observability).await?;
let metadata = K8sDeploymentMetadata {
namespace: Some(deployment.cluster.namespace().to_owned()),
label_selector: Some(E::attach_node_service_selector(
label_selector: Some(attach_node_service_selector::<E>(
deployment.cluster.release(),
)),
};
@ -397,7 +397,6 @@ async fn build_k8s_deployment<E, Caps>(
) -> Result<BuiltK8sDeployment, K8sRunnerError>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
Caps: ObservabilityCapabilityProvider,
{
let descriptors = scenario.deployment();
@ -439,10 +438,7 @@ async fn setup_cluster<E: K8sDeployEnv>(
readiness_checks: bool,
readiness_requirement: HttpReadinessRequirement,
observability: &ObservabilityInputs,
) -> Result<ClusterEnvironment, K8sRunnerError>
where
E::Assets: HelmReleaseAssets,
{
) -> Result<ClusterEnvironment, K8sRunnerError> {
let (setup, cleanup) = prepare_cluster_setup::<E>(client, descriptors, observability).await?;
let mut cleanup_guard = Some(cleanup);
@ -475,17 +471,15 @@ async fn prepare_cluster_setup<E: K8sDeployEnv>(
client: &Client,
descriptors: &E::Deployment,
observability: &ObservabilityInputs,
) -> Result<(ClusterSetup, RunnerCleanup), K8sRunnerError>
where
E::Assets: HelmReleaseAssets,
{
let assets = E::prepare_assets(descriptors, observability.metrics_otlp_ingest_url.as_ref())
) -> Result<(ClusterSetup, RunnerCleanup), K8sRunnerError> {
let assets = prepare_stack::<E>(descriptors, observability.metrics_otlp_ingest_url.as_ref())
.map_err(|source| K8sRunnerError::Assets { source })?;
let nodes = descriptors.node_count();
let (namespace, release) = E::cluster_identifiers();
let (namespace, release) = cluster_identifiers::<E>();
info!(%namespace, %release, nodes, "preparing k8s assets and namespace");
let cleanup = E::install_stack(client, &assets, &namespace, &release, nodes)
let cleanup = assets
.install(client, &namespace, &release, nodes)
.await
.map_err(|source| K8sRunnerError::InstallStack { source })?;
@ -749,7 +743,7 @@ fn print_node_pprof_endpoints<E: K8sDeployEnv>(node_clients: &NodeClients<E>) {
let nodes = node_clients.snapshot();
for (idx, client) in nodes.iter().enumerate() {
if let Some(base_url) = E::node_base_url(client) {
if let Some(base_url) = node_base_url::<E>(client) {
println!(
"TESTNET_PPROF node_{}={}/debug/pprof/profile?seconds=15&format=proto",
idx, base_url

View File

@ -281,170 +281,467 @@ pub async fn install_helm_release_with_cleanup<A: HelmReleaseAssets>(
}
#[async_trait]
pub trait K8sDeployEnv: Application + Sized {
type Assets: Send + Sync;
/// Collect container port specs from the topology.
fn collect_port_specs(topology: &Self::Deployment) -> PortSpecs;
/// Build deploy-time assets (charts, config payloads, scripts).
fn prepare_assets(
topology: &Self::Deployment,
metrics_otlp_ingest_url: Option<&Url>,
) -> Result<Self::Assets, DynError>;
/// Install the k8s stack using the prepared assets.
async fn install_stack(
pub trait PreparedK8sStack: Send + Sync {
async fn install(
&self,
client: &Client,
assets: &Self::Assets,
namespace: &str,
release: &str,
nodes: usize,
) -> Result<RunnerCleanup, DynError>
where
Self::Assets: HelmReleaseAssets,
{
) -> Result<RunnerCleanup, DynError>;
}
#[async_trait]
impl<T> PreparedK8sStack for T
where
T: HelmReleaseAssets + Send + Sync,
{
async fn install(
&self,
client: &Client,
namespace: &str,
release: &str,
nodes: usize,
) -> Result<RunnerCleanup, DynError> {
let _ = nodes;
install_helm_release_with_cleanup(client, assets, namespace, release).await
}
/// Provide a namespace/release identifier pair.
fn cluster_identifiers() -> (String, String) {
let stamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis())
.unwrap_or_default();
let suffix = format!("{stamp:x}-{:x}", process::id());
(format!("tf-testnet-{suffix}"), String::from("tf-runner"))
}
/// Build a single node client from forwarded ports.
fn node_client_from_ports(
host: &str,
api_port: u16,
auxiliary_port: u16,
) -> Result<Self::NodeClient, DynError> {
<Self as Application>::build_node_client(&discovered_node_access(
host,
api_port,
auxiliary_port,
))
}
/// Build node clients from forwarded ports.
fn build_node_clients(
host: &str,
node_api_ports: &[u16],
node_auxiliary_ports: &[u16],
) -> Result<Vec<Self::NodeClient>, DynError> {
node_api_ports
.iter()
.zip(node_auxiliary_ports.iter())
.map(|(&api_port, &auxiliary_port)| {
Self::node_client_from_ports(host, api_port, auxiliary_port)
})
.collect()
}
/// Path appended to readiness probe URLs.
fn node_readiness_path() -> &'static str {
<Self as Application>::node_readiness_path()
}
/// Wait for remote readiness using topology + URLs.
async fn wait_remote_readiness(
topology: &Self::Deployment,
urls: &[Url],
requirement: HttpReadinessRequirement,
) -> Result<(), DynError> {
let _ = topology;
let readiness_urls: Vec<_> = urls
.iter()
.map(|url| {
let mut endpoint = url.clone();
endpoint.set_path(<Self as K8sDeployEnv>::node_readiness_path());
endpoint
})
.collect();
wait_http_readiness(&readiness_urls, requirement).await?;
Ok(())
}
/// Label used for readiness probe logging.
fn node_role() -> &'static str {
"node"
}
/// Deployment resource name for a node index.
fn node_deployment_name(release: &str, index: usize) -> String {
format!("{release}-node-{index}")
}
/// Service resource name for a node index.
fn node_service_name(release: &str, index: usize) -> String {
format!("{release}-node-{index}")
}
/// Label selector used to discover managed node services in
/// existing-cluster mode.
fn attach_node_service_selector(release: &str) -> String {
format!("app.kubernetes.io/instance={release}")
}
/// Wait for HTTP readiness on provided ports for a given host.
async fn wait_for_node_http(
ports: &[u16],
role: &'static str,
host: &str,
timeout: Duration,
poll_interval: Duration,
requirement: HttpReadinessRequirement,
) -> Result<(), DynError> {
let _ = role;
let _ = timeout;
let _ = poll_interval;
wait_for_http_ports_with_host_and_requirement(
ports,
host,
<Self as K8sDeployEnv>::node_readiness_path(),
requirement,
)
.await?;
Ok(())
}
/// Optional base URL for node client diagnostics.
fn node_base_url(_client: &Self::NodeClient) -> Option<String> {
None
}
/// Optional cfgsync/bootstrap service reachable from inside the cluster.
///
/// Manual cluster uses this to update one node's served config before
/// start.
fn cfgsync_service(_release: &str) -> Option<(String, u16)> {
None
}
/// Hostnames that should be rendered into cfgsync-served node configs.
fn cfgsync_hostnames(release: &str, node_count: usize) -> Vec<String> {
(0..node_count)
.map(|index| Self::node_service_name(release, index))
.collect()
}
/// Optional node-local artifact override for manual cluster startup
/// options.
///
/// Return `Some(..)` when options require a node-specific config
/// replacement before the node starts. Return `None` to keep the
/// original cfgsync artifact set.
fn build_cfgsync_override_artifacts(
_topology: &Self::Deployment,
_node_index: usize,
_hostnames: &[String],
_options: &testing_framework_core::scenario::StartNodeOptions<Self>,
) -> Result<Option<ArtifactSet>, DynError> {
Ok(None)
install_helm_release_with_cleanup(client, self, namespace, release).await
}
}
/// Builds container/service port specs from a deployment descriptor.
type K8sPortSpecsBuilder<E> =
    Box<dyn Fn(&<E as Application>::Deployment) -> PortSpecs + Send + Sync>;

/// Prepares an installable stack (charts, config payloads) from a deployment
/// descriptor and an optional OTLP metrics ingest URL.
type K8sPreparedStackBuilder<E> = Box<
    dyn Fn(
            &<E as Application>::Deployment,
            Option<&Url>,
        ) -> Result<Box<dyn PreparedK8sStack>, DynError>
        + Send
        + Sync,
>;

/// Produces a fresh (namespace, release) identifier pair.
type K8sClusterIdentifiers = Box<dyn Fn() -> (String, String) + Send + Sync>;

/// Maps (release, node index) to a k8s resource name.
type K8sIndexedName = Box<dyn Fn(&str, usize) -> String + Send + Sync>;

/// Maps a release name to a label selector.
type K8sAttachSelector = Box<dyn Fn(&str) -> String + Send + Sync>;

/// Builds node clients from (host, api ports, auxiliary ports).
type K8sNodeClientsBuilder<E> = Box<
    dyn Fn(&str, &[u16], &[u16]) -> Result<Vec<<E as Application>::NodeClient>, DynError>
        + Send
        + Sync,
>;

/// Optional diagnostics base URL for a node client.
type K8sNodeBaseUrl<E> =
    Box<dyn Fn(&<E as Application>::NodeClient) -> Option<String> + Send + Sync>;

/// Optional in-cluster cfgsync (service, port) for a release.
type K8sCfgsyncService = Box<dyn Fn(&str) -> Option<(String, u16)> + Send + Sync>;

/// Hostnames rendered into cfgsync-served configs for (release, node count).
type K8sCfgsyncHostnames = Box<dyn Fn(&str, usize) -> Vec<String> + Send + Sync>;

/// Optional per-node artifact override for manual cluster startup options.
type K8sCfgsyncOverrideBuilder<E> = Box<
    dyn Fn(
            &<E as Application>::Deployment,
            usize,
            &[String],
            &testing_framework_core::scenario::StartNodeOptions<E>,
        ) -> Result<Option<ArtifactSet>, DynError>
        + Send
        + Sync,
>;
/// Env-specific k8s behavior, grouped by concern: installing the stack,
/// accessing nodes, and manual-cluster (cfgsync) hooks.
pub struct K8sRuntime<E: Application> {
    // Ports, asset preparation, identifiers and resource naming.
    install: K8sInstall<E>,
    // Node clients, readiness path/role, diagnostics URL.
    access: K8sAccess<E>,
    // Manual-cluster cfgsync integration.
    manual: K8sManual<E>,
}
/// Install-time hooks for a k8s environment.
pub struct K8sInstall<E: Application> {
    // Collects container port specs from the deployment descriptor.
    collect_port_specs: K8sPortSpecsBuilder<E>,
    // Renders the installable stack (charts, config payloads).
    prepare_stack: K8sPreparedStackBuilder<E>,
    // Produces the (namespace, release) pair for a new cluster.
    cluster_identifiers: K8sClusterIdentifiers,
    // Names the Deployment resource for a node index.
    node_deployment_name: K8sIndexedName,
    // Names the Service resource for a node index.
    node_service_name: K8sIndexedName,
    // Label selector used to discover node services when attaching to an
    // existing cluster.
    attach_node_service_selector: K8sAttachSelector,
}
/// Node-access hooks: building clients and probing readiness.
pub struct K8sAccess<E: Application> {
    // Builds node clients from (host, api ports, auxiliary ports).
    build_node_clients: K8sNodeClientsBuilder<E>,
    // HTTP path polled for node readiness.
    readiness_path: &'static str,
    // Role label used in readiness logging.
    node_role: &'static str,
    // Optional diagnostics base URL for a node client.
    node_base_url: K8sNodeBaseUrl<E>,
}
/// Manual-cluster hooks for cfgsync-driven node configuration.
pub struct K8sManual<E: Application> {
    // Optional in-cluster cfgsync (service, port) for a release.
    cfgsync_service: K8sCfgsyncService,
    // Optional override for cfgsync-rendered hostnames; `None` falls back to
    // the install surface's node service names.
    cfgsync_hostnames: Option<K8sCfgsyncHostnames>,
    // Per-node artifact override applied before a manual node starts.
    build_cfgsync_override_artifacts: K8sCfgsyncOverrideBuilder<E>,
}
impl<E: Application> K8sRuntime<E> {
    /// Create a runtime from the mandatory install surface; the access and
    /// manual surfaces start out with their defaults.
    #[must_use]
    pub fn new(install: K8sInstall<E>) -> Self {
        let access = K8sAccess::default();
        let manual = K8sManual::default();
        Self {
            install,
            access,
            manual,
        }
    }

    /// Replace the node-access surface (clients, readiness, diagnostics).
    #[must_use]
    pub fn with_access(self, access: K8sAccess<E>) -> Self {
        Self { access, ..self }
    }

    /// Replace the manual-cluster surface (cfgsync hooks).
    #[must_use]
    pub fn with_manual(self, manual: K8sManual<E>) -> Self {
        Self { manual, ..self }
    }
}
impl<E> K8sRuntime<E>
where
    E: Application + StaticNodeConfigProvider,
    E::Deployment: DeploymentDescriptor,
{
    /// Convenience constructor for the common "binary + rendered config"
    /// deployment: standard port specs plus a node chart rendered from `spec`.
    #[must_use]
    pub fn binary_config(spec: BinaryConfigK8sSpec) -> Self {
        // Clone up front: the two closures below are `'static` and each needs
        // its own owned copy of the spec data it captures.
        let prepare_spec = spec.clone();
        let name_prefix = spec.node_name_prefix.clone();
        let container_http_port = spec.container_http_port;
        let service_testing_port = spec.service_testing_port;
        Self::new(
            K8sInstall::new(
                move |topology: &E::Deployment| {
                    standard_port_specs(
                        topology.node_count(),
                        container_http_port,
                        service_testing_port,
                    )
                },
                move |topology, _metrics_otlp_ingest_url| {
                    // Binary-config charts are rendered locally; this renderer
                    // does not consume the metrics ingest URL.
                    let assets =
                        render_binary_config_node_chart_assets::<E>(topology, &prepare_spec)?;
                    Ok(Box::new(assets) as Box<dyn PreparedK8sStack>)
                },
            )
            .with_node_name_prefix(name_prefix),
        )
    }
}
impl<E: Application> K8sInstall<E> {
    /// Create an install surface from the two mandatory hooks; naming and
    /// identifier hooks start with the `tf-testnet-*` / `{release}-node-{i}`
    /// defaults.
    #[must_use]
    pub fn new<FP, FA>(collect_port_specs: FP, prepare_stack: FA) -> Self
    where
        FP: Fn(&E::Deployment) -> PortSpecs + Send + Sync + 'static,
        FA: Fn(&E::Deployment, Option<&Url>) -> Result<Box<dyn PreparedK8sStack>, DynError>
            + Send
            + Sync
            + 'static,
    {
        Self {
            collect_port_specs: Box::new(collect_port_specs),
            prepare_stack: Box::new(prepare_stack),
            cluster_identifiers: Box::new(default_cluster_identifiers),
            node_deployment_name: Box::new(default_node_name),
            node_service_name: Box::new(default_node_name),
            attach_node_service_selector: Box::new(default_attach_node_service_selector),
        }
    }

    /// Override how the (namespace, release) pair is produced.
    #[must_use]
    pub fn with_cluster_identifiers<F>(mut self, cluster_identifiers: F) -> Self
    where
        F: Fn() -> (String, String) + Send + Sync + 'static,
    {
        self.cluster_identifiers = Box::new(cluster_identifiers);
        self
    }

    /// Name node Deployments and Services `"{prefix}-{index}"`, ignoring the
    /// release name.
    #[must_use]
    pub fn with_node_name_prefix(mut self, prefix: impl Into<String>) -> Self {
        let prefix = prefix.into();
        self.node_deployment_name = Box::new(named_resource(prefix.clone()));
        self.node_service_name = Box::new(named_resource(prefix));
        self
    }

    /// Fully custom naming for node Deployments and Services.
    #[must_use]
    pub fn with_resource_names<FD, FS>(
        mut self,
        node_deployment_name: FD,
        node_service_name: FS,
    ) -> Self
    where
        FD: Fn(&str, usize) -> String + Send + Sync + 'static,
        FS: Fn(&str, usize) -> String + Send + Sync + 'static,
    {
        self.node_deployment_name = Box::new(node_deployment_name);
        self.node_service_name = Box::new(node_service_name);
        self
    }

    /// Override the label selector used to discover node services when
    /// attaching to an existing cluster.
    #[must_use]
    pub fn with_attach_node_service_selector<F>(mut self, attach_node_service_selector: F) -> Self
    where
        F: Fn(&str) -> String + Send + Sync + 'static,
    {
        self.attach_node_service_selector = Box::new(attach_node_service_selector);
        self
    }
}
impl<E: Application> Default for K8sAccess<E> {
    fn default() -> Self {
        Self {
            // Build clients straight from forwarded host/port pairs.
            build_node_clients: Box::new(default_build_node_clients::<E>),
            // Reuse the application's own readiness probe path.
            readiness_path: E::node_readiness_path(),
            node_role: "node",
            // No diagnostics base URL unless an env opts in.
            node_base_url: Box::new(|_client| None),
        }
    }
}
impl<E: Application> K8sAccess<E> {
    /// Equivalent to [`Default::default`]: stock node clients, the
    /// application's readiness path, and no diagnostics URL.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Override how node clients are built from host + port lists.
    #[must_use]
    pub fn with_node_clients<F>(self, build_node_clients: F) -> Self
    where
        F: Fn(&str, &[u16], &[u16]) -> Result<Vec<E::NodeClient>, DynError> + Send + Sync + 'static,
    {
        Self {
            build_node_clients: Box::new(build_node_clients),
            ..self
        }
    }

    /// Override the HTTP path polled for node readiness.
    #[must_use]
    pub fn with_readiness_path(self, readiness_path: &'static str) -> Self {
        Self {
            readiness_path,
            ..self
        }
    }

    /// Override the role label used in readiness logging.
    #[must_use]
    pub fn with_node_role(self, node_role: &'static str) -> Self {
        Self { node_role, ..self }
    }

    /// Override the optional base URL reported for node diagnostics.
    #[must_use]
    pub fn with_node_base_url<F>(self, node_base_url: F) -> Self
    where
        F: Fn(&E::NodeClient) -> Option<String> + Send + Sync + 'static,
    {
        Self {
            node_base_url: Box::new(node_base_url),
            ..self
        }
    }
}
impl<E: Application> Default for K8sManual<E> {
    fn default() -> Self {
        Self {
            // No cfgsync service: manual clusters skip config updates.
            cfgsync_service: Box::new(|_release| None),
            // `None` falls back to the install surface's service names.
            cfgsync_hostnames: None,
            // No per-node artifact override by default.
            build_cfgsync_override_artifacts: Box::new(
                |_topology, _node_index, _hostnames, _options| Ok(None),
            ),
        }
    }
}
impl<E: Application> K8sManual<E> {
    /// Equivalent to [`Default::default`]: no cfgsync integration.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Provide the in-cluster cfgsync (service, port) for a release.
    #[must_use]
    pub fn with_cfgsync_service<F>(mut self, cfgsync_service: F) -> Self
    where
        F: Fn(&str) -> Option<(String, u16)> + Send + Sync + 'static,
    {
        self.cfgsync_service = Box::new(cfgsync_service);
        self
    }

    /// Override the hostnames rendered into cfgsync-served node configs
    /// (the default derives them from the install surface's service names).
    #[must_use]
    pub fn with_cfgsync_hostnames<F>(mut self, cfgsync_hostnames: F) -> Self
    where
        F: Fn(&str, usize) -> Vec<String> + Send + Sync + 'static,
    {
        self.cfgsync_hostnames = Some(Box::new(cfgsync_hostnames));
        self
    }

    /// Provide per-node artifact overrides applied before a manual node
    /// starts; returning `Ok(None)` keeps the original artifacts.
    #[must_use]
    pub fn with_cfgsync_override_artifacts<F>(mut self, build_override_artifacts: F) -> Self
    where
        F: Fn(
                &E::Deployment,
                usize,
                &[String],
                &testing_framework_core::scenario::StartNodeOptions<E>,
            ) -> Result<Option<ArtifactSet>, DynError>
            + Send
            + Sync
            + 'static,
    {
        self.build_cfgsync_override_artifacts = Box::new(build_override_artifacts);
        self
    }
}
/// Deploy-time environment contract: an [`Application`] that can describe its
/// k8s runtime (install, access and manual-cluster hooks).
pub trait K8sDeployEnv: Application + Sized {
    /// Produce the runtime describing how this env deploys on k8s.
    fn k8s_runtime() -> K8sRuntime<Self>;
}

/// Fetch a fresh runtime for `E`.
///
/// NOTE: every call rebuilds the runtime (and re-boxes all of its hooks); do
/// not call this repeatedly inside tight loops.
pub(crate) fn runtime_for<E: K8sDeployEnv>() -> K8sRuntime<E> {
    E::k8s_runtime()
}
/// Collect container port specs via the env's install hook.
pub(crate) fn collect_port_specs<E: K8sDeployEnv>(deployment: &E::Deployment) -> PortSpecs {
    (runtime_for::<E>().install.collect_port_specs)(deployment)
}

/// Prepare the installable stack (charts/config) for `deployment`.
pub(crate) fn prepare_stack<E: K8sDeployEnv>(
    deployment: &E::Deployment,
    metrics_otlp_ingest_url: Option<&Url>,
) -> Result<Box<dyn PreparedK8sStack>, DynError> {
    (runtime_for::<E>().install.prepare_stack)(deployment, metrics_otlp_ingest_url)
}

/// Produce the (namespace, release) identifier pair for a new cluster.
pub(crate) fn cluster_identifiers<E: K8sDeployEnv>() -> (String, String) {
    (runtime_for::<E>().install.cluster_identifiers)()
}

/// Build node clients from forwarded API/auxiliary port lists on `host`.
pub(crate) fn build_node_clients<E: K8sDeployEnv>(
    host: &str,
    node_api_ports: &[u16],
    node_auxiliary_ports: &[u16],
) -> Result<Vec<E::NodeClient>, DynError> {
    (runtime_for::<E>().access.build_node_clients)(host, node_api_ports, node_auxiliary_ports)
}

/// HTTP path polled for node readiness.
pub(crate) fn node_readiness_path<E: K8sDeployEnv>() -> &'static str {
    runtime_for::<E>().access.readiness_path
}
/// Wait for remote HTTP readiness of every node URL.
///
/// The deployment descriptor is currently unused but kept in the signature so
/// env-specific strategies can consult it later.
///
/// # Errors
/// Propagates any failure from the underlying HTTP readiness poll.
pub(crate) async fn wait_remote_readiness<E: K8sDeployEnv>(
    _deployment: &E::Deployment,
    urls: &[Url],
    requirement: HttpReadinessRequirement,
) -> Result<(), DynError> {
    // Resolve the readiness path once: `node_readiness_path::<E>()` builds a
    // fresh `K8sRuntime` (boxing every hook) on each call, so invoking it
    // inside the per-URL closure would redo that work for every node.
    let readiness_path = node_readiness_path::<E>();
    let readiness_urls: Vec<_> = urls
        .iter()
        .map(|url| {
            let mut endpoint = url.clone();
            endpoint.set_path(readiness_path);
            endpoint
        })
        .collect();
    wait_http_readiness(&readiness_urls, requirement).await?;
    Ok(())
}
/// Role label used in readiness-probe logging.
pub(crate) fn node_role<E: K8sDeployEnv>() -> &'static str {
    runtime_for::<E>().access.node_role
}

/// Deployment resource name for node `index` under `release`.
pub(crate) fn node_deployment_name<E: K8sDeployEnv>(release: &str, index: usize) -> String {
    (runtime_for::<E>().install.node_deployment_name)(release, index)
}

/// Service resource name for node `index` under `release`.
pub(crate) fn node_service_name<E: K8sDeployEnv>(release: &str, index: usize) -> String {
    (runtime_for::<E>().install.node_service_name)(release, index)
}

/// Label selector for discovering managed node services in
/// existing-cluster (attach) mode.
pub(crate) fn attach_node_service_selector<E: K8sDeployEnv>(release: &str) -> String {
    (runtime_for::<E>().install.attach_node_service_selector)(release)
}
/// Poll HTTP readiness for `ports` on `host` until `requirement` is met.
///
/// `role`, `timeout` and `poll_interval` are accepted for interface parity
/// with callers but are currently unused; the underlying helper applies its
/// own timing policy.
pub(crate) async fn wait_for_node_http<E: K8sDeployEnv>(
    ports: &[u16],
    role: &'static str,
    host: &str,
    timeout: Duration,
    poll_interval: Duration,
    requirement: HttpReadinessRequirement,
) -> Result<(), DynError> {
    // Explicitly discard the unused knobs (kept so the signature can gain
    // real support later without touching call sites).
    let _ = role;
    let _ = timeout;
    let _ = poll_interval;
    wait_for_http_ports_with_host_and_requirement(
        ports,
        host,
        node_readiness_path::<E>(),
        requirement,
    )
    .await?;
    Ok(())
}
/// Optional diagnostics base URL for a node client (default hook: `None`).
pub(crate) fn node_base_url<E: K8sDeployEnv>(client: &E::NodeClient) -> Option<String> {
    (runtime_for::<E>().access.node_base_url)(client)
}

/// Optional in-cluster cfgsync (service name, port) for `release`.
pub(crate) fn cfgsync_service<E: K8sDeployEnv>(release: &str) -> Option<(String, u16)> {
    (runtime_for::<E>().manual.cfgsync_service)(release)
}
/// Hostnames rendered into cfgsync-served node configs for `release`.
///
/// Uses the env's explicit hook when one is configured; otherwise derives the
/// list from the install surface's node service names, one per node index.
pub(crate) fn cfgsync_hostnames<E: K8sDeployEnv>(release: &str, node_count: usize) -> Vec<String> {
    let runtime = runtime_for::<E>();
    match runtime.manual.cfgsync_hostnames {
        Some(hostnames_fn) => hostnames_fn(release, node_count),
        None => {
            let service_name = runtime.install.node_service_name;
            (0..node_count)
                .map(|index| service_name(release, index))
                .collect()
        }
    }
}
/// Build an optional node-local artifact override for manual start options.
///
/// Returns `Ok(None)` when the node should keep the original cfgsync
/// artifact set.
pub(crate) fn build_cfgsync_override_artifacts<E: K8sDeployEnv>(
    deployment: &E::Deployment,
    node_index: usize,
    hostnames: &[String],
    options: &testing_framework_core::scenario::StartNodeOptions<E>,
) -> Result<Option<ArtifactSet>, DynError> {
    (runtime_for::<E>().manual.build_cfgsync_override_artifacts)(
        deployment, node_index, hostnames, options,
    )
}
/// Default (namespace, release): a unique `tf-testnet-<millis-hex>-<pid-hex>`
/// namespace paired with the fixed `tf-runner` release name.
fn default_cluster_identifiers() -> (String, String) {
    // Milliseconds since the epoch; a clock before 1970 degrades to 0 rather
    // than failing, matching the original `unwrap_or_default` behavior.
    let millis = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    };
    let namespace = format!("tf-testnet-{millis:x}-{:x}", process::id());
    (namespace, "tf-runner".to_owned())
}
/// Default node-client builder: pair each API port with its auxiliary port
/// and hand the discovered access info to the application's client factory.
/// Stops at the first factory error (same short-circuiting as a `collect`
/// over `Result`s); extra ports in the longer list are ignored.
fn default_build_node_clients<E: Application>(
    host: &str,
    node_api_ports: &[u16],
    node_auxiliary_ports: &[u16],
) -> Result<Vec<E::NodeClient>, DynError> {
    let pair_count = node_api_ports.len().min(node_auxiliary_ports.len());
    let mut clients = Vec::with_capacity(pair_count);
    for (&api_port, &auxiliary_port) in node_api_ports.iter().zip(node_auxiliary_ports.iter()) {
        let access = discovered_node_access(host, api_port, auxiliary_port);
        clients.push(<E as Application>::build_node_client(&access)?);
    }
    Ok(clients)
}
/// Default Deployment/Service name for node `index`: `"{release}-node-{index}"`.
fn default_node_name(release: &str, index: usize) -> String {
    let index_text = index.to_string();
    let mut name = String::with_capacity(release.len() + 6 + index_text.len());
    name.push_str(release);
    name.push_str("-node-");
    name.push_str(&index_text);
    name
}
/// Default label selector matching every service of a Helm `release`.
fn default_attach_node_service_selector(release: &str) -> String {
    let mut selector = String::from("app.kubernetes.io/instance=");
    selector.push_str(release);
    selector
}
/// Build an indexed-name hook that ignores the release name and yields
/// `"{prefix}-{index}"` instead.
fn named_resource(prefix: String) -> impl Fn(&str, usize) -> String + Send + Sync + 'static {
    move |_release, index| {
        let mut name = prefix.clone();
        name.push('-');
        name.push_str(&index.to_string());
        name
    }
}

View File

@ -7,7 +7,10 @@ use tracing::{debug, info};
use url::ParseError;
use crate::{
env::K8sDeployEnv,
env::{
K8sDeployEnv, build_node_clients as build_k8s_node_clients,
collect_port_specs as collect_k8s_port_specs, node_role, wait_remote_readiness,
},
lifecycle::{cleanup::RunnerCleanup, logs::dump_namespace_logs},
wait::{
ClusterPorts, ClusterReady, NodeConfigPorts, PortForwardHandle, wait_for_cluster_ready,
@ -131,7 +134,7 @@ pub enum RemoteReadinessError {
}
pub fn collect_port_specs<E: K8sDeployEnv>(descriptors: &E::Deployment) -> PortSpecs {
let specs = E::collect_port_specs(descriptors);
let specs = collect_k8s_port_specs::<E>(descriptors);
debug!(nodes = specs.nodes.len(), "collected k8s port specs");
specs
}
@ -139,7 +142,7 @@ pub fn collect_port_specs<E: K8sDeployEnv>(descriptors: &E::Deployment) -> PortS
pub fn build_node_clients<E: K8sDeployEnv>(
cluster: &ClusterEnvironment,
) -> Result<NodeClients<E>, NodeClientError> {
let nodes = E::build_node_clients(
let nodes = build_k8s_node_clients::<E>(
&cluster.node_host,
&cluster.node_api_ports,
&cluster.node_auxiliary_ports,
@ -159,9 +162,9 @@ pub async fn ensure_cluster_readiness<E: K8sDeployEnv>(
info!("waiting for remote readiness (API + membership)");
let (node_api, _node_auxiliary) = cluster.node_ports();
let node_urls = readiness_urls(node_api, E::node_role(), &cluster.node_host)?;
let node_urls = readiness_urls(node_api, node_role::<E>(), &cluster.node_host)?;
E::wait_remote_readiness(descriptors, &node_urls, requirement)
wait_remote_readiness::<E>(descriptors, &node_urls, requirement)
.await
.map_err(|source| RemoteReadinessError::Remote { source })?;

View File

@ -23,10 +23,11 @@ pub(crate) fn ensure_rustls_provider_installed() {
pub use deployer::{K8sDeployer, K8sDeploymentMetadata, K8sRunnerError};
pub use env::{
BinaryConfigK8sSpec, HelmManifest, HelmReleaseAssets, K8sDeployEnv, RenderedHelmChartAssets,
discovered_node_access, install_helm_release_with_cleanup,
render_binary_config_node_chart_assets, render_binary_config_node_manifest,
render_manifest_chart_assets, render_single_template_chart_assets, standard_port_specs,
BinaryConfigK8sSpec, HelmManifest, HelmReleaseAssets, K8sAccess, K8sDeployEnv, K8sInstall,
K8sManual, K8sRuntime, PreparedK8sStack, RenderedHelmChartAssets, discovered_node_access,
install_helm_release_with_cleanup, render_binary_config_node_chart_assets,
render_binary_config_node_manifest, render_manifest_chart_assets,
render_single_template_chart_assets, standard_port_specs,
};
pub use infrastructure::{
chart_values::{

View File

@ -3,7 +3,10 @@ use std::time::Duration;
use testing_framework_core::scenario::HttpReadinessRequirement;
use super::{ClusterWaitError, http_poll_interval, node_http_probe_timeout, node_http_timeout};
use crate::{env::K8sDeployEnv, host::node_host};
use crate::{
env::{K8sDeployEnv, wait_for_node_http},
host::node_host,
};
const LOCALHOST: &str = "127.0.0.1";
const READINESS_REQUIREMENT: HttpReadinessRequirement = HttpReadinessRequirement::AllNodesReady;
@ -29,7 +32,7 @@ async fn wait_for_node_http_on_host<E: K8sDeployEnv>(
host: &str,
timeout: Duration,
) -> Result<(), ClusterWaitError> {
E::wait_for_node_http(
wait_for_node_http::<E>(
ports,
role,
host,

View File

@ -2,7 +2,7 @@ use kube::Client;
use super::{ClusterPorts, ClusterReady, ClusterWaitError, NodeConfigPorts, NodePortAllocation};
use crate::{
env::K8sDeployEnv,
env::{K8sDeployEnv, node_deployment_name, node_role, node_service_name},
lifecycle::wait::{
deployment::wait_for_deployment_ready,
forwarding::{
@ -27,7 +27,7 @@ pub async fn wait_for_cluster_ready<E: K8sDeployEnv>(
let node_allocations =
discover_ready_node_allocations::<E>(client, namespace, release, node_ports).await?;
let role = E::node_role();
let role = node_role::<E>();
let (readiness, node_allocations) =
if needs_port_forward_fallback::<E>(&node_allocations, role).await {
@ -112,7 +112,7 @@ async fn discover_ready_node_allocations<E: K8sDeployEnv>(
let mut allocations = Vec::with_capacity(node_ports.len());
for (index, ports) in node_ports.iter().enumerate() {
let deployment_name = E::node_deployment_name(release, index);
let deployment_name = node_deployment_name::<E>(release, index);
wait_for_deployment_ready(client, namespace, &deployment_name).await?;
let allocation = discover_node_ports(client, namespace, &deployment_name, *ports).await?;
allocations.push(allocation);
@ -131,7 +131,7 @@ async fn spawn_port_forwards<E: K8sDeployEnv>(
let mut forwards = Vec::new();
for (index, ports) in node_ports.iter().enumerate() {
let service = E::node_service_name(&release, index);
let service = node_service_name::<E>(&release, index);
let api_forward = port_forward_service(&namespace, &service, ports.api)?;
let auxiliary_forward = port_forward_service(&namespace, &service, ports.auxiliary)?;
register_forward_pair(

View File

@ -22,7 +22,11 @@ use tokio_retry::{RetryIf, strategy::FixedInterval};
use crate::{
K8sDeployer,
env::{HelmReleaseAssets, K8sDeployEnv, discovered_node_access},
env::{
K8sDeployEnv, build_cfgsync_override_artifacts, cfgsync_hostnames, cfgsync_service,
cluster_identifiers, collect_port_specs, discovered_node_access, node_deployment_name,
node_readiness_path, node_service_name, prepare_stack,
},
host::node_host,
lifecycle::{
cleanup::RunnerCleanup,
@ -107,10 +111,7 @@ struct ManualClusterState<E: K8sDeployEnv> {
known_clients: Vec<Option<E::NodeClient>>,
}
pub struct ManualCluster<E: K8sDeployEnv>
where
E::Assets: HelmReleaseAssets,
{
pub struct ManualCluster<E: K8sDeployEnv> {
client: Client,
namespace: String,
release: String,
@ -122,10 +123,7 @@ where
state: Arc<Mutex<ManualClusterState<E>>>,
}
impl<E: K8sDeployEnv> ManualCluster<E>
where
E::Assets: HelmReleaseAssets,
{
impl<E: K8sDeployEnv> ManualCluster<E> {
pub async fn from_topology(topology: E::Deployment) -> Result<Self, ManualClusterError> {
let nodes = testing_framework_core::topology::DeploymentDescriptor::node_count(&topology);
if nodes == 0 {
@ -136,14 +134,15 @@ where
let client = Client::try_default()
.await
.map_err(|source| ManualClusterError::ClientInit { source })?;
let assets = E::prepare_assets(&topology, None)
let assets = prepare_stack::<E>(&topology, None)
.map_err(|source| ManualClusterError::Assets { source })?;
let (namespace, release) = E::cluster_identifiers();
let cleanup = E::install_stack(&client, &assets, &namespace, &release, nodes)
let (namespace, release) = cluster_identifiers::<E>();
let cleanup = assets
.install(&client, &namespace, &release, nodes)
.await
.map_err(|source| ManualClusterError::InstallStack { source })?;
let node_ports = E::collect_port_specs(&topology).nodes;
let node_ports = collect_port_specs::<E>(&topology).nodes;
let node_allocations =
discover_all_node_ports::<E>(&client, &namespace, &release, &node_ports).await?;
scale_all_nodes::<E>(&client, &namespace, &release, nodes, 0).await?;
@ -296,7 +295,7 @@ where
testing_framework_core::scenario::wait_for_http_ports_with_host_and_requirement(
&ports,
&self.node_host,
<E as K8sDeployEnv>::node_readiness_path(),
node_readiness_path::<E>(),
HttpReadinessRequirement::AllNodesReady,
)
.await
@ -311,7 +310,7 @@ where
testing_framework_core::scenario::wait_for_http_ports_with_host_and_requirement(
&[port],
&self.node_host,
<E as K8sDeployEnv>::node_readiness_path(),
node_readiness_path::<E>(),
HttpReadinessRequirement::AllNodesReady,
)
.await
@ -393,13 +392,13 @@ where
index: usize,
options: &StartNodeOptions<E>,
) -> Result<(), ManualClusterError> {
let Some((service, port)) = E::cfgsync_service(&self.release) else {
let Some((service, port)) = cfgsync_service::<E>(&self.release) else {
return ensure_default_cfgsync_options(options);
};
let hostnames = E::cfgsync_hostnames(&self.release, self.node_count);
let hostnames = cfgsync_hostnames::<E>(&self.release, self.node_count);
let artifacts =
E::build_cfgsync_override_artifacts(&self.topology, index, &hostnames, options)
build_cfgsync_override_artifacts::<E>(&self.topology, index, &hostnames, options)
.map_err(|source| ManualClusterError::CfgsyncUpdate {
name: canonical_node_name(index),
source,
@ -431,7 +430,6 @@ where
impl<E> Drop for ManualCluster<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
fn drop(&mut self) {
self.stop_all();
@ -445,7 +443,6 @@ where
impl<E> NodeControlHandle<E> for ManualCluster<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
async fn restart_node(&self, name: &str) -> Result<(), DynError> {
Self::restart_node(self, name).await.map_err(Into::into)
@ -478,7 +475,6 @@ where
impl<E> ClusterWaitHandle<E> for ManualCluster<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
async fn wait_network_ready(&self) -> Result<(), DynError> {
Self::wait_network_ready(self).await.map_err(Into::into)
@ -486,17 +482,11 @@ where
}
#[async_trait::async_trait]
impl<E> ManualClusterHandle<E> for ManualCluster<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
}
impl<E> ManualClusterHandle<E> for ManualCluster<E> where E: K8sDeployEnv {}
impl<E> K8sDeployer<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
pub async fn manual_cluster_from_descriptors(
&self,
@ -515,7 +505,7 @@ async fn discover_all_node_ports<E: K8sDeployEnv>(
) -> Result<Vec<crate::wait::NodePortAllocation>, ManualClusterError> {
let mut allocations = Vec::with_capacity(node_ports.len());
for (index, ports) in node_ports.iter().enumerate() {
let service_name = E::node_service_name(release, index);
let service_name = node_service_name::<E>(release, index);
allocations.push(discover_node_ports(client, namespace, &service_name, *ports).await?);
}
Ok(allocations)
@ -541,7 +531,7 @@ async fn scale_node<E: K8sDeployEnv>(
index: usize,
replicas: i32,
) -> Result<(), ManualClusterError> {
let name = E::node_deployment_name(release, index);
let name = node_deployment_name::<E>(release, index);
let deployments = Api::<Deployment>::namespaced(client.clone(), namespace);
let patch = serde_json::json!({"spec": {"replicas": replicas}});
deployments
@ -663,8 +653,7 @@ mod tests {
use super::*;
use crate::{
PortSpecs, RenderedHelmChartAssets, render_single_template_chart_assets,
standard_port_specs,
RenderedHelmChartAssets, render_single_template_chart_assets, standard_port_specs,
};
struct DummyEnv;
@ -695,31 +684,25 @@ mod tests {
#[async_trait::async_trait]
impl K8sDeployEnv for DummyEnv {
type Assets = RenderedHelmChartAssets;
fn collect_port_specs(_topology: &Self::Deployment) -> PortSpecs {
standard_port_specs(1, 8080, 8081)
}
fn prepare_assets(
_topology: &Self::Deployment,
_metrics_otlp_ingest_url: Option<&reqwest::Url>,
) -> Result<Self::Assets, DynError> {
render_single_template_chart_assets("dummy", "dummy.yaml", "")
}
fn cfgsync_service(release: &str) -> Option<(String, u16)> {
Some((format!("{release}-cfgsync"), 4400))
}
fn build_cfgsync_override_artifacts(
topology: &Self::Deployment,
node_index: usize,
hostnames: &[String],
options: &StartNodeOptions<Self>,
) -> Result<Option<cfgsync_artifacts::ArtifactSet>, DynError> {
build_node_artifact_override::<Self>(topology, node_index, hostnames, options)
.map_err(Into::into)
fn k8s_runtime() -> crate::env::K8sRuntime<Self> {
crate::env::K8sRuntime::new(crate::env::K8sInstall::new(
|_topology| standard_port_specs(1, 8080, 8081),
|_topology, _metrics_otlp_ingest_url| {
let assets: RenderedHelmChartAssets =
render_single_template_chart_assets("dummy", "dummy.yaml", "")?;
Ok(Box::new(assets) as Box<dyn crate::env::PreparedK8sStack>)
},
))
.with_manual(
crate::env::K8sManual::new()
.with_cfgsync_service(|release| Some((format!("{release}-cfgsync"), 4400)))
.with_cfgsync_override_artifacts(|topology, node_index, hostnames, options| {
build_node_artifact_override::<Self>(
topology, node_index, hostnames, options,
)
.map_err(Into::into)
}),
)
}
}
@ -827,7 +810,7 @@ mod tests {
let options = StartNodeOptions::<DummyEnv>::default()
.with_peers(PeerSelection::Named(vec!["node-0".to_owned()]));
let artifacts = DummyEnv::build_cfgsync_override_artifacts(
let artifacts = crate::env::build_cfgsync_override_artifacts::<DummyEnv>(
&topology,
1,
&["node-0".to_owned(), "node-1".to_owned()],
@ -846,7 +829,7 @@ mod tests {
let options =
StartNodeOptions::<DummyEnv>::default().with_config_override("override".to_owned());
let artifacts = DummyEnv::build_cfgsync_override_artifacts(
let artifacts = crate::env::build_cfgsync_override_artifacts::<DummyEnv>(
&topology,
1,
&["node-0".to_owned(), "node-1".to_owned()],