feat(compose): Attached mode support

This commit is contained in:
Andrus Salumets 2026-03-08 17:26:58 +07:00 committed by GitHub
commit 5550cb5e3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 1015 additions and 42 deletions

View File

@ -0,0 +1,55 @@
use std::time::Duration;
use anyhow::{Error, Result, anyhow};
use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder};
use testing_framework_core::scenario::{Deployer as _, Runner};
use testing_framework_runner_compose::{ComposeDeploymentMetadata, ComposeRunnerError};
/// End-to-end check of compose attached mode: deploy a managed stack, derive
/// an attach source from its metadata, re-attach to the same project, and
/// query every resolved node's consensus API. Skips silently when Docker is
/// unavailable.
#[tokio::test]
#[ignore = "requires Docker and mutates compose runtime state"]
async fn compose_attach_mode_queries_node_api_opt_in() -> Result<()> {
    // Stage 1: a managed single-node deployment whose metadata seeds the
    // attach source used below.
    let managed_scenario = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
        .with_run_duration(Duration::from_secs(5))
        .build()?;
    let deploy_result = LbcComposeDeployer::default()
        .deploy_with_metadata(&managed_scenario)
        .await;
    let (_managed_runner, metadata): (Runner<LbcExtEnv>, ComposeDeploymentMetadata) =
        match deploy_result {
            Ok(result) => result,
            // Environments without Docker opt out instead of failing.
            Err(ComposeRunnerError::DockerUnavailable) => return Ok(()),
            Err(error) => return Err(Error::new(error)),
        };
    let attach_source = metadata.attach_source().map_err(|err| anyhow!("{err}"))?;
    // Stage 2: a second scenario that attaches to the running project rather
    // than deploying new containers.
    let attached_scenario = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
        .with_run_duration(Duration::from_secs(5))
        .with_attach_source(attach_source)
        .build()?;
    let attached_runner: Runner<LbcExtEnv> =
        match LbcComposeDeployer::default().deploy(&attached_scenario).await {
            Ok(runner) => runner,
            Err(ComposeRunnerError::DockerUnavailable) => return Ok(()),
            Err(error) => return Err(Error::new(error)),
        };
    attached_runner
        .wait_network_ready()
        .await
        .map_err(|err| anyhow!("compose attached runner readiness failed: {err}"))?;
    let context = attached_runner.context();
    if context.node_clients().is_empty() {
        return Err(anyhow!("compose attach resolved no node clients"));
    }
    // Every attached node must answer an API query to prove the discovered
    // endpoints are real.
    for node_client in context.node_clients().snapshot() {
        node_client.consensus_info().await.map_err(|err| {
            anyhow!(
                "attached node api query failed at {}: {err}",
                node_client.base_url()
            )
        })?;
    }
    Ok(())
}

View File

@ -254,6 +254,7 @@ fn build_compose_node_descriptor(
base_volumes(),
default_extra_hosts(),
ports,
api_port,
environment,
platform,
)

View File

@ -33,3 +33,11 @@ pub trait NodeControlHandle<E: Application>: Send + Sync {
None
}
}
/// Deployer-agnostic wait surface for cluster readiness checks.
#[async_trait]
pub trait ClusterWaitHandle<E: Application>: Send + Sync {
    /// Blocks until the cluster reports network readiness.
    ///
    /// The default implementation fails outright: deployers that support
    /// readiness waiting must override this method.
    async fn wait_network_ready(&self) -> Result<(), DynError> {
        Err("wait_network_ready not supported by this deployer".into())
    }
}

View File

@ -25,7 +25,7 @@ pub use capabilities::{
StartNodeOptions, StartedNode,
};
pub use common_builder_ext::CoreBuilderExt;
pub use control::NodeControlHandle;
pub use control::{ClusterWaitHandle, NodeControlHandle};
#[doc(hidden)]
pub use definition::{
Builder as CoreBuilder, // internal adapter-facing core builder
@ -37,7 +37,8 @@ pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy};
pub use expectation::Expectation;
pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
pub use runtime::{
BorrowedNode, BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime,
ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, BorrowedNode,
BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime,
HttpReadinessRequirement, ManagedNode, ManagedSource, NodeClients, NodeHandle, NodeInventory,
ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError,
SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider,
@ -46,9 +47,10 @@ pub use runtime::{
CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError,
PrometheusEndpoint, PrometheusInstantSample,
},
orchestrate_sources, resolve_sources, spawn_feed, wait_for_http_ports,
wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement,
wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable,
orchestrate_sources, orchestrate_sources_with_providers, resolve_sources, spawn_feed,
wait_for_http_ports, wait_for_http_ports_with_host,
wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement,
wait_http_readiness, wait_until_stable,
};
pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy};
pub use workload::Workload;

View File

@ -1,7 +1,16 @@
use std::{sync::Arc, time::Duration};
use super::{metrics::Metrics, node_clients::ClusterClient};
use crate::scenario::{Application, BorrowedNode, ManagedNode, NodeClients, NodeControlHandle};
use crate::scenario::{
Application, BorrowedNode, ClusterWaitHandle, DynError, ManagedNode, NodeClients,
NodeControlHandle,
};
// Capability errors surfaced when the run context lacks an optional handle.
#[derive(Debug, thiserror::Error)]
enum RunContextCapabilityError {
    // Raised when `wait_network_ready` is called but no cluster-wait handle
    // was installed via `with_cluster_wait`.
    #[error("wait_network_ready is not available for this runner")]
    MissingClusterWait,
}
/// Shared runtime context available to workloads and expectations.
pub struct RunContext<E: Application> {
@ -12,6 +21,7 @@ pub struct RunContext<E: Application> {
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>,
}
impl<E: Application> RunContext<E> {
@ -36,9 +46,16 @@ impl<E: Application> RunContext<E> {
telemetry,
feed,
node_control,
cluster_wait: None,
}
}
/// Installs a cluster-wait capability, enabling `wait_network_ready`.
#[must_use]
pub fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self {
    self.cluster_wait = Some(cluster_wait);
    self
}
#[must_use]
pub fn descriptors(&self) -> &E::Deployment {
&self.descriptors
@ -104,15 +121,34 @@ impl<E: Application> RunContext<E> {
self.node_control.clone()
}
/// Returns the optional cluster-wait handle, cloning the `Arc` when present.
#[must_use]
pub fn cluster_wait(&self) -> Option<Arc<dyn ClusterWaitHandle<E>>> {
    self.cluster_wait.clone()
}

/// True when this context can control nodes via a `NodeControlHandle`.
#[must_use]
pub const fn controls_nodes(&self) -> bool {
    self.node_control.is_some()
}

/// True when `wait_network_ready` is available for this runner.
#[must_use]
pub const fn can_wait_network_ready(&self) -> bool {
    self.cluster_wait.is_some()
}

/// Delegates to the installed `ClusterWaitHandle`.
///
/// # Errors
/// Fails when no handle was installed, or when the handle's wait fails.
pub async fn wait_network_ready(&self) -> Result<(), DynError> {
    self.require_cluster_wait()?.wait_network_ready().await
}

/// Returns a cluster-wide client facade over the node clients.
#[must_use]
pub const fn cluster_client(&self) -> ClusterClient<'_, E> {
    self.node_clients.cluster_client()
}

// Maps the absent-handle case to a descriptive capability error instead of
// letting callers unwrap an `Option`.
fn require_cluster_wait(&self) -> Result<Arc<dyn ClusterWaitHandle<E>>, DynError> {
    self.cluster_wait()
        .ok_or_else(|| RunContextCapabilityError::MissingClusterWait.into())
}
}
/// Handle returned by the runner to control the lifecycle of the run.
@ -156,6 +192,10 @@ impl<E: Application> RunHandle<E> {
pub fn context(&self) -> &RunContext<E> {
&self.run_context
}
/// Convenience forwarder to `RunContext::wait_network_ready`.
pub async fn wait_network_ready(&self) -> Result<(), DynError> {
    self.run_context.wait_network_ready().await
}
}
/// Derived metrics about the current run timing.

