Define attached runner readiness contract

This commit is contained in:
andrussal 2026-03-07 08:32:31 +01:00
parent fd547aa119
commit d4c5b9fe99
11 changed files with 234 additions and 38 deletions

View File

@ -33,6 +33,11 @@ async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> {
Err(error) => return Err(Error::new(error)),
};
attached_runner
.wait_network_ready()
.await
.map_err(|err| anyhow!("compose attached runner readiness failed: {err}"))?;
if attached_runner.context().node_clients().is_empty() {
return Err(anyhow!("compose attach resolved no node clients"));
}
@ -89,12 +94,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
.node_control()
.ok_or_else(|| anyhow!("attached compose node control is unavailable"))?;
let services: Vec<String> = attached_runner
.context()
.borrowed_nodes()
.into_iter()
.map(|node| node.identity)
.collect();
let services = discover_attached_services(&project_name).await?;
if services.is_empty() {
return Err(anyhow!("attached compose runner discovered no services"));
@ -166,6 +166,36 @@ async fn service_started_at(project: &str, service: &str) -> Result<String> {
Ok(started_at)
}
/// Lists the compose services in `project` that carry the framework node label.
///
/// Runs `docker ps` filtered by the compose project and the
/// `testing-framework.node=true` label, then returns the sorted,
/// de-duplicated service names. Errors when nothing matches.
async fn discover_attached_services(project: &str) -> Result<Vec<String>> {
    let project_filter = format!("label=com.docker.compose.project={project}");
    let args = [
        "ps",
        "--filter",
        project_filter.as_str(),
        "--filter",
        "label=testing-framework.node=true",
        "--format",
        "{{.Label \"com.docker.compose.service\"}}",
    ];
    let listing = run_docker(&args).await?;

    // One service name per output line; drop blanks, trim whitespace.
    let mut discovered = Vec::new();
    for line in listing.lines() {
        let name = line.trim();
        if !name.is_empty() {
            discovered.push(name.to_owned());
        }
    }
    discovered.sort();
    discovered.dedup();

    if discovered.is_empty() {
        return Err(anyhow!(
            "attached compose runner discovered no labeled services"
        ));
    }
    Ok(discovered)
}
async fn wait_until_service_restarted(
project: &str,
service: &str,

View File

@ -254,6 +254,7 @@ fn build_compose_node_descriptor(
base_volumes(),
default_extra_hosts(),
ports,
api_port,
environment,
platform,
)

View File

@ -33,3 +33,11 @@ pub trait NodeControlHandle<E: Application>: Send + Sync {
None
}
}
/// Deployer-agnostic wait surface for cluster readiness checks.
#[async_trait]
pub trait ClusterWaitHandle<E: Application>: Send + Sync {
    /// Blocks until the deployed network reports ready.
    ///
    /// The default implementation errors, so deployers that cannot observe
    /// readiness need no extra code; implementors override this with a real
    /// readiness probe.
    async fn wait_network_ready(&self) -> Result<(), DynError> {
        Err("wait_network_ready not supported by this deployer".into())
    }
}

View File

@ -25,7 +25,7 @@ pub use capabilities::{
StartNodeOptions, StartedNode,
};
pub use common_builder_ext::CoreBuilderExt;
pub use control::NodeControlHandle;
pub use control::{ClusterWaitHandle, NodeControlHandle};
#[doc(hidden)]
pub use definition::{
Builder as CoreBuilder, // internal adapter-facing core builder

View File

@ -1,7 +1,10 @@
use std::{sync::Arc, time::Duration};
use super::{metrics::Metrics, node_clients::ClusterClient};
use crate::scenario::{Application, BorrowedNode, ManagedNode, NodeClients, NodeControlHandle};
use crate::scenario::{
Application, BorrowedNode, ClusterWaitHandle, DynError, ManagedNode, NodeClients,
NodeControlHandle,
};
/// Shared runtime context available to workloads and expectations.
pub struct RunContext<E: Application> {
@ -12,6 +15,7 @@ pub struct RunContext<E: Application> {
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>,
}
impl<E: Application> RunContext<E> {
@ -36,9 +40,16 @@ impl<E: Application> RunContext<E> {
telemetry,
feed,
node_control,
cluster_wait: None,
}
}
/// Registers a [`ClusterWaitHandle`] on this context (builder-style),
/// enabling `wait_network_ready` on the run.
#[must_use]
pub fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self {
    self.cluster_wait = Some(cluster_wait);
    self
}
#[must_use]
pub fn descriptors(&self) -> &E::Deployment {
&self.descriptors
@ -104,11 +115,29 @@ impl<E: Application> RunContext<E> {
self.node_control.clone()
}
/// Returns the registered cluster-wait handle, if any (cheap `Arc` clone).
#[must_use]
pub fn cluster_wait(&self) -> Option<Arc<dyn ClusterWaitHandle<E>>> {
    self.cluster_wait.clone()
}
/// True when this run has a node-control handle attached.
#[must_use]
pub const fn controls_nodes(&self) -> bool {
    self.node_control.is_some()
}
/// True when a cluster-wait handle is registered, i.e. `wait_network_ready`
/// can succeed for this run.
#[must_use]
pub const fn can_wait_network_ready(&self) -> bool {
    self.cluster_wait.is_some()
}
/// Delegates to the registered [`ClusterWaitHandle`], erroring when the
/// active deployer registered none.
pub async fn wait_network_ready(&self) -> Result<(), DynError> {
    match self.cluster_wait() {
        Some(handle) => handle.wait_network_ready().await,
        None => Err("wait_network_ready is not available for this runner".into()),
    }
}
#[must_use]
pub const fn cluster_client(&self) -> ClusterClient<'_, E> {
self.node_clients.cluster_client()
@ -156,6 +185,10 @@ impl<E: Application> RunHandle<E> {
/// Borrows the run context driving this handle.
pub fn context(&self) -> &RunContext<E> {
    &self.run_context
}
/// Delegates readiness waiting to the underlying run context.
pub async fn wait_network_ready(&self) -> Result<(), DynError> {
    self.run_context.wait_network_ready().await
}
}
/// Derived metrics about the current run timing.

View File

@ -43,6 +43,10 @@ impl<E: Application> Runner<E> {
Arc::clone(&self.context)
}
/// Delegates readiness waiting to the runner's shared context.
pub async fn wait_network_ready(&self) -> Result<(), DynError> {
    self.context.wait_network_ready().await
}
pub(crate) fn cleanup(&mut self) {
if let Some(guard) = self.cleanup_guard.take() {
guard.cleanup();

View File

@ -20,6 +20,7 @@ services:
{% endfor %}
labels:
testing-framework.node: "true"
testing-framework.api-container-port: "{{ node.api_container_port }}"
environment:
{% for env in node.environment %}
{{ env.key }}: "{{ env.value }}"

View File

@ -2,13 +2,15 @@ use std::marker::PhantomData;
use async_trait::async_trait;
use testing_framework_core::scenario::{
AttachProvider, AttachProviderError, AttachSource, AttachedNode, DynError, ExternalNodeSource,
AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError,
ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness,
};
use url::Url;
use crate::{
docker::attached::{
discover_attachable_services, discover_service_container_id, inspect_mapped_tcp_ports,
discover_attachable_services, discover_service_container_id,
inspect_api_container_port_label, inspect_mapped_tcp_ports,
},
env::ComposeDeployEnv,
};
@ -18,6 +20,12 @@ pub(super) struct ComposeAttachProvider<E: ComposeDeployEnv> {
_env: PhantomData<E>,
}
/// Compose-backed [`ClusterWaitHandle`]: re-discovers attached services and
/// polls each one's HTTP readiness endpoint.
pub(super) struct ComposeAttachedClusterWait<E: ComposeDeployEnv> {
    // Host used when building service endpoint URLs.
    host: String,
    // Attach source; must be `AttachSource::Compose` when waiting.
    source: AttachSource,
    _env: PhantomData<E>,
}
#[derive(Debug, thiserror::Error)]
enum ComposeAttachDiscoveryError {
#[error("compose attach source requires an explicit project name")]
@ -33,6 +41,16 @@ impl<E: ComposeDeployEnv> ComposeAttachProvider<E> {
}
}
impl<E: ComposeDeployEnv> ComposeAttachedClusterWait<E> {
    /// Creates a wait handle for `host` and compose attach `source`.
    pub(super) fn new(host: String, source: AttachSource) -> Self {
        Self {
            host,
            source,
            _env: PhantomData,
        }
    }
}
#[async_trait]
impl<E: ComposeDeployEnv> AttachProvider<E> for ComposeAttachProvider<E> {
async fn discover(
@ -87,7 +105,10 @@ fn to_discovery_error(source: DynError) -> AttachProviderError {
AttachProviderError::Discovery { source }
}
async fn resolve_services(project: &str, requested: &[String]) -> Result<Vec<String>, DynError> {
pub(super) async fn resolve_services(
project: &str,
requested: &[String],
) -> Result<Vec<String>, DynError> {
if !requested.is_empty() {
return Ok(requested.to_owned());
}
@ -95,34 +116,61 @@ async fn resolve_services(project: &str, requested: &[String]) -> Result<Vec<Str
discover_attachable_services(project).await
}
async fn discover_api_port(container_id: &str) -> Result<u16, DynError> {
pub(super) async fn discover_api_port(container_id: &str) -> Result<u16, DynError> {
let mapped_ports = inspect_mapped_tcp_ports(container_id).await?;
match mapped_ports.as_slice() {
[] => Err(format!(
"no mapped tcp ports discovered for attached compose service container '{container_id}'"
)
.into()),
[port] => Ok(port.host_port),
_ => {
let mapped_ports = mapped_ports
.iter()
.map(|port| format!("{}->{}", port.container_port, port.host_port))
.collect::<Vec<_>>()
.join(", ");
let api_container_port = inspect_api_container_port_label(container_id).await?;
let Some(api_port) = mapped_ports
.iter()
.find(|port| port.container_port == api_container_port)
.map(|port| port.host_port)
else {
let mapped_ports = mapped_ports
.iter()
.map(|port| format!("{}->{}", port.container_port, port.host_port))
.collect::<Vec<_>>()
.join(", ");
Err(format!(
"attached compose service container '{container_id}' has multiple mapped tcp ports ({mapped_ports}); provide a single exposed API port"
)
.into())
}
}
return Err(format!(
"attached compose service container '{container_id}' does not expose labeled API container port {api_container_port}; mapped tcp ports: {mapped_ports}"
)
.into());
};
Ok(api_port)
}
fn build_service_endpoint(host: &str, port: u16) -> Result<Url, DynError> {
/// Builds the plain-HTTP base URL for a service exposed on `host:port`.
pub(super) fn build_service_endpoint(host: &str, port: u16) -> Result<Url, DynError> {
    let address = format!("http://{host}:{port}/");
    Ok(Url::parse(&address)?)
}
#[async_trait]
impl<E: ComposeDeployEnv> ClusterWaitHandle<E> for ComposeAttachedClusterWait<E> {
    /// Waits until every attached compose service answers its readiness path.
    ///
    /// Services and ports are re-resolved on each call so the check reflects
    /// the cluster's current containers; all nodes must report ready.
    async fn wait_network_ready(&self) -> Result<(), DynError> {
        // Only compose attach sources carry the project/services needed here.
        let AttachSource::Compose { project, services } = &self.source else {
            return Err("compose cluster wait requires a compose attach source".into());
        };
        let project = project
            .as_ref()
            .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?;
        let services = resolve_services(project, services).await?;
        let mut endpoints = Vec::with_capacity(services.len());
        for service in &services {
            // Map each service to its host-exposed API port via the labeled
            // container port, then point the URL at the env's readiness path.
            let container_id = discover_service_container_id(project, service).await?;
            let api_port = discover_api_port(&container_id).await?;
            let mut endpoint = build_service_endpoint(&self.host, api_port)?;
            endpoint.set_path(E::readiness_path());
            endpoints.push(endpoint);
        }
        wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::build_service_endpoint;

View File

@ -3,11 +3,12 @@ use std::{env, sync::Arc, time::Duration};
use reqwest::Url;
use testing_framework_core::{
scenario::{
ApplicationExternalProvider, AttachSource, CleanupGuard, DeploymentPolicy, FeedHandle,
FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
Runner, Scenario, ScenarioSources, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle,
DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs,
RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources,
SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
@ -15,7 +16,7 @@ use tracing::info;
use super::{
ComposeDeployer, ComposeDeploymentMetadata,
attach_provider::ComposeAttachProvider,
attach_provider::{ComposeAttachProvider, ComposeAttachedClusterWait},
clients::ClientBuilder,
make_cleanup_guard,
ports::PortManager,
@ -117,6 +118,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
deployed,
observability,
readiness_enabled,
project_name.clone(),
)
.await?;
@ -154,6 +156,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
self.ensure_non_empty_node_clients(&node_clients)?;
let node_control = self.attached_node_control::<Caps>(scenario)?;
let cluster_wait = self.attached_cluster_wait(scenario)?;
let (feed, feed_task) = spawn_block_feed_with_retry::<E>(&node_clients).await?;
let context = RunContext::new(
scenario.deployment().clone(),
@ -163,7 +166,8 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
observability.telemetry_handle()?,
feed,
node_control,
);
)
.with_cluster_wait(cluster_wait);
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(feed_task);
Ok(Runner::new(context, Some(cleanup_guard)))
@ -237,6 +241,25 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
}) as Arc<dyn NodeControlHandle<E>>))
}
/// Builds the compose attached-cluster wait handle for `scenario`.
///
/// Errors with an internal-invariant when the scenario is not in attached
/// source mode.
fn attached_cluster_wait<Caps>(
    &self,
    scenario: &Scenario<E, Caps>,
) -> Result<Arc<dyn ClusterWaitHandle<E>>, ComposeRunnerError>
where
    Caps: Send + Sync,
{
    match scenario.sources() {
        ScenarioSources::Attached { attach, .. } => {
            let wait =
                ComposeAttachedClusterWait::<E>::new(compose_runner_host(), attach.clone());
            Ok(Arc::new(wait))
        }
        _ => Err(ComposeRunnerError::InternalInvariant {
            message: "compose attached cluster wait requested outside attached source mode",
        }),
    }
}
async fn build_runner<Caps>(
&self,
scenario: &Scenario<E, Caps>,
@ -244,6 +267,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
deployed: DeployedNodes<E>,
observability: ObservabilityInputs,
readiness_enabled: bool,
project_name: String,
) -> Result<Runner<E>, ComposeRunnerError>
where
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
@ -263,6 +287,10 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
telemetry,
environment: &mut prepared.environment,
node_control,
cluster_wait: Arc::new(ComposeAttachedClusterWait::<E>::new(
compose_runner_host(),
AttachSource::compose(Vec::new()).with_project(project_name),
)),
};
let runtime = build_compose_runtime::<E>(input).await?;
let cleanup_guard =
@ -376,6 +404,7 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> {
telemetry: Metrics,
environment: &'a mut StackEnvironment,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
}
async fn build_compose_runtime<E: ComposeDeployEnv>(
@ -400,6 +429,7 @@ async fn build_compose_runtime<E: ComposeDeployEnv>(
input.telemetry,
feed,
input.node_control,
input.cluster_wait,
);
Ok(ComposeRuntime { context, feed_task })
@ -443,6 +473,7 @@ fn build_run_context<E: ComposeDeployEnv>(
telemetry: Metrics,
feed: <E::FeedRuntime as FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RunContext<E> {
RunContext::new(
descriptors,
@ -453,6 +484,7 @@ fn build_run_context<E: ComposeDeployEnv>(
feed,
node_control,
)
.with_cluster_wait(cluster_wait)
}
fn resolve_observability_inputs<E, Caps>(

View File

@ -9,6 +9,7 @@ pub struct NodeDescriptor {
volumes: Vec<String>,
extra_hosts: Vec<String>,
ports: Vec<String>,
api_container_port: u16,
environment: Vec<EnvEntry>,
#[serde(skip_serializing_if = "Option::is_none")]
platform: Option<String>,
@ -49,6 +50,7 @@ impl NodeDescriptor {
volumes: Vec<String>,
extra_hosts: Vec<String>,
ports: Vec<String>,
api_container_port: u16,
environment: Vec<EnvEntry>,
platform: Option<String>,
) -> Self {
@ -59,6 +61,7 @@ impl NodeDescriptor {
volumes,
extra_hosts,
ports,
api_container_port,
environment,
platform,
}
@ -77,4 +80,9 @@ impl NodeDescriptor {
pub fn environment(&self) -> &[EnvEntry] {
&self.environment
}
// Test-only accessor for the labeled in-container API port.
#[cfg(test)]
pub fn api_container_port(&self) -> u16 {
    self.api_container_port
}
}

View File

@ -6,6 +6,7 @@ use tokio::process::Command;
// Docker label (key/value) marking a container as an attachable framework node.
pub const ATTACHABLE_NODE_LABEL_KEY: &str = "testing-framework.node";
pub const ATTACHABLE_NODE_LABEL_VALUE: &str = "true";
// Docker label carrying the node's in-container API port.
pub const API_CONTAINER_PORT_LABEL_KEY: &str = "testing-framework.api-container-port";
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct MappedTcpPort {
@ -75,6 +76,18 @@ pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result<Vec<MappedTc
parse_mapped_tcp_ports(&stdout)
}
pub async fn inspect_api_container_port_label(container_id: &str) -> Result<u16, DynError> {
let stdout = run_docker_capture([
"inspect",
"--format",
"{{index .Config.Labels \"testing-framework.api-container-port\"}}",
container_id,
])
.await?;
parse_api_container_port_label(&stdout)
}
pub fn parse_mapped_tcp_ports(raw: &str) -> Result<Vec<MappedTcpPort>, DynError> {
let ports_value: Value = serde_json::from_str(raw.trim())?;
let ports_object = ports_value
@ -194,3 +207,21 @@ fn parse_host_port_binding(binding: &Value) -> Option<u16> {
.parse::<u16>()
.ok()
}
/// Parses the raw `docker inspect` output for the API-port label into a port.
///
/// An empty string or docker's literal `<no value>` means the label is
/// missing; any other value must parse as a `u16`.
fn parse_api_container_port_label(raw: &str) -> Result<u16, DynError> {
    let value = raw.trim();
    // Docker prints "<no value>" when the template key is absent.
    if value.is_empty() || value == "<no value>" {
        let message = format!(
            "attached compose container is missing required label '{API_CONTAINER_PORT_LABEL_KEY}'"
        );
        return Err(message.into());
    }
    match value.parse::<u16>() {
        Ok(port) => Ok(port),
        Err(err) => Err(format!(
            "attached compose container label '{API_CONTAINER_PORT_LABEL_KEY}' has invalid value '{value}': {err}"
        )
        .into()),
    }
}