From 120b8879a472c26ec148c99f4a130df638b7103b Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 8 Mar 2026 14:27:09 +0100 Subject: [PATCH] Move runtime assembly out of runner and context --- testing-framework/core/src/scenario/mod.rs | 2 +- .../core/src/scenario/runtime/context.rs | 102 +++++++++++++++--- .../core/src/scenario/runtime/mod.rs | 2 +- .../core/src/scenario/runtime/runner.rs | 7 +- .../compose/src/deployer/orchestrator.rs | 19 ++-- .../k8s/src/deployer/orchestrator.rs | 13 ++- .../local/src/deployer/orchestrator.rs | 19 ++-- 7 files changed, 122 insertions(+), 42 deletions(-) diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index ca358a5..e0486dc 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -39,7 +39,7 @@ pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, HttpReadinessRequirement, ManagedSource, NodeClients, - ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError, + ReadinessError, RunContext, RunHandle, RunMetrics, Runner, RuntimeAssembly, ScenarioError, SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider, build_source_orchestration_plan, metrics::{ diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 9e40486..cce8c05 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -21,11 +21,23 @@ pub struct RunContext { cluster_wait: Option>>, } +/// Low-level runtime assembly input used by deployers to build a runnable +/// cluster context. +pub struct RuntimeAssembly { + descriptors: E::Deployment, + node_clients: NodeClients, + run_duration: Duration, + expectation_cooldown: Duration, + telemetry: Metrics, + feed: ::Feed, + node_control: Option>>, + cluster_wait: Option>>, +} + impl RunContext { /// Builds a run context from prepared deployment/runtime artifacts. #[must_use] - #[doc(hidden)] - pub fn new( + pub(crate) fn new( descriptors: E::Deployment, node_clients: NodeClients, run_duration: Duration, @@ -49,8 +61,7 @@ impl RunContext { } #[must_use] - #[doc(hidden)] - pub fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { + pub(crate) fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { self.cluster_wait = Some(cluster_wait); self } @@ -122,6 +133,79 @@ impl RunContext { } } +impl RuntimeAssembly { + #[must_use] + pub fn new( + descriptors: E::Deployment, + node_clients: NodeClients, + run_duration: Duration, + expectation_cooldown: Duration, + telemetry: Metrics, + feed: ::Feed, + ) -> Self { + Self { + descriptors, + node_clients, + run_duration, + expectation_cooldown, + telemetry, + feed, + node_control: None, + cluster_wait: None, + } + } + + #[must_use] + pub fn with_node_control(mut self, node_control: Arc>) -> Self { + self.node_control = Some(node_control); + self + } + + #[must_use] + pub fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { + self.cluster_wait = Some(cluster_wait); + self + } + + #[must_use] + pub fn build_context(self) -> RunContext { + let context = RunContext::new( + self.descriptors, + self.node_clients, + self.run_duration, + self.expectation_cooldown, + self.telemetry, + self.feed, + self.node_control, + ); + + match self.cluster_wait { + Some(cluster_wait) => context.with_cluster_wait(cluster_wait), + None => context, + } + } + + #[must_use] + pub fn build_runner(self, cleanup_guard: Option>) -> super::Runner { + super::Runner::new(self.build_context(), cleanup_guard) + } +} + +impl From> for RuntimeAssembly { + fn from(context: RunContext) -> Self { + Self { + descriptors: context.descriptors, + node_clients: context.node_clients, + run_duration: context.metrics.run_duration(), + expectation_cooldown: context.expectation_cooldown, + telemetry: context.telemetry, + feed: context.feed, + node_control: context.node_control, + cluster_wait: context.cluster_wait, + } + } +} + /// Handle returned by the runner to control the lifecycle of the run. pub struct RunHandle { run_context: Arc>, @@ -137,16 +221,6 @@ impl Drop for RunHandle { } impl RunHandle { - #[must_use] - /// Build a handle from owned context and optional cleanup guard. - #[doc(hidden)] - pub fn new(context: RunContext, cleanup_guard: Option>) -> Self { - Self { - run_context: Arc::new(context), - cleanup_guard, - } - } - #[must_use] /// Build a handle from a shared context reference. pub(crate) fn from_shared( diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 5682ccc..33420c3 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -9,7 +9,7 @@ pub mod readiness; mod runner; use async_trait::async_trait; -pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics}; +pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics, RuntimeAssembly}; pub use deployer::{Deployer, ScenarioError}; pub use node_clients::NodeClients; #[doc(hidden)] diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index d9e3f5e..d9bf2b3 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -34,10 +34,11 @@ impl Drop for Runner { } impl Runner { - /// Construct a runner from the run context and optional cleanup guard. #[must_use] - #[doc(hidden)] - pub fn new(context: RunContext, cleanup_guard: Option>) -> Self { + pub(crate) fn new( + context: RunContext, + cleanup_guard: Option>, + ) -> Self { Self { context: Arc::new(context), cleanup_guard, diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 29b58c3..e8ae1eb 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -6,8 +6,8 @@ use testing_framework_core::{ ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, - Runner, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, - build_source_orchestration_plan, orchestrate_sources_with_providers, + Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, + StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -169,7 +169,7 @@ impl DeploymentOrchestrator { ); let cleanup_guard: Box = Box::new(feed_task); - Ok(Runner::new(context, Some(cleanup_guard))) + Ok(RuntimeAssembly::from(context).build_runner(Some(cleanup_guard))) } fn source_providers(&self, managed_clients: Vec) -> SourceProviders { @@ -283,7 +283,7 @@ impl DeploymentOrchestrator { "compose runtime prepared" ); - Ok(Runner::new(runtime.context, Some(cleanup_guard))) + Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(cleanup_guard))) } fn maybe_node_control( @@ -462,16 +462,21 @@ fn build_run_context( node_control: Option>>, cluster_wait: Arc>, ) -> RunContext { - RunContext::new( + let mut assembly = RuntimeAssembly::new( descriptors, node_clients, run_duration, expectation_cooldown, telemetry, feed, - node_control, ) - .with_cluster_wait(cluster_wait) + .with_cluster_wait(cluster_wait); + + if let Some(node_control) = node_control { + assembly = assembly.with_node_control(node_control); + } + + assembly.build_context() } fn resolve_observability_inputs( diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 5fa21fe..23b9bc8 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -8,7 +8,7 @@ use testing_framework_core::{ Application, ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, - RunContext, Runner, Scenario, SourceOrchestrationPlan, SourceProviders, + RunContext, Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, @@ -230,18 +230,17 @@ where let telemetry = observability.telemetry_handle()?; let (feed, feed_task) = spawn_block_feed_with::(&node_clients).await?; let cluster_wait = attached_cluster_wait::(scenario, client)?; - let context = RunContext::new( + let context = RuntimeAssembly::new( scenario.deployment().clone(), node_clients, scenario.duration(), scenario.expectation_cooldown(), telemetry, feed, - None, ) .with_cluster_wait(cluster_wait); - Ok(Runner::new(context, Some(Box::new(feed_task)))) + Ok(context.build_runner(Some(Box::new(feed_task)))) } fn attached_metadata(scenario: &Scenario) -> K8sDeploymentMetadata @@ -642,7 +641,7 @@ fn finalize_runner( duration_secs, "k8s deployment ready; handing control to scenario runner" ); - Ok(Runner::new(context, Some(cleanup_guard))) + Ok(RuntimeAssembly::from(context).build_runner(Some(cleanup_guard))) } fn take_ready_cluster( @@ -664,16 +663,16 @@ fn build_k8s_run_context( feed: Feed, cluster_wait: Arc>, ) -> RunContext { - RunContext::new( + RuntimeAssembly::new( descriptors, node_clients, duration, expectation_cooldown, telemetry, feed, - None, ) .with_cluster_wait(cluster_wait) + .build_context() } fn endpoint_or_disabled(endpoint: Option<&Url>) -> String { diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index 28cc01f..bf93859 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -12,8 +12,8 @@ use testing_framework_core::{ scenario::{ Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, - RetryPolicy, RunContext, Runner, Scenario, ScenarioError, SourceOrchestrationPlan, - build_source_orchestration_plan, spawn_feed, + RetryPolicy, RunContext, Runner, RuntimeAssembly, Scenario, ScenarioError, + SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed, }, topology::DeploymentDescriptor, }; @@ -218,7 +218,7 @@ impl ProcessDeployer { let cleanup_guard: Box = Box::new(LocalProcessGuard::::new(nodes, runtime.feed_task)); - Ok(Runner::new(runtime.context, Some(cleanup_guard))) + Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(cleanup_guard))) } async fn deploy_with_node_control( @@ -252,10 +252,7 @@ impl ProcessDeployer { ) .await?; - Ok(Runner::new( - runtime.context, - Some(Box::new(runtime.feed_task)), - )) + Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(Box::new(runtime.feed_task)))) } fn node_control_from( @@ -491,15 +488,19 @@ async fn run_context_for( } let (feed, feed_task) = spawn_feed_with::(&node_clients).await?; - let context = RunContext::new( + let mut assembly = RuntimeAssembly::new( descriptors, node_clients, duration, expectation_cooldown, Metrics::empty(), feed, - node_control, ); + if let Some(node_control) = node_control { + assembly = assembly.with_node_control(node_control); + } + + let context = assembly.build_context(); Ok(RuntimeContext { context, feed_task }) }