View File

@ -16,10 +16,13 @@ pub use node_clients::NodeClients;
#[doc(hidden)]
pub use orchestration::{
ManagedSource, SourceOrchestrationPlan, build_source_orchestration_plan, orchestrate_sources,
resolve_sources,
orchestrate_sources_with_providers, resolve_sources,
};
#[doc(hidden)]
pub use providers::{SourceProviders, StaticManagedProvider};
pub use providers::{
ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode,
SourceProviders, StaticManagedProvider,
};
pub use readiness::{
HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports,
wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement,

View File

@ -7,4 +7,7 @@ pub use source_orchestration_plan::{
ManagedSource, SourceModeName, SourceOrchestrationMode, SourceOrchestrationPlan,
SourceOrchestrationPlanError,
};
pub use source_resolver::{build_source_orchestration_plan, orchestrate_sources, resolve_sources};
pub use source_resolver::{
build_source_orchestration_plan, orchestrate_sources, orchestrate_sources_with_providers,
resolve_sources,
};

View File

@ -65,13 +65,10 @@ impl SourceOrchestrationPlan {
) -> Result<Self, SourceOrchestrationPlanError> {
let mode = mode_from_sources(sources);
let plan = Self {
Ok(Self {
mode,
readiness_policy,
};
plan.ensure_currently_wired()?;
Ok(plan)
})
}
#[must_use]
@ -82,13 +79,24 @@ impl SourceOrchestrationPlan {
| SourceOrchestrationMode::ExternalOnly { external } => external,
}
}
}
fn ensure_currently_wired(&self) -> Result<(), SourceOrchestrationPlanError> {
match self.mode {
SourceOrchestrationMode::Managed { .. }
| SourceOrchestrationMode::ExternalOnly { .. } => Ok(()),
SourceOrchestrationMode::Attached { .. } => not_wired(SourceModeName::Attached),
}
#[cfg(test)]
mod tests {
    use super::{SourceOrchestrationMode, SourceOrchestrationPlan};
    use crate::scenario::{AttachSource, ScenarioSources, SourceReadinessPolicy};

    // Attached sources must yield a plan in attached orchestration mode
    // (they are no longer rejected at plan-validation time).
    #[test]
    fn attached_sources_are_planned() {
        let attach = AttachSource::compose(vec!["node-0".to_string()]);
        let sources = ScenarioSources::attached(attach);
        let plan = SourceOrchestrationPlan::try_from_sources(
            &sources,
            SourceReadinessPolicy::AllReady,
        )
        .expect("attached sources should build a source orchestration plan");
        assert!(matches!(plan.mode, SourceOrchestrationMode::Attached { .. }));
    }
}
@ -107,7 +115,3 @@ fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode {
},
}
}
fn not_wired(mode: SourceModeName) -> Result<(), SourceOrchestrationPlanError> {
Err(SourceOrchestrationPlanError::SourceModeNotWiredYet { mode })
}

View File

