framework: tighten source API contract

This commit is contained in:
Andrus Salumets 2026-03-25 13:23:44 +07:00 committed by GitHub
commit 0d00bb3f7e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 339 additions and 226 deletions

View File

@ -20,10 +20,12 @@ async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> {
Err(error) => return Err(Error::new(error)), Err(error) => return Err(Error::new(error)),
}; };
let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; let attach_source = metadata
.existing_cluster()
.map_err(|err| anyhow!("{err}"))?;
let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
.with_run_duration(Duration::from_secs(5)) .with_run_duration(Duration::from_secs(5))
.with_attach_source(attach_source) .with_existing_cluster(attach_source)
.build()?; .build()?;
let attached_deployer = LbcComposeDeployer::default(); let attached_deployer = LbcComposeDeployer::default();

View File

@ -20,10 +20,12 @@ async fn k8s_attach_mode_queries_node_api_opt_in() -> Result<()> {
Err(error) => return Err(Error::new(error)), Err(error) => return Err(Error::new(error)),
}; };
let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; let attach_source = metadata
.existing_cluster()
.map_err(|err| anyhow!("{err}"))?;
let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
.with_run_duration(Duration::from_secs(5)) .with_run_duration(Duration::from_secs(5))
.with_attach_source(attach_source) .with_existing_cluster(attach_source)
.build()?; .build()?;
let attached_deployer = LbcK8sDeployer::default(); let attached_deployer = LbcK8sDeployer::default();

View File

@ -4,14 +4,15 @@ use thiserror::Error;
use tracing::{debug, info}; use tracing::{debug, info};
use super::{ use super::{
Application, AttachSource, DeploymentPolicy, DynError, ExternalNodeSource, Application, DeploymentPolicy, DynError, ExistingCluster, ExternalNodeSource,
HttpReadinessRequirement, NodeControlCapability, ObservabilityCapability, ScenarioSources, HttpReadinessRequirement, NodeControlCapability, ObservabilityCapability,
builder_ops::CoreBuilderAccess, builder_ops::CoreBuilderAccess,
expectation::Expectation, expectation::Expectation,
runtime::{ runtime::{
context::RunMetrics, context::RunMetrics,
orchestration::{SourceOrchestrationPlan, SourceOrchestrationPlanError}, orchestration::{SourceOrchestrationPlan, SourceOrchestrationPlanError},
}, },
sources::ScenarioSources,
workload::Workload, workload::Workload,
}; };
use crate::topology::{DeploymentDescriptor, DeploymentProvider, DeploymentSeed, DynTopologyError}; use crate::topology::{DeploymentDescriptor, DeploymentProvider, DeploymentSeed, DynTopologyError};
@ -113,8 +114,39 @@ impl<E: Application, Caps> Scenario<E, Caps> {
} }
#[must_use] #[must_use]
pub fn sources(&self) -> &ScenarioSources { pub fn existing_cluster(&self) -> Option<&ExistingCluster> {
&self.sources self.sources.existing_cluster()
}
#[must_use]
pub const fn uses_existing_cluster(&self) -> bool {
self.sources.uses_existing_cluster()
}
#[must_use]
#[doc(hidden)]
pub fn attached_source(&self) -> Option<&ExistingCluster> {
self.existing_cluster()
}
#[must_use]
pub fn external_nodes(&self) -> &[ExternalNodeSource] {
self.sources.external_nodes()
}
#[must_use]
pub const fn is_managed(&self) -> bool {
self.sources.is_managed()
}
#[must_use]
pub const fn is_external_only(&self) -> bool {
self.sources.is_external_only()
}
#[must_use]
pub fn has_external_nodes(&self) -> bool {
!self.sources.external_nodes().is_empty()
} }
#[must_use] #[must_use]
@ -233,8 +265,14 @@ macro_rules! impl_common_builder_methods {
} }
#[must_use] #[must_use]
pub fn with_attach_source(self, attach: AttachSource) -> Self { pub fn with_existing_cluster(self, cluster: ExistingCluster) -> Self {
self.map_core_builder(|builder| builder.with_attach_source(attach)) self.map_core_builder(|builder| builder.with_existing_cluster(cluster))
}
#[must_use]
#[doc(hidden)]
pub fn with_attach_source(self, attach: ExistingCluster) -> Self {
self.with_existing_cluster(attach)
} }
#[must_use] #[must_use]
@ -546,11 +584,17 @@ impl<E: Application, Caps> Builder<E, Caps> {
} }
#[must_use] #[must_use]
pub fn with_attach_source(mut self, attach: AttachSource) -> Self { pub fn with_existing_cluster(mut self, cluster: ExistingCluster) -> Self {
self.sources = self.sources.with_attach(attach); self.sources = self.sources.with_attach(cluster);
self self
} }
#[must_use]
#[doc(hidden)]
pub fn with_attach_source(self, attach: ExistingCluster) -> Self {
self.with_existing_cluster(attach)
}
#[must_use] #[must_use]
pub fn with_external_node(mut self, node: ExternalNodeSource) -> Self { pub fn with_external_node(mut self, node: ExternalNodeSource) -> Self {
self.sources = self.sources.with_external_node(node); self.sources = self.sources.with_external_node(node);

View File

@ -51,7 +51,7 @@ pub use runtime::{
wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement,
wait_http_readiness, wait_until_stable, wait_http_readiness, wait_until_stable,
}; };
pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources}; pub use sources::{ExistingCluster, ExternalNodeSource};
pub use workload::Workload; pub use workload::Workload;
pub use crate::env::Application; pub use crate::env::Application;

