// NOTE(review): in this copy of the file the angle-bracket generic lists look
// stripped (e.g. `Result, ComposeRunnerError>`, `DeploymentSetup::::new`,
// `Option>>`, `feed: ::Feed`). The code below is kept token-for-token as found;
// confirm against the repository and restore the generics before building.

use std::{env, sync::Arc, time::Duration};

use reqwest::Url;
use testing_framework_core::{
    scenario::{
        ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle,
        DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics,
        NodeClients, NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs,
        RequiresNodeControl, RunContext, Runner, Scenario, SourceOrchestrationPlan,
        SourceProviders, StaticManagedProvider, build_source_orchestration_plan,
        orchestrate_sources_with_providers,
    },
    topology::DeploymentDescriptor,
};
use tracing::info;

use super::{
    ComposeDeployer, ComposeDeploymentMetadata,
    attach_provider::{ComposeAttachProvider, ComposeAttachedClusterWait},
    clients::ClientBuilder,
    make_cleanup_guard,
    ports::PortManager,
    readiness::ReadinessChecker,
    setup::{DeploymentContext, DeploymentSetup},
};
use crate::{
    docker::control::{ComposeAttachedNodeControl, ComposeNodeControl},
    env::ComposeDeployEnv,
    errors::ComposeRunnerError,
    infrastructure::{
        environment::StackEnvironment,
        ports::{HostPortMapping, compose_runner_host},
    },
    lifecycle::block_feed::spawn_block_feed_with_retry,
};

/// Name of the environment variable checked by `should_print_endpoints`.
const PRINT_ENDPOINTS_ENV: &str = "TESTNET_PRINT_ENDPOINTS";

/// Drives a compose deployment for a scenario and produces the `Runner` that
/// executes it. Wraps the `ComposeDeployer` whose `readiness_checks` flag is
/// consulted during deployment.
pub struct DeploymentOrchestrator {
    deployer: ComposeDeployer,
}

impl DeploymentOrchestrator {
    /// Creates an orchestrator around the given deployer.
    pub const fn new(deployer: ComposeDeployer) -> Self {
        Self { deployer }
    }

    /// Deploys `scenario` and returns only the runner.
    ///
    /// Thin wrapper over [`Self::deploy_with_metadata`] that drops the
    /// metadata half of the result.
    ///
    /// # Errors
    /// Propagates any `ComposeRunnerError` from `deploy_with_metadata`.
    pub async fn deploy(
        &self,
        scenario: &Scenario,
    ) -> Result, ComposeRunnerError>
    where
        Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
    {
        self.deploy_with_metadata(scenario)
            .await
            .map(|(runner, _)| runner)
    }

