diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs index a1fdc8d..1f03ea7 100644 --- a/logos/examples/tests/compose_attach_node_control.rs +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -93,13 +93,8 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { .context() .node_control() .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; - let services = discover_attached_services(&project_name).await?; - if services.is_empty() { - return Err(anyhow!("attached compose runner discovered no services")); - } - for service in services { let pre_restart_started_at = service_started_at(&project_name, &service).await?; @@ -121,7 +116,7 @@ async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { } async fn service_started_at(project: &str, service: &str) -> Result { - let container_id = run_docker(&[ + let container_output = run_docker(&[ "ps", "--filter", &format!("label=com.docker.compose.project={project}"), @@ -131,26 +126,7 @@ async fn service_started_at(project: &str, service: &str) -> Result { "{{.ID}}", ]) .await?; - - let container_ids: Vec<&str> = container_id - .lines() - .map(str::trim) - .filter(|value| !value.is_empty()) - .collect(); - - let container_id = match container_ids.as_slice() { - [] => { - return Err(anyhow!( - "no running container found for service '{service}'" - )); - } - [id] => *id, - _ => { - return Err(anyhow!( - "multiple running containers found for service '{service}'" - )); - } - }; + let container_id = single_container_id(service, &container_output)?; let started_at = run_docker(&["inspect", "--format", "{{.State.StartedAt}}", container_id]).await?; @@ -166,6 +142,24 @@ async fn service_started_at(project: &str, service: &str) -> Result { Ok(started_at) } +fn single_container_id<'a>(service: &str, output: &'a str) -> Result<&'a str> { + let container_ids: Vec<&str> = output + .lines() + .map(str::trim) + .filter(|value| !value.is_empty()) + .collect(); + + match container_ids.as_slice() { + [] => Err(anyhow!( + "no running container found for service '{service}'" + )), + [id] => Ok(*id), + _ => Err(anyhow!( + "multiple running containers found for service '{service}'" + )), + } +} + async fn discover_attached_services(project: &str) -> Result> { let output = run_docker(&[ "ps", diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 6441149..65f28a5 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -6,6 +6,12 @@ use crate::scenario::{ NodeControlHandle, }; +#[derive(Debug, thiserror::Error)] +enum RunContextCapabilityError { + #[error("wait_network_ready is not available for this runner")] + MissingClusterWait, +} + /// Shared runtime context available to workloads and expectations. pub struct RunContext { descriptors: E::Deployment, @@ -131,17 +137,18 @@ impl RunContext { } pub async fn wait_network_ready(&self) -> Result<(), DynError> { - let Some(cluster_wait) = self.cluster_wait() else { - return Err("wait_network_ready is not available for this runner".into()); - }; - - cluster_wait.wait_network_ready().await + self.require_cluster_wait()?.wait_network_ready().await } #[must_use] pub const fn cluster_client(&self) -> ClusterClient<'_, E> { self.node_clients.cluster_client() } + + fn require_cluster_wait(&self) -> Result>, DynError> { + self.cluster_wait() + .ok_or_else(|| RunContextCapabilityError::MissingClusterWait.into()) + } } /// Handle returned by the runner to control the lifecycle of the run. diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs index a1d0412..87358aa 100644 --- a/testing-framework/deployers/compose/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -51,50 +51,29 @@ impl ComposeAttachedClusterWait { } } +struct ComposeAttachRequest<'a> { + project: &'a str, + services: &'a [String], +} + #[async_trait] impl AttachProvider for ComposeAttachProvider { async fn discover( &self, source: &AttachSource, ) -> Result>, AttachProviderError> { - let (project, services) = match source { - AttachSource::Compose { project, services } => (project, services), - _ => { - return Err(AttachProviderError::UnsupportedSource { - attach_source: source.clone(), - }); - } - }; - - let project = project - .as_ref() - .ok_or_else(|| AttachProviderError::Discovery { - source: ComposeAttachDiscoveryError::MissingProjectName.into(), - })?; - - let services = resolve_services(project, services) + let request = compose_attach_request(source)?; + let services = resolve_services(request.project, request.services) .await .map_err(to_discovery_error)?; let mut attached = Vec::with_capacity(services.len()); for service in &services { - let container_id = discover_service_container_id(project, service) - .await - .map_err(to_discovery_error)?; - - let api_port = discover_api_port(&container_id) - .await - .map_err(to_discovery_error)?; - - let endpoint = - build_service_endpoint(&self.host, api_port).map_err(to_discovery_error)?; - let source = ExternalNodeSource::new(service.clone(), endpoint.to_string()); - let client = E::external_node_client(&source).map_err(to_discovery_error)?; - - attached.push(AttachedNode { - identity_hint: Some(service.clone()), - client, - }); + attached.push( + build_attached_node::(&self.host, request.project, service) + .await + .map_err(to_discovery_error)?, + ); } Ok(attached) @@ -105,6 +84,41 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } +fn compose_attach_request( + source: &AttachSource, +) -> Result, AttachProviderError> { + let AttachSource::Compose { project, services } = source else { + return Err(AttachProviderError::UnsupportedSource { + attach_source: source.clone(), + }); + }; + + let project = project + .as_deref() + .ok_or_else(|| AttachProviderError::Discovery { + source: ComposeAttachDiscoveryError::MissingProjectName.into(), + })?; + + Ok(ComposeAttachRequest { project, services }) +} + +async fn build_attached_node( + host: &str, + project: &str, + service: &str, +) -> Result, DynError> { + let container_id = discover_service_container_id(project, service).await?; + let api_port = discover_api_port(&container_id).await?; + let endpoint = build_service_endpoint(host, api_port)?; + let source = ExternalNodeSource::new(service.to_owned(), endpoint.to_string()); + let client = E::external_node_client(&source)?; + + Ok(AttachedNode { + identity_hint: Some(service.to_owned()), + client, + }) +} + pub(super) async fn resolve_services( project: &str, requested: &[String], @@ -147,23 +161,10 @@ pub(super) fn build_service_endpoint(host: &str, port: u16) -> Result ClusterWaitHandle for ComposeAttachedClusterWait { async fn wait_network_ready(&self) -> Result<(), DynError> { - let AttachSource::Compose { project, services } = &self.source else { - return Err("compose cluster wait requires a compose attach source".into()); - }; - - let project = project - .as_ref() - .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?; - let services = resolve_services(project, services).await?; - - let mut endpoints = Vec::with_capacity(services.len()); - for service in &services { - let container_id = discover_service_container_id(project, service).await?; - let api_port = discover_api_port(&container_id).await?; - let mut endpoint = build_service_endpoint(&self.host, api_port)?; - endpoint.set_path(E::readiness_path()); - endpoints.push(endpoint); - } + let request = compose_wait_request(&self.source)?; + let services = resolve_services(request.project, request.services).await?; + let endpoints = + collect_readiness_endpoints::(&self.host, request.project, &services).await?; wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?; @@ -171,6 +172,36 @@ impl ClusterWaitHandle for ComposeAttachedClusterWait } } +fn compose_wait_request(source: &AttachSource) -> Result, DynError> { + let AttachSource::Compose { project, services } = source else { + return Err("compose cluster wait requires a compose attach source".into()); + }; + + let project = project + .as_deref() + .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?; + + Ok(ComposeAttachRequest { project, services }) +} + +async fn collect_readiness_endpoints( + host: &str, + project: &str, + services: &[String], +) -> Result, DynError> { + let mut endpoints = Vec::with_capacity(services.len()); + + for service in services { + let container_id = discover_service_container_id(project, service).await?; + let api_port = discover_api_port(&container_id).await?; + let mut endpoint = build_service_endpoint(host, api_port)?; + endpoint.set_path(E::readiness_path()); + endpoints.push(endpoint); + } + + Ok(endpoints) +} + #[cfg(test)] mod tests { use super::build_service_endpoint; diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 5d69218..831f3fe 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -158,7 +158,7 @@ impl DeploymentOrchestrator { let node_control = self.attached_node_control::(scenario)?; let cluster_wait = self.attached_cluster_wait(scenario)?; let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; - let context = RunContext::new( + let context = build_run_context( scenario.deployment().clone(), node_clients, scenario.duration(), @@ -166,8 +166,8 @@ impl DeploymentOrchestrator { observability.telemetry_handle()?, feed, node_control, - ) - .with_cluster_wait(cluster_wait); + cluster_wait, + ); let cleanup_guard: Box = Box::new(feed_task); Ok(Runner::new(context, Some(cleanup_guard))) @@ -274,6 +274,7 @@ impl DeploymentOrchestrator { { let telemetry = observability.telemetry_handle()?; let node_control = self.maybe_node_control::(&prepared.environment); + let cluster_wait = self.managed_cluster_wait(project_name); log_observability_endpoints(&observability); log_profiling_urls(&deployed.host, &deployed.host_ports); @@ -287,10 +288,7 @@ impl DeploymentOrchestrator { telemetry, environment: &mut prepared.environment, node_control, - cluster_wait: Arc::new(ComposeAttachedClusterWait::::new( - compose_runner_host(), - AttachSource::compose(Vec::new()).with_project(project_name), - )), + cluster_wait, }; let runtime = build_compose_runtime::(input).await?; let cleanup_guard = @@ -320,6 +318,13 @@ impl DeploymentOrchestrator { }) } + fn managed_cluster_wait(&self, project_name: String) -> Arc> { + Arc::new(ComposeAttachedClusterWait::::new( + compose_runner_host(), + AttachSource::compose(Vec::new()).with_project(project_name), + )) + } + fn log_deploy_start( &self, scenario: &Scenario,