refactor(compose): tighten attach errors and readability

This commit is contained in:
andrussal 2026-03-06 14:24:27 +01:00
parent d9c99322c7
commit 712f93db18
4 changed files with 86 additions and 51 deletions

View File

@ -1,10 +1,10 @@
use std::time::Duration;
use std::time::{Duration, Instant};
use anyhow::{Result, anyhow};
use anyhow::{Error, Result, anyhow};
use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder};
use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner};
use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError};
use tokio::process::Command;
use tokio::{process::Command, time::sleep};
#[tokio::test]
#[ignore = "requires Docker and mutates compose runtime state"]
@ -19,7 +19,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
match deployer.deploy_with_metadata(&managed).await {
Ok(result) => result,
Err(ComposeRunnerError::DockerUnavailable) => return Ok(()),
Err(error) => return Err(anyhow::Error::new(error)),
Err(error) => return Err(Error::new(error)),
};
let project_name = metadata
@ -37,7 +37,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
let attached_runner: Runner<LbcExtEnv> = match deployer.deploy(&attached).await {
Ok(runner) => runner,
Err(ComposeRunnerError::DockerUnavailable) => return Ok(()),
Err(error) => return Err(anyhow::Error::new(error)),
Err(error) => return Err(Error::new(error)),
};
let control = attached_runner
@ -88,12 +88,25 @@ async fn service_started_at(project: &str, service: &str) -> Result<String> {
])
.await?;
let container_id = container_id
let container_ids: Vec<&str> = container_id
.lines()
.next()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| anyhow!("no running container found for service '{service}'"))?;
.collect();
let container_id = match container_ids.as_slice() {
[] => {
return Err(anyhow!(
"no running container found for service '{service}'"
));
}
[id] => *id,
_ => {
return Err(anyhow!(
"multiple running containers found for service '{service}'"
));
}
};
let started_at =
run_docker(&["inspect", "--format", "{{.State.StartedAt}}", container_id]).await?;
@ -115,7 +128,7 @@ async fn wait_until_service_restarted(
previous_started_at: &str,
timeout: Duration,
) -> Result<()> {
let deadline = std::time::Instant::now() + timeout;
let deadline = Instant::now() + timeout;
loop {
let started_at = service_started_at(project, service).await?;
@ -124,13 +137,13 @@ async fn wait_until_service_restarted(
return Ok(());
}
if std::time::Instant::now() >= deadline {
if Instant::now() >= deadline {
return Err(anyhow!(
"timed out waiting for restarted compose service '{service}'"
));
}
tokio::time::sleep(Duration::from_millis(500)).await;
sleep(Duration::from_millis(500)).await;
}
}

View File

@ -18,6 +18,12 @@ pub(super) struct ComposeAttachProvider<E: ComposeDeployEnv> {
_env: PhantomData<E>,
}
#[derive(Debug, thiserror::Error)]
enum ComposeAttachDiscoveryError {
#[error("compose attach source requires an explicit project name")]
MissingProjectName,
}
impl<E: ComposeDeployEnv> ComposeAttachProvider<E> {
pub(super) fn new(host: String) -> Self {
Self {
@ -45,7 +51,7 @@ impl<E: ComposeDeployEnv> AttachProvider<E> for ComposeAttachProvider<E> {
let project = project
.as_ref()
.ok_or_else(|| AttachProviderError::Discovery {
source: "compose attach source requires an explicit project name".into(),
source: ComposeAttachDiscoveryError::MissingProjectName.into(),
})?;
let services = resolve_services(project, services)
@ -57,9 +63,11 @@ impl<E: ComposeDeployEnv> AttachProvider<E> for ComposeAttachProvider<E> {
let container_id = discover_service_container_id(project, service)
.await
.map_err(to_discovery_error)?;
let api_port = discover_api_port(&container_id)
.await
.map_err(to_discovery_error)?;
let endpoint =
build_service_endpoint(&self.host, api_port).map_err(to_discovery_error)?;
let source = ExternalNodeSource::new(service.clone(), endpoint.to_string());
@ -84,15 +92,7 @@ async fn resolve_services(project: &str, requested: &[String]) -> Result<Vec<Str
return Ok(requested.to_owned());
}
let discovered = discover_attachable_services(project).await?;
if discovered.is_empty() {
return Err(
format!("no running compose services discovered for project '{project}'").into(),
);
}
Ok(discovered)
discover_attachable_services(project).await
}
async fn discover_api_port(container_id: &str) -> Result<u16, DynError> {

View File

@ -29,6 +29,12 @@ pub struct ComposeDeploymentMetadata {
pub project_name: Option<String>,
}
#[derive(Debug, thiserror::Error)]
enum ComposeMetadataError {
#[error("compose deployment metadata has no project name")]
MissingProjectName,
}
impl ComposeDeploymentMetadata {
/// Returns project name when deployment is bound to a specific compose
/// project.
@ -42,9 +48,9 @@ impl ComposeDeploymentMetadata {
&self,
services: Vec<String>,
) -> Result<AttachSource, DynError> {
let Some(project_name) = self.project_name() else {
return Err("compose metadata has no project name".into());
};
let project_name = self
.project_name()
.ok_or(ComposeMetadataError::MissingProjectName)?;
Ok(AttachSource::compose(services).with_project(project_name.to_owned()))
}

View File

@ -6,8 +6,8 @@ use testing_framework_core::{
ApplicationExternalProvider, AttachSource, CleanupGuard, DeploymentPolicy, FeedHandle,
FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
Runner, Scenario, ScenarioSources, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
Runner, Scenario, ScenarioSources, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
@ -102,19 +102,11 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
)
.await?;
// Source orchestration currently runs here after managed clients are prepared.
let source_providers = SourceProviders::default()
.with_managed(Arc::new(StaticManagedProvider::new(
deployed.node_clients.snapshot(),
)))
.with_attach(Arc::new(ComposeAttachProvider::<E>::new(
compose_runner_host(),
)))
.with_external(Arc::new(ApplicationExternalProvider));
let source_providers = self.source_providers(deployed.node_clients.snapshot());
deployed.node_clients = orchestrate_sources_with_providers(&source_plan, source_providers)
.await
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
deployed.node_clients = self
.resolve_node_clients(&source_plan, source_providers)
.await?;
let project_name = prepared.environment.project_name().to_owned();
@ -147,25 +139,19 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
async fn deploy_attached_only<Caps>(
&self,
scenario: &Scenario<E, Caps>,
source_plan: testing_framework_core::scenario::SourceOrchestrationPlan,
source_plan: SourceOrchestrationPlan,
) -> Result<Runner<E>, ComposeRunnerError>
where
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
let observability = resolve_observability_inputs(scenario)?;
let source_providers = SourceProviders::default()
.with_managed(Arc::new(StaticManagedProvider::new(Vec::new())))
.with_attach(Arc::new(ComposeAttachProvider::<E>::new(
compose_runner_host(),
)))
.with_external(Arc::new(ApplicationExternalProvider));
let node_clients = orchestrate_sources_with_providers(&source_plan, source_providers)
.await
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
let source_providers = self.source_providers(Vec::new());
if node_clients.is_empty() {
return Err(ComposeRunnerError::RuntimePreflight);
}
let node_clients = self
.resolve_node_clients(&source_plan, source_providers)
.await?;
self.ensure_non_empty_node_clients(&node_clients)?;
let node_control = self.attached_node_control::<Caps>(scenario)?;
let (feed, feed_task) = spawn_block_feed_with_retry::<E>(&node_clients).await?;
@ -183,6 +169,36 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
Ok(Runner::new(context, Some(cleanup_guard)))
}
fn source_providers(&self, managed_clients: Vec<E::NodeClient>) -> SourceProviders<E> {
SourceProviders::default()
.with_managed(Arc::new(StaticManagedProvider::new(managed_clients)))
.with_attach(Arc::new(ComposeAttachProvider::<E>::new(
compose_runner_host(),
)))
.with_external(Arc::new(ApplicationExternalProvider))
}
async fn resolve_node_clients(
&self,
source_plan: &SourceOrchestrationPlan,
source_providers: SourceProviders<E>,
) -> Result<NodeClients<E>, ComposeRunnerError> {
orchestrate_sources_with_providers(source_plan, source_providers)
.await
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })
}
fn ensure_non_empty_node_clients(
&self,
node_clients: &NodeClients<E>,
) -> Result<(), ComposeRunnerError> {
if node_clients.is_empty() {
return Err(ComposeRunnerError::RuntimePreflight);
}
Ok(())
}
fn attached_node_control<Caps>(
&self,
scenario: &Scenario<E, Caps>,