    /// Deploys `scenario` and returns the runner together with
    /// `ComposeDeploymentMetadata` (the compose project name).
    ///
    /// Flow, as written below:
    /// 1. Build the source orchestration plan.
    /// 2. If the scenario's sources are attached, delegate to
    ///    `deploy_attached_only` and derive metadata via `attached_metadata`.
    /// 3. Otherwise validate the environment, resolve observability inputs,
    ///    prepare the workspace, deploy the nodes, re-resolve node clients
    ///    through the source providers, and build the runner.
    ///
    /// # Errors
    /// Returns `ComposeRunnerError::SourceOrchestration` when planning fails,
    /// and propagates errors from every `?` step below.
    pub async fn deploy_with_metadata(
        &self,
        scenario: &Scenario,
    ) -> Result<(Runner, ComposeDeploymentMetadata), ComposeRunnerError>
    where
        Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
    {
        // Source planning is resolved here, before any deployer-specific setup,
        // so the attached-only path below can return early without touching
        // the local environment.
        let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
            ComposeRunnerError::SourceOrchestration {
                source: source.into(),
            }
        })?;

        // Attached mode: nothing is deployed locally.
        if scenario.sources().is_attached() {
            return self
                .deploy_attached_only::(scenario, source_plan)
                .await
                .map(|runner| (runner, attached_metadata(scenario)));
        }

        let deployment = scenario.deployment();
        let setup = DeploymentSetup::::new(deployment);
        setup.validate_environment().await?;

        let observability = resolve_observability_inputs(scenario)?;
        let mut prepared = prepare_deployment::(setup, &observability).await?;
        let deployment_policy = scenario.deployment_policy();
        // Readiness checks run only when both the deployer and the scenario's
        // deployment policy enable them.
        let readiness_enabled =
            self.deployer.readiness_checks && deployment_policy.readiness_enabled;

        self.log_deploy_start(
            scenario,
            &prepared.descriptors,
            deployment_policy,
            &observability,
        );

        let mut deployed = deploy_nodes::(
            &mut prepared.environment,
            &prepared.descriptors,
            readiness_enabled,
            deployment_policy.readiness_requirement,
        )
        .await?;

        // Seed the managed provider with a snapshot of the freshly deployed
        // clients, then replace the client set with whatever the source plan
        // resolves to.
        let source_providers = self.source_providers(deployed.node_clients.snapshot());

        deployed.node_clients = self
            .resolve_node_clients(&source_plan, source_providers)
            .await?;

        // Copied out before `prepared` is moved into `build_runner`.
        let project_name = prepared.environment.project_name().to_owned();

        let runner = self
            .build_runner::(
                scenario,
                prepared,
                deployed,
                observability,
                readiness_enabled,
                project_name.clone(),
            )
            .await?;

        self.log_deploy_ready(
            scenario,
            deployment_policy,
            deployment.node_count(),
            &compose_runner_host(),
            readiness_enabled,
        );

        Ok((
            runner,
            ComposeDeploymentMetadata {
                project_name: Some(project_name),
            },
        ))
    }

    /// Builds a runner for a scenario whose sources are attached, i.e. without
    /// deploying a local stack.
    ///
    /// Node clients are resolved with an empty managed-client list; an empty
    /// result is rejected via `ensure_non_empty_node_clients`. The block feed
    /// task is boxed and handed to the runner as its cleanup guard.
    ///
    /// # Errors
    /// Propagates errors from observability resolution, source orchestration,
    /// the non-empty check, node-control / cluster-wait construction, feed
    /// start-up and `telemetry_handle`.
    async fn deploy_attached_only(
        &self,
        scenario: &Scenario,
        source_plan: SourceOrchestrationPlan,
    ) -> Result, ComposeRunnerError>
    where
        Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
    {
        let observability = resolve_observability_inputs(scenario)?;
        let source_providers = self.source_providers(Vec::new());

        let node_clients = self
            .resolve_node_clients(&source_plan, source_providers)
            .await?;

        self.ensure_non_empty_node_clients(&node_clients)?;

        let node_control = self.attached_node_control::(scenario)?;
        let cluster_wait = self.attached_cluster_wait(scenario)?;
        let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?;
        // NOTE(review): `telemetry_handle()?` is evaluated after the feed has
        // been spawned, whereas `build_runner` resolves it first. If it fails
        // here, `feed_task` is simply dropped — confirm that dropping the
        // handle is sufficient to stop the feed.
        let context = build_run_context(
            scenario.deployment().clone(),
            node_clients,
            scenario.duration(),
            scenario.expectation_cooldown(),
            observability.telemetry_handle()?,
            feed,
            node_control,
            cluster_wait,
        );

        let cleanup_guard: Box = Box::new(feed_task);
        Ok(Runner::new(context, Some(cleanup_guard)))
    }

    /// Assembles the three source providers used for orchestration: a static
    /// managed provider over `managed_clients`, a compose attach provider bound
    /// to `compose_runner_host()`, and the application external provider.
    fn source_providers(&self, managed_clients: Vec) -> SourceProviders {
        SourceProviders::default()
            .with_managed(Arc::new(StaticManagedProvider::new(managed_clients)))
            .with_attach(Arc::new(ComposeAttachProvider::::new(
                compose_runner_host(),
            )))
            .with_external(Arc::new(ApplicationExternalProvider))
    }

    /// Runs source orchestration for `source_plan` with the given providers.
    ///
    /// # Errors
    /// Wraps any orchestration failure in
    /// `ComposeRunnerError::SourceOrchestration`.
    async fn resolve_node_clients(
        &self,
        source_plan: &SourceOrchestrationPlan,
        source_providers: SourceProviders,
    ) -> Result, ComposeRunnerError> {
        orchestrate_sources_with_providers(source_plan, source_providers)
            .await
            .map_err(|source| ComposeRunnerError::SourceOrchestration { source })
    }

    /// Preflight check: fails with `ComposeRunnerError::RuntimePreflight` when
    /// `node_clients` is empty.
    fn ensure_non_empty_node_clients(
        &self,
        node_clients: &NodeClients,
    ) -> Result<(), ComposeRunnerError> {
        if node_clients.is_empty() {
            return Err(ComposeRunnerError::RuntimePreflight);
        }

        Ok(())
    }

    /// Builds the node-control handle for attached mode.
    ///
    /// Returns `Ok(None)` when `Caps::REQUIRED` is false. Otherwise the
    /// scenario must have an attached source with a compose project name that
    /// is non-empty after trimming.
    ///
    /// # Errors
    /// `ComposeRunnerError::InternalInvariant` when there is no attached
    /// source, or when the project name is missing or blank.
    fn attached_node_control(
        &self,
        scenario: &Scenario,
    ) -> Result>>, ComposeRunnerError>
    where
        Caps: RequiresNodeControl + Send + Sync,
    {
        if !Caps::REQUIRED {
            return Ok(None);
        }

        let attach = scenario
            .attached_source()
            .ok_or(ComposeRunnerError::InternalInvariant {
                message: "attached node control requested outside attached source mode",
            })?;

        // A whitespace-only project name is treated the same as a missing one.
        let Some(project_name) = attach
            .compose_project()
            .map(str::trim)
            .filter(|value| !value.is_empty())
        else {
            return Err(ComposeRunnerError::InternalInvariant {
                message: "attached compose mode requires explicit project name for node control",
            });
        };

        Ok(Some(Arc::new(ComposeAttachedNodeControl {
            project_name: project_name.to_owned(),
        }) as Arc>))
    }

    /// Builds the cluster-wait handle for attached mode from a clone of the
    /// scenario's attached source and `compose_runner_host()`.
    ///
    /// # Errors
    /// `ComposeRunnerError::InternalInvariant` when the scenario has no
    /// attached source.
    fn attached_cluster_wait(
        &self,
        scenario: &Scenario,
    ) -> Result>, ComposeRunnerError>
    where
        Caps: Send + Sync,
    {
        let attach = scenario
            .attached_source()
            .ok_or(ComposeRunnerError::InternalInvariant {
                message: "compose attached cluster wait requested outside attached source mode",
            })?;

        Ok(Arc::new(ComposeAttachedClusterWait::::new(
            compose_runner_host(),
            attach.clone(),
        )))
    }

    /// Builds the runner for a locally deployed stack.
    ///
    /// Resolves the telemetry handle, creates node control and cluster-wait
    /// handles, logs and optionally prints the endpoints, builds the compose
    /// runtime, and finally converts the environment plus the feed task into
    /// the runner's cleanup guard.
    ///
    /// # Errors
    /// Propagates errors from `telemetry_handle`, `build_compose_runtime` and
    /// `StackEnvironment::into_cleanup`.
    async fn build_runner(
        &self,
        scenario: &Scenario,
        mut prepared: PreparedDeployment,
        deployed: DeployedNodes,
        observability: ObservabilityInputs,
        readiness_enabled: bool,
        project_name: String,
    ) -> Result, ComposeRunnerError>
    where
        Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
    {
        let telemetry = observability.telemetry_handle()?;
        let node_control = self.maybe_node_control::(&prepared.environment);
        let cluster_wait = self.managed_cluster_wait(project_name);

        log_observability_endpoints(&observability);
        log_profiling_urls(&deployed.host, &deployed.host_ports);
        maybe_print_endpoints(&observability, &deployed.host, &deployed.host_ports);

        // The input borrows `prepared.environment` mutably; the borrow ends
        // once `build_compose_runtime` returns, so the environment can be
        // consumed by `into_cleanup` afterwards.
        let input = RuntimeBuildInput {
            deployed: &deployed,
            descriptors: prepared.descriptors.clone(),
            duration: scenario.duration(),
            expectation_cooldown: scenario.expectation_cooldown(),
            telemetry,
            environment: &mut prepared.environment,
            node_control,
            cluster_wait,
        };
        let runtime = build_compose_runtime::(input).await?;
        let cleanup_guard =
            make_cleanup_guard(prepared.environment.into_cleanup()?, runtime.feed_task);

        info!(
            effective_readiness = readiness_enabled,
            host = deployed.host,
            "compose runtime prepared"
        );

        Ok(Runner::new(runtime.context, Some(cleanup_guard)))
    }

    /// Returns a `ComposeNodeControl` for the environment's compose file and
    /// project name when `Caps::REQUIRED` is true, otherwise `None`.
    fn maybe_node_control(
        &self,
        environment: &StackEnvironment,
    ) -> Option>>
    where
        Caps: RequiresNodeControl + Send + Sync,
    {
        Caps::REQUIRED.then(|| {
            Arc::new(ComposeNodeControl {
                compose_file: environment.compose_path().to_path_buf(),
                project_name: environment.project_name().to_owned(),
            }) as Arc>
        })
    }

    /// Builds the cluster-wait handle for a locally deployed stack by reusing
    /// `ComposeAttachedClusterWait` against the stack's own project.
    // NOTE(review): the first argument to `compose_in_project` is an empty
    // `Vec`; its meaning is not visible in this file — confirm.
    fn managed_cluster_wait(&self, project_name: String) -> Arc> {
        Arc::new(ComposeAttachedClusterWait::::new(
            compose_runner_host(),
            AttachSource::compose_in_project(Vec::new(), project_name),
        ))
    }

    /// Emits the "compose deployment starting" log event with node count,
    /// scenario duration, readiness settings and observability URLs.
    fn log_deploy_start(
        &self,
        scenario: &Scenario,
        descriptors: &E::Deployment,
        deployment_policy: DeploymentPolicy,
        observability: &ObservabilityInputs,
    ) {
        // Same expression the caller uses for `readiness_enabled`; recomputed
        // here because it is not passed in.
        let effective_readiness =
            self.deployer.readiness_checks && deployment_policy.readiness_enabled;

        info!(
            nodes = descriptors.node_count(),
            duration_secs = scenario.duration().as_secs(),
            readiness_checks = self.deployer.readiness_checks,
            readiness_enabled = deployment_policy.readiness_enabled,
            readiness_requirement = ?deployment_policy.readiness_requirement,
            effective_readiness,
            metrics_query_url = observability.metrics_query_url.as_ref().map(|u| u.as_str()),
            metrics_otlp_ingest_url = observability
                .metrics_otlp_ingest_url
                .as_ref()
                .map(|u| u.as_str()),
            grafana_url = observability.grafana_url.as_ref().map(|u| u.as_str()),
            "compose deployment starting"
        );
    }

    /// Emits the "compose deployment ready" log event once the runner has been
    /// built.
    fn log_deploy_ready(
        &self,
        scenario: &Scenario,
        deployment_policy: DeploymentPolicy,
        node_count: usize,
        host: &str,
        readiness_enabled: bool,
    ) {
        info!(
            nodes = node_count,
            duration_secs = scenario.duration().as_secs(),
            readiness_checks = self.deployer.readiness_checks,
            readiness_enabled = deployment_policy.readiness_enabled,
            readiness_requirement = ?deployment_policy.readiness_requirement,
            effective_readiness = readiness_enabled,
            host,
            "compose deployment ready; handing control to scenario runner"
        );
    }
}