@ -91,8 +91,7 @@ pub async fn resolve_sources<E: Application>(
/// - Managed mode is backed by prebuilt deployer-managed clients via
/// `StaticManagedProvider`.
/// - External nodes are resolved via `Application::external_node_client`.
/// - Attached mode remains blocked at plan validation until attach providers
/// are fully wired.
/// - Attached nodes are discovered through the selected attach provider.
pub async fn orchestrate_sources<E: Application>(
plan: &SourceOrchestrationPlan,
node_clients: NodeClients<E>,
@ -103,6 +102,17 @@ pub async fn orchestrate_sources<E: Application>(
)))
.with_external(Arc::new(ApplicationExternalProvider));
orchestrate_sources_with_providers(plan, providers).await
}
/// Orchestrates scenario sources with caller-supplied provider set.
///
/// Deployer runtimes can use this to inject attach/external providers with
/// backend-specific discovery and control semantics.
pub async fn orchestrate_sources_with_providers<E: Application>(
plan: &SourceOrchestrationPlan,
providers: SourceProviders<E>,
) -> Result<NodeClients<E>, DynError> {
let resolved = resolve_sources(plan, &providers).await?;
if matches!(plan.mode, SourceOrchestrationMode::Managed { .. }) && resolved.managed.is_empty() {

View File

@ -7,7 +7,7 @@ mod managed_provider;
#[allow(dead_code)]
mod source_providers;
pub use attach_provider::{AttachProviderError, AttachedNode};
pub use attach_provider::{AttachProvider, AttachProviderError, AttachedNode};
pub use external_provider::{ApplicationExternalProvider, ExternalNode, ExternalProviderError};
pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider};
pub use source_providers::SourceProviders;

View File

@ -27,6 +27,12 @@ pub struct Runner<E: Application> {
cleanup_guard: Option<Box<dyn CleanupGuard>>,
}
impl<E: Application> Drop for Runner<E> {
fn drop(&mut self) {
self.cleanup();
}
}
impl<E: Application> Runner<E> {
/// Construct a runner from the run context and optional cleanup guard.
#[must_use]
@ -43,6 +49,10 @@ impl<E: Application> Runner<E> {
Arc::clone(&self.context)
}
pub async fn wait_network_ready(&self) -> Result<(), DynError> {
self.context.wait_network_ready().await
}
pub(crate) fn cleanup(&mut self) {
if let Some(guard) = self.cleanup_guard.take() {
guard.cleanup();

View File

@ -17,6 +17,7 @@ anyhow = "1"
async-trait = { workspace = true }
reqwest = { features = ["json"], workspace = true }
serde = { features = ["derive"], workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
tera = "1.19"
testing-framework-core = { path = "../../core" }
@ -30,5 +31,4 @@ uuid = { features = ["v4"], version = "1" }
[dev-dependencies]
groth16 = { workspace = true }
key-management-system-service = { workspace = true }
serde_json = { workspace = true }
zksign = { workspace = true }

View File

@ -18,6 +18,9 @@ services:
{% for port in node.ports %}
- {{ port }}
{% endfor %}
labels:
testing-framework.node: "true"
testing-framework.api-container-port: "{{ node.api_container_port }}"
environment:
{% for env in node.environment %}
{{ env.key }}: "{{ env.value }}"

View File

@ -0,0 +1,241 @@
use std::marker::PhantomData;
use async_trait::async_trait;
use testing_framework_core::scenario::{
AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError,
ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness,
};
use url::Url;
use crate::{
docker::attached::{
discover_attachable_services, discover_service_container_id,
inspect_api_container_port_label, inspect_mapped_tcp_ports,
},
env::ComposeDeployEnv,
};
/// Attach provider that discovers compose-managed node containers and builds
/// API clients for them against the given host.
pub(super) struct ComposeAttachProvider<E: ComposeDeployEnv> {
    // Host used when building node endpoint URLs.
    host: String,
    _env: PhantomData<E>,
}

/// Cluster-wait handle that polls HTTP readiness of every attached service.
pub(super) struct ComposeAttachedClusterWait<E: ComposeDeployEnv> {
    host: String,
    // Attach source describing which compose project/services to wait on.
    source: AttachSource,
    _env: PhantomData<E>,
}

// Discovery-phase failures specific to compose attachment.
#[derive(Debug, thiserror::Error)]
enum ComposeAttachDiscoveryError {
    #[error("compose attach source requires an explicit project name")]
    MissingProjectName,
}

impl<E: ComposeDeployEnv> ComposeAttachProvider<E> {
    /// Creates a provider that resolves node endpoints on `host`.
    pub(super) fn new(host: String) -> Self {
        Self {
            host,
            _env: PhantomData,
        }
    }
}

impl<E: ComposeDeployEnv> ComposeAttachedClusterWait<E> {
    /// Creates a wait handle for the project/services described by `source`.
    pub(super) fn new(host: String, source: AttachSource) -> Self {
        Self {
            host,
            source,
            _env: PhantomData,
        }
    }
}

// Borrowed view of a compose attach request: project name plus the
// explicitly requested services (empty list means "discover all").
struct ComposeAttachRequest<'a> {
    project: &'a str,
    services: &'a [String],
}
#[async_trait]
impl<E: ComposeDeployEnv> AttachProvider<E> for ComposeAttachProvider<E> {
    /// Discovers attachable compose services and builds one `AttachedNode`
    /// per resolved service.
    ///
    /// # Errors
    /// Returns `UnsupportedSource` for non-compose attach sources and
    /// `Discovery` for docker/endpoint resolution failures.
    async fn discover(
        &self,
        source: &AttachSource,
    ) -> Result<Vec<AttachedNode<E>>, AttachProviderError> {
        let request = compose_attach_request(source)?;
        let services = resolve_services(request.project, request.services)
            .await
            .map_err(to_discovery_error)?;
        let mut attached = Vec::with_capacity(services.len());
        // Sequential: each service needs its own container lookup and port
        // inspection before a client can be built.
        for service in &services {
            attached.push(
                build_attached_node::<E>(&self.host, request.project, service)
                    .await
                    .map_err(to_discovery_error)?,
            );
        }
        Ok(attached)
    }
}
// Wraps an arbitrary failure in the provider's discovery error variant.
fn to_discovery_error(source: DynError) -> AttachProviderError {
    AttachProviderError::Discovery { source }
}

/// Validates that `source` is a compose attach source with an explicit
/// project name and borrows its parts.
///
/// # Errors
/// - `UnsupportedSource` when `source` is not `AttachSource::Compose`.
/// - `Discovery` (wrapping `MissingProjectName`) when no project is set.
fn compose_attach_request(
    source: &AttachSource,
) -> Result<ComposeAttachRequest<'_>, AttachProviderError> {
    let AttachSource::Compose { project, services } = source else {
        return Err(AttachProviderError::UnsupportedSource {
            attach_source: source.clone(),
        });
    };
    let project = project
        .as_deref()
        .ok_or_else(|| AttachProviderError::Discovery {
            source: ComposeAttachDiscoveryError::MissingProjectName.into(),
        })?;
    Ok(ComposeAttachRequest { project, services })
}
/// Resolves a single compose service into an `AttachedNode`: container id ->
/// published API port -> endpoint URL -> environment-specific node client.
async fn build_attached_node<E: ComposeDeployEnv>(
    host: &str,
    project: &str,
    service: &str,
) -> Result<AttachedNode<E>, DynError> {
    let container_id = discover_service_container_id(project, service).await?;
    let api_port = discover_api_port(&container_id).await?;
    let endpoint = build_service_endpoint(host, api_port)?;
    // The compose service name doubles as the node's identity hint.
    let source = ExternalNodeSource::new(service.to_owned(), endpoint.to_string());
    let client = E::external_node_client(&source)?;
    Ok(AttachedNode {
        identity_hint: Some(service.to_owned()),
        client,
    })
}
/// Returns the explicitly requested services, or discovers all attachable
/// services in the project when none were requested.
pub(super) async fn resolve_services(
    project: &str,
    requested: &[String],
) -> Result<Vec<String>, DynError> {
    if requested.is_empty() {
        discover_attachable_services(project).await
    } else {
        Ok(requested.to_owned())
    }
}
/// Finds the published host port that maps to the container's labeled API
/// container port.
///
/// # Errors
/// Fails when docker inspection fails, or when none of the mapped TCP ports
/// matches the API-port label; in the latter case the error lists every
/// mapped port to aid debugging.
pub(super) async fn discover_api_port(container_id: &str) -> Result<u16, DynError> {
    let mapped_ports = inspect_mapped_tcp_ports(container_id).await?;
    let api_container_port = inspect_api_container_port_label(container_id).await?;
    let Some(api_port) = mapped_ports
        .iter()
        .find(|port| port.container_port == api_container_port)
        .map(|port| port.host_port)
    else {
        // Shadowing: reuse the name for the human-readable port listing.
        let mapped_ports = mapped_ports
            .iter()
            .map(|port| format!("{}->{}", port.container_port, port.host_port))
            .collect::<Vec<_>>()
            .join(", ");
        return Err(format!(
            "attached compose service container '{container_id}' does not expose labeled API container port {api_container_port}; mapped tcp ports: {mapped_ports}"
        )
        .into());
    };
    Ok(api_port)
}
/// Builds a plain-HTTP base URL (`http://host:port/`) for a node endpoint.
pub(super) fn build_service_endpoint(host: &str, port: u16) -> Result<Url, DynError> {
    Ok(Url::parse(&format!("http://{host}:{port}/"))?)
}
#[async_trait]
impl<E: ComposeDeployEnv> ClusterWaitHandle<E> for ComposeAttachedClusterWait<E> {
    /// Resolves the attached services and blocks until every node's HTTP
    /// readiness endpoint reports ready.
    async fn wait_network_ready(&self) -> Result<(), DynError> {
        let request = compose_wait_request(&self.source)?;
        let services = resolve_services(request.project, request.services).await?;
        let endpoints =
            collect_readiness_endpoints::<E>(&self.host, request.project, &services).await?;
        // All nodes must report ready; a single lagging node keeps waiting.
        wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?;
        Ok(())
    }
}

// `compose_attach_request` twin for the wait path, reporting failures as
// plain `DynError`s instead of `AttachProviderError`s.
fn compose_wait_request(source: &AttachSource) -> Result<ComposeAttachRequest<'_>, DynError> {
    let AttachSource::Compose { project, services } = source else {
        return Err("compose cluster wait requires a compose attach source".into());
    };
    let project = project
        .as_deref()
        .ok_or(ComposeAttachDiscoveryError::MissingProjectName)?;
    Ok(ComposeAttachRequest { project, services })
}
/// Builds one readiness-probe URL per service by resolving its container,
/// published API port, and the environment's readiness path.
async fn collect_readiness_endpoints<E: ComposeDeployEnv>(
    host: &str,
    project: &str,
    services: &[String],
) -> Result<Vec<Url>, DynError> {
    let mut endpoints = Vec::with_capacity(services.len());
    for service in services {
        let container_id = discover_service_container_id(project, service).await?;
        let api_port = discover_api_port(&container_id).await?;
        let mut endpoint = build_service_endpoint(host, api_port)?;
        // Swap the base path for the environment's readiness route.
        endpoint.set_path(E::readiness_path());
        endpoints.push(endpoint);
    }
    Ok(endpoints)
}
#[cfg(test)]
mod tests {
    use super::build_service_endpoint;
    use crate::docker::attached::parse_mapped_tcp_ports;

    // Only "<port>/tcp" keys survive parsing; udp and malformed keys are
    // silently dropped rather than reported as errors.
    #[test]
    fn parse_mapped_tcp_ports_skips_non_tcp_and_invalid_keys() {
        let raw = r#"{
"18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}],
"9999/udp":[{"HostIp":"0.0.0.0","HostPort":"39999"}],
"invalid":[{"HostIp":"0.0.0.0","HostPort":"12345"}]
}"#;
        let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse");
        assert_eq!(mapped.len(), 1);
        assert_eq!(mapped[0].container_port, 18018);
        assert_eq!(mapped[0].host_port, 32001);
    }

    // Output ordering is deterministic (sorted by container port) even when
    // the JSON object lists ports out of order.
    #[test]
    fn parse_mapped_tcp_ports_returns_sorted_ports() {
        let raw = r#"{
"18019/tcp":[{"HostIp":"0.0.0.0","HostPort":"32002"}],
"18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}]
}"#;
        let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse");
        assert_eq!(mapped[0].container_port, 18018);
        assert_eq!(mapped[1].container_port, 18019);
    }

    #[test]
    fn build_service_endpoint_formats_http_url() {
        let endpoint = build_service_endpoint("127.0.0.1", 32001).expect("endpoint should parse");
        assert_eq!(endpoint.as_str(), "http://127.0.0.1:32001/");
    }
}

