diff --git a/testing-framework/runners/compose/src/deployer/clients.rs b/testing-framework/runners/compose/src/deployer/clients.rs new file mode 100644 index 0000000..7f6121a --- /dev/null +++ b/testing-framework/runners/compose/src/deployer/clients.rs @@ -0,0 +1,54 @@ +use testing_framework_core::{ + scenario::{BlockFeed, BlockFeedTask, NodeClients}, + topology::generation::GeneratedTopology, +}; +use tracing::info; + +use crate::{ + block_feed::spawn_block_feed_with_retry, environment::StackEnvironment, + errors::ComposeRunnerError, ports::HostPortMapping, readiness::build_node_clients_with_ports, +}; + +pub struct ClientBuilder; + +impl ClientBuilder { + #[must_use] + pub const fn new() -> Self { + Self + } + + pub async fn build_node_clients( + &self, + descriptors: &GeneratedTopology, + host_ports: &HostPortMapping, + host: &str, + environment: &mut StackEnvironment, + ) -> Result { + match build_node_clients_with_ports(descriptors, host_ports, host) { + Ok(clients) => Ok(clients), + Err(err) => { + environment + .fail("failed to construct node api clients") + .await; + Err(err.into()) + } + } + } + + pub async fn start_block_feed( + &self, + node_clients: &NodeClients, + environment: &mut StackEnvironment, + ) -> Result<(BlockFeed, BlockFeedTask), ComposeRunnerError> { + match spawn_block_feed_with_retry(node_clients).await { + Ok(pair) => { + info!("block feed connected to validator"); + Ok(pair) + } + Err(err) => { + environment.fail("failed to initialize block feed").await; + Err(err) + } + } + } +} diff --git a/testing-framework/runners/compose/src/deployer.rs b/testing-framework/runners/compose/src/deployer/mod.rs similarity index 55% rename from testing-framework/runners/compose/src/deployer.rs rename to testing-framework/runners/compose/src/deployer/mod.rs index 49d8839..2dc9a37 100644 --- a/testing-framework/runners/compose/src/deployer.rs +++ b/testing-framework/runners/compose/src/deployer/mod.rs @@ -1,38 +1,15 @@ -use std::{ - env, - net::{Ipv4Addr, TcpListener as StdTcpListener}, - sync::Arc, -}; +pub mod clients; +pub mod orchestrator; +pub mod ports; +pub mod readiness; +pub mod setup; use async_trait::async_trait; -use testing_framework_core::{ - scenario::{ - BlockFeed, BlockFeedTask, CleanupGuard, Deployer, NodeClients, NodeControlHandle, - RequiresNodeControl, RunContext, Runner, Scenario, - }, - topology::generation::GeneratedTopology, +use testing_framework_core::scenario::{ + BlockFeedTask, CleanupGuard, Deployer, RequiresNodeControl, Runner, Scenario, }; -use tracing::{debug, info}; -use crate::{ - block_feed::spawn_block_feed_with_retry, - cleanup::RunnerCleanup, - control::ComposeNodeControl, - docker::ensure_docker_available, - environment::{ - PortReservation, StackEnvironment, ensure_supported_topology, prepare_environment, - }, - errors::ComposeRunnerError, - ports::{ - HostPortMapping, compose_runner_host, discover_host_ports, - ensure_remote_readiness_with_ports, - }, - readiness::{ - build_node_clients_with_ports, ensure_executors_ready_with_ports, - ensure_validators_ready_with_ports, maybe_sleep_for_disabled_readiness, - metrics_handle_from_port, - }, -}; +use crate::{cleanup::RunnerCleanup, errors::ComposeRunnerError}; /// Docker Compose-based deployer for Nomos test scenarios. #[derive(Clone, Copy)] @@ -59,130 +36,6 @@ impl ComposeDeployer { self.readiness_checks = enabled; self } - - async fn prepare_ports( - &self, - environment: &mut StackEnvironment, - descriptors: &GeneratedTopology, - ) -> Result { - debug!("resolving host ports for compose services"); - match discover_host_ports(environment, descriptors).await { - Ok(mapping) => { - info!( - validator_ports = ?mapping.validator_api_ports(), - executor_ports = ?mapping.executor_api_ports(), - prometheus_port = environment.prometheus_port(), - "resolved container host ports" - ); - Ok(mapping) - } - Err(err) => { - environment - .fail("failed to determine container host ports") - .await; - Err(err) - } - } - } - - async fn wait_for_readiness( - &self, - descriptors: &GeneratedTopology, - host_ports: &HostPortMapping, - environment: &mut StackEnvironment, - ) -> Result<(), ComposeRunnerError> { - info!( - ports = ?host_ports.validator_api_ports(), - "waiting for validator HTTP endpoints" - ); - if let Err(err) = - ensure_validators_ready_with_ports(&host_ports.validator_api_ports()).await - { - environment.fail("validator readiness failed").await; - return Err(err.into()); - } - - info!( - ports = ?host_ports.executor_api_ports(), - "waiting for executor HTTP endpoints" - ); - if let Err(err) = ensure_executors_ready_with_ports(&host_ports.executor_api_ports()).await - { - environment.fail("executor readiness failed").await; - return Err(err.into()); - } - - info!("waiting for remote service readiness"); - if let Err(err) = ensure_remote_readiness_with_ports(descriptors, host_ports).await { - environment.fail("remote readiness probe failed").await; - return Err(err.into()); - } - - Ok(()) - } - - async fn build_node_clients( - &self, - descriptors: &GeneratedTopology, - host_ports: &HostPortMapping, - host: &str, - environment: &mut StackEnvironment, - ) -> Result { - match build_node_clients_with_ports(descriptors, host_ports, host) { - Ok(clients) => Ok(clients), - Err(err) => { - environment - .fail("failed to construct node api clients") - .await; - Err(err.into()) - } - } - } - - async fn start_block_feed( - &self, - node_clients: &NodeClients, - environment: &mut StackEnvironment, - ) -> Result<(BlockFeed, BlockFeedTask), ComposeRunnerError> { - match spawn_block_feed_with_retry(node_clients).await { - Ok(pair) => { - info!("block feed connected to validator"); - Ok(pair) - } - Err(err) => { - environment.fail("failed to initialize block feed").await; - Err(err) - } - } - } - - fn maybe_node_control( - &self, - environment: &StackEnvironment, - ) -> Option> - where - Caps: RequiresNodeControl + Send + Sync, - { - Caps::REQUIRED.then(|| { - Arc::new(ComposeNodeControl { - compose_file: environment.compose_path().to_path_buf(), - project_name: environment.project_name().to_owned(), - }) as Arc - }) - } -} - -pub(crate) const PROMETHEUS_PORT_ENV: &str = "TEST_FRAMEWORK_PROMETHEUS_PORT"; -pub(crate) const DEFAULT_PROMETHEUS_PORT: u16 = 9090; - -fn allocate_prometheus_port() -> Option { - reserve_port(DEFAULT_PROMETHEUS_PORT).or_else(|| reserve_port(0)) -} - -fn reserve_port(port: u16) -> Option { - let listener = StdTcpListener::bind((Ipv4Addr::LOCALHOST, port)).ok()?; - let actual_port = listener.local_addr().ok()?.port(); - Some(PortReservation::new(actual_port, Some(listener))) } #[async_trait] @@ -193,75 +46,13 @@ where type Error = ComposeRunnerError; async fn deploy(&self, scenario: &Scenario) -> Result { - ensure_docker_available().await?; - let descriptors = scenario.topology().clone(); - ensure_supported_topology(&descriptors)?; - - info!( - validators = descriptors.validators().len(), - executors = descriptors.executors().len(), - "starting compose deployment" - ); - - let prometheus_env = env::var(PROMETHEUS_PORT_ENV) - .ok() - .and_then(|raw| raw.parse::().ok()); - if prometheus_env.is_some() { - info!(port = prometheus_env, "using prometheus port from env"); - } - let prometheus_port = prometheus_env - .and_then(|port| reserve_port(port)) - .or_else(|| allocate_prometheus_port()) - .unwrap_or_else(|| PortReservation::new(DEFAULT_PROMETHEUS_PORT, None)); - let mut environment = - prepare_environment(&descriptors, prometheus_port, prometheus_env.is_some()).await?; - info!( - compose_file = %environment.compose_path().display(), - project = environment.project_name(), - root = %environment.root().display(), - "compose workspace prepared" - ); - - let host_ports = self.prepare_ports(&mut environment, &descriptors).await?; - - if self.readiness_checks { - self.wait_for_readiness(&descriptors, &host_ports, &mut environment) - .await?; - } else { - info!("readiness checks disabled; giving the stack a short grace period"); - maybe_sleep_for_disabled_readiness(false).await; - } - - info!("compose stack ready; building node clients"); - let host = compose_runner_host(); - let node_clients = self - .build_node_clients(&descriptors, &host_ports, &host, &mut environment) - .await?; - let telemetry = metrics_handle_from_port(environment.prometheus_port(), &host)?; - let node_control = self.maybe_node_control::(&environment); - - let (block_feed, block_feed_guard) = self - .start_block_feed(&node_clients, &mut environment) - .await?; - let cleanup_guard: Box = Box::new(ComposeCleanupGuard::new( - environment.into_cleanup(), - block_feed_guard, - )); - let context = RunContext::new( - descriptors, - None, - node_clients, - scenario.duration(), - telemetry, - block_feed, - node_control, - ); - - Ok(Runner::new(context, Some(cleanup_guard))) + orchestrator::DeploymentOrchestrator::new(*self) + .deploy(scenario) + .await } } -struct ComposeCleanupGuard { +pub(super) struct ComposeCleanupGuard { environment: RunnerCleanup, block_feed: Option, } @@ -284,6 +75,13 @@ impl CleanupGuard for ComposeCleanupGuard { } } +pub(super) fn make_cleanup_guard( + environment: RunnerCleanup, + block_feed: BlockFeedTask, +) -> Box { + Box::new(ComposeCleanupGuard::new(environment, block_feed)) +} + #[cfg(test)] mod tests { use std::{collections::HashMap, net::Ipv4Addr}; diff --git a/testing-framework/runners/compose/src/deployer/orchestrator.rs b/testing-framework/runners/compose/src/deployer/orchestrator.rs new file mode 100644 index 0000000..436dffa --- /dev/null +++ b/testing-framework/runners/compose/src/deployer/orchestrator.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use testing_framework_core::scenario::{ + NodeControlHandle, RequiresNodeControl, RunContext, Runner, Scenario, +}; +use tracing::info; + +use super::{ + ComposeDeployer, + clients::ClientBuilder, + make_cleanup_guard, + ports::PortManager, + readiness::ReadinessChecker, + setup::{DeploymentContext, DeploymentSetup}, +}; +use crate::{ + control::ComposeNodeControl, errors::ComposeRunnerError, ports::compose_runner_host, + readiness::metrics_handle_from_port, +}; + +pub struct DeploymentOrchestrator { + deployer: ComposeDeployer, +} + +impl DeploymentOrchestrator { + pub const fn new(deployer: ComposeDeployer) -> Self { + Self { deployer } + } + + pub async fn deploy( + &self, + scenario: &Scenario, + ) -> Result + where + Caps: RequiresNodeControl + Send + Sync, + { + let setup = DeploymentSetup::new(scenario.topology()); + setup.validate_environment().await?; + + let DeploymentContext { + mut environment, + descriptors, + } = setup.prepare_workspace().await?; + + let host_ports = PortManager::prepare(&mut environment, &descriptors).await?; + + if self.deployer.readiness_checks { + ReadinessChecker::wait_all(&descriptors, &host_ports, &mut environment).await?; + } else { + info!("readiness checks disabled; giving the stack a short grace period"); + crate::readiness::maybe_sleep_for_disabled_readiness(false).await; + } + + let host = compose_runner_host(); + let client_builder = ClientBuilder::new(); + let node_clients = client_builder + .build_node_clients(&descriptors, &host_ports, &host, &mut environment) + .await?; + let telemetry = metrics_handle_from_port(environment.prometheus_port(), &host)?; + let node_control = self.maybe_node_control::(&environment); + + let (block_feed, block_feed_guard) = client_builder + .start_block_feed(&node_clients, &mut environment) + .await?; + let cleanup_guard = make_cleanup_guard(environment.into_cleanup(), block_feed_guard); + + let context = RunContext::new( + descriptors, + None, + node_clients, + scenario.duration(), + telemetry, + block_feed, + node_control, + ); + + Ok(Runner::new(context, Some(cleanup_guard))) + } + + fn maybe_node_control( + &self, + environment: &crate::environment::StackEnvironment, + ) -> Option> + where + Caps: RequiresNodeControl + Send + Sync, + { + Caps::REQUIRED.then(|| { + Arc::new(ComposeNodeControl { + compose_file: environment.compose_path().to_path_buf(), + project_name: environment.project_name().to_owned(), + }) as Arc + }) + } +} diff --git a/testing-framework/runners/compose/src/deployer/ports.rs b/testing-framework/runners/compose/src/deployer/ports.rs new file mode 100644 index 0000000..667c166 --- /dev/null +++ b/testing-framework/runners/compose/src/deployer/ports.rs @@ -0,0 +1,36 @@ +use testing_framework_core::topology::generation::GeneratedTopology; +use tracing::{debug, info}; + +use crate::{ + environment::StackEnvironment, + errors::ComposeRunnerError, + ports::{HostPortMapping, discover_host_ports}, +}; + +pub struct PortManager; + +impl PortManager { + pub async fn prepare( + environment: &mut StackEnvironment, + descriptors: &GeneratedTopology, + ) -> Result { + debug!("resolving host ports for compose services"); + match discover_host_ports(environment, descriptors).await { + Ok(mapping) => { + info!( + validator_ports = ?mapping.validator_api_ports(), + executor_ports = ?mapping.executor_api_ports(), + prometheus_port = environment.prometheus_port(), + "resolved container host ports" + ); + Ok(mapping) + } + Err(err) => { + environment + .fail("failed to determine container host ports") + .await; + Err(err) + } + } + } +} diff --git a/testing-framework/runners/compose/src/deployer/readiness.rs b/testing-framework/runners/compose/src/deployer/readiness.rs new file mode 100644 index 0000000..caca8c6 --- /dev/null +++ b/testing-framework/runners/compose/src/deployer/readiness.rs @@ -0,0 +1,48 @@ +use testing_framework_core::topology::generation::GeneratedTopology; +use tracing::info; + +use crate::{ + environment::StackEnvironment, + errors::ComposeRunnerError, + ports::{HostPortMapping, ensure_remote_readiness_with_ports}, + readiness::{ensure_executors_ready_with_ports, ensure_validators_ready_with_ports}, +}; + +pub struct ReadinessChecker; + +impl ReadinessChecker { + pub async fn wait_all( + descriptors: &GeneratedTopology, + host_ports: &HostPortMapping, + environment: &mut StackEnvironment, + ) -> Result<(), ComposeRunnerError> { + info!( + ports = ?host_ports.validator_api_ports(), + "waiting for validator HTTP endpoints" + ); + if let Err(err) = + ensure_validators_ready_with_ports(&host_ports.validator_api_ports()).await + { + environment.fail("validator readiness failed").await; + return Err(err.into()); + } + + info!( + ports = ?host_ports.executor_api_ports(), + "waiting for executor HTTP endpoints" + ); + if let Err(err) = ensure_executors_ready_with_ports(&host_ports.executor_api_ports()).await + { + environment.fail("executor readiness failed").await; + return Err(err.into()); + } + + info!("waiting for remote service readiness"); + if let Err(err) = ensure_remote_readiness_with_ports(descriptors, host_ports).await { + environment.fail("remote readiness probe failed").await; + return Err(err.into()); + } + + Ok(()) + } +} diff --git a/testing-framework/runners/compose/src/deployer/setup.rs b/testing-framework/runners/compose/src/deployer/setup.rs new file mode 100644 index 0000000..8b7de81 --- /dev/null +++ b/testing-framework/runners/compose/src/deployer/setup.rs @@ -0,0 +1,86 @@ +use std::{ + env, + net::{Ipv4Addr, TcpListener as StdTcpListener}, +}; + +use testing_framework_core::topology::generation::GeneratedTopology; +use tracing::info; + +use crate::{ + docker::ensure_docker_available, + environment::{ + PortReservation, StackEnvironment, ensure_supported_topology, prepare_environment, + }, + errors::ComposeRunnerError, +}; + +pub const PROMETHEUS_PORT_ENV: &str = "TEST_FRAMEWORK_PROMETHEUS_PORT"; +pub const DEFAULT_PROMETHEUS_PORT: u16 = 9090; + +pub struct DeploymentSetup { + descriptors: GeneratedTopology, +} + +pub struct DeploymentContext { + pub descriptors: GeneratedTopology, + pub environment: StackEnvironment, +} + +impl DeploymentSetup { + pub fn new(descriptors: &GeneratedTopology) -> Self { + Self { + descriptors: descriptors.clone(), + } + } + + pub async fn validate_environment(&self) -> Result<(), ComposeRunnerError> { + ensure_docker_available().await?; + ensure_supported_topology(&self.descriptors)?; + + info!( + validators = self.descriptors.validators().len(), + executors = self.descriptors.executors().len(), + "starting compose deployment" + ); + + Ok(()) + } + + pub async fn prepare_workspace(self) -> Result { + let prometheus_env = env::var(PROMETHEUS_PORT_ENV) + .ok() + .and_then(|raw| raw.parse::().ok()); + if prometheus_env.is_some() { + info!(port = prometheus_env, "using prometheus port from env"); + } + let prometheus_port = prometheus_env + .and_then(|port| reserve_port(port)) + .or_else(|| allocate_prometheus_port()) + .unwrap_or_else(|| PortReservation::new(DEFAULT_PROMETHEUS_PORT, None)); + let environment = + prepare_environment(&self.descriptors, prometheus_port, prometheus_env.is_some()) + .await?; + + info!( + compose_file = %environment.compose_path().display(), + project = environment.project_name(), + root = %environment.root().display(), + "compose workspace prepared" + ); + + Ok(DeploymentContext { + descriptors: self.descriptors, + environment, + }) + } +} + +fn allocate_prometheus_port() -> Option { + reserve_port(DEFAULT_PROMETHEUS_PORT).or_else(|| reserve_port(0)) +} + +fn reserve_port(port: u16) -> Option { + let listener = StdTcpListener::bind((Ipv4Addr::LOCALHOST, port)).ok()?; + let actual_port = listener.local_addr().ok()?.port(); + Some(PortReservation::new(actual_port, Some(listener))) +} diff --git a/testing-framework/runners/compose/src/environment.rs b/testing-framework/runners/compose/src/environment.rs index 2625b80..b9cc597 100644 --- a/testing-framework/runners/compose/src/environment.rs +++ b/testing-framework/runners/compose/src/environment.rs @@ -16,7 +16,7 @@ use crate::{ cfgsync::{CfgsyncServerHandle, update_cfgsync_config}, cleanup::RunnerCleanup, commands::{compose_up, dump_compose_logs}, - deployer::DEFAULT_PROMETHEUS_PORT, + deployer::setup::DEFAULT_PROMETHEUS_PORT, descriptor::ComposeDescriptor, docker::{ensure_compose_image, run_docker_command}, errors::{ComposeRunnerError, ConfigError, WorkspaceError},