diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index f265f01..a1fdc8d 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -33,6 +33,11 @@ async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> { Err(error) => return Err(Error::new(error)), }; + attached_runner + .wait_network_ready() + .await + .map_err(|err| anyhow!("compose attached runner readiness failed: {err}"))?; + if attached_runner.context().node_clients().is_empty() { return Err(anyhow!("compose attach resolved no node clients")); } @@ -89,12 +94,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .node_control() .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; - let services: Vec<String> = attached_runner - .context() - .borrowed_nodes() - .into_iter() - .map(|node| node.identity) - .collect(); + let services = discover_attached_services(&project_name).await?; if services.is_empty() { return Err(anyhow!("attached compose runner discovered no services")); } @@ -166,6 +166,36 @@ async fn service_started_at(project: &str, service: &str) -> Result<String> { Ok(started_at) } +async fn discover_attached_services(project: &str) -> Result<Vec<String>> { + let output = run_docker(&[ + "ps", + "--filter", + &format!("label=com.docker.compose.project={project}"), + "--filter", + "label=testing-framework.node=true", + "--format", + "{{.Label \"com.docker.compose.service\"}}", + ]) + .await?; + + let mut services: Vec<String> = output + .lines() + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToOwned::to_owned) + .collect(); + services.sort(); + services.dedup(); + + if services.is_empty() { + return Err(anyhow!( + "attached compose runner discovered no labeled services" + )); + } + + Ok(services) +} + +async fn wait_until_service_restarted( + project: &str, + service: &str, diff --git a/logos/runtime/ext/src/compose_env.rs 
b/logos/runtime/ext/src/compose_env.rs index 4fa3018..9803b12 100644 --- a/logos/runtime/ext/src/compose_env.rs +++ b/logos/runtime/ext/src/compose_env.rs @@ -254,6 +254,7 @@ fn build_compose_node_descriptor( base_volumes(), default_extra_hosts(), ports, + api_port, environment, platform, ) diff --git a/testing-framework/core/src/scenario/control.rs b/testing-framework/core/src/scenario/control.rs index d19f871..4f59621 100644 --- a/testing-framework/core/src/scenario/control.rs +++ b/testing-framework/core/src/scenario/control.rs @@ -33,3 +33,11 @@ pub trait NodeControlHandle: Send + Sync { None } } + +/// Deployer-agnostic wait surface for cluster readiness checks. +#[async_trait] +pub trait ClusterWaitHandle: Send + Sync { + async fn wait_network_ready(&self) -> Result<(), DynError> { + Err("wait_network_ready not supported by this deployer".into()) + } +} diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index 411bdf1..f2e43d6 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -25,7 +25,7 @@ pub use capabilities::{ StartNodeOptions, StartedNode, }; pub use common_builder_ext::CoreBuilderExt; -pub use control::NodeControlHandle; +pub use control::{ClusterWaitHandle, NodeControlHandle}; #[doc(hidden)] pub use definition::{ Builder as CoreBuilder, // internal adapter-facing core builder diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 8e26133..6441149 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -1,7 +1,10 @@ use std::{sync::Arc, time::Duration}; use super::{metrics::Metrics, node_clients::ClusterClient}; -use crate::scenario::{Application, BorrowedNode, ManagedNode, NodeClients, NodeControlHandle}; +use crate::scenario::{ + Application, BorrowedNode, ClusterWaitHandle, DynError, ManagedNode, 
NodeClients, + NodeControlHandle, +}; /// Shared runtime context available to workloads and expectations. pub struct RunContext<E: Application> { @@ -12,6 +15,7 @@ pub struct RunContext<E: Application> { telemetry: Metrics, feed: ::Feed, node_control: Option<Arc<dyn NodeControlHandle<E>>>, + cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>, } impl<E: Application> RunContext<E> { @@ -36,9 +40,16 @@ impl<E: Application> RunContext<E> { telemetry, feed, node_control, + cluster_wait: None, } } + #[must_use] + pub fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self { + self.cluster_wait = Some(cluster_wait); + self + } + #[must_use] pub fn descriptors(&self) -> &E::Deployment { &self.descriptors } @@ -104,11 +115,29 @@ impl<E: Application> RunContext<E> { self.node_control.clone() } + #[must_use] + pub fn cluster_wait(&self) -> Option<Arc<dyn ClusterWaitHandle<E>>> { + self.cluster_wait.clone() + } + #[must_use] pub const fn controls_nodes(&self) -> bool { self.node_control.is_some() } + #[must_use] + pub const fn can_wait_network_ready(&self) -> bool { + self.cluster_wait.is_some() + } + + pub async fn wait_network_ready(&self) -> Result<(), DynError> { + let Some(cluster_wait) = self.cluster_wait() else { + return Err("wait_network_ready is not available for this runner".into()); + }; + + cluster_wait.wait_network_ready().await + } + #[must_use] pub const fn cluster_client(&self) -> ClusterClient<'_, E> { self.node_clients.cluster_client() } @@ -156,6 +185,10 @@ impl<E: Application> RunHandle<E> { pub fn context(&self) -> &RunContext<E> { &self.run_context } + + pub async fn wait_network_ready(&self) -> Result<(), DynError> { + self.run_context.wait_network_ready().await + } } /// Derived metrics about the current run timing. 
diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index bcc453b..ac7bff9 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -43,6 +43,10 @@ impl Runner { Arc::clone(&self.context) } + pub async fn wait_network_ready(&self) -> Result<(), DynError> { + self.context.wait_network_ready().await + } + pub(crate) fn cleanup(&mut self) { if let Some(guard) = self.cleanup_guard.take() { guard.cleanup(); diff --git a/testing-framework/deployers/compose/assets/docker-compose.yml.tera b/testing-framework/deployers/compose/assets/docker-compose.yml.tera index 75fe122..c6ecc29 100644 --- a/testing-framework/deployers/compose/assets/docker-compose.yml.tera +++ b/testing-framework/deployers/compose/assets/docker-compose.yml.tera @@ -20,6 +20,7 @@ services: {% endfor %} labels: testing-framework.node: "true" + testing-framework.api-container-port: "{{ node.api_container_port }}" environment: {% for env in node.environment %} {{ env.key }}: "{{ env.value }}" diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index 23d574e..a1d0412 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -2,13 +2,15 @@ use std::marker::PhantomData; use async_trait::async_trait; use testing_framework_core::scenario::{ - AttachProvider, AttachProviderError, AttachSource, AttachedNode, DynError, ExternalNodeSource, + AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, + ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, }; use url::Url; use crate::{ docker::attached::{ - discover_attachable_services, discover_service_container_id, inspect_mapped_tcp_ports, + discover_attachable_services, 
discover_service_container_id, + inspect_api_container_port_label, inspect_mapped_tcp_ports, }, env::ComposeDeployEnv, }; @@ -18,6 +20,12 @@ pub(super) struct ComposeAttachProvider<E> { _env: PhantomData<E>, } +pub(super) struct ComposeAttachedClusterWait<E> { + host: String, + source: AttachSource, + _env: PhantomData<E>, +} + #[derive(Debug, thiserror::Error)] enum ComposeAttachDiscoveryError { #[error("compose attach source requires an explicit project name")] MissingProjectName, @@ -33,6 +41,16 @@ impl<E> ComposeAttachProvider<E> { } } +impl<E> ComposeAttachedClusterWait<E> { + pub(super) fn new(host: String, source: AttachSource) -> Self { + Self { + host, + source, + _env: PhantomData, + } + } +} + #[async_trait] impl<E: ComposeDeployEnv> AttachProvider for ComposeAttachProvider<E> { async fn discover( @@ -87,7 +105,10 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } -async fn resolve_services(project: &str, requested: &[String]) -> Result<Vec<String>, DynError> { +pub(super) async fn resolve_services( + project: &str, + requested: &[String], +) -> Result<Vec<String>, DynError> { if !requested.is_empty() { return Ok(requested.to_owned()); } @@ -95,34 +116,61 @@ async fn resolve_services(project: &str, requested: &[String]) -> Result<Vec<String>, DynError> -async fn discover_api_port(container_id: &str) -> Result<u16, DynError> { +pub(super) async fn discover_api_port(container_id: &str) -> Result<u16, DynError> { let mapped_ports = inspect_mapped_tcp_ports(container_id).await?; - match mapped_ports.as_slice() { - [] => Err(format!( - "no mapped tcp ports discovered for attached compose service container '{container_id}'" - ) - .into()), - [port] => Ok(port.host_port), - _ => { - let mapped_ports = mapped_ports - .iter() - .map(|port| format!("{}->{}", port.container_port, port.host_port)) - .collect::<Vec<_>>() - .join(", "); + let api_container_port = inspect_api_container_port_label(container_id).await?; + let Some(api_port) = mapped_ports + .iter() + .find(|port| port.container_port == api_container_port) + .map(|port| port.host_port) + else { + let mapped_ports = mapped_ports + .iter() + 
.map(|port| format!("{}->{}", port.container_port, port.host_port)) + .collect::<Vec<_>>() + .join(", "); - Err(format!( - "attached compose service container '{container_id}' has multiple mapped tcp ports ({mapped_ports}); provide a single exposed API port" - ) - .into()) - } - } + return Err(format!( + "attached compose service container '{container_id}' does not expose labeled API container port {api_container_port}; mapped tcp ports: {mapped_ports}" + ) + .into()); + }; + + Ok(api_port) } -fn build_service_endpoint(host: &str, port: u16) -> Result<Url, DynError> { +pub(super) fn build_service_endpoint(host: &str, port: u16) -> Result<Url, DynError> { let endpoint = Url::parse(&format!("http://{host}:{port}/"))?; Ok(endpoint) } +#[async_trait] +impl<E: ComposeDeployEnv> ClusterWaitHandle<E> for ComposeAttachedClusterWait<E> { + async fn wait_network_ready(&self) -> Result<(), DynError> { + let AttachSource::Compose { project, services } = &self.source else { + return Err("compose cluster wait requires a compose attach source".into()); + }; + + let project = project + .as_ref() + .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?; + let services = resolve_services(project, services).await?; + + let mut endpoints = Vec::with_capacity(services.len()); + for service in &services { + let container_id = discover_service_container_id(project, service).await?; + let api_port = discover_api_port(&container_id).await?; + let mut endpoint = build_service_endpoint(&self.host, api_port)?; + endpoint.set_path(E::readiness_path()); + endpoints.push(endpoint); + } + + wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?; + + Ok(()) + } +} + #[cfg(test)] mod tests { use super::build_service_endpoint; diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs index a001680..5d69218 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ 
b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,11 +3,12 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - ApplicationExternalProvider, AttachSource, CleanupGuard, DeploymentPolicy, FeedHandle, - FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, - ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, - Runner, Scenario, ScenarioSources, SourceOrchestrationPlan, SourceProviders, - StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, + ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, + DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, + NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, + RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, + SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -15,7 +16,7 @@ use tracing::info; use super::{ ComposeDeployer, ComposeDeploymentMetadata, - attach_provider::ComposeAttachProvider, + attach_provider::{ComposeAttachProvider, ComposeAttachedClusterWait}, clients::ClientBuilder, make_cleanup_guard, ports::PortManager, @@ -117,6 +118,7 @@ impl DeploymentOrchestrator { deployed, observability, readiness_enabled, + project_name.clone(), ) .await?; @@ -154,6 +156,7 @@ impl DeploymentOrchestrator { self.ensure_non_empty_node_clients(&node_clients)?; let node_control = self.attached_node_control::(scenario)?; + let cluster_wait = self.attached_cluster_wait(scenario)?; let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; let context = RunContext::new( scenario.deployment().clone(), @@ -163,7 +166,8 @@ impl DeploymentOrchestrator { observability.telemetry_handle()?, feed, node_control, - ); + ) + 
.with_cluster_wait(cluster_wait); let cleanup_guard: Box<dyn CleanupGuard> = Box::new(feed_task); Ok(Runner::new(context, Some(cleanup_guard))) } @@ -237,6 +241,25 @@ impl DeploymentOrchestrator { }) as Arc<dyn NodeControlHandle<E>>)) } + fn attached_cluster_wait( + &self, + scenario: &Scenario<E, Caps>, + ) -> Result<Arc<dyn ClusterWaitHandle<E>>, ComposeRunnerError> + where + Caps: Send + Sync, + { + let ScenarioSources::Attached { attach, .. } = scenario.sources() else { + return Err(ComposeRunnerError::InternalInvariant { + message: "compose attached cluster wait requested outside attached source mode", + }); + }; + + Ok(Arc::new(ComposeAttachedClusterWait::<E>::new( + compose_runner_host(), + attach.clone(), + ))) + } + async fn build_runner( &self, scenario: &Scenario<E, Caps>, @@ -244,6 +267,7 @@ deployed: DeployedNodes, observability: ObservabilityInputs, readiness_enabled: bool, + project_name: String, ) -> Result<Runner<E>, ComposeRunnerError> where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, @@ -263,6 +287,10 @@ telemetry, environment: &mut prepared.environment, node_control, + cluster_wait: Arc::new(ComposeAttachedClusterWait::<E>::new( + compose_runner_host(), + AttachSource::compose(Vec::new()).with_project(project_name), + )), }; let runtime = build_compose_runtime::(input).await?; let cleanup_guard = @@ -376,6 +404,7 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { telemetry: Metrics, environment: &'a mut StackEnvironment, node_control: Option<Arc<dyn NodeControlHandle<E>>>, + cluster_wait: Arc<dyn ClusterWaitHandle<E>>, } async fn build_compose_runtime( @@ -400,6 +429,7 @@ async fn build_compose_runtime( input.telemetry, feed, input.node_control, + input.cluster_wait, ); Ok(ComposeRuntime { context, feed_task }) @@ -443,6 +473,7 @@ fn build_run_context( telemetry: Metrics, feed: ::Feed, node_control: Option<Arc<dyn NodeControlHandle<E>>>, + cluster_wait: Arc<dyn ClusterWaitHandle<E>>, ) -> RunContext<E> { RunContext::new( descriptors, @@ -453,6 +484,7 @@ fn build_run_context( feed, node_control, ) + .with_cluster_wait(cluster_wait) } fn resolve_observability_inputs( 
diff --git a/testing-framework/deployers/compose/src/descriptor/node.rs b/testing-framework/deployers/compose/src/descriptor/node.rs index c5f769b..d35f8f5 100644 --- a/testing-framework/deployers/compose/src/descriptor/node.rs +++ b/testing-framework/deployers/compose/src/descriptor/node.rs @@ -9,6 +9,7 @@ pub struct NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, + api_container_port: u16, environment: Vec<EnvEntry>, #[serde(skip_serializing_if = "Option::is_none")] platform: Option, @@ -49,6 +50,7 @@ impl NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, + api_container_port: u16, environment: Vec<EnvEntry>, platform: Option, ) -> Self { @@ -59,6 +61,7 @@ impl NodeDescriptor { volumes, extra_hosts, ports, + api_container_port, environment, platform, } } @@ -77,4 +80,9 @@ impl NodeDescriptor { pub fn environment(&self) -> &[EnvEntry] { &self.environment } + + #[cfg(test)] + pub fn api_container_port(&self) -> u16 { + self.api_container_port + } } diff --git a/testing-framework/deployers/compose/src/docker/attached.rs index babe175..b558a41 100644 --- a/testing-framework/deployers/compose/src/docker/attached.rs +++ b/testing-framework/deployers/compose/src/docker/attached.rs @@ -6,6 +6,7 @@ use tokio::process::Command; pub const ATTACHABLE_NODE_LABEL_KEY: &str = "testing-framework.node"; pub const ATTACHABLE_NODE_LABEL_VALUE: &str = "true"; +pub const API_CONTAINER_PORT_LABEL_KEY: &str = "testing-framework.api-container-port"; #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct MappedTcpPort { @@ -75,6 +76,18 @@ pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result<Vec<MappedTcpPort>, DynError> +pub async fn inspect_api_container_port_label(container_id: &str) -> Result<u16, DynError> { + let stdout = run_docker_capture([ + "inspect", + "--format", + "{{index .Config.Labels \"testing-framework.api-container-port\"}}", + container_id, + ]) + .await?; + + parse_api_container_port_label(&stdout) +} + pub fn parse_mapped_tcp_ports(raw: &str) -> Result<Vec<MappedTcpPort>, DynError> { let ports_value: Value = 
serde_json::from_str(raw.trim())?; let ports_object = ports_value @@ -194,3 +207,21 @@ fn parse_host_port_binding(binding: &Value) -> Option<u16> { .parse::<u16>() .ok() } + +fn parse_api_container_port_label(raw: &str) -> Result<u16, DynError> { + let value = raw.trim(); + + if value.is_empty() || value == "<no value>" { + return Err(format!( + "attached compose container is missing required label '{API_CONTAINER_PORT_LABEL_KEY}'" + ) + .into()); + } + + value.parse::<u16>().map_err(|err| { + format!( + "attached compose container label '{API_CONTAINER_PORT_LABEL_KEY}' has invalid value '{value}': {err}" + ) + .into() + }) +}