/// Builds deployment metadata for attached mode: the project name is the
/// attached source's compose project, or `None` when there is no attached
/// source or no project set. Unlike `attached_node_control`, the name is not
/// trimmed or checked for emptiness here.
fn attached_metadata(scenario: &Scenario) -> ComposeDeploymentMetadata
where
    E: ComposeDeployEnv,
    Caps: Send + Sync,
{
    let project_name = scenario
        .attached_source()
        .and_then(|attach| attach.compose_project())
        .map(ToOwned::to_owned);

    ComposeDeploymentMetadata { project_name }
}

/// Result of `deploy_nodes`: host/port mapping, the runner host, the node
/// clients, and the client builder later used to start the block feed.
struct DeployedNodes {
    host_ports: HostPortMapping,
    host: String,
    node_clients: NodeClients,
    client_builder: ClientBuilder,
}

/// Result of `build_compose_runtime`: the run context plus the feed task that
/// must be handed to the cleanup guard.
struct ComposeRuntime {
    context: RunContext,
    feed_task: FeedHandle,
}

/// Inputs for `build_compose_runtime`, bundled to keep its signature short.
/// `deployed` and `environment` are borrowed for `'a`; everything else is
/// moved in.
struct RuntimeBuildInput<'a, E: ComposeDeployEnv> {
    deployed: &'a DeployedNodes,
    descriptors: E::Deployment,
    duration: Duration,
    expectation_cooldown: Duration,
    telemetry: Metrics,
    environment: &'a mut StackEnvironment,
    node_control: Option>>,
    cluster_wait: Arc>,
}

