From fbede7f535da53b57b6f45641e3931a537552f13 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 8 Mar 2026 14:17:56 +0100 Subject: [PATCH] Confine backend cluster details to deployer adapters --- .../compose/src/deployer/attach_provider.rs | 10 +-- .../deployers/compose/src/deployer/mod.rs | 16 +++++ .../compose/src/deployer/orchestrator.rs | 61 ++++++++----------- .../deployers/compose/src/docker/control.rs | 18 +++++- .../k8s/src/deployer/attach_provider.rs | 10 +-- .../deployers/k8s/src/deployer/mod.rs | 12 ++++ .../k8s/src/deployer/orchestrator.rs | 28 +++------ 7 files changed, 90 insertions(+), 65 deletions(-) diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 314ae3d..8874fd1 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -42,12 +42,14 @@ impl ComposeAttachProvider { } impl ComposeAttachedClusterWait { - pub(super) fn new(host: String, source: ExistingCluster) -> Self { - Self { + pub(super) fn try_new(host: String, source: &ExistingCluster) -> Result { + let _ = compose_wait_request(source)?; + + Ok(Self { host, - source, + source: source.clone(), _env: PhantomData, - } + }) } } diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 66859c1..2eab7c6 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -36,6 +36,22 @@ enum ComposeMetadataError { } impl ComposeDeploymentMetadata { + #[must_use] + pub fn for_project(project_name: String) -> Self { + Self { + project_name: Some(project_name), + } + } + + #[must_use] + pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self { + Self { + project_name: cluster + .and_then(ExistingCluster::compose_project) + .map(ToOwned::to_owned), + } + } + /// Returns project name when deployment is bound to a specific compose /// project. #[must_use] diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index d9cf18c..29b58c3 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,12 +3,11 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, - ExistingCluster, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, - NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, RunContext, Runner, Scenario, SourceOrchestrationPlan, - SourceProviders, StaticManagedProvider, build_source_orchestration_plan, - orchestrate_sources_with_providers, + ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle, + FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, + Runner, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -219,20 +218,10 @@ impl DeploymentOrchestrator { .ok_or(ComposeRunnerError::InternalInvariant { message: "attached node control requested outside attached source mode", })?; + let node_control = ComposeAttachedNodeControl::try_from_existing_cluster(attach) + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; - let Some(project_name) = attach - .compose_project() - .map(str::trim) - .filter(|value| !value.is_empty()) - else { - return Err(ComposeRunnerError::InternalInvariant { - message: "attached compose mode requires explicit project name for node control", - }); - }; - - Ok(Some(Arc::new(ComposeAttachedNodeControl { - project_name: project_name.to_owned(), - }) as Arc>)) + Ok(Some(Arc::new(node_control) as Arc>)) } fn attached_cluster_wait( @@ -247,11 +236,10 @@ impl DeploymentOrchestrator { .ok_or(ComposeRunnerError::InternalInvariant { message: "compose attached cluster wait requested outside attached source mode", })?; + let cluster_wait = ComposeAttachedClusterWait::::try_new(compose_runner_host(), attach) + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; - Ok(Arc::new(ComposeAttachedClusterWait::::new( - compose_runner_host(), - attach.clone(), - ))) + Ok(Arc::new(cluster_wait)) } async fn build_runner( @@ -268,7 +256,8 @@ impl DeploymentOrchestrator { { let telemetry = observability.telemetry_handle()?; let node_control = self.maybe_node_control::(&prepared.environment); - let cluster_wait = self.managed_cluster_wait(project_name); + let cluster_wait = + self.managed_cluster_wait(ComposeDeploymentMetadata::for_project(project_name))?; log_observability_endpoints(&observability); log_profiling_urls(&deployed.host, &deployed.host_ports); @@ -312,11 +301,18 @@ impl DeploymentOrchestrator { }) } - fn managed_cluster_wait(&self, project_name: String) -> Arc> { - Arc::new(ComposeAttachedClusterWait::::new( - compose_runner_host(), - ExistingCluster::compose_in_project(Vec::new(), project_name), - )) + fn managed_cluster_wait( + &self, + metadata: ComposeDeploymentMetadata, + ) -> Result>, ComposeRunnerError> { + let existing_cluster = metadata + .existing_cluster() + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + let cluster_wait = + ComposeAttachedClusterWait::::try_new(compose_runner_host(), &existing_cluster) + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + + Ok(Arc::new(cluster_wait)) } fn log_deploy_start( @@ -372,12 +368,7 @@ where E: ComposeDeployEnv, Caps: Send + Sync, { - let project_name = scenario - .existing_cluster() - .and_then(|attach| attach.compose_project()) - .map(ToOwned::to_owned); - - ComposeDeploymentMetadata { project_name } + ComposeDeploymentMetadata::from_existing_cluster(scenario.existing_cluster()) } struct DeployedNodes { diff --git a/testing-framework/deployers/compose/src/docker/control.rs b/testing-framework/deployers/compose/src/docker/control.rs index 8e98948..90b8a3d 100644 --- a/testing-framework/deployers/compose/src/docker/control.rs +++ b/testing-framework/deployers/compose/src/docker/control.rs @@ -5,7 +5,7 @@ use std::{ use testing_framework_core::{ adjust_timeout, - scenario::{Application, DynError, NodeControlHandle}, + scenario::{Application, DynError, ExistingCluster, NodeControlHandle}, }; use tokio::{process::Command, time::timeout}; use tracing::info; @@ -160,6 +160,22 @@ pub struct ComposeAttachedNodeControl { pub(crate) project_name: String, } +impl ComposeAttachedNodeControl { + pub fn try_from_existing_cluster(source: &ExistingCluster) -> Result { + let Some(project_name) = source + .compose_project() + .map(str::trim) + .filter(|value| !value.is_empty()) + else { + return Err("attached compose node control requires explicit project name".into()); + }; + + Ok(Self { + project_name: project_name.to_owned(), + }) + } +} + #[async_trait::async_trait] impl NodeControlHandle for ComposeAttachedNodeControl { async fn restart_node(&self, name: &str) -> Result<(), DynError> { diff --git a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs index cfa0dc2..43925bc 100644 --- a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs @@ -56,12 +56,14 @@ impl K8sAttachProvider { } impl K8sAttachedClusterWait { - pub(super) fn new(client: Client, source: ExistingCluster) -> Self { - Self { + pub(super) fn try_new(client: Client, source: &ExistingCluster) -> Result { + let _ = k8s_wait_request(source)?; + + Ok(Self { client, - source, + source: source.clone(), _env: PhantomData, - } + }) } } diff --git a/testing-framework/deployers/k8s/src/deployer/mod.rs b/testing-framework/deployers/k8s/src/deployer/mod.rs index eae7507..e20fcc5 100644 --- a/testing-framework/deployers/k8s/src/deployer/mod.rs +++ b/testing-framework/deployers/k8s/src/deployer/mod.rs @@ -22,6 +22,18 @@ enum K8sMetadataError { } impl K8sDeploymentMetadata { + #[must_use] + pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self { + Self { + namespace: cluster + .and_then(ExistingCluster::k8s_namespace) + .map(ToOwned::to_owned), + label_selector: cluster + .and_then(ExistingCluster::k8s_label_selector) + .map(ToOwned::to_owned), + } + } + /// Returns namespace when deployment is bound to a specific namespace. #[must_use] pub fn namespace(&self) -> Option<&str> { diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 836a892..5fa21fe 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -249,19 +249,7 @@ where E: K8sDeployEnv, Caps: Send + Sync, { - let namespace = scenario - .existing_cluster() - .and_then(|attach| attach.k8s_namespace()) - .map(ToOwned::to_owned); - let label_selector = scenario - .existing_cluster() - .and_then(|attach| attach.k8s_label_selector()) - .map(ToOwned::to_owned); - - K8sDeploymentMetadata { - namespace, - label_selector, - } + K8sDeploymentMetadata::from_existing_cluster(scenario.existing_cluster()) } fn attached_cluster_wait( @@ -277,11 +265,10 @@ where .ok_or_else(|| K8sRunnerError::InternalInvariant { message: "k8s attached cluster wait requested outside attached source mode".to_owned(), })?; + let cluster_wait = K8sAttachedClusterWait::::try_new(client, attach) + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; - Ok(Arc::new(K8sAttachedClusterWait::::new( - client, - attach.clone(), - ))) + Ok(Arc::new(cluster_wait)) } fn managed_cluster_wait( @@ -292,11 +279,10 @@ fn managed_cluster_wait( let attach_source = metadata .existing_cluster() .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; + let cluster_wait = K8sAttachedClusterWait::::try_new(client, &attach_source) + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; - Ok(Arc::new(K8sAttachedClusterWait::::new( - client, - attach_source, - ))) + Ok(Arc::new(cluster_wait)) } fn client_from_cluster(cluster: &Option) -> Result {