diff --git a/testing-framework/core/src/runtime/manual.rs b/testing-framework/core/src/runtime/manual.rs index 4e8af53..ac17826 100644 --- a/testing-framework/core/src/runtime/manual.rs +++ b/testing-framework/core/src/runtime/manual.rs @@ -1,7 +1,11 @@ use async_trait::async_trait; -use crate::scenario::{Application, ClusterWaitHandle, NodeControlHandle}; +use crate::scenario::{Application, ClusterControlProfile, ClusterWaitHandle, NodeControlHandle}; /// Interface for imperative, deployer-backed manual clusters. #[async_trait] -pub trait ManualClusterHandle: NodeControlHandle + ClusterWaitHandle {} +pub trait ManualClusterHandle: NodeControlHandle + ClusterWaitHandle { + fn cluster_control_profile(&self) -> ClusterControlProfile { + ClusterControlProfile::ManualControlled + } +} diff --git a/testing-framework/core/src/scenario/definition.rs b/testing-framework/core/src/scenario/definition.rs index 0eea6f2..9fd1d1c 100644 --- a/testing-framework/core/src/scenario/definition.rs +++ b/testing-framework/core/src/scenario/definition.rs @@ -4,8 +4,9 @@ use thiserror::Error; use tracing::{debug, info}; use super::{ - Application, DeploymentPolicy, DynError, ExistingCluster, ExternalNodeSource, - HttpReadinessRequirement, NodeControlCapability, ObservabilityCapability, + Application, ClusterControlProfile, ClusterMode, DeploymentPolicy, DynError, ExistingCluster, + ExternalNodeSource, HttpReadinessRequirement, NodeControlCapability, ObservabilityCapability, + RequiresNodeControl, builder_ops::CoreBuilderAccess, expectation::Expectation, runtime::{ @@ -119,8 +120,13 @@ impl Scenario { } #[must_use] - pub const fn uses_existing_cluster(&self) -> bool { - self.sources.uses_existing_cluster() + pub const fn cluster_mode(&self) -> ClusterMode { + self.sources.cluster_mode() + } + + #[must_use] + pub const fn cluster_control_profile(&self) -> ClusterControlProfile { + self.sources.control_profile() } #[must_use] @@ -134,16 +140,6 @@ impl Scenario { self.sources.external_nodes() } - #[must_use] - pub const fn is_managed(&self) -> bool { - self.sources.is_managed() - } - - #[must_use] - pub const fn is_external_only(&self) -> bool { - self.sources.is_external_only() - } - #[must_use] pub fn has_external_nodes(&self) -> bool { !self.sources.external_nodes().is_empty() @@ -619,11 +615,17 @@ impl Builder { #[must_use] /// Finalize the scenario, computing run metrics and initializing /// components. - pub fn build(self) -> Result, ScenarioBuildError> { + pub fn build(self) -> Result, ScenarioBuildError> + where + Caps: RequiresNodeControl, + { let mut parts = BuilderParts::from_builder(self); let descriptors = parts.resolve_deployment()?; let run_plan = parts.run_plan(); let run_metrics = RunMetrics::new(run_plan.duration); + + validate_source_contract::(parts.sources())?; + let source_orchestration_plan = build_source_orchestration_plan(parts.sources())?; initialize_components( @@ -726,6 +728,17 @@ fn build_source_orchestration_plan( SourceOrchestrationPlan::try_from_sources(sources).map_err(source_plan_error_to_build_error) } +fn validate_source_contract(sources: &ScenarioSources) -> Result<(), ScenarioBuildError> +where + Caps: RequiresNodeControl, +{ + validate_external_only_sources(sources)?; + + validate_node_control_profile::(sources)?; + + Ok(()) +} + fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> ScenarioBuildError { match error { SourceOrchestrationPlanError::SourceModeNotWiredYet { mode } => { @@ -734,6 +747,37 @@ fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> Scen } } +fn validate_external_only_sources(sources: &ScenarioSources) -> Result<(), ScenarioBuildError> { + if matches!(sources.cluster_mode(), ClusterMode::ExternalOnly) + && sources.external_nodes().is_empty() + { + return Err(ScenarioBuildError::SourceConfiguration { + message: "external-only scenarios require at least one external node".to_owned(), + }); + } + + Ok(()) +} + +fn validate_node_control_profile(sources: &ScenarioSources) -> Result<(), ScenarioBuildError> +where + Caps: RequiresNodeControl, +{ + let profile = sources.control_profile(); + + if Caps::REQUIRED && matches!(profile, ClusterControlProfile::ExternalUncontrolled) { + return Err(ScenarioBuildError::SourceConfiguration { + message: format!( + "node control is not available for cluster mode '{}' with control profile '{}'", + sources.cluster_mode().as_str(), + profile.as_str(), + ), + }); + } + + Ok(()) +} + impl Builder { #[must_use] pub fn enable_node_control(self) -> Builder { @@ -818,3 +862,59 @@ fn expectation_cooldown_for(override_value: Option) -> Duration { fn min_run_duration() -> Duration { Duration::from_secs(MIN_RUN_DURATION_SECS) } + +#[cfg(test)] +mod tests { + use super::{ + ScenarioBuildError, validate_external_only_sources, validate_node_control_profile, + }; + use crate::scenario::{ + ExistingCluster, ExternalNodeSource, NodeControlCapability, sources::ScenarioSources, + }; + + #[test] + fn external_only_requires_external_nodes() { + let error = + validate_external_only_sources(&ScenarioSources::default().into_external_only()) + .expect_err("external-only without nodes should fail"); + + assert!(matches!( + error, + ScenarioBuildError::SourceConfiguration { .. } + )); + assert_eq!( + error.to_string(), + "invalid scenario source configuration: external-only scenarios require at least one external node" + ); + } + + #[test] + fn external_only_rejects_node_control_requirement() { + let sources = ScenarioSources::default() + .with_external_node(ExternalNodeSource::new( + "node-0".to_owned(), + "http://127.0.0.1:1".to_owned(), + )) + .into_external_only(); + let error = validate_node_control_profile::(&sources) + .expect_err("external-only should reject node control"); + + assert!(matches!( + error, + ScenarioBuildError::SourceConfiguration { .. } + )); + assert_eq!( + error.to_string(), + "invalid scenario source configuration: node control is not available for cluster mode 'external-only' with control profile 'external-uncontrolled'" + ); + } + + #[test] + fn existing_cluster_accepts_node_control_requirement() { + let sources = ScenarioSources::default() + .with_attach(ExistingCluster::for_compose_project("project".to_owned())); + + validate_node_control_profile::(&sources) + .expect("existing cluster should be considered controllable"); + } +} diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index ca358a5..dc464dc 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -36,22 +36,25 @@ pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder}; pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; +#[doc(hidden)] pub use runtime::{ ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, CleanupGuard, - Deployer, Feed, FeedHandle, FeedRuntime, HttpReadinessRequirement, ManagedSource, NodeClients, - ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError, - SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider, - build_source_orchestration_plan, + FeedHandle, ManagedSource, RuntimeAssembly, SourceOrchestrationPlan, SourceProviders, + StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources, + orchestrate_sources_with_providers, resolve_sources, +}; +pub use runtime::{ + Deployer, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext, + RunHandle, RunMetrics, Runner, ScenarioError, StabilizationConfig, metrics::{ CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError, PrometheusEndpoint, PrometheusInstantSample, }, - orchestrate_sources, orchestrate_sources_with_providers, resolve_sources, spawn_feed, - wait_for_http_ports, wait_for_http_ports_with_host, + spawn_feed, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, }; -pub use sources::{ExistingCluster, ExternalNodeSource}; +pub use sources::{ClusterControlProfile, ClusterMode, ExistingCluster, ExternalNodeSource}; pub use workload::Workload; pub use crate::env::Application; diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 9e40486..438cb57 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -1,7 +1,9 @@ use std::{sync::Arc, time::Duration}; use super::{metrics::Metrics, node_clients::ClusterClient}; -use crate::scenario::{Application, ClusterWaitHandle, DynError, NodeClients, NodeControlHandle}; +use crate::scenario::{ + Application, ClusterControlProfile, ClusterWaitHandle, DynError, NodeClients, NodeControlHandle, +}; #[derive(Debug, thiserror::Error)] enum RunContextCapabilityError { @@ -15,6 +17,21 @@ pub struct RunContext { node_clients: NodeClients, metrics: RunMetrics, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, + telemetry: Metrics, + feed: ::Feed, + node_control: Option>>, + cluster_wait: Option>>, +} + +/// Low-level runtime assembly input used by deployers to build a runnable +/// cluster context. +pub struct RuntimeAssembly { + descriptors: E::Deployment, + node_clients: NodeClients, + run_duration: Duration, + expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, @@ -24,12 +41,12 @@ pub struct RunContext { impl RunContext { /// Builds a run context from prepared deployment/runtime artifacts. #[must_use] - #[doc(hidden)] - pub fn new( + pub(crate) fn new( descriptors: E::Deployment, node_clients: NodeClients, run_duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, @@ -41,6 +58,7 @@ impl RunContext { node_clients, metrics, expectation_cooldown, + cluster_control_profile, telemetry, feed, node_control, @@ -49,8 +67,7 @@ impl RunContext { } #[must_use] - #[doc(hidden)] - pub fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { + pub(crate) fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { self.cluster_wait = Some(cluster_wait); self } @@ -86,13 +103,13 @@ impl RunContext { } #[must_use] - pub const fn expectation_cooldown(&self) -> Duration { + pub(crate) const fn expectation_cooldown(&self) -> Duration { self.expectation_cooldown } #[must_use] - pub const fn run_metrics(&self) -> RunMetrics { - self.metrics + pub const fn cluster_control_profile(&self) -> ClusterControlProfile { + self.cluster_control_profile } #[must_use] @@ -100,11 +117,6 @@ impl RunContext { self.node_control.clone() } - #[must_use] - pub const fn controls_nodes(&self) -> bool { - self.node_control.is_some() - } - pub(crate) async fn wait_network_ready(&self) -> Result<(), DynError> { self.require_cluster_wait()?.wait_network_ready().await } @@ -122,6 +134,83 @@ impl RunContext { } } +impl RuntimeAssembly { + #[must_use] + pub fn new( + descriptors: E::Deployment, + node_clients: NodeClients, + run_duration: Duration, + expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, + telemetry: Metrics, + feed: ::Feed, + ) -> Self { + Self { + descriptors, + node_clients, + run_duration, + expectation_cooldown, + cluster_control_profile, + telemetry, + feed, + node_control: None, + cluster_wait: None, + } + } + + #[must_use] + pub fn with_node_control(mut self, node_control: Arc>) -> Self { + self.node_control = Some(node_control); + self + } + + #[must_use] + pub fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { + self.cluster_wait = Some(cluster_wait); + self + } + + #[must_use] + pub fn build_context(self) -> RunContext { + let context = RunContext::new( + self.descriptors, + self.node_clients, + self.run_duration, + self.expectation_cooldown, + self.cluster_control_profile, + self.telemetry, + self.feed, + self.node_control, + ); + + match self.cluster_wait { + Some(cluster_wait) => context.with_cluster_wait(cluster_wait), + None => context, + } + } + + #[must_use] + pub fn build_runner(self, cleanup_guard: Option>) -> super::Runner { + super::Runner::new(self.build_context(), cleanup_guard) + } +} + +impl From> for RuntimeAssembly { + fn from(context: RunContext) -> Self { + Self { + descriptors: context.descriptors, + node_clients: context.node_clients, + run_duration: context.metrics.run_duration(), + expectation_cooldown: context.expectation_cooldown, + cluster_control_profile: context.cluster_control_profile, + telemetry: context.telemetry, + feed: context.feed, + node_control: context.node_control, + cluster_wait: context.cluster_wait, + } + } +} + /// Handle returned by the runner to control the lifecycle of the run. pub struct RunHandle { run_context: Arc>, @@ -137,16 +226,6 @@ impl Drop for RunHandle { } impl RunHandle { - #[must_use] - /// Build a handle from owned context and optional cleanup guard. - #[doc(hidden)] - pub fn new(context: RunContext, cleanup_guard: Option>) -> Self { - Self { - run_context: Arc::new(context), - cleanup_guard, - } - } - #[must_use] /// Build a handle from a shared context reference. pub(crate) fn from_shared( diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 5682ccc..33420c3 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -9,7 +9,7 @@ pub mod readiness; mod runner; use async_trait::async_trait; -pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics}; +pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics, RuntimeAssembly}; pub use deployer::{Deployer, ScenarioError}; pub use node_clients::NodeClients; #[doc(hidden)] diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs index 2c3cb4e..2dd84bd 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs @@ -1,4 +1,4 @@ -use crate::scenario::{ExistingCluster, ExternalNodeSource, sources::ScenarioSources}; +use crate::scenario::{ClusterMode, ExistingCluster, ExternalNodeSource, sources::ScenarioSources}; /// Explicit descriptor for managed node sourcing. #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -69,7 +69,19 @@ impl SourceOrchestrationPlan { #[cfg(test)] mod tests { use super::{SourceOrchestrationMode, SourceOrchestrationPlan}; - use crate::scenario::{ExistingCluster, sources::ScenarioSources}; + use crate::scenario::{ExistingCluster, ExternalNodeSource, sources::ScenarioSources}; + + #[test] + fn managed_sources_are_planned() { + let plan = SourceOrchestrationPlan::try_from_sources(&ScenarioSources::default()) + .expect("managed sources should build a source orchestration plan"); + + assert!(matches!( + plan.mode(), + SourceOrchestrationMode::Managed { .. } + )); + assert!(plan.external_sources().is_empty()); + } #[test] fn attached_sources_are_planned() { @@ -86,20 +98,75 @@ mod tests { SourceOrchestrationMode::Attached { .. } )); } + + #[test] + fn attached_sources_keep_external_nodes() { + let sources = ScenarioSources::default() + .with_attach(ExistingCluster::for_compose_project( + "test-project".to_string(), + )) + .with_external_node(ExternalNodeSource::new( + "external-0".to_owned(), + "http://127.0.0.1:1".to_owned(), + )); + let plan = SourceOrchestrationPlan::try_from_sources(&sources) + .expect("attached sources with external nodes should build"); + + assert!(matches!( + plan.mode(), + SourceOrchestrationMode::Attached { .. } + )); + assert_eq!(plan.external_sources().len(), 1); + assert_eq!(plan.external_sources()[0].label(), "external-0"); + } + + #[test] + fn external_only_sources_are_planned() { + let sources = ScenarioSources::default() + .with_external_node(ExternalNodeSource::new( + "external-0".to_owned(), + "http://127.0.0.1:1".to_owned(), + )) + .into_external_only(); + let plan = SourceOrchestrationPlan::try_from_sources(&sources) + .expect("external-only sources should build a source orchestration plan"); + + assert!(matches!( + plan.mode(), + SourceOrchestrationMode::ExternalOnly { .. } + )); + assert_eq!(plan.external_sources().len(), 1); + assert_eq!(plan.external_sources()[0].label(), "external-0"); + } } fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode { - match sources { - ScenarioSources::Managed { external } => SourceOrchestrationMode::Managed { - managed: ManagedSource::DeployerManaged, - external: external.clone(), + match sources.cluster_mode() { + ClusterMode::Managed => match sources { + ScenarioSources::Managed { external } => SourceOrchestrationMode::Managed { + managed: ManagedSource::DeployerManaged, + external: external.clone(), + }, + ScenarioSources::Attached { .. } | ScenarioSources::ExternalOnly { .. } => { + unreachable!("cluster mode and source storage must stay aligned") + } }, - ScenarioSources::Attached { attach, external } => SourceOrchestrationMode::Attached { - attach: attach.clone(), - external: external.clone(), + ClusterMode::ExistingCluster => match sources { + ScenarioSources::Attached { attach, external } => SourceOrchestrationMode::Attached { + attach: attach.clone(), + external: external.clone(), + }, + ScenarioSources::Managed { .. } | ScenarioSources::ExternalOnly { .. } => { + unreachable!("cluster mode and source storage must stay aligned") + } }, - ScenarioSources::ExternalOnly { external } => SourceOrchestrationMode::ExternalOnly { - external: external.clone(), + ClusterMode::ExternalOnly => match sources { + ScenarioSources::ExternalOnly { external } => SourceOrchestrationMode::ExternalOnly { + external: external.clone(), + }, + ScenarioSources::Managed { .. } | ScenarioSources::Attached { .. } => { + unreachable!("cluster mode and source storage must stay aligned") + } }, } } diff --git a/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs b/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs index b239193..aadb31c 100644 --- a/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs +++ b/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; use crate::scenario::{Application, DynError, ExistingCluster}; -/// Attached node discovered from an existing external cluster source. +/// Node discovered from an existing cluster descriptor. #[derive(Clone, Debug)] pub struct AttachedNode { /// Optional stable identity hint used by runtime inventory dedup logic. @@ -14,7 +14,7 @@ pub struct AttachedNode { /// Errors returned by attach providers while discovering attached nodes. #[derive(Debug, thiserror::Error)] pub enum AttachProviderError { - #[error("attach source is not supported by this provider: {attach_source:?}")] + #[error("existing cluster descriptor is not supported by this provider: {attach_source:?}")] UnsupportedSource { attach_source: ExistingCluster }, #[error("attach discovery failed: {source}")] Discovery { @@ -23,13 +23,13 @@ pub enum AttachProviderError { }, } -/// Internal adapter interface for discovering pre-existing nodes. +/// Internal adapter interface for discovering nodes in an existing cluster. /// /// This is scaffolding-only in phase 1 and is intentionally not wired into /// deployer runtime orchestration yet. #[async_trait] pub trait AttachProvider: Send + Sync { - /// Discovers node clients for the requested attach source. + /// Discovers node clients for the requested existing cluster. async fn discover( &self, source: &ExistingCluster, diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index d9e3f5e..216cd97 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -34,10 +34,11 @@ impl Drop for Runner { } impl Runner { - /// Construct a runner from the run context and optional cleanup guard. #[must_use] - #[doc(hidden)] - pub fn new(context: RunContext, cleanup_guard: Option>) -> Self { + pub(crate) fn new( + context: RunContext, + cleanup_guard: Option>, + ) -> Self { Self { context: Arc::new(context), cleanup_guard, @@ -191,7 +192,7 @@ impl Runner { } fn settle_wait_duration(context: &RunContext) -> Option { - let has_node_control = context.controls_nodes(); + let has_node_control = context.node_control().is_some(); let configured_wait = context.expectation_cooldown(); if configured_wait.is_zero() && !has_node_control { @@ -232,7 +233,7 @@ impl Runner { fn cooldown_duration(context: &RunContext) -> Option { // Managed environments need a minimum cooldown so feed and expectations // observe stabilized state. - let needs_stabilization = context.controls_nodes(); + let needs_stabilization = context.cluster_control_profile().framework_owns_lifecycle(); let mut wait = context.expectation_cooldown(); diff --git a/testing-framework/core/src/scenario/sources/mod.rs b/testing-framework/core/src/scenario/sources/mod.rs index 506a8aa..01e8807 100644 --- a/testing-framework/core/src/scenario/sources/mod.rs +++ b/testing-framework/core/src/scenario/sources/mod.rs @@ -2,4 +2,4 @@ mod model; pub(crate) use model::ScenarioSources; #[doc(hidden)] -pub use model::{ExistingCluster, ExternalNodeSource}; +pub use model::{ClusterControlProfile, ClusterMode, ExistingCluster, ExternalNodeSource}; diff --git a/testing-framework/core/src/scenario/sources/model.rs b/testing-framework/core/src/scenario/sources/model.rs index a64b010..e720524 100644 --- a/testing-framework/core/src/scenario/sources/model.rs +++ b/testing-framework/core/src/scenario/sources/model.rs @@ -119,6 +119,51 @@ impl ExternalNodeSource { } } +/// High-level source mode of a scenario cluster. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ClusterMode { + Managed, + ExistingCluster, + ExternalOnly, +} + +impl ClusterMode { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Managed => "managed", + Self::ExistingCluster => "existing-cluster", + Self::ExternalOnly => "external-only", + } + } +} + +/// High-level control/lifecycle expectation for a cluster surface. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ClusterControlProfile { + FrameworkManaged, + ExistingClusterAttached, + ExternalUncontrolled, + ManualControlled, +} + +impl ClusterControlProfile { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::FrameworkManaged => "framework-managed", + Self::ExistingClusterAttached => "existing-cluster-attached", + Self::ExternalUncontrolled => "external-uncontrolled", + Self::ManualControlled => "manual-controlled", + } + } + + #[must_use] + pub const fn framework_owns_lifecycle(self) -> bool { + matches!(self, Self::FrameworkManaged) + } +} + /// Source model that makes invalid managed+attached combinations /// unrepresentable by type. #[derive(Clone, Debug, Eq, PartialEq)] @@ -187,17 +232,59 @@ impl ScenarioSources { } #[must_use] - pub(crate) const fn is_managed(&self) -> bool { - matches!(self, Self::Managed { .. }) + pub(crate) const fn cluster_mode(&self) -> ClusterMode { + match self { + Self::Managed { .. } => ClusterMode::Managed, + Self::Attached { .. } => ClusterMode::ExistingCluster, + Self::ExternalOnly { .. } => ClusterMode::ExternalOnly, + } } #[must_use] - pub(crate) const fn uses_existing_cluster(&self) -> bool { - matches!(self, Self::Attached { .. }) - } - - #[must_use] - pub(crate) const fn is_external_only(&self) -> bool { - matches!(self, Self::ExternalOnly { .. }) + pub(crate) const fn control_profile(&self) -> ClusterControlProfile { + match self.cluster_mode() { + ClusterMode::Managed => ClusterControlProfile::FrameworkManaged, + ClusterMode::ExistingCluster => ClusterControlProfile::ExistingClusterAttached, + ClusterMode::ExternalOnly => ClusterControlProfile::ExternalUncontrolled, + } + } +} + +#[cfg(test)] +mod tests { + use super::{ClusterControlProfile, ExistingCluster, ExternalNodeSource, ScenarioSources}; + + #[test] + fn managed_sources_map_to_framework_managed_control() { + assert_eq!( + ScenarioSources::default().control_profile(), + ClusterControlProfile::FrameworkManaged, + ); + } + + #[test] + fn attached_sources_map_to_existing_cluster_control() { + let sources = ScenarioSources::default() + .with_attach(ExistingCluster::for_compose_project("project".to_owned())); + + assert_eq!( + sources.control_profile(), + ClusterControlProfile::ExistingClusterAttached, + ); + } + + #[test] + fn external_only_sources_map_to_uncontrolled_profile() { + let sources = ScenarioSources::default() + .with_external_node(ExternalNodeSource::new( + "node".to_owned(), + "http://node".to_owned(), + )) + .into_external_only(); + + assert_eq!( + sources.control_profile(), + ClusterControlProfile::ExternalUncontrolled, + ); } } diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 8874fd1..24c16d6 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -176,12 +176,12 @@ impl ClusterWaitHandle for ComposeAttachedClusterWait } fn compose_wait_request(source: &ExistingCluster) -> Result, DynError> { - let project = source - .compose_project() - .ok_or_else(|| DynError::from("compose cluster wait requires a compose attach source"))?; - let services = source - .compose_services() - .ok_or_else(|| DynError::from("compose cluster wait requires a compose attach source"))?; + let project = source.compose_project().ok_or_else(|| { + DynError::from("compose cluster wait requires a compose existing-cluster descriptor") + })?; + let services = source.compose_services().ok_or_else(|| { + DynError::from("compose cluster wait requires a compose existing-cluster descriptor") + })?; Ok(ComposeAttachRequest { project, services }) } diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 29b58c3..e18a16f 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,10 +3,11 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle, - FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, - ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, - Runner, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode, + ClusterWaitHandle, DeploymentPolicy, DynError, ExistingCluster, FeedHandle, FeedRuntime, + HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, + RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, @@ -63,6 +64,12 @@ impl DeploymentOrchestrator { where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { + validate_supported_cluster_mode(scenario).map_err(|source| { + ComposeRunnerError::SourceOrchestration { + source: source.into(), + } + })?; + // Source planning is currently resolved here before deployer-specific setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { ComposeRunnerError::SourceOrchestration { @@ -70,11 +77,11 @@ impl DeploymentOrchestrator { } })?; - if scenario.uses_existing_cluster() { + if matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { return self - .deploy_attached_only::(scenario, source_plan) + .deploy_existing_cluster::(scenario, source_plan) .await - .map(|runner| (runner, attached_metadata(scenario))); + .map(|runner| (runner, existing_cluster_metadata(scenario))); } let deployment = scenario.deployment(); @@ -137,7 +144,7 @@ impl DeploymentOrchestrator { )) } - async fn deploy_attached_only( + async fn deploy_existing_cluster( &self, scenario: &Scenario, source_plan: SourceOrchestrationPlan, @@ -157,11 +164,12 @@ impl DeploymentOrchestrator { let node_control = self.attached_node_control::(scenario)?; let cluster_wait = self.attached_cluster_wait(scenario)?; let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; - let context = build_run_context( + let assembly = build_runtime_assembly( scenario.deployment().clone(), node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), observability.telemetry_handle()?, feed, node_control, @@ -169,7 +177,7 @@ impl DeploymentOrchestrator { ); let cleanup_guard: Box = Box::new(feed_task); - Ok(Runner::new(context, Some(cleanup_guard))) + Ok(assembly.build_runner(Some(cleanup_guard))) } fn source_providers(&self, managed_clients: Vec) -> SourceProviders { @@ -216,7 +224,7 @@ impl DeploymentOrchestrator { let attach = scenario .existing_cluster() .ok_or(ComposeRunnerError::InternalInvariant { - message: "attached node control requested outside attached source mode", + message: "existing-cluster node control requested outside existing-cluster mode", })?; let node_control = ComposeAttachedNodeControl::try_from_existing_cluster(attach) .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; @@ -234,7 +242,7 @@ impl DeploymentOrchestrator { let attach = scenario .existing_cluster() .ok_or(ComposeRunnerError::InternalInvariant { - message: "compose attached cluster wait requested outside attached source mode", + message: "compose cluster wait requested outside existing-cluster mode", })?; let cluster_wait = ComposeAttachedClusterWait::::try_new(compose_runner_host(), attach) .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; @@ -268,6 +276,7 @@ impl DeploymentOrchestrator { descriptors: prepared.descriptors.clone(), duration: scenario.duration(), expectation_cooldown: scenario.expectation_cooldown(), + cluster_control_profile: scenario.cluster_control_profile(), telemetry, environment: &mut prepared.environment, node_control, @@ -283,7 +292,7 @@ impl DeploymentOrchestrator { "compose runtime prepared" ); - Ok(Runner::new(runtime.context, Some(cleanup_guard))) + Ok(runtime.assembly.build_runner(Some(cleanup_guard))) } fn maybe_node_control( @@ -363,7 +372,57 @@ impl DeploymentOrchestrator { } } -fn attached_metadata(scenario: &Scenario) -> ComposeDeploymentMetadata +fn validate_supported_cluster_mode( + scenario: &Scenario, +) -> Result<(), DynError> { + if !matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { + return Ok(()); + } + + let cluster = scenario + .existing_cluster() + .ok_or_else(|| DynError::from("existing-cluster mode requires an existing cluster"))?; + + ensure_compose_existing_cluster(cluster) +} + +fn ensure_compose_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> { + if cluster.compose_project().is_some() && cluster.compose_services().is_some() { + return Ok(()); + } + + Err("compose deployer requires a compose existing-cluster descriptor".into()) +} + +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::ExistingCluster; + + use super::ensure_compose_existing_cluster; + + #[test] + fn compose_cluster_validator_accepts_compose_descriptor() { + ensure_compose_existing_cluster(&ExistingCluster::for_compose_project( + "project".to_owned(), + )) + .expect("compose descriptor should be accepted"); + } + + #[test] + fn compose_cluster_validator_rejects_k8s_descriptor() { + let error = ensure_compose_existing_cluster(&ExistingCluster::for_k8s_selector( + "app=node".to_owned(), + )) + .expect_err("k8s descriptor should be rejected"); + + assert_eq!( + error.to_string(), + "compose deployer requires a compose existing-cluster descriptor" + ); + } +} + +fn existing_cluster_metadata(scenario: &Scenario) -> ComposeDeploymentMetadata where E: ComposeDeployEnv, Caps: Send + Sync, @@ -379,7 +438,7 @@ struct DeployedNodes { } struct ComposeRuntime { - context: RunContext, + assembly: RuntimeAssembly, feed_task: FeedHandle, } @@ -388,6 +447,7 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { descriptors: E::Deployment, duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, environment: &'a mut StackEnvironment, node_control: Option>>, @@ -408,18 +468,22 @@ async fn build_compose_runtime( .start_block_feed(&node_clients, input.environment) .await?; - let context = build_run_context( + let assembly = build_runtime_assembly( input.descriptors, node_clients, input.duration, input.expectation_cooldown, + input.cluster_control_profile, input.telemetry, feed, input.node_control, input.cluster_wait, ); - Ok(ComposeRuntime { context, feed_task }) + Ok(ComposeRuntime { + assembly, + feed_task, + }) } async fn deploy_nodes( @@ -452,26 +516,33 @@ async fn deploy_nodes( }) } -fn build_run_context( +fn build_runtime_assembly( descriptors: E::Deployment, node_clients: NodeClients, run_duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, cluster_wait: Arc>, -) -> RunContext { - RunContext::new( +) -> RuntimeAssembly { + let mut assembly = RuntimeAssembly::new( descriptors, node_clients, run_duration, expectation_cooldown, + cluster_control_profile, telemetry, feed, - node_control, ) - .with_cluster_wait(cluster_wait) + .with_cluster_wait(cluster_wait); + + if let Some(node_control) = node_control { + assembly = assembly.with_node_control(node_control); + } + + assembly } fn resolve_observability_inputs( diff --git a/testing-framework/deployers/compose/src/docker/control.rs b/testing-framework/deployers/compose/src/docker/control.rs index 90b8a3d..799b633 100644 --- a/testing-framework/deployers/compose/src/docker/control.rs +++ b/testing-framework/deployers/compose/src/docker/control.rs @@ -155,7 +155,7 @@ impl NodeControlHandle for ComposeNodeControl { } } -/// Node control handle for compose attached mode. +/// Node control handle for compose existing-cluster mode. pub struct ComposeAttachedNodeControl { pub(crate) project_name: String, } diff --git a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs index 43925bc..bdfa1fe 100644 --- a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs @@ -247,9 +247,9 @@ impl ClusterWaitHandle for K8sAttachedClusterWait { } fn k8s_wait_request(source: &ExistingCluster) -> Result, DynError> { - let label_selector = source - .k8s_label_selector() - .ok_or_else(|| DynError::from("k8s cluster wait requires a k8s attach source"))?; + let label_selector = source.k8s_label_selector().ok_or_else(|| { + DynError::from("k8s cluster wait requires a k8s existing-cluster descriptor") + })?; if label_selector.trim().is_empty() { return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into()); diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 5fa21fe..83ba257 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -5,11 +5,12 @@ use kube::Client; use reqwest::Url; use testing_framework_core::{ scenario::{ - Application, ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, Deployer, - DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, - NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, - RunContext, Runner, Scenario, SourceOrchestrationPlan, SourceProviders, - StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, + Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode, + ClusterWaitHandle, Deployer, DynError, ExistingCluster, FeedHandle, FeedRuntime, + HttpReadinessRequirement, Metrics, MetricsError, NodeClients, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, + RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -170,6 +171,9 @@ where E: K8sDeployEnv, Caps: ObservabilityCapabilityProvider + Send + Sync, { + validate_supported_cluster_mode(scenario) + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; + // Source planning is currently resolved here before deployer-specific setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { K8sRunnerError::SourceOrchestration { @@ -179,9 +183,10 @@ where let observability = resolve_observability_inputs(scenario.capabilities())?; - if scenario.uses_existing_cluster() { - let runner = deploy_attached_only::(scenario, source_plan, observability).await?; - return Ok((runner, attached_metadata(scenario))); + if matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { + let runner = + deploy_existing_cluster::(scenario, source_plan, observability).await?; + return Ok((runner, existing_cluster_metadata(scenario))); } let deployment = build_k8s_deployment::(deployer, scenario, &observability).await?; @@ -204,15 +209,15 @@ where runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?; ensure_non_empty_node_clients(&runtime.node_clients)?; - let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait); - log_configured_observability(&observability); - maybe_print_endpoints::(&observability, &parts.node_clients); + maybe_print_endpoints::(&observability, &runtime.node_clients); + + let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait); let runner = finalize_runner::(&mut cluster, parts)?; Ok((runner, metadata)) } -async fn deploy_attached_only( +async fn deploy_existing_cluster( scenario: &Scenario, source_plan: SourceOrchestrationPlan, observability: ObservabilityInputs, @@ -230,21 +235,21 @@ where let telemetry = observability.telemetry_handle()?; let (feed, feed_task) = spawn_block_feed_with::(&node_clients).await?; let cluster_wait = attached_cluster_wait::(scenario, client)?; - let context = RunContext::new( + let context = RuntimeAssembly::new( scenario.deployment().clone(), node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), telemetry, feed, - None, ) .with_cluster_wait(cluster_wait); - Ok(Runner::new(context, Some(Box::new(feed_task)))) + Ok(context.build_runner(Some(Box::new(feed_task)))) } -fn attached_metadata(scenario: &Scenario) -> K8sDeploymentMetadata +fn existing_cluster_metadata(scenario: &Scenario) -> K8sDeploymentMetadata where E: K8sDeployEnv, Caps: Send + Sync, @@ -263,7 +268,7 @@ where let attach = scenario .existing_cluster() .ok_or_else(|| K8sRunnerError::InternalInvariant { - message: "k8s attached cluster wait requested outside attached source mode".to_owned(), + message: "k8s cluster wait requested outside existing-cluster mode".to_owned(), })?; let cluster_wait = K8sAttachedClusterWait::::try_new(client, attach) .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; @@ -271,6 +276,54 @@ where Ok(Arc::new(cluster_wait)) } +fn validate_supported_cluster_mode( + scenario: &Scenario, +) -> Result<(), DynError> { + if !matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { + return Ok(()); + } + + let cluster = scenario + .existing_cluster() + .ok_or_else(|| DynError::from("existing-cluster mode requires an existing cluster"))?; + + ensure_k8s_existing_cluster(cluster) +} + +fn ensure_k8s_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> { + if cluster.k8s_label_selector().is_some() { + return Ok(()); + } + + Err("k8s deployer requires a k8s existing-cluster descriptor".into()) +} + +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::ExistingCluster; + + use super::ensure_k8s_existing_cluster; + + #[test] + fn k8s_cluster_validator_accepts_k8s_descriptor() { + ensure_k8s_existing_cluster(&ExistingCluster::for_k8s_selector("app=node".to_owned())) + .expect("k8s descriptor should be accepted"); + } + + #[test] + fn k8s_cluster_validator_rejects_compose_descriptor() { + let error = ensure_k8s_existing_cluster(&ExistingCluster::for_compose_project( + "project".to_owned(), + )) + .expect_err("compose descriptor should be rejected"); + + assert_eq!( + error.to_string(), + "k8s deployer requires a k8s existing-cluster descriptor" + ); + } +} + fn managed_cluster_wait( cluster: &Option, metadata: &K8sDeploymentMetadata, @@ -498,15 +551,19 @@ fn build_runner_parts( cluster_wait: Arc>, ) -> K8sRunnerParts { K8sRunnerParts { - descriptors: scenario.deployment().clone(), - node_clients: runtime.node_clients, - duration: scenario.duration(), - expectation_cooldown: scenario.expectation_cooldown(), - telemetry: runtime.telemetry, - feed: runtime.feed, + assembly: build_k8s_runtime_assembly( + scenario.deployment().clone(), + runtime.node_clients, + scenario.duration(), + scenario.expectation_cooldown(), + scenario.cluster_control_profile(), + runtime.telemetry, + runtime.feed, + cluster_wait, + ), feed_task: runtime.feed_task, node_count, - cluster_wait, + duration_secs: scenario.duration().as_secs(), } } @@ -594,15 +651,10 @@ fn maybe_print_endpoints( } struct K8sRunnerParts { - descriptors: E::Deployment, - node_clients: NodeClients, - duration: Duration, - expectation_cooldown: Duration, - telemetry: Metrics, - feed: Feed, + assembly: RuntimeAssembly, feed_task: FeedHandle, node_count: usize, - cluster_wait: Arc>, + duration_secs: u64, } fn finalize_runner( @@ -613,36 +665,21 @@ fn finalize_runner( let (cleanup, port_forwards) = environment.into_cleanup()?; let K8sRunnerParts { - descriptors, - node_clients, - duration, - expectation_cooldown, - telemetry, - feed, + assembly, feed_task, node_count, - cluster_wait, + duration_secs, } = parts; - let duration_secs = duration.as_secs(); let cleanup_guard: Box = Box::new(K8sCleanupGuard::new(cleanup, feed_task, port_forwards)); - let context = build_k8s_run_context( - descriptors, - node_clients, - duration, - expectation_cooldown, - telemetry, - feed, - cluster_wait, - ); info!( nodes = node_count, duration_secs, "k8s deployment ready; handing control to scenario runner" ); - Ok(Runner::new(context, Some(cleanup_guard))) + Ok(assembly.build_runner(Some(cleanup_guard))) } fn take_ready_cluster( @@ -655,23 +692,24 @@ fn take_ready_cluster( }) } -fn build_k8s_run_context( +fn build_k8s_runtime_assembly( descriptors: E::Deployment, node_clients: NodeClients, duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: Feed, cluster_wait: Arc>, -) -> RunContext { - RunContext::new( +) -> RuntimeAssembly { + RuntimeAssembly::new( descriptors, node_clients, duration, expectation_cooldown, + cluster_control_profile, telemetry, feed, - None, ) .with_cluster_wait(cluster_wait) } diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index 28cc01f..5af3d2e 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -10,10 +10,10 @@ use std::{ use async_trait::async_trait; use testing_framework_core::{ scenario::{ - Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime, - HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, - RetryPolicy, RunContext, Runner, Scenario, ScenarioError, SourceOrchestrationPlan, - build_source_orchestration_plan, spawn_feed, + Application, CleanupGuard, ClusterControlProfile, ClusterMode, Deployer, DeploymentPolicy, + DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, + NodeControlCapability, NodeControlHandle, RetryPolicy, Runner, RuntimeAssembly, Scenario, + ScenarioError, SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed, }, topology::DeploymentDescriptor, }; @@ -187,6 +187,8 @@ impl ProcessDeployer { &self, scenario: &Scenario, ) -> Result, ProcessDeployerError> { + validate_supported_cluster_mode(scenario)?; + // Source planning is currently resolved here before node spawn/runtime setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { ProcessDeployerError::SourceOrchestration { @@ -211,6 +213,7 @@ impl ProcessDeployer { node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), None, ) .await?; @@ -218,13 +221,15 @@ impl ProcessDeployer { let cleanup_guard: Box = Box::new(LocalProcessGuard::::new(nodes, runtime.feed_task)); - Ok(Runner::new(runtime.context, Some(cleanup_guard))) + Ok(runtime.assembly.build_runner(Some(cleanup_guard))) } async fn deploy_with_node_control( &self, scenario: &Scenario, ) -> Result, ProcessDeployerError> { + validate_supported_cluster_mode(scenario)?; + // Source planning is currently resolved here before node spawn/runtime setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { ProcessDeployerError::SourceOrchestration { @@ -248,14 +253,14 @@ impl ProcessDeployer { node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), Some(node_control), ) .await?; - Ok(Runner::new( - runtime.context, - Some(Box::new(runtime.feed_task)), - )) + Ok(runtime + .assembly + .build_runner(Some(Box::new(runtime.feed_task)))) } fn node_control_from( @@ -312,6 +317,22 @@ impl ProcessDeployer { } } +fn validate_supported_cluster_mode( + scenario: &Scenario, +) -> Result<(), ProcessDeployerError> { + ensure_local_cluster_mode(scenario.cluster_mode()) +} + +fn ensure_local_cluster_mode(mode: ClusterMode) -> Result<(), ProcessDeployerError> { + if matches!(mode, ClusterMode::ExistingCluster) { + return Err(ProcessDeployerError::SourceOrchestration { + source: DynError::from("local deployer does not support existing-cluster mode"), + }); + } + + Ok(()) +} + fn merge_source_clients_for_local( source_plan: &SourceOrchestrationPlan, node_clients: NodeClients, @@ -339,6 +360,29 @@ fn build_retry_execution_config( (retry_policy, execution) } +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::ClusterMode; + + use super::ensure_local_cluster_mode; + + #[test] + fn local_cluster_validator_accepts_managed_mode() { + ensure_local_cluster_mode(ClusterMode::Managed).expect("managed mode should be accepted"); + } + + #[test] + fn local_cluster_validator_rejects_existing_cluster_mode() { + let error = ensure_local_cluster_mode(ClusterMode::ExistingCluster) + .expect_err("existing-cluster mode should be rejected"); + + assert_eq!( + error.to_string(), + "source orchestration failed: local deployer does not support existing-cluster mode" + ); + } +} + async fn run_retry_attempt( descriptors: &E::Deployment, execution: RetryExecutionConfig, @@ -475,7 +519,7 @@ fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_ } struct RuntimeContext { - context: RunContext, + assembly: RuntimeAssembly, feed_task: FeedHandle, } @@ -484,6 +528,7 @@ async fn run_context_for( node_clients: NodeClients, duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, node_control: Option>>, ) -> Result, ProcessDeployerError> { if node_clients.is_empty() { @@ -491,15 +536,21 @@ async fn run_context_for( } let (feed, feed_task) = spawn_feed_with::(&node_clients).await?; - let context = RunContext::new( + let mut assembly = RuntimeAssembly::new( descriptors, node_clients, duration, expectation_cooldown, + cluster_control_profile, Metrics::empty(), feed, - node_control, ); + if let Some(node_control) = node_control { + assembly = assembly.with_node_control(node_control); + } - Ok(RuntimeContext { context, feed_task }) + Ok(RuntimeContext { + assembly, + feed_task, + }) }