diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index fa3befd..4f909a6 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -20,10 +20,12 @@ async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> { Err(error) => return Err(Error::new(error)), }; - let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; + let attach_source = metadata + .existing_cluster() + .map_err(|err| anyhow!("{err}"))?; let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .with_run_duration(Duration::from_secs(5)) - .with_attach_source(attach_source) + .with_existing_cluster(attach_source) .build()?; let attached_deployer = LbcComposeDeployer::default(); diff --git a/logos/examples/tests/k8s_attach_node_control.rs b/logos/examples/tests/k8s_attach_node_control.rs index 72b2c9d..47bbe35 100644 --- a/logos/examples/tests/k8s_attach_node_control.rs +++ b/logos/examples/tests/k8s_attach_node_control.rs @@ -20,10 +20,12 @@ async fn k8s_attach_mode_queries_node_api_opt_in() -> Result<()> { Err(error) => return Err(Error::new(error)), }; - let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; + let attach_source = metadata + .existing_cluster() + .map_err(|err| anyhow!("{err}"))?; let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .with_run_duration(Duration::from_secs(5)) - .with_attach_source(attach_source) + .with_existing_cluster(attach_source) .build()?; let attached_deployer = LbcK8sDeployer::default(); diff --git a/testing-framework/core/src/scenario/definition.rs b/testing-framework/core/src/scenario/definition.rs index b16b6a7..0eea6f2 100644 --- a/testing-framework/core/src/scenario/definition.rs +++ b/testing-framework/core/src/scenario/definition.rs @@ -4,14 +4,15 @@ use thiserror::Error; use tracing::{debug, info}; use super::{ - Application, AttachSource, DeploymentPolicy, DynError, ExternalNodeSource, - HttpReadinessRequirement, NodeControlCapability, ObservabilityCapability, ScenarioSources, + Application, DeploymentPolicy, DynError, ExistingCluster, ExternalNodeSource, + HttpReadinessRequirement, NodeControlCapability, ObservabilityCapability, builder_ops::CoreBuilderAccess, expectation::Expectation, runtime::{ context::RunMetrics, orchestration::{SourceOrchestrationPlan, SourceOrchestrationPlanError}, }, + sources::ScenarioSources, workload::Workload, }; use crate::topology::{DeploymentDescriptor, DeploymentProvider, DeploymentSeed, DynTopologyError}; @@ -113,8 +114,39 @@ impl Scenario { } #[must_use] - pub fn sources(&self) -> &ScenarioSources { - &self.sources + pub fn existing_cluster(&self) -> Option<&ExistingCluster> { + self.sources.existing_cluster() + } + + #[must_use] + pub const fn uses_existing_cluster(&self) -> bool { + self.sources.uses_existing_cluster() + } + + #[must_use] + #[doc(hidden)] + pub fn attached_source(&self) -> Option<&ExistingCluster> { + self.existing_cluster() + } + + #[must_use] + pub fn external_nodes(&self) -> &[ExternalNodeSource] { + self.sources.external_nodes() + } + + #[must_use] + pub const fn is_managed(&self) -> bool { + self.sources.is_managed() + } + + #[must_use] + pub const fn is_external_only(&self) -> bool { + self.sources.is_external_only() + } + + #[must_use] + pub fn has_external_nodes(&self) -> bool { + !self.sources.external_nodes().is_empty() } #[must_use] @@ -233,8 +265,14 @@ macro_rules! impl_common_builder_methods { } #[must_use] - pub fn with_attach_source(self, attach: AttachSource) -> Self { - self.map_core_builder(|builder| builder.with_attach_source(attach)) + pub fn with_existing_cluster(self, cluster: ExistingCluster) -> Self { + self.map_core_builder(|builder| builder.with_existing_cluster(cluster)) + } + + #[must_use] + #[doc(hidden)] + pub fn with_attach_source(self, attach: ExistingCluster) -> Self { + self.with_existing_cluster(attach) } #[must_use] @@ -546,11 +584,17 @@ impl Builder { } #[must_use] - pub fn with_attach_source(mut self, attach: AttachSource) -> Self { - self.sources = self.sources.with_attach(attach); + pub fn with_existing_cluster(mut self, cluster: ExistingCluster) -> Self { + self.sources = self.sources.with_attach(cluster); self } + #[must_use] + #[doc(hidden)] + pub fn with_attach_source(self, attach: ExistingCluster) -> Self { + self.with_existing_cluster(attach) + } + #[must_use] pub fn with_external_node(mut self, node: ExternalNodeSource) -> Self { self.sources = self.sources.with_external_node(node); diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index 8b570ad..ca358a5 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -51,7 +51,7 @@ pub use runtime::{ wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, }; -pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources}; +pub use sources::{ExistingCluster, ExternalNodeSource}; pub use workload::Workload; pub use crate::env::Application; diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs index f1dd9b9..2c3cb4e 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs @@ -1,4 +1,4 @@ -use crate::scenario::{AttachSource, ExternalNodeSource, ScenarioSources}; +use crate::scenario::{ExistingCluster, ExternalNodeSource, sources::ScenarioSources}; /// Explicit descriptor for managed node sourcing. #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -19,7 +19,7 @@ pub(crate) enum SourceOrchestrationMode { external: Vec, }, Attached { - attach: AttachSource, + attach: ExistingCluster, external: Vec, }, ExternalOnly { @@ -43,7 +43,7 @@ pub enum SourceOrchestrationPlanError { } impl SourceOrchestrationPlan { - pub fn try_from_sources( + pub(crate) fn try_from_sources( sources: &ScenarioSources, ) -> Result { let mode = mode_from_sources(sources); @@ -69,11 +69,15 @@ impl SourceOrchestrationPlan { #[cfg(test)] mod tests { use super::{SourceOrchestrationMode, SourceOrchestrationPlan}; - use crate::scenario::{AttachSource, ScenarioSources}; + use crate::scenario::{ExistingCluster, sources::ScenarioSources}; #[test] fn attached_sources_are_planned() { - let sources = ScenarioSources::attached(AttachSource::compose(vec!["node-0".to_string()])); + let sources = + ScenarioSources::default().with_attach(ExistingCluster::for_compose_services( + "test-project".to_string(), + vec!["node-0".to_string()], + )); let plan = SourceOrchestrationPlan::try_from_sources(&sources) .expect("attached sources should build a source orchestration plan"); diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs index ebf2d2d..d6f822b 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs @@ -41,7 +41,7 @@ pub enum SourceResolveError { pub fn build_source_orchestration_plan( scenario: &Scenario, ) -> Result { - SourceOrchestrationPlan::try_from_sources(scenario.sources()) + Ok(scenario.source_orchestration_plan().clone()) } /// Resolves runtime source nodes via unified providers from orchestration plan. diff --git a/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs b/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs index d31bd78..b239193 100644 --- a/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs +++ b/testing-framework/core/src/scenario/runtime/providers/attach_provider.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -use crate::scenario::{Application, AttachSource, DynError}; +use crate::scenario::{Application, DynError, ExistingCluster}; /// Attached node discovered from an existing external cluster source. #[derive(Clone, Debug)] @@ -15,7 +15,7 @@ pub struct AttachedNode { #[derive(Debug, thiserror::Error)] pub enum AttachProviderError { #[error("attach source is not supported by this provider: {attach_source:?}")] - UnsupportedSource { attach_source: AttachSource }, + UnsupportedSource { attach_source: ExistingCluster }, #[error("attach discovery failed: {source}")] Discovery { #[source] @@ -32,7 +32,7 @@ pub trait AttachProvider: Send + Sync { /// Discovers node clients for the requested attach source. async fn discover( &self, - source: &AttachSource, + source: &ExistingCluster, ) -> Result>, AttachProviderError>; } @@ -44,7 +44,7 @@ pub struct NoopAttachProvider; impl AttachProvider for NoopAttachProvider { async fn discover( &self, - source: &AttachSource, + source: &ExistingCluster, ) -> Result>, AttachProviderError> { Err(AttachProviderError::UnsupportedSource { attach_source: source.clone(), diff --git a/testing-framework/core/src/scenario/sources/mod.rs b/testing-framework/core/src/scenario/sources/mod.rs index 98324dd..506a8aa 100644 --- a/testing-framework/core/src/scenario/sources/mod.rs +++ b/testing-framework/core/src/scenario/sources/mod.rs @@ -1,3 +1,5 @@ mod model; -pub use model::{AttachSource, ExternalNodeSource, ScenarioSources}; +pub(crate) use model::ScenarioSources; +#[doc(hidden)] +pub use model::{ExistingCluster, ExternalNodeSource}; diff --git a/testing-framework/core/src/scenario/sources/model.rs b/testing-framework/core/src/scenario/sources/model.rs index 98c60ad..a64b010 100644 --- a/testing-framework/core/src/scenario/sources/model.rs +++ b/testing-framework/core/src/scenario/sources/model.rs @@ -1,6 +1,11 @@ -/// Typed attach source for existing clusters. +/// Typed descriptor for an existing cluster. #[derive(Clone, Debug, Eq, PartialEq)] -pub enum AttachSource { +pub struct ExistingCluster { + kind: ExistingClusterKind, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +enum ExistingClusterKind { K8s { namespace: Option, label_selector: String, @@ -11,36 +16,80 @@ pub enum AttachSource { }, } -impl AttachSource { +impl ExistingCluster { #[must_use] - pub fn k8s(label_selector: String) -> Self { - Self::K8s { - namespace: None, - label_selector, + pub fn for_k8s_selector(label_selector: String) -> Self { + Self { + kind: ExistingClusterKind::K8s { + namespace: None, + label_selector, + }, } } #[must_use] - pub fn k8s_in_namespace(label_selector: String, namespace: String) -> Self { - Self::K8s { - namespace: Some(namespace), - label_selector, + pub fn for_k8s_selector_in_namespace(namespace: String, label_selector: String) -> Self { + Self { + kind: ExistingClusterKind::K8s { + namespace: Some(namespace), + label_selector, + }, } } #[must_use] - pub fn compose(services: Vec) -> Self { - Self::Compose { - project: None, - services, + pub fn for_compose_project(project: String) -> Self { + Self { + kind: ExistingClusterKind::Compose { + project: Some(project), + services: Vec::new(), + }, } } #[must_use] - pub fn compose_in_project(services: Vec, project: String) -> Self { - Self::Compose { - project: Some(project), - services, + pub fn for_compose_services(project: String, services: Vec) -> Self { + Self { + kind: ExistingClusterKind::Compose { + project: Some(project), + services, + }, + } + } + + #[must_use] + #[doc(hidden)] + pub fn compose_project(&self) -> Option<&str> { + match &self.kind { + ExistingClusterKind::Compose { project, .. } => project.as_deref(), + ExistingClusterKind::K8s { .. } => None, + } + } + + #[must_use] + #[doc(hidden)] + pub fn compose_services(&self) -> Option<&[String]> { + match &self.kind { + ExistingClusterKind::Compose { services, .. } => Some(services), + ExistingClusterKind::K8s { .. } => None, + } + } + + #[must_use] + #[doc(hidden)] + pub fn k8s_namespace(&self) -> Option<&str> { + match &self.kind { + ExistingClusterKind::K8s { namespace, .. } => namespace.as_deref(), + ExistingClusterKind::Compose { .. } => None, + } + } + + #[must_use] + #[doc(hidden)] + pub fn k8s_label_selector(&self) -> Option<&str> { + match &self.kind { + ExistingClusterKind::K8s { label_selector, .. } => Some(label_selector), + ExistingClusterKind::Compose { .. } => None, } } } @@ -73,12 +122,12 @@ impl ExternalNodeSource { /// Source model that makes invalid managed+attached combinations /// unrepresentable by type. #[derive(Clone, Debug, Eq, PartialEq)] -pub enum ScenarioSources { +pub(crate) enum ScenarioSources { Managed { external: Vec, }, Attached { - attach: AttachSource, + attach: ExistingCluster, external: Vec, }, ExternalOnly { @@ -96,27 +145,7 @@ impl Default for ScenarioSources { impl ScenarioSources { #[must_use] - pub const fn managed() -> Self { - Self::Managed { - external: Vec::new(), - } - } - - #[must_use] - pub fn attached(attach: AttachSource) -> Self { - Self::Attached { - attach, - external: Vec::new(), - } - } - - #[must_use] - pub fn external_only(external: Vec) -> Self { - Self::ExternalOnly { external } - } - - #[must_use] - pub fn with_external_node(mut self, node: ExternalNodeSource) -> Self { + pub(crate) fn with_external_node(mut self, node: ExternalNodeSource) -> Self { match &mut self { Self::Managed { external } | Self::Attached { external, .. } @@ -127,21 +156,29 @@ impl ScenarioSources { } #[must_use] - pub fn with_attach(self, attach: AttachSource) -> Self { + pub(crate) fn with_attach(self, attach: ExistingCluster) -> Self { let external = self.external_nodes().to_vec(); Self::Attached { attach, external } } #[must_use] - pub fn into_external_only(self) -> Self { + pub(crate) fn into_external_only(self) -> Self { let external = self.external_nodes().to_vec(); Self::ExternalOnly { external } } #[must_use] - pub fn external_nodes(&self) -> &[ExternalNodeSource] { + pub(crate) fn existing_cluster(&self) -> Option<&ExistingCluster> { + match self { + Self::Attached { attach, .. } => Some(attach), + Self::Managed { .. } | Self::ExternalOnly { .. } => None, + } + } + + #[must_use] + pub(crate) fn external_nodes(&self) -> &[ExternalNodeSource] { match self { Self::Managed { external } | Self::Attached { external, .. } @@ -150,17 +187,17 @@ impl ScenarioSources { } #[must_use] - pub const fn is_managed(&self) -> bool { + pub(crate) const fn is_managed(&self) -> bool { matches!(self, Self::Managed { .. }) } #[must_use] - pub const fn is_attached(&self) -> bool { + pub(crate) const fn uses_existing_cluster(&self) -> bool { matches!(self, Self::Attached { .. }) } #[must_use] - pub const fn is_external_only(&self) -> bool { + pub(crate) const fn is_external_only(&self) -> bool { matches!(self, Self::ExternalOnly { .. }) } } diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 87358aa..8874fd1 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -2,8 +2,8 @@ use std::marker::PhantomData; use async_trait::async_trait; use testing_framework_core::scenario::{ - AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, - ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, + AttachProvider, AttachProviderError, AttachedNode, ClusterWaitHandle, DynError, + ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, }; use url::Url; @@ -22,7 +22,7 @@ pub(super) struct ComposeAttachProvider { pub(super) struct ComposeAttachedClusterWait { host: String, - source: AttachSource, + source: ExistingCluster, _env: PhantomData, } @@ -42,12 +42,14 @@ impl ComposeAttachProvider { } impl ComposeAttachedClusterWait { - pub(super) fn new(host: String, source: AttachSource) -> Self { - Self { + pub(super) fn try_new(host: String, source: &ExistingCluster) -> Result { + let _ = compose_wait_request(source)?; + + Ok(Self { host, - source, + source: source.clone(), _env: PhantomData, - } + }) } } @@ -60,7 +62,7 @@ struct ComposeAttachRequest<'a> { impl AttachProvider for ComposeAttachProvider { async fn discover( &self, - source: &AttachSource, + source: &ExistingCluster, ) -> Result>, AttachProviderError> { let request = compose_attach_request(source)?; let services = resolve_services(request.project, request.services) @@ -85,16 +87,17 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { } fn compose_attach_request( - source: &AttachSource, + source: &ExistingCluster, ) -> Result, AttachProviderError> { - let AttachSource::Compose { project, services } = source else { - return Err(AttachProviderError::UnsupportedSource { - attach_source: source.clone(), - }); - }; + let services = + source + .compose_services() + .ok_or_else(|| AttachProviderError::UnsupportedSource { + attach_source: source.clone(), + })?; - let project = project - .as_deref() + let project = source + .compose_project() .ok_or_else(|| AttachProviderError::Discovery { source: ComposeAttachDiscoveryError::MissingProjectName.into(), })?; @@ -172,14 +175,13 @@ impl ClusterWaitHandle for ComposeAttachedClusterWait } } -fn compose_wait_request(source: &AttachSource) -> Result, DynError> { - let AttachSource::Compose { project, services } = source else { - return Err("compose cluster wait requires a compose attach source".into()); - }; - - let project = project - .as_deref() - .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?; +fn compose_wait_request(source: &ExistingCluster) -> Result, DynError> { + let project = source + .compose_project() + .ok_or_else(|| DynError::from("compose cluster wait requires a compose attach source"))?; + let services = source + .compose_services() + .ok_or_else(|| DynError::from("compose cluster wait requires a compose attach source"))?; Ok(ComposeAttachRequest { project, services }) } diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 2f809c7..a0c35d5 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -9,7 +9,7 @@ use std::marker::PhantomData; use async_trait::async_trait; use testing_framework_core::scenario::{ - AttachSource, CleanupGuard, Deployer, DynError, FeedHandle, ObservabilityCapabilityProvider, + CleanupGuard, Deployer, DynError, ExistingCluster, FeedHandle, ObservabilityCapabilityProvider, RequiresNodeControl, Runner, Scenario, }; @@ -36,6 +36,22 @@ enum ComposeMetadataError { } impl ComposeDeploymentMetadata { + #[must_use] + pub fn for_project(project_name: String) -> Self { + Self { + project_name: Some(project_name), + } + } + + #[must_use] + pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self { + Self { + project_name: cluster + .and_then(ExistingCluster::compose_project) + .map(ToOwned::to_owned), + } + } + /// Returns project name when deployment is bound to a specific compose /// project. #[must_use] @@ -43,33 +59,45 @@ impl ComposeDeploymentMetadata { self.project_name.as_deref() } - /// Builds an attach source for the same compose project using deployer - /// discovery to resolve services. - pub fn attach_source(&self) -> Result { + /// Builds an existing-cluster descriptor for the same compose project + /// using deployer discovery to resolve services. + pub fn existing_cluster(&self) -> Result { let project_name = self .project_name() .ok_or(ComposeMetadataError::MissingProjectName)?; - Ok(AttachSource::compose_in_project( - Vec::new(), + Ok(ExistingCluster::for_compose_project( project_name.to_owned(), )) } - /// Builds an attach source for the same compose project. - pub fn attach_source_for_services( + /// Builds an existing-cluster descriptor for the same compose project. + pub fn existing_cluster_for_services( &self, services: Vec, - ) -> Result { + ) -> Result { let project_name = self .project_name() .ok_or(ComposeMetadataError::MissingProjectName)?; - Ok(AttachSource::compose_in_project( - services, + Ok(ExistingCluster::for_compose_services( project_name.to_owned(), + services, )) } + + #[doc(hidden)] + pub fn attach_source(&self) -> Result { + self.existing_cluster() + } + + #[doc(hidden)] + pub fn attach_source_for_services( + &self, + services: Vec, + ) -> Result { + self.existing_cluster_for_services(services) + } } impl Default for ComposeDeployer { diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index ddd88f0..29b58c3 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,11 +3,10 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, - DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, - NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, - SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle, + FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, + Runner, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, @@ -71,7 +70,7 @@ impl DeploymentOrchestrator { } })?; - if scenario.sources().is_attached() { + if scenario.uses_existing_cluster() { return self .deploy_attached_only::(scenario, source_plan) .await @@ -214,31 +213,15 @@ impl DeploymentOrchestrator { return Ok(None); } - let ScenarioSources::Attached { attach, .. } = scenario.sources() else { - return Err(ComposeRunnerError::InternalInvariant { + let attach = scenario + .existing_cluster() + .ok_or(ComposeRunnerError::InternalInvariant { message: "attached node control requested outside attached source mode", - }); - }; + })?; + let node_control = ComposeAttachedNodeControl::try_from_existing_cluster(attach) + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; - let AttachSource::Compose { project, .. } = attach else { - return Err(ComposeRunnerError::InternalInvariant { - message: "compose deployer requires compose attach source for node control", - }); - }; - - let Some(project_name) = project - .as_ref() - .map(|value| value.trim()) - .filter(|value| !value.is_empty()) - else { - return Err(ComposeRunnerError::InternalInvariant { - message: "attached compose mode requires explicit project name for node control", - }); - }; - - Ok(Some(Arc::new(ComposeAttachedNodeControl { - project_name: project_name.to_owned(), - }) as Arc>)) + Ok(Some(Arc::new(node_control) as Arc>)) } fn attached_cluster_wait( @@ -248,16 +231,15 @@ impl DeploymentOrchestrator { where Caps: Send + Sync, { - let ScenarioSources::Attached { attach, .. } = scenario.sources() else { - return Err(ComposeRunnerError::InternalInvariant { + let attach = scenario + .existing_cluster() + .ok_or(ComposeRunnerError::InternalInvariant { message: "compose attached cluster wait requested outside attached source mode", - }); - }; + })?; + let cluster_wait = ComposeAttachedClusterWait::::try_new(compose_runner_host(), attach) + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; - Ok(Arc::new(ComposeAttachedClusterWait::::new( - compose_runner_host(), - attach.clone(), - ))) + Ok(Arc::new(cluster_wait)) } async fn build_runner( @@ -274,7 +256,8 @@ impl DeploymentOrchestrator { { let telemetry = observability.telemetry_handle()?; let node_control = self.maybe_node_control::(&prepared.environment); - let cluster_wait = self.managed_cluster_wait(project_name); + let cluster_wait = + self.managed_cluster_wait(ComposeDeploymentMetadata::for_project(project_name))?; log_observability_endpoints(&observability); log_profiling_urls(&deployed.host, &deployed.host_ports); @@ -318,11 +301,18 @@ impl DeploymentOrchestrator { }) } - fn managed_cluster_wait(&self, project_name: String) -> Arc> { - Arc::new(ComposeAttachedClusterWait::::new( - compose_runner_host(), - AttachSource::compose_in_project(Vec::new(), project_name), - )) + fn managed_cluster_wait( + &self, + metadata: ComposeDeploymentMetadata, + ) -> Result>, ComposeRunnerError> { + let existing_cluster = metadata + .existing_cluster() + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + let cluster_wait = + ComposeAttachedClusterWait::::try_new(compose_runner_host(), &existing_cluster) + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + + Ok(Arc::new(cluster_wait)) } fn log_deploy_start( @@ -378,15 +368,7 @@ where E: ComposeDeployEnv, Caps: Send + Sync, { - let project_name = match scenario.sources() { - ScenarioSources::Attached { - attach: AttachSource::Compose { project, .. }, - .. - } => project.clone(), - _ => None, - }; - - ComposeDeploymentMetadata { project_name } + ComposeDeploymentMetadata::from_existing_cluster(scenario.existing_cluster()) } struct DeployedNodes { diff --git a/testing-framework/deployers/compose/src/docker/control.rs b/testing-framework/deployers/compose/src/docker/control.rs index 8e98948..90b8a3d 100644 --- a/testing-framework/deployers/compose/src/docker/control.rs +++ b/testing-framework/deployers/compose/src/docker/control.rs @@ -5,7 +5,7 @@ use std::{ use testing_framework_core::{ adjust_timeout, - scenario::{Application, DynError, NodeControlHandle}, + scenario::{Application, DynError, ExistingCluster, NodeControlHandle}, }; use tokio::{process::Command, time::timeout}; use tracing::info; @@ -160,6 +160,22 @@ pub struct ComposeAttachedNodeControl { pub(crate) project_name: String, } +impl ComposeAttachedNodeControl { + pub fn try_from_existing_cluster(source: &ExistingCluster) -> Result { + let Some(project_name) = source + .compose_project() + .map(str::trim) + .filter(|value| !value.is_empty()) + else { + return Err("attached compose node control requires explicit project name".into()); + }; + + Ok(Self { + project_name: project_name.to_owned(), + }) + } +} + #[async_trait::async_trait] impl NodeControlHandle for ComposeAttachedNodeControl { async fn restart_node(&self, name: &str) -> Result<(), DynError> { diff --git a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs index b10a24a..43925bc 100644 --- a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs @@ -7,8 +7,8 @@ use kube::{ api::{ListParams, ObjectList}, }; use testing_framework_core::scenario::{ - AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, - ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, + AttachProvider, AttachProviderError, AttachedNode, ClusterWaitHandle, DynError, + ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, }; use url::Url; @@ -37,7 +37,7 @@ pub(super) struct K8sAttachProvider { pub(super) struct K8sAttachedClusterWait { client: Client, - source: AttachSource, + source: ExistingCluster, _env: PhantomData, } @@ -56,12 +56,14 @@ impl K8sAttachProvider { } impl K8sAttachedClusterWait { - pub(super) fn new(client: Client, source: AttachSource) -> Self { - Self { + pub(super) fn try_new(client: Client, source: &ExistingCluster) -> Result { + let _ = k8s_wait_request(source)?; + + Ok(Self { client, - source, + source: source.clone(), _env: PhantomData, - } + }) } } @@ -69,7 +71,7 @@ impl K8sAttachedClusterWait { impl AttachProvider for K8sAttachProvider { async fn discover( &self, - source: &AttachSource, + source: &ExistingCluster, ) -> Result>, AttachProviderError> { let request = k8s_attach_request(source)?; let services = discover_services(&self.client, request.namespace, request.label_selector) @@ -90,12 +92,10 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } -fn k8s_attach_request(source: &AttachSource) -> Result, AttachProviderError> { - let AttachSource::K8s { - namespace, - label_selector, - } = source - else { +fn k8s_attach_request( + source: &ExistingCluster, +) -> Result, AttachProviderError> { + let Some(label_selector) = source.k8s_label_selector() else { return Err(AttachProviderError::UnsupportedSource { attach_source: source.clone(), }); @@ -108,7 +108,7 @@ fn k8s_attach_request(source: &AttachSource) -> Result, Att } Ok(K8sAttachRequest { - namespace: namespace.as_deref().unwrap_or("default"), + namespace: source.k8s_namespace().unwrap_or("default"), label_selector, }) } @@ -246,21 +246,17 @@ impl ClusterWaitHandle for K8sAttachedClusterWait { } } -fn k8s_wait_request(source: &AttachSource) -> Result, DynError> { - let AttachSource::K8s { - namespace, - label_selector, - } = source - else { - return Err("k8s cluster wait requires a k8s attach source".into()); - }; +fn k8s_wait_request(source: &ExistingCluster) -> Result, DynError> { + let label_selector = source + .k8s_label_selector() + .ok_or_else(|| DynError::from("k8s cluster wait requires a k8s attach source"))?; if label_selector.trim().is_empty() { return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into()); } Ok(K8sAttachRequest { - namespace: namespace.as_deref().unwrap_or("default"), + namespace: source.k8s_namespace().unwrap_or("default"), label_selector, }) } diff --git a/testing-framework/deployers/k8s/src/deployer/mod.rs b/testing-framework/deployers/k8s/src/deployer/mod.rs index 43f0ff5..d97615b 100644 --- a/testing-framework/deployers/k8s/src/deployer/mod.rs +++ b/testing-framework/deployers/k8s/src/deployer/mod.rs @@ -2,7 +2,7 @@ mod attach_provider; mod orchestrator; pub use orchestrator::{K8sDeployer, K8sRunnerError}; -use testing_framework_core::scenario::{AttachSource, DynError}; +use testing_framework_core::scenario::{DynError, ExistingCluster}; /// Kubernetes deployment metadata returned by k8s-specific deployment APIs. #[derive(Clone, Debug, Eq, PartialEq)] @@ -22,6 +22,18 @@ enum K8sMetadataError { } impl K8sDeploymentMetadata { + #[must_use] + pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self { + Self { + namespace: cluster + .and_then(ExistingCluster::k8s_namespace) + .map(ToOwned::to_owned), + label_selector: cluster + .and_then(ExistingCluster::k8s_label_selector) + .map(ToOwned::to_owned), + } + } + /// Returns namespace when deployment is bound to a specific namespace. #[must_use] pub fn namespace(&self) -> Option<&str> { @@ -34,16 +46,21 @@ impl K8sDeploymentMetadata { self.label_selector.as_deref() } - /// Builds an attach source for the same k8s deployment scope. - pub fn attach_source(&self) -> Result { + /// Builds an existing-cluster descriptor for the same k8s deployment scope. + pub fn existing_cluster(&self) -> Result { let namespace = self.namespace().ok_or(K8sMetadataError::MissingNamespace)?; let label_selector = self .label_selector() .ok_or(K8sMetadataError::MissingLabelSelector)?; - Ok(AttachSource::k8s_in_namespace( - label_selector.to_owned(), + Ok(ExistingCluster::for_k8s_selector_in_namespace( namespace.to_owned(), + label_selector.to_owned(), )) } + + #[doc(hidden)] + pub fn attach_source(&self) -> Result { + self.existing_cluster() + } } diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index eb39b33..5fa21fe 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -5,12 +5,11 @@ use kube::Client; use reqwest::Url; use testing_framework_core::{ scenario::{ - Application, ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, - Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, - MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, - SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, - build_source_orchestration_plan, orchestrate_sources_with_providers, + Application, ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, Deployer, + DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, + NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, + RunContext, Runner, Scenario, SourceOrchestrationPlan, SourceProviders, + StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -180,7 +179,7 @@ where let observability = resolve_observability_inputs(scenario.capabilities())?; - if scenario.sources().is_attached() { + if scenario.uses_existing_cluster() { let runner = deploy_attached_only::(scenario, source_plan, observability).await?; return Ok((runner, attached_metadata(scenario))); } @@ -250,23 +249,7 @@ where E: K8sDeployEnv, Caps: Send + Sync, { - match scenario.sources() { - ScenarioSources::Attached { - attach: - AttachSource::K8s { - namespace, - label_selector, - }, - .. - } => K8sDeploymentMetadata { - namespace: namespace.clone(), - label_selector: Some(label_selector.clone()), - }, - _ => K8sDeploymentMetadata { - namespace: None, - label_selector: None, - }, - } + K8sDeploymentMetadata::from_existing_cluster(scenario.existing_cluster()) } fn attached_cluster_wait( @@ -277,16 +260,15 @@ where E: K8sDeployEnv, Caps: Send + Sync, { - let ScenarioSources::Attached { attach, .. } = scenario.sources() else { - return Err(K8sRunnerError::InternalInvariant { + let attach = scenario + .existing_cluster() + .ok_or_else(|| K8sRunnerError::InternalInvariant { message: "k8s attached cluster wait requested outside attached source mode".to_owned(), - }); - }; + })?; + let cluster_wait = K8sAttachedClusterWait::::try_new(client, attach) + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; - Ok(Arc::new(K8sAttachedClusterWait::::new( - client, - attach.clone(), - ))) + Ok(Arc::new(cluster_wait)) } fn managed_cluster_wait( @@ -295,13 +277,12 @@ fn managed_cluster_wait( ) -> Result>, K8sRunnerError> { let client = client_from_cluster(cluster)?; let attach_source = metadata - .attach_source() + .existing_cluster() + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; + let cluster_wait = K8sAttachedClusterWait::::try_new(client, &attach_source) .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; - Ok(Arc::new(K8sAttachedClusterWait::::new( - client, - attach_source, - ))) + Ok(Arc::new(cluster_wait)) } fn client_from_cluster(cluster: &Option) -> Result {