diff --git a/logos/examples/doc-snippets/src/chaos_workloads_random_restart.rs b/logos/examples/doc-snippets/src/chaos_workloads_random_restart.rs index d351c20..78ff50a 100644 --- a/logos/examples/doc-snippets/src/chaos_workloads_random_restart.rs +++ b/logos/examples/doc-snippets/src/chaos_workloads_random_restart.rs @@ -7,7 +7,7 @@ use crate::SnippetResult; pub fn random_restart_plan() -> SnippetResult> { ScenarioBuilder::topology_with(|t| t.network_star().nodes(2)) - .enable_node_control() + .with_node_control() .with_workload(RandomRestartWorkload::new( Duration::from_secs(45), // min delay Duration::from_secs(75), // max delay diff --git a/logos/examples/doc-snippets/src/dsl_cheat_sheet_workload_chaos.rs b/logos/examples/doc-snippets/src/dsl_cheat_sheet_workload_chaos.rs index fa3d963..58b2172 100644 --- a/logos/examples/doc-snippets/src/dsl_cheat_sheet_workload_chaos.rs +++ b/logos/examples/doc-snippets/src/dsl_cheat_sheet_workload_chaos.rs @@ -8,7 +8,7 @@ use crate::SnippetResult; pub fn chaos_plan() -> SnippetResult> { ScenarioBuilder::topology_with(|t| t.network_star().nodes(3)) - .enable_node_control() // Enable node control capability + .with_node_control() // Enable node control capability .chaos_with(|c| { c.restart() // Random restart chaos .min_delay(Duration::from_secs(30)) // Min time between restarts diff --git a/logos/examples/doc-snippets/src/examples_advanced_aggressive_chaos_test.rs b/logos/examples/doc-snippets/src/examples_advanced_aggressive_chaos_test.rs index bb9e927..b3f18a3 100644 --- a/logos/examples/doc-snippets/src/examples_advanced_aggressive_chaos_test.rs +++ b/logos/examples/doc-snippets/src/examples_advanced_aggressive_chaos_test.rs @@ -7,7 +7,7 @@ use testing_framework_workflows::{ChaosBuilderExt, ScenarioBuilderExt}; pub async fn aggressive_chaos_test() -> Result<()> { let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(4)) - .enable_node_control() + .with_node_control() .wallets(50) .transactions_with(|txs| txs.rate(10).users(20)) .chaos_with(|c| { diff --git a/logos/examples/doc-snippets/src/examples_chaos_resilience.rs b/logos/examples/doc-snippets/src/examples_chaos_resilience.rs index 7545260..ba8fc49 100644 --- a/logos/examples/doc-snippets/src/examples_chaos_resilience.rs +++ b/logos/examples/doc-snippets/src/examples_chaos_resilience.rs @@ -7,7 +7,7 @@ use testing_framework_workflows::{ChaosBuilderExt, ScenarioBuilderExt}; pub async fn chaos_resilience() -> Result<()> { let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(4)) - .enable_node_control() + .with_node_control() .wallets(20) .transactions_with(|txs| txs.rate(3).users(10)) .chaos_with(|c| { diff --git a/logos/examples/doc-snippets/src/testing_philosophy_determinism_first.rs b/logos/examples/doc-snippets/src/testing_philosophy_determinism_first.rs index 160db8c..1053c08 100644 --- a/logos/examples/doc-snippets/src/testing_philosophy_determinism_first.rs +++ b/logos/examples/doc-snippets/src/testing_philosophy_determinism_first.rs @@ -16,7 +16,7 @@ pub fn determinism_first() -> SnippetResult<()> { // Separate: chaos test (introduces randomness) let _chaos_plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(3)) - .enable_node_control() + .with_node_control() .chaos_with(|c| { c.restart() .min_delay(Duration::from_secs(30)) diff --git a/logos/examples/src/bin/compose_runner.rs b/logos/examples/src/bin/compose_runner.rs index 25cecf5..2254ca8 100644 --- a/logos/examples/src/bin/compose_runner.rs +++ b/logos/examples/src/bin/compose_runner.rs @@ -41,7 +41,7 @@ async fn run_compose_case(nodes: usize, run_duration: Duration) -> Result<()> { t.with_network_layout(Libp2pNetworkLayout::Star) .with_node_count(nodes) }) - .enable_node_control() + .with_node_control() .with_run_duration(run_duration) .with_deployment_seed(seed) .initialize_wallet( diff --git a/logos/examples/src/bin/k8s_runner.rs b/logos/examples/src/bin/k8s_runner.rs index ff0e554..8e2dba6 100644 --- a/logos/examples/src/bin/k8s_runner.rs +++ b/logos/examples/src/bin/k8s_runner.rs @@ -37,7 +37,7 @@ async fn run_k8s_case(nodes: usize, run_duration: Duration) -> Result<()> { t.with_network_layout(Libp2pNetworkLayout::Star) .with_node_count(nodes) }) - .enable_observability() + .with_observability() .with_run_duration(run_duration) .with_deployment_seed(seed) .initialize_wallet( diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index fa3befd..5c36326 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -20,10 +20,10 @@ async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> { Err(error) => return Err(Error::new(error)), }; - let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .with_run_duration(Duration::from_secs(5)) - .with_attach_source(attach_source) + .with_existing_cluster_from(&metadata) + .map_err(|err| anyhow!("{err}"))? .build()?; let attached_deployer = LbcComposeDeployer::default(); diff --git a/logos/examples/tests/dynamic_join.rs b/logos/examples/tests/dynamic_join.rs index ba88ee1..93a3f3c 100644 --- a/logos/examples/tests/dynamic_join.rs +++ b/logos/examples/tests/dynamic_join.rs @@ -115,7 +115,7 @@ async fn dynamic_join_reaches_consensus_liveness() -> Result<()> { t.with_network_layout(Libp2pNetworkLayout::Star) .with_node_count(2) }) - .enable_node_control() + .with_node_control() .with_workload(JoinNodeWorkload::new("joiner")) .with_expectation(lb_framework::workloads::ConsensusLiveness::::default()) .with_run_duration(Duration::from_secs(60)) @@ -135,7 +135,7 @@ async fn dynamic_join_with_peers_reaches_consensus_liveness() -> Result<()> { t.with_network_layout(Libp2pNetworkLayout::Star) .with_node_count(2) }) - .enable_node_control() + .with_node_control() .with_workload(JoinNodeWithPeersWorkload::new( "joiner", vec!["node-0".to_string()], diff --git a/logos/examples/tests/external_sources_local.rs b/logos/examples/tests/external_sources_local.rs index af16b9d..fbf36f2 100644 --- a/logos/examples/tests/external_sources_local.rs +++ b/logos/examples/tests/external_sources_local.rs @@ -145,8 +145,7 @@ async fn scenario_managed_plus_external_sources_are_orchestrated() -> Result<()> let mut scenario = ScenarioBuilder::new(Box::new(deployment_builder)) .with_run_duration(Duration::from_secs(5)) - .with_external_node(seed_cluster.external_sources()[0].clone()) - .with_external_node(seed_cluster.external_sources()[1].clone()) + .with_external_nodes(seed_cluster.external_sources().to_vec()) .build()?; let deployer = ProcessDeployer::::default(); diff --git a/logos/examples/tests/k8s_attach_node_control.rs b/logos/examples/tests/k8s_attach_node_control.rs index 72b2c9d..66785f8 100644 --- a/logos/examples/tests/k8s_attach_node_control.rs +++ b/logos/examples/tests/k8s_attach_node_control.rs @@ -20,10 +20,10 @@ async fn k8s_attach_mode_queries_node_api_opt_in() -> Result<()> { Err(error) => return Err(Error::new(error)), }; - let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .with_run_duration(Duration::from_secs(5)) - .with_attach_source(attach_source) + .with_existing_cluster_from(&metadata) + .map_err(|err| anyhow!("{err}"))? .build()?; let attached_deployer = LbcK8sDeployer::default(); diff --git a/logos/examples/tests/local_deployer_restart.rs b/logos/examples/tests/local_deployer_restart.rs index 10649f3..078a1c0 100644 --- a/logos/examples/tests/local_deployer_restart.rs +++ b/logos/examples/tests/local_deployer_restart.rs @@ -12,7 +12,7 @@ use tracing_subscriber::fmt::try_init; async fn local_restart_node() -> Result<()> { let _ = try_init(); let mut scenario = ScenarioBuilder::deployment_with(|t| t.with_node_count(1)) - .enable_node_control() + .with_node_control() .with_run_duration(Duration::from_secs(1)) .build()?; diff --git a/logos/runtime/ext/src/lib.rs b/logos/runtime/ext/src/lib.rs index 177823b..8701c6c 100644 --- a/logos/runtime/ext/src/lib.rs +++ b/logos/runtime/ext/src/lib.rs @@ -44,7 +44,7 @@ impl Application for LbcExtEnv { type FeedRuntime = ::FeedRuntime; fn external_node_client(source: &ExternalNodeSource) -> Result { - let base_url = Url::parse(&source.endpoint)?; + let base_url = Url::parse(source.endpoint())?; Ok(NodeHttpClient::from_urls(base_url, None)) } diff --git a/logos/runtime/ext/src/scenario/mod.rs b/logos/runtime/ext/src/scenario/mod.rs index a25297d..50c976d 100644 --- a/logos/runtime/ext/src/scenario/mod.rs +++ b/logos/runtime/ext/src/scenario/mod.rs @@ -63,7 +63,7 @@ impl CoreBuilderExt for ScenarioBuilder { impl CoreBuilderExt for NodeControlScenarioBuilder { fn deployment_with(f: impl FnOnce(DeploymentBuilder) -> DeploymentBuilder) -> Self { - ScenarioBuilder::deployment_with(f).enable_node_control() + ScenarioBuilder::deployment_with(f).with_node_control() } fn with_wallet_config(self, wallet: WalletConfig) -> Self { @@ -82,7 +82,7 @@ impl CoreBuilderExt for NodeControlScenarioBuilder { impl CoreBuilderExt for ObservabilityScenarioBuilder { fn deployment_with(f: impl FnOnce(DeploymentBuilder) -> DeploymentBuilder) -> Self { - ScenarioBuilder::deployment_with(f).enable_observability() + ScenarioBuilder::deployment_with(f).with_observability() } fn with_wallet_config(self, wallet: WalletConfig) -> Self { diff --git a/testing-framework/core/src/runtime/manual.rs b/testing-framework/core/src/runtime/manual.rs index dfa0292..ac17826 100644 --- a/testing-framework/core/src/runtime/manual.rs +++ b/testing-framework/core/src/runtime/manual.rs @@ -1,15 +1,11 @@ use async_trait::async_trait; -use crate::scenario::{Application, DynError, NodeControlHandle, StartNodeOptions, StartedNode}; +use crate::scenario::{Application, ClusterControlProfile, ClusterWaitHandle, NodeControlHandle}; /// Interface for imperative, deployer-backed manual clusters. #[async_trait] -pub trait ManualClusterHandle: NodeControlHandle { - async fn start_node_with( - &self, - name: &str, - options: StartNodeOptions, - ) -> Result, DynError>; - - async fn wait_network_ready(&self) -> Result<(), DynError>; +pub trait ManualClusterHandle: NodeControlHandle + ClusterWaitHandle { + fn cluster_control_profile(&self) -> ClusterControlProfile { + ClusterControlProfile::ManualControlled + } } diff --git a/testing-framework/core/src/scenario/builder_ext.rs b/testing-framework/core/src/scenario/builder_ext.rs index 9ffca85..b771c2e 100644 --- a/testing-framework/core/src/scenario/builder_ext.rs +++ b/testing-framework/core/src/scenario/builder_ext.rs @@ -95,7 +95,7 @@ impl ObservabilityBuilderExt for ScenarioBuilder { type Env = E; fn with_metrics_query_url(self, url: Url) -> ObservabilityScenarioBuilder { - self.with_observability(single_url_observability(Some(url), None, None)) + self.with_observability_capability(single_url_observability(Some(url), None, None)) } fn with_metrics_query_url_str(self, url: &str) -> ObservabilityScenarioBuilder { @@ -112,7 +112,7 @@ impl ObservabilityBuilderExt for ScenarioBuilder { } fn with_metrics_otlp_ingest_url(self, url: Url) -> ObservabilityScenarioBuilder { - self.with_observability(single_url_observability(None, Some(url), None)) + self.with_observability_capability(single_url_observability(None, Some(url), None)) } fn with_metrics_otlp_ingest_url_str(self, url: &str) -> ObservabilityScenarioBuilder { @@ -129,7 +129,7 @@ impl ObservabilityBuilderExt for ScenarioBuilder { } fn with_grafana_url(self, url: Url) -> ObservabilityScenarioBuilder { - self.with_observability(single_url_observability(None, None, Some(url))) + self.with_observability_capability(single_url_observability(None, None, Some(url))) } fn with_grafana_url_str(self, url: &str) -> ObservabilityScenarioBuilder { diff --git a/testing-framework/core/src/scenario/definition.rs b/testing-framework/core/src/scenario/definition.rs index e8b3a4e..0aa2350 100644 --- a/testing-framework/core/src/scenario/definition.rs +++ b/testing-framework/core/src/scenario/definition.rs @@ -4,15 +4,16 @@ use thiserror::Error; use tracing::{debug, info}; use super::{ - Application, AttachSource, DeploymentPolicy, DynError, ExternalNodeSource, - HttpReadinessRequirement, NodeControlCapability, ObservabilityCapability, ScenarioSources, - SourceReadinessPolicy, + Application, ClusterControlProfile, ClusterMode, DeploymentPolicy, DynError, ExistingCluster, + ExternalNodeSource, HttpReadinessRequirement, IntoExistingCluster, NodeControlCapability, + ObservabilityCapability, RequiresNodeControl, builder_ops::CoreBuilderAccess, expectation::Expectation, runtime::{ context::RunMetrics, - orchestration::{SourceModeName, SourceOrchestrationPlan, SourceOrchestrationPlanError}, + orchestration::{SourceOrchestrationPlan, SourceOrchestrationPlanError}, }, + sources::ScenarioSources, workload::Workload, }; use crate::topology::{DeploymentDescriptor, DeploymentProvider, DeploymentSeed, DynTopologyError}; @@ -44,7 +45,6 @@ pub struct Scenario { expectation_cooldown: Duration, deployment_policy: DeploymentPolicy, sources: ScenarioSources, - source_readiness_policy: SourceReadinessPolicy, source_orchestration_plan: SourceOrchestrationPlan, capabilities: Caps, } @@ -58,7 +58,6 @@ impl Scenario { expectation_cooldown: Duration, deployment_policy: DeploymentPolicy, sources: ScenarioSources, - source_readiness_policy: SourceReadinessPolicy, source_orchestration_plan: SourceOrchestrationPlan, capabilities: Caps, ) -> Self { @@ -70,7 +69,6 @@ impl Scenario { expectation_cooldown, deployment_policy, sources, - source_readiness_policy, source_orchestration_plan, capabilities, } @@ -117,17 +115,34 @@ impl Scenario { } #[must_use] - /// Selected source readiness policy. - /// - /// This is currently reserved for future mixed-source orchestration and - /// does not change runtime behavior yet. - pub const fn source_readiness_policy(&self) -> SourceReadinessPolicy { - self.source_readiness_policy + pub fn existing_cluster(&self) -> Option<&ExistingCluster> { + self.sources.existing_cluster() } #[must_use] - pub fn sources(&self) -> &ScenarioSources { - &self.sources + pub const fn cluster_mode(&self) -> ClusterMode { + self.sources.cluster_mode() + } + + #[must_use] + pub const fn cluster_control_profile(&self) -> ClusterControlProfile { + self.sources.control_profile() + } + + #[must_use] + #[doc(hidden)] + pub fn attached_source(&self) -> Option<&ExistingCluster> { + self.existing_cluster() + } + + #[must_use] + pub fn external_nodes(&self) -> &[ExternalNodeSource] { + self.sources.external_nodes() + } + + #[must_use] + pub fn has_external_nodes(&self) -> bool { + !self.sources.external_nodes().is_empty() } #[must_use] @@ -151,7 +166,6 @@ pub struct Builder { expectation_cooldown: Option, deployment_policy: DeploymentPolicy, sources: ScenarioSources, - source_readiness_policy: SourceReadinessPolicy, capabilities: Caps, } @@ -247,8 +261,24 @@ macro_rules! impl_common_builder_methods { } #[must_use] - pub fn with_attach_source(self, attach: AttachSource) -> Self { - self.map_core_builder(|builder| builder.with_attach_source(attach)) + pub fn with_existing_cluster(self, cluster: ExistingCluster) -> Self { + self.map_core_builder(|builder| builder.with_existing_cluster(cluster)) + } + + #[must_use] + pub fn with_existing_cluster_from( + self, + cluster: impl IntoExistingCluster, + ) -> Result { + let cluster = cluster.into_existing_cluster()?; + + Ok(self.with_existing_cluster(cluster)) + } + + #[must_use] + #[doc(hidden)] + pub fn with_attach_source(self, attach: ExistingCluster) -> Self { + self.with_existing_cluster(attach) } #[must_use] @@ -257,13 +287,24 @@ macro_rules! impl_common_builder_methods { } #[must_use] - pub fn with_source_readiness_policy(self, policy: SourceReadinessPolicy) -> Self { - self.map_core_builder(|builder| builder.with_source_readiness_policy(policy)) + pub fn with_external_nodes( + self, + nodes: impl IntoIterator, + ) -> Self { + self.map_core_builder(|builder| builder.with_external_nodes(nodes)) } #[must_use] - pub fn with_external_only_sources(self) -> Self { - self.map_core_builder(|builder| builder.with_external_only_sources()) + pub fn with_external_only(self) -> Self { + self.map_core_builder(|builder| builder.with_external_only()) + } + + #[must_use] + pub fn with_external_only_nodes( + self, + nodes: impl IntoIterator, + ) -> Self { + self.map_core_builder(|builder| builder.with_external_only_nodes(nodes)) } #[must_use] @@ -350,7 +391,6 @@ impl Builder { expectation_cooldown: None, deployment_policy: DeploymentPolicy::default(), sources: ScenarioSources::default(), - source_readiness_policy: SourceReadinessPolicy::default(), capabilities: Caps::default(), } } @@ -365,14 +405,20 @@ impl ScenarioBuilder { } #[must_use] - pub fn enable_node_control(self) -> NodeControlScenarioBuilder { + pub fn with_node_control(self) -> NodeControlScenarioBuilder { NodeControlScenarioBuilder { inner: self.inner.with_capabilities(NodeControlCapability), } } #[must_use] - pub fn enable_observability(self) -> ObservabilityScenarioBuilder { + #[doc(hidden)] + pub fn enable_node_control(self) -> NodeControlScenarioBuilder { + self.with_node_control() + } + + #[must_use] + pub fn with_observability(self) -> ObservabilityScenarioBuilder { ObservabilityScenarioBuilder { inner: self .inner @@ -380,11 +426,17 @@ impl ScenarioBuilder { } } + #[must_use] + #[doc(hidden)] + pub fn enable_observability(self) -> ObservabilityScenarioBuilder { + self.with_observability() + } + pub fn build(self) -> Result, ScenarioBuildError> { self.inner.build() } - pub(crate) fn with_observability( + pub(crate) fn with_observability_capability( self, observability: ObservabilityCapability, ) -> ObservabilityScenarioBuilder { @@ -453,7 +505,6 @@ impl Builder { expectation_cooldown, deployment_policy, sources, - source_readiness_policy, .. } = self; @@ -466,7 +517,6 @@ impl Builder { expectation_cooldown, deployment_policy, sources, - source_readiness_policy, capabilities, } } @@ -568,33 +618,65 @@ impl Builder { } #[must_use] - pub fn with_attach_source(mut self, attach: AttachSource) -> Self { - self.sources.set_attach(attach); + pub fn with_existing_cluster(mut self, cluster: ExistingCluster) -> Self { + self.sources = self.sources.with_attach(cluster); self } + #[must_use] + pub fn with_existing_cluster_from( + self, + cluster: impl IntoExistingCluster, + ) -> Result { + let cluster = cluster.into_existing_cluster()?; + + Ok(self.with_existing_cluster(cluster)) + } + + #[must_use] + #[doc(hidden)] + pub fn with_attach_source(self, attach: ExistingCluster) -> Self { + self.with_existing_cluster(attach) + } + #[must_use] pub fn with_external_node(mut self, node: ExternalNodeSource) -> Self { - self.sources.add_external_node(node); + self.sources = self.sources.with_external_node(node); self } #[must_use] - /// Configure source readiness policy metadata. - /// - /// This is currently reserved for future mixed-source orchestration and - /// does not change runtime behavior yet. - pub fn with_source_readiness_policy(mut self, policy: SourceReadinessPolicy) -> Self { - self.source_readiness_policy = policy; + pub fn with_external_nodes( + mut self, + nodes: impl IntoIterator, + ) -> Self { + for node in nodes { + self.sources = self.sources.with_external_node(node); + } + self } #[must_use] - pub fn with_external_only_sources(mut self) -> Self { - self.sources.set_external_only(); + pub fn with_external_only(mut self) -> Self { + self.sources = self.sources.into_external_only(); self } + #[must_use] + pub fn with_external_only_nodes( + self, + nodes: impl IntoIterator, + ) -> Self { + self.with_external_only().with_external_nodes(nodes) + } + + #[must_use] + #[doc(hidden)] + pub fn with_external_only_sources(self) -> Self { + self.with_external_only() + } + fn add_workload(&mut self, workload: Box>) { self.expectations.extend(workload.expectations()); self.workloads.push(workload); @@ -607,13 +689,18 @@ impl Builder { #[must_use] /// Finalize the scenario, computing run metrics and initializing /// components. - pub fn build(self) -> Result, ScenarioBuildError> { + pub fn build(self) -> Result, ScenarioBuildError> + where + Caps: RequiresNodeControl, + { let mut parts = BuilderParts::from_builder(self); let descriptors = parts.resolve_deployment()?; let run_plan = parts.run_plan(); let run_metrics = RunMetrics::new(run_plan.duration); - let source_orchestration_plan = - build_source_orchestration_plan(parts.sources(), parts.source_readiness_policy)?; + + validate_source_contract::(parts.sources())?; + + let source_orchestration_plan = build_source_orchestration_plan(parts.sources())?; initialize_components( &descriptors, @@ -640,7 +727,6 @@ impl Builder { run_plan.expectation_cooldown, parts.deployment_policy, parts.sources, - parts.source_readiness_policy, source_orchestration_plan, parts.capabilities, )) @@ -661,7 +747,6 @@ struct BuilderParts { expectation_cooldown: Option, deployment_policy: DeploymentPolicy, sources: ScenarioSources, - source_readiness_policy: SourceReadinessPolicy, capabilities: Caps, } @@ -676,7 +761,6 @@ impl BuilderParts { expectation_cooldown, deployment_policy, sources, - source_readiness_policy, capabilities, .. } = builder; @@ -690,7 +774,6 @@ impl BuilderParts { expectation_cooldown, deployment_policy, sources, - source_readiness_policy, capabilities, } } @@ -715,38 +798,82 @@ impl BuilderParts { fn build_source_orchestration_plan( sources: &ScenarioSources, - readiness_policy: SourceReadinessPolicy, ) -> Result { - SourceOrchestrationPlan::try_from_sources(sources, readiness_policy) - .map_err(source_plan_error_to_build_error) + SourceOrchestrationPlan::try_from_sources(sources).map_err(source_plan_error_to_build_error) +} + +fn validate_source_contract(sources: &ScenarioSources) -> Result<(), ScenarioBuildError> +where + Caps: RequiresNodeControl, +{ + validate_external_only_sources(sources)?; + + validate_node_control_profile::(sources)?; + + Ok(()) } fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> ScenarioBuildError { match error { SourceOrchestrationPlanError::SourceModeNotWiredYet { mode } => { - ScenarioBuildError::SourceModeNotWiredYet { - mode: source_mode_name(mode), - } + ScenarioBuildError::SourceModeNotWiredYet { mode } } } } -const fn source_mode_name(mode: SourceModeName) -> &'static str { - match mode { - SourceModeName::Attached => "Attached", +fn validate_external_only_sources(sources: &ScenarioSources) -> Result<(), ScenarioBuildError> { + if matches!(sources.cluster_mode(), ClusterMode::ExternalOnly) + && sources.external_nodes().is_empty() + { + return Err(ScenarioBuildError::SourceConfiguration { + message: "external-only scenarios require at least one external node".to_owned(), + }); } + + Ok(()) +} + +fn validate_node_control_profile(sources: &ScenarioSources) -> Result<(), ScenarioBuildError> +where + Caps: RequiresNodeControl, +{ + let profile = sources.control_profile(); + + if Caps::REQUIRED && matches!(profile, ClusterControlProfile::ExternalUncontrolled) { + return Err(ScenarioBuildError::SourceConfiguration { + message: format!( + "node control is not available for cluster mode '{}' with control profile '{}'", + sources.cluster_mode().as_str(), + profile.as_str(), + ), + }); + } + + Ok(()) } impl Builder { #[must_use] - pub fn enable_node_control(self) -> Builder { + pub fn with_node_control(self) -> Builder { self.with_capabilities(NodeControlCapability) } #[must_use] - pub fn enable_observability(self) -> Builder { + #[doc(hidden)] + pub fn enable_node_control(self) -> Builder { + self.with_node_control() + } + + #[must_use] + pub fn with_observability(self) -> Builder { self.with_capabilities(ObservabilityCapability::default()) } + + #[must_use] + #[doc(hidden)] + pub fn enable_observability(self) -> Builder { + self.with_observability() + } } fn initialize_components( @@ -821,3 +948,59 @@ fn expectation_cooldown_for(override_value: Option) -> Duration { fn min_run_duration() -> Duration { Duration::from_secs(MIN_RUN_DURATION_SECS) } + +#[cfg(test)] +mod tests { + use super::{ + ScenarioBuildError, validate_external_only_sources, validate_node_control_profile, + }; + use crate::scenario::{ + ExistingCluster, ExternalNodeSource, NodeControlCapability, sources::ScenarioSources, + }; + + #[test] + fn external_only_requires_external_nodes() { + let error = + validate_external_only_sources(&ScenarioSources::default().into_external_only()) + .expect_err("external-only without nodes should fail"); + + assert!(matches!( + error, + ScenarioBuildError::SourceConfiguration { .. } + )); + assert_eq!( + error.to_string(), + "invalid scenario source configuration: external-only scenarios require at least one external node" + ); + } + + #[test] + fn external_only_rejects_node_control_requirement() { + let sources = ScenarioSources::default() + .with_external_node(ExternalNodeSource::new( + "node-0".to_owned(), + "http://127.0.0.1:1".to_owned(), + )) + .into_external_only(); + let error = validate_node_control_profile::(&sources) + .expect_err("external-only should reject node control"); + + assert!(matches!( + error, + ScenarioBuildError::SourceConfiguration { .. } + )); + assert_eq!( + error.to_string(), + "invalid scenario source configuration: node control is not available for cluster mode 'external-only' with control profile 'external-uncontrolled'" + ); + } + + #[test] + fn existing_cluster_accepts_node_control_requirement() { + let sources = ScenarioSources::default() + .with_attach(ExistingCluster::for_compose_project("project".to_owned())); + + validate_node_control_profile::(&sources) + .expect("existing cluster should be considered controllable"); + } +} diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index f2e43d6..4089111 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -36,23 +36,27 @@ pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder}; pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; +#[doc(hidden)] pub use runtime::{ - ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, BorrowedNode, - BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, - HttpReadinessRequirement, ManagedNode, ManagedSource, NodeClients, NodeHandle, NodeInventory, - ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError, - SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider, - build_source_orchestration_plan, + ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, CleanupGuard, + FeedHandle, ManagedSource, RuntimeAssembly, SourceOrchestrationPlan, SourceProviders, + StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources, + orchestrate_sources_with_providers, resolve_sources, +}; +pub use runtime::{ + Deployer, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext, + RunHandle, RunMetrics, Runner, ScenarioError, StabilizationConfig, metrics::{ CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError, PrometheusEndpoint, PrometheusInstantSample, }, - orchestrate_sources, orchestrate_sources_with_providers, resolve_sources, spawn_feed, - wait_for_http_ports, wait_for_http_ports_with_host, + spawn_feed, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, }; -pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy}; +pub use sources::{ + ClusterControlProfile, ClusterMode, ExistingCluster, ExternalNodeSource, IntoExistingCluster, +}; pub use workload::Workload; pub use crate::env::Application; diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 65f28a5..438cb57 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -2,8 +2,7 @@ use std::{sync::Arc, time::Duration}; use super::{metrics::Metrics, node_clients::ClusterClient}; use crate::scenario::{ - Application, BorrowedNode, ClusterWaitHandle, DynError, ManagedNode, NodeClients, - NodeControlHandle, + Application, ClusterControlProfile, ClusterWaitHandle, DynError, NodeClients, NodeControlHandle, }; #[derive(Debug, thiserror::Error)] @@ -18,6 +17,21 @@ pub struct RunContext { node_clients: NodeClients, metrics: RunMetrics, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, + telemetry: Metrics, + feed: ::Feed, + node_control: Option>>, + cluster_wait: Option>>, +} + +/// Low-level runtime assembly input used by deployers to build a runnable +/// cluster context. +pub struct RuntimeAssembly { + descriptors: E::Deployment, + node_clients: NodeClients, + run_duration: Duration, + expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, @@ -27,11 +41,12 @@ pub struct RunContext { impl RunContext { /// Builds a run context from prepared deployment/runtime artifacts. #[must_use] - pub fn new( + pub(crate) fn new( descriptors: E::Deployment, node_clients: NodeClients, run_duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, @@ -43,6 +58,7 @@ impl RunContext { node_clients, metrics, expectation_cooldown, + cluster_control_profile, telemetry, feed, node_control, @@ -51,7 +67,7 @@ impl RunContext { } #[must_use] - pub fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { + pub(crate) fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { self.cluster_wait = Some(cluster_wait); self } @@ -71,26 +87,6 @@ impl RunContext { self.node_clients.random_client() } - #[must_use] - pub fn managed_nodes(&self) -> Vec> { - self.node_clients.managed_nodes() - } - - #[must_use] - pub fn borrowed_nodes(&self) -> Vec> { - self.node_clients.borrowed_nodes() - } - - #[must_use] - pub fn find_managed_node(&self, identity: &str) -> Option> { - self.node_clients.find_managed(identity) - } - - #[must_use] - pub fn find_borrowed_node(&self, identity: &str) -> Option> { - self.node_clients.find_borrowed(identity) - } - #[must_use] pub fn feed(&self) -> ::Feed { self.feed.clone() @@ -107,13 +103,13 @@ impl RunContext { } #[must_use] - pub const fn expectation_cooldown(&self) -> Duration { + pub(crate) const fn expectation_cooldown(&self) -> Duration { self.expectation_cooldown } #[must_use] - pub const fn run_metrics(&self) -> RunMetrics { - self.metrics + pub const fn cluster_control_profile(&self) -> ClusterControlProfile { + self.cluster_control_profile } #[must_use] @@ -121,22 +117,7 @@ impl RunContext { self.node_control.clone() } - #[must_use] - pub fn cluster_wait(&self) -> Option>> { - self.cluster_wait.clone() - } - - #[must_use] - pub const fn controls_nodes(&self) -> bool { - self.node_control.is_some() - } - - #[must_use] - pub const fn can_wait_network_ready(&self) -> bool { - self.cluster_wait.is_some() - } - - pub async fn wait_network_ready(&self) -> Result<(), DynError> { + pub(crate) async fn wait_network_ready(&self) -> Result<(), DynError> { self.require_cluster_wait()?.wait_network_ready().await } @@ -146,11 +127,90 @@ impl RunContext { } fn require_cluster_wait(&self) -> Result>, DynError> { - self.cluster_wait() + self.cluster_wait + .as_ref() + .map(Arc::clone) .ok_or_else(|| RunContextCapabilityError::MissingClusterWait.into()) } } +impl RuntimeAssembly { + #[must_use] + pub fn new( + descriptors: E::Deployment, + node_clients: NodeClients, + run_duration: Duration, + expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, + telemetry: Metrics, + feed: ::Feed, + ) -> Self { + Self { + descriptors, + node_clients, + run_duration, + expectation_cooldown, + cluster_control_profile, + telemetry, + feed, + node_control: None, + cluster_wait: None, + } + } + + #[must_use] + pub fn with_node_control(mut self, node_control: Arc>) -> Self { + self.node_control = Some(node_control); + self + } + + #[must_use] + pub fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { + self.cluster_wait = Some(cluster_wait); + self + } + + #[must_use] + pub fn build_context(self) -> RunContext { + let context = RunContext::new( + self.descriptors, + self.node_clients, + self.run_duration, + self.expectation_cooldown, + self.cluster_control_profile, + self.telemetry, + self.feed, + self.node_control, + ); + + match self.cluster_wait { + Some(cluster_wait) => context.with_cluster_wait(cluster_wait), + None => context, + } + } + + #[must_use] + pub fn build_runner(self, cleanup_guard: Option>) -> super::Runner { + super::Runner::new(self.build_context(), cleanup_guard) + } +} + +impl From> for RuntimeAssembly { + fn from(context: RunContext) -> Self { + Self { + descriptors: context.descriptors, + node_clients: context.node_clients, + run_duration: context.metrics.run_duration(), + expectation_cooldown: context.expectation_cooldown, + cluster_control_profile: context.cluster_control_profile, + telemetry: context.telemetry, + feed: context.feed, + node_control: context.node_control, + cluster_wait: context.cluster_wait, + } + } +} + /// Handle returned by the runner to control the lifecycle of the run. pub struct RunHandle { run_context: Arc>, @@ -166,15 +226,6 @@ impl Drop for RunHandle { } impl RunHandle { - #[must_use] - /// Build a handle from owned context and optional cleanup guard. - pub fn new(context: RunContext, cleanup_guard: Option>) -> Self { - Self { - run_context: Arc::new(context), - cleanup_guard, - } - } - #[must_use] /// Build a handle from a shared context reference. pub(crate) fn from_shared( @@ -192,10 +243,6 @@ impl RunHandle { pub fn context(&self) -> &RunContext { &self.run_context } - - pub async fn wait_network_ready(&self) -> Result<(), DynError> { - self.run_context.wait_network_ready().await - } } /// Derived metrics about the current run timing. diff --git a/testing-framework/core/src/scenario/runtime/inventory/mod.rs b/testing-framework/core/src/scenario/runtime/inventory/mod.rs index 6bc0334..575f53f 100644 --- a/testing-framework/core/src/scenario/runtime/inventory/mod.rs +++ b/testing-framework/core/src/scenario/runtime/inventory/mod.rs @@ -1,3 +1,3 @@ mod node_inventory; -pub use node_inventory::{BorrowedNode, BorrowedOrigin, ManagedNode, NodeHandle, NodeInventory}; +pub(crate) use node_inventory::NodeInventory; diff --git a/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs b/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs index c45385e..d9b45ce 100644 --- a/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs +++ b/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs @@ -1,91 +1,18 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use parking_lot::RwLock; -use crate::scenario::{Application, DynError, NodeControlHandle, StartNodeOptions, StartedNode}; +use crate::scenario::Application; -/// Origin for borrowed (non-managed) nodes in the runtime inventory. -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum BorrowedOrigin { - /// Node discovered from an attached cluster provider. - Attached, - /// Node provided explicitly as an external endpoint. - External, -} - -/// Managed node handle with full lifecycle capabilities. -pub struct ManagedNode { - /// Canonical node identity used for deduplication and lookups. - pub identity: String, - /// Application-specific API client for this node. - pub client: E::NodeClient, -} - -/// Borrowed node handle (attached or external), query-only by default. -pub struct BorrowedNode { - /// Canonical node identity used for deduplication and lookups. - pub identity: String, - /// Application-specific API client for this node. - pub client: E::NodeClient, - /// Borrowed source kind used for diagnostics and selection. - pub origin: BorrowedOrigin, -} - -/// Unified node handle variant used by runtime inventory snapshots. -pub enum NodeHandle { - /// Managed node variant. - Managed(ManagedNode), - /// Borrowed node variant. - Borrowed(BorrowedNode), -} - -impl Clone for ManagedNode { - fn clone(&self) -> Self { - Self { - identity: self.identity.clone(), - client: self.client.clone(), - } - } -} - -impl Clone for BorrowedNode { - fn clone(&self) -> Self { - Self { - identity: self.identity.clone(), - client: self.client.clone(), - origin: self.origin, - } - } -} - -impl Clone for NodeHandle { - fn clone(&self) -> Self { - match self { - Self::Managed(node) => Self::Managed(node.clone()), - Self::Borrowed(node) => Self::Borrowed(node.clone()), - } - } -} - -/// Thread-safe node inventory with identity-based upsert semantics. -pub struct NodeInventory { - inner: Arc>>, -} - -struct NodeInventoryInner { - nodes: Vec>, - indices_by_identity: HashMap, - next_synthetic_id: usize, +/// Thread-safe node client storage used by runtime handles. +pub(crate) struct NodeInventory { + clients: Arc>>, } impl Default for NodeInventory { fn default() -> Self { Self { - inner: Arc::new(RwLock::new(NodeInventoryInner { - nodes: Vec::new(), - indices_by_identity: HashMap::new(), - next_synthetic_id: 0, - })), + clients: Arc::new(RwLock::new(Vec::new())), } } } @@ -93,243 +20,44 @@ impl Default for NodeInventory { impl Clone for NodeInventory { fn clone(&self) -> Self { Self { - inner: Arc::clone(&self.inner), + clients: Arc::clone(&self.clients), } } } impl NodeInventory { #[must_use] - /// Builds an inventory from managed clients. - pub fn from_managed_clients(clients: Vec) -> Self { - let inventory = Self::default(); - - for client in clients { - inventory.add_managed_node(client, None); - } - - inventory - } - - #[must_use] - /// Returns a cloned snapshot of all node clients. - pub fn snapshot_clients(&self) -> Vec { - self.inner.read().nodes.iter().map(clone_client).collect() - } - - #[must_use] - /// Returns cloned managed node handles from the current inventory. - pub fn managed_nodes(&self) -> Vec> { - self.inner - .read() - .nodes - .iter() - .filter_map(|handle| match handle { - NodeHandle::Managed(node) => Some(node.clone()), - NodeHandle::Borrowed(_) => None, - }) - .collect() - } - - #[must_use] - /// Returns cloned borrowed node handles from the current inventory. - pub fn borrowed_nodes(&self) -> Vec> { - self.inner - .read() - .nodes - .iter() - .filter_map(|handle| match handle { - NodeHandle::Managed(_) => None, - NodeHandle::Borrowed(node) => Some(node.clone()), - }) - .collect() - } - - #[must_use] - /// Finds a managed node by canonical identity. - pub fn find_managed(&self, identity: &str) -> Option> { - let guard = self.inner.read(); - match node_by_identity(&guard, identity)? { - NodeHandle::Managed(node) => Some(node.clone()), - NodeHandle::Borrowed(_) => None, + pub(crate) fn from_clients(clients: Vec) -> Self { + Self { + clients: Arc::new(RwLock::new(clients)), } } #[must_use] - /// Finds a borrowed node by canonical identity. - pub fn find_borrowed(&self, identity: &str) -> Option> { - let guard = self.inner.read(); - match node_by_identity(&guard, identity)? { - NodeHandle::Managed(_) => None, - NodeHandle::Borrowed(node) => Some(node.clone()), - } + pub(crate) fn snapshot_clients(&self) -> Vec { + self.clients.read().clone() } #[must_use] - /// Finds any node handle by canonical identity. - pub fn find_node(&self, identity: &str) -> Option> { - let guard = self.inner.read(); - node_by_identity(&guard, identity).cloned() + pub(crate) fn len(&self) -> usize { + self.clients.read().len() } #[must_use] - /// Returns current number of nodes in inventory. - pub fn len(&self) -> usize { - self.inner.read().nodes.len() - } - - #[must_use] - /// Returns true when no nodes are registered. - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { self.len() == 0 } - /// Clears all nodes and identity indexes. - pub fn clear(&self) { - let mut guard = self.inner.write(); - guard.nodes.clear(); - guard.indices_by_identity.clear(); - guard.next_synthetic_id = 0; + pub(crate) fn clear(&self) { + self.clients.write().clear(); } - /// Adds or replaces a managed node entry using canonical identity - /// resolution. Re-adding the same node identity updates the stored handle. - pub fn add_managed_node(&self, client: E::NodeClient, identity_hint: Option) { - let mut guard = self.inner.write(); - let identity = canonical_identity::(&client, identity_hint, &mut guard); - let handle = NodeHandle::Managed(ManagedNode { - identity: identity.clone(), - client, - }); - upsert_node(&mut guard, identity, handle); + pub(crate) fn add_client(&self, client: E::NodeClient) { + self.clients.write().push(client); } - /// Adds or replaces an attached node entry. - pub fn add_attached_node(&self, client: E::NodeClient, identity_hint: Option) { - self.add_borrowed_node(client, BorrowedOrigin::Attached, identity_hint); - } - - /// Adds or replaces an external static node entry. - pub fn add_external_node(&self, client: E::NodeClient, identity_hint: Option) { - self.add_borrowed_node(client, BorrowedOrigin::External, identity_hint); - } - - /// Executes a synchronous read over a cloned client slice. - pub fn with_clients(&self, f: impl FnOnce(&[E::NodeClient]) -> R) -> R { - let guard = self.inner.read(); - let clients = guard.nodes.iter().map(clone_client).collect::>(); + pub(crate) fn with_clients(&self, f: impl FnOnce(&[E::NodeClient]) -> R) -> R { + let clients = self.clients.read(); f(&clients) } - - fn add_borrowed_node( - &self, - client: E::NodeClient, - origin: BorrowedOrigin, - identity_hint: Option, - ) { - let mut guard = self.inner.write(); - let identity = canonical_identity::(&client, identity_hint, &mut guard); - let handle = NodeHandle::Borrowed(BorrowedNode { - identity: identity.clone(), - client, - origin, - }); - upsert_node(&mut guard, identity, handle); - } -} - -impl ManagedNode { - #[must_use] - /// Returns the node client. - pub const fn client(&self) -> &E::NodeClient { - &self.client - } - - /// Delegates restart to the deployer's control surface for this node name. - pub async fn restart( - &self, - control: &dyn NodeControlHandle, - node_name: &str, - ) -> Result<(), DynError> { - control.restart_node(node_name).await - } - - /// Delegates stop to the deployer's control surface for this node name. - pub async fn stop( - &self, - control: &dyn NodeControlHandle, - node_name: &str, - ) -> Result<(), DynError> { - control.stop_node(node_name).await - } - - /// Delegates dynamic node start with options to the control surface. - pub async fn start_with( - &self, - control: &dyn NodeControlHandle, - node_name: &str, - options: StartNodeOptions, - ) -> Result, DynError> { - control.start_node_with(node_name, options).await - } - - #[must_use] - /// Returns process id if the backend can expose it for this node name. - pub fn pid(&self, control: &dyn NodeControlHandle, node_name: &str) -> Option { - control.node_pid(node_name) - } -} - -impl BorrowedNode { - #[must_use] - /// Returns the node client. - pub const fn client(&self) -> &E::NodeClient { - &self.client - } -} - -fn upsert_node( - inner: &mut NodeInventoryInner, - identity: String, - handle: NodeHandle, -) { - if let Some(existing_index) = inner.indices_by_identity.get(&identity).copied() { - inner.nodes[existing_index] = handle; - return; - } - - let index = inner.nodes.len(); - inner.nodes.push(handle); - inner.indices_by_identity.insert(identity, index); -} - -fn canonical_identity( - _client: &E::NodeClient, - identity_hint: Option, - inner: &mut NodeInventoryInner, -) -> String { - // Priority: explicit hint -> synthetic. - if let Some(identity) = identity_hint.filter(|value| !value.trim().is_empty()) { - return identity; - } - - let synthetic = format!("node:{}", inner.next_synthetic_id); - inner.next_synthetic_id += 1; - - synthetic -} - -fn clone_client(handle: &NodeHandle) -> E::NodeClient { - match handle { - NodeHandle::Managed(node) => node.client.clone(), - NodeHandle::Borrowed(node) => node.client.clone(), - } -} - -fn node_by_identity<'a, E: Application>( - inner: &'a NodeInventoryInner, - identity: &str, -) -> Option<&'a NodeHandle> { - let index = *inner.indices_by_identity.get(identity)?; - inner.nodes.get(index) } diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 97bd2d6..33420c3 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -1,6 +1,6 @@ pub mod context; mod deployer; -pub mod inventory; +mod inventory; pub mod metrics; mod node_clients; pub mod orchestration; @@ -9,9 +9,8 @@ pub mod readiness; mod runner; use async_trait::async_trait; -pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics}; +pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics, RuntimeAssembly}; pub use deployer::{Deployer, ScenarioError}; -pub use inventory::{BorrowedNode, BorrowedOrigin, ManagedNode, NodeHandle, NodeInventory}; pub use node_clients::NodeClients; #[doc(hidden)] pub use orchestration::{ diff --git a/testing-framework/core/src/scenario/runtime/node_clients.rs b/testing-framework/core/src/scenario/runtime/node_clients.rs index 363e80d..b35a0ed 100644 --- a/testing-framework/core/src/scenario/runtime/node_clients.rs +++ b/testing-framework/core/src/scenario/runtime/node_clients.rs @@ -1,6 +1,6 @@ use rand::{seq::SliceRandom as _, thread_rng}; -use super::inventory::{BorrowedNode, ManagedNode, NodeInventory}; +use super::inventory::NodeInventory; use crate::scenario::{Application, DynError}; /// Collection of API clients for the node set. @@ -29,7 +29,7 @@ impl NodeClients { /// Build clients from preconstructed vectors. pub fn new(nodes: Vec) -> Self { Self { - inventory: NodeInventory::from_managed_clients(nodes), + inventory: NodeInventory::from_clients(nodes), } } @@ -72,37 +72,13 @@ impl NodeClients { } pub fn add_node(&self, client: E::NodeClient) { - self.inventory.add_managed_node(client, None); + self.inventory.add_client(client); } pub fn clear(&self) { self.inventory.clear(); } - #[must_use] - /// Returns a cloned snapshot of managed node handles. - pub fn managed_nodes(&self) -> Vec> { - self.inventory.managed_nodes() - } - - #[must_use] - /// Returns a cloned snapshot of borrowed node handles. - pub fn borrowed_nodes(&self) -> Vec> { - self.inventory.borrowed_nodes() - } - - #[must_use] - /// Finds a managed node by canonical identity. - pub fn find_managed(&self, identity: &str) -> Option> { - self.inventory.find_managed(identity) - } - - #[must_use] - /// Finds a borrowed node by canonical identity. - pub fn find_borrowed(&self, identity: &str) -> Option> { - self.inventory.find_borrowed(identity) - } - fn shuffled_snapshot(&self) -> Vec { let mut clients = self.snapshot(); clients.shuffle(&mut thread_rng()); diff --git a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs index 9e71458..a17ce28 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs @@ -3,9 +3,9 @@ mod source_orchestration_plan; #[allow(dead_code)] mod source_resolver; +pub(crate) use source_orchestration_plan::SourceOrchestrationMode; pub use source_orchestration_plan::{ - ManagedSource, SourceModeName, SourceOrchestrationMode, SourceOrchestrationPlan, - SourceOrchestrationPlanError, + ManagedSource, SourceOrchestrationPlan, SourceOrchestrationPlanError, }; pub use source_resolver::{ build_source_orchestration_plan, orchestrate_sources, orchestrate_sources_with_providers, diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs index dd57ee9..2dd84bd 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs @@ -1,6 +1,4 @@ -use std::fmt; - -use crate::scenario::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy}; +use crate::scenario::{ClusterMode, ExistingCluster, ExternalNodeSource, sources::ScenarioSources}; /// Explicit descriptor for managed node sourcing. #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -15,13 +13,13 @@ pub enum ManagedSource { /// This is scaffolding-only and is intentionally not executed by deployers /// yet. #[derive(Clone, Debug, Eq, PartialEq)] -pub enum SourceOrchestrationMode { +pub(crate) enum SourceOrchestrationMode { Managed { managed: ManagedSource, external: Vec, }, Attached { - attach: AttachSource, + attach: ExistingCluster, external: Vec, }, ExternalOnly { @@ -34,41 +32,28 @@ pub enum SourceOrchestrationMode { /// This captures only mapping-time source intent and readiness policy. #[derive(Clone, Debug, Eq, PartialEq)] pub struct SourceOrchestrationPlan { - pub mode: SourceOrchestrationMode, - pub readiness_policy: SourceReadinessPolicy, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum SourceModeName { - Attached, -} - -impl fmt::Display for SourceModeName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Attached => f.write_str("Attached"), - } - } + mode: SourceOrchestrationMode, } /// Validation failure while building orchestration plan from sources. #[derive(Debug, thiserror::Error)] pub enum SourceOrchestrationPlanError { #[error("source mode '{mode}' is not wired into deployers yet")] - SourceModeNotWiredYet { mode: SourceModeName }, + SourceModeNotWiredYet { mode: &'static str }, } impl SourceOrchestrationPlan { - pub fn try_from_sources( + pub(crate) fn try_from_sources( sources: &ScenarioSources, - readiness_policy: SourceReadinessPolicy, ) -> Result { let mode = mode_from_sources(sources); - Ok(Self { - mode, - readiness_policy, - }) + Ok(Self { mode }) + } + + #[must_use] + pub(crate) fn mode(&self) -> &SourceOrchestrationMode { + &self.mode } #[must_use] @@ -84,34 +69,104 @@ impl SourceOrchestrationPlan { #[cfg(test)] mod tests { use super::{SourceOrchestrationMode, SourceOrchestrationPlan}; - use crate::scenario::{AttachSource, ScenarioSources, SourceReadinessPolicy}; + use crate::scenario::{ExistingCluster, ExternalNodeSource, sources::ScenarioSources}; + + #[test] + fn managed_sources_are_planned() { + let plan = SourceOrchestrationPlan::try_from_sources(&ScenarioSources::default()) + .expect("managed sources should build a source orchestration plan"); + + assert!(matches!( + plan.mode(), + SourceOrchestrationMode::Managed { .. } + )); + assert!(plan.external_sources().is_empty()); + } #[test] fn attached_sources_are_planned() { - let sources = ScenarioSources::attached(AttachSource::compose(vec!["node-0".to_string()])); - let plan = - SourceOrchestrationPlan::try_from_sources(&sources, SourceReadinessPolicy::AllReady) - .expect("attached sources should build a source orchestration plan"); + let sources = + ScenarioSources::default().with_attach(ExistingCluster::for_compose_services( + "test-project".to_string(), + vec!["node-0".to_string()], + )); + let plan = SourceOrchestrationPlan::try_from_sources(&sources) + .expect("attached sources should build a source orchestration plan"); assert!(matches!( - plan.mode, + plan.mode(), SourceOrchestrationMode::Attached { .. } )); } + + #[test] + fn attached_sources_keep_external_nodes() { + let sources = ScenarioSources::default() + .with_attach(ExistingCluster::for_compose_project( + "test-project".to_string(), + )) + .with_external_node(ExternalNodeSource::new( + "external-0".to_owned(), + "http://127.0.0.1:1".to_owned(), + )); + let plan = SourceOrchestrationPlan::try_from_sources(&sources) + .expect("attached sources with external nodes should build"); + + assert!(matches!( + plan.mode(), + SourceOrchestrationMode::Attached { .. } + )); + assert_eq!(plan.external_sources().len(), 1); + assert_eq!(plan.external_sources()[0].label(), "external-0"); + } + + #[test] + fn external_only_sources_are_planned() { + let sources = ScenarioSources::default() + .with_external_node(ExternalNodeSource::new( + "external-0".to_owned(), + "http://127.0.0.1:1".to_owned(), + )) + .into_external_only(); + let plan = SourceOrchestrationPlan::try_from_sources(&sources) + .expect("external-only sources should build a source orchestration plan"); + + assert!(matches!( + plan.mode(), + SourceOrchestrationMode::ExternalOnly { .. } + )); + assert_eq!(plan.external_sources().len(), 1); + assert_eq!(plan.external_sources()[0].label(), "external-0"); + } } fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode { - match sources { - ScenarioSources::Managed { external } => SourceOrchestrationMode::Managed { - managed: ManagedSource::DeployerManaged, - external: external.clone(), + match sources.cluster_mode() { + ClusterMode::Managed => match sources { + ScenarioSources::Managed { external } => SourceOrchestrationMode::Managed { + managed: ManagedSource::DeployerManaged, + external: external.clone(), + }, + ScenarioSources::Attached { .. } | ScenarioSources::ExternalOnly { .. } => { + unreachable!("cluster mode and source storage must stay aligned") + } }, - ScenarioSources::Attached { attach, external } => SourceOrchestrationMode::Attached { - attach: attach.clone(), - external: external.clone(), + ClusterMode::ExistingCluster => match sources { + ScenarioSources::Attached { attach, external } => SourceOrchestrationMode::Attached { + attach: attach.clone(), + external: external.clone(), + }, + ScenarioSources::Managed { .. } | ScenarioSources::ExternalOnly { .. } => { + unreachable!("cluster mode and source storage must stay aligned") + } }, - ScenarioSources::ExternalOnly { external } => SourceOrchestrationMode::ExternalOnly { - external: external.clone(), + ClusterMode::ExternalOnly => match sources { + ScenarioSources::ExternalOnly { external } => SourceOrchestrationMode::ExternalOnly { + external: external.clone(), + }, + ScenarioSources::Managed { .. } | ScenarioSources::Attached { .. } => { + unreachable!("cluster mode and source storage must stay aligned") + } }, } } diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs index 6ebd646..d6f822b 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs @@ -41,10 +41,7 @@ pub enum SourceResolveError { pub fn build_source_orchestration_plan( scenario: &Scenario, ) -> Result { - SourceOrchestrationPlan::try_from_sources( - scenario.sources(), - scenario.source_readiness_policy(), - ) + Ok(scenario.source_orchestration_plan().clone()) } /// Resolves runtime source nodes via unified providers from orchestration plan. @@ -52,7 +49,7 @@ pub async fn resolve_sources( plan: &SourceOrchestrationPlan, providers: &SourceProviders, ) -> Result, SourceResolveError> { - match &plan.mode { + match plan.mode() { SourceOrchestrationMode::Managed { managed, .. } => { let managed_nodes = providers.managed.provide(managed).await?; let external_nodes = providers.external.provide(plan.external_sources()).await?; @@ -115,7 +112,8 @@ pub async fn orchestrate_sources_with_providers( ) -> Result, DynError> { let resolved = resolve_sources(plan, &providers).await?; - if matches!(plan.mode, SourceOrchestrationMode::Managed { .. }) && resolved.managed.is_empty() { + if matches!(plan.mode(), SourceOrchestrationMode::Managed { .. }) && resolved.managed.is_empty() + { return Err(SourceResolveError::ManagedNodesMissing.into()); } diff --git a/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs b/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs index d31bd78..aadb31c 100644 --- a/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs +++ b/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; -use crate::scenario::{Application, AttachSource, DynError}; +use crate::scenario::{Application, DynError, ExistingCluster}; -/// Attached node discovered from an existing external cluster source. +/// Node discovered from an existing cluster descriptor. #[derive(Clone, Debug)] pub struct AttachedNode { /// Optional stable identity hint used by runtime inventory dedup logic. @@ -14,8 +14,8 @@ pub struct AttachedNode { /// Errors returned by attach providers while discovering attached nodes. #[derive(Debug, thiserror::Error)] pub enum AttachProviderError { - #[error("attach source is not supported by this provider: {attach_source:?}")] - UnsupportedSource { attach_source: AttachSource }, + #[error("existing cluster descriptor is not supported by this provider: {attach_source:?}")] + UnsupportedSource { attach_source: ExistingCluster }, #[error("attach discovery failed: {source}")] Discovery { #[source] @@ -23,16 +23,16 @@ pub enum AttachProviderError { }, } -/// Internal adapter interface for discovering pre-existing nodes. +/// Internal adapter interface for discovering nodes in an existing cluster. /// /// This is scaffolding-only in phase 1 and is intentionally not wired into /// deployer runtime orchestration yet. #[async_trait] pub trait AttachProvider: Send + Sync { - /// Discovers node clients for the requested attach source. + /// Discovers node clients for the requested existing cluster. async fn discover( &self, - source: &AttachSource, + source: &ExistingCluster, ) -> Result>, AttachProviderError>; } @@ -44,7 +44,7 @@ pub struct NoopAttachProvider; impl AttachProvider for NoopAttachProvider { async fn discover( &self, - source: &AttachSource, + source: &ExistingCluster, ) -> Result>, AttachProviderError> { Err(AttachProviderError::UnsupportedSource { attach_source: source.clone(), diff --git a/testing-framework/core/src/scenario/runtime/providers/external_provider.rs b/testing-framework/core/src/scenario/runtime/providers/external_provider.rs index 343a35c..1a1beb6 100644 --- a/testing-framework/core/src/scenario/runtime/providers/external_provider.rs +++ b/testing-framework/core/src/scenario/runtime/providers/external_provider.rs @@ -71,11 +71,11 @@ impl ExternalProvider for ApplicationExternalProvider { .map(|source| { E::external_node_client(source) .map(|client| ExternalNode { - identity_hint: Some(source.label.clone()), + identity_hint: Some(source.label().to_string()), client, }) .map_err(|build_error| ExternalProviderError::Build { - source_label: source.label.clone(), + source_label: source.label().to_string(), source: build_error, }) }) diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index 652d740..216cd97 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -34,9 +34,11 @@ impl Drop for Runner { } impl Runner { - /// Construct a runner from the run context and optional cleanup guard. #[must_use] - pub fn new(context: RunContext, cleanup_guard: Option>) -> Self { + pub(crate) fn new( + context: RunContext, + cleanup_guard: Option>, + ) -> Self { Self { context: Arc::new(context), cleanup_guard, @@ -45,8 +47,8 @@ impl Runner { /// Access the underlying run context. #[must_use] - pub fn context(&self) -> Arc> { - Arc::clone(&self.context) + pub fn context(&self) -> &RunContext { + self.context.as_ref() } pub async fn wait_network_ready(&self) -> Result<(), DynError> { @@ -71,7 +73,7 @@ impl Runner { where Caps: Send + Sync, { - let context = self.context(); + let context = Arc::clone(&self.context); let run_duration = scenario.duration(); let workloads = scenario.workloads().to_vec(); let expectation_count = scenario.expectations().len(); @@ -190,7 +192,7 @@ impl Runner { } fn settle_wait_duration(context: &RunContext) -> Option { - let has_node_control = context.controls_nodes(); + let has_node_control = context.node_control().is_some(); let configured_wait = context.expectation_cooldown(); if configured_wait.is_zero() && !has_node_control { @@ -231,7 +233,7 @@ impl Runner { fn cooldown_duration(context: &RunContext) -> Option { // Managed environments need a minimum cooldown so feed and expectations // observe stabilized state. - let needs_stabilization = context.controls_nodes(); + let needs_stabilization = context.cluster_control_profile().framework_owns_lifecycle(); let mut wait = context.expectation_cooldown(); diff --git a/testing-framework/core/src/scenario/sources/mod.rs b/testing-framework/core/src/scenario/sources/mod.rs index 3585d8a..68a9fa4 100644 --- a/testing-framework/core/src/scenario/sources/mod.rs +++ b/testing-framework/core/src/scenario/sources/mod.rs @@ -1,3 +1,7 @@ mod model; -pub use model::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy}; +pub(crate) use model::ScenarioSources; +#[doc(hidden)] +pub use model::{ + ClusterControlProfile, ClusterMode, ExistingCluster, ExternalNodeSource, IntoExistingCluster, +}; diff --git a/testing-framework/core/src/scenario/sources/model.rs b/testing-framework/core/src/scenario/sources/model.rs index 0e14091..ea48b1d 100644 --- a/testing-framework/core/src/scenario/sources/model.rs +++ b/testing-framework/core/src/scenario/sources/model.rs @@ -1,6 +1,13 @@ -/// Typed attach source for existing clusters. +use crate::scenario::DynError; + +/// Typed descriptor for an existing cluster. #[derive(Clone, Debug, Eq, PartialEq)] -pub enum AttachSource { +pub struct ExistingCluster { + kind: ExistingClusterKind, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +enum ExistingClusterKind { K8s { namespace: Option, label_selector: String, @@ -11,52 +18,107 @@ pub enum AttachSource { }, } -impl AttachSource { +impl ExistingCluster { #[must_use] - pub fn k8s(label_selector: String) -> Self { - Self::K8s { - namespace: None, - label_selector, + pub fn for_k8s_selector(label_selector: String) -> Self { + Self { + kind: ExistingClusterKind::K8s { + namespace: None, + label_selector, + }, } } #[must_use] - pub fn with_namespace(self, namespace: String) -> Self { - match self { - Self::K8s { label_selector, .. } => Self::K8s { + pub fn for_k8s_selector_in_namespace(namespace: String, label_selector: String) -> Self { + Self { + kind: ExistingClusterKind::K8s { namespace: Some(namespace), label_selector, }, - other => other, } } #[must_use] - pub fn compose(services: Vec) -> Self { - Self::Compose { - project: None, - services, + pub fn for_compose_project(project: String) -> Self { + Self { + kind: ExistingClusterKind::Compose { + project: Some(project), + services: Vec::new(), + }, } } #[must_use] - pub fn with_project(self, project: String) -> Self { - match self { - Self::Compose { services, .. } => Self::Compose { + pub fn for_compose_services(project: String, services: Vec) -> Self { + Self { + kind: ExistingClusterKind::Compose { project: Some(project), services, }, - other => other, } } + + #[must_use] + #[doc(hidden)] + pub fn compose_project(&self) -> Option<&str> { + match &self.kind { + ExistingClusterKind::Compose { project, .. } => project.as_deref(), + ExistingClusterKind::K8s { .. } => None, + } + } + + #[must_use] + #[doc(hidden)] + pub fn compose_services(&self) -> Option<&[String]> { + match &self.kind { + ExistingClusterKind::Compose { services, .. } => Some(services), + ExistingClusterKind::K8s { .. } => None, + } + } + + #[must_use] + #[doc(hidden)] + pub fn k8s_namespace(&self) -> Option<&str> { + match &self.kind { + ExistingClusterKind::K8s { namespace, .. } => namespace.as_deref(), + ExistingClusterKind::Compose { .. } => None, + } + } + + #[must_use] + #[doc(hidden)] + pub fn k8s_label_selector(&self) -> Option<&str> { + match &self.kind { + ExistingClusterKind::K8s { label_selector, .. } => Some(label_selector), + ExistingClusterKind::Compose { .. } => None, + } + } +} + +/// Converts a value into an existing-cluster descriptor. +pub trait IntoExistingCluster { + fn into_existing_cluster(self) -> Result; +} + +impl IntoExistingCluster for ExistingCluster { + fn into_existing_cluster(self) -> Result { + Ok(self) + } +} + +impl IntoExistingCluster for &ExistingCluster { + fn into_existing_cluster(self) -> Result { + Ok(self.clone()) + } } /// Static external node endpoint that should be included in the runtime /// inventory. #[derive(Clone, Debug, Eq, PartialEq)] pub struct ExternalNodeSource { - pub label: String, - pub endpoint: String, + label: String, + endpoint: String, } impl ExternalNodeSource { @@ -64,30 +126,72 @@ impl ExternalNodeSource { pub fn new(label: String, endpoint: String) -> Self { Self { label, endpoint } } + + #[must_use] + pub fn label(&self) -> &str { + &self.label + } + + #[must_use] + pub fn endpoint(&self) -> &str { + &self.endpoint + } } -/// Planned readiness strategy for mixed managed/attached/external sources. -#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)] -pub enum SourceReadinessPolicy { - /// Phase 1 default: require every known node to pass readiness checks. - #[default] - AllReady, - /// Optional relaxed policy for large/partial environments. - Quorum, - /// Future policy for per-source constraints (for example managed minimum - /// plus overall quorum). - SourceAware, +/// High-level source mode of a scenario cluster. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ClusterMode { + Managed, + ExistingCluster, + ExternalOnly, +} + +impl ClusterMode { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Managed => "managed", + Self::ExistingCluster => "existing-cluster", + Self::ExternalOnly => "external-only", + } + } +} + +/// High-level control/lifecycle expectation for a cluster surface. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ClusterControlProfile { + FrameworkManaged, + ExistingClusterAttached, + ExternalUncontrolled, + ManualControlled, +} + +impl ClusterControlProfile { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::FrameworkManaged => "framework-managed", + Self::ExistingClusterAttached => "existing-cluster-attached", + Self::ExternalUncontrolled => "external-uncontrolled", + Self::ManualControlled => "manual-controlled", + } + } + + #[must_use] + pub const fn framework_owns_lifecycle(self) -> bool { + matches!(self, Self::FrameworkManaged) + } } /// Source model that makes invalid managed+attached combinations /// unrepresentable by type. #[derive(Clone, Debug, Eq, PartialEq)] -pub enum ScenarioSources { +pub(crate) enum ScenarioSources { Managed { external: Vec, }, Attached { - attach: AttachSource, + attach: ExistingCluster, external: Vec, }, ExternalOnly { @@ -105,45 +209,40 @@ impl Default for ScenarioSources { impl ScenarioSources { #[must_use] - pub const fn managed() -> Self { - Self::Managed { - external: Vec::new(), - } - } - - #[must_use] - pub fn attached(attach: AttachSource) -> Self { - Self::Attached { - attach, - external: Vec::new(), - } - } - - #[must_use] - pub fn external_only(external: Vec) -> Self { - Self::ExternalOnly { external } - } - - pub fn add_external_node(&mut self, node: ExternalNodeSource) { - match self { + pub(crate) fn with_external_node(mut self, node: ExternalNodeSource) -> Self { + match &mut self { Self::Managed { external } | Self::Attached { external, .. } | Self::ExternalOnly { external } => external.push(node), } - } - pub fn set_attach(&mut self, attach: AttachSource) { - let external = self.external_nodes().to_vec(); - *self = Self::Attached { attach, external }; - } - - pub fn set_external_only(&mut self) { - let external = self.external_nodes().to_vec(); - *self = Self::ExternalOnly { external }; + self } #[must_use] - pub fn external_nodes(&self) -> &[ExternalNodeSource] { + pub(crate) fn with_attach(self, attach: ExistingCluster) -> Self { + let external = self.external_nodes().to_vec(); + + Self::Attached { attach, external } + } + + #[must_use] + pub(crate) fn into_external_only(self) -> Self { + let external = self.external_nodes().to_vec(); + + Self::ExternalOnly { external } + } + + #[must_use] + pub(crate) fn existing_cluster(&self) -> Option<&ExistingCluster> { + match self { + Self::Attached { attach, .. } => Some(attach), + Self::Managed { .. } | Self::ExternalOnly { .. } => None, + } + } + + #[must_use] + pub(crate) fn external_nodes(&self) -> &[ExternalNodeSource] { match self { Self::Managed { external } | Self::Attached { external, .. } @@ -152,17 +251,59 @@ impl ScenarioSources { } #[must_use] - pub const fn is_managed(&self) -> bool { - matches!(self, Self::Managed { .. }) + pub(crate) const fn cluster_mode(&self) -> ClusterMode { + match self { + Self::Managed { .. } => ClusterMode::Managed, + Self::Attached { .. } => ClusterMode::ExistingCluster, + Self::ExternalOnly { .. } => ClusterMode::ExternalOnly, + } } #[must_use] - pub const fn is_attached(&self) -> bool { - matches!(self, Self::Attached { .. }) - } - - #[must_use] - pub const fn is_external_only(&self) -> bool { - matches!(self, Self::ExternalOnly { .. }) + pub(crate) const fn control_profile(&self) -> ClusterControlProfile { + match self.cluster_mode() { + ClusterMode::Managed => ClusterControlProfile::FrameworkManaged, + ClusterMode::ExistingCluster => ClusterControlProfile::ExistingClusterAttached, + ClusterMode::ExternalOnly => ClusterControlProfile::ExternalUncontrolled, + } + } +} + +#[cfg(test)] +mod tests { + use super::{ClusterControlProfile, ExistingCluster, ExternalNodeSource, ScenarioSources}; + + #[test] + fn managed_sources_map_to_framework_managed_control() { + assert_eq!( + ScenarioSources::default().control_profile(), + ClusterControlProfile::FrameworkManaged, + ); + } + + #[test] + fn attached_sources_map_to_existing_cluster_control() { + let sources = ScenarioSources::default() + .with_attach(ExistingCluster::for_compose_project("project".to_owned())); + + assert_eq!( + sources.control_profile(), + ClusterControlProfile::ExistingClusterAttached, + ); + } + + #[test] + fn external_only_sources_map_to_uncontrolled_profile() { + let sources = ScenarioSources::default() + .with_external_node(ExternalNodeSource::new( + "node".to_owned(), + "http://node".to_owned(), + )) + .into_external_only(); + + assert_eq!( + sources.control_profile(), + ClusterControlProfile::ExternalUncontrolled, + ); } } diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 87358aa..24c16d6 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -2,8 +2,8 @@ use std::marker::PhantomData; use async_trait::async_trait; use testing_framework_core::scenario::{ - AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, - ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, + AttachProvider, AttachProviderError, AttachedNode, ClusterWaitHandle, DynError, + ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, }; use url::Url; @@ -22,7 +22,7 @@ pub(super) struct ComposeAttachProvider { pub(super) struct ComposeAttachedClusterWait { host: String, - source: AttachSource, + source: ExistingCluster, _env: PhantomData, } @@ -42,12 +42,14 @@ impl ComposeAttachProvider { } impl ComposeAttachedClusterWait { - pub(super) fn new(host: String, source: AttachSource) -> Self { - Self { + pub(super) fn try_new(host: String, source: &ExistingCluster) -> Result { + let _ = compose_wait_request(source)?; + + Ok(Self { host, - source, + source: source.clone(), _env: PhantomData, - } + }) } } @@ -60,7 +62,7 @@ struct ComposeAttachRequest<'a> { impl AttachProvider for ComposeAttachProvider { async fn discover( &self, - source: &AttachSource, + source: &ExistingCluster, ) -> Result>, AttachProviderError> { let request = compose_attach_request(source)?; let services = resolve_services(request.project, request.services) @@ -85,16 +87,17 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { } fn compose_attach_request( - source: &AttachSource, + source: &ExistingCluster, ) -> Result, AttachProviderError> { - let AttachSource::Compose { project, services } = source else { - return Err(AttachProviderError::UnsupportedSource { - attach_source: source.clone(), - }); - }; + let services = + source + .compose_services() + .ok_or_else(|| AttachProviderError::UnsupportedSource { + attach_source: source.clone(), + })?; - let project = project - .as_deref() + let project = source + .compose_project() .ok_or_else(|| AttachProviderError::Discovery { source: ComposeAttachDiscoveryError::MissingProjectName.into(), })?; @@ -172,14 +175,13 @@ impl ClusterWaitHandle for ComposeAttachedClusterWait } } -fn compose_wait_request(source: &AttachSource) -> Result, DynError> { - let AttachSource::Compose { project, services } = source else { - return Err("compose cluster wait requires a compose attach source".into()); - }; - - let project = project - .as_deref() - .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?; +fn compose_wait_request(source: &ExistingCluster) -> Result, DynError> { + let project = source.compose_project().ok_or_else(|| { + DynError::from("compose cluster wait requires a compose existing-cluster descriptor") + })?; + let services = source.compose_services().ok_or_else(|| { + DynError::from("compose cluster wait requires a compose existing-cluster descriptor") + })?; Ok(ComposeAttachRequest { project, services }) } diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 60a88de..9771a9a 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -9,8 +9,8 @@ use std::marker::PhantomData; use async_trait::async_trait; use testing_framework_core::scenario::{ - AttachSource, CleanupGuard, Deployer, DynError, FeedHandle, ObservabilityCapabilityProvider, - RequiresNodeControl, Runner, Scenario, + CleanupGuard, Deployer, DynError, ExistingCluster, FeedHandle, IntoExistingCluster, + ObservabilityCapabilityProvider, RequiresNodeControl, Runner, Scenario, }; use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup}; @@ -36,6 +36,22 @@ enum ComposeMetadataError { } impl ComposeDeploymentMetadata { + #[must_use] + pub fn for_project(project_name: String) -> Self { + Self { + project_name: Some(project_name), + } + } + + #[must_use] + pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self { + Self { + project_name: cluster + .and_then(ExistingCluster::compose_project) + .map(ToOwned::to_owned), + } + } + /// Returns project name when deployment is bound to a specific compose /// project. #[must_use] @@ -43,26 +59,56 @@ impl ComposeDeploymentMetadata { self.project_name.as_deref() } - /// Builds an attach source for the same compose project using deployer - /// discovery to resolve services. - pub fn attach_source(&self) -> Result { + /// Builds an existing-cluster descriptor for the same compose project + /// using deployer discovery to resolve services. + pub fn existing_cluster(&self) -> Result { let project_name = self .project_name() .ok_or(ComposeMetadataError::MissingProjectName)?; - Ok(AttachSource::compose(Vec::new()).with_project(project_name.to_owned())) + Ok(ExistingCluster::for_compose_project( + project_name.to_owned(), + )) } - /// Builds an attach source for the same compose project. + /// Builds an existing-cluster descriptor for the same compose project. + pub fn existing_cluster_for_services( + &self, + services: Vec, + ) -> Result { + let project_name = self + .project_name() + .ok_or(ComposeMetadataError::MissingProjectName)?; + + Ok(ExistingCluster::for_compose_services( + project_name.to_owned(), + services, + )) + } + + #[doc(hidden)] + pub fn attach_source(&self) -> Result { + self.existing_cluster() + } + + #[doc(hidden)] pub fn attach_source_for_services( &self, services: Vec, - ) -> Result { - let project_name = self - .project_name() - .ok_or(ComposeMetadataError::MissingProjectName)?; + ) -> Result { + self.existing_cluster_for_services(services) + } +} - Ok(AttachSource::compose(services).with_project(project_name.to_owned())) +impl IntoExistingCluster for ComposeDeploymentMetadata { + fn into_existing_cluster(self) -> Result { + self.existing_cluster() + } +} + +impl IntoExistingCluster for &ComposeDeploymentMetadata { + fn into_existing_cluster(self) -> Result { + self.existing_cluster() } } diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 831f3fe..554db9b 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,11 +3,11 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, - DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, - NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, - SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode, + ClusterWaitHandle, DeploymentPolicy, DynError, ExistingCluster, FeedHandle, FeedRuntime, + HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, + RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, @@ -64,6 +64,12 @@ impl DeploymentOrchestrator { where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { + validate_supported_cluster_mode(scenario).map_err(|source| { + ComposeRunnerError::SourceOrchestration { + source: source.into(), + } + })?; + // Source planning is currently resolved here before deployer-specific setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { ComposeRunnerError::SourceOrchestration { @@ -71,11 +77,11 @@ impl DeploymentOrchestrator { } })?; - if scenario.sources().is_attached() { + if matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { return self - .deploy_attached_only::(scenario, source_plan) + .deploy_existing_cluster::(scenario, source_plan) .await - .map(|runner| (runner, attached_metadata(scenario))); + .map(|runner| (runner, existing_cluster_metadata(scenario))); } let deployment = scenario.deployment(); @@ -138,7 +144,7 @@ impl DeploymentOrchestrator { )) } - async fn deploy_attached_only( + async fn deploy_existing_cluster( &self, scenario: &Scenario, source_plan: SourceOrchestrationPlan, @@ -158,11 +164,12 @@ impl DeploymentOrchestrator { let node_control = self.attached_node_control::(scenario)?; let cluster_wait = self.attached_cluster_wait(scenario)?; let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; - let context = build_run_context( + let assembly = build_runtime_assembly( scenario.deployment().clone(), node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), observability.telemetry_handle()?, feed, node_control, @@ -170,7 +177,7 @@ impl DeploymentOrchestrator { ); let cleanup_guard: Box = Box::new(feed_task); - Ok(Runner::new(context, Some(cleanup_guard))) + Ok(assembly.build_runner(Some(cleanup_guard))) } fn source_providers(&self, managed_clients: Vec) -> SourceProviders { @@ -214,31 +221,15 @@ impl DeploymentOrchestrator { return Ok(None); } - let ScenarioSources::Attached { attach, .. } = scenario.sources() else { - return Err(ComposeRunnerError::InternalInvariant { - message: "attached node control requested outside attached source mode", - }); - }; + let attach = scenario + .existing_cluster() + .ok_or(ComposeRunnerError::InternalInvariant { + message: "existing-cluster node control requested outside existing-cluster mode", + })?; + let node_control = ComposeAttachedNodeControl::try_from_existing_cluster(attach) + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; - let AttachSource::Compose { project, .. } = attach else { - return Err(ComposeRunnerError::InternalInvariant { - message: "compose deployer requires compose attach source for node control", - }); - }; - - let Some(project_name) = project - .as_ref() - .map(|value| value.trim()) - .filter(|value| !value.is_empty()) - else { - return Err(ComposeRunnerError::InternalInvariant { - message: "attached compose mode requires explicit project name for node control", - }); - }; - - Ok(Some(Arc::new(ComposeAttachedNodeControl { - project_name: project_name.to_owned(), - }) as Arc>)) + Ok(Some(Arc::new(node_control) as Arc>)) } fn attached_cluster_wait( @@ -248,16 +239,15 @@ impl DeploymentOrchestrator { where Caps: Send + Sync, { - let ScenarioSources::Attached { attach, .. } = scenario.sources() else { - return Err(ComposeRunnerError::InternalInvariant { - message: "compose attached cluster wait requested outside attached source mode", - }); - }; + let attach = scenario + .existing_cluster() + .ok_or(ComposeRunnerError::InternalInvariant { + message: "compose cluster wait requested outside existing-cluster mode", + })?; + let cluster_wait = ComposeAttachedClusterWait::::try_new(compose_runner_host(), attach) + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; - Ok(Arc::new(ComposeAttachedClusterWait::::new( - compose_runner_host(), - attach.clone(), - ))) + Ok(Arc::new(cluster_wait)) } async fn build_runner( @@ -274,7 +264,8 @@ impl DeploymentOrchestrator { { let telemetry = observability.telemetry_handle()?; let node_control = self.maybe_node_control::(&prepared.environment); - let cluster_wait = self.managed_cluster_wait(project_name); + let cluster_wait = + self.managed_cluster_wait(ComposeDeploymentMetadata::for_project(project_name))?; log_observability_endpoints(&observability); log_profiling_urls(&deployed.host, &deployed.host_ports); @@ -285,6 +276,7 @@ impl DeploymentOrchestrator { descriptors: prepared.descriptors.clone(), duration: scenario.duration(), expectation_cooldown: scenario.expectation_cooldown(), + cluster_control_profile: scenario.cluster_control_profile(), telemetry, environment: &mut prepared.environment, node_control, @@ -300,7 +292,7 @@ impl DeploymentOrchestrator { "compose runtime prepared" ); - Ok(Runner::new(runtime.context, Some(cleanup_guard))) + Ok(runtime.assembly.build_runner(Some(cleanup_guard))) } fn maybe_node_control( @@ -318,11 +310,18 @@ impl DeploymentOrchestrator { }) } - fn managed_cluster_wait(&self, project_name: String) -> Arc> { - Arc::new(ComposeAttachedClusterWait::::new( - compose_runner_host(), - AttachSource::compose(Vec::new()).with_project(project_name), - )) + fn managed_cluster_wait( + &self, + metadata: ComposeDeploymentMetadata, + ) -> Result>, ComposeRunnerError> { + let existing_cluster = metadata + .existing_cluster() + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + let cluster_wait = + ComposeAttachedClusterWait::::try_new(compose_runner_host(), &existing_cluster) + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + + Ok(Arc::new(cluster_wait)) } fn log_deploy_start( @@ -373,20 +372,61 @@ impl DeploymentOrchestrator { } } -fn attached_metadata(scenario: &Scenario) -> ComposeDeploymentMetadata +fn validate_supported_cluster_mode( + scenario: &Scenario, +) -> Result<(), DynError> { + if !matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { + return Ok(()); + } + + let cluster = scenario + .existing_cluster() + .ok_or_else(|| DynError::from("existing-cluster mode requires an existing cluster"))?; + + ensure_compose_existing_cluster(cluster) +} + +fn ensure_compose_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> { + if cluster.compose_project().is_some() && cluster.compose_services().is_some() { + return Ok(()); + } + + Err("compose deployer requires a compose existing-cluster descriptor".into()) +} + +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::ExistingCluster; + + use super::ensure_compose_existing_cluster; + + #[test] + fn compose_cluster_validator_accepts_compose_descriptor() { + ensure_compose_existing_cluster(&ExistingCluster::for_compose_project( + "project".to_owned(), + )) + .expect("compose descriptor should be accepted"); + } + + #[test] + fn compose_cluster_validator_rejects_k8s_descriptor() { + let error = ensure_compose_existing_cluster(&ExistingCluster::for_k8s_selector( + "app=node".to_owned(), + )) + .expect_err("k8s descriptor should be rejected"); + + assert_eq!( + error.to_string(), + "compose deployer requires a compose existing-cluster descriptor" + ); + } +} +fn existing_cluster_metadata(scenario: &Scenario) -> ComposeDeploymentMetadata where E: ComposeDeployEnv, Caps: Send + Sync, { - let project_name = match scenario.sources() { - ScenarioSources::Attached { - attach: AttachSource::Compose { project, .. }, - .. - } => project.clone(), - _ => None, - }; - - ComposeDeploymentMetadata { project_name } + ComposeDeploymentMetadata::from_existing_cluster(scenario.existing_cluster()) } struct DeployedNodes { @@ -397,7 +437,7 @@ struct DeployedNodes { } struct ComposeRuntime { - context: RunContext, + assembly: RuntimeAssembly, feed_task: FeedHandle, } @@ -406,6 +446,7 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { descriptors: E::Deployment, duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, environment: &'a mut StackEnvironment, node_control: Option>>, @@ -426,18 +467,22 @@ async fn build_compose_runtime( .start_block_feed(&node_clients, input.environment) .await?; - let context = build_run_context( + let assembly = build_runtime_assembly( input.descriptors, node_clients, input.duration, input.expectation_cooldown, + input.cluster_control_profile, input.telemetry, feed, input.node_control, input.cluster_wait, ); - Ok(ComposeRuntime { context, feed_task }) + Ok(ComposeRuntime { + assembly, + feed_task, + }) } async fn deploy_nodes( @@ -470,26 +515,33 @@ async fn deploy_nodes( }) } -fn build_run_context( +fn build_runtime_assembly( descriptors: E::Deployment, node_clients: NodeClients, run_duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: ::Feed, node_control: Option>>, cluster_wait: Arc>, -) -> RunContext { - RunContext::new( +) -> RuntimeAssembly { + let mut assembly = RuntimeAssembly::new( descriptors, node_clients, run_duration, expectation_cooldown, + cluster_control_profile, telemetry, feed, - node_control, ) - .with_cluster_wait(cluster_wait) + .with_cluster_wait(cluster_wait); + + if let Some(node_control) = node_control { + assembly = assembly.with_node_control(node_control); + } + + assembly } fn resolve_observability_inputs( diff --git a/testing-framework/deployers/compose/src/docker/control.rs b/testing-framework/deployers/compose/src/docker/control.rs index 8e98948..799b633 100644 --- a/testing-framework/deployers/compose/src/docker/control.rs +++ b/testing-framework/deployers/compose/src/docker/control.rs @@ -5,7 +5,7 @@ use std::{ use testing_framework_core::{ adjust_timeout, - scenario::{Application, DynError, NodeControlHandle}, + scenario::{Application, DynError, ExistingCluster, NodeControlHandle}, }; use tokio::{process::Command, time::timeout}; use tracing::info; @@ -155,11 +155,27 @@ impl NodeControlHandle for ComposeNodeControl { } } -/// Node control handle for compose attached mode. +/// Node control handle for compose existing-cluster mode. pub struct ComposeAttachedNodeControl { pub(crate) project_name: String, } +impl ComposeAttachedNodeControl { + pub fn try_from_existing_cluster(source: &ExistingCluster) -> Result { + let Some(project_name) = source + .compose_project() + .map(str::trim) + .filter(|value| !value.is_empty()) + else { + return Err("attached compose node control requires explicit project name".into()); + }; + + Ok(Self { + project_name: project_name.to_owned(), + }) + } +} + #[async_trait::async_trait] impl NodeControlHandle for ComposeAttachedNodeControl { async fn restart_node(&self, name: &str) -> Result<(), DynError> { diff --git a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs index b10a24a..bdfa1fe 100644 --- a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs @@ -7,8 +7,8 @@ use kube::{ api::{ListParams, ObjectList}, }; use testing_framework_core::scenario::{ - AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, - ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, + AttachProvider, AttachProviderError, AttachedNode, ClusterWaitHandle, DynError, + ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, }; use url::Url; @@ -37,7 +37,7 @@ pub(super) struct K8sAttachProvider { pub(super) struct K8sAttachedClusterWait { client: Client, - source: AttachSource, + source: ExistingCluster, _env: PhantomData, } @@ -56,12 +56,14 @@ impl K8sAttachProvider { } impl K8sAttachedClusterWait { - pub(super) fn new(client: Client, source: AttachSource) -> Self { - Self { + pub(super) fn try_new(client: Client, source: &ExistingCluster) -> Result { + let _ = k8s_wait_request(source)?; + + Ok(Self { client, - source, + source: source.clone(), _env: PhantomData, - } + }) } } @@ -69,7 +71,7 @@ impl K8sAttachedClusterWait { impl AttachProvider for K8sAttachProvider { async fn discover( &self, - source: &AttachSource, + source: &ExistingCluster, ) -> Result>, AttachProviderError> { let request = k8s_attach_request(source)?; let services = discover_services(&self.client, request.namespace, request.label_selector) @@ -90,12 +92,10 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } -fn k8s_attach_request(source: &AttachSource) -> Result, AttachProviderError> { - let AttachSource::K8s { - namespace, - label_selector, - } = source - else { +fn k8s_attach_request( + source: &ExistingCluster, +) -> Result, AttachProviderError> { + let Some(label_selector) = source.k8s_label_selector() else { return Err(AttachProviderError::UnsupportedSource { attach_source: source.clone(), }); @@ -108,7 +108,7 @@ fn k8s_attach_request(source: &AttachSource) -> Result, Att } Ok(K8sAttachRequest { - namespace: namespace.as_deref().unwrap_or("default"), + namespace: source.k8s_namespace().unwrap_or("default"), label_selector, }) } @@ -246,21 +246,17 @@ impl ClusterWaitHandle for K8sAttachedClusterWait { } } -fn k8s_wait_request(source: &AttachSource) -> Result, DynError> { - let AttachSource::K8s { - namespace, - label_selector, - } = source - else { - return Err("k8s cluster wait requires a k8s attach source".into()); - }; +fn k8s_wait_request(source: &ExistingCluster) -> Result, DynError> { + let label_selector = source.k8s_label_selector().ok_or_else(|| { + DynError::from("k8s cluster wait requires a k8s existing-cluster descriptor") + })?; if label_selector.trim().is_empty() { return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into()); } Ok(K8sAttachRequest { - namespace: namespace.as_deref().unwrap_or("default"), + namespace: source.k8s_namespace().unwrap_or("default"), label_selector, }) } diff --git a/testing-framework/deployers/k8s/src/deployer/mod.rs b/testing-framework/deployers/k8s/src/deployer/mod.rs index e16ec45..33d74dd 100644 --- a/testing-framework/deployers/k8s/src/deployer/mod.rs +++ b/testing-framework/deployers/k8s/src/deployer/mod.rs @@ -2,7 +2,7 @@ mod attach_provider; mod orchestrator; pub use orchestrator::{K8sDeployer, K8sRunnerError}; -use testing_framework_core::scenario::{AttachSource, DynError}; +use testing_framework_core::scenario::{DynError, ExistingCluster, IntoExistingCluster}; /// Kubernetes deployment metadata returned by k8s-specific deployment APIs. #[derive(Clone, Debug, Eq, PartialEq)] @@ -22,6 +22,18 @@ enum K8sMetadataError { } impl K8sDeploymentMetadata { + #[must_use] + pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self { + Self { + namespace: cluster + .and_then(ExistingCluster::k8s_namespace) + .map(ToOwned::to_owned), + label_selector: cluster + .and_then(ExistingCluster::k8s_label_selector) + .map(ToOwned::to_owned), + } + } + /// Returns namespace when deployment is bound to a specific namespace. #[must_use] pub fn namespace(&self) -> Option<&str> { @@ -34,13 +46,33 @@ impl K8sDeploymentMetadata { self.label_selector.as_deref() } - /// Builds an attach source for the same k8s deployment scope. - pub fn attach_source(&self) -> Result { + /// Builds an existing-cluster descriptor for the same k8s deployment scope. + pub fn existing_cluster(&self) -> Result { let namespace = self.namespace().ok_or(K8sMetadataError::MissingNamespace)?; let label_selector = self .label_selector() .ok_or(K8sMetadataError::MissingLabelSelector)?; - Ok(AttachSource::k8s(label_selector.to_owned()).with_namespace(namespace.to_owned())) + Ok(ExistingCluster::for_k8s_selector_in_namespace( + namespace.to_owned(), + label_selector.to_owned(), + )) + } + + #[doc(hidden)] + pub fn attach_source(&self) -> Result { + self.existing_cluster() + } +} + +impl IntoExistingCluster for K8sDeploymentMetadata { + fn into_existing_cluster(self) -> Result { + self.existing_cluster() + } +} + +impl IntoExistingCluster for &K8sDeploymentMetadata { + fn into_existing_cluster(self) -> Result { + self.existing_cluster() } } diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index eb39b33..83ba257 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -5,11 +5,11 @@ use kube::Client; use reqwest::Url; use testing_framework_core::{ scenario::{ - Application, ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, - Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, - MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, - SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode, + ClusterWaitHandle, Deployer, DynError, ExistingCluster, FeedHandle, FeedRuntime, + HttpReadinessRequirement, Metrics, MetricsError, NodeClients, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, + RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, @@ -171,6 +171,9 @@ where E: K8sDeployEnv, Caps: ObservabilityCapabilityProvider + Send + Sync, { + validate_supported_cluster_mode(scenario) + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; + // Source planning is currently resolved here before deployer-specific setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { K8sRunnerError::SourceOrchestration { @@ -180,9 +183,10 @@ where let observability = resolve_observability_inputs(scenario.capabilities())?; - if scenario.sources().is_attached() { - let runner = deploy_attached_only::(scenario, source_plan, observability).await?; - return Ok((runner, attached_metadata(scenario))); + if matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { + let runner = + deploy_existing_cluster::(scenario, source_plan, observability).await?; + return Ok((runner, existing_cluster_metadata(scenario))); } let deployment = build_k8s_deployment::(deployer, scenario, &observability).await?; @@ -205,15 +209,15 @@ where runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?; ensure_non_empty_node_clients(&runtime.node_clients)?; - let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait); - log_configured_observability(&observability); - maybe_print_endpoints::(&observability, &parts.node_clients); + maybe_print_endpoints::(&observability, &runtime.node_clients); + + let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait); let runner = finalize_runner::(&mut cluster, parts)?; Ok((runner, metadata)) } -async fn deploy_attached_only( +async fn deploy_existing_cluster( scenario: &Scenario, source_plan: SourceOrchestrationPlan, observability: ObservabilityInputs, @@ -231,42 +235,26 @@ where let telemetry = observability.telemetry_handle()?; let (feed, feed_task) = spawn_block_feed_with::(&node_clients).await?; let cluster_wait = attached_cluster_wait::(scenario, client)?; - let context = RunContext::new( + let context = RuntimeAssembly::new( scenario.deployment().clone(), node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), telemetry, feed, - None, ) .with_cluster_wait(cluster_wait); - Ok(Runner::new(context, Some(Box::new(feed_task)))) + Ok(context.build_runner(Some(Box::new(feed_task)))) } -fn attached_metadata(scenario: &Scenario) -> K8sDeploymentMetadata +fn existing_cluster_metadata(scenario: &Scenario) -> K8sDeploymentMetadata where E: K8sDeployEnv, Caps: Send + Sync, { - match scenario.sources() { - ScenarioSources::Attached { - attach: - AttachSource::K8s { - namespace, - label_selector, - }, - .. - } => K8sDeploymentMetadata { - namespace: namespace.clone(), - label_selector: Some(label_selector.clone()), - }, - _ => K8sDeploymentMetadata { - namespace: None, - label_selector: None, - }, - } + K8sDeploymentMetadata::from_existing_cluster(scenario.existing_cluster()) } fn attached_cluster_wait( @@ -277,16 +265,63 @@ where E: K8sDeployEnv, Caps: Send + Sync, { - let ScenarioSources::Attached { attach, .. } = scenario.sources() else { - return Err(K8sRunnerError::InternalInvariant { - message: "k8s attached cluster wait requested outside attached source mode".to_owned(), - }); - }; + let attach = scenario + .existing_cluster() + .ok_or_else(|| K8sRunnerError::InternalInvariant { + message: "k8s cluster wait requested outside existing-cluster mode".to_owned(), + })?; + let cluster_wait = K8sAttachedClusterWait::::try_new(client, attach) + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; - Ok(Arc::new(K8sAttachedClusterWait::::new( - client, - attach.clone(), - ))) + Ok(Arc::new(cluster_wait)) +} + +fn validate_supported_cluster_mode( + scenario: &Scenario, +) -> Result<(), DynError> { + if !matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) { + return Ok(()); + } + + let cluster = scenario + .existing_cluster() + .ok_or_else(|| DynError::from("existing-cluster mode requires an existing cluster"))?; + + ensure_k8s_existing_cluster(cluster) +} + +fn ensure_k8s_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> { + if cluster.k8s_label_selector().is_some() { + return Ok(()); + } + + Err("k8s deployer requires a k8s existing-cluster descriptor".into()) +} + +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::ExistingCluster; + + use super::ensure_k8s_existing_cluster; + + #[test] + fn k8s_cluster_validator_accepts_k8s_descriptor() { + ensure_k8s_existing_cluster(&ExistingCluster::for_k8s_selector("app=node".to_owned())) + .expect("k8s descriptor should be accepted"); + } + + #[test] + fn k8s_cluster_validator_rejects_compose_descriptor() { + let error = ensure_k8s_existing_cluster(&ExistingCluster::for_compose_project( + "project".to_owned(), + )) + .expect_err("compose descriptor should be rejected"); + + assert_eq!( + error.to_string(), + "k8s deployer requires a k8s existing-cluster descriptor" + ); + } } fn managed_cluster_wait( @@ -295,13 +330,12 @@ fn managed_cluster_wait( ) -> Result>, K8sRunnerError> { let client = client_from_cluster(cluster)?; let attach_source = metadata - .attach_source() + .existing_cluster() + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; + let cluster_wait = K8sAttachedClusterWait::::try_new(client, &attach_source) .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; - Ok(Arc::new(K8sAttachedClusterWait::::new( - client, - attach_source, - ))) + Ok(Arc::new(cluster_wait)) } fn client_from_cluster(cluster: &Option) -> Result { @@ -517,15 +551,19 @@ fn build_runner_parts( cluster_wait: Arc>, ) -> K8sRunnerParts { K8sRunnerParts { - descriptors: scenario.deployment().clone(), - node_clients: runtime.node_clients, - duration: scenario.duration(), - expectation_cooldown: scenario.expectation_cooldown(), - telemetry: runtime.telemetry, - feed: runtime.feed, + assembly: build_k8s_runtime_assembly( + scenario.deployment().clone(), + runtime.node_clients, + scenario.duration(), + scenario.expectation_cooldown(), + scenario.cluster_control_profile(), + runtime.telemetry, + runtime.feed, + cluster_wait, + ), feed_task: runtime.feed_task, node_count, - cluster_wait, + duration_secs: scenario.duration().as_secs(), } } @@ -613,15 +651,10 @@ fn maybe_print_endpoints( } struct K8sRunnerParts { - descriptors: E::Deployment, - node_clients: NodeClients, - duration: Duration, - expectation_cooldown: Duration, - telemetry: Metrics, - feed: Feed, + assembly: RuntimeAssembly, feed_task: FeedHandle, node_count: usize, - cluster_wait: Arc>, + duration_secs: u64, } fn finalize_runner( @@ -632,36 +665,21 @@ fn finalize_runner( let (cleanup, port_forwards) = environment.into_cleanup()?; let K8sRunnerParts { - descriptors, - node_clients, - duration, - expectation_cooldown, - telemetry, - feed, + assembly, feed_task, node_count, - cluster_wait, + duration_secs, } = parts; - let duration_secs = duration.as_secs(); let cleanup_guard: Box = Box::new(K8sCleanupGuard::new(cleanup, feed_task, port_forwards)); - let context = build_k8s_run_context( - descriptors, - node_clients, - duration, - expectation_cooldown, - telemetry, - feed, - cluster_wait, - ); info!( nodes = node_count, duration_secs, "k8s deployment ready; handing control to scenario runner" ); - Ok(Runner::new(context, Some(cleanup_guard))) + Ok(assembly.build_runner(Some(cleanup_guard))) } fn take_ready_cluster( @@ -674,23 +692,24 @@ fn take_ready_cluster( }) } -fn build_k8s_run_context( +fn build_k8s_runtime_assembly( descriptors: E::Deployment, node_clients: NodeClients, duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, telemetry: Metrics, feed: Feed, cluster_wait: Arc>, -) -> RunContext { - RunContext::new( +) -> RuntimeAssembly { + RuntimeAssembly::new( descriptors, node_clients, duration, expectation_cooldown, + cluster_control_profile, telemetry, feed, - None, ) .with_cluster_wait(cluster_wait) } diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index 28cc01f..5af3d2e 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -10,10 +10,10 @@ use std::{ use async_trait::async_trait; use testing_framework_core::{ scenario::{ - Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime, - HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, - RetryPolicy, RunContext, Runner, Scenario, ScenarioError, SourceOrchestrationPlan, - build_source_orchestration_plan, spawn_feed, + Application, CleanupGuard, ClusterControlProfile, ClusterMode, Deployer, DeploymentPolicy, + DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, + NodeControlCapability, NodeControlHandle, RetryPolicy, Runner, RuntimeAssembly, Scenario, + ScenarioError, SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed, }, topology::DeploymentDescriptor, }; @@ -187,6 +187,8 @@ impl ProcessDeployer { &self, scenario: &Scenario, ) -> Result, ProcessDeployerError> { + validate_supported_cluster_mode(scenario)?; + // Source planning is currently resolved here before node spawn/runtime setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { ProcessDeployerError::SourceOrchestration { @@ -211,6 +213,7 @@ impl ProcessDeployer { node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), None, ) .await?; @@ -218,13 +221,15 @@ impl ProcessDeployer { let cleanup_guard: Box = Box::new(LocalProcessGuard::::new(nodes, runtime.feed_task)); - Ok(Runner::new(runtime.context, Some(cleanup_guard))) + Ok(runtime.assembly.build_runner(Some(cleanup_guard))) } async fn deploy_with_node_control( &self, scenario: &Scenario, ) -> Result, ProcessDeployerError> { + validate_supported_cluster_mode(scenario)?; + // Source planning is currently resolved here before node spawn/runtime setup. let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { ProcessDeployerError::SourceOrchestration { @@ -248,14 +253,14 @@ impl ProcessDeployer { node_clients, scenario.duration(), scenario.expectation_cooldown(), + scenario.cluster_control_profile(), Some(node_control), ) .await?; - Ok(Runner::new( - runtime.context, - Some(Box::new(runtime.feed_task)), - )) + Ok(runtime + .assembly + .build_runner(Some(Box::new(runtime.feed_task)))) } fn node_control_from( @@ -312,6 +317,22 @@ impl ProcessDeployer { } } +fn validate_supported_cluster_mode( + scenario: &Scenario, +) -> Result<(), ProcessDeployerError> { + ensure_local_cluster_mode(scenario.cluster_mode()) +} + +fn ensure_local_cluster_mode(mode: ClusterMode) -> Result<(), ProcessDeployerError> { + if matches!(mode, ClusterMode::ExistingCluster) { + return Err(ProcessDeployerError::SourceOrchestration { + source: DynError::from("local deployer does not support existing-cluster mode"), + }); + } + + Ok(()) +} + fn merge_source_clients_for_local( source_plan: &SourceOrchestrationPlan, node_clients: NodeClients, @@ -339,6 +360,29 @@ fn build_retry_execution_config( (retry_policy, execution) } +#[cfg(test)] +mod tests { + use testing_framework_core::scenario::ClusterMode; + + use super::ensure_local_cluster_mode; + + #[test] + fn local_cluster_validator_accepts_managed_mode() { + ensure_local_cluster_mode(ClusterMode::Managed).expect("managed mode should be accepted"); + } + + #[test] + fn local_cluster_validator_rejects_existing_cluster_mode() { + let error = ensure_local_cluster_mode(ClusterMode::ExistingCluster) + .expect_err("existing-cluster mode should be rejected"); + + assert_eq!( + error.to_string(), + "source orchestration failed: local deployer does not support existing-cluster mode" + ); + } +} + async fn run_retry_attempt( descriptors: &E::Deployment, execution: RetryExecutionConfig, @@ -475,7 +519,7 @@ fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_ } struct RuntimeContext { - context: RunContext, + assembly: RuntimeAssembly, feed_task: FeedHandle, } @@ -484,6 +528,7 @@ async fn run_context_for( node_clients: NodeClients, duration: Duration, expectation_cooldown: Duration, + cluster_control_profile: ClusterControlProfile, node_control: Option>>, ) -> Result, ProcessDeployerError> { if node_clients.is_empty() { @@ -491,15 +536,21 @@ async fn run_context_for( } let (feed, feed_task) = spawn_feed_with::(&node_clients).await?; - let context = RunContext::new( + let mut assembly = RuntimeAssembly::new( descriptors, node_clients, duration, expectation_cooldown, + cluster_control_profile, Metrics::empty(), feed, - node_control, ); + if let Some(node_control) = node_control { + assembly = assembly.with_node_control(node_control); + } - Ok(RuntimeContext { context, feed_task }) + Ok(RuntimeContext { + assembly, + feed_task, + }) } diff --git a/testing-framework/deployers/local/src/external.rs b/testing-framework/deployers/local/src/external.rs index 713c5be..696d05e 100644 --- a/testing-framework/deployers/local/src/external.rs +++ b/testing-framework/deployers/local/src/external.rs @@ -35,8 +35,8 @@ pub fn build_external_client( } fn resolve_api_socket(source: &ExternalNodeSource) -> Result { - let source_label = source.label.clone(); - let endpoint = source.endpoint.trim(); + let source_label = source.label().to_string(); + let endpoint = source.endpoint().trim(); if endpoint.is_empty() { return Err(ExternalClientBuildError::EmptyEndpoint { label: source_label, diff --git a/testing-framework/deployers/local/src/manual/mod.rs b/testing-framework/deployers/local/src/manual/mod.rs index 36a4019..028a690 100644 --- a/testing-framework/deployers/local/src/manual/mod.rs +++ b/testing-framework/deployers/local/src/manual/mod.rs @@ -1,8 +1,8 @@ use testing_framework_core::{ manual::ManualClusterHandle, scenario::{ - DynError, ExternalNodeSource, NodeClients, NodeControlHandle, ReadinessError, - StartNodeOptions, StartedNode, + ClusterWaitHandle, DynError, ExternalNodeSource, NodeClients, NodeControlHandle, + ReadinessError, StartNodeOptions, StartedNode, }, }; use thiserror::Error; @@ -157,19 +157,11 @@ impl NodeControlHandle for ManualCluster { } #[async_trait::async_trait] -impl ManualClusterHandle for ManualCluster { - async fn start_node_with( - &self, - name: &str, - options: StartNodeOptions, - ) -> Result, DynError> { - self.nodes - .start_node_with(name, options) - .await - .map_err(|err| err.into()) - } - +impl ClusterWaitHandle for ManualCluster { async fn wait_network_ready(&self) -> Result<(), DynError> { self.wait_network_ready().await.map_err(|err| err.into()) } } + +#[async_trait::async_trait] +impl ManualClusterHandle for ManualCluster {}