From 6405d31ebdc79d97742342ea0c627d07082196f6 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 8 Mar 2026 15:13:47 +0100 Subject: [PATCH] Validate deployer support for cluster modes --- .../compose/src/deployer/orchestrator.rs | 66 +++++++++++++++++-- .../k8s/src/deployer/orchestrator.rs | 61 +++++++++++++++-- .../local/src/deployer/orchestrator.rs | 47 ++++++++++++- 3 files changed, 162 insertions(+), 12 deletions(-) diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 8dca52f..e18a16f 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,11 +3,11 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode, - ClusterWaitHandle, DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, - Metrics, NodeClients, NodeControlHandle, ObservabilityCapabilityProvider, - ObservabilityInputs, RequiresNodeControl, Runner, RuntimeAssembly, Scenario, - SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode, + ClusterWaitHandle, DeploymentPolicy, DynError, ExistingCluster, FeedHandle, FeedRuntime, + HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, + RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, @@ -64,6 +64,12 @@ impl DeploymentOrchestrator { where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { + validate_supported_cluster_mode(scenario).map_err(|source| { + ComposeRunnerError::SourceOrchestration { + source: source.into(), + } + })?; + // Source planning is currently resolved here before deployer-specific setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { ComposeRunnerError::SourceOrchestration { @@ -366,6 +372,56 @@ impl DeploymentOrchestrator { } } +fn validate_supported_cluster_mode( + scenario: &Scenario, +) -> Result<(), DynError> { + if !matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { + return Ok(()); + } + + let cluster = scenario + .existing_cluster() + .ok_or_else(|| DynError::from("existing-cluster mode requires an existing cluster"))?; + + ensure_compose_existing_cluster(cluster) +} + +fn ensure_compose_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> { + if cluster.compose_project().is_some() && cluster.compose_services().is_some() { + return Ok(()); + } + + Err("compose deployer requires a compose existing-cluster descriptor".into()) +} + +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::ExistingCluster; + + use super::ensure_compose_existing_cluster; + + #[test] + fn compose_cluster_validator_accepts_compose_descriptor() { + ensure_compose_existing_cluster(&ExistingCluster::for_compose_project( + "project".to_owned(), + )) + .expect("compose descriptor should be accepted"); + } + + #[test] + fn compose_cluster_validator_rejects_k8s_descriptor() { + let error = ensure_compose_existing_cluster(&ExistingCluster::for_k8s_selector( + "app=node".to_owned(), + )) + .expect_err("k8s descriptor should be rejected"); + + assert_eq!( + error.to_string(), + "compose deployer requires a compose existing-cluster descriptor" + ); + } +} + fn existing_cluster_metadata(scenario: &Scenario) -> ComposeDeploymentMetadata where E: ComposeDeployEnv, diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index d14e373..83ba257 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -6,11 +6,11 @@ use reqwest::Url; use testing_framework_core::{ scenario::{ Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode, - ClusterWaitHandle, Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, - Metrics, MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, - SourceProviders, StaticManagedProvider, build_source_orchestration_plan, - orchestrate_sources_with_providers, + ClusterWaitHandle, Deployer, DynError, ExistingCluster, FeedHandle, FeedRuntime, + HttpReadinessRequirement, Metrics, MetricsError, NodeClients, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, + RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -171,6 +171,9 @@ where E: K8sDeployEnv, Caps: ObservabilityCapabilityProvider + Send + Sync, { + validate_supported_cluster_mode(scenario) + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; + // Source planning is currently resolved here before deployer-specific setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { K8sRunnerError::SourceOrchestration { @@ -273,6 +276,54 @@ where Ok(Arc::new(cluster_wait)) } +fn validate_supported_cluster_mode( + scenario: &Scenario, +) -> Result<(), DynError> { + if !matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { + return Ok(()); + } + + let cluster = scenario + .existing_cluster() + .ok_or_else(|| DynError::from("existing-cluster mode requires an existing cluster"))?; + + ensure_k8s_existing_cluster(cluster) +} + +fn ensure_k8s_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> { + if cluster.k8s_label_selector().is_some() { + return Ok(()); + } + + Err("k8s deployer requires a k8s existing-cluster descriptor".into()) +} + +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::ExistingCluster; + + use super::ensure_k8s_existing_cluster; + + #[test] + fn k8s_cluster_validator_accepts_k8s_descriptor() { + ensure_k8s_existing_cluster(&ExistingCluster::for_k8s_selector("app=node".to_owned())) + .expect("k8s descriptor should be accepted"); + } + + #[test] + fn k8s_cluster_validator_rejects_compose_descriptor() { + let error = ensure_k8s_existing_cluster(&ExistingCluster::for_compose_project( + "project".to_owned(), + )) + .expect_err("compose descriptor should be rejected"); + + assert_eq!( + error.to_string(), + "k8s deployer requires a k8s existing-cluster descriptor" + ); + } +} + fn managed_cluster_wait( cluster: &Option, metadata: &K8sDeploymentMetadata, diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index 8ac6e08..5af3d2e 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -10,8 +10,8 @@ use std::{ use async_trait::async_trait; use testing_framework_core::{ scenario::{ - Application, CleanupGuard, ClusterControlProfile, Deployer, DeploymentPolicy, DynError, - FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, + Application, CleanupGuard, ClusterControlProfile, ClusterMode, Deployer, DeploymentPolicy, + DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, RetryPolicy, Runner, RuntimeAssembly, Scenario, ScenarioError, SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed, }, @@ -187,6 +187,8 @@ impl ProcessDeployer { &self, scenario: &Scenario, ) -> Result, ProcessDeployerError> { + validate_supported_cluster_mode(scenario)?; + // Source planning is currently resolved here before node spawn/runtime setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { ProcessDeployerError::SourceOrchestration { @@ -226,6 +228,8 @@ impl ProcessDeployer { &self, scenario: &Scenario, ) -> Result, ProcessDeployerError> { + validate_supported_cluster_mode(scenario)?; + // Source planning is currently resolved here before node spawn/runtime setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { ProcessDeployerError::SourceOrchestration { @@ -313,6 +317,22 @@ impl ProcessDeployer { } } +fn validate_supported_cluster_mode( + scenario: &Scenario, +) -> Result<(), ProcessDeployerError> { + ensure_local_cluster_mode(scenario.cluster_mode()) +} + +fn ensure_local_cluster_mode(mode: ClusterMode) -> Result<(), ProcessDeployerError> { + if matches!(mode, ClusterMode::ExistingCluster) { + return Err(ProcessDeployerError::SourceOrchestration { + source: DynError::from("local deployer does not support existing-cluster mode"), + }); + } + + Ok(()) +} + fn merge_source_clients_for_local( source_plan: &SourceOrchestrationPlan, node_clients: NodeClients, @@ -340,6 +360,29 @@ fn build_retry_execution_config( (retry_policy, execution) } +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::ClusterMode; + + use super::ensure_local_cluster_mode; + + #[test] + fn local_cluster_validator_accepts_managed_mode() { + ensure_local_cluster_mode(ClusterMode::Managed).expect("managed mode should be accepted"); + } + + #[test] + fn local_cluster_validator_rejects_existing_cluster_mode() { + let error = ensure_local_cluster_mode(ClusterMode::ExistingCluster) + .expect_err("existing-cluster mode should be rejected"); + + assert_eq!( + error.to_string(), + "source orchestration failed: local deployer does not support existing-cluster mode" + ); + } +} + async fn run_retry_attempt( descriptors: &E::Deployment, execution: RetryExecutionConfig,