diff --git a/logos/examples/tests/compose_attach_node_control.rs b/logos/examples/tests/compose_attach_node_control.rs new file mode 100644 index 0000000..4a85422 --- /dev/null +++ b/logos/examples/tests/compose_attach_node_control.rs @@ -0,0 +1,196 @@ +use std::{ + env, + process::{Command, Stdio}, + thread, + time::Duration, +}; + +use anyhow::{Result, anyhow}; +use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder}; +use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner}; + +#[tokio::test] +async fn compose_attach_mode_restart_node_opt_in() -> Result<()> { + if env::var("TF_RUN_COMPOSE_ATTACH_NODE_CONTROL").is_err() { + return Ok(()); + } + + let managed = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) + .enable_node_control() + .with_run_duration(Duration::from_secs(5)) + .build()?; + + let deployer = LbcComposeDeployer::default(); + let managed_runner: Runner = deployer.deploy(&managed).await?; + let managed_client = managed_runner + .context() + .node_clients() + .snapshot() + .into_iter() + .next() + .ok_or_else(|| anyhow!("managed compose runner returned no node clients"))?; + let api_port = managed_client + .base_url() + .port() + .ok_or_else(|| anyhow!("managed node base url has no port"))?; + + let project_name = discover_compose_project_by_published_port(api_port)?; + + let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1)) + .enable_node_control() + .with_run_duration(Duration::from_secs(5)) + .with_attach_source( + AttachSource::compose(vec!["node-0".to_owned()]).with_project(project_name.clone()), + ) + .build()?; + + let attached_runner: Runner = deployer.deploy(&attached).await?; + let pre_restart_container = discover_compose_service_container(&project_name, "node-0")?; + let pre_restart_started_at = inspect_container_started_at(&pre_restart_container)?; + let control = attached_runner + .context() + .node_control() + .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?; + control + .restart_node("node-0") + .await + .map_err(|err| anyhow!("attached restart failed: {err}"))?; + + wait_until_container_restarted( + &project_name, + "node-0", + &pre_restart_started_at, + Duration::from_secs(30), + )?; + + Ok(()) +} + +fn discover_compose_project_by_published_port(port: u16) -> Result { + let container_ids = run_docker_capture(["ps", "-q"])?; + let host_port_token = format!("\"HostPort\":\"{port}\""); + let mut matching_projects = Vec::new(); + + for container_id in container_ids + .lines() + .map(str::trim) + .filter(|id| !id.is_empty()) + { + let ports = run_docker_capture([ + "inspect", + "--format", + "{{json .NetworkSettings.Ports}}", + container_id, + ])?; + if !ports.contains(&host_port_token) { + continue; + } + + let project = run_docker_capture([ + "inspect", + "--format", + "{{ index .Config.Labels \"com.docker.compose.project\" }}", + container_id, + ])?; + let project = project.trim(); + if !project.is_empty() { + matching_projects.push(project.to_owned()); + } + } + + match matching_projects.as_slice() { + [project] => Ok(project.clone()), + [] => Err(anyhow!( + "no compose project found exposing api host port {port}" + )), + _ => Err(anyhow!( + "multiple compose projects expose api host port {port}: {:?}", + matching_projects + )), + } +} + +fn discover_compose_service_container(project: &str, service: &str) -> Result { + let container = run_docker_capture([ + "ps", + "--filter", + &format!("label=com.docker.compose.project={project}"), + "--filter", + &format!("label=com.docker.compose.service={service}"), + "--format", + "{{.ID}}", + ])?; + let mut lines = container + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()); + let Some(container_id) = lines.next() else { + return Err(anyhow!( + "no running container found for compose project '{project}' service '{service}'" + )); + }; + if lines.next().is_some() { + return Err(anyhow!( + "multiple running containers found for compose project '{project}' service '{service}'" + )); + } + + Ok(container_id.to_owned()) +} + +fn inspect_container_started_at(container_id: &str) -> Result { + let started_at = + run_docker_capture(["inspect", "--format", "{{.State.StartedAt}}", container_id])?; + let started_at = started_at.trim(); + if started_at.is_empty() { + return Err(anyhow!( + "docker inspect returned empty StartedAt for container {container_id}" + )); + } + + Ok(started_at.to_owned()) +} + +fn wait_until_container_restarted( + project: &str, + service: &str, + previous_started_at: &str, + timeout: Duration, +) -> Result<()> { + let deadline = std::time::Instant::now() + timeout; + loop { + let container_id = discover_compose_service_container(project, service)?; + let started_at = inspect_container_started_at(&container_id)?; + if started_at != previous_started_at { + return Ok(()); + } + + if std::time::Instant::now() >= deadline { + return Err(anyhow!( + "timed out waiting for restarted container timestamp change: {project}/{service}" + )); + } + + thread::sleep(Duration::from_millis(500)); + } +} + +fn run_docker_capture(args: [&str; N]) -> Result { + let output = Command::new("docker") + .args(args) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .output()?; + + if !output.status.success() { + return Err(anyhow!( + "docker {} failed: status={} stderr={}", + args.join(" "), + output.status, + String::from_utf8_lossy(&output.stderr).trim() + )); + } + + Ok(String::from_utf8_lossy(&output.stdout).to_string()) +} diff --git a/logos/examples/tests/node_config_override.rs b/logos/examples/tests/node_config_override.rs index 5761eae..22da1db 100644 --- a/logos/examples/tests/node_config_override.rs +++ b/logos/examples/tests/node_config_override.rs @@ -103,6 +103,110 @@ async fn scenario_builder_api_port_override() -> Result<()> { Ok(()) } +#[tokio::test] +#[ignore = "run manually with `cargo test -p runner-examples -- --ignored scenario_builder_identify_hide_listen_addrs_override`"] +async fn scenario_builder_identify_hide_listen_addrs_override() -> Result<()> { + let _ = try_init(); + // Required env vars (set on the command line when running this test): + // - `POL_PROOF_DEV_MODE=true` + // - `LOGOS_BLOCKCHAIN_NODE_BIN=...` + // - `LOGOS_BLOCKCHAIN_CIRCUITS=...` + // - `RUST_LOG=info` (optional) + let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(1)); + let base_descriptors = base_builder.clone().build()?; + let base_node = base_descriptors.nodes().first().expect("node 0 descriptor"); + let mut run_config = build_node_run_config( + &base_descriptors, + base_node, + base_descriptors + .config() + .node_config_override(base_node.index()), + ) + .expect("build run config"); + + run_config + .user + .network + .backend + .swarm + .identify + .hide_listen_addrs = Some(true); + + let mut scenario = ScenarioBuilder::new(Box::new( + base_builder.with_node_config_override(0, run_config), + )) + .with_run_duration(Duration::from_secs(1)) + .build()?; + + let deployer = LbcLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + let handle = runner.run(&mut scenario).await?; + + let client = handle + .context() + .random_node_client() + .ok_or_else(|| anyhow::anyhow!("scenario did not expose any node clients"))?; + + client + .consensus_info() + .await + .expect("consensus_info should succeed"); + + Ok(()) +} + +#[tokio::test] +#[ignore = "run manually with `cargo test -p runner-examples -- --ignored scenario_builder_identify_hide_listen_addrs_two_nodes`"] +async fn scenario_builder_identify_hide_listen_addrs_two_nodes() -> Result<()> { + let _ = try_init(); + // Required env vars (set on the command line when running this test): + // - `POL_PROOF_DEV_MODE=true` + // - `LOGOS_BLOCKCHAIN_NODE_BIN=...` + // - `LOGOS_BLOCKCHAIN_CIRCUITS=...` + // - `RUST_LOG=info` (optional) + let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)); + let base_descriptors = base_builder.clone().build()?; + + let mut deployment_builder = base_builder; + for idx in 0..2 { + let node = &base_descriptors.nodes()[idx]; + let mut run_config = build_node_run_config( + &base_descriptors, + node, + base_descriptors.config().node_config_override(node.index()), + ) + .expect("build run config"); + run_config + .user + .network + .backend + .swarm + .identify + .hide_listen_addrs = Some(true); + deployment_builder = deployment_builder.with_node_config_override(idx, run_config); + } + + let mut scenario = ScenarioBuilder::new(Box::new(deployment_builder)) + .with_run_duration(Duration::from_secs(3)) + .build()?; + + let deployer = LbcLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + let handle = runner.run(&mut scenario).await?; + + let client = handle + .context() + .random_node_client() + .ok_or_else(|| anyhow::anyhow!("scenario did not expose any node clients"))?; + + client + .consensus_info() + .await + .expect("consensus_info should succeed"); + + Ok(()) +} + fn random_api_port() -> u16 { let listener = TcpListener::bind("127.0.0.1:0").expect("bind random API port"); listener.local_addr().expect("read API port").port() diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index ffa3aa3..411bdf1 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -37,7 +37,8 @@ pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ - BorrowedNode, BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, + ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, BorrowedNode, + BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, HttpReadinessRequirement, ManagedNode, ManagedSource, NodeClients, NodeHandle, NodeInventory, ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError, SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider, @@ -46,9 +47,10 @@ pub use runtime::{ CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError, PrometheusEndpoint, PrometheusInstantSample, }, - orchestrate_sources, resolve_sources, spawn_feed, wait_for_http_ports, - wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, - wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, + orchestrate_sources, orchestrate_sources_with_providers, resolve_sources, spawn_feed, + wait_for_http_ports, wait_for_http_ports_with_host, + wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, + wait_http_readiness, wait_until_stable, }; pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy}; pub use workload::Workload; diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 7e0804f..97bd2d6 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -16,10 +16,13 @@ pub use node_clients::NodeClients; #[doc(hidden)] pub use orchestration::{ ManagedSource, SourceOrchestrationPlan, build_source_orchestration_plan, orchestrate_sources, - resolve_sources, + orchestrate_sources_with_providers, resolve_sources, }; #[doc(hidden)] -pub use providers::{SourceProviders, StaticManagedProvider}; +pub use providers::{ + ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, + SourceProviders, StaticManagedProvider, +}; pub use readiness::{ HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, diff --git a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs index cb7d5b2..9e71458 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs @@ -7,4 +7,7 @@ pub use source_orchestration_plan::{ ManagedSource, SourceModeName, SourceOrchestrationMode, SourceOrchestrationPlan, SourceOrchestrationPlanError, }; -pub use source_resolver::{build_source_orchestration_plan, orchestrate_sources, resolve_sources}; +pub use source_resolver::{ + build_source_orchestration_plan, orchestrate_sources, orchestrate_sources_with_providers, + resolve_sources, +}; diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs index e88857b..dd57ee9 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs @@ -65,13 +65,10 @@ impl SourceOrchestrationPlan { ) -> Result { let mode = mode_from_sources(sources); - let plan = Self { + Ok(Self { mode, readiness_policy, - }; - - plan.ensure_currently_wired()?; - Ok(plan) + }) } #[must_use] @@ -82,13 +79,24 @@ impl SourceOrchestrationPlan { | SourceOrchestrationMode::ExternalOnly { external } => external, } } +} - fn ensure_currently_wired(&self) -> Result<(), SourceOrchestrationPlanError> { - match self.mode { - SourceOrchestrationMode::Managed { .. } - | SourceOrchestrationMode::ExternalOnly { .. } => Ok(()), - SourceOrchestrationMode::Attached { .. } => not_wired(SourceModeName::Attached), - } +#[cfg(test)] +mod tests { + use super::{SourceOrchestrationMode, SourceOrchestrationPlan}; + use crate::scenario::{AttachSource, ScenarioSources, SourceReadinessPolicy}; + + #[test] + fn attached_sources_are_planned() { + let sources = ScenarioSources::attached(AttachSource::compose(vec!["node-0".to_string()])); + let plan = + SourceOrchestrationPlan::try_from_sources(&sources, SourceReadinessPolicy::AllReady) + .expect("attached sources should build a source orchestration plan"); + + assert!(matches!( + plan.mode, + SourceOrchestrationMode::Attached { .. } + )); } } @@ -107,7 +115,3 @@ fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode { }, } } - -fn not_wired(mode: SourceModeName) -> Result<(), SourceOrchestrationPlanError> { - Err(SourceOrchestrationPlanError::SourceModeNotWiredYet { mode }) -} diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs index 80fce0b..6ebd646 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs @@ -91,8 +91,7 @@ pub async fn resolve_sources( /// - Managed mode is backed by prebuilt deployer-managed clients via /// `StaticManagedProvider`. /// - External nodes are resolved via `Application::external_node_client`. -/// - Attached mode remains blocked at plan validation until attach providers -/// are fully wired. +/// - Attached nodes are discovered through the selected attach provider. pub async fn orchestrate_sources( plan: &SourceOrchestrationPlan, node_clients: NodeClients, @@ -103,6 +102,17 @@ pub async fn orchestrate_sources( ))) .with_external(Arc::new(ApplicationExternalProvider)); + orchestrate_sources_with_providers(plan, providers).await +} + +/// Orchestrates scenario sources with caller-supplied provider set. +/// +/// Deployer runtimes can use this to inject attach/external providers with +/// backend-specific discovery and control semantics. +pub async fn orchestrate_sources_with_providers( + plan: &SourceOrchestrationPlan, + providers: SourceProviders, +) -> Result, DynError> { let resolved = resolve_sources(plan, &providers).await?; if matches!(plan.mode, SourceOrchestrationMode::Managed { .. }) && resolved.managed.is_empty() { diff --git a/testing-framework/core/src/scenario/runtime/providers/mod.rs b/testing-framework/core/src/scenario/runtime/providers/mod.rs index 60edaae..0d5a1a3 100644 --- a/testing-framework/core/src/scenario/runtime/providers/mod.rs +++ b/testing-framework/core/src/scenario/runtime/providers/mod.rs @@ -7,7 +7,7 @@ mod managed_provider; #[allow(dead_code)] mod source_providers; -pub use attach_provider::{AttachProviderError, AttachedNode}; +pub use attach_provider::{AttachProvider, AttachProviderError, AttachedNode}; pub use external_provider::{ApplicationExternalProvider, ExternalNode, ExternalProviderError}; pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider}; pub use source_providers::SourceProviders; diff --git a/testing-framework/deployers/compose/Cargo.toml b/testing-framework/deployers/compose/Cargo.toml index 4880401..9ac8b7a 100644 --- a/testing-framework/deployers/compose/Cargo.toml +++ b/testing-framework/deployers/compose/Cargo.toml @@ -17,6 +17,7 @@ anyhow = "1" async-trait = { workspace = true } reqwest = { features = ["json"], workspace = true } serde = { features = ["derive"], workspace = true } +serde_json = { workspace = true } tempfile = { workspace = true } tera = "1.19" testing-framework-core = { path = "../../core" } @@ -30,5 +31,4 @@ uuid = { features = ["v4"], version = "1" } [dev-dependencies] groth16 = { workspace = true } key-management-system-service = { workspace = true } -serde_json = { workspace = true } zksign = { workspace = true } diff --git a/testing-framework/deployers/compose/src/deployer/attach_provider.rs b/testing-framework/deployers/compose/src/deployer/attach_provider.rs new file mode 100644 index 0000000..9086b76 --- /dev/null +++ b/testing-framework/deployers/compose/src/deployer/attach_provider.rs @@ -0,0 +1,146 @@ +use std::marker::PhantomData; + +use async_trait::async_trait; +use testing_framework_core::scenario::{ + AttachProvider, AttachProviderError, AttachSource, AttachedNode, DynError, ExternalNodeSource, +}; +use url::Url; + +use crate::{ + docker::attached::{discover_service_container_id, inspect_mapped_tcp_ports}, + env::ComposeDeployEnv, +}; + +pub(super) struct ComposeAttachProvider { + host: String, + _env: PhantomData, +} + +impl ComposeAttachProvider { + pub(super) fn new(host: String) -> Self { + Self { + host, + _env: PhantomData, + } + } +} + +#[async_trait] +impl AttachProvider for ComposeAttachProvider { + async fn discover( + &self, + source: &AttachSource, + ) -> Result>, AttachProviderError> { + let (project, services) = match source { + AttachSource::Compose { project, services } => (project, services), + _ => { + return Err(AttachProviderError::UnsupportedSource { + attach_source: source.clone(), + }); + } + }; + + let project = project + .as_ref() + .ok_or_else(|| AttachProviderError::Discovery { + source: "compose attach source requires an explicit project name".into(), + })?; + + if services.is_empty() { + return Err(AttachProviderError::Discovery { + source: "compose attach source requires at least one service name".into(), + }); + } + + let mut attached = Vec::with_capacity(services.len()); + for service in services { + let container_id = discover_service_container_id(project, service) + .await + .map_err(to_discovery_error)?; + let api_port = discover_api_port(&container_id) + .await + .map_err(to_discovery_error)?; + let endpoint = + build_service_endpoint(&self.host, api_port).map_err(to_discovery_error)?; + let source = ExternalNodeSource::new(service.clone(), endpoint.to_string()); + let client = E::external_node_client(&source).map_err(to_discovery_error)?; + + attached.push(AttachedNode { + identity_hint: Some(service.clone()), + client, + }); + } + + Ok(attached) + } +} + +fn to_discovery_error(source: DynError) -> AttachProviderError { + AttachProviderError::Discovery { source } +} + +async fn discover_api_port(container_id: &str) -> Result { + let mapped_ports = inspect_mapped_tcp_ports(container_id).await?; + match mapped_ports.as_slice() { + [] => Err(format!( + "no mapped tcp ports discovered for attached compose service container '{container_id}'" + ) + .into()), + [port] => Ok(port.host_port), + _ => { + let mapped_ports = mapped_ports + .iter() + .map(|port| format!("{}->{}", port.container_port, port.host_port)) + .collect::>() + .join(", "); + + Err(format!( + "attached compose service container '{container_id}' has multiple mapped tcp ports ({mapped_ports}); provide a single exposed API port" + ) + .into()) + } + } +} + +fn build_service_endpoint(host: &str, port: u16) -> Result { + let endpoint = Url::parse(&format!("http://{host}:{port}/"))?; + Ok(endpoint) +} + +#[cfg(test)] +mod tests { + use super::build_service_endpoint; + use crate::docker::attached::parse_mapped_tcp_ports; + + #[test] + fn parse_mapped_tcp_ports_skips_non_tcp_and_invalid_keys() { + let raw = r#"{ + "18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}], + "9999/udp":[{"HostIp":"0.0.0.0","HostPort":"39999"}], + "invalid":[{"HostIp":"0.0.0.0","HostPort":"12345"}] + }"#; + + let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse"); + assert_eq!(mapped.len(), 1); + assert_eq!(mapped[0].container_port, 18018); + assert_eq!(mapped[0].host_port, 32001); + } + + #[test] + fn parse_mapped_tcp_ports_returns_sorted_ports() { + let raw = r#"{ + "18019/tcp":[{"HostIp":"0.0.0.0","HostPort":"32002"}], + "18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}] + }"#; + + let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse"); + assert_eq!(mapped[0].container_port, 18018); + assert_eq!(mapped[1].container_port, 18019); + } + + #[test] + fn build_service_endpoint_formats_http_url() { + let endpoint = build_service_endpoint("127.0.0.1", 32001).expect("endpoint should parse"); + assert_eq!(endpoint.as_str(), "http://127.0.0.1:32001/"); + } +} diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index e32acae..6e45367 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -1,3 +1,4 @@ +mod attach_provider; pub mod clients; pub mod orchestrator; pub mod ports; diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 6585136..66cb9e6 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -3,10 +3,11 @@ use std::{env, sync::Arc, time::Duration}; use reqwest::Url; use testing_framework_core::{ scenario::{ - DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, - NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, RunContext, Runner, Scenario, build_source_orchestration_plan, - orchestrate_sources, + ApplicationExternalProvider, AttachSource, CleanupGuard, DeploymentPolicy, FeedHandle, + FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, + Runner, Scenario, ScenarioSources, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -14,6 +15,7 @@ use tracing::info; use super::{ ComposeDeployer, + attach_provider::ComposeAttachProvider, clients::ClientBuilder, make_cleanup_guard, ports::PortManager, @@ -21,13 +23,14 @@ use super::{ setup::{DeploymentContext, DeploymentSetup}, }; use crate::{ - docker::control::ComposeNodeControl, + docker::control::{ComposeAttachedNodeControl, ComposeNodeControl}, env::ComposeDeployEnv, errors::ComposeRunnerError, infrastructure::{ environment::StackEnvironment, ports::{HostPortMapping, compose_runner_host}, }, + lifecycle::block_feed::spawn_block_feed_with_retry, }; const PRINT_ENDPOINTS_ENV: &str = "TESTNET_PRINT_ENDPOINTS"; @@ -55,6 +58,12 @@ impl DeploymentOrchestrator { } })?; + if scenario.sources().is_attached() { + return self + .deploy_attached_only::(scenario, source_plan) + .await; + } + let deployment = scenario.deployment(); let setup = DeploymentSetup::::new(deployment); setup.validate_environment().await?; @@ -81,7 +90,16 @@ impl DeploymentOrchestrator { .await?; // Source orchestration currently runs here after managed clients are prepared. - deployed.node_clients = orchestrate_sources(&source_plan, deployed.node_clients) + let source_providers = SourceProviders::default() + .with_managed(Arc::new(StaticManagedProvider::new( + deployed.node_clients.snapshot(), + ))) + .with_attach(Arc::new(ComposeAttachProvider::::new( + compose_runner_host(), + ))) + .with_external(Arc::new(ApplicationExternalProvider)); + + deployed.node_clients = orchestrate_sources_with_providers(&source_plan, source_providers) .await .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; @@ -106,6 +124,83 @@ impl DeploymentOrchestrator { Ok(runner) } + async fn deploy_attached_only( + &self, + scenario: &Scenario, + source_plan: testing_framework_core::scenario::SourceOrchestrationPlan, + ) -> Result, ComposeRunnerError> + where + Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, + { + let observability = resolve_observability_inputs(scenario)?; + let source_providers = SourceProviders::default() + .with_managed(Arc::new(StaticManagedProvider::new(Vec::new()))) + .with_attach(Arc::new(ComposeAttachProvider::::new( + compose_runner_host(), + ))) + .with_external(Arc::new(ApplicationExternalProvider)); + let node_clients = orchestrate_sources_with_providers(&source_plan, source_providers) + .await + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + + if node_clients.is_empty() { + return Err(ComposeRunnerError::RuntimePreflight); + } + + let node_control = self.attached_node_control::(scenario)?; + let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; + let context = RunContext::new( + scenario.deployment().clone(), + node_clients, + scenario.duration(), + scenario.expectation_cooldown(), + observability.telemetry_handle()?, + feed, + node_control, + ); + + let cleanup_guard: Box = Box::new(feed_task); + Ok(Runner::new(context, Some(cleanup_guard))) + } + + fn attached_node_control( + &self, + scenario: &Scenario, + ) -> Result>>, ComposeRunnerError> + where + Caps: RequiresNodeControl + Send + Sync, + { + if !Caps::REQUIRED { + return Ok(None); + } + + let ScenarioSources::Attached { attach, .. } = scenario.sources() else { + return Err(ComposeRunnerError::InternalInvariant { + message: "attached node control requested outside attached source mode", + }); + }; + + let AttachSource::Compose { project, .. } = attach else { + return Err(ComposeRunnerError::InternalInvariant { + message: "compose deployer requires compose attach source for node control", + }); + }; + + let Some(project_name) = project + .as_ref() + .map(|value| value.trim()) + .filter(|value| !value.is_empty()) + else { + return Err(ComposeRunnerError::InternalInvariant { + message: "attached compose mode requires explicit project name for node control", + }); + }; + + Ok(Some(Arc::new(ComposeAttachedNodeControl { + project_name: project_name.to_owned(), + }) as Arc>)) + } + async fn build_runner( &self, scenario: &Scenario, diff --git a/testing-framework/deployers/compose/src/docker/attached.rs b/testing-framework/deployers/compose/src/docker/attached.rs new file mode 100644 index 0000000..e47cb2e --- /dev/null +++ b/testing-framework/deployers/compose/src/docker/attached.rs @@ -0,0 +1,129 @@ +use std::process::Stdio; + +use serde_json::Value; +use testing_framework_core::scenario::DynError; +use tokio::process::Command; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct MappedTcpPort { + pub container_port: u16, + pub host_port: u16, +} + +pub async fn discover_service_container_id( + project: &str, + service: &str, +) -> Result { + let stdout = run_docker_capture([ + "ps", + "--filter", + &format!("label=com.docker.compose.project={project}"), + "--filter", + &format!("label=com.docker.compose.service={service}"), + "--format", + "{{.ID}}", + ]) + .await?; + + let ids: Vec = stdout + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .map(ToOwned::to_owned) + .collect(); + + match ids.as_slice() { + [id] => Ok(id.clone()), + [] => Err(format!( + "no running container found for compose project '{project}' service '{service}'" + ) + .into()), + _ => Err(format!( + "multiple running containers found for compose project '{project}' service '{service}'" + ) + .into()), + } +} + +pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result, DynError> { + let stdout = run_docker_capture([ + "inspect", + "--format", + "{{json .NetworkSettings.Ports}}", + container_id, + ]) + .await?; + + parse_mapped_tcp_ports(&stdout) +} + +pub fn parse_mapped_tcp_ports(raw: &str) -> Result, DynError> { + let ports_value: Value = serde_json::from_str(raw.trim())?; + let ports_object = ports_value + .as_object() + .ok_or_else(|| "docker inspect ports payload is not an object".to_owned())?; + + let mut mapped = Vec::new(); + for (container_port_key, bindings) in ports_object { + let Some(container_port) = parse_container_port(container_port_key) else { + continue; + }; + + let Some(bindings_array) = bindings.as_array() else { + continue; + }; + + let Some(host_port) = bindings_array.iter().find_map(parse_host_port_binding) else { + continue; + }; + + mapped.push(MappedTcpPort { + container_port, + host_port, + }); + } + + mapped.sort_by_key(|port| port.container_port); + + Ok(mapped) +} + +pub async fn run_docker_capture(args: [&str; N]) -> Result { + let output = Command::new("docker") + .args(args) + .stdin(Stdio::null()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .output() + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + + return Err(format!( + "docker {} failed with status {}: {stderr}", + args.join(" "), + output.status + ) + .into()); + } + + Ok(String::from_utf8_lossy(&output.stdout).to_string()) +} + +fn parse_container_port(port_key: &str) -> Option { + let (port, proto) = port_key.split_once('/')?; + if proto != "tcp" { + return None; + } + + port.parse::().ok() +} + +fn parse_host_port_binding(binding: &Value) -> Option { + binding + .get("HostPort") + .and_then(Value::as_str)? + .parse::() + .ok() +} diff --git a/testing-framework/deployers/compose/src/docker/control.rs b/testing-framework/deployers/compose/src/docker/control.rs index 0541e42..8e98948 100644 --- a/testing-framework/deployers/compose/src/docker/control.rs +++ b/testing-framework/deployers/compose/src/docker/control.rs @@ -7,13 +7,21 @@ use testing_framework_core::{ adjust_timeout, scenario::{Application, DynError, NodeControlHandle}, }; -use tokio::process::Command; +use tokio::{process::Command, time::timeout}; use tracing::info; -use crate::{docker::commands::run_docker_command, errors::ComposeRunnerError}; +use crate::{ + docker::{ + attached::discover_service_container_id, + commands::{ComposeCommandError, run_docker_command}, + }, + errors::ComposeRunnerError, +}; const COMPOSE_RESTART_TIMEOUT: Duration = Duration::from_secs(120); const COMPOSE_RESTART_DESCRIPTION: &str = "docker compose restart"; +const DOCKER_CONTAINER_RESTART_DESCRIPTION: &str = "docker container restart"; +const DOCKER_CONTAINER_STOP_DESCRIPTION: &str = "docker container stop"; pub async fn restart_compose_service( compose_file: &Path, @@ -38,6 +46,50 @@ pub async fn restart_compose_service( .map_err(ComposeRunnerError::Compose) } +pub async fn restart_attached_compose_service( + project_name: &str, + service: &str, +) -> Result<(), DynError> { + let container_id = discover_service_container_id(project_name, service).await?; + let command = docker_container_command("restart", &container_id); + + info!( + service, + project = project_name, + container = container_id, + "restarting attached compose service" + ); + + run_docker_action( + command, + DOCKER_CONTAINER_RESTART_DESCRIPTION, + adjust_timeout(COMPOSE_RESTART_TIMEOUT), + ) + .await +} + +pub async fn stop_attached_compose_service( + project_name: &str, + service: &str, +) -> Result<(), DynError> { + let container_id = discover_service_container_id(project_name, service).await?; + let command = docker_container_command("stop", &container_id); + + info!( + service, + project = project_name, + container = container_id, + "stopping attached compose service" + ); + + run_docker_action( + command, + DOCKER_CONTAINER_STOP_DESCRIPTION, + adjust_timeout(COMPOSE_RESTART_TIMEOUT), + ) + .await +} + fn compose_restart_command(compose_file: &Path, project_name: &str, service: &str) -> Command { let mut command = Command::new("docker"); command @@ -51,6 +103,43 @@ fn compose_restart_command(compose_file: &Path, project_name: &str, service: &st command } +fn docker_container_command(action: &str, container_id: &str) -> Command { + let mut command = Command::new("docker"); + command.arg(action).arg(container_id); + command +} + +async fn run_docker_action( + mut command: Command, + description: &str, + timeout_duration: Duration, +) -> Result<(), DynError> { + match timeout(timeout_duration, command.output()).await { + Ok(Ok(output)) => { + if output.status.success() { + return Ok(()); + } + + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + + Err(format!( + "{description} failed with status {}: {stderr}", + output.status + ) + .into()) + } + Ok(Err(source)) => Err(format!("{description} failed to spawn: {source}").into()), + Err(_) => { + let compose_timeout = ComposeCommandError::Timeout { + command: description.to_owned(), + timeout: timeout_duration, + }; + + Err(compose_timeout.into()) + } + } +} + /// Compose-specific node control handle for restarting nodes. pub struct ComposeNodeControl { pub(crate) compose_file: PathBuf, @@ -65,3 +154,23 @@ impl NodeControlHandle for ComposeNodeControl { .map_err(|err| format!("node restart failed: {err}").into()) } } + +/// Node control handle for compose attached mode. +pub struct ComposeAttachedNodeControl { + pub(crate) project_name: String, +} + +#[async_trait::async_trait] +impl NodeControlHandle for ComposeAttachedNodeControl { + async fn restart_node(&self, name: &str) -> Result<(), DynError> { + restart_attached_compose_service(&self.project_name, name) + .await + .map_err(|source| format!("node restart failed for service '{name}': {source}").into()) + } + + async fn stop_node(&self, name: &str) -> Result<(), DynError> { + stop_attached_compose_service(&self.project_name, name) + .await + .map_err(|source| format!("node stop failed for service '{name}': {source}").into()) + } +} diff --git a/testing-framework/deployers/compose/src/docker/mod.rs b/testing-framework/deployers/compose/src/docker/mod.rs index 3696492..c1b4581 100644 --- a/testing-framework/deployers/compose/src/docker/mod.rs +++ b/testing-framework/deployers/compose/src/docker/mod.rs @@ -1,3 +1,4 @@ +pub mod attached; pub mod commands; pub mod control; pub mod platform;