Refine compose attach readability

This commit is contained in:
andrussal 2026-03-07 08:58:08 +01:00
parent d34ac87411
commit 45bd07737e
4 changed files with 125 additions and 88 deletions

View File

@ -93,13 +93,8 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
.context()
.node_control()
.ok_or_else(|| anyhow!("attached compose node control is unavailable"))?;
let services = discover_attached_services(&project_name).await?;
if services.is_empty() {
return Err(anyhow!("attached compose runner discovered no services"));
}
for service in services {
let pre_restart_started_at = service_started_at(&project_name, &service).await?;
@ -121,7 +116,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
}
async fn service_started_at(project: &str, service: &str) -> Result<String> {
let container_id = run_docker(&[
let container_output = run_docker(&[
"ps",
"--filter",
&format!("label=com.docker.compose.project={project}"),
@ -131,26 +126,7 @@ async fn service_started_at(project: &str, service: &str) -> Result<String> {
"{{.ID}}",
])
.await?;
let container_ids: Vec<&str> = container_id
.lines()
.map(str::trim)
.filter(|value| !value.is_empty())
.collect();
let container_id = match container_ids.as_slice() {
[] => {
return Err(anyhow!(
"no running container found for service '{service}'"
));
}
[id] => *id,
_ => {
return Err(anyhow!(
"multiple running containers found for service '{service}'"
));
}
};
let container_id = single_container_id(service, &container_output)?;
let started_at =
run_docker(&["inspect", "--format", "{{.State.StartedAt}}", container_id]).await?;
@ -166,6 +142,24 @@ async fn service_started_at(project: &str, service: &str) -> Result<String> {
Ok(started_at)
}
fn single_container_id<'a>(service: &str, output: &'a str) -> Result<&'a str> {
let container_ids: Vec<&str> = output
.lines()
.map(str::trim)
.filter(|value| !value.is_empty())
.collect();
match container_ids.as_slice() {
[] => Err(anyhow!(
"no running container found for service '{service}'"
)),
[id] => Ok(*id),
_ => Err(anyhow!(
"multiple running containers found for service '{service}'"
)),
}
}
async fn discover_attached_services(project: &str) -> Result<Vec<String>> {
let output = run_docker(&[
"ps",

View File

@ -6,6 +6,12 @@ use crate::scenario::{
NodeControlHandle,
};
#[derive(Debug, thiserror::Error)]
enum RunContextCapabilityError {
#[error("wait_network_ready is not available for this runner")]
MissingClusterWait,
}
/// Shared runtime context available to workloads and expectations.
pub struct RunContext<E: Application> {
descriptors: E::Deployment,
@ -131,17 +137,18 @@ impl<E: Application> RunContext<E> {
}
pub async fn wait_network_ready(&self) -> Result<(), DynError> {
let Some(cluster_wait) = self.cluster_wait() else {
return Err("wait_network_ready is not available for this runner".into());
};
cluster_wait.wait_network_ready().await
self.require_cluster_wait()?.wait_network_ready().await
}
#[must_use]
pub const fn cluster_client(&self) -> ClusterClient<'_, E> {
self.node_clients.cluster_client()
}
fn require_cluster_wait(&self) -> Result<Arc<dyn ClusterWaitHandle<E>>, DynError> {
self.cluster_wait()
.ok_or_else(|| RunContextCapabilityError::MissingClusterWait.into())
}
}
/// Handle returned by the runner to control the lifecycle of the run.

View File

@ -51,50 +51,29 @@ impl<E: ComposeDeployEnv> ComposeAttachedClusterWait<E> {
}
}
struct ComposeAttachRequest<'a> {
project: &'a str,
services: &'a [String],
}
#[async_trait]
impl<E: ComposeDeployEnv> AttachProvider<E> for ComposeAttachProvider<E> {
async fn discover(
&self,
source: &AttachSource,
) -> Result<Vec<AttachedNode<E>>, AttachProviderError> {
let (project, services) = match source {
AttachSource::Compose { project, services } => (project, services),
_ => {
return Err(AttachProviderError::UnsupportedSource {
attach_source: source.clone(),
});
}
};
let project = project
.as_ref()
.ok_or_else(|| AttachProviderError::Discovery {
source: ComposeAttachDiscoveryError::MissingProjectName.into(),
})?;
let services = resolve_services(project, services)
let request = compose_attach_request(source)?;
let services = resolve_services(request.project, request.services)
.await
.map_err(to_discovery_error)?;
let mut attached = Vec::with_capacity(services.len());
for service in &services {
let container_id = discover_service_container_id(project, service)
.await
.map_err(to_discovery_error)?;
let api_port = discover_api_port(&container_id)
.await
.map_err(to_discovery_error)?;
let endpoint =
build_service_endpoint(&self.host, api_port).map_err(to_discovery_error)?;
let source = ExternalNodeSource::new(service.clone(), endpoint.to_string());
let client = E::external_node_client(&source).map_err(to_discovery_error)?;
attached.push(AttachedNode {
identity_hint: Some(service.clone()),
client,
});
attached.push(
build_attached_node::<E>(&self.host, request.project, service)
.await
.map_err(to_discovery_error)?,
);
}
Ok(attached)
@ -105,6 +84,41 @@ fn to_discovery_error(source: DynError) -> AttachProviderError {
AttachProviderError::Discovery { source }
}
fn compose_attach_request(
source: &AttachSource,
) -> Result<ComposeAttachRequest<'_>, AttachProviderError> {
let AttachSource::Compose { project, services } = source else {
return Err(AttachProviderError::UnsupportedSource {
attach_source: source.clone(),
});
};
let project = project
.as_deref()
.ok_or_else(|| AttachProviderError::Discovery {
source: ComposeAttachDiscoveryError::MissingProjectName.into(),
})?;
Ok(ComposeAttachRequest { project, services })
}
async fn build_attached_node<E: ComposeDeployEnv>(
host: &str,
project: &str,
service: &str,
) -> Result<AttachedNode<E>, DynError> {
let container_id = discover_service_container_id(project, service).await?;
let api_port = discover_api_port(&container_id).await?;
let endpoint = build_service_endpoint(host, api_port)?;
let source = ExternalNodeSource::new(service.to_owned(), endpoint.to_string());
let client = E::external_node_client(&source)?;
Ok(AttachedNode {
identity_hint: Some(service.to_owned()),
client,
})
}
pub(super) async fn resolve_services(
project: &str,
requested: &[String],
@ -147,23 +161,10 @@ pub(super) fn build_service_endpoint(host: &str, port: u16) -> Result<Url, DynEr
#[async_trait]
impl<E: ComposeDeployEnv> ClusterWaitHandle<E> for ComposeAttachedClusterWait<E> {
async fn wait_network_ready(&self) -> Result<(), DynError> {
let AttachSource::Compose { project, services } = &self.source else {
return Err("compose cluster wait requires a compose attach source".into());
};
let project = project
.as_ref()
.ok_or(ComposeAttachDiscoveryError::MissingProjectName)?;
let services = resolve_services(project, services).await?;
let mut endpoints = Vec::with_capacity(services.len());
for service in &services {
let container_id = discover_service_container_id(project, service).await?;
let api_port = discover_api_port(&container_id).await?;
let mut endpoint = build_service_endpoint(&self.host, api_port)?;
endpoint.set_path(E::readiness_path());
endpoints.push(endpoint);
}
let request = compose_wait_request(&self.source)?;
let services = resolve_services(request.project, request.services).await?;
let endpoints =
collect_readiness_endpoints::<E>(&self.host, request.project, &services).await?;
wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?;
@ -171,6 +172,36 @@ impl<E: ComposeDeployEnv> ClusterWaitHandle<E> for ComposeAttachedClusterWait<E>
}
}
fn compose_wait_request(source: &AttachSource) -> Result<ComposeAttachRequest<'_>, DynError> {
let AttachSource::Compose { project, services } = source else {
return Err("compose cluster wait requires a compose attach source".into());
};
let project = project
.as_deref()
.ok_or(ComposeAttachDiscoveryError::MissingProjectName)?;
Ok(ComposeAttachRequest { project, services })
}
async fn collect_readiness_endpoints<E: ComposeDeployEnv>(
host: &str,
project: &str,
services: &[String],
) -> Result<Vec<Url>, DynError> {
let mut endpoints = Vec::with_capacity(services.len());
for service in services {
let container_id = discover_service_container_id(project, service).await?;
let api_port = discover_api_port(&container_id).await?;
let mut endpoint = build_service_endpoint(host, api_port)?;
endpoint.set_path(E::readiness_path());
endpoints.push(endpoint);
}
Ok(endpoints)
}
#[cfg(test)]
mod tests {
use super::build_service_endpoint;

View File

@ -158,7 +158,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
let node_control = self.attached_node_control::<Caps>(scenario)?;
let cluster_wait = self.attached_cluster_wait(scenario)?;
let (feed, feed_task) = spawn_block_feed_with_retry::<E>(&node_clients).await?;
let context = RunContext::new(
let context = build_run_context(
scenario.deployment().clone(),
node_clients,
scenario.duration(),
@ -166,8 +166,8 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
observability.telemetry_handle()?,
feed,
node_control,
)
.with_cluster_wait(cluster_wait);
cluster_wait,
);
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(feed_task);
Ok(Runner::new(context, Some(cleanup_guard)))
@ -274,6 +274,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
{
let telemetry = observability.telemetry_handle()?;
let node_control = self.maybe_node_control::<Caps>(&prepared.environment);
let cluster_wait = self.managed_cluster_wait(project_name);
log_observability_endpoints(&observability);
log_profiling_urls(&deployed.host, &deployed.host_ports);
@ -287,10 +288,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
telemetry,
environment: &mut prepared.environment,
node_control,
cluster_wait: Arc::new(ComposeAttachedClusterWait::<E>::new(
compose_runner_host(),
AttachSource::compose(Vec::new()).with_project(project_name),
)),
cluster_wait,
};
let runtime = build_compose_runtime::<E>(input).await?;
let cleanup_guard =
@ -320,6 +318,13 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
})
}
fn managed_cluster_wait(&self, project_name: String) -> Arc<dyn ClusterWaitHandle<E>> {
Arc::new(ComposeAttachedClusterWait::<E>::new(
compose_runner_host(),
AttachSource::compose(Vec::new()).with_project(project_name),
))
}
fn log_deploy_start<Caps>(
&self,
scenario: &Scenario<E, Caps>,