View File

@ -1,3 +1,4 @@
mod attach_provider;
pub mod clients;
pub mod orchestrator;
pub mod ports;
@ -8,8 +9,8 @@ use std::marker::PhantomData;
use async_trait::async_trait;
use testing_framework_core::scenario::{
CleanupGuard, Deployer, FeedHandle, ObservabilityCapabilityProvider, RequiresNodeControl,
Runner, Scenario,
AttachSource, CleanupGuard, Deployer, DynError, FeedHandle, ObservabilityCapabilityProvider,
RequiresNodeControl, Runner, Scenario,
};
use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup};
@ -21,6 +22,50 @@ pub struct ComposeDeployer<E: ComposeDeployEnv> {
_env: PhantomData<E>,
}
/// Compose deployment metadata returned by compose-specific deployment APIs.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ComposeDeploymentMetadata {
    /// Docker Compose project name used for this deployment when available.
    pub project_name: Option<String>,
}

// Failures when deriving attach information from deployment metadata.
#[derive(Debug, thiserror::Error)]
enum ComposeMetadataError {
    #[error("compose deployment metadata has no project name")]
    MissingProjectName,
}
impl ComposeDeploymentMetadata {
    /// Returns project name when deployment is bound to a specific compose
    /// project.
    #[must_use]
    pub fn project_name(&self) -> Option<&str> {
        self.project_name.as_deref()
    }

    /// Builds an attach source for the same compose project using deployer
    /// discovery to resolve services.
    ///
    /// # Errors
    /// Fails when the metadata carries no project name.
    pub fn attach_source(&self) -> Result<AttachSource, DynError> {
        // An empty service list means "discover all attachable services";
        // delegating keeps the project-name validation in one place.
        self.attach_source_for_services(Vec::new())
    }

    /// Builds an attach source for the same compose project, limited to the
    /// given services.
    ///
    /// # Errors
    /// Fails when the metadata carries no project name.
    pub fn attach_source_for_services(
        &self,
        services: Vec<String>,
    ) -> Result<AttachSource, DynError> {
        let project_name = self
            .project_name()
            .ok_or(ComposeMetadataError::MissingProjectName)?;
        Ok(AttachSource::compose(services).with_project(project_name.to_owned()))
    }
}
impl<E: ComposeDeployEnv> Default for ComposeDeployer<E> {
fn default() -> Self {
Self::new()
@ -41,6 +86,25 @@ impl<E: ComposeDeployEnv> ComposeDeployer<E> {
self.readiness_checks = enabled;
self
}
/// Deploy and return compose-specific metadata alongside the generic
/// runner.
pub async fn deploy_with_metadata<Caps>(
    &self,
    scenario: &Scenario<E, Caps>,
) -> Result<(Runner<E>, ComposeDeploymentMetadata), ComposeRunnerError>
where
    Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
    // The orchestrator takes the deployer by value, so rebuild one that
    // mirrors this deployer's configuration rather than consuming `self`.
    let deployer = Self {
        readiness_checks: self.readiness_checks,
        _env: PhantomData,
    };
    orchestrator::DeploymentOrchestrator::new(deployer)
        .deploy_with_metadata(scenario)
        .await
}
}
#[async_trait]

