use std::sync::Arc; use testing_framework_core::scenario::{ NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, Runner, Scenario, }; use tracing::info; use super::{ ComposeDeployer, clients::ClientBuilder, make_cleanup_guard, ports::PortManager, readiness::ReadinessChecker, setup::{DeploymentContext, DeploymentSetup}, }; use crate::{ docker::control::ComposeNodeControl, errors::ComposeRunnerError, infrastructure::{ environment::StackEnvironment, ports::{HostPortMapping, compose_runner_host}, }, }; pub struct DeploymentOrchestrator { deployer: ComposeDeployer, } impl DeploymentOrchestrator { pub const fn new(deployer: ComposeDeployer) -> Self { Self { deployer } } pub async fn deploy( &self, scenario: &Scenario, ) -> Result where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { let setup = DeploymentSetup::new(scenario.topology()); setup.validate_environment().await?; let env_inputs = ObservabilityInputs::from_env()?; let cap_inputs = scenario .capabilities() .observability_capability() .map(ObservabilityInputs::from_capability) .unwrap_or_default(); let observability = env_inputs.with_overrides(cap_inputs); let DeploymentContext { mut environment, descriptors, } = setup.prepare_workspace(&observability).await?; tracing::info!( validators = descriptors.validators().len(), executors = descriptors.executors().len(), duration_secs = scenario.duration().as_secs(), readiness_checks = self.deployer.readiness_checks, metrics_query_url = observability.metrics_query_url.as_ref().map(|u| u.as_str()), metrics_otlp_ingest_url = observability .metrics_otlp_ingest_url .as_ref() .map(|u| u.as_str()), grafana_url = observability.grafana_url.as_ref().map(|u| u.as_str()), "compose deployment starting" ); let validator_count = descriptors.validators().len(); let executor_count = descriptors.executors().len(); let host_ports = PortManager::prepare(&mut environment, &descriptors).await?; if self.deployer.readiness_checks { ReadinessChecker::wait_all(&descriptors, &host_ports, &mut environment).await?; } else { info!("readiness checks disabled; giving the stack a short grace period"); crate::lifecycle::readiness::maybe_sleep_for_disabled_readiness(false).await; } let host = compose_runner_host(); let client_builder = ClientBuilder::new(); let node_clients = client_builder .build_node_clients(&descriptors, &host_ports, &host, &mut environment) .await?; let telemetry = observability.telemetry_handle()?; let node_control = self.maybe_node_control::(&environment); if let Some(url) = observability.metrics_query_url.as_ref() { info!(metrics_query_url = %url.as_str(), "metrics query endpoint configured"); } if let Some(url) = observability.grafana_url.as_ref() { info!(grafana_url = %url.as_str(), "grafana url configured"); } log_profiling_urls(&host, &host_ports); if std::env::var("TESTNET_PRINT_ENDPOINTS").is_ok() { let prometheus = observability .metrics_query_url .as_ref() .map(|u| u.as_str().to_string()) .unwrap_or_else(|| "".to_string()); let grafana = observability .grafana_url .as_ref() .map(|u| u.as_str().to_string()) .unwrap_or_else(|| "".to_string()); println!( "TESTNET_ENDPOINTS prometheus={} grafana={}", prometheus, grafana ); print_profiling_urls(&host, &host_ports); } let (block_feed, block_feed_guard) = client_builder .start_block_feed(&node_clients, &mut environment) .await?; let cleanup_guard = make_cleanup_guard(environment.into_cleanup()?, block_feed_guard); let context = RunContext::new( descriptors, None, node_clients, scenario.duration(), telemetry, block_feed, node_control, ); info!( validators = validator_count, executors = executor_count, duration_secs = scenario.duration().as_secs(), readiness_checks = self.deployer.readiness_checks, host, "compose deployment ready; handing control to scenario runner" ); Ok(Runner::new(context, Some(cleanup_guard))) } fn maybe_node_control( &self, environment: &StackEnvironment, ) -> Option> where Caps: RequiresNodeControl + Send + Sync, { Caps::REQUIRED.then(|| { Arc::new(ComposeNodeControl { compose_file: environment.compose_path().to_path_buf(), project_name: environment.project_name().to_owned(), }) as Arc }) } } fn log_profiling_urls(host: &str, ports: &HostPortMapping) { for (idx, node) in ports.validators.iter().enumerate() { tracing::info!( validator = idx, profiling_url = %format!( "http://{}:{}/debug/pprof/profile?seconds=15&format=proto", host, node.api ), "validator profiling endpoint (profiling feature required)" ); } for (idx, node) in ports.executors.iter().enumerate() { tracing::info!( executor = idx, profiling_url = %format!( "http://{}:{}/debug/pprof/profile?seconds=15&format=proto", host, node.api ), "executor profiling endpoint (profiling feature required)" ); } } fn print_profiling_urls(host: &str, ports: &HostPortMapping) { for (idx, node) in ports.validators.iter().enumerate() { println!( "TESTNET_PPROF validator_{}=http://{}:{}/debug/pprof/profile?seconds=15&format=proto", idx, host, node.api ); } for (idx, node) in ports.executors.iter().enumerate() { println!( "TESTNET_PPROF executor_{}=http://{}:{}/debug/pprof/profile?seconds=15&format=proto", idx, host, node.api ); } }