refactor(compose): add metadata helpers for attach node-control test API

This commit is contained in:
andrussal 2026-03-06 13:31:03 +01:00
parent 6226f51598
commit 7127c10aa6
3 changed files with 96 additions and 120 deletions

View File

@@ -1,13 +1,9 @@
use std::{
process::{Command, Stdio},
thread,
time::Duration,
};
use std::time::Duration;
use anyhow::{Result, anyhow};
use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder};
use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner};
use testing_framework_runner_compose::ComposeRunnerError;
use testing_framework_core::scenario::{Deployer as _, Runner};
use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError};
#[tokio::test]
#[ignore = "requires Docker and mutates compose runtime state"]
@@ -18,33 +14,20 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
.build()?;
let deployer = LbcComposeDeployer::default();
let (managed_runner, metadata): (Runner<LbcExtEnv>, _) =
let (_managed_runner, metadata): (Runner<LbcExtEnv>, ComposeDeploymentMetadata) =
match deployer.deploy_with_metadata(&managed).await {
Ok(result) => result,
Err(ComposeRunnerError::DockerUnavailable) => return Ok(()),
Err(error) => return Err(anyhow::Error::new(error)),
};
let managed_client = managed_runner
.context()
.node_clients()
.snapshot()
.into_iter()
.next()
.ok_or_else(|| anyhow!("managed compose runner returned no node clients"))?;
managed_client
.base_url()
.port()
.ok_or_else(|| anyhow!("managed node base url has no port"))?;
let project_name = metadata
.project_name
.ok_or_else(|| anyhow!("compose metadata did not include project name"))?;
let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
.enable_node_control()
.with_run_duration(Duration::from_secs(5))
.with_attach_source(
AttachSource::compose(vec!["node-0".to_owned()]).with_project(project_name.clone()),
metadata
.attach_source_for_services(vec!["node-0".to_owned()])
.map_err(|err| anyhow!("{err}"))?,
)
.build()?;
@@ -53,107 +36,26 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
Err(ComposeRunnerError::DockerUnavailable) => return Ok(()),
Err(error) => return Err(anyhow::Error::new(error)),
};
let pre_restart_container = discover_compose_service_container(&project_name, "node-0")?;
let pre_restart_started_at = inspect_container_started_at(&pre_restart_container)?;
let pre_restart_started_at = metadata
.service_started_at("node-0")
.await
.map_err(|err| anyhow!("{err}"))?;
let control = attached_runner
.context()
.node_control()
.ok_or_else(|| anyhow!("attached compose node control is unavailable"))?;
control
.restart_node("node-0")
.await
.map_err(|err| anyhow!("attached restart failed: {err}"))?;
wait_until_container_restarted(
&project_name,
"node-0",
&pre_restart_started_at,
Duration::from_secs(30),
)?;
metadata
.wait_until_service_restarted("node-0", &pre_restart_started_at, Duration::from_secs(30))
.await
.map_err(|err| anyhow!("{err}"))?;
Ok(())
}
/// Finds the id of the single running container belonging to the given
/// compose `project` / `service` pair, using `docker ps` label filters.
///
/// Fails when zero or more than one matching container is running, since the
/// caller needs an unambiguous target.
fn discover_compose_service_container(project: &str, service: &str) -> Result<String> {
    let stdout = run_docker_capture([
        "ps",
        "--filter",
        &format!("label=com.docker.compose.project={project}"),
        "--filter",
        &format!("label=com.docker.compose.service={service}"),
        "--format",
        "{{.ID}}",
    ])?;
    // One container id per line; ignore blank lines from trailing newlines.
    let ids: Vec<&str> = stdout
        .lines()
        .map(str::trim)
        .filter(|id| !id.is_empty())
        .collect();
    match ids.as_slice() {
        [] => Err(anyhow!(
            "no running container found for compose project '{project}' service '{service}'"
        )),
        [only] => Ok((*only).to_owned()),
        _ => Err(anyhow!(
            "multiple running containers found for compose project '{project}' service '{service}'"
        )),
    }
}
/// Reads the `State.StartedAt` timestamp of `container_id` via
/// `docker inspect`, rejecting an empty result.
fn inspect_container_started_at(container_id: &str) -> Result<String> {
    let raw = run_docker_capture(["inspect", "--format", "{{.State.StartedAt}}", container_id])?;
    // The go-template output carries a trailing newline; strip it before use.
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return Err(anyhow!(
            "docker inspect returned empty StartedAt for container {container_id}"
        ));
    }
    Ok(trimmed.to_owned())
}
/// Polls the compose service's container until its `StartedAt` timestamp
/// differs from `previous_started_at`, i.e. until a restart is observed.
///
/// Re-discovers the container id on every probe because a restart may replace
/// the container. Probes every 500ms; errors once `timeout` has elapsed
/// without observing a change.
fn wait_until_container_restarted(
    project: &str,
    service: &str,
    previous_started_at: &str,
    timeout: Duration,
) -> Result<()> {
    let deadline = std::time::Instant::now() + timeout;
    loop {
        let container = discover_compose_service_container(project, service)?;
        let current_started_at = inspect_container_started_at(&container)?;
        if current_started_at != previous_started_at {
            return Ok(());
        }
        if std::time::Instant::now() >= deadline {
            return Err(anyhow!(
                "timed out waiting for restarted container timestamp change: {project}/{service}"
            ));
        }
        thread::sleep(Duration::from_millis(500));
    }
}
/// Runs `docker` with the given fixed-arity argument list and returns its
/// captured stdout as a (lossily decoded) string.
///
/// Stdin is closed and both output streams are piped so nothing leaks to the
/// test harness's console. A non-zero exit status becomes an error carrying
/// the command line, status, and trimmed stderr.
fn run_docker_capture<const N: usize>(args: [&str; N]) -> Result<String> {
    let output = Command::new("docker")
        .args(args)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .output()?;
    if output.status.success() {
        Ok(String::from_utf8_lossy(&output.stdout).to_string())
    } else {
        Err(anyhow!(
            "docker {} failed: status={} stderr={}",
            args.join(" "),
            output.status,
            String::from_utf8_lossy(&output.stderr).trim()
        ))
    }
}