View File

@ -3,17 +3,20 @@ use std::{env, sync::Arc, time::Duration};
use reqwest::Url;
use testing_framework_core::{
scenario::{
ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle,
DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs,
RequiresNodeControl, RunContext, Runner, Scenario, build_source_orchestration_plan,
orchestrate_sources,
RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources,
SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
use tracing::info;
use super::{
ComposeDeployer,
ComposeDeployer, ComposeDeploymentMetadata,
attach_provider::{ComposeAttachProvider, ComposeAttachedClusterWait},
clients::ClientBuilder,
make_cleanup_guard,
ports::PortManager,
@ -21,13 +24,14 @@ use super::{
setup::{DeploymentContext, DeploymentSetup},
};
use crate::{
docker::control::ComposeNodeControl,
docker::control::{ComposeAttachedNodeControl, ComposeNodeControl},
env::ComposeDeployEnv,
errors::ComposeRunnerError,
infrastructure::{
environment::StackEnvironment,
ports::{HostPortMapping, compose_runner_host},
},
lifecycle::block_feed::spawn_block_feed_with_retry,
};
const PRINT_ENDPOINTS_ENV: &str = "TESTNET_PRINT_ENDPOINTS";
@ -45,6 +49,18 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
&self,
scenario: &Scenario<E, Caps>,
) -> Result<Runner<E>, ComposeRunnerError>
where
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
self.deploy_with_metadata(scenario)
.await
.map(|(runner, _)| runner)
}
pub async fn deploy_with_metadata<Caps>(
&self,
scenario: &Scenario<E, Caps>,
) -> Result<(Runner<E>, ComposeDeploymentMetadata), ComposeRunnerError>
where
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
@ -55,6 +71,13 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
}
})?;
if scenario.sources().is_attached() {
return self
.deploy_attached_only::<Caps>(scenario, source_plan)
.await
.map(|runner| (runner, attached_metadata(scenario)));
}
let deployment = scenario.deployment();
let setup = DeploymentSetup::<E>::new(deployment);
setup.validate_environment().await?;
@ -80,10 +103,13 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
)
.await?;
// Source orchestration currently runs here after managed clients are prepared.
deployed.node_clients = orchestrate_sources(&source_plan, deployed.node_clients)
.await
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
let source_providers = self.source_providers(deployed.node_clients.snapshot());
deployed.node_clients = self
.resolve_node_clients(&source_plan, source_providers)
.await?;
let project_name = prepared.environment.project_name().to_owned();
let runner = self
.build_runner::<Caps>(
@ -92,6 +118,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
deployed,
observability,
readiness_enabled,
project_name.clone(),
)
.await?;
@ -103,7 +130,134 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
readiness_enabled,
);
Ok(runner)
Ok((
runner,
ComposeDeploymentMetadata {
project_name: Some(project_name),
},
))
}
/// Deploys in attach-only mode: no containers are created; existing compose
/// services are discovered and wrapped in a runner.
async fn deploy_attached_only<Caps>(
    &self,
    scenario: &Scenario<E, Caps>,
    source_plan: SourceOrchestrationPlan,
) -> Result<Runner<E>, ComposeRunnerError>
where
    Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
    let observability = resolve_observability_inputs(scenario)?;
    // No managed clients exist in attached mode; the attach provider
    // supplies every node client.
    let source_providers = self.source_providers(Vec::new());
    let node_clients = self
        .resolve_node_clients(&source_plan, source_providers)
        .await?;
    self.ensure_non_empty_node_clients(&node_clients)?;
    let node_control = self.attached_node_control::<Caps>(scenario)?;
    let cluster_wait = self.attached_cluster_wait(scenario)?;
    let (feed, feed_task) = spawn_block_feed_with_retry::<E>(&node_clients).await?;
    let context = build_run_context(
        scenario.deployment().clone(),
        node_clients,
        scenario.duration(),
        scenario.expectation_cooldown(),
        observability.telemetry_handle()?,
        feed,
        node_control,
        cluster_wait,
    );
    // Only the feed task needs cleanup — attached containers are not owned
    // by this runner.
    let cleanup_guard: Box<dyn CleanupGuard> = Box::new(feed_task);
    Ok(Runner::new(context, Some(cleanup_guard)))
}
/// Assembles the provider set used for source orchestration: the supplied
/// managed clients plus compose attach and application external providers.
fn source_providers(&self, managed_clients: Vec<E::NodeClient>) -> SourceProviders<E> {
    SourceProviders::default()
        .with_managed(Arc::new(StaticManagedProvider::new(managed_clients)))
        .with_attach(Arc::new(ComposeAttachProvider::<E>::new(
            compose_runner_host(),
        )))
        .with_external(Arc::new(ApplicationExternalProvider))
}

/// Runs source orchestration and maps failures to a compose runner error.
async fn resolve_node_clients(
    &self,
    source_plan: &SourceOrchestrationPlan,
    source_providers: SourceProviders<E>,
) -> Result<NodeClients<E>, ComposeRunnerError> {
    orchestrate_sources_with_providers(source_plan, source_providers)
        .await
        .map_err(|source| ComposeRunnerError::SourceOrchestration { source })
}

