From 4195707aa723cfcc43ea93cdc2947c1a7287a0ec Mon Sep 17 00:00:00 2001 From: andrussal Date: Mon, 2 Mar 2026 11:19:55 +0100 Subject: [PATCH 01/14] Add compose attached node control and attach discovery --- .../tests/compose_attach_node_control.rs | 196 ++++++++++++++++++ logos/examples/tests/node_config_override.rs | 104 ++++++++++ testing-framework/core/src/scenario/mod.rs | 10 +- .../core/src/scenario/runtime/mod.rs | 7 +- .../src/scenario/runtime/orchestration/mod.rs | 5 +- .../source_orchestration_plan.rs | 34 +-- .../runtime/orchestration/source_resolver.rs | 14 +- .../src/scenario/runtime/providers/mod.rs | 2 +- .../deployers/compose/Cargo.toml | 2 +- .../compose/src/deployer/attach_provider.rs | 146 +++++++++++++ .../deployers/compose/src/deployer/mod.rs | 1 + .../compose/src/deployer/orchestrator.rs | 107 +++++++++- .../deployers/compose/src/docker/attached.rs | 129 ++++++++++++ .../deployers/compose/src/docker/control.rs | 113 +++++++++- .../deployers/compose/src/docker/mod.rs | 1 + 15 files changed, 837 insertions(+), 34 deletions(-) create mode 100644 logos/examples/tests/compose_attach_node_control.rs create mode 100644 testing-framework/deployers/compose/src/deployer/attach_provider.rs create mode 100644 testing-framework/deployers/compose/src/docker/attached.rs diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs new file mode 100644 index 0000000..4a85422 --- /dev/null +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -0,0 +1,196 @@ +use std::{ + env, + process::{Command, Stdio}, + thread, + time::Duration, +}; + +use anyhow::{Result, anyhow}; +use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; +use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; + +#[tokio::test] +async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { + if env::var("TF_RUN_COMPOSE_ATTACH_NODE_CONTROL").is_err() { + return Ok(()); + } + + let managed = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) + .enable_node_control() + .with_run_duration(Duration::from_secs(5)) + .build()?; + + let deployer = LbcComposeDeployer::default(); + let managed_runner: Runner = deployer.deploy(&managed).await?; + let managed_client = managed_runner + .context() + .node_clients() + .snapshot() + .into_iter() + .next() + .ok_or_else(|| anyhow!("managed compose runner returned no node clients"))?; + let api_port = managed_client + .base_url() + .port() + .ok_or_else(|| anyhow!("managed node base url has no port"))?; + + let project_name = discover_compose_project_by_published_port(api_port)?; + + let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) + .enable_node_control() + .with_run_duration(Duration::from_secs(5)) + .with_attach_source( + AttachSource::compose(vec!["node-0".to_owned()]).with_project(project_name.clone()), + ) + .build()?; + + let attached_runner: Runner = deployer.deploy(&attached).await?; + let pre_restart_container = discover_compose_service_container(&project_name, "node-0")?; + let pre_restart_started_at = inspect_container_started_at(&pre_restart_container)?; + let control = attached_runner + .context() + .node_control() + .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; + control + .restart_node("node-0") + .await + .map_err(|err| anyhow!("attached restart failed: {err}"))?; + + wait_until_container_restarted( + &project_name, + "node-0", + &pre_restart_started_at, + Duration::from_secs(30), + )?; + + Ok(()) +} + +fn discover_compose_project_by_published_port(port: u16) -> Result { + let container_ids = run_docker_capture(["ps", "-q"])?; + let host_port_token = format!("\"HostPort\":\"{port}\""); + let mut matching_projects = Vec::new(); + + for container_id in container_ids + .lines() + .map(str::trim) + .filter(|id| !id.is_empty()) + { + let ports = run_docker_capture([ + "inspect", + "--format", + "{{json .NetworkSettings.Ports}}", + container_id, + ])?; + if !ports.contains(&host_port_token) { + continue; + } + + let project = run_docker_capture([ + "inspect", + "--format", + "{{ index .Config.Labels \"com.docker.compose.project\" }}", + container_id, + ])?; + let project = project.trim(); + if !project.is_empty() { + matching_projects.push(project.to_owned()); + } + } + + match matching_projects.as_slice() { + [project] => Ok(project.clone()), + [] => Err(anyhow!( + "no compose project found exposing api host port {port}" + )), + _ => Err(anyhow!( + "multiple compose projects expose api host port {port}: {:?}", + matching_projects + )), + } +} + +fn discover_compose_service_container(project: &str, service: &str) -> Result { + let container = run_docker_capture([ + "ps", + "--filter", + &format!("label=com.docker.compose.project={project}"), + "--filter", + &format!("label=com.docker.compose.service={service}"), + "--format", + "{{.ID}}", + ])?; + let mut lines = container + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()); + let Some(container_id) = lines.next() else { + return Err(anyhow!( + "no running container found for compose project '{project}' service '{service}'" + )); + }; + if lines.next().is_some() { + return Err(anyhow!( + "multiple running containers found for compose project '{project}' service '{service}'" + )); + } + + Ok(container_id.to_owned()) +} + +fn inspect_container_started_at(container_id: &str) -> Result { + let started_at = + run_docker_capture(["inspect", "--format", "{{.State.StartedAt}}", container_id])?; + let started_at = started_at.trim(); + if started_at.is_empty() { + return Err(anyhow!( + "docker inspect returned empty StartedAt for container {container_id}" + )); + } + + Ok(started_at.to_owned()) +} + +fn wait_until_container_restarted( + project: &str, + service: &str, + previous_started_at: &str, + timeout: Duration, +) -> Result<()> { + let deadline = std::time::Instant::now() + timeout; + loop { + let container_id = discover_compose_service_container(project, service)?; + let started_at = inspect_container_started_at(&container_id)?; + if started_at != previous_started_at { + return Ok(()); + } + + if std::time::Instant::now() >= deadline { + return Err(anyhow!( + "timed out waiting for restarted container timestamp change: {project}/{service}" + )); + } + + thread::sleep(Duration::from_millis(500)); + } +} + +fn run_docker_capture(args: [&str; N]) -> Result { + let output = Command::new("docker") + .args(args) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output()?; + + if !output.status.success() { + return Err(anyhow!( + "docker {} failed: status={} stderr={}", + args.join(" "), + output.status, + String::from_utf8_lossy(&output.stderr).trim() + )); + } + + Ok(String::from_utf8_lossy(&output.stdout).to_string()) +} diff --git a/logos/examples/tests/node_config_override.rs b/logos/examples/tests/node_config_override.rs index 5761eae..22da1db 100644 --- a/logos/examples/tests/node_config_override.rs +++ b/logos/examples/tests/node_config_override.rs @@ -103,6 +103,110 @@ async fn scenario_builder_api_port_override() -> Result<()> { Ok(()) } +#[tokio::test] +#[ignore = "run manually with `cargo test -p runner-examples -- --ignored scenario_builder_identify_hide_listen_addrs_override`"] +async fn scenario_builder_identify_hide_listen_addrs_override() -> Result<()> { + let _ = try_init(); + // Required env vars (set on the command line when running this test): + // - `POL_PROOF_DEV_MODE=true` + // - `LOGOS_BLOCKCHAIN_NODE_BIN=...` + // - `LOGOS_BLOCKCHAIN_CIRCUITS=...` + // - `RUST_LOG=info` (optional) + let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(1)); + let base_descriptors = base_builder.clone().build()?; + let base_node = base_descriptors.nodes().first().expect("node 0 descriptor"); + let mut run_config = build_node_run_config( + &base_descriptors, + base_node, + base_descriptors + .config() + .node_config_override(base_node.index()), + ) + .expect("build run config"); + + run_config + .user + .network + .backend + .swarm + .identify + .hide_listen_addrs = Some(true); + + let mut scenario = ScenarioBuilder::new(Box::new( + base_builder.with_node_config_override(0, run_config), + )) + .with_run_duration(Duration::from_secs(1)) + .build()?; + + let deployer = LbcLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + let handle = runner.run(&mut scenario).await?; + + let client = handle + .context() + .random_node_client() + .ok_or_else(|| anyhow::anyhow!("scenario did not expose any node clients"))?; + + client + .consensus_info() + .await + .expect("consensus_info should succeed"); + + Ok(()) +} + +#[tokio::test] +#[ignore = "run manually with `cargo test -p runner-examples -- --ignored scenario_builder_identify_hide_listen_addrs_two_nodes`"] +async fn scenario_builder_identify_hide_listen_addrs_two_nodes() -> Result<()> { + let _ = try_init(); + // Required env vars (set on the command line when running this test): + // - `POL_PROOF_DEV_MODE=true` + // - `LOGOS_BLOCKCHAIN_NODE_BIN=...` + // - `LOGOS_BLOCKCHAIN_CIRCUITS=...` + // - `RUST_LOG=info` (optional) + let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)); + let base_descriptors = base_builder.clone().build()?; + + let mut deployment_builder = base_builder; + for idx in 0..2 { + let node = &base_descriptors.nodes()[idx]; + let mut run_config = build_node_run_config( + &base_descriptors, + node, + base_descriptors.config().node_config_override(node.index()), + ) + .expect("build run config"); + run_config + .user + .network + .backend + .swarm + .identify + .hide_listen_addrs = Some(true); + deployment_builder = deployment_builder.with_node_config_override(idx, run_config); + } + + let mut scenario = ScenarioBuilder::new(Box::new(deployment_builder)) + .with_run_duration(Duration::from_secs(3)) + .build()?; + + let deployer = LbcLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + let handle = runner.run(&mut scenario).await?; + + let client = handle + .context() + .random_node_client() + .ok_or_else(|| anyhow::anyhow!("scenario did not expose any node clients"))?; + + client + .consensus_info() + .await + .expect("consensus_info should succeed"); + + Ok(()) +} + fn random_api_port() -> u16 { let listener = TcpListener::bind("127.0.0.1:0").expect("bind random API port"); listener.local_addr().expect("read API port").port() diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index ffa3aa3..411bdf1 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -37,7 +37,8 @@ pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ - BorrowedNode, BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, + ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, BorrowedNode, + BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, HttpReadinessRequirement, ManagedNode, ManagedSource, NodeClients, NodeHandle, NodeInventory, ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError, SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider, @@ -46,9 +47,10 @@ pub use runtime::{ CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError, PrometheusEndpoint, PrometheusInstantSample, }, - orchestrate_sources, resolve_sources, spawn_feed, wait_for_http_ports, - wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, - wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, + orchestrate_sources, orchestrate_sources_with_providers, resolve_sources, spawn_feed, + wait_for_http_ports, wait_for_http_ports_with_host, + wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, + wait_http_readiness, wait_until_stable, }; pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy}; pub use workload::Workload; diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 7e0804f..97bd2d6 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -16,10 +16,13 @@ pub use node_clients::NodeClients; #[doc(hidden)] pub use orchestration::{ ManagedSource, SourceOrchestrationPlan, build_source_orchestration_plan, orchestrate_sources, - resolve_sources, + orchestrate_sources_with_providers, resolve_sources, }; #[doc(hidden)] -pub use providers::{SourceProviders, StaticManagedProvider}; +pub use providers::{ + ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, + SourceProviders, StaticManagedProvider, +}; pub use readiness::{ HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, diff --git a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs index cb7d5b2..9e71458 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs @@ -7,4 +7,7 @@ pub use source_orchestration_plan::{ ManagedSource, SourceModeName, SourceOrchestrationMode, SourceOrchestrationPlan, SourceOrchestrationPlanError, }; -pub use source_resolver::{build_source_orchestration_plan, orchestrate_sources, resolve_sources}; +pub use source_resolver::{ + build_source_orchestration_plan, orchestrate_sources, orchestrate_sources_with_providers, + resolve_sources, +}; diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs index e88857b..dd57ee9 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs @@ -65,13 +65,10 @@ impl SourceOrchestrationPlan { ) -> Result { let mode = mode_from_sources(sources); - let plan = Self { + Ok(Self { mode, readiness_policy, - }; - - plan.ensure_currently_wired()?; - Ok(plan) + }) } #[must_use] @@ -82,13 +79,24 @@ impl SourceOrchestrationPlan { | SourceOrchestrationMode::ExternalOnly { external } => external, } } +} - fn ensure_currently_wired(&self) -> Result<(), SourceOrchestrationPlanError> { - match self.mode { - SourceOrchestrationMode::Managed { .. } - | SourceOrchestrationMode::ExternalOnly { .. } => Ok(()), - SourceOrchestrationMode::Attached { .. } => not_wired(SourceModeName::Attached), - } +#[cfg(test)] +mod tests { + use super::{SourceOrchestrationMode, SourceOrchestrationPlan}; + use crate::scenario::{AttachSource, ScenarioSources, SourceReadinessPolicy}; + + #[test] + fn attached_sources_are_planned() { + let sources = ScenarioSources::attached(AttachSource::compose(vec!["node-0".to_string()])); + let plan = + SourceOrchestrationPlan::try_from_sources(&sources, SourceReadinessPolicy::AllReady) + .expect("attached sources should build a source orchestration plan"); + + assert!(matches!( + plan.mode, + SourceOrchestrationMode::Attached { .. } + )); } } @@ -107,7 +115,3 @@ fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode { }, } } - -fn not_wired(mode: SourceModeName) -> Result<(), SourceOrchestrationPlanError> { - Err(SourceOrchestrationPlanError::SourceModeNotWiredYet { mode }) -} diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs index 80fce0b..6ebd646 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs @@ -91,8 +91,7 @@ pub async fn resolve_sources( /// - Managed mode is backed by prebuilt deployer-managed clients via /// `StaticManagedProvider`. /// - External nodes are resolved via `Application::external_node_client`. -/// - Attached mode remains blocked at plan validation until attach providers -/// are fully wired. +/// - Attached nodes are discovered through the selected attach provider. pub async fn orchestrate_sources( plan: &SourceOrchestrationPlan, node_clients: NodeClients, @@ -103,6 +102,17 @@ pub async fn orchestrate_sources( ))) .with_external(Arc::new(ApplicationExternalProvider)); + orchestrate_sources_with_providers(plan, providers).await +} + +/// Orchestrates scenario sources with caller-supplied provider set. +/// +/// Deployer runtimes can use this to inject attach/external providers with +/// backend-specific discovery and control semantics. +pub async fn orchestrate_sources_with_providers( + plan: &SourceOrchestrationPlan, + providers: SourceProviders, +) -> Result, DynError> { let resolved = resolve_sources(plan, &providers).await?; if matches!(plan.mode, SourceOrchestrationMode::Managed { .. }) && resolved.managed.is_empty() { diff --git a/testing-framework/core/src/scenario/runtime/providers/mod.rs b/testing-framework/core/src/scenario/runtime/providers/mod.rs index 60edaae..0d5a1a3 100644 --- a/testing-framework/core/src/scenario/runtime/providers/mod.rs +++ b/testing-framework/core/src/scenario/runtime/providers/mod.rs @@ -7,7 +7,7 @@ mod managed_provider; #[allow(dead_code)] mod source_providers; -pub use attach_provider::{AttachProviderError, AttachedNode}; +pub use attach_provider::{AttachProvider, AttachProviderError, AttachedNode}; pub use external_provider::{ApplicationExternalProvider, ExternalNode, ExternalProviderError}; pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider}; pub use source_providers::SourceProviders; diff --git a/testing-framework/deployers/compose/Cargo.toml b/testing-framework/deployers/compose/Cargo.toml index 4880401..9ac8b7a 100644 --- a/testing-framework/deployers/compose/Cargo.toml +++ b/testing-framework/deployers/compose/Cargo.toml @@ -17,6 +17,7 @@ anyhow = "1" async-trait = { workspace = true } reqwest = { features = ["json"], workspace = true } serde = { features = ["derive"], workspace = true } +serde_json = { workspace = true } tempfile = { workspace = true } tera = "1.19" testing-framework-core = { path = "../../core" } @@ -30,5 +31,4 @@ uuid = { features = ["v4"], version = "1" } [dev-dependencies] groth16 = { workspace = true } key-management-system-service = { workspace = true } -serde_json = { workspace = true } zksign = { workspace = true } diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs new file mode 100644 index 0000000..9086b76 --- /dev/null +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -0,0 +1,146 @@ +use std::marker::PhantomData; + +use async_trait::async_trait; +use testing_framework_core::scenario::{ + AttachProvider, AttachProviderError, AttachSource, AttachedNode, DynError, ExternalNodeSource, +}; +use url::Url; + +use crate::{ + docker::attached::{discover_service_container_id, inspect_mapped_tcp_ports}, + env::ComposeDeployEnv, +}; + +pub(super) struct ComposeAttachProvider { + host: String, + _env: PhantomData, +} + +impl ComposeAttachProvider { + pub(super) fn new(host: String) -> Self { + Self { + host, + _env: PhantomData, + } + } +} + +#[async_trait] +impl AttachProvider for ComposeAttachProvider { + async fn discover( + &self, + source: &AttachSource, + ) -> Result>, AttachProviderError> { + let (project, services) = match source { + AttachSource::Compose { project, services } => (project, services), + _ => { + return Err(AttachProviderError::UnsupportedSource { + attach_source: source.clone(), + }); + } + }; + + let project = project + .as_ref() + .ok_or_else(|| AttachProviderError::Discovery { + source: "compose attach source requires an explicit project name".into(), + })?; + + if services.is_empty() { + return Err(AttachProviderError::Discovery { + source: "compose attach source requires at least one service name".into(), + }); + } + + let mut attached = Vec::with_capacity(services.len()); + for service in services { + let container_id = discover_service_container_id(project, service) + .await + .map_err(to_discovery_error)?; + let api_port = discover_api_port(&container_id) + .await + .map_err(to_discovery_error)?; + let endpoint = + build_service_endpoint(&self.host, api_port).map_err(to_discovery_error)?; + let source = ExternalNodeSource::new(service.clone(), endpoint.to_string()); + let client = E::external_node_client(&source).map_err(to_discovery_error)?; + + attached.push(AttachedNode { + identity_hint: Some(service.clone()), + client, + }); + } + + Ok(attached) + } +} + +fn to_discovery_error(source: DynError) -> AttachProviderError { + AttachProviderError::Discovery { source } +} + +async fn discover_api_port(container_id: &str) -> Result { + let mapped_ports = inspect_mapped_tcp_ports(container_id).await?; + match mapped_ports.as_slice() { + [] => Err(format!( + "no mapped tcp ports discovered for attached compose service container '{container_id}'" + ) + .into()), + [port] => Ok(port.host_port), + _ => { + let mapped_ports = mapped_ports + .iter() + .map(|port| format!("{}->{}", port.container_port, port.host_port)) + .collect::>() + .join(", "); + + Err(format!( + "attached compose service container '{container_id}' has multiple mapped tcp ports ({mapped_ports}); provide a single exposed API port" + ) + .into()) + } + } +} + +fn build_service_endpoint(host: &str, port: u16) -> Result { + let endpoint = Url::parse(&format!("http://{host}:{port}/"))?; + Ok(endpoint) +} + +#[cfg(test)] +mod tests { + use super::build_service_endpoint; + use crate::docker::attached::parse_mapped_tcp_ports; + + #[test] + fn parse_mapped_tcp_ports_skips_non_tcp_and_invalid_keys() { + let raw = r#"{ + "18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}], + "9999/udp":[{"HostIp":"0.0.0.0","HostPort":"39999"}], + "invalid":[{"HostIp":"0.0.0.0","HostPort":"12345"}] + }"#; + + let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse"); + assert_eq!(mapped.len(), 1); + assert_eq!(mapped[0].container_port, 18018); + assert_eq!(mapped[0].host_port, 32001); + } + + #[test] + fn parse_mapped_tcp_ports_returns_sorted_ports() { + let raw = r#"{ + "18019/tcp":[{"HostIp":"0.0.0.0","HostPort":"32002"}], + "18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}] + }"#; + + let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse"); + assert_eq!(mapped[0].container_port, 18018); + assert_eq!(mapped[1].container_port, 18019); + } + + #[test] + fn build_service_endpoint_formats_http_url() { + let endpoint = build_service_endpoint("127.0.0.1", 32001).expect("endpoint should parse"); + assert_eq!(endpoint.as_str(), "http://127.0.0.1:32001/"); + } +} diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index e32acae..6e45367 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -1,3 +1,4 @@ +mod attach_provider; pub mod clients; pub mod orchestrator; pub mod ports; diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 6585136..66cb9e6 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,10 +3,11 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, - NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, RunContext, Runner, Scenario, build_source_orchestration_plan, - orchestrate_sources, + ApplicationExternalProvider, AttachSource, CleanupGuard, DeploymentPolicy, FeedHandle, + FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, + Runner, Scenario, ScenarioSources, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -14,6 +15,7 @@ use tracing::info; use super::{ ComposeDeployer, + attach_provider::ComposeAttachProvider, clients::ClientBuilder, make_cleanup_guard, ports::PortManager, @@ -21,13 +23,14 @@ use super::{ setup::{DeploymentContext, DeploymentSetup}, }; use crate::{ - docker::control::ComposeNodeControl, + docker::control::{ComposeAttachedNodeControl, ComposeNodeControl}, env::ComposeDeployEnv, errors::ComposeRunnerError, infrastructure::{ environment::StackEnvironment, ports::{HostPortMapping, compose_runner_host}, }, + lifecycle::block_feed::spawn_block_feed_with_retry, }; const PRINT_ENDPOINTS_ENV: &str = "TESTNET_PRINT_ENDPOINTS"; @@ -55,6 +58,12 @@ impl DeploymentOrchestrator { } })?; + if scenario.sources().is_attached() { + return self + .deploy_attached_only::(scenario, source_plan) + .await; + } + let deployment = scenario.deployment(); let setup = DeploymentSetup::::new(deployment); setup.validate_environment().await?; @@ -81,7 +90,16 @@ impl DeploymentOrchestrator { .await?; // Source orchestration currently runs here after managed clients are prepared. - deployed.node_clients = orchestrate_sources(&source_plan, deployed.node_clients) + let source_providers = SourceProviders::default() + .with_managed(Arc::new(StaticManagedProvider::new( + deployed.node_clients.snapshot(), + ))) + .with_attach(Arc::new(ComposeAttachProvider::::new( + compose_runner_host(), + ))) + .with_external(Arc::new(ApplicationExternalProvider)); + + deployed.node_clients = orchestrate_sources_with_providers(&source_plan, source_providers) .await .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; @@ -106,6 +124,83 @@ impl DeploymentOrchestrator { Ok(runner) } + async fn deploy_attached_only( + &self, + scenario: &Scenario, + source_plan: testing_framework_core::scenario::SourceOrchestrationPlan, + ) -> Result, ComposeRunnerError> + where + Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, + { + let observability = resolve_observability_inputs(scenario)?; + let source_providers = SourceProviders::default() + .with_managed(Arc::new(StaticManagedProvider::new(Vec::new()))) + .with_attach(Arc::new(ComposeAttachProvider::::new( + compose_runner_host(), + ))) + .with_external(Arc::new(ApplicationExternalProvider)); + let node_clients = orchestrate_sources_with_providers(&source_plan, source_providers) + .await + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + + if node_clients.is_empty() { + return Err(ComposeRunnerError::RuntimePreflight); + } + + let node_control = self.attached_node_control::(scenario)?; + let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; + let context = RunContext::new( + scenario.deployment().clone(), + node_clients, + scenario.duration(), + scenario.expectation_cooldown(), + observability.telemetry_handle()?, + feed, + node_control, + ); + + let cleanup_guard: Box = Box::new(feed_task); + Ok(Runner::new(context, Some(cleanup_guard))) + } + + fn attached_node_control( + &self, + scenario: &Scenario, + ) -> Result>>, ComposeRunnerError> + where + Caps: RequiresNodeControl + Send + Sync, + { + if !Caps::REQUIRED { + return Ok(None); + } + + let ScenarioSources::Attached { attach, .. } = scenario.sources() else { + return Err(ComposeRunnerError::InternalInvariant { + message: "attached node control requested outside attached source mode", + }); + }; + + let AttachSource::Compose { project, .. } = attach else { + return Err(ComposeRunnerError::InternalInvariant { + message: "compose deployer requires compose attach source for node control", + }); + }; + + let Some(project_name) = project + .as_ref() + .map(|value| value.trim()) + .filter(|value| !value.is_empty()) + else { + return Err(ComposeRunnerError::InternalInvariant { + message: "attached compose mode requires explicit project name for node control", + }); + }; + + Ok(Some(Arc::new(ComposeAttachedNodeControl { + project_name: project_name.to_owned(), + }) as Arc>)) + } + async fn build_runner( &self, scenario: &Scenario, diff --git a/testing-framework/deployers/compose/src/docker/attached.rs b/testing-framework/deployers/compose/src/docker/attached.rs new file mode 100644 index 0000000..e47cb2e --- /dev/null +++ b/testing-framework/deployers/compose/src/docker/attached.rs @@ -0,0 +1,129 @@ +use std::process::Stdio; + +use serde_json::Value; +use testing_framework_core::scenario::DynError; +use tokio::process::Command; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct MappedTcpPort { + pub container_port: u16, + pub host_port: u16, +} + +pub async fn discover_service_container_id( + project: &str, + service: &str, +) -> Result { + let stdout = run_docker_capture([ + "ps", + "--filter", + &format!("label=com.docker.compose.project={project}"), + "--filter", + &format!("label=com.docker.compose.service={service}"), + "--format", + "{{.ID}}", + ]) + .await?; + + let ids: Vec = stdout + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .map(ToOwned::to_owned) + .collect(); + + match ids.as_slice() { + [id] => Ok(id.clone()), + [] => Err(format!( + "no running container found for compose project '{project}' service '{service}'" + ) + .into()), + _ => Err(format!( + "multiple running containers found for compose project '{project}' service '{service}'" + ) + .into()), + } +} + +pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result, DynError> { + let stdout = run_docker_capture([ + "inspect", + "--format", + "{{json .NetworkSettings.Ports}}", + container_id, + ]) + .await?; + + parse_mapped_tcp_ports(&stdout) +} + +pub fn parse_mapped_tcp_ports(raw: &str) -> Result, DynError> { + let ports_value: Value = serde_json::from_str(raw.trim())?; + let ports_object = ports_value + .as_object() + .ok_or_else(|| "docker inspect ports payload is not an object".to_owned())?; + + let mut mapped = Vec::new(); + for (container_port_key, bindings) in ports_object { + let Some(container_port) = parse_container_port(container_port_key) else { + continue; + }; + + let Some(bindings_array) = bindings.as_array() else { + continue; + }; + + let Some(host_port) = bindings_array.iter().find_map(parse_host_port_binding) else { + continue; + }; + + mapped.push(MappedTcpPort { + container_port, + host_port, + }); + } + + mapped.sort_by_key(|port| port.container_port); + + Ok(mapped) +} + +pub async fn run_docker_capture(args: [&str; N]) -> Result { + let output = Command::new("docker") + .args(args) + .stdin(Stdio::null()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + + return Err(format!( + "docker {} failed with status {}: {stderr}", + args.join(" "), + output.status + ) + .into()); + } + + Ok(String::from_utf8_lossy(&output.stdout).to_string()) +} + +fn parse_container_port(port_key: &str) -> Option { + let (port, proto) = port_key.split_once('/')?; + if proto != "tcp" { + return None; + } + + port.parse::().ok() +} + +fn parse_host_port_binding(binding: &Value) -> Option { + binding + .get("HostPort") + .and_then(Value::as_str)? + .parse::() + .ok() +} diff --git a/testing-framework/deployers/compose/src/docker/control.rs b/testing-framework/deployers/compose/src/docker/control.rs index 0541e42..8e98948 100644 --- a/testing-framework/deployers/compose/src/docker/control.rs +++ b/testing-framework/deployers/compose/src/docker/control.rs @@ -7,13 +7,21 @@ use testing_framework_core::{ adjust_timeout, scenario::{Application, DynError, NodeControlHandle}, }; -use tokio::process::Command; +use tokio::{process::Command, time::timeout}; use tracing::info; -use crate::{docker::commands::run_docker_command, errors::ComposeRunnerError}; +use crate::{ + docker::{ + attached::discover_service_container_id, + commands::{ComposeCommandError, run_docker_command}, + }, + errors::ComposeRunnerError, +}; const COMPOSE_RESTART_TIMEOUT: Duration = Duration::from_secs(120); const COMPOSE_RESTART_DESCRIPTION: &str = "docker compose restart"; +const DOCKER_CONTAINER_RESTART_DESCRIPTION: &str = "docker container restart"; +const DOCKER_CONTAINER_STOP_DESCRIPTION: &str = "docker container stop"; pub async fn restart_compose_service( compose_file: &Path, @@ -38,6 +46,50 @@ pub async fn restart_compose_service( .map_err(ComposeRunnerError::Compose) } +pub async fn restart_attached_compose_service( + project_name: &str, + service: &str, +) -> Result<(), DynError> { + let container_id = discover_service_container_id(project_name, service).await?; + let command = docker_container_command("restart", &container_id); + + info!( + service, + project = project_name, + container = container_id, + "restarting attached compose service" + ); + + run_docker_action( + command, + DOCKER_CONTAINER_RESTART_DESCRIPTION, + adjust_timeout(COMPOSE_RESTART_TIMEOUT), + ) + .await +} + +pub async fn stop_attached_compose_service( + project_name: &str, + service: &str, +) -> Result<(), DynError> { + let container_id = discover_service_container_id(project_name, service).await?; + let command = docker_container_command("stop", &container_id); + + info!( + service, + project = project_name, + container = container_id, + "stopping attached compose service" + ); + + run_docker_action( + command, + DOCKER_CONTAINER_STOP_DESCRIPTION, + adjust_timeout(COMPOSE_RESTART_TIMEOUT), + ) + .await +} + fn compose_restart_command(compose_file: &Path, project_name: &str, service: &str) -> Command { let mut command = Command::new("docker"); command @@ -51,6 +103,43 @@ fn compose_restart_command(compose_file: &Path, project_name: &str, service: &st command } +fn docker_container_command(action: &str, container_id: &str) -> Command { + let mut command = Command::new("docker"); + command.arg(action).arg(container_id); + command +} + +async fn run_docker_action( + mut command: Command, + description: &str, + timeout_duration: Duration, +) -> Result<(), DynError> { + match timeout(timeout_duration, command.output()).await { + Ok(Ok(output)) => { + if output.status.success() { + return Ok(()); + } + + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + + Err(format!( + "{description} failed with status {}: {stderr}", + output.status + ) + .into()) + } + Ok(Err(source)) => Err(format!("{description} failed to spawn: {source}").into()), + Err(_) => { + let compose_timeout = ComposeCommandError::Timeout { + command: description.to_owned(), + timeout: timeout_duration, + }; + + Err(compose_timeout.into()) + } + } +} + /// Compose-specific node control handle for restarting nodes. pub struct ComposeNodeControl { pub(crate) compose_file: PathBuf, @@ -65,3 +154,23 @@ impl NodeControlHandle for ComposeNodeControl { .map_err(|err| format!("node restart failed: {err}").into()) } } + +/// Node control handle for compose attached mode. +pub struct ComposeAttachedNodeControl { + pub(crate) project_name: String, +} + +#[async_trait::async_trait] +impl NodeControlHandle for ComposeAttachedNodeControl { + async fn restart_node(&self, name: &str) -> Result<(), DynError> { + restart_attached_compose_service(&self.project_name, name) + .await + .map_err(|source| format!("node restart failed for service '{name}': {source}").into()) + } + + async fn stop_node(&self, name: &str) -> Result<(), DynError> { + stop_attached_compose_service(&self.project_name, name) + .await + .map_err(|source| format!("node stop failed for service '{name}': {source}").into()) + } +} diff --git a/testing-framework/deployers/compose/src/docker/mod.rs b/testing-framework/deployers/compose/src/docker/mod.rs index 3696492..c1b4581 100644 --- a/testing-framework/deployers/compose/src/docker/mod.rs +++ b/testing-framework/deployers/compose/src/docker/mod.rs @@ -1,3 +1,4 @@ +pub mod attached; pub mod commands; pub mod control; pub mod platform; From 6226f515989a7c64678209fc40de41e0f3d4f573 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 6 Mar 2026 13:25:11 +0100 Subject: [PATCH 02/14] feat(compose): expose deploy metadata for attach node-control tests --- .../tests/compose_attach_node_control.rs | 71 +++++-------------- .../deployers/compose/src/deployer/mod.rs | 26 +++++++ .../compose/src/deployer/orchestrator.rs | 42 ++++++++++- 3 files changed, 82 insertions(+), 57 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 4a85422..0248e5a 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -1,5 +1,4 @@ use std::{ - env, process::{Command, Stdio}, thread, time::Duration, @@ -8,20 +7,23 @@ use std::{ use anyhow::{Result, anyhow}; use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; +use testing_framework_runner_compose::ComposeRunnerError; #[tokio::test] +#[ignore = "requires Docker and mutates compose runtime state"] async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { - if env::var("TF_RUN_COMPOSE_ATTACH_NODE_CONTROL").is_err() { - return Ok(()); - } - let managed = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .enable_node_control() .with_run_duration(Duration::from_secs(5)) .build()?; let deployer = LbcComposeDeployer::default(); - let managed_runner: Runner = deployer.deploy(&managed).await?; + let (managed_runner, metadata): (Runner, _) = + match deployer.deploy_with_metadata(&managed).await { + Ok(result) => result, + Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), + Err(error) => return Err(anyhow::Error::new(error)), + }; let managed_client = managed_runner .context() .node_clients() @@ -29,12 +31,14 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .into_iter() .next() .ok_or_else(|| anyhow!("managed compose runner returned no node clients"))?; - let api_port = managed_client + managed_client .base_url() .port() .ok_or_else(|| anyhow!("managed node base url has no port"))?; - let project_name = discover_compose_project_by_published_port(api_port)?; + let project_name = metadata + .project_name + .ok_or_else(|| anyhow!("compose metadata did not include project name"))?; let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .enable_node_control() @@ -44,7 +48,11 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { ) .build()?; - let attached_runner: Runner = deployer.deploy(&attached).await?; + let attached_runner: Runner = match deployer.deploy(&attached).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), + Err(error) => return Err(anyhow::Error::new(error)), + }; let pre_restart_container = discover_compose_service_container(&project_name, "node-0")?; let pre_restart_started_at = inspect_container_started_at(&pre_restart_container)?; let control = attached_runner @@ -65,51 +73,6 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { Ok(()) } - -fn discover_compose_project_by_published_port(port: u16) -> Result { - let container_ids = run_docker_capture(["ps", "-q"])?; - let host_port_token = format!("\"HostPort\":\"{port}\""); - let mut matching_projects = Vec::new(); - - for container_id in container_ids - .lines() - .map(str::trim) - .filter(|id| !id.is_empty()) - { - let ports = run_docker_capture([ - "inspect", - "--format", - "{{json .NetworkSettings.Ports}}", - container_id, - ])?; - if !ports.contains(&host_port_token) { - continue; - } - - let project = run_docker_capture([ - "inspect", - "--format", - "{{ index .Config.Labels \"com.docker.compose.project\" }}", - container_id, - ])?; - let project = project.trim(); - if !project.is_empty() { - matching_projects.push(project.to_owned()); - } - } - - match matching_projects.as_slice() { - [project] => Ok(project.clone()), - [] => Err(anyhow!( - "no compose project found exposing api host port {port}" - )), - _ => Err(anyhow!( - "multiple compose projects expose api host port {port}: {:?}", - matching_projects - )), - } -} - fn discover_compose_service_container(project: &str, service: &str) -> Result { let container = run_docker_capture([ "ps", diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 6e45367..4430a7d 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -22,6 +22,13 @@ pub struct ComposeDeployer { _env: PhantomData, } +/// Compose deployment metadata returned by compose-specific deployment APIs. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ComposeDeploymentMetadata { + /// Docker Compose project name used for this deployment when available. + pub project_name: Option, +} + impl Default for ComposeDeployer { fn default() -> Self { Self::new() @@ -42,6 +49,25 @@ impl ComposeDeployer { self.readiness_checks = enabled; self } + + /// Deploy and return compose-specific metadata alongside the generic + /// runner. + pub async fn deploy_with_metadata( + &self, + scenario: &Scenario, + ) -> Result<(Runner, ComposeDeploymentMetadata), ComposeRunnerError> + where + Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, + { + let deployer = Self { + readiness_checks: self.readiness_checks, + _env: PhantomData, + }; + + orchestrator::DeploymentOrchestrator::new(deployer) + .deploy_with_metadata(scenario) + .await + } } #[async_trait] diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 66cb9e6..7889f68 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -14,7 +14,7 @@ use testing_framework_core::{ use tracing::info; use super::{ - ComposeDeployer, + ComposeDeployer, ComposeDeploymentMetadata, attach_provider::ComposeAttachProvider, clients::ClientBuilder, make_cleanup_guard, @@ -48,6 +48,18 @@ impl DeploymentOrchestrator { &self, scenario: &Scenario, ) -> Result, ComposeRunnerError> + where + Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, + { + self.deploy_with_metadata(scenario) + .await + .map(|(runner, _)| runner) + } + + pub async fn deploy_with_metadata( + &self, + scenario: &Scenario, + ) -> Result<(Runner, ComposeDeploymentMetadata), ComposeRunnerError> where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { @@ -61,7 +73,8 @@ impl DeploymentOrchestrator { if scenario.sources().is_attached() { return self .deploy_attached_only::(scenario, source_plan) - .await; + .await + .map(|runner| (runner, attached_metadata(scenario))); } let deployment = scenario.deployment(); @@ -103,6 +116,8 @@ impl DeploymentOrchestrator { .await .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + let project_name = prepared.environment.project_name().to_owned(); + let runner = self .build_runner::( scenario, @@ -121,7 +136,12 @@ impl DeploymentOrchestrator { readiness_enabled, ); - Ok(runner) + Ok(( + runner, + ComposeDeploymentMetadata { + project_name: Some(project_name), + }, + )) } async fn deploy_attached_only( @@ -304,6 +324,22 @@ impl DeploymentOrchestrator { } } +fn attached_metadata(scenario: &Scenario) -> ComposeDeploymentMetadata +where + E: ComposeDeployEnv, + Caps: Send + Sync, +{ + let project_name = match scenario.sources() { + ScenarioSources::Attached { + attach: AttachSource::Compose { project, .. }, + .. + } => project.clone(), + _ => None, + }; + + ComposeDeploymentMetadata { project_name } +} + struct DeployedNodes { host_ports: HostPortMapping, host: String, From 7127c10aa6195e7d5d5289668e888c6d7ce532a1 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 6 Mar 2026 13:31:03 +0100 Subject: [PATCH 03/14] refactor(compose): add metadata helpers for attach node-control test API --- .../tests/compose_attach_node_control.rs | 134 +++--------------- .../deployers/compose/src/deployer/mod.rs | 80 ++++++++++- .../deployers/compose/src/lib.rs | 2 +- 3 files changed, 96 insertions(+), 120 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 0248e5a..8c5efeb 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -1,13 +1,9 @@ -use std::{ - process::{Command, Stdio}, - thread, - time::Duration, -}; +use std::time::Duration; use anyhow::{Result, anyhow}; use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; -use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; -use testing_framework_runner_compose::ComposeRunnerError; +use testing_framework_core::scenario::{Deployer as _, Runner}; +use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError}; #[tokio::test] #[ignore = "requires Docker and mutates compose runtime state"] @@ -18,33 +14,20 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .build()?; let deployer = LbcComposeDeployer::default(); - let (managed_runner, metadata): (Runner, _) = + let (_managed_runner, metadata): (Runner, ComposeDeploymentMetadata) = match deployer.deploy_with_metadata(&managed).await { Ok(result) => result, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), Err(error) => return Err(anyhow::Error::new(error)), }; - let managed_client = managed_runner - .context() - .node_clients() - .snapshot() - .into_iter() - .next() - .ok_or_else(|| anyhow!("managed compose runner returned no node clients"))?; - managed_client - .base_url() - .port() - .ok_or_else(|| anyhow!("managed node base url has no port"))?; - - let project_name = metadata - .project_name - .ok_or_else(|| anyhow!("compose metadata did not include project name"))?; let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .enable_node_control() .with_run_duration(Duration::from_secs(5)) .with_attach_source( - AttachSource::compose(vec!["node-0".to_owned()]).with_project(project_name.clone()), + metadata + .attach_source_for_services(vec!["node-0".to_owned()]) + .map_err(|err| anyhow!("{err}"))?, ) .build()?; @@ -53,107 +36,26 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), Err(error) => return Err(anyhow::Error::new(error)), }; - let pre_restart_container = discover_compose_service_container(&project_name, "node-0")?; - let pre_restart_started_at = inspect_container_started_at(&pre_restart_container)?; + + let pre_restart_started_at = metadata + .service_started_at("node-0") + .await + .map_err(|err| anyhow!("{err}"))?; + let control = attached_runner .context() .node_control() .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; + control .restart_node("node-0") .await .map_err(|err| anyhow!("attached restart failed: {err}"))?; - wait_until_container_restarted( - &project_name, - "node-0", - &pre_restart_started_at, - Duration::from_secs(30), - )?; + metadata + .wait_until_service_restarted("node-0", &pre_restart_started_at, Duration::from_secs(30)) + .await + .map_err(|err| anyhow!("{err}"))?; Ok(()) } -fn discover_compose_service_container(project: &str, service: &str) -> Result { - let container = run_docker_capture([ - "ps", - "--filter", - &format!("label=com.docker.compose.project={project}"), - "--filter", - &format!("label=com.docker.compose.service={service}"), - "--format", - "{{.ID}}", - ])?; - let mut lines = container - .lines() - .map(str::trim) - .filter(|line| !line.is_empty()); - let Some(container_id) = lines.next() else { - return Err(anyhow!( - "no running container found for compose project '{project}' service '{service}'" - )); - }; - if lines.next().is_some() { - return Err(anyhow!( - "multiple running containers found for compose project '{project}' service '{service}'" - )); - } - - Ok(container_id.to_owned()) -} - -fn inspect_container_started_at(container_id: &str) -> Result { - let started_at = - run_docker_capture(["inspect", "--format", "{{.State.StartedAt}}", container_id])?; - let started_at = started_at.trim(); - if started_at.is_empty() { - return Err(anyhow!( - "docker inspect returned empty StartedAt for container {container_id}" - )); - } - - Ok(started_at.to_owned()) -} - -fn wait_until_container_restarted( - project: &str, - service: &str, - previous_started_at: &str, - timeout: Duration, -) -> Result<()> { - let deadline = std::time::Instant::now() + timeout; - loop { - let container_id = discover_compose_service_container(project, service)?; - let started_at = inspect_container_started_at(&container_id)?; - if started_at != previous_started_at { - return Ok(()); - } - - if std::time::Instant::now() >= deadline { - return Err(anyhow!( - "timed out waiting for restarted container timestamp change: {project}/{service}" - )); - } - - thread::sleep(Duration::from_millis(500)); - } -} - -fn run_docker_capture(args: [&str; N]) -> Result { - let output = Command::new("docker") - .args(args) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .output()?; - - if !output.status.success() { - return Err(anyhow!( - "docker {} failed: status={} stderr={}", - args.join(" "), - output.status, - String::from_utf8_lossy(&output.stderr).trim() - )); - } - - Ok(String::from_utf8_lossy(&output.stdout).to_string()) -} diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 4430a7d..63ff456 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -5,13 +5,14 @@ pub mod ports; pub mod readiness; pub mod setup; -use std::marker::PhantomData; +use std::{marker::PhantomData, time::Duration}; use async_trait::async_trait; use testing_framework_core::scenario::{ - CleanupGuard, Deployer, FeedHandle, ObservabilityCapabilityProvider, RequiresNodeControl, - Runner, Scenario, + AttachSource, CleanupGuard, Deployer, DynError, FeedHandle, ObservabilityCapabilityProvider, + RequiresNodeControl, Runner, Scenario, }; +use tokio::time::sleep; use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup}; @@ -29,6 +30,79 @@ pub struct ComposeDeploymentMetadata { pub project_name: Option, } +impl ComposeDeploymentMetadata { + /// Returns project name when deployment is bound to a specific compose + /// project. + #[must_use] + pub fn project_name(&self) -> Option<&str> { + self.project_name.as_deref() + } + + /// Builds an attach source for the same compose project. + pub fn attach_source_for_services( + &self, + services: Vec, + ) -> Result { + let Some(project_name) = self.project_name() else { + return Err("compose metadata has no project name".into()); + }; + + Ok(AttachSource::compose(services).with_project(project_name.to_owned())) + } + + /// Returns the current StartedAt timestamp for a compose service container. + pub async fn service_started_at(&self, service: &str) -> Result { + let Some(project_name) = self.project_name() else { + return Err("compose metadata has no project name".into()); + }; + + let container_id = + crate::docker::attached::discover_service_container_id(project_name, service).await?; + let started_at = crate::docker::attached::run_docker_capture([ + "inspect", + "--format", + "{{.State.StartedAt}}", + &container_id, + ]) + .await?; + let started_at = started_at.trim(); + + if started_at.is_empty() { + return Err(format!( + "docker inspect returned empty StartedAt for compose service '{service}'" + ) + .into()); + } + + Ok(started_at.to_owned()) + } + + /// Waits until a service container reports a different StartedAt timestamp. + pub async fn wait_until_service_restarted( + &self, + service: &str, + previous_started_at: &str, + timeout: Duration, + ) -> Result<(), DynError> { + let deadline = std::time::Instant::now() + timeout; + + loop { + let started_at = self.service_started_at(service).await?; + if started_at != previous_started_at { + return Ok(()); + } + + if std::time::Instant::now() >= deadline { + return Err( + format!("timed out waiting for restarted compose service '{service}'").into(), + ); + } + + sleep(Duration::from_millis(500)).await; + } + } +} + impl Default for ComposeDeployer { fn default() -> Self { Self::new() diff --git a/testing-framework/deployers/compose/src/lib.rs b/testing-framework/deployers/compose/src/lib.rs index c75890f..8cf715e 100644 --- a/testing-framework/deployers/compose/src/lib.rs +++ b/testing-framework/deployers/compose/src/lib.rs @@ -6,7 +6,7 @@ pub mod errors; pub mod infrastructure; pub mod lifecycle; -pub use deployer::ComposeDeployer; +pub use deployer::{ComposeDeployer, ComposeDeploymentMetadata}; pub use descriptor::{ComposeDescriptor, EnvEntry, NodeDescriptor}; pub use docker::{ commands::{ComposeCommandError, compose_down, compose_up, dump_compose_logs}, From 06613a1e758d0a012924a277c9d1a486953fdcfe Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 6 Mar 2026 13:47:25 +0100 Subject: [PATCH 04/14] feat(compose): require labeled services for attach discovery --- .../tests/compose_attach_node_control.rs | 24 ++++--- .../compose/assets/docker-compose.yml.tera | 2 + .../compose/src/deployer/attach_provider.rs | 30 +++++++-- .../deployers/compose/src/deployer/mod.rs | 15 +++++ .../deployers/compose/src/docker/attached.rs | 67 +++++++++++++++++++ 5 files changed, 123 insertions(+), 15 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 8c5efeb..4de1eb3 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -21,14 +21,22 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { Err(error) => return Err(anyhow::Error::new(error)), }; + let services = metadata + .discover_services() + .await + .map_err(|err| anyhow!("{err}"))?; + let service = services + .first() + .cloned() + .ok_or_else(|| anyhow!("compose deployment metadata discovered no services"))?; + let attach_source = metadata + .attach_source_for_services(services) + .map_err(|err| anyhow!("{err}"))?; + let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .enable_node_control() .with_run_duration(Duration::from_secs(5)) - .with_attach_source( - metadata - .attach_source_for_services(vec!["node-0".to_owned()]) - .map_err(|err| anyhow!("{err}"))?, - ) + .with_attach_source(attach_source) .build()?; let attached_runner: Runner = match deployer.deploy(&attached).await { @@ -38,7 +46,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { }; let pre_restart_started_at = metadata - .service_started_at("node-0") + .service_started_at(&service) .await .map_err(|err| anyhow!("{err}"))?; @@ -48,12 +56,12 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; control - .restart_node("node-0") + .restart_node(&service) .await .map_err(|err| anyhow!("attached restart failed: {err}"))?; metadata - .wait_until_service_restarted("node-0", &pre_restart_started_at, Duration::from_secs(30)) + .wait_until_service_restarted(&service, &pre_restart_started_at, Duration::from_secs(30)) .await .map_err(|err| anyhow!("{err}"))?; diff --git a/testing-framework/deployers/compose/assets/docker-compose.yml.tera b/testing-framework/deployers/compose/assets/docker-compose.yml.tera index ba21922..75fe122 100644 --- a/testing-framework/deployers/compose/assets/docker-compose.yml.tera +++ b/testing-framework/deployers/compose/assets/docker-compose.yml.tera @@ -18,6 +18,8 @@ services: {% for port in node.ports %} - {{ port }} {% endfor %} + labels: + testing-framework.node: "true" environment: {% for env in node.environment %} {{ env.key }}: "{{ env.value }}" diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 9086b76..7ddf243 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -7,7 +7,9 @@ use testing_framework_core::scenario::{ use url::Url; use crate::{ - docker::attached::{discover_service_container_id, inspect_mapped_tcp_ports}, + docker::attached::{ + discover_attachable_services, discover_service_container_id, inspect_mapped_tcp_ports, + }, env::ComposeDeployEnv, }; @@ -46,14 +48,12 @@ impl AttachProvider for ComposeAttachProvider { source: "compose attach source requires an explicit project name".into(), })?; - if services.is_empty() { - return Err(AttachProviderError::Discovery { - source: "compose attach source requires at least one service name".into(), - }); - } + let services = resolve_services(project, services) + .await + .map_err(to_discovery_error)?; let mut attached = Vec::with_capacity(services.len()); - for service in services { + for service in &services { let container_id = discover_service_container_id(project, service) .await .map_err(to_discovery_error)?; @@ -79,6 +79,22 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } +async fn resolve_services(project: &str, requested: &[String]) -> Result, DynError> { + if !requested.is_empty() { + return Ok(requested.to_owned()); + } + + let discovered = discover_attachable_services(project).await?; + + if discovered.is_empty() { + return Err( + format!("no running compose services discovered for project '{project}'").into(), + ); + } + + Ok(discovered) +} + async fn discover_api_port(container_id: &str) -> Result { let mapped_ports = inspect_mapped_tcp_ports(container_id).await?; match mapped_ports.as_slice() { diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 63ff456..69b8eca 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -50,6 +50,21 @@ impl ComposeDeploymentMetadata { Ok(AttachSource::compose(services).with_project(project_name.to_owned())) } + /// Discovers compose node services and builds an attach source for them. + pub async fn attach_source_for_discovered_services(&self) -> Result { + let services = self.discover_services().await?; + self.attach_source_for_services(services) + } + + /// Discovers node services for this compose deployment. + pub async fn discover_services(&self) -> Result, DynError> { + let Some(project_name) = self.project_name() else { + return Err("compose metadata has no project name".into()); + }; + + crate::docker::attached::discover_attachable_services(project_name).await + } + /// Returns the current StartedAt timestamp for a compose service container. pub async fn service_started_at(&self, service: &str) -> Result { let Some(project_name) = self.project_name() else { diff --git a/testing-framework/deployers/compose/src/docker/attached.rs b/testing-framework/deployers/compose/src/docker/attached.rs index e47cb2e..babe175 100644 --- a/testing-framework/deployers/compose/src/docker/attached.rs +++ b/testing-framework/deployers/compose/src/docker/attached.rs @@ -4,6 +4,9 @@ use serde_json::Value; use testing_framework_core::scenario::DynError; use tokio::process::Command; +pub const ATTACHABLE_NODE_LABEL_KEY: &str = "testing-framework.node"; +pub const ATTACHABLE_NODE_LABEL_VALUE: &str = "true"; + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct MappedTcpPort { pub container_port: u16, @@ -45,6 +48,21 @@ pub async fn discover_service_container_id( } } +pub async fn discover_attachable_services(project: &str) -> Result, DynError> { + let attachable_filter = + format!("label={ATTACHABLE_NODE_LABEL_KEY}={ATTACHABLE_NODE_LABEL_VALUE}"); + let attachable = discover_services_with_filters(project, Some(&attachable_filter)).await?; + + if attachable.is_empty() { + return Err(format!( + "no running compose services with label '{ATTACHABLE_NODE_LABEL_KEY}={ATTACHABLE_NODE_LABEL_VALUE}' found for project '{project}'" + ) + .into()); + } + + Ok(attachable) +} + pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result, DynError> { let stdout = run_docker_capture([ "inspect", @@ -111,6 +129,55 @@ pub async fn run_docker_capture(args: [&str; N]) -> Result, +) -> Result, DynError> { + let mut args = vec![ + "ps".to_owned(), + "--filter".to_owned(), + format!("label=com.docker.compose.project={project}"), + ]; + + if let Some(filter) = extra_filter { + args.push("--filter".to_owned()); + args.push(filter.to_owned()); + } + + args.push("--format".to_owned()); + args.push("{{.Label \"com.docker.compose.service\"}}".to_owned()); + + let output = Command::new("docker") + .args(&args) + .stdin(Stdio::null()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .output() + .await?; + + if !output.status.success() { + return Err(format!( + "docker {} failed with status {}: {}", + args.join(" "), + output.status, + String::from_utf8_lossy(&output.stderr).trim() + ) + .into()); + } + + let mut services: Vec = output + .stdout + .split(|byte| *byte == b'\n') + .filter_map(|line| { + let parsed = String::from_utf8_lossy(line).trim().to_owned(); + (!parsed.is_empty()).then_some(parsed) + }) + .collect(); + services.sort(); + services.dedup(); + Ok(services) +} + fn parse_container_port(port_key: &str) -> Option { let (port, proto) = port_key.split_once('/')?; if proto != "tcp" { From 5568902b4655987b005b36d15b7791b2a63a808d Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 6 Mar 2026 13:51:46 +0100 Subject: [PATCH 05/14] test(compose): use discovered tagged services for attach node-control --- .../tests/compose_attach_node_control.rs | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 4de1eb3..b7681a4 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -25,12 +25,8 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .discover_services() .await .map_err(|err| anyhow!("{err}"))?; - let service = services - .first() - .cloned() - .ok_or_else(|| anyhow!("compose deployment metadata discovered no services"))?; let attach_source = metadata - .attach_source_for_services(services) + .attach_source_for_services(services.clone()) .map_err(|err| anyhow!("{err}"))?; let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) @@ -45,25 +41,31 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { Err(error) => return Err(anyhow::Error::new(error)), }; - let pre_restart_started_at = metadata - .service_started_at(&service) - .await - .map_err(|err| anyhow!("{err}"))?; - let control = attached_runner .context() .node_control() .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; - control - .restart_node(&service) - .await - .map_err(|err| anyhow!("attached restart failed: {err}"))?; + for service in services { + let pre_restart_started_at = metadata + .service_started_at(&service) + .await + .map_err(|err| anyhow!("{err}"))?; - metadata - .wait_until_service_restarted(&service, &pre_restart_started_at, Duration::from_secs(30)) - .await - .map_err(|err| anyhow!("{err}"))?; + control + .restart_node(&service) + .await + .map_err(|err| anyhow!("attached restart failed for {service}: {err}"))?; + + metadata + .wait_until_service_restarted( + &service, + &pre_restart_started_at, + Duration::from_secs(30), + ) + .await + .map_err(|err| anyhow!("{err}"))?; + } Ok(()) } From 25d5a4859b2e05f39d958cfe057d4dc4b3759b97 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 6 Mar 2026 13:59:27 +0100 Subject: [PATCH 06/14] refactor(compose): simplify attach test to project-only source --- .../tests/compose_attach_node_control.rs | 25 +++++++++++++------ .../deployers/compose/src/deployer/mod.rs | 15 ----------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index b7681a4..5d8111a 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -2,7 +2,7 @@ use std::time::Duration; use anyhow::{Result, anyhow}; use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; -use testing_framework_core::scenario::{Deployer as _, Runner}; +use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError}; #[tokio::test] @@ -21,13 +21,11 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { Err(error) => return Err(anyhow::Error::new(error)), }; - let services = metadata - .discover_services() - .await - .map_err(|err| anyhow!("{err}"))?; - let attach_source = metadata - .attach_source_for_services(services.clone()) - .map_err(|err| anyhow!("{err}"))?; + let project_name = metadata + .project_name() + .ok_or_else(|| anyhow!("compose deployment metadata has no project name"))? + .to_owned(); + let attach_source = AttachSource::compose(vec![]).with_project(project_name); let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .enable_node_control() @@ -46,6 +44,17 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .node_control() .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; + let services: Vec = attached_runner + .context() + .borrowed_nodes() + .into_iter() + .map(|node| node.identity) + .collect(); + + if services.is_empty() { + return Err(anyhow!("attached compose runner discovered no services")); + } + for service in services { let pre_restart_started_at = metadata .service_started_at(&service) diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 69b8eca..63ff456 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -50,21 +50,6 @@ impl ComposeDeploymentMetadata { Ok(AttachSource::compose(services).with_project(project_name.to_owned())) } - /// Discovers compose node services and builds an attach source for them. - pub async fn attach_source_for_discovered_services(&self) -> Result { - let services = self.discover_services().await?; - self.attach_source_for_services(services) - } - - /// Discovers node services for this compose deployment. - pub async fn discover_services(&self) -> Result, DynError> { - let Some(project_name) = self.project_name() else { - return Err("compose metadata has no project name".into()); - }; - - crate::docker::attached::discover_attachable_services(project_name).await - } - /// Returns the current StartedAt timestamp for a compose service container. pub async fn service_started_at(&self, service: &str) -> Result { let Some(project_name) = self.project_name() else { From d9c99322c761944c04445d5d68b4f90106d1ec53 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 6 Mar 2026 14:08:56 +0100 Subject: [PATCH 07/14] refactor(compose): keep metadata API minimal and test-local restart checks --- .../tests/compose_attach_node_control.rs | 96 +++++++++++++--- logos/examples/tests/node_config_override.rs | 104 ------------------ .../deployers/compose/src/deployer/mod.rs | 55 +-------- 3 files changed, 84 insertions(+), 171 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 5d8111a..62c3c3d 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -4,6 +4,7 @@ use anyhow::{Result, anyhow}; use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError}; +use tokio::process::Command; #[tokio::test] #[ignore = "requires Docker and mutates compose runtime state"] @@ -25,7 +26,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .project_name() .ok_or_else(|| anyhow!("compose deployment metadata has no project name"))? .to_owned(); - let attach_source = AttachSource::compose(vec![]).with_project(project_name); + let attach_source = AttachSource::compose(vec![]).with_project(project_name.clone()); let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .enable_node_control() @@ -56,25 +57,94 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { } for service in services { - let pre_restart_started_at = metadata - .service_started_at(&service) - .await - .map_err(|err| anyhow!("{err}"))?; + let pre_restart_started_at = service_started_at(&project_name, &service).await?; control .restart_node(&service) .await .map_err(|err| anyhow!("attached restart failed for {service}: {err}"))?; - metadata - .wait_until_service_restarted( - &service, - &pre_restart_started_at, - Duration::from_secs(30), - ) - .await - .map_err(|err| anyhow!("{err}"))?; + wait_until_service_restarted( + &project_name, + &service, + &pre_restart_started_at, + Duration::from_secs(30), + ) + .await?; } Ok(()) } + +async fn service_started_at(project: &str, service: &str) -> Result { + let container_id = run_docker(&[ + "ps", + "--filter", + &format!("label=com.docker.compose.project={project}"), + "--filter", + &format!("label=com.docker.compose.service={service}"), + "--format", + "{{.ID}}", + ]) + .await?; + + let container_id = container_id + .lines() + .next() + .map(str::trim) + .filter(|value| !value.is_empty()) + .ok_or_else(|| anyhow!("no running container found for service '{service}'"))?; + + let started_at = + run_docker(&["inspect", "--format", "{{.State.StartedAt}}", container_id]).await?; + + let started_at = started_at.trim().to_owned(); + + if started_at.is_empty() { + return Err(anyhow!( + "docker inspect returned empty StartedAt for service '{service}'" + )); + } + + Ok(started_at) +} + +async fn wait_until_service_restarted( + project: &str, + service: &str, + previous_started_at: &str, + timeout: Duration, +) -> Result<()> { + let deadline = std::time::Instant::now() + timeout; + + loop { + let started_at = service_started_at(project, service).await?; + + if started_at != previous_started_at { + return Ok(()); + } + + if std::time::Instant::now() >= deadline { + return Err(anyhow!( + "timed out waiting for restarted compose service '{service}'" + )); + } + + tokio::time::sleep(Duration::from_millis(500)).await; + } +} + +async fn run_docker(args: &[&str]) -> Result { + let output = Command::new("docker").args(args).output().await?; + + if !output.status.success() { + return Err(anyhow!( + "docker {} failed with status {}: {}", + args.join(" "), + output.status, + String::from_utf8_lossy(&output.stderr).trim() + )); + } + + Ok(String::from_utf8_lossy(&output.stdout).to_string()) +} diff --git a/logos/examples/tests/node_config_override.rs b/logos/examples/tests/node_config_override.rs index 22da1db..5761eae 100644 --- a/logos/examples/tests/node_config_override.rs +++ b/logos/examples/tests/node_config_override.rs @@ -103,110 +103,6 @@ async fn scenario_builder_api_port_override() -> Result<()> { Ok(()) } -#[tokio::test] -#[ignore = "run manually with `cargo test -p runner-examples -- --ignored scenario_builder_identify_hide_listen_addrs_override`"] -async fn scenario_builder_identify_hide_listen_addrs_override() -> Result<()> { - let _ = try_init(); - // Required env vars (set on the command line when running this test): - // - `POL_PROOF_DEV_MODE=true` - // - `LOGOS_BLOCKCHAIN_NODE_BIN=...` - // - `LOGOS_BLOCKCHAIN_CIRCUITS=...` - // - `RUST_LOG=info` (optional) - let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(1)); - let base_descriptors = base_builder.clone().build()?; - let base_node = base_descriptors.nodes().first().expect("node 0 descriptor"); - let mut run_config = build_node_run_config( - &base_descriptors, - base_node, - base_descriptors - .config() - .node_config_override(base_node.index()), - ) - .expect("build run config"); - - run_config - .user - .network - .backend - .swarm - .identify - .hide_listen_addrs = Some(true); - - let mut scenario = ScenarioBuilder::new(Box::new( - base_builder.with_node_config_override(0, run_config), - )) - .with_run_duration(Duration::from_secs(1)) - .build()?; - - let deployer = LbcLocalDeployer::default(); - let runner = deployer.deploy(&scenario).await?; - let handle = runner.run(&mut scenario).await?; - - let client = handle - .context() - .random_node_client() - .ok_or_else(|| anyhow::anyhow!("scenario did not expose any node clients"))?; - - client - .consensus_info() - .await - .expect("consensus_info should succeed"); - - Ok(()) -} - -#[tokio::test] -#[ignore = "run manually with `cargo test -p runner-examples -- --ignored scenario_builder_identify_hide_listen_addrs_two_nodes`"] -async fn scenario_builder_identify_hide_listen_addrs_two_nodes() -> Result<()> { - let _ = try_init(); - // Required env vars (set on the command line when running this test): - // - `POL_PROOF_DEV_MODE=true` - // - `LOGOS_BLOCKCHAIN_NODE_BIN=...` - // - `LOGOS_BLOCKCHAIN_CIRCUITS=...` - // - `RUST_LOG=info` (optional) - let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)); - let base_descriptors = base_builder.clone().build()?; - - let mut deployment_builder = base_builder; - for idx in 0..2 { - let node = &base_descriptors.nodes()[idx]; - let mut run_config = build_node_run_config( - &base_descriptors, - node, - base_descriptors.config().node_config_override(node.index()), - ) - .expect("build run config"); - run_config - .user - .network - .backend - .swarm - .identify - .hide_listen_addrs = Some(true); - deployment_builder = deployment_builder.with_node_config_override(idx, run_config); - } - - let mut scenario = ScenarioBuilder::new(Box::new(deployment_builder)) - .with_run_duration(Duration::from_secs(3)) - .build()?; - - let deployer = LbcLocalDeployer::default(); - let runner = deployer.deploy(&scenario).await?; - let handle = runner.run(&mut scenario).await?; - - let client = handle - .context() - .random_node_client() - .ok_or_else(|| anyhow::anyhow!("scenario did not expose any node clients"))?; - - client - .consensus_info() - .await - .expect("consensus_info should succeed"); - - Ok(()) -} - fn random_api_port() -> u16 { let listener = TcpListener::bind("127.0.0.1:0").expect("bind random API port"); listener.local_addr().expect("read API port").port() diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 63ff456..55cbc2f 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -5,14 +5,13 @@ pub mod ports; pub mod readiness; pub mod setup; -use std::{marker::PhantomData, time::Duration}; +use std::marker::PhantomData; use async_trait::async_trait; use testing_framework_core::scenario::{ AttachSource, CleanupGuard, Deployer, DynError, FeedHandle, ObservabilityCapabilityProvider, RequiresNodeControl, Runner, Scenario, }; -use tokio::time::sleep; use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup}; @@ -49,58 +48,6 @@ impl ComposeDeploymentMetadata { Ok(AttachSource::compose(services).with_project(project_name.to_owned())) } - - /// Returns the current StartedAt timestamp for a compose service container. - pub async fn service_started_at(&self, service: &str) -> Result { - let Some(project_name) = self.project_name() else { - return Err("compose metadata has no project name".into()); - }; - - let container_id = - crate::docker::attached::discover_service_container_id(project_name, service).await?; - let started_at = crate::docker::attached::run_docker_capture([ - "inspect", - "--format", - "{{.State.StartedAt}}", - &container_id, - ]) - .await?; - let started_at = started_at.trim(); - - if started_at.is_empty() { - return Err(format!( - "docker inspect returned empty StartedAt for compose service '{service}'" - ) - .into()); - } - - Ok(started_at.to_owned()) - } - - /// Waits until a service container reports a different StartedAt timestamp. - pub async fn wait_until_service_restarted( - &self, - service: &str, - previous_started_at: &str, - timeout: Duration, - ) -> Result<(), DynError> { - let deadline = std::time::Instant::now() + timeout; - - loop { - let started_at = self.service_started_at(service).await?; - if started_at != previous_started_at { - return Ok(()); - } - - if std::time::Instant::now() >= deadline { - return Err( - format!("timed out waiting for restarted compose service '{service}'").into(), - ); - } - - sleep(Duration::from_millis(500)).await; - } - } } impl Default for ComposeDeployer { From 712f93db18b33bb13a9edc5c007a2f26412a6a27 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 6 Mar 2026 14:24:27 +0100 Subject: [PATCH 08/14] refactor(compose): tighten attach errors and readability --- .../tests/compose_attach_node_control.rs | 35 +++++++--- .../compose/src/deployer/attach_provider.rs | 20 +++--- .../deployers/compose/src/deployer/mod.rs | 12 +++- .../compose/src/deployer/orchestrator.rs | 70 ++++++++++++------- 4 files changed, 86 insertions(+), 51 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 62c3c3d..386fd77 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -1,10 +1,10 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; -use anyhow::{Result, anyhow}; +use anyhow::{Error, Result, anyhow}; use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError}; -use tokio::process::Command; +use tokio::{process::Command, time::sleep}; #[tokio::test] #[ignore = "requires Docker and mutates compose runtime state"] @@ -19,7 +19,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { match deployer.deploy_with_metadata(&managed).await { Ok(result) => result, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), - Err(error) => return Err(anyhow::Error::new(error)), + Err(error) => return Err(Error::new(error)), }; let project_name = metadata @@ -37,7 +37,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { let attached_runner: Runner = match deployer.deploy(&attached).await { Ok(runner) => runner, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), - Err(error) => return Err(anyhow::Error::new(error)), + Err(error) => return Err(Error::new(error)), }; let control = attached_runner @@ -88,12 +88,25 @@ async fn service_started_at(project: &str, service: &str) -> Result { ]) .await?; - let container_id = container_id + let container_ids: Vec<&str> = container_id .lines() - .next() .map(str::trim) .filter(|value| !value.is_empty()) - .ok_or_else(|| anyhow!("no running container found for service '{service}'"))?; + .collect(); + + let container_id = match container_ids.as_slice() { + [] => { + return Err(anyhow!( + "no running container found for service '{service}'" + )); + } + [id] => *id, + _ => { + return Err(anyhow!( + "multiple running containers found for service '{service}'" + )); + } + }; let started_at = run_docker(&["inspect", "--format", "{{.State.StartedAt}}", container_id]).await?; @@ -115,7 +128,7 @@ async fn wait_until_service_restarted( previous_started_at: &str, timeout: Duration, ) -> Result<()> { - let deadline = std::time::Instant::now() + timeout; + let deadline = Instant::now() + timeout; loop { let started_at = service_started_at(project, service).await?; @@ -124,13 +137,13 @@ async fn wait_until_service_restarted( return Ok(()); } - if std::time::Instant::now() >= deadline { + if Instant::now() >= deadline { return Err(anyhow!( "timed out waiting for restarted compose service '{service}'" )); } - tokio::time::sleep(Duration::from_millis(500)).await; + sleep(Duration::from_millis(500)).await; } } diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 7ddf243..23d574e 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -18,6 +18,12 @@ pub(super) struct ComposeAttachProvider { _env: PhantomData, } +#[derive(Debug, thiserror::Error)] +enum ComposeAttachDiscoveryError { + #[error("compose attach source requires an explicit project name")] + MissingProjectName, +} + impl ComposeAttachProvider { pub(super) fn new(host: String) -> Self { Self { @@ -45,7 +51,7 @@ impl AttachProvider for ComposeAttachProvider { let project = project .as_ref() .ok_or_else(|| AttachProviderError::Discovery { - source: "compose attach source requires an explicit project name".into(), + source: ComposeAttachDiscoveryError::MissingProjectName.into(), })?; let services = resolve_services(project, services) @@ -57,9 +63,11 @@ impl AttachProvider for ComposeAttachProvider { let container_id = discover_service_container_id(project, service) .await .map_err(to_discovery_error)?; + let api_port = discover_api_port(&container_id) .await .map_err(to_discovery_error)?; + let endpoint = build_service_endpoint(&self.host, api_port).map_err(to_discovery_error)?; let source = ExternalNodeSource::new(service.clone(), endpoint.to_string()); @@ -84,15 +92,7 @@ async fn resolve_services(project: &str, requested: &[String]) -> Result Result { diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 55cbc2f..8bd695d 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -29,6 +29,12 @@ pub struct ComposeDeploymentMetadata { pub project_name: Option, } +#[derive(Debug, thiserror::Error)] +enum ComposeMetadataError { + #[error("compose deployment metadata has no project name")] + MissingProjectName, +} + impl ComposeDeploymentMetadata { /// Returns project name when deployment is bound to a specific compose /// project. @@ -42,9 +48,9 @@ impl ComposeDeploymentMetadata { &self, services: Vec, ) -> Result { - let Some(project_name) = self.project_name() else { - return Err("compose metadata has no project name".into()); - }; + let project_name = self + .project_name() + .ok_or(ComposeMetadataError::MissingProjectName)?; Ok(AttachSource::compose(services).with_project(project_name.to_owned())) } diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 7889f68..a001680 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -6,8 +6,8 @@ use testing_framework_core::{ ApplicationExternalProvider, AttachSource, CleanupGuard, DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, - Runner, Scenario, ScenarioSources, SourceProviders, StaticManagedProvider, - build_source_orchestration_plan, orchestrate_sources_with_providers, + Runner, Scenario, ScenarioSources, SourceOrchestrationPlan, SourceProviders, + StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -102,19 +102,11 @@ impl DeploymentOrchestrator { ) .await?; - // Source orchestration currently runs here after managed clients are prepared. - let source_providers = SourceProviders::default() - .with_managed(Arc::new(StaticManagedProvider::new( - deployed.node_clients.snapshot(), - ))) - .with_attach(Arc::new(ComposeAttachProvider::::new( - compose_runner_host(), - ))) - .with_external(Arc::new(ApplicationExternalProvider)); + let source_providers = self.source_providers(deployed.node_clients.snapshot()); - deployed.node_clients = orchestrate_sources_with_providers(&source_plan, source_providers) - .await - .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + deployed.node_clients = self + .resolve_node_clients(&source_plan, source_providers) + .await?; let project_name = prepared.environment.project_name().to_owned(); @@ -147,25 +139,19 @@ impl DeploymentOrchestrator { async fn deploy_attached_only( &self, scenario: &Scenario, - source_plan: testing_framework_core::scenario::SourceOrchestrationPlan, + source_plan: SourceOrchestrationPlan, ) -> Result, ComposeRunnerError> where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { let observability = resolve_observability_inputs(scenario)?; - let source_providers = SourceProviders::default() - .with_managed(Arc::new(StaticManagedProvider::new(Vec::new()))) - .with_attach(Arc::new(ComposeAttachProvider::::new( - compose_runner_host(), - ))) - .with_external(Arc::new(ApplicationExternalProvider)); - let node_clients = orchestrate_sources_with_providers(&source_plan, source_providers) - .await - .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + let source_providers = self.source_providers(Vec::new()); - if node_clients.is_empty() { - return Err(ComposeRunnerError::RuntimePreflight); - } + let node_clients = self + .resolve_node_clients(&source_plan, source_providers) + .await?; + + self.ensure_non_empty_node_clients(&node_clients)?; let node_control = self.attached_node_control::(scenario)?; let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; @@ -183,6 +169,36 @@ impl DeploymentOrchestrator { Ok(Runner::new(context, Some(cleanup_guard))) } + fn source_providers(&self, managed_clients: Vec) -> SourceProviders { + SourceProviders::default() + .with_managed(Arc::new(StaticManagedProvider::new(managed_clients))) + .with_attach(Arc::new(ComposeAttachProvider::::new( + compose_runner_host(), + ))) + .with_external(Arc::new(ApplicationExternalProvider)) + } + + async fn resolve_node_clients( + &self, + source_plan: &SourceOrchestrationPlan, + source_providers: SourceProviders, + ) -> Result, ComposeRunnerError> { + orchestrate_sources_with_providers(source_plan, source_providers) + .await + .map_err(|source| ComposeRunnerError::SourceOrchestration { source }) + } + + fn ensure_non_empty_node_clients( + &self, + node_clients: &NodeClients, + ) -> Result<(), ComposeRunnerError> { + if node_clients.is_empty() { + return Err(ComposeRunnerError::RuntimePreflight); + } + + Ok(()) + } + fn attached_node_control( &self, scenario: &Scenario, From fd547aa119052c8f17e46e962ae8220c2ae9cf1e Mon Sep 17 00:00:00 2001 From: andrussal Date: Sat, 7 Mar 2026 08:04:15 +0100 Subject: [PATCH 09/14] Align compose attach with shared metadata flow --- .../tests/compose_attach_node_control.rs | 48 ++++++++++++++++++- .../deployers/compose/src/deployer/mod.rs | 10 ++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 386fd77..f265f01 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -2,10 +2,53 @@ use std::time::{Duration, Instant}; use anyhow::{Error, Result, anyhow}; use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; -use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; +use testing_framework_core::scenario::{Deployer as _, Runner}; use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError}; use tokio::{process::Command, time::sleep}; +#[tokio::test] +#[ignore = "requires Docker and mutates compose runtime state"] +async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> { + let managed = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) + .with_run_duration(Duration::from_secs(5)) + .build()?; + + let deployer = LbcComposeDeployer::default(); + let (_managed_runner, metadata): (Runner, ComposeDeploymentMetadata) = + match deployer.deploy_with_metadata(&managed).await { + Ok(result) => result, + Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), + Err(error) => return Err(Error::new(error)), + }; + + let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; + let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) + .with_run_duration(Duration::from_secs(5)) + .with_attach_source(attach_source) + .build()?; + + let attached_runner: Runner = match deployer.deploy(&attached).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), + Err(error) => return Err(Error::new(error)), + }; + + if attached_runner.context().node_clients().is_empty() { + return Err(anyhow!("compose attach resolved no node clients")); + } + + for node_client in attached_runner.context().node_clients().snapshot() { + node_client.consensus_info().await.map_err(|err| { + anyhow!( + "attached node api query failed at {}: {err}", + node_client.base_url() + ) + })?; + } + + Ok(()) +} + #[tokio::test] #[ignore = "requires Docker and mutates compose runtime state"] async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { @@ -26,7 +69,8 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .project_name() .ok_or_else(|| anyhow!("compose deployment metadata has no project name"))? .to_owned(); - let attach_source = AttachSource::compose(vec![]).with_project(project_name.clone()); + + let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .enable_node_control() diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 8bd695d..60a88de 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -43,6 +43,16 @@ impl ComposeDeploymentMetadata { self.project_name.as_deref() } + /// Builds an attach source for the same compose project using deployer + /// discovery to resolve services. + pub fn attach_source(&self) -> Result { + let project_name = self + .project_name() + .ok_or(ComposeMetadataError::MissingProjectName)?; + + Ok(AttachSource::compose(Vec::new()).with_project(project_name.to_owned())) + } + /// Builds an attach source for the same compose project. pub fn attach_source_for_services( &self, From d4c5b9fe9984c962694e2a12e13a323b8a861f5c Mon Sep 17 00:00:00 2001 From: andrussal Date: Sat, 7 Mar 2026 08:32:31 +0100 Subject: [PATCH 10/14] Define attached runner readiness contract --- .../tests/compose_attach_node_control.rs | 42 +++++++-- logos/runtime/ext/src/compose_env.rs | 1 + .../core/src/scenario/control.rs | 8 ++ testing-framework/core/src/scenario/mod.rs | 2 +- .../core/src/scenario/runtime/context.rs | 35 ++++++- .../core/src/scenario/runtime/runner.rs | 4 + .../compose/assets/docker-compose.yml.tera | 1 + .../compose/src/deployer/attach_provider.rs | 94 ++++++++++++++----- .../compose/src/deployer/orchestrator.rs | 46 +++++++-- .../deployers/compose/src/descriptor/node.rs | 8 ++ .../deployers/compose/src/docker/attached.rs | 31 ++++++ 11 files changed, 234 insertions(+), 38 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index f265f01..a1fdc8d 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -33,6 +33,11 @@ async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> { Err(error) => return Err(Error::new(error)), }; + attached_runner + .wait_network_ready() + .await + .map_err(|err| anyhow!("compose attached runner readiness failed: {err}"))?; + if attached_runner.context().node_clients().is_empty() { return Err(anyhow!("compose attach resolved no node clients")); } @@ -89,12 +94,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .node_control() .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; - let services: Vec = attached_runner - .context() - .borrowed_nodes() - .into_iter() - .map(|node| node.identity) - .collect(); + let services = discover_attached_services(&project_name).await?; if services.is_empty() { return Err(anyhow!("attached compose runner discovered no services")); @@ -166,6 +166,36 @@ async fn service_started_at(project: &str, service: &str) -> Result { Ok(started_at) } +async fn discover_attached_services(project: &str) -> Result> { + let output = run_docker(&[ + "ps", + "--filter", + &format!("label=com.docker.compose.project={project}"), + "--filter", + "label=testing-framework.node=true", + "--format", + "{{.Label \"com.docker.compose.service\"}}", + ]) + .await?; + + let mut services: Vec = output + .lines() + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToOwned::to_owned) + .collect(); + services.sort(); + services.dedup(); + + if services.is_empty() { + return Err(anyhow!( + "attached compose runner discovered no labeled services" + )); + } + + Ok(services) +} + async fn wait_until_service_restarted( project: &str, service: &str, diff --git a/logos/runtime/ext/src/compose_env.rs b/logos/runtime/ext/src/compose_env.rs index 4fa3018..9803b12 100644 --- a/logos/runtime/ext/src/compose_env.rs +++ b/logos/runtime/ext/src/compose_env.rs @@ -254,6 +254,7 @@ fn build_compose_node_descriptor( base_volumes(), default_extra_hosts(), ports, + api_port, environment, platform, ) diff --git a/testing-framework/core/src/scenario/control.rs b/testing-framework/core/src/scenario/control.rs index d19f871..4f59621 100644 --- a/testing-framework/core/src/scenario/control.rs +++ b/testing-framework/core/src/scenario/control.rs @@ -33,3 +33,11 @@ pub trait NodeControlHandle: Send + Sync { None } } + +/// Deployer-agnostic wait surface for cluster readiness checks. +#[async_trait] +pub trait ClusterWaitHandle: Send + Sync { + async fn wait_network_ready(&self) -> Result<(), DynError> { + Err("wait_network_ready not supported by this deployer".into()) + } +} diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index 411bdf1..f2e43d6 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -25,7 +25,7 @@ pub use capabilities::{ StartNodeOptions, StartedNode, }; pub use common_builder_ext::CoreBuilderExt; -pub use control::NodeControlHandle; +pub use control::{ClusterWaitHandle, NodeControlHandle}; #[doc(hidden)] pub use definition::{ Builder as CoreBuilder, // internal adapter-facing core builder diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 8e26133..6441149 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -1,7 +1,10 @@ use std::{sync::Arc, time::Duration}; use super::{metrics::Metrics, node_clients::ClusterClient}; -use crate::scenario::{Application, BorrowedNode, ManagedNode, NodeClients, NodeControlHandle}; +use crate::scenario::{ + Application, BorrowedNode, ClusterWaitHandle, DynError, ManagedNode, NodeClients, + NodeControlHandle, +}; /// Shared runtime context available to workloads and expectations. pub struct RunContext { @@ -12,6 +15,7 @@ pub struct RunContext { telemetry: Metrics, feed: ::Feed, node_control: Option>>, + cluster_wait: Option>>, } impl RunContext { @@ -36,9 +40,16 @@ impl RunContext { telemetry, feed, node_control, + cluster_wait: None, } } + #[must_use] + pub fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { + self.cluster_wait = Some(cluster_wait); + self + } + #[must_use] pub fn descriptors(&self) -> &E::Deployment { &self.descriptors @@ -104,11 +115,29 @@ impl RunContext { self.node_control.clone() } + #[must_use] + pub fn cluster_wait(&self) -> Option>> { + self.cluster_wait.clone() + } + #[must_use] pub const fn controls_nodes(&self) -> bool { self.node_control.is_some() } + #[must_use] + pub const fn can_wait_network_ready(&self) -> bool { + self.cluster_wait.is_some() + } + + pub async fn wait_network_ready(&self) -> Result<(), DynError> { + let Some(cluster_wait) = self.cluster_wait() else { + return Err("wait_network_ready is not available for this runner".into()); + }; + + cluster_wait.wait_network_ready().await + } + #[must_use] pub const fn cluster_client(&self) -> ClusterClient<'_, E> { self.node_clients.cluster_client() @@ -156,6 +185,10 @@ impl RunHandle { pub fn context(&self) -> &RunContext { &self.run_context } + + pub async fn wait_network_ready(&self) -> Result<(), DynError> { + self.run_context.wait_network_ready().await + } } /// Derived metrics about the current run timing. diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index bcc453b..ac7bff9 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -43,6 +43,10 @@ impl Runner { Arc::clone(&self.context) } + pub async fn wait_network_ready(&self) -> Result<(), DynError> { + self.context.wait_network_ready().await + } + pub(crate) fn cleanup(&mut self) { if let Some(guard) = self.cleanup_guard.take() { guard.cleanup(); diff --git a/testing-framework/deployers/compose/assets/docker-compose.yml.tera b/testing-framework/deployers/compose/assets/docker-compose.yml.tera index 75fe122..c6ecc29 100644 --- a/testing-framework/deployers/compose/assets/docker-compose.yml.tera +++ b/testing-framework/deployers/compose/assets/docker-compose.yml.tera @@ -20,6 +20,7 @@ services: {% endfor %} labels: testing-framework.node: "true" + testing-framework.api-container-port: "{{ node.api_container_port }}" environment: {% for env in node.environment %} {{ env.key }}: "{{ env.value }}" diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 23d574e..a1d0412 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -2,13 +2,15 @@ use std::marker::PhantomData; use async_trait::async_trait; use testing_framework_core::scenario::{ - AttachProvider, AttachProviderError, AttachSource, AttachedNode, DynError, ExternalNodeSource, + AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, + ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, }; use url::Url; use crate::{ docker::attached::{ - discover_attachable_services, discover_service_container_id, inspect_mapped_tcp_ports, + discover_attachable_services, discover_service_container_id, + inspect_api_container_port_label, inspect_mapped_tcp_ports, }, env::ComposeDeployEnv, }; @@ -18,6 +20,12 @@ pub(super) struct ComposeAttachProvider { _env: PhantomData, } +pub(super) struct ComposeAttachedClusterWait { + host: String, + source: AttachSource, + _env: PhantomData, +} + #[derive(Debug, thiserror::Error)] enum ComposeAttachDiscoveryError { #[error("compose attach source requires an explicit project name")] @@ -33,6 +41,16 @@ impl ComposeAttachProvider { } } +impl ComposeAttachedClusterWait { + pub(super) fn new(host: String, source: AttachSource) -> Self { + Self { + host, + source, + _env: PhantomData, + } + } +} + #[async_trait] impl AttachProvider for ComposeAttachProvider { async fn discover( @@ -87,7 +105,10 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } -async fn resolve_services(project: &str, requested: &[String]) -> Result, DynError> { +pub(super) async fn resolve_services( + project: &str, + requested: &[String], +) -> Result, DynError> { if !requested.is_empty() { return Ok(requested.to_owned()); } @@ -95,34 +116,61 @@ async fn resolve_services(project: &str, requested: &[String]) -> Result Result { +pub(super) async fn discover_api_port(container_id: &str) -> Result { let mapped_ports = inspect_mapped_tcp_ports(container_id).await?; - match mapped_ports.as_slice() { - [] => Err(format!( - "no mapped tcp ports discovered for attached compose service container '{container_id}'" - ) - .into()), - [port] => Ok(port.host_port), - _ => { - let mapped_ports = mapped_ports - .iter() - .map(|port| format!("{}->{}", port.container_port, port.host_port)) - .collect::>() - .join(", "); + let api_container_port = inspect_api_container_port_label(container_id).await?; + let Some(api_port) = mapped_ports + .iter() + .find(|port| port.container_port == api_container_port) + .map(|port| port.host_port) + else { + let mapped_ports = mapped_ports + .iter() + .map(|port| format!("{}->{}", port.container_port, port.host_port)) + .collect::>() + .join(", "); - Err(format!( - "attached compose service container '{container_id}' has multiple mapped tcp ports ({mapped_ports}); provide a single exposed API port" - ) - .into()) - } - } + return Err(format!( + "attached compose service container '{container_id}' does not expose labeled API container port {api_container_port}; mapped tcp ports: {mapped_ports}" + ) + .into()); + }; + + Ok(api_port) } -fn build_service_endpoint(host: &str, port: u16) -> Result { +pub(super) fn build_service_endpoint(host: &str, port: u16) -> Result { let endpoint = Url::parse(&format!("http://{host}:{port}/"))?; Ok(endpoint) } +#[async_trait] +impl ClusterWaitHandle for ComposeAttachedClusterWait { + async fn wait_network_ready(&self) -> Result<(), DynError> { + let AttachSource::Compose { project, services } = &self.source else { + return Err("compose cluster wait requires a compose attach source".into()); + }; + + let project = project + .as_ref() + .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?; + let services = resolve_services(project, services).await?; + + let mut endpoints = Vec::with_capacity(services.len()); + for service in &services { + let container_id = discover_service_container_id(project, service).await?; + let api_port = discover_api_port(&container_id).await?; + let mut endpoint = build_service_endpoint(&self.host, api_port)?; + endpoint.set_path(E::readiness_path()); + endpoints.push(endpoint); + } + + wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?; + + Ok(()) + } +} + #[cfg(test)] mod tests { use super::build_service_endpoint; diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index a001680..5d69218 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,11 +3,12 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - ApplicationExternalProvider, AttachSource, CleanupGuard, DeploymentPolicy, FeedHandle, - FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, - ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, - Runner, Scenario, ScenarioSources, SourceOrchestrationPlan, SourceProviders, - StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, + ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, + DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, + NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, + RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, + SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -15,7 +16,7 @@ use tracing::info; use super::{ ComposeDeployer, ComposeDeploymentMetadata, - attach_provider::ComposeAttachProvider, + attach_provider::{ComposeAttachProvider, ComposeAttachedClusterWait}, clients::ClientBuilder, make_cleanup_guard, ports::PortManager, @@ -117,6 +118,7 @@ impl DeploymentOrchestrator { deployed, observability, readiness_enabled, + project_name.clone(), ) .await?; @@ -154,6 +156,7 @@ impl DeploymentOrchestrator { self.ensure_non_empty_node_clients(&node_clients)?; let node_control = self.attached_node_control::(scenario)?; + let cluster_wait = self.attached_cluster_wait(scenario)?; let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; let context = RunContext::new( scenario.deployment().clone(), @@ -163,7 +166,8 @@ impl DeploymentOrchestrator { observability.telemetry_handle()?, feed, node_control, - ); + ) + .with_cluster_wait(cluster_wait); let cleanup_guard: Box = Box::new(feed_task); Ok(Runner::new(context, Some(cleanup_guard))) @@ -237,6 +241,25 @@ impl DeploymentOrchestrator { }) as Arc>)) } + fn attached_cluster_wait( + &self, + scenario: &Scenario, + ) -> Result>, ComposeRunnerError> + where + Caps: Send + Sync, + { + let ScenarioSources::Attached { attach, .. } = scenario.sources() else { + return Err(ComposeRunnerError::InternalInvariant { + message: "compose attached cluster wait requested outside attached source mode", + }); + }; + + Ok(Arc::new(ComposeAttachedClusterWait::::new( + compose_runner_host(), + attach.clone(), + ))) + } + async fn build_runner( &self, scenario: &Scenario, @@ -244,6 +267,7 @@ impl DeploymentOrchestrator { deployed: DeployedNodes, observability: ObservabilityInputs, readiness_enabled: bool, + project_name: String, ) -> Result, ComposeRunnerError> where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, @@ -263,6 +287,10 @@ impl DeploymentOrchestrator { telemetry, environment: &mut prepared.environment, node_control, + cluster_wait: Arc::new(ComposeAttachedClusterWait::::new( + compose_runner_host(), + AttachSource::compose(Vec::new()).with_project(project_name), + )), }; let runtime = build_compose_runtime::(input).await?; let cleanup_guard = @@ -376,6 +404,7 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { telemetry: Metrics, environment: &'a mut StackEnvironment, node_control: Option>>, + cluster_wait: Arc>, } async fn build_compose_runtime( @@ -400,6 +429,7 @@ async fn build_compose_runtime( input.telemetry, feed, input.node_control, + input.cluster_wait, ); Ok(ComposeRuntime { context, feed_task }) @@ -443,6 +473,7 @@ fn build_run_context( telemetry: Metrics, feed: ::Feed, node_control: Option>>, + cluster_wait: Arc>, ) -> RunContext { RunContext::new( descriptors, @@ -453,6 +484,7 @@ fn build_run_context( feed, node_control, ) + .with_cluster_wait(cluster_wait) } fn resolve_observability_inputs( diff --git a/testing-framework/deployers/compose/src/descriptor/node.rs b/testing-framework/deployers/compose/src/descriptor/node.rs index c5f769b..d35f8f5 100644 --- a/testing-framework/deployers/compose/src/descriptor/node.rs +++ b/testing-framework/deployers/compose/src/descriptor/node.rs @@ -9,6 +9,7 @@ pub struct NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, + api_container_port: u16, environment: Vec, #[serde(skip_serializing_if = "Option::is_none")] platform: Option, @@ -49,6 +50,7 @@ impl NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, + api_container_port: u16, environment: Vec, platform: Option, ) -> Self { @@ -59,6 +61,7 @@ impl NodeDescriptor { volumes, extra_hosts, ports, + api_container_port, environment, platform, } @@ -77,4 +80,9 @@ impl NodeDescriptor { pub fn environment(&self) -> &[EnvEntry] { &self.environment } + + #[cfg(test)] + pub fn api_container_port(&self) -> u16 { + self.api_container_port + } } diff --git a/testing-framework/deployers/compose/src/docker/attached.rs b/testing-framework/deployers/compose/src/docker/attached.rs index babe175..b558a41 100644 --- a/testing-framework/deployers/compose/src/docker/attached.rs +++ b/testing-framework/deployers/compose/src/docker/attached.rs @@ -6,6 +6,7 @@ use tokio::process::Command; pub const ATTACHABLE_NODE_LABEL_KEY: &str = "testing-framework.node"; pub const ATTACHABLE_NODE_LABEL_VALUE: &str = "true"; +pub const API_CONTAINER_PORT_LABEL_KEY: &str = "testing-framework.api-container-port"; #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct MappedTcpPort { @@ -75,6 +76,18 @@ pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result Result { + let stdout = run_docker_capture([ + "inspect", + "--format", + "{{index .Config.Labels \"testing-framework.api-container-port\"}}", + container_id, + ]) + .await?; + + parse_api_container_port_label(&stdout) +} + pub fn parse_mapped_tcp_ports(raw: &str) -> Result, DynError> { let ports_value: Value = serde_json::from_str(raw.trim())?; let ports_object = ports_value @@ -194,3 +207,21 @@ fn parse_host_port_binding(binding: &Value) -> Option { .parse::() .ok() } + +fn parse_api_container_port_label(raw: &str) -> Result { + let value = raw.trim(); + + if value.is_empty() || value == "" { + return Err(format!( + "attached compose container is missing required label '{API_CONTAINER_PORT_LABEL_KEY}'" + ) + .into()); + } + + value.parse::().map_err(|err| { + format!( + "attached compose container label '{API_CONTAINER_PORT_LABEL_KEY}' has invalid value '{value}': {err}" + ) + .into() + }) +} From d34ac87411e76ea667a9cde79bec0489a68d0a38 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sat, 7 Mar 2026 08:44:35 +0100 Subject: [PATCH 11/14] Ensure runners clean up attached resources on drop --- testing-framework/core/src/scenario/runtime/runner.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index ac7bff9..652d740 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -27,6 +27,12 @@ pub struct Runner { cleanup_guard: Option>, } +impl Drop for Runner { + fn drop(&mut self) { + self.cleanup(); + } +} + impl Runner { /// Construct a runner from the run context and optional cleanup guard. #[must_use] From 45bd07737e4b8d7992223a9b582418adca5d6d19 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sat, 7 Mar 2026 08:58:08 +0100 Subject: [PATCH 12/14] Refine compose attach readability --- .../tests/compose_attach_node_control.rs | 46 +++--- .../core/src/scenario/runtime/context.rs | 17 ++- .../compose/src/deployer/attach_provider.rs | 131 +++++++++++------- .../compose/src/deployer/orchestrator.rs | 19 ++- 4 files changed, 125 insertions(+), 88 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index a1fdc8d..1f03ea7 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -93,13 +93,8 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .context() .node_control() .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; - let services = discover_attached_services(&project_name).await?; - if services.is_empty() { - return Err(anyhow!("attached compose runner discovered no services")); - } - for service in services { let pre_restart_started_at = service_started_at(&project_name, &service).await?; @@ -121,7 +116,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { } async fn service_started_at(project: &str, service: &str) -> Result { - let container_id = run_docker(&[ + let container_output = run_docker(&[ "ps", "--filter", &format!("label=com.docker.compose.project={project}"), @@ -131,26 +126,7 @@ async fn service_started_at(project: &str, service: &str) -> Result { "{{.ID}}", ]) .await?; - - let container_ids: Vec<&str> = container_id - .lines() - .map(str::trim) - .filter(|value| !value.is_empty()) - .collect(); - - let container_id = match container_ids.as_slice() { - [] => { - return Err(anyhow!( - "no running container found for service '{service}'" - )); - } - [id] => *id, - _ => { - return Err(anyhow!( - "multiple running containers found for service '{service}'" - )); - } - }; + let container_id = single_container_id(service, &container_output)?; let started_at = run_docker(&["inspect", "--format", "{{.State.StartedAt}}", container_id]).await?; @@ -166,6 +142,24 @@ async fn service_started_at(project: &str, service: &str) -> Result { Ok(started_at) } +fn single_container_id<'a>(service: &str, output: &'a str) -> Result<&'a str> { + let container_ids: Vec<&str> = output + .lines() + .map(str::trim) + .filter(|value| !value.is_empty()) + .collect(); + + match container_ids.as_slice() { + [] => Err(anyhow!( + "no running container found for service '{service}'" + )), + [id] => Ok(*id), + _ => Err(anyhow!( + "multiple running containers found for service '{service}'" + )), + } +} + async fn discover_attached_services(project: &str) -> Result> { let output = run_docker(&[ "ps", diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 6441149..65f28a5 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -6,6 +6,12 @@ use crate::scenario::{ NodeControlHandle, }; +#[derive(Debug, thiserror::Error)] +enum RunContextCapabilityError { + #[error("wait_network_ready is not available for this runner")] + MissingClusterWait, +} + /// Shared runtime context available to workloads and expectations. pub struct RunContext { descriptors: E::Deployment, @@ -131,17 +137,18 @@ impl RunContext { } pub async fn wait_network_ready(&self) -> Result<(), DynError> { - let Some(cluster_wait) = self.cluster_wait() else { - return Err("wait_network_ready is not available for this runner".into()); - }; - - cluster_wait.wait_network_ready().await + self.require_cluster_wait()?.wait_network_ready().await } #[must_use] pub const fn cluster_client(&self) -> ClusterClient<'_, E> { self.node_clients.cluster_client() } + + fn require_cluster_wait(&self) -> Result>, DynError> { + self.cluster_wait() + .ok_or_else(|| RunContextCapabilityError::MissingClusterWait.into()) + } } /// Handle returned by the runner to control the lifecycle of the run. diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index a1d0412..87358aa 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -51,50 +51,29 @@ impl ComposeAttachedClusterWait { } } +struct ComposeAttachRequest<'a> { + project: &'a str, + services: &'a [String], +} + #[async_trait] impl AttachProvider for ComposeAttachProvider { async fn discover( &self, source: &AttachSource, ) -> Result>, AttachProviderError> { - let (project, services) = match source { - AttachSource::Compose { project, services } => (project, services), - _ => { - return Err(AttachProviderError::UnsupportedSource { - attach_source: source.clone(), - }); - } - }; - - let project = project - .as_ref() - .ok_or_else(|| AttachProviderError::Discovery { - source: ComposeAttachDiscoveryError::MissingProjectName.into(), - })?; - - let services = resolve_services(project, services) + let request = compose_attach_request(source)?; + let services = resolve_services(request.project, request.services) .await .map_err(to_discovery_error)?; let mut attached = Vec::with_capacity(services.len()); for service in &services { - let container_id = discover_service_container_id(project, service) - .await - .map_err(to_discovery_error)?; - - let api_port = discover_api_port(&container_id) - .await - .map_err(to_discovery_error)?; - - let endpoint = - build_service_endpoint(&self.host, api_port).map_err(to_discovery_error)?; - let source = ExternalNodeSource::new(service.clone(), endpoint.to_string()); - let client = E::external_node_client(&source).map_err(to_discovery_error)?; - - attached.push(AttachedNode { - identity_hint: Some(service.clone()), - client, - }); + attached.push( + build_attached_node::(&self.host, request.project, service) + .await + .map_err(to_discovery_error)?, + ); } Ok(attached) @@ -105,6 +84,41 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } +fn compose_attach_request( + source: &AttachSource, +) -> Result, AttachProviderError> { + let AttachSource::Compose { project, services } = source else { + return Err(AttachProviderError::UnsupportedSource { + attach_source: source.clone(), + }); + }; + + let project = project + .as_deref() + .ok_or_else(|| AttachProviderError::Discovery { + source: ComposeAttachDiscoveryError::MissingProjectName.into(), + })?; + + Ok(ComposeAttachRequest { project, services }) +} + +async fn build_attached_node( + host: &str, + project: &str, + service: &str, +) -> Result, DynError> { + let container_id = discover_service_container_id(project, service).await?; + let api_port = discover_api_port(&container_id).await?; + let endpoint = build_service_endpoint(host, api_port)?; + let source = ExternalNodeSource::new(service.to_owned(), endpoint.to_string()); + let client = E::external_node_client(&source)?; + + Ok(AttachedNode { + identity_hint: Some(service.to_owned()), + client, + }) +} + pub(super) async fn resolve_services( project: &str, requested: &[String], @@ -147,23 +161,10 @@ pub(super) fn build_service_endpoint(host: &str, port: u16) -> Result ClusterWaitHandle for ComposeAttachedClusterWait { async fn wait_network_ready(&self) -> Result<(), DynError> { - let AttachSource::Compose { project, services } = &self.source else { - return Err("compose cluster wait requires a compose attach source".into()); - }; - - let project = project - .as_ref() - .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?; - let services = resolve_services(project, services).await?; - - let mut endpoints = Vec::with_capacity(services.len()); - for service in &services { - let container_id = discover_service_container_id(project, service).await?; - let api_port = discover_api_port(&container_id).await?; - let mut endpoint = build_service_endpoint(&self.host, api_port)?; - endpoint.set_path(E::readiness_path()); - endpoints.push(endpoint); - } + let request = compose_wait_request(&self.source)?; + let services = resolve_services(request.project, request.services).await?; + let endpoints = + collect_readiness_endpoints::(&self.host, request.project, &services).await?; wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?; @@ -171,6 +172,36 @@ impl ClusterWaitHandle for ComposeAttachedClusterWait } } +fn compose_wait_request(source: &AttachSource) -> Result, DynError> { + let AttachSource::Compose { project, services } = source else { + return Err("compose cluster wait requires a compose attach source".into()); + }; + + let project = project + .as_deref() + .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?; + + Ok(ComposeAttachRequest { project, services }) +} + +async fn collect_readiness_endpoints( + host: &str, + project: &str, + services: &[String], +) -> Result, DynError> { + let mut endpoints = Vec::with_capacity(services.len()); + + for service in services { + let container_id = discover_service_container_id(project, service).await?; + let api_port = discover_api_port(&container_id).await?; + let mut endpoint = build_service_endpoint(host, api_port)?; + endpoint.set_path(E::readiness_path()); + endpoints.push(endpoint); + } + + Ok(endpoints) +} + #[cfg(test)] mod tests { use super::build_service_endpoint; diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 5d69218..831f3fe 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -158,7 +158,7 @@ impl DeploymentOrchestrator { let node_control = self.attached_node_control::(scenario)?; let cluster_wait = self.attached_cluster_wait(scenario)?; let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; - let context = RunContext::new( + let context = build_run_context( scenario.deployment().clone(), node_clients, scenario.duration(), @@ -166,8 +166,8 @@ impl DeploymentOrchestrator { observability.telemetry_handle()?, feed, node_control, - ) - .with_cluster_wait(cluster_wait); + cluster_wait, + ); let cleanup_guard: Box = Box::new(feed_task); Ok(Runner::new(context, Some(cleanup_guard))) @@ -274,6 +274,7 @@ impl DeploymentOrchestrator { { let telemetry = observability.telemetry_handle()?; let node_control = self.maybe_node_control::(&prepared.environment); + let cluster_wait = self.managed_cluster_wait(project_name); log_observability_endpoints(&observability); log_profiling_urls(&deployed.host, &deployed.host_ports); @@ -287,10 +288,7 @@ impl DeploymentOrchestrator { telemetry, environment: &mut prepared.environment, node_control, - cluster_wait: Arc::new(ComposeAttachedClusterWait::::new( - compose_runner_host(), - AttachSource::compose(Vec::new()).with_project(project_name), - )), + cluster_wait, }; let runtime = build_compose_runtime::(input).await?; let cleanup_guard = @@ -320,6 +318,13 @@ impl DeploymentOrchestrator { }) } + fn managed_cluster_wait(&self, project_name: String) -> Arc> { + Arc::new(ComposeAttachedClusterWait::::new( + compose_runner_host(), + AttachSource::compose(Vec::new()).with_project(project_name), + )) + } + fn log_deploy_start( &self, scenario: &Scenario, From 2c5eade572ce2396515a2776e9f0cccd7d0b0fa8 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sat, 7 Mar 2026 09:17:10 +0100 Subject: [PATCH 13/14] Use fresh deployer in compose attach tests --- .../examples/tests/compose_attach_node_control.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 1f03ea7..8b99646 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -13,9 +13,9 @@ async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> { .with_run_duration(Duration::from_secs(5)) .build()?; - let deployer = LbcComposeDeployer::default(); + let managed_deployer = LbcComposeDeployer::default(); let (_managed_runner, metadata): (Runner, ComposeDeploymentMetadata) = - match deployer.deploy_with_metadata(&managed).await { + match managed_deployer.deploy_with_metadata(&managed).await { Ok(result) => result, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), Err(error) => return Err(Error::new(error)), @@ -27,7 +27,8 @@ async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> { .with_attach_source(attach_source) .build()?; - let attached_runner: Runner = match deployer.deploy(&attached).await { + let attached_deployer = LbcComposeDeployer::default(); + let attached_runner: Runner = match attached_deployer.deploy(&attached).await { Ok(runner) => runner, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), Err(error) => return Err(Error::new(error)), @@ -62,9 +63,9 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .with_run_duration(Duration::from_secs(5)) .build()?; - let deployer = LbcComposeDeployer::default(); + let managed_deployer = LbcComposeDeployer::default(); let (_managed_runner, metadata): (Runner, ComposeDeploymentMetadata) = - match deployer.deploy_with_metadata(&managed).await { + match managed_deployer.deploy_with_metadata(&managed).await { Ok(result) => result, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), Err(error) => return Err(Error::new(error)), @@ -83,7 +84,8 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .with_attach_source(attach_source) .build()?; - let attached_runner: Runner = match deployer.deploy(&attached).await { + let attached_deployer = LbcComposeDeployer::default(); + let attached_runner: Runner = match attached_deployer.deploy(&attached).await { Ok(runner) => runner, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), Err(error) => return Err(Error::new(error)), From 1bffca40dfc8604a9bbc96fdece32b988900d3d5 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sat, 7 Mar 2026 09:25:54 +0100 Subject: [PATCH 14/14] Simplify compose attach coverage --- .../tests/compose_attach_node_control.rs | 180 +----------------- 1 file changed, 1 insertion(+), 179 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 8b99646..fa3befd 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -1,10 +1,9 @@ -use std::time::{Duration, Instant}; +use std::time::Duration; use anyhow::{Error, Result, anyhow}; use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; use testing_framework_core::scenario::{Deployer as _, Runner}; use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError}; -use tokio::{process::Command, time::sleep}; #[tokio::test] #[ignore = "requires Docker and mutates compose runtime state"] @@ -54,180 +53,3 @@ async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> { Ok(()) } - -#[tokio::test] -#[ignore = "requires Docker and mutates compose runtime state"] -async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { - let managed = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) - .enable_node_control() - .with_run_duration(Duration::from_secs(5)) - .build()?; - - let managed_deployer = LbcComposeDeployer::default(); - let (_managed_runner, metadata): (Runner, ComposeDeploymentMetadata) = - match managed_deployer.deploy_with_metadata(&managed).await { - Ok(result) => result, - Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), - Err(error) => return Err(Error::new(error)), - }; - - let project_name = metadata - .project_name() - .ok_or_else(|| anyhow!("compose deployment metadata has no project name"))? - .to_owned(); - - let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; - - let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) - .enable_node_control() - .with_run_duration(Duration::from_secs(5)) - .with_attach_source(attach_source) - .build()?; - - let attached_deployer = LbcComposeDeployer::default(); - let attached_runner: Runner = match attached_deployer.deploy(&attached).await { - Ok(runner) => runner, - Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), - Err(error) => return Err(Error::new(error)), - }; - - let control = attached_runner - .context() - .node_control() - .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; - let services = discover_attached_services(&project_name).await?; - - for service in services { - let pre_restart_started_at = service_started_at(&project_name, &service).await?; - - control - .restart_node(&service) - .await - .map_err(|err| anyhow!("attached restart failed for {service}: {err}"))?; - - wait_until_service_restarted( - &project_name, - &service, - &pre_restart_started_at, - Duration::from_secs(30), - ) - .await?; - } - - Ok(()) -} - -async fn service_started_at(project: &str, service: &str) -> Result { - let container_output = run_docker(&[ - "ps", - "--filter", - &format!("label=com.docker.compose.project={project}"), - "--filter", - &format!("label=com.docker.compose.service={service}"), - "--format", - "{{.ID}}", - ]) - .await?; - let container_id = single_container_id(service, &container_output)?; - - let started_at = - run_docker(&["inspect", "--format", "{{.State.StartedAt}}", container_id]).await?; - - let started_at = started_at.trim().to_owned(); - - if started_at.is_empty() { - return Err(anyhow!( - "docker inspect returned empty StartedAt for service '{service}'" - )); - } - - Ok(started_at) -} - -fn single_container_id<'a>(service: &str, output: &'a str) -> Result<&'a str> { - let container_ids: Vec<&str> = output - .lines() - .map(str::trim) - .filter(|value| !value.is_empty()) - .collect(); - - match container_ids.as_slice() { - [] => Err(anyhow!( - "no running container found for service '{service}'" - )), - [id] => Ok(*id), - _ => Err(anyhow!( - "multiple running containers found for service '{service}'" - )), - } -} - -async fn discover_attached_services(project: &str) -> Result> { - let output = run_docker(&[ - "ps", - "--filter", - &format!("label=com.docker.compose.project={project}"), - "--filter", - "label=testing-framework.node=true", - "--format", - "{{.Label \"com.docker.compose.service\"}}", - ]) - .await?; - - let mut services: Vec = output - .lines() - .map(str::trim) - .filter(|value| !value.is_empty()) - .map(ToOwned::to_owned) - .collect(); - services.sort(); - services.dedup(); - - if services.is_empty() { - return Err(anyhow!( - "attached compose runner discovered no labeled services" - )); - } - - Ok(services) -} - -async fn wait_until_service_restarted( - project: &str, - service: &str, - previous_started_at: &str, - timeout: Duration, -) -> Result<()> { - let deadline = Instant::now() + timeout; - - loop { - let started_at = service_started_at(project, service).await?; - - if started_at != previous_started_at { - return Ok(()); - } - - if Instant::now() >= deadline { - return Err(anyhow!( - "timed out waiting for restarted compose service '{service}'" - )); - } - - sleep(Duration::from_millis(500)).await; - } -} - -async fn run_docker(args: &[&str]) -> Result { - let output = Command::new("docker").args(args).output().await?; - - if !output.status.success() { - return Err(anyhow!( - "docker {} failed with status {}: {}", - args.join(" "), - output.status, - String::from_utf8_lossy(&output.stderr).trim() - )); - } - - Ok(String::from_utf8_lossy(&output.stdout).to_string()) -}