/// Starts the block feed for the deployed nodes and assembles the run context.
///
/// # Errors
/// `ComposeRunnerError::RuntimePreflight` when there are no node clients;
/// otherwise propagates the error from `start_block_feed`.
async fn build_compose_runtime(
    input: RuntimeBuildInput<'_, E>,
) -> Result, ComposeRunnerError> {
    let node_clients = input.deployed.node_clients.clone();
    if node_clients.is_empty() {
        return Err(ComposeRunnerError::RuntimePreflight);
    }

    let (feed, feed_task) = input
        .deployed
        .client_builder
        .start_block_feed(&node_clients, input.environment)
        .await?;

    let context = build_run_context(
        input.descriptors,
        node_clients,
        input.duration,
        input.expectation_cooldown,
        input.telemetry,
        feed,
        input.node_control,
        input.cluster_wait,
    );

    Ok(ComposeRuntime { context, feed_task })
}

/// Prepares ports, waits for readiness (or the grace period), and builds the
/// node clients for the deployed stack.
///
/// # Errors
/// Propagates errors from port preparation, the readiness wait and client
/// construction.
async fn deploy_nodes(
    environment: &mut StackEnvironment,
    descriptors: &E::Deployment,
    readiness_enabled: bool,
    readiness_requirement: HttpReadinessRequirement,
) -> Result, ComposeRunnerError> {
    let host_ports = PortManager::::prepare(environment, descriptors).await?;

    wait_for_readiness_or_grace_period::(
        readiness_enabled,
        descriptors,
        readiness_requirement,
        &host_ports,
        environment,
    )
    .await?;

    let host = compose_runner_host();
    let client_builder = ClientBuilder::::new();

    let node_clients = client_builder
        .build_node_clients(descriptors, &host_ports, &host, environment)
        .await?;

    Ok(DeployedNodes {
        host_ports,
        host,
        node_clients,
        client_builder,
    })
}