/// Rejects deployments that resolved zero node clients.
fn ensure_non_empty_node_clients(
    &self,
    node_clients: &NodeClients<E>,
) -> Result<(), ComposeRunnerError> {
    if node_clients.is_empty() {
        return Err(ComposeRunnerError::RuntimePreflight);
    }
    Ok(())
}
/// Builds a node-control handle for attached mode when the capability set
/// requires one.
///
/// # Errors
/// Returns `InternalInvariant` when called outside attached compose mode or
/// when the attach source lacks a non-empty project name.
fn attached_node_control<Caps>(
    &self,
    scenario: &Scenario<E, Caps>,
) -> Result<Option<Arc<dyn NodeControlHandle<E>>>, ComposeRunnerError>
where
    Caps: RequiresNodeControl + Send + Sync,
{
    // Node control is opt-in via the capability marker; skip wiring otherwise.
    if !Caps::REQUIRED {
        return Ok(None);
    }
    let ScenarioSources::Attached { attach, .. } = scenario.sources() else {
        return Err(ComposeRunnerError::InternalInvariant {
            message: "attached node control requested outside attached source mode",
        });
    };
    let AttachSource::Compose { project, .. } = attach else {
        return Err(ComposeRunnerError::InternalInvariant {
            message: "compose deployer requires compose attach source for node control",
        });
    };
    // Whitespace-only project names are treated the same as missing ones.
    let Some(project_name) = project
        .as_ref()
        .map(|value| value.trim())
        .filter(|value| !value.is_empty())
    else {
        return Err(ComposeRunnerError::InternalInvariant {
            message: "attached compose mode requires explicit project name for node control",
        });
    };
    Ok(Some(Arc::new(ComposeAttachedNodeControl {
        project_name: project_name.to_owned(),
    }) as Arc<dyn NodeControlHandle<E>>))
}
/// Builds the cluster-wait handle for attached mode from the scenario's
/// attach source.
///
/// # Errors
/// Returns `InternalInvariant` when the scenario is not in attached mode.
fn attached_cluster_wait<Caps>(
    &self,
    scenario: &Scenario<E, Caps>,
) -> Result<Arc<dyn ClusterWaitHandle<E>>, ComposeRunnerError>
where
    Caps: Send + Sync,
{
    let ScenarioSources::Attached { attach, .. } = scenario.sources() else {
        return Err(ComposeRunnerError::InternalInvariant {
            message: "compose attached cluster wait requested outside attached source mode",
        });
    };
    Ok(Arc::new(ComposeAttachedClusterWait::<E>::new(
        compose_runner_host(),
        attach.clone(),
    )))
}
async fn build_runner<Caps>(
@ -113,12 +267,14 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
deployed: DeployedNodes<E>,
observability: ObservabilityInputs,
readiness_enabled: bool,
project_name: String,
) -> Result<Runner<E>, ComposeRunnerError>
where
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
let telemetry = observability.telemetry_handle()?;
let node_control = self.maybe_node_control::<Caps>(&prepared.environment);
let cluster_wait = self.managed_cluster_wait(project_name);
log_observability_endpoints(&observability);
log_profiling_urls(&deployed.host, &deployed.host_ports);
@ -132,6 +288,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
telemetry,
environment: &mut prepared.environment,
node_control,
cluster_wait,
};
let runtime = build_compose_runtime::<E>(input).await?;
let cleanup_guard =
@ -161,6 +318,13 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
})
}
/// Builds a cluster-wait handle for managed deployments by pointing the
/// attached-mode wait machinery at this deployment's own compose project.
fn managed_cluster_wait(&self, project_name: String) -> Arc<dyn ClusterWaitHandle<E>> {
    Arc::new(ComposeAttachedClusterWait::<E>::new(
        compose_runner_host(),
        // Empty service list: discover every attachable service in project.
        AttachSource::compose(Vec::new()).with_project(project_name),
    ))
}
fn log_deploy_start<Caps>(
&self,
scenario: &Scenario<E, Caps>,
@ -209,6 +373,22 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
}
}
/// Extracts compose deployment metadata from an attached scenario; the
/// project name is taken from the compose attach source when present.
fn attached_metadata<E, Caps>(scenario: &Scenario<E, Caps>) -> ComposeDeploymentMetadata
where
    E: ComposeDeployEnv,
    Caps: Send + Sync,
{
    let project_name = if let ScenarioSources::Attached {
        attach: AttachSource::Compose { project, .. },
        ..
    } = scenario.sources()
    {
        project.clone()
    } else {
        None
    };
    ComposeDeploymentMetadata { project_name }
}
struct DeployedNodes<E: ComposeDeployEnv> {
host_ports: HostPortMapping,
host: String,
@ -229,6 +409,7 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> {
telemetry: Metrics,
environment: &'a mut StackEnvironment,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
}
async fn build_compose_runtime<E: ComposeDeployEnv>(
@ -253,6 +434,7 @@ async fn build_compose_runtime<E: ComposeDeployEnv>(
input.telemetry,
feed,
input.node_control,
input.cluster_wait,
);
Ok(ComposeRuntime { context, feed_task })
@ -296,6 +478,7 @@ fn build_run_context<E: ComposeDeployEnv>(
telemetry: Metrics,
feed: <E::FeedRuntime as FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RunContext<E> {
RunContext::new(
descriptors,
@ -306,6 +489,7 @@ fn build_run_context<E: ComposeDeployEnv>(
feed,
node_control,
)
.with_cluster_wait(cluster_wait)
}
fn resolve_observability_inputs<E, Caps>(

View File

@ -9,6 +9,7 @@ pub struct NodeDescriptor {
volumes: Vec<String>,
extra_hosts: Vec<String>,
ports: Vec<String>,
api_container_port: u16,
environment: Vec<EnvEntry>,
#[serde(skip_serializing_if = "Option::is_none")]
platform: Option<String>,
@ -49,6 +50,7 @@ impl NodeDescriptor {
volumes: Vec<String>,
extra_hosts: Vec<String>,
ports: Vec<String>,
api_container_port: u16,
environment: Vec<EnvEntry>,
platform: Option<String>,
) -> Self {
@ -59,6 +61,7 @@ impl NodeDescriptor {
volumes,
extra_hosts,
ports,
api_container_port,
environment,
platform,
}
@ -77,4 +80,9 @@ impl NodeDescriptor {
pub fn environment(&self) -> &[EnvEntry] {
&self.environment
}
#[cfg(test)]
pub fn api_container_port(&self) -> u16 {
self.api_container_port
}
}

View File

@ -0,0 +1,227 @@
use std::process::Stdio;
use serde_json::Value;
use testing_framework_core::scenario::DynError;
use tokio::process::Command;
/// Container label key marking a compose service as attachable by the framework.
pub const ATTACHABLE_NODE_LABEL_KEY: &str = "testing-framework.node";
/// Required value of [`ATTACHABLE_NODE_LABEL_KEY`] for a service to be discovered.
pub const ATTACHABLE_NODE_LABEL_VALUE: &str = "true";
/// Container label key carrying the node API's in-container TCP port.
pub const API_CONTAINER_PORT_LABEL_KEY: &str = "testing-framework.api-container-port";
/// One published TCP port of a container: the in-container port together with
/// the host port Docker mapped it to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct MappedTcpPort {
    /// TCP port inside the container (the `8080` of a `8080/tcp` key).
    pub container_port: u16,
    /// Host-side port the container port is published on.
    pub host_port: u16,
}
/// Resolves the single running container backing `service` within the compose
/// project `project`.
///
/// # Errors
/// Fails when `docker ps` cannot be run, when no matching container is
/// running, or when more than one matches — an ambiguous match would make
/// container-level operations unsafe.
pub async fn discover_service_container_id(
    project: &str,
    service: &str,
) -> Result<String, DynError> {
    let project_filter = format!("label=com.docker.compose.project={project}");
    let service_filter = format!("label=com.docker.compose.service={service}");
    let stdout = run_docker_capture([
        "ps",
        "--filter",
        &project_filter,
        "--filter",
        &service_filter,
        "--format",
        "{{.ID}}",
    ])
    .await?;
    let mut candidates = stdout
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty());
    // Exactly one ID is acceptable; zero or several are distinct errors.
    match (candidates.next(), candidates.next()) {
        (Some(id), None) => Ok(id.to_owned()),
        (None, _) => Err(format!(
            "no running container found for compose project '{project}' service '{service}'"
        )
        .into()),
        (Some(_), Some(_)) => Err(format!(
            "multiple running containers found for compose project '{project}' service '{service}'"
        )
        .into()),
    }
}
/// Lists the compose services in `project` that carry the attachable-node
/// label.
///
/// # Errors
/// Fails when docker cannot be queried, or when no service carries the label —
/// attaching to such a project could never resolve any node.
pub async fn discover_attachable_services(project: &str) -> Result<Vec<String>, DynError> {
    let filter = format!("label={ATTACHABLE_NODE_LABEL_KEY}={ATTACHABLE_NODE_LABEL_VALUE}");
    let services = discover_services_with_filters(project, Some(&filter)).await?;
    if services.is_empty() {
        Err(format!(
            "no running compose services with label '{ATTACHABLE_NODE_LABEL_KEY}={ATTACHABLE_NODE_LABEL_VALUE}' found for project '{project}'"
        )
        .into())
    } else {
        Ok(services)
    }
}
/// Reads the container's published TCP port mappings via `docker inspect`.
///
/// # Errors
/// Fails when docker cannot be queried or the inspect payload cannot be
/// parsed as a port map.
pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result<Vec<MappedTcpPort>, DynError> {
    let raw_ports = run_docker_capture([
        "inspect",
        "--format",
        "{{json .NetworkSettings.Ports}}",
        container_id,
    ])
    .await?;
    parse_mapped_tcp_ports(&raw_ports)
}
pub async fn inspect_api_container_port_label(container_id: &str) -> Result<u16, DynError> {
let stdout = run_docker_capture([
"inspect",
"--format",
"{{index .Config.Labels \"testing-framework.api-container-port\"}}",
container_id,
])
.await?;
parse_api_container_port_label(&stdout)
}
/// Parses the JSON emitted by `{{json .NetworkSettings.Ports}}` into the TCP
/// mappings that have at least one host binding, sorted by container port.
///
/// Non-TCP keys, unbound ports, and bindings without a numeric `HostPort`
/// are silently skipped.
///
/// # Errors
/// Fails when `raw` is not valid JSON or the top-level value is not an object.
pub fn parse_mapped_tcp_ports(raw: &str) -> Result<Vec<MappedTcpPort>, DynError> {
    let payload: Value = serde_json::from_str(raw.trim())?;
    let entries = payload
        .as_object()
        .ok_or_else(|| "docker inspect ports payload is not an object".to_owned())?;
    let mut mapped: Vec<MappedTcpPort> = entries
        .iter()
        .filter_map(|(key, bindings)| {
            let container_port = parse_container_port(key)?;
            let host_port = bindings
                .as_array()?
                .iter()
                .find_map(parse_host_port_binding)?;
            Some(MappedTcpPort {
                container_port,
                host_port,
            })
        })
        .collect();
    mapped.sort_by_key(|port| port.container_port);
    Ok(mapped)
}
/// Runs `docker <args>` and returns its captured stdout as a string.
///
/// Stdin is closed and both output streams are captured; a non-zero exit
/// status becomes an error carrying the joined arguments, the status, and the
/// trimmed stderr text.
///
/// # Errors
/// Fails when the process cannot be spawned or exits unsuccessfully.
pub async fn run_docker_capture<const N: usize>(args: [&str; N]) -> Result<String, DynError> {
    let mut docker = Command::new("docker");
    docker
        .args(args)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    let output = docker.output().await?;
    if output.status.success() {
        Ok(String::from_utf8_lossy(&output.stdout).to_string())
    } else {
        let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
        Err(format!(
            "docker {} failed with status {}: {stderr}",
            args.join(" "),
            output.status
        )
        .into())
    }
}
/// Lists distinct compose service names in `project`, optionally narrowed by
/// one extra `docker ps` filter expression.
///
/// # Errors
/// Fails when the docker process cannot be spawned or exits unsuccessfully.
async fn discover_services_with_filters(
    project: &str,
    extra_filter: Option<&str>,
) -> Result<Vec<String>, DynError> {
    let mut args = vec![
        "ps".to_owned(),
        "--filter".to_owned(),
        format!("label=com.docker.compose.project={project}"),
    ];
    if let Some(filter) = extra_filter {
        args.extend(["--filter".to_owned(), filter.to_owned()]);
    }
    args.extend([
        "--format".to_owned(),
        // Print only each container's compose service label, one per line.
        "{{.Label \"com.docker.compose.service\"}}".to_owned(),
    ]);
    let output = Command::new("docker")
        .args(&args)
        .stdin(Stdio::null())
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .output()
        .await?;
    if !output.status.success() {
        return Err(format!(
            "docker {} failed with status {}: {}",
            args.join(" "),
            output.status,
            String::from_utf8_lossy(&output.stderr).trim()
        )
        .into());
    }
    let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
    let mut services: Vec<String> = stdout
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .map(ToOwned::to_owned)
        .collect();
    // Several containers can share a service name; report each name once.
    services.sort();
    services.dedup();
    Ok(services)
}
/// Extracts the numeric port from a docker port-map key such as `8080/tcp`.
///
/// Returns `None` for non-TCP protocols, keys without a `/`, or ports that do
/// not parse as `u16`.
fn parse_container_port(port_key: &str) -> Option<u16> {
    match port_key.split_once('/') {
        Some((port, "tcp")) => port.parse().ok(),
        _ => None,
    }
}
fn parse_host_port_binding(binding: &Value) -> Option<u16> {
binding
.get("HostPort")
.and_then(Value::as_str)?
.parse::<u16>()
.ok()
}
/// Parses the api-container-port label value read from `docker inspect`.
///
/// Docker prints `<no value>` for an absent label, so both that sentinel and
/// an empty string are treated as "label missing".
///
/// # Errors
/// Fails when the label is missing or its value is not a valid `u16` port.
fn parse_api_container_port_label(raw: &str) -> Result<u16, DynError> {
    let value = raw.trim();
    match value {
        "" | "<no value>" => Err(format!(
            "attached compose container is missing required label '{API_CONTAINER_PORT_LABEL_KEY}'"
        )
        .into()),
        _ => value.parse::<u16>().map_err(|err| {
            format!(
                "attached compose container label '{API_CONTAINER_PORT_LABEL_KEY}' has invalid value '{value}': {err}"
            )
            .into()
        }),
    }
}

View File

@ -7,13 +7,21 @@ use testing_framework_core::{
adjust_timeout,
scenario::{Application, DynError, NodeControlHandle},
};
use tokio::process::Command;
use tokio::{process::Command, time::timeout};
use tracing::info;
use crate::{docker::commands::run_docker_command, errors::ComposeRunnerError};
use crate::{
docker::{
attached::discover_service_container_id,
commands::{ComposeCommandError, run_docker_command},
},
errors::ComposeRunnerError,
};
const COMPOSE_RESTART_TIMEOUT: Duration = Duration::from_secs(120);
const COMPOSE_RESTART_DESCRIPTION: &str = "docker compose restart";
const DOCKER_CONTAINER_RESTART_DESCRIPTION: &str = "docker container restart";
const DOCKER_CONTAINER_STOP_DESCRIPTION: &str = "docker container stop";
pub async fn restart_compose_service(
compose_file: &Path,
@ -38,6 +46,50 @@ pub async fn restart_compose_service(
.map_err(ComposeRunnerError::Compose)
}
/// Restarts the attached compose service `service` by restarting its
/// container directly (`docker container restart`).
///
/// # Errors
/// Fails when the service's container cannot be uniquely resolved, or when
/// the restart command fails or times out.
pub async fn restart_attached_compose_service(
    project_name: &str,
    service: &str,
) -> Result<(), DynError> {
    attached_container_action(
        project_name,
        service,
        "restart",
        DOCKER_CONTAINER_RESTART_DESCRIPTION,
        "restarting attached compose service",
    )
    .await
}

/// Stops the attached compose service `service` by stopping its container
/// directly (`docker container stop`).
///
/// # Errors
/// Fails when the service's container cannot be uniquely resolved, or when
/// the stop command fails or times out.
pub async fn stop_attached_compose_service(
    project_name: &str,
    service: &str,
) -> Result<(), DynError> {
    attached_container_action(
        project_name,
        service,
        "stop",
        DOCKER_CONTAINER_STOP_DESCRIPTION,
        "stopping attached compose service",
    )
    .await
}

/// Shared body for attached-mode restart/stop (the two public functions were
/// previously line-for-line duplicates): resolve the service's single running
/// container, log the action, then run `docker <action> <id>` under the
/// shared restart timeout.
async fn attached_container_action(
    project_name: &str,
    service: &str,
    action: &str,
    description: &str,
    log_message: &str,
) -> Result<(), DynError> {
    let container_id = discover_service_container_id(project_name, service).await?;
    let command = docker_container_command(action, &container_id);
    info!(
        service,
        project = project_name,
        container = container_id,
        "{log_message}"
    );
    run_docker_action(command, description, adjust_timeout(COMPOSE_RESTART_TIMEOUT)).await
}
fn compose_restart_command(compose_file: &Path, project_name: &str, service: &str) -> Command {
let mut command = Command::new("docker");
command
@ -51,6 +103,43 @@ fn compose_restart_command(compose_file: &Path, project_name: &str, service: &st
command
}
/// Builds a `docker <action> <container_id>` invocation (e.g. restart, stop).
fn docker_container_command(action: &str, container_id: &str) -> Command {
    let mut command = Command::new("docker");
    command.args([action, container_id]);
    command
}
/// Runs a docker command to completion, treating a non-zero exit status, a
/// spawn failure, and an elapsed `timeout_duration` as distinct errors tagged
/// with `description`.
async fn run_docker_action(
    mut command: Command,
    description: &str,
    timeout_duration: Duration,
) -> Result<(), DynError> {
    let output = match timeout(timeout_duration, command.output()).await {
        // The command did not finish in time; surface it as a compose timeout.
        Err(_) => {
            return Err(ComposeCommandError::Timeout {
                command: description.to_owned(),
                timeout: timeout_duration,
            }
            .into());
        }
        Ok(Err(source)) => return Err(format!("{description} failed to spawn: {source}").into()),
        Ok(Ok(output)) => output,
    };
    if output.status.success() {
        Ok(())
    } else {
        let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
        Err(format!(
            "{description} failed with status {}: {stderr}",
            output.status
        )
        .into())
    }
}
/// Compose-specific node control handle for restarting nodes.
pub struct ComposeNodeControl {
pub(crate) compose_file: PathBuf,
@ -65,3 +154,23 @@ impl<E: Application> NodeControlHandle<E> for ComposeNodeControl {
.map_err(|err| format!("node restart failed: {err}").into())
}
}
/// Node control handle for compose attached mode.
///
/// Unlike the compose-file-based handle, this one carries only the attached
/// project's name and resolves the target container at call time, so the node
/// `name` passed to each method must match the compose service name.
pub struct ComposeAttachedNodeControl {
    // Compose project name of the externally-managed deployment we attached to.
    pub(crate) project_name: String,
}

#[async_trait::async_trait]
impl<E: Application> NodeControlHandle<E> for ComposeAttachedNodeControl {
    // Restart the node's container via `docker container restart`, wrapping
    // any failure with the service name for context.
    async fn restart_node(&self, name: &str) -> Result<(), DynError> {
        restart_attached_compose_service(&self.project_name, name)
            .await
            .map_err(|source| format!("node restart failed for service '{name}': {source}").into())
    }

    // Stop the node's container via `docker container stop`, wrapping any
    // failure with the service name for context.
    async fn stop_node(&self, name: &str) -> Result<(), DynError> {
        stop_attached_compose_service(&self.project_name, name)
            .await
            .map_err(|source| format!("node stop failed for service '{name}': {source}").into())
    }
}

View File

@ -1,3 +1,4 @@
pub mod attached;
pub mod commands;
pub mod control;
pub mod platform;

View File

@ -6,7 +6,7 @@ pub mod errors;
pub mod infrastructure;
pub mod lifecycle;
pub use deployer::ComposeDeployer;
pub use deployer::{ComposeDeployer, ComposeDeploymentMetadata};
pub use descriptor::{ComposeDescriptor, EnvEntry, NodeDescriptor};
pub use docker::{
commands::{ComposeCommandError, compose_down, compose_up, dump_compose_logs},