Confine backend cluster details to deployer adapters

This commit is contained in:
andrussal 2026-03-08 14:17:56 +01:00
parent f18820b8d1
commit fbede7f535
7 changed files with 90 additions and 65 deletions

View File

@@ -42,12 +42,14 @@ impl<E: ComposeDeployEnv> ComposeAttachProvider<E> {
}
impl<E: ComposeDeployEnv> ComposeAttachedClusterWait<E> {
pub(super) fn new(host: String, source: ExistingCluster) -> Self {
Self {
pub(super) fn try_new(host: String, source: &ExistingCluster) -> Result<Self, DynError> {
let _ = compose_wait_request(source)?;
Ok(Self {
host,
source,
source: source.clone(),
_env: PhantomData,
}
})
}
}

View File

@@ -36,6 +36,22 @@ enum ComposeMetadataError {
}
impl ComposeDeploymentMetadata {
#[must_use]
pub fn for_project(project_name: String) -> Self {
Self {
project_name: Some(project_name),
}
}
#[must_use]
pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self {
Self {
project_name: cluster
.and_then(ExistingCluster::compose_project)
.map(ToOwned::to_owned),
}
}
/// Returns project name when deployment is bound to a specific compose
/// project.
#[must_use]

View File

@@ -3,12 +3,11 @@ use std::{env, sync::Arc, time::Duration};
use reqwest::Url;
use testing_framework_core::{
scenario::{
ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy,
ExistingCluster, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs,
RequiresNodeControl, RunContext, Runner, Scenario, SourceOrchestrationPlan,
SourceProviders, StaticManagedProvider, build_source_orchestration_plan,
orchestrate_sources_with_providers,
ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle,
FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
Runner, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
@@ -219,20 +218,10 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
.ok_or(ComposeRunnerError::InternalInvariant {
message: "attached node control requested outside attached source mode",
})?;
let node_control = ComposeAttachedNodeControl::try_from_existing_cluster(attach)
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
let Some(project_name) = attach
.compose_project()
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return Err(ComposeRunnerError::InternalInvariant {
message: "attached compose mode requires explicit project name for node control",
});
};
Ok(Some(Arc::new(ComposeAttachedNodeControl {
project_name: project_name.to_owned(),
}) as Arc<dyn NodeControlHandle<E>>))
Ok(Some(Arc::new(node_control) as Arc<dyn NodeControlHandle<E>>))
}
fn attached_cluster_wait<Caps>(
@@ -247,11 +236,10 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
.ok_or(ComposeRunnerError::InternalInvariant {
message: "compose attached cluster wait requested outside attached source mode",
})?;
let cluster_wait = ComposeAttachedClusterWait::<E>::try_new(compose_runner_host(), attach)
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
Ok(Arc::new(ComposeAttachedClusterWait::<E>::new(
compose_runner_host(),
attach.clone(),
)))
Ok(Arc::new(cluster_wait))
}
async fn build_runner<Caps>(
@@ -268,7 +256,8 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
{
let telemetry = observability.telemetry_handle()?;
let node_control = self.maybe_node_control::<Caps>(&prepared.environment);
let cluster_wait = self.managed_cluster_wait(project_name);
let cluster_wait =
self.managed_cluster_wait(ComposeDeploymentMetadata::for_project(project_name))?;
log_observability_endpoints(&observability);
log_profiling_urls(&deployed.host, &deployed.host_ports);
@@ -312,11 +301,18 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
})
}
fn managed_cluster_wait(&self, project_name: String) -> Arc<dyn ClusterWaitHandle<E>> {
Arc::new(ComposeAttachedClusterWait::<E>::new(
compose_runner_host(),
ExistingCluster::compose_in_project(Vec::new(), project_name),
))
fn managed_cluster_wait(
&self,
metadata: ComposeDeploymentMetadata,
) -> Result<Arc<dyn ClusterWaitHandle<E>>, ComposeRunnerError> {
let existing_cluster = metadata
.existing_cluster()
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
let cluster_wait =
ComposeAttachedClusterWait::<E>::try_new(compose_runner_host(), &existing_cluster)
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
Ok(Arc::new(cluster_wait))
}
fn log_deploy_start<Caps>(
@@ -372,12 +368,7 @@ where
E: ComposeDeployEnv,
Caps: Send + Sync,
{
let project_name = scenario
.existing_cluster()
.and_then(|attach| attach.compose_project())
.map(ToOwned::to_owned);
ComposeDeploymentMetadata { project_name }
ComposeDeploymentMetadata::from_existing_cluster(scenario.existing_cluster())
}
struct DeployedNodes<E: ComposeDeployEnv> {

View File

@@ -5,7 +5,7 @@ use std::{
use testing_framework_core::{
adjust_timeout,
scenario::{Application, DynError, NodeControlHandle},
scenario::{Application, DynError, ExistingCluster, NodeControlHandle},
};
use tokio::{process::Command, time::timeout};
use tracing::info;
@@ -160,6 +160,22 @@ pub struct ComposeAttachedNodeControl {
pub(crate) project_name: String,
}
impl ComposeAttachedNodeControl {
pub fn try_from_existing_cluster(source: &ExistingCluster) -> Result<Self, DynError> {
let Some(project_name) = source
.compose_project()
.map(str::trim)
.filter(|value| !value.is_empty())
else {
return Err("attached compose node control requires explicit project name".into());
};
Ok(Self {
project_name: project_name.to_owned(),
})
}
}
#[async_trait::async_trait]
impl<E: Application> NodeControlHandle<E> for ComposeAttachedNodeControl {
async fn restart_node(&self, name: &str) -> Result<(), DynError> {

View File

@@ -56,12 +56,14 @@ impl<E: K8sDeployEnv> K8sAttachProvider<E> {
}
impl<E: K8sDeployEnv> K8sAttachedClusterWait<E> {
pub(super) fn new(client: Client, source: ExistingCluster) -> Self {
Self {
pub(super) fn try_new(client: Client, source: &ExistingCluster) -> Result<Self, DynError> {
let _ = k8s_wait_request(source)?;
Ok(Self {
client,
source,
source: source.clone(),
_env: PhantomData,
}
})
}
}

View File

@@ -22,6 +22,18 @@ enum K8sMetadataError {
}
impl K8sDeploymentMetadata {
#[must_use]
pub fn from_existing_cluster(cluster: Option<&ExistingCluster>) -> Self {
Self {
namespace: cluster
.and_then(ExistingCluster::k8s_namespace)
.map(ToOwned::to_owned),
label_selector: cluster
.and_then(ExistingCluster::k8s_label_selector)
.map(ToOwned::to_owned),
}
}
/// Returns namespace when deployment is bound to a specific namespace.
#[must_use]
pub fn namespace(&self) -> Option<&str> {

View File

@@ -249,19 +249,7 @@ where
E: K8sDeployEnv,
Caps: Send + Sync,
{
let namespace = scenario
.existing_cluster()
.and_then(|attach| attach.k8s_namespace())
.map(ToOwned::to_owned);
let label_selector = scenario
.existing_cluster()
.and_then(|attach| attach.k8s_label_selector())
.map(ToOwned::to_owned);
K8sDeploymentMetadata {
namespace,
label_selector,
}
K8sDeploymentMetadata::from_existing_cluster(scenario.existing_cluster())
}
fn attached_cluster_wait<E, Caps>(
@@ -277,11 +265,10 @@ where
.ok_or_else(|| K8sRunnerError::InternalInvariant {
message: "k8s attached cluster wait requested outside attached source mode".to_owned(),
})?;
let cluster_wait = K8sAttachedClusterWait::<E>::try_new(client, attach)
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
Ok(Arc::new(K8sAttachedClusterWait::<E>::new(
client,
attach.clone(),
)))
Ok(Arc::new(cluster_wait))
}
fn managed_cluster_wait<E: K8sDeployEnv>(
@@ -292,11 +279,10 @@ fn managed_cluster_wait<E: K8sDeployEnv>(
let attach_source = metadata
.existing_cluster()
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
let cluster_wait = K8sAttachedClusterWait::<E>::try_new(client, &attach_source)
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
Ok(Arc::new(K8sAttachedClusterWait::<E>::new(
client,
attach_source,
)))
Ok(Arc::new(cluster_wait))
}
fn client_from_cluster(cluster: &Option<ClusterEnvironment>) -> Result<Client, K8sRunnerError> {