From 898eadf976f09d324082c8934d76cc336a987bbe Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 8 Mar 2026 14:59:59 +0100 Subject: [PATCH] Drive runtime stabilization from cluster control semantics --- .../core/src/scenario/runtime/context.rs | 20 ++++++++++++++----- .../core/src/scenario/runtime/runner.rs | 6 +++--- .../compose/src/deployer/orchestrator.rs | 18 +++++++++++------ .../k8s/src/deployer/orchestrator.rs | 10 +++++++--- .../local/src/deployer/orchestrator.rs | 12 +++++++---- 5 files changed, 45 insertions(+), 21 deletions(-) diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 6f6faf6..438cb57 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -1,7 +1,9 @@ use std::{sync::Arc, time::Duration}; use super::{metrics::Metrics, node_clients::ClusterClient}; -use crate::scenario::{Application, ClusterWaitHandle, DynError, NodeClients, NodeControlHandle}; +use crate::scenario::{ + Application, ClusterControlProfile, ClusterWaitHandle, DynError, NodeClients, NodeControlHandle, +}; #[derive(Debug, thiserror::Error)] enum RunContextCapabilityError { @@ -15,6 +17,7 @@ pub struct RunContext { node_clients: NodeClients, metrics: RunMetrics, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, @@ -28,6 +31,7 @@ pub struct RuntimeAssembly { node_clients: NodeClients, run_duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, @@ -42,6 +46,7 @@ impl RunContext { node_clients: NodeClients, run_duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, @@ -53,6 +58,7 @@ impl RunContext { node_clients, metrics, expectation_cooldown, + cluster_control_profile, telemetry, feed, node_control, @@ -102,13 +108,13 @@ impl RunContext { } #[must_use] - pub fn node_control(&self) -> Option>> { - self.node_control.clone() + pub const fn cluster_control_profile(&self) -> ClusterControlProfile { + self.cluster_control_profile } #[must_use] - pub(crate) const fn controls_nodes(&self) -> bool { - self.node_control.is_some() + pub fn node_control(&self) -> Option>> { + self.node_control.clone() } pub(crate) async fn wait_network_ready(&self) -> Result<(), DynError> { @@ -135,6 +141,7 @@ impl RuntimeAssembly { node_clients: NodeClients, run_duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, ) -> Self { @@ -143,6 +150,7 @@ impl RuntimeAssembly { node_clients, run_duration, expectation_cooldown, + cluster_control_profile, telemetry, feed, node_control: None, @@ -169,6 +177,7 @@ impl RuntimeAssembly { self.node_clients, self.run_duration, self.expectation_cooldown, + self.cluster_control_profile, self.telemetry, self.feed, self.node_control, @@ -193,6 +202,7 @@ impl From> for RuntimeAssembly { node_clients: context.node_clients, run_duration: context.metrics.run_duration(), expectation_cooldown: context.expectation_cooldown, + cluster_control_profile: context.cluster_control_profile, telemetry: context.telemetry, feed: context.feed, node_control: context.node_control, diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index d9bf2b3..146aed7 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -192,10 +192,10 @@ impl Runner { } fn settle_wait_duration(context: &RunContext) -> Option { - let has_node_control = context.controls_nodes(); + let control_profile = context.cluster_control_profile(); let configured_wait = context.expectation_cooldown(); - if configured_wait.is_zero() && !has_node_control { + if configured_wait.is_zero() && !control_profile.supports_node_control() { return None; } @@ -233,7 +233,7 @@ impl Runner { fn cooldown_duration(context: &RunContext) -> Option { // Managed environments need a minimum cooldown so feed and expectations // observe stabilized state. - let needs_stabilization = context.controls_nodes(); + let needs_stabilization = context.cluster_control_profile().framework_owns_lifecycle(); let mut wait = context.expectation_cooldown(); diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 293dd55..aa4f9c1 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,12 +3,12 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - ApplicationExternalProvider, CleanupGuard, ClusterMode, ClusterWaitHandle, - DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, - NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, - SourceProviders, StaticManagedProvider, build_source_orchestration_plan, - orchestrate_sources_with_providers, + ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode, + ClusterWaitHandle, DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, + Metrics, NodeClients, NodeControlHandle, ObservabilityCapabilityProvider, + ObservabilityInputs, RequiresNodeControl, Runner, RuntimeAssembly, Scenario, + SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -163,6 +163,7 @@ impl DeploymentOrchestrator { node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), observability.telemetry_handle()?, feed, node_control, @@ -269,6 +270,7 @@ impl DeploymentOrchestrator { descriptors: prepared.descriptors.clone(), duration: scenario.duration(), expectation_cooldown: scenario.expectation_cooldown(), + cluster_control_profile: scenario.cluster_control_profile(), telemetry, environment: &mut prepared.environment, node_control, @@ -389,6 +391,7 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { descriptors: E::Deployment, duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, environment: &'a mut StackEnvironment, node_control: Option>>, @@ -414,6 +417,7 @@ async fn build_compose_runtime( node_clients, input.duration, input.expectation_cooldown, + input.cluster_control_profile, input.telemetry, feed, input.node_control, @@ -461,6 +465,7 @@ fn build_runtime_assembly( node_clients: NodeClients, run_duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, @@ -471,6 +476,7 @@ fn build_runtime_assembly( node_clients, run_duration, expectation_cooldown, + cluster_control_profile, telemetry, feed, ) diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 2b177fe..67652be 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -5,9 +5,9 @@ use kube::Client; use reqwest::Url; use testing_framework_core::{ scenario::{ - Application, ApplicationExternalProvider, CleanupGuard, ClusterMode, ClusterWaitHandle, - Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, - MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, + Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode, + ClusterWaitHandle, Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, + Metrics, MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, @@ -236,6 +236,7 @@ where node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), telemetry, feed, ) @@ -503,6 +504,7 @@ fn build_runner_parts( runtime.node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), runtime.telemetry, runtime.feed, cluster_wait, @@ -643,6 +645,7 @@ fn build_k8s_runtime_assembly( node_clients: NodeClients, duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: Feed, cluster_wait: Arc>, @@ -652,6 +655,7 @@ fn build_k8s_runtime_assembly( node_clients, duration, expectation_cooldown, + cluster_control_profile, telemetry, feed, ) diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index f931e8b..8ac6e08 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -10,10 +10,10 @@ use std::{ use async_trait::async_trait; use testing_framework_core::{ scenario::{ - Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime, - HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, - RetryPolicy, Runner, RuntimeAssembly, Scenario, ScenarioError, SourceOrchestrationPlan, - build_source_orchestration_plan, spawn_feed, + Application, CleanupGuard, ClusterControlProfile, Deployer, DeploymentPolicy, DynError, + FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, + NodeControlCapability, NodeControlHandle, RetryPolicy, Runner, RuntimeAssembly, Scenario, + ScenarioError, SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed, }, topology::DeploymentDescriptor, }; @@ -211,6 +211,7 @@ impl ProcessDeployer { node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), None, ) .await?; @@ -248,6 +249,7 @@ impl ProcessDeployer { node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), Some(node_control), ) .await?; @@ -483,6 +485,7 @@ async fn run_context_for( node_clients: NodeClients, duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, node_control: Option>>, ) -> Result, ProcessDeployerError> { if node_clients.is_empty() { @@ -495,6 +498,7 @@ async fn run_context_for( node_clients, duration, expectation_cooldown, + cluster_control_profile, Metrics::empty(), feed, );