View File

@@ -5,13 +5,14 @@ pub mod ports;
pub mod readiness;
pub mod setup;
use std::marker::PhantomData;
use std::{marker::PhantomData, time::Duration};
use async_trait::async_trait;
use testing_framework_core::scenario::{
CleanupGuard, Deployer, FeedHandle, ObservabilityCapabilityProvider, RequiresNodeControl,
Runner, Scenario,
AttachSource, CleanupGuard, Deployer, DynError, FeedHandle, ObservabilityCapabilityProvider,
RequiresNodeControl, Runner, Scenario,
};
use tokio::time::sleep;
use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup};
@@ -29,6 +30,79 @@ pub struct ComposeDeploymentMetadata {
pub project_name: Option<String>,
}
impl ComposeDeploymentMetadata {
    /// Returns project name when deployment is bound to a specific compose
    /// project.
    #[must_use]
    pub fn project_name(&self) -> Option<&str> {
        self.project_name.as_deref()
    }

    /// Returns the project name, or an error when the deployment is not bound
    /// to a compose project. Shared guard for the methods below.
    fn require_project_name(&self) -> Result<&str, DynError> {
        self.project_name()
            .ok_or_else(|| "compose metadata has no project name".into())
    }

    /// Builds an attach source for the same compose project.
    ///
    /// # Errors
    ///
    /// Returns an error when the metadata has no project name.
    pub fn attach_source_for_services(
        &self,
        services: Vec<String>,
    ) -> Result<AttachSource, DynError> {
        let project_name = self.require_project_name()?;
        Ok(AttachSource::compose(services).with_project(project_name.to_owned()))
    }

    /// Returns the current StartedAt timestamp for a compose service container.
    ///
    /// # Errors
    ///
    /// Returns an error when the metadata has no project name, when the
    /// service container cannot be discovered or inspected, or when docker
    /// reports an empty `StartedAt` value.
    pub async fn service_started_at(&self, service: &str) -> Result<String, DynError> {
        let project_name = self.require_project_name()?;
        let container_id =
            crate::docker::attached::discover_service_container_id(project_name, service).await?;
        let started_at = crate::docker::attached::run_docker_capture([
            "inspect",
            "--format",
            "{{.State.StartedAt}}",
            &container_id,
        ])
        .await?;
        // The go-template output carries a trailing newline; strip it.
        let started_at = started_at.trim();
        if started_at.is_empty() {
            return Err(format!(
                "docker inspect returned empty StartedAt for compose service '{service}'"
            )
            .into());
        }
        Ok(started_at.to_owned())
    }

    /// Waits until a service container reports a different StartedAt timestamp.
    ///
    /// Polls every 500ms; at least one probe happens even with a zero timeout.
    ///
    /// # Errors
    ///
    /// Returns an error when the timestamp does not change before `timeout`
    /// elapses, or when any underlying docker query fails.
    pub async fn wait_until_service_restarted(
        &self,
        service: &str,
        previous_started_at: &str,
        timeout: Duration,
    ) -> Result<(), DynError> {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            let started_at = self.service_started_at(service).await?;
            if started_at != previous_started_at {
                return Ok(());
            }
            if std::time::Instant::now() >= deadline {
                return Err(
                    format!("timed out waiting for restarted compose service '{service}'").into(),
                );
            }
            sleep(Duration::from_millis(500)).await;
        }
    }
}
impl<E: ComposeDeployEnv> Default for ComposeDeployer<E> {
fn default() -> Self {
Self::new()

View File

@@ -6,7 +6,7 @@ pub mod errors;
pub mod infrastructure;
pub mod lifecycle;
pub use deployer::ComposeDeployer;
pub use deployer::{ComposeDeployer, ComposeDeploymentMetadata};
pub use descriptor::{ComposeDescriptor, EnvEntry, NodeDescriptor};
pub use docker::{
commands::{ComposeCommandError, compose_down, compose_up, dump_compose_logs},