Validate deployer support for cluster modes

This commit is contained in:
andrussal 2026-03-08 15:13:47 +01:00
parent 1268607a68
commit 6405d31ebd
3 changed files with 162 additions and 12 deletions

View File

@ -3,11 +3,11 @@ use std::{env, sync::Arc, time::Duration};
use reqwest::Url;
use testing_framework_core::{
scenario::{
ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode,
ClusterWaitHandle, DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement,
Metrics, NodeClients, NodeControlHandle, ObservabilityCapabilityProvider,
ObservabilityInputs, RequiresNodeControl, Runner, RuntimeAssembly, Scenario,
SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode,
ClusterWaitHandle, DeploymentPolicy, DynError, ExistingCluster, FeedHandle, FeedRuntime,
HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner,
RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
@ -64,6 +64,12 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
where
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
validate_supported_cluster_mode(scenario).map_err(|source| {
ComposeRunnerError::SourceOrchestration {
source: source.into(),
}
})?;
// Source planning is currently resolved here before deployer-specific setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
ComposeRunnerError::SourceOrchestration {
@ -366,6 +372,56 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
}
}
fn validate_supported_cluster_mode<E: Application, Caps>(
scenario: &Scenario<E, Caps>,
) -> Result<(), DynError> {
if !matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) {
return Ok(());
}
let cluster = scenario
.existing_cluster()
.ok_or_else(|| DynError::from("existing-cluster mode requires an existing cluster"))?;
ensure_compose_existing_cluster(cluster)
}
fn ensure_compose_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> {
if cluster.compose_project().is_some() && cluster.compose_services().is_some() {
return Ok(());
}
Err("compose deployer requires a compose existing-cluster descriptor".into())
}
#[cfg(test)]
mod tests {
use testing_framework_core::scenario::ExistingCluster;
use super::ensure_compose_existing_cluster;
#[test]
fn compose_cluster_validator_accepts_compose_descriptor() {
ensure_compose_existing_cluster(&ExistingCluster::for_compose_project(
"project".to_owned(),
))
.expect("compose descriptor should be accepted");
}
#[test]
fn compose_cluster_validator_rejects_k8s_descriptor() {
let error = ensure_compose_existing_cluster(&ExistingCluster::for_k8s_selector(
"app=node".to_owned(),
))
.expect_err("k8s descriptor should be rejected");
assert_eq!(
error.to_string(),
"compose deployer requires a compose existing-cluster descriptor"
);
}
}
fn existing_cluster_metadata<E, Caps>(scenario: &Scenario<E, Caps>) -> ComposeDeploymentMetadata
where
E: ComposeDeployEnv,

View File