/// Constructs a `RunContext` from its parts and attaches the cluster-wait
/// handle. Shared by the attached-only and locally deployed paths.
fn build_run_context(
    descriptors: E::Deployment,
    node_clients: NodeClients,
    run_duration: Duration,
    expectation_cooldown: Duration,
    telemetry: Metrics,
    feed: ::Feed,
    node_control: Option>>,
    cluster_wait: Arc>,
) -> RunContext {
    RunContext::new(
        descriptors,
        node_clients,
        run_duration,
        expectation_cooldown,
        telemetry,
        feed,
        node_control,
    )
    .with_cluster_wait(cluster_wait)
}

/// Resolves observability inputs: starts from the environment-derived inputs
/// and applies the scenario capability's inputs as overrides. When the
/// scenario has no observability capability, the override set is the default.
///
/// # Errors
/// Propagates the error from `ObservabilityInputs::from_env`.
fn resolve_observability_inputs(
    scenario: &Scenario,
) -> Result
where
    Caps: ObservabilityCapabilityProvider,
    E: ComposeDeployEnv,
{
    let env_inputs = ObservabilityInputs::from_env()?;
    let cap_inputs = scenario
        .capabilities()
        .observability_capability()
        .map(ObservabilityInputs::from_capability)
        .unwrap_or_default();

    Ok(env_inputs.with_overrides(cap_inputs))
}

