From 712f93db18b33bb13a9edc5c007a2f26412a6a27 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 6 Mar 2026 14:24:27 +0100 Subject: [PATCH] refactor(compose): tighten attach errors and readability --- .../tests/compose_attach_node_control.rs | 35 +++++++--- .../compose/src/deployer/attach_provider.rs | 20 +++--- .../deployers/compose/src/deployer/mod.rs | 12 +++- .../compose/src/deployer/orchestrator.rs | 70 ++++++++++++------- 4 files changed, 86 insertions(+), 51 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 62c3c3d..386fd77 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -1,10 +1,10 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; -use anyhow::{Result, anyhow}; +use anyhow::{Error, Result, anyhow}; use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError}; -use tokio::process::Command; +use tokio::{process::Command, time::sleep}; #[tokio::test] #[ignore = "requires Docker and mutates compose runtime state"] @@ -19,7 +19,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { match deployer.deploy_with_metadata(&managed).await { Ok(result) => result, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), - Err(error) => return Err(anyhow::Error::new(error)), + Err(error) => return Err(Error::new(error)), }; let project_name = metadata @@ -37,7 +37,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { let attached_runner: Runner = match deployer.deploy(&attached).await { Ok(runner) => runner, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), - Err(error) => return Err(anyhow::Error::new(error)), + Err(error) => return Err(Error::new(error)), }; let control = attached_runner @@ -88,12 +88,25 @@ async fn service_started_at(project: &str, service: &str) -> Result { ]) .await?; - let container_id = container_id + let container_ids: Vec<&str> = container_id .lines() - .next() .map(str::trim) .filter(|value| !value.is_empty()) - .ok_or_else(|| anyhow!("no running container found for service '{service}'"))?; + .collect(); + + let container_id = match container_ids.as_slice() { + [] => { + return Err(anyhow!( + "no running container found for service '{service}'" + )); + } + [id] => *id, + _ => { + return Err(anyhow!( + "multiple running containers found for service '{service}'" + )); + } + }; let started_at = run_docker(&["inspect", "--format", "{{.State.StartedAt}}", container_id]).await?; @@ -115,7 +128,7 @@ async fn wait_until_service_restarted( previous_started_at: &str, timeout: Duration, ) -> Result<()> { - let deadline = std::time::Instant::now() + timeout; + let deadline = Instant::now() + timeout; loop { let started_at = service_started_at(project, service).await?; @@ -124,13 +137,13 @@ async fn wait_until_service_restarted( return Ok(()); } - if std::time::Instant::now() >= deadline { + if Instant::now() >= deadline { return Err(anyhow!( "timed out waiting for restarted compose service '{service}'" )); } - tokio::time::sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(500)).await; } } diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 7ddf243..23d574e 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -18,6 +18,12 @@ pub(super) struct ComposeAttachProvider { _env: PhantomData, } +#[derive(Debug, thiserror::Error)] +enum ComposeAttachDiscoveryError { + #[error("compose attach source requires an explicit project name")] + MissingProjectName, +} + impl ComposeAttachProvider { pub(super) fn new(host: String) -> Self { Self { @@ -45,7 +51,7 @@ impl AttachProvider for ComposeAttachProvider { let project = project .as_ref() .ok_or_else(|| AttachProviderError::Discovery { - source: "compose attach source requires an explicit project name".into(), + source: ComposeAttachDiscoveryError::MissingProjectName.into(), })?; let services = resolve_services(project, services) @@ -57,9 +63,11 @@ impl AttachProvider for ComposeAttachProvider { let container_id = discover_service_container_id(project, service) .await .map_err(to_discovery_error)?; + let api_port = discover_api_port(&container_id) .await .map_err(to_discovery_error)?; + let endpoint = build_service_endpoint(&self.host, api_port).map_err(to_discovery_error)?; let source = ExternalNodeSource::new(service.clone(), endpoint.to_string()); @@ -84,15 +92,7 @@ async fn resolve_services(project: &str, requested: &[String]) -> Result Result { diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 55cbc2f..8bd695d 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -29,6 +29,12 @@ pub struct ComposeDeploymentMetadata { pub project_name: Option, } +#[derive(Debug, thiserror::Error)] +enum ComposeMetadataError { + #[error("compose deployment metadata has no project name")] + MissingProjectName, +} + impl ComposeDeploymentMetadata { /// Returns project name when deployment is bound to a specific compose /// project. @@ -42,9 +48,9 @@ impl ComposeDeploymentMetadata { &self, services: Vec, ) -> Result { - let Some(project_name) = self.project_name() else { - return Err("compose metadata has no project name".into()); - }; + let project_name = self + .project_name() + .ok_or(ComposeMetadataError::MissingProjectName)?; Ok(AttachSource::compose(services).with_project(project_name.to_owned())) } diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 7889f68..a001680 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -6,8 +6,8 @@ use testing_framework_core::{ ApplicationExternalProvider, AttachSource, CleanupGuard, DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, - Runner, Scenario, ScenarioSources, SourceProviders, StaticManagedProvider, - build_source_orchestration_plan, orchestrate_sources_with_providers, + Runner, Scenario, ScenarioSources, SourceOrchestrationPlan, SourceProviders, + StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -102,19 +102,11 @@ impl DeploymentOrchestrator { ) .await?; - // Source orchestration currently runs here after managed clients are prepared. - let source_providers = SourceProviders::default() - .with_managed(Arc::new(StaticManagedProvider::new( - deployed.node_clients.snapshot(), - ))) - .with_attach(Arc::new(ComposeAttachProvider::::new( - compose_runner_host(), - ))) - .with_external(Arc::new(ApplicationExternalProvider)); + let source_providers = self.source_providers(deployed.node_clients.snapshot()); - deployed.node_clients = orchestrate_sources_with_providers(&source_plan, source_providers) - .await - .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + deployed.node_clients = self + .resolve_node_clients(&source_plan, source_providers) + .await?; let project_name = prepared.environment.project_name().to_owned(); @@ -147,25 +139,19 @@ impl DeploymentOrchestrator { async fn deploy_attached_only( &self, scenario: &Scenario, - source_plan: testing_framework_core::scenario::SourceOrchestrationPlan, + source_plan: SourceOrchestrationPlan, ) -> Result, ComposeRunnerError> where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { let observability = resolve_observability_inputs(scenario)?; - let source_providers = SourceProviders::default() - .with_managed(Arc::new(StaticManagedProvider::new(Vec::new()))) - .with_attach(Arc::new(ComposeAttachProvider::::new( - compose_runner_host(), - ))) - .with_external(Arc::new(ApplicationExternalProvider)); - let node_clients = orchestrate_sources_with_providers(&source_plan, source_providers) - .await - .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + let source_providers = self.source_providers(Vec::new()); - if node_clients.is_empty() { - return Err(ComposeRunnerError::RuntimePreflight); - } + let node_clients = self + .resolve_node_clients(&source_plan, source_providers) + .await?; + + self.ensure_non_empty_node_clients(&node_clients)?; let node_control = self.attached_node_control::(scenario)?; let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; @@ -183,6 +169,36 @@ impl DeploymentOrchestrator { Ok(Runner::new(context, Some(cleanup_guard))) } + fn source_providers(&self, managed_clients: Vec) -> SourceProviders { + SourceProviders::default() + .with_managed(Arc::new(StaticManagedProvider::new(managed_clients))) + .with_attach(Arc::new(ComposeAttachProvider::::new( + compose_runner_host(), + ))) + .with_external(Arc::new(ApplicationExternalProvider)) + } + + async fn resolve_node_clients( + &self, + source_plan: &SourceOrchestrationPlan, + source_providers: SourceProviders, + ) -> Result, ComposeRunnerError> { + orchestrate_sources_with_providers(source_plan, source_providers) + .await + .map_err(|source| ComposeRunnerError::SourceOrchestration { source }) + } + + fn ensure_non_empty_node_clients( + &self, + node_clients: &NodeClients, + ) -> Result<(), ComposeRunnerError> { + if node_clients.is_empty() { + return Err(ComposeRunnerError::RuntimePreflight); + } + + Ok(()) + } + fn attached_node_control( &self, scenario: &Scenario,