feat(compose): require labeled services for attach discovery

This commit is contained in:
andrussal 2026-03-06 13:47:25 +01:00
parent 7127c10aa6
commit 06613a1e75
5 changed files with 123 additions and 15 deletions

View File

@ -21,14 +21,22 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
Err(error) => return Err(anyhow::Error::new(error)),
};
let services = metadata
.discover_services()
.await
.map_err(|err| anyhow!("{err}"))?;
let service = services
.first()
.cloned()
.ok_or_else(|| anyhow!("compose deployment metadata discovered no services"))?;
let attach_source = metadata
.attach_source_for_services(services)
.map_err(|err| anyhow!("{err}"))?;
let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
.enable_node_control()
.with_run_duration(Duration::from_secs(5))
.with_attach_source(
metadata
.attach_source_for_services(vec!["node-0".to_owned()])
.map_err(|err| anyhow!("{err}"))?,
)
.with_attach_source(attach_source)
.build()?;
let attached_runner: Runner<LbcExtEnv> = match deployer.deploy(&attached).await {
@ -38,7 +46,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
};
let pre_restart_started_at = metadata
.service_started_at("node-0")
.service_started_at(&service)
.await
.map_err(|err| anyhow!("{err}"))?;
@ -48,12 +56,12 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
.ok_or_else(|| anyhow!("attached compose node control is unavailable"))?;
control
.restart_node("node-0")
.restart_node(&service)
.await
.map_err(|err| anyhow!("attached restart failed: {err}"))?;
metadata
.wait_until_service_restarted("node-0", &pre_restart_started_at, Duration::from_secs(30))
.wait_until_service_restarted(&service, &pre_restart_started_at, Duration::from_secs(30))
.await
.map_err(|err| anyhow!("{err}"))?;

View File

@ -18,6 +18,8 @@ services:
{% for port in node.ports %}
- {{ port }}
{% endfor %}
labels:
testing-framework.node: "true"
environment:
{% for env in node.environment %}
{{ env.key }}: "{{ env.value }}"

View File

@ -7,7 +7,9 @@ use testing_framework_core::scenario::{
use url::Url;
use crate::{
docker::attached::{discover_service_container_id, inspect_mapped_tcp_ports},
docker::attached::{
discover_attachable_services, discover_service_container_id, inspect_mapped_tcp_ports,
},
env::ComposeDeployEnv,
};
@ -46,14 +48,12 @@ impl<E: ComposeDeployEnv> AttachProvider<E> for ComposeAttachProvider<E> {
source: "compose attach source requires an explicit project name".into(),
})?;
if services.is_empty() {
return Err(AttachProviderError::Discovery {
source: "compose attach source requires at least one service name".into(),
});
}
let services = resolve_services(project, services)
.await
.map_err(to_discovery_error)?;
let mut attached = Vec::with_capacity(services.len());
for service in services {
for service in &services {
let container_id = discover_service_container_id(project, service)
.await
.map_err(to_discovery_error)?;
@ -79,6 +79,22 @@ fn to_discovery_error(source: DynError) -> AttachProviderError {
AttachProviderError::Discovery { source }
}
async fn resolve_services(project: &str, requested: &[String]) -> Result<Vec<String>, DynError> {
if !requested.is_empty() {
return Ok(requested.to_owned());
}
let discovered = discover_attachable_services(project).await?;
if discovered.is_empty() {
return Err(
format!("no running compose services discovered for project '{project}'").into(),
);
}
Ok(discovered)
}
async fn discover_api_port(container_id: &str) -> Result<u16, DynError> {
let mapped_ports = inspect_mapped_tcp_ports(container_id).await?;
match mapped_ports.as_slice() {

View File

@ -50,6 +50,21 @@ impl ComposeDeploymentMetadata {
Ok(AttachSource::compose(services).with_project(project_name.to_owned()))
}
/// Discovers compose node services and builds an attach source for them.
pub async fn attach_source_for_discovered_services(&self) -> Result<AttachSource, DynError> {
let services = self.discover_services().await?;
self.attach_source_for_services(services)
}
/// Discovers node services for this compose deployment.
pub async fn discover_services(&self) -> Result<Vec<String>, DynError> {
let Some(project_name) = self.project_name() else {
return Err("compose metadata has no project name".into());
};
crate::docker::attached::discover_attachable_services(project_name).await
}
/// Returns the current StartedAt timestamp for a compose service container.
pub async fn service_started_at(&self, service: &str) -> Result<String, DynError> {
let Some(project_name) = self.project_name() else {

View File

@ -4,6 +4,9 @@ use serde_json::Value;
use testing_framework_core::scenario::DynError;
use tokio::process::Command;
pub const ATTACHABLE_NODE_LABEL_KEY: &str = "testing-framework.node";
pub const ATTACHABLE_NODE_LABEL_VALUE: &str = "true";
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct MappedTcpPort {
pub container_port: u16,
@ -45,6 +48,21 @@ pub async fn discover_service_container_id(
}
}
pub async fn discover_attachable_services(project: &str) -> Result<Vec<String>, DynError> {
let attachable_filter =
format!("label={ATTACHABLE_NODE_LABEL_KEY}={ATTACHABLE_NODE_LABEL_VALUE}");
let attachable = discover_services_with_filters(project, Some(&attachable_filter)).await?;
if attachable.is_empty() {
return Err(format!(
"no running compose services with label '{ATTACHABLE_NODE_LABEL_KEY}={ATTACHABLE_NODE_LABEL_VALUE}' found for project '{project}'"
)
.into());
}
Ok(attachable)
}
pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result<Vec<MappedTcpPort>, DynError> {
let stdout = run_docker_capture([
"inspect",
@ -111,6 +129,55 @@ pub async fn run_docker_capture<const N: usize>(args: [&str; N]) -> Result<Strin
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
async fn discover_services_with_filters(
project: &str,
extra_filter: Option<&str>,
) -> Result<Vec<String>, DynError> {
let mut args = vec![
"ps".to_owned(),
"--filter".to_owned(),
format!("label=com.docker.compose.project={project}"),
];
if let Some(filter) = extra_filter {
args.push("--filter".to_owned());
args.push(filter.to_owned());
}
args.push("--format".to_owned());
args.push("{{.Label \"com.docker.compose.service\"}}".to_owned());
let output = Command::new("docker")
.args(&args)
.stdin(Stdio::null())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.output()
.await?;
if !output.status.success() {
return Err(format!(
"docker {} failed with status {}: {}",
args.join(" "),
output.status,
String::from_utf8_lossy(&output.stderr).trim()
)
.into());
}
let mut services: Vec<String> = output
.stdout
.split(|byte| *byte == b'\n')
.filter_map(|line| {
let parsed = String::from_utf8_lossy(line).trim().to_owned();
(!parsed.is_empty()).then_some(parsed)
})
.collect();
services.sort();
services.dedup();
Ok(services)
}
fn parse_container_port(port_key: &str) -> Option<u16> {
let (port, proto) = port_key.split_once('/')?;
if proto != "tcp" {