/// Waits for all nodes to become ready when `readiness_checks` is true;
/// otherwise logs that checks are disabled and defers to
/// `maybe_sleep_for_disabled_readiness`.
///
/// # Errors
/// Propagates the error from `ReadinessChecker::wait_all`.
async fn wait_for_readiness_or_grace_period(
    readiness_checks: bool,
    descriptors: &E::Deployment,
    readiness_requirement: HttpReadinessRequirement,
    host_ports: &HostPortMapping,
    environment: &mut StackEnvironment,
) -> Result<(), ComposeRunnerError> {
    if readiness_checks {
        ReadinessChecker::::wait_all(
            descriptors,
            host_ports,
            readiness_requirement,
            environment,
        )
        .await?;

        return Ok(());
    }

    info!("readiness checks disabled; giving the stack a short grace period");
    // `readiness_checks` is known to be false on this path, hence the literal.
    // NOTE(review): presumably the argument is the readiness-enabled flag —
    // confirm against the helper's signature.
    crate::lifecycle::readiness::maybe_sleep_for_disabled_readiness(false).await;

    Ok(())
}

/// Logs the metrics query URL and the Grafana URL, each only when configured.
fn log_observability_endpoints(observability: &ObservabilityInputs) {
    if let Some(url) = observability.metrics_query_url.as_ref() {
        info!(
            metrics_query_url = %url.as_str(),
            "metrics query endpoint configured"
        );
    }

    if let Some(url) = observability.grafana_url.as_ref() {
        info!(grafana_url = %url.as_str(), "grafana url configured");
    }
}

/// Prints the observability endpoints and per-node profiling URLs to stdout,
/// but only when `should_print_endpoints` returns true.
fn maybe_print_endpoints(observability: &ObservabilityInputs, host: &str, ports: &HostPortMapping) {
    if !should_print_endpoints() {
        return;
    }

    let prometheus = endpoint_or_disabled(observability.metrics_query_url.as_ref());
    let grafana = endpoint_or_disabled(observability.grafana_url.as_ref());

    println!(
        "TESTNET_ENDPOINTS prometheus={} grafana={}",
        prometheus, grafana
    );

    print_profiling_urls(host, ports);
}

/// Returns true when `TESTNET_PRINT_ENDPOINTS` is set to a valid Unicode
/// value. The value itself is not inspected, so `0` or an empty string also
/// enable printing; a non-Unicode value counts as unset.
fn should_print_endpoints() -> bool {
    env::var(PRINT_ENDPOINTS_ENV).is_ok()
}

/// Renders an optional endpoint URL as a string, falling back to the literal
/// below when the endpoint is absent.
// NOTE(review): the fallback is an empty string, which prints as
// `prometheus= grafana=`. Given the function name, a placeholder marker may
// have been intended and lost from this copy — confirm.
fn endpoint_or_disabled(endpoint: Option<&Url>) -> String {
    endpoint.map_or_else(|| "".to_string(), |url| url.as_str().to_string())
}

/// Logs one profiling URL per node, using the node's index in `ports.nodes`.
fn log_profiling_urls(host: &str, ports: &HostPortMapping) {
    for (idx, node) in ports.nodes.iter().enumerate() {
        info!(
            node = idx,
            profiling_url = %profiling_url(host, node.api),
            "node profiling endpoint (profiling feature required)"
        );
    }
}

/// Prints one `TESTNET_PPROF node_<idx>=<url>` line per node to stdout.
fn print_profiling_urls(host: &str, ports: &HostPortMapping) {
    for (idx, node) in ports.nodes.iter().enumerate() {
        println!(
            "TESTNET_PPROF node_{}={}",
            idx,
            profiling_url(host, node.api)
        );
    }
}

/// Formats the pprof profile URL for a node's API port on `host`.
fn profiling_url(host: &str, api_port: u16) -> String {
    format!("http://{host}:{api_port}/debug/pprof/profile?seconds=15&format=proto")
}

/// Result of `prepare_deployment`: the stack environment and an owned copy of
/// the deployment descriptors.
struct PreparedDeployment {
    environment: StackEnvironment,
    descriptors: E::Deployment,
}

/// Prepares the workspace for the deployment and returns the environment with
/// a clone of the descriptors from the resulting `DeploymentContext`.
///
/// # Errors
/// Propagates the error from `DeploymentSetup::prepare_workspace`.
async fn prepare_deployment(
    setup: DeploymentSetup<'_, E>,
    observability: &ObservabilityInputs,
) -> Result, ComposeRunnerError> {
    let DeploymentContext {
        environment,
        descriptors,
    } = setup.prepare_workspace(observability).await?;

    Ok(PreparedDeployment {
        environment,
        descriptors: descriptors.clone(),
    })
}