From 7127c10aa6195e7d5d5289668e888c6d7ce532a1 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 6 Mar 2026 13:31:03 +0100 Subject: [PATCH] refactor(compose): add metadata helpers for attach node-control test API --- .../tests/compose_attach_node_control.rs | 134 +++--------------- .../deployers/compose/src/deployer/mod.rs | 80 ++++++++++- .../deployers/compose/src/lib.rs | 2 +- 3 files changed, 96 insertions(+), 120 deletions(-) diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index 0248e5a..8c5efeb 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -1,13 +1,9 @@ -use std::{ - process::{Command, Stdio}, - thread, - time::Duration, -}; +use std::time::Duration; use anyhow::{Result, anyhow}; use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; -use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; -use testing_framework_runner_compose::ComposeRunnerError; +use testing_framework_core::scenario::{Deployer as _, Runner}; +use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError}; #[tokio::test] #[ignore = "requires Docker and mutates compose runtime state"] @@ -18,33 +14,20 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .build()?; let deployer = LbcComposeDeployer::default(); - let (managed_runner, metadata): (Runner, _) = + let (_managed_runner, metadata): (Runner, ComposeDeploymentMetadata) = match deployer.deploy_with_metadata(&managed).await { Ok(result) => result, Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), Err(error) => return Err(anyhow::Error::new(error)), }; - let managed_client = managed_runner - .context() - .node_clients() - .snapshot() - .into_iter() - .next() - .ok_or_else(|| anyhow!("managed compose runner returned no node clients"))?; - managed_client - .base_url() - .port() - .ok_or_else(|| anyhow!("managed node base url has no port"))?; - - let project_name = metadata - .project_name - .ok_or_else(|| anyhow!("compose metadata did not include project name"))?; let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) .enable_node_control() .with_run_duration(Duration::from_secs(5)) .with_attach_source( - AttachSource::compose(vec!["node-0".to_owned()]).with_project(project_name.clone()), + metadata + .attach_source_for_services(vec!["node-0".to_owned()]) + .map_err(|err| anyhow!("{err}"))?, ) .build()?; @@ -53,107 +36,26 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), Err(error) => return Err(anyhow::Error::new(error)), }; - let pre_restart_container = discover_compose_service_container(&project_name, "node-0")?; - let pre_restart_started_at = inspect_container_started_at(&pre_restart_container)?; + + let pre_restart_started_at = metadata + .service_started_at("node-0") + .await + .map_err(|err| anyhow!("{err}"))?; + let control = attached_runner .context() .node_control() .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; + control .restart_node("node-0") .await .map_err(|err| anyhow!("attached restart failed: {err}"))?; - wait_until_container_restarted( - &project_name, - "node-0", - &pre_restart_started_at, - Duration::from_secs(30), - )?; + metadata + .wait_until_service_restarted("node-0", &pre_restart_started_at, Duration::from_secs(30)) + .await + .map_err(|err| anyhow!("{err}"))?; Ok(()) } -fn discover_compose_service_container(project: &str, service: &str) -> Result { - let container = run_docker_capture([ - "ps", - "--filter", - &format!("label=com.docker.compose.project={project}"), - "--filter", - &format!("label=com.docker.compose.service={service}"), - "--format", - "{{.ID}}", - ])?; - let mut lines = container - .lines() - .map(str::trim) - .filter(|line| !line.is_empty()); - let Some(container_id) = lines.next() else { - return Err(anyhow!( - "no running container found for compose project '{project}' service '{service}'" - )); - }; - if lines.next().is_some() { - return Err(anyhow!( - "multiple running containers found for compose project '{project}' service '{service}'" - )); - } - - Ok(container_id.to_owned()) -} - -fn inspect_container_started_at(container_id: &str) -> Result { - let started_at = - run_docker_capture(["inspect", "--format", "{{.State.StartedAt}}", container_id])?; - let started_at = started_at.trim(); - if started_at.is_empty() { - return Err(anyhow!( - "docker inspect returned empty StartedAt for container {container_id}" - )); - } - - Ok(started_at.to_owned()) -} - -fn wait_until_container_restarted( - project: &str, - service: &str, - previous_started_at: &str, - timeout: Duration, -) -> Result<()> { - let deadline = std::time::Instant::now() + timeout; - loop { - let container_id = discover_compose_service_container(project, service)?; - let started_at = inspect_container_started_at(&container_id)?; - if started_at != previous_started_at { - return Ok(()); - } - - if std::time::Instant::now() >= deadline { - return Err(anyhow!( - "timed out waiting for restarted container timestamp change: {project}/{service}" - )); - } - - thread::sleep(Duration::from_millis(500)); - } -} - -fn run_docker_capture(args: [&str; N]) -> Result { - let output = Command::new("docker") - .args(args) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .output()?; - - if !output.status.success() { - return Err(anyhow!( - "docker {} failed: status={} stderr={}", - args.join(" "), - output.status, - String::from_utf8_lossy(&output.stderr).trim() - )); - } - - Ok(String::from_utf8_lossy(&output.stdout).to_string()) -} diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 4430a7d..63ff456 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -5,13 +5,14 @@ pub mod ports; pub mod readiness; pub mod setup; -use std::marker::PhantomData; +use std::{marker::PhantomData, time::Duration}; use async_trait::async_trait; use testing_framework_core::scenario::{ - CleanupGuard, Deployer, FeedHandle, ObservabilityCapabilityProvider, RequiresNodeControl, - Runner, Scenario, + AttachSource, CleanupGuard, Deployer, DynError, FeedHandle, ObservabilityCapabilityProvider, + RequiresNodeControl, Runner, Scenario, }; +use tokio::time::sleep; use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup}; @@ -29,6 +30,79 @@ pub struct ComposeDeploymentMetadata { pub project_name: Option, } +impl ComposeDeploymentMetadata { + /// Returns project name when deployment is bound to a specific compose + /// project. + #[must_use] + pub fn project_name(&self) -> Option<&str> { + self.project_name.as_deref() + } + + /// Builds an attach source for the same compose project. + pub fn attach_source_for_services( + &self, + services: Vec, + ) -> Result { + let Some(project_name) = self.project_name() else { + return Err("compose metadata has no project name".into()); + }; + + Ok(AttachSource::compose(services).with_project(project_name.to_owned())) + } + + /// Returns the current StartedAt timestamp for a compose service container. + pub async fn service_started_at(&self, service: &str) -> Result { + let Some(project_name) = self.project_name() else { + return Err("compose metadata has no project name".into()); + }; + + let container_id = + crate::docker::attached::discover_service_container_id(project_name, service).await?; + let started_at = crate::docker::attached::run_docker_capture([ + "inspect", + "--format", + "{{.State.StartedAt}}", + &container_id, + ]) + .await?; + let started_at = started_at.trim(); + + if started_at.is_empty() { + return Err(format!( + "docker inspect returned empty StartedAt for compose service '{service}'" + ) + .into()); + } + + Ok(started_at.to_owned()) + } + + /// Waits until a service container reports a different StartedAt timestamp. + pub async fn wait_until_service_restarted( + &self, + service: &str, + previous_started_at: &str, + timeout: Duration, + ) -> Result<(), DynError> { + let deadline = std::time::Instant::now() + timeout; + + loop { + let started_at = self.service_started_at(service).await?; + if started_at != previous_started_at { + return Ok(()); + } + + if std::time::Instant::now() >= deadline { + return Err( + format!("timed out waiting for restarted compose service '{service}'").into(), + ); + } + + sleep(Duration::from_millis(500)).await; + } + } +} + impl Default for ComposeDeployer { fn default() -> Self { Self::new() diff --git a/testing-framework/deployers/compose/src/lib.rs b/testing-framework/deployers/compose/src/lib.rs index c75890f..8cf715e 100644 --- a/testing-framework/deployers/compose/src/lib.rs +++ b/testing-framework/deployers/compose/src/lib.rs @@ -6,7 +6,7 @@ pub mod errors; pub mod infrastructure; pub mod lifecycle; -pub use deployer::ComposeDeployer; +pub use deployer::{ComposeDeployer, ComposeDeploymentMetadata}; pub use descriptor::{ComposeDescriptor, EnvEntry, NodeDescriptor}; pub use docker::{ commands::{ComposeCommandError, compose_down, compose_up, dump_compose_logs},