@ -6,11 +6,11 @@ use reqwest::Url;
use testing_framework_core::{
scenario::{
Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode,
ClusterWaitHandle, Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement,
Metrics, MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs,
RequiresNodeControl, Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan,
SourceProviders, StaticManagedProvider, build_source_orchestration_plan,
orchestrate_sources_with_providers,
ClusterWaitHandle, Deployer, DynError, ExistingCluster, FeedHandle, FeedRuntime,
HttpReadinessRequirement, Metrics, MetricsError, NodeClients,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner,
RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
@ -171,6 +171,9 @@ where
E: K8sDeployEnv,
Caps: ObservabilityCapabilityProvider + Send + Sync,
{
validate_supported_cluster_mode(scenario)
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
// Source planning is currently resolved here before deployer-specific setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
K8sRunnerError::SourceOrchestration {
@ -273,6 +276,54 @@ where
Ok(Arc::new(cluster_wait))
}
fn validate_supported_cluster_mode<E: Application, Caps>(
scenario: &Scenario<E, Caps>,
) -> Result<(), DynError> {
if !matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) {
return Ok(());
}
let cluster = scenario
.existing_cluster()
.ok_or_else(|| DynError::from("existing-cluster mode requires an existing cluster"))?;
ensure_k8s_existing_cluster(cluster)
}
fn ensure_k8s_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> {
if cluster.k8s_label_selector().is_some() {
return Ok(());
}
Err("k8s deployer requires a k8s existing-cluster descriptor".into())
}
#[cfg(test)]
mod tests {
use testing_framework_core::scenario::ExistingCluster;
use super::ensure_k8s_existing_cluster;
#[test]
fn k8s_cluster_validator_accepts_k8s_descriptor() {
ensure_k8s_existing_cluster(&ExistingCluster::for_k8s_selector("app=node".to_owned()))
.expect("k8s descriptor should be accepted");
}
#[test]
fn k8s_cluster_validator_rejects_compose_descriptor() {
let error = ensure_k8s_existing_cluster(&ExistingCluster::for_compose_project(
"project".to_owned(),
))
.expect_err("compose descriptor should be rejected");
assert_eq!(
error.to_string(),
"k8s deployer requires a k8s existing-cluster descriptor"
);
}
}
fn managed_cluster_wait<E: K8sDeployEnv>(
cluster: &Option<ClusterEnvironment>,
metadata: &K8sDeploymentMetadata,

View File

@ -10,8 +10,8 @@ use std::{
use async_trait::async_trait;
use testing_framework_core::{
scenario::{
Application, CleanupGuard, ClusterControlProfile, Deployer, DeploymentPolicy, DynError,
FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
Application, CleanupGuard, ClusterControlProfile, ClusterMode, Deployer, DeploymentPolicy,
DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
NodeControlCapability, NodeControlHandle, RetryPolicy, Runner, RuntimeAssembly, Scenario,
ScenarioError, SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed,
},
@ -187,6 +187,8 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
&self,
scenario: &Scenario<E, ()>,
) -> Result<Runner<E>, ProcessDeployerError> {
validate_supported_cluster_mode(scenario)?;
// Source planning is currently resolved here before node spawn/runtime setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
ProcessDeployerError::SourceOrchestration {
@ -226,6 +228,8 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
&self,
scenario: &Scenario<E, NodeControlCapability>,
) -> Result<Runner<E>, ProcessDeployerError> {
validate_supported_cluster_mode(scenario)?;
// Source planning is currently resolved here before node spawn/runtime setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
ProcessDeployerError::SourceOrchestration {
@ -313,6 +317,22 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
}
}
fn validate_supported_cluster_mode<E: Application, Caps>(
scenario: &Scenario<E, Caps>,
) -> Result<(), ProcessDeployerError> {
ensure_local_cluster_mode(scenario.cluster_mode())
}
fn ensure_local_cluster_mode(mode: ClusterMode) -> Result<(), ProcessDeployerError> {
if matches!(mode, ClusterMode::ExistingCluster) {
return Err(ProcessDeployerError::SourceOrchestration {
source: DynError::from("local deployer does not support existing-cluster mode"),
});
}
Ok(())
}
fn merge_source_clients_for_local<E: LocalDeployerEnv>(
source_plan: &SourceOrchestrationPlan,
node_clients: NodeClients<E>,
@ -340,6 +360,29 @@ fn build_retry_execution_config(
(retry_policy, execution)
}
#[cfg(test)]
mod tests {
use testing_framework_core::scenario::ClusterMode;
use super::ensure_local_cluster_mode;
#[test]
fn local_cluster_validator_accepts_managed_mode() {
ensure_local_cluster_mode(ClusterMode::Managed).expect("managed mode should be accepted");
}
#[test]
fn local_cluster_validator_rejects_existing_cluster_mode() {
let error = ensure_local_cluster_mode(ClusterMode::ExistingCluster)
.expect_err("existing-cluster mode should be rejected");
assert_eq!(
error.to_string(),
"source orchestration failed: local deployer does not support existing-cluster mode"
);
}
}
async fn run_retry_attempt<E: LocalDeployerEnv>(
descriptors: &E::Deployment,
execution: RetryExecutionConfig,