View File

@ -1,4 +1,4 @@
use crate::scenario::{AttachSource, ExternalNodeSource, ScenarioSources}; use crate::scenario::{ExistingCluster, ExternalNodeSource, sources::ScenarioSources};
/// Explicit descriptor for managed node sourcing. /// Explicit descriptor for managed node sourcing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Eq, PartialEq)]
@ -19,7 +19,7 @@ pub(crate) enum SourceOrchestrationMode {
external: Vec<ExternalNodeSource>, external: Vec<ExternalNodeSource>,
}, },
Attached { Attached {
attach: AttachSource, attach: ExistingCluster,
external: Vec<ExternalNodeSource>, external: Vec<ExternalNodeSource>,
}, },
ExternalOnly { ExternalOnly {
@ -43,7 +43,7 @@ pub enum SourceOrchestrationPlanError {
} }
impl SourceOrchestrationPlan { impl SourceOrchestrationPlan {
pub fn try_from_sources( pub(crate) fn try_from_sources(
sources: &ScenarioSources, sources: &ScenarioSources,
) -> Result<Self, SourceOrchestrationPlanError> { ) -> Result<Self, SourceOrchestrationPlanError> {
let mode = mode_from_sources(sources); let mode = mode_from_sources(sources);
@ -69,11 +69,15 @@ impl SourceOrchestrationPlan {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{SourceOrchestrationMode, SourceOrchestrationPlan}; use super::{SourceOrchestrationMode, SourceOrchestrationPlan};
use crate::scenario::{AttachSource, ScenarioSources}; use crate::scenario::{ExistingCluster, sources::ScenarioSources};
#[test] #[test]
fn attached_sources_are_planned() { fn attached_sources_are_planned() {
let sources = ScenarioSources::attached(AttachSource::compose(vec!["node-0".to_string()])); let sources =
ScenarioSources::default().with_attach(ExistingCluster::for_compose_services(
"test-project".to_string(),
vec!["node-0".to_string()],
));
let plan = SourceOrchestrationPlan::try_from_sources(&sources) let plan = SourceOrchestrationPlan::try_from_sources(&sources)
.expect("attached sources should build a source orchestration plan"); .expect("attached sources should build a source orchestration plan");

View File

@ -41,7 +41,7 @@ pub enum SourceResolveError {
pub fn build_source_orchestration_plan<E: Application, Caps>( pub fn build_source_orchestration_plan<E: Application, Caps>(
scenario: &Scenario<E, Caps>, scenario: &Scenario<E, Caps>,
) -> Result<SourceOrchestrationPlan, SourceOrchestrationPlanError> { ) -> Result<SourceOrchestrationPlan, SourceOrchestrationPlanError> {
SourceOrchestrationPlan::try_from_sources(scenario.sources()) Ok(scenario.source_orchestration_plan().clone())
} }
/// Resolves runtime source nodes via unified providers from orchestration plan. /// Resolves runtime source nodes via unified providers from orchestration plan.

View File

@ -1,6 +1,6 @@
use async_trait::async_trait; use async_trait::async_trait;
use crate::scenario::{Application, AttachSource, DynError}; use crate::scenario::{Application, DynError, ExistingCluster};
/// Attached node discovered from an existing external cluster source. /// Attached node discovered from an existing external cluster source.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -15,7 +15,7 @@ pub struct AttachedNode<E: Application> {
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum AttachProviderError { pub enum AttachProviderError {
#[error("attach source is not supported by this provider: {attach_source:?}")] #[error("attach source is not supported by this provider: {attach_source:?}")]
UnsupportedSource { attach_source: AttachSource }, UnsupportedSource { attach_source: ExistingCluster },
#[error("attach discovery failed: {source}")] #[error("attach discovery failed: {source}")]
Discovery { Discovery {
#[source] #[source]
@ -32,7 +32,7 @@ pub trait AttachProvider<E: Application>: Send + Sync {
/// Discovers node clients for the requested attach source. /// Discovers node clients for the requested attach source.
async fn discover( async fn discover(
&self, &self,
source: &AttachSource, source: &ExistingCluster,
) -> Result<Vec<AttachedNode<E>>, AttachProviderError>; ) -> Result<Vec<AttachedNode<E>>, AttachProviderError>;
} }
@ -44,7 +44,7 @@ pub struct NoopAttachProvider;
impl<E: Application> AttachProvider<E> for NoopAttachProvider { impl<E: Application> AttachProvider<E> for NoopAttachProvider {
async fn discover( async fn discover(
&self, &self,
source: &AttachSource, source: &ExistingCluster,
) -> Result<Vec<AttachedNode<E>>, AttachProviderError> { ) -> Result<Vec<AttachedNode<E>>, AttachProviderError> {
Err(AttachProviderError::UnsupportedSource { Err(AttachProviderError::UnsupportedSource {
attach_source: source.clone(), attach_source: source.clone(),

View File

@ -1,3 +1,5 @@
mod model; mod model;
pub use model::{AttachSource, ExternalNodeSource, ScenarioSources}; pub(crate) use model::ScenarioSources;
#[doc(hidden)]
pub use model::{ExistingCluster, ExternalNodeSource};

View File

@ -1,6 +1,11 @@
/// Typed attach source for existing clusters. /// Typed descriptor for an existing cluster.
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub enum AttachSource { pub struct ExistingCluster {
kind: ExistingClusterKind,
}
#[derive(Clone, Debug, Eq, PartialEq)]
enum ExistingClusterKind {
K8s { K8s {
namespace: Option<String>, namespace: Option<String>,
label_selector: String, label_selector: String,
@ -11,36 +16,80 @@ pub enum AttachSource {
}, },
} }
impl AttachSource { impl ExistingCluster {
#[must_use] #[must_use]
pub fn k8s(label_selector: String) -> Self { pub fn for_k8s_selector(label_selector: String) -> Self {
Self::K8s { Self {
namespace: None, kind: ExistingClusterKind::K8s {
label_selector, namespace: None,
label_selector,
},
} }
} }
#[must_use] #[must_use]
pub fn k8s_in_namespace(label_selector: String, namespace: String) -> Self { pub fn for_k8s_selector_in_namespace(namespace: String, label_selector: String) -> Self {
Self::K8s { Self {
namespace: Some(namespace), kind: ExistingClusterKind::K8s {
label_selector, namespace: Some(namespace),
label_selector,
},
} }
} }
#[must_use] #[must_use]
pub fn compose(services: Vec<String>) -> Self { pub fn for_compose_project(project: String) -> Self {
Self::Compose { Self {
project: None, kind: ExistingClusterKind::Compose {
services, project: Some(project),
services: Vec::new(),
},
} }
} }
#[must_use] #[must_use]
pub fn compose_in_project(services: Vec<String>, project: String) -> Self { pub fn for_compose_services(project: String, services: Vec<String>) -> Self {
Self::Compose { Self {
project: Some(project), kind: ExistingClusterKind::Compose {
services, project: Some(project),
services,
},
}
}
#[must_use]
#[doc(hidden)]
pub fn compose_project(&self) -> Option<&str> {
match &self.kind {
ExistingClusterKind::Compose { project, .. } => project.as_deref(),
ExistingClusterKind::K8s { .. } => None,
}
}
#[must_use]
#[doc(hidden)]
pub fn compose_services(&self) -> Option<&[String]> {
match &self.kind {
ExistingClusterKind::Compose { services, .. } => Some(services),
ExistingClusterKind::K8s { .. } => None,
}
}
#[must_use]
#[doc(hidden)]
pub fn k8s_namespace(&self) -> Option<&str> {
match &self.kind {
ExistingClusterKind::K8s { namespace, .. } => namespace.as_deref(),
ExistingClusterKind::Compose { .. } => None,
}
}
#[must_use]
#[doc(hidden)]
pub fn k8s_label_selector(&self) -> Option<&str> {
match &self.kind {
ExistingClusterKind::K8s { label_selector, .. } => Some(label_selector),
ExistingClusterKind::Compose { .. } => None,
} }
} }
} }
@ -73,12 +122,12 @@ impl ExternalNodeSource {
/// Source model that makes invalid managed+attached combinations /// Source model that makes invalid managed+attached combinations
/// unrepresentable by type. /// unrepresentable by type.
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub enum ScenarioSources { pub(crate) enum ScenarioSources {
Managed { Managed {
external: Vec<ExternalNodeSource>, external: Vec<ExternalNodeSource>,
}, },
Attached { Attached {
attach: AttachSource, attach: ExistingCluster,
external: Vec<ExternalNodeSource>, external: Vec<ExternalNodeSource>,
}, },
ExternalOnly { ExternalOnly {
@ -96,27 +145,7 @@ impl Default for ScenarioSources {
impl ScenarioSources { impl ScenarioSources {
#[must_use] #[must_use]
pub const fn managed() -> Self { pub(crate) fn with_external_node(mut self, node: ExternalNodeSource) -> Self {
Self::Managed {
external: Vec::new(),
}
}
#[must_use]
pub fn attached(attach: AttachSource) -> Self {
Self::Attached {
attach,
external: Vec::new(),
}
}
#[must_use]
pub fn external_only(external: Vec<ExternalNodeSource>) -> Self {
Self::ExternalOnly { external }
}
#[must_use]
pub fn with_external_node(mut self, node: ExternalNodeSource) -> Self {
match &mut self { match &mut self {
Self::Managed { external } Self::Managed { external }
| Self::Attached { external, .. } | Self::Attached { external, .. }
@ -127,21 +156,29 @@ impl ScenarioSources {
} }
#[must_use] #[must_use]
pub fn with_attach(self, attach: AttachSource) -> Self { pub(crate) fn with_attach(self, attach: ExistingCluster) -> Self {
let external = self.external_nodes().to_vec(); let external = self.external_nodes().to_vec();
Self::Attached { attach, external } Self::Attached { attach, external }
} }
#[must_use] #[must_use]
pub fn into_external_only(self) -> Self { pub(crate) fn into_external_only(self) -> Self {
let external = self.external_nodes().to_vec(); let external = self.external_nodes().to_vec();
Self::ExternalOnly { external } Self::ExternalOnly { external }
} }
#[must_use] #[must_use]
pub fn external_nodes(&self) -> &[ExternalNodeSource] { pub(crate) fn existing_cluster(&self) -> Option<&ExistingCluster> {
match self {
Self::Attached { attach, .. } => Some(attach),
Self::Managed { .. } | Self::ExternalOnly { .. } => None,
}
}
#[must_use]
pub(crate) fn external_nodes(&self) -> &[ExternalNodeSource] {
match self { match self {
Self::Managed { external } Self::Managed { external }
| Self::Attached { external, .. } | Self::Attached { external, .. }
@ -150,17 +187,17 @@ impl ScenarioSources {
} }
#[must_use] #[must_use]
pub const fn is_managed(&self) -> bool { pub(crate) const fn is_managed(&self) -> bool {
matches!(self, Self::Managed { .. }) matches!(self, Self::Managed { .. })
} }
#[must_use] #[must_use]
pub const fn is_attached(&self) -> bool { pub(crate) const fn uses_existing_cluster(&self) -> bool {
matches!(self, Self::Attached { .. }) matches!(self, Self::Attached { .. })
} }
#[must_use] #[must_use]
pub const fn is_external_only(&self) -> bool { pub(crate) const fn is_external_only(&self) -> bool {
matches!(self, Self::ExternalOnly { .. }) matches!(self, Self::ExternalOnly { .. })
} }
} }

View File

@ -2,8 +2,8 @@ use std::marker::PhantomData;
use async_trait::async_trait; use async_trait::async_trait;
use testing_framework_core::scenario::{ use testing_framework_core::scenario::{
AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, AttachProvider, AttachProviderError, AttachedNode, ClusterWaitHandle, DynError,
ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness,
}; };
use url::Url; use url::Url;
@ -22,7 +22,7 @@ pub(super) struct ComposeAttachProvider<E: ComposeDeployEnv> {
pub(super) struct ComposeAttachedClusterWait<E: ComposeDeployEnv> { pub(super) struct ComposeAttachedClusterWait<E: ComposeDeployEnv> {
host: String, host: String,
source: AttachSource, source: ExistingCluster,
_env: PhantomData<E>, _env: PhantomData<E>,
} }
@ -42,12 +42,14 @@ impl<E: ComposeDeployEnv> ComposeAttachProvider<E> {
} }
impl<E: ComposeDeployEnv> ComposeAttachedClusterWait<E> { impl<E: ComposeDeployEnv> ComposeAttachedClusterWait<E> {
pub(super) fn new(host: String, source: AttachSource) -> Self { pub(super) fn try_new(host: String, source: &ExistingCluster) -> Result<Self, DynError> {
Self { let _ = compose_wait_request(source)?;
Ok(Self {
host, host,
source, source: source.clone(),
_env: PhantomData, _env: PhantomData,
} })
} }
} }
@ -60,7 +62,7 @@ struct ComposeAttachRequest<'a> {
impl<E: ComposeDeployEnv> AttachProvider<E> for ComposeAttachProvider<E> { impl<E: ComposeDeployEnv> AttachProvider<E> for ComposeAttachProvider<E> {
async fn discover( async fn discover(
&self, &self,
source: &AttachSource, source: &ExistingCluster,
) -> Result<Vec<AttachedNode<E>>, AttachProviderError> { ) -> Result<Vec<AttachedNode<E>>, AttachProviderError> {
let request = compose_attach_request(source)?; let request = compose_attach_request(source)?;
let services = resolve_services(request.project, request.services) let services = resolve_services(request.project, request.services)
@ -85,16 +87,17 @@ fn to_discovery_error(source: DynError) -> AttachProviderError {
} }
fn compose_attach_request( fn compose_attach_request(
source: &AttachSource, source: &ExistingCluster,
) -> Result<ComposeAttachRequest<'_>, AttachProviderError> { ) -> Result<ComposeAttachRequest<'_>, AttachProviderError> {
let AttachSource::Compose { project, services } = source else { let services =
return Err(AttachProviderError::UnsupportedSource { source
attach_source: source.clone(), .compose_services()
}); .ok_or_else(|| AttachProviderError::UnsupportedSource {
}; attach_source: source.clone(),
})?;
let project = project let project = source
.as_deref() .compose_project()
.ok_or_else(|| AttachProviderError::Discovery { .ok_or_else(|| AttachProviderError::Discovery {
source: ComposeAttachDiscoveryError::MissingProjectName.into(), source: ComposeAttachDiscoveryError::MissingProjectName.into(),
})?; })?;
@ -172,14 +175,13 @@ impl<E: ComposeDeployEnv> ClusterWaitHandle<E> for ComposeAttachedClusterWait<E>
} }
} }
fn compose_wait_request(source: &AttachSource) -> Result<ComposeAttachRequest<'_>, DynError> { fn compose_wait_request(source: &ExistingCluster) -> Result<ComposeAttachRequest<'_>, DynError> {
let AttachSource::Compose { project, services } = source else { let project = source
return Err("compose cluster wait requires a compose attach source".into()); .compose_project()
}; .ok_or_else(|| DynError::from("compose cluster wait requires a compose attach source"))?;
let services = source
let project = project .compose_services()
.as_deref() .ok_or_else(|| DynError::from("compose cluster wait requires a compose attach source"))?;
.ok_or(ComposeAttachDiscoveryError::MissingProjectName)?;
Ok(ComposeAttachRequest { project, services }) Ok(ComposeAttachRequest { project, services })
} }

View File

@ -9,7 +9,7 @@ use std::marker::PhantomData;
use async_trait::async_trait; use async_trait::async_trait;
use testing_framework_core::scenario::{ use testing_framework_core::scenario::{
AttachSource, CleanupGuard, Deployer, DynError, FeedHandle, ObservabilityCapabilityProvider, CleanupGuard, Deployer, DynError, ExistingCluster, FeedHandle, ObservabilityCapabilityProvider,
RequiresNodeControl, Runner, Scenario, RequiresNodeControl, Runner, Scenario,
}; };
@ -36,6 +36,22 @@ enum ComposeMetadataError {
} }
impl ComposeDeploymentMetadata { impl ComposeDeploymentMetadata {
#[must_use]
pub fn for_project(project_name: String) -> Self {
Self {
project_name: Some(project_name),
}
}
#[must_use]
pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self {
Self {
project_name: cluster
.and_then(ExistingCluster::compose_project)
.map(ToOwned::to_owned),
}
}
/// Returns project name when deployment is bound to a specific compose /// Returns project name when deployment is bound to a specific compose
/// project. /// project.
#[must_use] #[must_use]
@ -43,33 +59,45 @@ impl ComposeDeploymentMetadata {
self.project_name.as_deref() self.project_name.as_deref()
} }
/// Builds an attach source for the same compose project using deployer /// Builds an existing-cluster descriptor for the same compose project
/// discovery to resolve services. /// using deployer discovery to resolve services.
pub fn attach_source(&self) -> Result<AttachSource, DynError> { pub fn existing_cluster(&self) -> Result<ExistingCluster, DynError> {
let project_name = self let project_name = self
.project_name() .project_name()
.ok_or(ComposeMetadataError::MissingProjectName)?; .ok_or(ComposeMetadataError::MissingProjectName)?;
Ok(AttachSource::compose_in_project( Ok(ExistingCluster::for_compose_project(
Vec::new(),
project_name.to_owned(), project_name.to_owned(),
)) ))
} }
/// Builds an attach source for the same compose project. /// Builds an existing-cluster descriptor for the same compose project.
pub fn attach_source_for_services( pub fn existing_cluster_for_services(
&self, &self,
services: Vec<String>, services: Vec<String>,
) -> Result<AttachSource, DynError> { ) -> Result<ExistingCluster, DynError> {
let project_name = self let project_name = self
.project_name() .project_name()
.ok_or(ComposeMetadataError::MissingProjectName)?; .ok_or(ComposeMetadataError::MissingProjectName)?;
Ok(AttachSource::compose_in_project( Ok(ExistingCluster::for_compose_services(
services,
project_name.to_owned(), project_name.to_owned(),
services,
)) ))
} }
#[doc(hidden)]
pub fn attach_source(&self) -> Result<ExistingCluster, DynError> {
self.existing_cluster()
}
#[doc(hidden)]
pub fn attach_source_for_services(
&self,
services: Vec<String>,
) -> Result<ExistingCluster, DynError> {
self.existing_cluster_for_services(services)
}
} }
impl<E: ComposeDeployEnv> Default for ComposeDeployer<E> { impl<E: ComposeDeployEnv> Default for ComposeDeployer<E> {

View File

@ -3,11 +3,10 @@ use std::{env, sync::Arc, time::Duration};
use reqwest::Url; use reqwest::Url;
use testing_framework_core::{ use testing_framework_core::{
scenario::{ scenario::{
ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle,
DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, Runner, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers, build_source_orchestration_plan, orchestrate_sources_with_providers,
}, },
topology::DeploymentDescriptor, topology::DeploymentDescriptor,
@ -71,7 +70,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
} }
})?; })?;
if scenario.sources().is_attached() { if scenario.uses_existing_cluster() {
return self return self
.deploy_attached_only::<Caps>(scenario, source_plan) .deploy_attached_only::<Caps>(scenario, source_plan)
.await .await
@ -214,31 +213,15 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
return Ok(None); return Ok(None);
} }
let ScenarioSources::Attached { attach, .. } = scenario.sources() else { let attach = scenario
return Err(ComposeRunnerError::InternalInvariant { .existing_cluster()
.ok_or(ComposeRunnerError::InternalInvariant {
message: "attached node control requested outside attached source mode", message: "attached node control requested outside attached source mode",
}); })?;
}; let node_control = ComposeAttachedNodeControl::try_from_existing_cluster(attach)
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
let AttachSource::Compose { project, .. } = attach else { Ok(Some(Arc::new(node_control) as Arc<dyn NodeControlHandle<E>>))
return Err(ComposeRunnerError::InternalInvariant {
message: "compose deployer requires compose attach source for node control",
});
};
let Some(project_name) = project
.as_ref()
.map(|value| value.trim())
.filter(|value| !value.is_empty())
else {
return Err(ComposeRunnerError::InternalInvariant {
message: "attached compose mode requires explicit project name for node control",
});
};
Ok(Some(Arc::new(ComposeAttachedNodeControl {
project_name: project_name.to_owned(),
}) as Arc<dyn NodeControlHandle<E>>))
} }
fn attached_cluster_wait<Caps>( fn attached_cluster_wait<Caps>(
@ -248,16 +231,15 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
where where
Caps: Send + Sync, Caps: Send + Sync,
{ {
let ScenarioSources::Attached { attach, .. } = scenario.sources() else { let attach = scenario
return Err(ComposeRunnerError::InternalInvariant { .existing_cluster()
.ok_or(ComposeRunnerError::InternalInvariant {
message: "compose attached cluster wait requested outside attached source mode", message: "compose attached cluster wait requested outside attached source mode",
}); })?;
}; let cluster_wait = ComposeAttachedClusterWait::<E>::try_new(compose_runner_host(), attach)
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
Ok(Arc::new(ComposeAttachedClusterWait::<E>::new( Ok(Arc::new(cluster_wait))
compose_runner_host(),
attach.clone(),
)))
} }
async fn build_runner<Caps>( async fn build_runner<Caps>(
@ -274,7 +256,8 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
{ {
let telemetry = observability.telemetry_handle()?; let telemetry = observability.telemetry_handle()?;
let node_control = self.maybe_node_control::<Caps>(&prepared.environment); let node_control = self.maybe_node_control::<Caps>(&prepared.environment);
let cluster_wait = self.managed_cluster_wait(project_name); let cluster_wait =
self.managed_cluster_wait(ComposeDeploymentMetadata::for_project(project_name))?;
log_observability_endpoints(&observability); log_observability_endpoints(&observability);
log_profiling_urls(&deployed.host, &deployed.host_ports); log_profiling_urls(&deployed.host, &deployed.host_ports);
@ -318,11 +301,18 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
}) })
} }
fn managed_cluster_wait(&self, project_name: String) -> Arc<dyn ClusterWaitHandle<E>> { fn managed_cluster_wait(
Arc::new(ComposeAttachedClusterWait::<E>::new( &self,
compose_runner_host(), metadata: ComposeDeploymentMetadata,
AttachSource::compose_in_project(Vec::new(), project_name), ) -> Result<Arc<dyn ClusterWaitHandle<E>>, ComposeRunnerError> {
)) let existing_cluster = metadata
.existing_cluster()
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
let cluster_wait =
ComposeAttachedClusterWait::<E>::try_new(compose_runner_host(), &existing_cluster)
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
Ok(Arc::new(cluster_wait))
} }
fn log_deploy_start<Caps>( fn log_deploy_start<Caps>(
@ -378,15 +368,7 @@ where
E: ComposeDeployEnv, E: ComposeDeployEnv,
Caps: Send + Sync, Caps: Send + Sync,
{ {
let project_name = match scenario.sources() { ComposeDeploymentMetadata::from_existing_cluster(scenario.existing_cluster())
ScenarioSources::Attached {
attach: AttachSource::Compose { project, .. },
..
} => project.clone(),
_ => None,
};
ComposeDeploymentMetadata { project_name }
} }
struct DeployedNodes<E: ComposeDeployEnv> { struct DeployedNodes<E: ComposeDeployEnv> {

View File

@ -5,7 +5,7 @@ use std::{
use testing_framework_core::{ use testing_framework_core::{
adjust_timeout, adjust_timeout,
scenario::{Application, DynError, NodeControlHandle}, scenario::{Application, DynError, ExistingCluster, NodeControlHandle},
}; };
use tokio::{process::Command, time::timeout}; use tokio::{process::Command, time::timeout};
use tracing::info; use tracing::info;
@ -160,6 +160,22 @@ pub struct ComposeAttachedNodeControl {
pub(crate) project_name: String, pub(crate) project_name: String,
} }
impl ComposeAttachedNodeControl {
pub fn try_from_existing_cluster(source: &ExistingCluster) -> Result<Self, DynError> {
let Some(project_name) = source
.compose_project()
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return Err("attached compose node control requires explicit project name".into());
};
Ok(Self {
project_name: project_name.to_owned(),
})
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl<E: Application> NodeControlHandle<E> for ComposeAttachedNodeControl { impl<E: Application> NodeControlHandle<E> for ComposeAttachedNodeControl {
async fn restart_node(&self, name: &str) -> Result<(), DynError> { async fn restart_node(&self, name: &str) -> Result<(), DynError> {

View File

@ -7,8 +7,8 @@ use kube::{
api::{ListParams, ObjectList}, api::{ListParams, ObjectList},
}; };
use testing_framework_core::scenario::{ use testing_framework_core::scenario::{
AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, AttachProvider, AttachProviderError, AttachedNode, ClusterWaitHandle, DynError,
ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness,
}; };
use url::Url; use url::Url;
@ -37,7 +37,7 @@ pub(super) struct K8sAttachProvider<E: K8sDeployEnv> {
pub(super) struct K8sAttachedClusterWait<E: K8sDeployEnv> { pub(super) struct K8sAttachedClusterWait<E: K8sDeployEnv> {
client: Client, client: Client,
source: AttachSource, source: ExistingCluster,
_env: PhantomData<E>, _env: PhantomData<E>,
} }
@ -56,12 +56,14 @@ impl<E: K8sDeployEnv> K8sAttachProvider<E> {
} }
impl<E: K8sDeployEnv> K8sAttachedClusterWait<E> { impl<E: K8sDeployEnv> K8sAttachedClusterWait<E> {
pub(super) fn new(client: Client, source: AttachSource) -> Self { pub(super) fn try_new(client: Client, source: &ExistingCluster) -> Result<Self, DynError> {
Self { let _ = k8s_wait_request(source)?;
Ok(Self {
client, client,
source, source: source.clone(),
_env: PhantomData, _env: PhantomData,
} })
} }
} }
@ -69,7 +71,7 @@ impl<E: K8sDeployEnv> K8sAttachedClusterWait<E> {
impl<E: K8sDeployEnv> AttachProvider<E> for K8sAttachProvider<E> { impl<E: K8sDeployEnv> AttachProvider<E> for K8sAttachProvider<E> {
async fn discover( async fn discover(
&self, &self,
source: &AttachSource, source: &ExistingCluster,
) -> Result<Vec<AttachedNode<E>>, AttachProviderError> { ) -> Result<Vec<AttachedNode<E>>, AttachProviderError> {
let request = k8s_attach_request(source)?; let request = k8s_attach_request(source)?;
let services = discover_services(&self.client, request.namespace, request.label_selector) let services = discover_services(&self.client, request.namespace, request.label_selector)
@ -90,12 +92,10 @@ fn to_discovery_error(source: DynError) -> AttachProviderError {
AttachProviderError::Discovery { source } AttachProviderError::Discovery { source }
} }
fn k8s_attach_request(source: &AttachSource) -> Result<K8sAttachRequest<'_>, AttachProviderError> { fn k8s_attach_request(
let AttachSource::K8s { source: &ExistingCluster,
namespace, ) -> Result<K8sAttachRequest<'_>, AttachProviderError> {
label_selector, let Some(label_selector) = source.k8s_label_selector() else {
} = source
else {
return Err(AttachProviderError::UnsupportedSource { return Err(AttachProviderError::UnsupportedSource {
attach_source: source.clone(), attach_source: source.clone(),
}); });
@ -108,7 +108,7 @@ fn k8s_attach_request(source: &AttachSource) -> Result<K8sAttachRequest<'_>, Att
} }
Ok(K8sAttachRequest { Ok(K8sAttachRequest {
namespace: namespace.as_deref().unwrap_or("default"), namespace: source.k8s_namespace().unwrap_or("default"),
label_selector, label_selector,
}) })
} }
@ -246,21 +246,17 @@ impl<E: K8sDeployEnv> ClusterWaitHandle<E> for K8sAttachedClusterWait<E> {
} }
} }
fn k8s_wait_request(source: &AttachSource) -> Result<K8sAttachRequest<'_>, DynError> { fn k8s_wait_request(source: &ExistingCluster) -> Result<K8sAttachRequest<'_>, DynError> {
let AttachSource::K8s { let label_selector = source
namespace, .k8s_label_selector()
label_selector, .ok_or_else(|| DynError::from("k8s cluster wait requires a k8s attach source"))?;
} = source
else {
return Err("k8s cluster wait requires a k8s attach source".into());
};
if label_selector.trim().is_empty() { if label_selector.trim().is_empty() {
return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into()); return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into());
} }
Ok(K8sAttachRequest { Ok(K8sAttachRequest {
namespace: namespace.as_deref().unwrap_or("default"), namespace: source.k8s_namespace().unwrap_or("default"),
label_selector, label_selector,
}) })
} }

View File

@ -2,7 +2,7 @@ mod attach_provider;
mod orchestrator; mod orchestrator;
pub use orchestrator::{K8sDeployer, K8sRunnerError}; pub use orchestrator::{K8sDeployer, K8sRunnerError};
use testing_framework_core::scenario::{AttachSource, DynError}; use testing_framework_core::scenario::{DynError, ExistingCluster};
/// Kubernetes deployment metadata returned by k8s-specific deployment APIs. /// Kubernetes deployment metadata returned by k8s-specific deployment APIs.
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
@ -22,6 +22,18 @@ enum K8sMetadataError {
} }
impl K8sDeploymentMetadata { impl K8sDeploymentMetadata {
#[must_use]
pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self {
Self {
namespace: cluster
.and_then(ExistingCluster::k8s_namespace)
.map(ToOwned::to_owned),
label_selector: cluster
.and_then(ExistingCluster::k8s_label_selector)
.map(ToOwned::to_owned),
}
}
/// Returns namespace when deployment is bound to a specific namespace. /// Returns namespace when deployment is bound to a specific namespace.
#[must_use] #[must_use]
pub fn namespace(&self) -> Option<&str> { pub fn namespace(&self) -> Option<&str> {
@ -34,16 +46,21 @@ impl K8sDeploymentMetadata {
self.label_selector.as_deref() self.label_selector.as_deref()
} }
/// Builds an attach source for the same k8s deployment scope. /// Builds an existing-cluster descriptor for the same k8s deployment scope.
pub fn attach_source(&self) -> Result<AttachSource, DynError> { pub fn existing_cluster(&self) -> Result<ExistingCluster, DynError> {
let namespace = self.namespace().ok_or(K8sMetadataError::MissingNamespace)?; let namespace = self.namespace().ok_or(K8sMetadataError::MissingNamespace)?;
let label_selector = self let label_selector = self
.label_selector() .label_selector()
.ok_or(K8sMetadataError::MissingLabelSelector)?; .ok_or(K8sMetadataError::MissingLabelSelector)?;
Ok(AttachSource::k8s_in_namespace( Ok(ExistingCluster::for_k8s_selector_in_namespace(
label_selector.to_owned(),
namespace.to_owned(), namespace.to_owned(),
label_selector.to_owned(),
)) ))
} }
#[doc(hidden)]
pub fn attach_source(&self) -> Result<ExistingCluster, DynError> {
self.existing_cluster()
}
} }

View File

@ -5,12 +5,11 @@ use kube::Client;
use reqwest::Url; use reqwest::Url;
use testing_framework_core::{ use testing_framework_core::{
scenario::{ scenario::{
Application, ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, Application, ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, Deployer,
Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError,
MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl,
RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, RunContext, Runner, Scenario, SourceOrchestrationPlan, SourceProviders,
SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
build_source_orchestration_plan, orchestrate_sources_with_providers,
}, },
topology::DeploymentDescriptor, topology::DeploymentDescriptor,
}; };
@ -180,7 +179,7 @@ where
let observability = resolve_observability_inputs(scenario.capabilities())?; let observability = resolve_observability_inputs(scenario.capabilities())?;
if scenario.sources().is_attached() { if scenario.uses_existing_cluster() {
let runner = deploy_attached_only::<E, Caps>(scenario, source_plan, observability).await?; let runner = deploy_attached_only::<E, Caps>(scenario, source_plan, observability).await?;
return Ok((runner, attached_metadata(scenario))); return Ok((runner, attached_metadata(scenario)));
} }
@ -250,23 +249,7 @@ where
E: K8sDeployEnv, E: K8sDeployEnv,
Caps: Send + Sync, Caps: Send + Sync,
{ {
match scenario.sources() { K8sDeploymentMetadata::from_existing_cluster(scenario.existing_cluster())
ScenarioSources::Attached {
attach:
AttachSource::K8s {
namespace,
label_selector,
},
..
} => K8sDeploymentMetadata {
namespace: namespace.clone(),
label_selector: Some(label_selector.clone()),
},
_ => K8sDeploymentMetadata {
namespace: None,
label_selector: None,
},
}
} }
fn attached_cluster_wait<E, Caps>( fn attached_cluster_wait<E, Caps>(
@ -277,16 +260,15 @@ where
E: K8sDeployEnv, E: K8sDeployEnv,
Caps: Send + Sync, Caps: Send + Sync,
{ {
let ScenarioSources::Attached { attach, .. } = scenario.sources() else { let attach = scenario
return Err(K8sRunnerError::InternalInvariant { .existing_cluster()
.ok_or_else(|| K8sRunnerError::InternalInvariant {
message: "k8s attached cluster wait requested outside attached source mode".to_owned(), message: "k8s attached cluster wait requested outside attached source mode".to_owned(),
}); })?;
}; let cluster_wait = K8sAttachedClusterWait::<E>::try_new(client, attach)
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
Ok(Arc::new(K8sAttachedClusterWait::<E>::new( Ok(Arc::new(cluster_wait))
client,
attach.clone(),
)))
} }
fn managed_cluster_wait<E: K8sDeployEnv>( fn managed_cluster_wait<E: K8sDeployEnv>(
@ -295,13 +277,12 @@ fn managed_cluster_wait<E: K8sDeployEnv>(
) -> Result<Arc<dyn ClusterWaitHandle<E>>, K8sRunnerError> { ) -> Result<Arc<dyn ClusterWaitHandle<E>>, K8sRunnerError> {
let client = client_from_cluster(cluster)?; let client = client_from_cluster(cluster)?;
let attach_source = metadata let attach_source = metadata
.attach_source() .existing_cluster()
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
let cluster_wait = K8sAttachedClusterWait::<E>::try_new(client, &attach_source)
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?; .map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
Ok(Arc::new(K8sAttachedClusterWait::<E>::new( Ok(Arc::new(cluster_wait))
client,
attach_source,
)))
} }
fn client_from_cluster(cluster: &Option<ClusterEnvironment>) -> Result<Client, K8sRunnerError> { fn client_from_cluster(cluster: &Option<ClusterEnvironment>) -> Result<Client, K8sRunnerError> {