diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs new file mode 100644 index 0000000..fa3befd --- /dev/null +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -0,0 +1,55 @@ +use std::time::Duration; + +use anyhow::{Error, Result, anyhow}; +use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; +use testing_framework_core::scenario::{Deployer as _, Runner}; +use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError}; + +#[tokio::test] +#[ignore = "requires Docker and mutates compose runtime state"] +async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> { + let managed = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) + .with_run_duration(Duration::from_secs(5)) + .build()?; + + let managed_deployer = LbcComposeDeployer::default(); + let (_managed_runner, metadata): (Runner, ComposeDeploymentMetadata) = + match managed_deployer.deploy_with_metadata(&managed).await { + Ok(result) => result, + Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), + Err(error) => return Err(Error::new(error)), + }; + + let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?; + let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) + .with_run_duration(Duration::from_secs(5)) + .with_attach_source(attach_source) + .build()?; + + let attached_deployer = LbcComposeDeployer::default(); + let attached_runner: Runner = match attached_deployer.deploy(&attached).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => return Ok(()), + Err(error) => return Err(Error::new(error)), + }; + + attached_runner + .wait_network_ready() + .await + .map_err(|err| anyhow!("compose attached runner readiness failed: {err}"))?; + + if attached_runner.context().node_clients().is_empty() { + return Err(anyhow!("compose attach resolved no node clients")); + } + + for node_client in 
attached_runner.context().node_clients().snapshot() { + node_client.consensus_info().await.map_err(|err| { + anyhow!( + "attached node api query failed at {}: {err}", + node_client.base_url() + ) + })?; + } + + Ok(()) +} diff --git a/logos/runtime/ext/src/compose_env.rs b/logos/runtime/ext/src/compose_env.rs index 4fa3018..9803b12 100644 --- a/logos/runtime/ext/src/compose_env.rs +++ b/logos/runtime/ext/src/compose_env.rs @@ -254,6 +254,7 @@ fn build_compose_node_descriptor( base_volumes(), default_extra_hosts(), ports, + api_port, environment, platform, ) diff --git a/testing-framework/core/src/scenario/control.rs b/testing-framework/core/src/scenario/control.rs index d19f871..4f59621 100644 --- a/testing-framework/core/src/scenario/control.rs +++ b/testing-framework/core/src/scenario/control.rs @@ -33,3 +33,11 @@ pub trait NodeControlHandle: Send + Sync { None } } + +/// Deployer-agnostic wait surface for cluster readiness checks. +#[async_trait] +pub trait ClusterWaitHandle: Send + Sync { + async fn wait_network_ready(&self) -> Result<(), DynError> { + Err("wait_network_ready not supported by this deployer".into()) + } +} diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index ffa3aa3..f2e43d6 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -25,7 +25,7 @@ pub use capabilities::{ StartNodeOptions, StartedNode, }; pub use common_builder_ext::CoreBuilderExt; -pub use control::NodeControlHandle; +pub use control::{ClusterWaitHandle, NodeControlHandle}; #[doc(hidden)] pub use definition::{ Builder as CoreBuilder, // internal adapter-facing core builder @@ -37,7 +37,8 @@ pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ - BorrowedNode, BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, 
+ ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, BorrowedNode, + BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, HttpReadinessRequirement, ManagedNode, ManagedSource, NodeClients, NodeHandle, NodeInventory, ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError, SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider, @@ -46,9 +47,10 @@ pub use runtime::{ CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError, PrometheusEndpoint, PrometheusInstantSample, }, - orchestrate_sources, resolve_sources, spawn_feed, wait_for_http_ports, - wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, - wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, + orchestrate_sources, orchestrate_sources_with_providers, resolve_sources, spawn_feed, + wait_for_http_ports, wait_for_http_ports_with_host, + wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, + wait_http_readiness, wait_until_stable, }; pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy}; pub use workload::Workload; diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 8e26133..65f28a5 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -1,7 +1,16 @@ use std::{sync::Arc, time::Duration}; use super::{metrics::Metrics, node_clients::ClusterClient}; -use crate::scenario::{Application, BorrowedNode, ManagedNode, NodeClients, NodeControlHandle}; +use crate::scenario::{ + Application, BorrowedNode, ClusterWaitHandle, DynError, ManagedNode, NodeClients, + NodeControlHandle, +}; + +#[derive(Debug, thiserror::Error)] +enum RunContextCapabilityError { + #[error("wait_network_ready is not available for this runner")] + MissingClusterWait, 
+} /// Shared runtime context available to workloads and expectations. pub struct RunContext { @@ -12,6 +21,7 @@ pub struct RunContext { telemetry: Metrics, feed: ::Feed, node_control: Option>>, + cluster_wait: Option>>, } impl RunContext { @@ -36,9 +46,16 @@ impl RunContext { telemetry, feed, node_control, + cluster_wait: None, } } + #[must_use] + pub fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { + self.cluster_wait = Some(cluster_wait); + self + } + #[must_use] pub fn descriptors(&self) -> &E::Deployment { &self.descriptors @@ -104,15 +121,34 @@ impl RunContext { self.node_control.clone() } + #[must_use] + pub fn cluster_wait(&self) -> Option>> { + self.cluster_wait.clone() + } + #[must_use] pub const fn controls_nodes(&self) -> bool { self.node_control.is_some() } + #[must_use] + pub const fn can_wait_network_ready(&self) -> bool { + self.cluster_wait.is_some() + } + + pub async fn wait_network_ready(&self) -> Result<(), DynError> { + self.require_cluster_wait()?.wait_network_ready().await + } + #[must_use] pub const fn cluster_client(&self) -> ClusterClient<'_, E> { self.node_clients.cluster_client() } + + fn require_cluster_wait(&self) -> Result>, DynError> { + self.cluster_wait() + .ok_or_else(|| RunContextCapabilityError::MissingClusterWait.into()) + } } /// Handle returned by the runner to control the lifecycle of the run. @@ -156,6 +192,10 @@ impl RunHandle { pub fn context(&self) -> &RunContext { &self.run_context } + + pub async fn wait_network_ready(&self) -> Result<(), DynError> { + self.run_context.wait_network_ready().await + } } /// Derived metrics about the current run timing. 
diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 7e0804f..97bd2d6 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -16,10 +16,13 @@ pub use node_clients::NodeClients; #[doc(hidden)] pub use orchestration::{ ManagedSource, SourceOrchestrationPlan, build_source_orchestration_plan, orchestrate_sources, - resolve_sources, + orchestrate_sources_with_providers, resolve_sources, }; #[doc(hidden)] -pub use providers::{SourceProviders, StaticManagedProvider}; +pub use providers::{ + ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, + SourceProviders, StaticManagedProvider, +}; pub use readiness::{ HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, diff --git a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs index cb7d5b2..9e71458 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs @@ -7,4 +7,7 @@ pub use source_orchestration_plan::{ ManagedSource, SourceModeName, SourceOrchestrationMode, SourceOrchestrationPlan, SourceOrchestrationPlanError, }; -pub use source_resolver::{build_source_orchestration_plan, orchestrate_sources, resolve_sources}; +pub use source_resolver::{ + build_source_orchestration_plan, orchestrate_sources, orchestrate_sources_with_providers, + resolve_sources, +}; diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs index e88857b..dd57ee9 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs +++ 
b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs @@ -65,13 +65,10 @@ impl SourceOrchestrationPlan { ) -> Result { let mode = mode_from_sources(sources); - let plan = Self { + Ok(Self { mode, readiness_policy, - }; - - plan.ensure_currently_wired()?; - Ok(plan) + }) } #[must_use] @@ -82,13 +79,24 @@ impl SourceOrchestrationPlan { | SourceOrchestrationMode::ExternalOnly { external } => external, } } +} - fn ensure_currently_wired(&self) -> Result<(), SourceOrchestrationPlanError> { - match self.mode { - SourceOrchestrationMode::Managed { .. } - | SourceOrchestrationMode::ExternalOnly { .. } => Ok(()), - SourceOrchestrationMode::Attached { .. } => not_wired(SourceModeName::Attached), - } +#[cfg(test)] +mod tests { + use super::{SourceOrchestrationMode, SourceOrchestrationPlan}; + use crate::scenario::{AttachSource, ScenarioSources, SourceReadinessPolicy}; + + #[test] + fn attached_sources_are_planned() { + let sources = ScenarioSources::attached(AttachSource::compose(vec!["node-0".to_string()])); + let plan = + SourceOrchestrationPlan::try_from_sources(&sources, SourceReadinessPolicy::AllReady) + .expect("attached sources should build a source orchestration plan"); + + assert!(matches!( + plan.mode, + SourceOrchestrationMode::Attached { .. 
} + )); } } @@ -107,7 +115,3 @@ fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode { }, } } - -fn not_wired(mode: SourceModeName) -> Result<(), SourceOrchestrationPlanError> { - Err(SourceOrchestrationPlanError::SourceModeNotWiredYet { mode }) -} diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs index 80fce0b..6ebd646 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs @@ -91,8 +91,7 @@ pub async fn resolve_sources( /// - Managed mode is backed by prebuilt deployer-managed clients via /// `StaticManagedProvider`. /// - External nodes are resolved via `Application::external_node_client`. -/// - Attached mode remains blocked at plan validation until attach providers -/// are fully wired. +/// - Attached nodes are discovered through the selected attach provider. pub async fn orchestrate_sources( plan: &SourceOrchestrationPlan, node_clients: NodeClients, @@ -103,6 +102,17 @@ pub async fn orchestrate_sources( ))) .with_external(Arc::new(ApplicationExternalProvider)); + orchestrate_sources_with_providers(plan, providers).await +} + +/// Orchestrates scenario sources with caller-supplied provider set. +/// +/// Deployer runtimes can use this to inject attach/external providers with +/// backend-specific discovery and control semantics. +pub async fn orchestrate_sources_with_providers( + plan: &SourceOrchestrationPlan, + providers: SourceProviders, +) -> Result, DynError> { let resolved = resolve_sources(plan, &providers).await?; if matches!(plan.mode, SourceOrchestrationMode::Managed { .. 
}) && resolved.managed.is_empty() { diff --git a/testing-framework/core/src/scenario/runtime/providers/mod.rs b/testing-framework/core/src/scenario/runtime/providers/mod.rs index 60edaae..0d5a1a3 100644 --- a/testing-framework/core/src/scenario/runtime/providers/mod.rs +++ b/testing-framework/core/src/scenario/runtime/providers/mod.rs @@ -7,7 +7,7 @@ mod managed_provider; #[allow(dead_code)] mod source_providers; -pub use attach_provider::{AttachProviderError, AttachedNode}; +pub use attach_provider::{AttachProvider, AttachProviderError, AttachedNode}; pub use external_provider::{ApplicationExternalProvider, ExternalNode, ExternalProviderError}; pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider}; pub use source_providers::SourceProviders; diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index bcc453b..652d740 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -27,6 +27,12 @@ pub struct Runner { cleanup_guard: Option>, } +impl Drop for Runner { + fn drop(&mut self) { + self.cleanup(); + } +} + impl Runner { /// Construct a runner from the run context and optional cleanup guard. 
#[must_use] @@ -43,6 +49,10 @@ impl Runner { Arc::clone(&self.context) } + pub async fn wait_network_ready(&self) -> Result<(), DynError> { + self.context.wait_network_ready().await + } + pub(crate) fn cleanup(&mut self) { if let Some(guard) = self.cleanup_guard.take() { guard.cleanup(); diff --git a/testing-framework/deployers/compose/Cargo.toml b/testing-framework/deployers/compose/Cargo.toml index 4880401..9ac8b7a 100644 --- a/testing-framework/deployers/compose/Cargo.toml +++ b/testing-framework/deployers/compose/Cargo.toml @@ -17,6 +17,7 @@ anyhow = "1" async-trait = { workspace = true } reqwest = { features = ["json"], workspace = true } serde = { features = ["derive"], workspace = true } +serde_json = { workspace = true } tempfile = { workspace = true } tera = "1.19" testing-framework-core = { path = "../../core" } @@ -30,5 +31,4 @@ uuid = { features = ["v4"], version = "1" } [dev-dependencies] groth16 = { workspace = true } key-management-system-service = { workspace = true } -serde_json = { workspace = true } zksign = { workspace = true } diff --git a/testing-framework/deployers/compose/assets/docker-compose.yml.tera b/testing-framework/deployers/compose/assets/docker-compose.yml.tera index ba21922..c6ecc29 100644 --- a/testing-framework/deployers/compose/assets/docker-compose.yml.tera +++ b/testing-framework/deployers/compose/assets/docker-compose.yml.tera @@ -18,6 +18,9 @@ services: {% for port in node.ports %} - {{ port }} {% endfor %} + labels: + testing-framework.node: "true" + testing-framework.api-container-port: "{{ node.api_container_port }}" environment: {% for env in node.environment %} {{ env.key }}: "{{ env.value }}" diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs new file mode 100644 index 0000000..87358aa --- /dev/null +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -0,0 +1,241 @@ +use std::marker::PhantomData; 
+ +use async_trait::async_trait; +use testing_framework_core::scenario::{ + AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, + ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, +}; +use url::Url; + +use crate::{ + docker::attached::{ + discover_attachable_services, discover_service_container_id, + inspect_api_container_port_label, inspect_mapped_tcp_ports, + }, + env::ComposeDeployEnv, +}; + +pub(super) struct ComposeAttachProvider { + host: String, + _env: PhantomData, +} + +pub(super) struct ComposeAttachedClusterWait { + host: String, + source: AttachSource, + _env: PhantomData, +} + +#[derive(Debug, thiserror::Error)] +enum ComposeAttachDiscoveryError { + #[error("compose attach source requires an explicit project name")] + MissingProjectName, +} + +impl ComposeAttachProvider { + pub(super) fn new(host: String) -> Self { + Self { + host, + _env: PhantomData, + } + } +} + +impl ComposeAttachedClusterWait { + pub(super) fn new(host: String, source: AttachSource) -> Self { + Self { + host, + source, + _env: PhantomData, + } + } +} + +struct ComposeAttachRequest<'a> { + project: &'a str, + services: &'a [String], +} + +#[async_trait] +impl AttachProvider for ComposeAttachProvider { + async fn discover( + &self, + source: &AttachSource, + ) -> Result>, AttachProviderError> { + let request = compose_attach_request(source)?; + let services = resolve_services(request.project, request.services) + .await + .map_err(to_discovery_error)?; + + let mut attached = Vec::with_capacity(services.len()); + for service in &services { + attached.push( + build_attached_node::(&self.host, request.project, service) + .await + .map_err(to_discovery_error)?, + ); + } + + Ok(attached) + } +} + +fn to_discovery_error(source: DynError) -> AttachProviderError { + AttachProviderError::Discovery { source } +} + +fn compose_attach_request( + source: &AttachSource, +) -> Result, AttachProviderError> { + let AttachSource::Compose { 
project, services } = source else { + return Err(AttachProviderError::UnsupportedSource { + attach_source: source.clone(), + }); + }; + + let project = project + .as_deref() + .ok_or_else(|| AttachProviderError::Discovery { + source: ComposeAttachDiscoveryError::MissingProjectName.into(), + })?; + + Ok(ComposeAttachRequest { project, services }) +} + +async fn build_attached_node( + host: &str, + project: &str, + service: &str, +) -> Result, DynError> { + let container_id = discover_service_container_id(project, service).await?; + let api_port = discover_api_port(&container_id).await?; + let endpoint = build_service_endpoint(host, api_port)?; + let source = ExternalNodeSource::new(service.to_owned(), endpoint.to_string()); + let client = E::external_node_client(&source)?; + + Ok(AttachedNode { + identity_hint: Some(service.to_owned()), + client, + }) +} + +pub(super) async fn resolve_services( + project: &str, + requested: &[String], +) -> Result, DynError> { + if !requested.is_empty() { + return Ok(requested.to_owned()); + } + + discover_attachable_services(project).await +} + +pub(super) async fn discover_api_port(container_id: &str) -> Result { + let mapped_ports = inspect_mapped_tcp_ports(container_id).await?; + let api_container_port = inspect_api_container_port_label(container_id).await?; + let Some(api_port) = mapped_ports + .iter() + .find(|port| port.container_port == api_container_port) + .map(|port| port.host_port) + else { + let mapped_ports = mapped_ports + .iter() + .map(|port| format!("{}->{}", port.container_port, port.host_port)) + .collect::>() + .join(", "); + + return Err(format!( + "attached compose service container '{container_id}' does not expose labeled API container port {api_container_port}; mapped tcp ports: {mapped_ports}" + ) + .into()); + }; + + Ok(api_port) +} + +pub(super) fn build_service_endpoint(host: &str, port: u16) -> Result { + let endpoint = Url::parse(&format!("http://{host}:{port}/"))?; + Ok(endpoint) +} + +#[async_trait] 
+impl ClusterWaitHandle for ComposeAttachedClusterWait { + async fn wait_network_ready(&self) -> Result<(), DynError> { + let request = compose_wait_request(&self.source)?; + let services = resolve_services(request.project, request.services).await?; + let endpoints = + collect_readiness_endpoints::(&self.host, request.project, &services).await?; + + wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?; + + Ok(()) + } +} + +fn compose_wait_request(source: &AttachSource) -> Result, DynError> { + let AttachSource::Compose { project, services } = source else { + return Err("compose cluster wait requires a compose attach source".into()); + }; + + let project = project + .as_deref() + .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?; + + Ok(ComposeAttachRequest { project, services }) +} + +async fn collect_readiness_endpoints( + host: &str, + project: &str, + services: &[String], +) -> Result, DynError> { + let mut endpoints = Vec::with_capacity(services.len()); + + for service in services { + let container_id = discover_service_container_id(project, service).await?; + let api_port = discover_api_port(&container_id).await?; + let mut endpoint = build_service_endpoint(host, api_port)?; + endpoint.set_path(E::readiness_path()); + endpoints.push(endpoint); + } + + Ok(endpoints) +} + +#[cfg(test)] +mod tests { + use super::build_service_endpoint; + use crate::docker::attached::parse_mapped_tcp_ports; + + #[test] + fn parse_mapped_tcp_ports_skips_non_tcp_and_invalid_keys() { + let raw = r#"{ + "18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}], + "9999/udp":[{"HostIp":"0.0.0.0","HostPort":"39999"}], + "invalid":[{"HostIp":"0.0.0.0","HostPort":"12345"}] + }"#; + + let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse"); + assert_eq!(mapped.len(), 1); + assert_eq!(mapped[0].container_port, 18018); + assert_eq!(mapped[0].host_port, 32001); + } + + #[test] + fn parse_mapped_tcp_ports_returns_sorted_ports() { + let raw = 
r#"{ + "18019/tcp":[{"HostIp":"0.0.0.0","HostPort":"32002"}], + "18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}] + }"#; + + let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse"); + assert_eq!(mapped[0].container_port, 18018); + assert_eq!(mapped[1].container_port, 18019); + } + + #[test] + fn build_service_endpoint_formats_http_url() { + let endpoint = build_service_endpoint("127.0.0.1", 32001).expect("endpoint should parse"); + assert_eq!(endpoint.as_str(), "http://127.0.0.1:32001/"); + } +} diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index e32acae..60a88de 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -1,3 +1,4 @@ +mod attach_provider; pub mod clients; pub mod orchestrator; pub mod ports; @@ -8,8 +9,8 @@ use std::marker::PhantomData; use async_trait::async_trait; use testing_framework_core::scenario::{ - CleanupGuard, Deployer, FeedHandle, ObservabilityCapabilityProvider, RequiresNodeControl, - Runner, Scenario, + AttachSource, CleanupGuard, Deployer, DynError, FeedHandle, ObservabilityCapabilityProvider, + RequiresNodeControl, Runner, Scenario, }; use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup}; @@ -21,6 +22,50 @@ pub struct ComposeDeployer { _env: PhantomData, } +/// Compose deployment metadata returned by compose-specific deployment APIs. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ComposeDeploymentMetadata { + /// Docker Compose project name used for this deployment when available. + pub project_name: Option, +} + +#[derive(Debug, thiserror::Error)] +enum ComposeMetadataError { + #[error("compose deployment metadata has no project name")] + MissingProjectName, +} + +impl ComposeDeploymentMetadata { + /// Returns project name when deployment is bound to a specific compose + /// project. 
+ #[must_use] + pub fn project_name(&self) -> Option<&str> { + self.project_name.as_deref() + } + + /// Builds an attach source for the same compose project using deployer + /// discovery to resolve services. + pub fn attach_source(&self) -> Result { + let project_name = self + .project_name() + .ok_or(ComposeMetadataError::MissingProjectName)?; + + Ok(AttachSource::compose(Vec::new()).with_project(project_name.to_owned())) + } + + /// Builds an attach source for the same compose project. + pub fn attach_source_for_services( + &self, + services: Vec, + ) -> Result { + let project_name = self + .project_name() + .ok_or(ComposeMetadataError::MissingProjectName)?; + + Ok(AttachSource::compose(services).with_project(project_name.to_owned())) + } +} + impl Default for ComposeDeployer { fn default() -> Self { Self::new() @@ -41,6 +86,25 @@ impl ComposeDeployer { self.readiness_checks = enabled; self } + + /// Deploy and return compose-specific metadata alongside the generic + /// runner. + pub async fn deploy_with_metadata( + &self, + scenario: &Scenario, + ) -> Result<(Runner, ComposeDeploymentMetadata), ComposeRunnerError> + where + Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, + { + let deployer = Self { + readiness_checks: self.readiness_checks, + _env: PhantomData, + }; + + orchestrator::DeploymentOrchestrator::new(deployer) + .deploy_with_metadata(scenario) + .await + } } #[async_trait] diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 6585136..831f3fe 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,17 +3,20 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ + ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle, FeedRuntime, 
HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, RunContext, Runner, Scenario, build_source_orchestration_plan, - orchestrate_sources, + RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, + SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; use tracing::info; use super::{ - ComposeDeployer, + ComposeDeployer, ComposeDeploymentMetadata, + attach_provider::{ComposeAttachProvider, ComposeAttachedClusterWait}, clients::ClientBuilder, make_cleanup_guard, ports::PortManager, @@ -21,13 +24,14 @@ use super::{ setup::{DeploymentContext, DeploymentSetup}, }; use crate::{ - docker::control::ComposeNodeControl, + docker::control::{ComposeAttachedNodeControl, ComposeNodeControl}, env::ComposeDeployEnv, errors::ComposeRunnerError, infrastructure::{ environment::StackEnvironment, ports::{HostPortMapping, compose_runner_host}, }, + lifecycle::block_feed::spawn_block_feed_with_retry, }; const PRINT_ENDPOINTS_ENV: &str = "TESTNET_PRINT_ENDPOINTS"; @@ -45,6 +49,18 @@ impl DeploymentOrchestrator { &self, scenario: &Scenario, ) -> Result, ComposeRunnerError> + where + Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, + { + self.deploy_with_metadata(scenario) + .await + .map(|(runner, _)| runner) + } + + pub async fn deploy_with_metadata( + &self, + scenario: &Scenario, + ) -> Result<(Runner, ComposeDeploymentMetadata), ComposeRunnerError> where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { @@ -55,6 +71,13 @@ impl DeploymentOrchestrator { } })?; + if scenario.sources().is_attached() { + return self + .deploy_attached_only::(scenario, source_plan) + .await + .map(|runner| (runner, attached_metadata(scenario))); + } + let deployment = scenario.deployment(); let setup = 
DeploymentSetup::::new(deployment); setup.validate_environment().await?; @@ -80,10 +103,13 @@ impl DeploymentOrchestrator { ) .await?; - // Source orchestration currently runs here after managed clients are prepared. - deployed.node_clients = orchestrate_sources(&source_plan, deployed.node_clients) - .await - .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + let source_providers = self.source_providers(deployed.node_clients.snapshot()); + + deployed.node_clients = self + .resolve_node_clients(&source_plan, source_providers) + .await?; + + let project_name = prepared.environment.project_name().to_owned(); let runner = self .build_runner::( @@ -92,6 +118,7 @@ impl DeploymentOrchestrator { deployed, observability, readiness_enabled, + project_name.clone(), ) .await?; @@ -103,7 +130,134 @@ impl DeploymentOrchestrator { readiness_enabled, ); - Ok(runner) + Ok(( + runner, + ComposeDeploymentMetadata { + project_name: Some(project_name), + }, + )) + } + + async fn deploy_attached_only( + &self, + scenario: &Scenario, + source_plan: SourceOrchestrationPlan, + ) -> Result, ComposeRunnerError> + where + Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, + { + let observability = resolve_observability_inputs(scenario)?; + let source_providers = self.source_providers(Vec::new()); + + let node_clients = self + .resolve_node_clients(&source_plan, source_providers) + .await?; + + self.ensure_non_empty_node_clients(&node_clients)?; + + let node_control = self.attached_node_control::(scenario)?; + let cluster_wait = self.attached_cluster_wait(scenario)?; + let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; + let context = build_run_context( + scenario.deployment().clone(), + node_clients, + scenario.duration(), + scenario.expectation_cooldown(), + observability.telemetry_handle()?, + feed, + node_control, + cluster_wait, + ); + + let cleanup_guard: Box = Box::new(feed_task); + Ok(Runner::new(context, 
Some(cleanup_guard))) + } + + fn source_providers(&self, managed_clients: Vec) -> SourceProviders { + SourceProviders::default() + .with_managed(Arc::new(StaticManagedProvider::new(managed_clients))) + .with_attach(Arc::new(ComposeAttachProvider::::new( + compose_runner_host(), + ))) + .with_external(Arc::new(ApplicationExternalProvider)) + } + + async fn resolve_node_clients( + &self, + source_plan: &SourceOrchestrationPlan, + source_providers: SourceProviders, + ) -> Result, ComposeRunnerError> { + orchestrate_sources_with_providers(source_plan, source_providers) + .await + .map_err(|source| ComposeRunnerError::SourceOrchestration { source }) + } + + fn ensure_non_empty_node_clients( + &self, + node_clients: &NodeClients, + ) -> Result<(), ComposeRunnerError> { + if node_clients.is_empty() { + return Err(ComposeRunnerError::RuntimePreflight); + } + + Ok(()) + } + + fn attached_node_control( + &self, + scenario: &Scenario, + ) -> Result>>, ComposeRunnerError> + where + Caps: RequiresNodeControl + Send + Sync, + { + if !Caps::REQUIRED { + return Ok(None); + } + + let ScenarioSources::Attached { attach, .. } = scenario.sources() else { + return Err(ComposeRunnerError::InternalInvariant { + message: "attached node control requested outside attached source mode", + }); + }; + + let AttachSource::Compose { project, .. 
} = attach else { + return Err(ComposeRunnerError::InternalInvariant { + message: "compose deployer requires compose attach source for node control", + }); + }; + + let Some(project_name) = project + .as_ref() + .map(|value| value.trim()) + .filter(|value| !value.is_empty()) + else { + return Err(ComposeRunnerError::InternalInvariant { + message: "attached compose mode requires explicit project name for node control", + }); + }; + + Ok(Some(Arc::new(ComposeAttachedNodeControl { + project_name: project_name.to_owned(), + }) as Arc>)) + } + + fn attached_cluster_wait( + &self, + scenario: &Scenario, + ) -> Result>, ComposeRunnerError> + where + Caps: Send + Sync, + { + let ScenarioSources::Attached { attach, .. } = scenario.sources() else { + return Err(ComposeRunnerError::InternalInvariant { + message: "compose attached cluster wait requested outside attached source mode", + }); + }; + + Ok(Arc::new(ComposeAttachedClusterWait::::new( + compose_runner_host(), + attach.clone(), + ))) } async fn build_runner( @@ -113,12 +267,14 @@ impl DeploymentOrchestrator { deployed: DeployedNodes, observability: ObservabilityInputs, readiness_enabled: bool, + project_name: String, ) -> Result, ComposeRunnerError> where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { let telemetry = observability.telemetry_handle()?; let node_control = self.maybe_node_control::(&prepared.environment); + let cluster_wait = self.managed_cluster_wait(project_name); log_observability_endpoints(&observability); log_profiling_urls(&deployed.host, &deployed.host_ports); @@ -132,6 +288,7 @@ impl DeploymentOrchestrator { telemetry, environment: &mut prepared.environment, node_control, + cluster_wait, }; let runtime = build_compose_runtime::(input).await?; let cleanup_guard = @@ -161,6 +318,13 @@ impl DeploymentOrchestrator { }) } + fn managed_cluster_wait(&self, project_name: String) -> Arc> { + Arc::new(ComposeAttachedClusterWait::::new( + compose_runner_host(), + 
AttachSource::compose(Vec::new()).with_project(project_name), + )) + } + fn log_deploy_start( &self, scenario: &Scenario, @@ -209,6 +373,22 @@ impl DeploymentOrchestrator { } } +fn attached_metadata(scenario: &Scenario) -> ComposeDeploymentMetadata +where + E: ComposeDeployEnv, + Caps: Send + Sync, +{ + let project_name = match scenario.sources() { + ScenarioSources::Attached { + attach: AttachSource::Compose { project, .. }, + .. + } => project.clone(), + _ => None, + }; + + ComposeDeploymentMetadata { project_name } +} + struct DeployedNodes { host_ports: HostPortMapping, host: String, @@ -229,6 +409,7 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { telemetry: Metrics, environment: &'a mut StackEnvironment, node_control: Option>>, + cluster_wait: Arc>, } async fn build_compose_runtime( @@ -253,6 +434,7 @@ async fn build_compose_runtime( input.telemetry, feed, input.node_control, + input.cluster_wait, ); Ok(ComposeRuntime { context, feed_task }) @@ -296,6 +478,7 @@ fn build_run_context( telemetry: Metrics, feed: ::Feed, node_control: Option>>, + cluster_wait: Arc>, ) -> RunContext { RunContext::new( descriptors, @@ -306,6 +489,7 @@ fn build_run_context( feed, node_control, ) + .with_cluster_wait(cluster_wait) } fn resolve_observability_inputs( diff --git a/testing-framework/deployers/compose/src/descriptor/node.rs b/testing-framework/deployers/compose/src/descriptor/node.rs index c5f769b..d35f8f5 100644 --- a/testing-framework/deployers/compose/src/descriptor/node.rs +++ b/testing-framework/deployers/compose/src/descriptor/node.rs @@ -9,6 +9,7 @@ pub struct NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, + api_container_port: u16, environment: Vec, #[serde(skip_serializing_if = "Option::is_none")] platform: Option, @@ -49,6 +50,7 @@ impl NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, + api_container_port: u16, environment: Vec, platform: Option, ) -> Self { @@ -59,6 +61,7 @@ impl NodeDescriptor { volumes, extra_hosts, ports, + 
api_container_port, environment, platform, } } @@ -77,4 +80,9 @@ impl NodeDescriptor { pub fn environment(&self) -> &[EnvEntry] { &self.environment } + + #[cfg(test)] + pub fn api_container_port(&self) -> u16 { + self.api_container_port + } } diff --git a/testing-framework/deployers/compose/src/docker/attached.rs b/testing-framework/deployers/compose/src/docker/attached.rs new file mode 100644 index 0000000..b558a41 --- /dev/null +++ b/testing-framework/deployers/compose/src/docker/attached.rs @@ -0,0 +1,227 @@ +use std::process::Stdio; + +use serde_json::Value; +use testing_framework_core::scenario::DynError; +use tokio::process::Command; + +pub const ATTACHABLE_NODE_LABEL_KEY: &str = "testing-framework.node"; +pub const ATTACHABLE_NODE_LABEL_VALUE: &str = "true"; +pub const API_CONTAINER_PORT_LABEL_KEY: &str = "testing-framework.api-container-port"; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct MappedTcpPort { + pub container_port: u16, + pub host_port: u16, +} + +pub async fn discover_service_container_id( + project: &str, + service: &str, +) -> Result<String, DynError> { + let stdout = run_docker_capture([ + "ps", + "--filter", + &format!("label=com.docker.compose.project={project}"), + "--filter", + &format!("label=com.docker.compose.service={service}"), + "--format", + "{{.ID}}", + ]) + .await?; + + let ids: Vec<String> = stdout + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .map(ToOwned::to_owned) + .collect(); + + match ids.as_slice() { + [id] => Ok(id.clone()), + [] => Err(format!( + "no running container found for compose project '{project}' service '{service}'" + ) + .into()), + _ => Err(format!( + "multiple running containers found for compose project '{project}' service '{service}'" + ) + .into()), + } +} + +pub async fn discover_attachable_services(project: &str) -> Result<Vec<String>, DynError> { + let attachable_filter = + format!("label={ATTACHABLE_NODE_LABEL_KEY}={ATTACHABLE_NODE_LABEL_VALUE}"); + let attachable = discover_services_with_filters(project,
Some(&attachable_filter)).await?; + + if attachable.is_empty() { + return Err(format!( + "no running compose services with label '{ATTACHABLE_NODE_LABEL_KEY}={ATTACHABLE_NODE_LABEL_VALUE}' found for project '{project}'" + ) + .into()); + } + + Ok(attachable) +} + +pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result<Vec<MappedTcpPort>, DynError> { + let stdout = run_docker_capture([ + "inspect", + "--format", + "{{json .NetworkSettings.Ports}}", + container_id, + ]) + .await?; + + parse_mapped_tcp_ports(&stdout) +} + +pub async fn inspect_api_container_port_label(container_id: &str) -> Result<u16, DynError> { + let stdout = run_docker_capture([ + "inspect", + "--format", + "{{index .Config.Labels \"testing-framework.api-container-port\"}}", + container_id, + ]) + .await?; + + parse_api_container_port_label(&stdout) +} + +pub fn parse_mapped_tcp_ports(raw: &str) -> Result<Vec<MappedTcpPort>, DynError> { + let ports_value: Value = serde_json::from_str(raw.trim())?; + let ports_object = ports_value + .as_object() + .ok_or_else(|| "docker inspect ports payload is not an object".to_owned())?; + + let mut mapped = Vec::new(); + for (container_port_key, bindings) in ports_object { + let Some(container_port) = parse_container_port(container_port_key) else { + continue; + }; + + let Some(bindings_array) = bindings.as_array() else { + continue; + }; + + let Some(host_port) = bindings_array.iter().find_map(parse_host_port_binding) else { + continue; + }; + + mapped.push(MappedTcpPort { + container_port, + host_port, + }); + } + + mapped.sort_by_key(|port| port.container_port); + + Ok(mapped) +} + +pub async fn run_docker_capture<const N: usize>(args: [&str; N]) -> Result<String, DynError> { + let output = Command::new("docker") + .args(args) + .stdin(Stdio::null()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + + return Err(format!( + "docker {} failed with status {}: {stderr}", + args.join(" "), +
output.status + ) + .into()); + } + + Ok(String::from_utf8_lossy(&output.stdout).to_string()) +} + +async fn discover_services_with_filters( + project: &str, + extra_filter: Option<&str>, +) -> Result<Vec<String>, DynError> { + let mut args = vec![ + "ps".to_owned(), + "--filter".to_owned(), + format!("label=com.docker.compose.project={project}"), + ]; + + if let Some(filter) = extra_filter { + args.push("--filter".to_owned()); + args.push(filter.to_owned()); + } + + args.push("--format".to_owned()); + args.push("{{.Label \"com.docker.compose.service\"}}".to_owned()); + + let output = Command::new("docker") + .args(&args) + .stdin(Stdio::null()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .output() + .await?; + + if !output.status.success() { + return Err(format!( + "docker {} failed with status {}: {}", + args.join(" "), + output.status, + String::from_utf8_lossy(&output.stderr).trim() + ) + .into()); + } + + let mut services: Vec<String> = output + .stdout + .split(|byte| *byte == b'\n') + .filter_map(|line| { + let parsed = String::from_utf8_lossy(line).trim().to_owned(); + (!parsed.is_empty()).then_some(parsed) + }) + .collect(); + services.sort(); + services.dedup(); + Ok(services) +} + +fn parse_container_port(port_key: &str) -> Option<u16> { + let (port, proto) = port_key.split_once('/')?; + if proto != "tcp" { + return None; + } + + port.parse::<u16>().ok() +} + +fn parse_host_port_binding(binding: &Value) -> Option<u16> { + binding + .get("HostPort") + .and_then(Value::as_str)?
+ .parse::<u16>() + .ok() +} + +fn parse_api_container_port_label(raw: &str) -> Result<u16, DynError> { + let value = raw.trim(); + + if value.is_empty() || value == "<no value>" { + return Err(format!( + "attached compose container is missing required label '{API_CONTAINER_PORT_LABEL_KEY}'" + ) + .into()); + } + + value.parse::<u16>().map_err(|err| { + format!( + "attached compose container label '{API_CONTAINER_PORT_LABEL_KEY}' has invalid value '{value}': {err}" + ) + .into() + }) +} diff --git a/testing-framework/deployers/compose/src/docker/control.rs b/testing-framework/deployers/compose/src/docker/control.rs index 0541e42..8e98948 100644 --- a/testing-framework/deployers/compose/src/docker/control.rs +++ b/testing-framework/deployers/compose/src/docker/control.rs @@ -7,13 +7,21 @@ use testing_framework_core::{ adjust_timeout, scenario::{Application, DynError, NodeControlHandle}, }; -use tokio::process::Command; +use tokio::{process::Command, time::timeout}; use tracing::info; -use crate::{docker::commands::run_docker_command, errors::ComposeRunnerError}; +use crate::{ + docker::{ + attached::discover_service_container_id, + commands::{ComposeCommandError, run_docker_command}, + }, + errors::ComposeRunnerError, +}; const COMPOSE_RESTART_TIMEOUT: Duration = Duration::from_secs(120); const COMPOSE_RESTART_DESCRIPTION: &str = "docker compose restart"; +const DOCKER_CONTAINER_RESTART_DESCRIPTION: &str = "docker container restart"; +const DOCKER_CONTAINER_STOP_DESCRIPTION: &str = "docker container stop"; pub async fn restart_compose_service( compose_file: &Path, @@ -38,6 +46,50 @@ .map_err(ComposeRunnerError::Compose) } +pub async fn restart_attached_compose_service( + project_name: &str, + service: &str, +) -> Result<(), DynError> { + let container_id = discover_service_container_id(project_name, service).await?; + let command = docker_container_command("restart", &container_id); + + info!( + service, + project = project_name, + container = container_id, +
"restarting attached compose service" + ); + + run_docker_action( + command, + DOCKER_CONTAINER_RESTART_DESCRIPTION, + adjust_timeout(COMPOSE_RESTART_TIMEOUT), + ) + .await +} + +pub async fn stop_attached_compose_service( + project_name: &str, + service: &str, +) -> Result<(), DynError> { + let container_id = discover_service_container_id(project_name, service).await?; + let command = docker_container_command("stop", &container_id); + + info!( + service, + project = project_name, + container = container_id, + "stopping attached compose service" + ); + + run_docker_action( + command, + DOCKER_CONTAINER_STOP_DESCRIPTION, + adjust_timeout(COMPOSE_RESTART_TIMEOUT), + ) + .await +} + fn compose_restart_command(compose_file: &Path, project_name: &str, service: &str) -> Command { let mut command = Command::new("docker"); command @@ -51,6 +103,43 @@ fn compose_restart_command(compose_file: &Path, project_name: &str, service: &st command } +fn docker_container_command(action: &str, container_id: &str) -> Command { + let mut command = Command::new("docker"); + command.arg(action).arg(container_id); + command +} + +async fn run_docker_action( + mut command: Command, + description: &str, + timeout_duration: Duration, +) -> Result<(), DynError> { + match timeout(timeout_duration, command.output()).await { + Ok(Ok(output)) => { + if output.status.success() { + return Ok(()); + } + + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + + Err(format!( + "{description} failed with status {}: {stderr}", + output.status + ) + .into()) + } + Ok(Err(source)) => Err(format!("{description} failed to spawn: {source}").into()), + Err(_) => { + let compose_timeout = ComposeCommandError::Timeout { + command: description.to_owned(), + timeout: timeout_duration, + }; + + Err(compose_timeout.into()) + } + } +} + /// Compose-specific node control handle for restarting nodes. 
pub struct ComposeNodeControl { pub(crate) compose_file: PathBuf, @@ -65,3 +154,23 @@ impl NodeControlHandle for ComposeNodeControl { .map_err(|err| format!("node restart failed: {err}").into()) } } + +/// Node control handle for compose attached mode. +pub struct ComposeAttachedNodeControl { + pub(crate) project_name: String, +} + +#[async_trait::async_trait] +impl NodeControlHandle for ComposeAttachedNodeControl { + async fn restart_node(&self, name: &str) -> Result<(), DynError> { + restart_attached_compose_service(&self.project_name, name) + .await + .map_err(|source| format!("node restart failed for service '{name}': {source}").into()) + } + + async fn stop_node(&self, name: &str) -> Result<(), DynError> { + stop_attached_compose_service(&self.project_name, name) + .await + .map_err(|source| format!("node stop failed for service '{name}': {source}").into()) + } +} diff --git a/testing-framework/deployers/compose/src/docker/mod.rs b/testing-framework/deployers/compose/src/docker/mod.rs index 3696492..c1b4581 100644 --- a/testing-framework/deployers/compose/src/docker/mod.rs +++ b/testing-framework/deployers/compose/src/docker/mod.rs @@ -1,3 +1,4 @@ +pub mod attached; pub mod commands; pub mod control; pub mod platform; diff --git a/testing-framework/deployers/compose/src/lib.rs b/testing-framework/deployers/compose/src/lib.rs index c75890f..8cf715e 100644 --- a/testing-framework/deployers/compose/src/lib.rs +++ b/testing-framework/deployers/compose/src/lib.rs @@ -6,7 +6,7 @@ pub mod errors; pub mod infrastructure; pub mod lifecycle; -pub use deployer::ComposeDeployer; +pub use deployer::{ComposeDeployer, ComposeDeploymentMetadata}; pub use descriptor::{ComposeDescriptor, EnvEntry, NodeDescriptor}; pub use docker::{ commands::{ComposeCommandError, compose_down, compose_up, dump_compose_logs},